`
bit1129
  • 浏览: 1051626 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Spark六十四】Spark实现join功能

 
阅读更多

整理这个博客的原因有两个,

1.在Spark的mailing list有人问道,Spark面试的话,一般会问些什么,有个人回复时提到他面试时一般会问问如何做join

2.今天看了个博客,刚好讲到spark实现大数据join操作的两个算法,map-side join和reduce-side join,正好接此机会整理下

 

 

 

Map-Side Join

  • Map-side Join使用场景是一个大表和一个小表的连接操作,其中,“小表”是指文件足够小,可以加载到内存中。该算法可以将join算子执行在Map端,无需经历shuffle和reduce等阶段,因此效率非常高。
  • 在Hadoop MapReduce中, map-side join是借助DistributedCache实现的。DistributedCache可以帮我们将小文件分发到各个节点的Task工作目录下,这样,我们只需在程序中将文件加载到内存中(比如保存到Map数据结构中),然后借助Mapper的迭代机制,遍历另一个大表中的每一条记录,并查找是否在小表中,如果在则输出,否则跳过。
  • 在Apache Spark中,同样存在类似于DistributedCache的功能,称为“广播变量”(Broadcast variable)。其实现原理与DistributedCache非常类似,但提供了更多的数据/文件广播算法,包括高效的P2P算法,该算法在节点数目非常多的场景下,效率远远好于DistributedCache这种基于HDFS共享存储的方式。使用MapReduce DistributedCache时,用户需要显示地使用File API编写程序从本地读取小表数据,而Spark则不用,它借助Scala语言强大的函数闭包特性,可以隐藏数据/文件广播过程,让用户编写程序更加简单。

适用于一个数据集小,另一个数据集大的情况

package spark.examples.join

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._

object SparkMapsideJoin {
  def main(args: Array[String]) {
    val conf = new SparkConf()
    conf.setAppName("SparkMapsideJoin")
    conf.setMaster("local[3]")
    conf.set("spark.shuffle.manager", "sort");
    val sc = new SparkContext(conf)

    //val table1 = sc.textFile(args(1))
    //val table2 = sc.textFile(args(2))

    val table1 = sc.parallelize(List("k1,v11", "k2,v12", "k3,v13"))
    val table2 = sc.parallelize(List("k1,v21", "k4,v24", "k3,v23"))
    // table1 is smaller, so broadcast it as a map<String, String>
    val pairs = table1.map { x =>
      val pos = x.indexOf(',')
      (x.substring(0, pos), x.substring(pos + 1))
    }.collectAsMap
    val broadCastMap = sc.broadcast(pairs) //save table1 as map, and broadcast it

    // table2 join table1 in map side
    val result = table2.map { x =>
      val pos = x.indexOf(',')
      (x.substring(0, pos), x.substring(pos + 1))
    }.mapPartitions({ iter =>
      val m = broadCastMap.value
      for {
        (key, value) <- iter
        if (m.contains(key))
      } yield (key, (value, m.get(key).getOrElse("")))
    })

    val output = "d:/wordcount-" + System.currentTimeMillis() ;
    result.saveAsTextFile(output) //save result to local file or HDFS
  }
}

 

Reduce Side Join

  • 当两个文件/目录中的数据非常大,难以将某一个存放到内存中时,Reduce-side Join是一种解决思路。该算法需要通过Map和Reduce两个阶段完成,在Map阶段,将key相同的记录划分给同一个Reduce Task(需标记每条记录的来源,便于在Reduce阶段合并),在Reduce阶段,对key相同的进行合并。
  • Spark提供了Join算子,可以直接通过该算子实现reduce-side join,但要求RDD中的记录必须是pair,即RDD[KEY, VALUE],

 

适用于两个join表数据量都很大的情况

package spark.examples.join

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._

object SparkReducesideJoin {
  def main(args: Array[String]) {
    val conf = new SparkConf()
    conf.setAppName("SparkMapsideJoin")
    conf.setMaster("local[3]")
    conf.set("spark.shuffle.manager", "sort");
    val sc = new SparkContext(conf)

    val table1 = sc.parallelize(List("k1,v11", "k2,v12", "k3,v13"))
    val table2 = sc.parallelize(List("k1,v21", "k4,v24", "k3,v23"))
    //table1 and table 2 are both very large
    val pairs1 = table1.map { x =>
      val pos = x.indexOf(',')
      (x.substring(0, pos), x.substring(pos + 1))
    }

    val pairs2 = table2.map { x =>
      val pos = x.indexOf(',')
      (x.substring(0, pos), x.substring(pos + 1))
    }
    val result = pairs1.join(pairs2)
    val output = "d:/wordcount-" + System.currentTimeMillis();
    result.saveAsTextFile(output) //save result to local file or HDFS
  }
}

 

 

参考:http://dongxicheng.org/framework-on-yarn/apache-spark-join-two-tables/

分享到:
评论

相关推荐

    Spark-2.3.1源码解读

    Spark-2.3.1源码解读。 Spark Core源码阅读 Spark Context 阅读要点 ...Dstream join 操作和 RDD join 操作的区别 PIDController源码赏析及 back pressure 实现思路 Streaming Context重点摘要 checkpoint 必知必会

    Hive on Spark EXPLAIN statement

    Hive on Spark EXPLAIN statement : 讲述了 Common Join / Map join / Bucket Map Join / Sorted Merge Bucket Map Join / skew join 在explain 中的 树结构 。In Hive, command EXPLAIN can be used to show the ...

    Spark实现用户订单数据表连接

    Spark实现用户订单数据表连接,实现了用户信息表和订单信息表的内连接操作

    Spark实时流处理编程指南

    数据可以有许多来源,如Kafka, Flume, Twitter,ZeroMQ或传统TCP套接字,可以使用复杂算法对其处理实现高层次的功能,如map,reduce,join和window。最后,经处理的数据可被输出到文件系统,数据库,和实时仪表盘。事实...

    大数据Spark面试题汇总

    大数据Spark面试题汇总,共有79道面试题以及题目的解答 部分题目如下: 1. spark 的有几种部署模式,每种模式特点? 2. Spark 为什么比 mapreduce 快? 3. 简单说一下 hadoop 和 spark 的 shuffle 相同和差异? 5. ...

    大数据spark交流SPARK 技术交流

    Spark在2013年6月进入Apache成为孵化项目,8个月后成为Apache顶级项目,速度之快足见过人之处,Spark以其先进的设计理念,迅速成为社区的热门项目,围绕着Spark推出了Spark SQL、Spark Streaming、MLLib和GraphX等...

    spark-2.2.2安装流程

    Task:一个Stage包含一到多个Task,通过多个Task实现并行运行的功能。 Transformations:转换(Transformations) (如:map, filter, groupBy, join等),Transformations操作是Lazy的,也就是说从一个RDD转换生成另一...

    基于成本的Spark SQL优化器框架

    通过可靠的统计和精确的估算,我们能够在这些领域做出好的决定:选择散列连接(hash join)操作的正确构建端(build side),选择正确的连接算法(如broadcast hash join与 shuffled hash join), 调整连接的顺序...

    Spark Skew Join 的原理及在 eBay 的优化.docx

    Spark Skew Join 的原理及在 eBay 的优化.docx

    SparkStreaming原理介绍

    Spark Streaming 是 Spark 核心 API 的一个扩展,可以实现高吞吐量的、具备容错机制的 实时流数据的处理。支持从多种数据源获取数据,包括 Kafk、Flume、Twitter、ZeroMQ、Kinesis 以及 TCP sockets,从数据源获取...

    Spark调优.webp

    Spark调优 直接join将会对所有数据进行shuffle,需要大量的io操作,相同的key会在同一个partition中进行处理,任务的并发度也收到了限制。使用broadcast将会把小表分发到每台执行节点上,因此,关联操作都在本地完成...

    论文研究-基于Spark的两表等值连接过程优化.pdf

    其次结合simi-join和partition join两者的优势,对过滤后的单侧表使用贪心算法进行拆分;最后对拆分后的子集进行连接,因此把两大表的连接过程转换为分阶段进行的两小表连接。代价分析和实验结果表明,该算法与现有...

    Spark DataFrame详解.zip

    Spark DataFrame使用详解,包括:DataFrame解析;创建;Action;条件查询和join详尽操作解释

    大数据学习笔记

    第一部分 Spark学习 6 第1章 Spark介绍 7 1.1 Spark简介与发展 7 1.2 Spark特点 7 1.3 Spark与Hadoop集成 7 1.4 Spark组件 8 第2章 Spark弹性分布数据集 9 2.1 弹性分布式数据集 9 2.2 MapReduce数据分享效率低 9 ...

    Kafka-Spark-stream-with-static-data-using-join:使用join的Kafka Spark流与静态数据

    卡夫卡火花流与静态数据使用联接 使用join的Kafka Spark流与静态数据

    基于Spark的大规模单图上的子图匹配算法.pdf

    针对大规模数据图下基于回溯法的子图查询算法的准确率低、开销大等问题,为提高查询准确率,降低大图下的查询开销,提出一种基于Spark的子图匹配(SQM)算法。首先根据结构信息过滤数据图,再将查询图分割成基本查询单元;...

    spark流数据处理:SparkStreaming的使用

    SparkStreaming可以从很多数据源获取数据,比如:Kafka、Flume、Twitter、ZeroMQ、Kinesis或TCP连接等,并可以用很多高层算子(map/reduce/join/window等)来方便地处理这些数据。最后处理过的数据还可以推送到文件...

    spark rdd api dataframe 以及dataframe rdd dataset 相互转换 spark sql

    包括spara rdd api,dataframe action操作、查询操作、join操作,dataframe rdd dataset 相互转换以及spark sql。

    spark core项目所用到的数据源文件 agent1.log

    https://blog.csdn.net/weixin_46122692/article/details/109034397 中sparkcore 项目用到的数据源文件 一张大表 记录时间戳、省份ID、城市ID、用户ID、广告ID 一张小表 记录省份ID、省份名称 两张表做Join 求TopN

    cassandra-spark-jdbc-bridge:如果要通过JDBC查询Cassandra数据,但想使用Spark SQL的强大功能进行数据处理,则需要此应用程序

    如果要通过JDBC查询Cassandra数据,但要使用Spark SQL的功能进行数据处理,则需要此应用程序。 此应用程序(CSJB)是Spark应用程序,它将在Spark SQL中自动将所有Cassandra表注册为架构RDD,并启动嵌入式Apache ...

Global site tag (gtag.js) - Google Analytics