From 8e163ee331fd290c2893fdf8b1b8b14ac54db750 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Thu, 14 Mar 2019 19:31:32 -0400 Subject: [PATCH] rangefeed: improve assertion when txn refcount becomes negative Informs #34600. It is critical that `unresolvedIntentQueue` properly handle negative reference counts to avoid leaking references before a rangefeed has finished its resolved timestamp initialization scan. However, once this scan is complete, reference counts on transactions should never drop below zero. Such an occurrence would indicate that an intent was lost either during the initial scan or somewhere in the logical ops stream. Based on the stacktraces in #34600, I believe this is what is happening because we can see that an `MVCCCommitIntentOp` is triggering the assertion. This commit will make this more explicit and should hopefully also fire more because it won't rely on the intent with a negative refcount being the oldest intent tracked to fire. Release note: None --- pkg/storage/rangefeed/resolved_timestamp.go | 46 ++++++++++++++----- .../rangefeed/resolved_timestamp_test.go | 11 +++-- 2 files changed, 43 insertions(+), 14 deletions(-) diff --git a/pkg/storage/rangefeed/resolved_timestamp.go b/pkg/storage/rangefeed/resolved_timestamp.go index b1d4a7e233fc..5bfafd28d2b0 100644 --- a/pkg/storage/rangefeed/resolved_timestamp.go +++ b/pkg/storage/rangefeed/resolved_timestamp.go @@ -103,7 +103,10 @@ func (rts *resolvedTimestamp) Get() hlc.Timestamp { // timestamp to move forward. func (rts *resolvedTimestamp) Init() bool { rts.init = true - rts.intentQ.assertPositiveRefCounts() + // Once the resolvedTimestamp is initialized, all prior written intents + // should be accounted for, so reference counts for transactions should + // never drop below zero. + rts.intentQ.AllowNegRefCount(false) return rts.recompute() } @@ -302,13 +305,15 @@ func (h *unresolvedTxnHeap) Pop() interface{} { // with a closed timestamp, which guarantees that no transactions can write new // intents at or beneath it, a resolved timestamp can be constructed. type unresolvedIntentQueue struct { - txns map[uuid.UUID]*unresolvedTxn - minHeap unresolvedTxnHeap + txns map[uuid.UUID]*unresolvedTxn + minHeap unresolvedTxnHeap + allowNegRefCount bool } func makeUnresolvedIntentQueue() unresolvedIntentQueue { return unresolvedIntentQueue{ - txns: make(map[uuid.UUID]*unresolvedTxn), + txns: make(map[uuid.UUID]*unresolvedTxn), + allowNegRefCount: true, } } @@ -374,9 +379,11 @@ func (uiq *unresolvedIntentQueue) updateTxn( ) bool { txn, ok := uiq.txns[txnID] if !ok { - // Unknown txn. if delta == 0 { + // Unknown txn. return false + } else if delta < 0 { + uiq.assertNegRefCountAllowed(txnID, delta) } // Add new txn to the queue. @@ -402,6 +409,8 @@ func (uiq *unresolvedIntentQueue) updateTxn( delete(uiq.txns, txn.txnID) heap.Remove(&uiq.minHeap, txn.index) return wasMin + } else if txn.refCount < 0 { + uiq.assertNegRefCountAllowed(txn.txnID, txn.refCount) } // Forward the txn's timestamp. Need to fix heap if timestamp changes. @@ -430,14 +439,29 @@ func (uiq *unresolvedIntentQueue) Del(txnID uuid.UUID) bool { return wasMin } -// assertPositiveRefCounts asserts that all unresolved intent refcounts for -// transactions in the unresolvedIntentQueue are positive. Assertion takes O(n) -// time, where n is the total number of transactions being tracked in the queue. -func (uiq *unresolvedIntentQueue) assertPositiveRefCounts() { +// AllowNegRefCount instruts the unresolvedIntentQueue on whether or not to +// allow the reference count on transactions to drop below zero. If disallowed, +// the method also asserts that all unresolved intent refcounts for transactions +// currently in the queue are positive. Assertion takes O(n) time, where n is +// the total number of transactions being tracked in the queue. +func (uiq *unresolvedIntentQueue) AllowNegRefCount(b bool) { + if !b { + // Assert that the queue is currently in compliance. + uiq.assertOnlyPositiveRefCounts() + } + uiq.allowNegRefCount = b +} + +func (uiq *unresolvedIntentQueue) assertOnlyPositiveRefCounts() { for _, txn := range uiq.txns { if txn.refCount <= 0 { - panic(fmt.Sprintf("unexpected txn refcount %d for txn %+v in unresolvedIntentQueue", - txn.refCount, txn)) + panic(fmt.Sprintf("negative refcount %d for txn %+v", txn.refCount, txn)) } } } + +func (uiq *unresolvedIntentQueue) assertNegRefCountAllowed(txnID uuid.UUID, count int) { + if !uiq.allowNegRefCount { + panic(fmt.Sprintf("refcount for txn %v dropped below zero (%d)", txnID, count)) + } +} diff --git a/pkg/storage/rangefeed/resolved_timestamp_test.go b/pkg/storage/rangefeed/resolved_timestamp_test.go index e78755c9f42a..1fa4169b90e4 100644 --- a/pkg/storage/rangefeed/resolved_timestamp_test.go +++ b/pkg/storage/rangefeed/resolved_timestamp_test.go @@ -48,7 +48,7 @@ func TestUnresolvedIntentQueue(t *testing.T) { require.Equal(t, 0, len(uiq.Before(hlc.Timestamp{WallTime: 0}))) require.Equal(t, 0, len(uiq.Before(hlc.Timestamp{WallTime: 1}))) require.Equal(t, 1, len(uiq.Before(hlc.Timestamp{WallTime: 2}))) - require.NotPanics(t, func() { uiq.assertPositiveRefCounts() }) + require.NotPanics(t, func() { uiq.assertOnlyPositiveRefCounts() }) // Decrement a non-existent txn. txn2 := uuid.MakeV4() @@ -60,7 +60,7 @@ func TestUnresolvedIntentQueue(t *testing.T) { require.Equal(t, 0, len(uiq.Before(hlc.Timestamp{WallTime: 1}))) require.Equal(t, 1, len(uiq.Before(hlc.Timestamp{WallTime: 3}))) require.Equal(t, 2, len(uiq.Before(hlc.Timestamp{WallTime: 5}))) - require.Panics(t, func() { uiq.assertPositiveRefCounts() }) + require.Panics(t, func() { uiq.assertOnlyPositiveRefCounts() }) // Update a non-existent txn. txn3 := uuid.MakeV4() @@ -146,7 +146,7 @@ func TestUnresolvedIntentQueue(t *testing.T) { require.Equal(t, 1, uiq.Len()) require.Equal(t, txn1, uiq.Oldest().txnID) require.Equal(t, newTxn1TS, uiq.Oldest().timestamp) - require.NotPanics(t, func() { uiq.assertPositiveRefCounts() }) + require.NotPanics(t, func() { uiq.assertOnlyPositiveRefCounts() }) // Decrease txn1's ref count. Should be empty again. adv = uiq.DecrRef(txn1, hlc.Timestamp{}) @@ -162,6 +162,11 @@ func TestUnresolvedIntentQueue(t *testing.T) { adv = uiq.Del(txn6) require.True(t, adv) require.Equal(t, 0, uiq.Len()) + + // Instruct the queue to disallow negative ref counts. + uiq.AllowNegRefCount(false) + require.Panics(t, func() { uiq.DecrRef(txn6, hlc.Timestamp{}) }) + require.Equal(t, 0, uiq.Len()) } func TestResolvedTimestamp(t *testing.T) {