国产精品电影_久久视频免费_欧美日韩国产激情_成年人视频免费在线播放_日本久久亚洲电影_久久都是精品_66av99_九色精品美女在线_蜜臀a∨国产成人精品_冲田杏梨av在线_欧美精品在线一区二区三区_麻豆mv在线看

Spark Shuffle過(guò)程分析:Map階段處理流程

大數(shù)據(jù) Spark
默認(rèn)配置情況下,Spark在Shuffle過(guò)程中會(huì)使用SortShuffleManager來(lái)管理Shuffle過(guò)程中需要的基本組件,以及對(duì)RDD各個(gè)Partition數(shù)據(jù)的計(jì)算。我們可以在Driver和Executor對(duì)應(yīng)的SparkEnv對(duì)象創(chuàng)建過(guò)程中看到對(duì)應(yīng)的配置。

默認(rèn)配置情況下,Spark在Shuffle過(guò)程中會(huì)使用SortShuffleManager來(lái)管理Shuffle過(guò)程中需要的基本組件,以及對(duì)RDD各個(gè)Partition數(shù)據(jù)的計(jì)算。我們可以在Driver和Executor對(duì)應(yīng)的SparkEnv對(duì)象創(chuàng)建過(guò)程中看到對(duì)應(yīng)的配置,如下代碼所示:

 

  1. // Let the user specify short names for shuffle managers 
  2.     val shortShuffleMgrNames = Map( 
  3.       "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName, 
  4.       "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName) 
  5.     val shuffleMgrName = conf.get("spark.shuffle.manager""sort"
  6.     val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName) 
  7.     val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass) 

如果需要修改ShuffleManager實(shí)現(xiàn),則只需要修改配置項(xiàng)spark.shuffle.manager即可,默認(rèn)支持sort和 tungsten-sort,可以指定自己實(shí)現(xiàn)的ShuffleManager類。

因?yàn)镾huffle過(guò)程中需要將Map結(jié)果數(shù)據(jù)輸出到文件,所以需要通過(guò)注冊(cè)一個(gè)ShuffleHandle來(lái)獲取到一個(gè)ShuffleWriter對(duì)象,通過(guò)它來(lái)控制Map階段記錄數(shù)據(jù)輸出的行為。其中,ShuffleHandle包含了如下基本信息:

  • shuffleId:標(biāo)識(shí)Shuffle過(guò)程的唯一ID
  • numMaps:RDD對(duì)應(yīng)的Partitioner指定的Partition的個(gè)數(shù),也就是ShuffleMapTask輸出的Partition個(gè)數(shù)
  • dependency:RDD對(duì)應(yīng)的依賴ShuffleDependency

下面我們看下,在SortShuffleManager中是如何注冊(cè)Shuffle的,代碼如下所示:

 

  1. override def registerShuffle[K, V, C]( 
  2.       shuffleId: Int
  3.       numMaps: Int
  4.       dependency: ShuffleDependency[K, V, C]): ShuffleHandle = { 
  5.     if (SortShuffleWriter.shouldBypassMergeSort(SparkEnv.get.conf, dependency)) { 
  6.       new BypassMergeSortShuffleHandle[K, V]( 
  7.         shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]]) 
  8.     } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) { 
  9.       new SerializedShuffleHandle[K, V]( 
  10.         shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]]) 
  11.     } else { 
  12.       new BaseShuffleHandle(shuffleId, numMaps, dependency) 
  13.     } 
  14.   } 

上面代碼中,對(duì)應(yīng)如下3種ShuffleHandle可以選擇,說(shuō)明如下:

  • BypassMergeSortShuffleHandle

如果dependency不需要進(jìn)行Map Side Combine,并且RDD對(duì)應(yīng)的ShuffleDependency中的Partitioner設(shè)置的Partition的數(shù)量(這個(gè)不要和parent RDD的Partition個(gè)數(shù)混淆,Partitioner指定了map處理結(jié)果的Partition個(gè)數(shù),每個(gè)Partition數(shù)據(jù)會(huì)在Shuffle過(guò)程中全部被拉取而拷貝到下游的某個(gè)Executor端)小于等于配置參數(shù)spark.shuffle.sort.bypassMergeThreshold的值,則會(huì)注冊(cè)BypassMergeSortShuffleHandle。默認(rèn)情況下,spark.shuffle.sort.bypassMergeThreshold的取值是200,這種情況下會(huì)直接將對(duì)RDD的 map處理結(jié)果的各個(gè)Partition數(shù)據(jù)寫(xiě)入文件,并***做一個(gè)合并處理。

  • SerializedShuffleHandle

如果ShuffleDependency中的Serializer,允許對(duì)將要輸出數(shù)據(jù)對(duì)象進(jìn)行排序后,再執(zhí)行序列化寫(xiě)入到文件,則會(huì)選擇創(chuàng)建一個(gè)SerializedShuffleHandle。

  • BaseShuffleHandle

除了上面兩種ShuffleHandle以后,其他情況都會(huì)創(chuàng)建一個(gè)BaseShuffleHandle對(duì)象,它會(huì)以反序列化的格式處理Shuffle輸出數(shù)據(jù)。

Map階段處理流程分析

Map階段RDD的計(jì)算,對(duì)應(yīng)ShuffleMapTask這個(gè)實(shí)現(xiàn)類,它最終會(huì)在每個(gè)Executor上啟動(dòng)運(yùn)行,每個(gè)ShuffleMapTask處理RDD的一個(gè)Partition的數(shù)據(jù)。這個(gè)過(guò)程的核心處理邏輯,代碼如下所示:

 

  1. val manager = SparkEnv.get.shuffleManager 
  2.       writer = manager.getWriter[AnyAny](dep.shuffleHandle, partitionId, context) 
  3.       writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[AnyAny]]]) 

上面代碼中,在調(diào)用rdd的iterator()方法時(shí),會(huì)根據(jù)RDD實(shí)現(xiàn)類的compute方法指定的處理邏輯對(duì)數(shù)據(jù)進(jìn)行處理,當(dāng)然,如果該P(yáng)artition對(duì)應(yīng)的數(shù)據(jù)已經(jīng)處理過(guò)并存儲(chǔ)在MemoryStore或DiskStore,直接通過(guò)BlockManager獲取到對(duì)應(yīng)的Block數(shù)據(jù),而無(wú)需每次需要時(shí)重新計(jì)算。然后,write()方法會(huì)將已經(jīng)處理過(guò)的Partition數(shù)據(jù)輸出到磁盤(pán)文件。

在Spark Shuffle過(guò)程中,每個(gè)ShuffleMapTask會(huì)通過(guò)配置的ShuffleManager實(shí)現(xiàn)類對(duì)應(yīng)的ShuffleManager對(duì)象(實(shí)際上是在SparkEnv中創(chuàng)建),根據(jù)已經(jīng)注冊(cè)的ShuffleHandle,獲取到對(duì)應(yīng)的ShuffleWriter對(duì)象,然后通過(guò)ShuffleWriter對(duì)象將Partition數(shù)據(jù)寫(xiě)入內(nèi)存或文件。所以,接下來(lái)我們可能關(guān)心每一種ShuffleHandle對(duì)應(yīng)的ShuffleWriter的行為,可以看到SortShuffleManager中獲取到ShuffleWriter的實(shí)現(xiàn)代碼,如下所示:

 

  1. /** Get a writer for a given partition. Called on executors by map tasks. */ 
  2.   override def getWriter[K, V]( 
  3.       handle: ShuffleHandle, 
  4.       mapId: Int
  5.       context: TaskContext): ShuffleWriter[K, V] = { 
  6.     numMapsForShuffle.putIfAbsent( 
  7.       handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps) 
  8.     val env = SparkEnv.get 
  9.     handle match { 
  10.       case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] => 
  11.         new UnsafeShuffleWriter( 
  12.           env.blockManager, 
  13.           shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver], 
  14.           context.taskMemoryManager(), 
  15.           unsafeShuffleHandle, 
  16.           mapId, 
  17.           context, 
  18.           env.conf) 
  19.       case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] => 
  20.         new BypassMergeSortShuffleWriter( 
  21.           env.blockManager, 
  22.           shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver], 
  23.           bypassMergeSortHandle, 
  24.           mapId, 
  25.           context, 
  26.           env.conf) 
  27.       case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] => 
  28.         new SortShuffleWriter(shuffleBlockResolver, other, mapId, context) 
  29.     } 
  30.   } 

我們以最簡(jiǎn)單的SortShuffleWriter為例進(jìn)行分析,在SortShuffleManager可以通過(guò)getWriter()方法創(chuàng)建一個(gè)SortShuffleWriter對(duì)象,然后在ShuffleMapTask中調(diào)用SortShuffleWriter對(duì)象的write()方法處理Map輸出的記錄數(shù)據(jù),write()方法的處理代碼,如下所示:

 

  1. /** Write a bunch of records to this task's output */ 
  2.   override def write(records: Iterator[Product2[K, V]]): Unit = { 
  3.     sorter = if (dep.mapSideCombine) { 
  4.       require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!"
  5.       new ExternalSorter[K, V, C]( 
  6.         context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer) 
  7.     } else { 
  8.       // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't 
  9.       // care whether the keys get sorted in each partition; that will be done on the reduce side 
  10.       // if the operation being run is sortByKey. 
  11.       new ExternalSorter[K, V, V]( 
  12.         context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer) 
  13.     } 
  14.     sorter.insertAll(records) 
  15.  
  16.     // Don't bother including the time to open the merged output file in the shuffle write time
  17.     // because it just opens a single file, so is typically too fast to measure accurately 
  18.     // (see SPARK-3570). 
  19.     val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId) 
  20.     val tmp = Utils.tempFileWith(output
  21.     val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID) 
  22.     val partitionLengths = sorter.writePartitionedFile(blockId, tmp) 
  23.     shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp) 
  24.     mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths) 
  25.   } 

從SortShuffleWriter類中的write()方法可以看到,最終調(diào)用了ExeternalSorter的insertAll()方法,實(shí)現(xiàn)了Map端RDD某個(gè)Partition數(shù)據(jù)處理并輸出到內(nèi)存或磁盤(pán)文件,這也是處理Map階段輸出記錄數(shù)據(jù)最核心、最復(fù)雜的過(guò)程。我們將其分為兩個(gè)階段進(jìn)行分析:***階段是,ExeternalSorter的insertAll()方法處理過(guò)程,將記錄數(shù)據(jù)Spill到磁盤(pán)文件;第二階段是,執(zhí)行完insertAll()方法之后的處理邏輯,創(chuàng)建Shuffle Block數(shù)據(jù)文件及其索引文件。

內(nèi)存緩沖寫(xiě)記錄數(shù)據(jù)并Spill到磁盤(pán)文件

查看SortShuffleWriter類的write()方法可以看到,在內(nèi)存中緩存記錄數(shù)據(jù)的數(shù)據(jù)結(jié)構(gòu)有兩種:一種是Buffer,對(duì)應(yīng)的實(shí)現(xiàn)類PartitionedPairBuffer,設(shè)置mapSideCombine=false時(shí)會(huì)使用該結(jié)構(gòu);另一種是Map,對(duì)應(yīng)的實(shí)現(xiàn)類是PartitionedAppendOnlyMap,設(shè)置mapSideCombine=false時(shí)會(huì)使用該結(jié)構(gòu)。根據(jù)是否指定mapSideCombine選項(xiàng),分別對(duì)應(yīng)不同的處理流程,我們分別說(shuō)明如下:

設(shè)置mapSideCombine=false時(shí)

這種情況在Map階段不進(jìn)行Combine操作,在內(nèi)存中緩存記錄數(shù)據(jù)會(huì)使用PartitionedPairBuffer這種數(shù)據(jù)結(jié)構(gòu)來(lái)緩存、排序記錄數(shù)據(jù),它是一個(gè)Append-only Buffer,僅支持向Buffer中追加數(shù)據(jù)鍵值對(duì)記錄,PartitionedPairBuffer的結(jié)構(gòu)如下圖所示:

Spark Shuffle過(guò)程分析:Map階段處理流程

默認(rèn)情況下,PartitionedPairBuffer初始分配的存儲(chǔ)容量為capacity = initialCapacity = 64,實(shí)際上這個(gè)容量是針對(duì)key的容量,因?yàn)橐鎯?chǔ)的是鍵值對(duì)記錄數(shù)據(jù),所以實(shí)際存儲(chǔ)鍵值對(duì)的容量為2*initialCapacity = 128。PartitionedPairBuffer是一個(gè)能夠動(dòng)態(tài)擴(kuò)充容量的Buffer,內(nèi)部使用一個(gè)一維數(shù)組來(lái)存儲(chǔ)鍵值對(duì),每次擴(kuò)容結(jié)果為當(dāng)前Buffer容量的2倍,即2*capacity,***支持存儲(chǔ)2^31-1個(gè)鍵值對(duì)記錄(1073741823個(gè))。

通過(guò)上圖可以看到,PartitionedPairBuffer存儲(chǔ)的鍵值對(duì)記錄數(shù)據(jù),鍵是(partition, key)這樣一個(gè)Tuple,值是對(duì)應(yīng)的數(shù)據(jù)value,而且curSize是用來(lái)跟蹤寫(xiě)入Buffer中的記錄的,key在Buffer中的索引位置為2*curSize,value的索引位置為2*curSize+1,可見(jiàn)一個(gè)鍵值對(duì)的key和value的存儲(chǔ)在PartitionedPairBuffer內(nèi)部的數(shù)組中是相鄰的。

使用PartitionedPairBuffer緩存鍵值對(duì)記錄數(shù)據(jù),通過(guò)跟蹤實(shí)際寫(xiě)入到Buffer內(nèi)的記錄數(shù)據(jù)的字節(jié)數(shù)來(lái)判斷,是否需要將Buffer中的數(shù)據(jù)Spill到磁盤(pán)文件,如下代碼所示:

 

  1. protected def maybeSpill(collection: C, currentMemory: Long): Boolean = { 
  2.     var shouldSpill = false 
  3.     if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) { 
  4.       // Claim up to double our current memory from the shuffle memory pool 
  5.       val amountToRequest = 2 * currentMemory - myMemoryThreshold 
  6.       val granted = acquireMemory(amountToRequest) 
  7.       myMemoryThreshold += granted 
  8.       // If we were granted too little memory to grow further (either tryToAcquire returned 0, 
  9.       // or we already had more memory than myMemoryThreshold), spill the current collection 
  10.       shouldSpill = currentMemory >= myMemoryThreshold 
  11.     } 
  12.     shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold 
  13.     // Actually spill 
  14.     if (shouldSpill) { 
  15.       _spillCount += 1 
  16.       logSpillage(currentMemory) 
  17.       spill(collection) 
  18.       _elementsRead = 0 
  19.       _memoryBytesSpilled += currentMemory 
  20.       releaseMemory() 
  21.     } 
  22.     shouldSpill 
  23.   } 

上面elementsRead表示存儲(chǔ)到PartitionedPairBuffer中的記錄數(shù),currentMemory是對(duì)Buffer中的總記錄數(shù)據(jù)大小(字節(jié)數(shù))的估算,myMemoryThreshold通過(guò)配置項(xiàng)spark.shuffle.spill.initialMemoryThreshold來(lái)進(jìn)行設(shè)置的,默認(rèn)值為5 * 1024 * 1024 = 5M。當(dāng)滿足條件elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold時(shí),會(huì)先嘗試向MemoryManager申請(qǐng)2 * currentMemory – myMemoryThreshold大小的內(nèi)存,如果能夠申請(qǐng)到,則不進(jìn)行Spill操作,而是繼續(xù)向Buffer中存儲(chǔ)數(shù)據(jù),否則就會(huì)調(diào)用spill()方法將Buffer中數(shù)據(jù)輸出到磁盤(pán)文件。

向PartitionedPairBuffer中寫(xiě)入記錄數(shù)據(jù),以及滿足條件Spill記錄數(shù)據(jù)到磁盤(pán)文件,具體處理流程,如下圖所示:

Spark Shuffle過(guò)程分析:Map階段處理流程

為了查看按照怎樣的規(guī)則進(jìn)行排序,我們看一下,當(dāng)不進(jìn)行Map Side Combine時(shí),創(chuàng)建ExternalSorter對(duì)象的代碼如下所示:

 

  1. // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't 
  2.       // care whether the keys get sorted in each partition; that will be done on the reduce side 
  3.       // if the operation being run is sortByKey. 
  4.       new ExternalSorter[K, V, V]( 
  5.         context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer) 

上面aggregator = None,ordering = None,在對(duì)PartitionedPairBuffer中的記錄數(shù)據(jù)Spill到磁盤(pán)之前,要使用默認(rèn)的排序規(guī)則進(jìn)行排序,排序的規(guī)則是只對(duì)PartitionedPairBuffer中的記錄按Partition ID進(jìn)行升序排序,可以查看WritablePartitionedPairCollection伴生對(duì)象類的代碼(其中PartitionedPairBuffer類實(shí)現(xiàn)了特質(zhì)WritablePartitionedPairCollection),如下所示:

 

  1. /** 
  2.    * A comparator for (Int, K) pairs that orders them by only their partition ID. 
  3.    */ 
  4.   def partitionComparator[K]: Comparator[(Int, K)] = new Comparator[(Int, K)] { 
  5.     override def compare(a: (Int, K), b: (Int, K)): Int = { 
  6.       a._1 - b._1 
  7.     } 
  8.   } 

上面圖中,引用了SortShuffleWriter.writeBlockFiles這個(gè)子序列圖,用來(lái)生成Block數(shù)據(jù)文件和索引文件,后面我們會(huì)單獨(dú)說(shuō)明。通過(guò)對(duì)RDD進(jìn)行計(jì)算生成一個(gè)記錄迭代器對(duì)象,通過(guò)該迭代器迭代出的記錄會(huì)存儲(chǔ)到PartitionedPairBuffer中,當(dāng)滿足Spill條件時(shí),先對(duì)PartitionedPairBuffer中記錄進(jìn)行排序,***Spill到磁盤(pán)文件,這個(gè)過(guò)程中PartitionedPairBuffer中的記錄數(shù)據(jù)的變化情況,如下圖所示:

Spark Shuffle過(guò)程分析:Map階段處理流程

上圖中,對(duì)內(nèi)存中PartitionedPairBuffer中的記錄按照Partition ID進(jìn)行排序,并且屬于同一個(gè)Partition的數(shù)據(jù)記錄在PartitionedPairBuffer內(nèi)部的data數(shù)組中是連續(xù)的。排序結(jié)束后,在Spill到磁盤(pán)文件時(shí),將對(duì)應(yīng)的Partition ID去掉了,只在文件temp_shuffle_4c4b258d-52e4-47a0-a9b6-692f1af7ec9d中連續(xù)存儲(chǔ)鍵值對(duì)數(shù)據(jù),但同時(shí)在另一個(gè)內(nèi)存數(shù)組結(jié)構(gòu)中會(huì)保存文件中每個(gè)Partition擁有的記錄數(shù),這樣就能根據(jù)Partition的記錄數(shù)來(lái)順序讀取文件temp_shuffle_4c4b258d-52e4-47a0-a9b6-692f1af7ec9d中屬于同一個(gè)Partition的全部記錄數(shù)據(jù)。

ExternalSorter類內(nèi)部維護(hù)了一個(gè)SpillFile的ArrayBuffer數(shù)組,最終可能會(huì)生成多個(gè)SpillFile,SpillFile的定義如下所示:

 

  1. private[this] case class SpilledFile( 
  2.     file: File, 
  3.     blockId: BlockId, 
  4.     serializerBatchSizes: Array[Long], 
  5.     elementsPerPartition: Array[Long]) 

每個(gè)SpillFile包含一個(gè)blockId,標(biāo)識(shí)Map輸出的該臨時(shí)文件;serializerBatchSizes表示每次批量寫(xiě)入到文件的Object的數(shù)量,默認(rèn)為10000,由配置項(xiàng)spark.shuffle.spill.batchSize來(lái)控制;elementsPerPartition表示每個(gè)Partition中的Object的數(shù)量。調(diào)用ExternalSorter的insertAll()方法,最終可能有如下3種情況:

  • Map階段輸出記錄數(shù)較少,沒(méi)有生成SpillFile,那么所有數(shù)據(jù)都在Buffer中,直接對(duì)Buffer中記錄排序并輸出到文件
  • Map階段輸出記錄數(shù)較多,生成多個(gè)SpillFile,同時(shí)Buffer中也有部分記錄數(shù)據(jù)
  • Map階段輸出記錄數(shù)較多,只生成多個(gè)SpillFile
  • 有關(guān)后續(xù)如何對(duì)上面3種情況進(jìn)行處理,可以想見(jiàn)后面對(duì)子序列圖SortShuffleWriter.writeBlockFiles的說(shuō)明。
  • 設(shè)置mapSideCombine=true時(shí)

這種情況在Map階段會(huì)執(zhí)行Combine操作,在Map階段進(jìn)行Combine操作能夠降低Map階段數(shù)據(jù)記錄的總數(shù),從而降低Shuffle過(guò)程中數(shù)據(jù)的跨網(wǎng)絡(luò)拷貝傳輸。這時(shí),RDD對(duì)應(yīng)的ShuffleDependency需要設(shè)置一個(gè)Aggregator用來(lái)執(zhí)行Combine操作,可以看下Aggregator類聲明,代碼如下所示:

 

  1. /** 
  2.  * :: DeveloperApi :: 
  3.  * A set of functions used to aggregate data. 
  4.  * 
  5.  * @param createCombiner function to create the initial value of the aggregation. 
  6.  * @param mergeValue function to merge a new value into the aggregation result. 
  7.  * @param mergeCombiners function to merge outputs from multiple mergeValue function
  8.  */ 
  9. @DeveloperApi 
  10. case class Aggregator[K, V, C] ( 
  11.     createCombiner: V => C, 
  12.     mergeValue: (C, V) => C, 
  13.     mergeCombiners: (C, C) => C) { 
  14.   ... ... 

由于在Map階段只用到了構(gòu)造Aggregator的幾個(gè)函數(shù)參數(shù)createCombiner、mergeValue、mergeCombiners,我們對(duì)這幾個(gè)函數(shù)詳細(xì)說(shuō)明如下:

  • createCombiner:進(jìn)行Aggregation開(kāi)始時(shí),需要設(shè)置初始值。因?yàn)樵贏ggregation過(guò)程中使用了類似Map的內(nèi)存數(shù)據(jù)結(jié)構(gòu)來(lái)管理鍵值對(duì),每次加入前會(huì)先查看Map內(nèi)存結(jié)構(gòu)中是否存在Key對(duì)應(yīng)的Value,***次肯定不存在,所以***將某個(gè)Key的Value加入到Map內(nèi)存結(jié)構(gòu)中時(shí),Key在Map內(nèi)存結(jié)構(gòu)中***次有了Value。
  • mergeValue:某個(gè)Key已經(jīng)在Map結(jié)構(gòu)中存在Value,后續(xù)某次又遇到相同的Key和一個(gè)新的Value,這時(shí)需要通過(guò)該函數(shù),將舊Value和新Value進(jìn)行合并,根據(jù)Key檢索能夠得到合并后的新Value。
  • mergeCombiners:一個(gè)Map內(nèi)存結(jié)構(gòu)中Key和Value是由mergeValue生成的,那么在向Map中插入數(shù)據(jù),肯定會(huì)遇到Map使用容量達(dá)到上限,這時(shí)需要將記錄數(shù)據(jù)Spill到磁盤(pán)文件,那么多個(gè)Spill輸出的磁盤(pán)文件中可能存在同一個(gè)Key,這時(shí)需要對(duì)多個(gè)Spill輸出的磁盤(pán)文件中的Key的多個(gè)Value進(jìn)行合并,這時(shí)需要使用mergeCombiners函數(shù)進(jìn)行處理。

該類中定義了combineValuesByKey、combineValuesByKey、combineCombinersByKey,由于這些函數(shù)是在Reduce階段使用的,所以在這里先不說(shuō)明,后續(xù)文章我們會(huì)單獨(dú)詳細(xì)來(lái)分析。

我們通過(guò)下面的序列圖來(lái)描述,需要進(jìn)行Map Side Combine時(shí)的處理流程,如下所示:

Spark Shuffle過(guò)程分析:Map階段處理流程

對(duì)照上圖,我們看一下,當(dāng)需要進(jìn)行Map Side Combine時(shí),對(duì)應(yīng)的ExternalSorter類insertAll()方法中的處理邏輯,代碼如下所示:

 

  1. val shouldCombine = aggregator.isDefined 
  2.  
  3.     if (shouldCombine) { 
  4.       // Combine values in-memory first using our AppendOnlyMap 
  5.       val mergeValue = aggregator.get.mergeValue 
  6.       val createCombiner = aggregator.get.createCombiner 
  7.       var kv: Product2[K, V] = null 
  8.       val update = (hadValue: Boolean, oldValue: C) => { 
  9.         if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2) 
  10.       } 
  11.       while (records.hasNext) { 
  12.         addElementsRead() 
  13.         kv = records.next() 
  14.         map.changeValue((getPartition(kv._1), kv._1), update
  15.         maybeSpillCollection(usingMap = true
  16.       } 
  17.     } 

上面代碼中,map是內(nèi)存數(shù)據(jù)結(jié)構(gòu),最重要的是update函數(shù)和map的changeValue方法(這里的map對(duì)應(yīng)的實(shí)現(xiàn)類是PartitionedAppendOnlyMap)。update函數(shù)所做的工作,其實(shí)就是對(duì)createCombiner和mergeValue這兩個(gè)函數(shù)的使用,***次遇到一個(gè)Key調(diào)用createCombiner函數(shù)處理,非***遇到同一個(gè)Key對(duì)應(yīng)新的Value調(diào)用mergeValue函數(shù)進(jìn)行合并處理。map的changeValue方法主要是將Key和Value在map中存儲(chǔ)或者進(jìn)行修改(對(duì)出現(xiàn)的同一個(gè)Key的多個(gè)Value進(jìn)行合并,并將合并后的新Value替換舊Value)。

PartitionedAppendOnlyMap是一個(gè)經(jīng)過(guò)優(yōu)化的哈希表,它支持向map中追加數(shù)據(jù),以及修改Key對(duì)應(yīng)的Value,但是不支持刪除某個(gè)Key及其對(duì)應(yīng)的Value。它能夠支持的存儲(chǔ)容量是0.7 * 2 ^ 29 = 375809638。當(dāng)達(dá)到指定存儲(chǔ)容量或者指定限制,就會(huì)將map中記錄數(shù)據(jù)Spill到磁盤(pán)文件,這個(gè)過(guò)程和前面的類似,不再累述。

創(chuàng)建Shuffle Block數(shù)據(jù)文件及其索引文件

無(wú)論是使用PartitionedPairBuffer,還是使用PartitionedAppendOnlyMap,當(dāng)需要容量滿足Spill條件時(shí),都會(huì)將該內(nèi)存結(jié)構(gòu)(buffer/map)中記錄數(shù)據(jù)Spill到磁盤(pán)文件,所以Spill到磁盤(pán)文件的格式是相同的。對(duì)于后續(xù)Block數(shù)據(jù)文件和索引文件的生成邏輯也是相同,如下圖所示:

Spark Shuffle過(guò)程分析:Map階段處理流程

假設(shè),我們生成的Shuffle Block文件對(duì)應(yīng)各個(gè)參數(shù)為:shuffleId=2901,mapId=11825,reduceId=0,這里reduceId是一個(gè)NOOP_REDUCE_ID,表示與DiskStore進(jìn)行磁盤(pán)I/O交互操作,而DiskStore期望對(duì)應(yīng)一個(gè)(map, reduce)對(duì),但是對(duì)于排序的Shuffle輸出,通常Reducer拉取數(shù)據(jù)后只生成一個(gè)文件(Reduce文件),所以這里默認(rèn)reduceId為0。經(jīng)過(guò)上圖的處理流程,可以生成一個(gè).data文件,也就是Block數(shù)據(jù)文件;一個(gè).index文件,也就是包含了各個(gè)Partition在數(shù)據(jù)文件中的偏移位置的索引文件。這個(gè)過(guò)程生成的文件,示例如下所示:

 

  1. shuffle_2901_11825_0.data  
  2. shuffle_2901_11825_0.index 

這樣,對(duì)于每個(gè)RDD的多個(gè)Partition進(jìn)行處理后,都會(huì)生成對(duì)應(yīng)的數(shù)據(jù)文件和索引文件,后續(xù)在Reduce端就可以讀取這些Block文件,這些記錄數(shù)據(jù)在文件中都是經(jīng)過(guò)分區(qū)(Partitioned)的。

責(zé)任編輯:未麗燕 來(lái)源: 36大數(shù)據(jù)
相關(guān)推薦

2017-03-27 10:48:03

Hive map優(yōu)化分析

2021-08-11 06:57:16

ShuffleSpark核心

2019-04-22 15:24:24

HadoopSuffleMap端

2023-02-08 13:08:31

2021-10-20 10:04:47

鴻蒙HarmonyOS應(yīng)用

2025-09-15 06:25:00

2025-06-13 08:40:00

ShuffleSpark大數(shù)據(jù)

2022-03-15 08:25:32

SparkShuffle框架

2019-06-06 15:22:07

SparkShuffle內(nèi)存

2019-07-26 15:01:42

SparkShuffle內(nèi)存

2012-08-30 09:48:02

Struts2Java

2025-12-11 08:49:14

2023-11-20 07:27:00

云原生Spark

2009-07-03 13:41:44

WinCE編譯過(guò)程

2024-07-15 09:58:03

OpenRestyNginx日志

2016-12-14 19:20:07

Spark SQL架構(gòu)分布式

2011-04-13 14:57:11

ASP.NET請(qǐng)求處理

2009-07-28 11:32:41

光纖鏈路故障

2017-04-24 09:20:05

Spark分析分區(qū)器

2010-06-13 14:36:20

RARP協(xié)議
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

久久综合九色九九| 亚洲精品福利视频网站| 久久久女女女女999久久| aa国产成人| 在线看国产一区| 性色av一区二区| 中文字幕在线一区二区三区| 亚洲免费视频播放| 久久精品在线| 欧美色图亚洲自拍| 国产精品视频精品| 成人久久精品| 亚洲人成电影网站色www| 哥也色在线视频| 色呦呦网站一区| 在线视频中文字幕久| 一区二区三区在线视频免费观看| 不卡av免费在线| 久久久久国产精品人| 国产一级大片免费看| 久久国产精品露脸对白| 伊人亚洲福利一区二区三区| 久久电影网站| 国产专区综合网| 日本不卡二区| 美女www一区二区| 先锋在线资源一区二区三区| 国产精品久久久久久久免费软件 | 欧美美女一区二区在线观看| 国产高清在线看| 欧美日韩激情在线| 国产麻豆电影在线观看| 日韩经典一区二区| 一本色道久久99精品综合| 日韩av大片站长工具| 日韩久久精品电影| 国产成人精品亚洲日本在线观看| 伊人久久久久久久久久久| 日韩国产一二三区| 78色国产精品| 我不卡伦不卡影院| 欧美日韩精品免费看| 国产在线一区二区| 美女网站免费观看视频 | 嫩草影院中文字幕| 亚洲国产成人私人影院tom| 先锋成人影音| 色999日韩国产欧美一区二区| 日韩av中文| 国产一区二区三区日韩欧美| 99视频这里有精品| 91精品1区2区| 午夜免费视频在线国产| 欧美一区二区精品在线| 免费在线成人激情电影| 88xx成人精品| 亚洲午夜在线| 久久久久久久中文| 亚洲一区二区三区国产| 欧洲日本在线| 色综久久综合桃花网| 精品视频免费在线观看| 婷婷亚洲婷婷综合色香五月| 国产99久久久国产精品潘金网站| 国产无遮挡又黄又爽免费软件 | 中文字幕在线综合| 欧美日韩精品一区二区| 六九午夜精品视频| 97人人澡人人爽| 成年人国产精品| 亚洲精品视频在线免费| 亚洲人成网站777色婷婷| 国产亚洲欧美日韩在线观看一区二区 | 成人国产亚洲欧美成人综合网| 91国内精品在线视频| 在线不卡a资源高清| www.色就是色| 日本韩国一区二区三区视频| 最新欧美色图| 92国产精品视频| 丁香一区二区三区| 国产午夜在线观看| 国内外成人免费激情在线视频网站 | 国产wwwxx| 精品久久人人做人人爽| 香蕉国产成人午夜av影院| 欧美不卡在线一区二区三区| 欧美激情在线看| 欧美色图天堂| 国产免费一区二区三区在线能观看| 国产99久久久久| 黄色网址视频在线观看| 欧美日韩在线视频一区| 免费成人动漫| 国产高清精品一区| 国产精品久久三| 韩漫成人漫画| 久久国产精品 国产精品| 亚洲另类色综合网站| 免费视频观看成人| 亚洲午夜精品福利| 一本到一区二区三区| 欧美久久香蕉| 国产一区亚洲二区三区| 亚洲黄色在线看| 亚洲激情二区| 神马久久高清| 国产成人一区二区三区电影| 国产成人综合精品三级| 手机在线免费av| 国产乱码一区| 欧美日韩国产丝袜美女| 日韩av三区| 五月综合网站| 欧美激情一区二区三区高清视频| 国产精品白丝av| 在线中文字幕视频观看| 成人国产一区二区| 欧美日韩一区二区免费在线观看 | 四虎精品一区二区永久在线观看| 中文字幕少妇一区二区三区| 天堂资源在线中文精品| 色视频在线观看福利| 欧美与欧洲交xxxx免费观看| 99国产精品国产精品毛片| 亚洲v.com| 日韩亚洲欧美一区二区| 亚洲精品动漫久久久久| 日韩av高清在线观看| av超碰免费在线| 日韩高清av| 懂色av一区二区三区蜜臀| 678在线观看视频| 日韩一本精品| 亚洲国产精品99久久| 国内国产精品久久| 欧美电影免费看| 日韩精品xxxx| 97视频在线观看免费高清完整版在线观看 | 成人黄色国产精品网站大全在线免费观看| gogo久久| 亚洲成人av动漫| 日韩精品免费视频| 顶级嫩模精品视频在线看| 国产欧美在线观看免费| 3d动漫一区二区三区| 久久深夜福利免费观看| 久久久国际精品| 精品产国自在拍| 日韩大胆人体| 欧美二区三区在线| 亚洲亚裔videos黑人hd| 26uuu亚洲综合色欧美| 国产精品玖玖玖在线资源| 免费播放av| 国产精品一国产精品最新章节| 日韩一区二区电影网| 国产美女av一区二区三区| 羞羞视频在线观看一区二区| 成人免费xx| 国产精品三区在线| 日韩风俗一区 二区| 久久新电视剧免费观看| 久久91麻豆精品一区| 成年人视频在线观看免费| 色一情一乱一伦一区二区三欧美| 中文字幕日韩欧美在线| 一区二区三区中文在线观看| 国产欧美另类| 精品国产18久久久久久二百| 1024在线视频| 免费久久99精品国产自| 中文字幕欧美精品日韩中文字幕| 国产精品视频在线看| 欧美午夜久久| 成人一级视频| 午夜成在线www| 香蕉精品视频在线| 青青草一区二区| 日韩女优制服丝袜电影| 欧美激情一区二区三区全黄| 中文字幕免费精品| 亚洲第一影院| 福利h视频在线| 精品无码av无码免费专区| 欧美孕妇与黑人孕交| 欧美另类高清zo欧美| 成人av手机在线观看| 91嫩草亚洲精品| 电影一区二区三| 日韩黄色影片| 亚洲熟妇av一区二区三区| 国产精品区一区| 欧美美女18p| 日韩欧美不卡一区| 伊人一区二区三区| 成人亚洲一区二区一| 亚洲视频日本| 天海翼亚洲一区二区三区| 成人在线黄色电影|