`
bit1129
  • 浏览: 1052294 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Spark四十六】Spark RDD物理执行图与Stage

 
阅读更多

理解记忆如下要点:

1. 划分Stage的方法是从后向前,遇到 shuffle Dependency就断开,如果是Narrow Dependency就划入当前的Stage。所以,ShuffledRDD通常是一个Stage的开头(Stage的开头未必是 ShuffledRDD,也有可能是数据源转换而来的RDD)。所以上面一共3个Stage。需要注意的是,shuffle的结果未必成为 ShuffledRDD,例如上面的CoGroupedRDD,一半来源于Shuffle。

2. 每个 stage 里面 task 的数目由该 stage 最后一个 RDD 中的 partition 个数决定。Task分为两类,一类是ShuffleMapTask,一类是ResultTask。而且一个Stage中的Task类型只能是其中一种类型

3. 数据用的时候再算,而且数据是流到要计算的位置的。所谓用到的时候再算,是指从后向前,本RDD追溯父RDD的数据,直到数据源,然后数据源的原始数据,依次经历所有的RDD的算子转换操作。所谓的计算位置,指的是ShuffleMapTask或者ResultTask运行的地方

4.对于没有 parent stage 的 stage,该 stage 最左边的 RDD 是可以立即计算的,而且每计算出一个 record 后便可以流入 f 或 g(所谓的f,g是指RDD链上的不同RDD的算子操作)。如 果 f 中的 record 关系是 1:1 的,那么 f(record1) 计算结果可以立即顺着 computing chain 流入 g 中。如果 f 的 record 关系是 N:1,record1 进入 f() 后也可以被回收。总结一下,computing chain 从后到前建立,而实际计算出的数据从前到后流动,而且计算出的第一个 record 流动到不能再流动后,再计算下一个 record。这样,虽然是要计算后续 RDD 的 partition 中的 records,但并不是要求当前 RDD 的 partition 中所有 records 计算得到后再整体向后流动。(这段话描述了,RDD的元素计算不是全部加载然后再计算,是边加载边计算的模式)

5. 对于有 parent stage 的 stage,先等着所有 parent stages 中 final RDD 中数据计算好,然后经过 shuffle 后,问题就又回到了计算 “没有 parent stage 的 stage”。这里明确了Stage依赖之后,依赖的Stage的任务必须先执行完,然后才开始下一个任务。而不是依赖的Stage任务完成了一部分,然后下个Stage的任务就开始执行了。

6. RDD的实现细节:

6.1每个 RDD 包含的 getDependency() 负责确立 RDD 的数据依赖

6.2 compute() 方法负责接收 parent RDDs 或者 data block 流入的 records,进行计算,然后输出 record。

经常可以在 RDD 中看到这样的代码firstParent[T].iterator(split, context).map(f)。firstParent 表示该 RDD 依赖的第一个 parent RDD,iterator() 表示 parentRDD 中的 records 是一个一个流入该 RDD 的,map(f) 表示每流入一个 recod 就对其进行 f(record) 操作,输出 record。为了统一接口,这段 compute() 仍然返回一个 iterator,来迭代 map(f) 输出的 records。

总结一下:整个 computing chain 根据数据依赖关系自后向前建立,遇到 ShuffleDependency 后形成 stage。在每个 stage 中,每个 RDD 中的 compute() 调用 parentRDD.iter() 来将 parent RDDs 中的 records 一个个 fetch 过来。

 

 

给定一个逻辑执行图,如何进行Stage划分,如下所示的逻辑执行图,包含几个Stage?

 

 

 

 

 cartesian算子对应的窄依赖逻辑图和物理图



 

 上图中,包含了六个ResultTask,仅有ResultTask,比如黑色线对应的这个Result,它需要计算两个数据块,然后对3个RDD的partition进行处理,这一整套的过程都是在一个ResultTask中执行的,ResultTask个数与最后一个RDD的分区数(这里是CartesianRDD)相同

 

插播一个computing chain的图片,在最开始出用到了

 

 

 

 

  • 大小: 156.8 KB
  • 大小: 206.2 KB
  • 大小: 186.1 KB
  • 大小: 163.8 KB
分享到:
评论

相关推荐

    spark-2.2.2安装流程

    RDD:Spark的基本计算单元,一组RDD可形成执行的有向无环图RDD Graph。 DAG Scheduler:实现将Spark作业分解成一到多个Stage,每个Stage根据RDD的Partition个数决定Task的个数,然后生成相应的Task set放到Task...

    Spark RDD是什么?

    Spark 的核心是建立在统一的抽象弹性分布式数据集(Resiliennt Distributed Datasets,RDD)之上的,这使得 Spark 的各个组件可以无缝地进行集成,能够在同一个应用程序中完成大数据处理。本节将对 RDD 的基本概念及...

    Spark学习笔记—Spark工作机制

    RDD的Action算子触发job的提交,提交到Spark的Job生成RDD DAG,由DAGScheduler转换为Stage DAG,每个Stage中产生相应的Task集合,TaskScheduler将任务分发到Executor执行。每个任务对应的数据块,使用用户定义的函数...

    spark源码分析.pdf

    spark源码分析,RDD、Iterator、Job、DAG、Stage、Taskset、task等

    Databricks Spark 知识库

    Spark最佳实践 最佳实践 避免使用 GroupByKey 不要将大型 RDD 的所有元素拷贝到请求驱动者 常规故障处理 Job aborted due to stage failure: Task not serializable 缺失依赖 执行 start-all.sh 错误 - Connection ...

    Spark基础知识04——窄依赖、宽依赖、DAG、缓存

    一、宽窄依赖 RDD和它依赖的父RDD(s)的关系有两种不同的类型,即窄...ps:Spark中产生宽窄依赖的依据是shuffle,当发生shuffle时,会产生宽依赖,基本上shuffle算子都会产生宽依赖,但是join除外,在执行join算子之前

    大数据高频面试题.pdf

    6) 数据调度弹性 Spark把这个JOB执⾏模型抽象为通⽤的有向⽆环图DAG,可以将多Stage的任务串联或并⾏执⾏,调度引擎⾃动处理Stage的失败以及Task 的失败。 7) 数据分⽚的⾼度弹性 可以根据业务的特征,动态调整数据...

    native-sql-engine:适用于Spark SQL的本机SQL Engine插件,具有矢量化SIMD优化

    介绍 Spark SQL与基于行的结构化数据配合得很好。 它使用WholeStageCodeGen通过Java JIT代码来提高性能。 但是,Java JIT在利用最新的SIMD指令时通常效果不佳,尤其是在复杂的查询下。 提供了CPU缓存友好的列式内存...

    大数据面试之——Spark

    Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。 1.Spark有几种部署模式,各个模式的特点 1.本地模式 Spark不一定非要跑在hadoop集群,可以在本地,起多个线程的方式来指定。方便调试,本地模式分三...

    4399大数据笔试题.pdf

    10.Spark Stage的数量有什么决定? 答案:Partition ⼆、填空题 ⼆、填空题 1.Spark的三种部署模式? 2.RDD有哪些缓存机制? 3.RDD类型有⼏种?每⼀种有哪些操作? 4.map和flatMap的区别是什么? 5.RDD的依赖⽅式?...

Global site tag (gtag.js) - Google Analytics