理解记忆如下要点:
1. 划分Stage的方法是从后向前,遇到 shuffle Dependency就断开,如果是Narrow Dependency就划入当前的Stage。所以,ShuffledRDD通常是一个Stage的开头(Stage的开头未必是 ShuffledRDD,也有可能是数据源转换而来的RDD)。所以上面一共3个Stage。需要注意的是,shuffle的结果未必成为 ShuffledRDD,例如上面的CoGroupedRDD,一半来源于Shuffle。
2. 每个 stage 里面 task 的数目由该 stage 最后一个 RDD 中的 partition 个数决定。Task分为两类,一类是ShuffleMapTask,一类是ResultTask。而且一个Stage中的Task类型只能是其中一种类型
3. 数据用的时候再算,而且数据是流到要计算的位置的。所谓用到的时候再算,是指从后向前,本RDD追溯父RDD的数据,直到数据源,然后数据源的原始数据,依次经历所有的RDD的算子转换操作。所谓的计算位置,指的是ShuffleMapTask或者ResultTask运行的地方4.对于没有 parent stage 的 stage,该 stage 最左边的 RDD 是可以立即计算的,而且每计算出一个 record 后便可以流入 f 或 g(所谓的f,g是指RDD链上的不同RDD的算子操作)。如 果 f 中的 record 关系是 1:1 的,那么 f(record1) 计算结果可以立即顺着 computing chain 流入 g 中。如果 f 的 record 关系是 N:1,record1 进入 f() 后也可以被回收。总结一下,computing chain 从后到前建立,而实际计算出的数据从前到后流动,而且计算出的第一个 record 流动到不能再流动后,再计算下一个 record。这样,虽然是要计算后续 RDD 的 partition 中的 records,但并不是要求当前 RDD 的 partition 中所有 records 计算得到后再整体向后流动。(这段话描述了,RDD的元素计算不是全部加载然后再计算,是边加载边计算的模式)
5. 对于有 parent stage 的 stage,先等着所有 parent stages 中 final RDD 中数据计算好,然后经过 shuffle 后,问题就又回到了计算 “没有 parent stage 的 stage”。这里明确了Stage依赖之后,依赖的Stage的任务必须先执行完,然后才开始下一个任务。而不是依赖的Stage任务完成了一部分,然后下个Stage的任务就开始执行了。
6. RDD的实现细节:
6.1每个 RDD 包含的 getDependency() 负责确立 RDD 的数据依赖
6.2 compute() 方法负责接收 parent RDDs 或者 data block 流入的 records,进行计算,然后输出 record。
经常可以在 RDD 中看到这样的代码firstParent[T].iterator(split, context).map(f)。firstParent 表示该 RDD 依赖的第一个 parent RDD,iterator() 表示 parentRDD 中的 records 是一个一个流入该 RDD 的,map(f) 表示每流入一个 recod 就对其进行 f(record) 操作,输出 record。为了统一接口,这段 compute() 仍然返回一个 iterator,来迭代 map(f) 输出的 records。
总结一下:整个 computing chain 根据数据依赖关系自后向前建立,遇到 ShuffleDependency 后形成 stage。在每个 stage 中,每个 RDD 中的 compute() 调用 parentRDD.iter() 来将 parent RDDs 中的 records 一个个 fetch 过来。
相关推荐
RDD:Spark的基本计算单元,一组RDD可形成执行的有向无环图RDD Graph。 DAG Scheduler:实现将Spark作业分解成一到多个Stage,每个Stage根据RDD的Partition个数决定Task的个数,然后生成相应的Task set放到Task...
Spark 的核心是建立在统一的抽象弹性分布式数据集(Resiliennt Distributed Datasets,RDD)之上的,这使得 Spark 的各个组件可以无缝地进行集成,能够在同一个应用程序中完成大数据处理。本节将对 RDD 的基本概念及...
RDD的Action算子触发job的提交,提交到Spark的Job生成RDD DAG,由DAGScheduler转换为Stage DAG,每个Stage中产生相应的Task集合,TaskScheduler将任务分发到Executor执行。每个任务对应的数据块,使用用户定义的函数...
spark源码分析,RDD、Iterator、Job、DAG、Stage、Taskset、task等
Spark最佳实践 最佳实践 避免使用 GroupByKey 不要将大型 RDD 的所有元素拷贝到请求驱动者 常规故障处理 Job aborted due to stage failure: Task not serializable 缺失依赖 执行 start-all.sh 错误 - Connection ...
一、宽窄依赖 RDD和它依赖的父RDD(s)的关系有两种不同的类型,即窄...ps:Spark中产生宽窄依赖的依据是shuffle,当发生shuffle时,会产生宽依赖,基本上shuffle算子都会产生宽依赖,但是join除外,在执行join算子之前
6) 数据调度弹性 Spark把这个JOB执⾏模型抽象为通⽤的有向⽆环图DAG,可以将多Stage的任务串联或并⾏执⾏,调度引擎⾃动处理Stage的失败以及Task 的失败。 7) 数据分⽚的⾼度弹性 可以根据业务的特征,动态调整数据...
介绍 Spark SQL与基于行的结构化数据配合得很好。 它使用WholeStageCodeGen通过Java JIT代码来提高性能。 但是,Java JIT在利用最新的SIMD指令时通常效果不佳,尤其是在复杂的查询下。 提供了CPU缓存友好的列式内存...
Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。 1.Spark有几种部署模式,各个模式的特点 1.本地模式 Spark不一定非要跑在hadoop集群,可以在本地,起多个线程的方式来指定。方便调试,本地模式分三...
10.Spark Stage的数量有什么决定? 答案:Partition ⼆、填空题 ⼆、填空题 1.Spark的三种部署模式? 2.RDD有哪些缓存机制? 3.RDD类型有⼏种?每⼀种有哪些操作? 4.map和flatMap的区别是什么? 5.RDD的依赖⽅式?...