1.distinct
2.cogroup
1.distinct
1.示例代码
package spark.examples import org.apache.spark.{SparkContext, SparkConf} import org.apache.spark.SparkContext._ object SparkRDDDistinct { def main(args : Array[String]) { val conf = new SparkConf().setAppName("SparkRDDDistinct").setMaster("local"); val sc = new SparkContext(conf); val rdd1 = sc.parallelize(List(1,8,2,1,4,2,7,6,2,3,1), 3) val pairs = rdd1.distinct(); pairs.saveAsTextFile("file:///D:/distinct" + System.currentTimeMillis()); println(pairs.toDebugString) } }
1.1 输出的RDD依赖
(3) MappedRDD[3] at distinct at SparkRDDDisctinct.scala:14 [] | ShuffledRDD[2] at distinct at SparkRDDDisctinct.scala:14 [] +-(3) MappedRDD[1] at distinct at SparkRDDDisctinct.scala:14 [] | ParallelCollectionRDD[0] at parallelize at SparkRDDDisctinct.scala:13 []
1.2 作业结果
part-000000: 6 3
part-000001: 4 1 7
part-000002: 8 2
注意的是:结果并没有排序
2.distict的源代码
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1) ///map得到元组的第一个元素
3.RDD依赖图
2. cogroup
2.1 示例代码
package spark.examples import org.apache.spark.{SparkContext, SparkConf} import org.apache.spark.SparkContext._ object SparkRDDCogroup { def main(args: Array[String]) { val conf = new SparkConf().setAppName("SparkRDDCogroup").setMaster("local"); val sc = new SparkContext(conf); //第一个参数是集合,第二个参数是分区数 val rdd1 = sc.parallelize(List((1, 2), (2, 3), (3, 4), (2,10),(4, 5), (5, 6)), 3) val rdd2 = sc.parallelize(List((3, 6), (2, 8), (9,11)), 2); //cogroup操作的RDD的元素类型必须是K/V类型 val pairs = rdd1.cogroup(rdd2); pairs.saveAsTextFile("file:///D:/cogroup" + System.currentTimeMillis()); println(pairs.toDebugString) } }
2.2 RDD依赖关系
(3) MappedValuesRDD[3] at cogroup at SparkRDDCogroup.scala:17 [] | CoGroupedRDD[2] at cogroup at SparkRDDCogroup.scala:17 [] +-(3) ParallelCollectionRDD[0] at parallelize at SparkRDDCogroup.scala:13 [] +-(2) ParallelCollectionRDD[1] at parallelize at SparkRDDCogroup.scala:14 []
2.3 执行结果:
part-00000: (3,(CompactBuffer(4),CompactBuffer(6))) (9,(CompactBuffer(),CompactBuffer(11)))
part-00001: (4,(CompactBuffer(5),CompactBuffer())) (1,(CompactBuffer(2),CompactBuffer()))
part-00002: (5,(CompactBuffer(6),CompactBuffer())) (2,(CompactBuffer(3, 10),CompactBuffer(8)))
从结果中可以看到,
cogroup是对所有的Key进行聚合,不管这个Key在哪个RDD中出现,比如9,在rdd2中出现,那么也会出现在结果集中。
如果rdd中有两个Key一样的元素,比如(2,3),(2,10),那么跟rdd2的(2,8)聚合后得到什么结果?(2,(CompactBuffer(3, 10),CompactBuffer(8)))
2.4 RDD依赖图
cogroup函数的的源代码
/** * For each key k in `this` or `other1` or `other2` or `other3`, * return a resulting RDD that contains a tuple with the list of values * for that key in `this`, `other1`, `other2` and `other3`. */ def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)], partitioner: Partitioner) : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = { if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) { throw new SparkException("Default partitioner cannot partition array keys.") } val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner) cg.mapValues { case Array(vs, w1s, w2s, w3s) => (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W1]], w2s.asInstanceOf[Iterable[W2]], w3s.asInstanceOf[Iterable[W3]]) } }
可见,cogroup最多对四个RDD同时做cogroup操作。cogroup操作的含义是,对在四个RDD中的每个Key进行操作,Key对应的Value是,每个RDD中这个Key对应的Value的集合所构成的元组
相关推荐
结合代码详细描述RDD算子的执行流程,并配上执行流程图
Spark RDD 算子说明,分别讲述了Transformation和Action这两类的算子。
spark rdd函数大全。spark rdd操作为core操作,虽然后续版本主要以dataset来操作,但是rdd操作也是不可忽略的一部分。
Spark对于大数据行业的实时处理数据来说,有着举足轻重的位置,特此学习整理了RDD 算子的各个含义,希望各位读者能够喜欢。谢谢
RDD:弹性分布式数据集(ResilientDistributed Dataset),是Spark对数据的...本节将通过示例的方式验证第二节中相关的转化操作和行动操作。 转化和行动计算结果 代码地址: 参考文献: 王道远 《Spark 快速大数据分析》
Spark思维导图之Spark RDD.png
(1) 创建第一个RDD (2) 使用reduce算子聚合元素 (2) 打印结果 (2) 统计个数 (2) 返回第一个元素 (2) 返回前n个元素 (2) 返回
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。在 Spark 中,对数据的所有操作不外乎创建 RDD、转化已有RDD 以及...
1.map():每次处理一条数据 2.mapPartition():每次处理一个分区的数据,这个分区的数据处理完后,原RDD中分区的数据才 1. coalesc
spark RDD论文中文版
spark RDD的算子挺多,有时候如何灵活的使用,该如何用一下子想不起来,这一段时间将spark的算子如何使用的例子给记录了下来,下面是spark RDD 的一些常用算子的使用这些算子包括有java的,也有scala的语言,由于...
Spark RDD 资料
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import java.util.Arrays; import java.util.List; /** * sample(withReplacement,fraction,seed) 算子 * 对RDD...
spark实验5 rdd编程2.doc
spark rdd相关操作详解;包括全部的操作说明和举例;
10 实战解析spark运行原理和RDD解密
RDD即弹性分布式数据集,有容错机制并可以被并行操作的元素集合,具有只读、分区、容错、高效、无需物化、可以缓存、RDD依赖等特征。RDD只是数据集的抽象,分区内部并不会存储具体的数据。
大数据实验报告Windows环境下安装Spark及RDD编程和Spark编程实现wordcount.doc
内容根据spark rdd.scala和ParRDDFunctions.scala源码中rdd顺序整理,包含rdd功能解释。对熟悉spark rdd很有用
Spark分布式计算和RDD模型研究.docxSpark分布式计算和RDD模型研究.docx