Kafka 入门和 Spring Boot 集成

Stella981
• 阅读 1091

Kafka 入门和 Spring Boot 集成

概述

kafka 是一个高性能的消息队列,也是一个分布式流处理平台(这里的流指的是数据流)。由java 和 Scala 语言编写,最早由 LinkedIn 开发,并 2011年开源,现在由 Apache 开发维护。

应用场景

下面列举了一些kafka常见的应用场景。

消息队列 : Kafka 可以作为消息队列使用,可用于系统内异步解耦,流量削峰等场景。

应用监控:利用 Kafka 采集应用程序和服务器健康相关的指标,如应用程序相关的日志,服务器相关的 CPU、占用率、 IO、内存、连接数、 TPS、 QPS等,然后将指标信息进行处理,从而构建一个具有监控仪表盘、曲线图等可视化监控系统。 例如, 很多公司采用 Kafka 与 ELK(ElasticSearch、 Logstash 和Kibana)整合构建应用服务的监控系统。

流处理:比如将 kafka 接收到的数据发送给 Storm 流式计算框架处理。

基本概念

record(消息):kafka 通信的基本单位,每一条消息称为record

producer (生产者 ):发送消息的客户端。

consumer(消费者 ):消费消息的客户端。

consumerGroup (消费者组):每一个消费者都属于一个特定的消费者组。

消费者和消费者组的关系

  • 如果a,b,c 属于同一个消费者组,那一条消息只能被 a,b,c 中的某一个消费者消费。
  • 如果a,b,c 属于不同的消费者组(比如 ga,gb,gc) ,那一条消息过来,a,b,c 三个消费者都能消费到。

topic (主题): kafka的消息通过topic来分类,类似于数据库的表。 producer 发布消息到 topic,consumer订阅 topic 进行消费

partition( 分区):一个topic会被分成一到多个分区(partition),然后多个分区可以分布在不同的机器上,这样一个主题就相当于运行在了多台机子上,kafka用分区的方式提高了性能和吞吐量

replica (副本):一个分区有一到多个副本,副本的作用是提高分区的 可用性。

offset(偏移量):偏移量 类似数据库自增int Id,随着数据的不断写入 kafka 分区内的偏移量会不断增加,一条消息由一个唯一的偏移量来标识。偏移量的作用是,让消费者知道自己消费到了哪个位置,下次可以接着从这里消费。如下图: Kafka 入门和 Spring Boot 集成 消费者A 消费到了 offset 为 9 的记录,消费者 B 消费到了offset 为 11 的记录。

基本结构

kafka 最基本的结构如下,跟常见的消息队列结构一样。 Kafka 入门和 Spring Boot 集成 消息通过生产者发送到 kafka 集群, 然后消费者从 kafka 集群拉取消息进行消费。

和Spring Boot 集成

集成概述

本集成方式采用的是 spring boot 官方文档说的集成方式,官方链接,集成的大体思路是,通过在 spring boot application.properties 中配置 生产者和消费者的基本信息,然后spring boot 启动后会创建 KafkaTemplate 对象,这个对象可以用来发送消息到Kafka,然后用 @KafkaListener 注解来消费 kafka 里面的消息,具体步骤如下。

集成环境

spring boot:1.5.13 版本 spring-kafka:1.3.5 版本 kafka:1.0.1 版本

kafka 环境搭建

先启动Zookeeper:

docker run -d --name zookeeper --publish 2181:2181 --volume /etc/localtime:/etc/localtime zookeeper:latest 

再启动Kafka:替换下面的IP为你服务器IP即可

docker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=192.168.10.253 --env KAFKA_ADVERTISED_PORT=9092 --volume /etc/localtime:/etc/localtime wurstmeister/kafka:1.0.1

Spring Boot 和 Spring for Apache Kafka 集成步骤

  1. 首先pom中引入 Spring for Apache Kafka

    org.springframework.kafka spring-kafka 1.3.5.RELEASE
  2. 然后 application.properties 配置文件中加入如下配置: 各个配置的解释见:spring boot 附录中的 kafka 配置,搜索kafka 关键字即可定位。

    server.port=8090

    ####### kafka

    producer 配置

    spring.kafka.producer.bootstrap-servers=192.168.10.48:9092 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

    consumer 配置

    spring.kafka.consumer.bootstrap-servers=192.168.10.48:9092 spring.kafka.consumer.group-id=anuoapp spring.kafka.consumer.enable-auto-commit=true spring.kafka.consumer.auto-commit-interval=100 spring.kafka.consumer.max-poll-records=1 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.listener.concurrency=5

  3. 创建 Kafka Producer 生产者

    package com.example.anuoapp.kafka;

    import com.alibaba.fastjson.JSON; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; import org.springframework.util.concurrent.ListenableFuture;

    @Component public class KafkaProducer { @Autowired KafkaTemplate kafkaTemplate; public void kafkaSend() throws Exception { UserAccount userAccount=new UserAccount(); userAccount.setCard_name("jk"); userAccount.setAddress("cd"); ListenableFuture send = kafkaTemplate.send("jktopic", "key", JSON.toJSONString(userAccount)); } }

  4. 创建 Kafka Consumer 消费者

    package com.example.anuoapp.kafka;

    import org.apache.kafka.clients.consumer.ConsumerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component;

    @Component public class KafkaConsumer {

    public static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
    
    
    @KafkaListener(topics = {"jktopic"})
    public void jktopic(ConsumerRecord consumerRecord) throws InterruptedException {
    
        System.out.println(consumerRecord.offset());
        System.out.println(consumerRecord.value().toString());
        Thread.sleep(3000);
    
    
    }
    

    }

  5. 创建一个rest api 来调用 Kafka 的消息生产者

    package com.example.anuoapp.controller;

    import com.example.anuoapp.kafka.KafkaProducer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController;

    @RestController @RequestMapping("/api/system") public class SystemController { private Logger logger = LoggerFactory.getLogger(SystemController.class); @Autowired KafkaProducer kafkaProducer; @RequestMapping(value = "/Kafka/send", method = RequestMethod.GET) public void WarnInfo() throws Exception { int count=10; for (int i = 0; i < count; i++) { kafkaProducer.kafkaSend(); } } }

  6. 用 post man 调用 第 5 步创建的接口, 就可以看到 如下消费者产生的输出信息

    30 {"address":"cd","bind_qq":false,"bind_weixin":false,"card_name":"jk","passwordDirectCompare":false} 31 {"address":"cd","bind_qq":false,"bind_weixin":false,"card_name":"jk","passwordDirectCompare":false} 32 {"address":"cd","bind_qq":false,"bind_weixin":false,"card_name":"jk","passwordDirectCompare":false}

最后

恭喜你 ! spring boot kafka 集成完毕。

完整的基础源码见: 链接: https://pan.baidu.com/s/1E2Lmbj9A9uruTXG54uPl_g 密码: e6d6

踩过的坑

  1. 集成 spring kafka的时候注意版本要选对 ,目前spring boot 1.5.1 对应集成 spring kafka 1.x 版本 , 如果直接集成 spring kafka 2.x 版本 的话会报错, 要用 spring kafka 2.x 的话, 需要升级spring boot 到 2.x 版本。

  2. 消费者配置的时候 spring.kafka.consumer.max-poll-records=1 ,轮询拉取的最大记录数要设置成1,不然会出现重复消费,即:如果消费者程序崩了,再启动起来,会消费到以前消费过的数据,造成重复消费。 设置成 1 就没得这个问题了, 不知道是不是 spring kafka 这个lib封装得有问题。

点赞
收藏
评论区
推荐文章
blmius blmius
2年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Jacquelyn38 Jacquelyn38
2年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Stella981 Stella981
2年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Easter79 Easter79
2年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
2年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
2年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
2年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
2年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
3个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这