参考资料1:https://pan.baidu.com/s/1KoaL_soYaB4JMFAI0aDyXg 提取码: pr5c 参考资料2:https://share.weiyun.com/OL2DSs1X 密码:srevyt
一、什么是实时数仓 实时数据仓库(Real-time Data Warehouse)是指能够实时地处理和分析数据,使得数据仓库中的数据是最新的、最准确的,并且可以实时响应用户的查询和分析需求的一种数据仓库系统。
与传统的数据仓库相比,实时数据仓库更加注重数据的实时性和对业务的实时响应能力。传统数据仓库通常是每日、每周或每月定期进行数据的抽取、转换和加载(ETL),更新的速度较慢,一般不支持实时查询和分析。而实时数据仓库则更加注重数据的实时性和对业务的实时响应能力,能够在数据发生变化时及时响应用户的查询和分析需求。
二、安装Flink 步骤 1:下载 # 为了运行Flink,只需提前安装好 Java 11。你可以通过以下命令来检查 Java 是否已经安装正确。
java -version 下载 release 1.20-SNAPSHOT 并解压。
$ tar -xzf flink-1.20-SNAPSHOT-bin-scala_2.12.tgz $ cd flink-1.20-SNAPSHOT-bin-scala_2.12 步骤 2:启动集群 # Flink 附带了一个 bash 脚本,可以用于启动本地集群。
$ ./bin/start-cluster.sh Starting cluster. Starting standalonesession daemon on host. Starting taskexecutor daemon on host.
步骤 3:提交作业(Job) # Flink 的 Releases 附带了许多的示例作业。你可以任意选择一个,快速部署到已运行的集群上。
$ ./bin/flink run examples/streaming/WordCount.jar $ tail log/flink--taskexecutor-.out (nymph,1) (in,3) (thy,1) (orisons,1) (be,4) (all,2) (my,1) (sins,1) (remember,1) (d,4) 另外,你可以通过 Flink 的 Web UI 来监视集群的状态和正在运行的作业。
三、安装Doris 下载安装包 低版本(V1.0之前的版本):安装 Doris,需要先通过源码编译,主要有两种方式:使用 Docker 开发镜像编译(推荐)、直接编译。 高版本:直接下载官网的tar包即可,无需再手动编译;本文安装的是doris-1.1.5版本 前置准备 创建目录作为doris的安装目录 [whybigdata@node01 ~]# mkdir /opt/module/doris-1.1.5 修改可打开文件数(每个节点都要修改) [whybigdata@node01 ~]# sudo vim /etc/security/limits.conf
- soft nofile 65535
- hard nofile 65535
- soft nproc 65535
- hard nproc 65535 安装部署FE 创建 fe 元数据存储的目录 [whybigdata@node01 ~]# mkdir /opt/module/doris-1.1.5/doris-meta 注意:此处如果没有提前创建该目录(配置文件中指定元数据路径),则启动fe后执行 jps 或者 ps –ef | grep doris 或者 ps –ef | grep fe 都找不到相应的进程。
四、实战Flink+Doris实时数仓
建表
因业务数据经常伴随有 UPDATE,DELETE 等操作,为了保持实时数仓的数据粒度与业务库一致,所以选择 Doris Unique 模型(数据模型在下文有重点介绍)具体建表语句如下:
CREATE TABLE IF NOT EXISTS table_1
(
key1 varchar(32),
key2 varchar(32),
key3 varchar(32),
value1 int,
value2 varchar(128),
value3 Decimal(20, 6),
data_deal_datetime DateTime COMMENT '数据处理时间',
data_status INT COMMENT '数据是否删除,1表示正常,-1表示数据已经删除'
)
ENGINE=OLAP
UNIQUE KEY(key1
,key2
,key3
)
COMMENT "xxx"
DISTRIBUTED BY HASH(key2
) BUCKETS 32
PROPERTIES (
"storage_type"="column",
"replication_num" = "3",
"function_column.sequence_type" = 'DateTime'
);
可以看到,表结构中有两个字段分别是 data_deal_datetime,data_status。
data_deal_datetime 主要是相同 key 情况下数据覆盖的判断依据 data_status 用来兼容业务库对数据的删除操作 数据导入任务 Doris 提供了主动拉取 Kafka 数据的功能,配置如下: CREATE ROUTINE LOAD database.table1 ON table1 COLUMNS(key1,key2,key3,value1,value2,value3,data_deal_datetime,data_status), ORDER BY data_deal_datetime PROPERTIES ( "desired_concurrent_number"="3", "max_batch_interval" = "10", "max_batch_rows" = "500000", "max_batch_size" = "209715200", "format" = "json", "json_root" = "$.data", "jsonpaths"="["$.key1","$.key2","$.key3","$.value1","$.value2", "$.value3","$.data_deal_datetime","$.data_status"]" )FROM KAFKA ( "kafka_broker_list"="broker1_ip:port1,broker2_ip:port2,broker3_ip:port3", "kafka_topic"="topic_name", "property.group.id"="group_id", "property.kafka_default_offsets"="OFFSET_BEGINNING" );
五、未来规划 实时 Schema Change 目前通过 Flink CDC 实时接入数据时,当上游业务表进行 Schema Change 操作时,必须先手动修改 Doris 中的 Schema 和 Flink 任务中的 Schema,最后再重启任务,新的 Schema 的数据才可以同步过来。这样使用方式需要人为的介入,会给用户带来极大的运维负担。后续会针对 CDC 场景做到支持 Schema 实时变更,上游的 Schema Change 实时同步到下游,全面提升 Schema Change 的效率。
Doris 多表写入 目前 Doris Sink 算子仅支持同步单张表,所以对于整库同步的操作,需要手动在 Flink 层面进行分流,写到多个 Doris Sink 中,这无疑增加了开发者的难度,在后续版本中我们也将支持单个 Doris Sink 同步多张表,这样就大大的简化了用户的操作。