DM 源码阅读系列文章(七)定制化数据同步功能的实现

AlgoZenith
• 阅读 1240

作者:王相

本文为 DM 源码阅读系列文章的第七篇,在 上篇文章 中我们介绍了 relay log 的实现,主要包括 relay log 目录结构定义、relay log 数据的处理流程、主从切换支持、relay log 的读取等逻辑。本篇文章我们将会对 DM 的定制化数据同步功能进行详细的讲解。

在一般的数据同步中,上下游的数据是一一对应的,即上下游的库名、表名、列名以及每一列的值都是相同的,但是很多用户因为业务的原因希望 DM 在同步数据到 TiDB 时进行一些定制化的转化。下面我们将主要介绍数据同步定制化中的库表路由(Table routing)、黑白名单(Black & white table lists)、列值转化(Column mapping)、binlog 过滤(Binlog event filter)四个主要功能的实现。值得注意的是,由于其他一些工具(例如 TiDB Lightning 和 TiDB Binlog)也需要类似的功能,所以这四个功能都以 package 的形式维护在 tidb-tools 项目下,这样方便使用和维护。

库表路由(Table routing

库表路由顾名思义就是对库名和表名根据一定的路由规则进行转换。比如用户在上游多个 MySQL 实例或者 schema 有多个逻辑上相同的表,需要把这些表的数据同步到 TiDB 集群的同一个表中,这个时候就可以使用 table-router 功能,如下图所示:

DM 源码阅读系列文章(七)定制化数据同步功能的实现

该功能实现在 pkg/table-router 中,库表路由的规则定义在结构 TableRule 中,其中的属性 SchemaPatternTablePattern 用于配置原库名和表名的模式,TargetSchemaTargetTable 用于配置目标库和表名,即符合指定 pattern 的库和表名都将转化成目标库名和表名。

使用结构 Table 对路由规则进行维护,Table 提供了如下方法:

方法 说明
AddRule 增加规则
UpdateRule 修改规则
RemoveRule 删除规则
Route 获取路由后的结果

Table 结构中组合了 SelectorSelector 用于管理指定模式的库、表的规则,提供如下方法:

方法 说明
Insert 增加规则
Match 查找指定的库、表匹配到的规则
Remove 删除规则
AllRules 返回所有的规则

Selector 的底层实现是 trieSelector,使用了单词查找树的结构来维护库、表与规则的对应关系,感兴趣的同学可以阅读代码深入了解一下。 trieSelector 中使用 cache 缓存了库、表到规则的映射关系,这样可以减少相同库、表匹配规则的资源消耗。除了 table routing,以下的列值转化和 binlog 过滤功能也都使用了 Selector,在下面的介绍中就不再赘述。

黑白名单(black & white table lists

黑白名单功能用来选择同步哪些库和表,以及不同步哪些库和表,这部分代码维护在 pkg/filter 中。

黑白名单规则配置在 Rules 结构中,该结构包括 DoTablesDoDBsIgnoreTablesIgnoreDBs 四个属性,下面以判断表 test.t 是否应该被过滤的例子说明配置的作用:

  1. 首先 schema 过滤判断。

    • 如果 do-dbs 不为空,则判断 do-dbs 中是否存在一个匹配的 schema。

      • 如果存在,则进入 table 过滤判断。
      • 如果不存在,则过滤 test.t
  • 如果 do-dbs 为空并且 ignore-dbs 不为空,则判断 ignore-dbs 中是否存在一个匹配的 schema。

    • 如果存在,则过滤 test.t
    • 如果不存在,则进入 table 过滤判断。
    • 如果 do-dbsignore-dbs 都为空,则进入 table 过滤判断。
  • 进行 table 过滤判断。

    • 如果 do-tables 不为空,则判断 do-tables 中是否存在一个匹配的 table。

      - 如果存在,则同步 `test.t`。
      - 如果不存在,则过滤 `test.t`。
    • 如果 ignore-tables 不为空,则判断 ignore-tables 中是否存在一个匹配的 table。

      - 如果存在,则过滤 `test.t`。
      - 如果不存在,则同步 `test.t`。
    • 如果 do-tablesignore-tables 都为空,则同步 test.t
  • 使用 Filter 对黑白名单进行管理,Filter 提供了 ApplyOn 方法来判断一组 table 中哪些表可以同步。

    列值转化(Column mapping

    列值转化功能用于对指定列的值做一些转化,主要用于分库分表的同步场景。比较典型的场景是:在上游分表中使用自增列作为主键,这样数据在同步到 TiDB 的一个表时会出现主键冲突,因此我们需要根据一定规则对主键做转化,保证每个主键在全局仍然是唯一的。

    该功能实现在 pkg/column-mapping 中的 PartitionID:修改列的值的最高几位为 PartitionID 的值(只能作用于 Int64 类型的列)。

    代码中使用 Rule 来设置 column mapping 的规则,Rule 的属性及说明如下表所示:

    属性 说明
    PatternSchema 匹配规则的库的模式 可以设置为指定的库名,也可以使用通配符 “*” 和 “?”
    PatternTable 匹配规则的表的模式 可以设置为指定的表名,也可以使用通配符 “*” 和 “?”
    SourceColumn 需要转化的列 列名
    TargetColumn 转化后的值保存到哪个列 列名
    Expression 转化表达式 目前只支持 PartitionID
    Arguments 转化所需要的参数 Expression 为 PartitionID,参数为 InstanceID、schema 名称前缀、table 名称前缀以及前缀与 ID 的分割符号

    Expression 为 PartitionID 的配置和转化的计算方式都较为复杂,下面举个例子说明。

    例如 Arguments 为 [1, “test”, “t”, “_”]1 表示数据库实例的 InstanceID“test” 为库名称的前缀,“t” 为表名称的前缀,“_” 为前缀与 ID 的分隔符,则表 test_1.t_2SchemaID1TableID2。转化列值时需要对 InstanceIDSchemaIDTableID 进行一定的位移计算,然后与原始的值进行或运算得出一个新的值。对于具体的计算方式,可以查看代码 partitionIDcomputePartitionID。下面是一个 PartitionID 逻辑简化后的示意图:

    DM 源码阅读系列文章(七)定制化数据同步功能的实现

    使用 Mapping 结构对 column mapping 的规则进行管理,Mapping 提供列如下方法:

    方法 说明
    AddRole 增加规则
    UpdateRule 修改规则
    RemoveRule 删除规则
    HandleRowValue 获取转化结果

    binlog 过滤(binlog event filter

    binlog 过滤功能支持过滤指定类型的 binlog,或者指定模式的 query,该功能维护在 pkg/binlog-filter 中。某些用户不希望同步一些指定类型的 binlog,例如 drop table 和 truncate table,这样就可以在下游仍然保存这些表的数据作为备份,或者某些 SQL 语句在 TiDB 中不兼容,希望可以在同步中过滤掉,都可以通过配置 binlog event filter 功能来实现。

    DM 源码阅读系列文章(七)定制化数据同步功能的实现

    首先需要对 binlog 进行分类,可以查看代码 Event Type List。然后再定义过滤规则 BinlogEventRule,包括以下属性:

    属性 说明
    SchemaPattern 匹配规则的库的模式 可以设置为指定的库名,也可以使用通配符 “*” 和 “?”
    TablePattern 匹配规则的表的模式 可以设置为指定的表名,也可以使用通配符 “*” 和 “?”
    Events 规则适用于哪些类型的 binlog binlog event 的类型
    SQLPattern 匹配的 SQL 的模式 SQL 语句的模式,支持适用正则表达式
    Action 是否对符合上面要求的 binlog 进行过滤 Ignore 或者 Do

    例如,TiDB 对 ADD PARTITIONDROP PARTITION 语句不兼容,在同步时需要过滤掉相关的 SQL 语句,就可以在 DM 中使用如下配置:

    filter-partition-rule:
        schema-pattern: "*"
        sql-pattern: ["ALTER\\s+TABLE[\\s\\S]*ADD\\s+PARTITION", "ALTER\\s+TABLE[\\s\\S]*DROP\\s+PARTITION"]
        action: Ignore

    如果需要过滤掉所有的 DROP DATABASE 语句,则可以在 DM 中使用如下配置:

     filter-schema-rule:
        schema-pattern: "*"
        events: ["drop database"]
        action: Ignore
    

    代码中通过 BinlogEvent 结构对 binlog event 过滤规则做统一的管理,BinlogEvent 提供了如下的方法:

    方法 说明
    AddRule 增加规则
    UpdateRule 修改规则
    RemoveRule 删除规则
    Filter 判断指定的 binlog 是否应该过滤

    小结

    以上就是定制化数据同步功能中库表路由(Table routing)、黑白名单(Black & white table lists)、列值转化(Column mapping)、binlog 过滤(Binlog event filter)的实现介绍。欢迎大家阅读相关代码深入了解,也欢迎给我们提 pr 优化代码。下一篇我们将介绍 DM 是如何支持上游 online DDL 工具(pt-osc,gh-ost)的 DDL 同步场景的。

    原文阅读https://www.pingcap.com/blog-cn/dm-source-code-reading-7/

    DM 源码阅读系列文章(七)定制化数据同步功能的实现

    点赞
    收藏
    评论区
    推荐文章
    blmius blmius
    4年前
    MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
    文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
    美凌格栋栋酱 美凌格栋栋酱
    7个月前
    Oracle 分组与拼接字符串同时使用
    SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(
    Wesley13 Wesley13
    3年前
    MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
    背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
    皕杰报表之UUID
    ​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
    Jacquelyn38 Jacquelyn38
    4年前
    2020年前端实用代码段,为你的工作保驾护航
    有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
    Stella981 Stella981
    3年前
    Opencv中Mat矩阵相乘——点乘、dot、mul运算详解
    Opencv中Mat矩阵相乘——点乘、dot、mul运算详解2016年09月02日00:00:36 \牧野(https://www.oschina.net/action/GoToLink?urlhttps%3A%2F%2Fme.csdn.net%2Fdcrmg) 阅读数:59593
    Stella981 Stella981
    3年前
    KVM调整cpu和内存
    一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
    Wesley13 Wesley13
    3年前
    DM 源码阅读系列文章(一)序
    作者:杨非前言TiDBDM(https://www.oschina.net/action/GoToLink?urlhttps%3A%2F%2Fgithub.com%2Fpingcap%2Fdm)是由PingCAP开发的一体化数据同步任务管理平台,支持从MySQL或MariaDB到TiDB的全量数据迁移和增量数据
    Easter79 Easter79
    3年前
    Twitter的分布式自增ID算法snowflake (Java版)
    概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
    Stella981 Stella981
    3年前
    Django中Admin中的一些参数配置
    设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
    Python进阶者 Python进阶者
    1年前
    Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
    大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这