Spark2.x精通:ShuffleReader过程源码深度剖析

Stella981
• 阅读 366

Spark2.x精通:ShuffleReader过程源码深度剖析

一、概述

    之前我们写了几篇文章详细讲解了Spark Shuffle的Writer原理、技术演进历程及Spark2.x中三种Writer机制的具体实现,这里我们对Shuffler Read的源码进行深度剖析。

    对于每个stage来说,它的上边界,要么从外部存储读取数据,要么读取parent stage的输出。而下边界要么是写入到本地文件系统(需要有shuffle),提供给child stage进行读取,要么就是最后一个stage,需要输出结果。这里的stage在运行时就可以以流水线的方式进行运行一组Task,除了最后一个stage对应的ResultTask,其余的stage全部对应的ShuffleMapTask。

  除了需要从外部存储读取数据和RDD已经做过cache或者checkPoint的Task。一般的Task都是从Shuffle RDD的ShuffleRead开始的。

二、源码剖析

 1.我们先从ResultTask的runtask()函数开始讲解,代码如下:

override def runTask(context: TaskContext): U = {

2.这里ShuffleRDD.compute()函数从sparkEnv中获取对应的shuffleManager,这里对应的是SortShuffleManager,代码如下:

  override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {

调用getReader函数返回的应该是BlockStoreShuffleReader实例,代码如下:

  override def getReader[K, C](

3.最后调用BlockStoreShuffleReader的read()函数对数据进行读取,代码如下:

 override def read(): Iterator[Product2[K, C]] = {

4.回到上面第1中的ShuffleBlockFetcherIterator初始化,由于这个涉及到数据的拉取,比较重要,再去跟踪下他的实例化代码,里面主要是调用了initialize()函数进行了初始化,代码如下:

private[this] def initialize(): Unit = {

 

    总结一下,上面的read()函数里面的实现代码比较多,这里只看了重要的流程代码,它主要干了三件事:

    1.首先实例化ShuffleBlockFetcherIterator并进行数据的拉取;

    2.其次就是对数据进行了聚合,生成聚合迭代器;

    3.最后对数据进行了排序,生成排序迭代器。

ShuffleBlockFetcherIterator上面也说了几个参数,但是有一个参数特别重要参数,经常会用来优化shuffle reader:

    spark.reducer.maxSizeInFlight

         默认值48MB,设置ShuffleReadTask拉取数据的缓冲区大小,决定每次能够拉取多少数据。如果你内存充足,可适当调大成64MB、96MB减少拉取次数和数据传输次数,如果内存不太多,可适当调小为24MB,防止OOM,减少每次拉取的数据。

如果觉得我的文章能帮到您,请关注微信公众号“大数据开发运维架构”,并转发朋友圈,谢谢支持!


相关阅读:

  1. Spark2.x精通:TaskRunner运行源码深度剖析

  2. Spark2.x精通:Shuffle原理及对应的Consolidation优化机制

  3. Spark2.x精通:三种ShuffleWriter触发条件

  4. Spark2.x精通:Shuffle演进历程及Shuffle两阶段划分

  5. Spark2.x精通:源码剖析BypassMergeSortShuffleWriter具体实现

  6. Spark2.x精通:源码剖析UnsafeShuffleWriter具体实现

  7. Spark2.x精通:源码剖析SortShuffleWriter具体实现

Spark2.x精通:ShuffleReader过程源码深度剖析

本文分享自微信公众号 - 大数据开发运维架构(JasonLu1986)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

点赞
收藏
评论区
推荐文章
秃头王路飞 秃头王路飞
4个月前
webpack5手撸vue2脚手架
webpack5手撸vue相信工作个12年的小伙伴们在面试的时候多多少少怕被问到关于webpack方面的知识,本菜鸟最近闲来无事,就尝试了手撸了下vue2的脚手架,第一次发帖实在是没有经验,望海涵。languageJavaScript"name":"vuecliversion2","version":"1.0.0","desc
光头强的博客 光头强的博客
4个月前
Java面向对象试题
1、请创建一个Animal动物类,要求有方法eat()方法,方法输出一条语句“吃东西”。创建一个接口A,接口里有一个抽象方法fly()。创建一个Bird类继承Animal类并实现接口A里的方法输出一条有语句“鸟儿飞翔”,重写eat()方法输出一条语句“鸟儿吃虫”。在Test类中向上转型创建b对象,调用eat方法。然后向下转型调用eat()方
刚刚好 刚刚好
4个月前
css问题
1、在IOS中图片不显示(给图片加了圆角或者img没有父级)<div<imgsrc""/</divdiv{width:20px;height:20px;borderradius:20px;overflow:h
blmius blmius
1年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
小森森 小森森
4个月前
校园表白墙微信小程序V1.0 SayLove -基于微信云开发-一键快速搭建,开箱即用
后续会继续更新,敬请期待2.0全新版本欢迎添加左边的微信一起探讨!项目地址:(https://www.aliyun.com/activity/daily/bestoffer?userCodesskuuw5n)\2.Bug修复更新日历2.情侣脸功能大家不要使用了,现在阿里云的接口已经要收费了(土豪请随意),\\和注意
晴空闲云 晴空闲云
4个月前
css中box-sizing解放盒子实际宽高计算
我们知道传统的盒子模型,如果增加内边距padding和边框border,那么会撑大整个盒子,造成盒子的宽度不好计算,在实务中特别不方便。boxsizing可以设置盒模型的方式,可以很好的设置固定宽高的盒模型。盒子宽高计算假如我们设置如下盒子:宽度和高度均为200px,那么这会这个盒子实际的宽高就都是200px。但是当我们设置这个盒子的边框和内间距的时候,那
艾木酱 艾木酱
3个月前
快速入门|使用MemFire Cloud构建React Native应用程序
MemFireCloud是一款提供云数据库,用户可以创建云数据库,并对数据库进行管理,还可以对数据库进行备份操作。它还提供后端即服务,用户可以在1分钟内新建一个应用,使用自动生成的API和SDK,访问云数据库、对象存储、用户认证与授权等功能,可专
NVIDIA安培架构下MIG技术分析
关键词:NVIDIA、MIG、安培一什么是MIG2020年5月,NVIDIA发布了最新的GPU架构:安培,以及基于安培架构的最新的GPU:A100。安培提供了许多新的特性,MIG是其中一项非常重要的新特性。MIG的全名是MultiInstanceGPU。NVIDIA安培架构中的MIG模式可以在A100GPU上并行运行七个作业。多实
helloworld_28799839 helloworld_28799839
4个月前
常用知识整理
Javascript判断对象是否为空jsObject.keys(myObject).length0经常使用的三元运算我们经常遇到处理表格列状态字段如status的时候可以用到vue
密钥管理系统-为你的天翼云资产上把“锁
本文关键词:数据安全,密码机,密钥管理一、你的云上资产真的安全么?1.2021年1月,巴西的一个数据库30TB数据被破坏,泄露的数据包含有1.04亿辆汽车和约4000万家公司的详细信息,受影响的人员数量可能有2.2亿;2.2021年2月,广受欢迎的音频聊天室应用Clubhouse的用户数据被恶意黑客或间谍窃取。据悉,一位身份不明的用户能够将Clubho