[Spark 53] Spark Streaming Integration with Kafka

 

 

1. Spark Streaming Program Code

 

package spark.examples.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object SparkStreamingKakfaWordCount {
  def main(args: Array[String]) {
    println("Start to run SparkStreamingKakfaWordCount")
    val conf = new SparkConf().setAppName("SparkStreamingKakfaWordCount")
    val ssc = new StreamingContext(conf, Seconds(3))
    val topicMap = "test".split(":").map((_, 1)).toMap
    //zookeeper quorums server list
    val zkQuorum = "localhost:2181";
    //consumer group
    val group = "test-consumer-group"
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    lines.print()
    //With the commented-out statements below, print shows a screen of empty () on the console;
    //the statement above displays each line of text correctly.
//    lines.map(line => {
//      println(line)
//    }).print


    ssc.start()
    ssc.awaitTermination()
  }
}
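The program above leaves the master URL to spark-submit. For quick local testing, here is a minimal sketch of the only change needed (the local[2] setting is my assumption, not part of the original program): a Kafka Receiver permanently occupies one thread, so local mode needs at least two.

val conf = new SparkConf()
  .setAppName("SparkStreamingKakfaWordCount")
  .setMaster("local[2]") // at least 2 threads: one is taken by the Receiver, the rest do the processing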

 

2. Producing Messages to the Target Topic

 

There are two ways to produce messages:

 

1. Use Kafka's built-in console producer, which sends messages to Kafka from the command line:

./kafka-console-producer.sh --topic test --broker-list localhost:9092

 

2. Or use the Kafka message producer from the Spark Streaming examples:

 

./run-example org.apache.spark.examples.streaming.KafkaWordCountProducer localhost:9092 test 3 5
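For reference, the usage of that example (as far as I recall) is KafkaWordCountProducer <metadataBrokerList> <topic> <messagesPerSec> <wordsPerMessage>, so the command above should push 3 messages of 5 random words each to topic test every second.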

 

3. Questions

3.1. The consumerGroupId argument to KafkaUtils.createStream identifies a consumer group. So what is a consumer group, and what is it for? It means that all of the Spark Streaming Receivers (which are built on top of Kafka's Consumer API) belong to the same consumer group, and therefore multiple Receivers can consume different partitions of the same topic in parallel.
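As a minimal sketch of this point (the two-partition topic "test" and the object name ConsumerGroupSketch are assumptions made for illustration): two Receivers created with the same group id form one consumer group, so Kafka assigns each of them a disjoint subset of the topic's partitions and they split the work instead of both reading everything.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object ConsumerGroupSketch {
  def main(args: Array[String]) {
    val ssc = new StreamingContext(new SparkConf().setAppName("ConsumerGroupSketch"), Seconds(3))
    val zkQuorum = "localhost:2181"
    val group = "test-consumer-group"
    // Same group id => one consumer group: each Receiver gets different partitions of "test",
    // so the two Receivers consume the topic in parallel rather than redundantly.
    val receiverA = KafkaUtils.createStream(ssc, zkQuorum, group, Map("test" -> 1)).map(_._2)
    val receiverB = KafkaUtils.createStream(ssc, zkQuorum, group, Map("test" -> 1)).map(_._2)
    ssc.union(Seq(receiverA, receiverB)).print()
    ssc.start()
    ssc.awaitTermination()
  }
}

The more complex scenario at the end of this post does the same thing with 6 Receivers.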

3.2. The topicMap argument to KafkaUtils.createStream is a Map from topic name to an integer. The topic name is easy to understand, but what does the associated integer mean?

Look at the method documentation of KafkaUtils.createStream below. The integer associated with each topic name is the number of partitions of that topic, i.e. each Receiver can use that many threads to read the topic's numPartitions partitions concurrently (a small usage sketch follows the quoted method).

 

  /**
   * Create an input stream that pulls messages from a Kafka Broker.
   * @param ssc       StreamingContext object
   * @param zkQuorum  Zookeeper quorum (hostname:port,hostname:port,..)
   * @param groupId   The group id for this consumer
   * @param topics    Map of (topic_name -> numPartitions) to consume. Each partition is consumed
   *                  in its own thread
   * @param storageLevel  Storage level to use for storing the received objects
   *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
   */
  def createStream(
      ssc: StreamingContext,
      zkQuorum: String,
      groupId: String,
      topics: Map[String, Int],
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
    ): ReceiverInputDStream[(String, String)] = {
    val kafkaParams = Map[String, String](
      "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
      "zookeeper.connection.timeout.ms" -> "10000")
    createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics, storageLevel)
  }
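As a small usage sketch of that (topic_name -> numPartitions) map (the topic names and counts here are made up for illustration; ssc, zkQuorum and group are the same values as in the program of section 1): the integer tells this single Receiver how many consumer threads to run for that topic, and it is normally set to the topic's partition count so that every partition has a thread.

import org.apache.spark.streaming.kafka.KafkaUtils

// Hypothetical topics: "test" with 3 partitions, "logs" with 1 partition.
val topicMap = Map("test" -> 3, "logs" -> 1)
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
  .map(_._2) // keep only the message value of each (key, value) tuple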

 

3.3. Why does the following code print ()?

val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
lines.map(line => {
  println(line)
}).print

First, KafkaUtils.createStream returns a ReceiverInputDStream whose elements are (String, String) tuples, so map(_._2) picks out the second element of each tuple. Calling lines.print directly then prints the contents of the lines DStream, which is the expected behaviour.

With the code above, however, the map closure only calls println(line), and println returns Unit. The mapped DStream therefore contains nothing but Unit values, and print renders each of them as () on the console.

Earlier, when integrating Flume with Spark, there was a similar piece of processing; its logic is:

    val lines = FlumeUtils.createStream(ssc, "localhost", 9999)
    lines.map(evt => {
      val str = new String(evt.event.getBody.array())
      ("string received: " + str)
    }).print()

Here lines.map returns the string ("string received: " + str), so print can display it. If it instead called println("string received: " + str), the result would presumably be the same as in the Kafka case above: a stream of ().
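Tying the two cases together, here is a minimal sketch of two ways to make the Kafka snippet in 3.3 show the message text instead of () (the "DataReceived: " prefix and the use of foreachRDD are my choices, not part of the original code):

// Option 1: have the map closure return a value, as the Flume example does.
lines.map(line => "DataReceived: " + line).print()

// Option 2: print explicitly as a side effect inside foreachRDD;
// take(10) brings the first 10 elements of each batch back to the driver, so println runs there.
lines.foreachRDD { rdd =>
  rdd.take(10).foreach(println)
}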

 

 

 

A more complex scenario:

1. Kafka pseudo-distributed installation: localhost:9092, localhost:9093, localhost:9094

2. Topic: 6 partitions, 2 replicas

3. Spark Streaming starts 6 Receivers, each reading one partition of the topic with a single thread

 

package spark.examples.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming._

//Assume the Kafka topic has 6 partitions
//Spark Streaming creates 6 Input DStreams that read the 6 partitions in parallel
//Spark Streaming then repartitions the data into 18 partitions for processing, so the
//processing parallelism is three times the read parallelism
object MyKafkaWordCount {
  def main(args: Array[String]) {
    println("Start to run MyKafkaWordCount")
    val conf = new SparkConf().setAppName("MyKafkaWordCount").setMaster("local[20]")
    val ssc = new StreamingContext(conf, Seconds(3))
    //
    val topicMap = Map("topic-p6-r2"->1)
    val zkQuorum = "localhost:2181";
    val group = "topic-p6-r2-consumer-group"

    //Kafka has 6 partitions, so create 6 Input DStreams here
    val streams = (1 to 6).map ( _ =>
      KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    )
    
    //Union the 6 streams into a single DStream
    val partitions = ssc.union(streams).repartition(18).map("DataReceived: " + _)

    partitions.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
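MyKafkaWordCount above only prints what it receives. As a minimal sketch of adding the actual word count on top of the unioned, repartitioned stream (splitting messages on whitespace is an assumption about the message format):

import org.apache.spark.streaming.StreamingContext._ // pair-DStream operations such as reduceByKey

// unioned is the result of ssc.union(streams) inside MyKafkaWordCount.
val unioned = ssc.union(streams).repartition(18)
val wordCounts = unioned
  .flatMap(_.split("\\s+"))   // split each message into words
  .map(word => (word, 1))
  .reduceByKey(_ + _)         // per-batch word counts across all 18 partitions
wordCounts.print()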

 

Submitting the application, and the results:

1. The order in which Spark Streaming receives the data does not match the order in which it was sent. That is expected: Kafka guarantees that data within a partition is received in the same order it was sent, but it does not guarantee ordering across partitions.

2. Launch script (because 6 Receivers are needed, local mode has to be used here, with 20 worker threads):

./spark-submit --name MyKafkaWordCount --master local[20] --executor-memory 512M --total-executor-cores 2  --class spark.examples.streaming.MyKafkaWordCount Hello2.jar

Further reading:

http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/?utm_source=tuicool

http://spark.apache.org/docs/latest/streaming-kafka-integration.html
