Driver Program是用户编写的提交给Spark集群执行的application,它包含两部分
- 作为驱动: Driver与Master、Worker协作完成application进程的启动、DAG划分、计算任务封装、计算任务分发到各个计算节点(Worker)、计算资源的分配等。
- 计算逻辑本身,当计算任务在Worker执行时,执行计算逻辑完成application的计算任务
接下来的问题是,给定一个driver programming,哪些作为"驱动代码"在Driver进程中执行,哪些"任务逻辑代码”被包装到任务中,然后分发到计算节点进行计算?
1. 基本Spark driver application
package spark.examples.databricks.reference.apps.loganalysis import org.apache.spark.{SparkContext, SparkConf} import org.apache.spark.SparkContext._ object LogAnalyzer { def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName("Log Analyzer in Scala").setMaster("local[3]") val sc = new SparkContext(sparkConf) //logFile path is provided by the program args val logFile = if (args != null && args.length == 1) args(0) else "E:\\softwareInstalled\\Apache2.2\\logs\\access.log" //transform each line of the logFile into ApacheAccessLog object, //RDD[T],T is of type ApacheAccessLog //Because it will be used more than more, so cache it. ////从数据源中读取文本文件内容,每一行转换为ApacheAccessLog,然后进行cache val accessLogs = sc.textFile(logFile).map(ApacheAccessLog.parseLogLine).cache() // Calculate statistics based on the content size. //Retrieve the contentSize column and cache it val contentSizes = accessLogs.map(log => log.contentSize).cache() ///reduce是个action,count是个action println("Content Size Avg: %s, Min: %s, Max: %s".format( contentSizes.reduce(_ + _) / contentSizes.count, contentSizes.min, contentSizes.max)) // Compute Response Code to Count. //Take first 100 responseCode, no sort here //take操作是个action val responseCodeToCount = accessLogs .map(log => (log.responseCode, 1)) .reduceByKey(_ + _) .take(100) println( s"""Response code counts: ${responseCodeToCount.mkString("[", ",", "]")}""") // Any IPAddress that has accessed the server more than 10 times. //take操作是个action val ipAddresses = accessLogs .map(log => (log.ipAddress, 1)) .reduceByKey(_ + _) .filter(_._2 > 10) //Get the ipAddress that accessed the server 10+ times .map(_._1) //Map to the IP Address column .take(100) println( s"""IPAddresses > 10 times: ${ipAddresses.mkString("[", ",", "]")}""") // Top Endpoints. //top操作是个action val topEndpoints = accessLogs .map(log => (log.endpoint, 1)) .reduceByKey(_ + _) .top(10)(OrderingUtils.SecondValueOrdering) println( s"""Top Endpoints: ${topEndpoints.mkString("[", ",", "]")}""") sc.stop() } }
Spark application首先在Driver上开始运行main函数,执行过程中。 计算逻辑的开始是从读取数据源(比如HDFS中的文件)创建RDD开始,RDD分为transform和action两种操作,transform使用的是懒执行,而action操作将会触发Job的提交。
Job的提交以为着DAG划分、计算任务封装、计算任务分发到各个计算节点(Worker)、计算资源的分配等,这是真正的将任务分发的开始。因此,从代码中首先要识别出哪些action。
另外一个问题, 任务执行过程中执行的逻辑代码如何识别?
Application在Driver上执行,遇到RDD的action动作后,开始提交作业,当作业执行完成后,后面的作业陆续提交,也就是说,虽然一个Spark Application可以有多个Job(每个action对应一个Job),这些Job是顺序执行的,Job(X)执行完成才会执行Job(X+1)。
遇到Job执行,比如上面的contentSize.reduce(_+_),那么所谓的计算逻辑就是函数 _+_,这是个求和操作。具体的任务逻辑执行时,还是从action回溯RDD,中间经过转换得到最终的这个Task所属的Partition的数据,然后执行reduce(_+_)
2. Spark Stream程序
package spark.examples.streaming import java.sql.{PreparedStatement, Connection, DriverManager} import java.util.concurrent.atomic.AtomicInteger import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ //No need to call Class.forName("com.mysql.jdbc.Driver") to register Driver? object SparkStreamingForPartition { def main(args: Array[String]) { val conf = new SparkConf().setAppName("NetCatWordCount") conf.setMaster("local[3]") val ssc = new StreamingContext(conf, Seconds(5)) val dstream = ssc.socketTextStream("192.168.26.140", 9999) //foreachRDD是DStream的动作函数,会触发Job执行,然后对一个时间间隔内创建的RDD进行处理。如果RDD执行RDD的动作函数,是否继续触发Job执行? dstream.foreachRDD(rdd => { //embedded function def func(records: Iterator[String]) { var conn: Connection = null var stmt: PreparedStatement = null try { val url = "jdbc:mysql://192.168.26.140:3306/person"; val user = "root"; val password = "" conn = DriverManager.getConnection(url, user, password) records.flatMap(_.split(" ")).foreach(word => { val sql = "insert into TBL_WORDS(word) values (?)"; stmt = conn.prepareStatement(sql); stmt.setString(1, word) stmt.executeUpdate(); }) } catch { case e: Exception => e.printStackTrace() } finally { if (stmt != null) { stmt.close() } if (conn != null) { conn.close() } } } ///对RDD进行重新分区,以改变处理的并行度 val repartitionedRDD = rdd.repartition(3) ///对每个分区调用func函数,func函数的参数就是一个分区对应的数据的遍历器(Iterator) repartitionedRDD.foreachPartition(func) }) ssc.start() ssc.awaitTermination() } }
DStream的foreachRDD是个Output Operation,类似于RDD的action,因此,高阶函数foreachRDD的函数参数,将在worker上执行。这里的func是定义为foreachRDD参数函数的内部函数,因此会发送到Worker上执行,如果func定义在最外层,比如作为main函数的直接内部函数,是否可以顺利的从Driver序列化到Worker上呢?我认为是可以的。函数在Scala中只是一个普通的对象,没有状态,序列化反序列化时需要创建MySQL的Connection。
3. foreachRDD中的rdd继续执行action算子
package spark.examples.streaming import java.sql.{PreparedStatement, Connection, DriverManager} import java.util.concurrent.atomic.AtomicInteger import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ object SparkStreamingForPartition2 { def main(args: Array[String]) { val conf = new SparkConf().setAppName("NetCatWordCount") conf.setMaster("local[3]") val ssc = new StreamingContext(conf, Seconds(5)) val dstream = ssc.socketTextStream("192.168.26.140", 9999) dstream.foreachRDD(rdd => { //对于空RDD调用flatMap报错,这里进行判断 if (!rdd.isEmpty()) { ///RDD的Record个数 val recordCount = rdd.count() ///RDD中的单词数 val wordCount = rdd.flatMap(_.split(" ")).map(word => 1).reduce(_ + _) println("recordCount: =" + recordCount + "," + "wordCount:=" + wordCount) } else { println("Empty RDD, No Data") } }) ssc.start() ssc.awaitTermination() } }
需要注意的是,对于没有任何元素的RDD(RDD.isEmpty为true),那么不能执行转换算子如flatMap等,这在Spark Streaming中读取空的RDD是很常见的情况?但是这个空检查是在RDD的reduce函数中执行的
def reduce(f: (T, T) => T): T = { val cleanF = sc.clean(f) val reducePartition: Iterator[T] => Option[T] = iter => { if (iter.hasNext) { Some(iter.reduceLeft(cleanF)) } else { None } } var jobResult: Option[T] = None val mergeResult = (index: Int, taskResult: Option[T]) => { if (taskResult.isDefined) { jobResult = jobResult match { case Some(value) => Some(f(value, taskResult.get)) case None => taskResult } } } sc.runJob(this, reducePartition, mergeResult) // Get the final result out of our Option, or throw an exception if the RDD was empty jobResult.getOrElse(throw new UnsupportedOperationException("empty collection")) }
如下操作对于空RDD是可以的,返回0
rdd.flatMap(_.split(" ")).map(_ => 1).count()
相关推荐
使用v8worker2在Golang上构建的ReasonML运行时
Executor:执行器,在worker node上执行任务的组件、用于启动线程池运行任务。每个Application拥有独立的一组Executors。 SparkContext:整个应用的上下文,控制应用的生命周期。 RDD:Spark的基本计算单元,一组RDD...
将模块移动到Web Worker中,自动将导出的函数反射为异步代理。
dolphinscheduler分布式部署,有3个worker节点,3个master节点 ...5. 解决方案:去掉processUtils.killYarnJob逻辑(hive、spark客户端执行,只需kill 本地进程,yarn任务会自动取消) 作者:weixin_4
命令php think worker:gateway在windows下运行提示GatewayWorker Not Support On Windows. 更改为自定义TP命令行启动要打开三个命令窗口启动,不方便测试。根据GatewayWorker-for-win提供的demo修改的 本资源依赖...
此方法的两个主要用途是:在Web Worker中使用DOM:在worker中编写DOM调用,并使它们在主线程上运行在DOM上方便地调用Web Worker代码的代码:在Web上自动发生的调用Worker,有助于防止繁重JavaScript调用使主线程陷入...
Spark 运行架构如图 1 所示,包括集群资源管理器(Cluster Manager)、多个运行作业任务的工作结点(Worker Node)、每个应用的任务控制结点(Driver)和每个工作结点上负责具体任务的执行进程(Executor)。...
WorkerDOM:实现运行在一个Web Worker中的 DOM API 和框架
要使用worker-loader将Web Worker加载到NextJS站点上,并允许在其worker上运行babel等webpack加载器,必须覆盖构建输出路径。 感谢。 // next.config.jsmodule . exports = { webpack ( config , options ) { ...
C++员工管理代码worker
关于worker类的c++源代码,更好掌握类的用法
精通Spark内核:此阶段聚焦于Spark内核的设计、实现和核心源码解析,对内核中的实现架构、运行原理、性能调优和核心源码各个击破: 1, 通过源码精通Spark内核实现和任务调度; 2,精通RDD、DAGScheduler、Task...
运行在Cloudflare Worker上的RSS订阅生成器.zip
Spark支持独立部署模式,包括一个Spark master进程和多个 Spark worker进程.独立部署模式可以运行在单机上作为测试之用,也可以部署在集群上.如果你打算部署在集群上,可以使用我们提供的部署脚本启动一个集群。
SparkStreaming 运行集群 ./sbin/start-master.sh ...在集群上部署和运行应用程序 linuxmint-virtual-machine bin # ./spark-submit --class it.blog.spark.streaming.JavaNetworkTruckPosition --master spark://linu
worker_spark PostgreSQL 9.3的后台工作程序,它定期执行一个过程。快速开始首先,创建一个产生一些可见效果的过程: CREATE FUNCTION public.my_spark()RETURNS voidLANGUAGE plpgsqlAS $$BEGIN RAISE LOG 'Spark!'...
用于在命令行上测试ServiceWorker代码的沙盒化ServiceWorker上下文。 测试编写在ServiceWorker运行的代码非常困难,并且通常需要浏览器环境和许多工作来完成。 sw-test-env是对ServiceWorker代码进行简单的单元/...
前端预览pdf pdf.js中代码文件pdf.worker.js,文件过大,可修改web/viewer.js中的workerSrc路径,改为对应版本地址
Web Worker 使用起来非常简单,在“主线程”中执如下操作即可创建一个 Worker 实,通过监听onmessage 事件获取消息,通过 postMess
它支持在Spark集群上进行分布式TensorFlow训练和推理,其目标是最大程度地减少在共享网格上运行现有TensorFlow程序所需的代码更改量。 其兼容Spark的API通过以下步骤帮助管理TensorFlow集群: 启动-在执行程序上...