From 97a30d2c524985c2949dbabbdd66ec0e860991e9 Mon Sep 17 00:00:00 2001 From: Oleg Afanasyev Date: Mon, 25 Sep 2023 18:20:13 +0100 Subject: [PATCH] rangefeed: don't advance resolved ts on txn abort Previously when rangefeed received aborted status for pushed transaction it would eagerly dropped all known intents that belonged to the transactions from its cache. This was proven incorrect because we can get ABORTED status event when transaction is comitted if leaseholder already removed transaction record. This commit removes the shortcut and always relies on intent operations to advance resolved timestamp. Epic: none Release note: None --- pkg/kv/kvserver/rangefeed/processor.go | 3 - pkg/kv/kvserver/rangefeed/processor_test.go | 25 ++++--- .../kvserver/rangefeed/resolved_timestamp.go | 45 ------------ .../rangefeed/resolved_timestamp_test.go | 71 +------------------ .../kvserver/rangefeed/scheduled_processor.go | 3 - pkg/kv/kvserver/rangefeed/task.go | 43 +++++------ pkg/kv/kvserver/rangefeed/task_test.go | 4 +- pkg/kv/kvserver/replica_rangefeed.go | 4 +- pkg/storage/enginepb/mvcc3.proto | 12 +--- 9 files changed, 40 insertions(+), 170 deletions(-) diff --git a/pkg/kv/kvserver/rangefeed/processor.go b/pkg/kv/kvserver/rangefeed/processor.go index 8c7f003f1905..e65ab963139d 100644 --- a/pkg/kv/kvserver/rangefeed/processor.go +++ b/pkg/kv/kvserver/rangefeed/processor.go @@ -808,9 +808,6 @@ func (p *LegacyProcessor) consumeLogicalOps( case *enginepb.MVCCAbortIntentOp: // No updates to publish. - case *enginepb.MVCCAbortTxnOp: - // No updates to publish. - default: panic(errors.AssertionFailedf("unknown logical op %T", t)) } diff --git a/pkg/kv/kvserver/rangefeed/processor_test.go b/pkg/kv/kvserver/rangefeed/processor_test.go index 2561f0acfa79..ac40579ff774 100644 --- a/pkg/kv/kvserver/rangefeed/processor_test.go +++ b/pkg/kv/kvserver/rangefeed/processor_test.go @@ -110,12 +110,6 @@ func abortIntentOp(txnID uuid.UUID) enginepb.MVCCLogicalOp { }) } -func abortTxnOp(txnID uuid.UUID) enginepb.MVCCLogicalOp { - return makeLogicalOp(&enginepb.MVCCAbortTxnOp{ - TxnID: txnID, - }) -} - func makeRangeFeedEvent(val interface{}) *kvpb.RangeFeedEvent { var event kvpb.RangeFeedEvent event.MustSetValue(val) @@ -1019,8 +1013,16 @@ func TestProcessorTxnPushAttempt(t *testing.T) { return nil } - <-pausePushAttemptsC - <-resumePushAttemptsC + select { + case <-pausePushAttemptsC: + select { + case <-resumePushAttemptsC: + case <-ctx.Done(): + return ctx.Err() + } + case <-ctx.Done(): + return ctx.Err() + } return nil }) @@ -1084,6 +1086,13 @@ func TestProcessorTxnPushAttempt(t *testing.T) { resumePushAttemptsC <- struct{}{} h.triggerTxnPushUntilPushed(t, pausePushAttemptsC, 30*time.Second) + // The resolved timestamp should not move as aborted transaction status can't + // guarantee that transaction was actually aborted. + h.syncEventC() + require.Equal(t, hlc.Timestamp{WallTime: 59}, h.rts.Get()) + // "Wait" for txn 2 to resolve when pushed intents "arrive". + p.ConsumeLogicalOps(ctx, commitIntentOp(txn2MetaT3Post.ID, txn2MetaT3Post.WriteTimestamp)) + p.ConsumeLogicalOps(ctx, commitIntentOp(txn2MetaT3Post.ID, txn2MetaT3Post.WriteTimestamp)) // The resolved timestamp should have moved forwards to the closed // timestamp. h.syncEventC() diff --git a/pkg/kv/kvserver/rangefeed/resolved_timestamp.go b/pkg/kv/kvserver/rangefeed/resolved_timestamp.go index 6c5e09643e4d..7a57c5d862d5 100644 --- a/pkg/kv/kvserver/rangefeed/resolved_timestamp.go +++ b/pkg/kv/kvserver/rangefeed/resolved_timestamp.go @@ -165,51 +165,6 @@ func (rts *resolvedTimestamp) consumeLogicalOp(op enginepb.MVCCLogicalOp) bool { // about the transaction other than to decrement its reference count. return rts.intentQ.DecrRef(t.TxnID, hlc.Timestamp{}) - case *enginepb.MVCCAbortTxnOp: - // Unlike the previous case, an aborted transaction does indicate - // that none of the transaction's intents will ever be committed. - // This means that we can stop tracking the transaction entirely. - // Doing so is critical to ensure forward progress of the resolved - // timestamp in situtations where the oldest transaction on a range - // is abandoned and the locations of its intents are unknown. - // - // However, the transaction may also still be writing, updating, and - // resolving (aborting) its intents, so we need to be careful with - // how we handle any future operations from this transaction. There - // are three different operations we could see the zombie transaction - // perform: - // - // - MVCCWriteIntentOp: it could write another intent. This could result - // in "reintroducing" the transaction to the queue. We allow this - // to happen and rely on pushing the transaction again, eventually - // evicting the transaction from the queue for good. - // - // Just like any other transaction, this new intent will necessarily - // be pushed above the closed timestamp, so we don't need to worry - // about resolved timestamp regressions. - // - // - MVCCUpdateIntentOp: it could update one of its intents. If we're - // not already tracking the transaction then the queue will ignore - // the intent update. - // - // - MVCCAbortIntentOp: it could resolve one of its intents as aborted. - // This is the most likely case. Again, if we're not already tracking - // the transaction then the queue will ignore the intent abort. - // - if !rts.IsInit() { - // We ignore MVCCAbortTxnOp operations until the queue is - // initialized. This is necessary because we allow txn reference - // counts to drop below zero before the queue is initialized and - // expect that all reference count decrements be balanced by a - // corresponding reference count increment. - // - // We could remove this restriction if we evicted all transactions - // with negative reference counts after initialization, but this is - // easier and more clear. - return false - } - return rts.intentQ.Del(t.TxnID) - default: panic(errors.AssertionFailedf("unknown logical op %T", t)) } diff --git a/pkg/kv/kvserver/rangefeed/resolved_timestamp_test.go b/pkg/kv/kvserver/rangefeed/resolved_timestamp_test.go index 4c890bd75780..2c237728686d 100644 --- a/pkg/kv/kvserver/rangefeed/resolved_timestamp_test.go +++ b/pkg/kv/kvserver/rangefeed/resolved_timestamp_test.go @@ -256,11 +256,8 @@ func TestResolvedTimestamp(t *testing.T) { // First transaction aborted. Resolved timestamp moves to next earliest // intent. - fwd = rts.ConsumeLogicalOp(abortTxnOp(txn1)) - require.True(t, fwd) - require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) fwd = rts.ConsumeLogicalOp(abortIntentOp(txn1)) - require.False(t, fwd) + require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) // Third transaction at higher timestamp. No effect. @@ -273,9 +270,6 @@ func TestResolvedTimestamp(t *testing.T) { require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) // Third transaction aborted. No effect. - fwd = rts.ConsumeLogicalOp(abortTxnOp(txn3)) - require.False(t, fwd) - require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) fwd = rts.ConsumeLogicalOp(abortIntentOp(txn3)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) @@ -476,11 +470,6 @@ func TestResolvedTimestampInit(t *testing.T) { require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) - // Abort that intent's transaction. Not initialized so no-op. - fwd = rts.ConsumeLogicalOp(abortTxnOp(txn1)) - require.False(t, fwd) - require.Equal(t, hlc.Timestamp{}, rts.Get()) - // Later, write an intent for the same transaction. This should cancel // out with the out-of-order intent abort operation. If this abort hadn't // allowed the unresolvedTxn's ref count to drop below 0, this would @@ -514,60 +503,6 @@ func TestResolvedTimestampInit(t *testing.T) { }) } -func TestResolvedTimestampTxnAborted(t *testing.T) { - defer leaktest.AfterTest(t)() - rts := makeResolvedTimestamp() - rts.Init() - - // Set a closed timestamp. Resolved timestamp advances. - fwd := rts.ForwardClosedTS(hlc.Timestamp{WallTime: 5}) - require.True(t, fwd) - require.Equal(t, hlc.Timestamp{WallTime: 5}, rts.Get()) - - // Add an intent for a new transaction. - txn1 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 10})) - require.False(t, fwd) - require.Equal(t, hlc.Timestamp{WallTime: 5}, rts.Get()) - - // Set a new closed timestamp. Resolved timestamp advances. - fwd = rts.ForwardClosedTS(hlc.Timestamp{WallTime: 15}) - require.True(t, fwd) - require.Equal(t, hlc.Timestamp{WallTime: 9}, rts.Get()) - - // Abort txn1 after a periodic txn push. Resolved timestamp advances. - fwd = rts.ConsumeLogicalOp(abortTxnOp(txn1)) - require.True(t, fwd) - require.Equal(t, hlc.Timestamp{WallTime: 15}, rts.Get()) - - // Update one of txn1's intents. Should be ignored. - fwd = rts.ConsumeLogicalOp(updateIntentOp(txn1, hlc.Timestamp{WallTime: 20})) - require.False(t, fwd) - require.Equal(t, hlc.Timestamp{WallTime: 15}, rts.Get()) - - // Abort one of txn1's intents. Should be ignored. - fwd = rts.ConsumeLogicalOp(abortIntentOp(txn1)) - require.False(t, fwd) - require.Equal(t, hlc.Timestamp{WallTime: 15}, rts.Get()) - - // Write another intent as txn1. Should add txn1 back into queue. - // This will eventually require another txn push to evict. - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 20})) - require.False(t, fwd) - require.Equal(t, hlc.Timestamp{WallTime: 15}, rts.Get()) - - // Set a new closed timestamp. Resolved timestamp advances, but only up to - // the timestamp of txn1's intent, which we fail remember is uncommittable. - fwd = rts.ForwardClosedTS(hlc.Timestamp{WallTime: 25}) - require.True(t, fwd) - require.Equal(t, hlc.Timestamp{WallTime: 19}, rts.Get()) - - // Abort txn1 again after another periodic push. Resolved timestamp advances. - fwd = rts.ConsumeLogicalOp(abortTxnOp(txn1)) - require.True(t, fwd) - require.Equal(t, hlc.Timestamp{WallTime: 25}, rts.Get()) -} - // Test that things go well when the closed timestamp has non-zero logical part. func TestClosedTimestampLogicalPart(t *testing.T) { defer leaktest.AfterTest(t)() @@ -593,8 +528,8 @@ func TestClosedTimestampLogicalPart(t *testing.T) { require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 10, Logical: 0}, rts.Get()) - // Abort txn1. Resolved timestamp advances. - fwd = rts.ConsumeLogicalOp(abortTxnOp(txn1)) + // Abort txn1 intent. Resolved timestamp advances. + fwd = rts.ConsumeLogicalOp(abortIntentOp(txn1)) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 11, Logical: 0}, rts.Get()) diff --git a/pkg/kv/kvserver/rangefeed/scheduled_processor.go b/pkg/kv/kvserver/rangefeed/scheduled_processor.go index a9573bdc6656..4755b705913d 100644 --- a/pkg/kv/kvserver/rangefeed/scheduled_processor.go +++ b/pkg/kv/kvserver/rangefeed/scheduled_processor.go @@ -652,9 +652,6 @@ func (p *ScheduledProcessor) consumeLogicalOps( case *enginepb.MVCCAbortIntentOp: // No updates to publish. - case *enginepb.MVCCAbortTxnOp: - // No updates to publish. - default: panic(errors.AssertionFailedf("unknown logical op %T", t)) } diff --git a/pkg/kv/kvserver/rangefeed/task.go b/pkg/kv/kvserver/rangefeed/task.go index 64989379c500..9cd28613a0ab 100644 --- a/pkg/kv/kvserver/rangefeed/task.go +++ b/pkg/kv/kvserver/rangefeed/task.go @@ -323,26 +323,29 @@ func (a *txnPushAttempt) pushOldTxns(ctx context.Context) error { // Inform the Processor of the results of the push for each transaction. ops := make([]enginepb.MVCCLogicalOp, len(pushedTxns)) var intentsToCleanup []roachpb.LockUpdate - for i, txn := range pushedTxns { + pushed := 0 + for _, txn := range pushedTxns { switch txn.Status { case roachpb.PENDING, roachpb.STAGING: // The transaction is still in progress but its timestamp was moved // forward to the current time. Inform the Processor that it can // forward the txn's timestamp in its unresolvedIntentQueue. - ops[i].SetValue(&enginepb.MVCCUpdateIntentOp{ + ops[pushed].SetValue(&enginepb.MVCCUpdateIntentOp{ TxnID: txn.ID, Timestamp: txn.WriteTimestamp, }) + pushed++ case roachpb.COMMITTED: // The transaction is committed and its timestamp may have moved // forward since we last saw an intent. Inform the Processor // immediately in case this is the transaction that is holding back // the resolved timestamp. However, we still need to wait for the // transaction's intents to actually be resolved. - ops[i].SetValue(&enginepb.MVCCUpdateIntentOp{ + ops[pushed].SetValue(&enginepb.MVCCUpdateIntentOp{ TxnID: txn.ID, Timestamp: txn.WriteTimestamp, }) + pushed++ // Clean up the transaction's intents within the processor's range, which // should eventually cause all unresolved intents for this transaction on @@ -353,34 +356,20 @@ func (a *txnPushAttempt) pushOldTxns(ctx context.Context) error { txnIntents := intentsInBound(txn, a.span.AsRawSpanWithNoLocals()) intentsToCleanup = append(intentsToCleanup, txnIntents...) case roachpb.ABORTED: - // The transaction is aborted, so it doesn't need to be tracked - // anymore nor does it need to prevent the resolved timestamp from - // advancing. Inform the Processor that it can remove the txn from - // its unresolvedIntentQueue. - // - // NOTE: the unresolvedIntentQueue will ignore MVCCAbortTxn operations - // before it has been initialized. This is not a concern here though - // because we never launch txnPushAttempt tasks before the queue has - // been initialized. - ops[i].SetValue(&enginepb.MVCCAbortTxnOp{ - TxnID: txn.ID, - }) - - // We just informed the Processor about this txn being aborted, so from - // its perspective, there's nothing more to do — the txn's intents are no - // longer holding up the resolved timestamp. - // - // However, if the txn happens to have its LockSpans populated, then lets - // clean up the intents within the processor's range as an optimization to - // help others and to prevent any rangefeed reconnections from needing to - // push the same txn. If we aborted the txn, then it won't have its - // LockSpans populated. If, however, we ran into a transaction that its - // coordinator tried to rollback but didn't follow up with garbage - // collection, then LockSpans will be populated. + // If transaction is aborted theoretically we don't need to track its + // intents and prevent resolved timestamp from advancing any more. + // Unfortunately there are cases where we don't have transaction record + // for committed transaction and we assume that transaction was aborted + // while in reality we just didn't catch up with its intent cleanups. + // Since this situation is ambiguous, we can't safely ignore this txn, so + // we resort to resolving intents and waiting for intent cleanup's to + // reach us the normal way. + // See https://github.com/cockroachdb/cockroach/issues/104309 txnIntents := intentsInBound(txn, a.span.AsRawSpanWithNoLocals()) intentsToCleanup = append(intentsToCleanup, txnIntents...) } } + ops = ops[:pushed] // Inform the processor of all logical ops. a.p.sendEvent(ctx, event{ops: ops}, 0) diff --git a/pkg/kv/kvserver/rangefeed/task_test.go b/pkg/kv/kvserver/rangefeed/task_test.go index 26fa2c49f6f2..fefcef687afe 100644 --- a/pkg/kv/kvserver/rangefeed/task_test.go +++ b/pkg/kv/kvserver/rangefeed/task_test.go @@ -523,8 +523,8 @@ func TestTxnPushAttempt(t *testing.T) { {ops: []enginepb.MVCCLogicalOp{ updateIntentOp(txn1, hlc.Timestamp{WallTime: 15}), updateIntentOp(txn2, hlc.Timestamp{WallTime: 2}), - abortTxnOp(txn3), - abortTxnOp(txn4), + // abortTxnOp(txn3), + // abortTxnOp(txn4), }}, } require.Equal(t, len(expEvents), len(p.eventC)) diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index 814f0ebe2fb4..54a433cb8461 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -594,7 +594,6 @@ func populatePrevValsInLogicalOpLog( case *enginepb.MVCCWriteIntentOp, *enginepb.MVCCUpdateIntentOp, *enginepb.MVCCAbortIntentOp, - *enginepb.MVCCAbortTxnOp, *enginepb.MVCCDeleteRangeOp: // Nothing to do. continue @@ -668,8 +667,7 @@ func (r *Replica) handleLogicalOpLogRaftMuLocked( key, ts, valPtr = t.Key, t.Timestamp, &t.Value case *enginepb.MVCCWriteIntentOp, *enginepb.MVCCUpdateIntentOp, - *enginepb.MVCCAbortIntentOp, - *enginepb.MVCCAbortTxnOp: + *enginepb.MVCCAbortIntentOp: // Nothing to do. continue case *enginepb.MVCCDeleteRangeOp: diff --git a/pkg/storage/enginepb/mvcc3.proto b/pkg/storage/enginepb/mvcc3.proto index eb9e4da27924..1aea1db30aa5 100644 --- a/pkg/storage/enginepb/mvcc3.proto +++ b/pkg/storage/enginepb/mvcc3.proto @@ -306,16 +306,6 @@ message MVCCAbortIntentOp { (gogoproto.nullable) = false]; } -// MVCCAbortTxnOp corresponds to an entire transaction being aborted. The -// operation indicates that none of the transaction's intents will ever be -// committed. -message MVCCAbortTxnOp { - bytes txn_id = 1 [ - (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID", - (gogoproto.customname) = "TxnID", - (gogoproto.nullable) = false]; -} - // MVCCDeleteRangeOp corresponds to a range deletion using an MVCC range // tombstone. message MVCCDeleteRangeOp { @@ -334,6 +324,6 @@ message MVCCLogicalOp { MVCCUpdateIntentOp update_intent = 3; MVCCCommitIntentOp commit_intent = 4; MVCCAbortIntentOp abort_intent = 5; - MVCCAbortTxnOp abort_txn = 6; MVCCDeleteRangeOp delete_range = 7; + reserved 6; }