
[Spark] Fix metadata cleanup by retaining a checkpoint before the cutoff window #2673

Open · wants to merge 3 commits into base: master

Conversation

sumeet-db (Collaborator)

Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Description

We now delete eligible delta files only if there is a checkpoint newer than them before the cutoff window begins.

Note: In rare cases where a checkpoint outside the cutoff window is already retained to account for timestamp adjustments, this change may retain an extra checkpoint and some commit files.
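A hypothetical illustration (numbers not from the PR): suppose the log contains versions 0..5 with a complete checkpoint at version 3, and the retention cutoff falls between versions 4 and 5. Commits 0-2 can be deleted because checkpoint 3 is newer than them and sits before the cutoff; checkpoint 3 and commits 3-4 are retained so that every version at or after the cutoff remains reconstructible.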

How was this patch tested?

New UTs, plus updates to some of the existing UTs.

Does this PR introduce any user-facing changes?

@sumeet-db sumeet-db changed the title [Spark] Fix OSS metadata cleanup by retaining a checkpoint before the cutoff window [Spark] Fix metadata cleanup by retaining a checkpoint before the cutoff window Feb 22, 2024
@sumeet-db sumeet-db force-pushed the delta-oss-metadata-fix branch 2 times, most recently from 5c9dbda to 7793527 Compare February 27, 2024 19:35
@felipepessoto (Contributor)

Does it fix #606?

@sumeet-db sumeet-db force-pushed the delta-oss-metadata-fix branch from 7793527 to 674b17e Compare March 27, 2024 22:42
@scovich (Collaborator) left a comment

Flushing comments so far... the code would benefit from type aliases and other strategies to improve readability and to highlight logical concepts instead of the physical data types that happen to implement those concepts.

Does it fix #606?

Yes, it should!

}

override def next(): ArrayBuffer[T] = {
  if (!hasNext) throw new NoSuchElementException("No more items to return")
Collaborator

Unnecessary? Option.get already throws NoSuchElementException when empty.

Collaborator

Ah, but calling it triggers queueItemsIfNeeded.

That said, the Scala docs for Iterator.next specifically state that calling next is undefined behavior unless you verify hasNext == true first, so IMO we could still remove this check as redundant.

Collaborator (Author)

It had been added to trigger queueItemsIfNeeded.
Removed the check based on the Scala docs.

Comment on lines 509 to 514
// 2. Drop everything but the last checkpoint-dependency group.
// .sliding(2) can return a single entry if the underlying iterator's size is 1
// https://www.scala-lang.org/old/node/7939
private val lastCheckpointPreservingIterator:
    Iterator[ArrayBuffer[ArrayBuffer[ArrayBuffer[FileStatus]]]] =
  new CheckpointGroupingIterator(underlying).sliding(2).filter(_.length == 2).map(_.head)
Collaborator

I don't understand how sliding(2) would "drop everything but the last"?

Collaborator

I guess what happens is that every element except the first and last gets returned twice, and the map call only takes the first of each pair?

(A, B), (B, C), ..., (X, Y), (Y, Z)
 ^       ^      ...   ^       ^

An awkward/expensive workaround because scala does not provide any Iterator.dropRight method?

Collaborator (Author)

Unfortunately yes, it's an awkward workaround to simulate dropRight without loading the whole iterator into memory. Added the example to the comment to clarify it.

Collaborator (Author)

.filter(_.length == 2) is needed for the (A) case.
https://www.scala-lang.org/old/node/7939
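For reference, here is the trick in isolation as a runnable sketch (not code from the PR), including the single-element corner case that the .filter handles:

  // dropRight(1) via sliding(2): every element except the last heads exactly one full window
  val dropLast = Iterator("A", "B", "C").sliding(2).filter(_.length == 2).map(_.head)
  println(dropLast.toList) // List(A, B)

  // With a single element, sliding(2) yields one short window; the filter discards it
  val single = Iterator("A").sliding(2).filter(_.length == 2).map(_.head)
  println(single.toList) // List()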

Comment on lines 454 to 455
while (bufferedUnderlying.hasNext &&
    (!isNewGroupStart(bufferedUnderlying.head) || group.isEmpty)) {
Collaborator

nit: Since None.forall is always true:

Suggested change
-while (bufferedUnderlying.hasNext &&
-    (!isNewGroupStart(bufferedUnderlying.head) || group.isEmpty)) {
+while (!bufferedUnderlying.headOption.forall(isNewGroupStart) && group.isEmpty) {

Collaborator (Author)

Updated it to: (assuming you meant || instead of &&)
while (!bufferedUnderlying.headOption.forall(isNewGroupStart) || group.isEmpty)

Collaborator

Yikes! Now that I look closer, none of these are logically equivalent!

Old logic said:

If the iterator is non-empty AND (the next element is not a group start OR the current group is empty), add to the current group

... which could be cast as !E && (!S || G) == (!E && !S) || (!E && G)

My suggested logic said:

If NOT (iterator is empty OR next element is a group start) AND the current group is empty

... which could be cast as !(E || S) && G which simplifies to (!E && !S) && G == !E && !S && G

Your new logic says:

If NOT (iterator is empty OR next element is a group start) OR the current group is empty

... which could be cast as !(E || S) || G which simplifies to (!E && !S) || G == (!E || G) && (!S || G)
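A quick brute-force check (a runnable sketch, with E, S, and G as defined above) confirms that the three conditions are pairwise inequivalent:

  for (e <- Seq(true, false); s <- Seq(true, false); g <- Seq(true, false)) {
    val oldCond   = !e && (!s || g) // original condition
    val suggested = !(e || s) && g  // first suggested rewrite
    val applied   = !(e || s) || g  // rewrite that was actually applied
    if (oldCond != suggested || oldCond != applied) {
      println(s"E=$e S=$s G=$g -> old=$oldCond suggested=$suggested applied=$applied")
    }
  }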

Comment on lines 488 to 494
class LastCheckpointPreservingLogDeletionIterator(
underlying: Iterator[ArrayBuffer[ArrayBuffer[FileStatus]]])
extends Iterator[ArrayBuffer[ArrayBuffer[ArrayBuffer[FileStatus]]]]{

private class CheckpointGroupingIterator(
underlying: Iterator[ArrayBuffer[ArrayBuffer[FileStatus]]])
extends GroupBreakingIterator[ArrayBuffer[ArrayBuffer[FileStatus]]](underlying) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

High level comment: These kinds of nested/buffered/grouped iterators make it very hard to see the overall flow of the algorithm -- there's not enough statement of intent. Some code comments explaining the overall flow at a high level, plus judicious use of type aliases would go a long way IMO, e.g.

  // All files with the same commit version
  type SameCommitVersionFileGroup = ArrayBuffer[FileStatus]
  
  // All commits whose timestamps need to be adjusted because of the first commit in the group
  type TimestampAdjustedCommitGroup = ArrayBuffer[SameCommitVersionFileGroup]
  
  // All timestamp groups that depend on the same checkpoint
  type DependentCheckpointGroup = ArrayBuffer[TimestampAdjustedCommitGroup]
  
  // Returns all checkpoint groups except the last, so we never delete the last checkpoint.
  class LastCheckpointPreservingLogDeletionIterator(
      underlying: Iterator[TimestampAdjustedCommitGroup])
    extends Iterator[DependentCheckpointGroup]{

    // Each next call returns all timestamp groups that depend on the same checkpoint.
    private class CheckpointGroupingIterator(
        underlying: Iterator[TimestampAdjustedCommitGroup])
      extends GroupBreakingIterator[TimestampAdjustedCommitGroup](underlying) {

Once such changes are made, it will be a lot easier to review the code for correctness by comparing against the stated intent.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That definitely makes sense.

  1. Added the types
  2. Added high level comments in the MetadataCleanup
  3. Made the listExpiredDeltaLogs easier to read.

@sumeet-db sumeet-db requested a review from scovich April 16, 2024 21:44


private def queueItemsIfNeeded(): Unit = {
  if (bufferedOutput.isEmpty && bufferedUnderlying.hasNext) {
    val group = new mutable.ArrayBuffer[T]()
-   while (bufferedUnderlying.hasNext &&
-       (!isNewGroupStart(bufferedUnderlying.head) || group.isEmpty)) {
+   while (!bufferedUnderlying.headOption.forall(isNewGroupStart) || group.isEmpty) {
Collaborator

Rescuing #2673 (comment)

tl;dr: The correct rewrite should have been:

Suggested change
-while (!bufferedUnderlying.headOption.forall(isNewGroupStart) || group.isEmpty) {
+while (bufferedUnderlying.headOption.exists(h => !isNewGroupStart(h) || group.isEmpty)) {

If no tests failed because of this, it suggests we have a pretty basic test gap that needs to be corrected.
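The crux is how each combinator treats the exhausted-iterator case; for reference:

  println(Option.empty[Int].forall(_ > 0)) // true  -- a forall-based loop runs past the end
  println(Option.empty[Int].exists(_ > 0)) // false -- an exists-based loop stops at the end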

Comment on lines +516 to +519
// Implement Iterator.dropRight(1) using Iterator.sliding(2).map(_.head) since Scala doesn't
// provide a dropRight method for iterators.
// (A, B), (B, C), ..., (X, Y), (Y, Z)
//  ^       ^      ...   ^       ^
Collaborator

I wonder if we could achieve this using a buffered element instead?

class DropLastIterator[T](underlying: Iterator[T]) extends Iterator[T] {
  private var _next = underlying.nextOption()

  override def hasNext: Boolean = _next.isDefined && underlying.hasNext
  override def next(): T = {
    val rval = _next.get // verified by hasNext
    _next = Some(underlying.next()) // verified by hasNext
    rval
  }
}

... not sure whether it's better to write the above 10 LoC or keep the current 1 LoC + 10 lines of comments?

NOTE: Iterator.nextOption was added in scala-2.13, but we could add it to the existing scala-2.12 shim for Option if we wanted to use it here.
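For reference, the sketch above in action (assuming Scala 2.13 for nextOption):

  println(new DropLastIterator(Iterator(1, 2, 3)).toList)   // List(1, 2)
  println(new DropLastIterator(Iterator.empty[Int]).toList) // List()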

Comment on lines 506 to 508
val checkpointPaths = files.flatten.toArray.collect {
  case f if isCheckpointFile(f) => CheckpointInstance(f.getPath)
}
Collaborator

nit: we should probably toArray after the collect, to avoid creating two arrays?
(probably doesn't matter much in practice, tho)

@@ -566,11 +577,13 @@ object DeltaHistoryManager extends DeltaLogging {
* @param maxTimestamp The timestamp until which we can delete (inclusive).
*/
class TimestampAdjustingLogDeletionIterator(
Collaborator

I don't think this class name is very helpful at describing what the iterator actually does? Something like TimestampAdjustmentGroupingIterator is probably more accurate?

  val deltaFileIdx = group.lastIndexWhere { files =>
-   files.exists((file: FileStatus) => isDeltaFile(file)) }
+   files.exists((file: FileStatus) => isDeltaFile(file))
Collaborator

qq: is this sufficient? Or does the compiler truly need that type annotation?

Suggested change
-files.exists((file: FileStatus) => isDeltaFile(file))
+files.exists(file => isDeltaFile(file))
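For reference, a self-contained sketch of why the annotation is redundant: in argument position, the expected function type drives inference. isLong here is a hypothetical stand-in for isDeltaFile.

  def isLong(s: String): Boolean = s.length > 5
  val files = Seq("a.json", "really-long-name.json")
  println(files.exists(file => isLong(file))) // parameter type inferred as String
  println(files.exists(isLong))               // plain eta-expansion also works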

Comment on lines +121 to +123
* 3. Keep only the timestamp-adjusted-commit-groups whose start timestamp is less than or equal
* to the cutoff timestamp.
* 4. Remove any timestamp-adjusted-commit-groups that are fully protected.
Collaborator

How do 3/ and 4/ differ?

Comment on lines +442 to +445
private abstract class GroupBreakingIterator[T](
    underlying: Iterator[T]) extends Iterator[ArrayBuffer[T]] {
  private var bufferedOutput: Option[mutable.ArrayBuffer[T]] = None
  private val bufferedUnderlying = underlying.buffered
Collaborator

Looking at the DeltaLogGroupingIterator, the latter seems to have a simpler implementation while achieving a very similar result. Can we not adopt it here?

  private abstract class GroupBreakingIterator[T](
      underlying: BufferedIterator[T]) extends Iterator[ArrayBuffer[T]] {

    def this(underlying: Iterator[T]) = this(underlying.buffered)

    protected def isNewGroupStart(item: T): Boolean

    protected def transform(item: T): T = item

    override def hasNext: Boolean = underlying.hasNext

    override def next(): ArrayBuffer[T] = {
      val first = transform(underlying.next())
      val buffer = ArrayBuffer(first)
      while (underlying.headOption.exists(item => !isNewGroupStart(item))) {
        buffer += transform(underlying.next())
      }
      buffer
    }
  }
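For reference, a hypothetical instantiation of the sketch above (assuming Scala 2.13 for BufferedIterator.headOption, and the class made visible): group ints, starting a new group at each even number.

  val groups = new GroupBreakingIterator[Int](Iterator(2, 3, 5, 4, 7)) {
    override protected def isNewGroupStart(i: Int): Boolean = i % 2 == 0
  }
  println(groups.toList) // List(ArrayBuffer(2, 3, 5), ArrayBuffer(4, 7))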

Collaborator

Actually, I think we could combine both classes into a single GroupByIterator base class, and use it to express all three iterators:

GroupByIterator
  // An iterator-optimized variant of [[Seq.groupBy]]. It only attempts to group adjacent
  // elements with the same key, and each call to [[next]] returns a complete (key, group)
  // pair. The returned groups are never empty. Two groups may have the same key only if
  // they are separated by other groups in the stream.
 abstract class GroupByIterator[K, T](
      underlying: BufferedIterator[T]) extends Iterator[(K, ArrayBuffer[T])] {

    def this(underlying: Iterator[T]) = this(underlying.buffered)
    
    // Extracts the grouping key for this item. Adjacent items whose grouping keys
    // compare equal will be placed in the same group.
    protected def key(item: T): K

    // True if an item belongs to the same group as the given grouping key. 
    // Defaults to equality comparison of group key vs. item key.
    protected def isSameGroup(groupKey: K, item: T): Boolean = {
      groupKey == key(item)
    }

    override def hasNext: Boolean = underlying.hasNext

    override def next(): (K, ArrayBuffer[T]) = {
      val first = underlying.next()
      val groupKey = key(first)
      val buffer = ArrayBuffer(first)
      while (underlying.headOption.exists(item => isSameGroup(groupKey, item))) {
        buffer += underlying.next()
      }
      groupKey -> buffer
    }
  }
DeltaLogVersionGroupingIterator

Super easy case:

class DeltaLogVersionGroupingIterator(checkpointAndDeltas: Iterator[FileStatus])
    extends GroupByIterator[Long, FileStatus](checkpointAndDeltas) {
  // Files associated with the same commit version go in the same commit group.
  override protected def key(file: FileStatus): Long = deltaVersion(file.getPath)
}
CheckpointGroupingIterator

A bit of plumbing needed to strip away the group keys:

    def getCheckpointGroupingIterator(
        underlying: Iterator[TimestampAdjustedCommitGroup]): Iterator[DependentCheckpointGroup] = {
      type Key = Option[Long]
      type Item = TimestampAdjustedCommitGroup
      
      val result = new GroupByIterator[Key, Item](underlying) {
        override protected def key(group: Item): Key = {
          val checkpointPaths = group.flatten.toArray.collect {
            case f if isCheckpointFile(f) => CheckpointInstance(f.getPath)
          }
          Checkpoints.getLatestCompleteCheckpointFromList(checkpointPaths).map(_.version)
        }
        override protected def isSameGroup(groupKey: Key, item: Item): Boolean = {
          key(item).isEmpty // current depends on previous if current lacks a checkpoint
        }
      }
      result.map(_._2) // strip off the unwanted group keys
    }
TimestampAdjustmentGroupedIterator

Fair bit of plumbing required because timestamp adjustment is a running max calculation of sorts:

    // Computes the adjusted commit timestamp for each version group
    def getTimestampAdjustedGroupedIterator(it: Iterator[SameCommitVersionFileGroup])
        : Iterator[TimestampAdjustedCommitGroup] = {
      // Make a pass over the file groups, assigning each group an adjusted timestamp
      // and updating the commit file status as needed to match. The resulting iterator
      // adds two new fields per group: the adjusted timestamp value, and a flag indicating
      // whether the group was actually adjusted. 
      var lastAdjustedTimestampOpt: Option[Long] = None
      val underlying = it.map { files =>
        val isAdjusted = files.indexWhere(isDeltaFile) match {
          case -1 =>
            // No commit in this group. Assign it a trivial adjusted timestamp.
            lastAdjustedTimestampOpt = lastAdjustedTimestampOpt.map(_ + 1)
            true
          case index =>
            // This group has a commit. Adjust timestamp only if needed.
            val file = files(index)
            val fileTimestamp = file.getModificationTime
            lastAdjustedTimestampOpt match {
              case Some(prev) if fileTimestamp <= prev =>
                // Commit timestamp is "too old" -- adjust it
                val adjustedTimestamp = prev + 1
                lastAdjustedTimestampOpt = Some(adjustedTimestamp)
                files(index) = createAdjustedFileStatus(file, adjustedTimestamp)
                true
              case _ =>
                // No adjustment needed
                lastAdjustedTimestampOpt = Some(fileTimestamp)
                false
            }
        }
        ((lastAdjustedTimestampOpt, isAdjusted), files)
      }
      
      type Key = (Option[Long], Boolean)
      type Item = (Key, SameCommitVersionFileGroup)

      val result = new GroupByIterator[Key, Item](underlying) {
        // key is adjusted timestamp info
        override protected def key(item: Item): Key = item._1
        
        // We only care whether this group is adjusted (group key doesn't actually matter)
        override protected def isSameGroup(groupKey: Key, item: Item): Boolean = key(item)._2
      }

      // Strip off the unwanted extra information
      result.map { case (_, group) =>
        group.map { case (_, files) => files }
      }
    }

One big question in my mind tho -- why are we adjusting the modified timestamp of the file status as part of timestamp adjustment? Wrong timestamp will confuse the I/O layer, and I don't think anything downstream consumes it other than (perhaps) the actual file deletion code? Is it just an artifact of how the current code is written? Or is there something fundamental?

Collaborator

Actually... I think we should consider a completely different approach. See other comment. But the complete checkpoint detection and timestamp adjustment logic suggested here might still be useful.

*/
private def needsTimestampAdjustment(commitFile: FileStatus): Boolean = {
  lastCommitFileOpt.exists { lastFile =>
    versionGetter(lastFile.getPath) < versionGetter(commitFile.getPath) &&
Collaborator

Given that file listing is ordered, how could we ever encounter an "invalid" version that requires this check?

Collaborator

Taking a big step back to re-examine this whole algorithm...

Let's say for now that we're interested in "commit versions" rather than individual files -- we will delete all files associated with a given commit, or we delete none of them.

We need to preserve three sets of commit versions whose...

  1. Commit timestamp is after the cutoff
  2. Commit timestamp impacts the adjusted commit timestamp of a commit after the cutoff
  3. Checkpoint is part of the snapshot state for a commit after the cutoff

Now suppose we do the following iterator prep work:

  1. File listing (unfiltered)
    • Iterator[FileStatus]
  2. Group files by commit version
    • Iterator[(Long, ArrayBuffer[FileStatus])]
  3. Flag each version that has a complete checkpoint
    • Iterator[(Boolean, Long, ArrayBuffer[FileStatus])]
  4. Compute adjusted timestamps, flagging versions whose adjusted timestamp was influenced by a previous version
    • Iterator[CommitVersionGroup], where
    • case class CommitVersionGroup(
          commitTimestamp: Long, 
          timestampIsAdjusted: Boolean, 
          hasCompleteCheckpoint: Boolean, 
          version: Long, 
          files: ArrayBuffer[FileStatus])
  5. Truncate the iterator upon encountering the first commit version group whose adjusted timestamp is larger than the cutoff
    • takeWhile(_.commitTimestamp <= cutoffTimestamp)

At this point, we cannot see (nor delete) any commit versions after the one that was active as-of the cutoff. But we still need to preserve that final "cutoff commit", along with any earlier commit whose timestamp or checkpoint influences the cutoff commit.

Next, we can build up a state machine that consumes an iterator and produces an iterator as output. It buffers up commit versions until it can prove the existence of some later commit which is independent of all buffered commits' timestamps and checkpoints. Once that happens, we can "release" the buffered commits and restart the buffering. That later independent commit may be the cutoff commit, or some earlier commit. If no such commit exists then the buffered commits are required for safety and it is correct to suppress them.

We remember three pieces of loop-carried state:

  • A list of buffered commit versions
  • Latest unadjusted timestamp seen so far
  • Latest checkpoint seen so far

In order to avoid corner cases, seed the state machine by examining the first version group; if no such group exists we are done. The first commit's timestamp is never adjusted, so we always know the latest unadjusted timestamp, but there's no guarantee it has a checkpoint.

The state machine is actually three miniature state machines, one to track adjusted timestamps, one to track checkpoints, and one to track buffered commit versions.

v - current commit version being considered
x - some arbitrary commit version before v, i.e. 0 <= x < v
y - some arbitrary commit version between x and v, i.e. x < y < v
T - latest unadjusted timestamp
C - latest checkpoint
B - buffered commit version range

Adjusted timestamps:

T (pre)  Timestamp v adjusted?  T (post)
x        -                      v
x        yes                    x

Latest checkpoint:

C (pre)  Checkpoint at v?  C (post)
x        -                 x
x        yes               v

Buffered commit versions:

T (pre)  C (pre)  T (post)  C (post)  Release  B (post)
x        -        x         -         -        x..v
x        -        x         v         -        x..v
x        -        v         -         -        x..v
x        -        v         v         [x, v)   v
x        x        x         x         -        x..v
x        x        x         v         -        x..v
x        x        v         x         -        x..v
x        x        v         v         [x, v)   v
x        y        x         y         -        x..v
x        y        x         v         -        x..v
x        y        v         y         [x, y)   y..v
x        y        v         v         [x, v)   v
y        x        y         x         -        x..v
y        x        y         v         [x, y)   y..v
y        x        v         x         -        x..v
y        x        v         v         [x, v)   v

Note that the above essentially computes a minimum over the two kinds of bounds as each new group arrives, and releases any buffered versions that are no longer in bounds.

That state machine corresponds (I believe) to the following code:

// Input: the iterator that results from all prep steps listed above
def fileDeletionIterator(
    versionGroups: BufferedIterator[CommitVersionGroup], 
    cutoffTimestamp: Long): Iterator[FileStatus] = {
  
  // Early-out if the listing is empty -- simplifies the code that follows.
  if (versionGroups.isEmpty) { return Iterator.empty }
  
  // Use the first group to seed the computation
  val first = versionGroups.next()
  var latestUnadjustedTimestampVersion = first.version
  var latestCheckpointVersion = Option.when(first.hasCompleteCheckpoint)(first.version)

  type Batch = ArrayBuffer[CommitVersionGroup]
  var bufferedGroups = ArrayBuffer(first)
  
  // Consumes one version group, possibly releasing a batch.
  def processOneGroup(): Option[Batch] = {
    val group = versionGroups.next()
    var boundsChanged = false
    
    // Adjust the timestamp lower bound if needed
    if (!group.timestampIsAdjusted) {
      latestUnadjustedTimestampVersion = group.version
      boundsChanged = true
    }
     
    // Adjust the checkpoint lower bound if needed
    if (group.hasCompleteCheckpoint) {
      latestCheckpointVersion = Some(group.version)
      boundsChanged = true
    }
      
    // Release any buffered versions that we no longer need to protect from deletion
    val batch = Option.when(boundsChanged) {
      val newLowerBound = Math.min(
        latestUnadjustedTimestampVersion, 
        latestCheckpointVersion.getOrElse(0L))
      val (toRelease, toKeep) = bufferedGroups.partition(_.version < newLowerBound)
      bufferedGroups = toKeep
      toRelease
    }
      
    bufferedGroups += group
    batch
  }
  
  // Repeatedly consumes groups until a batch is released, or underlying exhausted
  def getBatch(): Option[Batch] = {
    while (versionGroups.hasNext) {
      processOneGroup().foreach(batch => return batch)
    }
    None
  }
  
  // The actual batch iterator. Files from the final unreleased batch will never be
  // returned to the caller, and are thus protected from deletion.
  new Iterator[Batch] {
    var batch = getBatch()
    override def hasNext: Boolean = batch.isDefined
    override def next(): Batch = {
      val rval = batch.get // validated by hasNext
      batch = getBatch()
      rval
    }
  }.flatten.flatMap(_.files)
}

@felipepessoto (Contributor)

@sumeet-db are you planning to merge this PR?

@felipepessoto (Contributor)

@sumeet-db, @scovich, have you considered simply creating a checkpoint before deleting the logs?

@scovich (Collaborator) commented Jan 30, 2025

have you considered simply creating a checkpoint before deleting the logs?

Yes, actually. It's not all that "simple" tho :(

  1. Metadata cleanup has to make sure the checkpoint it wrote was valid before proceeding, or the table history is lost. One more thing that can go wrong.
  2. The client has to write the checkpoint using the table features that were enabled as-of that table version -- NOT as-of current table version. Yet another thing that can go wrong.
  3. Multiple clients running metadata cleanup concurrently would race to create the checkpoint, which exacerbates 1/ when multi-part checkpoints are in use
  4. We would still need to find the previous checkpoint in order to create the snapshot to be checkpointed, so it wouldn't really reduce the complexity of the listing algorithm.

Meanwhile, the primary goal of metadata cleanup is to control metadata bloat. If there were too few/small commits to trigger a new checkpoint, then the extra files are probably not worth the trouble to delete.

@felipepessoto (Contributor)

Very good points. I found the method createCheckpointAtVersion and I thought it could handle all of these concerns, but it is hard to confirm.

@scovich (Collaborator) commented Jan 31, 2025

I found the method createCheckpointAtVersion and I thought it could handle all of these concerns, but it is hard to confirm.

AFAIK, that method is only used by unit tests, to set up specific scenarios. I couldn't find any call sites in production code.

@felipepessoto (Contributor)

Just to document: after reading the comments on the BufferingLogDeletionIterator class, there is one more reason not to create the checkpoint: it could change all the adjusted timestamps, breaking time travel by timestamp.

/**
* An iterator that helps select old log files for deletion. It takes the input iterator of log
* files from the earliest file, and returns should-be-deleted files until the given maxTimestamp
* or maxVersion to delete is reached. Note that this iterator may stop deleting files earlier
* than maxTimestamp or maxVersion if it finds that files that need to be preserved for adjusting
* the timestamps of subsequent files. Let's go through an example. Assume the following commit
* history:
*
* +---------+-----------+--------------------+
* | Version | Timestamp | Adjusted Timestamp |
* +---------+-----------+--------------------+
* |    0    |     0     |          0         |
* |    1    |     5     |          5         |
* |    2    |    10     |         10         |
* |    3    |     7     |         11         |
* |    4    |     8     |         12         |
* |    5    |    14     |         14         |
* +---------+-----------+--------------------+
*
* As you can see from the example, we require timestamps to be monotonically increasing with
* respect to the version of the commit, and each commit to have a unique timestamp. If we have
* a commit which doesn't obey one of these two requirements, we adjust the timestamp of that
* commit to be one millisecond greater than the previous commit.
*
* Given the above commit history, the behavior of this iterator will be as follows:
* - For maxVersion = 1 and maxTimestamp = 9, we can delete versions 0 and 1
* - Until we receive maxVersion >= 4 and maxTimestamp >= 12, we can't delete versions 2 and 3.
* This is because version 2 is used to adjust the timestamps of commits up to version 4.
* - For maxVersion >= 5 and maxTimestamp >= 14 we can delete everything
* The semantics of time travel guarantee that for a given timestamp, the user will ALWAYS get the
* same version. Consider a user asks to get the version at timestamp 11. If all files are there,
* we would return version 3 (timestamp 11) for this query. If we delete versions 0-2, the
* original timestamp of version 3 (7) will not have an anchor to adjust on, and if the time
* travel query is re-executed we would return version 4. This is the motivation behind this
* iterator implementation.
*
* The implementation maintains an internal "maybeDelete" buffer of files that we are unsure of
* deleting because they may be necessary to adjust time of future files. For each file we get
* from the underlying iterator, we check whether it needs time adjustment or not. If it does need
* time adjustment, then we cannot immediately decide whether it is safe to delete that file or
* not, and therefore we put it in the buffer. Then we iteratively peek ahead at the future
* files and accordingly decide whether to delete all the buffered files or retain them.
*
* @param underlying The iterator which gives the list of files in ascending version order
* @param maxTimestamp The timestamp until which we can delete (inclusive).
* @param maxVersion The version until which we can delete (inclusive).
* @param versionGetter A method to get the commit version from the file path.
*/
class BufferingLogDeletionIterator(

@felipepessoto (Contributor)

BTW, log compaction files are created with a newer timestamp than their startVersion. The reason this doesn't affect BufferingLogDeletionIterator is another bug: they are not being cleaned up.

@felipepessoto (Contributor)

@scovich, @sumeet-db since this PR is stale and has conflicts, I created an alternative fix. Could you please review: #4146
