Kafka消息存储之FileMessageSet

点击者
• 阅读 3935

摘要

看过前面几篇博客的盆友可能会问,逼逼了这么多还不知道消息到底存到哪儿了,分明标题党嘛。这一次我们就来看与存储切实相关的底层操作类FileMessageSet。它是MessageSet的一个子类,操作消息和文件之间的读写操作。想想我们也知道,这特么就是要写增删改查啊。这一次的代码确实没啥好说的,但是FileMessageSet确是比较重要的一个类,还是简短讲一下吧。

FileMessageSet的功能

  • 消息的增删改查

  • 进行必要的检查,比如是否是指定的消息格式(检查Magic值)

  • 进行消息格式的转换

对于最核心的功能——增删改查,我们在这里进一步展开。首先FileMessageSet只处理最外层的消息,而不考虑嵌套的消息,嵌套消息会移交给之前的ByteBufferMessageSet处理。某种程度上,我们也可以把ByteBufferMessageSet看做是嵌套消息。

FileMessageSet的删除也分为两种,一种是从特定位置截断,一种是直接删除整个文件。其查询主要是从消息的序号也就是offset获得其在文件中的位置。其增加只允许向尾部追加,若想在中间添加,必须先截断。

我们列一下几个重要的原子操作吧

  • read(buffer,position,length),read(position,length):FileMessageSet

  • writeTo(channel,position,size)

  • truncate(size)

  • search(offset):position

  • close

  • flush

FileMesssage的设计

FileMessageSet使用FileChannel来进行读写,我们的操作依赖于position进行,需要首先定位。同样,FileMessageSet允许支持切片,也就是截取文件中的一部分,指定start和end。但是这样每次检查末尾都需要考虑end了。

这里首先要注意的第一点是channel的游标应该始终定位在set的尾部,这是为了保证写入是顺序的,所以在初始化的时候就应该将游标移到尾部。

第二点是在关闭channel的时候需要先做flush然后截断。这一点可能不太好理解,这里举个例子,如果我使用了分片,并在位置end后写入了一条新消息,由于必须保证消息是有序的,所以后面所有的消息必须丢弃。这也是保证消息的顺序写特性。


     def close() {
        flush()
        trim()
        channel.close()
      }

第三点是迭代的过程,这里面几乎所有的原子操作均是从遍历实现的,遍历中需要进行较多的检查操作,主要是以下几点。

  • 如果当前读取的messageSize小于最小的消息头大小,说明消息出现错误

  • 如果当前读取的messageSize大于剩余的容量,说明最后一条消息不完整

  • 如果剩下的容量小于offsetSize+MessageSizeLength,说明已经没有消息了

但是这里的容量需要同时考虑指定的end和channel的结尾,下面以生成迭代器为例。


    override def makeNext(): MessageAndOffset = {
            //最后一条消息出现在end之后
            if(location + sizeOffsetLength >= end)
              return allDone()
    
    
            // read the size of the item
            sizeOffsetBuffer.rewind()
            channel.read(sizeOffsetBuffer, location)
    
            //最后一条消息出现在下一文件中
            if(sizeOffsetBuffer.hasRemaining)
              return allDone()
    
            sizeOffsetBuffer.rewind()
            val offset = sizeOffsetBuffer.getLong()
            val size = sizeOffsetBuffer.getInt()
    
            //最后一条消息被end截断或消息大小出现问题
            if(size < Message.MinMessageOverhead || location + sizeOffsetLength + size > end)
              return allDone()
          //消息过大
            if(size > maxMessageSize)
              throw new CorruptRecordException("Message size exceeds the largest allowable message size (%d).".format(maxMessageSize))
    
            // read the item itself
            val buffer = ByteBuffer.allocate(size)
            channel.read(buffer, location + sizeOffsetLength)
    
            //最后一条消息被文件截断
            if(buffer.hasRemaining)
              return allDone()
            buffer.rewind()
    
            // increment the location and return the item
            location += size + sizeOffsetLength
            new MessageAndOffset(new Message(buffer), offset)
          }

第四条是追加是以ByteBufferMessageSet为单位的,这主要是将嵌套消息和一般消息还有批量写入统一在一个方法下。

第五条是一个有趣的代码细节

def delete(): Boolean = {
    CoreUtils.swallow(channel.close())
    file.delete()
  }

def swallow(log: (Object, Throwable) => Unit, action: => Unit) {
    try {
      action
    } catch {
      case e: Throwable => log(e.getMessage(), e)
    }
  }

这里将代码块包裹在try catch中,通过这种方法调用的形式,非常简洁优美,有点类似于使用AOP收集异常,值得借鉴。

消息读入的过程

写到这儿,让我们来回顾一下整个消息存储的内容并整理出完整的流程吧。

  1. 首先FileMessageSet读取最外层消息

  2. 若该消息是嵌套消息,则生成ByteBufferMessageSet解压缩并生成原子消息集

  3. 通过调用message自身的方法进行检验和获取基本信息比如消息格式

  4. 通过MessageAndMeta加上译码器获得key-value对象

消息写入的过程

  1. 首先MessageWriter写入key-value和消息头生成buffer

  2. 对于嵌套消息使用刚刚的buffer生成 ByteBufferMessageSet并convert压缩成新的ByteBufferMessageSet

  3. 再使用FileMessageSet追加ByteBUfferMessageSet

点赞
收藏
评论区
推荐文章
blmius blmius
4年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(
Karen110 Karen110
4年前
​一篇文章总结一下Python库中关于时间的常见操作
前言本次来总结一下关于Python时间的相关操作,有一个有趣的问题。如果你的业务用不到时间相关的操作,你的业务基本上会一直用不到。但是如果你的业务一旦用到了时间操作,你就会发现,淦,到处都是时间操作。。。所以思来想去,还是总结一下吧,本次会采用类型注解方式。time包importtime时间戳从1970年1月1日00:00:00标准时区诞生到现在
Stella981 Stella981
4年前
Kafka(3)
消息的存储原理:  消息的文件存储机制:  前面我们知道了一个topic的多个partition在物理磁盘上的保存路径,那么我们再来分析日志的存储方式。通过   \root@localhost~\ls/tmp/kafkalogs/firstTopic1/命令找到对应partition下的日志内容
Wesley13 Wesley13
4年前
MQ消息中间件,面试能问些什么?
MQ消息中间件,面试能问些什么?为什么使用消息队列?消息队列的优点和缺点?kafka、activemq、rabbitmq、rocketmq都有什么优缺点?面试官角度分析:(1)你知不知道你们系统里为什么要用消息队列这个东西?(2)既然用了消息队列这个东西,你知不知道用了有什么好处?(3
Stella981 Stella981
4年前
RabbitMQ之消息确认机制(事务+Confirm)
概述在使用RabbitMQ的时候,我们可以通过消息持久化操作来解决因为服务器的异常奔溃导致的消息丢失,除此之外我们还会遇到一个问题,当消息的发布者在将消息发送出去之后,消息到底有没有正确到达broker代理服务器呢?如果不进行特殊配置的话,默认情况下发布操作是不会返回任何信息给生产者的,也就是默认情况下我们的生产者是不知道消息有没有
Stella981 Stella981
4年前
Kafka 简介
Kafka简介_Kafka是分布式流平台。_一个流平台有3个主要特征:发布和订阅消息流,这一点与传统的消息队列相似。以容灾持久化方式的消息流存储。在消息流发生时处理消息流。Kafka通常使用在两大类应用中:在系统或应用之间,构建实时、可靠的消息流管道。构建实时流应用
Stella981 Stella981
4年前
Kafka 消息存储与索引设计
消息中间件的性能好坏,它的消息存储的机制是衡量该性能的最重要指标之一,而Kafka具有高性能、高吞吐、低延时的特点,动不动可以上到几十上百万TPS,离不开它优秀的消息存储设计。下面我按照自己的理解为大家讲解Kafka消息存储设计的那些事。在Kafka的设计思想中,消息的存储文件被称作日志,我们Java后端绝大部分人谈到日志,一般会联想到
Stella981 Stella981
4年前
Kafka实战解惑
一、Kafka简介Kafka是LinkedIn使用Scala开发的一个分布式消息中间件,它以水平扩展能力和高吞吐率著称,被广泛用于日志处理、ETL等应用场景。Kafka具有以下主要特点:\\消息的发布、订阅均具有高吞吐量:\\据统计数字表明,Kafka每秒可以生产约25万消息(50MB),每秒处理55万消息(110MB)。
Stella981 Stella981
4年前
Kafka 原理详解
Kafka原理详解1kakfa基础概念说明Broker:消息服务器,就是我们部署的一个kafka服务Partition:消息的水平分区,一个Topic可以有多个分区,这样实现了消息的无限量存储Replica:消息的副本,即备份消息,存储在其他的broker上,当leader挂掉
Wesley13 Wesley13
4年前
Java版Kafka使用及配置解释
Java版Kafka使用及配置解释一.Java示例kafka是吞吐量巨大的一个消息系统,它是用scala写的,和普通的消息的生产消费还有所不同,写了个demo程序供大家参考。kafka的安装请参考官方文档。引入Maven库首先我们需要新建一个maven项目,然后在pom中