SpringBoot ——kafka消费多个不同服务器地址消息解决方案

Stella981
• 阅读 774

一、背景

       在springboot实际项目开发中,kafka可能需要消费多个不同服务器地址的数据,这时懂得如何进行配置就显得非常必要了。

二、配置

       1、KafkaConfig.java配置

package com.lantaiyuan.ebus.kafka;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

@Configuration
@EnableKafka
public class KafkaConfig {
    
    @Value("${kafka.bootstrap.servers}")
    private String kafkaBootstrapServers;
    @Value("${kafka.session.timeout-ms}")
    private Integer sessionTimeoutMs;
    @Value("${kafka.auto-commit.enable}")
    private boolean enableAutoCommit;
    @Value("${kafka.auto-commit.interval-ms}")
    private Integer autoCommitIntervalMs;
    @Value("${kafka.auto-offset.reset}")
    private String autoOffsetReset;
    @Value("${kafka.group.id}")
    private String groupId;
    @Value("${kafka.topic.city-code}")
    private String topicCityCodeStr;

    /**
     * 以下是新增配置,获取司机上下班打卡信息(注入新增的kafka服务消费地址)
     */
    @Value("${kafka.bootstrap.driver-servers}")
    private String kafkaBootstrapDriverServers;
    
    @Bean
    @Primary//重要!!!指定该ContainerFactory为主要的容器工厂,kafka消费者默认关联该容器
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }


    /**
     * 以下为司机打卡kafka配置,即在原有kafka消费配置基础上重新再复制一份(共3个方法,记得同步更改方法名)
     */


    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactoryForDriverSchedule() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactoryForDriverSchedule());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactoryForDriverSchedule() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigsForDriverSchedule());
    }


    /**
     *
     * 司机打卡信息kafka消费者配置
     */
    @Bean
    public Map<String, Object> consumerConfigsForDriverSchedule() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapDriverServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
    
    @Bean(name="topicCityMap")
    public Map<String, String> topicCityMap() {
        Map<String, String> map = new HashMap<>();
        String[] topicCityArray = topicCityCodeStr.split("\\|");
        for (String topicCity : topicCityArray) {
            String[]  array = topicCity.split(":");
            map.put(array[0], array[1]);
        }
        return map;
    }
    
    
}

      2、KafkaConsumer.java更改说明

package com.lantaiyuan.ebus.kafka;

import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import javax.annotation.Resource;

import com.lantaiyuan.ebus.dao.BaseDriverScheduleMapper;
import com.lantaiyuan.ebus.model.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.lantaiyuan.ebus.constants.KafkaConsts;
import com.lantaiyuan.ebus.service.BaseRouteServiceI;
import com.lantaiyuan.ebus.service.BaseStationServiceI;
import com.lantaiyuan.ebus.service.JpushServiceI;
import com.lantaiyuan.ebus.service.MyTrailServiceI;
import com.lantaiyuan.ebus.service.NoticeHistoryServiceI;
import com.lantaiyuan.ebus.service.OnBusInfoServiceI;
import com.lantaiyuan.ebus.service.RelRouteBusServiceI;
import com.lantaiyuan.ebus.service.TaskTimerServiceI;
import com.lantaiyuan.ebus.util.Tools;
import com.lantaiyuan.ebus.virtual.card.model.VirtualCardSwipeHistory;
import com.lantaiyuan.ebus.virtual.card.model.enummodel.VirtualCardRecordHeaderTypeEnum;
import com.lantaiyuan.ebus.virtual.card.service.VirtualCardSwipeHistoryServiceI;


@Component
public class KafkaConsumer {


    //.........................


    //containerFactory默认关联kafkaListenerContainerFactory容器工厂
    @KafkaListener(topics = "${kafka.topic.od-topic}")
    public void odConsumer(ConsumerRecord<Integer, String> msg) {
        String record = msg.value();
        JSONObject jsonObject = JSONObject.parseObject(record);
        if (!StringUtils.isEmpty(jsonObject.getString("startStationId"))) { // O点
            ProduceOriginPoint originPoint = JSONObject.parseObject(record, ProduceOriginPoint.class);
            myTrailService.insertOriginPoint(originPoint);
        } else { // D点
            ProduceDestPoint destPoint = JSONObject.parseObject(record, ProduceDestPoint.class);
            myTrailService.updateDestPoint(destPoint);
        }
    }
    

    //.........................
    

    /***
     *
     * <p>Title: </p>
     * <p>Description: 消费司机考勤消息</p>
     */
    //containerFactory手动关联到kafkaListenerContainerFactoryForDriverSchedule容器工厂
    @KafkaListener(topics = "${kafka.topic.driverSheduleTopic}", containerFactory = "kafkaListenerContainerFactoryForDriverSchedule")
    public void driverSheduleConsumer(ConsumerRecord<Integer, String> msg) {
        String record = msg.value();
        JSONObject jsonObject = JSONObject.parseObject(record);

        BaseDriverSchedule baseDriverSchedule = new BaseDriverSchedule();
        baseDriverSchedule.setCityCode(jsonObject.getString("cityCode"));
        baseDriverSchedule.setDriverName(jsonObject.getString("driverName"));
        baseDriverSchedule.setOccurTime(jsonObject.getDate("occurTime"));

        //jsonObject的typeId为int类型,model为String类型,但是可以jsonObject.getString()进行自动类型转换
        baseDriverSchedule.setOnboardId(jsonObject.getString("onBoardId"));
        
        //jsonObject的typeId为int类型,model为byte类型,但是可以jsonObject.getByte()进行自动类型转换
        baseDriverSchedule.setTypeId(jsonObject.getByte("typeId"));
        baseDriverSchedule.setWorkId(jsonObject.getString("workerId"));

        this.baseDriverScheduleMapper.insertSelective(baseDriverSchedule);
    }
}

三、附KafkaListener注解源码

/*
 * Copyright 2016 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.kafka.annotation;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Repeatable;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.springframework.messaging.handler.annotation.MessageMapping;

/**
 * Annotation that marks a method to be the target of a Kafka message listener on the
 * specified topics.
 *
 * The {@link #containerFactory()} identifies the
 * {@link org.springframework.kafka.config.KafkaListenerContainerFactory
 * KafkaListenerContainerFactory} to use to build the Kafka listener container. If not
 * set, a <em>default</em> container factory is assumed to be available with a bean name
 * of {@code kafkaListenerContainerFactory} unless an explicit default has been provided
 * through configuration.
 *
 * <p>
 * Processing of {@code @KafkaListener} annotations is performed by registering a
 * {@link KafkaListenerAnnotationBeanPostProcessor}. This can be done manually or, more
 * conveniently, through {@link EnableKafka} annotation.
 *
 * <p>
 * Annotated methods are allowed to have flexible signatures similar to what
 * {@link MessageMapping} provides, that is
 * <ul>
 * <li>{@link org.apache.kafka.clients.consumer.ConsumerRecord} to access to the raw Kafka
 * message</li>
 * <li>{@link org.springframework.kafka.support.Acknowledgment} to manually ack</li>
 * <li>{@link org.springframework.messaging.handler.annotation.Payload @Payload}-annotated
 * method arguments including the support of validation</li>
 * <li>{@link org.springframework.messaging.handler.annotation.Header @Header}-annotated
 * method arguments to extract a specific header value, defined by
 * {@link org.springframework.kafka.support.KafkaHeaders KafkaHeaders}</li>
 * <li>{@link org.springframework.messaging.handler.annotation.Headers @Headers}-annotated
 * argument that must also be assignable to {@link java.util.Map} for getting access to
 * all headers.</li>
 * <li>{@link org.springframework.messaging.MessageHeaders MessageHeaders} arguments for
 * getting access to all headers.</li>
 * <li>{@link org.springframework.messaging.support.MessageHeaderAccessor
 * MessageHeaderAccessor} for convenient access to all method arguments.</li>
 * </ul>
 *
 * <p>When defined at the method level, a listener container is created for each method.
 * The {@link org.springframework.kafka.listener.MessageListener} is a
 * {@link org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter},
 * configured with a {@link org.springframework.kafka.config.MethodKafkaListenerEndpoint}.
 *
 * <p>When defined at the class level, a single message listener container is used to
 * service all methods annotated with {@code @KafkaHandler}. Method signatures of such
 * annotated methods must not cause any ambiguity such that a single method can be
 * resolved for a particular inbound message. The
 * {@link org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter} is
 * configured with a
 * {@link org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint}.
 *
 * @author Gary Russell
 *
 * @see EnableKafka
 * @see KafkaListenerAnnotationBeanPostProcessor
 * @see KafkaListeners
 */
@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
@MessageMapping
@Documented
@Repeatable(KafkaListeners.class)
public @interface KafkaListener {

    /**
     * The unique identifier of the container managing for this endpoint.
     * <p>If none is specified an auto-generated one is provided.
     * @return the {@code id} for the container managing for this endpoint.
     * @see org.springframework.kafka.config.KafkaListenerEndpointRegistry#getListenerContainer(String)
     */
    String id() default "";

    /**
     * The bean name of the {@link org.springframework.kafka.config.KafkaListenerContainerFactory}
     * to use to create the message listener container responsible to serve this endpoint.
     * <p>If not specified, the default container factory is used, if any.
     * @return the container factory bean name.
     */
    String containerFactory() default "";

    /**
     * The topics for this listener.
     * The entries can be 'topic name', 'property-placeholder keys' or 'expressions'.
     * Expression must be resolved to the topic name.
     * Mutually exclusive with {@link #topicPattern()} and {@link #topicPartitions()}.
     * @return the topic names or expressions (SpEL) to listen to.
     */
    String[] topics() default {};

    /**
     * The topic pattern for this listener.
     * The entries can be 'topic name', 'property-placeholder keys' or 'expressions'.
     * Expression must be resolved to the topic pattern.
     * Mutually exclusive with {@link #topics()} and {@link #topicPartitions()}.
     * @return the topic pattern or expression (SpEL).
     */
    String topicPattern() default "";

    /**
     * The topicPartitions for this listener.
     * Mutually exclusive with {@link #topicPattern()} and {@link #topics()}.
     * @return the topic names or expressions (SpEL) to listen to.
     */
    TopicPartition[] topicPartitions() default {};

    /**
     * If provided, the listener container for this listener will be added to a bean
     * with this value as its name, of type {@code Collection<MessageListenerContainer>}.
     * This allows, for example, iteration over the collection to start/stop a subset
     * of containers.
     * @return the bean name for the group.
     */
    String group() default "";

}
点赞
收藏
评论区
推荐文章
Easter79 Easter79
2年前
springboot中切换配置(多个配置文件
问题描述:在springboot项目中可能有测试环境、开发环境、生产环境,在这些环境中我们可能要使用不同的配置,如果每次切换环境的时候都要重新写一份配置文件就很麻烦了,所以下面提供一种方法可以快速且简便的切换不同环境下的配置。解决方案:1、首先在resources目录下创建完整的配置文件(包括测试、开发、生产环境下的相关配置文件
Easter79 Easter79
2年前
Storm结合kafka参数配置详解+代码示例(累计单词出现的次数)
kafka参数配置详情:publicfinalBrokerHostshosts;//设置kafka从哪里获取相关的配置信息publicfinalStringtopic;//从哪个topic开始消费publicfinalStringclientId;//设置客户端标识publicintfetchSizeBytes10
Stella981 Stella981
2年前
Spring Cloud系列教程(十):分布式配置中心Spring Cloud Config(Finchley版本)
一、前言在分布式、微服务系统架构中,一个大的项目在进行服务拆分之后,变成了众多个子服务,由于服务的数量居多,每个服务都有自己的一套配置文件,这时候就不像传统的单体架构SSM、SSH、以及当下比较流行的SpringBoot快速开发框架,一个项目基本一份配置文件就可以搞定。而在微服务项目架构中,为了方便众多个服务的配置文件统一进行集中管
Stella981 Stella981
3年前
Kafka初入门简单配置与使用
一Kafka概述1.1Kafka是什么在流式计算中,Kafka一般用来缓存数据,Storm通过消费Kafka的数据进行计算。1)ApacheKafka是一个开源消息系统,由Scala写成。是由Apache软件基金会开发的一个开源消息系统项目。2)Kafka最初是由LinkedIn开发
Wesley13 Wesley13
3年前
KAFKA官方教程笔记
 因为kafka配置较多,所以单独写一篇博客来记录。       1,broker配置   主要的配置项有三个broker.id集群内唯一log.dir数据目录zookeeper.connectzookeeper连接地址Topiclevelconfigurationsanddefaultsa
Easter79 Easter79
2年前
SpringBoot2.0之六 多环境配置
  开发过程中面对不同的环境,例如数据库、redis服务器等的不同,可能会面临一直需要修改配置的麻烦中,在以前的项目中,曾通过Tomcat的配置来实现,有的项目甚至需要手动修改相关配置,这种方式费时费力,出错的概率还极大,SpringBoot为我们提供了更加简单方便的配置方案来解决多环境的配置问题,下面我们看看怎么实现。一、新建一个项目(本文以上篇的代码
Stella981 Stella981
2年前
SpringBoot2.0之六 多环境配置
  开发过程中面对不同的环境,例如数据库、redis服务器等的不同,可能会面临一直需要修改配置的麻烦中,在以前的项目中,曾通过Tomcat的配置来实现,有的项目甚至需要手动修改相关配置,这种方式费时费力,出错的概率还极大,SpringBoot为我们提供了更加简单方便的配置方案来解决多环境的配置问题,下面我们看看怎么实现。一、新建一个项目(本文以上篇的代码
Wesley13 Wesley13
3年前
Java版Kafka使用及配置解释
Java版Kafka使用及配置解释一.Java示例kafka是吞吐量巨大的一个消息系统,它是用scala写的,和普通的消息的生产消费还有所不同,写了个demo程序供大家参考。kafka的安装请参考官方文档。引入Maven库首先我们需要新建一个maven项目,然后在pom中
Easter79 Easter79
2年前
Springboot集成Kafka
 Kafka是一种高吞吐量的分布式发布订阅消息系统,有如下特性:通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息。支持通过Kafka服务器和消费机集群来分区消息。支持Hadoop并行数据加载。Springboot的基本搭建和配置我
胖大海 胖大海
2年前
Centos7 kafka安装与配置
前言,我这边使用的3A服务器centos7.9系统里进行操作的,使用kafka需要安装JDK,zookeeper一:从官网下载安装包http://archive.apache.org/dist/kafka/二:安装和配置kafka1.解压kafka压缩包并放到/usr/local下tarxzfkafka2.100.8.1.1.tar.gz2.配置serv