Spark系列——作业原理详解

Stella981
• 阅读 398

前言

本篇文章主要是从作业提交到最后获取到作业结果,
从源码的角度,
但是不涉及源码进行的分析.
其目的是读完本篇文章,
你将对作业的基本流程有个清晰的认识。
当然如果你阅读过源码,
那么读起来应该会比较舒服,
否则可能会有一定不适,
因为本文写的不是那么有逻辑~~~

1.任务提交过程

  • 首先,我们知道,一个action算子是触发一个job生成的地方,
    当遇见action算子,会执行sparkcontext的runjob方法,
    最后会交给dagSchedule的submitjob,
    这里会创建一个jobwaiter对象,并发送一个JobSubmitted消息进行作业任务的执行,
    同时 waiter.awaitResult() 会等待作业执行结果的返回:成功或者失败。
    到这里,我们对于作业应该有个基本的认识了,
    那么接下来我们再来深入一点,这个作业submit之后发生了什么呢?

2.划 分 调 度 阶 段

  • spark是资源调度是粗粒度的,我们这里不讨论资源申请,
    当我们提交一个任务之后(此时资源应该都是在集群中申请好了),
    Spark首先会对我们的作业任务划分调度阶段,
    而这个调度阶段的划分是由 DAGScheduler 负责的,
    其调度是基于stage的,那么下面我们看看stage是怎么划分的。

  • 一个application中的rdd集合相互依赖形成了一个依赖树,
    DAGScheduler 通过其 getParentStages 方法会从最后一个 finalrdd 开始,
    判断整颗树中RDD之间的依赖是否有 宽依赖ShuffleDependency,
    如果没有,就只生成一个stage,
    如果有,调用 getAncestorShuffleDepend,使用广度优先遍历整个依赖树,
    当遇到 ShuffleDependency 的时候,就会通过newOrUsedShuffleStag生成一个个stage,
    并划分为两个调度阶段,这样一个job也就被划分成了一个或者多个stage了。

  • 到这里我们的作业已经被划分成了一个个stage了,
    接下来就看看stage是怎么被提交的吧。。。

3.提 交 调 度 阶 段

  • 前面我们提到了JobSubmitted消息,
    那么这个消息实际上会触发 DAGScheduler 的 handleJobSubmitted 方法,
    首先该方法会在生成 finalStage 的同时建立起所有调度阶段的依赖关系(至于怎么建立的,我们后面慢慢深入),
    然后通过 fmalStage 生成一个作业实例ActiveJob,
    然后在submitStage(finalStage)开始提交作业。

  • 在作业提交调度阶段开始时,
    在 submitStage 方法中调用 getMissingParentStages 方法获取finalStage 父调度阶段,
    如果不存在父调度阶段,则使用 submitMissingTasks(stage) 方法提交执行;
    如果存在父调度阶段,则把该调度阶段存放到 waitingStages 列表中,
    同时递归调用 submitStage,
    直到找到没有父stage的stage调用 submitMissingTasks(stage),
    将该阶段提交去执行。
    这样一次调度任务就发送到Excutor开始执行了。

  • 当Excutor的task执行完成时发通知消息 CompleteEvent,
    会调用到 DAGschedule的handleTaskCompletion更新状态,
    并且判断该 task 所属的 stage 是否所有任务都已经完成,
    如果完成,则扫描等待运行调度阶段列表,检查它们的父调度阶段是否存在未完成,
    如果不存在则表明该调度阶段准备就绪,生成实例并提交运行。(至于其中失败重试的机制不做讨论)

  • 到此,stage提交的基本情况我们已经了解,
    但是对于一个了解spark的人来说,我们熟悉的task还没有出现,
    接下来,我们就来看看stage的task的执行流程吧。

4.提 交 任 务

  • 前面我们说到提交 stage 的方法 submitStage 进行Stage 的提交,
    该方法内部会调用到 DAGScheduler 的 submitMissingTasks 方法对每个stage 的 task 进行提交,
    其task生成规则如下:
    首先根据每个 stage 最后一个rdd的 Partition 个数拆分对应个数的 task ,
    这些 task 组成一个任务集 taskset 提交到 TaskScheduler 进行处理。
    对于 ResultStage (作业中最后的stage)生 成 ResultTask ,
    对 于 ShuffleMapStage 生成 ShuffleMapTask 。

  • 当 TaskScheduler 收到发送过来的任务集时,
    在 submitTasks 方法中(在 TaskSchedulerlmpl 类中进行实现),
    构建一个 TaskSetManager 的实例,用于管理这个任务集的生命周期,
    并通过 schedulableBuilder 的 addTaskSetManager 放入系统的调度池中,进行调度。

  • 接下来就是将调度池中待调度的任务发往Excutor。
    通过调用 SchedulerBackend 的 reviveOffers ,
    向 DriverEndPoint 终端点发送 ReviveOffers 消息,
    会触发 SchedulerBackend 的 makeOffers 方法,
    开始进行资源的检查和分配
    该方法首先会获取集群中可用的 Executor ,
    并通过 TaskSchedulerlmpl 的 resourceOffers 按照就近原则对进行资源的分配,
    并划分 PROCESS _ LOCAL、 NODE LOCAL、 NO PREF 、 RACK_LOCAL和 ANY 五个等级。
    这时候每个Task就知道自己将要去往的Excutor在哪里了,
    可以直接进行 launchtask 操作,
    这样就把分配好资源的 task 一个个发送到 Worker 节点上的 CoarseGrainedExecutorBackend ,
    真正将任务提交到了 Excutor。

  • 至此,我们的 task 算是正式提交到excutor准备执行了。

5.执 行 任 务

  • 当 CoarseGrainedExecutorBackend(excutor的守护进程) 接收到 LaunchTask 消息时,
    会调用 Executor 的 launchTask 方法进行处理。
    在 Executor 的 launchTask 方法中,
    初始化一个 TaskRunner 来封装任务,
    它用于管理任务运行时的细节,
    再把 TaskRumier 对象放入到 ThreadPool (线程池)中去执行。
    在 TaskRunner 的 run 方法里,
    首先会对发送过来的 Task 本身以及它所依赖的 Jar 等文件的反序列,
    然后对反序列化的任务调用 Task 的 runTask 方法。
    由于 Task 本身是一个抽象类,
    具体的 TaskRunner 方法是由它的两个子类 ShuffleMapTask 和 RedultTask 来实现的。

  • 对于 ShuffleMapTask 而言,它的计算结果会写到 BlockManager 之中,
    最终返回给 DAGScheduler 的是一个 MapStatus 对象。
    该对象中保存着了 ShuffleMapTask 的运算结果存储到BlockManager 里的相关存储信息,
    而不是计算结果本身,
    这些存储信息将会成为下一阶段的任务需要获得的输入数据时的依据。

  • 对于 ResultTask 的 runTask 方法而言,
    它最终返回的是最后的计算结果。

  • 至此,task计算结束,下面我们看看计算的结果是怎么处理的。

6.获 取 执 行 结 果

  • Excutor 端对结果数据进行处理,
    根据处理的Task类型不一样是有不一样的处理方式

  • ShuffleTask
    将结果封装成一个 MatStatus 对象,
    该 MatStatus 会记录结果的位置信息 和 文件大小,
    Driver端的 TaskSchedule 会将 MatStatus 注册到
    MapOutputTrackerMaster的 mapStatuses 中进行保存,
    当下游的 stage 需要数据的时候,
    由其MapOutputTrackerWorker向MapOutputTrackerMaster 查找,
    以获取其所需要处理数据的信息。

同时也需要判断该 Stage 的 task 是否已经全部完成,
如果完成,那么将开始下一轮的Stage任务。

  • ResultTask
    如果是使用了类似 Collect 等需要将数据拉回Driver端的算子,
    则需要根据结果的大小有不同的策略。

    (1) 生成结果大小大于1GB结果直接丢弃,
    该配置项可以通过 spark . driver.maxResultSize进行设置。

    (2) 生成结果大小在[128 MB -200 KB,1 GB] :
    如果生成的结果大于等于(128 MB -200 KB )时,
    会把该结果以taskld 为编号存入到 BlockManager 中,
    然后把该编号通过 Netty 发送给 Driver终端点,
    该阈值是 Netty 框架传输的最大值 spark.akka.frameSize(默认为128 MB)和 Netty 的预留空间 reservedSizeBytes (200 KB ) 差值。

    (3) 生成结果大小在(0 , 128 MB -200 KB):
    通过 Netty 直接发送到 Driver 终端点。

    对于(2)的结果,Diver 端收到一个 IndirectTaskResult 的结果,
    需要通过 sparkEnv.blockManager.getRemoteBytes(blockld)来获取结果;

    对于(3)的结果,Diver 端收到一个 DirectTaskResult ,
    那么结果就无需远程获取了。

    如果不需要拉回Driver端
    其结果则直接是我们所写的算子决定的,
    只需要通知Driver端,
    Diver 端会查看当前作业的Task是不是全部完成,
    如果完成,
    那么作业也就完成了,
    Driver 会清除作业依赖的资源,
    并发送消息给系统监听总线告知作业执行完毕。

以上是成功消息的处理,如果是失败的任务,
并且在 TaskSchedulerImpl 重试 3 次后还是失败,
那么会 将消息失败的任务通知 DAGScheduler ,
DAGScheduler 会对整个 Stage 进行4次重试,
如果还是失败,那么整个任务就失败了

总结

当我们提交一个job,
首先会被 DAGScheduler 通过宽窄依赖解析成一个个 stage,
然后按顺序将 stage 以 taskset 的形式提交给 TaskScheduler ,
TaskScheduler 将 taskset 构建成 TaskSetManager 对象管理,
并按照调度系统给定的策略向 Executor 提交任务,
Executor 将接受的到 task 以 taskrunner 的方式执行计算出结果,
并储存到 BlockManager ,
然后向 TaskScheduler 返回一个记录了结果信息的MapStatus对象,
并注册到 driver 端的 MapOutputTrackerMaster,
然后进行下一轮的 stage 调度 (如果是ResultTask执行结果,那么数据是我们算子决定了他最后会落地在哪的)

本文同步分享在 博客“code_solve”(JianShu)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

点赞
收藏
评论区
推荐文章
blmius blmius
2年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Wesley13 Wesley13
2年前
java将前端的json数组字符串转换为列表
记录下在前端通过ajax提交了一个json数组的字符串,在后端如何转换为列表。前端数据转化与请求varcontracts{id:'1',name:'yanggb合同1'},{id:'2',name:'yanggb合同2'},{id:'3',name:'yang
Jacquelyn38 Jacquelyn38
2年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Karen110 Karen110
2年前
​一篇文章总结一下Python库中关于时间的常见操作
前言本次来总结一下关于Python时间的相关操作,有一个有趣的问题。如果你的业务用不到时间相关的操作,你的业务基本上会一直用不到。但是如果你的业务一旦用到了时间操作,你就会发现,淦,到处都是时间操作。。。所以思来想去,还是总结一下吧,本次会采用类型注解方式。time包importtime时间戳从1970年1月1日00:00:00标准时区诞生到现在
Easter79 Easter79
2年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
2年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Wesley13 Wesley13
2年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
为什么mysql不推荐使用雪花ID作为主键
作者:毛辰飞背景在mysql中设计表的时候,mysql官方推荐不要使用uuid或者不连续不重复的雪花id(long形且唯一),而是推荐连续自增的主键id,官方的推荐是auto_increment,那么为什么不建议采用uuid,使用uuid究
Python进阶者 Python进阶者
3个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这