Skip to content

Commit

Permalink
Merge pull request #117646 from miraradeva/backport23.2-117541
Browse files Browse the repository at this point in the history
release-23.2: storage: fix a series of intent resolution bugs with ignored seq nums
  • Loading branch information
miraradeva authored Jan 11, 2024
2 parents 3c61369 + bc25c51 commit ec590f6
Show file tree
Hide file tree
Showing 4 changed files with 267 additions and 62 deletions.
128 changes: 66 additions & 62 deletions pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4876,17 +4876,17 @@ func MVCCResolveWriteIntent(
ctx context.Context,
rw ReadWriter,
ms *enginepb.MVCCStats,
intent roachpb.LockUpdate,
update roachpb.LockUpdate,
opts MVCCResolveWriteIntentOptions,
) (ok bool, numBytes int64, resumeSpan *roachpb.Span, replLocksReleased bool, err error) {
if len(intent.Key) == 0 {
if len(update.Key) == 0 {
return false, 0, nil, false, emptyKeyError()
}
if len(intent.EndKey) > 0 {
if len(update.EndKey) > 0 {
return false, 0, nil, false, errors.Errorf("can't resolve range intent as point intent")
}
if opts.TargetBytes < 0 {
return false, 0, &roachpb.Span{Key: intent.Key}, false, nil
return false, 0, &roachpb.Span{Key: update.Key}, false, nil
}

// Production code will use a buffered writer, which makes the numBytes
Expand All @@ -4898,7 +4898,7 @@ func MVCCResolveWriteIntent(
// Iterate over all locks held by intent.Txn on this key.
ltIter, err := NewLockTableIterator(rw, LockTableIteratorOptions{
Prefix: true,
MatchTxnID: intent.Txn.ID,
MatchTxnID: update.Txn.ID,
})
if err != nil {
return false, 0, nil, false, err
Expand All @@ -4909,11 +4909,11 @@ func MVCCResolveWriteIntent(

var ltSeekKey EngineKey
ltSeekKey, buf.ltKeyBuf = LockTableKey{
Key: intent.Key,
Key: update.Key,
// lock.Intent is the first locking strength in the lock-table. As a
// minor performance optimization, we seek to this version and iterate
// instead of iterating from the beginning of the version prefix (i.e.
// keys.LockTableSingleKey(intent.Key)). This can seek past half of the
// keys.LockTableSingleKey(update.Key)). This can seek past half of the
// LSM tombstones on this key in cases like those described in d1c91e0e
// where intents are repeatedly written and removed on a specific key so
// an intent is surrounded by a large number of tombstones during its
Expand All @@ -4930,7 +4930,7 @@ func MVCCResolveWriteIntent(
// a single seek and step in cases where only an intent is present. We
// chose not to pessimize the common case to optimize the uncommon case.
Strength: lock.Intent,
TxnUUID: intent.Txn.ID,
TxnUUID: update.Txn.ID,
}.ToEngineKey(buf.ltKeyBuf)

for valid, err := ltIter.SeekEngineKeyGE(ltSeekKey); ; valid, err = ltIter.NextEngineKey() {
Expand All @@ -4943,9 +4943,9 @@ func MVCCResolveWriteIntent(
if err != nil {
return false, 0, nil, false, errors.Wrap(err, "decoding lock table key version")
}
if txnID != intent.Txn.ID {
if txnID != update.Txn.ID {
return false, 0, nil, false, errors.AssertionFailedf(
"unexpected txnID %v != %v while scanning lock table", txnID, intent.Txn.ID)
"unexpected txnID %v != %v while scanning lock table", txnID, update.Txn.ID)
}
if err := ltIter.ValueProto(&buf.meta); err != nil {
return false, 0, nil, false, errors.Wrap(err, "unmarshaling lock table value")
Expand All @@ -4965,9 +4965,9 @@ func MVCCResolveWriteIntent(
}
defer iter.Close()
}
outcome, err = mvccResolveWriteIntent(ctx, rw, iter, ms, intent, &buf.meta, buf)
outcome, err = mvccResolveWriteIntent(ctx, rw, iter, ms, update, &buf.meta, buf)
} else {
outcome, err = mvccReleaseLockInternal(ctx, rw, ms, intent, str, &buf.meta, buf)
outcome, err = mvccReleaseLockInternal(ctx, rw, ms, update, str, &buf.meta, buf)
replLocksReleased = replLocksReleased || outcome != lockNoop
}
if err != nil {
Expand Down Expand Up @@ -5088,58 +5088,57 @@ const (
// provided intent was resolved (a no-op, rewriting the intent, writing a
// SingleDelete key, or writing a Delete key).
//
// REQUIRES: intent and meta refer to the same intent on the same key.
// REQUIRES: update and meta refer to the same intent on the same key.
// REQUIRES: iter surfaces range keys via IterKeyTypePointsAndRanges.
func mvccResolveWriteIntent(
ctx context.Context,
writer Writer,
iter MVCCIterator,
ms *enginepb.MVCCStats,
// TODO(nvanbenschoten): rename this field to "update".
intent roachpb.LockUpdate,
update roachpb.LockUpdate,
meta *enginepb.MVCCMetadata,
buf *putBuffer,
) (outcome lockResolutionOutcome, err error) {
if meta.Txn == nil || meta.Txn.ID != intent.Txn.ID {
return lockNoop, errors.Errorf("txn does not match: %v != %v", meta.Txn, intent.Txn)
if meta.Txn == nil || meta.Txn.ID != update.Txn.ID {
return lockNoop, errors.Errorf("txn does not match: %v != %v", meta.Txn, update.Txn)
}

metaKey := MakeMVCCMetadataKey(intent.Key)
metaKey := MakeMVCCMetadataKey(update.Key)
origMetaKeySize := int64(metaKey.EncodedSize())
origMetaValSize := int64(meta.Size())
metaTimestamp := meta.Timestamp.ToTimestamp()
canSingleDelHelper := singleDelOptimizationHelper{
_didNotUpdateMeta: meta.TxnDidNotUpdateMeta,
_hasIgnoredSeqs: len(intent.IgnoredSeqNums) > 0,
_hasIgnoredSeqs: len(update.IgnoredSeqNums) > 0,
// NB: the value is only used if epochs match, so it doesn't
// matter if we use the one from meta or incoming request here.
_epoch: intent.Txn.Epoch,
_epoch: update.Txn.Epoch,
}

// A commit with a newer epoch than the intent effectively means that we
// An update with a newer epoch than the intent effectively means that we
// wrote this intent before an earlier retry, but didn't write it again
// after. We treat such intents as uncommitted.
//
// A commit with a newer timestamp than the intent means that our timestamp
// An update with a newer timestamp than the intent means that our timestamp
// was pushed during the course of an epoch. We treat such intents as
// committed after moving their timestamp forward. This is possible if a
// transaction writes an intent and then successfully refreshes its
// timestamp to avoid a restart.
//
// A commit with an older epoch than the intent should never happen because
// An update with an older epoch than the intent should never happen because
// epoch increments require client action. This means that they can't be
// caused by replays.
//
// A commit with an older timestamp than the intent should not happen under
// An update with an older timestamp than the intent should not happen under
// normal circumstances because a client should never bump its timestamp
// after issuing an EndTxn request. Replays of intent writes that are pushed
// forward due to WriteTooOld errors without client action combined with
// replays of intent resolution make this configuration a possibility. We
// treat such intents as uncommitted.
epochsMatch := meta.Txn.Epoch == intent.Txn.Epoch
timestampsValid := metaTimestamp.LessEq(intent.Txn.WriteTimestamp)
timestampChanged := metaTimestamp.Less(intent.Txn.WriteTimestamp)
commit := intent.Status == roachpb.COMMITTED && epochsMatch && timestampsValid
epochsMatch := meta.Txn.Epoch == update.Txn.Epoch
timestampsValid := metaTimestamp.LessEq(update.Txn.WriteTimestamp)
timestampChanged := metaTimestamp.Less(update.Txn.WriteTimestamp)
commit := update.Status == roachpb.COMMITTED && epochsMatch && timestampsValid

// Note the small difference to commit epoch handling here: We allow
// a push from a previous epoch to move a newer intent. That's not
Expand All @@ -5166,9 +5165,9 @@ func mvccResolveWriteIntent(
// used for resolving), but that costs latency.
// TODO(tschottdorf): various epoch-related scenarios here deserve more
// testing.
inProgress := !intent.Status.IsFinalized() && meta.Txn.Epoch >= intent.Txn.Epoch
inProgress := !update.Status.IsFinalized() && meta.Txn.Epoch >= update.Txn.Epoch
pushed := inProgress && timestampChanged
latestKey := MVCCKey{Key: intent.Key, Timestamp: metaTimestamp}
latestKey := MVCCKey{Key: update.Key, Timestamp: metaTimestamp}

// Handle partial txn rollbacks. If the current txn sequence
// is part of a rolled back (ignored) seqnum range, we're going
Expand All @@ -5180,7 +5179,13 @@ func mvccResolveWriteIntent(
var rolledBackVal *MVCCValue
buf.newMeta = *meta
newMeta := &buf.newMeta
if len(intent.IgnoredSeqNums) > 0 {
// Update the MVCC history only if:
// 1. There are IgnoredSeqNums present.
// 2. The update is not going to abort the intent; otherwise, the entire
// history will be removed anyway.
// 3. The epochs of the intent and the update match; otherwise the epochs may
// have different seq nums (and ignored seq nums).
if len(update.IgnoredSeqNums) > 0 && (commit || inProgress) && epochsMatch {
// NOTE: mvccMaybeRewriteIntentHistory mutates its meta argument.
// TODO(nvanbenschoten): this is an awkward interface. We shouldn't
// be mutating meta and we shouldn't be restoring the previous value
Expand All @@ -5191,7 +5196,7 @@ func mvccResolveWriteIntent(
// intact and corresponding to the stats in ms to ensure that later on (in
// updateStatsOnResolve) the stats will be updated correctly based on the
// old meta (meta) and the new meta (newMeta).
removeIntent, rolledBackVal, err = mvccMaybeRewriteIntentHistory(ctx, writer, intent.IgnoredSeqNums, newMeta, latestKey)
removeIntent, rolledBackVal, err = mvccMaybeRewriteIntentHistory(ctx, writer, update.IgnoredSeqNums, newMeta, latestKey)
if err != nil {
return lockNoop, err
}
Expand All @@ -5211,7 +5216,7 @@ func mvccResolveWriteIntent(
// If we need to update the intent to roll back part of its intent
// history, make sure that we don't regress its timestamp, even if the
// caller provided an outdated timestamp.
intent.Txn.WriteTimestamp.Forward(metaTimestamp)
update.Txn.WriteTimestamp.Forward(metaTimestamp)
}
}

Expand All @@ -5235,7 +5240,7 @@ func mvccResolveWriteIntent(
if commit || pushed || rolledBackVal != nil {
// The intent might be committing at a higher timestamp, or it might be
// getting pushed.
newTimestamp := intent.Txn.WriteTimestamp
newTimestamp := update.Txn.WriteTimestamp

// Assert that the intent timestamp never regresses. The logic above should
// not allow this, regardless of the input to this function.
Expand Down Expand Up @@ -5295,7 +5300,7 @@ func mvccResolveWriteIntent(
// to the observed timestamp.
newValue := oldValue
newValue.LocalTimestamp = oldValue.GetLocalTimestamp(oldKey.Timestamp)
newValue.LocalTimestamp.Forward(intent.ClockWhilePending.Timestamp)
newValue.LocalTimestamp.Forward(update.ClockWhilePending.Timestamp)
if !newValue.LocalTimestampNeeded(newKey.Timestamp) || !writer.ShouldWriteLocalTimestamps(ctx) {
newValue.LocalTimestamp = hlc.ClockTimestamp{}
}
Expand Down Expand Up @@ -5347,6 +5352,7 @@ func mvccResolveWriteIntent(

// Update or remove the metadata key.
var metaKeySize, metaValSize int64
var logicalOp MVCCLogicalOpType
if !commit {
// Keep existing intent if we're updating it. We update the existing
// metadata's timestamp instead of using the supplied intent meta to avoid
Expand All @@ -5356,6 +5362,7 @@ func mvccResolveWriteIntent(
outcome = lockOverwritten
metaKeySize, metaValSize, err = buf.putLockMeta(
writer, metaKey.Key, lock.Intent, newMeta, true /* alreadyExists */)
logicalOp = MVCCUpdateIntentOpType
} else {
outcome = lockClearedByDelete
useSingleDelete := canSingleDelHelper.onCommitLock()
Expand All @@ -5367,26 +5374,23 @@ func mvccResolveWriteIntent(
ValueSizeKnown: true,
ValueSize: uint32(origMetaValSize),
})
logicalOp = MVCCCommitIntentOpType
}
if err != nil {
return lockNoop, err
}

// Update stat counters related to resolving the intent.
if ms != nil {
ms.Add(updateStatsOnResolve(intent.Key, prevIsValue, prevValSize, origMetaKeySize, origMetaValSize,
ms.Add(updateStatsOnResolve(update.Key, prevIsValue, prevValSize, origMetaKeySize, origMetaValSize,
metaKeySize, metaValSize, meta, newMeta, commit))
}

// Log the logical MVCC operation.
logicalOp := MVCCCommitIntentOpType
if pushed {
logicalOp = MVCCUpdateIntentOpType
}
writer.LogLogicalOp(logicalOp, MVCCLogicalOpDetails{
Txn: intent.Txn,
Key: intent.Key,
Timestamp: intent.Txn.WriteTimestamp,
Txn: update.Txn,
Key: update.Key,
Timestamp: update.Txn.WriteTimestamp,
})
// outcome is set up above.
return outcome, nil
Expand All @@ -5396,7 +5400,7 @@ func mvccResolveWriteIntent(
// MVCCMetadata.
//
// Note that we have to support a somewhat unintuitive case - an ABORT with
// intent.Txn.Epoch < meta.Txn.Epoch:
// update.Txn.Epoch < meta.Txn.Epoch:
// - writer1 writes key0 at epoch 0
// - writer2 with higher priority encounters intent at key0 (epoch 0)
// - writer1 restarts, now at epoch one (txn record not updated)
Expand All @@ -5414,8 +5418,8 @@ func mvccResolveWriteIntent(

// Log the logical MVCC operation.
writer.LogLogicalOp(MVCCAbortIntentOpType, MVCCLogicalOpDetails{
Txn: intent.Txn,
Key: intent.Key,
Txn: update.Txn,
Key: update.Key,
})

ok := false
Expand Down Expand Up @@ -5490,7 +5494,7 @@ func mvccResolveWriteIntent(
// Clear stat counters attributable to the intent we're aborting.
if ms != nil {
ms.Add(updateStatsOnClear(
intent.Key, origMetaKeySize, origMetaValSize, 0, 0, meta, nil, 0))
update.Key, origMetaKeySize, origMetaValSize, 0, 0, meta, nil, 0))
}
// outcome is set above before the clearLockMeta call.
return outcome, nil
Expand Down Expand Up @@ -5518,7 +5522,7 @@ func mvccResolveWriteIntent(

// Update stat counters with older version.
if ms != nil {
ms.Add(updateStatsOnClear(intent.Key, origMetaKeySize, origMetaValSize, metaKeySize,
ms.Add(updateStatsOnClear(update.Key, origMetaKeySize, origMetaValSize, metaKeySize,
metaValSize, meta, &buf.newMeta, unsafeNextKey.Timestamp.WallTime))
}
// outcome is set above before the clearLockMeta call.
Expand Down Expand Up @@ -5599,7 +5603,7 @@ func MVCCResolveWriteIntentRange(
ctx context.Context,
rw ReadWriter,
ms *enginepb.MVCCStats,
intent roachpb.LockUpdate,
update roachpb.LockUpdate,
opts MVCCResolveWriteIntentRangeOptions,
) (
numKeys, numBytes int64,
Expand All @@ -5611,7 +5615,7 @@ func MVCCResolveWriteIntentRange(
keysExceeded := opts.MaxKeys < 0
bytesExceeded := opts.TargetBytes < 0
if keysExceeded || bytesExceeded {
resumeSpan := intent.Span // don't inline or `intent` would escape to heap
resumeSpan := update.Span // don't inline or `update` would escape to heap
if keysExceeded {
resumeReason = kvpb.RESUME_KEY_LIMIT
} else if bytesExceeded {
Expand All @@ -5620,12 +5624,12 @@ func MVCCResolveWriteIntentRange(
return 0, 0, &resumeSpan, resumeReason, false, nil
}

ltStart, _ := keys.LockTableSingleKey(intent.Key, nil)
ltEnd, _ := keys.LockTableSingleKey(intent.EndKey, nil)
ltStart, _ := keys.LockTableSingleKey(update.Key, nil)
ltEnd, _ := keys.LockTableSingleKey(update.EndKey, nil)
ltIter, err := NewLockTableIterator(rw, LockTableIteratorOptions{
LowerBound: ltStart,
UpperBound: ltEnd,
MatchTxnID: intent.Txn.ID,
MatchTxnID: update.Txn.ID,
})
if err != nil {
return 0, 0, nil, 0, false, err
Expand All @@ -5634,8 +5638,8 @@ func MVCCResolveWriteIntentRange(
var mvccIter MVCCIterator
iterOpts := IterOptions{
KeyTypes: IterKeyTypePointsAndRanges,
LowerBound: intent.Key,
UpperBound: intent.EndKey,
LowerBound: update.Key,
UpperBound: update.EndKey,
}
if rw.ConsistentIterators() {
// Production code should always have consistent iterators.
Expand All @@ -5651,8 +5655,8 @@ func MVCCResolveWriteIntentRange(
buf := newPutBuffer()
defer buf.release()

intentEndKey := intent.EndKey
intent.EndKey = nil
intentEndKey := update.EndKey
update.EndKey = nil

var lastResolvedKey roachpb.Key
var lastResolvedKeyOk bool
Expand Down Expand Up @@ -5702,20 +5706,20 @@ func MVCCResolveWriteIntentRange(
lastResolvedKey = append(lastResolvedKey[:0], ltKey.Key...)
lastResolvedKeyOk = false
}
if ltKey.TxnUUID != intent.Txn.ID {
if ltKey.TxnUUID != update.Txn.ID {
return 0, 0, nil, 0, false, errors.AssertionFailedf(
"unexpected txnID %v != %v while scanning lock table", ltKey.TxnUUID, intent.Txn.ID)
"unexpected txnID %v != %v while scanning lock table", ltKey.TxnUUID, update.Txn.ID)
}
intent.Key = ltKey.Key
update.Key = ltKey.Key
if err := ltIter.ValueProto(&buf.meta); err != nil {
return 0, 0, nil, 0, false, errors.Wrap(err, "unmarshaling lock table value")
}
beforeBytes := rw.BufferedSize()
var outcome lockResolutionOutcome
if ltKey.Strength == lock.Intent {
outcome, err = mvccResolveWriteIntent(ctx, rw, mvccIter, ms, intent, &buf.meta, buf)
outcome, err = mvccResolveWriteIntent(ctx, rw, mvccIter, ms, update, &buf.meta, buf)
} else {
outcome, err = mvccReleaseLockInternal(ctx, rw, ms, intent, ltKey.Strength, &buf.meta, buf)
outcome, err = mvccReleaseLockInternal(ctx, rw, ms, update, ltKey.Strength, &buf.meta, buf)
replLocksReleased = replLocksReleased || outcome != lockNoop
}
if err != nil {
Expand Down
Loading

0 comments on commit ec590f6

Please sign in to comment.