Configuring Multiple Kafka Clusters in Spring Boot


Background

One of our applications needs to connect to two separate Kafka clusters to both produce and consume messages. The following configuration makes that possible.

Configuration (application.yml)

spring:
  kafka:
    default:
      bootstrap-servers: kafka_server
      producer:
        key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: org.apache.kafka.common.serialization.StringSerializer
        acks: all
        retries: 3
      consumer:
        auto-commit-interval-ms: 1000
        auto-offset-reset: earliest
        enable-auto-commit: true
        group-id: kafka_group_id
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        max-poll-records: 500
      listener:
        type: BATCH
        concurrency: 4
    big-data:
      bootstrap-servers: kafka.server.url:port
      consumer:
        auto-commit-interval-ms: 1000
        auto-offset-reset: earliest
        enable-auto-commit: true
        group-id: kafka_group_id
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        max-poll-records: 500
      listener:
        concurrency: 4
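Note that default and big-data are custom sub-trees under spring.kafka: Spring Boot's auto-configuration ignores them, and they are consumed only by the @Value annotations in the configuration classes below.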

Configuration classes

Configuration for the first Kafka cluster

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

@EnableKafka
@Configuration
public class DefaultKafkaConfig {

    @Value("${spring.kafka.default.bootstrap-servers}")
    private String server;
    @Value("${spring.kafka.default.consumer.auto-commit-interval-ms}")
    private String autoCommitIntervalMs;
    @Value("${spring.kafka.default.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    @Value("${spring.kafka.default.consumer.enable-auto-commit}")
    private String enableAutoCommit;
    @Value("${spring.kafka.default.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.default.consumer.key-deserializer}")
    private String keyDeserializer;
    @Value("${spring.kafka.default.consumer.value-deserializer}")
    private String valueDeserializer;
    @Value("${spring.kafka.default.consumer.max-poll-records}")
    private String maxPollRecords;
    @Value("${spring.kafka.default.listener.concurrency}")
    private Integer concurrency;
    @Value("${spring.kafka.default.producer.key-serializer}")
    private String keySerializer;
    @Value("${spring.kafka.default.producer.value-serializer}")
    private String valueSerializer;
    @Value("${spring.kafka.default.producer.acks}")
    private String acks;
    @Value("${spring.kafka.default.producer.retries}")
    private String retries;

    @Bean
    public KafkaTemplate<String, Object> defaultKafkaTemplate() {
        ProducerFactory<String, Object> factory = producerFactory();
        return new KafkaTemplate<>(factory);
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> defaultKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(4000);
        factory.setBatchListener(true);
        return factory;
    }

    private ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> properties = new HashMap<>();
        properties.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        properties.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
        properties.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        properties.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        properties.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
        properties.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
        properties.putIfAbsent(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        return new DefaultKafkaConsumerFactory<>(properties);
    }

    private DefaultKafkaProducerFactory<String, Object> producerFactory() {
        Map<String, Object> properties = new HashMap<>();
        properties.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        properties.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
        properties.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
        properties.putIfAbsent(ProducerConfig.ACKS_CONFIG, acks);
        properties.putIfAbsent(ProducerConfig.RETRIES_CONFIG, retries);
        return new DefaultKafkaProducerFactory<>(properties);
    }
}
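As an aside, the repeated @Value fields can be collapsed by binding each custom sub-tree to a properties class. A minimal sketch, assuming Java records and registration via @ConfigurationPropertiesScan, neither of which is part of the original setup; relaxed binding maps auto-commit-interval-ms to autoCommitIntervalMs and so on:

import org.springframework.boot.context.properties.ConfigurationProperties;

// Hypothetical alternative to the @Value fields above: one record per sub-tree.
@ConfigurationProperties(prefix = "spring.kafka.big-data.consumer")
public record BigDataConsumerProperties(
        String autoCommitIntervalMs,
        String autoOffsetReset,
        boolean enableAutoCommit,
        String groupId,
        String keyDeserializer,
        String valueDeserializer,
        int maxPollRecords) {
}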

Configuration for the second Kafka cluster

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

@EnableKafka
@Configuration
public class BigDataKafkaConfig {

    @Value("${spring.kafka.big-data.bootstrap-servers}")
    private String server;
    @Value("${spring.kafka.big-data.consumer.auto-commit-interval-ms}")
    private String autoCommitIntervalMs;
    @Value("${spring.kafka.big-data.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    @Value("${spring.kafka.big-data.consumer.enable-auto-commit}")
    private String enableAutoCommit;
    @Value("${spring.kafka.big-data.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.big-data.consumer.key-deserializer}")
    private String keyDeserializer;
    @Value("${spring.kafka.big-data.consumer.value-deserializer}")
    private String valueDeserializer;
    @Value("${spring.kafka.big-data.consumer.max-poll-records}")
    private String maxPollRecords;
    @Value("${spring.kafka.default.listener.concurrency}")
    private Integer concurrency;

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> bigDataKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(4000);
        factory.setBatchListener(true);

        return factory;
    }

    private ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> properties = new HashMap<>();
        properties.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        properties.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
        properties.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        properties.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        properties.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
        properties.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
        properties.putIfAbsent(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);

        return new DefaultKafkaConsumerFactory<>(properties);
    }
}
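This second configuration wires only a consumer. If messages must also be produced to the second cluster, a template bean analogous to defaultKafkaTemplate can be added to BigDataKafkaConfig. A minimal sketch, assuming the same String serializers as the first cluster (the YAML above defines no producer section for big-data):

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;

@Bean
public KafkaTemplate<String, Object> bigDataKafkaTemplate() {
    // serializer classes are assumptions; the article configures no producer
    // for this cluster
    Map<String, Object> properties = new HashMap<>();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(properties));
}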

Usage

On the consumer side, the containerFactory attribute of @KafkaListener selects which cluster a listener consumes from. Neither factory defined above is named kafkaListenerContainerFactory (the name @KafkaListener looks up by default), so specify the factory explicitly on every listener:

@KafkaListener(topics = {"my_kafka_topic"}, containerFactory = "defaultKafkaListenerContainerFactory")
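A complete listener might look like the sketch below; the class name is illustrative, and the method receives a whole batch because the factory calls setBatchListener(true):

import java.util.List;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class DefaultClusterListener {

    @KafkaListener(topics = {"my_kafka_topic"},
                   containerFactory = "defaultKafkaListenerContainerFactory")
    public void onMessages(List<String> messages) {
        // one call per poll, up to max-poll-records (500) messages
        messages.forEach(System.out::println);
    }
}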

On the producer side, inject the template for the target cluster by bean name:

@Autowired
@Qualifier("defaultKafkaTemplate")
private KafkaTemplate<String, Object> defaultKafkaTemplate;

defaultKafkaTemplate.send("my_kafka_topic", "message");
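A fuller sketch (class name illustrative) uses constructor injection, which keeps the qualifier and the field tied together:

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class DefaultClusterProducer {

    private final KafkaTemplate<String, Object> defaultKafkaTemplate;

    public DefaultClusterProducer(
            @Qualifier("defaultKafkaTemplate") KafkaTemplate<String, Object> defaultKafkaTemplate) {
        this.defaultKafkaTemplate = defaultKafkaTemplate;
    }

    public void send(String message) {
        defaultKafkaTemplate.send("my_kafka_topic", message);
    }
}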

References:
https://zhuanlan.zhihu.com/p/374546956
