按照Spark的部署设置,对于Spark运行于Yarn之上,有如下四种选择方式(本质上是两种),
- yarn-client+client
- yarn-cluster+cluster
- yarn-client(部署方式默认为client)
- yarn-cluster(部署方式默认为cluster)
yarn-client+cluster组合以及yarn-cluster+client是不正确的组合,Spark报错退出。
本文首先探讨Spark On Yarn之yarn-client+client方式部署下的代码执行流程
程序提交给Yarn运行时环境
- 对于部署方式是Client的情况,SparkSubmit的main函数中通过反射执行应用程序的main方法
- 在应用程序的main方法中,创建SparkContext实例
- 在创建SparkContext的实例过程中,通过如下语句创建Scheduler和Backend实例
private[spark] var (schedulerBackend, taskScheduler) = SparkContext.createTaskScheduler(this, master)
- 由于当前是yarn-client和client组合部署模式,
1.代码执行逻辑是: taskScheduler是org.apache.spark.scheduler.cluster.YarnClientClusterScheduler实例,它是TaskSchedulerImpl的子类,它的文档说明为
/**
* This scheduler launches executors through Yarn - by calling into Client to launch the Spark AM.
*/
2.schedulerBackend是org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend实例,它是CoarseGrainedSchedulerBackend的子类,它是文档说明为无
- 继续SparkContext实例的创建过程,调用taskScheduler的start方法,也即YarnClientClusterScheduler的start方法,因为YarnClientClusterScheduler并没有覆盖TaskSchedulerImpl的start方法,所以执行逻辑进入到TaskSchedulerImpl的start方法中
- 在TaskSchedulerImpl的start方法中,调用backend的start方法,由于此处的backend是YarnClientSchedulerBackend,所以代码逻辑进入到YarnClientSchedulerBackend的start方法中
- 在YarnClientSchedulerBackend的start方法中,创建YarnClient(将用户编写的应用程序提交给Yarn的ResourceManager)
- 在YarnClient创建yarn.Client对象,然后调用submitApplication,等待Application执行完,如下代码所示
client = new Client(args, conf) //yarn.client
appId = client.submitApplication()
waitForApplication() ///阻塞等待Application进入Running状态
asyncMonitorApplication() ///异步监控Application的运行状态,If the application has exited for any reason, stop the SparkContext.
- 程序逻辑进入了yarn.Client调用submitApplication的逻辑,执行代码:
1.submitApplication的代码(Spark)
/**
* Submit an application running our ApplicationMaster to the ResourceManager.
*
* The stable Yarn API provides a convenience method (YarnClient#createApplication) for
* creating applications and setting up the application submission context. This was not
* available in the alpha API.
*/
////这里借助Hadoop Yarn提供的API提交应用程序,这里的API是Hadoop Yarn的YarnClient
override def submitApplication(): ApplicationId = {
yarnClient.init(yarnConf) ////yarnClient是通过YarnClient.createYarnClient创建,而YarnClient是Hadoop API,所以yarnClient也是Hadoop的API
yarnClient.start() ///启动Yarn
logInfo("Requesting a new application from cluster with %d NodeManagers" ///NodeManager是什么概念?
.format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))
// Get a new application from our RM
val newApp = yarnClient.createApplication()///代码运行到此处,还没有真正的把程序代码提交给Yarn去运行;这里使用YarnClient创建一个Application,类型为YarnClientApplication
val newAppResponse = newApp.getNewApplicationResponse() ///返回GetNewApplicationResponse类型
val appId = newAppResponse.getApplicationId() ///获取applicationId
// Verify whether the cluster has enough resources for our AM
verifyClusterResources(newAppResponse)
// Set up the appropriate contexts to launch our AM
val containerContext = createContainerLaunchContext(newAppResponse) ///创建启动ApplicationMaster容器的上下文环境
val appContext = createApplicationSubmissionContext(newApp, containerContext)///依据创建的newApp和containerContext,创建应用上下文环境
// Finally, submit and monitor the application
logInfo(s"Submitting application ${appId.getId} to ResourceManager")
yarnClient.submitApplication(appContext) ///根据创建的applicationContext,由Hadoop Yarn的yarnClient提交作业
appId
}
2. YarnClient的createApplication代码(Hadoop Yarn)
@Override
public YarnClientApplication createApplication()
throws YarnException, IOException {
ApplicationSubmissionContext context = Records.newRecord
(ApplicationSubmissionContext.class);
GetNewApplicationResponse newApp = getNewApplication();
ApplicationId appId = newApp.getApplicationId();
context.setApplicationId(appId);
return new YarnClientApplication(newApp, context);
}
3. YarnClientApplication的getNewApplicationResponsed代码(Hadoop Yarn)
public GetNewApplicationResponse getNewApplicationResponse() {
return newAppResponse;//类型为GetNewApplicationResponse
}
4. createContainerLaunchContext代码(Spark)
/**
* Set up a ContainerLaunchContext to launch our ApplicationMaster container.
* This sets up the launch environment, java options, and the command for launching the AM.
*/
///创建启动ApplicationMaster容器的上下文环境
protected def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
: ContainerLaunchContext = {
logInfo("Setting up container launch context for our AM")
val appId = newAppResponse.getApplicationId
val appStagingDir = getAppStagingDir(appId)
val localResources = prepareLocalResources(appStagingDir) ///准备本地资源
val launchEnv = setupLaunchEnv(appStagingDir)
val amContainer = Records.newRecord(classOf[ContainerLaunchContext]) ///创建ApplicationMaster容器,类型为ContainerLaunchContext
amContainer.setLocalResources(localResources)
amContainer.setEnvironment(launchEnv)
val javaOpts = ListBuffer[String]()
// Set the environment variable through a command prefix
// to append to the existing value of the variable
var prefixEnv: Option[String] = None
// Add Xmx for AM memory
javaOpts += "-Xmx" + args.amMemory + "m"
val tmpDir = new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
javaOpts += "-Djava.io.tmpdir=" + tmpDir
// TODO: Remove once cpuset version is pushed out.
// The context is, default gc for server class machines ends up using all cores to do gc -
// hence if there are multiple containers in same node, Spark GC affects all other containers'
// performance (which can be that of other Spark containers)
// Instead of using this, rely on cpusets by YARN to enforce "proper" Spark behavior in
// multi-tenant environments. Not sure how default Java GC behaves if it is limited to subset
// of cores on a node.
val useConcurrentAndIncrementalGC = launchEnv.get("SPARK_USE_CONC_INCR_GC").exists(_.toBoolean)
if (useConcurrentAndIncrementalGC) {
// In our expts, using (default) throughput collector has severe perf ramifications in
// multi-tenant machines
javaOpts += "-XX:+UseConcMarkSweepGC"
javaOpts += "-XX:+CMSIncrementalMode"
javaOpts += "-XX:+CMSIncrementalPacing"
javaOpts += "-XX:CMSIncrementalDutyCycleMin=0"
javaOpts += "-XX:CMSIncrementalDutyCycle=10"
}
// Forward the Spark configuration to the application master / executors.
// TODO: it might be nicer to pass these as an internal environment variable rather than
// as Java options, due to complications with string parsing of nested quotes.
for ((k, v) <- sparkConf.getAll) {
javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v")
}
// Include driver-specific java options if we are launching a driver
if (isLaunchingDriver) { //什么情况是启动Driver??
sparkConf.getOption("spark.driver.extraJavaOptions")
.orElse(sys.env.get("SPARK_JAVA_OPTS"))
.foreach(opts => javaOpts += opts)
val libraryPaths = Seq(sys.props.get("spark.driver.extraLibraryPath"),
sys.props.get("spark.driver.libraryPath")).flatten
if (libraryPaths.nonEmpty) { ///此处对prefixEnv进行唯一的赋值,
prefixEnv = Some(Utils.libraryPathEnvPrefix(libraryPaths))
}
}
// For log4j configuration to reference
javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR)
val userClass = ///只有启动Driver的情况下才会设置--class
if (isLaunchingDriver) {
Seq("--class", YarnSparkHadoopUtil.escapeForShell(args.userClass))
} else {
Nil
}
val userJar = ////应用程序的jar文件
if (args.userJar != null) {
Seq("--jar", args.userJar)
} else {
Nil
}
val amClass = ///如果是Driver,则是ApplicationMaster,否则是ExecutorLauncher
if (isLaunchingDriver) {
Class.forName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
} else {
Class.forName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
}
val userArgs = args.userArgs.flatMap { arg =>
Seq("--arg", YarnSparkHadoopUtil.escapeForShell(arg))
}
val amArgs = ///ApplicationMaster --class --jar
Seq(amClass) ++ userClass ++ userJar ++ userArgs ++
Seq(
"--executor-memory", args.executorMemory.toString + "m",
"--executor-cores", args.executorCores.toString,
"--num-executors ", args.numExecutors.toString)
// Command for the ApplicationMaster
///封装要执行的命令,使用java -server javaOpts armArgs
val commands = prefixEnv ++ Seq(Environment.JAVA_HOME.$() + "/bin/java", "-server") ++
javaOpts ++ amArgs ++
Seq(
"1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
"2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
// TODO: it would be nicer to just make sure there are no null commands here
val printableCommands = commands.map(s => if (s == null) "null" else s).toList
//amContainer包含的命令
amContainer.setCommands(printableCommands)
logDebug("===============================================================================")
logDebug("Yarn AM launch context:")
logDebug(s" user class: ${Option(args.userClass).getOrElse("N/A")}")
logDebug(" env:")
launchEnv.foreach { case (k, v) => logDebug(s" $k -> $v") }
logDebug(" resources:")
localResources.foreach { case (k, v) => logDebug(s" $k -> $v")}
logDebug(" command:")
logDebug(s" ${printableCommands.mkString(" ")}")
logDebug("===============================================================================")
// send the acl settings into YARN to control who has access via YARN interfaces
val securityManager = new SecurityManager(sparkConf)
amContainer.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager))
setupSecurityToken(amContainer)
UserGroupInformation.getCurrentUser().addCredentials(credentials)
amContainer
}
5. createApplicationSubmissionContext的代码(Spark)
/**
* Set up the context for submitting our ApplicationMaster.
* This uses the YarnClientApplication not available in the Yarn alpha API.
*/
///提交ApplicationMaster的上下文环境
def createApplicationSubmissionContext(
newApp: YarnClientApplication,
containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {
val appContext = newApp.getApplicationSubmissionContext
appContext.setApplicationName(args.appName) ///应用程序的名字
appContext.setQueue(args.amQueue)
appContext.setAMContainerSpec(containerContext) ///将创建的ContainerLaunchContext包装到appContext中
appContext.setApplicationType("SPARK") ////UI上当应用程序执行完成后,显示的应用程序的类型
val capability = Records.newRecord(classOf[Resource])
capability.setMemory(args.amMemory + amMemoryOverhead)
appContext.setResource(capability)
appContext
}
- 上面分析了yarn.Client调用submitApplication详细逻辑,程序回到的start方法中,继续下面的逻辑
waitForApplication() ///阻塞等待程序进入Running状态
asyncMonitorApplication() ///异步监控程序的运行状态
- 当作业提交到Yarn中之后,Yarn创建一个进程。如果是Driver则运行ApplicationMaster,否则运行ExecutorLauncher。是否是Driver通过yarn.client的isLaunchingDriver变量决定。isLaunchingDriver取值依赖于args.userClass是否存在,用户的指令中提供了--class参数,则args.userClass中的值就是用户提供的--class参数。此逻辑在.yarn.ClientArguments类中的parseArgs中
- 我们的spark-submit提供了--class参数,所以,Yarn将启动ApplicationMaster进程。也就是说,在Yarn的运行时环境中,启动了Spark的ApplicationMaster进程
在Yarn运行时环境中运行Spark的ApplicationMaster进程的执行逻辑
- 代码逻辑进入到ApplicationMaster的main方法中
def main(args: Array[String]) = {
SignalLogger.register(log)
val amArgs = new ApplicationMasterArguments(args) ///args是什么内容?
SparkHadoopUtil.get.runAsSparkUser { () =>
master = new ApplicationMaster(amArgs, new YarnRMClientImpl(amArgs)) ///因为ApplicationMaster需要跟Yarn的ResourceManager交互,所以这里需要访问RM的YarnRMClientImpl实例
System.exit(master.run()) ///执行ApplicationMaster的run方法
}
}
- 在ApplicationMaster的run方法中调用ApplicationMaster的runDriver或者runExecutorLauncher方法,究竟运行哪一个??
- 此处的逻辑先暂时不表,因为还没有清楚ApplicationMaster的runDriver或者runExecutorLauncher方法,究竟运行哪一个,不过最终的逻辑都会走进ApplicationMaster的registerAM中,先继续吧!!!
- 在ApplicationMaster的registerAM方法中,调用YarnRMClient的register方法,如下是代码:
override def register(
conf: YarnConfiguration,
sparkConf: SparkConf,
preferredNodeLocations: Map[String, Set[SplitInfo]],
uiAddress: String,
uiHistoryAddress: String,
securityMgr: SecurityManager) = {
amClient = AMRMClient.createAMRMClient()
amClient.init(conf)
amClient.start()
this.uiHistoryAddress = uiHistoryAddress
logInfo("Registering the ApplicationMaster") ///注册APPlicationMaster到Yarn
synchronized {
amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)///调用AMRMClient的registerApplicationMaster方法
registered = true
}
new YarnAllocationHandler(conf, sparkConf, amClient, getAttemptId(), args,
preferredNodeLocations, securityMgr) ///创建了一个YarnAllocationHandler,register方法不需要返回值
}
- 在ApplicationMaster的registerRM方法中,继续调用YarnAllocator的allocateResources方法。这个方法将近300行,主要的功能如其方法说明文档所说,主要是分配container,按照本机优先,本机架次之,其它机架最后的次序进行container分配
/**
* Allocate missing containers based on the number of executors currently pending and running.
*
* This method prioritizes the allocated container responses from the RM based on node and
* rack locality. Additionally, it releases any extra containers allocated for this application
* but are not needed. This must be synchronized because variables read in this block are
* mutated by other methods.
*/
- 在allocateResources的过程,会执行提交ExecutorRunnable实例到线程池的操作,(这里是对每个分配到的container交给线程池来处理使用)
val executorRunnable = new ExecutorRunnable( ///实现了Runnable接口的任务
container,
conf,
sparkConf,
driverUrl,
executorId,
executorHostname,
executorMemory,
executorCores,
appAttemptId.getApplicationId.toString,
securityMgr)
launcherPool.execute(executorRunnable) ///提交给线程池
}
- ExecutorRunnable在container中执行run方法中,封装启动org.apache.spark.executor.CoarseGrainedExecutorBackend进程的指令,这个逻辑是在prepareCommand中完成的,代码片段如下:
val commands = prefixEnv ++ Seq(Environment.JAVA_HOME.$() + "/bin/java", ///java命令所在的位置
"-server",
// Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
// Not killing the task leaves various aspects of the executor and (to some extent) the jvm in
// an inconsistent state.
// TODO: If the OOM is not recoverable by rescheduling it on different node, then do
// 'something' to fail job ... akin to blacklisting trackers in mapred ?
"-XX:OnOutOfMemoryError='kill %p'") ++
javaOpts ++
Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend", ///启动CoarseGrainedExecutorBackend进程
masterAddress.toString,
slaveId.toString,
hostname.toString,
executorCores.toString,
appId,
"1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
"2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
- 调用NMClient.startContainer启动Container
// Send the start request to the ContainerManager
nmClient.startContainer(container, ctx) //nmClient是NodeManager的实例,输入Hadoop Yarn的API
- 当CoarseGrainedExecutorBackend启动时,代码逻辑回到我们熟悉的轨道上来。调用preStart方法启动Executor
override def preStart() {
logInfo("Connecting to driver: " + driverUrl)
driver = context.actorSelection(driverUrl)
driver ! RegisterExecutor(executorId, hostPort, cores) ///给Driver发消息RegisterExecutor
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
}
- DriverActor收到RegisterExecutor的消息后,调用makeOffer方法,在makeOffer方法中,调用launchTasks方法给ExecutorActor发消息LaunchTask
- ExecutorBackEnd收到LaunchTask的消息后,调用Executor的launchTask方法,然后通过Executor里面的线程池提交任务到线程池执行
总结
囫囵吞枣似的的将yarn-client模式的执行流程走了一遍,毕竟是第一次接触到这里,同时对Yarn也没有很好的理解,所以这中间还有很多不明白的东西,还得不断的思考,细化。
以两幅图片作为结束,
相关推荐
spark初始化源码阅读sparkonyarn的client和cluster区别
内容概要:由于cdh6.3.2的spark版本为2.4.0,并且spark-sql被阉割,现基于cdh6.3.2,scala2.12.0,java1.8,maven3.6.3,,对spark-3.2.2源码进行编译 应用:该资源可用于cdh6.3.2集群配置spark客户端,用于spark-sql
http://spark.apache.org/docs/latest/running-on-yarn.html 准备工作 安装启动Hadoop(需要使用HDFS和YARN,已经ok) 安装单机版Spark(已经ok) 注意:不需要集群,因为把Spark程序提交给YARN运行本质上是把字节码给...
Spark基础理论: https://blog.csdn.net/weixin_45102492/article/details/104318250 Spark安装及Local模式:https://blog.csdn.net/weixin_45102492/article/details/104318738 ...有yarn-client和yarn-clus
./bin/spark-shell --master yarn --deploy-mode client --executor-cores 4 --num-executors 3 ② ./bin/spark-shell --master spark://zly:7077 --total-executor-cores 6 常⻅疑问 1) Spark与Apache Hadoop有...
一个完成的spark mllib 协同过滤推荐算法ALS 完整实例程序,基于 spark yarn-client模式运行,另外,包括训练数据。
-deploy-mode client --master yarn spark-terasort-0.1.jar 1G /user/root/teragenTeraSort 用法: $ spark-submit --class terasort.TeraSort --master $MESOS_MASTER spark-terasort-0.1.jar
yarn-client相当于是命令行 会将你输入的代码提交到yarn上面执行 yarn-cluster是将你写好的程序打成jar包然后提交到yarn上面去执行 然后yarn会将jar包分发到各个节点 并负责资源分配和任务管理 rack: null) dead for...
当flink on yarn模式运行时,发生如下异常信息,需要将压缩包中的4个依赖jar包放入flink安装路径下的lib目录下。 Exception in thread "main" java.lang.NoClassDefFoundError: ...
SPARK COMMAND spark2-submit --class pagerank.rddPagerank --master yarn --deploy-mode client spark2.jar / user / ec2-user / spark_assignment / input / yellow_tripdata * / user / ec2-user / spark_...
火花壳-主纱-部署模式客户端-驱动程序内存30G --executor内存30G --executor-cores 12 --num-executors 30 --conf spark.driver.maxResultSize = 20g sc.getConf.getAll.sorted.foreach(println) rows.rdd....
yarn-client客户端模式 连接到 YARN 集群。需要配置 HADOOP_CONF_DIR。yarn-cluster集群模式 连接到 YARN 集群。需要配置 HADOOP_CONF_DIR。SparkSession和SparkContextSparkSession是Spark 2.0引如的新概念。...
如图所⽰,MapReduce通过实现YARN的Client和ApplicationMaster接⼝集成到YARN中,利⽤YARN申请计算所需资源。 Storm 提供分布式、⾼性能、⾼可靠、容错的实时计算平台,可以对海量数据进⾏实时处理。CQL...
|XSQL是一种易于使用,运行稳定的多数据源查询引擎。1)首先,XSQL提供了一种使用标准SQL从NoSQL... XSQL仅在必要时使用YARN群集资源,此功能对某些使用情况很有用,例如用户将spark-xsql替换为RDMS Client。 我们