`
bit1129
  • 浏览: 1051617 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Spark二四】Spark内核源码剖析

 
阅读更多

Spark内核源码非常复杂,同时也是Spark的精髓所在,目前只做记录所学的点点滴滴,回头再整理总结。

 

RDD,DAGScheduler,TaskScheduler,Worker

Spark根据应用程序的算子(转换算子,行动算子)生成一张DAG图,比如rdd1.join(rdd2).groupBy(..).filter(..)。DAGScheduler对于RDD,在遇到涉及Shuffle操作的时候,划分出不同的Stage,所以Shuffle是划分Stage的边界。属于同一个Stage的task会提交给TaskScheduler(这些Task包装成TaskSet)。

DAG图中,每个action会触发一个Job的提交,一个Job可以包含多个Stage,每个Stage可以包含多个Task,Task最终会提交给Worker上的Executor去执行。

DAG图中,可以有多个Action,有几个Action就会对应几个Job,这些Job是并行执行?DAG图中,Job是串行的即不同Job要等到它的上一个Job运行完才能进行下一个Job,但是同一个Job的多个Task是并行的。

TaskScheduler中有个TaskManager,TaskManager是管理TaskSet中的不同任务的执行的?看代码没找到。

Executor对应一个线程池(?),可并行的多任务并行计算

 

 

SparkContext

SparkContext是Application通往Spark集群的通道,Spark有多个构造方法,一个主构造方法,多个附属构造方法。不管是主构造方法还是附属构造方法(首先调用主构造方法),都会调用类的方法体外的函数语句,也就是说,主构造方法的方法体是由类中在方法之外的语句构成的。

 

SparkContext的主要功能有,

 

1.从原始数据源读取数据,比如SparkContext.textFile(...)

2. SparkContext负责创建TaskScheduler同时调用TaskScheduler的start方法启动TaskScheduler(TaskScheduler接收DAGScheduler传输来的TaskSet,然后通过Workers的Backend提交给Executor?),

3. SparkContext负责创建DAGScheduler,创建时,需要把TaskScheduler实例作为DAGScheduler的构造参数传递给DAGScheduler(DAGSCheduler需要TaskScheduler的原因是DAGScheduler需要调用TaskScheduler以给它提交TaskSet)

4. SparkContext负责创建SparkUI对象,SparkUI对象内部使用jetty负责Spark的UI请求和展现

 

1.当RDD调用了Action类的算子后,RDD内部通过SparkContext的runJob方法提交任务

2.SparkContext的runJob方法调用DAGScheduler的runJob方法

 

 

 

 

 

 

SparkConf

1. Spark应用参数配置都会加载到SparkConf中,代码中从classpath和system properties中获取关于spark的配置信息

2. SparkContext在构造时,主构造方法需要传入SparkConf对象

3. 在SparkConf,会对executor-memory进行读取和设置, 如果没有设置,则默认是512M,如下代码来自于SparkContext

 

TaskScheduler

1. 一个TaskScheduler只为一个SparkContext提供任务调度的服务

2. TaskScheduler接收DAGScheduler的任务提交消息,任务提交是每个Stage对应的TaskSet

3. TaskScheduler中有个一个TaskManager,负责任务的管理,TaskManager最重要的职责是将Task发送给集群中的Executor进行执行。任务执行成功和失败,TaskSetManager都会通知DAGScheduler

4. Task运行失败,TaskScheduler负责重试

5. TaskScheduler发现某个Task一直没有运行完,那么TaskScheduler将启动一个新的Executor来重新运行这个任务

6.TaskScheduler的initialize方法设置任务调度的策略,是FIFO还是FAIR的调度策略

 

7. SparkContext创建TaskScheduler时,是基于MasterURL的,根据不同的模式选择不同的创建逻辑,模式可以是local,local-cluster,standalone, mesos,zk,simr

8. TaskScheduler有一个Backend实例,它是一个SchedulerBackend,可以有LocalBackend,也可以是CoarseGrainedSchedulerBackend。这个Backend用于将任务发送给Executor执行。TaskScheduler的start/stop方法负责Backend的启停

 

 

1.DAGScheduler调用TaskScheduler的submitTasks方法将任务提交时,TaskScheduler首先创建TaskManager对象,将TaskSet传给TaskManager对象

2.调用backend的reviveOffers方法,这里的backend是CoarseGrainedSchedulerBackend,问题是,当CoarseGrainedSchedulerBackend的方法reviveOffers调用后,如何回溯得到TaskSet任务

 

 

 

 

DAGScheduler

1. DAGSCheduler对DAG图进行划分,以创建不同的Stage

2. DAGScheduler调用initializeEventProcessor来进行消息的接收和发送,在内部DAGScheduler是一个即EventProcessor的消息收发系统,EventProcessor实现是基于Akka的分布式消息驱动

 

1.DAGScheduler的runJob方法调用DAGScheduler的submitJob方法提交任务

2.DAGScheduler的submitJob方法通过EventProcessActor给自己发送Akka消息提交任务(eventProcessActor ! JobSubmitted)

3.在DAGScheduler的handleJob方法中,首先调用newStage方法创建finalStage,然后调用submitStage(finalStage)  ///问题:finalStage是把所有的父Stage构造出来了吗?

4.在submitStage方法中,通过递归的方式,把还未提交的stage进行提交。如果stage没有依赖的stage,则将该stage关联的任务集(TaskSet)进行提交

5.提交已就绪的stage的taskset的方法是submitMissingTasks,

6.在submitMissingTasks中,将每个Task封装成ShuffleMapTask或者ResultTask

7.DAGScheduler将这些ShuffleMapTask或者ResultTask封装成TaskSet,然后调用TaskScheduler的submitTasks方法将任务提交

 

 

 

Executor

1. CoarseGrainedSchedulerBackend接收任务的方法是launchTask,具体的实现是发消息(LaunchTask)给Executor。

2. Executor接收到消息,的launchTask负责将Task提交给线程池执行

 

1. Executor收到DriverActor传递来的消息,执行如下代码:

 

  def launchTask(
      context: ExecutorBackend, taskId: Long, taskName: String, serializedTask: ByteBuffer) {
    val tr = new TaskRunner(context, taskId, taskName, serializedTask)
    runningTasks.put(taskId, tr)
    threadPool.execute(tr)
  }

 

 

TaskRunner

1. TaskRunner是实现Java Runnable接口的任务类

2. 在Executor中,通过threadPool将TaskRunner对象提交给线程池执行

3. TaskRunner构造的细节是

 

//context: ExecutorBackend
//taskId: 任务ID
//taskName: 任务的名字
//serializedTask: 序列化的任务的字节
val tr = new TaskRunner(context, taskId, taskName, serializedTask)

def launchTask(
      context: ExecutorBackend, taskId: Long, taskName: String, serializedTask: ByteBuffer) {
    val tr = new TaskRunner(context, taskId, taskName, serializedTask)
    runningTasks.put(taskId, tr)
    threadPool.execute(tr)
  }

 

4. 在TaskRunner的run方法中,

a.调用ExecutorBackend的statusUpdate方法汇报Task的执行状态

b.反序列化serializedTask字节以构造Task对象,Task抽象有两个实现类,分别是ShuffleMapTask和ResultTask两个类型,而这正式DAGScheduler提交TaskSet时,TaskSet任务集包含的两种任务类型,

c. 调用Task对象的run方法,完成具体的Task逻辑(所以,ShuffleMapTask和ResultTask都有自己定义的执行逻辑,这也说明,Executor只是任务执行框架,具体要执行什么逻辑,由任务编写者提供)

 

 

 

ShuffleMapTask和ResultTask

 

ResultTask

1.ResultTask的runTask由它的父类Taskrun方法调用,代码如下,可见,ResultTask的实现逻辑非常简单

 

  override def runTask(context: TaskContext): U = {
    // Deserialize the RDD and the func using the broadcast variables.
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)

    metrics = Some(context.taskMetrics)
    func(context, rdd.iterator(partition, context)) ////对rdd进行遍历,依次调用func函数
  }

 

ShuffleMapTask

1.ShuffleMapTask的runTask由它的父类Taskrun方法调用,代码是:

 

  override def runTask(context: TaskContext): MapStatus = {
    // Deserialize the RDD using the broadcast variable.
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)

    metrics = Some(context.taskMetrics)
    var writer: ShuffleWriter[Any, Any] = null
    try {
      val manager = SparkEnv.get.shuffleManager
      writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
      writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
      return writer.stop(success = true).get
    } catch {
      case e: Exception =>
        try {
          if (writer != null) {
            writer.stop(success = false)
          }
        } catch {
          case e: Exception =>
            log.debug("Could not stop writer", e)
        }
        throw e
    }
  }

从上面的代码中可以看到,ShuffleMapTask的主要工作

a. ShuffleDependency以及ShuffleHandle是用来做什么的?

b. 使用ShuffleManager提供的ShuffleWriter进行写操作(调用write方法),写到哪里去了?磁盘还是内存?

 

ShuffleWriter

1. Spark提供了ShuffleWriter的两个实现SortShuffleWriter和HashShuffleWriter

2. 问题:ShuffleWriter中间结果写到哪里去了,比如Join和groupBy等操作,

 

ShuffleDependency

 Represents a dependency on the output of a shuffle stage. Note that in the case of shuffle, the RDD is transient since we don't need it on the executor side.

 

Stage

1. 每个Stage都有一个ID,称之为stageId

 

LocalBackend/CoarseGrainedSchedulerBackend

0.CoarseGrainedSchedulerBackend构造时,会将TaskScheduler实例会作为构造函数的参数传入

1.每个Backend都有start和end方法,在start方法中,启动Actor,对于LocalBackend是LocalActor,对于CoarseGrainedSchedulerBackend而言是DriverActor。
2. Backend的start和end方法是在TaskScheduler的start/stop方法中调用的

3.CoarseGrainedSchedulerBackend是Worker进程上,执行具体任务的代表,即对外界而言,Work进程通过CoarseGrainedSchedulerBackend来进行任务的接收和结果的回送?

4.CoarseGrainedSchedulerBackend接收任务的方法是launchTask,具体的实现是发消息给Executor

 

 

1.当TaskScheduler调用CoarseGrainedSchedulerBackend的reviveOffers方法时,在reviveOffers方法中,通过driverActor ! ReviveOffers给DriverActor发送ReviveOffers消息

 

 

 

 

DriverActor

1. DriverActor是在CoarseGrainedSchedulerBackend文件中定义的Actor类

2.当DriverActor收到ReviveOffers消息时,调用自身的makeOffers方法,makeOffers会调用launchTask方法,launchTasks的参数是WorkOffer序列(Seq[WorkOffer])?不是,launchTask的方法签名是

 

 

def launchTasks(tasks: Seq[Seq[TaskDescription]])

 

 

makeOffers方法在调用launchTask前需要准备launchTask的参数(如上所示,任务序列,tasks:Seq,而tasks序列中的每个元素又是一个Seq(序列),这个序列中元素类型是TaskDescription)

a.调用TaskScheduler的resourceOffers方法,这个方法返回的就是Seq[Seq[TaskDescription]]

b.调用TaskScheduler的resourceOffers方法时,传入的参数是offers: Seq[WorkerOffer],因此

 executorDataMap.map { case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
 }.toSeq)

 

得到的结果是Seq[WorkOffer],executorDataMap.map方法传入的是一个函数字面量,而executorDataMap是Scala的HashMap类型,(scala.collection.mutable.HashMap)


 

    def makeOffers() {
      launchTasks(scheduler.resourceOffers(executorDataMap.map { case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
      }.toSeq))
    }

 

 

 executorDataMap.map { case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
 }.toSeq)

 

3. 在DriverActor的launchTask方法,通过如下的代码行给executorActor发送Task执行请求?是的

          val executorData = executorDataMap(task.executorId)
          executorData.freeCores -= scheduler.CPUS_PER_TASK
          executorData.executorActor ! LaunchTask(new SerializableBuffer(serializedTask))

 上面关键的数据结构是executorDataMap,这个DriverActor中的持有的executorDataMap是从何处而来?

 

 

这种颜色表示Job执行的轨迹

这种颜色表示Spark Runtime的初始化关系,所有的工作都在SparkContext sc = new SparkContext中展开

 

 

分享到:
评论

相关推荐

    Spark2.2版本内核源码深度剖析(完整笔记)

    spark2.2是一个里程碑的版本,因为之前的版本很多特性都是实验性的,所以2.2是第一个真正完全可以把Spark的所有特性在生产环境...当然也不会少一点整个spark内核架构,只要干这一行掌握的关键源码知识都会含在笔记里。

    Spark2.2版本内核源码深度剖析.zip

    Spark2.2版本内核源码深度剖析.zip

    Spark2.2版本内核源码深度剖析.zip.zip

    Spark2.2版本内核源码深度剖析.zip.zip

    深入理解Spark+核心思想与源码分析.pdf

    深入理解Sp深入理解SPARK:核心思想与源码分析》结合大量图和示例,对Spark的架构、部署模式和工作模块的设计理念、实现源码与使用技巧进行了深入的剖析与解读。 《深入理解SPARK:核心思想与源码分析》一书对Spark...

    Spark从入门到精通

    上百节课详细讲解,需要的小伙伴自行百度网盘下载,链接见附件,永久有效。 本课程主要讲解的内容包括:Scala编程、Hadoop与Spark...四、Spark内核源码深度剖析 五、Spark性能优化 六、Spark SQL 七、Spark Streaming

    Spark技术内幕 深入解析Spark内核架构设计

    多位Spark的贡献者和专家联袂推荐,详细剖析spark内核各个模块并辅以相应源码解析的著作。全面分析spark内核各个模块的设计思想和实现原理,深入理解其内部运作机制和实现细节。

    Spark大数据商业实战三部曲:内核解密|商业案例|性能调优

    以Spark内核解密为基石,分为上篇、中篇、下篇,对企业生产环境下的Spark商业案例与性能调优抽丝剥茧地进行剖析。上篇基于Spark源码,从一个动手资源太大,传百度网盘了,链接在附件中,有需要的同学自取。

    Spark全面精讲

    基于大量案例实战,深度剖析和讲解Spark,并且会包含完整的从企业真实复杂业务需求中抽取出的案例作为实战,课程会涵盖Scala编程详解、Spark核心编程、Spark SQL和Spark Streaming、Spark内核以及源码剖析、性能优化...

    Spark大数据商业实战三部曲:内核解密|商业案例|性能调优 完整版

    上篇基于Spark源码,从一个动手实战案例入手,循序渐进地全面解析了Spark 2.2新特性及Spark内核源码;中篇选取Spark开发中*有代表的经典学习案例,深入浅出地介绍,在案例中综合应用Spark的大数据技术;下篇性能调优...

    spark入门课程

    scala编程详解,Spark核心编程,Spark内核源码深度剖析,Spark性能优化,Spark SQL

    《Spark大数据商业实战三部曲:内核解密 商业案例 性能调优》2018.02出版

    上篇基于Spark源码,从一个动手实战案例入手,循序渐进地全面解析了Spark2.2新特性及Spark内核源码;中篇选取Spark开发中具有代表的经典学习案例,深入浅出地介绍,在案例中综合应用Spark的大数据技术;下篇性能调优...

    Spark 3.0技术及原理

    全书共分4篇,内核解密篇基于Spark源码,从一个实战案例入手,循序渐进地全面解析Spark 2.4.X版本的新特性及Spark内核源码;商业案例篇选取Spark开发中最具代表性的经典学习案例,在案例中综合介绍Spark的大数据技术;...

    大数据热门技术Spark+机器学习+贝叶斯算法第13季

    内容涵盖Spark核心编程、Spark SQL和Spark Streaming、Spark内核以及源码剖析、推荐系统、Kafka消费机制、Spark机器学习、朴素贝叶斯算法、企业级实战案例等。 通过理论和实际的紧密结合,可以使学员对大数据Spark...

    电光石火间体验Spark 3.0开发实战

    全书共分4篇,内核解密篇基于Spark源码,从一个实战案例入手,循序渐进地全面解析Spark 2.4.X版本的新特性及Spark内核源码;商业案例篇选取Spark开发中最具代表性的经典学习案例,在案例中综合介绍Spark的大数据技术;...

Global site tag (gtag.js) - Google Analytics