Kafka-Broker的基本模块

杏奴
• 阅读 1206

1.SocketServer
SocketServer作为Broker对外提供Socket服务的模块,主要用于接收socket连接的请求,然后产生相应为之服务的SocketChannel对象。
内部主要包括三个模块:
Acceptor主要用于监听Socket连接;
Processor主要用于转发Socket的请求和响应。
RequestChannel主要用于缓存Socket的请求和响应。
1.1Acceptor对象主要功能
(1)开启socket服务
(2)注册Accept事件
(3)监听此ServerChannel上的ACCEPT事件,当其发生时,将其以轮询的方式把对应的 SocketChannel转交给Processor处理线程。
1.2Processor对象主要功能
(1)当有新的SocketChannel对象进来的时候,注册其上的OP_READ事件以便接收客户端的请求。
(2)从RequestChannel中的响应队列获取对应客户端的请求,然后产生OP_WRITE事件。
(3)监听selector上的事件。如果是读事件,说明有新的request到来,需要转移给 RequestChannel的请求队列;如果是写事件,说明之前的request已经处理完毕,需要从 RequestChannel的响应队列获取响应并发送回客户端;如果是关闭事件,说明客户端已经关闭了 该Socket连接,此时服务端也应该释放相关资源。
1.3RequestChannel
本质上就是为了解耦SocketServer和KafkaApis两个模块,内部包含Request的阻塞队列和Response的阻塞队列。
注:SocketServer为了防止空闲连接大量存在,采用了LRU算法,即最近最少使用算法,会将长时间没有交互的SocketChannel对象关闭,及时释放资源。因此Processor仅仅是起到了接收Request,发送Response的作用,其处理Request的具体业务逻辑是由KafkaApis层负责的,并且两者之间是通过RequestChannel相互联系起来的。
Kafka-Broker的基本模块
总结可得,SocketServer负责下面三个方面:
(1)建立Socket,保持和客户端的通信;
(2)转发客户端的Request;
(3)返回Response给客户端。最后通过RequestChannel与其他模块解耦。
2.KafkaRequestHandlerPool
KafkaRequestHandlerPool本质上就是一个线程池,里面包含了num.io.threads 个IO处理线程,默认 为8个。KafkaRequestHandlerPool在内部启动了若干个KafkaRequestHandler处理线程,并将RequestChannel对象和KafkaApis对象传递给了KafkaRequestHandler处理线程,因为KafkaRequestHandler需要从前者的requestQueue中取出Request,并且利用后者来完成具体的业务逻辑。
3.KafkaApis
KafkaApis负责具体的业务逻辑,它主要和Producer、Consumer、Broker Server交互。 KafkaApis主要依赖以下四个组件来完成具体的业务逻辑:
LogManager提供针对Kafka的topic日志的读取和写入功能。
ReplicaManager提供针对topic分区副本数据的同步功能。
OffsetManager提供针对提交至Kafka偏移量的管理功能。
KafkaSchedule为其他模块提供定时的调度和管理功能。
3.1LogManager
LogManager负责提供Broker Server上topic的分区数据读取和写入功能,负责读取和写入位于Broker Server上的所有分区副本数据;如果Partition有多个Replica,则每个Broker Server不会存在相同Partition的Replica;如果存在的话,一旦遇到Broker Server下线,则会立刻丢失Partition的多份副本,失去 了一定的可靠性。
Topic、Partition和Replica三者之间的关联关系:
Kafka-Broker的基本模块
3.2ReplicaManager
ReplicaManager负责提供针对topic的分区副本数据的同步功能,需要针对不同的变化做出及时响应,例如Partition的Replicas发送Leader切换时,Partition的Replicas所在的Broker Server离线的时候,Partition的Replicas发生Follower同步Leader数据异常的时候,等等。
分区两个名词:AR和ISR
AR是Assign Replicas的缩写,代表已经分配给Partition的副本。
ISR是In-Sync Replicas的缩写,代表处于同步状态的副本。
并不是所有的AR都是ISR,尤其是当Broker Server离线的时候会导致对应TopicAndPartition的Replica没有及时同步Leader状态的Replica,从而该Replica不是ISR。
a.ReplicaManager是如何实现Replica数据的同步?
主要利用ReplicaFetcherThread(副本数据拉取线程)和Height Watermark Mechanism(高水位线机制)来实现数据的同步管理。
b.什么是高水位?
本质上代表的是ISR中的所有replicas的last commited message的最小起始偏移量,即在这偏移之前的数据都被ISR所有的replicas所接收,但是在这偏移之后的数据被ISR中的部分replicas所接收。
Kafka-Broker的基本模块
其中RecoverPoint代表的是recover-point-offset-checkpoint文件中记录的偏移量,LogEndOffset代表的是当前TopicAndPartition的replica所接收到消息的最大偏移量,HeightWatermark代表的是已经同步给所有ISR的最小偏移量。Replica的HeightWatermark发生更新在以下两种情况:
(1)Leader状态的Replica接收到其他Follower状态的Replica的FetchRequest请求时,会选择性得更新HeightWatermark。
(2)Follower状态的Replica接收到来自Leader状态的Replica的FetchResponse时,会选择性更新HeightWatermark,即ReplicaFetcherThread内部的processPartitionData流程。
4.OffsetManager
4.1Kafka提供两种保存Consumer偏移量的方法:
(1)将偏移量保存到Zookeeper中。
(2)将偏移量保存至Kafka内部一个名为_consumer_offsets的Topic里面。
将偏移量保存至Zookeeper中是kafka一直就支持的,但是考虑到zookeeper并不太适合大批量的频繁写入操作,大数据培训因此kafka开始支持将Consumer的偏移量保存再Kafka内部的topic中,即_consumer_offsets Topic。当用户配置offsets.storage=kafka时,高级消费者会将偏移量保存至Topic里面,同时通过OffsetManager提供对这些偏移量的管理。
4.2 OffsetManager主要功能
缓存最新的偏移量。
提供对偏移量的查询。
Compact,保留最新的偏移量,以此来控制Topic日志的大小。
Kafka如何将Consumer Group 产生的偏移量信息保存在_consumer_offsets的不同分区?
本质是通过计算不同Consumer Group的hash值和_consumer_offsets的分区数的模数,其结果作为指定分区的索引。
5.KafkaScheduler
KafkaScheduler为其他模块提供定时任务的调度和管理,例如LogManager内部的cleanupLogs定时任务,flushDirtyLogs定时任务和checkpointRecoverPointOffsets定时任务;ReplicaManager模块内部的maybeShrinkIsr定时任务;OffsetManager内部的offsets-cache-compactor定时任务等等。KafkaScheduler内部是基于ScheduledThreadPoolExecutor实现的,对外封装了任务调度的接口schedule,线程个数由参数background.threads决定,默认值为10。
6.KafkaHealthcheck
KafkaHealthcheck主要提供Broker Server健康状态的上报。Broker Server健康状态本质上就是指Broker Server是否在线,如果Broker Server在线,说明处于健康状态,如果Broker Server离线,说明处于死亡状态。
Broker Server如何上报健康状态?
BrokerChangeListener通过监听目录为/brokers/ids的zookeeper路径,当发生有数据变化时,则获取当前目录下的数据,从而获取当前集群的在线Broker Server列表。而KafkaHealthcheck正是提供了在目录为/brokers/ids的Zookeeper路径上注册节点的能力,该节点所在路径为EphemeralPath(非永久路径),当Broker Server由于异常情况导致下线时,此EphemeralPath随着Broker Server和zookeeper链接的断开而消失。
7.TopicConfigManager
kafka提供对topic配置参数的在线修改能力,修改完成之后无需重新启动kafka集群,在线生效。Topic配置参数包括:数据文件的大小,索引文件的大小,索引项的大小,索引项的粒度,日志文件保留的策略等等;
Topic的配置参数位于路径为/config/topics/[topic]的zookeeper上,Broker Server内部为了避免针对每个Topic都在相关路径上建立监听器,对外提供了一个被通知的路径,其位于/brokers/config_changes,如果检测到该路径 上发生变化,则读取该路径上的数据,获取配置文件待更新的Topic,然后再从/config/topics/[topic]上加载最新的配置文件。

点赞
收藏
评论区
推荐文章
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(
待兔 待兔
1年前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Wesley13 Wesley13
4年前
java通过ServerSocket与Socket实现通信
首先说一下ServerSocket与Socket.1.ServerSocketServerSocket是用来监听客户端Socket连接的类,如果没有连接会一直处于等待状态.ServetSocket有三个构造方法:(1)ServerSocket(intport);这个使用指定的端口
Wesley13 Wesley13
4年前
java学习 网络编程 tcp
有客户端和服务端,使用tcp传输day26 27//客户端发数据到服务端/\\Tcp传输,客户端建立的过程。\1,创建tcp客户端socket服务。使用的是Socket对象。\建议该对象一创建就明确目的地。要连接的主机。\2,如果连接建立成功,说明数据传输通道已建立。\该通道就是socket流
Easter79 Easter79
4年前
Tcp服务端判断客户端是否断开连接
   今天搞tcp链接弄了一天,前面创建socket,绑定,监听等主要分清自己的参数,udp还是tcp的。好不容易调通了,然后就是一个需求,当客户端主动断开连接时,服务端也要断开连接,这样一下次客户端请求链接的时候才能成功链接。   然后就开始找各种方法。其中简单的是看recv()返回为0,表明断开了链接,但是recv函数始终返回SOCKET\
Stella981 Stella981
4年前
Android Socket 通信
Androidsocket通信安卓编写Socket客户端,实现连接Socket服务端通信。创建Socket连接并获取服务端数据先创建几个全局变量吧privateBufferedWriterwriternull;Socketsocket;
Stella981 Stella981
4年前
Muduo网络库源码分析之Acceptor和TcpServer
Acceptor用于accept一个TCP连接,accept接受成功后通知TCP连接的使用者。Acceptor主要是供TcpServer使用的,其生命期由后者控制。一个Acceptor相当于持有服务端的一个socket描述符,该socket可以accept多个TCP客户连接,这个accept操作就是A
Wesley13 Wesley13
4年前
KAFKA官方教程笔记
 因为kafka配置较多,所以单独写一篇博客来记录。       1,broker配置   主要的配置项有三个broker.id集群内唯一log.dir数据目录zookeeper.connectzookeeper连接地址Topiclevelconfigurationsanddefaultsa
Stella981 Stella981
4年前
Flink实战(八)
1概览1.1预定义的源和接收器Flink内置了一些基本数据源和接收器,并且始终可用。该预定义的数据源包括文件,目录和插socket,并从集合和迭代器摄取数据。该预定义的数据接收器支持写入文件和标准输入输出及socket。1.2绑定连接器连接器提供用于与各种第三方系统连接的代码。目
Stella981 Stella981
4年前
Noark入门之协议映射
0x00消息控制器消息控制器,主要作用就是为每个模块提供消息处理的入口.这里的消息不仅仅是协议,还有内部指令,事件等等逻辑入口,这也是为了响应线程模型作出的一种支撑,只要入口在此消息控制器内,那必然走期望的线程调度。@Controller用于标识一个类为当前模块的消息控制器入口.@Controller(threadGroup
桥蕤 桥蕤
1年前
Python socket、multiprocessing和threading模块的使用
1.socketPython用socket模块可以实现简单的网络通信1.1创建客户端pythonimportsocketdefclientsocket():定义客户端clientsocket.socket(socket.AFINET,socket.SOCK