™技术博客

apm | SpringKafkaListener动态扩展

2021年3月21日

需求背景

  • 框架:spring-kafka
  • 需求:根据配置文件在运行时动态注册 @KafkaListener 监听器,而不是在代码中为每个 topic 写死一个监听方法

参考来源

stackoverflow
github

样例

配置

1
2
3
4
5
6
7
8
9
# Dynamic Kafka listener definitions, bound by ApmListenerConfig
# (prefix: apm.kafka.listeners).
apm:
  kafka:
    listeners:
      # ApmListenerConfig is only created when this is "true".
      enable: true
      details:
        # One entry per ApmListener bean to register.
        - beanName: segmentTopic
          topic: segment-topic
          groupId: segment-topic-group
          concurrency: 1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
 * Binds the {@code apm.kafka.listeners} configuration section.
 *
 * <p>This bean only exists when {@code apm.kafka.listeners.enable} is {@code true}.
 */
@Data
@Configuration
@ConditionalOnProperty(value = "apm.kafka.listeners.enable", havingValue = "true")
@ConfigurationProperties(prefix = "apm.kafka.listeners")
public class ApmListenerConfig {

    /**
     * Listener definitions read from {@code apm.kafka.listeners.details}.
     * Defaults to an empty list so callers can iterate it safely when no
     * entries are configured.
     */
    private List<Listener> details = new java.util.ArrayList<>();

    /** Settings for a single dynamically registered listener. */
    @Data
    public static class Listener {
        /** Name under which the listener bean is registered. */
        private String beanName;
        /** Kafka topic to consume. */
        private String topic;
        /** Consumer group id. */
        private String groupId;
        /** Listener concurrency, kept as a string because it is passed on as a property value. */
        private String concurrency;
    }
}

Listener类配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
 * Kafka consumer whose topic, group id and concurrency are taken from the
 * {@code apm.topic}, {@code apm.groupId} and {@code apm.concurrency} properties.
 */
public class ApmListener {

    /** Shared JSON parser used to decode record payloads. */
    private static final Gson GSON = new Gson();

    /**
     * Decodes every record value in the batch as an {@code ApmObject} and prints it.
     *
     * @param records batch of consumed records whose values are JSON text
     */
    @KafkaListener(
            topics = "${apm.topic}",
            groupId = "${apm.groupId}",
            concurrency = "${apm.concurrency}"
    )
    public void listener(List<ConsumerRecord<String, String>> records) {
        for (ConsumerRecord<String, String> entry : records) {
            ApmObject parsed = GSON.fromJson(entry.value(), ApmObject.class);
            System.out.println(parsed);
        }
    }
}

动态Bean加载

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
 * Registers one {@link ApmListener} bean per configured listener entry after
 * the application has started.
 *
 * <p>{@code ApmListener} takes its topic, group id and concurrency from the
 * {@code apm.topic}, {@code apm.groupId} and {@code apm.concurrency}
 * placeholders. The runner rewrites those three properties before creating
 * each bean so that every instance gets its own values.
 */
@Data
@Configuration
// NOTE(review): @AutoConfigureAfter is documented as ordering auto-configuration
// classes only; it probably has no effect on a regular @Configuration - confirm.
@AutoConfigureAfter(ApmListenerConfig.class)
public class ApmListenerFactory {

    /**
     * Listener definitions. Injection is optional because ApmListenerConfig is
     * only created when apm.kafka.listeners.enable=true; a mandatory dependency
     * would make the application fail to start whenever the feature is off.
     */
    @Autowired(required = false)
    private ApmListenerConfig config;

    /**
     * Builds the startup runner that registers the dynamic listener beans.
     *
     * @param registry registry whose listener container ids are printed after registration
     * @param context  application context that receives the property source and the new beans
     * @return a runner that does nothing when no listener configuration is available
     */
    @Bean
    public ApplicationRunner runner(KafkaListenerEndpointRegistry registry, GenericApplicationContext context) {
        return args -> {
            // Feature disabled or nothing configured: there is nothing to register.
            if (config == null || config.getDetails() == null) {
                return;
            }

            // A single mutable property source backs the placeholders used by ApmListener.
            Properties props = new Properties();
            PropertiesPropertySource source = new PropertiesPropertySource("dynamicListener", props);
            context.getEnvironment().getPropertySources().addLast(source);

            config.getDetails().forEach(listener -> {
                props.setProperty("apm.topic", listener.getTopic());
                props.setProperty("apm.groupId", listener.getGroupId());
                props.setProperty("apm.concurrency", listener.getConcurrency());
                context.registerBean(listener.getBeanName(), ApmListener.class, ApmListener::new);
                // Create the bean right away so its @KafkaListener placeholders are
                // resolved against the values just written, before the next
                // iteration overwrites them.
                context.getBean(listener.getBeanName());
            });

            registry.getListenerContainerIds().forEach(System.out::println);
        };
    }
}
Tags: apm

扫描二维码,分享此文章