`
bit1129
  • 浏览: 1048955 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Spark十二】Spark任务调度和作业执行流程初步

 
阅读更多

任务调度以及作业执行流程是Spark的核心,本文不进行源码级别的探究,只是概述Spark的核心组件、它们的作用以及它们如何协作以完成计算作业。

 

Spark核心组件

  • SparkContext
  • Job
  • RDD
  • DAGScheduler
  • DAG
  • TaskScheduler
  • Stage
  • TaskSet
  • Task
  • BlockManager
  • BlockTracker
  • ShuffleTracker

Spark集群架构概览

 

 

 

 在上面这幅图片中,用户将任务提交给Driver,Driver将任务分发到所有的Worker节点(Driver最好跟Work节点在同一个局域网内,以使得任务的分发和结果回送更快)。Worker节点根据Driver提交过来的任务,算出位于本地的那部分数据,然后对它进行计算(这就是数据本地性的概念)。具体的做法是,Workder首先将数据加载到内存(如果内存中没有的话),形成RDD(所以,RDD存在于内存中),然后对RDD进行接下来的计算。

在这个架构中,Driver和Worker构成了Master/Slave架构,Driver负责分发任务,以及等待任务结果

 

另外一副架构图,在这个图中,Master和Driver是分开的,实际上是否会在一起呢?

 

 
 

spark计算速度快的原因

1.基于内存计算

2.general computation graph --即DAG,Worker对DAG进行优化,然后提交给TaskScheduler去执行。这里的问题是DAG何时构造,由谁构造,DAG的数据结构如何,DAG包含哪些信息,这个暂时放这里。目前要了解的重点是,DAG是提交给TaskScheduler等待调度执行。

 

Spark组件

 

 

 

上图中,自己编写程序提交到Master上,而Master是由四大部分组成(RDD Graph,Scheduler,Block Tracker以及Shuffle Tracker),启动RDD Graph就是DAG,它会提交给Task Scheduler任务调度器等待调度执行,具体执行时,Task Scheduler会把任务提交到Worker节点上。Block Tracker用于记录计算数据在Worker节点上的块信息。Shuffle Blocker用于记录RDD在计算过程中遇到Shuffle过程时会进行物化,那么Shuffle Tracker用于记录这些物化的RDD的存放信息

 ,

 

RDD Graph


 

 

 上面浅绿色的四个圆柱形框构成一个RDD,每个圆柱框都是一个Partition,每个Partition分配一个任务来执行。浅绿色圆柱框内的绿色矩形框表示实施RDD操作后的数据集,比如对于Task1,先对Partion执行map操作,再执行filter操作得到两个绿色矩形框。

   因为map操作或者filter是对RDD进行调用的,所以,RDD中的Partition都会执行相同的动作序列,每个操作结束时,每个Partition都会产生一个数据集,这些数据集对应一个RDD,如MappedRDD,FilteredRDD。这样,就形成了RDD Graph,如上图中的八个绿色框,上面四个框形成一个RDD,下面四个框形成一个RDD。

 

作业与任务调度

 

 

 DAGScheduler

1.三个输入元素



 

 问题:

1.target RDD是什么RDD是初始RDD还是包含了所有的RDD,比如rdd.map().filter()操作,target RDD是什么

2.针对partition的function指的是什么,比如rdd.map().filter()操作,是map和filter函数都包括吗?

 

具体的,DAG Scheduler完成下面三个任务:

1.为每个Job(一个Action类型的操作如collect,count)分割Stage,同时决定最佳路径。DAGScheduler会记录哪个RDD或者Stage会被物化,从而寻找一个最佳调度方案。

2.将TaskSet提交给Task Tracker

3.重新提交输出lost的Stage

 

2. DAGScheduler优化

1.stage的操作是pipleline的

比如,stage内有5个操作,Spark的做法是1+1+1+1+1=5,而对于Hadoop而言,它的做法是1+1=2, 2+1=3,3+1=4,4+1=5,即每计算一步就先存入HDFS,然后后面的操作再从HDFS上都出来,因此IO消耗非常大。

2. 基于Partition选择最小化的join算法,减少Shuffle操作

在Hadoop中,Shuffle相当于Barrier(Join等待合并结果),Reduce操作需要等待Map操作完全执行完

3. 重用RDD Cache过的数据

因为DAGScheduler知道哪些RDD和Stage已经物化过,所以DAGScheduler在执行路径上,会尽可能的使用已经缓存过的数据

 

Stage

 

 

 

 从上图中可以看到,AB位于同一个Stage,CDE位于同一个Stage。AB和CDE的结果做Join是产生了一个新的Stage

如下两个阶段一定会产生Stage

1.从数据源加载数据形成RDD时,一定会有Stage的产生

2.进行Shuffle,即有宽依赖的时候一定有Stage的产生,所以上面的DE应该产生一个Stage

 

 

 

 

Job执行流程

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

  • 大小: 27.4 KB
  • 大小: 12.8 KB
  • 大小: 31.7 KB
  • 大小: 69.4 KB
  • 大小: 55.4 KB
  • 大小: 67.2 KB
  • 大小: 53.8 KB
  • 大小: 245.9 KB
分享到:
评论
2 楼 wjhdtx 2015-06-11  
JavaDStream<String> lines = SparkStreamUtil.getKafakConsumerStream......
ssc.sparkContext().setCheckpointDir......
AbstractAcType.init(null);
//vip: 业务转换为model key value 形式处理
JavaPairDStream<String, Long> baseModelRdd = lines.flatMapToPair(new SparkDealBaseModelFunction());



AbstractAcType.init(null); 注册业务处理类为静态变量
在SparkDealBaseModelFunction中使用业务处理类

那是不是AbstractAcType.init(null);在driver中运行,而SparkDealBaseModelFunction是在一个executor上作为一个任务运行?
1 楼 wjhdtx 2015-06-11  
再问下,DAGScheduler划分和构建调度阶段,一个调度阶段对于一个任务集。
①每个任务作用在rdd的一个分区上吗?
②一个任务集是一组关联的、没有shuffle依赖关系的任务,那我想问下:任务是如何确定的,是一个function就是一个任务吗?

ps:之前问你的部署问题,原因可能是我在driver中注册了一些静态处理类,在function中用的时候,因为这些function在executor中运行,和driver不是一个机器或一个jvm,所以得不到整个处理类,所以后面的处理都对没有流,也不会忘kafka发送消息;本地模式,在一个jvm中,所以就能跑。

相关推荐

Global site tag (gtag.js) - Google Analytics