Scala API for restoring a Delta table
Add the ability to restore a Delta table to an earlier state, using either a version number or a timestamp.
Examples:
io.delta.tables.DeltaTable.forPath("/some_delta_path").restore(1)
io.delta.tables.DeltaTable.forPath("/some_delta_path").restore(java.sql.Timestamp.valueOf("2021-01-01 00:00:00.000"))

Fixes #632

Signed-off-by: Maksym Dovhal <[email protected]>
Maksym Dovhal committed Dec 12, 2021
1 parent 2ddff5e commit fca9b7c
Showing 7 changed files with 391 additions and 5 deletions.
19 changes: 19 additions & 0 deletions core/src/main/scala/io/delta/tables/DeltaTable.scala
@@ -16,6 +16,7 @@

package io.delta.tables

import java.sql.Timestamp
import scala.collection.JavaConverters._

import org.apache.spark.sql.delta._
@@ -472,6 +473,24 @@ class DeltaTable private[tables](
def upgradeTableProtocol(readerVersion: Int, writerVersion: Int): Unit = {
deltaLog.upgradeProtocol(Protocol(readerVersion, writerVersion))
}

/**
* Restores the delta table to a specific version.
* @param version the version number to restore to
* @since 1.2.0
*/
def restore(version: Long): Unit = {
executeRestore(deltaLog, version = Some(version))
}

/**
* Restores the delta table to the latest commit that happened at or before `timestamp`.
* @param timestamp the timestamp to restore to
* @since 1.2.0
*/
def restore(timestamp: Timestamp): Unit = {
executeRestore(deltaLog, timestamp = Option(timestamp))
}
}

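For reference, a minimal usage sketch of the two new overloads, reusing the illustrative path from the commit message (any Delta table path works):

import java.sql.Timestamp
import io.delta.tables.DeltaTable

val table = DeltaTable.forPath("/some_delta_path")
table.restore(1) // back to version 1
table.restore(Timestamp.valueOf("2021-01-01 00:00:00.000")) // back to the last commit at or before this timestamp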
core/src/main/scala/io/delta/tables/execution/DeltaTableOperations.scala
@@ -16,17 +16,18 @@

package io.delta.tables.execution

import java.sql.Timestamp
import scala.collection.Map

import org.apache.spark.sql.delta.{DeltaErrors, DeltaHistoryManager, DeltaLog, PreprocessTableUpdate}
import org.apache.spark.sql.delta.commands.{DeleteCommand, DeltaGenerateCommand, VacuumCommand}
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.delta.commands.{DeltaGenerateCommand, RestoreTableCommand, VacuumCommand}
import org.apache.spark.sql.delta.util.AnalysisHelper
import io.delta.tables.DeltaTable

import org.apache.spark.sql.{functions, Column, DataFrame, Dataset}
import org.apache.spark.sql.{functions, Column, DataFrame}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical._

/**
@@ -75,6 +76,14 @@ trait DeltaTableOperations extends AnalysisHelper { self: DeltaTable =>
sparkSession.emptyDataFrame
}

protected def executeRestore(
deltaLog: DeltaLog,
version: Option[Long] = None,
timestamp: Option[Timestamp] = None): DataFrame = {
RestoreTableCommand(deltaLog, version, timestamp).run(sparkSession)
sparkSession.emptyDataFrame
}

protected def toStrColumnMap(map: Map[String, String]): Map[String, Column] = {
map.toSeq.map { case (k, v) => k -> functions.expr(v) }.toMap
}
core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -35,7 +35,6 @@ import org.apache.spark.{SparkConf, SparkEnv}
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.internal.SQLConf
@@ -1403,6 +1402,15 @@ object DeltaErrors
new io.delta.exceptions.ConcurrentTransactionException(message)
}

def restoreMissedDataFilesError(missedFiles: Array[String], version: Long): Throwable =
new IllegalArgumentException(
s"""Not all files from version $version are available in the file system.
|Missing files (top 100): ${missedFiles.mkString(",")}.
|Please use a more recent version or timestamp for restoring.
|To disable this check, set the option ${SQLConf.IGNORE_MISSING_FILES.key} to true."""
.stripMargin
)

}

/** The basic class for all Tahoe commit conflict exceptions. */
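As the message above says, the availability check is gated on Spark's standard ignore-missing-files flag; SQLConf.IGNORE_MISSING_FILES.key resolves to spark.sql.files.ignoreMissingFiles. A minimal sketch of disabling the check before a partial restore (assumes an existing SparkSession named spark):

// Allow restoring even when some data files have been vacuumed or deleted.
spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")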
core/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala
@@ -343,6 +343,17 @@ object DeltaOperations {
case class TestOperation(operationName: String = "TEST") extends Operation(operationName) {
override val parameters: Map[String, Any] = Map.empty
}

/** Recorded when restoring the table. */
case class Restore(
version: Option[Long] = None,
timestamp: Option[Long] = None) extends Operation("RESTORE") {
override val parameters: Map[String, Any] = Map(
"version" -> version,
"timestamp" -> timestamp
)
override def changesData: Boolean = true
}
}

private[delta] object DeltaOperationMetrics {
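Because Restore sets changesData and records its parameters, a restore shows up in the table history like any other data-changing operation. A sketch of inspecting the recorded entry (the path and the spark session are illustrative assumptions):

import io.delta.tables.DeltaTable

DeltaTable.forPath(spark, "/some_delta_path")
  .history(1) // the most recent commit, i.e. the RESTORE
  .select("operation", "operationParameters")
  .show(truncate = false)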
core/src/main/scala/org/apache/spark/sql/delta/commands/RestoreTableCommand.scala
@@ -0,0 +1,184 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.commands

import java.sql.Timestamp
import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.actions.{AddFile, RemoveFile}
import org.apache.spark.sql.delta.sources.DeltaSQLConf._
import org.apache.spark.sql.delta.util.DeltaFileOperations.absolutePath
import org.apache.spark.sql.execution.command.LeafRunnableCommand
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.sql.internal.SQLConf.IGNORE_MISSING_FILES
import org.apache.spark.util.SerializableConfiguration

/**
* Performs a restore of a delta table to a specified version or timestamp.
*
* Algorithm:
* 1) Read the latest snapshot of the table.
* 2) Read the snapshot at the version or timestamp to restore to.
* 3) Compute the files present in the snapshot to restore (files removed by a later commit)
* but missing from the latest snapshot. Add these files to the commit as AddFile actions.
* 4) Compute the files present in the latest snapshot (files added after the version to restore)
* but missing from the snapshot to restore. Add these files to the commit as RemoveFile actions.
* 5) If the SQLConf.IGNORE_MISSING_FILES option is false (the default), check that every
* AddFile is available in the file system.
* 6) Commit the metadata, the Protocol, and all RemoveFile and AddFile actions
* to the delta log using `commitLarge`.
* 7) If the table was modified concurrently, abort the restore and raise an exception.
*/
case class RestoreTableCommand(
deltaLog: DeltaLog,
version: Option[Long],
timestamp: Option[Timestamp]
) extends LeafRunnableCommand with DeltaCommand {

override def run(spark: SparkSession): Seq[Row] = {
recordDeltaOperation(deltaLog, "delta.restore") {

require(version.isEmpty ^ timestamp.isEmpty,
"Either the version or timestamp should be provided for restore")
val parallelism = restoreParallelism(spark)
val latestSnapshot = deltaLog.update()
val versionToRestore = version.getOrElse(
deltaLog
.history
.getActiveCommitAtTime(timestamp.get, canReturnLastCommit = true)
.version
)

require(versionToRestore < latestSnapshot.version,
s"Version to restore ($versionToRestore) should be less then " +
s"last available version (${latestSnapshot.version})")

val snapshotToRestore = deltaLog.getSnapshotAt(versionToRestore)
val latestSnapshotFiles = latestSnapshot.allFiles
val snapshotToRestoreFiles = snapshotToRestore.allFiles

import spark.implicits._
import scala.collection.JavaConverters._

val filesToAdd = snapshotToRestoreFiles
.join(
latestSnapshotFiles,
snapshotToRestoreFiles("path") === latestSnapshotFiles("path"),
"left_anti")
.as[AddFile]
.map(_.copy(dataChange = true))
.repartition(parallelism)
.cache() // To avoid Dataset recompute for each partition of toLocalIterator()

checkSnapshotFilesAvailability(deltaLog, filesToAdd, versionToRestore)

val filesToRemove = latestSnapshotFiles
.join(
snapshotToRestoreFiles,
latestSnapshotFiles("path") === snapshotToRestoreFiles("path"),
"left_anti")
.as[AddFile]
.map(_.removeWithTimestamp())
.repartition(parallelism)
.cache() // To avoid Dataset recompute for each partition of toLocalIterator()

// Commit files, metrics, protocol and metadata to delta log
deltaLog.withNewTransaction { txn =>

val metrics = computeMetrics(filesToAdd, filesToRemove, snapshotToRestore)
val addActions = filesToAdd.toLocalIterator().asScala
val removeActions = filesToRemove.toLocalIterator().asScala

txn.updateMetadata(snapshotToRestore.metadata)

commitLarge(
spark,
txn,
addActions ++ removeActions,
DeltaOperations.Restore(version, timestamp.map(_.getTime)),
Map.empty,
metrics)
}
filesToAdd.unpersist()
filesToRemove.unpersist()

Seq.empty[Row]
}
}

private def restoreParallelism(spark: SparkSession): Int = spark
.sessionState
.conf
.getConf(DELTA_RESTORE_PARALLELISM)

private def computeMetrics(
toAdd: Dataset[AddFile],
toRemove: Dataset[RemoveFile],
snapshot: Snapshot
): Map[String, String] = {
import toAdd.sparkSession.implicits._

val (numRestoredFiles, restoredFilesSize) = toAdd
.agg("size" -> "count", "size" -> "sum").as[(Long, Option[Long])].head()

val (numRemovedFiles, removedFilesSize) = toRemove
.agg("size" -> "count", "size" -> "sum").as[(Long, Option[Long])].head()

Map(
"numRestoredFiles" -> numRestoredFiles,
"restoredFilesSize" -> restoredFilesSize.getOrElse(0),
"numRemovedFiles" -> numRemovedFiles,
"removedFilesSize" -> removedFilesSize.getOrElse(0),
"numOfFilesAfterRestore" -> snapshot.numOfFiles,
"tableSizeAfterRestore" -> snapshot.sizeInBytes
).mapValues(_.toString).toMap
}

/* Prevents users from restoring to a table version with missing
* data files (manually deleted or vacuumed). Partially restoring to such a version
* is still possible if spark.sql.files.ignoreMissingFiles is set to true.
*/
private def checkSnapshotFilesAvailability(
deltaLog: DeltaLog, files: Dataset[AddFile], version: Long): Unit = {

implicit val spark: SparkSession = files.sparkSession
val ignore = spark
.sessionState
.conf
.getConf(IGNORE_MISSING_FILES)

if (!ignore) {
val path = deltaLog.dataPath
val hadoopConf = spark.sparkContext.broadcast(
new SerializableConfiguration(deltaLog.newDeltaHadoopConf()))

import spark.implicits._
val missedFiles = files
.repartition(restoreParallelism(spark))
.mapPartitions { files =>
val fs = path.getFileSystem(hadoopConf.value.value)
val pathStr = path.toUri.getPath
files.filterNot(f => fs.exists(absolutePath(pathStr, f.path)))
}
.map(_.path)
.head(100)

if (missedFiles.nonEmpty) {
throw DeltaErrors.restoreMissedDataFilesError(missedFiles, version)
}
}
}
}
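The left-anti joins in steps 3 and 4 of the algorithm above do all of the diff computation. A toy sketch of the same idea over bare path strings, not part of the commit, with made-up file names and a local session:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val latest = Seq("a.parquet", "c.parquet").toDF("path") // files in the latest snapshot
val toRestore = Seq("a.parquet", "b.parquet").toDF("path") // files in the snapshot to restore

// Step 3: in the snapshot to restore but not in the latest -> re-added as AddFile
val filesToAdd = toRestore.join(latest, Seq("path"), "left_anti") // b.parquet
// Step 4: in the latest but not in the snapshot to restore -> dropped as RemoveFile
val filesToRemove = latest.join(toRestore, Seq("path"), "left_anti") // c.parquet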
core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
@@ -502,6 +502,13 @@ trait DeltaSQLConfBase {
|""".stripMargin)
.booleanConf
.createWithDefault(true)

val DELTA_RESTORE_PARALLELISM =
buildConf("restore.parallelism")
.doc("Number of parallel spark tasks used for data restoring." +
" Small value may causes OOM while restoring huge table.")
.intConf
.createWithDefault(1)
}

object DeltaSQLConf extends DeltaSQLConfBase
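Assuming buildConf prefixes keys with spark.databricks.delta. (as for the other options in this file), the new knob can be raised from its default of 1 when restoring large tables, e.g.:

// Hypothetical tuning example; the exact prefix comes from Delta's buildConf helper.
spark.conf.set("spark.databricks.delta.restore.parallelism", 8L)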