Skip to content

Commit

Permalink
[SPARK] Relax metadata conflict for identity column (#3525)
Browse files Browse the repository at this point in the history
<!--
Thanks for sending a pull request!  Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md
2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP]
Your PR title ...'.
  3. Be sure to keep the PR description updated to reflect all changes.
  4. Please write your PR title to summarize what this PR proposes.
5. If possible, provide a concise example to reproduce the issue for a
faster review.
6. If applicable, include the corresponding issue number in the PR title
and link it in the body.
-->

#### Which Delta project/connector is this regarding?
<!--
Please add the component selected below to the beginning of the pull
request title
For example: [Spark] Title of my pull request
-->

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

<!--
- Describe what this PR changes.
- Describe why we need the change.
 
If this PR resolves an issue be sure to include "Resolves #XXX" to
correctly link and close the issue upon merge.
-->
This PR is part of #1959.
The PR relaxes metadata conflict for identity column SYNC high water
mark operation. When winning transaction contains identity column
metadata change and the current transaction does not contain metadata
change, we mark the current transaction as no metadata conflict.

## How was this patch tested?

<!--
If tests were added, say they were added here. Please make sure to test
the changes thoroughly including negative and positive cases if
possible.
If the changes were tested in any way other than unit tests, please
clarify how you tested step by step (ideally copy and paste-able, so
that other reviewers can test and check, and descendants can verify in
the future).
If the changes were not tested, please explain why.
-->
A new test suite.
## Does this PR introduce _any_ user-facing changes?

<!--
If yes, please clarify the previous behavior and the change this PR
proposes - provide the console output, description and/or an example to
show the behavior difference if possible.
If possible, please also clarify if this is a user-facing change
compared to the released Delta Lake versions or within the unreleased
branches such as master.
If no, write 'No'.
-->
No.
  • Loading branch information
zhipengmao-db authored Aug 13, 2024
1 parent cfd420d commit 55a038c
Show file tree
Hide file tree
Showing 7 changed files with 377 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.apache.spark.sql.delta.RowId.RowTrackingMetadataDomain
import org.apache.spark.sql.delta.actions._
import org.apache.spark.sql.delta.logging.DeltaLogKeys
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.sources.DeltaSourceUtils
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.util.DeltaSparkPlanUtils.CheckDeterministicOptions
import org.apache.spark.sql.delta.util.FileNames
Expand Down Expand Up @@ -138,6 +139,12 @@ private[delta] class WinningCommitSummary(val actions: Seq[Action], val commitVe
val onlyAddFiles: Boolean = actions.collect { case f: FileAction => f }
.forall(_.isInstanceOf[AddFile])

// This indicates this commit contains metadata action that is solely for the purpose for
// updating IDENTITY high water marks. This is used by [[ConflictChecker]] to avoid certain
// conflict in [[checkNoMetadataUpdates]].
val identityOnlyMetadataUpdate = DeltaCommitTag
.getTagValueFromCommitInfo(commitInfo, DeltaSourceUtils.IDENTITY_COMMITINFO_TAG)
.exists(_.toBoolean)
}

private[delta] class ConflictChecker(
Expand Down Expand Up @@ -359,16 +366,62 @@ private[delta] class ConflictChecker(
false
}

// scalastyle:off line.size.limit
/**
* Check if the committed transaction has changed metadata.
*
* We want to deal with (and optimize for) the case where the winning commit's metadata update is
* solely for updating IDENTITY high water marks. In addition, we want to allow a metadata update
* that only sets the table property for row tracking enablement to true not to fail concurrent
* transactions if the current transaction does not do a metadata update.
*
* The conflict matrix is as follows:
*
* | | Winning Metadata (id) | Winning Metadata Row Tracking Enablement Only | Winning Metadata (other) | Winning No Metadata |
* | --------------------------------------------- | --------------------- | --------------------------------------------- | ------------------------ | ------------------- |
* | Current Metadata (id) | Conflict | Conflict (3) | Conflict | No conflict |
* | Current Metadata Row Tracking Enablement Only | Conflict (1) | Conflict (3) | Conflict | No conflict |
* | Current Metadata (other) | Conflict (1) | Conflict (3) | Conflict | No conflict |
* | Current No Metadata | No conflict (2) | No conflict (4) | Conflict | No conflict |
*
* The differences in cases (1), (2), (3), and (4) are:
* (1) This is a case we could have done something to avoid conflict, e.g., current transaction
* adds a column, while winning transaction does blind append that generates IDENTITY values. But
* it's not a common case and the change to avoid conflict is non-trivial (we have to somehow
* merge the metadata from winning txn and current txn). We decide to not do that and let it
* conflict.
* (2) This is a case that is more common (e.g., current = delete/update, winning = update high
* water mark) and we will not let it conflict here. Note that it might still cause conflict in
* other conflict checks.
* (3) If the current txn changes the metadata too, we will fail the current txn. While it is
* possible to copy over the metadata information, this scenario is unlikely to happen in practice
* and properly handling this for the many edge case (e.g current txn sets the table property
* to false) is risky.
* (4) In a row tracking enablement only metadata update, the only difference with the previous
* metadata are the row tracking table property and materialized column names. These metadata
* information only affect the preservation of row tracking. If we copy over the new metadata
* configurations and mark the current txn as not preserving row tracking, then the current txn
* is respecting the metadata update and does not need to fail.
*
*/
// scalastyle:on line.size.limit
protected def checkNoMetadataUpdates(): Unit = {
// If winning commit does not contain metadata update, no conflict.
if (winningCommitSummary.metadataUpdates.isEmpty) return

if (tryResolveRowTrackingEnablementOnlyMetadataUpdateConflict()) {
return
}

// Fail if the metadata is different than what the txn read.
if (winningCommitSummary.metadataUpdates.nonEmpty) {
// The only case in the remaining cases that we will not conflict is winning commit is
// identity only metadata update and current commit has no metadata update.
val tolerateIdentityOnlyMetadataUpdate = winningCommitSummary.identityOnlyMetadataUpdate &&
!currentTransactionInfo.metadataChanged

if (!tolerateIdentityOnlyMetadataUpdate) {
if (winningCommitSummary.identityOnlyMetadataUpdate) {
IdentityColumn.logTransactionAbort(deltaLog)
}
throw DeltaErrors.metadataChangedException(winningCommitSummary.commitInfo)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ object IdentityColumn extends DeltaLogging {
val opTypeDefinition = "delta.identityColumn.definition"
// When table with IDENTITY columns are written into.
val opTypeWrite = "delta.identityColumn.write"
// When IDENTITY column update causes transaction to abort.
val opTypeAbort = "delta.identityColumn.abort"

// Return true if `field` is an identity column that allows explicit insert. Caller must ensure
// `isIdentityColumn(field)` is true.
Expand Down Expand Up @@ -244,6 +246,10 @@ object IdentityColumn extends DeltaLogging {
}
}

def logTransactionAbort(deltaLog: DeltaLog): Unit = {
recordDeltaEvent(deltaLog, opTypeAbort)
}

// Calculate the sync'ed IDENTITY high water mark based on actual data and returns a
// potentially updated `StructField`.
def syncIdentity(field: StructField, df: DataFrame): StructField = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,11 +400,18 @@ trait OptimisticTransactionImpl extends TransactionalWrite
// writes.
protected var trackHighWaterMarks: Option[Set[String]] = None

// Set to true if this transaction is ALTER TABLE ALTER COLUMN SYNC IDENTITY.
protected var syncIdentity: Boolean = false

def setTrackHighWaterMarks(track: Set[String]): Unit = {
assert(trackHighWaterMarks.isEmpty, "The tracking set shouldn't have been set")
trackHighWaterMarks = Some(track)
}

def setSyncIdentity(): Unit = {
syncIdentity = true
}

/**
* Records an update to the metadata that should be committed with this transaction. As this is
* called after write, it skips checking `!hasWritten`. We do not have a full protocol of what
Expand All @@ -421,6 +428,14 @@ trait OptimisticTransactionImpl extends TransactionalWrite
updateMetadataInternal(updatedMetadata, ignoreDefaultProperties = false)
}

// Returns whether this transaction updates metadata solely for IDENTITY high water marks (this
// can be either a write that generates IDENTITY values or an ALTER TABLE ALTER COLUMN SYNC
// IDENTITY command). This must be called before precommitUpdateSchemaWithIdentityHighWaterMarks
// as it might update `newMetadata`.
def isIdentityOnlyMetadataUpdate(): Boolean = {
syncIdentity || (updatedIdentityHighWaterMarks.nonEmpty && newMetadata.isEmpty)
}

// Called before commit to update table schema with collected IDENTITY column high water marks
// so that the change can be committed to delta log.
def precommitUpdateSchemaWithIdentityHighWaterMarks(): Unit = {
Expand Down Expand Up @@ -1206,6 +1221,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite
// Check for internal SetTransaction conflicts and dedup.
val finalActions = checkForSetTransactionConflictAndDedup(actions ++ this.actions.toSeq)

val identityOnlyMetadataUpdate = isIdentityOnlyMetadataUpdate()
// Update schema for IDENTITY column writes if necessary. This has to be called before
// `prepareCommit` because it might change metadata and `prepareCommit` is responsible for
// converting updated metadata into a `Metadata` action.
Expand Down Expand Up @@ -1235,6 +1251,12 @@ trait OptimisticTransactionImpl extends TransactionalWrite
val readRowIdHighWatermark =
RowId.extractHighWatermark(snapshot).getOrElse(RowId.MISSING_HIGH_WATER_MARK)

val autoTags = mutable.HashMap.empty[String, String]
if (identityOnlyMetadataUpdate) {
autoTags += (DeltaSourceUtils.IDENTITY_COMMITINFO_TAG -> "true")
}
val allTags = tags ++ autoTags

commitAttemptStartTimeMillis = clock.getTimeMillis()
commitInfo = CommitInfo(
time = commitAttemptStartTimeMillis,
Expand All @@ -1248,7 +1270,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite
isBlindAppend = Some(isBlindAppend),
operationMetrics = getOperationMetrics(op),
userMetadata = getUserMetadata(op),
tags = if (tags.nonEmpty) Some(tags) else None,
tags = if (allTags.nonEmpty) Some(allTags) else None,
txnId = Some(txnId))

val firstAttemptVersion = getFirstAttemptVersion
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,7 @@ case class AlterTableChangeColumnDeltaCommand(
assert(oldColumn == newColumn)
val df = txn.snapshot.deltaLog.createDataFrame(txn.snapshot, txn.filterFiles())
val field = IdentityColumn.syncIdentity(newColumn, df)
txn.setSyncIdentity()
txn.readWholeTable()
field
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ object DeltaSourceUtils {
val IDENTITY_INFO_START = "delta.identity.start"
val IDENTITY_INFO_STEP = "delta.identity.step"
val IDENTITY_INFO_HIGHWATERMARK = "delta.identity.highWaterMark"
val IDENTITY_COMMITINFO_TAG = "delta.identity.schemaUpdate"

def isDeltaDataSourceName(name: String): Boolean = {
name.toLowerCase(Locale.ROOT) == NAME || name.toLowerCase(Locale.ROOT) == ALT_NAME
Expand Down
Loading

0 comments on commit 55a038c

Please sign in to comment.