From f19dd96f1f85379af1a1518eacd11047f4ca1c6c Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Fri, 15 Mar 2019 00:28:22 -0400 Subject: [PATCH] rangefeed: don't discard ref count for txn after intent abort Fixes #34600. Before this commit, rangefeed made the assumption that an aborted intent implied an aborted transaction (after the rangefeed was in a steady-state). It used this assumption to eagerly discard the bookkeeping for a transaction after its first MVCCAbortIntent operation was seen as an optimization. This was incorrect. A transaction can "abort" an intent even if it commits. This is possible if the intent was only written at a previous epoch and not written in the epoch that ended up finalizing the transaction. This commit removes this assumption, which I confirmed fixes the resolved timestamp regression using the new assertion from #35772. I believe this could have two effects: - it could trigger the assertion failure from #34600 if the transaction experiencing the incorrect assumption was the oldest being tracked at the time of the assumption. - it could cause resolved timestamp stalls and prevent all future forward progress of resolved timestamps if the transaction experiencing the incorrect assumption was not the oldest being tracked at the time of the assumption. Release note: None --- pkg/storage/rangefeed/resolved_timestamp.go | 35 +++------------- .../rangefeed/resolved_timestamp_test.go | 42 +++++++++++-------- 2 files changed, 30 insertions(+), 47 deletions(-) diff --git a/pkg/storage/rangefeed/resolved_timestamp.go b/pkg/storage/rangefeed/resolved_timestamp.go index 5bfafd28d2b0..60bbba854de8 100644 --- a/pkg/storage/rangefeed/resolved_timestamp.go +++ b/pkg/storage/rangefeed/resolved_timestamp.go @@ -155,18 +155,11 @@ func (rts *resolvedTimestamp) consumeLogicalOp(op enginepb.MVCCLogicalOp) bool { return rts.intentQ.DecrRef(t.TxnID, t.Timestamp) case *enginepb.MVCCAbortIntentOp: - // If the resolved timestamp has been initialized then we can remove the - // txn from the queue immediately after we observe that it has been - // aborted. However, if the resolved timestamp has not yet been - // initialized then we decrement from the txn's reference count. This - // permits the refcount to drop below 0, which is important to handle - // correctly when events may be out of order. For instance, before the - // resolved timestamp is initialized, it's possible that we see an - // intent get aborted before we see it in the first place. If we didn't - // let the refcount drop below 0, we would risk leaking a reference. - if rts.IsInit() { - return rts.intentQ.Del(t.TxnID) - } + // An aborted intent does not necessarily indicate an aborted + // transaction. An AbortIntent operation can be the result of an intent + // that was written only in an earlier epoch being resolved after its + // transaction committed in a later epoch. Don't make any assumptions + // about the transaction other than to decrement its reference count. return rts.intentQ.DecrRef(t.TxnID, hlc.Timestamp{}) default: @@ -421,24 +414,6 @@ func (uiq *unresolvedIntentQueue) updateTxn( return false } -// Del removes the transaction from the queue. It returns whether the update had -// an effect on the oldest transaction in the queue. -func (uiq *unresolvedIntentQueue) Del(txnID uuid.UUID) bool { - txn, ok := uiq.txns[txnID] - if !ok { - // Unknown txn. - return false - } - - // Will deleting the txn advance the queue's earliest timestamp? - wasMin := txn.index == 0 - - // Remove txn from the queue. - delete(uiq.txns, txn.txnID) - heap.Remove(&uiq.minHeap, txn.index) - return wasMin -} - // AllowNegRefCount instruts the unresolvedIntentQueue on whether or not to // allow the reference count on transactions to drop below zero. If disallowed, // the method also asserts that all unresolved intent refcounts for transactions diff --git a/pkg/storage/rangefeed/resolved_timestamp_test.go b/pkg/storage/rangefeed/resolved_timestamp_test.go index 1fa4169b90e4..92c8af92564c 100644 --- a/pkg/storage/rangefeed/resolved_timestamp_test.go +++ b/pkg/storage/rangefeed/resolved_timestamp_test.go @@ -68,12 +68,6 @@ func TestUnresolvedIntentQueue(t *testing.T) { require.False(t, adv) require.Equal(t, 2, uiq.Len()) - // Delete a non-existent txn. - txn4 := uuid.MakeV4() - adv = uiq.Del(txn4) - require.False(t, adv) - require.Equal(t, 2, uiq.Len()) - // Update txn1 with a smaller timestamp. adv = uiq.UpdateTS(txn1, hlc.Timestamp{WallTime: 0}) require.False(t, adv) @@ -130,13 +124,13 @@ func TestUnresolvedIntentQueue(t *testing.T) { require.Equal(t, 1, uiq.txns[txn1].refCount) require.Equal(t, newTxn1TS, uiq.txns[txn1].timestamp) - // Add new txn at much higher timestamp. Immediately delete. - txn5 := uuid.MakeV4() - adv = uiq.IncRef(txn5, nil, hlc.Timestamp{WallTime: 10}) + // Add new txn at much higher timestamp. Immediately decrement ref count. + txn4 := uuid.MakeV4() + adv = uiq.IncRef(txn4, nil, hlc.Timestamp{WallTime: 10}) require.False(t, adv) require.Equal(t, 3, uiq.Len()) require.Equal(t, txn2, uiq.Oldest().txnID) - adv = uiq.Del(txn5) + adv = uiq.DecrRef(txn4, hlc.Timestamp{WallTime: 10}) require.False(t, adv) require.Equal(t, 2, uiq.Len()) @@ -153,19 +147,19 @@ func TestUnresolvedIntentQueue(t *testing.T) { require.True(t, adv) require.Equal(t, 0, uiq.Len()) - // Add new txn. Immediately delete. Should be empty again. - txn6 := uuid.MakeV4() - adv = uiq.IncRef(txn6, nil, hlc.Timestamp{WallTime: 20}) + // Add new txn. Immediately decrement ref count. Should be empty again. + txn5 := uuid.MakeV4() + adv = uiq.IncRef(txn5, nil, hlc.Timestamp{WallTime: 20}) require.False(t, adv) require.Equal(t, 1, uiq.Len()) - require.Equal(t, txn6, uiq.Oldest().txnID) - adv = uiq.Del(txn6) + require.Equal(t, txn5, uiq.Oldest().txnID) + adv = uiq.DecrRef(txn5, hlc.Timestamp{}) require.True(t, adv) require.Equal(t, 0, uiq.Len()) // Instruct the queue to disallow negative ref counts. uiq.AllowNegRefCount(false) - require.Panics(t, func() { uiq.DecrRef(txn6, hlc.Timestamp{}) }) + require.Panics(t, func() { uiq.DecrRef(txn5, hlc.Timestamp{}) }) require.Equal(t, 0, uiq.Len()) } @@ -264,6 +258,8 @@ func TestResolvedTimestamp(t *testing.T) { // Third transaction aborted. No effect. fwd = rts.ConsumeLogicalOp(abortIntentOp(txn3)) require.False(t, fwd) + fwd = rts.ConsumeLogicalOp(abortIntentOp(txn3)) + require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) // Fourth transaction at higher timestamp. No effect. @@ -314,7 +310,19 @@ func TestResolvedTimestamp(t *testing.T) { require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 45}, rts.Get()) - // Fifth transaction aborted. Resolved timestamp moves to closed timestamp. + // Fifth transaction bumps epoch and re-writes one of its intents. Resolved + // timestamp moves to the new transaction timestamp. + fwd = rts.ConsumeLogicalOp(updateIntentOp(txn5, hlc.Timestamp{WallTime: 47})) + require.True(t, fwd) + require.Equal(t, hlc.Timestamp{WallTime: 46}, rts.Get()) + + // Fifth transaction committed, but only one of its intents was written in + // its final epoch. Resolved timestamp moves forward after observing the + // first intent committing at a higher timestamp and moves to the closed + // timestamp after observing the second intent aborting. + fwd = rts.ConsumeLogicalOp(commitIntentOp(txn5, hlc.Timestamp{WallTime: 49})) + require.True(t, fwd) + require.Equal(t, hlc.Timestamp{WallTime: 48}, rts.Get()) fwd = rts.ConsumeLogicalOp(abortIntentOp(txn5)) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 50}, rts.Get())