1. aggregateByKey的运行机制
/** * Aggregate the values of each key, using given combine functions and a neutral "zero value". * This function can return a different result type, U, than the type of the values in this RDD, * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's, * as in scala.TraversableOnce. The former operation is used for merging values within a * partition, and the latter is used for merging values between partitions. To avoid memory * allocation, both of these functions are allowed to modify and return their first argument * instead of creating a new U. */ def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] = { // Serialize the zero value to a byte array so that we can get a new clone of it on each key val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue) val zeroArray = new Array[Byte](zeroBuffer.limit) zeroBuffer.get(zeroArray) lazy val cachedSerializer = SparkEnv.get.serializer.newInstance() val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray)) combineByKey[U]((v: V) => seqOp(createZero(), v), seqOp, combOp, partitioner) }
从aggregateByKey的源代码中,可以看出
a.aggregateByKey把类型为(K,V)的RDD转换为类型为(K,U)的RDD,V和U的类型可以不一样,这一点跟combineByKey是一样的,即返回的二元组的值类型可以不一样
b.aggregateByKey内部是通过调用combineByKey实现的,combineByKey的createCombiner函数逻辑由zeroValue这个变量实现,zeroValue作为聚合的初始值,通常对于加法聚合则为0,乘法聚合则为1,集合操作则为空集合
c.seqOp在combineByKey中的功能是mergeValues,(U,V)=>U
d.combOp在combineByKey中的功能是mergeCombiners
2. aggregateByKey举例
2.1 求均值
val rdd = sc.textFile("气象数据") val rdd2 = rdd.map(x=>x.split(" ")).map(x => (x(0).substring("从年月日中提取年月"),x(1).toInt)) val zeroValue = (0,0) val seqOp= (u:(Int, Int), v:Int) => { (u._1 + v, u._2 + 1) } val compOp= (c1:(Int,Int),c2:(Int,Int))=>{ (u1._1 + u2._1, u1._2 + u2._2) } val vdd3 = vdd2.aggregateByKey( zeroValue , seqOp, compOp ) rdd3.foreach(x=>println(x._1 + ": average tempreture is " + x._2._1/x._2._2)
从求均值的实现来看,aggregate通过提供零值的方式,避免了combineByKey中的createCombiner步骤(createCombiner本质工作就是遇到第一个key时进行初始化操作,这个初始化不是提供零值,而是对第一个(k,v)进行转换得到c的初始值))
相关推荐
内容根据spark rdd.scala和ParRDDFunctions.scala源码中rdd顺序整理,包含rdd功能解释。对熟悉spark rdd很有用
本文详细的描述了spark rdd的api 这些api 应该够我们日常生产使用了
包括spara rdd api,dataframe action操作、查询操作、join操作,dataframe rdd dataset 相互转换以及spark sql。
spark rdd函数大全。spark rdd操作为core操作,虽然后续版本主要以dataset来操作,但是rdd操作也是不可忽略的一部分。
在Java中,函数需要作为实现了Spark的org.apache,spark.api.java.function包中的任一函数接口的对象传递。 函数名 实现的方法 用途 Function, R> R call(T) 接收一个输入值并返回一个输出值,用于类似map() 和filter...
Spark思维导图之Spark RDD.png
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。在 Spark 中,对数据的所有操作不外乎创建 RDD、转化已有RDD 以及...
10 实战解析spark运行原理和RDD解密
Spark分布式计算和RDD模型研究.docxSpark分布式计算和RDD模型研究.docx
Spark分布式计算和RDD模型研究.pdfSpark分布式计算和RDD模型研究.pdf
Spark RDD 资料
spark RDD论文中文版
spark实验5 rdd编程2.doc
本文为第一部分,将介绍Spark RDD中与Map和Reduce相关的API中。 如何创建RDD? RDD可以从普通数组创建出来,也可以从文件系统或者HDFS中的文件创建出来。 举例:从普通数组创建RDD,里面包含了1到9
大数据实验报告Windows环境下安装Spark及RDD编程和Spark编程实现wordcount.doc
spark rdd相关操作详解;包括全部的操作说明和举例;
spark API RDD pdf版的..........对初学者应该有所帮助
RDD即弹性分布式数据集,有容错机制并可以被并行操作的元素集合,具有只读、分区、容错、高效、无需物化、可以缓存、RDD依赖等特征。RDD只是数据集的抽象,分区内部并不会存储具体的数据。
【资源说明】 1、该资源包括项目的全部源码,下载可以直接使用! 2、本项目适合作为计算机、数学、电子信息...电影评分数据汇总,(使用spark2.4+scala完成, 分析采用spark RDD的API. 数据集采用标准电影评分数据).zip
结合代码详细描述RDD算子的执行流程,并配上执行流程图