国产精品电影_久久视频免费_欧美日韩国产激情_成年人视频免费在线播放_日本久久亚洲电影_久久都是精品_66av99_九色精品美女在线_蜜臀a∨国产成人精品_冲田杏梨av在线_欧美精品在线一区二区三区_麻豆mv在线看

spark 自己的分布式存儲系統 - BlockManager

存儲 存儲軟件 大數據 Spark 分布式
BlockManager 是 spark 中至關重要的一個組件, 在 spark的的運行過程中到處都有 BlockManager 的身影, 只有搞清楚 BlockManager 的原理和機制,你才能更加深入的理解 spark。 今天我們來揭開 BlockaManager 的底層原理和設計思路,

整體架構

BlockManager 是 spark 中至關重要的一個組件, 在 spark的的運行過程中到處都有 BlockManager 的身影, 只有搞清楚 BlockManager 的原理和機制,你才能更加深入的理解 spark。 今天我們來揭開 BlockaManager 的底層原理和設計思路,

BlockManager 是一個嵌入在 spark 中的 key-value型分布式存儲系統,是為 spark 量身打造的,

BlockManager 在一個 spark 應用中作為一個本地緩存運行在所有的節點上, 包括所有 driver 和 executor上。 BlockManager 對本地和遠程提供一致的 get 和set 數據塊接口, BlockManager 本身使用不同的存儲方式來存儲這些數據, 包括 memory, disk, off-heap。

 

上面是一個整體的架構圖, BlockManagerMaster擁有BlockManagerMasterEndpoint 的actor和所有BlockManagerSlaveEndpoint的ref, 可以通過這些引用對 slave 下達命令

executor 節點上的BlockManagerMaster 則擁有BlockManagerMasterEndpoint的ref和自身BlockManagerSlaveEndpoint的actor。可以通過 Master的引用注冊自己。

在master 和 slave 可以正常的通信之后, 就可以根據設計的交互協議進行交互, 整個分布式緩存系統也就運轉起來了,

初始化

我們知道, sparkEnv 啟動的時候會啟動各個組件, BlockManager 也不例外, 也是這個時候啟動的,

啟動的時候會根據自己是在 driver 還是 executor 上進行不同的啟動過程,

  1. def registerOrLookupEndpoint( 
  2.         name: String, endpointCreator: => RpcEndpoint): 
  3.       RpcEndpointRef = { 
  4.       if (isDriver) { 
  5.         logInfo("Registering " + name
  6.         rpcEnv.setupEndpoint(name, endpointCreator) 
  7.       } else { 
  8.         RpcUtils.makeDriverRef(name, conf, rpcEnv) 
  9.       } 
  10.     } 

上圖是 sparkEnv 在 master上啟動的時候, 構造了一個 BlockManagerMasterEndpoint, 然后把這個Endpoint 注冊在 rpcEnv中, 同時也會啟動自己的 BlockManager

上圖是 sparkEnv 在executor上啟動的時候, 通過 setupEndpointRef 方法獲取到了  BlockManagerMaster的引用 BlockManagerMasterRef, 同時也會啟動自己的 BlockManager,

在 BlockManager 初始化自己的時候, 會向 BlockManagerMasterEndpoint 注冊自己, BlockManagerMasterEndpoint 發送 registerBlockManager消息,  BlockManagerMasterEndpoint 接受到消息, 把 BlockManagerSlaveEndpoint  的引用 保存在自己的  blockManagerInfo 數據結構中以待后用。

分布式協議

下面的一個表格是 master 和 slave 接受到各種類型的消息, 以及接受到消息后,做的處理。

  • BlockManagerMasterEndpoint  接受的消息

  • BlockManagerSlaveEndpoint 接受的消息

根據以上的協議, 相信我們可以很清楚的猜測整個交互的流程, 一般過程應該是這樣的, slave的 BlockManager  在自己接的上存儲一個 Block, 然后把這個 BlockId 匯報到master的BlockManager , 經過 cache, shuffle 或者 Broadcast后,別的節點需要上一步的Block的時候, 會到 master 獲取數據所在位置, 然后去相應節點上去 fetch。

存儲層

在RDD層面上我們了解到RDD是由不同的partition組成的,我們所進行的transformation和action是在partition上面進行的;而在storage模塊內部,RDD又被視為由不同的block組成,對于RDD的存取是以block為單位進行的,本質上partition和block是等價的,只是看待的角度不同。在Spark storage模塊中中存取數據的最小單位是block,所有的操作都是以block為單位進行的。

 

BlockManager對象被創建的時候會創建出MemoryStore和DiskStore對象用以存取block,如果內存中擁有足夠的內存, 就 使用 MemoryStore存儲,  如果 不夠, 就 spill 到 磁盤中, 通過 DiskStore進行存儲。

  • DiskStore 有一個DiskBlockManager,DiskBlockManager 主要用來創建并持有邏輯 blocks 與磁盤上的 blocks之間的映射,一個邏輯 block 通過 BlockId 映射到一個磁盤上的文件。 在 DiskStore 中會調用  diskManager.getFile 方法, 如果子文件夾不存在,會進行創建, 文件夾的命名方式為(spark-local-yyyyMMddHHmmss-xxxx, xxxx是一個隨機數), 所有的block都會存儲在所創建的folder里面。
  • MemoryStore 相對于DiskStore需要根據block id hash計算出文件路徑并將block存放到對應的文件里面,MemoryStore管理block就顯得非常簡單:MemoryStore內部維護了一個hash map來管理所有的block,以block id為key將block存放到hash map中。而從MemoryStore中取得block則非常簡單,只需從hash map中取出block id對應的value即可。

BlockManager 的 PUT 和GET接口

BlockManager 提供了 Put 接口和 Get 接口, 這兩個 api 屏蔽了底層的細節, 我們來看下底層是如何實現的

  • GET操作 如果 local 中存在就直接返回, 從本地獲取一個Block, 會先判斷如果是 useMemory, 直接從內存中取出, 如果是 useDisk, 會從磁盤中取出返回, 然后根據useMemory判斷是否在內存中緩存一下,方便下次獲取,  如果local 不存在, 從其他節點上獲取, 當然元信息是存在 drive上的,要根據我們上文中提到的 GETlocation 協議獲取 Block 所在節點位置, 然后到其他節點上獲取。
  • PUT操作 操作之前會加鎖來避免多線程的問題, 存儲的時候會根據 存儲級別, 調用對應的是 memoryStore 還是  diskStore, 然后在具體存儲器上面調用 存儲接口。 如果有 replication 需求, 會把數據備份到其他的機器上面。

blockManager 和 blockTransferService 關系

spark 歷史上使用過兩套網絡框架, 最開始的時候, rpc 調用使用的是 akka, 大文件傳輸使用的是 netty,  后面統一全部使用 netty,  這里的大文件傳輸其實走的是 netty,  在啟動 blockManager的時候會啟動一個 blockTransferService 服務, 這個服務就是用來傳輸大文件用的, 對應的具體類是  NettyBlockTransferService, 這個實例中也會有 BlocakManager的引用, 會啟動一個 NettyBlockRpcServer的 netty Handler, 也擁有 BlocakManager 的引用,  用來提供服務, BlocakManager 根據 BlockId 獲取一個 Block 然后包裝為一個 ManagedBuffer 對象,

當我們需要從遠端獲取一個 Block的時候,就需要 blockTransferService 傳輸大的字節數組,

首先需要從 driver上獲取到 Block的真正存儲位置, 然后調用 blockTransferService 的 fetchBlocks方法, 去其他真正存儲節點上去fetch數據, 會從 client 資源池中獲取一個client,  如果是一對一的進行fetch,  使用的是 OneForOneBlockFetcher, 這個Fetcher 是以 Chunks 為單位分別單獨fetch,  每個 Chunks 也就對應一個Block的數據, 根據配置,會進行重試直到***重試次數,發送 OpenBlocks消息,  里面會包裝對應的是哪個  BlockId,  其他節點服務端會根據 BlockId 從 blockManager中拿到數據, 然后用來傳輸, 使用的是 netty 的流式傳輸方式, 同時也會有回調函數,

如果是備份的時候同步上傳一個 Block,  其他節點服務端會根據,uploadBlock消息中包含的BlockId, 在本地的BlockManager 中冗余存儲一份,

ChunkFetch也有一個類似Stream的概念,ChunkFetch的對象是“一個內存中的Iterator[ManagedBuffer]”,即一組Buffer,每一個Buffer對應一個chunkIndex,整個Iterator[ManagedBuffer]由一個StreamID標識。Client每次的ChunkFetch請求是由(streamId,chunkIndex)組成的唯一的StreamChunkId,Server端根據StreamChunkId獲取為一個Buffer并返回給Client; 不管是Stream還是ChunkFetch,在Server的內存中都需要管理一組由StreamID與資源之間映射,即StreamManager類,它提供了getChunk和openStream兩個接口來分別響應ChunkFetch與Stream兩種操作,并且針對Server的ChunkFetch提供一個registerStream接口來注冊一組Buffer,比如可以將BlockManager中一組BlockID對應的Iterator[ManagedBuffer]注冊到StreamManager,從而支持遠程Block Fetch操作。

對于ExternalShuffleService(一種單獨shuffle服務進程,對其他計算節點提供本節點上面的所有shuffle map輸出),它為遠程Executor提供了一種OpenBlocks的RPC接口,即根據請求的appid,executorid,blockid(appid+executor對應本地一組目錄,blockid拆封出)從本地磁盤中加載一組FileSegmentManagedBuffer到內存,并返回加載后的streamId返回給客戶端,從而支持后續的ChunkFetch的操作。

Partition 與 Block 的關系

我們都知道, RDD 的運算是基于 partition, 每個 task 代表一個 分區上一個 stage 內的運算閉包, task 被分別調度到 多個 executor上去運行, 那么是在哪里變成了 Block 呢,  我們以 spark 2.11 源碼為準, 看看這個轉變過程,

一個 RDD 調度到 executor 上會運行調用 getOrCompute方法,

  1. SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => { 
  2.       readCachedBlock = false 
  3.       computeOrReadCheckpoint(partition, context) 
  4.     }) 

如果 Block 在 BlockManager 中存在, 就會從 BlockManager 中獲取,如果不存在, 就進行計算這個Block, 然后在 BlockManager 中進行存儲持久化, 方便下次使用,

當然獲取的時候是先從本地的 BlockManager 中獲取, 如果本地沒有, 然后再 從 remote 獲取, 先從 driver 上獲取到元數據 Block的位置, 然后去到真正的節點上fetch

如果沒有, 就進行計算, 然后根據存儲級別,存儲到計算節點本地的BlockManager 的內存或磁盤中,

這樣RDD的transformation、action就和block數據建立了聯系,雖然抽象上我們的操作是在partition層面上進行的,但是partition最終還是被映射成為block,因此實際上我們的所有操作都是對block的處理和存取。

blockManager 在 spark 中扮演的角色

blockManager 是非常非常重要的一個 spark 組件, 我們隨便舉幾個例子, 你就知道 BlockManager 多重要了 ,

  • spark  shuffle 的過程總用到了 BlockManager 作為數據的中轉站
  • spark broadcast 調度 task 到多個 executor 的時候, broadCast 底層使用的數據存儲層
  • spark streaming  一個 ReceiverInputDStream 接受到的數據也是先放在 BlockManager 中, 然后封裝為一個 BlockRdd 進行下一步運算的
  • 如果我們 對一個 rdd 進行了cache, cacheManager 也是把數據放在了 blockmanager 中, 截斷了計算鏈依賴, 后續task 運行的時候可以直接從 cacheManager 中獲取到 cacherdd ,不用再從頭計算。

spark cache  與  spark   broadcast task

我隨便舉兩個例子, 看看具體 spark cache 和 spark  broadcast 調度 task 的時候怎么用的 blockManager的

spark cache

rdd 計算的時候, 首先根據RDD id和partition index構造出block id (rdd_xx_xx), 接著從BlockManager中取出相應的block, 如果該block存在,表示此RDD在之前已經被計算過和存儲在BlockManager中,因此取出即可,無需再重新計算。 如果 block 不存在我們可以 計算出來, 然后吧 block 通過   doPutIterator 函數存儲在 節點上的 BlockManager上面, 匯報block信息到 driver, 下次如果使用同一個 rdd, 就可以直接從分布式存儲中 直接取出相應的 block

下面看一下源碼

  1. final def iterator(split: Partition, context: TaskContext): Iterator[T] = { 
  2.     if (storageLevel != StorageLevel.NONE) { 
  3.       getOrCompute(split, context) 
  4.     } else { 
  5.       computeOrReadCheckpoint(split, context) 
  6.     } 
  7.   } 

如果存儲級別不是 NONE類型就會調用 getOrCompute 這個我們已經看過了,  里面實際調用  SparkEnv.get.blockManager.getOrElseUpdate 方法, 如果 Block 在 BlockManager 中存在, 就會從 BlockManager 中獲取,如果不存在, 就進行計算這個Block, 然后在 BlockManager 中進行存儲持久化, 方便下次使用,

在  BlockManager 進行存儲后, 會調用下面的代碼把 匯報block信息到 driver,

  1. private def tryToReportBlockStatus( 
  2.      blockId: BlockId, 
  3.      status: BlockStatus, 
  4.      droppedMemorySize: Long = 0L): Boolean = { 
  5.    val storageLevel = status.storageLevel 
  6.    val inMemSize = Math.max(status.memSize, droppedMemorySize) 
  7.    val onDiskSize = status.diskSize 
  8.    master.updateBlockInfo(blockManagerId, blockId, storageLevel, inMemSize, onDiskSize) 
  9.  } 

實際上上想  masterEndpoint 的引用發送一條 UpdateBlockInfo消息,  master 會把這個 blockId 對應的 location 放在 driver 上,

同樣的如果一個 Block已經計算過了,會到 driver 上獲取到 location 信息

  1. private def getLocations(blockId: BlockId): Seq[BlockManagerId] = { 
  2.    val locs = Random.shuffle(master.getLocations(blockId)) 
  3.    val (preferredLocs, otherLocs) = locs.partition { loc => blockManagerId.host == loc.host } 
  4.    preferredLocs ++ otherLocs 
  5.  } 

spark   broadcast task

這個調度 task 到多個 task 上面過程代碼太多,我就不貼了, 直接說一下流程,

  • DAGScheduler 在  submitMissingTasks 方法提交 task的時候, 會把 task 包裝為一個 Broadcast 類型, 里面使用 TorrentBroadcastFactory 創建一個 TorrentBroadcast 的類型, 使用的是p2p的協議, 會減輕 master 的壓力,  這個里面會 調用 writeBlocks 里面把taskBinary  通過 blockManager.putSingle 放在 BlockManager 緩存中
  • ShuffleMapTask 或者 ResultTask,然后調用 runTask 方法, 里面實際上會調用 Broadcast 的value 方法, 里面最終調用了 BlockManager 的 getLocalBytes 或者 getRemoteBytes 方法

blockManager 在  spark streaming 中的應用

  • ReceiverTracker 在啟動的時候,會運行一個 job, 這個job 就是到 各個executor上去啟動 ReceiverSupervisorImpl, 然后啟動各個具體的數據接收器,  如果是SocketInputDStream, 就會啟動一個 SocketReceiver,
  • Receiver 接收到數據后, 先在 BlockGenerator 中緩存, 等到達一定的大小后,  調用 BlockManagerBasedBlockHandler 的 storeBlock方法持久化到 BlockManager 中, 然后把數據信息匯報到 ReceiverTracker上, 最終 匯總到   ReceivedBlockTracker 中的 timeToAllocatedBlocks中,
  • ReceiverInputDStream compute的時候,  receivedBlockTracker 會根據時間獲取到  BlockManager 中的元信息,里面最終對應的還是 BlockManager 的存儲位置, 最終獲取到數據進行計算,

測試 blockManager

我們做一個簡單的測試,兩端代碼的區別就是 一個 進行了cache ,一個沒有進行cache。

  1. val file = sc.textFile("/fusionlog/midsourcenew/2017-03-13-18-15_2.gz" 
  2. file.count()  
  3. file.count() 

我們從日志可以觀察出來, ***段代碼, 兩個 job 中都從 hdfs 中讀取文件, 讀取了兩次,

  1. val file = sc.textFile("/fusionlog/midsourcenew/2017-03-13-18-15_2.gz").cache() 
  2. file.count() 
  3. file.count() 

有以下日志

  1. MemoryStore: Block rdd_1_0 stored as values in memory (estimated size 1354.9 MB, free 4.9 GB) 
  2. BlockManager: Found block rdd_1_0 locally 

我們發現在***次讀取文件后, 把文件 cache 在了 blockManager 中, 下一個 job 運行的時候, 在本地 BlockManager 直接發現獲取到了 block , 沒有讀取 hdfs 文件 ,

在 spark ui 中也發現了 cache的 Block, 全部是在內存中緩存的, 

責任編輯:武曉燕 來源: spark技術分享
相關推薦

2017-04-14 09:48:25

分布式存儲系統

2018-09-29 14:08:04

存儲系統分布式

2017-10-16 10:24:47

LogDevice存儲系統

2017-07-18 09:51:36

文件存儲系統

2017-10-19 08:45:15

存儲系統HBase

2017-10-12 09:36:54

分布式存儲系統

2018-11-20 09:19:58

存儲系統雪崩效應

2017-10-17 08:33:31

存儲系統分布式

2017-12-18 10:47:04

分布式存儲數據

2019-05-13 15:20:42

存儲系統算法

2019-10-15 10:59:43

分布式存儲系統

2018-10-24 11:01:53

分布式存儲系統

2018-03-13 08:45:08

存儲系統DHT算法

2018-10-29 12:42:23

Ceph分布式存儲

2014-02-19 11:37:57

分布式對象存儲Sheepdog

2013-12-27 10:56:42

分布式對象存儲Sheepdog性能測試

2021-08-07 05:00:20

存儲系統

2010-07-02 10:08:12

BigtableGoogle

2025-01-26 11:54:39

分布式存儲系統

2021-07-04 07:07:06

Ceph分布式存儲架構
點贊
收藏

51CTO技術棧公眾號

国精品产品一区| 91蝌蚪精品视频| av网站一区二区三区| 欧美中文在线观看| 国产一区久久精品| 最新热久久免费视频| 影音先锋欧美资源| 久久视频在线| 中文字幕久久亚洲| 日本精品在线| 亚洲精品videosex极品| 亚洲人成无码网站久久99热国产 | 精品无人区一区二区三区竹菊 | av片在线观看免费| 亚洲激情图片qvod| 日韩avxxx| 青娱乐精品视频| 成人黄色av播放免费| 精品国产亚洲一区二区在线观看 | 欧美日韩不卡在线视频| 亚洲欧美网站| 亚洲在线www| 香蕉久久夜色精品国产使用方法| 在线播放国产精品| 鲁鲁在线中文| 4438x成人网最大色成网站| 伊人中文字幕在线| 亚洲男人的天堂在线观看| 久久这里只有精品23| 国产资源在线一区| 五月天丁香综合久久国产| 亚洲精品欧洲| 91久久精品www人人做人人爽| 啄木系列成人av电影| 欧美高清视频在线观看| 色成人免费网站| 亚洲色图欧美制服丝袜另类第一页| 制服丝袜中文字幕在线| 555www色欧美视频| www视频在线看| 欧美一区二区三区在线观看视频| 日本私人网站在线观看| 精品国产福利视频| 亚洲美女欧洲| 欧美在线不卡一区| 国产在线视频福利| 欧美乱妇23p| 国产在线1区| 欧美一卡二卡在线观看| 草草视频在线观看| 亚洲免费电影在线观看| 色成人免费网站| 超碰97人人做人人爱少妇| 日韩有码欧美| 欧美激情一区二区三区成人| 最新国产精品精品视频| 欧美专区福利在线| 久久麻豆精品| 91成人免费在线观看| 亚洲人成人一区二区三区| 亚洲mm色国产网站| 夜夜嗨一区二区三区| 欧美一区国产一区| 国产一区二区三区四| 久久99中文字幕| 中文字幕一区免费在线观看 | 亚洲国产第一| 欧美日韩一区二区三区免费| 激情久久五月天| 欧美一区二区中文字幕| 中文字幕一区二区三区在线观看 | 国产精品久久久久久福利一牛影视| 在线观看免费视频高清游戏推荐| 亚洲女同ⅹxx女同tv| 欧美69xxxxx| 精品粉嫩超白一线天av| 天堂综合在线播放| 国产日韩欧美在线观看| 久久久亚洲人| 男人揉女人奶房视频60分| 亚洲综合成人在线视频| 欧美成人三区| 中文字幕在线看视频国产欧美| 不卡av在线播放| 在线日韩影院| 亚洲国产免费av| 日本在线视频观看| 日韩精品国产精品| 分分操这里只有精品| 国产精品成人免费精品自在线观看| 永久免费不卡在线观看黄网站| 欧美视频在线观看一区二区| 性欧美videohd高精| 性色av一区二区咪爱| 欧美日韩三区| 8x8x华人在线| 亚洲欧美日韩国产一区二区三区| 国产一级在线| 亚洲人成绝费网站色www| 九九久久精品| 亚洲看片网站| 夜夜嗨av一区二区三区网页 | 亚洲色欲色欲www在线观看| 又爽又大又黄a级毛片在线视频| 亚洲精品国产精品国自产观看浪潮| 欧美绝顶高潮抽搐喷水合集| 欧美一区二区三区在线免费观看| 国产精品久久久久久久久免费樱桃 | 国产精品天天av精麻传媒| 色综合夜色一区| 4438五月综合| 久久精品日韩| 日韩毛片视频在线看| 超清av在线| 国产欧美一区二区三区四区| 国产99久久精品| 成人精品一区二区| 91精品国产91久久久久| 精品系列免费在线观看| 邻居大乳一区二区三区| 精品中文字幕在线| 精品在线观看免费| www.视频在线.com| 国产91热爆ts人妖在线| 9色porny自拍视频一区二区| 亚洲无线看天堂av| 亚洲影院在线看| 有码一区二区三区| 日韩一级特黄| 99亚洲精品视频| 欧美日韩在线三级| 欧洲杯什么时候开赛| 国产免费999| 在线成人激情视频| 美腿丝袜一区二区三区| 国产视频精选在线| 国产午夜精品全部视频播放| av中文一区| 日韩一区二区三区国产| 国产农村妇女精品一区二区| 美丽的小蜜桃4春潮| 欧美俄罗斯性视频| 99久久精品国产麻豆演员表| 爱看av在线入口| 国产欧美韩日| 在线中文字幕一区| 一区二区三区毛片免费| 国产成免费视频| 538国产精品一区二区在线| 久久久久久久久久久久久夜| 亚洲精品福利电影| 中国 免费 av| 日韩精品免费综合视频在线播放| 日韩国产精品久久久| 成人短视频在线观看| 久久综合伊人77777麻豆| 欧美三区在线观看| 在线日韩电影| 户外极限露出调教在线视频| 亚洲自拍在线观看| 色香色香欲天天天影视综合网| 午夜激情久久| 番号集在线观看| 精品日韩欧美| 日韩欧美一区二区免费| 久久精品日韩欧美| 91高清视频在线观看| 女女百合国产免费网站| 国产亚洲欧美另类中文| 91在线视频在线| 涩涩屋成人免费视频软件| 天天干天天操天天做| 日本老师69xxx| 精品久久久久久久久久ntr影视| 亚洲综合激情在线| 黄色成年人视频在线观看| 日韩精品欧美专区| 亚洲欧美中文日韩v在线观看| 成人一区二区在线观看| 亚洲国产精品免费视频| 诱受h嗯啊巨肉高潮| 国产精品日本一区二区| 亚洲第一免费网站| 波波电影院一区二区三区| 久久精品福利| 理论视频在线| 亚洲欧洲一区二区| 久久成年人免费电影| 亚洲精品国产第一综合99久久| 97视频热人人精品免费| 麻豆传媒在线免费| av在线播放天堂| 91av网站在线播放| 一本久久综合亚洲鲁鲁五月天 | 精品高清美女精品国产区| 一区二区三区成人精品| 成人亚洲欧美| 国产美女视频黄a视频免费| 精品国产免费人成电影在线观... 精品国产免费久久久久久尖叫 | 精品少妇人妻av一区二区|