Apache Doris 整合 FLINK CDC + Iceberg 构建实时湖仓一体的联邦查询

京东云开发者
• 阅读 360

1概况

本文展示如何使用 Flink CDC + Iceberg + Doris 构建实时湖仓一体的联邦查询分析,Doris 1.1版本提供了Iceberg的支持,本文主要展示Doris和Iceberg怎么使用,大家按照步骤可以一步步完成。完整体验整个搭建操作的过程。

2系统架构

我们整理架构图如下,

Apache Doris 整合 FLINK CDC + Iceberg 构建实时湖仓一体的联邦查询

1.首先我们从Mysql数据中使用Flink 通过 Binlog完成数据的实时采集

2.然后再Flink 中创建 Iceberg 表,Iceberg的元数据保存在hive里

3.最后我们在Doris中创建Iceberg外表

4.在通过Doris 统一查询入口完成对Iceberg里的数据进行查询分析,供前端应用调用,这里iceberg外表的数据可以和Doris内部数据或者Doris其他外部数据源的数据进行关联查询分析

Doris湖仓一体的联邦查询架构如下:

Apache Doris 整合 FLINK CDC + Iceberg 构建实时湖仓一体的联邦查询

1.Doris 通过 ODBC 方式支持:MySQL,Postgresql,Oracle ,SQLServer

2.同时支持 Elasticsearch 外表

3.1.0版本支持Hive外表

4.1.1版本支持Iceberg外表

5.1.2版本支持Hudi 外表

3 创建MySQL数据库表并初始化数据

CREATE DATABASE demo;
USE demo;
CREATE TABLE userinfo (
  id int NOT NULL AUTO_INCREMENT,
  name VARCHAR(255) NOT NULL DEFAULT 'flink',
  address VARCHAR(1024),
  phone_number VARCHAR(512),
  email VARCHAR(255),
  PRIMARY KEY (`id`)
)ENGINE=InnoDB ;
INSERT INTO userinfo VALUES (10001,'user_110','Shanghai','13347420870', NULL);
INSERT INTO userinfo VALUES (10002,'user_111','xian','13347420870', NULL);
INSERT INTO userinfo VALUES (10003,'user_112','beijing','13347420870', NULL);
INSERT INTO userinfo VALUES (10004,'user_113','shenzheng','13347420870', NULL);
INSERT INTO userinfo VALUES (10005,'user_114','hangzhou','13347420870', NULL);
INSERT INTO userinfo VALUES (10006,'user_115','guizhou','13347420870', NULL);
INSERT INTO userinfo VALUES (10007,'user_116','chengdu','13347420870', NULL);
INSERT INTO userinfo VALUES (10008,'user_117','guangzhou','13347420870', NULL);
INSERT INTO userinfo VALUES (10009,'user_118','xian','13347420870', NULL);

4 创建Iceberg Catalog

CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://localhost:9083',
  'clients'='5',
  'property-version'='1',
  'warehouse'='hdfs://localhost:8020/user/hive/warehouse'
);

5 创建 Mysql CDC 表

CREATE TABLE user_source (
    database_name STRING METADATA VIRTUAL,
    table_name STRING METADATA VIRTUAL,
    `id` DECIMAL(20, 0) NOT NULL,
    name STRING,
    address STRING,
    phone_number STRING,
    email STRING,
    PRIMARY KEY (`id`) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = 'MyNewPass4!',
    'database-name' = 'demo',
    'table-name' = 'userinfo'
  );

6 创建Iceberg表

---查看catalog
show catalogs;
---使用catalog
use catalog hive_catalog;
--创建数据库
CREATE DATABASE iceberg_hive; 
--使用数据库
use iceberg_hive;
​

7 创建表

CREATE TABLE all_users_info (
    database_name STRING,
    table_name    STRING,
    `id`          DECIMAL(20, 0) NOT NULL,
    name          STRING,
    address       STRING,
    phone_number  STRING,
    email         STRING,
    PRIMARY KEY (database_name, table_name, `id`) NOT ENFORCED
  ) WITH (
    'catalog-type'='hive'
  );

从CDC表里插入数据到Iceberg表里

use catalog default_catalog;
​
insert into hive_catalog.iceberg_hive.all_users_info select * from user_source;

我们去查询iceberg表

select * from hive_catalog.iceberg_hive.all_users_info

8 Doris 查询 Iceberg

8.1 创建Iceberg外表

CREATE TABLE `all_users_info` 
ENGINE = ICEBERG
PROPERTIES (
"iceberg.database" = "iceberg_hive",
"iceberg.table" = "all_users_info",
"iceberg.hive.metastore.uris"  =  "thrift://localhost:9083",
"iceberg.catalog.type"  =  "HIVE_CATALOG"
);

参数说明

•ENGINE 需要指定为 ICEBERG

•PROPERTIES 属性:

iceberg.hive.metastore.uris:Hive Metastore 服务地址

iceberg.database:挂载 Iceberg 对应的数据库名

iceberg.table:挂载 Iceberg 对应的表名,挂载 Iceberg database 时无需指定。

iceberg.catalog.type:Iceberg 中使用的 catalog 方式,默认为 HIVE_CATALOG,当前仅支持该方式,后续会支持更多的 Iceberg catalog 接入方式。

mysql> CREATE TABLE `all_users_info`
    -> ENGINE = ICEBERG
    -> PROPERTIES (
    -> "iceberg.database" = "iceberg_hive",
    -> "iceberg.table" = "all_users_info",
    -> "iceberg.hive.metastore.uris"  =  "thrift://localhost:9083",
    -> "iceberg.catalog.type"  =  "HIVE_CATALOG"
    -> );
Query OK, 0 rows affected (0.23 sec)
​
mysql> select * from all_users_info;
+---------------+------------+-------+----------+-----------+--------------+-------+
| database_name | table_name | id    | name     | address   | phone_number | email |
+---------------+------------+-------+----------+-----------+--------------+-------+
| demo          | userinfo   | 10004 | user_113 | shenzheng | 13347420870  | NULL  |
| demo          | userinfo   | 10005 | user_114 | hangzhou  | 13347420870  | NULL  |
| demo          | userinfo   | 10002 | user_111 | xian      | 13347420870  | NULL  |
| demo          | userinfo   | 10003 | user_112 | beijing   | 13347420870  | NULL  |
| demo          | userinfo   | 10001 | user_110 | Shanghai  | 13347420870  | NULL  |
| demo          | userinfo   | 10008 | user_117 | guangzhou | 13347420870  | NULL  |
| demo          | userinfo   | 10009 | user_118 | xian      | 13347420870  | NULL  |
| demo          | userinfo   | 10006 | user_115 | guizhou   | 13347420870  | NULL  |
| demo          | userinfo   | 10007 | user_116 | chengdu   | 13347420870  | NULL  |
+---------------+------------+-------+----------+-----------+--------------+-------+
9 rows in set (0.18 sec)

上述Doris On Iceberg我们只演示了Iceberg单表的查询,你还可以联合Doris的表,或者其他的ODBC外表,Hive外表,ES外表等进行联合查询分析,通过Doris对外提供统一的查询分析入口。

自此我们完整从搭建Hadoop,hive、flink 、Mysql、Doris 及Doris On Iceberg的使用全部介绍完了,Doris朝着数据仓库和数据融合的架构演进,支持湖仓一体的联邦查询,给我们的开发带来更多的便利,更高效的开发,省去了很多数据同步的繁琐工作。

作者:京东零售 吴化斌

来源:京东云开发者社区 转载请注明来源

点赞
收藏
评论区
推荐文章
Stella981 Stella981
2年前
Flink集成数据湖之实时数据写入iceberg
背景iceberg简介flink实时写入准备sqlclient环境创建catalog创建db创建table插入数据查询代码版本总结
Stella981 Stella981
2年前
Flink 助力美团数仓增量生产
简介:本文由美团研究员、实时计算负责人鞠大升分享,主要介绍Flink助力美团数仓增量生产的应用实践。内容包括:1、数仓增量生产;2、流式数据集成;3、流式数据处理;4、流式OLAP应用;5、未来规划。一、数仓增量生产1.美团数仓架构先介绍一下美团数仓的架构以及增量生产。如下图所示,这是美团数仓的简单架构,我
Stella981 Stella981
2年前
Flink 1.11 与 Hive 批流一体数仓实践
导读:Flink从1.9.0开始提供与Hive集成的功能,随着几个版本的迭代,在最新的Flink1.11中,与Hive集成的功能进一步深化,并且开始尝试将流计算场景与Hive进行整合。本文主要分享在Flink1.11中对接Hive的新特性,以及如何利用Flink对Hive数仓进行实时化改造,从而实现批流
笑面虎 笑面虎
2个月前
基于Flink+Iceberg构建可落地的企业级实时数据湖教程
基于FlinkIceberg构建可落地的企业级实时数据湖教程给大家分享一套课程——基于FlinkIceberg构建可落地的企业级实时数据湖教程,附代码课件。iceberg是一种表格式。我们可以简单理解为他是基于计算层(flink、spark)和存储层
鲍二家的 鲍二家的
2个月前
实战Flink+Doris实时数仓
学习地址1:https://pan.baidu.com/s/1joHFuWdo7WbwpP7hwZ1GMg提取码:cgss学习地址2:https://share.weiyun.com/OL2DSs1X密码:srevyt一、什么是DorisDoris是一个M
双寿 双寿
2个月前
实战Flink+Doris实时数仓教程
参考资料1:https://pan.baidu.com/s/1KoaLsoYaB4JMFAI0aDyXg提取码:pr5c参考资料2:https://share.weiyun.com/OL2DSs1X密码:srevyt一、什么是实时数仓实时数据仓库(Real
程昱 程昱
2个月前
Flink 从0到1实战实时风控系统|同步追更
Flink从0到1实战实时风控系统|同步追更download》quangneng.com/2323/一、Flink从0到1实战实时风控系统的项目介绍"从0到1"构建一个实时风控系统是一个复杂而又具有挑战性的项目。这样的项目需要从搭建基础架构到开发算法模型以
乐和 乐和
1个月前
Flink 从0到1实战实时风控系统|同步追更
Flink从0到1实战实时风控系统|同步追更download》shanxueit.com/2323/从零开始构建实时风控系统:基于Flink的实战指南摘要:本文将介绍如何利用ApacheFlink构建实时风控系统,从零开始逐步搭建一个高效可靠的实时数据处理
邢德全 邢德全
2个月前
Flink 从0到1实战实时风控系统|同步追更
Flink从0到1实战实时风控系统|同步追更download》chaoxingit.com/2323/构建实时风控系统是保障企业安全的重要组成部分,而使用ApacheFlink来构建实时风控系统则能够提供高性能、低延迟和可伸缩性。以下是从零开始构建实时风控
邢德全 邢德全
2个月前
Flink 从0到1实战实时风控系统|同步追更
Flink从0到1实战实时风控系统|同步追更download》chaoxingit.com/2323/构建实时风控系统是保障企业安全的重要组成部分,而使用ApacheFlink来构建实时风控系统则能够提供高性能、低延迟和可伸缩性。以下是从零开始构建实时风控