1. coalesce (merge / combine; pronounced roughly "co-a-less")
2. repartition
1. coalesce
1. Example code
package spark.examples

import org.apache.spark.{SparkConf, SparkContext}

object SparkRDDCoalesce {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SparkRDDCoalesce").setMaster("local")
    val sc = new SparkContext(conf)

    val rdd1 = sc.parallelize(List(1, 8, 2, 1, 4, 2, 7, 6, 2, 3, 1, 19, 21, 66, 74, 22, 21, 72, 78, 102), 8)

    // coalesce to 3 partitions with shuffle = true
    val pairs = rdd1.coalesce(3, true)
    pairs.saveAsTextFile("file:///D:/coalesce-0-" + System.currentTimeMillis())

    // coalesce to 3 partitions with shuffle = false
    val pairs2 = rdd1.coalesce(3, false)
    pairs2.saveAsTextFile("file:///D:/coalesce-1-" + System.currentTimeMillis())

    println(pairs.toDebugString)
  }
}
1.1 Dependency lineage
(3) MappedRDD[4] at coalesce at SparkRDDCoalesce.scala:12 []
 |  CoalescedRDD[3] at coalesce at SparkRDDCoalesce.scala:12 []
 |  ShuffledRDD[2] at coalesce at SparkRDDCoalesce.scala:12 []
 +-(8) MapPartitionsRDD[1] at coalesce at SparkRDDCoalesce.scala:12 []
    |  ParallelCollectionRDD[0] at parallelize at SparkRDDCoalesce.scala:11 []
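The lineage above is for the shuffle = true call: it contains a ShuffledRDD, so this coalesce introduces a wide dependency (an extra stage). The shuffle = false call only wraps the parent in a CoalescedRDD, a narrow dependency. A minimal sketch to print and compare both lineages (assuming a spark-shell session where `sc` is available):

val rdd = sc.parallelize(1 to 20, 8)
// shuffle = true: lineage contains MapPartitionsRDD -> ShuffledRDD -> CoalescedRDD
println(rdd.coalesce(3, true).toDebugString)
// shuffle = false: lineage is just a CoalescedRDD over the original 8 partitions
println(rdd.coalesce(3, false).toDebugString)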
1.2 Results
1.2.1 shuffle = true
part-00000
4
7
6
1
21
21
78
part-00001
1
2
2
19
66
102
part-00002
8
1
2
3
74
22
72
1.2.2 shuffle = false
part-00000
1
8
2
1
4
part-00001
2
7
6
2
3
1
19
part-00002
21
66
74
22
21
72
78
102
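The difference between the two result sets is worth spelling out: with shuffle = true the elements are dealt out across the three output partitions (via the incrementing key and the HashPartitioner, see the source below), while with shuffle = false each output partition simply absorbs a contiguous group of the original 8 partitions, so the input order is preserved. A sketch for inspecting partition contents directly in the driver instead of reading part files (assumes a spark-shell session; `dump` is a hypothetical helper):

val data = List(1, 8, 2, 1, 4, 2, 7, 6, 2, 3, 1, 19, 21, 66, 74, 22, 21, 72, 78, 102)
val rdd  = sc.parallelize(data, 8)

// Print each output partition index together with its elements.
def dump(r: org.apache.spark.rdd.RDD[Int]): Unit =
  r.mapPartitionsWithIndex((idx, it) => Iterator((idx, it.toList)))
   .collect()
   .foreach { case (idx, elems) => println(s"part-$idx: ${elems.mkString(",")}") }

dump(rdd.coalesce(3, true))   // elements spread across the 3 partitions
dump(rdd.coalesce(3, false))  // each partition is a contiguous block of parent partitions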
2. RDD dependency graph
3. Source code
/**
 * Return a new RDD that is reduced into `numPartitions` partitions.
 *
 * This results in a narrow dependency, e.g. if you go from 1000 partitions
 * to 100 partitions, there will not be a shuffle, instead each of the 100
 * new partitions will claim 10 of the current partitions.
 *
 * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
 * this may result in your computation taking place on fewer nodes than
 * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
 * you can pass shuffle = true. This will add a shuffle step, but means the
 * current upstream partitions will be executed in parallel (per whatever
 * the current partitioning is).
 *
 * Note: With shuffle = true, you can actually coalesce to a larger number
 * of partitions. This is useful if you have a small number of partitions,
 * say 100, potentially with a few partitions being abnormally large. Calling
 * coalesce(1000, shuffle = true) will result in 1000 partitions with the
 * data distributed using a hash partitioner.
 */
def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
    : RDD[T] = {
  if (shuffle) {
    /** Distributes elements evenly across output partitions, starting from a random partition. */
    val distributePartition = (index: Int, items: Iterator[T]) => {
      var position = (new Random(index)).nextInt(numPartitions)
      items.map { t =>
        // Turn each item into a (monotonically increasing key, item) pair.
        // Note that the hash code of the key will just be the key itself. The HashPartitioner
        // will mod it with the number of total partitions.
        // (An Int's hashCode is the Int itself; see java.lang.Integer#hashCode.)
        position = position + 1
        (position, t)
      }
    } : Iterator[(Int, T)]

    // include a shuffle step so that our upstream tasks are still distributed
    new CoalescedRDD(
      new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
      new HashPartitioner(numPartitions)),
      numPartitions).values
  } else {
    // shuffle = false: build the CoalescedRDD directly on top of this RDD
    new CoalescedRDD(this, numPartitions)
  }
}
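The interesting part of the shuffle = true branch is distributePartition: every input partition starts its key at a random offset (seeded by the partition index) and then increments it per element; because an Int's hashCode is the Int itself, the HashPartitioner spreads consecutive keys round-robin over numPartitions. The following standalone sketch (plain Scala, no Spark required, names are illustrative) replays that key assignment for one input partition:

import scala.util.Random

val numPartitions = 3

// Which output partition each element of input partition `index` would land in.
def outputPartition(index: Int, numElements: Int): Seq[Int] = {
  var position = new Random(index).nextInt(numPartitions)
  (1 to numElements).map { _ =>
    position += 1
    // HashPartitioner: non-negative (key.hashCode % numPartitions)
    ((position % numPartitions) + numPartitions) % numPartitions
  }
}

// For a partition holding 5 elements, the keys increase by one each time,
// so the elements cycle evenly through the 3 output partitions.
println(outputPartition(0, 5))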
2. repartition
/**
 * Return a new RDD that has exactly numPartitions partitions.
 *
 * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
 * a shuffle to redistribute data.
 *
 * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
 * which can avoid performing a shuffle.
 */
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = {
  coalesce(numPartitions, shuffle = true)
}
As the source shows, repartition is simply coalesce with shuffle = true, and is mainly useful for increasing the number of partitions. If you only want to reduce the number of partitions, prefer coalesce (i.e. the shuffle = false variant) so that the shuffle can be avoided, as sketched below.
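A short usage sketch of that rule of thumb (illustrative names, assuming `rdd1` from the example above): grow the partition count with repartition, shrink it with coalesce.

val wide   = rdd1.repartition(16)   // equivalent to rdd1.coalesce(16, shuffle = true)
val narrow = wide.coalesce(4)       // shuffle = false: narrow dependency, no shuffle
println(s"${wide.partitions.length} -> ${narrow.partitions.length}")   // 16 -> 4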