[Spark 36] Spark On Yarn: Deploying in yarn-client Mode

 

According to Spark's deployment settings, there are four ways to run Spark on YARN (essentially only two):

  • yarn-client + client
  • yarn-cluster + cluster
  • yarn-client (deploy mode defaults to client)
  • yarn-cluster (deploy mode defaults to cluster)

yarn-client + cluster and yarn-cluster + client are invalid combinations; Spark reports an error and exits.
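As a hedged illustration of that check, here is a minimal sketch assuming Spark 1.x semantics; the real validation lives in SparkSubmit and is more involved:

  // Sketch only, not Spark's literal source.
  def validateMasterAndDeployMode(master: String, deployMode: String): Unit =
    (master, deployMode) match {
      case ("yarn-cluster", "client") =>
        sys.error("Client deploy mode is not compatible with master \"yarn-cluster\"")
      case ("yarn-client", "cluster") =>
        sys.error("Cluster deploy mode is not compatible with master \"yarn-client\"")
      case _ => // the remaining combinations are the four valid ones listed above
    }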

This post first walks through the code execution flow of Spark On Yarn under the yarn-client + client deployment.

 

Submitting the program to the YARN runtime environment

 

  • In client deploy mode, SparkSubmit's main function invokes the application's main method via reflection
  • The application's main method creates a SparkContext instance
  • While the SparkContext is being constructed, the scheduler and backend instances are created by the following statement:

 

  private[spark] var (schedulerBackend, taskScheduler) =  SparkContext.createTaskScheduler(this, master)

 

  • Since the current deployment is the yarn-client + client combination:

        1. taskScheduler is an instance of org.apache.spark.scheduler.cluster.YarnClientClusterScheduler, a subclass of TaskSchedulerImpl, whose doc comment reads:

 

 

/**
 * This scheduler launches executors through Yarn - by calling into Client to launch the Spark AM.
 */
 
      2. schedulerBackend is an instance of org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend, a subclass of CoarseGrainedSchedulerBackend; it carries no doc comment. A sketch of how this pair is constructed follows.
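For reference, a hedged sketch of the "yarn-client" branch inside SparkContext.createTaskScheduler that produces this (scheduler, backend) pair. The real code loads the classes reflectively because the yarn module is built separately; visibility modifiers and error handling are ignored here:

    // Simplified excerpt-style sketch (Spark 1.x), not the literal source.
    case "yarn-client" =>
      val scheduler = Class.forName(
          "org.apache.spark.scheduler.cluster.YarnClientClusterScheduler")
        .getConstructor(classOf[SparkContext])
        .newInstance(sc).asInstanceOf[TaskSchedulerImpl]
      val backend = Class.forName(
          "org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")
        .getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
        .newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
      scheduler.initialize(backend)
      (backend, scheduler)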

 

 

  • Continuing the SparkContext construction, taskScheduler's start method is called, i.e. YarnClientClusterScheduler's start method. Since YarnClientClusterScheduler does not override TaskSchedulerImpl's start method, execution enters TaskSchedulerImpl.start
  • TaskSchedulerImpl.start calls backend.start; the backend here is YarnClientSchedulerBackend, so execution enters YarnClientSchedulerBackend.start
  • YarnClientSchedulerBackend.start creates the client that submits the user application to YARN's ResourceManager
  • It constructs a yarn.Client object, calls submitApplication, and then waits on the application, as the following code shows:
    client = new Client(args, conf) // yarn.Client
    appId = client.submitApplication()
    waitForApplication()      // block until the application reaches the RUNNING state
    asyncMonitorApplication() // asynchronously monitor the application; if it has exited for any reason, stop the SparkContext
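A hedged sketch of what waitForApplication boils down to, using only the public Hadoop YARN API (the polling interval and failure handling here are my assumptions):

    import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
    import org.apache.hadoop.yarn.client.api.YarnClient

    def waitUntilRunning(yarnClient: YarnClient, appId: ApplicationId): Unit = {
      var state = yarnClient.getApplicationReport(appId).getYarnApplicationState
      while (state == YarnApplicationState.NEW ||
             state == YarnApplicationState.SUBMITTED ||
             state == YarnApplicationState.ACCEPTED) {
        Thread.sleep(1000) // polling interval is an assumption
        state = yarnClient.getApplicationReport(appId).getYarnApplicationState
      }
      if (state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) {
        throw new IllegalStateException(s"Application $appId exited with state $state")
      }
    }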

 

  • Execution now enters yarn.Client's submitApplication logic:

1. The submitApplication code (Spark)

  /**
   * Submit an application running our ApplicationMaster to the ResourceManager.
   *
   * The stable Yarn API provides a convenience method (YarnClient#createApplication) for
   * creating applications and setting up the application submission context. This was not
   * available in the alpha API.
   */
  // The submission goes through Hadoop YARN's own client API, YarnClient
  override def submitApplication(): ApplicationId = {
    yarnClient.init(yarnConf) // yarnClient comes from YarnClient.createYarnClient; YarnClient is a Hadoop API, so this object is Hadoop's, not Spark's
    yarnClient.start() // start the YARN client service (the client, not YARN itself)

    logInfo("Requesting a new application from cluster with %d NodeManagers" ///NodeManager是什么概念?
      .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))

    // Get a new application from our RM
    val newApp = yarnClient.createApplication() // nothing has been submitted to YARN yet; this only creates a YarnClientApplication
    val newAppResponse = newApp.getNewApplicationResponse() // of type GetNewApplicationResponse
    val appId = newAppResponse.getApplicationId() // the new ApplicationId

    // Verify whether the cluster has enough resources for our AM
    verifyClusterResources(newAppResponse)

    // Set up the appropriate contexts to launch our AM
    val containerContext = createContainerLaunchContext(newAppResponse) // context for launching the ApplicationMaster container
    val appContext = createApplicationSubmissionContext(newApp, containerContext) // submission context built from newApp and containerContext

    // Finally, submit and monitor the application
    logInfo(s"Submitting application ${appId.getId} to ResourceManager")
    yarnClient.submitApplication(appContext) // hand the submission context to Hadoop YARN's client, which submits the job
    appId
  }

2. The YarnClient createApplication code (Hadoop YARN)

 

 

  @Override
  public YarnClientApplication createApplication()
      throws YarnException, IOException {
    ApplicationSubmissionContext context = Records.newRecord
        (ApplicationSubmissionContext.class);
    GetNewApplicationResponse newApp = getNewApplication();
    ApplicationId appId = newApp.getApplicationId();
    context.setApplicationId(appId);
    return new YarnClientApplication(newApp, context);
  }

 

3. The YarnClientApplication getNewApplicationResponse code (Hadoop YARN)

 

 public GetNewApplicationResponse getNewApplicationResponse() {
    return newAppResponse; // of type GetNewApplicationResponse
  }
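To make the division of labor concrete, here is a minimal, self-contained use of this Hadoop YarnClient API (it assumes a reachable ResourceManager configured via yarn-site.xml on the classpath):

    import org.apache.hadoop.yarn.client.api.YarnClient
    import org.apache.hadoop.yarn.conf.YarnConfiguration

    val yarnClient = YarnClient.createYarnClient()
    yarnClient.init(new YarnConfiguration())
    yarnClient.start()
    val app = yarnClient.createApplication()            // asks the RM for a new ApplicationId
    val appId = app.getNewApplicationResponse.getApplicationId
    println(s"Obtained application id: $appId")         // nothing has been submitted yet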

 

4. The createContainerLaunchContext code (Spark)

 /**
   * Set up a ContainerLaunchContext to launch our ApplicationMaster container.
   * This sets up the launch environment, java options, and the command for launching the AM.
   */
  // Builds the context for launching the ApplicationMaster container
  protected def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
      : ContainerLaunchContext = {
    logInfo("Setting up container launch context for our AM")

    val appId = newAppResponse.getApplicationId
    val appStagingDir = getAppStagingDir(appId)
    val localResources = prepareLocalResources(appStagingDir) // stage the local resources (jars, files)
    val launchEnv = setupLaunchEnv(appStagingDir)
    val amContainer = Records.newRecord(classOf[ContainerLaunchContext]) // the AM container spec, of type ContainerLaunchContext
    amContainer.setLocalResources(localResources)
    amContainer.setEnvironment(launchEnv)

    val javaOpts = ListBuffer[String]()

    // Set the environment variable through a command prefix
    // to append to the existing value of the variable
    var prefixEnv: Option[String] = None

    // Add Xmx for AM memory
    javaOpts += "-Xmx" + args.amMemory + "m"

    val tmpDir = new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
    javaOpts += "-Djava.io.tmpdir=" + tmpDir

    // TODO: Remove once cpuset version is pushed out.
    // The context is, default gc for server class machines ends up using all cores to do gc -
    // hence if there are multiple containers in same node, Spark GC affects all other containers'
    // performance (which can be that of other Spark containers)
    // Instead of using this, rely on cpusets by YARN to enforce "proper" Spark behavior in
    // multi-tenant environments. Not sure how default Java GC behaves if it is limited to subset
    // of cores on a node.
    val useConcurrentAndIncrementalGC = launchEnv.get("SPARK_USE_CONC_INCR_GC").exists(_.toBoolean)
    if (useConcurrentAndIncrementalGC) {
      // In our expts, using (default) throughput collector has severe perf ramifications in
      // multi-tenant machines
      javaOpts += "-XX:+UseConcMarkSweepGC"
      javaOpts += "-XX:+CMSIncrementalMode"
      javaOpts += "-XX:+CMSIncrementalPacing"
      javaOpts += "-XX:CMSIncrementalDutyCycleMin=0"
      javaOpts += "-XX:CMSIncrementalDutyCycle=10"
    }

    // Forward the Spark configuration to the application master / executors.
    // TODO: it might be nicer to pass these as an internal environment variable rather than
    // as Java options, due to complications with string parsing of nested quotes.
    for ((k, v) <- sparkConf.getAll) {
      javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v")
    }

    // Include driver-specific java options if we are launching a driver
    if (isLaunchingDriver) { // true only when a user class was supplied, i.e. cluster mode (see the discussion below)
      sparkConf.getOption("spark.driver.extraJavaOptions")
        .orElse(sys.env.get("SPARK_JAVA_OPTS"))
        .foreach(opts => javaOpts += opts)
      val libraryPaths = Seq(sys.props.get("spark.driver.extraLibraryPath"),
        sys.props.get("spark.driver.libraryPath")).flatten
      if (libraryPaths.nonEmpty) { // the only place prefixEnv is assigned
        prefixEnv = Some(Utils.libraryPathEnvPrefix(libraryPaths))
      }
    }

    // For log4j configuration to reference
    javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR)

    val userClass = // --class is set only when launching the driver
      if (isLaunchingDriver) {
        Seq("--class", YarnSparkHadoopUtil.escapeForShell(args.userClass))
      } else {
        Nil
      }
    val userJar = // the application jar
      if (args.userJar != null) {
        Seq("--jar", args.userJar)
      } else {
        Nil
      }
    val amClass = // ApplicationMaster when launching the driver, ExecutorLauncher otherwise
      if (isLaunchingDriver) {
        Class.forName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
      } else {
        Class.forName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
      }
    val userArgs = args.userArgs.flatMap { arg =>
      Seq("--arg", YarnSparkHadoopUtil.escapeForShell(arg))
    }
    val amArgs = // amClass, --class, --jar, --arg, plus the executor settings
      Seq(amClass) ++ userClass ++ userJar ++ userArgs ++
      Seq(
        "--executor-memory", args.executorMemory.toString + "m",
        "--executor-cores", args.executorCores.toString,
        "--num-executors ", args.numExecutors.toString)

    // Command for the ApplicationMaster
    // Assemble the command to run: java -server <javaOpts> <amArgs>, with stdout/stderr redirected
    val commands = prefixEnv ++ Seq(Environment.JAVA_HOME.$() + "/bin/java", "-server") ++
      javaOpts ++ amArgs ++
      Seq(
        "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
        "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")

    // TODO: it would be nicer to just make sure there are no null commands here
    val printableCommands = commands.map(s => if (s == null) "null" else s).toList
    // the commands carried by amContainer
    amContainer.setCommands(printableCommands)

    logDebug("===============================================================================")
    logDebug("Yarn AM launch context:")
    logDebug(s"    user class: ${Option(args.userClass).getOrElse("N/A")}")
    logDebug("    env:")
    launchEnv.foreach { case (k, v) => logDebug(s"        $k -> $v") }
    logDebug("    resources:")
    localResources.foreach { case (k, v) => logDebug(s"        $k -> $v")}
    logDebug("    command:")
    logDebug(s"        ${printableCommands.mkString(" ")}")
    logDebug("===============================================================================")

    // send the acl settings into YARN to control who has access via YARN interfaces
    val securityManager = new SecurityManager(sparkConf)
    amContainer.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager))
    setupSecurityToken(amContainer)
    UserGroupInformation.getCurrentUser().addCredentials(credentials)

    amContainer
  }
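Put together, the AM launch command stored in amContainer looks roughly like this (all values here are illustrative, not taken from a real run):

    {{JAVA_HOME}}/bin/java -server -Xmx512m -Djava.io.tmpdir={{PWD}}/tmp \
      -Dspark.app.name=MyApp -Dspark.yarn.app.container.log.dir=<LOG_DIR> \
      org.apache.spark.deploy.yarn.ApplicationMaster \
      --class com.example.MyApp --jar hdfs:///apps/myapp.jar \
      --executor-memory 1024m --executor-cores 1 --num-executors 2 \
      1> <LOG_DIR>/stdout 2> <LOG_DIR>/stderr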

 

 

5. The createApplicationSubmissionContext code (Spark)

  /**
   * Set up the context for submitting our ApplicationMaster.
   * This uses the YarnClientApplication not available in the Yarn alpha API.
   */
  // Context for submitting the ApplicationMaster
  def createApplicationSubmissionContext(
      newApp: YarnClientApplication,
      containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {
    val appContext = newApp.getApplicationSubmissionContext
    appContext.setApplicationName(args.appName) // the application name
    appContext.setQueue(args.amQueue)
    appContext.setAMContainerSpec(containerContext) // wrap the ContainerLaunchContext created above into appContext
    appContext.setApplicationType("SPARK") // the application type shown in the YARN UI once the app has run
    val capability = Records.newRecord(classOf[Resource])
    capability.setMemory(args.amMemory + amMemoryOverhead)
    appContext.setResource(capability)
    appContext
  }
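The amMemoryOverhead added to capability is a safety margin for non-heap memory on top of the AM heap. A hedged sketch of how it is derived (the config key and constants are my assumptions for the 1.x line; check your version's docs):

    // Assumed key and constants, not verified against a specific release.
    val amMemoryOverhead = sparkConf.getInt("spark.yarn.driver.memoryOverhead",
      math.max((0.07 * args.amMemory).toInt, 384)) // MB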

 

  • That completes the detailed walk through yarn.Client.submitApplication; execution returns to YarnClientSchedulerBackend's start method and continues with:
    waitForApplication()      // block until the application reaches the RUNNING state
    asyncMonitorApplication() // asynchronously monitor the application's state

 

  • Once the job is submitted to YARN, YARN launches a process: ApplicationMaster if a driver is being launched, ExecutorLauncher otherwise. The choice is made by yarn.Client's isLaunchingDriver flag, which is true when args.userClass is set; args.userClass is populated from a --class argument by parseArgs in the yarn.ClientArguments class
  • In yarn-client mode, however, the --class supplied to spark-submit is consumed by SparkSubmit itself and is not forwarded to yarn.Client, so isLaunchingDriver is false and YARN starts an ExecutorLauncher process. ExecutorLauncher's main simply delegates to ApplicationMaster.main, which is why the next section still begins there

Execution logic of Spark's ApplicationMaster process inside the YARN runtime

  • Execution enters ApplicationMaster's main method

 

  def main(args: Array[String]) = {
    SignalLogger.register(log)
    val amArgs = new ApplicationMasterArguments(args) // args are the AM arguments assembled in createContainerLaunchContext (--jar, --arg, the executor settings, and --class in cluster mode)
    SparkHadoopUtil.get.runAsSparkUser { () =>
      master = new ApplicationMaster(amArgs, new YarnRMClientImpl(amArgs)) // the AM must talk to YARN's ResourceManager, hence the YarnRMClientImpl instance
      System.exit(master.run()) // run the ApplicationMaster
    }
  }
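The question of which path run() takes (raised in the bullets below) is settled by whether a user class was handed to the AM; a hedged sketch of the dispatch:

    // Simplified sketch of the dispatch inside ApplicationMaster.run (Spark 1.x):
    // --class is only passed in cluster mode, so its presence selects the driver path.
    val isDriver = args.userClass != null
    if (isDriver) {
      runDriver()            // cluster mode: run the user's main inside the AM
    } else {
      runExecutorLauncher()  // yarn-client mode: the driver runs in the submitting JVM
    }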

 

  • ApplicationMaster's run method calls either runDriver or runExecutorLauncher; as sketched above, the choice follows from whether --class was passed, so in yarn-client mode it is runExecutorLauncher
  • Either way, both paths eventually reach ApplicationMaster's registerAM, so let's continue there
  • In ApplicationMaster's registerAM method, YarnRMClient's register method is called:

 

  override def register(
      conf: YarnConfiguration,
      sparkConf: SparkConf,
      preferredNodeLocations: Map[String, Set[SplitInfo]],
      uiAddress: String,
      uiHistoryAddress: String,
      securityMgr: SecurityManager) = {
    amClient = AMRMClient.createAMRMClient()
    amClient.init(conf)
    amClient.start()
    this.uiHistoryAddress = uiHistoryAddress

    logInfo("Registering the ApplicationMaster") ///注册APPlicationMaster到Yarn
    synchronized {
      amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)///调用AMRMClient的registerApplicationMaster方法
      registered = true
    }
    new YarnAllocationHandler(conf, sparkConf, amClient, getAttemptId(), args,
      preferredNodeLocations, securityMgr) ///创建了一个YarnAllocationHandler,register方法不需要返回值
  }

 

  • Still within ApplicationMaster's registerAM flow, YarnAllocator's allocateResources method is called. The method runs to nearly 300 lines; as its doc comment says, its main job is to allocate containers, preferring the local node first, the local rack next, and other racks last (see the sketch after the doc comment):
  /**
   * Allocate missing containers based on the number of executors currently pending and running.
   *
   * This method prioritizes the allocated container responses from the RM based on node and
   * rack locality. Additionally, it releases any extra containers allocated for this application
   * but are not needed. This must be synchronized because variables read in this block are
   * mutated by other methods.
   */
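As a self-contained illustration of that host > rack > any ordering (the names here are mine, not Spark's):

    // Hypothetical helper, not Spark source: order container offers by locality.
    case class Offer(host: String, rack: String)

    def locality(offer: Offer, wantedHosts: Set[String], wantedRacks: Set[String]): Int =
      if (wantedHosts.contains(offer.host)) 0       // node-local: best
      else if (wantedRacks.contains(offer.rack)) 1  // rack-local: next best
      else 2                                        // off-rack: last resort

    def prioritize(offers: Seq[Offer], hosts: Set[String], racks: Set[String]): Seq[Offer] =
      offers.sortBy(locality(_, hosts, racks))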

 

  • During allocateResources, an ExecutorRunnable instance is submitted to a thread pool for each allocated container:
          val executorRunnable = new ExecutorRunnable( // a task implementing the Runnable interface
            container,
            conf,
            sparkConf,
            driverUrl,
            executorId,
            executorHostname,
            executorMemory,
            executorCores,
            appAttemptId.getApplicationId.toString,
            securityMgr)
          launcherPool.execute(executorRunnable) // hand it to the thread pool
        }
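launcherPool here is an ordinary Java thread pool, so container launches proceed in parallel without blocking the allocation loop. A hedged sketch of its setup (the config key and default are my assumptions for 1.x):

    import java.util.concurrent.Executors
    import com.google.common.util.concurrent.ThreadFactoryBuilder

    // Assumed key/default; daemon threads so the pool never blocks AM shutdown.
    val launcherPool = Executors.newFixedThreadPool(
      sparkConf.getInt("spark.yarn.containerLauncherMaxThreads", 25),
      new ThreadFactoryBuilder().setNameFormat("ContainerLauncher #%d").setDaemon(true).build())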

 

  • ExecutorRunnable's run method, executed for its container, assembles the command that starts an org.apache.spark.executor.CoarseGrainedExecutorBackend process. This happens in prepareCommand, excerpted below:
 val commands = prefixEnv ++ Seq(Environment.JAVA_HOME.$() + "/bin/java", // path to the java binary
      "-server",
      // Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
      // Not killing the task leaves various aspects of the executor and (to some extent) the jvm in
      // an inconsistent state.
      // TODO: If the OOM is not recoverable by rescheduling it on different node, then do
      // 'something' to fail job ... akin to blacklisting trackers in mapred ?
      "-XX:OnOutOfMemoryError='kill %p'") ++
      javaOpts ++
      Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend", ///启动CoarseGrainedExecutorBackend进程
      masterAddress.toString,
      slaveId.toString,
      hostname.toString,
      executorCores.toString,
      appId,
      "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
      "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")

 

  • NMClient.startContainer is called to start the container:
    // Send the start request to the ContainerManager
    nmClient.startContainer(container, ctx) // nmClient is a Hadoop YARN NMClient, the client API for talking to a NodeManager
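For context, creating that NMClient uses the standard Hadoop YARN API; a minimal hedged sketch:

    import org.apache.hadoop.yarn.client.api.NMClient
    import org.apache.hadoop.yarn.conf.YarnConfiguration

    val nmClient = NMClient.createNMClient()
    nmClient.init(new YarnConfiguration())
    nmClient.start()
    // nmClient.startContainer(container, ctx) then runs the prepared command on that node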

 

  • Once CoarseGrainedExecutorBackend starts, we are back on familiar ground. Its preStart method connects to the driver and registers the executor:
  override def preStart() {
    logInfo("Connecting to driver: " + driverUrl)
    driver = context.actorSelection(driverUrl)
    driver ! RegisterExecutor(executorId, hostPort, cores) // send the driver a RegisterExecutor message
    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
  }

 

  • On receiving RegisterExecutor, the DriverActor calls makeOffers, which calls launchTasks to send LaunchTask messages to the executor actors
  • On receiving LaunchTask, CoarseGrainedExecutorBackend calls Executor's launchTask method, which submits the task to the executor's internal thread pool for execution (a sketch of both handlers follows)
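A hedged sketch of those two message handlers (Spark 1.x is Akka-based; both excerpts are simplified and omit bookkeeping):

    // Driver side, inside CoarseGrainedSchedulerBackend's actor (simplified):
    case RegisterExecutor(executorId, hostPort, cores) =>
      sender ! RegisteredExecutor          // acknowledge the executor
      makeOffers()                         // try to schedule pending tasks

    // Executor side, inside CoarseGrainedExecutorBackend (simplified):
    case LaunchTask(data) =>
      val taskDesc = ser.deserialize[TaskDescription](data.value)
      executor.launchTask(this, taskDesc.taskId, taskDesc.name, taskDesc.serializedTask)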

 

Summary

This was a somewhat cursory walk through the yarn-client execution flow. As this is my first contact with this code, and my understanding of YARN itself is still shallow, many details remain unclear and will need further thought and refinement.

(The original post closed with two figures illustrating the yarn-client submission and execution flow; they are not reproduced here.)