开源数据集成平台SeaTunnel:MySQL实时同步到es

javalover123
• 阅读 374

一、前言

  • 最近,项目有几个表要从 MySQL 实时同步到 另一个 MySQL,也有同步到 ElasticSearch 的。
  • 目前,公司生产环境同步,用的是 阿里云的 DTS,每个同步任务每月 500多元,有点小贵。
  • 其他环境:MySQL同步到ES,用的是 CloudCanal,不支持 数据转换,添加同步字段比较麻烦,社区版限制5个任务,不够用;MySQL同步到MySQL,用的是 debezium,不支持写入 ES。
  • 恰好3年前用过 SeaTunnel 的 前身 WaterDrop,那就开始吧。本文以 2.3.1 版本,Ubuntu 系统为例

二、开源数据集成平台SeaTunnel

1. 简介

  • SeaTunnel 是 Apache 软件基金会下的一个高性能开源大数据集成工具,为数据集成场景提供灵活易用、易扩展并支持千亿级数据集成的解决方案。
  • Seaunnel 为实时(CDC)和批量数据提供高性能数据同步能力,支持十种以上数据源,已经在B站、腾讯云、字节等数百家公司使用。
  • 可以选择 SeaTunnel Zeta 引擎上运行,也可以在 Apache Flink 或 Spark 引擎上运行。
    开源数据集成平台SeaTunnel:MySQL实时同步到es

2. 安装

  • 下载,这里选择 2.3.1 版本,执行 tar -xzvf apache-seatunnel-*.tar.gz 解压缩

  • 因为 2.3.2 版本,MySQL-CDC 找不到驱动bug修复详见

    Caused by: java.sql.SQLException: No suitable driver
          at java.sql/java.sql.DriverManager.getDriver(DriverManager.java:298)
          at com.zaxxer.hikari.util.DriverDataSource.<init>(DriverDataSource.java:106)
          ... 20 more
    
          ... 11 more
    
          at org.apache.seatunnel.engine.client.job.ClientJobProxy.waitForJobComplete(ClientJobProxy.java:122)
          at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:181)

3. 安装 connectors 插件

  • 执行 bash bin/install-plugin.sh,国内建议先配置 maven 镜像,不然容易失败 或者 慢
  • 官方文档写着执行 sh bin/install-plugin.sh,我在 Ubuntu 20.04.2 LTS 上执行报错(bin/install-plugin.sh: 54: Bad substitution),我提了PR
    开源数据集成平台SeaTunnel:MySQL实时同步到es

4. 编写配置文件

  • config 目录下,新建配置文件:如 mysql-es-test.conf

  • 添加 env 配置

  • 因为是 实时同步,这里 job.mode = "STREAMING"*,execution.parallelism 是 并发数

    env {
     # You can set flink configuration here
     execution.parallelism = 1
     job.mode = "STREAMING"
     checkpoint.interval = 2000
     #execution.checkpoint.interval = 10000
     #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
    }
  • MySQL 实时同步,需开启 binlog

  • 添加 数据源 配置 result_table_name 取个 临时表名,便于后续使用。table-names 必须是 数据库.表名,base-url 必须指定 数据库。
    startup.mode 默认是 INITIAL,先同步历史数据,后增量同步,详情点击

    source {
    MySQL-CDC {
      result_table_name = "t1"
      server-id = 5656
      username = "root"
      password = "pwd"
      table-names = ["db.t1"]
      base-url = "jdbc:mysql://host:3306/db"
    }
    }
  • 添加 转换 配置,sql 比较灵活函数列表请点击

    transform {
    Sql {
      source_table_name = "t1"
      query = "SELECT id, alias_name aliasName FROM t1 WHERE c1 = '1'"
    }
    }
  • 添加 输出 配置

  • CDC 实时同步 es,必须配置 primary_keys*

    sink {
       Elasticsearch {
           hosts = ["host:9200"]
           username = "elastic"
           password = "pwd"
    
           index = "index_t1"
           # cdc required options
           primary_keys = ["id"]
       }
    }
  • 最终配置截图
    开源数据集成平台SeaTunnel:MySQL实时同步到es

5. 启动任务

这里以 本地模式为例,另有 集群、spark、flink 模式。

./bin/seatunnel.sh -e local --config ./config/mysql-es-test.conf

三、总结

本文遵守【CC BY-NC】协议,转载请保留原文出处及本版权声明,否则将追究法律责任。
本文首先发布于 https://www.890808.xyz/ ,其他平台需要审核更新慢一些。

开源数据集成平台SeaTunnel:MySQL实时同步到es

点赞
收藏
评论区
推荐文章
Wesley13 Wesley13
2年前
MySQL如何实时同步数据到ES?试试这款阿里开源的神器
摘要mall项目中的商品搜索功能,一直都没有做实时数据同步。最近发现阿里巴巴开源的canal可以把MySQL中的数据实时同步到Elasticsearch中,能很好地解决数据同步问题。今天我们来讲讲canal的使用,希望对大家有所帮助!canal简介canal主要用途是对MySQL数据库增量日志进行解析,提供增量数据的订阅和消
Stella981 Stella981
2年前
Scapy 从入门到放弃
0x00前言最近闲的没事,抽空了解下地表最强的嗅探和收发包的工具:scapy。scapy是一个python模块,使用简单,并且能灵活地构造各种数据包,是进行网络安全审计的好帮手。0x01安装因为2020年python官方便不再支持python2,所以使用python3安装。!(https://oscimg.oschina.net/os
Wesley13 Wesley13
2年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
2年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Stella981 Stella981
2年前
ELK学习笔记之ElasticSearch的索引详解
0x00ElasticSearch的索引和MySQL的索引方式对比Elasticsearch是通过Lucene的倒排索引技术实现比关系型数据库更快的过滤。特别是它对多条件的过滤支持非常好,比如年龄在18和30之间,性别为女性这样的组合查询。倒排索引很多地方都有介绍,但是其比关系型
韦康 韦康
1个月前
Flink 从0到1实战实时风控系统|同步追更
Flink从0到1实战实时风控系统|同步追更download》quangneng.com/2323/关于Flink从0到1实战实时风控系统的介绍ApacheFlink是一个快速、可扩展且容错的开源流处理和批处理框架。它提供了高效处理大规模数据流和批处理作业
贾蓁 贾蓁
3个月前
Flink 从0到1实战实时风控系统|同步追更
Flink从0到1实战实时风控系统|同步追更Flink实时风控系统概述Flink是一个快速、可扩展且容错的开源流处理和批处理框架,它提供了高效处理大规模数据流和批处理作业的能力,具有低延迟、高吞吐量和精确一次语义等特点1。在实时风控系统中,Flink可以用
何婆子 何婆子
2个月前
Flink 从0到1实战实时风控系统|同步追更
Flink从0到1实战实时风控系统|同步追更download》chaoxingit.com/2323/Flink从0到1实战实时风控系统的介绍建立一个实时风控系统是一个复杂而关键的任务,需要综合使用流式计算、机器学习和实时数据处理技术。ApacheFlin
程昱 程昱
2个月前
Flink 从0到1实战实时风控系统|同步追更
Flink从0到1实战实时风控系统|同步追更Flink实时风控系统概述Flink是一个快速、可扩展且容错的开源流处理和批处理框架,它提供了高效处理大规模数据流和批处理作业的能力,具有低延迟、高吞吐量和精确一次语义等特点1。在实时风控系统中,Flink可以用
程昱 程昱
1个月前
Flink 从0到1实战实时风控系统|同步追更
Flink从0到1实战实时风控系统|同步追更download》quangneng.com/2323/一、Flink从0到1实战实时风控系统的项目介绍"从0到1"构建一个实时风控系统是一个复杂而又具有挑战性的项目。这样的项目需要从搭建基础架构到开发算法模型以
javalover123
javalover123
Lv1
10年Java经验,多个开源项目贡献者。https://github.com/javalover123
文章
16
粉丝
2
获赞
5