Skip to content

Commit

Permalink
rangefeed: don't discard ref count for txn after intent abort
Browse files Browse the repository at this point in the history
Fixes #34600.

Before this commit, rangefeed made the assumption that an aborted intent
implied an aborted transaction (after the rangefeed was in a steady-state).
It used this assumption to eagerly discard the bookkeeping for a transaction
after its first MVCCAbortIntent operation was seen as an optimization.

This was incorrect. A transaction can "abort" an intent even if it commits.
This is possible if the intent was only written at a previous epoch and not
written in the epoch that ended up finalizing the transaction. This commit
removes this assumption, which I confirmed fixes the resolved timestamp
regression using the new assertion from #35772.

I believe this could have two effects:
- it could trigger the assertion failure from #34600 if the transaction
  experiencing the incorrect assumption was the oldest being tracked at
  the time of the assumption.
- it could cause resolved timestamp stalls and prevent all future forward
  progress of resolved timestamps if the transaction experiencing the incorrect
  assumption was not the oldest being tracked at the time of the assumption.

Release note: None
  • Loading branch information
nvanbenschoten committed Mar 15, 2019
1 parent aa701aa commit f19dd96
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 47 deletions.
35 changes: 5 additions & 30 deletions pkg/storage/rangefeed/resolved_timestamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,18 +155,11 @@ func (rts *resolvedTimestamp) consumeLogicalOp(op enginepb.MVCCLogicalOp) bool {
return rts.intentQ.DecrRef(t.TxnID, t.Timestamp)

case *enginepb.MVCCAbortIntentOp:
// If the resolved timestamp has been initialized then we can remove the
// txn from the queue immediately after we observe that it has been
// aborted. However, if the resolved timestamp has not yet been
// initialized then we decrement from the txn's reference count. This
// permits the refcount to drop below 0, which is important to handle
// correctly when events may be out of order. For instance, before the
// resolved timestamp is initialized, it's possible that we see an
// intent get aborted before we see it in the first place. If we didn't
// let the refcount drop below 0, we would risk leaking a reference.
if rts.IsInit() {
return rts.intentQ.Del(t.TxnID)
}
// An aborted intent does not necessarily indicate an aborted
// transaction. An AbortIntent operation can be the result of an intent
// that was written only in an earlier epoch being resolved after its
// transaction committed in a later epoch. Don't make any assumptions
// about the transaction other than to decrement its reference count.
return rts.intentQ.DecrRef(t.TxnID, hlc.Timestamp{})

default:
Expand Down Expand Up @@ -421,24 +414,6 @@ func (uiq *unresolvedIntentQueue) updateTxn(
return false
}

// Del removes the transaction from the queue. It returns whether the update had
// an effect on the oldest transaction in the queue.
func (uiq *unresolvedIntentQueue) Del(txnID uuid.UUID) bool {
txn, ok := uiq.txns[txnID]
if !ok {
// Unknown txn.
return false
}

// Will deleting the txn advance the queue's earliest timestamp?
wasMin := txn.index == 0

// Remove txn from the queue.
delete(uiq.txns, txn.txnID)
heap.Remove(&uiq.minHeap, txn.index)
return wasMin
}

// AllowNegRefCount instruts the unresolvedIntentQueue on whether or not to
// allow the reference count on transactions to drop below zero. If disallowed,
// the method also asserts that all unresolved intent refcounts for transactions
Expand Down
42 changes: 25 additions & 17 deletions pkg/storage/rangefeed/resolved_timestamp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,6 @@ func TestUnresolvedIntentQueue(t *testing.T) {
require.False(t, adv)
require.Equal(t, 2, uiq.Len())

// Delete a non-existent txn.
txn4 := uuid.MakeV4()
adv = uiq.Del(txn4)
require.False(t, adv)
require.Equal(t, 2, uiq.Len())

// Update txn1 with a smaller timestamp.
adv = uiq.UpdateTS(txn1, hlc.Timestamp{WallTime: 0})
require.False(t, adv)
Expand Down Expand Up @@ -130,13 +124,13 @@ func TestUnresolvedIntentQueue(t *testing.T) {
require.Equal(t, 1, uiq.txns[txn1].refCount)
require.Equal(t, newTxn1TS, uiq.txns[txn1].timestamp)

// Add new txn at much higher timestamp. Immediately delete.
txn5 := uuid.MakeV4()
adv = uiq.IncRef(txn5, nil, hlc.Timestamp{WallTime: 10})
// Add new txn at much higher timestamp. Immediately decrement ref count.
txn4 := uuid.MakeV4()
adv = uiq.IncRef(txn4, nil, hlc.Timestamp{WallTime: 10})
require.False(t, adv)
require.Equal(t, 3, uiq.Len())
require.Equal(t, txn2, uiq.Oldest().txnID)
adv = uiq.Del(txn5)
adv = uiq.DecrRef(txn4, hlc.Timestamp{WallTime: 10})
require.False(t, adv)
require.Equal(t, 2, uiq.Len())

Expand All @@ -153,19 +147,19 @@ func TestUnresolvedIntentQueue(t *testing.T) {
require.True(t, adv)
require.Equal(t, 0, uiq.Len())

// Add new txn. Immediately delete. Should be empty again.
txn6 := uuid.MakeV4()
adv = uiq.IncRef(txn6, nil, hlc.Timestamp{WallTime: 20})
// Add new txn. Immediately decrement ref count. Should be empty again.
txn5 := uuid.MakeV4()
adv = uiq.IncRef(txn5, nil, hlc.Timestamp{WallTime: 20})
require.False(t, adv)
require.Equal(t, 1, uiq.Len())
require.Equal(t, txn6, uiq.Oldest().txnID)
adv = uiq.Del(txn6)
require.Equal(t, txn5, uiq.Oldest().txnID)
adv = uiq.DecrRef(txn5, hlc.Timestamp{})
require.True(t, adv)
require.Equal(t, 0, uiq.Len())

// Instruct the queue to disallow negative ref counts.
uiq.AllowNegRefCount(false)
require.Panics(t, func() { uiq.DecrRef(txn6, hlc.Timestamp{}) })
require.Panics(t, func() { uiq.DecrRef(txn5, hlc.Timestamp{}) })
require.Equal(t, 0, uiq.Len())
}

Expand Down Expand Up @@ -264,6 +258,8 @@ func TestResolvedTimestamp(t *testing.T) {
// Third transaction aborted. No effect.
fwd = rts.ConsumeLogicalOp(abortIntentOp(txn3))
require.False(t, fwd)
fwd = rts.ConsumeLogicalOp(abortIntentOp(txn3))
require.False(t, fwd)
require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get())

// Fourth transaction at higher timestamp. No effect.
Expand Down Expand Up @@ -314,7 +310,19 @@ func TestResolvedTimestamp(t *testing.T) {
require.True(t, fwd)
require.Equal(t, hlc.Timestamp{WallTime: 45}, rts.Get())

// Fifth transaction aborted. Resolved timestamp moves to closed timestamp.
// Fifth transaction bumps epoch and re-writes one of its intents. Resolved
// timestamp moves to the new transaction timestamp.
fwd = rts.ConsumeLogicalOp(updateIntentOp(txn5, hlc.Timestamp{WallTime: 47}))
require.True(t, fwd)
require.Equal(t, hlc.Timestamp{WallTime: 46}, rts.Get())

// Fifth transaction committed, but only one of its intents was written in
// its final epoch. Resolved timestamp moves forward after observing the
// first intent committing at a higher timestamp and moves to the closed
// timestamp after observing the second intent aborting.
fwd = rts.ConsumeLogicalOp(commitIntentOp(txn5, hlc.Timestamp{WallTime: 49}))
require.True(t, fwd)
require.Equal(t, hlc.Timestamp{WallTime: 48}, rts.Get())
fwd = rts.ConsumeLogicalOp(abortIntentOp(txn5))
require.True(t, fwd)
require.Equal(t, hlc.Timestamp{WallTime: 50}, rts.Get())
Expand Down

0 comments on commit f19dd96

Please sign in to comment.