`
bit1129
  • 浏览: 1051160 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Spark】Spark九: 深入Spark RDD第二部分RDD依赖与运行时

 
阅读更多

在Spark中, RDD是有依赖关系的,这种依赖关系有两种类型

  • 窄依赖(Narrow Dependency)
  • 宽依赖(Wide Dependency)

以下图说明RDD的窄依赖和宽依赖



 

 

窄依赖

窄依赖指父RDD的每一个分区最多被一个子RDD的分区所用,表现为

  • 一个父RDD的分区对应于一个子RDD的分区
  • 两个父RDD的分区对应于一个子RDD 的分区。

如上面的map,filter,union属于第一类窄依赖,而join with inputs co-partitioned(对输入进行协同划分的join操作)则为第二类窄依赖

 

如果有多个父RDD的分区对应于同一个子RDD的分区不能称之为窄依赖?

 

宽窄依赖与容错性(2015.1.5)

Spark基于lineage的容错性是值,如果一个RDD出错,那么可以从它的所有父RDD重新计算所得,如果一个RDD仅有一个父RDD(即窄依赖),那么这种重新计算的代价会非常小。

Spark基于Checkpoint(物化)的容错机制何解?在上图中,宽依赖得到的结果(经历过Shuffle过程)是很昂贵的,因此,Spark将此结果物化到磁盘上了,以备后面使用

 

宽依赖

宽依赖指子RDD的每个分区都要依赖于父RDD的所有分区,这是shuffle类操作,上图中的groupByKey和对输入未协同划分的join操作就是宽依赖。

 

窄依赖细说

窄依赖对优化很有利。逻辑上,每个RDD的算子都是一个fork/join(此join非上文的join算子,而是指同步多个并行任务的barrier): 把计算fork到每个分区,算完后join,然后fork/join下一个RDD的算子。如果直接翻译到物理实现,是很不经济的:一是每一个RDD(即使 是中间结果)都需要物化到内存或存储中,费时费空间;二是join作为全局的barrier,是很昂贵的,会被最慢的那个节点拖死。如果子RDD的分区到 父RDD的分区是窄依赖,就可以实施经典的fusion优化,把两个fork/join合为一个;如果连续的变换算子序列都是窄依赖,就可以把很多个 fork/join并为一个,不但减少了大量的全局barrier,而且无需物化很多中间结果RDD,这将极大地提升性能。Spark把这个叫做流水线 (pipeline)优化

 

Spark流水线优化:

 

 

 

 

 

宽依赖细说

变换算子序列一碰上shuffle类操作,宽依赖就发生了,流水线优化终止。在具体实现 中,DAGScheduler从当前算子往前回溯依赖图,一碰到宽依赖,就生成一个stage来容纳已遍历的算子序列。在这个stage里,可以安全地实施流水线优化。然后,又从那个宽依赖开始继续回溯,生成下一个stage。

 

分区划分规则与首选位置

要深究两个问题:一,分区如何划分;二,分区该放到集群内哪个节点。这正好对应于RDD结构中另外两个域:分区划分器(partitioner)和首选位置(preferred locations)。

 

分区划分

分区划分对于shuffle类操作很关键,它决定了该操作的父RDD和子RDD之间的依赖类型。上文提到,同一个join算子,如果协同划分的话,两个父 RDD之间、父RDD与子RDD之间能形成一致的分区安排,即同一个key保证被映射到同一个分区,这样就能形成窄依赖。反之,如果没有协同划分,导致宽依赖。

所谓协同划分,就是指定分区划分器以产生前后一致的分区安排。Pregel和HaLoop把这个作为系统内置的一部分;而Spark 默认提供两种划分器:HashPartitioner和RangePartitioner,允许程序通过partitionBy算子指定。注意,HashPartitioner能够发挥作用,要求key的hashCode是有效的,即同样内容的key产生同样的hashCode。这对 String是成立的,但对数组就不成立(因为数组的hashCode是由它的标识,而非内容,生成)。这种情况下,Spark允许用户自定义 ArrayHashPartitioner。

 

首选位置

第二个问题是分区放置的节点,这关乎数据本地性:本地性好,网络通信就少。有些RDD产生时就 有首选位置,如HadoopRDD分区的首选位置就是HDFS块所在的节点。有些RDD或分区被缓存了,那计算就应该送到缓存分区所在的节点进行。再不然,就回溯RDD的lineage一直找到具有首选位置属性的父RDD,并据此决定子RDD的放置。

宽/窄依赖的概念不止用在调度中,对容错也很有用。如果一个节点宕机了,而且运算是窄依赖,那只要把丢失的父RDD分区重算即可,跟其他节点没有依赖。而宽依赖需要父RDD的所有分区都存在, 重算就很昂贵了。所以如果使用checkpoint算子来做检查点,不仅要考虑lineage是否足够长,也要考虑是否有宽依赖,对宽依赖加检查点是最物有所值的。

 

 

Spark中关于Dependency的源代码

 

 

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.ShuffleHandle

/**
 * :: DeveloperApi ::
 * Base class for dependencies.
 */
@DeveloperApi
abstract class Dependency[T] extends Serializable {
  def rdd: RDD[T]
}


/**
 * :: DeveloperApi ::
 * Base class for dependencies where each partition of the child RDD depends on a small number
 * of partitions of the parent RDD. Narrow dependencies allow for pipelined execution.
 */
////这里是说,窄依赖是指子RDD的每个Partition只依赖于父RDD很少部分的的RDD,文档明显说的不对!窄依赖起码需要,父RDD的每个Partition只被一个子RDD的Partition依赖
@DeveloperApi
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
  /**
   * Get the parent partitions for a child partition.
   * @param partitionId a partition of the child RDD
   * @return the partitions of the parent RDD that the child partition depends upon
   */
  def getParents(partitionId: Int): Seq[Int]

  override def rdd: RDD[T] = _rdd
}

//ShuffleDependency指的是,子RDD的partition部分依赖于父RDD的每个Partition
//部分依赖被称为 ShuffleDependency。其实 ShuffleDependency 跟 MapReduce 中 shuffle 的数据依赖相同(mapper 将其 output 进行 partition,然后每个 reducer 会将所有 mapper 输出中属于自己的 partition 通过 HTTP fetch 得到)。
/**
 * :: DeveloperApi ::
 * Represents a dependency on the output of a shuffle stage. Note that in the case of shuffle,
 * the RDD is transient since we don't need it on the executor side.
 *
 * @param _rdd the parent RDD
 * @param partitioner partitioner used to partition the shuffle output
 * @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If set to None,
 *                   the default serializer, as specified by `spark.serializer` config option, will
 *                   be used.
 */
@DeveloperApi
class ShuffleDependency[K, V, C](
    @transient _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Option[Serializer] = None,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false)
  extends Dependency[Product2[K, V]] {

  override def rdd = _rdd.asInstanceOf[RDD[Product2[K, V]]]

  val shuffleId: Int = _rdd.context.newShuffleId()

  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, _rdd.partitions.size, this)

  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}


/**
 * :: DeveloperApi ::
 * Represents a one-to-one dependency between partitions of the parent and child RDDs.
 */
@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int) = List(partitionId)
}


/**
 * :: DeveloperApi ::
 * Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
 * @param rdd the parent RDD
 * @param inStart the start of the range in the parent RDD
 * @param outStart the start of the range in the child RDD
 * @param length the length of the range
 */
@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
  extends NarrowDependency[T](rdd) {

  override def getParents(partitionId: Int) = {
    if (partitionId >= outStart && partitionId < outStart + length) {
      List(partitionId - outStart + inStart)
    } else {
      Nil
    }
  }
}

 

 ShuffleDependency的参数说明:

1.RDD数据集合中的元素需要是(K,V)类型,因为一般需要依据key进行shuffle,所以数据结构往往是key-value;同时Shuffle需要根据K做shuffle output的partition。

2.Partitioner,按照K进行分区的算法,比如HashPartitioner

3.Serializer,因为Shuffle过程需要有数据的网络传输,因此需要序列化,Serializer即是指定序列化算法

4.keyOrdering

5.aggregator,mapSideCombine用于map端的combine

6.shuffleId:每个shuffle操作都有一个唯一的ID

 

 

 

 

 

 

 

参考:http://blog.csdn.net/likika2012/article/details/38751393

  • 大小: 162.8 KB
  • 大小: 317.3 KB
分享到:
评论

相关推荐

    Spark机器学习视频第4课.SparkRDD原理剖析

    课时4:SparkRDD原理剖析 课时5:Spark2sql从mysql中导入 课时6:Spark1.6.2sql与mysql数据交互 课时7:SparkSQL java操作mysql数据 课时8:Spark统计用户的收藏转换率 课时9:Spark梳理用户的收藏以及订单转换率 ...

    sparkRDD函数大全

    spark rdd函数大全。spark rdd操作为core操作,虽然后续版本主要以dataset来操作,但是rdd操作也是不可忽略的一部分。

    Spark学习--RDD编码

    RDD:弹性分布式数据集(ResilientDistributed Dataset),是Spark对数据的...本节将通过示例的方式验证第二节中相关的转化操作和行动操作。 转化和行动计算结果 代码地址: 参考文献: 王道远 《Spark 快速大数据分析》

    Spark思维导图之Spark RDD.png

    Spark思维导图之Spark RDD.png

    大数据spark学习之rdd概述

    RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。在 Spark 中,对数据的所有操作不外乎创建 RDD、转化已有RDD 以及...

    spark RDD 论文 中文版

    spark RDD论文中文版

    spark RDD操作详解

    RDD即弹性分布式数据集,有容错机制并可以被并行操作的元素集合,具有只读、分区、容错、高效、无需物化、可以缓存、RDD依赖等特征。RDD只是数据集的抽象,分区内部并不会存储具体的数据。

    spark rdd 操作详解

    spark rdd相关操作详解;包括全部的操作说明和举例;

    Spark-Core学习知识笔记整理

    第二章 Spark开发环境搭建 8 1 Spark运行模式 8 2 Spark环境搭建 8 2.1Scala的安装 8 2.2Spark的单节点配置 9 2.3Spark-Standalone集群配置 9 2.4Spark-on-Yarn模式配置 12 2.5Spark-on-Mesos模式配置 13 2.6Hive-on...

    Spark机器学习视频第2课.Spark2集群安装

    课时4:SparkRDD原理剖析 课时5:Spark2sql从mysql中导入 课时6:Spark1.6.2sql与mysql数据交互 课时7:SparkSQL java操作mysql数据 课时8:Spark统计用户的收藏转换率 课时9:Spark梳理用户的收藏以及订单转换率 ...

    Spark视频第5课_深入理解Spark RDD&Dataframe;

    机器学习(Machine Learning, ML)是一门多领域交叉学科,涉及概率论、统计学、逼近论、凸分析、算法复杂度理论等多门学科。专门研究计算机怎样模拟或实现人类的学习行为,以获取新的知识或技能,重新组织已有的知识...

    大数据实验报告Windows环境下安装Spark及RDD编程和Spark编程实现wordcount.doc

    大数据实验报告Windows环境下安装Spark及RDD编程和Spark编程实现wordcount.doc

    Spark机器学习第1课.Spark介绍

    课时4:SparkRDD原理剖析 课时5:Spark2sql从mysql中导入 课时6:Spark1.6.2sql与mysql数据交互 课时7:SparkSQL java操作mysql数据 课时8:Spark统计用户的收藏转换率 课时9:Spark梳理用户的收藏以及订单转换率 ...

    Spark机器学习视频第10课.最终获取用户的收藏以及订单转换率

    课时4:SparkRDD原理剖析 课时5:Spark2sql从mysql中导入 课时6:Spark1.6.2sql与mysql数据交互 课时7:SparkSQL java操作mysql数据 课时8:Spark统计用户的收藏转换率 课时9:Spark梳理用户的收藏以及订单转换率 ...

    sparkStreaming实战学习资料

    Spark中的(弹性分布式数据集)简称RDD: Spark中的Transformation操作之Value数据类型的算子: Spark中的Transformation操作之Key-Value数据类型的算子: Spark中的Action操作: Transformation-&gt;map算子: ...

    spark-rdd-APi

    内容根据spark rdd.scala和ParRDDFunctions.scala源码中rdd顺序整理,包含rdd功能解释。对熟悉spark rdd很有用

    Spark1.4.1 RDD算子详解

    结合代码详细描述RDD算子的执行流程,并配上执行流程图

    spark实验5 rdd编程2.doc

    spark实验5 rdd编程2.doc

    spark: RDD与DataFrame之间的相互转换方法

    今天小编就为大家分享一篇spark: RDD与DataFrame之间的相互转换方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧

    my presentation on RDD & Spark.pptx

    这是一个Apache Spark的演讲ppt,全都是英文的,制作时间是2020年的。包含Spart的最近状态,RDD和其生态。my presentation on RDD & Spark.pptx

Global site tag (gtag.js) - Google Analytics