Skip to content

Commit

Permalink
Merge #35772
Browse files Browse the repository at this point in the history
35772: rangefeed: improve assertion when txn refcount becomes negative r=nvanbenschoten a=nvanbenschoten

Informs #34600.

It is critical that `unresolvedIntentQueue` properly handle negative
reference counts to avoid leaking references before a rangefeed has
finished its resolved timestamp initialization scan. However, once this
scan is complete, reference counts on transactions should never drop
below zero. Such an occurrence would indicate that an intent was lost
either during the initial scan or somewhere in the logical ops stream.

Based on the stacktraces in #34600, I believe this is what is happening
because we can see that an `MVCCCommitIntentOp` is triggering the
assertion. This commit will make this more explicit and should hopefully
also fire more often, because the assertion no longer relies on the intent
with a negative refcount being the oldest intent tracked in order to fire.

Release note: None

Co-authored-by: Nathan VanBenschoten <[email protected]>
  • Loading branch information
craig[bot] and nvanbenschoten committed Mar 15, 2019
2 parents 9aa52fd + 8e163ee commit aa701aa
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 14 deletions.
46 changes: 35 additions & 11 deletions pkg/storage/rangefeed/resolved_timestamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,10 @@ func (rts *resolvedTimestamp) Get() hlc.Timestamp {
// timestamp to move forward.
func (rts *resolvedTimestamp) Init() bool {
// Record that initialization has taken place.
rts.init = true
// NOTE(review): this call and the AllowNegRefCount(false) call below both
// appear here; they look like the removed and added lines of a diff shown
// together — confirm which one is meant to remain.
rts.intentQ.assertPositiveRefCounts()
// Once the resolvedTimestamp is initialized, all prior written intents
// should be accounted for, so reference counts for transactions should
// never drop below zero.
rts.intentQ.AllowNegRefCount(false)
// The result of recompute is passed straight back to the caller.
return rts.recompute()
}

Expand Down Expand Up @@ -302,13 +305,15 @@ func (h *unresolvedTxnHeap) Pop() interface{} {
// with a closed timestamp, which guarantees that no transactions can write new
// intents at or beneath it, a resolved timestamp can be constructed.
type unresolvedIntentQueue struct {
txns map[uuid.UUID]*unresolvedTxn
minHeap unresolvedTxnHeap
txns map[uuid.UUID]*unresolvedTxn
minHeap unresolvedTxnHeap
allowNegRefCount bool
}

func makeUnresolvedIntentQueue() unresolvedIntentQueue {
return unresolvedIntentQueue{
txns: make(map[uuid.UUID]*unresolvedTxn),
txns: make(map[uuid.UUID]*unresolvedTxn),
allowNegRefCount: true,
}
}

Expand Down Expand Up @@ -374,9 +379,11 @@ func (uiq *unresolvedIntentQueue) updateTxn(
) bool {
txn, ok := uiq.txns[txnID]
if !ok {
// Unknown txn.
if delta == 0 {
// Unknown txn.
return false
} else if delta < 0 {
uiq.assertNegRefCountAllowed(txnID, delta)
}

// Add new txn to the queue.
Expand All @@ -402,6 +409,8 @@ func (uiq *unresolvedIntentQueue) updateTxn(
delete(uiq.txns, txn.txnID)
heap.Remove(&uiq.minHeap, txn.index)
return wasMin
} else if txn.refCount < 0 {
uiq.assertNegRefCountAllowed(txn.txnID, txn.refCount)
}

// Forward the txn's timestamp. Need to fix heap if timestamp changes.
Expand Down Expand Up @@ -430,14 +439,29 @@ func (uiq *unresolvedIntentQueue) Del(txnID uuid.UUID) bool {
return wasMin
}

// assertPositiveRefCounts asserts that all unresolved intent refcounts for
// transactions in the unresolvedIntentQueue are positive. Assertion takes O(n)
// time, where n is the total number of transactions being tracked in the queue.
func (uiq *unresolvedIntentQueue) assertPositiveRefCounts() {
// AllowNegRefCount instructs the unresolvedIntentQueue on whether the reference
// count of a tracked transaction is permitted to fall below zero. When negative
// counts are being disallowed, the queue is first checked to confirm that every
// transaction it currently tracks has a positive refcount; that check visits
// each tracked transaction once, so it takes O(n) time in the number of
// transactions in the queue.
func (uiq *unresolvedIntentQueue) AllowNegRefCount(b bool) {
	if disallowing := !b; disallowing {
		// Verify the existing state before the stricter mode takes effect.
		uiq.assertOnlyPositiveRefCounts()
	}
	uiq.allowNegRefCount = b
}

// assertOnlyPositiveRefCounts panics if any transaction currently tracked in
// the queue has a refcount that is zero or negative. It visits every tracked
// transaction, so it takes time linear in the size of the queue.
func (uiq *unresolvedIntentQueue) assertOnlyPositiveRefCounts() {
for _, txn := range uiq.txns {
if txn.refCount <= 0 {
// NOTE(review): two panic statements appear back to back here, so the second
// is unreachable as written. This looks like the removed and added lines of a
// diff rendered together — confirm which message is the intended one.
panic(fmt.Sprintf("unexpected txn refcount %d for txn %+v in unresolvedIntentQueue",
txn.refCount, txn))
panic(fmt.Sprintf("negative refcount %d for txn %+v", txn.refCount, txn))
}
}
}

// assertNegRefCountAllowed panics when the queue has been configured to
// disallow negative reference counts. txnID and count are only used to build
// the panic message: they name the transaction and the offending value.
func (uiq *unresolvedIntentQueue) assertNegRefCountAllowed(txnID uuid.UUID, count int) {
	if uiq.allowNegRefCount {
		// Negative counts are still tolerated; nothing to assert.
		return
	}
	panic(fmt.Sprintf("refcount for txn %v dropped below zero (%d)", txnID, count))
}
11 changes: 8 additions & 3 deletions pkg/storage/rangefeed/resolved_timestamp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestUnresolvedIntentQueue(t *testing.T) {
require.Equal(t, 0, len(uiq.Before(hlc.Timestamp{WallTime: 0})))
require.Equal(t, 0, len(uiq.Before(hlc.Timestamp{WallTime: 1})))
require.Equal(t, 1, len(uiq.Before(hlc.Timestamp{WallTime: 2})))
require.NotPanics(t, func() { uiq.assertPositiveRefCounts() })
require.NotPanics(t, func() { uiq.assertOnlyPositiveRefCounts() })

// Decrement a non-existent txn.
txn2 := uuid.MakeV4()
Expand All @@ -60,7 +60,7 @@ func TestUnresolvedIntentQueue(t *testing.T) {
require.Equal(t, 0, len(uiq.Before(hlc.Timestamp{WallTime: 1})))
require.Equal(t, 1, len(uiq.Before(hlc.Timestamp{WallTime: 3})))
require.Equal(t, 2, len(uiq.Before(hlc.Timestamp{WallTime: 5})))
require.Panics(t, func() { uiq.assertPositiveRefCounts() })
require.Panics(t, func() { uiq.assertOnlyPositiveRefCounts() })

// Update a non-existent txn.
txn3 := uuid.MakeV4()
Expand Down Expand Up @@ -146,7 +146,7 @@ func TestUnresolvedIntentQueue(t *testing.T) {
require.Equal(t, 1, uiq.Len())
require.Equal(t, txn1, uiq.Oldest().txnID)
require.Equal(t, newTxn1TS, uiq.Oldest().timestamp)
require.NotPanics(t, func() { uiq.assertPositiveRefCounts() })
require.NotPanics(t, func() { uiq.assertOnlyPositiveRefCounts() })

// Decrease txn1's ref count. Should be empty again.
adv = uiq.DecrRef(txn1, hlc.Timestamp{})
Expand All @@ -162,6 +162,11 @@ func TestUnresolvedIntentQueue(t *testing.T) {
adv = uiq.Del(txn6)
require.True(t, adv)
require.Equal(t, 0, uiq.Len())

// Instruct the queue to disallow negative ref counts.
uiq.AllowNegRefCount(false)
require.Panics(t, func() { uiq.DecrRef(txn6, hlc.Timestamp{}) })
require.Equal(t, 0, uiq.Len())
}

func TestResolvedTimestamp(t *testing.T) {
Expand Down

0 comments on commit aa701aa

Please sign in to comment.