如何用gpss实现MySQL到Greenplum的增量同步

ElasticSearch
• 阅读 4724

​数据同步一般分为两种方式:全量和增量。增量数据是一类典型的流数据,基于日志的增量同步几乎已经是所有数据库的标配,它可以减少常规ETL工作对系统带来的影响,并大大降低数据的延迟。作为Greenplum的流计算引擎,Greenplum Stream Server(gpss)能将不同源端的增量数据同步到Greenplum中。为更好的支持这一应用场景,即将发布的gpss 1.3.6 对增量同步的功能做了增强。

Greenplum Stream Server(简称gpss),是Greenplum的下一代数据加载解决方案,相比于gpfdist,GPSS会提供流数据支持及API接口,有更好的扩展性,支持更丰富的功能,并开放更细粒度的任务控制接口。在即将发布gpss 1.3.6 中,对增量同步所做的的功能增强包括:

  • 可以根据指定的递增排序字段,确保最新的消息生效
  • Merge可支持insert,update和delete三种操作

本文将以MySQL为例,简要介绍下gpss如何实现向Greenplum的增量同步。

1测试环境

  • MySQL 8.0.13
  • Maxwell 1.25.0
  • Kafka 2.2.2
  • Greenplum 6.4.0
  • GPSS 1.3.6

我们要完成的工作是:

  • 通过Maxwell监听MySQL中binlog的增量变化(略)
  • 将增量数据以json的格式发送到kafka中(略)
  • 利用gpss解析kafka中的json消息
  • 将变化的数据更新到Greenplum的目标表中

MySQSL和Maxwell的配置和使用,本文将不做深入介绍,大家可以自行访问文章链接阅读学习,访问相关文章请点击文章底部的“阅读原文”。

2 测试数据简介

测试使用的表在MySQL中定义如下:

​create table t_update_delete_0 (k1 decimal,
               k2 text,
               v1 decimal,
               v2 decimal,
               v3 text,
               c1 decimal,
               c2 text);

其中 k1 和 k2 列为键,用来唯一标识一条记录, v1, v2, v3 为每次更新的数据。

在源端分别对这个表进行了insert,update和delete操作,每个语句为单独的transaction。

Insert语句为:

insert into t_update_delete_0 (k1,k2,v1,v2,v3,c1,c2) 
        values (1,'k_1', 1, 3, 'v_1', 1, 'c1');

Update语句为:

update t_update_delete_0 
      set v1=100,v2=300,v3='v_100' 
      where k1='1' and k2='k_1';

Delete语句为:

delete from t_update_delete_0 where k1='1' and k2='k_1';

3 Kafka的消息格式

Maxwell可以将捕获到binlog解析为json格式并发送到kafka,不同的操作生成的Kafka消息有细微的区别。为了将这些消息正确的恢复到Greenplum中,我们先对这三种类型的消息进行简单的分析。

Insert时生成的消息示例如下:

{
  "database": "test",
  "table": "t_update_delete_0",
  "type": "insert",
  "ts": 1586956781,
  "xid": 1398209,
  "commit": true,
  "data": {
    "k1": 41,
    "k2": "k_41",
    "v1": 818,
    "v2": 2454,
    "v3": "v_818",
    "c1": 41,
    "c2": "c_41"
  }
}

database和table表示源表的表名,ts和xid字段用于表示消息的顺序,type和data表示执行的操作及对应的数据。这些是所有消息类型通用的。

Delete生成的消息如下,type为"delete",同时data中包含了完整的内容。

{
  "database": "test",
  "table": "t_update_delete_0",
  "type": "delete",
  "ts": 1586956781,
  "xid": 1398195,
  "commit": true,
  "data": {
    "k1": 44,
    "k2": "k_44",
    "v1": 744,
    "v2": 2232,
    "v3": "v_744",
    "c1": 44,
​    "c2": "c_44"
  }
}

Update除了包含新数据外,还包含了更新之前的数据(old),这里我们只需要新数据就够了。

{
  "database": "test",
  "table": "t_update_delete_0",
  "type": "update",
  "ts": 1586956707,
  "xid": 1281915,
  "commit": true,
  "data": {
    "k1": 99,
    "k2": "k_99",
    "v1": 798,
    "v2": 2394,
    "v3": "v_798",
    "c1": 99,
    "c2": "c_99"
  },
  "old": {
    "v1": 800,
    "v2": 2400,
    "v3": "v_800"
  }
}

根据生成的消息,我们需要执行如下操作:

  • 根据ts和xid对数据进行排序
  • 根据k1和k2进行匹配
  • 对type为delete的列执行删除操作
  • 对其它type类型执行Merge(upsert)操作

4 执行gpss的Kafka JOB

Greenplum中的定义包含了排序的字段,用来区分消息更新的先后顺序,定义如下:

create table t_update_delete_0 (k1 decimal,
               k2 text,
               v1 decimal,
               v2 decimal,
               v3 text,
               c1 decimal,
               c2 text,
               ts decimal,
               xid decimal,
               del_mark boolean);

根据数据同步的需求,gpss需要的yaml配置文件如下:

DATABASE: test
USER: gpadmin
HOST: mdw
PORT: 5432
VERSION: 2
KAFKA:
   INPUT:
      SOURCE:
        BROKERS: kafkahost:9092
        TOPIC: test
      VALUE:
        COLUMNS:
          - NAME: c1
            TYPE: json
        FORMAT: json
      ERROR_LIMIT: 100
   OUTPUT:
      MODE: MERGE
      MATCH_COLUMNS:
        - k1
        - k2
      UPDATE_COLUMNS:
        - v1
        - v2
        - v3
      ORDER_COLUMNS:
        - ts
        - xid
      DELETE_CONDITION: del_mark
      TABLE: t_update_delete_0
      MAPPING:
         k1 : (c1->'data'->>'k1')::decimal
         k2 : (c1->'data'->>'k2')::text
         v1 : (c1->'data'->>'v1')::decimal
         v2 : (c1->'data'->>'v2')::decimal
         v3 : (c1->'data'->>'v3')::text
         c1 : (c1->'data'->>'c1')::decimal
         c2 : (c1->'data'->>'c2')::text
         ts : (c1->>'ts')::decimal
         xid: (c1->>'xid')::decimal
         del_mark: (c1->>'type')::text = 'delete'
   COMMIT:
      MINIMAL_INTERVAL: 2000

几个主要的配置含义如下:

  • ORDER_COLUMNS:递增排序的字段,每个batch中,gpss会使用`ORDER_COLUMNS`最大的消息内容对目标表进行操作。
  • DELETE_CONDITION:软删除标记,gpss会删除包含`DELETE_CONDITION`字段的匹配记录
  • MATCH_COLUMNS:记录的标识,也就是键(candidate key)
  • UPDATE_COLUMNS:需要更新的列

概括下来,gpss执行的步骤为:

  1. 在一个batch中,针对MATCH_COLUMNS相同的所有记录,先根据ORDER_COLUMNS去重
  2. 目标表中存在MATCH_COLUMNS匹配的记录时,根据UPDATE_CONDITION或者DELETE_CONDITION执行更新或者删除操作
  3. 目标表中不存在时匹配记录时,执行插入操作。

(由于有去重操作,为保证不丢失数据,在UPDATE时,Kafka的消息中需要包含整行的数据,而不仅仅是更新部分的数据。)

配置文件准备好后,我们通过gpkafka来执行加载:

gpkafka load mysql.yaml

gpkafka便会从kafka中拉取对应的消息,按照设定的操作将Kafka中的增量数据同步到目标表中。

5 小结

本文简单介绍了如何用gpss从MySQL进行增量同步,其它数据库(例如Oracle,SQL Server等)也都可以利用类似的方案实现同步。不同的消息类型需要不同的处理逻辑,gpss的配置文件中有很多可以进行后处理的部分,更详细的内容可以参考官方文档:https://gpdb.docs.pivotal.io/...。由于源端系统的多样性,gpss的增量复制仍有很多需要完善的地方。在gpss后续版本我们会持续增强相关功能,例如一对多(一个topic到多个目标表)的同步,自动依据topic offset排序等;欢迎大家使用,反馈,指导。也欢迎大家前往askGP(ask.greenplum.cn)交流。

获得Greenplum更多干货内容,欢迎前往Greenplum中文社区网站

如何用gpss实现MySQL到Greenplum的增量同步

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Jacquelyn38 Jacquelyn38
4年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Wesley13 Wesley13
3年前
MySQL如何实时同步数据到ES?试试这款阿里开源的神器
摘要mall项目中的商品搜索功能,一直都没有做实时数据同步。最近发现阿里巴巴开源的canal可以把MySQL中的数据实时同步到Elasticsearch中,能很好地解决数据同步问题。今天我们来讲讲canal的使用,希望对大家有所帮助!canal简介canal主要用途是对MySQL数据库增量日志进行解析,提供增量数据的订阅和消
Stella981 Stella981
3年前
Python3:sqlalchemy对mysql数据库操作,非sql语句
Python3:sqlalchemy对mysql数据库操作,非sql语句python3authorlizmdatetime2018020110:00:00coding:utf8'''
Stella981 Stella981
3年前
MongoDB 定位 oplog 必须全表扫描吗?
MongoDBoplog(类似于MySQLbinlog)记录数据库的所有修改操作,除了用于主备同步;oplog还能玩出很多花样,比如1.全量备份增量备份所有的oplog,就能实现MongoDB恢复到任意时间点的功能2.通过oplog,除了实现到备节点的同步,也可以额外再往单独的集群同步数据(甚至是异构的数据库),实现容
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这
美凌格栋栋酱 美凌格栋栋酱
5个月前
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(
ElasticSearch
ElasticSearch
Lv1
画图恰似归家梦,千里河山寸许长。
文章
3
粉丝
0
获赞
0