Apache Iceberg 小文件合并

Stella981
• 阅读 1123
本文是《Apache Iceberg 入门教程》专题的第 1 篇,共 9 篇:

下一篇文章 »

在 《一条数据在 Apache Iceberg 之旅:写过程分析》 这篇文章中我们分析了 Apache Iceberg 写数据的源码。如下是我们使用 Spark 写两次数据到 Iceberg 表的数据目录布局(测试代码在 这里):

/data/hive/warehouse/default.db/iteblog
├── data
│   └── ts_year=2020
│       ├── id_bucket=0
│       │   ├── 00000-0-19603f5a-d38a-4106-aeb9-47285d15a6bd-00001.parquet
│       │   ├── 00000-0-491c447b-e05f-40ac-8c32-44d5ef39353b-00001.parquet
│       │   ├── 00001-1-4181e872-94fa-4699-ab5d-81dffa92de0c-00002.parquet
│       │   └── 00001-1-9b5f5291-af3b-4edc-adff-fbddf3e53709-00002.parquet
│       └── id_bucket=1
│           ├── 00001-1-4181e872-94fa-4699-ab5d-81dffa92de0c-00001.parquet
│           └── 00001-1-9b5f5291-af3b-4edc-adff-fbddf3e53709-00001.parquet
└── metadata
    ├── 00000-0934ba0e-8ed9-48e3-9db2-25dca1f896f2.metadata.json
    ├── 00001-454ee707-dbc0-4fd8-8c64-0910d8d6315b.metadata.json
    ├── 00002-7bdd0e6b-ec2b-4b1a-b355-6a81a3f5a0ae.metadata.json
    ├── 2170dbc3-334e-41ad-ac2c-68dd7470f001-m0.avro
    ├── 22e1674c-71ea-4d84-8982-276c3370d5c0-m0.avro
    ├── snap-5353330338887146702-1-2170dbc3-334e-41ad-ac2c-68dd7470f001.avro
    └── snap-6683043578305788250-1-22e1674c-71ea-4d84-8982-276c3370d5c0.avro

5 directories, 13 files

因为我们每次写入的数据就几条,Iceberg 每个分区写文件的时候都是产生新的文件,这就导致底层文件系统里面产生了很多大小才几KB的文件。如果我们是使用 Spark Streaming 的方式7*24小时不断地往 Apache Iceberg 里面写数据,这将产生大量的小文件。

使用 Iceberg 来压缩文件

值得高兴的是,Apache Iceberg 给我们提供了相关 Actions API 来合并这些小文件,具体如下:

Configuration conf = new Configuration();
conf.set(METASTOREURIS.varname, "thrift://localhost:9083");

Map<String, String> maps = Maps.newHashMap();
maps.put("path", "default.iteblog");
DataSourceOptions options = new DataSourceOptions(maps);

Table table = findTable(options, conf);

SparkSession.builder()
        .master("local[2]")
        .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic")
        .config("spark.hadoop." + METASTOREURIS.varname, "thrift://localhost:9083")
        .config("spark.executor.heartbeatInterval", "100000")
        .config("spark.network.timeoutInterval", "100000")
        .enableHiveSupport()
        .getOrCreate();

Actions.forTable(table).rewriteDataFiles()
        .targetSizeInBytes(10 * 1024) // 10KB
        .execute();

运行完上面代码之后,可以将 Iceberg 的小文件进行合并,得到的新数据目录如下:

⇒  tree /data/hive/warehouse/default.db/iteblog
/data/hive/warehouse/default.db/iteblog
├── data
│   └── ts_year=2020
│       ├── id_bucket=0
│       │   ├── 00000-0-19603f5a-d38a-4106-aeb9-47285d15a6bd-00001.parquet
│       │   ├── 00000-0-491c447b-e05f-40ac-8c32-44d5ef39353b-00001.parquet
│       │   ├── 00001-1-4181e872-94fa-4699-ab5d-81dffa92de0c-00002.parquet
│       │   ├── 00001-1-9b5f5291-af3b-4edc-adff-fbddf3e53709-00002.parquet
│       │   └── 00001-1-da4e85fa-096d-4b74-9b3c-a260e425385d-00001.parquet
│       └── id_bucket=1
│           ├── 00000-0-1d5871d5-08ac-4e43-a589-70a5d50dd4d2-00001.parquet
│           ├── 00001-1-4181e872-94fa-4699-ab5d-81dffa92de0c-00001.parquet
│           └── 00001-1-9b5f5291-af3b-4edc-adff-fbddf3e53709-00001.parquet
└── metadata
    ├── 00000-0934ba0e-8ed9-48e3-9db2-25dca1f896f2.metadata.json
    ├── 00001-454ee707-dbc0-4fd8-8c64-0910d8d6315b.metadata.json
    ├── 00002-7bdd0e6b-ec2b-4b1a-b355-6a81a3f5a0ae.metadata.json
    ├── 00003-d987d15f-2c7c-427c-849e-b8842d77d28e.metadata.json
    ├── 2170dbc3-334e-41ad-ac2c-68dd7470f001-m0.avro
    ├── 22e1674c-71ea-4d84-8982-276c3370d5c0-m0.avro
    ├── 25126b97-5a87-42b7-b45a-499aa41e7359-m0.avro
    ├── 25126b97-5a87-42b7-b45a-499aa41e7359-m1.avro
    ├── 25126b97-5a87-42b7-b45a-499aa41e7359-m2.avro
    ├── snap-3634417817414108593-1-25126b97-5a87-42b7-b45a-499aa41e7359.avro
    ├── snap-5353330338887146702-1-2170dbc3-334e-41ad-ac2c-68dd7470f001.avro
    └── snap-6683043578305788250-1-22e1674c-71ea-4d84-8982-276c3370d5c0.avro

5 directories, 20 files

对比最新的结果可以得出:

  • ts_year=2020/id_bucket=0 新增了名为 00001-1-da4e85fa-096d-4b74-9b3c-a260e425385d-00001.parquet 的数据文件,这个其实就是把之前四个文件进行和合并得到的新文件;
  • ts_year=2020/id_bucket=1 新增了名为 00000-0-1d5871d5-08ac-4e43-a589-70a5d50dd4d2-00001.parquet 的数据文件,这个其实就是把之前两个文件进行和合并得到的新文件。

Iceberg 小文件合并原理

Iceberg 小文件合并是在 org.apache.iceberg.actions.RewriteDataFilesAction 类里面实现的。小文件合并其实是通过 Spark 并行计算的,这也就是上面 DEMO 初始化了一个 SparkSession 的原因。我们可以通过 RewriteDataFilesAction 类的 targetSizeInBytes 方法来设置输出的合并文件大小。

注意:经过我多次测试,有以下结论:

  • 如果你设置的 targetSizeInBytes 值比底层文件的大小都小,其实原来大于这个尺寸的文件并没被拆成一个个大小最多为 targetSizeInBytes 的文件,而相当于把原来每个文件读出来,再写到新文件里面。
  • 如果你设置的 targetSizeInBytes 值比底层文件的大小要大,这时候合并才真正生效。

这个看起来像是把文件大小小于 targetSizeInBytes 的文件进行了合并,大于这个值的文件好像就是简单读出来,再写到新文件里面。

当我们调用了 execute() 方法,RewriteDataFilesAction 类会先创建出一个 org.apache.iceberg.DataTableScan,然后会把对应表的最新快照(Snapshot)拿出来,紧接着拿出这个快照对应的底层所有数据文件。然后按照分区 Key 进行分组(group),同一个分区的文件放到一起,并将这些信息放到 Map<StructLikeWrapper, Collection> groupedTasks 的结果里面,groupedTasks 的 Key 就是分区信息,如果表不是分区表,那就是空分区;groupedTasks 的 value 就是对应分区底下的文件列表。

由于分区里面可能存在一个文件,这时候就没必要去执行文件合并,这时候可以去掉这部分分区,得到了 Map<StructLikeWrapper, Collection> filteredGroupedTasks。如果 filteredGroupedTasks 里面没有需要合并的分区那就直接返回了。

如果 filteredGroupedTasks 不为空,则对每个分区里面的文件进行 split 和 combine 操作,如下:

// Split and combine tasks under each partition
List<CombinedScanTask> combinedScanTasks = filteredGroupedTasks.values().stream()
        .map(scanTasks -> {
          CloseableIterable<FileScanTask> splitTasks = TableScanUtil.splitFiles(
              CloseableIterable.withNoopClose(scanTasks), targetSizeInBytes);
          return TableScanUtil.planTasks(splitTasks, targetSizeInBytes, splitLookback, splitOpenFileCost);
        })
        .flatMap(Streams::stream)
        .collect(Collectors.toList());

combinedScanTasks 结构如下:

如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众号: iteblog_hadoop

combinedScanTasks 里面其实就是封装了 BaseCombinedScanTask 类,这个类里面的 task 就是标识哪些 Iceberg 的数据文件需要合并到新文件里面。得到 combinedScanTasks 之后会构造出一个 RDD:

JavaRDD<CombinedScanTask> taskRDD = sparkContext.parallelize(combinedScanTasks, combinedScanTasks.size());

然后最终会调用 taskRDD 的 map 方法,遍历 combinedScanTasks 里面的 task,将 task 里面对应的 Iceberg 读出来,再写到新文件里面:

public List<DataFile> rewriteDataForTasks(JavaRDD<CombinedScanTask> taskRDD) {
    JavaRDD<TaskResult> taskCommitRDD = taskRDD.map(this::rewriteDataForTask);

    return taskCommitRDD.collect().stream()
        .flatMap(taskCommit -> Arrays.stream(taskCommit.files()))
        .collect(Collectors.toList());
}

rewriteDataForTask 的实现如下:

private TaskResult rewriteDataForTask(CombinedScanTask task) throws Exception {
    TaskContext context = TaskContext.get();
    int partitionId = context.partitionId();
    long taskId = context.taskAttemptId();

    RowDataReader dataReader = new RowDataReader(
        task, schema, schema, nameMapping, io.value(), encryptionManager.value(), caseSensitive);

    SparkAppenderFactory appenderFactory = new SparkAppenderFactory(
        properties, schema, SparkSchemaUtil.convert(schema));
    OutputFileFactory fileFactory = new OutputFileFactory(
        spec, format, locations, io.value(), encryptionManager.value(), partitionId, taskId);

    BaseWriter writer;
    if (spec.fields().isEmpty()) {
      writer = new UnpartitionedWriter(spec, format, appenderFactory, fileFactory, io.value(), Long.MAX_VALUE);
    } else {
      writer = new PartitionedWriter(spec, format, appenderFactory, fileFactory, io.value(), Long.MAX_VALUE, schema);
    }

    try {
      while (dataReader.next()) {
        InternalRow row = dataReader.get();
        writer.write(row);
      }

      dataReader.close();
      dataReader = null;
      return writer.complete();

    } catch (Throwable originalThrowable) {
    ......
    }
}

rewriteDataForTasks 执行完会返回新创建文件的路径,最后会写到新的快照里面。在快照里面会将新建的文件表示为 org.apache.iceberg.ManifestEntry.Status#ADDED,上一个快照里面的文件标记为 org.apache.iceberg.ManifestEntry.Status#DELETED。

好了,本文就分享到这里。

**本博客文章除特别声明,全部都是原创!
转载本文请加上:转载自过往记忆(https://www.iteblog.com/)
本文链接: 【Apache Iceberg 小文件合并】(https://www.iteblog.com/archives/9896.html)

**

喜欢 (2) 赏 分享 (0)

点赞
收藏
评论区
推荐文章
blmius blmius
2年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Jacquelyn38 Jacquelyn38
2年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Stella981 Stella981
2年前
Opencv中Mat矩阵相乘——点乘、dot、mul运算详解
Opencv中Mat矩阵相乘——点乘、dot、mul运算详解2016年09月02日00:00:36 \牧野(https://www.oschina.net/action/GoToLink?urlhttps%3A%2F%2Fme.csdn.net%2Fdcrmg) 阅读数:59593
Wesley13 Wesley13
2年前
Java获得今日零时零分零秒的时间(Date型)
publicDatezeroTime()throwsParseException{    DatetimenewDate();    SimpleDateFormatsimpnewSimpleDateFormat("yyyyMMdd00:00:00");    SimpleDateFormatsimp2newS
Wesley13 Wesley13
2年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
2年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
2年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
2年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
3个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这