Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Spark] Fix metadata cleanup by retaining a checkpoint before the cutoff window #2673

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 26 additions & 26 deletions spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ trait Checkpoints extends DeltaLogging {
.takeWhile(tv => (cur == 0 || tv.version <= cur) && tv < upperBoundCv)
.toArray
val lastCheckpoint =
getLatestCompleteCheckpointFromList(checkpoints, Some(upperBoundCv.version))
Checkpoints.getLatestCompleteCheckpointFromList(checkpoints, Some(upperBoundCv.version))
if (lastCheckpoint.isDefined) {
logInfo(s"Delta checkpoint is found at version ${lastCheckpoint.get.version}")
return lastCheckpoint
Expand All @@ -444,31 +444,6 @@ trait Checkpoints extends DeltaLogging {
logInfo(s"No checkpoint found for Delta table before version $startVersion")
None
}

/**
 * Selects the greatest complete checkpoint instance among `instances`.
 *
 * Only instances that compare `<=` to the bound produced by
 * `CheckpointInstance.sentinelValue(notLaterThanVersion)` are considered. Equal instances are
 * grouped together and a group counts as complete when:
 *  - `SINGLE` or `V2` format: the group holds exactly one instance;
 *  - `WITH_PARTS` format: the group holds exactly `numParts` instances;
 *  - `SENTINEL` format: never.
 *
 * @param instances candidate checkpoint instances to choose from
 * @param notLaterThanVersion optional version passed to `CheckpointInstance.sentinelValue` to
 *                            build the upper bound
 * @return the maximum complete checkpoint instance, or `None` when no group is complete
 */
protected[delta] def getLatestCompleteCheckpointFromList(
    instances: Array[CheckpointInstance],
    notLaterThanVersion: Option[Long] = None): Option[CheckpointInstance] = {
  val upperBound = CheckpointInstance.sentinelValue(notLaterThanVersion)

  // Decides whether `fileCount` equal instances are enough to make `ci` a complete checkpoint.
  def isComplete(ci: CheckpointInstance, fileCount: Int): Boolean = ci.format match {
    case CheckpointInstance.Format.SINGLE | CheckpointInstance.Format.V2 =>
      fileCount == 1
    case CheckpointInstance.Format.WITH_PARTS =>
      assert(ci.numParts.nonEmpty, "Multi-Part Checkpoint must have non empty numParts")
      fileCount == ci.numParts.get
    case CheckpointInstance.Format.SENTINEL =>
      false
  }

  val eligible = instances.filter(candidate => candidate <= upperBound)
  val completeGroups = eligible.groupBy(identity).filter {
    case (ci, duplicates) => isComplete(ci, duplicates.length)
  }
  if (completeGroups.nonEmpty) Some(completeGroups.keys.max) else None
}
}

object Checkpoints
Expand Down Expand Up @@ -1050,6 +1025,31 @@ object Checkpoints
None
} else Some(struct(partitionValues: _*).as(STRUCT_PARTITIONS_COL_NAME))
}

/**
 * Selects the greatest complete checkpoint instance among `instances`.
 *
 * Only instances that compare `<=` to the bound produced by
 * `CheckpointInstance.sentinelValue(notLaterThanVersion)` are considered. Equal instances are
 * grouped together and a group counts as complete when:
 *  - `SINGLE` or `V2` format: the group holds exactly one instance;
 *  - `WITH_PARTS` format: the group holds exactly `numParts` instances;
 *  - `SENTINEL` format: never.
 *
 * @param instances candidate checkpoint instances to choose from
 * @param notLaterThanVersion optional version passed to `CheckpointInstance.sentinelValue` to
 *                            build the upper bound
 * @return the maximum complete checkpoint instance, or `None` when no group is complete
 */
protected[delta] def getLatestCompleteCheckpointFromList(
    instances: Array[CheckpointInstance],
    notLaterThanVersion: Option[Long] = None): Option[CheckpointInstance] = {
  val upperBound = CheckpointInstance.sentinelValue(notLaterThanVersion)

  // Decides whether `fileCount` equal instances are enough to make `ci` a complete checkpoint.
  def isComplete(ci: CheckpointInstance, fileCount: Int): Boolean = ci.format match {
    case CheckpointInstance.Format.SINGLE | CheckpointInstance.Format.V2 =>
      fileCount == 1
    case CheckpointInstance.Format.WITH_PARTS =>
      assert(ci.numParts.nonEmpty, "Multi-Part Checkpoint must have non empty numParts")
      fileCount == ci.numParts.get
    case CheckpointInstance.Format.SENTINEL =>
      false
  }

  val eligible = instances.filter(candidate => candidate <= upperBound)
  val completeGroups = eligible.groupBy(identity).filter {
    case (ci, duplicates) => isComplete(ci, duplicates.length)
  }
  if (completeGroups.nonEmpty) Some(completeGroups.keys.max) else None
}
}

object V2Checkpoint {
Expand Down
Loading