整理这个博客的原因有两个,
1.在Spark的mailing list有人问道,Spark面试的话,一般会问些什么,有个人回复时提到他面试时一般会问问如何做join
2.今天看了个博客,刚好讲到spark实现大数据join操作的两个算法,map-side join和reduce-side join,正好接此机会整理下
Map-Side Join
- Map-side Join使用场景是一个大表和一个小表的连接操作,其中,“小表”是指文件足够小,可以加载到内存中。该算法可以将join算子执行在Map端,无需经历shuffle和reduce等阶段,因此效率非常高。
- 在Hadoop MapReduce中, map-side join是借助DistributedCache实现的。DistributedCache可以帮我们将小文件分发到各个节点的Task工作目录下,这样,我们只需在程序中将文件加载到内存中(比如保存到Map数据结构中),然后借助Mapper的迭代机制,遍历另一个大表中的每一条记录,并查找是否在小表中,如果在则输出,否则跳过。
- 在Apache Spark中,同样存在类似于DistributedCache的功能,称为“广播变量”(Broadcast variable)。其实现原理与DistributedCache非常类似,但提供了更多的数据/文件广播算法,包括高效的P2P算法,该算法在节点数目非常多的场景下,效率远远好于DistributedCache这种基于HDFS共享存储的方式。使用MapReduce DistributedCache时,用户需要显示地使用File API编写程序从本地读取小表数据,而Spark则不用,它借助Scala语言强大的函数闭包特性,可以隐藏数据/文件广播过程,让用户编写程序更加简单。
适用于一个数据集小,另一个数据集大的情况
package spark.examples.join import org.apache.spark.{SparkContext, SparkConf} import org.apache.spark.SparkContext._ object SparkMapsideJoin { def main(args: Array[String]) { val conf = new SparkConf() conf.setAppName("SparkMapsideJoin") conf.setMaster("local[3]") conf.set("spark.shuffle.manager", "sort"); val sc = new SparkContext(conf) //val table1 = sc.textFile(args(1)) //val table2 = sc.textFile(args(2)) val table1 = sc.parallelize(List("k1,v11", "k2,v12", "k3,v13")) val table2 = sc.parallelize(List("k1,v21", "k4,v24", "k3,v23")) // table1 is smaller, so broadcast it as a map<String, String> val pairs = table1.map { x => val pos = x.indexOf(',') (x.substring(0, pos), x.substring(pos + 1)) }.collectAsMap val broadCastMap = sc.broadcast(pairs) //save table1 as map, and broadcast it // table2 join table1 in map side val result = table2.map { x => val pos = x.indexOf(',') (x.substring(0, pos), x.substring(pos + 1)) }.mapPartitions({ iter => val m = broadCastMap.value for { (key, value) <- iter if (m.contains(key)) } yield (key, (value, m.get(key).getOrElse(""))) }) val output = "d:/wordcount-" + System.currentTimeMillis() ; result.saveAsTextFile(output) //save result to local file or HDFS } }
Reduce Side Join
- 当两个文件/目录中的数据非常大,难以将某一个存放到内存中时,Reduce-side Join是一种解决思路。该算法需要通过Map和Reduce两个阶段完成,在Map阶段,将key相同的记录划分给同一个Reduce Task(需标记每条记录的来源,便于在Reduce阶段合并),在Reduce阶段,对key相同的进行合并。
- Spark提供了Join算子,可以直接通过该算子实现reduce-side join,但要求RDD中的记录必须是pair,即RDD[KEY, VALUE],
适用于两个join表数据量都很大的情况
package spark.examples.join import org.apache.spark.{SparkContext, SparkConf} import org.apache.spark.SparkContext._ object SparkReducesideJoin { def main(args: Array[String]) { val conf = new SparkConf() conf.setAppName("SparkMapsideJoin") conf.setMaster("local[3]") conf.set("spark.shuffle.manager", "sort"); val sc = new SparkContext(conf) val table1 = sc.parallelize(List("k1,v11", "k2,v12", "k3,v13")) val table2 = sc.parallelize(List("k1,v21", "k4,v24", "k3,v23")) //table1 and table 2 are both very large val pairs1 = table1.map { x => val pos = x.indexOf(',') (x.substring(0, pos), x.substring(pos + 1)) } val pairs2 = table2.map { x => val pos = x.indexOf(',') (x.substring(0, pos), x.substring(pos + 1)) } val result = pairs1.join(pairs2) val output = "d:/wordcount-" + System.currentTimeMillis(); result.saveAsTextFile(output) //save result to local file or HDFS } }
参考:http://dongxicheng.org/framework-on-yarn/apache-spark-join-two-tables/
相关推荐
Spark-2.3.1源码解读。 Spark Core源码阅读 Spark Context 阅读要点 ...Dstream join 操作和 RDD join 操作的区别 PIDController源码赏析及 back pressure 实现思路 Streaming Context重点摘要 checkpoint 必知必会
Hive on Spark EXPLAIN statement : 讲述了 Common Join / Map join / Bucket Map Join / Sorted Merge Bucket Map Join / skew join 在explain 中的 树结构 。In Hive, command EXPLAIN can be used to show the ...
Spark实现用户订单数据表连接,实现了用户信息表和订单信息表的内连接操作
数据可以有许多来源,如Kafka, Flume, Twitter,ZeroMQ或传统TCP套接字,可以使用复杂算法对其处理实现高层次的功能,如map,reduce,join和window。最后,经处理的数据可被输出到文件系统,数据库,和实时仪表盘。事实...
大数据Spark面试题汇总,共有79道面试题以及题目的解答 部分题目如下: 1. spark 的有几种部署模式,每种模式特点? 2. Spark 为什么比 mapreduce 快? 3. 简单说一下 hadoop 和 spark 的 shuffle 相同和差异? 5. ...
Spark在2013年6月进入Apache成为孵化项目,8个月后成为Apache顶级项目,速度之快足见过人之处,Spark以其先进的设计理念,迅速成为社区的热门项目,围绕着Spark推出了Spark SQL、Spark Streaming、MLLib和GraphX等...
Task:一个Stage包含一到多个Task,通过多个Task实现并行运行的功能。 Transformations:转换(Transformations) (如:map, filter, groupBy, join等),Transformations操作是Lazy的,也就是说从一个RDD转换生成另一...
通过可靠的统计和精确的估算,我们能够在这些领域做出好的决定:选择散列连接(hash join)操作的正确构建端(build side),选择正确的连接算法(如broadcast hash join与 shuffled hash join), 调整连接的顺序...
Spark Skew Join 的原理及在 eBay 的优化.docx
Spark Streaming 是 Spark 核心 API 的一个扩展,可以实现高吞吐量的、具备容错机制的 实时流数据的处理。支持从多种数据源获取数据,包括 Kafk、Flume、Twitter、ZeroMQ、Kinesis 以及 TCP sockets,从数据源获取...
Spark调优 直接join将会对所有数据进行shuffle,需要大量的io操作,相同的key会在同一个partition中进行处理,任务的并发度也收到了限制。使用broadcast将会把小表分发到每台执行节点上,因此,关联操作都在本地完成...
其次结合simi-join和partition join两者的优势,对过滤后的单侧表使用贪心算法进行拆分;最后对拆分后的子集进行连接,因此把两大表的连接过程转换为分阶段进行的两小表连接。代价分析和实验结果表明,该算法与现有...
Spark DataFrame使用详解,包括:DataFrame解析;创建;Action;条件查询和join详尽操作解释
第一部分 Spark学习 6 第1章 Spark介绍 7 1.1 Spark简介与发展 7 1.2 Spark特点 7 1.3 Spark与Hadoop集成 7 1.4 Spark组件 8 第2章 Spark弹性分布数据集 9 2.1 弹性分布式数据集 9 2.2 MapReduce数据分享效率低 9 ...
卡夫卡火花流与静态数据使用联接 使用join的Kafka Spark流与静态数据
针对大规模数据图下基于回溯法的子图查询算法的准确率低、开销大等问题,为提高查询准确率,降低大图下的查询开销,提出一种基于Spark的子图匹配(SQM)算法。首先根据结构信息过滤数据图,再将查询图分割成基本查询单元;...
SparkStreaming可以从很多数据源获取数据,比如:Kafka、Flume、Twitter、ZeroMQ、Kinesis或TCP连接等,并可以用很多高层算子(map/reduce/join/window等)来方便地处理这些数据。最后处理过的数据还可以推送到文件...
包括spara rdd api,dataframe action操作、查询操作、join操作,dataframe rdd dataset 相互转换以及spark sql。
https://blog.csdn.net/weixin_46122692/article/details/109034397 中sparkcore 项目用到的数据源文件 一张大表 记录时间戳、省份ID、城市ID、用户ID、广告ID 一张小表 记录省份ID、省份名称 两张表做Join 求TopN
如果要通过JDBC查询Cassandra数据,但要使用Spark SQL的功能进行数据处理,则需要此应用程序。 此应用程序(CSJB)是Spark应用程序,它将在Spark SQL中自动将所有Cassandra表注册为架构RDD,并启动嵌入式Apache ...