在实时计算的实际应用中,有时除了需要关心一个时间间隔内的数据,有时还可能会对整个实时计算的所有时间间隔内产生的相关数据进行统计。
比如: 对Nginx的access.log实时监控请求404时,有时除了需要统计某个时间间隔内出现的次数,有时还需要统计一整天出现了多少次404,也就是说404监控横跨多个时间间隔。
Spark Streaming的解决方案是累加器,工作原理是,定义一个类似全局的可更新的变量,每个时间窗口内得到的统计值都累加到上个时间窗口得到的值,这样这个累加值就是横跨多个时间间隔
package spark.examples.streaming import org.apache.spark.SparkConf import org.apache.spark.HashPartitioner import org.apache.spark.streaming._ /** * Counts words cumulatively in UTF8 encoded, '\n' delimited text received from the network every * second starting with initial value of word count. * Usage: StatefulNetworkWordCount <hostname> <port> * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive * data. * * To run this on your local machine, you need to first run a Netcat server * `$ nc -lk 9999` * and then run the example * `$ bin/run-example * org.apache.spark.examples.streaming.StatefulNetworkWordCount localhost 9999` */ object StatefulNetworkWordCount { def main(args: Array[String]) { ///函数常量定义,返回类型是Some(Int),表示的含义是最新状态 ///函数的功能是将当前时间间隔内产生的Key的value集合,加到上一个状态中,得到最新状态 val updateFunc = (values: Seq[Int], state: Option[Int]) => { val currentCount = values.sum val previousCount = state.getOrElse(0) Some(currentCount + previousCount) } ///入参是三元组遍历器,三个元组分别表示Key、当前时间间隔内产生的对应于Key的Value集合、上一个时间点的状态 ///newUpdateFunc的返回值要求是iterator[(String,Int)]类型的 val newUpdateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => { ///对每个Key调用updateFunc函数(入参是当前时间间隔内产生的对应于Key的Value集合、上一个时间点的状态)得到最新状态 ///然后将最新状态映射为Key和最新状态 iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s))) } val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount").setMaster("local[3]") // Create the context with a 5 second batch size val ssc = new StreamingContext(sparkConf, Seconds(5)) ssc.checkpoint(".") // Initial RDD input to updateStateByKey val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1))) // Create a ReceiverInputDStream on target ip:port and count the // words in input stream of \n delimited test (eg. generated by 'nc') val lines = ssc.socketTextStream("192.168.26.140", 9999) val words = lines.flatMap(_.split(" ")) val wordDstream = words.map(x => (x, 1)) // Update the cumulative count using updateStateByKey // This will give a Dstream made of state (which is the cumulative count of the words) //注意updateStateByKey的四个参数,第一个参数是状态更新函数 val stateDstream = wordDstream.updateStateByKey[Int](newUpdateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true, initialRDD) stateDstream.print() ssc.start() ssc.awaitTermination() } }
上面的核心操作时DStream的updateStateByKey函数操作,它接受四个参数,
2. DStream.updateStateByKey剖析
方法说明:
/** * Return a new "state" DStream where the state for each key is updated by applying * the given function on the previous state of the key and the new values of each key. * org.apache.spark.Partitioner is used to control the partitioning of each RDD. * @param updateFunc State update function. Note, that this function may generate a different * tuple with a different key than the input key. Therefore keys may be removed * or added in this way. It is up to the developer to decide whether to * remember the partitioner despite the key being changed. * @param partitioner Partitioner for controlling the partitioning of each RDD in the new * DStream * @param rememberPartitioner Whether to remember the paritioner object in the generated RDDs. * @param initialRDD initial state value of each key. * @tparam S State type */ def updateStateByKey[S: ClassTag]( updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)], partitioner: Partitioner, rememberPartitioner: Boolean, initialRDD: RDD[(K, S)] ): DStream[(K, S)] = { new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner, Some(initialRDD)) }
- initialRDD是(K,S)类型的RDD,它表示一组Key的初始状态,每个(K,S)表示一个Key以及它对应的State状态。 K表示updateStateByKey的Key的类型,比如String,而S表示Key对应的状态(State)类型,在上例中,是Int
- rememberPartitioner: 表示是否在接下来的Spark Streaming执行过程中产生的RDD使用相同的分区算法
- partitioner: 分区算法,上例中使用的Hash分区算法,分区数为ssc.sparkContext.defaultParallelism
- updateFunc是函数常量,类型为(Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],表示状态更新函数
(Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)]如何解读?
入参: 三元组迭代器,三元组中K表示Key,Seq[V]表示一个时间间隔中产生的Key对应的Value集合(Seq类型,需要对这个集合定义累加函数逻辑进行累加),Option[S]表示上个时间间隔的累加值(表示这个Key上个时间点的状态)
出参:二元组迭代器,二元组中K表示Key,S表示当前时间点执行结束后,得到的累加值(即最新状态)
相关推荐
Spark零基础思维导图(内含spark-core ,spark-streaming,spark-sql),总结的很全面。 Spark零基础思维导图(内含spark-core ,spark-streaming,spark-sql)。 Spark零基础思维导图(内含spark-core ,spark-streaming,...
由于sparkstreaming的mini-batch机制,必须将之前的状态结果存储在RDD中并在下一次batch计算时将其取出进行合并,这就是updateStateByKey方法的用处。 2、updateStateByKey操作,可以让我们为每个key维护一份state,...
spark Streaming和structed streaming分析,理解整个 Spark Streaming 的模块划分和代码逻辑。
包中构建了Java以及Scala混合框架的maven打包框架以及关于spark core,spark sql 、spark streaming的一些典型案例或者算子使用。
Spark Submitter会议上的写spark streaming的建议,主要涉及spark streaming和kafka对接建议,以及相关逻辑
一个完善的Spark Streaming二次封装开源框架,包含:实时流任务调度、kafka偏移量管理,web后台管理,web api启动、停止spark streaming,宕机告警、自动重启等等功能支持,用户只需要关心业务代码,无需关注繁琐的...
(1)利用SparkStreaming从文件目录读入日志信息,日志内容包含: ”日志级别、函数名、日志内容“ 三个字段,字段之间以空格拆分。请看数据源的文件。 (2)对读入都日志信息流进行指定筛选出日志级别为error或warn...
java的sparkstreaming连接kafka的例子,kafka生产者生产消息,消费者读取消息,sparkstreaming读取kafka小区并进行存储iotdb数据库。
spark streaming原理介绍和代码实战操作
sparkStreaming消费数据不丢失,sparkStreaming消费数据不丢失
Spark核心概念简介: Spark使用maven进行打包(减少jar包大小): Spark中的(弹性分布式数据集)简称RDD: ...SparkStreaming中的正常操作(每读2秒就计算一次): Spark中的local[2]: Spark中的处理流程图像:
1.理解Spark Streaming的工作流程。 2.理解Spark Streaming的工作原理。 3.学会使用Spark Streaming处理流式数据。 二、实验环境 Windows 10 VMware Workstation Pro虚拟机 Hadoop环境 Jdk1.8 三、实验内容 (一)...
spark2018欧洲峰会中关于StructuredStreaming中stateful stream processing的ppt
这是一个基于Scala语言开发的Spark RDD、Spark SQL、Spark Streaming相关Demo,包含35个文件。主要文件类型包括29个Scala源文件、2个Markdown文档、1个Reduced文件、1个XML文件、1个Java源文件和1个TXT文件。该项目...
机器学习(Machine Learning, ML)是一门多领域交叉学科,涉及概率论、统计学、逼近论、凸分析、算法复杂度理论等多门学科。专门研究计算机怎样模拟或实现人类的学习行为,以获取新的知识或技能,重新组织已有的知识...
spark Streaming的原理介绍和与storm的对比
你可以在Spark SQL上引擎上使用DataSet/DataFrame API处理流数据的聚集,事件窗口,和流与批次的连接操作等。最后Structured Streaming 系统快速,稳定,端到端的恰好一次保证,支持容错的处理。
spark streaming streaming
本文SparkStream从磁盘文件、HDFS、KAFKA获取数据源,以单词频次统计作为入门案例,介绍了SparkStream模块API的使用。同时介绍了SparkStream的特点