7.6 Shuffle与Storage模块间的交互
在Spark中,存储模块被抽象成Storage。顾名思义,Storage是存储的意思,代表着Spark中的数据存储系统,负责管理和实现数据块(Block)的存放。其中存取数据的最小单元是Block,数据由不同的Block组成,所有操作都是以Block为单位进行的。从本质上讲,RDD中的Partition和Storage中的Block是等价的,只是所处的模块不同,看待的角度不一样而已。
Storage抽象模块的实现分为两个层次,如图7-13所示。
(1)通信层:通信层是典型的Master-Slave结构,Master和Slave之间传输控制和状态信息。通信层主要由BlockManager、BlockManagerMaster、BlockManagerMasterEndpoint、BlockManagerSlaveEndpoint等类实现。
(2)存储层:负责把数据存储到内存、磁盘或者堆外内存中,有时还需要为数据在远程节点上生成副本,这些都由存储层提供的接口实现。Spark 2.2.0具体的存储层的实现类有DiskStore和MemoryStore。
图7-13 Storage存储模块
Shuffle模块若要和Storage模块进行交互,需要通过调用统一的操作类BlockManager来完成。如果把整个存储模块看成一个黑盒,BlockManager就是黑盒上留出的一个供外部调用的接口。
7.6.1 Shuffle注册的交互
Spark中BlockManager在Driver端的创建,在SparkContext创建的时候会根据具体的配置创建SparkEnv对象。
Spark 2.2.1版本的SparkContext.scala的源码如下:
Spark 2.4.3版本的SparkContext.scala源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第9行在SparkContext.numDriverCores新增一个参数conf。
createSparkEnv方法中,传入SparkConf配置对象、isLocal标志,以及LiveListenerBus,方法中使用SparkEnv对象的createDriverEnv方法创建SparkEnv并返回。在SparkEnv的createDriverEvn方法中,将会创建BlockManager、BlockManagerMaster等对象,完成Storage在Driver端的部署。
SparkEnv中创建BlockManager、BlockManagerMaster关键源码如下:
使用new关键字实例化出BlockManagerMaster,传入BlockManager的构造函数,实例化出BlockManager对象。这里的BlockManagerMaster和BlockManager属于聚合关系。BlockManager主要对外提供统一的访问接口,BlockManagerMaster主要对内提供各节点之间的指令通信服务。
构建BlockManager时,传入shuffleManager参数,shuffleManager是在SparkEnv中创建的,将shuffleManager传入到BlockManager中,BlockManager就拥有shuffleManager的成员变量,从而可以调用shuffleManager的相关方法。
BlockManagerMaster在Driver端和Executors中的创建稍有差别。首先来看在Driver端创建的情形。创建BlockManagerMaster传入的isDriver参数,isDriver为true,表示在Driver端创建,否则视为在Slave节点上创建。
当SparkContext中执行_env.blockManager.initialize(_applicationId)代码时,会调用Driver端BlockManager的initialize方法。Initialize方法的源码如下所示。
SparkContext.scala的源码如下:
1. _env.blockManager.initialize(_applicationId)
BlockManager.scala的源码如下:
如上面的源码所示,initialize方法使用appId初始化BlockManager,主要完成以下工作。
(1)初始化BlockTransferService。
(2)初始化ShuffleClient。
(3)创建BlockManagerId。
(4)将BlockManager注册到BlockManagerMaster上。
(5)若ShuffleService可用,则注册ShuffleService。
在BlockManager的initialize方法上右击Find Usages,可以看到initialize方法在两个地方得到调用:一个是SparkContext;另一个是Executor。启动Executor时,会调用BlockManager的initialize方法。Executor中调用initialize方法的源码如下所示。
Spark 2.2.1版本的Executor.scala的源码如下:
Spark 2.4.3版本的Executor.scala源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第5行和第7行互换一下顺序。
上段代码中第7行后新增代码,向度量系统进行env.blockManager.shuffleMetricsSource的注册。
上面代码中调用了env.blockManager.initialize方法。在initialize方法中,完成BlockManger向Master端BlockManagerMaster的注册。使用方法master.registerBlockManager(id,maxMemory,slaveEndpoint)完成注册,registerBlockManager方法中传入Id、maxMemory、salveEndPoint引用,分别表示Executor中的BlockManager、最大内存、BlockManager中的BlockMangarSlaveEndpoint。BlockManagerSlaveEndpoint是一个RPC端点,使用它完成同BlockManagerMaster的通信。BlockManager收到注册请求后将Executor中注册的BlockManagerInfo存入哈希表中,以便通过BlockManagerSlaveEndpoint向Executor发送控制命令。
ShuffleManager是一个用于shuffle系统的可插拔接口。在Driver端SparkEnv中创建ShuffleManager,在每个Executor上也会创建。基于spark.shuffle.manager进行设置。Driver使用ShuffleManager注册到shuffles系统,Executors(或Driver在本地运行的任务)可以请求读取和写入数据。这将被SparkEnv的SparkConf和isDriver布尔值作为参数。
ShuffleManager.scala的源码如下:
Spark Shuffle Pluggable框架ShuffleBlockManager在Spark 1.6.0之后改成了ShuffleBlockResolver。ShuffleBlockResolver具体读取shuffle数据,是一个trait。在ShuffleBlockResolver中已无getBytes方法。getBlockData(blockId: ShuffleBlockId)方法返回的是ManagedBuffer,这是核心。
ShuffleBlockResolver的源码如下:
Spark 2.0版本中通过IndexShuffleBlockResolver来具体实现ShuffleBlockResolver(SortBasedShuffle方式),已无FileShuffleBlockManager(Hashshuffle方式)。IndexShuffle-BlockResolver创建和维护逻辑块和物理文件位置之间的shuffle blocks映射关系。来自于相同map task任务的shuffle blocks数据存储在单个合并数据文件中;数据文件中的数据块的偏移量存储在单独的索引文件中。将shuffleBlockId + reduce ID set to 0 + ".后缀"作为数据shuffle data的shuffleBlockId名字。其中,文件名后缀为".data"的是数据文件;文件名后缀为".index"的是索引文件。
7.6.2 Shuffle写数据的交互
基于Sort的Shuffle实现的ShuffleHandle包含BypassMergeSortShuffleHandle与BaseShuffleHandle。两种ShuffleHandle写数据的方法可以参考SortShuffleManager类的getWriter方法,关键代码如下所示。
SortShuffleManager的getWriter的源码如下:
在对应构建的两种数据写入器类BypassMergeSortShuffleWriter与SortShuffleWriter中,都是通过变量shuffleBlockResolver对逻辑数据块与物理数据块的映射进行解析。BypassMergeSortShuffleWriter写数据的具体实现位于实现的write方法中,其中调用的createTempShuffleBlock方法描述了各个分区所生成的中间临时文件的格式与对应的BlockId。SortShuffleWriter写数据的具体实现位于实现的write方法中。
7.6.3 Shuffle读数据的交互
SparkEnv.get.shuffleManager.getReader是SortShuffleManager的getReader,是获取数据的阅读器,getReader方法中创建了一个BlockStoreShuffleReader实例。SortShuffleManager.scala的read()方法的源码如下:
BlockStoreShuffleReader实例的read方法,首先实例化new ShuffleBlockFetcherIterator。ShuffleBlockFetcherIterator是一个阅读器,里面有一个成员blockManager。blockManager是内存和磁盘上数据读写的统一管理器;ShuffleBlockFetcherIterator.scala的initialize方法中splitLocalRemoteBlocks()划分本地和远程的blocks,Utils.randomize(remoteRequests)把远程请求通过随机的方式添加到队列中,fetchUpToMaxBytes()发送远程请求获取我们的blocks,fetchLocalBlocks()获取本地的blocks。
7.6.4 BlockManager架构原理、运行流程图和源码解密
BlockManager是管理整个Spark运行时数据的读写,包含数据存储本身,在数据存储的基础上进行数据读写。由于Spark是分布式的,所以BlockManager也是分布式的,BlockManager本身相对而言是一个比较大的模块,Spark中有非常多的模块:调度模块、资源管理模块等。BlockManager是另外一个非常重要的模块。BlockManager本身的源码量非常大。本节从BlockManager原理流程对BlockManager做深刻地讲解。在Shuffle读写数据的时候,我们需要读写BlockManager。因此,BlockManager是至关重要的内容。
编写一个业务代码WordCount.scala,通过观察WordCount运行时BlockManager的日志来理解BlockManager的运行。
WordCount.scala的代码如下:
在IDEA中运行一个业务程序WordCount.scala,日志中显示以下内容。
SparkEnv:Registering MapOutputTracker,其中MapOutputTracker中数据的读写都和BlockManager关联。
SparkEnv:Registering BlockManagerMaste,其中Registering BlockManagerMaster由BlockManagerMaster进行注册。
DiskBlockManager:Created local directory C:\Users\dell\AppData\Local\Temp\blockmgr-...其中DiskBlockManager是管理磁盘存储的,里面有我们的数据。可以访问Temp目录下以blockmgr-开头的文件的内容。
WordCount运行结果如下:
从Application启动的角度观察BlockManager。
(1)Application启动时会在SparkEnv中注册BlockManagerMaster以及MapOutputTracker,其中:
① BlockManagerMaster:对整个集群的Block数据进行管理。
② MapOutputTrackerMaster:跟踪所有的Mapper的输出。
BlockManagerMaster中有一个引用driverEndpoint,isDriver判断是否运行在Driver上。
BlockManagerMaster的源码如下:
BlockManagerMaster注册给SparkEnv,SparkEnv在SparkContext中。
SparkContext.scala的源码如下:
在SparkContext.scala的createSparkEnv方法中调用SparkEnv.createDriverEnv方法。
进入SparkEnv.scala的createDriverEnv方法。
Spark 2.2.1版本的SparkEnv.scala的源码如下:
Spark 2.4.3版本的SparkEnv.scala源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第8行将port调整为Option(port)。
SparkEnv.scala的createDriverEnv中调用了create方法,判断是否是Driver。create方法的源码如下。
Spark 2.2.1版本的SparkEnv.scala的源码如下:
Spark 2.4.3版本的SparkEnv.scala源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第6行将port: Int调整为port: Option[Int]。
在SparkEnv.scala的createDriverEnv中调用new()函数创建一个MapOutputTrackerMaster。MapOutputTrackerMaster的源码如下。
然后看一下blockManagerMaster。在SparkEnv.scala中调用new()函数创建一个blockManagerMaster。
BlockManagerMaster对整个集群的Block数据进行管理,Block是Spark数据管理的单位,与数据存储没有关系,数据可能存在磁盘上,也可能存储在内存中,还可能存储在offline,如Alluxio上,源码如下:
构建BlockManagerMaster的时候调用new()函数创建一个BlockManagerMasterEndpoint,这是循环消息体。
(2)BlockManagerMasterEndpoint本身是一个消息体,会负责通过远程消息通信的方式去管理所有节点的BlockManager。
查看WordCount在IDEA中的运行日志,日志中显示BlockManagerMasterEndpoint:Registering block manager,向block manager进行注册。
(3)每启动一个ExecutorBackend,都会实例化BlockManager,并通过远程通信的方式注册给BlockManagerMaster;实质上是Executor中的BlockManager在启动的时候注册给了Driver上的BlockManagerMasterEndpoint。
(4)MemoryStore是BlockManager中专门负责内存数据存储和读写的类。
查看WordCount在IDEA中的运行日志,日志中显示MemoryStore: Block broadcast_0 stored as values in memory,数据存储在内存中。
Spark读写数据是以block为单位的,MemoryStore将block数据存储在内存中。MemoryStore.scala的源码如下:
(5)DiskStore是BlockManager中专门负责基于磁盘的数据存储和读写的类。
DiskStore.scala的源码如下:
(6)DiskBlockManager:管理Logical Block与Disk上的Physical Block之间的映射关系并负责磁盘文件的创建、读写等。
查看WordCount在IDEA中的运行日志,日志中显示INFO DiskBlockManager: Created local directory。DiskBlockManager负责磁盘文件的管理。
DiskBlockManager负责管理逻辑级别和物理级别的映射关系,根据BlockID映射一个文件。在目录spark.local.dir或者SPARK_LOCAL_DIRS中,Block文件进行hash生成。通过createLocalDirs生成本地目录。DiskBlockManager的源码如下:
从Job运行的角度来观察BlockManager:
查看WordCount.scala的运行日志:日志中显示INFO BlockManagerInfo: Added broadcast_0_piece0 in memory,将BlockManagerInfo的广播变量加入到内存中。
Driver使用BlockManagerInfo管理ExecutorBackend中BlockManager的元数据,BlockManagerInfo的成员变量包括blockManagerId、系统当前时间timeMs、最大堆内内存maxOnHeapMem、最大堆外内存maxOffHeapMem、slaveEndpoint。
BlockManagerMasterEndpoint.scala的源码如下:
集群中每启动一个节点,就创建一个BlockManager,BlockManager是在每个节点(Driver及Executors)上运行的管理器,用于存放和检索本地和远程不同的存储块(内存、磁盘和堆外内存)。BlockManagerInfo中的BlockManagerId标明是哪个BlockManager,slaveEndpoint是消息循环体,用于消息通信。
(1)首先通过MemoryStore存储广播变量。
(2)在Driver中是通过BlockManagerInfo来管理集群中每个ExecutorBackend中的BlockManager中的元数据信息的。
(3)当改变了具体的ExecutorBackend上的Block信息后,就必须发消息给Driver中的BlockManagerMaster来更新相应的BlockManagerInfo。
(4)当执行第二个Stage的时候,第二个Stage会向Driver中的MapOutputTracker-MasterEndpoint发消息请求上一个Stage中相应的输出,此时MapOutputTrackerMaster会把上一个Stage的输出数据的元数据信息发送给当前请求的Stage。图7-14是BlockManager工作原理和运行机制简图。
图7-14 BlockManager工作原理和运行机制简图
BlockManagerMasterEndpoint.scala中BlockManagerInfo的getStatus方法如下:
其中的BlockStatus是一个case class。
BlockTransferService.scala进行网络连接操作,获取远程数据。
7.6.5 BlockManager解密进阶
本节讲解BlockManager初始化和注册解密、BlockManagerMaster工作解密、BlockTransferService解密、本地数据读写解密、远程数据读写解密。
BlockManager既可以运行在Driver上,也可以运行在Executor上。在Driver上的BlockManager管理集群中Executor的所有的BlockManager,BlockManager分成Master、Slave结构,一切的调度、一切的工作由Master触发,Executor在启动的时候一定会启动BlockManager。BlockManager主要提供了读和写数据的接口,可以从本地读写数据,也可以从远程读写数据。读写数据可以基于磁盘,也可以基于内存以及OffHeap。OffHeap就是堆外空间(如Alluxio是分布式内存管理系统,与基于内存计算的Spark系统形成天衣无缝的组合,在大数据领域中,Spark+Alluxio+Kafka是非常有用的组合)。
从整个程序运行的角度看,Driver也是Executor的一种,BlockManager可以运行在Driver上,也可以运行在Executor上。BlockManager.scala的源码如下:
BlockManager中的成员变量中:BlockManagerMaster对整个集群的BlockManagerMaster进行管理;serializerManager是默认的序列化器;MemoryManager是内存管理;MapOutputTracker是Shuffle输出的时候,要记录ShuffleMapTask输出的位置,以供下一个Stage使用,因此需要进行记录。BlockTransferService是进行网络操作的,如果要连同另外一个BlockManager进行数据读写操作,就需要BlockTransferService。Block是Spark运行时数据的最小抽象单位,可能放入内存中,也可能放入磁盘中,还可能放在Alluxio上。
SecurityManager是安全管理;numUsableCores是可用的Cores。
BlockManager中DiskBlockManager管理磁盘的读写,创建并维护磁盘上逻辑块和物理块之间的逻辑映射位置。一个block被映射到根据BlockId生成的一个文件,块文件哈希列在目录spark.local.dir中(如果设置了SPARK LOCAL DIRS),或在目录(SPARK LOCAL DIRS)中。
然后在BlockManager中创建一个缓存池:block-manager-future以及memoryStore、diskStore。
Shuffle读写数据的时候是通过BlockManager进行管理的。
Spark 2.2.1版本的BlockManager.scala的源码如下:
Spark 2.4.3版本的BlockManager.scala的源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第12行新增加一个参数。conf.get(config.SHUFFLE_REGISTRATION_TIMEOUT)。
BlockManager.scala中,BlockManager实例对象通过调用initialize方法才能正式工作,传入参数是appId,基于应用程序的ID初始化BlockManager。initialize不是在构造器的时候被使用,因为BlockManager实例化的时候还不知道应用程序的ID,应用程序ID是应用程序启动时,ExecutorBackend向Master注册时候获得的。
BlockManager.scala的initialize方法中的BlockTransferService进行网络通信。ShuffleClient是BlockManagerWorker每次启动时向BlockManagerMaster注册。BlockManager.scala的initialize方法中调用了registerBlockManager,向Master进行注册,告诉BlockManagerMaster把自己注册进去。
BlockManagerMaster.scala的registerBlockManager的源码如下:
registerBlockManager方法的RegisterBlockManager是一个case class。
BlockManagerMessages.scala的源码如下:
在Executor实例化的时候,要初始化blockManager。blockManager在initialize中将应用程序ID传进去。
Executor.scala中,Executor每隔10s向Master发送心跳消息,如收不到心跳消息,blockManager须重新注册。
Spark 2.1.2版本的Executor.scala的源码如下:
Spark 2.4.3版本的Executor.scala的源码与Spark 2.1.1版本相比具有如下特点。
上段代码中第5行RpcTimeout调整为以下代码:
回到BlockManagerMaster.scala的registerBlockManager:
registerBlockManager中RegisterBlockManager传入的slaveEndpoint是:具体的Executor启动时会启动一个BlockManagerSlaveEndpoint,会接收BlockManagerMaster发过来的指令。在initialize方法中通过master.registerBlockManager传入slaveEndpoint,而slaveEndpoint是在rpcEnv.setupEndpoint方法中调用new()函数创建的BlockManagerSlaveEndpoint。
总结如下。
(1)当Executor实例化的时候,会通过BlockManager.initialize来实例化Executor上的BlockManager,并且创建BlockManagerSlaveEndpoint这个消息循环体来接受Driver中BlockManagerMaster发过来的指令,如删除Block等。
1. env.blockManager.initialize(conf.getAppId)
BlockManagerSlaveEndpoint.scala的源码如下:
(2)当BlockManagerSlaveEndpoint实例化后,Executor上的BlockManager需要向Driver上的BlockManagerMasterEndpoint注册。
BlockManagerMaster的registerBlockManager方法,其中的driverEndpoint是构建BlockManagerMaster时传进去的。
(3)BlockManagerMasterEndpoint接收到Executor上的注册信息并进行处理。
BlockManagerMasterEndpoint.scala的源码如下:
BlockManagerMasterEndpoint的register注册方法,为每个Executor的BlockManager生成对应的BlockManagerInfo。BlockManagerInfo是一个HashMap[BlockManagerId, BlockManagerInfo]。
BlockManagerMasterEndpoint.scala的register注册方法源码如下:
BlockManagerMasterEndpoint中,BlockManagerId是一个class,标明了BlockManager在哪个Executor中,以及host主机名、port端口等信息。
BlockManagerId.scala的源码如下:
BlockManagerMasterEndpoint中,BlockManagerInfo包含内存、slaveEndpoint等信息。
回到BlockManagerMasterEndpoint的register注册方法:如果blockManagerInfo没有包含BlockManagerId,根据BlockManagerId.executorId查询BlockManagerId,如果匹配到旧的BlockManagerId,就进行清理。
BlockManagerMasterEndpoint的removeExecutor方法如下:
进入removeBlockManager方法,从blockManagerIdByExecutor数据结构中清理掉block manager信息,从blockManagerInfo数据结构中清理掉所有的blocks信息。removeBlockManager源码如下。
BlockManagerMasterEndpoint.scala的removeBlockManager的源码如下:
removeBlockManager中的一行代码blockLocations.remove的remove方法如下。
HashMap.java的源码如下:
回到BlockManagerMasterEndpoint的register注册方法:然后在blockManagerIdByExecutor中加入BlockManagerId,将BlockManagerId加入BlockManagerInfo信息,在listenerBus中进行监听,函数返回BlockManagerId,完成注册。
回到BlockManager.scala,在initialize方法通过master.registerBlockManager注册成功以后,将返回值赋值给idFromMaster。Initialize初始化之后,看一下BlockManager.scala中其他的方法。
reportAllBlocks方法:具体的Executor须向Driver不断地汇报自己的状态。
BlockManager.scala的reportAllBlocks方法的源码如下:
reportAllBlocks方法中调用了getCurrentBlockStatus,包括内存、磁盘等信息。
getCurrentBlockStatus的源码如下:
getCurrentBlockStatus方法中的BlockStatus,包含存储级别StorageLevel、内存大小、磁盘大小等信息。
BlockManagerMasterEndpoint.scala的BlockStatus的源码如下:
回到BlockManager.scala,其中的getLocationBlockIds方法比较重要,根据BlockId获取这个BlockId所在的BlockManager。
BlockManager.scala的getLocationBlockIds的源码如下:
getLocationBlockIds方法中根据BlockId通过master.getLocations向Master获取位置信息,因为master管理所有的位置信息。getLocations方法里的driverEndpoint是BlockManagerMasterEndpoint,Executor向BlockManagerMasterEndpoint发送GetLocationsMultipleBlockIds消息。
BlockManagerMaster.scala的getLocations方法的源码如下:
getLocations中的GetLocationsMultipleBlockIds是一个case class。
在BlockManagerMasterEndpoint侧接收GetLocationsMultipleBlockIds消息。
BlockManagerMasterEndpoint.scala的receiveAndReply方法如下:
进入getLocationsMultipleBlockIds方法,进行map操作,开始查询位置信息。
进入getLocations方法,首先判断内存缓存结构blockLocations中是否包含blockId,如果已包含,就获取位置信息,否则返回空的信息。
其中,blockLocations是一个重要的数据结构,是一个JHashMap。Key是BlockId。Value是一个HashSet[BlockManagerId],使用HashSet。因为每个BlockId在磁盘上有副本,不同机器的位置不一样,而且不同副本对应的BlockManagerId不一样,位于不同的机器上,所以使用HashSet数据结构。
BlockManagerMasterEndpoint.scala的blockLocations的源码如下:
回到BlockManager.scala,getLocalValues是一个重要的方法,从blockInfoManager中获取本地数据。
首先根据blockId从blockInfoManager中获取BlockInfo信息。
从BlockInfo信息获取level级别,根据level.useMemory && memoryStore.contains(blockId)判断是否在内存中,如果在内存中,就从memoryStore中获取数据。
根据level.useDisk && diskStore.contains(blockId)判断是否在磁盘中,如果在磁盘中,就从diskStore中获取数据。
BlockManager.scala的getLocalValues方法的源码如下:
回到BlockManager.scala,getRemoteValues方法从远程的BlockManager中获取block数据,在JVM中不需要去获取锁。
BlockManager.scala的getRemoteValues方法的源码如下:
getRemoteValues方法中调用getRemoteBytes,获取远程的数据,如果获取的失败次数超过最大的获取次数(locations.size),就提示失败,返回空值;如果获取到远程数据,就返回。
getRemoteBytes方法调用blockTransferService.fetchBlockSync方法实现远程获取数据。
Spark 2.2.1版本的BlockTransferService.scala的fetchBlockSync方法的源码如下:
Spark 2.4.3版本的BlockTransferService.scala的fetchBlockSync方法的源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第1行新增加一个tempFileManager参数。
上段代码中第9~15行onBlockFetchSuccess方法整体替换为以下的onBlockFetchSuccess代码。
fetchBlocks方法用于从远程节点异步获取序列块,仅在调用[init]之后可用。注意,这个API需要一个序列,可以实现批处理请求,而不是返回一个future,底层实现可以调用onBlockFetchSuccess来尽快获取块的数据,而不是等待所有块被取出来。
fetchBlockSync中调用fetchBlocks方法,NettyBlockTransferService继承自BlockTransfer-Service,是BlockTransferService实现子类。
Spark 2.2.1版本的NettyBlockTransferService的fetchBlocks的源码如下:
Spark 2.4.3版本的NettyBlockTransferService.scala源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第7、14行tempShuffleFileManager调整为tempFileManager。Download-FileManager用于创建临时块文件的管理器,用于获取远程数据以减少内存使用。当文件不再使用时,它将清除文件。
回到BlockManager.scala,无论是doPutBytes(),还是doPutIterator()方法中,都会使用doPut方法。
BlockManager.scala的doPut方法的源码如下:
doPut方法中,lockNewBlockForWriting写入一个新的块前先尝试获得适当的锁,如果我们是第一个写块,获得写入锁后继续后续操作。否则,如果另一个线程已经写入块,须等待写入完成,才能获取读取锁,调用new()函数创建一个BlockInfo赋值给putBlockInfo,然后通过putBody(putBlockInfo)将数据存入。putBody是一个匿名函数,输入BlockInfo,输出的是一个泛型Option[T]。putBody函数体内容是doPutIterator方法(doPutBytes方法也类似调用doPut)调用doPut时传入的。
BlockManager.scala的doPutIterator调用doPut方法,在其putBody匿名函数体中进行判断:
如果是level.useMemory,则在memoryStore中放入数据。
如果是level.useDisk,则在diskStore中放入数据。
如果level.replication大于1,则在其他节点中存入副本数据。
其中,Spark 2.2.1版本的BlockManager.scala的replicate方法的源码如下:
Spark 2.4.3版本的BlockManager.scala的replicate方法的源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第15行之后新增代码,构建BlockManagerManagedBuffer的实例buffer。
上段代码中第21行将BlockManagerManagedBuffer替换为buffer。
replicate方法中调用了blockTransferService.uploadBlockSync方法。
BlockTransferService.scala的uploadBlockSync的源码如下:
uploadBlockSync中又调用uploadBlock方法,BlockTransferService.scala的uploadBlock方法无具体实现,NettyBlockTransferService是BlockTransferService的子类,具体实现uploadBlock方法。
Spark 2.2.1版本的NettyBlockTransferService的uploadBlock的源码如下:
Spark 2.4.3版本的NettyBlockTransferService.scala源码与Spark 2.2.1版本相比具有如下特点。
上段代码中删掉第16~18行。
上段代码中第19行之前新增代码,新建asStream变量,其将blockData与MAX_REMOTE_ BLOCK_SIZE_FETCH_TO_MEM进行比较。当块的大小高于此阈值时,远程块以字节为单位将被提取到磁盘。这是为了避免一个大的请求占用太多的内存。通过设置特定值(例如200)可以启用这个配置。注意,此配置将影响shuffle获取和块管理器远程块获取。对于启用了外部shuffle服务,此功能只能在高于Spark 2.2的版本,在外部shuffle启用时使用服务。
上段代码中第19行新建一个变量callback。
上段代码中第21行logTrace日志的内容进行调整。
上段代码中第25行logError日志的内容进行调整。
上段代码中第29行之后新增一段代码,如果blockData大于MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM,则调用client.uploadStream方法,将数据作为流发送到远程端,与stream()方法的不同之处在于,这是一个请求向远程端发送数据,而不是从远程端接收数据。如果将blockData小于阈值,则将NIO缓冲区转换或复制到数组中,以便对其进行序列化。
回到BlockManager.scala,看一下dropFromMemory方法。如果存储级别定位为MEMORY_AND_DISK,那么数据可能放在内存和磁盘中,内存够的情况下不会放到磁盘上;如果内存不够,就放到磁盘上,这时就会调用dropFromMemory。如果存储级别不是定义为MEMORY_AND_DISK,而只是存储在内存中,内存不够时,缓存的数据此时就会丢弃。如果仍需要数据,那就要重新计算。
BlockManager.scala的dropFromMemory的源码如下:
总结:dropFromMemory是指在内存不够的时候,尝试释放一部分内存给要使用内存的应用,释放的这部分内存数据需考虑是丢弃,还是放到磁盘上。如果丢弃,如5000个步骤作为一个Stage,前面4000个步骤进行了Cache,Cache时可能有100万个partition分区单位,其中丢弃了100个,丢弃的100个数据就要重新计算;但是,如果设置了同时放到内存和磁盘,此时会放入磁盘中,下次如果需要,就可以从磁盘中读取数据,而不是重新计算。