1. Spark Streaming程序代码
package spark.examples.streaming import org.apache.spark.SparkConf import org.apache.spark.streaming._ import org.apache.spark.streaming.kafka._ object SparkStreamingKakfaWordCount { def main(args: Array[String]) { println("Start to run SparkStreamingKakfaWordCount") val conf = new SparkConf().setAppName("SparkStreamingKakfaWordCount") val ssc = new StreamingContext(conf, Seconds(3)) val topicMap = "test".split(":").map((_, 1)).toMap //zookeeper quorums server list val zkQuorum = "localhost:2181"; //consumer group val group = "test-consumer-group" val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2) lines.print() //使用如下语句,则print显示在console上是一堆空白的();使用上面的语句则可以正常显示每行文本 // lines.map(line => { // println(line) // }).print ssc.start() ssc.awaitTermination() } }
2. 生产消息到指定的topic
生产消息,可以有如下两种方式
1.可以使用Kafka自带的通过console为Kakfa生产消息的命令行
./kafka-console-producer.sh --topic test --broke-list localhost:9081
2.也可以使用如下Spark Streaming提供的example中的Kafka Message Producer
./run-example org.apache.spark.examples.streaming.KafkaWordCountProducer localhost:9092 test 3 5
3. 问题
3.1. KafkaUtils.createStream中的consumerGroupId这个参数指的是consumerGroup的标识,那么什么是consumerGroup?有什么用处? 表示所有的Spark Streaming的Receiver(构建于Kafka的Consumer API之上),同属于一个Consumer Group,因此多个Receiver可以并行消费同一个Topic的不同的partition
3.2. KafkaUtils.createStream中的topicMap这个参数是一个Map,是topicName跟一个整数的映射,topicName可以理解,那么这个对应的整数作何解?
看下KafkaUtils.createStream的方法说明。可见每个topicName对应的整数是这个topic的分区数,即每个Receiver可以使用分区数个线程来并发的读取Topic的numPartitions个分区
/** * Create an input stream that pulls messages from a Kafka Broker. * @param ssc StreamingContext object * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..) * @param groupId The group id for this consumer * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed * in its own thread * @param storageLevel Storage level to use for storing the received objects * (default: StorageLevel.MEMORY_AND_DISK_SER_2) */ def createStream( ssc: StreamingContext, zkQuorum: String, groupId: String, topics: Map[String, Int], storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 ): ReceiverInputDStream[(String, String)] = { val kafkaParams = Map[String, String]( "zookeeper.connect" -> zkQuorum, "group.id" -> groupId, "zookeeper.connection.timeout.ms" -> "10000") createStream[String, String, StringDecoder, StringDecoder]( ssc, kafkaParams, topics, storageLevel) }
3.3 为什么下面代码会打印()
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2) lines.map(line => { println(line) }).print
首先KafkaUtils.createStream返回的是一个ReceiverInputDStream,它包含的数据类型是二元组(String, String),因此,可以调用(_._2)得到二元组第二个元素,直接调用lines.print方法,DStream应该是把lines这个DStream中的内容打印出来。
但是使用上面的代码,流程是,调用print,然后调用lines.map做转换,转换的结果是按说应该应该打印结果,然后返回一个空集合?
之前在做Flume与Spark的对接时,有过类似的处理操作,它的逻辑是:
val lines = FlumeUtils.createStream(ssc,"localhost",9999) lines.map(evt => { val str = new String(evt.event.getBody.array()) ("string received: "+ str) }).print()
lines.map操作,返回的是一个("string received: " + str)字符串,因此print可以将它打印出来,如果调用println("string received: " + str),估计跟上面Kafka的结果一样
复杂的场景:
1. Kafka伪分布式安装localhost:9092,localhost:9093,localhost:9094
2. Topic: 6个partition,2个replica
3. Spark Streaming启动6个Receiver,每个Receiver1个线程读取Topic中的一个partition
package spark.examples.streaming import org.apache.spark.SparkConf import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.streaming._ //假设Kafka有6个分区 //Spark Streaming创建6个Input DStream,并行读6个分区 //Spark Streaming将RDD重新分区为18个RDD,进行并行处理,处理逻辑的并行度是读取并行度的3倍 object MyKafkaWordCount { def main(args: Array[String]) { println("Start to run MyKafkaWordCount") val conf = new SparkConf().setAppName("MyKafkaWordCount").setMaster("local[20]") val ssc = new StreamingContext(conf, Seconds(3)) // val topicMap = Map("topic-p6-r2"->1) val zkQuorum = "localhost:2181"; val group = "topic-p6-r2-consumer-group" //Kakfa has 6 partitions, here create 6 Input DStream val streams = (1 to 6).map ( _ => KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2) ) ///将6个streams进行union val partitions = ssc.union(streams).repartition(18).map("DataReceived: " + _) partitions.print() ssc.start() ssc.awaitTermination() } }
1.提交application的代码:
结果:
1.Spark Streaming收到数据的顺序跟发送的顺序不一致,不一致就对了,因为Kafka保证的partittion内的数据是发送和接收的循序保证,但是不保证,partition之间的顺序性
2.启动脚本:(因为需要6个Receiver,因此只能使用local模式,给20个worker threads)
./spark-submit --name MyKafkaWordCount --master local[20] --executor-memory 512M --total-executor-cores 2 --class spark.examples.streaming.MyKafkaWordCount Hello2.jar
有待阅读:
http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/?utm_source=tuicool
http://spark.apache.org/docs/latest/streaming-kafka-integration.html
相关推荐
kafka与streaming配置与开发文档001. kafka版本为kafka_2.10-0.8.2.0 spark版本为1.3.0
Spark_Streaming整合Kafka.md
spark-streaming-kafka-0-8_2.11-2.4.0.jar
Scala代码积累之spark streaming kafka 数据存入到hive源码实例,Scala代码积累之spark streaming kafka 数据存入到hive源码实例。
spark streaming kafka
通过flume监控文件,让kafka消费flume数据,再将sparkstreaming连接kafka作为消费者进行数据处理,文档整理实现
python练习100题的题目
spark streaming 链接kafka必用包,欢迎大家下载与使用
spakr streaming的kafka依赖
kafka kafka与sparkStreaming kafka与Scala
spark-streaming-kafka-0-8-assembly_2.11-2.4.0.jar
kafka-sparkstreaming-cassandra, 用于 Kafka Spark流的Docker 容器 用于 Kafka Spark流的Docker 容器这里Dockerfile为实验 Kafka 。Spark流( PySpark ) 和Cassandra设置了完整的流环境。 安装Kafka 0.10.2.1用于 ...
<scala.version>2.10.5 <spark.version>1.6.2</spark.version> <jackson.version>2.4.3 <hbase.version>1.2.0 的pom文件
flume+Logstash+Kafka+Spark Streaming进行实时日志处理分析【大数据】
spark+kafka 项目jar包
spark-streaming-kafka-0-10_2.11-2.4.0-cdh6.1.1.jar
使用pyspark的stream操作kafka时,需要用到的jar包
spark3.0.0版本对接kafka数据源需要的jar包,最新的版本导致maven的阿里云仓库不能直接下载下来,所以需要手动导入jar包进行操作,有需要的朋友可以免费下载
Spark Streaming 中提供了如下三种位置策略,用于指定 Kafka 主题分区与 Spark 执行程序 Executors 之间的分配关系: + **PreferConsistent** : 它将在所有的 Executors 上均匀分配分区; + **PreferBrokers** : ...
spark streaming 链接kafka的必用包,国内下载很慢,特意分享出来,欢迎大家下载,速度很快哦