国产精品电影_久久视频免费_欧美日韩国产激情_成年人视频免费在线播放_日本久久亚洲电影_久久都是精品_66av99_九色精品美女在线_蜜臀a∨国产成人精品_冲田杏梨av在线_欧美精品在线一区二区三区_麻豆mv在线看

Apache Spark Delta Lake寫數據使用及實現原理代碼解析

大數據 Spark
Delta Lake 寫數據是其最基本的功能,而且其使用和現有的 Spark 寫 Parquet 文件基本一致,在介紹 Delta Lake 實現原理之前先來看看如何使用它,具體使用如下。

[[278252]]

Delta Lake 寫數據是其最基本的功能,而且其使用和現有的 Spark 寫 Parquet 文件基本一致,在介紹 Delta Lake 實現原理之前先來看看如何使用它,具體使用如下: 

  1. df.write.format("delta").save("/data/yangping.wyp/delta/test/"
  2.   
  3. //數據按照 dt 分區 
  4. df.write.format("delta").partitionBy("dt").save("/data/yangping.wyp/delta/test/"
  5.   
  6. // 覆蓋之前的數據 
  7. df.write.format("delta").mode(SaveMode.Overwrite).save("/data/yangping.wyp/delta/test/"

大家可以看出,使用寫 Delta 數據是非常簡單的,這也是 Delte Lake 介紹的 100% 兼容 Spark。

Delta Lake 寫數據原理

前面簡單了解了如何使用 Delta Lake 來寫數據,本小結我們將深入介紹 Delta Lake 是如何保證寫數據的基本原理以及如何保證事務性。

得益于 Apache Spark 強大的數據源 API,我們可以很方便的給 Spark 添加任何數據源,Delta Lake 也不例外。Delta Lake 就是使用 DataSource V1 版本的 API 實現的一種新的數據源,我們調用 df.write.format("delta") 其實底層調用的是 org.apache.spark.sql.delta.sources.DeltaDataSource 類。為了簡單起見,本文介紹的是 Delta Lake 批量寫的實現,實時流寫 Delta Lake 本文不涉及,后面有機會再介紹。 Delta Lake 批量寫擴展了 org.apache.spark.sql.sources.CreatableRelationProvider 特質,并實現了其中的方法。我們調用上面的寫數據方法首先會調用 DeltaDataSource 類的 createRelation 方法,它的具體實現如下: 

  1. override def createRelation( 
  2.     sqlContext: SQLContext, 
  3.     mode: SaveMode, 
  4.     parameters: Map[String, String], 
  5.     data: DataFrame): BaseRelation = { 
  6.   
  7.   // 寫數據的路徑 
  8.   val path = parameters.getOrElse("path", { 
  9.     throw DeltaErrors.pathNotSpecifiedException 
  10.   }) 
  11.   
  12.   // 分區字段 
  13.   val partitionColumns = parameters.get(DeltaSourceUtils.PARTITIONING_COLUMNS_KEY) 
  14.     .map(DeltaDataSource.decodePartitioningColumns) 
  15.     .getOrElse(Nil) 
  16.   
  17.   
  18.   // 事務日志對象 
  19.   val deltaLog = DeltaLog.forTable(sqlContext.sparkSession, path) 
  20.   
  21.   // 真正的寫操作過程 
  22.   WriteIntoDelta( 
  23.     deltaLog = deltaLog, 
  24.     mode = mode, 
  25.     new DeltaOptions(parameters, sqlContext.sparkSession.sessionState.conf), 
  26.     partitionColumns = partitionColumns, 
  27.     configuration = Map.empty, 
  28.     data = data).run(sqlContext.sparkSession) 
  29.   
  30.   deltaLog.createRelation() 

其中 mode 就是保持數據的模式,支持 Append、Overwrite、ErrorIfExists 以及 Ignore 等。parameters 這個傳遞的參數,比如分區字段、數據保存路徑以及 Delta 支持的一些參數(replaceWhere、mergeSchema、overwriteSchema 等,具體參見 org.apache.spark.sql.delta.DeltaOptions);data 就是我們需要保存的數據。

createRelation 方法緊接著就是獲取數據保存的路徑,分區字段等信息。然后初始化 deltaLog,deltaLog 的初始化會做很多事情,比如會讀取磁盤所有的事務日志(_delta_log 目錄下),并構建最新事務日志的最新快照,里面可以拿到最新數據的版本。由于 deltaLog 的初始化成本比較高,所以 deltaLog 初始化完之后會緩存到 deltaLogCache 中,這是一個使用 Guava 的 CacheBuilder 類實現的一個緩存,緩存的數據保持一小時,緩存大小可以通過 delta.log.cacheSize 參數進行設置。只要寫數據的路徑是一樣的,就只需要初始化一次 deltaLog,后面直接從緩存中拿即可。除非之前緩存的 deltaLog 被清理了,或者無效才會再次初始化。DeltaLog 類是 Delta Lake 中最重要的類之一,涉及的內容非常多,所以我們會單獨使用一篇文章進行介紹。

緊接著初始化 WriteIntoDelta,WriteIntoDelta 擴展自 RunnableCommand,Delta Lake 中的更新、刪除、合并都是擴展這個類的。初始化完 WriteIntoDelta 之后,就會調用 run 方法執行真正的寫數據操作。WriteIntoDelta 的 run 方法實現如下: 

  1. override def run(sparkSession: SparkSession): Seq[Row] = { 
  2.     deltaLog.withNewTransaction { txn => 
  3.       val actions = write(txn, sparkSession) 
  4.       val operation = DeltaOperations.Write(mode, Option(partitionColumns), options.replaceWhere) 
  5.       txn.commit(actions, operation) 
  6.     } 
  7.     Seq.empty 

Delta Lake 所有的更新操作都是在事務中進行的,deltaLog.withNewTransaction 就是一個事務,withNewTransaction 的實現如下: 

  1. def withNewTransaction[T](thunk: OptimisticTransaction => T): T = { 
  2.   try { 
  3.     // 更新當前表事務日志的快照 
  4.     update() 
  5.     // 初始化樂觀事務鎖對象 
  6.     val txn = new OptimisticTransaction(this) 
  7.     // 開啟事務 
  8.     OptimisticTransaction.setActive(txn) 
  9.     // 執行寫數據操作 
  10.     thunk(txn) 
  11.   } finally { 
  12.     // 關閉事務 
  13.     OptimisticTransaction.clearActive() 
  14.   } 

在開啟事務之前,需要更新當前表事務的快照,因為在執行寫數據之前,這張表可能已經被修改了,執行 update 操作之后,就可以拿到當前表的最新版本,緊接著開啟樂觀事務鎖。thunk(txn) 就是需要執行的事務操作,對應 deltaLog.withNewTransaction 里面的所有代碼。

我們回到上面的 run 方法。val actions = write(txn, sparkSession) 就是執行寫數據的操作,它的實現如下: 

  1.   def write(txn: OptimisticTransaction, sparkSession: SparkSession): Seq[Action] = { 
  2.     import sparkSession.implicits._ 
  3.     // 如果不是第一次往表里面寫數據,需要判斷寫數據的模式是否符合條件 
  4.     if (txn.readVersion > -1) { 
  5.       // This table already exists, check if the insert is valid. 
  6.       if (mode == SaveMode.ErrorIfExists) { 
  7.         throw DeltaErrors.pathAlreadyExistsException(deltaLog.dataPath) 
  8.       } else if (mode == SaveMode.Ignore) { 
  9.         return Nil 
  10.       } else if (mode == SaveMode.Overwrite) { 
  11.         deltaLog.assertRemovable() 
  12.       } 
  13.     } 
  14.   
  15.     // 更新表的模式,比如是否覆蓋現有的模式,是否和現有的模式進行 merge 
  16.     updateMetadata(txn, data, partitionColumns, configuration, isOverwriteOperation) 
  17.   
  18.     // 是否定義分區過濾條件 
  19.     val replaceWhere = options.replaceWhere 
  20.     val partitionFilters = if (replaceWhere.isDefined) { 
  21.       val predicates = parsePartitionPredicates(sparkSession, replaceWhere.get) 
  22.       if (mode == SaveMode.Overwrite) { 
  23.         verifyPartitionPredicates( 
  24.           sparkSession, txn.metadata.partitionColumns, predicates) 
  25.       } 
  26.       Some(predicates) 
  27.     } else { 
  28.       None 
  29.     } 
  30.   
  31.     // 第一次寫數據初始化事務日志的目錄 
  32.     if (txn.readVersion < 0) { 
  33.       // Initialize the log path 
  34.       deltaLog.fs.mkdirs(deltaLog.logPath) 
  35.     } 
  36.   
  37.     // 寫數據到文件系統中 
  38.     val newFiles = txn.writeFiles(data, Some(options)) 
  39.       
  40.     val deletedFiles = (mode, partitionFilters) match { 
  41.        // 全量覆蓋,直接拿出緩存在內存中最新事務日志快照里面的所有 AddFile 文件 
  42.       case (SaveMode.Overwrite, None) => 
  43.         txn.filterFiles().map(_.remove) 
  44.       // 從事務日志快照中獲取對應分區里面的所有 AddFile 文件 
  45.       case (SaveMode.Overwrite, Some(predicates)) => 
  46.         // Check to make sure the files we wrote out were actually valid. 
  47.         val matchingFiles = DeltaLog.filterFileList( 
  48.           txn.metadata.partitionColumns, newFiles.toDF(), predicates).as[AddFile].collect() 
  49.         val invalidFiles = newFiles.toSet -- matchingFiles 
  50.         if (invalidFiles.nonEmpty) { 
  51.           val badPartitions = invalidFiles 
  52.             .map(_.partitionValues) 
  53.             .map { _.map { case (k, v) => s"$k=$v" }.mkString("/") } 
  54.             .mkString(", "
  55.           throw DeltaErrors.replaceWhereMismatchException(replaceWhere.get, badPartitions) 
  56.         } 
  57.   
  58.         txn.filterFiles(predicates).map(_.remove) 
  59.       case _ => Nil 
  60.     } 
  61.   
  62.     newFiles ++ deletedFiles 
  63.   } 

如果 txn.readVersion == -1,說明是第一次寫數據到 Delta Lake 表,所以當這個值大于 -1 的時候,需要判斷一下寫數據的操作是否合法。

由于 Delta Lake 底層使用的是 Parquet 格式,所以 Delta Lake 表也支持模式的增加合并等,這就是 updateMetadata 函數對應的操作。

因為 Delta Lake 表支持分區,所以我們可能在寫數據的時候指定某個分區進行覆蓋。

真正寫數據的操作是 txn.writeFiles 函數執行的,具體實現如下: 

  1. def writeFiles( 
  2.       data: Dataset[_], 
  3.       writeOptions: Option[DeltaOptions], 
  4.       isOptimize: Boolean): Seq[AddFile] = { 
  5.     hasWritten = true 
  6.   
  7.     val spark = data.sparkSession 
  8.     val partitionSchema = metadata.partitionSchema 
  9.     val outputPath = deltaLog.dataPath 
  10.   
  11.     val (queryExecution, output) = normalizeData(data, metadata.partitionColumns) 
  12.     val partitioningColumns = 
  13.       getPartitioningColumns(partitionSchema, outputoutput.length < data.schema.size
  14.   
  15.     // 獲取 DelayedCommitProtocol,里面可以設置寫文件的名字, 
  16.     // commitTask 和 commitJob 等做一些事情 
  17.     val committer = getCommitter(outputPath) 
  18.   
  19.     val invariants = Invariants.getFromSchema(metadata.schema, spark) 
  20.   
  21.     SQLExecution.withNewExecutionId(spark, queryExecution) { 
  22.       val outputSpec = FileFormatWriter.OutputSpec( 
  23.         outputPath.toString, 
  24.         Map.empty, 
  25.         output
  26.   
  27.       val physicalPlan = DeltaInvariantCheckerExec(queryExecution.executedPlan, invariants) 
  28.   
  29.       FileFormatWriter.write( 
  30.         sparkSession = spark, 
  31.         plan = physicalPlan, 
  32.         fileFormat = snapshot.fileFormat, 
  33.         committer = committer, 
  34.         outputSpec = outputSpec, 
  35.         hadoopConf = spark.sessionState.newHadoopConfWithOptions(metadata.configuration), 
  36.         partitionColumns = partitioningColumns, 
  37.         bucketSpec = None, 
  38.         statsTrackers = Nil, 
  39.         options = Map.empty) 
  40.     } 
  41.   
  42.     // 返回新增的文件 
  43.     committer.addedStatuses 

Delta Lake 寫操作最終調用 Spark 的 FileFormatWriter.write 方法進行的,通過這個方法的復用將我們真正的數據寫入到 Delta Lake 表里面去了。

在 Delta Lake 中,如果是新增文件則會在事務日志中使用 AddFile 類記錄相關的信息,AddFile 持久化到事務日志里面的內容如下:

  1. {"add":{"path":"dt=20190801/part-00001-bdff67f3-c70f-4817-898d-15a73c93271a.c000.snappy.parquet","partitionValues":{"dt":"20190801"},"size":429,"modificationTime":1566990855000,"dataChange":true}} 

可以看出 AddFile 里面記錄了新增文件的保存路徑,分區信息,新增的文件大小,修改時間等信息。如果是刪除文件,也會在事務日志里面記錄這個刪除操作,對應的就是使用 RemoveFile 類存儲,RemoveFile 持久化到事務日志里面的內容如下:

  1. {"remove":{"path":"dt=20190801/part-00001-7f3fe89d-e55b-4848-93ea-4133b5d406d6.c000.snappy.parquet","deletionTimestamp":1566990856332,"dataChange":true}} 

RemoveFile 里面保存了刪除文件的路徑,刪除時間等信息。如果新增一個文件,再刪除一個文件,那么最新的事務日志快照里面只會保存刪除這個文件的記錄。從這里面也可以看出, Delta Lake 刪除、新增 ACID 是針對文件級別的。

上面的寫操作肯定會產生新的文件,所以寫操作之后就需要拿到新增的文件(val newFiles = txn.writeFiles(data, Some(options)) )newFiles(AddFile) 和需要刪除的文件(RemoveFile)。針對那些文件需要刪除需要做一些判斷,主要分兩種情況(具體參見 write 方法里面的):

  • 如果是全表覆蓋,則直接從緩存在內存中最新的事務日志快照中拿出所有 AddFile 文件,然后將其標記為 RemoveFile;
  • 如果是分區內的覆蓋,則從緩存在內存中最新的事務日志快照中拿出對應分區下的 AddFile 文件,然后將其標記為 RemoveFile。

最后 write 方法返回新增的文件和需要刪除的文件(newFiles ++ deletedFiles),這些文件最終需要記錄到事務日志里面去。關于事務日志是如何寫進去的請參見這篇文章的詳細分析。

 

責任編輯:未麗燕 來源: 阿里云棲社區
相關推薦

2022-07-06 09:53:04

開源數據湖

2023-06-30 07:40:31

數據分析Delta lake

2018-04-09 12:25:11

2015-03-10 13:55:31

JavaScript預解析原理及實現

2022-03-17 08:55:43

本地線程變量共享全局變量

2020-12-04 14:31:45

大數據Spark

2023-12-22 13:58:00

C++鏈表開發

2021-07-07 10:13:56

大數據Delta Lake 湖倉一體

2016-12-20 09:47:38

Apache SparLambda架構

2018-05-22 09:47:07

2014-02-14 15:43:16

ApacheSpark

2018-04-10 10:32:07

NginxApache服務器

2019-10-08 17:38:17

開源技術 趨勢

2022-06-01 13:52:11

開源大數據

2018-07-27 08:39:44

負載均衡算法實現

2012-04-11 15:41:48

JavaNIO

2015-10-16 09:21:13

SparkMySQL數據分析

2020-07-10 09:04:55

HTTPS瀏覽器網絡協議

2011-08-04 15:52:48

Objective-C HTML

2017-06-26 15:00:17

點贊
收藏

51CTO技術棧公眾號

中文字幕中文字幕在线中心一区| 欧美aaaaaa| 暖暖日本在线观看| 亚洲国产日本| 精品无人国产偷自产在线| 各处沟厕大尺度偷拍女厕嘘嘘| 日本欧美久久久久免费播放网| 欧美福利视频网站| 国产成人免费9x9x人网站视频| 欧美一级在线免费| 国产区av在线| 一本色道a无线码一区v| 探花国产精品| 亚洲老司机在线| 日韩肉感妇bbwbbwbbw| 成人性视频网站| 日本精品福利视频| 国产剧情av麻豆香蕉精品| 日韩精品无码一区二区三区| 中文在线一区| 日本一区二区三区精品视频| 日韩精品一卡二卡三卡四卡无卡| 久久久久久欧美精品色一二三四 | 国产伦精品一区二区三区照片| 国产精品精品国产一区二区| 亚洲精品日韩激情在线电影| 国产综合网站| 国内视频一区| 久久久久国产精品一区三寸| 亚洲精品久久区二区三区蜜桃臀 | 亚洲国产aⅴ成人精品无吗| av在线电影网站| 欧美日韩性生活视频| 99精品老司机免费视频| 日韩欧美另类在线| 欧美日韩精品一区二区三区视频| 久久亚洲影音av资源网| 日韩欧美中文字幕一区二区三区| 精品国产91久久久久久久妲己| 国产精品天天狠天天看| 亚洲欧洲日韩精品在线| 亚洲色图清纯唯美| 国产精品久久久久久五月尺| 91免费看网站| 婷婷激情久久| 国产日韩在线免费| 999在线观看精品免费不卡网站| 久精品国产欧美| 精品一区二区久久久| 性欧美大战久久久久久久| 久久久久九九视频| 中国国产一级毛片| 91麻豆精品91久久久久久清纯| 天堂电影一区| 少妇av一区二区三区| 亚洲国产精品久久人人爱蜜臀| 亚欧洲精品在线视频免费观看| 国产福利一区二区| 成人影院在线观看视频| 欧美日韩性视频在线| 七七成人影院| 欧美激情精品久久久久久大尺度| 日韩一区自拍| 一区二区精品在线| 国产日韩欧美在线| 1024日韩| 伊人成色综合网| 欧美日韩午夜激情| 综合日韩av| 国产一区玩具在线观看| 黑人巨大精品欧美一区| 裸体av在线| 日韩精品在线免费| 91欧美日韩| a级黄色小视频| 欧美中文字幕一区二区三区亚洲| 国产成人精品一区二三区在线观看 | 中文成人在线| 国产乱码精品一区二区三区日韩精品 | 欧美在线播放一区| 久久精品中文| 国产精品观看在线亚洲人成网 | 日韩在线观看a| 在线观看国产精品91| 久久99国产精品免费| 中文字字幕在线中文乱码电影| 欧美性xxxx极品高清hd直播| 午夜精品福利影院| 一区二区av| 久久久噜噜噜久久中文字幕色伊伊 | 亚洲人成77777男人| 丝袜亚洲另类丝袜在线| 天堂精品视频| 亚洲一区成人在线| 日韩精品一级毛片在线播放| 欧美精品激情视频| 欧美aaaaaa午夜精品| 亚洲美女在线免费观看| 色婷婷av一区二区三区久久| 亚洲一区二区毛片| 成人a视频在线| 欧美成人高清视频| 黄页网站在线播放| 亚洲天堂第二页| 亚洲午夜视频| 九色丨porny丨| 综合136福利视频在线| 久久综合伊人| 亚洲s色大片在线观看| 91精品国产乱码久久久久久久久| 国产精品一区二区视频| 91大神在线网站| 国产三级精品网站| 最好看的中文字幕久久| 亚洲伊人精品酒店| 青青草视频在线视频| 亚洲变态欧美另类捆绑| 99热精品在线观看| yw视频在线观看| 国产精品尤物福利片在线观看| 国产亚洲一区二区在线观看| 精品欧美一区二区三区在线观看| 亚洲精品国产系列| 精品日韩欧美一区二区| 免播放器亚洲| 久热国产在线| 久久99国产精品| 欧美中文字幕一二三区视频| 亚洲午夜精品一区 二区 三区| 永久www成人看片| 国产精品啪视频| 亚洲va韩国va欧美va| 日韩精品久久| 在线观看入口黄最新永久免费国产| 国产精品扒开腿做爽爽爽的视频| 中文字幕亚洲一区二区av在线| 999在线精品| 国产专区中文字幕| 国产精品高潮呻吟久久av黑人| 一区二区三区四区高清精品免费观看| 99国产精品免费网站| eeuss鲁片一区| 国产一区红桃视频| 黑人精品xxx一区一二区| 91精品国产福利在线观看麻豆| 色就是色亚洲色图| 久久国产精品-国产精品| 91精品国产综合久久久久久久| 日韩精品乱码免费| 在线观看的黄色| 午夜肉伦伦影院| 久久久噜久噜久久综合| 一区二区免费在线| 久久精品国内一区二区三区水蜜桃| 婷婷亚洲一区二区三区| 国产欧美一区二区三区另类精品 | 欧洲精品一区| 自拍av在线| 精品不卡在线| 日韩福利在线播放| 26uuu国产一区二区三区| 成人爽a毛片免费啪啪红桃视频| 波多野结衣在线| 欧美日韩国产精品一区二区| 免费福利视频一区| 性欧美精品孕妇| 你懂的视频在线一区二区| 日韩视频在线你懂得| 中文字幕一区二区三三| 粉嫩一区二区三区在线观看| 最新中文字幕免费视频| 国产精品日韩欧美大师| www.8ⅹ8ⅹ羞羞漫画在线看| 日韩精品伦理第一区| 国产一区二区三区在线观看网站| 国产欧美日韩中文久久| 日韩欧美中字| 日本中文字幕中出在线| www在线观看免费| 国产精品久久久久久久久久久久久| 欧美日韩精品一区二区三区蜜桃| 老司机午夜精品99久久| 成人搞黄视频| 日本成人网址| 男人亚洲天堂网| 97人人模人人爽人人喊38tv| 亚洲精品在线91| 亚洲欧洲精品一区二区精品久久久| 欧美激情性爽国产精品17p| 亚洲伦乱视频| 欧美精品少妇| 黄色免费观看视频网站| 成人午夜电影免费在线观看| 久久久av亚洲男天堂| 欧美综合一区二区| 久久久久国产一区二区三区四区| 欧美视频日韩| 亚洲性视频在线| 国产传媒在线| 国产成人天天5g影院在线观看|