国产精品电影_久久视频免费_欧美日韩国产激情_成年人视频免费在线播放_日本久久亚洲电影_久久都是精品_66av99_九色精品美女在线_蜜臀a∨国产成人精品_冲田杏梨av在线_欧美精品在线一区二区三区_麻豆mv在线看

還不收藏?Spark動態內存管理源碼解析!

存儲 存儲軟件 Spark
Spark有兩種內存管理模式,靜態內存管理(Static MemoryManager)和動態(統一)內存管理(Unified MemoryManager)。動態內存管理從Spark1.6開始引入,在SparkEnv.scala中的源碼可以看到,Spark目前默認采用動態內存管理模式,若將spark.memory.useLegacyMode設置為true,則會改為采用靜態內存管理。

一、Spark內存管理模式

Spark有兩種內存管理模式,靜態內存管理(Static MemoryManager)和動態(統一)內存管理(Unified MemoryManager)。動態內存管理從Spark1.6開始引入,在SparkEnv.scala中的源碼可以看到,Spark目前默認采用動態內存管理模式,若將spark.memory.useLegacyMode設置為true,則會改為采用靜態內存管理。

  1. // SparkEnv.scala 
  2.     val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode"false
  3.     val memoryManager: MemoryManager = 
  4.       if (useLegacyMemoryManager) { 
  5.         new StaticMemoryManager(conf, numUsableCores) 
  6.       } else { 
  7.         UnifiedMemoryManager(conf, numUsableCores) 
  8.       } 

[[231842]]

二、Spark動態內存管理空間分配

相比于Static MemoryManager模式,Unified MemoryManager模型打破了存儲內存和運行內存的界限,使每一個內存區能夠動態伸縮,降低OOM的概率。由上圖可知,executor JVM內存主要由以下幾個區域組成:

(1)Reserved Memory(預留內存):這部分內存預留給系統使用,默認為300MB,可通過spark.testing.reservedMemory進行設置。

  1. // UnifiedMemoryManager.scala 
  2. private val RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024 

另外,JVM內存的最小值也與reserved Memory有關,即minSystemMemory = reserved Memory*1.5,即默認情況下JVM內存最小值為300MB*1.5=450MB。

  1. // UnifiedMemoryManager.scala 
  2.     val minSystemMemory = (reservedMemory * 1.5).ceil.toLong 

(2)Spark Memeoy:分為execution Memory和storage Memory。去除掉reserved Memory,剩下usableMemory的一部分用于execution和storage這兩類堆內存,默認是0.6,可通過spark.memory.fraction進行設置。例如:JVM內存是1G,那么用于execution和storage的默認內存為(1024-300)*0.6=434MB。

  1. // UnifiedMemoryManager.scala 
  2.     val usableMemory = systemMemory - reservedMemory 
  3.     val memoryFraction = conf.getDouble("spark.memory.fraction", 0.6) 
  4.     (usableMemory * memoryFraction).toLong 

他們的邊界由spark.memory.storageFraction設定,默認為0.5。即默認狀態下storage Memory和execution Memory為1:1.

  1. // UnifiedMemoryManager.scala 
  2.      onHeapStorageRegionSize = 
  3.         (maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong, 
  4.       numCores = numCores) 

(3)user Memory:剩余內存,用戶根據需要使用,默認占usableMemory的(1-0.6)=0.4.

三、內存控制詳解

首先我們先來了解一下Spark內存管理實現類之前的關系。

1.MemoryManager主要功能是:(1)記錄用了多少StorageMemory和ExecutionMemory;(2)申請Storage、Execution和Unroll Memory;(3)釋放Stroage和Execution Memory。

Execution內存用來執行shuffle、joins、sorts和aggegations操作,Storage內存用于緩存和廣播數據,每一個JVM中都存在著一個MemoryManager。構造MemoryManager需要指定onHeapStorageMemory和onHeapExecutionMemory參數。

  1. // MemoryManager.scala 
  2. private[spark] abstract class MemoryManager( 
  3.     conf: SparkConf, 
  4.     numCores: Int
  5.     onHeapStorageMemory: Long, 
  6.     onHeapExecutionMemory: Long) extends Logging { 

創建StorageMemoryPool和ExecutionMemoryPool對象,用來創建堆內或堆外的Storage和Execution內存池,管理Storage和Execution的內存分配。

  1. // MemoryManager.scala 
  2.   @GuardedBy("this"
  3.   protected val onHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.ON_HEAP) 
  4.   @GuardedBy("this"
  5.   protected val offHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.OFF_HEAP) 
  6.   @GuardedBy("this"
  7.   protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.ON_HEAP) 
  8.   @GuardedBy("this"
  9.   protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.OFF_HEAP) 

默認情況下,不使用堆外內存,可通過saprk.memory.offHeap.enabled設置,默認堆外內存為0,可使用spark.memory.offHeap.size參數設置。

  1. // All the code you will ever need 
  2.  final val tungstenMemoryMode: MemoryMode = { 
  3.     if (conf.getBoolean("spark.memory.offHeap.enabled"false)) { 
  4.       require(conf.getSizeAsBytes("spark.memory.offHeap.size", 0) > 0, 
  5.         "spark.memory.offHeap.size must be > 0 when spark.memory.offHeap.enabled == true"
  6.       require(Platform.unaligned(), 
  7.         "No support for unaligned Unsafe. Set spark.memory.offHeap.enabled to false."
  8.       MemoryMode.OFF_HEAP 
  9.     } else { 
  10.       MemoryMode.ON_HEAP 
  11.     } 
  12.   } 
  1. // MemoryManager.scala  
  2.  protected[this] val maxOffHeapMemory = conf.getSizeAsBytes("spark.memory.offHeap.size", 0) 

釋放numBytes字節的Execution內存方法

  1. // MemoryManager.scala 
  2. def releaseExecutionMemory( 
  3.       numBytes: Long, 
  4.       taskAttemptId: Long, 
  5.       memoryMode: MemoryMode): Unit = synchronized { 
  6.     memoryMode match { 
  7.       case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId) 
  8.       case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId) 
  9.     } 
  10.   } 

釋放指定task的所有Execution內存并將該task標記為inactive。

  1. // MemoryManager.scala 
  2.  private[memory] def releaseAllExecutionMemoryForTask(taskAttemptId: Long): Long = synchronized { 
  3.     onHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId) + 
  4.       offHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId) 
  5.   } 

釋放numBytes字節的Stoarge內存方法

  1. // MemoryManager.scala 
  2. def releaseStorageMemory(numBytes: Long, memoryMode: MemoryMode): Unit = synchronized { 
  3.     memoryMode match { 
  4.       case MemoryMode.ON_HEAP => onHeapStorageMemoryPool.releaseMemory(numBytes) 
  5.       case MemoryMode.OFF_HEAP => offHeapStorageMemoryPool.releaseMemory(numBytes) 
  6.     } 
  7.   } 

釋放所有Storage內存方法

  1. // MemoryManager.scala 
  2. final def releaseAllStorageMemory(): Unit = synchronized { 
  3.     onHeapStorageMemoryPool.releaseAllMemory() 
  4.     offHeapStorageMemoryPool.releaseAllMemory() 
  5.   } 

2.接下來我們了解一下,UnifiedMemoryManager是如何對內存進行控制的?動態內存是如何實現的呢?

UnifiedMemoryManage繼承了MemoryManager

  1. // UnifiedMemoryManage.scala 
  2. private[spark] class UnifiedMemoryManager private[memory] ( 
  3.     conf: SparkConf, 
  4.     val maxHeapMemory: Long, 
  5.     onHeapStorageRegionSize: Long, 
  6.     numCores: Int
  7.   extends MemoryManager( 
  8.     conf, 
  9.     numCores, 
  10.     onHeapStorageRegionSize, 
  11.     maxHeapMemory - onHeapStorageRegionSize) { 

重寫了maxOnHeapStorageMemory方法,***Storage內存=***內存-***Execution內存。

  1. // UnifiedMemoryManage.scala 
  2.  override def maxOnHeapStorageMemory: Long = synchronized { 
  3.     maxHeapMemory - onHeapExecutionMemoryPool.memoryUsed 
  4.   } 

核心方法acquireStorageMemory:申請Storage內存。

  1. // UnifiedMemoryManage.scala 
  2. override def acquireStorageMemory( 
  3.       blockId: BlockId, 
  4.       numBytes: Long, 
  5.       memoryMode: MemoryMode): Boolean = synchronized { 
  6.     assertInvariants() 
  7.     assert(numBytes >= 0) 
  8.     val (executionPool, storagePool, maxMemory) = memoryMode match { 
  9.       //根據不同的內存模式去創建StorageMemoryPool和ExecutionMemoryPool 
  10.       case MemoryMode.ON_HEAP => ( 
  11.         onHeapExecutionMemoryPool, 
  12.         onHeapStorageMemoryPool, 
  13.         maxOnHeapStorageMemory) 
  14.       case MemoryMode.OFF_HEAP => ( 
  15.         offHeapExecutionMemoryPool, 
  16.         offHeapStorageMemoryPool, 
  17.         maxOffHeapMemory) 
  18.     } 
  19.     if (numBytes > maxMemory) { 
  20.       // 若申請內存大于***內存,則申請失敗 
  21.       logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " + 
  22.         s"memory limit ($maxMemory bytes)"
  23.       return false 
  24.     } 
  25.     if (numBytes > storagePool.memoryFree) { 
  26.       // 如果Storage內存池沒有足夠的內存,則向Execution內存池借用 
  27.       val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree, numBytes)//當Execution內存有空閑時,Storage才能借到內存 
  28.       executionPool.decrementPoolSize(memoryBorrowedFromExecution)//縮小Execution內存 
  29.       storagePool.incrementPoolSize(memoryBorrowedFromExecution)//增加Storage內存 
  30.     } 
  31.     storagePool.acquireMemory(blockId, numBytes) 
  32.   } 

核心方法acquireExecutionMemory:申請Execution內存。

  1. // UnifiedMemoryManage.scala 
  2. override private[memory] def acquireExecutionMemory( 
  3.       numBytes: Long, 
  4.       taskAttemptId: Long, 
  5.       memoryMode: MemoryMode): Long = synchronized {//使用了synchronized關鍵字,調用acquireExecutionMemory方法可能會阻塞,直到Execution內存池有足夠的內存。 
  6.    ... 
  7.     executionPool.acquireMemory( 
  8.       numBytes, taskAttemptId, maybeGrowExecutionPool, computeMaxExecutionPoolSize) 
  9.   } 

方法***調用了ExecutionMemoryPool的acquireMemory方法,該方法的參數需要兩個函數:maybeGrowExecutionPool()和computeMaxExecutionPoolSize()。

每個Task能夠使用的內存被限制在pooSize / (2 * numActiveTask) ~ maxPoolSize / numActiveTasks。其中maxPoolSize代表了execution pool的***內存,poolSize表示當前這個pool的大小。

  1. // ExecutionMemoryPool.scala 
  2.       val maxPoolSize = computeMaxPoolSize() 
  3.       val maxMemoryPerTask = maxPoolSize / numActiveTasks 
  4.       val minMemoryPerTask = poolSize / (2 * numActiveTasks) 

maybeGrowExecutionPool()方法實現了如何動態增加Execution內存區的大小。在每次申請execution內存的同時,execution內存池會進行多次嘗試,每次嘗試都可能會回收一些存儲內存。

 

  1. // UnifiedMemoryManage.scala  
  2.      def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {  
  3.       if (extraMemoryNeeded > 0) {//如果申請的內存大于0  
  4.         //計算execution可借到的storage內存,是storage剩余內存和可借出內存的***值  
  5.         val memoryReclaimableFromStorage = math.max(  
  6.           storagePool.memoryFree,  
  7.           storagePool.poolSize - storageRegionSize)  
  8.         if (memoryReclaimableFromStorage > 0) {//如果可以申請到內存  
  9.           val spaceToReclaim = storagePool.freeSpaceToShrinkPool(  
  10.             math.min(extraMemoryNeeded, memoryReclaimableFromStorage))//實際需要的內存,取實際需要的內存和storage內存區域全部可用內存大小的最小值  
  11.           storagePool.decrementPoolSize(spaceToReclaim)//storage內存區域減少  
  12.           executionPool.incrementPoolSize(spaceToReclaim)//execution內存區域增加  
  13.         }  
  14.       }  
  15.     }  
責任編輯:武曉燕 來源: 若澤大數據
相關推薦

2022-04-26 06:21:59

編程動態內存

2025-10-28 01:10:00

2024-01-26 16:28:28

C++動態內存開發

2022-01-13 10:30:21

C語言內存動態

2012-04-01 14:38:06

Windows Ser虛擬化

2025-03-26 00:00:05

2021-02-08 23:51:31

開源工具代碼

2019-10-10 16:20:23

spark內存管理

2019-04-17 14:44:42

Spark內存源碼

2018-12-18 14:37:26

Spark內存管理

2022-01-07 15:10:53

C++動態內存

2010-08-18 10:05:28

Hyper-V動態內存

2019-05-30 11:04:52

內存Spark管理

2021-05-21 09:25:11

鴻蒙HarmonyOS應用

2010-03-01 09:09:21

Windows 8動態內存

2010-03-02 08:53:59

Windows 8動態內存

2022-03-18 22:39:57

動態內存malloc

2011-07-28 10:03:53

Hyper-V動態內存

2011-07-20 13:47:14

CC++

2015-11-26 11:02:37

微軟LinuxHyper-V
點贊
收藏

51CTO技術棧公眾號

一区二区精品伦理... | 日韩精品中文字幕久久臀| 九七影院理伦片| 成人91在线观看| 伊人久久大香线蕉精品组织观看| 日韩综合中文字幕| 欧美电影免费观看网站| 亚洲国产精品va在线| 777电影在线观看| 欧美视频在线观看免费| 91久久在线观看| 全球av集中精品导航福利| 久久影视电视剧免费网站清宫辞电视 | 麻豆精品网站| 国产欧美一区二区三区另类精品| 日韩欧美一区免费| 日本在线观看天堂男亚洲| 国内福利写真片视频在线| 亚洲欧美综合在线精品| chinese少妇国语对白| 99久久er热在这里只有精品66| 国产成人艳妇aa视频在线 | 精品视频免费观看| 国产免费成人| 亚洲黄色一区二区三区| 日韩国产精品久久| 中国人体摄影一区二区三区| 精品在线观看免费| 福利视频一区二区三区四区| 精品亚洲自拍| 91产国在线观看动作片喷水| 国产麻豆精品久久| 亚洲一区二区免费在线| 亚洲美女色禁图| 在线观看成人av电影| av不卡在线播放| 最新天堂中文在线| 五月激情综合网| 高清全集视频免费在线| 亚洲欧美日韩精品| 国产毛片久久久| 91香蕉嫩草影院入口| 亚洲综合好骚| 鲁一鲁一鲁一鲁一澡| 亚洲欧洲制服丝袜| 日本视频在线| 在线日韩欧美视频| 欧洲杯足球赛直播| 欧美日韩一区在线视频| 99国产精品一区| 最新天堂资源在线资源| 狠狠色综合播放一区二区| www.亚洲成人网| 亚洲人成网站精品片在线观看| 在线观看中文字幕| 亚洲福利视频专区| y111111国产精品久久久| 亚洲精品免费一区二区三区| 久草在线在线精品观看| 免费看涩涩视频| 欧美丰满少妇xxxxx高潮对白| 国产极品嫩模在线观看91精品| 国产精品第一区| 国产日韩视频| 精品久久久久av| 色天使色偷偷av一区二区| 成人亚洲欧美| 91精品久久久久久久久中文字幕| 理论电影国产精品| 性生大片免费观看性| 亚洲第一区第一页| 亚洲激情77| 在线观看日本一区| 亚洲一区二区在线免费看| 波多野结衣亚洲一二三| 91精品视频播放| 99re这里只有精品视频首页| 69久久久久| 午夜精品久久久久久久久久久久久| 亚洲经典三级| 男人插女人欧美| 亚洲人在线视频| 欧美激情自拍| 中文字幕视频在线免费观看| 精品福利在线导航| 99国内精品久久久久久久| 免费在线a视频| 狠狠色狠狠色综合日日91app| 男人天堂av网站| 亚洲桃花岛网站| 中文字幕人成人乱码| 999精彩视频| 日韩精品视频观看| 国产综合自拍| 999大胆视频| 精品国产自在精品国产浪潮| 99re国产精品| 中文字幕在线一二| 久久久久成人精品| 国产激情视频一区二区三区欧美| 极品美乳网红视频免费在线观看| 亚洲日本在线看| 在线成人av观看| 成人做爰66片免费看网站| 中文字幕亚洲成人| 国外成人福利视频| 四虎一区二区| 欧美性黄网官网| 亚洲成a人片77777在线播放| 国产原创中文在线观看| 日韩精品高清视频| 老司机精品导航| 91xxx在线观看| 亚洲一区二区三区在线视频| 亚洲黄色小视频| 天堂a√中文在线| 欧美性xxxxxx| 日韩母乳在线| 激情五月宗合网| 亚洲视频第一页| 麻豆一区二区三| 麻豆福利在线观看| 久久人人爽爽人人爽人人片av| 欧美午夜精品伦理| 亚洲欧美综合久久久| 一区二区三区不卡在线视频| 国产精品久久久久久久久久尿| 亚洲欧洲国产日本综合| 免费看久久久| 亚洲免费一级视频| 久久久亚洲精选| 国产午夜精品一区二区三区视频 | 成年人视频观看| 一区二区三区视频免费| 狠狠色丁香婷综合久久| 97在线超碰| 久久久国产精华液999999| 精品国产91乱码一区二区三区| 蜜桃精品在线观看| 大菠萝精品导航| ijzzijzzij亚洲大全| 亚洲最大中文字幕| 99久久久久免费精品国产| 国产一区二区av在线| 国产精品视频中文字幕| 日本人成精品视频在线| 午夜精品福利久久久| 欧美日韩一区二区高清| 91麻豆一二三四在线| 精品久久免费观看| 久久天天躁狠狠躁夜夜躁2014| 国产欧美日本一区视频| 欧美精品一二| 午夜看片在线免费| 色呦呦网站入口| 久久伊人精品视频| 国产精品另类一区| 久久一本综合| 日本最黄一级片免费在线| 欧美日韩在线观看一区| 一本色道久久综合狠狠躁篇怎么玩 | 视频一区视频二区国产精品| 久久免费看少妇高潮| 精品久久对白| 三级黄视频在线观看| 久久亚洲一区二区| 国产亚洲视频在线| 亚洲天堂2016| 亚洲国产专区校园欧美| 日韩精品一区二区三区| 爱爱永久免费视频| 国产精品一区二| 一本大道亚洲视频| 亚洲影院理伦片| 久久亚洲风情| 99这里只有精品视频| 一个人免费观看视频www在线播放 一个人免费视频www在线观看 | 久久成人久久鬼色| 国产一区二区三区视频在线| 李宗瑞系列合集久久| 日韩av图片| 九九热在线精品视频| 色8久久人人97超碰香蕉987| 国产黑丝在线一区二区三区| 伊人久久大香线蕉av不卡| av毛片在线看| 亚洲欧美自偷自拍另类| 欧美一区二区三区在线播放 | 亚洲一区二区三区乱码aⅴ蜜桃女| 亚洲激情视频在线| 久久久www成人免费精品张筱雨| 水蜜桃亚洲精品| 成a人片在线观看www视频| av在线不卡顿| 亚洲欧美乱综合| 日本精品久久久| 亚洲男男gay视频| 国产一区二区三区电影在线观看| 久久精品国产一区二区三区免费看| 亚洲成人午夜电影|