1. combineByKey函数的运行机制
RDD提供了很多针对元素类型为(K,V)的API,这些API封装在PairRDDFunctions类中,通过Scala隐式转换使用。这些API实现上是借助于combineByKey实现的。combineByKey函数本身也是RDD开放给Spark开发人员使用的API之一
首先看一下combineByKey的方法说明:
/** * Generic function to combine the elements for each key using a custom set of aggregation * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C * Note that V and C can be different -- for example, one might group an RDD of type * (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions: * * - `createCombiner`, which turns a V into a C (e.g., creates a one-element list) * - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list) * - `mergeCombiners`, to combine two C's into a single one. * * In addition, users can control the partitioning of the output RDD, and whether to perform * map-side aggregation (if a mapper can produce multiple items with the same key). */ def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)] = { require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0 if (keyClass.isArray) { if (mapSideCombine) { throw new SparkException("Cannot use map-side combining with array keys.") } if (partitioner.isInstanceOf[HashPartitioner]) { throw new SparkException("Default partitioner cannot partition array keys.") } } val aggregator = new Aggregator[K, V, C]( self.context.clean(createCombiner), self.context.clean(mergeValue), self.context.clean(mergeCombiners)) if (self.partitioner == Some(partitioner)) { self.mapPartitions(iter => { val context = TaskContext.get() new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context)) }, preservesPartitioning = true) } else { new ShuffledRDD[K, V, C](self, partitioner) .setSerializer(serializer) .setAggregator(aggregator) .setMapSideCombine(mapSideCombine) } }
combineByKey的功能是对RDD中的数据集按照Key进行聚合(想象下Hadoop MapReduce的Combiner,用于Map端做Reduce)。聚合的逻辑是通过自定义函数提供给combineByKey。
从上面的源代码中可以看到,combineByKey是把(K,V)类型的RDD转换为(K,C)类型的RDD,C和V可以不一样。
combineByKey函数需要三个重要的函数作为参数
createCombiner:在遍历RDD的数据集合过程中,对于遍历到的(k,v),如果combineByKey第一次遇到值为k的Key(类型K),那么将对这个(k,v)调用combineCombiner函数,它的作用是将v转换为c(类型是C,聚合对象的类型,c作为局和对象的初始值)
mergeValue:在遍历RDD的数据集合过程中,对于遍历到的(k,v),如果combineByKey不是第一次(或者第二次,第三次...)遇到值为k的Key(类型K),那么将对这个(k,v)调用mergeValue函数,它的作用是将v累加到聚合对象(类型C)中,mergeValue的
类型是(C,V)=>C,参数中的C遍历到此处的聚合对象,然后对v进行聚合得到新的聚合对象值
mergeCombiners:因为combineByKey是在分布式环境下执行,RDD的每个分区单独进行combineByKey操作,最后需要对各个分区的结果进行最后的聚合,它的函数类型是(C,C)=>C,每个参数是分区聚合得到的聚合对象。
通过上面的分析,combineByKey的流程是:
假设一组具有相同 K 的 <K, V> records 正在一个个流向 combineByKey(),createCombiner 将第一个 record 的value 初始化为 c (比如,c = value),然后从第二个 record 开始,来一个 record 就使用 mergeValue(c, record.value) 来更新 c,比如想要对这些 records 的所有 values 做 sum,那么使用 c = c + record.value。等到records 全部被 mergeValue(),得到结果 c。假设还有一组 records(key 与前面那组的 key 均相同)一个个到来, combineByKey() 使用前面的方法不断计算得到 c'。现在如果要求这两组 records 总的 combineByKey() 后的结果,那么可以使用 final c = mergeCombiners(c, c') 来计算。
2.combineByKey应用举例
2.1 求均值
假设有一组气象数据,每行数据包含日期和当天的气温(比如,20150601 27),那么可以用combineByKey求每月的平均温度,伪代码如下:
C的类型是(Int,Int),表示对于给定的月份,温度累加值和当月的天数
val rdd = sc.textFile("气象数据") val rdd2 = rdd.map(x=>x.split(" ")).map(x => (x(0).substring("从年月日中提取年月"),x(1).toInt)) val createCombiner = (k: String, v: Int)=> { (v,1) } val mergeValue = (c:(Int, Int), v:Int) => { (c._1 + v, c._2 + 1) } val mergeCombiners = (c1:(Int,Int),c2:(Int,Int))=>{ (c1._1 + c2._1, c1._2 + c2._2) } val vdd3 = vdd2.combineByKey( createCombiner, mergeValue, mergeCombiners ) rdd3.foreach(x=>println(x._1 + ": average tempreture is " + x._2._1/x._2._2)
相关推荐
内容根据spark rdd.scala和ParRDDFunctions.scala源码中rdd顺序整理,包含rdd功能解释。对熟悉spark rdd很有用
本文详细的描述了spark rdd的api 这些api 应该够我们日常生产使用了
包括spara rdd api,dataframe action操作、查询操作、join操作,dataframe rdd dataset 相互转换以及spark sql。
spark rdd函数大全。spark rdd操作为core操作,虽然后续版本主要以dataset来操作,但是rdd操作也是不可忽略的一部分。
在Java中,函数需要作为实现了Spark的org.apache,spark.api.java.function包中的任一函数接口的对象传递。 函数名 实现的方法 用途 Function, R> R call(T) 接收一个输入值并返回一个输出值,用于类似map() 和filter...
Spark思维导图之Spark RDD.png
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。在 Spark 中,对数据的所有操作不外乎创建 RDD、转化已有RDD 以及...
Spark RDD 资料
10 实战解析spark运行原理和RDD解密
Spark分布式计算和RDD模型研究.docxSpark分布式计算和RDD模型研究.docx
Spark分布式计算和RDD模型研究.pdfSpark分布式计算和RDD模型研究.pdf
spark RDD论文中文版
spark实验5 rdd编程2.doc
本文为第一部分,将介绍Spark RDD中与Map和Reduce相关的API中。 如何创建RDD? RDD可以从普通数组创建出来,也可以从文件系统或者HDFS中的文件创建出来。 举例:从普通数组创建RDD,里面包含了1到9
大数据实验报告Windows环境下安装Spark及RDD编程和Spark编程实现wordcount.doc
spark rdd相关操作详解;包括全部的操作说明和举例;
spark API RDD pdf版的..........对初学者应该有所帮助
RDD即弹性分布式数据集,有容错机制并可以被并行操作的元素集合,具有只读、分区、容错、高效、无需物化、可以缓存、RDD依赖等特征。RDD只是数据集的抽象,分区内部并不会存储具体的数据。
【资源说明】 1、该资源包括项目的全部源码,下载可以直接使用! 2、本项目适合作为计算机、数学、电子信息...电影评分数据汇总,(使用spark2.4+scala完成, 分析采用spark RDD的API. 数据集采用标准电影评分数据).zip
结合代码详细描述RDD算子的执行流程,并配上执行流程图