Kafka.network包源码解读

Stella981
• 阅读 449

最近阅读了kafka network包的源码,主要是想了解下kafka底层通信的一些细节,这部分都是用NIO实现的,并且用的是最基本的NIO实现模板,代码阅读起来也比较简单。抛开zookeeper这部分的通信不看,我们就看最基本的producer和consumer之间的基于NIO的通信模块。在network中主要包含以下类:Kafka.network包源码解读

我们挑选几个最主要的类说明,先从SocketServer的描述看起:

/**
 * An NIO socket server. The thread model is
 *   1 Acceptor thread that handles new connections
 *   N Processor threads that each have their own selectors and handle all requests from their connections synchronously
 */

在 SocketServer 中采用 processors 数组保存 processor

Private val processors = new Array[Processor](numProcessorThreads)

在AbstractServerThread继承了runnable,其中采用闭锁控制开始和结束,主要作用是为了实现同步。同时打开selector,为后续的继承者使用。

protected val selector = Selector.open();
  protected val logger = Logger.getLogger(getClass())
  private val startupLatch = new CountDownLatch(1)
  private val shutdownLatch = new CountDownLatch(1)
  private val alive = new AtomicBoolean(false)

这个类是后续讲到的两个类的基类,并且闭锁的应用是整个同步作用实现的关键,我们看一组 stratup 的闭锁操作,其中 Unit 在 scala 语法中你可以把他认为是 void ,也就是方法的返回值为空:

/**
   * Wait for the thread to completely start up
   */
  def awaitStartup(): Unit = startupLatch.await

  /**
   * Record that the thread startup is complete
   */
  protected def startupComplete() = {
    alive.set(true)
    startupLatch.countDown
  }

Acceptor继承了AbstractServerThread,虽然叫Acceptor,但是它并没有单独拿出来使用,而是直接被socketServer引用,这点在命名和使用上与一般的通信框架不同:

private[kafka] class Acceptor(val port: Int, private val processors: Array[Processor], val sendBufferSize: Int, val receiveBufferSize: Int) extends AbstractServerThread {

这个类中主要实现了ServerSocketChannel的相关工作:

val serverChannel = ServerSocketChannel.open()
    serverChannel.configureBlocking(false)
    serverChannel.socket.bind(new InetSocketAddress(port))
    serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    logger.info("Awaiting connections on port " + port)
    startupComplete()

其内部操作和NIO一样:

/*
   * Accept a new connection
   */
  def accept(key: SelectionKey, processor: Processor) {
    val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
    serverSocketChannel.socket().setReceiveBufferSize(receiveBufferSize)
    
    val socketChannel = serverSocketChannel.accept()
    socketChannel.configureBlocking(false)
    socketChannel.socket().setTcpNoDelay(true)
    socketChannel.socket().setSendBufferSize(sendBufferSize)

    if (logger.isDebugEnabled()) {
      logger.debug("sendBufferSize: [" + socketChannel.socket().getSendBufferSize() 
          + "] receiveBufferSize: [" + socketChannel.socket().getReceiveBufferSize() + "]")
    }

    processor.accept(socketChannel)
  }

Procesor类继承了abstractServerThread,其实主要是在Acceptor类中的accept方法中,又新启一个线程来处理读写操作:

private[kafka] class Processor(val handlerMapping: Handler.HandlerMapping,
                               val time: Time,
                               val stats: SocketServerStats,
                               val maxRequestSize: Int) extends AbstractServerThread

所以整个kafka中使用的NIO的模型可以归结为下图:

Kafka.network包源码解读

socketServer中引用Acceptor处理多个client过来的connector,并为每个connection创建出一个processor去单独处理,每个processor中均引用独立的selector。

整体来说,这样的设计和我们在用NIO写传统的通信没有什么区别,只是这里在同步上稍微做了点儿文章。更详细的网络操作还是请看mina系列的分析。

点赞
收藏
评论区
推荐文章
blmius blmius
2年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Jacquelyn38 Jacquelyn38
2年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Stella981 Stella981
2年前
Netty(二)
一、先讲下NIO编程。NIO(NonblockI/O),亦叫做非阻塞I/O与Socket类和ServerSocket类相对应,NIO也提供了SocketChannel和ServerSocketChannel两种不同的套接字通道实现。1 缓冲区Buffer这里首先介绍缓冲区的概念,NIO和原I/O的一个重要区别就是NIO库中,所有数据都是用缓
Stella981 Stella981
2年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Stella981 Stella981
2年前
Python之time模块的时间戳、时间字符串格式化与转换
Python处理时间和时间戳的内置模块就有time,和datetime两个,本文先说time模块。关于时间戳的几个概念时间戳,根据1970年1月1日00:00:00开始按秒计算的偏移量。时间元组(struct_time),包含9个元素。 time.struct_time(tm_y
Easter79 Easter79
2年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Stella981 Stella981
2年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
2年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
4个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这