Canal 如何保证数据库库事务的一致性

Stella981
• 阅读 398

点击上方“中间件兴趣圈”,选择“设为星标”

做积极的人,越努力越幸运!Canal 如何保证数据库库事务的一致性 本文将主要介绍在 EventParser binlog 日志同步流程中关于环形缓存区的使用技巧。

1、环形缓存区


关系型数据库讲究的是ACID 4个特性,故引入了数据库事务的概念,一个数据库事务中的多条SQL引发的多条数据变更要么全部成功,要么全部失败,即数据的一致性,那同样在数据同步的场景,在解析一个事务的 binlog 日志时,一次数据同步应该至少以事务为单位,一个事务内的所有 Event 应该作为一个批次提交到数据消费端,让消费端有能力一次同步一个事务中的数据,而不是一条一条变更日志的处理,这样容易造成数据不一致。

环形缓存区的引用就是为了解决将一个事务的完整数据一次提交到消费端,既然是多条消息,故一定需要用到缓存,环形缓存区就在这样的背景下被引入。

在 Canal 中关于事务 Event 的环形缓存区实现类为 EventTransactionBuffer。

1.1 类图

EventTransactionBuffer 的类图如下:

Canal 如何保证数据库库事务的一致性

根据类图我们可以到其存储结构还是比较简单的。

  • int bufferSize环形缓存区的长度,默认为 1024,该长度必须为 2 的幂次方,因为对位运算非常友好。

  • int indexMask环形缓存区下标掩码,其值为 bufferSize - 1 ,sequence * indexMask 能快速定位序号 sequence 所在环形缓存区中的具体下标。

  • CannalEntry.Entry[] entries环形缓存区数据数组,即缓存区实际存储数据的内存区域,为数组结构,长度为 bufferSize。

  • AtomicLong putSequence当前写入的序号,每调用 add 方法添加一条数据,该值增加一,可超过缓存区的实际长度。

  • AtomicLong flushSequence当前已处理的数据序号,flushSequence <= putSequence,(putSequence - flushSequence)表示未处理的数据,即缓存区累积的有效数据。

  • TransactionFlushCallback flushCallbackflush 回调函数,这个和环形缓存区本身关系不大,这个与 Canal 特定业务的,环形缓存区中收集到一个完整的事务变更日志列表后,将这部分内容传入业务回调方法,并重新利用这些缓存空间。

环形缓存区的重大要义就是循环利用。

1.2 环形缓存区存储实现

接下来我们通过其 add 方法来看一下环形缓存区的,在研究环形缓存区之前,将结合8个元素的环形缓存区进行讲解。

Canal 如何保证数据库库事务的一致性

EventTransactionBuffer 的 add 方法代码如下:

Canal 如何保证数据库库事务的一致性

首先根据 binlog 事件类型来决定是否调用 flush 方法,这个就是实现将一个事务的事务一起提交到消费端,回到环形缓存区的具体实现,我们重点关注 put 方法 与 flush 方法的实现。

EventTransactionBuffer#put

Canal 如何保证数据库库事务的一致性

其实现的核心步骤:

  1. 检测当前环形缓存区是否已满,如果未满,则向缓存区中添加一条数据。添加数据的具体逻辑:
  • 获取下一个写入的序号 next,等于当前已写入的序号 + 1,即 putSequence + 1。

  • 通过 next & indexMask 取得放入 CannalEntry.Entry[] entries 中的下标,与 next % bufferSize 效果等同。

  1. 如果已满,则首先将缓存区中的数据刷新,即将未处理的数据全部抽取,提交到数据消费方,然后释放缓存区,继续添加数据。

关键在于如何判断环形缓存区已满,具体算法如下:

Canal 如何保证数据库库事务的一致性

EventTransactionBuffer#checkFreeSlotAt

为了加强对这段代码的理解,我举一个示例,在一个8个元素的环形缓存区中,假设一个事务包含5条日志,首先依次写入5条日志,其环形缓存区如下:

Canal 如何保证数据库库事务的一致性

此时 putSequence 为 4,flushSequence 为 -1 ,我们应该能发现,在第一轮时,由于 sequeue 小于 bufferSize ,如果不执行 flush 操作,连续写入 8条数据,sequence = 7 时,sequence - bufferSize > flushSequence 这个表达式都不会满足,即代表缓存区未满,但在写入第9条消息时,sequence = 8 ,此时 sequence - bufferSize > flushSequence 已满足,即缓存区已满,需要先刷新数据,然后才能再填充。

再回到本示例中,一个事务只包含5条日志,在写满 5条日志后会即调用 flush 方法,将环形缓存区中下标为 0~4 的消息传入数据消费方,在 Canal 中会将这批消息一次传入 EventSink 组件。执行完 flush 方法后,flushSequence 等于4,其环形缓存区如下图所示:

Canal 如何保证数据库库事务的一致性

此时 putSequence = flushSequence = 4,那这个时候环形缓存区的容量是多少呢? 其实就是又恢复到 bufferSize 了,那我们怎么计算环形缓存区当前已写入的消息呢? 其实很简单,putSequence - flushSequence 表示已写入的元素数量。 那当前剩余容量就等于 bufferSize - (putSequence - flushSequence ),即只需要 bufferSize - (putSequence - flushSequence ) > 0 就表示有剩余空闲。 有了这一层思路,就能明白 checkFreeSlotAt 的算法,这也是环形缓存区的核心所在。

思考,Canal 基于环形缓存区的实现,一定能保证一个事务的所有变更日志都一次提交到 EventSink 组件吗,大家可以简单思考一下,在文末的总结部分有笔者的思考。

答案是否定的,如果一个事务包含的日志条目超过了环形缓存区的长度,为了保证数据不丢失,会首先将环形缓存区的数据全部提交,然后接收新的数据,这样一个事务中的消息会被分成多次提交到 EventSink。

原创不易,如果对你有所帮助请你为本文点个【在看】吧,这将是我写作更多优质文章的最强动力。


欢迎加入我的知识星球,一起交流源码,探讨架构,揭秘亿级订单的架构设计与实践经验,打造高质量的技术交流圈,为广大星友提供高质量问答服务,长按如下二维码加入。

Canal 如何保证数据库库事务的一致性

本文分享自微信公众号 - 中间件兴趣圈(dingwpmz_zjj)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

点赞
收藏
评论区
推荐文章
刚刚好 刚刚好
4个月前
css问题
1、在IOS中图片不显示(给图片加了圆角或者img没有父级)<div<imgsrc""/</divdiv{width:20px;height:20px;borderradius:20px;overflow:h
blmius blmius
1年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
小森森 小森森
4个月前
校园表白墙微信小程序V1.0 SayLove -基于微信云开发-一键快速搭建,开箱即用
后续会继续更新,敬请期待2.0全新版本欢迎添加左边的微信一起探讨!项目地址:(https://www.aliyun.com/activity/daily/bestoffer?userCodesskuuw5n)\2.Bug修复更新日历2.情侣脸功能大家不要使用了,现在阿里云的接口已经要收费了(土豪请随意),\\和注意
晴空闲云 晴空闲云
4个月前
css中box-sizing解放盒子实际宽高计算
我们知道传统的盒子模型,如果增加内边距padding和边框border,那么会撑大整个盒子,造成盒子的宽度不好计算,在实务中特别不方便。boxsizing可以设置盒模型的方式,可以很好的设置固定宽高的盒模型。盒子宽高计算假如我们设置如下盒子:宽度和高度均为200px,那么这会这个盒子实际的宽高就都是200px。但是当我们设置这个盒子的边框和内间距的时候,那
艾木酱 艾木酱
3个月前
快速入门|使用MemFire Cloud构建React Native应用程序
MemFireCloud是一款提供云数据库,用户可以创建云数据库,并对数据库进行管理,还可以对数据库进行备份操作。它还提供后端即服务,用户可以在1分钟内新建一个应用,使用自动生成的API和SDK,访问云数据库、对象存储、用户认证与授权等功能,可专
Stella981 Stella981
1年前
Canal 初次启动时如何定位同步位点(文末附流程图)
点击上方“中间件兴趣圈”,选择“设为星标”做积极的人,越努力越幸运!!(https://oscimg.oschina.net/oscnet/c6fd38c9376ee653af3b103bc9b8f65296c.png)(https://www.oschina.net/action/GoToLink?urlhttp%3A%2F%2Fmp.wei
Stella981 Stella981
1年前
Docker 部署SpringBoot项目不香吗?
  公众号改版后文章乱序推荐,希望你可以点击上方“Java进阶架构师”,点击右上角,将我们设为★“星标”!这样才不会错过每日进阶架构文章呀。  !(http://dingyue.ws.126.net/2020/0920/b00fbfc7j00qgy5xy002kd200qo00hsg00it00cj.jpg)  2
Stella981 Stella981
1年前
Canal binlog 日志 Dump 流程分析
点击上方“中间件兴趣圈”,选择“设为星标”做积极的人,越努力越幸运!!(https://oscimg.oschina.net/oscnet/034e6b7d680e414332a79a68829ec542244.png)(https://www.oschina.net/action/GoToLink?urlhttp%3A%2F%2Fmp.wei
Stella981 Stella981
1年前
Canal binlog 日志管理器与GTID简介
点击上方“中间件兴趣圈”,选择“设为星标”做积极的人,越努力越幸运!!(https://oscimg.oschina.net/oscnet/5e255a9fa4595f0a585dbd97af6b9315b9a.png)(https://www.oschina.net/action/GoToLink?urlhttp%3A%2F%2Fmp.wei
helloworld_28799839 helloworld_28799839
4个月前
常用知识整理
Javascript判断对象是否为空jsObject.keys(myObject).length0经常使用的三元运算我们经常遇到处理表格列状态字段如status的时候可以用到vue