Apache Flink 与 Apache Hive 的集成

Stella981
• 阅读 736

Apache Flink 与 Apache Hive 的集成

分享嘉宾:李锐&王刚 @ 阿里巴巴

编辑整理:于广超

内容来源:Flink Forward ASIA

出品平台:DataFun

注:欢迎转载,转载请留言。

导读:随着 Flink 在流式计算的应用场景逐渐成熟和流行。如果 Flink 能同时把批量计算的应用场景处理好,就能减少用户在使用 Flink 的开发和维护成本,并且能够丰富 Flink 的生态。因为 SQL 是批计算比较常用的工具,所以 Flink 针对于批计算主要以 SQL 为主要接口。本次分享主要针对 Flink 对批处理的设计与 Hive 的集成。

主要分为下面三点展开:

  • 设计架构

  • 项目进展

  • 性能测试

Apache Flink 与 Apache Hive 的集成

首先和大家分享一下 Flink 批处理的设计架构。

1. 背景

Apache Flink 与 Apache Hive 的集成

Flink 提升批处理的主要原因,是为了减少客户的维护成本和更新成本,还有更好的完善 Flink 生态环境。又因为 SQL 是批计算场景中一个非常重要的工具,所以我们希望以 SQL 做为在批计算场景的主要接口。所以我们着重优化了 FlinkSQL 的功能。目前 FlinkSQL 主要有下面几点不足需要优化。

  • 需要完整的元数据管理的体制。

  • 缺少对 DDL ( 数据定义语言 DDL 用来创建数据库中的各种对象-----表、视图、索引、同义词、聚簇等 ) 的支持。

  • 不是很方便的与外部系统进行对接,尤其是 Hive,因为 Hive 是大数据领域最早的 SQL 引擎,所以 Hive 的用户基础非常广泛,新的一些 SQL 工具,如 SparkSql、Impala 都提供了与 Hive 对接的功能,这样用户才能更好地把他们的应用从 Hive 迁移过来,所以与 Hive 对接对 FlinkSQL 而言也是一个很重要的功能。

2. 目标

Apache Flink 与 Apache Hive 的集成

所以我们这个项目要完成下面这些目标:

  • 定义统一的 Catalog 接口,这个是 FlinkSQL 更方便与外部对接的前提条件。如果大家用过 Flink 的 TableSouces 和 TableSink 来对接外部的系统的表,会发现不管是通过写程序还是配置 yaml 文件会跟传统的 SQL 使用方式会有些不同。所以我们肯定不希望 Hive 的用户迁移 FlinkSQL 需要通过定义 TableSouces 和 TableSink 的方式来与 Hive 进行交互。所以我们提供了一套新的 Catalog 接口以一种更接近传统 SQL 的方式与 Hive 进行交互。

  • 统一的 Catalog 定义好后,我们要提供基于内存和可持久化的实现。基于内存就是 Flink 原有的方式,用户所有的元数据的生命周期是跟他的 Session ( 会话 ) 绑定的,Session ( 会话 ) 结束之后他所有的元数据都没有了。因为要跟 Hive 交互所以肯定还要提供一个持久化的 Catalog。

  • 与 Hive 互操作。有了 Catalog 之后用户就可以通过 Catalog 访问 Hive 的元数据,提供 Data Connector 让用户能通过 Flink 读写 Hive 的实际数据,实现 Flink 与 Hive 的交互。

  • 长期目标是使用 Flink 作为 Hive 的计算引擎,像 HiveOnSpark,HiveOnTez。

3. 全新设计的 CatalogAPI ( FlIP-30 )

Apache Flink 与 Apache Hive 的集成

用户通过 SQL Client  或者 Table API  提交请求,Flink 会创建 TableEnvironment, TableEnvironment 会创建 CatalogManager 加载并配置 Catalog 实例,并且 Catalog 支持多种元数据类型 table、database、function、view、partition 等,在 1.9.0 的版本当中 Catalog 会有两个实现:

  • 一个是基于内存的 GenericinMemoryCatalog。

  • 另一个是 HiveCatalog,HiveCatalog 通过 HiveShim 与 Hive Metasotre 交互来操作 Hive 元数据,HiveShim 的作用是处理 Hive 在大版本中 Hive Metastore 不兼容的问题。

从这种实现的方式可以看出,用户可以创建多个 Catalog,也可以访问多个 Hive Metastore,来达到跨 Catalog 查询的操作。

4. 读写 Hive 数据

Apache Flink 与 Apache Hive 的集成

有了元数据之后我们就可以实现 FlinkSQL 的 Data Connector 来真正的读写 Hive 实际数据。FlinkSQL 写入的数据必须要兼容 Hive 的数据格式,也就是 Hive 可以正常读取 Flink 写入的数据,反过来也是一样的。为了实现这一点我们大量复用 Hive 原有的 Input/Output Format、SerDe 等 API,一是为了减少代码冗余,二是尽可能的保持兼容性。

在 Data Connect 中读取 Hive 表数据具体实现类为:

HiveTableSource、HiveTableInputFormat。

写 Hive 表的具体实现类为:

HiveTableSink、HiveTableOutputFormat。

Apache Flink 与 Apache Hive 的集成

其次和大家分享 Flink 1.9.0 的现状和 1.10.0 中的新特性还有未来工作。

1. Flink 1.9.0 的现状

Apache Flink 与 Apache Hive 的集成

FlinkSQL 作为 1.9.0 版本中作为试用功能发布的,它的功能还不是很完善:

  • 支持的数据类型还不全。1.9.0 中带参数的数据类型基本上都不支持:如DECIMAL,CHAR 等

  • 对分区表的支持不完善,只能读取分区表,不能写分区表

  • 不支持表的 INSERT OVERWRITE

2. Flink 1.10.0 中的新特性

Apache Flink 与 Apache Hive 的集成

FlinkSQL 在 1.10.0 版本里我们做了比较多的进一步开发,与 Hive 集成的功能更加完整:

  • 支持读写静态分区和动态分区表

  • 在表级别和分区级别都支持 INSERT OVERWRITE

  • 支持了更多的数据类型。( 除 UNION 类型都支持 )

  • 支持更多的 DDL。( CREATE TABLE / DATABASE )

  • 支持在 Flink 中调用 Hive 的内置函数。( Hive 大约200多个内置函数 )

  • 支持了更多的 Hive 版本。( Hive 的 1.0.0~3.1.1 )

  • 做了很多性能优化如,Project/Predicate Pushdown,向量的读取 ORC 数据等

3. Module 接口

Apache Flink 与 Apache Hive 的集成

为了能让用户调用 FlinkSQL 中调用 Hive 的内置函数,我们在 Flink 1.10 当中引入了一个 Module 接口。这个 Module 是为了让用户能够方便的把外部系统的内置函数接入到系统当中。

  • 他的使用方式和 Catalog 类似,用户可以通过 Table API 或 Yaml 文件来配置 Module。

  • Module 可以同时加载多个,Flink 解析函数的时候通过 Module 的加载顺序在多个 Module 中查找函数的解析。也就是如果两个 Module 包含名字相同的 Function,先加载的 Module 会提供 Function 的定义。

  • 目前 Module 有两个实现,CoreModule 提供了 Flink 原生的内置函数,HiveModule 提供了 Hive 的内置函数。

4. 未来工作

Apache Flink 与 Apache Hive 的集成

未来的工作主要是先做功能的补全,其中包括:

  • View 的支持 ( 有可能在1.11中完成 )

  • 持续改进 SQL CLI 的易用性,现在支持翻页显示查询结果,后续支持滚动显示。并支持 Hive 的 -e -f 这种非交互式的使用方式

  • 支持所有的 Hive 常用 DDL,例如 CREATE TABLE AS

  • 兼容 Hive 的语法,让原来在 Hive 上的工程在 Flink 的顺滑的迁移过来

  • 支持 SQL CLI 的远程模式,类似 HiveServer2 的远程连接模式

  • 支持流式的写入 Hive 数据

Apache Flink 与 Apache Hive 的集成

下面是 Flink 在批处理作业下与 HiveMR 对比测试的测试环境和结果。

1. 测试环境

Apache Flink 与 Apache Hive 的集成

首先我们的测试环境使用了21个节点的物理机群,一个 Master 节点和20个 Slave 节点。节点的硬件配置是32核,64个线程,256内存,网络做了端口聚合,每个机器是12块的 HDD 硬盘。

Apache Flink 与 Apache Hive 的集成

测试工具使用了 hortonworks 的 hive-testbench 在 github 中是一个开源的工具。我们使用这个工具生成了 10TB 的 TPC-DS 测试数据集,然后分别通过 FlinkSQL 和 Hive 对该数据集进行 TPC-DS 的测试。一方面我们对比了 Flink 和 Hive 的性能,另一方面我们验证了 FlinkSQL 能够很好的访问 Hive 的数据。测试用到了 Hive 版本是 3.1.1,Flink 用到的是 Master 分支代码。

2. 测试结果

Apache Flink 与 Apache Hive 的集成

测试结果 FlinkSQL 对比 Hive On MapReduce 取得了大约7倍的性能提升。这得益于 FlinkSQL 所做的一系列优化,比如在调度方面的优化,以及执行计划的优化等。总体来说如果用的是 Hive On MapReduce,迁移到 FlinkSQL 会有很大性能的提升。

Apache Flink 与 Apache Hive 的集成

本文分享自微信公众号 - 浪尖聊大数据(bigdatatip)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

点赞
收藏
评论区
推荐文章
blmius blmius
2年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Wesley13 Wesley13
2年前
java将前端的json数组字符串转换为列表
记录下在前端通过ajax提交了一个json数组的字符串,在后端如何转换为列表。前端数据转化与请求varcontracts{id:'1',name:'yanggb合同1'},{id:'2',name:'yanggb合同2'},{id:'3',name:'yang
Jacquelyn38 Jacquelyn38
2年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Wesley13 Wesley13
2年前
4cast
4castpackageloadcsv.KumarAwanish发布:2020122117:43:04.501348作者:KumarAwanish作者邮箱:awanish00@gmail.com首页:
Wesley13 Wesley13
2年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
2年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
2年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
2年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
3个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这