踩坑记 | Flink 事件时间语义下数据乱序丢数踩坑

语音识
• 阅读 4026

踩坑记 | Flink 事件时间语义下数据乱序丢数踩坑

本文详细介绍了在上游使用处理时间语义的 flink 任务出现故障后,重启消费大量积压在上游的数据并产出至下游数据乱序特别严重时,下游 flink 任务使用事件时间语义时遇到的大量丢数问题以及相关的解决方案。

本文分为以下几个部分:

  • 「1.本次踩坑的应用场景」
  • 「2.应用场景中发生的丢数故障分析」
  • 「3.待修复的故障点」
  • 「4.丢数故障解决方案及原理」
  • 「5.总结」

应用场景

应用场景如下:

  • 「flink 任务 A」「处理时间」语义做过滤产出新增 xx 明细数据至 「Kafka Y」
  • 「flink 任务 B」「事件时间」语义消费 「Kafka Y」 做窗口聚合操作产出分钟级别聚合指标至 「Kafka Z」
  • 「Kafka Z」 实时导入至 「Druid」 以做即时 OLAP 分析,并且展示在 BI 应用看板

踩坑记 | Flink 事件时间语义下数据乱序丢数踩坑

丢数故障分析

简要介绍下这次生产中故障场景。整条故障追踪链路如下:

故障一:

  • 收到报警反馈 「flink 任务 A」 入口流量为 0
  • 定位 「flink 任务 A」 中某个算子的故障导致整个 job 卡住
  • 导致此 「flink 任务 A」 上游 「kafka X」 积压了大量数据
  • 重启 「flink 任务 A」后,消费大量积压在上游 「kafka X」 数据完成,任务恢复正常

故障一从而引发下游的故障二:

  • 由于 「flink 任务 A」 使用了「处理时间」语义处理数据,并且有过滤和 keyBy 分桶窗口逻辑,在重启后消费大量积压在上游的数据时,导致 sink rebalance 后产出到下游 「kafka Y」 各个分区数据中的 server_timestamp 是乱序的
  • 下游 「flink 任务 B」 在消费 「Kafka Y」 时使用了「事件时间」语义处理数据,并且使用了数据中的 server_timestamp 作为「事件时间」时间戳
  • 「flink 任务 B」 消费了乱序很严重的数据之后,导致在窗口聚合计算时丢失了大量数据
  • 最终展示在 BI 应用中的报表有丢失数据的情况

踩坑记 | Flink 事件时间语义下数据乱序丢数踩坑

待修复的故障点

  • 1.「flink 任务 A」 的稳定性故障,这部分解决方案暂不在本文中介绍
  • 2.「flink 任务 B」 消费上游乱序丢数故障,解决方案在下文介绍

解决方案以及原理

丢数故障解决方案

解决方案是以下游 「flink 任务 B」 作为切入点,直接给出 「flink 任务 B」 的 sql 代码解决方案,java code 也可以按照这个方案实现,其本质原理相同。下文进行原理解释。

SELECT
  to_unix_timestamp(server_timestamp / bucket) AS timestamp, -- format 成原有的事件时间戳
  count(id) as id_cnt,
  sum(duration) as duration_sum
FROM
  source_table
GROUP BY
  TUMBLE(proctime, INTERVAL '1' MINUTE),
  server_timestamp / bucket -- 根据事件时间分桶计算,将相同范围(比如 1 分钟)事件时间的数据分到一个桶内

解决方案原理

首先明确一个无法避免的问题,在不考虑 watermark 允许延迟设置特别大的情况下,只要上游使用到了处理时间语义,下游使用事件时间语义,一旦上游发生故障重启并在短时间内消费大量数据,就不可避免的会出现上述错误以及故障。

在下游消费方仍然需要将对应事件时间戳的数据展示在 BI 平台报表中、并且全链路时间语义都为处理时间保障不丢数的前提下。解决方案就是在聚合并最终产出对应事件时间戳的数据。

最后的方案如下:整条链路全部为处理时间语义,窗口计算也使用处理时间,但是产出数据中的时间戳全部为事件时间戳。在出现故障的场景下,一分钟的窗口内的数据的事件时间戳可能相差几个小时,但在最终窗口聚合时可以根据事件时间戳划分到对应的事件时间窗口内,下游 BI 应用展示时使用此事件时间戳即可。

注意:sql 中的 bucket 需要根据具体使用场景进行设置,如果设置过于小,比如非故障场景下按照处理时间开 1 分钟的窗口,bucket 设为 60000(1 分钟),那么极有可能,这个时间窗口中所有数据的 server_timestamp 都集中在某两分钟内,那么这些数据就会被分到两个桶(bucket)内,则会导致严重的数据倾斜。

输入数据样例

模拟上述故障,「flink B」 的任务某一个窗口内的数据输入如下。

server_timestampidduration
2020/9/01 21:14:381300
2020/9/01 21:14:501500
2020/9/01 21:25:382600
2020/9/01 21:25:383900
2020/9/01 21:25:382800

输出数据样例

按照上述解决方案中的 sql 处理过后,输出数据如下,则可以解决此类型丢数故障。

timestampid_cntduration_sum
2020/9/01 21:14:002900
2020/9/01 21:25:0032300

总结

本文分析了在 flink 应用中:

  • 「上游使用处理时间语义的 flink 任务出现故障、重启消费大量积压数据并产出至下游数据乱序特别严重时,下游使用事件时间语义时遇到的大量丢数问题」
  • 「以整条链路为处理时间语义的前提下,产出的数据时间戳为事件时间戳解决上述问题」
  • 「以 sql 代码给出了丢数故障解决方案样例」

学习资料

flink

踩坑记 | Flink 事件时间语义下数据乱序丢数踩坑

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Wesley13 Wesley13
3年前
FLV文件格式
1.        FLV文件对齐方式FLV文件以大端对齐方式存放多字节整型。如存放数字无符号16位的数字300(0x012C),那么在FLV文件中存放的顺序是:|0x01|0x2C|。如果是无符号32位数字300(0x0000012C),那么在FLV文件中的存放顺序是:|0x00|0x00|0x00|0x01|0x2C。2.  
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
PHP创建多级树型结构
<!lang:php<?php$areaarray(array('id'1,'pid'0,'name''中国'),array('id'5,'pid'0,'name''美国'),array('id'2,'pid'1,'name''吉林'),array('id'4,'pid'2,'n
Easter79 Easter79
3年前
SpringBoot整合Redis乱码原因及解决方案
问题描述:springboot使用springdataredis存储数据时乱码rediskey/value出现\\xAC\\xED\\x00\\x05t\\x00\\x05问题分析:查看RedisTemplate类!(https://oscimg.oschina.net/oscnet/0a85565fa
Wesley13 Wesley13
3年前
Java日期时间API系列36
  十二时辰,古代劳动人民把一昼夜划分成十二个时段,每一个时段叫一个时辰。二十四小时和十二时辰对照表:时辰时间24时制子时深夜11:00凌晨01:0023:0001:00丑时上午01:00上午03:0001:0003:00寅时上午03:00上午0
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这
美凌格栋栋酱 美凌格栋栋酱
4个月前
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(