Apache Spark Delta Lake寫數據使用及實現原理代碼解析
Delta Lake 寫數據是其最基本的功能,而且其使用和現有的 Spark 寫 Parquet 文件基本一致,在介紹 Delta Lake 實現原理之前先來看看如何使用它,具體使用如下:
- df.write.format("delta").save("/data/yangping.wyp/delta/test/")
- //數據按照 dt 分區
- df.write.format("delta").partitionBy("dt").save("/data/yangping.wyp/delta/test/")
- // 覆蓋之前的數據
- df.write.format("delta").mode(SaveMode.Overwrite).save("/data/yangping.wyp/delta/test/")
大家可以看出,使用寫 Delta 數據是非常簡單的,這也是 Delte Lake 介紹的 100% 兼容 Spark。
Delta Lake 寫數據原理
前面簡單了解了如何使用 Delta Lake 來寫數據,本小結我們將深入介紹 Delta Lake 是如何保證寫數據的基本原理以及如何保證事務性。
得益于 Apache Spark 強大的數據源 API,我們可以很方便的給 Spark 添加任何數據源,Delta Lake 也不例外。Delta Lake 就是使用 DataSource V1 版本的 API 實現的一種新的數據源,我們調用 df.write.format("delta") 其實底層調用的是 org.apache.spark.sql.delta.sources.DeltaDataSource 類。為了簡單起見,本文介紹的是 Delta Lake 批量寫的實現,實時流寫 Delta Lake 本文不涉及,后面有機會再介紹。 Delta Lake 批量寫擴展了 org.apache.spark.sql.sources.CreatableRelationProvider 特質,并實現了其中的方法。我們調用上面的寫數據方法首先會調用 DeltaDataSource 類的 createRelation 方法,它的具體實現如下:
- override def createRelation(
- sqlContext: SQLContext,
- mode: SaveMode,
- parameters: Map[String, String],
- data: DataFrame): BaseRelation = {
- // 寫數據的路徑
- val path = parameters.getOrElse("path", {
- throw DeltaErrors.pathNotSpecifiedException
- })
- // 分區字段
- val partitionColumns = parameters.get(DeltaSourceUtils.PARTITIONING_COLUMNS_KEY)
- .map(DeltaDataSource.decodePartitioningColumns)
- .getOrElse(Nil)
- // 事務日志對象
- val deltaLog = DeltaLog.forTable(sqlContext.sparkSession, path)
- // 真正的寫操作過程
- WriteIntoDelta(
- deltaLog = deltaLog,
- mode = mode,
- new DeltaOptions(parameters, sqlContext.sparkSession.sessionState.conf),
- partitionColumns = partitionColumns,
- configuration = Map.empty,
- data = data).run(sqlContext.sparkSession)
- deltaLog.createRelation()
- }
其中 mode 就是保持數據的模式,支持 Append、Overwrite、ErrorIfExists 以及 Ignore 等。parameters 這個傳遞的參數,比如分區字段、數據保存路徑以及 Delta 支持的一些參數(replaceWhere、mergeSchema、overwriteSchema 等,具體參見 org.apache.spark.sql.delta.DeltaOptions);data 就是我們需要保存的數據。
createRelation 方法緊接著就是獲取數據保存的路徑,分區字段等信息。然后初始化 deltaLog,deltaLog 的初始化會做很多事情,比如會讀取磁盤所有的事務日志(_delta_log 目錄下),并構建最新事務日志的最新快照,里面可以拿到最新數據的版本。由于 deltaLog 的初始化成本比較高,所以 deltaLog 初始化完之后會緩存到 deltaLogCache 中,這是一個使用 Guava 的 CacheBuilder 類實現的一個緩存,緩存的數據保持一小時,緩存大小可以通過 delta.log.cacheSize 參數進行設置。只要寫數據的路徑是一樣的,就只需要初始化一次 deltaLog,后面直接從緩存中拿即可。除非之前緩存的 deltaLog 被清理了,或者無效才會再次初始化。DeltaLog 類是 Delta Lake 中最重要的類之一,涉及的內容非常多,所以我們會單獨使用一篇文章進行介紹。
緊接著初始化 WriteIntoDelta,WriteIntoDelta 擴展自 RunnableCommand,Delta Lake 中的更新、刪除、合并都是擴展這個類的。初始化完 WriteIntoDelta 之后,就會調用 run 方法執行真正的寫數據操作。WriteIntoDelta 的 run 方法實現如下:
- override def run(sparkSession: SparkSession): Seq[Row] = {
- deltaLog.withNewTransaction { txn =>
- val actions = write(txn, sparkSession)
- val operation = DeltaOperations.Write(mode, Option(partitionColumns), options.replaceWhere)
- txn.commit(actions, operation)
- }
- Seq.empty
- }
Delta Lake 所有的更新操作都是在事務中進行的,deltaLog.withNewTransaction 就是一個事務,withNewTransaction 的實現如下:
- def withNewTransaction[T](thunk: OptimisticTransaction => T): T = {
- try {
- // 更新當前表事務日志的快照
- update()
- // 初始化樂觀事務鎖對象
- val txn = new OptimisticTransaction(this)
- // 開啟事務
- OptimisticTransaction.setActive(txn)
- // 執行寫數據操作
- thunk(txn)
- } finally {
- // 關閉事務
- OptimisticTransaction.clearActive()
- }
- }
在開啟事務之前,需要更新當前表事務的快照,因為在執行寫數據之前,這張表可能已經被修改了,執行 update 操作之后,就可以拿到當前表的最新版本,緊接著開啟樂觀事務鎖。thunk(txn) 就是需要執行的事務操作,對應 deltaLog.withNewTransaction 里面的所有代碼。
我們回到上面的 run 方法。val actions = write(txn, sparkSession) 就是執行寫數據的操作,它的實現如下:
- def write(txn: OptimisticTransaction, sparkSession: SparkSession): Seq[Action] = {
- import sparkSession.implicits._
- // 如果不是第一次往表里面寫數據,需要判斷寫數據的模式是否符合條件
- if (txn.readVersion > -1) {
- // This table already exists, check if the insert is valid.
- if (mode == SaveMode.ErrorIfExists) {
- throw DeltaErrors.pathAlreadyExistsException(deltaLog.dataPath)
- } else if (mode == SaveMode.Ignore) {
- return Nil
- } else if (mode == SaveMode.Overwrite) {
- deltaLog.assertRemovable()
- }
- }
- // 更新表的模式,比如是否覆蓋現有的模式,是否和現有的模式進行 merge
- updateMetadata(txn, data, partitionColumns, configuration, isOverwriteOperation)
- // 是否定義分區過濾條件
- val replaceWhere = options.replaceWhere
- val partitionFilters = if (replaceWhere.isDefined) {
- val predicates = parsePartitionPredicates(sparkSession, replaceWhere.get)
- if (mode == SaveMode.Overwrite) {
- verifyPartitionPredicates(
- sparkSession, txn.metadata.partitionColumns, predicates)
- }
- Some(predicates)
- } else {
- None
- }
- // 第一次寫數據初始化事務日志的目錄
- if (txn.readVersion < 0) {
- // Initialize the log path
- deltaLog.fs.mkdirs(deltaLog.logPath)
- }
- // 寫數據到文件系統中
- val newFiles = txn.writeFiles(data, Some(options))
- val deletedFiles = (mode, partitionFilters) match {
- // 全量覆蓋,直接拿出緩存在內存中最新事務日志快照里面的所有 AddFile 文件
- case (SaveMode.Overwrite, None) =>
- txn.filterFiles().map(_.remove)
- // 從事務日志快照中獲取對應分區里面的所有 AddFile 文件
- case (SaveMode.Overwrite, Some(predicates)) =>
- // Check to make sure the files we wrote out were actually valid.
- val matchingFiles = DeltaLog.filterFileList(
- txn.metadata.partitionColumns, newFiles.toDF(), predicates).as[AddFile].collect()
- val invalidFiles = newFiles.toSet -- matchingFiles
- if (invalidFiles.nonEmpty) {
- val badPartitions = invalidFiles
- .map(_.partitionValues)
- .map { _.map { case (k, v) => s"$k=$v" }.mkString("/") }
- .mkString(", ")
- throw DeltaErrors.replaceWhereMismatchException(replaceWhere.get, badPartitions)
- }
- txn.filterFiles(predicates).map(_.remove)
- case _ => Nil
- }
- newFiles ++ deletedFiles
- }
- }
如果 txn.readVersion == -1,說明是第一次寫數據到 Delta Lake 表,所以當這個值大于 -1 的時候,需要判斷一下寫數據的操作是否合法。
由于 Delta Lake 底層使用的是 Parquet 格式,所以 Delta Lake 表也支持模式的增加合并等,這就是 updateMetadata 函數對應的操作。
因為 Delta Lake 表支持分區,所以我們可能在寫數據的時候指定某個分區進行覆蓋。
真正寫數據的操作是 txn.writeFiles 函數執行的,具體實現如下:
- def writeFiles(
- data: Dataset[_],
- writeOptions: Option[DeltaOptions],
- isOptimize: Boolean): Seq[AddFile] = {
- hasWritten = true
- val spark = data.sparkSession
- val partitionSchema = metadata.partitionSchema
- val outputPath = deltaLog.dataPath
- val (queryExecution, output) = normalizeData(data, metadata.partitionColumns)
- val partitioningColumns =
- getPartitioningColumns(partitionSchema, output, output.length < data.schema.size)
- // 獲取 DelayedCommitProtocol,里面可以設置寫文件的名字,
- // commitTask 和 commitJob 等做一些事情
- val committer = getCommitter(outputPath)
- val invariants = Invariants.getFromSchema(metadata.schema, spark)
- SQLExecution.withNewExecutionId(spark, queryExecution) {
- val outputSpec = FileFormatWriter.OutputSpec(
- outputPath.toString,
- Map.empty,
- output)
- val physicalPlan = DeltaInvariantCheckerExec(queryExecution.executedPlan, invariants)
- FileFormatWriter.write(
- sparkSession = spark,
- plan = physicalPlan,
- fileFormat = snapshot.fileFormat,
- committer = committer,
- outputSpec = outputSpec,
- hadoopConf = spark.sessionState.newHadoopConfWithOptions(metadata.configuration),
- partitionColumns = partitioningColumns,
- bucketSpec = None,
- statsTrackers = Nil,
- options = Map.empty)
- }
- // 返回新增的文件
- committer.addedStatuses
- }
Delta Lake 寫操作最終調用 Spark 的 FileFormatWriter.write 方法進行的,通過這個方法的復用將我們真正的數據寫入到 Delta Lake 表里面去了。
在 Delta Lake 中,如果是新增文件則會在事務日志中使用 AddFile 類記錄相關的信息,AddFile 持久化到事務日志里面的內容如下:
- {"add":{"path":"dt=20190801/part-00001-bdff67f3-c70f-4817-898d-15a73c93271a.c000.snappy.parquet","partitionValues":{"dt":"20190801"},"size":429,"modificationTime":1566990855000,"dataChange":true}}
可以看出 AddFile 里面記錄了新增文件的保存路徑,分區信息,新增的文件大小,修改時間等信息。如果是刪除文件,也會在事務日志里面記錄這個刪除操作,對應的就是使用 RemoveFile 類存儲,RemoveFile 持久化到事務日志里面的內容如下:
- {"remove":{"path":"dt=20190801/part-00001-7f3fe89d-e55b-4848-93ea-4133b5d406d6.c000.snappy.parquet","deletionTimestamp":1566990856332,"dataChange":true}}
RemoveFile 里面保存了刪除文件的路徑,刪除時間等信息。如果新增一個文件,再刪除一個文件,那么最新的事務日志快照里面只會保存刪除這個文件的記錄。從這里面也可以看出, Delta Lake 刪除、新增 ACID 是針對文件級別的。
上面的寫操作肯定會產生新的文件,所以寫操作之后就需要拿到新增的文件(val newFiles = txn.writeFiles(data, Some(options)) )newFiles(AddFile) 和需要刪除的文件(RemoveFile)。針對那些文件需要刪除需要做一些判斷,主要分兩種情況(具體參見 write 方法里面的):
- 如果是全表覆蓋,則直接從緩存在內存中最新的事務日志快照中拿出所有 AddFile 文件,然后將其標記為 RemoveFile;
- 如果是分區內的覆蓋,則從緩存在內存中最新的事務日志快照中拿出對應分區下的 AddFile 文件,然后將其標記為 RemoveFile。
最后 write 方法返回新增的文件和需要刪除的文件(newFiles ++ deletedFiles),這些文件最終需要記錄到事務日志里面去。關于事務日志是如何寫進去的請參見這篇文章的詳細分析。
































