Spark围绕着BlockManager构建了存储模块,包括RDD,Shuffle,Broadcast的存储都使用了BlockManager。而BlockManager在实现上是一个针对每个应用的Master/Executor结构,即Driver上BlockManager充当了Master角色,而各个Slave上(具体到应用范围,就是Executor)的BlockManager充当了Slave角色。
因此,BlockManager在Driver和应用的各个Executor之间各有一份,Driver上的BlockManager不具备实际存储的能力,它记录了各个Executor的BlockManager的状态(通过查看BlockManagerMaster和BlockManagerMasterActor的源代码,BlockManagerMaster和BlockManagerMasterActor并没有持有一个BlockManager对象,那么每个Executor BlockManager的状态存储在什么地方?通过查看BlockManager的类注释,发现BlockManager确实运行在Driver上)。Master BlockManager和ExecutorBlockManager之间的通信也是基于Akka,消息格式定义于BlockManagerMessages类中。
上面的描述并不准确,事实上在Driver端,同Executor一样,各有一个BlockManager。除此之外,Driver上还有一个BlockManager Master,它的实现类是BlockManagerManager,因此,对于BlockManager而言,Driver既是Master也是Slave
0.BlockManager类注释:
/** * Manager running on every node (driver and executors) which provides interfaces for putting and * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap). * * Note that #initialize() must be called before the BlockManager is usable. */ private[spark] class BlockManager(
1. Master/Slave的通信内容
Master BlockManager向Executor BlockManager可以发送的消息包括:
sealed trait ToBlockManagerSlave // Remove a block from the slaves that have it. This can only be used to remove // blocks that the master knows about. case class RemoveBlock(blockId: BlockId) extends ToBlockManagerSlave // Remove all blocks belonging to a specific RDD. case class RemoveRdd(rddId: Int) extends ToBlockManagerSlave // Remove all blocks belonging to a specific shuffle. case class RemoveShuffle(shuffleId: Int) extends ToBlockManagerSlave // Remove all blocks belonging to a specific broadcast. case class RemoveBroadcast(broadcastId: Long, removeFromDriver: Boolean = true) extends ToBlockManagerSlave
Executor BlockManager向Master BlockManager可以发送的消息包括:
sealed trait ToBlockManagerMaster case class RegisterBlockManager( blockManagerId: BlockManagerId, maxMemSize: Long, sender: ActorRef) extends ToBlockManagerMaster //获取某个Block在哪些Executor的BlockManager上 case class GetLocations(blockId: BlockId) extends ToBlockManagerMaster //获取一组Block在哪些Executor的BlockManager上 case class GetLocationsMultipleBlockIds(blockIds: Array[BlockId]) extends ToBlockManagerMaster case class GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster case class GetActorSystemHostPortForExecutor(executorId: String) extends ToBlockManagerMaster //删除Executor case class RemoveExecutor(execId: String) extends ToBlockManagerMaster case object StopBlockManagerMaster extends ToBlockManagerMaster case object GetMemoryStatus extends ToBlockManagerMaster case object GetStorageStatus extends ToBlockManagerMaster case class GetBlockStatus(blockId: BlockId, askSlaves: Boolean = true) extends ToBlockManagerMaster case class GetMatchingBlockIds(filter: BlockId => Boolean, askSlaves: Boolean = true) extends ToBlockManagerMaster case class BlockManagerHeartbeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster case object ExpireDeadHosts extends ToBlockManagerMaster //更新Block信息 case class UpdateBlockInfo( var blockManagerId: BlockManagerId, var blockId: BlockId, var storageLevel: StorageLevel, var memSize: Long, var diskSize: Long, var tachyonSize: Long) extends ToBlockManagerMaster with Externalizable { def this() = this(null, null, null, 0, 0, 0) // For deserialization only override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException { blockManagerId.writeExternal(out) out.writeUTF(blockId.name) storageLevel.writeExternal(out) out.writeLong(memSize) out.writeLong(diskSize) out.writeLong(tachyonSize) } override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { blockManagerId = BlockManagerId(in) blockId = BlockId(in.readUTF()) storageLevel = StorageLevel(in) memSize = in.readLong() diskSize = in.readLong() tachyonSize = in.readLong() } }
2. BlockManagerMasterActor说明
/** * BlockManagerMasterActor is an actor on the master node to track statuses of * all slaves' block managers. */包含的数据结构:
// Mapping from block manager id to the block manager's information. ///BlockManagerId与BlockManagerInfo之间的对应,每个Executor对应一个BlockManagerId private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo] // Mapping from executor ID to block manager ID. //Executor ID与BlockManagerID之间的对应关系 private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId] // Mapping from block id to the set of block managers that have the block. //BlockId与包含这个Block的Location(由BlockManagerId表示) private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]
BlockManagerId是一个由host,port和executorId表示的数据结构,从这里也可以看出来BlockManager是Executor范围的数据结构
3. BlockManagerSlaveActor说明
/** * An actor to take commands from the master to execute options. For example, * this is used to remove blocks from the slave's BlockManager. */
BlockManagerSlaveActor包含的数据结构都体现在构造函数中了,如下所示,BlockManagerSlaveActor包含了本Executor对应的BlockManager以及该Executor的MapOutputTracker用于记录Map Shuffle输出
private[storage] class BlockManagerSlaveActor( blockManager: BlockManager, mapOutputTracker: MapOutputTracker) extends Actor with ActorLogReceive with Logging
弄清楚BlockManager的通信机制,发现要比分析BlockManager的读写数据(依赖于DiskStore和MemoryStore实现,而DiskStore又依赖于DiskBlockManager实现)复杂一些,主要是头脑中没有清晰的picuture:关于BlockManager,Driver有什么,Executor上有什么,它们之间如何通信,这个继续分析吧。
相关推荐
BlockManager 是 spark 中至关重要的一个组件,在spark的运行过程中到处都有 BlockManager 的身影,只有搞清楚 BlockManager 的原理和机制,你才能更加深入的理解 spark。
spark-BlockManager向BlockManagerMaster注册
spark-blockmanager基础及源码彻底解析
Apache Spark源码剖析,Apache Spark源码剖析,Apache Spark源码剖析
Spark 快速大数据分析Spark 快速大数据分析Spark 快速大数据分析Spark 快速大数据分析Spark 快速大数据分析Spark 快速大数据分析
4、项目中采用完全还原企业大数据项目开发场景的方式来讲解,每一个业务模块的讲解都包括了需求分析、方案设计、数据设计、编码实现、功能测试、性能调优等环节,真实还原企业级大数据项目开发场景。 模块简介: 1、...
Spark的共享单车数据存储-Spark的共享单车数据存储系统-Spark的共享单车数据存储系统源码-Spark的共享单车数据存储管理系统-Spark的共享单车数据存储管理系统java代码-Spark的共享单车数据存储系统设计与实现-基于...
深入理解Sp深入理解SPARK:核心思想与源码分析》结合大量图和示例,对Spark的架构、部署模式和工作模块的设计理念、实现源码与使用技巧进行了深入的剖析与解读。 《深入理解SPARK:核心思想与源码分析》一书对Spark...
Spark的共享单车数据存储-Spark的共享单车数据存储系统-Spark的共享单车数据存储系统源码-Spark的共享单车数据存储管理系统-Spark的共享单车数据存储管理系统java代码-Spark的共享单车数据存储系统设计与实现-基于...
1.Spark及其生态圈简介.pdf 2.Spark编译与部署(上)--基础环境搭建.pdf 2.Spark编译与部署(下)--Spark编译安装.pdf 2.Spark编译与部署(中)--Hadoop编译安装.pdf 3.Spark编程模型(上)--概念及SparkShell实战....
Spark大型项目实战—基于spark电商用户可视化行为分析大数据平台开发实战.zip该平台以 Spark 框架为核心, 对电商网站的日志进行离线和实时分析。 该大数据分析平台对电商网站的各种用户行为( 访问行为、购物行为、...
开发环境:PyCharm + Python3.7 + Spark + Idea + Mysql + ...可以从中分析出热门电影类型数据占比、历年电影上线走势、电影高频词统计分析、电影评分等级分析、影评时间统计分析、上线电影数量较高年份五大模块。
《Apache Spark源码剖析》以Spark 1.02版本源码为切入点,着力于探寻Spark所要解决的主要问题及其解决办法,通过一系列精心设计的小实验来分析每一步背后的处理逻辑。 《Apache Spark源码剖析》第3~5章详细介绍了...
《深入理解SPARK:核心思想与源码分析》结合大量图和示例,对Spark的架构、部署模式和工作模块的设计理念、实现源码与使用技巧进行了深入的剖析与解读。, 《深入理解SPARK:核心思想与源码分析》一书对Spark1.2.0...
Spark机器学习模块源码解读 Spark机器学习模块源码解读
使用的数据集来自知名数据网站 Kaggle 的 tmdb-movie-metadata 电影数据集,以Python为编程语言,使用大数据框架Spark对数据进行了预处理,然后分别从多个方面对数据进行了分类和分析,并对分析结果进行可视化。...
课时4:SparkRDD原理剖析 课时5:Spark2sql从mysql中导入 课时6:Spark1.6.2sql与mysql数据交互 课时7:SparkSQL java操作mysql数据 课时8:Spark统计用户的收藏转换率 课时9:Spark梳理用户的收藏以及订单转换率 ...
ApacheSpark设计与实现.pdf+ApacheSpark源码剖析.pdf+Spark原著中文版.pdf
基于PySpark的统计分析,主要分为以下模块: ```angular2html 1.spark_core:spark的基本操作,统计、wordcount、TopN等,数据主要来自英文新闻网站和自己随机构造的数据 2.spark_mllib:针对spark mllib里面机器学习...