Canal+Camus快速采集MySQL Binlog到数据仓库

Stella981
• 阅读 655

点击上方蓝色字体,选择“设为星标

回复”资源“获取更多资源

Canal+Camus快速采集MySQL Binlog到数据仓库

Canal+Camus快速采集MySQL Binlog到数据仓库

大数据技术与架构

点击右侧关注,大数据开发领域最强公众号!

Canal+Camus快速采集MySQL Binlog到数据仓库

Canal+Camus快速采集MySQL Binlog到数据仓库

大数据真好玩

点击右侧关注,大数据真好玩!

Canal+Camus快速采集MySQL Binlog到数据仓库

数据仓库的同步方法

我们的数据仓库长久以来一直使用天级别的离线同步方法:采用Sqoop或DataX按天定时获取各个MySQL表的全量或增量数据,然后载入到Hive里对应的各个表中。这种方法门槛低,容易操作,在数仓建设阶段能够快速启动。但是随着时间的推移,它暴露出了一些缺点:

  • 从MySQL获取数据只能靠select,如果一次select数据量过大,会造成慢查询,甚至影响线上业务;

  • 随着业务量的增长和新业务的加入,数据量会相应增加,离线同步一次的耗时会越来越长;

  • 增量同步方式无法检测到MySQL中被delete掉的记录,如果没有时间戳字段的话,也较难检测到被update的记录。

所以,我们最近致力于按照变动数据获取(Change Data Capture,CDC)的方式改造我们的数仓,分三步走:

  • 首先订阅MySQL库的Binlog,将其存储到临时表中;

  • 然后对需要入库的表一次性制作快照,并将存量数据导入Hive;

  • 最后基于存量数据和Binlog记录的变动进行合并,还原出与业务库相同的数据。

本文要说的就是第一步的实现方案。我们采用阿里的开源组件Canal来接入MySQL Binlog,并投递到Kafka;采用LinkedIn的开源组件Camus获取Kafka中的Binlog,并落地到Hive。限于篇幅,我们不会从源码级别探索Canal和Camus的内部机制,参考官方文档或者自行读源码都非常简单。

Canal的配置

采用最新的1.1.3版本: https://github.com/alibaba/canal/releases/tag/canal-1.1.3 Kafka版本则是1.0.1。

canal.properties配置

只列出关键的配置项如下,其中有些是和Kafka的配置对应的。完整的配置及其含义还请参见Canal项目的GitHub Wiki。

# 如果要做高可用的话,把ZooKeeper配置好canal.zkServers = 10.10.99.130:2181,10.10.99.132:2181,10.10.99.133:2181,10.10.99.124:2181,10.10.99.125:2181# Binlog格式,MySQL的binlog-format也应该为ROWcanal.instance.binlog.format = ROW# 是否过滤掉DCL、DML、DDL语句canal.instance.filter.query.dcl = truecanal.instance.filter.query.dml = falsecanal.instance.filter.query.ddl = false# 允许自动检测Canal监听实例的变更,60秒一次canal.auto.scan = truecanal.auto.scan.interval = 60# 默认值tcp,改为投递到Kafkacanal.serverMode = kafka# Kafka bootstrap.servers,可以不用写上全部的brokerscanal.mq.servers = 10.10.99.132:9092,10.10.99.133:9092,10.10.99.134:9092,10.10.99.135:9092# 投递失败的重试次数,默认0,改成2canal.mq.retries = 2# Kafka batch.size,即producer一个微批次的大小,默认16K,这里加倍canal.mq.batchSize = 32768# Kafka max.request.size,即一个请求的最大大小,默认1M,这里也加倍canal.mq.maxRequestSize = 2097152# Kafka linger.ms,即sender线程在检查微批次是否就绪时的超时,默认0ms,改为150ms# 满足batch.size和linger.ms其中之一,就会发送消息canal.mq.lingerMs = 150# Kafka buffer.memory,缓存大小,默认32Mcanal.mq.bufferMemory = 33554432# 获取Binlog数据的批次大小,默认50canal.mq.canalBatchSize = 50# 获取Binlog数据的超时时间,默认200mscanal.mq.canalGetTimeout = 200# 是否将Binlog转为JSON格式。如果为false,就是原生Protobuf格式canal.mq.flatMessage = true# 压缩类型,官方文档没有描述canal.mq.compressionType = none# Kafka acks,默认all,表示分区leader会等所有follower同步完才给producer发送ack# 0表示不等待ack,1表示leader写入完毕之后直接ackcanal.mq.acks = all# Kafka消息投递是否使用事务# 主要针对flatMessage的异步发送和动态多topic消息投递进行事务控制来保持和Canal Binlog位置的一致性canal.mq.transaction = false
instance.properties配置

我们最终采用多topic单partition的方式把Binlog存入Kafka,也就是每张表对应一个topic,每个topic只有一个partition。这样可以保证表级别Binlog的有序性,并且实测热点表对应topic的压力也不大。

# 需要接入Binlog的表名,支持正则,但我们手动指定每张表canal.instance.filter.regex=mall\\.address,mall\\.base_category,mall\\.orders,mall\\.order_product,mall\\.product,mall\\.mall_category,mall\\.mall_comment# 不需要接入Binlog表的黑名单canal.instance.filter.black.regex=# 单topic模式下的表名# canal.mq.topic=example# 多topic模式下的topic名与表名的对应关系,同样支持正则canal.mq.dynamicTopic=bl_mall_address:mall\\.address,bl_mall_base_category:mall\\.base_category,bl_mall_orders:mall\\.orders,bl_mall_order_product:mall\\.order_product,bl_mall_product:mall\\.product,bl_mall_mall_category:mall\\.mall_category,bl_mall_mall_comment:mall\\.mall_comment# 单partition模式下的分区号canal.mq.partition=0# 多partition模式下的分区hash规则,需要按主键组来# canal.mq.partitionsNum=3# canal.mq.partitionHash=test.table:id^name,.*\\..*

通过kafka-topics工具观察自动生成的topic:

Canal+Camus快速采集MySQL Binlog到数据仓库

Camus的配置

Camus在国内并没有Canal那么有名,但十分好用。它是LinkedIn开源的,基于Hadoop MapReduce的Kafka到HDFS数据管道。它支持Kafka topic的自动发现与offset管理,基于Avro或JSON的数据schema,以及按时间分区的功能。另外它也提供了数据读取和写入的自定义逻辑入口,比较灵活。

Camus在很久之前就作为一个子项目合并到了同为LinkedIn开源的数据交换组件Gobblin中,不再单独维护。本文使用的是Confluent维护的镜像版本,仍然在更新,传送门:https://github.com/confluentinc/camus。另外采用的Hadoop版本是CDH自带的2.6.0。

将Camus源码clone到本地之后,执行mvn clean package编译并打包,就可以准备使用了。

camus.properties配置

这个properties文件的名字可以随便起,每一个properties就代表了一个Camus job(本质上是MR job)的定义。仍然只列出关键的配置项如下。

# Kafka brokerskafka.brokers=10.10.99.132:9092,10.10.99.133:9092,10.10.99.134:9092,10.10.99.135:9092# job名称camus.job.name=binlog-fetch# Kafka数据落地到HDFS的位置。Camus会按照topic名自动创建子目录etl.destination.path=/user/hive/warehouse/binlog.db# HDFS上用来保存当前Camus job执行信息的位置,如offset、错误日志等etl.execution.base.path=/camus/exec# HDFS上保存Camus job执行历史的位置etl.execution.history.path=/camus/exec/history# 即core-site.xml中的fs.defaultFS参数fs.default.name=hdfs://mycluster# Kafka消息解码器,默认有JsonStringMessageDecoder和KafkaAvroMessageDecoder# Canal的Binlog是JSON格式的。当然我们也可以自定义解码器camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.JsonStringMessageDecoder# 落地到HDFS时的写入器,默认支持Avro、SequenceFile和字符串# 这里我们采用一个自定义的WriterProvider,代码在后面# etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.StringRecordWriterProvideretl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.CanalBinlogRecordWriterProvider# JSON消息中的时间戳字段,用来做分区的# 注意这里采用Binlog的业务时间,而不是日志时间camus.message.timestamp.field=es# 时间戳字段的格式camus.message.timestamp.format=unix_milliseconds# 时间分区的类型和格式,默认支持小时、天,也可以自定义时间etl.partitioner.class=com.linkedin.camus.etl.kafka.partitioner.TimeBasedPartitioneretl.destination.path.topic.sub.dirformat='pt_hour'=YYYYMMddHH# 拉取过程中MR job的mapper数mapred.map.tasks=20# 按照时间戳字段,一次性拉取多少个小时的数据过后就停止,-1为不限制kafka.max.pull.hrs=-1# 时间戳早于多少天的数据会被抛弃而不入库kafka.max.historical.days=3# 每个mapper的最长执行分钟数,-1为不限制kafka.max.pull.minutes.per.task=-1# Kafka topic白名单和黑名单kafka.blacklist.topics=__consumer_offsets,binlog_dym_test,binlog_mall_test,test010802,test_kylin_streaming2,user_persona4scheduler,HbaseRequestsPerSecondkafka.whitelist.topics=# 设定输出数据的压缩方式,支持deflate、gzip和snappymapred.output.compress=false# etl.output.codec=gzip# etl.deflate.level=6# 设定时区,以及一个时间分区的单位etl.default.timezone=Asia/Shanghaietl.output.file.time.partition.mins=60
自定义Binlog落地方式

我们想要在数据输出时就符合各表的定义,而不是之后再去费力解析JSON。这可以通过实现Camus提供的RecordWriterProvider接口来自定义。不多说,直接上代码:

public class CanalBinlogRecordWriterProvider implements RecordWriterProvider {    static class CanalBinlogRecordWriter extends RecordWriter<IEtlKey, CamusWrapper> {        private DataOutputStream outputStream;        private String fieldDelimiter;        private String rowDelimiter;        public CanalBinlogRecordWriter(DataOutputStream outputStream, String fieldDelimiter, String rowDelimiter) {            this.outputStream = outputStream;            this.fieldDelimiter = fieldDelimiter;            this.rowDelimiter = rowDelimiter;        }        @Override        public void write(IEtlKey key, CamusWrapper value) throws IOException, InterruptedException {            if (value == null) {                return;            }            String recordStr = (String) value.getRecord();            JSONObject record = JSON.parseObject(recordStr, Feature.OrderedField);            if (record.getString("isDdl").equals("true")) {                return;            }            JSONArray data = record.getJSONArray("data");            for (int i = 0; i < data.size(); i++) {                JSONObject obj = data.getJSONObject(i);                if (obj != null) {                    StringBuilder fieldsBuilder = new StringBuilder();                    fieldsBuilder.append(record.getLong("id"));                    fieldsBuilder.append(fieldDelimiter);                    fieldsBuilder.append(record.getLong("es"));                    fieldsBuilder.append(fieldDelimiter);                    fieldsBuilder.append(record.getLong("ts"));                    fieldsBuilder.append(fieldDelimiter);                    fieldsBuilder.append(record.getString("type"));                    for (Entry<String, Object> entry : obj.entrySet()) {                        fieldsBuilder.append(fieldDelimiter);                        fieldsBuilder.append(entry.getValue());                    }                    fieldsBuilder.append(rowDelimiter);                    outputStream.write(fieldsBuilder.toString().getBytes());                }            }        }        @Override        public void close(TaskAttemptContext context) throws IOException, InterruptedException {            outputStream.close();        }    }    @Override    public String getFilenameExtension() {        return "";    }    @Override    public RecordWriter<IEtlKey, CamusWrapper> getDataRecordWriter(        TaskAttemptContext context,        String fileName,        CamusWrapper data,        FileOutputCommitter committer    ) throws IOException, InterruptedException {        Configuration conf = context.getConfiguration();        String rowDelimiter = conf.get("etl.output.record.delimiter", "\n");        Path path = new Path(committer.getWorkPath(), EtlMultiOutputFormat.getUniqueFile(context, fileName, getFilenameExtension()));        FileSystem fs = path.getFileSystem(conf);        FSDataOutputStream outputStream = fs.create(path, false);        return new CanalBinlogRecordWriter(outputStream, "\t", rowDelimiter);    }}

这样,我们就只留下了需要关心的数据,并且格式化为制表符分隔、换行符结尾的文本格式,可以直接符合数仓中对Hive表的定义规范了。

Camus job的执行和调度

可以通过hadoop jar命令来执行Camus job,不过项目内直接提供了camus-run工具,写法就很简单了:

bin/camus-run -P conf/binlog-fetch-camus.properties

通过Crontab或者Azkaban调度它都行,不再赘述。目前我们是半小时调度一次,运行良好。查看生成的目录结构。内部分区的格式是“pt_hour=YYYYMMddHH”:

Canal+Camus快速采集MySQL Binlog到数据仓库

在上述/camus/exec目录下也可以看到Kafka offset的存储。

Canal+Camus快速采集MySQL Binlog到数据仓库

Canal+Camus快速采集MySQL Binlog到数据仓库

版权声明:

本文为大数据技术与架构整理,原作者独家授权。未经原作者允许转载追究侵权责任。

编辑|冷眼丶

微信公众号|import_bigdata

欢迎点赞+收藏+转发朋友圈素质三连

Canal+Camus快速采集MySQL Binlog到数据仓库

文章不错?点个【在看】吧!** 👇**

本文分享自微信公众号 - 大数据技术与架构(import_bigdata)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

点赞
收藏
评论区
推荐文章
blmius blmius
2年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Jacquelyn38 Jacquelyn38
2年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Wesley13 Wesley13
2年前
Java获得今日零时零分零秒的时间(Date型)
publicDatezeroTime()throwsParseException{    DatetimenewDate();    SimpleDateFormatsimpnewSimpleDateFormat("yyyyMMdd00:00:00");    SimpleDateFormatsimp2newS
Wesley13 Wesley13
2年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
2年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
2年前
Docker 部署SpringBoot项目不香吗?
  公众号改版后文章乱序推荐,希望你可以点击上方“Java进阶架构师”,点击右上角,将我们设为★“星标”!这样才不会错过每日进阶架构文章呀。  !(http://dingyue.ws.126.net/2020/0920/b00fbfc7j00qgy5xy002kd200qo00hsg00it00cj.jpg)  2
Stella981 Stella981
2年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Stella981 Stella981
2年前
ClickHouse大数据领域企业级应用实践和探索总结
点击上方蓝色字体,选择“设为星标”回复”资源“获取更多资源!(https://oscimg.oschina.net/oscnet/bb00e5f54a164cb9827f1dbccdf87443.jpg)!(https://oscimg.oschina.net/oscnet/dc8da835ff1b4
Wesley13 Wesley13
2年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
3个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这