[Spark 37] The Spark Cache Mechanism

 

I'm in bad shape today: tired and listless. The Spark knowledge I've picked up hasn't come together; I have a vague impression of many points but can't articulate them. The plan for today was to look at cache, checkpoint and broadcast, yet it's already five o'clock and I have made no progress. So let me grit my teeth and work through Spark's cache mechanism. It turns out the cache mechanism is harder to tame than I expected.

 



 

Caching the ShuffledRDD produced by reduceByKey

 

Here, cache does not appear to take effect.

 

package spark.examples

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

import org.apache.spark.SparkContext._

object SparkWordCountCache {
  def main(args: Array[String]) {
    System.setProperty("hadoop.home.dir", "E:\\devsoftware\\hadoop-2.5.2\\hadoop-2.5.2")
    val conf = new SparkConf()
    conf.setAppName("SparkWordCount")
    conf.setMaster("local[3]")
    conf.set("spark.shuffle.manager", "hash") // does the hash-based shuffle manager make a difference?
    val sc = new SparkContext(conf)
    val rdd1 = sc.textFile("file:///D:/word.in.3")
    val rdd2 = rdd1.flatMap(_.split(" "))
    val rdd3 = rdd2.map((_, 1))
    val rdd4 = rdd3.reduceByKey(_ + _, 3)
    rdd4.cache()
    rdd4.saveAsTextFile("file:///D:/wordout" + System.currentTimeMillis())
    val result = rdd4.collect // no ShuffleMapTask is launched for this job, yet data is still fetched from the ShuffleMapTask output
    result.foreach(println)
    sc.stop()
  }
}

 

The code above calls rdd4.cache(), and rdd4 is the ShuffledRDD produced by reduceByKey; in other words, what gets cached is the result of the RDD in the second stage. When rdd4.collect is then called, every task produced is a ResultTask. Does that mean the last job avoids recomputing from the beginning only because of the cache?

That doesn't feel right: even without cache, the job should not recompute everything from scratch.

Verified: that intuition is correct. Modify the code as follows (that is, drop the cache() call) and the result is the same: the last job still launches no ShuffleMapTask, but when the ResultTask runs it still fetches data from the MapTask output, so the shuffle-read path is not simplified at all.

 

    rdd4.saveAsTextFile("file:///D:/wordout" + System.currentTimeMillis())
    val result = rdd4.collect
    result.foreach(println)

 

So I stepped into a cache pitfall right away! Caching this ShuffledRDD does not buy what I expected: even though no ShuffleMapTask is needed above, the ResultTask still has to pull data from the MapTask output when it runs.
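
To check whether the partitions of rdd4 actually made it into the block store, a quick probe can be dropped into the program right after the first action. This is a minimal sketch assuming the Spark 1.x API (RDD.getStorageLevel and the developer-API SparkContext.getRDDStorageInfo), reusing sc and rdd4 from the example above:

    println(rdd4.getStorageLevel) // the level requested by cache(), i.e. MEMORY_ONLY

    // getRDDStorageInfo lists the RDDs that currently have cached partitions,
    // together with how many partitions are cached and how much memory/disk they use.
    sc.getRDDStorageInfo.foreach { info =>
      println(s"rdd=${info.id} cachedPartitions=${info.numCachedPartitions} memSize=${info.memSize} diskSize=${info.diskSize}")
    }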

 

 

Caching the ShuffledRDD produced by groupByKey

 

This time rdd.cache does take effect.

 

package spark.examples

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SparkGroupByExample {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("GroupByKey").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setCheckpointDir("/tmp/checkpoint/" + System.currentTimeMillis())

    val data = Array[(Int, Char)]((1, 'a'), (2, 'b'),
      (3, 'c'), (4, 'd'),
      (5, 'e'), (3, 'f'),
      (2, 'g'), (1, 'h')
    )
    val pairs = sc.parallelize(data)
    val rdd = pairs.groupByKey(2)
    rdd.cache()                  // request caching; nothing is materialized yet
    rdd.count                    // first job: computes the groupByKey result and fills the cache
    rdd.collect.foreach(println) // second job: reads the cached partitions
  }
}

 

 

 

Caching the MappedRDD produced by textFile

 

The basic flow: suppose one program runs two jobs. When the first job runs, the RDD that called cache is computed and its result is written into the cache. When the second job runs, it reads that data directly from the cache.

This is a natural fit for iterative computation: the result produced by one pass is cached and reused by the next pass, and so on (a minimal sketch of this pattern follows).
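
Here is a minimal sketch of that iterative pattern (a hypothetical loop, not part of the original program; it reuses the word.in.3 input path from the example below). The RDD is cached once, and every job after the first one is served from the cached partitions instead of re-reading the file:

    val data = sc.textFile("file:///D:/word.in.3") // same input file as the example below
    data.cache()                                   // materialized by the first action below

    for (i <- 1 to 5) {
      // each count() triggers a separate job; from the second iteration on it hits the cache
      val matches = data.filter(_.contains("spark")).count()
      println(s"iteration $i: $matches matching lines")
    }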

 

 

package spark.examples

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

import org.apache.spark.SparkContext._

object SparkWordCountCache {
  def main(args: Array[String]) {
    System.setProperty("hadoop.home.dir", "E:\\devsoftware\\hadoop-2.5.2\\hadoop-2.5.2")
    val conf = new SparkConf()
    conf.setAppName("SparkWordCount")
    conf.setMaster("local")
    // hash-based shuffle
    conf.set("spark.shuffle.manager", "hash")
    val sc = new SparkContext(conf)
    val rdd1 = sc.textFile("file:///D:/word.in.3")
    rdd1.cache() // cache right after reading; the partitions are cached once the first job has run
    val rdd2 = rdd1.flatMap(_.split(" "))
    val rdd3 = rdd2.map((_, 1))
    val result = rdd3.collect // first job: print the contents of rdd3 (this also fills rdd1's cache)
    result.foreach(println)
    val rdd4 = rdd3.reduceByKey(_ + _) // apply reduceByKey to rdd3; the second job reads rdd1 from the cache
    rdd4.saveAsTextFile("file:///D:/wordout" + System.currentTimeMillis())
    sc.stop()
  }
}

 

 

Basic flow in the source code:

 

  • Call the RDD's iterator method to compute the data of a partition (an iterable collection is returned)
  • Inside RDD.iterator, the RDD's storage level is checked; if a storage level has been set, SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel) is called (a sketch of this dispatch follows the list)
  • Inside CacheManager's getOrCompute method:

           a. first check whether the partition is already in the cache; if so, return it directly

           b. if not, compute it via val computedValues = rdd.computeOrReadCheckpoint(partition, context)

           c. once computed, call CacheManager's own putInBlockManager to cache the computed data

           d. after the data is placed in the BlockManager, the mapping between this RDD and the BlockManager must also be updated, so that the next time this RDD is computed we can check whether its data is already cached
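
For reference, the dispatch described in the first two bullets lives in RDD.iterator. Reproduced roughly from the Spark 1.x source of the same vintage as the CacheManager code below, so treat the exact wording as approximate:

  /** In RDD.scala: either serve the partition through the CacheManager or compute it directly. */
  final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
      // a storage level was set via cache()/persist(): go through the CacheManager
      SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
    } else {
      // no caching requested: compute the partition (or read it from a checkpoint)
      computeOrReadCheckpoint(split, context)
    }
  }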

 

Key source code

 

 1. The getOrCompute method

 

/** Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached. */
  def getOrCompute[T](
      rdd: RDD[T],
      partition: Partition,
      context: TaskContext,
      storageLevel: StorageLevel): Iterator[T] = {

    val key = RDDBlockId(rdd.id, partition.index) // build an RDDBlockId from the RDD id and the partition index; one RDD can have many partitions
    logDebug(s"Looking for partition $key")
    blockManager.get(key) match { // look the key up in the BlockManager, which manages Spark's block information
      case Some(blockResult) =>
        // Partition is already materialized, so just return its values
        context.taskMetrics.inputMetrics = Some(blockResult.inputMetrics)
        new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])

      case None =>
        // Acquire a lock for loading this partition
        // If another thread already holds the lock, wait for it to finish and return its results
        val storedValues = acquireLockForPartition[T](key) // another thread may be loading the same partition; wait for it and reuse its values
        if (storedValues.isDefined) { // the data was found
          return new InterruptibleIterator[T](context, storedValues.get)
        }

        // Otherwise, we have to load the partition ourselves
        // no cached data was found, meaning this is the first time the job computes this partition
        try {
          logInfo(s"Partition $key not found, computing it")
          val computedValues = rdd.computeOrReadCheckpoint(partition, context) // compute the partition (or read it from a checkpoint)

          // If the task is running locally, do not persist the result
          if (context.isRunningLocally) { // results of locally-run tasks are not cached
            return computedValues
          }

          // Otherwise, cache the values and keep track of any updates in block statuses
          val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]

          // store the data in the BlockManager; note the four arguments
          val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)

          // record the newly updated blocks in this task's metrics
          val metrics = context.taskMetrics
          val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
          metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq)
          new InterruptibleIterator(context, cachedValues)

        } finally {
          loading.synchronized {
            loading.remove(key)
            loading.notifyAll()
          }
        }
    }
  }
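
Two small points around the code above, shown as a minimal sketch (assuming the Spark 1.x API; the RDD here is a hypothetical example, not one of the programs above): cache() is simply persist(StorageLevel.MEMORY_ONLY), and the RDDBlockId built at the top of getOrCompute renders as the block name rdd_<rddId>_<partitionIndex>, which is the name that shows up in the BlockManager and in the Storage tab of the web UI.

    import org.apache.spark.storage.{RDDBlockId, StorageLevel}

    // cache() is shorthand for persist(StorageLevel.MEMORY_ONLY); both only record the
    // desired storage level on the RDD, and nothing is materialized until an action runs.
    val cached = sc.parallelize(1 to 100, 4).persist(StorageLevel.MEMORY_ONLY)

    // The key built at the top of getOrCompute: one block per (RDD id, partition index).
    val key = RDDBlockId(cached.id, 0)
    println(key.name) // prints "rdd_<id>_0"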

 

2. The putInBlockManager method

/**
   * Cache the values of a partition, keeping track of any updates in the storage statuses of
   * other blocks along the way.
   *
   * The effective storage level refers to the level that actually specifies BlockManager put
   * behavior, not the level originally specified by the user. This is mainly for forcing a
   * MEMORY_AND_DISK partition to disk if there is not enough room to unroll the partition,
   * while preserving the original semantics of the RDD as specified by the application.
   */
  private def putInBlockManager[T](
      key: BlockId,
      values: Iterator[T],
      level: StorageLevel,
      updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)],
      effectiveStorageLevel: Option[StorageLevel] = None): Iterator[T] = {

    val putLevel = effectiveStorageLevel.getOrElse(level)
    if (!putLevel.useMemory) {
      /*
       * This RDD is not to be cached in memory, so we can just pass the computed values as an
       * iterator directly to the BlockManager rather than first fully unrolling it in memory.
       */
      updatedBlocks ++=
        blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel)
      blockManager.get(key) match {
        case Some(v) => v.data.asInstanceOf[Iterator[T]]
        case None =>
          logInfo(s"Failure to store $key")
          throw new BlockException(key, s"Block manager failed to return cached value for $key!")
      }
    } else {
      /*
       * This RDD is to be cached in memory. In this case we cannot pass the computed values
       * to the BlockManager as an iterator and expect to read it back later. This is because
       * we may end up dropping a partition from memory store before getting it back.
       *
       * In addition, we must be careful to not unroll the entire partition in memory at once.
       * Otherwise, we may cause an OOM exception if the JVM does not have enough space for this
       * single partition. Instead, we unroll the values cautiously, potentially aborting and
       * dropping the partition to disk if applicable.
       */
      blockManager.memoryStore.unrollSafely(key, values, updatedBlocks) match {
        case Left(arr) =>
          // We have successfully unrolled the entire partition, so cache it in memory
          updatedBlocks ++=
            blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
          arr.iterator.asInstanceOf[Iterator[T]]
        case Right(it) =>
          // There is not enough space to cache this partition in memory
          val returnValues = it.asInstanceOf[Iterator[T]]
          if (putLevel.useDisk) {
            logWarning(s"Persisting partition $key to disk instead.")
            val diskOnlyLevel = StorageLevel(useDisk = true, useMemory = false,
              useOffHeap = false, deserialized = false, putLevel.replication)
            putInBlockManager[T](key, returnValues, level, updatedBlocks, Some(diskOnlyLevel))
          } else {
            returnValues
          }
      }
    }
  }
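
From the user side, the disk fallback branch above (the Right(it) case combined with putLevel.useDisk) is only reachable when the requested level itself allows disk, e.g. MEMORY_AND_DISK; with plain cache() (MEMORY_ONLY), a partition that cannot be unrolled is simply not cached and will be recomputed on the next use. A minimal usage sketch, reusing the word.in.3 input path from the examples above:

    import org.apache.spark.storage.StorageLevel

    val big = sc.textFile("file:///D:/word.in.3")
    big.persist(StorageLevel.MEMORY_AND_DISK) // partitions that do not fit in memory are written to disk instead
    big.count()                               // the first action materializes the cache (memory first, disk as fallback)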

 

 

Comments
#1 jchubby 2017-02-20
Regarding the cache on the first reduceByKey example: for an RDD downstream of a shuffle, Spark automatically keeps the shuffle output until that RDD has gone unused for a while or is garbage collected. So whether or not you call cache, the first map stage of the collect job gets skipped.
