Debezium日常运维手机

Stella981 等级 572 0 0

关于Kafka-Connect:

(1) 是否可以动态添加已有数据的新表? 

不可以,Kafka-Connect需要配置先行。如果是已有数据的新表,无法通过修改已有的kafka-connect配置进行新表的Snapshot初始化。

建议通过table white list功能,进行新表的snapshot。然后等到稳定后,再合并到同一个Kafka-connect配置里面。

(2) Kafka-Connect什么时候回触发Snapshot ? 

参考 https://debezium.io/docs/connectors/mysql/ 

关于配置项:snapshot.mode

(A) initial 

表示指定connector在logic server name 没有offset记录 的情况下,会运行一次snapshot。(默认值)

(B) when_needed

这表示connector在启动的时候,判断是否需要运行snapshot。当没有offsets数据,或者之前的binlog offset记录或者 GTID 在mysql服务器上面,找不到了。

配置binlog超过现有的可读binlog范围,则会强制触发一个snapshot。(手动修改 kafka-connect distribute版本的内建kafka对列:connect-offsets 的值,达到这个效果)

(C) never

nerver表示,不会使用snapshot操作,所以当启动connector的时候,会根据logical server name,会从binlog的最开始处进行数据读取。这需要谨慎地使用,因为只有在binlog数据包含了所有的数据库历史记录的时候,才会有效。

如果你不需要topics包含一致性历史快照,且只需要增量变化的数据,那么可以使用schema_only。

(D) schema_only

只对表结构进行snapshot,数据会按照增量的形式进行添加。

(E) schema_only_recovery

schema_only_recovery 用于修复一个现有的connector,因为崩溃 或者丢失数据库历史topic,或者是周期性执行了clean up操作,清空了数据库历史topic(database history topic). 它不会同步任何数据,只是将schema修复到与数据库一致。

(3) 遇到问题:

com.github.shyiko.mysql.binlog.network.ServerException: A slave with the same server_uuid/server_id as this slave has connected to the master; the first event 'mysql-bin.000001' at 760357, the last event read from './mysql-bin.000001' at 1888540, the last byte read from './mysql-bin.000001' at 1888540.

at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:885)

at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:564)

at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:796)

at java.lang.Thread.run(Thread.java:748)

注意修改database.server.id 是唯一值,用来区分每一个mysql client instance 

[database.server.id](https://www.oschina.net/action/GoToLink?url=http%3A%2F%2Fdatabase.server.id%2F)

random

A numeric ID of this database client, which must be unique across all currently-running database processes in the MySQL cluster. This connector joins the MySQL database cluster as another server (with this unique ID) so it can read the binlog. By default, a random number is generated between 5400 and 6400, though we recommend setting an explicit value.

(4) 遇到问题:

org.apache.kafka.connect.errors.ConnectException: no viable alternative at.... antlr ... 遇到不能解析的ddl语句

解决:配置database.history.skip.unparseable.ddl = true

(5) 遇到问题:

Encountered change event for table xxx.xxxx whose schema isn't known to this connector

解决思路:配置 inconsistent.schema.handling.mode = warn

原理,在遇到表schema不一致的时候,内部维护的表结构与数据库表结构不一致的时候,默认会failed整个处理逻辑。

但是通过配置 handling.mode = warn 可以使Kafka对该表重新进行一次snapshot操作。

不过过程可能会对Kafka插入重复的数据。需要在写入端进行数据去重操作。

(6) 遇到问题:

Error recording the DDL statement(s) in the database history Kafka topic ori_xxx_history:0 using brokers at null
解决方法:

配置 snapshot.mode = schema_only_recovery 修复完成后,再改回来 when_needed

(7) 遇到问题:

ConnectException: OffsetStorageWriter is already flushing 

解决方法:

使用restful PUT API 进行配置更新,出现背景是,Kafka Broker进行了维护重启,上一次的offset提交出现了异常。

重启Kafka_connect的task后, 应该可以解决。PS. 这个异常或许会自己自动恢复,或者需要重启。

(8) 由于修改了数据库链接配置,导致Kafka-Connect 的Debezium无法读取Binlog异常

解决方法:

shutdown 所有的Kafka-Connect 集群服务器,并且进行重启。故障消除。

(9) 遇到问题 binlog event 的列和数据库表的列数据不对应:

The binlog event does not contain expected number of columns; the internal schema representation is probably out of sync with the real database schema, or the binlog contains events recorded with binlog_row_image other than FULL or the table in question is an NDB table。

2020-6-23:先使用schemal_only进行修复,等2分钟后,然后再使用schema_only_recovery 进行增量同步。

(10) 遇到金融云数据库binlog归档异常:

Could not find existing binlog information while attempting schema only recovery snapshot 

将配置改从SCHEMA_ONLY_RECOVERY 改成 "snapshot.mode": "SCHEMA_ONLY" 。问题修复。

Schema_only只能修复schema不完整的问题,然后再使用snapshot.mode 如 never或者 schema_only_recovery 读取binlog数据。

如果schema数据不完整,就会遇到错误:Encountered change event for table xxx.xxx.xxx whose schema isn't known to this
connector。当遇到这个错误的时候,就需要使用Schema_only 或者 schema_only_recovery 来修复元数据。

(11) 遇到binlog文件归档后丢失的情况:

Could not find existsing binlog information while attempting schema only recovery snapshot 

实际上,是由于binlog服务器归档操作,导致binlog日志文件,在slave同步数据之前,被磁盘清理或转移。导致binlog tcp dump无法读取之前的binlog文件。

解决方法有2个:

(a) 让运维同事,把归档或转移掉的binlog文件,重新迁移到binlog的目录中,纳入mysql的管理。

(b) 更改配置文件的name 比如原配置name叫: "name": "debezium-mysql-xxx" ,名字可以改成 "name": "debezium-mysql-xxx2" 这样。

改了名字以后,kafka-connect会把这个配置认为是新的配置项,然后从当前可以读到的binlog position中读取,但是会丢失部分数据,需要重新拉取数据。

(12) 遇到change event isn't known to this connector:

org.apache.kafka.connect.errors.ConnectException: Encountered change event for table pay.comp_accountwhose schema isn't known to this connector

查询google,结果指向https://gitter.im/debezium/dev/archives/2018/04/05 

里面说,history topic 数据异常,建议使用schema_only_recovery来修复数据。

后来发现是Mysql MASTER-MASTER架构的原因,MHA架构部署debezium,请参考:https://github.com/debezium/debezium-examples/tree/master/failover

(13) 解决Kafka-Connect History 数据过大的情况:

由于History存放是表结构变更的数据,而且过期时间为无限,所以时间久了,History的topic会变得非常大。

首先,配置 database.history.store.only.monitored.tables.ddl = true。该配置根据table filter,来减少记录到history topic的数据。

最后,如果history topic实在是过大,可以重新创建一个topic,并通过schema_only_recovery来重启connector。由于binlog offset信息存储在connector的topic里面,所以不会受到影响。

(14) Kafka-Connect配置binlog的读取位置:

可以,参考Url:https://debezium.io/documentation/faq/#how_to_change_the_offsets_of_the_source_database 

通过修改对应配置offset.storage.topic的值,可以做到。

$ kafkacat -b localhost -C -t my_connect_offsets -f 'Partition(%p) %k %s\n'

Partition(11) [ "inventory-connector" ,{ "server" : "dbserver1" }] { "ts_sec" :1530088501, "file" : "mysql-bin.000003" , "pos" :817, "row" :1, "server_id" :223344, "event" :2}

Partition(11) [ "inventory-connector" ,{ "server" : "dbserver1" }] { "ts_sec" :1530168941, "file" : "mysql-bin.000004" , "pos" :3261, "row" :1, "server_id" :223344, "event" :2}

如上述所示,根据你的connector_name所示 对应的offset.storage.topic的key是这样的:["inventory-connector",{"server":"dbserver1"}]

对应的Kafka分区固定是11

最后的**Value**值是:{"ts_sec":1530168941,"file":"mysql-bin.000004","pos":3261,"row":1,"server_id":223344,"event":2}

这里用到了一个kafkacat工具,当然,也可以通过python代码的方式进行写入。

(15) 遇到异常如下:

org.apache.kafka.connect.errors.ConnectException: Encountered change event for table xxxx. xxxx whose schema isn't known to this connector

判断:有可能是kafka-connect的source程序,对于使用history topic生成schema的元数据与生产库订阅binlog而来的数据的元数据不一致。

解决方法是,shutdown 当前connect配置,修改database.history.kafka.topic的名称。使用一个新的topic名称,一般就在原本topic名称后+1。

然后重启kafka-source程序。注意,当前使用的snapshot.mode为schema_only_recovery。问题得到解决。

收藏
评论区

相关推荐

阿里Java架构师谈:2021年最新Java面试经历
第一家是美团美团的话,三面下来,设计的内容知识也是挺广的吧,有MySQL、Redis、Kafka、线程、算法、+、volatile、线程、并发、设计模式等等... 一面问题:MySQL+Redis+Kafka+线程+算法 mysql知道哪些存储引擎,它们的区别 mysql索引在什么情况下会失效 mysql在项目中的优化场景,慢查询解决等 my
MySQL数据库优化技巧
MySQL优化三大方向 ① 优化MySQL所在服务器内核(此优化一般由运维人员完成)。 ② 对MySQL配置参数进行优化(my.cnf)此优化需要进行压力测试来进行参数调整。 ③ 对SQL语句以及表优化。 MySQL参数优化 1:MySQL 默认的最大连接数为 100,可以在 mysql 客户端使用以下命令查看 mysql>
MySQL运维之
**1、mysqldump备份一个数据库** mysqldump命令备份一个数据库的基本语法: mysqldump -u user -p pwd dbname > Backup.sql 我们来讲解一下备份的文件都包含了什么?\-- MySQL dump 10.13 Distrib 5.5.20, for Win32 (x86)\--
mysql Backup &recovery
如果您要在MySQL数据库中存储任何您不想丢失的内容,那么定期备份数据以保护数据免受损失非常重要。本教程将向您展示两种简单的方法来备份和恢复MySQL数据库中的数据。您还可以使用此过程将数据移动到新的Web服务器。 * [从命令行备份(使用mysqldump)](https://www.oschina.net/action/GoToLink?url=h
mysqldump导出数据时,如何调整每个insert语句的values值的数量?
当我们对一个包含1千万行记录的表history执行导出时,假设只用的备份语句如下:   #mysqldump -uroot -p'123456'  --set-gtid-purged=OFF tdb history>history.sql 当我们执行表的恢复时,执行如下语句: mysql> source history.sql 。。。 Query
CDH集群手动导入scm库
一、手动导入 scm 库 ============ 背景:正常安装 cloudera-scm-server 时,安装 scm 库是通过脚本 /usr/share/cmf/schema/scm\_prepare\_database.sh 来自动建库的。 /usr/share/cmf/schema/scm_prepare_database.sh my
CentOS7 搭建Kafka(三)工具篇
_CentOS7 搭建Kafka(三)工具篇_ ======================= _做为一名懒人,自然不喜欢敲那些命令,一个是容易出错,另外一个是懒得记,能有个工具就最好了,一查还挺多,我们用个最主流的Kafka Manager_ Kafka Manager ============= kafka manager是yahoo为了维护kaf
Debezium 处理 mysql timestamp 的坑
使用Debezium订阅 mysql binlog  Debezium对于Timestamp的处理,会变成字符串,处理的核心代码是: ZonedDateTime expectedTimestamp = ZonedDateTime.of( LocalDateTime.parse("2014-09-08T17:51:04.780"),
Debezium接入Mysql遇到到的Tinyint坑
问题背景: 在Debezium做数据初始化的时候,对于一些tinyint字段的值,出现0,1的值的异常。 经过源码排查,数据在JDBC上面,读取到的数据是Boolean值。 通过排查,原来是MYSQL特有的数据问题,需要在JDBC上面加上关键字,问题解决。 JAVA数据类型 和 MYSQL的数据类型转换,要注意tinyInt 类型,且存储长度为1的情
Debezium日常运维手机
关于Kafka-Connect: (1) 是否可以动态添加已有数据的新表?  不可以,Kafka-Connect需要配置先行。如果是已有数据的新表,无法通过修改已有的kafka-connect配置进行新表的Snapshot初始化。 建议通过table white list功能,进行新表的snapshot。然后等到稳定后,再合并到同一个Kafka-con
Kafka 常用脚本与配置
脚本 作用 kafka-server-start.sh kafka启动 kafka-server-stop.sh kafka停止 kafka-topics.sh 查看创建删除topic kafka-console-consumer.sh 消费者操作,例如监听topic kafka-console-producer.sh 生产者操作,例如
Kafka常用命令
Kafka命令行工具 ========== 1. 查看Kafka现有的Topic 1 bin/kafka-topics.sh --zookeeper localhost:2181 --list 2. 查看Topic详情 1 bin/kafka-topics.sh --zo
Kafka常用操作
Kafka的版本间差异较大,下面是0.8.1的操作方法 首先cd到kafaka的bin目录下,操作kafka的工具都在这里呢。如果发现找不到,或者名字不对,说明kafka版本不对。 以topic是test为例子 --zookeeper 后的ip改成你们自己的 创建topic ./kafka-topics.sh --topic test --create
Kafka监控工具kafka
**Kafka Monitor**为Kafka的可视化管理与监控工具,为Kafka的稳定运维提供高效、可靠、稳定的保障,这里主要简单介绍Kafka Monitor的相关功能与页面的介绍;   Kafka Monitor v0.1 主要功能有:**Kafka基本信息仪表盘、broker列表、topic列表、当前消费者列表、Topic添加删除、Topic数据
springboot+kafka集成
kafka概念   Topic:消息根据Topic进行归类,可以理解为一个队里。   Producer:消息生产者,就是向kafka broker发消息的客户端。   Consumer:消息消费者,向kafka broker取消息的客户端。   broker:每个kafka实例(server),一台kafka服务器就是一个broker,一个集群