Skip to content

Commit

Permalink
rangefeed: don't advance resolved ts on txn abort
Browse files Browse the repository at this point in the history
Previously, when rangefeed received an aborted status for a pushed
transaction, it would eagerly drop all known intents that belonged
to that transaction from its cache.
This was proven incorrect because we can get an ABORTED status event
when the transaction is committed if the leaseholder has already removed
the transaction record.
This commit removes the shortcut and always relies on intent operations
to advance resolved timestamp.

Epic: none

Release note: None
  • Loading branch information
aliher1911 committed Sep 27, 2023
1 parent 43326b2 commit 97a30d2
Show file tree
Hide file tree
Showing 9 changed files with 40 additions and 170 deletions.
3 changes: 0 additions & 3 deletions pkg/kv/kvserver/rangefeed/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -808,9 +808,6 @@ func (p *LegacyProcessor) consumeLogicalOps(
case *enginepb.MVCCAbortIntentOp:
// No updates to publish.

case *enginepb.MVCCAbortTxnOp:
// No updates to publish.

default:
panic(errors.AssertionFailedf("unknown logical op %T", t))
}
Expand Down
25 changes: 17 additions & 8 deletions pkg/kv/kvserver/rangefeed/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,6 @@ func abortIntentOp(txnID uuid.UUID) enginepb.MVCCLogicalOp {
})
}

// abortTxnOp is a test helper that passes an MVCCAbortTxnOp carrying the
// given transaction ID to makeLogicalOp and returns the resulting logical op.
func abortTxnOp(txnID uuid.UUID) enginepb.MVCCLogicalOp {
return makeLogicalOp(&enginepb.MVCCAbortTxnOp{
TxnID: txnID,
})
}

func makeRangeFeedEvent(val interface{}) *kvpb.RangeFeedEvent {
var event kvpb.RangeFeedEvent
event.MustSetValue(val)
Expand Down Expand Up @@ -1019,8 +1013,16 @@ func TestProcessorTxnPushAttempt(t *testing.T) {
return nil
}

<-pausePushAttemptsC
<-resumePushAttemptsC
select {
case <-pausePushAttemptsC:
select {
case <-resumePushAttemptsC:
case <-ctx.Done():
return ctx.Err()
}
case <-ctx.Done():
return ctx.Err()
}
return nil
})

Expand Down Expand Up @@ -1084,6 +1086,13 @@ func TestProcessorTxnPushAttempt(t *testing.T) {
resumePushAttemptsC <- struct{}{}
h.triggerTxnPushUntilPushed(t, pausePushAttemptsC, 30*time.Second)

// The resolved timestamp should not move as aborted transaction status can't
// guarantee that transaction was actually aborted.
h.syncEventC()
require.Equal(t, hlc.Timestamp{WallTime: 59}, h.rts.Get())
// "Wait" for txn 2 to resolve when pushed intents "arrive".
p.ConsumeLogicalOps(ctx, commitIntentOp(txn2MetaT3Post.ID, txn2MetaT3Post.WriteTimestamp))
p.ConsumeLogicalOps(ctx, commitIntentOp(txn2MetaT3Post.ID, txn2MetaT3Post.WriteTimestamp))
// The resolved timestamp should have moved forwards to the closed
// timestamp.
h.syncEventC()
Expand Down
45 changes: 0 additions & 45 deletions pkg/kv/kvserver/rangefeed/resolved_timestamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,51 +165,6 @@ func (rts *resolvedTimestamp) consumeLogicalOp(op enginepb.MVCCLogicalOp) bool {
// about the transaction other than to decrement its reference count.
return rts.intentQ.DecrRef(t.TxnID, hlc.Timestamp{})

case *enginepb.MVCCAbortTxnOp:
// Unlike the previous case, an aborted transaction does indicate
// that none of the transaction's intents will ever be committed.
// This means that we can stop tracking the transaction entirely.
// Doing so is critical to ensure forward progress of the resolved
// timestamp in situations where the oldest transaction on a range
// is abandoned and the locations of its intents are unknown.
//
// However, the transaction may also still be writing, updating, and
// resolving (aborting) its intents, so we need to be careful with
// how we handle any future operations from this transaction. There
// are three different operations we could see the zombie transaction
// perform:
//
// - MVCCWriteIntentOp: it could write another intent. This could result
// in "reintroducing" the transaction to the queue. We allow this
// to happen and rely on pushing the transaction again, eventually
// evicting the transaction from the queue for good.
//
// Just like any other transaction, this new intent will necessarily
// be pushed above the closed timestamp, so we don't need to worry
// about resolved timestamp regressions.
//
// - MVCCUpdateIntentOp: it could update one of its intents. If we're
// not already tracking the transaction then the queue will ignore
// the intent update.
//
// - MVCCAbortIntentOp: it could resolve one of its intents as aborted.
// This is the most likely case. Again, if we're not already tracking
// the transaction then the queue will ignore the intent abort.
//
if !rts.IsInit() {
// We ignore MVCCAbortTxnOp operations until the queue is
// initialized. This is necessary because we allow txn reference
// counts to drop below zero before the queue is initialized and
// expect that all reference count decrements be balanced by a
// corresponding reference count increment.
//
// We could remove this restriction if we evicted all transactions
// with negative reference counts after initialization, but this is
// easier and more clear.
return false
}
return rts.intentQ.Del(t.TxnID)

default:
panic(errors.AssertionFailedf("unknown logical op %T", t))
}
Expand Down
71 changes: 3 additions & 68 deletions pkg/kv/kvserver/rangefeed/resolved_timestamp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,11 +256,8 @@ func TestResolvedTimestamp(t *testing.T) {

// First transaction aborted. Resolved timestamp moves to next earliest
// intent.
fwd = rts.ConsumeLogicalOp(abortTxnOp(txn1))
require.True(t, fwd)
require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get())
fwd = rts.ConsumeLogicalOp(abortIntentOp(txn1))
require.False(t, fwd)
require.True(t, fwd)
require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get())

// Third transaction at higher timestamp. No effect.
Expand All @@ -273,9 +270,6 @@ func TestResolvedTimestamp(t *testing.T) {
require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get())

// Third transaction aborted. No effect.
fwd = rts.ConsumeLogicalOp(abortTxnOp(txn3))
require.False(t, fwd)
require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get())
fwd = rts.ConsumeLogicalOp(abortIntentOp(txn3))
require.False(t, fwd)
require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get())
Expand Down Expand Up @@ -476,11 +470,6 @@ func TestResolvedTimestampInit(t *testing.T) {
require.False(t, fwd)
require.Equal(t, hlc.Timestamp{}, rts.Get())

// Abort that intent's transaction. Not initialized so no-op.
fwd = rts.ConsumeLogicalOp(abortTxnOp(txn1))
require.False(t, fwd)
require.Equal(t, hlc.Timestamp{}, rts.Get())

// Later, write an intent for the same transaction. This should cancel
// out with the out-of-order intent abort operation. If this abort hadn't
// allowed the unresolvedTxn's ref count to drop below 0, this would
Expand Down Expand Up @@ -514,60 +503,6 @@ func TestResolvedTimestampInit(t *testing.T) {
})
}

// TestResolvedTimestampTxnAborted exercises an initialized resolved timestamp
// when a tracked transaction is reported as aborted via abortTxnOp. It asserts
// that the abort lets the resolved timestamp advance to the closed timestamp,
// that subsequent update/abort intent ops for the same transaction do not move
// it, and that a new intent written by the same transaction holds the resolved
// timestamp back again until a second abortTxnOp is consumed.
func TestResolvedTimestampTxnAborted(t *testing.T) {
defer leaktest.AfterTest(t)()
rts := makeResolvedTimestamp()
rts.Init()

// Set a closed timestamp. Resolved timestamp advances.
fwd := rts.ForwardClosedTS(hlc.Timestamp{WallTime: 5})
require.True(t, fwd)
require.Equal(t, hlc.Timestamp{WallTime: 5}, rts.Get())

// Add an intent for a new transaction.
txn1 := uuid.MakeV4()
fwd = rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 10}))
require.False(t, fwd)
require.Equal(t, hlc.Timestamp{WallTime: 5}, rts.Get())

// Set a new closed timestamp. Resolved timestamp advances.
fwd = rts.ForwardClosedTS(hlc.Timestamp{WallTime: 15})
require.True(t, fwd)
require.Equal(t, hlc.Timestamp{WallTime: 9}, rts.Get())

// Abort txn1 after a periodic txn push. Resolved timestamp advances.
fwd = rts.ConsumeLogicalOp(abortTxnOp(txn1))
require.True(t, fwd)
require.Equal(t, hlc.Timestamp{WallTime: 15}, rts.Get())

// Update one of txn1's intents. Should be ignored.
fwd = rts.ConsumeLogicalOp(updateIntentOp(txn1, hlc.Timestamp{WallTime: 20}))
require.False(t, fwd)
require.Equal(t, hlc.Timestamp{WallTime: 15}, rts.Get())

// Abort one of txn1's intents. Should be ignored.
fwd = rts.ConsumeLogicalOp(abortIntentOp(txn1))
require.False(t, fwd)
require.Equal(t, hlc.Timestamp{WallTime: 15}, rts.Get())

// Write another intent as txn1. Should add txn1 back into queue.
// This will eventually require another txn push to evict.
fwd = rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 20}))
require.False(t, fwd)
require.Equal(t, hlc.Timestamp{WallTime: 15}, rts.Get())

// Set a new closed timestamp. Resolved timestamp advances, but only up to
// the timestamp of txn1's intent, which we fail to remember is uncommittable.
fwd = rts.ForwardClosedTS(hlc.Timestamp{WallTime: 25})
require.True(t, fwd)
require.Equal(t, hlc.Timestamp{WallTime: 19}, rts.Get())

// Abort txn1 again after another periodic push. Resolved timestamp advances.
fwd = rts.ConsumeLogicalOp(abortTxnOp(txn1))
require.True(t, fwd)
require.Equal(t, hlc.Timestamp{WallTime: 25}, rts.Get())
}

// Test that things go well when the closed timestamp has non-zero logical part.
func TestClosedTimestampLogicalPart(t *testing.T) {
defer leaktest.AfterTest(t)()
Expand All @@ -593,8 +528,8 @@ func TestClosedTimestampLogicalPart(t *testing.T) {
require.False(t, fwd)
require.Equal(t, hlc.Timestamp{WallTime: 10, Logical: 0}, rts.Get())

// Abort txn1. Resolved timestamp advances.
fwd = rts.ConsumeLogicalOp(abortTxnOp(txn1))
// Abort txn1 intent. Resolved timestamp advances.
fwd = rts.ConsumeLogicalOp(abortIntentOp(txn1))
require.True(t, fwd)
require.Equal(t, hlc.Timestamp{WallTime: 11, Logical: 0}, rts.Get())

Expand Down
3 changes: 0 additions & 3 deletions pkg/kv/kvserver/rangefeed/scheduled_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,9 +652,6 @@ func (p *ScheduledProcessor) consumeLogicalOps(
case *enginepb.MVCCAbortIntentOp:
// No updates to publish.

case *enginepb.MVCCAbortTxnOp:
// No updates to publish.

default:
panic(errors.AssertionFailedf("unknown logical op %T", t))
}
Expand Down
43 changes: 16 additions & 27 deletions pkg/kv/kvserver/rangefeed/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,26 +323,29 @@ func (a *txnPushAttempt) pushOldTxns(ctx context.Context) error {
// Inform the Processor of the results of the push for each transaction.
ops := make([]enginepb.MVCCLogicalOp, len(pushedTxns))
var intentsToCleanup []roachpb.LockUpdate
for i, txn := range pushedTxns {
pushed := 0
for _, txn := range pushedTxns {
switch txn.Status {
case roachpb.PENDING, roachpb.STAGING:
// The transaction is still in progress but its timestamp was moved
// forward to the current time. Inform the Processor that it can
// forward the txn's timestamp in its unresolvedIntentQueue.
ops[i].SetValue(&enginepb.MVCCUpdateIntentOp{
ops[pushed].SetValue(&enginepb.MVCCUpdateIntentOp{
TxnID: txn.ID,
Timestamp: txn.WriteTimestamp,
})
pushed++
case roachpb.COMMITTED:
// The transaction is committed and its timestamp may have moved
// forward since we last saw an intent. Inform the Processor
// immediately in case this is the transaction that is holding back
// the resolved timestamp. However, we still need to wait for the
// transaction's intents to actually be resolved.
ops[i].SetValue(&enginepb.MVCCUpdateIntentOp{
ops[pushed].SetValue(&enginepb.MVCCUpdateIntentOp{
TxnID: txn.ID,
Timestamp: txn.WriteTimestamp,
})
pushed++

// Clean up the transaction's intents within the processor's range, which
// should eventually cause all unresolved intents for this transaction on
Expand All @@ -353,34 +356,20 @@ func (a *txnPushAttempt) pushOldTxns(ctx context.Context) error {
txnIntents := intentsInBound(txn, a.span.AsRawSpanWithNoLocals())
intentsToCleanup = append(intentsToCleanup, txnIntents...)
case roachpb.ABORTED:
// The transaction is aborted, so it doesn't need to be tracked
// anymore nor does it need to prevent the resolved timestamp from
// advancing. Inform the Processor that it can remove the txn from
// its unresolvedIntentQueue.
//
// NOTE: the unresolvedIntentQueue will ignore MVCCAbortTxn operations
// before it has been initialized. This is not a concern here though
// because we never launch txnPushAttempt tasks before the queue has
// been initialized.
ops[i].SetValue(&enginepb.MVCCAbortTxnOp{
TxnID: txn.ID,
})

// We just informed the Processor about this txn being aborted, so from
// its perspective, there's nothing more to do — the txn's intents are no
// longer holding up the resolved timestamp.
//
// However, if the txn happens to have its LockSpans populated, then lets
// clean up the intents within the processor's range as an optimization to
// help others and to prevent any rangefeed reconnections from needing to
// push the same txn. If we aborted the txn, then it won't have its
// LockSpans populated. If, however, we ran into a transaction that its
// coordinator tried to rollback but didn't follow up with garbage
// collection, then LockSpans will be populated.
// If the transaction is aborted, theoretically we don't need to track its
// intents or prevent the resolved timestamp from advancing any more.
// Unfortunately, there are cases where we don't have a transaction record
// for a committed transaction and we assume that the transaction was aborted
// while in reality we just didn't catch up with its intent cleanups.
// Since this situation is ambiguous, we can't safely ignore this txn, so
// we resort to resolving intents and waiting for intent cleanups to
// reach us the normal way.
// See https://github.com/cockroachdb/cockroach/issues/104309
txnIntents := intentsInBound(txn, a.span.AsRawSpanWithNoLocals())
intentsToCleanup = append(intentsToCleanup, txnIntents...)
}
}
ops = ops[:pushed]

// Inform the processor of all logical ops.
a.p.sendEvent(ctx, event{ops: ops}, 0)
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/rangefeed/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,8 +523,8 @@ func TestTxnPushAttempt(t *testing.T) {
{ops: []enginepb.MVCCLogicalOp{
updateIntentOp(txn1, hlc.Timestamp{WallTime: 15}),
updateIntentOp(txn2, hlc.Timestamp{WallTime: 2}),
abortTxnOp(txn3),
abortTxnOp(txn4),
// abortTxnOp(txn3),
// abortTxnOp(txn4),
}},
}
require.Equal(t, len(expEvents), len(p.eventC))
Expand Down
4 changes: 1 addition & 3 deletions pkg/kv/kvserver/replica_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,6 @@ func populatePrevValsInLogicalOpLog(
case *enginepb.MVCCWriteIntentOp,
*enginepb.MVCCUpdateIntentOp,
*enginepb.MVCCAbortIntentOp,
*enginepb.MVCCAbortTxnOp,
*enginepb.MVCCDeleteRangeOp:
// Nothing to do.
continue
Expand Down Expand Up @@ -668,8 +667,7 @@ func (r *Replica) handleLogicalOpLogRaftMuLocked(
key, ts, valPtr = t.Key, t.Timestamp, &t.Value
case *enginepb.MVCCWriteIntentOp,
*enginepb.MVCCUpdateIntentOp,
*enginepb.MVCCAbortIntentOp,
*enginepb.MVCCAbortTxnOp:
*enginepb.MVCCAbortIntentOp:
// Nothing to do.
continue
case *enginepb.MVCCDeleteRangeOp:
Expand Down
12 changes: 1 addition & 11 deletions pkg/storage/enginepb/mvcc3.proto
Original file line number Diff line number Diff line change
Expand Up @@ -306,16 +306,6 @@ message MVCCAbortIntentOp {
(gogoproto.nullable) = false];
}

// MVCCAbortTxnOp corresponds to an entire transaction being aborted. The
// operation indicates that none of the transaction's intents will ever be
// committed.
message MVCCAbortTxnOp {
bytes txn_id = 1 [
(gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID",
(gogoproto.customname) = "TxnID",
(gogoproto.nullable) = false];
}

// MVCCDeleteRangeOp corresponds to a range deletion using an MVCC range
// tombstone.
message MVCCDeleteRangeOp {
Expand All @@ -334,6 +324,6 @@ message MVCCLogicalOp {
MVCCUpdateIntentOp update_intent = 3;
MVCCCommitIntentOp commit_intent = 4;
MVCCAbortIntentOp abort_intent = 5;
MVCCAbortTxnOp abort_txn = 6;
MVCCDeleteRangeOp delete_range = 7;
reserved 6;
}

0 comments on commit 97a30d2

Please sign in to comment.