Spark Streaming UI这块是本篇额外的内容,与主题无关,只是把它记录下来
Spark Streaming UI上一组统计数字的含义
Streaming
- Started at: 1433563238275(Spark Streaming开始运行的时间)
- Time since start: 3 minutes 51 seconds(Spark Streaming已经运行了多长时间)
- Network receivers: 2(Receiver个数)
- Batch interval: 1 second(每个Batch的时间间隔,即接收多长时间的数据就生成一个Batch,或者说是RDD)
- Processed batches: 231 (已经处理的Batch个数,不管Batch中是否有数据,都会计算在内,)
- Waiting batches: 0 (等待处理的Batch数据,如果这个值很大,表明Spark的处理速度较数据接收的速度慢,需要增加计算能力或者降低接收速度)
- Received records: 66 (已经接收到的数据,每读取一次,读取到的所有数据称为一个record)
- Processed records: 66 (已经处理的record)
(Processed batches + Waiting batches) * Batch Interval = Time Since Start
Spark Streaming Checkpoint的一个坑
源代码:
import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} object SparkStreamingCheckpointEnabledTest { def main(args: Array[String]) { val checkpointDirectory = "file:///d:/data/chk_streaming" def funcToCreateSSC(): StreamingContext = { val conf = new SparkConf().setAppName("NetCatWordCount") conf.setMaster("local[3]") val ssc = new StreamingContext(conf, Seconds(1)) ssc.checkpoint(checkpointDirectory) ssc } val ssc = StreamingContext.getOrCreate(checkpointDirectory, funcToCreateSSC) val numStreams = 2 val streams = (1 to numStreams).map(i => ssc.socketTextStream("localhost", 9999)) val lines = ssc.union(streams) lines.print() ssc.start() ssc.awaitTermination() } }
以上代码是错误的,因为停掉Driver后再次重启,将无法启动,解决办法是将streams的操作放到funcToCreateSSC函数里,ssc返回前
object SparkStreamingCheckpointEnabledTest { def process(streams: Seq[DStream[String]], ssc: StreamingContext) { val lines = ssc.union(streams) lines.print } def main(args: Array[String]) { val checkpointDirectory = "file:///d:/data/chk_streaming" def funcToCreateSSC(): StreamingContext = { val conf = new SparkConf().setAppName("NetCatWordCount") conf.setMaster("local[3]") val ssc = new StreamingContext(conf, Seconds(1)) ssc.checkpoint(checkpointDirectory) val numStreams = 2 val streams = (1 to numStreams).map(i => ssc.socketTextStream("localhost", 9999)) process(streams, ssc) ssc } val ssc = StreamingContext.getOrCreate(checkpointDirectory, funcToCreateSSC) ssc.start() ssc.awaitTermination() } }
相关推荐
3、覆盖Spark所有功能点(Spark RDD、Spark SQL、Spark Streaming,初级功能到高级特性,一个不少); 4、Scala全程案例实战讲解(近百个趣味性案例); 5、Spark案例实战的代码,几乎都提供了Java和Scala两个版本和...
本项目基于Scala开发,包含148个文件,包括Scala源代码、CRC校验文件、TXT文本文件、以及多个checkpoint和ck文件。系统实现了基于Scala的Spark_Core、Spark_SQL和Spark_Streaming功能,界面友好,功能完善,适合用于...
import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.dstream.ReceiverInputDStream object UpadateDemo { def main(args: Array[String]): Unit = { val conf =...
Spark-2.3.1源码解读。...Spark Streaming源码阅读 动态发现新增分区 Dstream join 操作和 RDD join 操作的区别 PIDController源码赏析及 back pressure 实现思路 Streaming Context重点摘要 checkpoint 必知必会
9.SparkStreaming Checkpoint 10.消费Kafka偏移量管理 第六章、StructuredStreaming模块 1.StructuredStreaming 概述(核心设计和编程模型) 2.入门案例:WordCount 3.输入源InputSources 4.Streaming Query 设置 5....
本源码为基于Apache Spark的Scala大数据处理设计,共包含191个文件,其中crc文件56个,class文件39个,scala文件20个,bk文件10个,xml文件8个,txt文件2个,properties文件2个,idea/$PRODUCT_WORKSPACE_FILE$文件1...
下面是使用过程中记录的一些心得和博客,感兴趣的朋友可以了解下:项目简介该项目提供了一个在使用spark streaming2.3+kafka1.3的版本集成时,手动存储偏移量到zookeeper中,因为自带的checkpoint弊端太多,不利于...
公司有一个比较核心的实时业务用的是spark streaming2.1.0+kafka0.9.0.0的流式技术来开发的,存储用的hbase+elasticsearch+redis,这中间趟过很多坑,解决了一些bug和问题,在这里我把它做成了一个骨架项目并开源...
Fire框架 Fire框架是由中通大数据自主研发并开源的、专门用于进行Spark和Flink任务开发的...@Streaming(interval = 100, unaligned = true) // 100s做一次checkpoint,开启非对齐checkpoint @Kafka(brokers = "local
通过 SparkStreaming 从 Kafka 加载数据到 HBase 表的演示。 以分钟为基础计算 MIN、MAX、AVG(SUM、CNT)。 Kafka 主题:demo-stream-topic HBase 表:演示日志HBase 家族:demo-ts-metrics 输入数据示例(准确...
亚信18年java笔试题 1、程序的入口函数类: ...本机调试运行: 直接运行 ... spark yarn运行: ./bin/spark-submit ..../lib/streaming-log-0.0.1-...checkpoint路径 日志类型 本类型日志的个性化处理 存储日志HDFS路径 日
文章目录Flink概述Flink生态为什么选择Flink?系统架构JobManager运行...CheckpointFlink部署与运行Yarn运行Flink作业Flink YARN SessionRun a single Flink job on YARN(推荐)Standalone部署Storm、Spark-Streaming