From d3a956e5da9fa5841a83f2f0cd21b13edf3e44b0 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Wed, 15 Nov 2023 12:23:20 +0000 Subject: [PATCH 1/2] rangefeed: fix premature checkpoint due to intent resolution race It was possible for rangefeeds to emit a premature checkpoint, before all writes below its timestamp had been emitted. This in turn would cause changefeeds to not emit these write events at all. This could happen because `PushTxn` may return a false `ABORTED` status for a transaction that has in fact been committed, if the transaction record has been GCed (after resolving all intents). The timestamp cache does not retain sufficient information to disambiguate a committed transaction from an aborted one in this case, so it pessimistically assumes an abort (see `Replica.CanCreateTxnRecord` and `batcheval.SynthesizeTxnFromMeta`). However, the rangefeed txn pusher trusted this `ABORTED` status, ignoring the pending txn intents and allowing the resolved timestamp to advance past them before emitting the committed intents. This can lead to the following scenario: - A rangefeed is running on a lagging follower. - A txn writes an intent, which is replicated to the follower. - The closed timestamp advances past the intent. - The txn commits and resolves the intent at the original write timestamp, then GCs its txn record. This is not yet applied on the follower. - The rangefeed pushes the txn to advance its resolved timestamp. - The txn is GCed, so the push returns ABORTED (it can't know whether the txn was committed or aborted after its record is GCed). - The rangefeed disregards the "aborted" txn and advances the resolved timestamp, emitting a checkpoint. - The follower applies the resolved intent and emits an event below the checkpoint, violating the checkpoint guarantee. - The changefeed sees an event below its frontier and drops it, never emitting these events at all. 
This patch fixes the bug by submitting a barrier command to the leaseholder which waits for all past and ongoing writes (including intent resolution) to complete and apply, and then waits for the local replica to apply the barrier as well. This ensures any committed intent resolution will be applied and emitted before the transaction is removed from resolved timestamp tracking. Epic: none Release note (bug fix): fixed a bug where a changefeed could omit events in rare cases, logging the error "cdc ux violation: detected timestamp ... that is less or equal to the local frontier". This can happen if a rangefeed runs on a follower replica that lags significantly behind the leaseholder, a transaction commits and removes its transaction record before its intent resolution is applied on the follower, the follower's closed timestamp has advanced past the transaction commit timestamp, and the rangefeed attempts to push the transaction to a new timestamp (at least 10 seconds after the transaction began). This may cause the rangefeed to prematurely emit a checkpoint before emitting writes at lower timestamps, which in turn may cause the changefeed to drop these events entirely, never emitting them. 
--- pkg/kv/kvclient/rangefeed/BUILD.bazel | 4 + .../rangefeed/rangefeed_external_test.go | 328 ++++++++++++++++++ pkg/kv/kvserver/rangefeed/BUILD.bazel | 2 + pkg/kv/kvserver/rangefeed/processor.go | 24 +- pkg/kv/kvserver/rangefeed/processor_test.go | 20 +- pkg/kv/kvserver/rangefeed/task.go | 58 +++- pkg/kv/kvserver/rangefeed/task_test.go | 14 +- pkg/kv/kvserver/replica_rangefeed.go | 50 ++- 8 files changed, 472 insertions(+), 28 deletions(-) diff --git a/pkg/kv/kvclient/rangefeed/BUILD.bazel b/pkg/kv/kvclient/rangefeed/BUILD.bazel index 20e32afa0b38..6368573823d0 100644 --- a/pkg/kv/kvclient/rangefeed/BUILD.bazel +++ b/pkg/kv/kvclient/rangefeed/BUILD.bazel @@ -69,6 +69,8 @@ go_test( "//pkg/kv", "//pkg/kv/kvclient/kvcoord", "//pkg/kv/kvserver", + "//pkg/kv/kvserver/closedts", + "//pkg/kv/kvserver/kvserverbase", "//pkg/roachpb", "//pkg/security/securityassets", "//pkg/security/securitytest", @@ -80,6 +82,7 @@ go_test( "//pkg/storage", "//pkg/testutils", "//pkg/testutils/serverutils", + "//pkg/testutils/skip", "//pkg/testutils/sqlutils", "//pkg/testutils/storageutils", "//pkg/testutils/testcluster", @@ -87,6 +90,7 @@ go_test( "//pkg/util/encoding", "//pkg/util/hlc", "//pkg/util/leaktest", + "//pkg/util/log", "//pkg/util/mon", "//pkg/util/retry", "//pkg/util/stop", diff --git a/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go b/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go index a33a50528b55..59bd5f1fed67 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go +++ b/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go @@ -14,24 +14,31 @@ import ( "context" "runtime/pprof" "sync" + "sync/atomic" "testing" "time" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + 
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigptsreader" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/storageutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" "github.com/stretchr/testify/assert" @@ -1133,3 +1140,324 @@ func TestRangeFeedStartTimeExclusive(t *testing.T) { t.Fatal("timed out waiting for event") } } + +// TestRangeFeedIntentResolutionRace is a regression test for +// https://github.com/cockroachdb/cockroach/issues/104309, i.e. the +// following scenario: +// +// - A rangefeed is running on a lagging follower. +// - A txn writes an intent, which is replicated to the follower. +// - The closed timestamp advances past the intent. +// - The txn commits and resolves the intent at the original write timestamp, +// then GCs its txn record. This is not yet applied on the follower. +// - The rangefeed pushes the txn to advance its resolved timestamp. +// - The txn is GCed, so the push returns ABORTED (it can't know whether the +// txn was committed or aborted after its record is GCed). +// - The rangefeed disregards the "aborted" txn and advances the resolved +// timestamp, emitting a checkpoint. 
+// - The follower applies the resolved intent and emits an event below +// the checkpoint, violating the checkpoint guarantee. +// +// This scenario is addressed by running a Barrier request through Raft and +// waiting for it to apply locally before removing the txn from resolved ts +// tracking. This ensures the pending intent resolution is applied before +// the resolved ts can advance. +func TestRangeFeedIntentResolutionRace(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + skip.UnderRace(t) // too slow, times out + skip.UnderDeadlock(t) + + // Use a timeout, to prevent a hung test. + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + // defer cancel() after Stopper.Stop(), so the context cancels first. + // Otherwise, the stopper will hang waiting for a rangefeed whose context is + // not yet cancelled. + + // Set up an apply filter that blocks Raft application on n3 (follower) for + // the given range. + var blockRangeOnN3 atomic.Int64 + unblockRangeC := make(chan struct{}) + applyFilter := func(args kvserverbase.ApplyFilterArgs) (int, *roachpb.Error) { + if args.StoreID == 3 { + if rangeID := blockRangeOnN3.Load(); rangeID > 0 && rangeID == int64(args.RangeID) { + t.Logf("blocked r%d on s%d", args.RangeID, args.StoreID) + select { + case <-unblockRangeC: + t.Logf("unblocked r%d on s%d", args.RangeID, args.StoreID) + case <-ctx.Done(): + return 0, roachpb.NewError(ctx.Err()) + } + } + } + return 0, nil + } + + // Set up a request filter that blocks transaction pushes for a specific key. + // This is used to prevent the rangefeed txn pusher from pushing the txn + // timestamp above the closed timestamp before it commits, only allowing the + // push to happen after the transaction has committed and GCed below the + // closed timestamp. 
+ var blockPush atomic.Value + unblockPushC := make(chan struct{}) + reqFilter := func(ctx context.Context, br roachpb.BatchRequest) *roachpb.Error { + if br.IsSinglePushTxnRequest() { + req := br.Requests[0].GetPushTxn() + if v := blockPush.Load(); v != nil { + if key := v.(*roachpb.Key); key != nil && req.Key.Equal(*key) { + t.Logf("blocked push for txn %s", req.PusheeTxn) + select { + case <-unblockPushC: + t.Logf("unblocked push for txn %s", req.PusheeTxn) + case <-ctx.Done(): + return roachpb.NewError(ctx.Err()) + } + } + } + } + return nil + } + + // Speed up the test by reducing various closed/resolved timestamp intervals. + const interval = 100 * time.Millisecond + st := cluster.MakeTestingClusterSettings() + kvserver.RangeFeedRefreshInterval.Override(ctx, &st.SV, interval) + closedts.SideTransportCloseInterval.Override(ctx, &st.SV, interval) + closedts.TargetDuration.Override(ctx, &st.SV, interval) + + // Start a cluster with 3 nodes. + tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Settings: st, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + TestingRequestFilter: reqFilter, + TestingApplyFilter: applyFilter, + RangeFeedPushTxnsInterval: interval, + RangeFeedPushTxnsAge: interval, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + defer cancel() + + n1 := tc.Server(0) + s3 := tc.GetFirstStoreFromServer(t, 2) + clock := n1.Clock() + + // Determine the key/value we're going to write. + prefix := keys.ScratchRangeMin.Clone() + key := append(prefix.Clone(), []byte("/foo")...) + value := []byte("value") + + // Split off a range and upreplicate it, with leaseholder on n1. + _, _, err := n1.SplitRange(prefix) + require.NoError(t, err) + desc := tc.AddVotersOrFatal(t, prefix, tc.Targets(1, 2)...) 
+ t.Logf("split off range %s", desc) + + repl1 := tc.GetFirstStoreFromServer(t, 0).LookupReplica(roachpb.RKey(prefix)) // leaseholder + repl3 := tc.GetFirstStoreFromServer(t, 2).LookupReplica(roachpb.RKey(prefix)) // lagging follower + + require.True(t, repl1.OwnsValidLease(ctx, clock.NowAsClockTimestamp())) + + // Block pushes of the txn, to ensure it can write at a fixed timestamp. + // Otherwise, the rangefeed or someone else may push it beyond the closed + // timestamp. + blockPush.Store(&key) + + // We'll use n3 as our lagging follower. Start a rangefeed on it directly. + req := roachpb.RangeFeedRequest{ + Header: roachpb.Header{ + RangeID: desc.RangeID, + }, + Span: desc.RSpan().AsRawSpanWithNoLocals(), + } + eventC := make(chan *roachpb.RangeFeedEvent) + sink := newChannelSink(ctx, eventC) + go func() { + pErr := s3.RangeFeed(&req, sink) + if ctx.Err() == nil { + assert.NoError(t, pErr.GoError()) + } + }() + t.Logf("starting rangefeed on %s", repl3) + + // Spawn a rangefeed monitor, which posts checkpoint updates to checkpointC. + // This uses a buffered channel of size 1, and empties it out before posting a + // new update, such that it contains the latest known checkpoint and does not + // block the rangefeed. It also posts emitted values for our key to valueC, + // which should only happen once. + checkpointC := make(chan *roachpb.RangeFeedCheckpoint, 1) + valueC := make(chan *roachpb.RangeFeedValue, 1) + go func() { + defer close(checkpointC) + defer close(valueC) + for { + select { + case e := <-eventC: + switch { + case e.Checkpoint != nil: + // Clear checkpointC such that it always contains the latest. 
+ select { + case <-checkpointC: + default: + } + checkpointC <- e.Checkpoint + case e.Val != nil && e.Val.Key.Equal(key): + select { + case valueC <- e.Val: + default: + t.Errorf("duplicate value event for %s: %s", key, e) + } + } + case <-ctx.Done(): + return + } + } + }() + + waitForCheckpoint := func(t *testing.T, ts hlc.Timestamp) hlc.Timestamp { + t.Helper() + timeoutC := time.After(3 * time.Second) + for { + select { + case c := <-checkpointC: + require.NotNil(t, c, "nil checkpoint") + if ts.LessEq(c.ResolvedTS) { + t.Logf("rangefeed checkpoint at %s >= %s", c.ResolvedTS, ts) + return c.ResolvedTS + } + case <-timeoutC: + require.Fail(t, "timed out waiting for checkpoint", "wanted %s", ts) + } + } + } + + // Wait for the initial checkpoint. + rts := waitForCheckpoint(t, clock.Now()) + t.Logf("initial checkpoint at %s", rts) + + // Start a new transaction on n1 with a fixed timestamp (to make sure it + // remains below the closed timestamp). Write an intent, and read it back to + // make sure it has applied. + writeTS := clock.Now() + txn := n1.DB().NewTxn(ctx, "test") + require.NoError(t, txn.SetFixedTimestamp(ctx, writeTS)) + require.NoError(t, txn.Put(ctx, key, value)) + _, err = txn.Get(ctx, key) + require.NoError(t, err) + t.Logf("wrote %s", key) + + // Wait for both the leaseholder and the follower to close the transaction's + // write timestamp. 
+ waitForClosedTimestamp := func(t *testing.T, repl *kvserver.Replica, ts hlc.Timestamp) hlc.Timestamp { + t.Helper() + timeoutC := time.After(3 * time.Second) + for { + if closedTS := repl.GetCurrentClosedTimestamp(ctx); ts.LessEq(closedTS) { + t.Logf("replica %s closed timestamp at %s >= %s", repl, closedTS, ts) + return closedTS + } + select { + case <-time.After(10 * time.Millisecond): + case <-timeoutC: + require.Fail(t, "timed out waiting for closed timestamp", "wanted %s", ts) + } + } + } + cts := waitForClosedTimestamp(t, repl1, writeTS) + _ = waitForClosedTimestamp(t, repl3, writeTS) + t.Logf("closed timestamp %s is above txn write timestamp %s", cts, writeTS) + + // Wait for the rangefeed checkpoint to reach writeTS.Prev(). This ensures the + // rangefeed's view of the closed timestamp has been updated, and is now only + // blocked by the intent. + waitTS := writeTS.Prev() + waitTS.Logical = 0 + rts = waitForCheckpoint(t, waitTS) + t.Logf("rangefeed caught up to txn write timestamp at %s", rts) + + // Block Raft application on repl3. + blockRangeOnN3.Store(int64(desc.RangeID)) + + // Commit the transaction, and check its commit timestamp. + require.NoError(t, txn.Commit(ctx)) + commitTS := txn.CommitTimestamp() + require.Equal(t, commitTS, writeTS) + t.Logf("txn committed at %s", writeTS) + + // Unblock transaction pushes. Since repl3 won't apply the intent resolution + // yet, the rangefeed will keep trying to push the transaction. Once the + // transaction record is GCed (which happens async), the rangefeed will see an + // ABORTED status. + // + // It may see the intermediate COMMITTED state too, but at the time of writing + // that does not matter, since the rangefeed needs to keep tracking the + // intent until it applies the resolution, and it will also see the ABORTED + // status before that happens. + blockPush.Store((*roachpb.Key)(nil)) + close(unblockPushC) + + // Make sure repl3 does not emit a checkpoint beyond the write timestamp. 
Its + // closed timestamp has already advanced past it, but the unresolved intent + // should prevent the resolved timestamp from advancing, despite the false + // ABORTED state. We also make sure no value has been emitted. + waitC := time.After(3 * time.Second) + for done := false; !done; { + select { + case c := <-checkpointC: + require.NotNil(t, c) + require.False(t, commitTS.LessEq(c.ResolvedTS), + "repl %s emitted checkpoint %s beyond write timestamp %s", repl3, c.ResolvedTS, commitTS) + case v := <-valueC: + require.Fail(t, "repl3 emitted premature value %s", v) + case <-waitC: + done = true + } + } + t.Logf("checkpoint still below write timestamp") + + // Unblock application on repl3. Wait for the checkpoint to catch up to the + // commit timestamp, and the committed value to be emitted. + blockRangeOnN3.Store(0) + close(unblockRangeC) + + rts = waitForCheckpoint(t, writeTS) + t.Logf("checkpoint %s caught up to write timestamp %s", rts, writeTS) + + select { + case v := <-valueC: + require.Equal(t, v.Key, key) + require.Equal(t, v.Value.Timestamp, writeTS) + t.Logf("received value %s", v) + case <-time.After(3 * time.Second): + require.Fail(t, "timed out waiting for event") + } +} + +// channelSink is a rangefeed sink which posts events to a channel. 
+type channelSink struct { + ctx context.Context + ch chan<- *roachpb.RangeFeedEvent +} + +func newChannelSink(ctx context.Context, ch chan<- *roachpb.RangeFeedEvent) *channelSink { + return &channelSink{ctx: ctx, ch: ch} +} + +func (c *channelSink) Context() context.Context { + return c.ctx +} + +func (c *channelSink) Send(e *roachpb.RangeFeedEvent) error { + select { + case c.ch <- e: + return nil + case <-c.ctx.Done(): + return c.ctx.Err() + } +} diff --git a/pkg/kv/kvserver/rangefeed/BUILD.bazel b/pkg/kv/kvserver/rangefeed/BUILD.bazel index e9f1abf12e20..90d9db3af897 100644 --- a/pkg/kv/kvserver/rangefeed/BUILD.bazel +++ b/pkg/kv/kvserver/rangefeed/BUILD.bazel @@ -19,10 +19,12 @@ go_library( "//pkg/keys", "//pkg/roachpb", "//pkg/settings", + "//pkg/settings/cluster", "//pkg/storage", "//pkg/storage/enginepb", "//pkg/util/admission", "//pkg/util/bufalloc", + "//pkg/util/contextutil", "//pkg/util/envutil", "//pkg/util/hlc", "//pkg/util/interval", diff --git a/pkg/kv/kvserver/rangefeed/processor.go b/pkg/kv/kvserver/rangefeed/processor.go index 46aa41b2722b..5dfb201008c0 100644 --- a/pkg/kv/kvserver/rangefeed/processor.go +++ b/pkg/kv/kvserver/rangefeed/processor.go @@ -17,6 +17,8 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -37,6 +39,21 @@ const ( defaultCheckStreamsInterval = 1 * time.Second ) +var ( + // PushTxnsBarrierEnabled is an escape hatch to disable the txn push barrier + // command in case it causes unexpected problems. This can result in + // violations of the rangefeed checkpoint guarantee, emitting premature + // checkpoints before all writes below it have been emitted in rare cases. 
+ // See: https://github.com/cockroachdb/cockroach/issues/104309 + PushTxnsBarrierEnabled = settings.RegisterBoolSetting( + settings.SystemOnly, + "kv.rangefeed.push_txns.barrier.enabled", + "flush and apply prior writes when a txn push returns an ambiguous abort "+ + "(disabling may emit premature checkpoints before writes in rare cases)", + true, + ) +) + // newErrBufferCapacityExceeded creates an error that is returned to subscribers // if the rangefeed processor is not able to keep up with the flow of incoming // events and is forced to drop events in order to not block. @@ -49,9 +66,10 @@ func newErrBufferCapacityExceeded() *roachpb.Error { // Config encompasses the configuration required to create a Processor. type Config struct { log.AmbientContext - Clock *hlc.Clock - RangeID roachpb.RangeID - Span roachpb.RSpan + Settings *cluster.Settings + Clock *hlc.Clock + RangeID roachpb.RangeID + Span roachpb.RSpan TxnPusher TxnPusher // PushTxnsInterval specifies the interval at which a Processor will push diff --git a/pkg/kv/kvserver/rangefeed/processor_test.go b/pkg/kv/kvserver/rangefeed/processor_test.go index 955ee49eb59b..d018659ffa87 100644 --- a/pkg/kv/kvserver/rangefeed/processor_test.go +++ b/pkg/kv/kvserver/rangefeed/processor_test.go @@ -804,7 +804,7 @@ func TestProcessorTxnPushAttempt(t *testing.T) { var tp testTxnPusher tp.mockPushTxns(func( ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp, - ) ([]*roachpb.Transaction, error) { + ) ([]*roachpb.Transaction, bool, error) { // The txns are not in a sorted order. Enforce one. sort.Slice(txns, func(i, j int) bool { return bytes.Compare(txns[i].Key, txns[j].Key) < 0 @@ -818,34 +818,34 @@ func TestProcessorTxnPushAttempt(t *testing.T) { assert.Equal(t, txn2Meta, txns[1]) assert.Equal(t, txn3Meta, txns[2]) if t.Failed() { - return nil, errors.New("test failed") + return nil, false, errors.New("test failed") } // Push does not succeed. Protos not at larger ts. 
- return []*roachpb.Transaction{txn1Proto, txn2Proto, txn3Proto}, nil + return []*roachpb.Transaction{txn1Proto, txn2Proto, txn3Proto}, false, nil case 2: assert.Equal(t, 3, len(txns)) assert.Equal(t, txn1MetaT2Pre, txns[0]) assert.Equal(t, txn2Meta, txns[1]) assert.Equal(t, txn3Meta, txns[2]) if t.Failed() { - return nil, errors.New("test failed") + return nil, false, errors.New("test failed") } // Push succeeds. Return new protos. - return []*roachpb.Transaction{txn1ProtoT2, txn2ProtoT2, txn3ProtoT2}, nil + return []*roachpb.Transaction{txn1ProtoT2, txn2ProtoT2, txn3ProtoT2}, false, nil case 3: assert.Equal(t, 2, len(txns)) assert.Equal(t, txn2MetaT2Post, txns[0]) assert.Equal(t, txn3MetaT2Post, txns[1]) if t.Failed() { - return nil, errors.New("test failed") + return nil, false, errors.New("test failed") } // Push succeeds. Return new protos. - return []*roachpb.Transaction{txn2ProtoT3, txn3ProtoT3}, nil + return []*roachpb.Transaction{txn2ProtoT3, txn3ProtoT3}, false, nil default: - return nil, nil + return nil, false, nil } }) tp.mockResolveIntentsFn(func(ctx context.Context, intents []roachpb.LockUpdate) error { @@ -1492,11 +1492,11 @@ func TestProcessorContextCancellation(t *testing.T) { var pusher testTxnPusher pusher.mockPushTxns(func( ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp, - ) ([]*roachpb.Transaction, error) { + ) ([]*roachpb.Transaction, bool, error) { pushReadyC <- struct{}{} <-ctx.Done() close(pushDoneC) - return nil, ctx.Err() + return nil, false, ctx.Err() }) pusher.mockResolveIntentsFn(func(ctx context.Context, intents []roachpb.LockUpdate) error { return nil diff --git a/pkg/kv/kvserver/rangefeed/task.go b/pkg/kv/kvserver/rangefeed/task.go index 65258a63af3b..d33e9fad5fa4 100644 --- a/pkg/kv/kvserver/rangefeed/task.go +++ b/pkg/kv/kvserver/rangefeed/task.go @@ -12,11 +12,13 @@ package rangefeed import ( "context" + "time" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" 
"github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" + "github.com/cockroachdb/cockroach/pkg/util/contextutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" @@ -214,10 +216,17 @@ func (l *LegacyIntentScanner) Close() { l.iter.Close() } // cleaning up the intents of transactions that are found to be committed. type TxnPusher interface { // PushTxns attempts to push the specified transactions to a new - // timestamp. It returns the resulting transaction protos. - PushTxns(context.Context, []enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error) + // timestamp. It returns the resulting transaction protos, and a + // bool indicating whether any txn aborts were ambiguous (see + // PushTxnResponse.AmbiguousAbort). + // + // NB: anyAmbiguousAbort may be false with nodes <24.1. + PushTxns(context.Context, []enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, bool, error) // ResolveIntents resolves the specified intents. ResolveIntents(ctx context.Context, intents []roachpb.LockUpdate) error + // Barrier waits for all past and ongoing write commands in the range to have + // applied on the leaseholder and the local replica. + Barrier(ctx context.Context) error } // txnPushAttempt pushes all old transactions that have unresolved intents on @@ -267,7 +276,7 @@ func (a *txnPushAttempt) pushOldTxns(ctx context.Context) error { // This may cause transaction restarts, but span refreshing should // prevent a restart for any transaction that has not been written // over at a larger timestamp. 
- pushedTxns, err := a.p.TxnPusher.PushTxns(ctx, a.txns, a.ts) + pushedTxns, anyAmbiguousAbort, err := a.p.TxnPusher.PushTxns(ctx, a.txns, a.ts) if err != nil { return err } @@ -340,6 +349,49 @@ func (a *txnPushAttempt) pushOldTxns(ctx context.Context) error { } } + // It's possible that the ABORTED state is a false negative, where the + // transaction was in fact committed but the txn record has been removed after + // resolving all intents (see batcheval.SynthesizeTxnFromMeta and + // Replica.CanCreateTxnRecord). If this replica has not applied the intent + // resolution yet, we may prematurely emit an MVCCAbortTxnOp and advance + // the resolved ts before emitting the committed intents. This violates the + // rangefeed checkpoint guarantee, and will at the time of writing cause the + // changefeed to drop these events entirely. See: + // https://github.com/cockroachdb/cockroach/issues/104309 + // + // PushTxns will let us know if it found such an ambiguous abort. To guarantee + // that we've applied all resolved intents in this case, submit a Barrier + // command to the leaseholder and wait for it to apply on the local replica. + // + // By the time the local replica applies the barrier it will have enqueued the + // resolved intents in the rangefeed processor's queue. These updates may not + // yet have been applied to the resolved timestamp intent tracker, but that's + // ok -- our MVCCAbortTxnOp will be enqueued and processed after them. + // + // This incurs an additional Raft write, but so would PushTxns() if we hadn't + // hit the ambiguous abort case. This will also block until ongoing writes + // have completed and applied, but that's fine since we currently run on our + // own goroutine (as opposed to on a rangefeed scheduler goroutine). + // + // NB: We can't try to reduce the span of the barrier, because LockSpans may + // not have the full set of intents. 
+ // + // NB: PushTxnResponse.AmbiguousAbort and BarrierResponse.LeaseAppliedIndex + // are not guaranteed to be populated prior to 24.1. In that case, we degrade + // to the old (buggy) behavior. + if anyAmbiguousAbort && PushTxnsBarrierEnabled.Get(&a.p.Settings.SV) { + // The barrier will error out if our context is cancelled (which happens on + // processor shutdown) or if the replica is destroyed. Regardless, use a 1 + // minute backstop to prevent getting wedged. + // + // TODO(erikgrinaker): consider removing this once we have some confidence + // that it won't get wedged. + err := contextutil.RunWithTimeout(ctx, "pushtxns barrier", time.Minute, a.p.TxnPusher.Barrier) + if err != nil { + return err + } + } + // Inform the processor of all logical ops. a.p.sendEvent(ctx, event{ops: ops}, 0) diff --git a/pkg/kv/kvserver/rangefeed/task_test.go b/pkg/kv/kvserver/rangefeed/task_test.go index 39b2ee45b017..e2e17ae4c75c 100644 --- a/pkg/kv/kvserver/rangefeed/task_test.go +++ b/pkg/kv/kvserver/rangefeed/task_test.go @@ -359,13 +359,13 @@ func TestInitResolvedTSScan(t *testing.T) { } type testTxnPusher struct { - pushTxnsFn func(context.Context, []enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error) + pushTxnsFn func(context.Context, []enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, bool, error) resolveIntentsFn func(ctx context.Context, intents []roachpb.LockUpdate) error } func (tp *testTxnPusher) PushTxns( ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp, -) ([]*roachpb.Transaction, error) { +) ([]*roachpb.Transaction, bool, error) { return tp.pushTxnsFn(ctx, txns, ts) } @@ -373,8 +373,12 @@ func (tp *testTxnPusher) ResolveIntents(ctx context.Context, intents []roachpb.L return tp.resolveIntentsFn(ctx, intents) } +func (tp *testTxnPusher) Barrier(ctx context.Context) error { + return nil +} + func (tp *testTxnPusher) mockPushTxns( - fn func(context.Context, []enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error), + fn 
func(context.Context, []enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, bool, error), ) { tp.pushTxnsFn = fn } @@ -433,7 +437,7 @@ func TestTxnPushAttempt(t *testing.T) { var tp testTxnPusher tp.mockPushTxns(func( ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp, - ) ([]*roachpb.Transaction, error) { + ) ([]*roachpb.Transaction, bool, error) { require.Equal(t, 4, len(txns)) require.Equal(t, txn1Meta, txns[0]) require.Equal(t, txn2Meta, txns[1]) @@ -444,7 +448,7 @@ func TestTxnPushAttempt(t *testing.T) { // Return all four protos. The PENDING txn is pushed. txn1ProtoPushed := txn1Proto.Clone() txn1ProtoPushed.WriteTimestamp = ts - return []*roachpb.Transaction{txn1ProtoPushed, txn2Proto, txn3Proto, txn4Proto}, nil + return []*roachpb.Transaction{txn1ProtoPushed, txn2Proto, txn3Proto, txn4Proto}, false, nil }) tp.mockResolveIntentsFn(func(ctx context.Context, intents []roachpb.LockUpdate) error { require.Len(t, intents, 7) diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index fdfb1660151c..abff69b956b7 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -93,8 +93,9 @@ func (s *lockedRangefeedStream) Send(e *roachpb.RangeFeedEvent) error { // rangefeedTxnPusher is a shim around intentResolver that implements the // rangefeed.TxnPusher interface. type rangefeedTxnPusher struct { - ir *intentresolver.IntentResolver - r *Replica + ir *intentresolver.IntentResolver + r *Replica + span roachpb.RSpan } // PushTxns is part of the rangefeed.TxnPusher interface. It performs a @@ -102,7 +103,7 @@ type rangefeedTxnPusher struct { // transactions. 
func (tp *rangefeedTxnPusher) PushTxns( ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp, -) ([]*roachpb.Transaction, error) { +) ([]*roachpb.Transaction, bool, error) { pushTxnMap := make(map[uuid.UUID]*enginepb.TxnMeta, len(txns)) for i := range txns { txn := &txns[i] @@ -118,18 +119,18 @@ func (tp *rangefeedTxnPusher) PushTxns( }, } - pushedTxnMap, _, pErr := tp.ir.MaybePushTransactions( + pushedTxnMap, anyAmbiguousAbort, pErr := tp.ir.MaybePushTransactions( ctx, pushTxnMap, h, roachpb.PUSH_TIMESTAMP, false, /* skipIfInFlight */ ) if pErr != nil { - return nil, pErr.GoError() + return nil, false, pErr.GoError() } pushedTxns := make([]*roachpb.Transaction, 0, len(pushedTxnMap)) for _, txn := range pushedTxnMap { pushedTxns = append(pushedTxns, txn) } - return pushedTxns, nil + return pushedTxns, anyAmbiguousAbort, nil } // ResolveIntents is part of the rangefeed.TxnPusher interface. @@ -142,6 +143,40 @@ func (tp *rangefeedTxnPusher) ResolveIntents( ).GoError() } +// Barrier is part of the rangefeed.TxnPusher interface. +func (tp *rangefeedTxnPusher) Barrier(ctx context.Context) error { + // Execute a Barrier on the leaseholder, and obtain its LAI. Error out on any + // range changes (e.g. splits/merges) that we haven't applied yet. + lai, desc, err := tp.r.store.db.BarrierWithLAI(ctx, tp.span.Key, tp.span.EndKey) + if err != nil { + if errors.HasType(err, &roachpb.RangeKeyMismatchError{}) { + return errors.Wrap(err, "range barrier failed, range split") + } + return errors.Wrap(err, "range barrier failed") + } + if lai == 0 { + // We may be talking to a binary which doesn't support + // BarrierRequest.WithLeaseAppliedIndex, so just degrade to the previous + // (buggy) behavior. 
+ return nil + } + if desc.RangeID != tp.r.RangeID { + return errors.Errorf("range barrier failed, range ID changed: %d -> %s", tp.r.RangeID, desc) + } + if !desc.RSpan().Equal(tp.span) { + return errors.Errorf("range barrier failed, range span changed: %s -> %s", tp.span, desc) + } + + // Wait for the local replica to apply it. In the common case where we are the + // leaseholder, the Barrier call will already have waited for application, so + // this succeeds immediately. + if _, err = tp.r.WaitForLeaseAppliedIndex(ctx, lai); err != nil { + return errors.Wrap(err, "range barrier failed") + } + + return nil +} + // RangeFeed registers a rangefeed over the specified span. It sends updates to // the provided stream and returns with an optional error when the rangefeed is // complete. The surrounding store's ConcurrentRequestLimiter is used to limit @@ -370,9 +405,10 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( // Create a new rangefeed. desc := r.Desc() - tp := rangefeedTxnPusher{ir: r.store.intentResolver, r: r} + tp := rangefeedTxnPusher{ir: r.store.intentResolver, r: r, span: desc.RSpan()} cfg := rangefeed.Config{ AmbientContext: r.AmbientContext, + Settings: r.ClusterSettings(), Clock: r.Clock(), RangeID: r.RangeID, Span: desc.RSpan(), From 9e137c43013268e2d57717d6f9957e85077dd218 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Tue, 23 Jan 2024 19:55:56 +0000 Subject: [PATCH 2/2] rangefeed: assert intent commits above resolved timestamp Epic: none Release note: None --- pkg/kv/kvserver/rangefeed/processor.go | 2 +- .../kvserver/rangefeed/resolved_timestamp.go | 40 +++++-- .../rangefeed/resolved_timestamp_test.go | 101 ++++++++++-------- 3 files changed, 86 insertions(+), 57 deletions(-) diff --git a/pkg/kv/kvserver/rangefeed/processor.go b/pkg/kv/kvserver/rangefeed/processor.go index 5dfb201008c0..af9a20faebd9 100644 --- a/pkg/kv/kvserver/rangefeed/processor.go +++ b/pkg/kv/kvserver/rangefeed/processor.go @@ -744,7 +744,7 @@ func (p *Processor) 
consumeLogicalOps( // Determine whether the operation caused the resolved timestamp to // move forward. If so, publish a RangeFeedCheckpoint notification. - if p.rts.ConsumeLogicalOp(op) { + if p.rts.ConsumeLogicalOp(ctx, op) { p.publishCheckpoint(ctx) } } diff --git a/pkg/kv/kvserver/rangefeed/resolved_timestamp.go b/pkg/kv/kvserver/rangefeed/resolved_timestamp.go index f1f9e8d57a8f..c6793040e9b0 100644 --- a/pkg/kv/kvserver/rangefeed/resolved_timestamp.go +++ b/pkg/kv/kvserver/rangefeed/resolved_timestamp.go @@ -13,11 +13,13 @@ package rangefeed import ( "bytes" "container/heap" + "context" "fmt" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" ) @@ -128,32 +130,40 @@ func (rts *resolvedTimestamp) ForwardClosedTS(newClosedTS hlc.Timestamp) bool { // operation within its range of tracked keys. This allows the structure to // update its internal intent tracking to reflect the change. The method returns // whether this caused the resolved timestamp to move forward. 
-func (rts *resolvedTimestamp) ConsumeLogicalOp(op enginepb.MVCCLogicalOp) bool { - if rts.consumeLogicalOp(op) { +func (rts *resolvedTimestamp) ConsumeLogicalOp( + ctx context.Context, op enginepb.MVCCLogicalOp, +) bool { + if rts.consumeLogicalOp(ctx, op) { return rts.recompute() } rts.assertNoChange() return false } -func (rts *resolvedTimestamp) consumeLogicalOp(op enginepb.MVCCLogicalOp) bool { +func (rts *resolvedTimestamp) consumeLogicalOp( + ctx context.Context, op enginepb.MVCCLogicalOp, +) bool { switch t := op.GetValue().(type) { case *enginepb.MVCCWriteValueOp: - rts.assertOpAboveRTS(op, t.Timestamp) + rts.assertOpAboveRTS(ctx, op, t.Timestamp, true /* fatal */) return false case *enginepb.MVCCDeleteRangeOp: - rts.assertOpAboveRTS(op, t.Timestamp) + rts.assertOpAboveRTS(ctx, op, t.Timestamp, true /* fatal */) return false case *enginepb.MVCCWriteIntentOp: - rts.assertOpAboveRTS(op, t.Timestamp) + rts.assertOpAboveRTS(ctx, op, t.Timestamp, true /* fatal */) return rts.intentQ.IncRef(t.TxnID, t.TxnKey, t.TxnMinTimestamp, t.Timestamp) case *enginepb.MVCCUpdateIntentOp: return rts.intentQ.UpdateTS(t.TxnID, t.Timestamp) case *enginepb.MVCCCommitIntentOp: + // This assertion can be violated in mixed-version clusters prior + // to 24.1, so make it non-fatal for now. See: + // https://github.com/cockroachdb/cockroach/issues/104309 + rts.assertOpAboveRTS(ctx, op, t.Timestamp, false /* fatal */) return rts.intentQ.DecrRef(t.TxnID, t.Timestamp) case *enginepb.MVCCAbortIntentOp: @@ -264,10 +274,22 @@ func (rts *resolvedTimestamp) assertNoChange() { // assertOpAboveTimestamp asserts that this operation is at a larger timestamp // than the current resolved timestamp. A violation of this assertion would // indicate a failure of the closed timestamp mechanism. 
-func (rts *resolvedTimestamp) assertOpAboveRTS(op enginepb.MVCCLogicalOp, opTS hlc.Timestamp) { +func (rts *resolvedTimestamp) assertOpAboveRTS( + ctx context.Context, op enginepb.MVCCLogicalOp, opTS hlc.Timestamp, fatal bool, +) { if opTS.LessEq(rts.resolvedTS) { - panic(fmt.Sprintf("resolved timestamp %s equal to or above timestamp of operation %v", - rts.resolvedTS, op)) + // NB: MVCCLogicalOp.String() is only implemented for pointer receiver. + // We shadow the variable to avoid it escaping to the heap. + op := op + err := errors.AssertionFailedf( + "resolved timestamp %s equal to or above timestamp of operation %v", rts.resolvedTS, &op) + if fatal { + // TODO(erikgrinaker): use log.Fatalf. Panic for now, since tests expect + // it and to minimize code churn for backports. + panic(err) + } else { + log.Errorf(ctx, "%v", err) + } } } diff --git a/pkg/kv/kvserver/rangefeed/resolved_timestamp_test.go b/pkg/kv/kvserver/rangefeed/resolved_timestamp_test.go index ff5f9795131d..7496c7475bd9 100644 --- a/pkg/kv/kvserver/rangefeed/resolved_timestamp_test.go +++ b/pkg/kv/kvserver/rangefeed/resolved_timestamp_test.go @@ -11,6 +11,7 @@ package rangefeed import ( + "context" "testing" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -176,6 +177,7 @@ func TestUnresolvedIntentQueue(t *testing.T) { func TestResolvedTimestamp(t *testing.T) { defer leaktest.AfterTest(t)() + ctx := context.Background() rts := makeResolvedTimestamp() rts.Init() @@ -184,13 +186,13 @@ func TestResolvedTimestamp(t *testing.T) { // Add an intent. No closed timestamp so no resolved timestamp. txn1 := uuid.MakeV4() - fwd := rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 10})) + fwd := rts.ConsumeLogicalOp(ctx, writeIntentOp(txn1, hlc.Timestamp{WallTime: 10})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) // Add another intent. No closed timestamp so no resolved timestamp. 
txn2 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn2, hlc.Timestamp{WallTime: 12})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn2, hlc.Timestamp{WallTime: 12})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) @@ -201,16 +203,16 @@ func TestResolvedTimestamp(t *testing.T) { // Write intent at earlier timestamp. Assertion failure. require.Panics(t, func() { - rts.ConsumeLogicalOp(writeIntentOp(uuid.MakeV4(), hlc.Timestamp{WallTime: 3})) + rts.ConsumeLogicalOp(ctx, writeIntentOp(uuid.MakeV4(), hlc.Timestamp{WallTime: 3})) }) // Write value at earlier timestamp. Assertion failure. require.Panics(t, func() { - rts.ConsumeLogicalOp(writeValueOp(hlc.Timestamp{WallTime: 4})) + rts.ConsumeLogicalOp(ctx, writeValueOp(hlc.Timestamp{WallTime: 4})) }) // Write value at later timestamp. No effect on resolved timestamp. - fwd = rts.ConsumeLogicalOp(writeValueOp(hlc.Timestamp{WallTime: 6})) + fwd = rts.ConsumeLogicalOp(ctx, writeValueOp(hlc.Timestamp{WallTime: 6})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 5}, rts.Get()) @@ -221,12 +223,12 @@ func TestResolvedTimestamp(t *testing.T) { require.Equal(t, hlc.Timestamp{WallTime: 9}, rts.Get()) // Update the timestamp of txn2. No effect on the resolved timestamp. - fwd = rts.ConsumeLogicalOp(updateIntentOp(txn2, hlc.Timestamp{WallTime: 18})) + fwd = rts.ConsumeLogicalOp(ctx, updateIntentOp(txn2, hlc.Timestamp{WallTime: 18})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 9}, rts.Get()) // Update the timestamp of txn1. Resolved timestamp moves forward. 
- fwd = rts.ConsumeLogicalOp(updateIntentOp(txn1, hlc.Timestamp{WallTime: 20})) + fwd = rts.ConsumeLogicalOp(ctx, updateIntentOp(txn1, hlc.Timestamp{WallTime: 20})) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 15}, rts.Get()) @@ -236,13 +238,13 @@ func TestResolvedTimestamp(t *testing.T) { require.Equal(t, hlc.Timestamp{WallTime: 17}, rts.Get()) // Write intent for earliest txn at same timestamp. No change. - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn2, hlc.Timestamp{WallTime: 18})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn2, hlc.Timestamp{WallTime: 18})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 17}, rts.Get()) // Write intent for earliest txn at later timestamp. Resolved // timestamp moves forward. - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn2, hlc.Timestamp{WallTime: 25})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn2, hlc.Timestamp{WallTime: 25})) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 18}, rts.Get()) @@ -253,47 +255,47 @@ func TestResolvedTimestamp(t *testing.T) { // First transaction aborted. Resolved timestamp moves to next earliest // intent. - fwd = rts.ConsumeLogicalOp(abortTxnOp(txn1)) + fwd = rts.ConsumeLogicalOp(ctx, abortTxnOp(txn1)) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) - fwd = rts.ConsumeLogicalOp(abortIntentOp(txn1)) + fwd = rts.ConsumeLogicalOp(ctx, abortIntentOp(txn1)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) // Third transaction at higher timestamp. No effect. 
txn3 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn3, hlc.Timestamp{WallTime: 30})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn3, hlc.Timestamp{WallTime: 30})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn3, hlc.Timestamp{WallTime: 31})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn3, hlc.Timestamp{WallTime: 31})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) // Third transaction aborted. No effect. - fwd = rts.ConsumeLogicalOp(abortTxnOp(txn3)) + fwd = rts.ConsumeLogicalOp(ctx, abortTxnOp(txn3)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) - fwd = rts.ConsumeLogicalOp(abortIntentOp(txn3)) + fwd = rts.ConsumeLogicalOp(ctx, abortIntentOp(txn3)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) - fwd = rts.ConsumeLogicalOp(abortIntentOp(txn3)) + fwd = rts.ConsumeLogicalOp(ctx, abortIntentOp(txn3)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) // Fourth transaction at higher timestamp. No effect. txn4 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn4, hlc.Timestamp{WallTime: 45})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn4, hlc.Timestamp{WallTime: 45})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) // Fourth transaction committed. No effect. - fwd = rts.ConsumeLogicalOp(commitIntentOp(txn4, hlc.Timestamp{WallTime: 45})) + fwd = rts.ConsumeLogicalOp(ctx, commitIntentOp(txn4, hlc.Timestamp{WallTime: 45})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) // Second transaction observes one intent being resolved at timestamp // above closed time. Resolved timestamp moves to closed timestamp. 
- fwd = rts.ConsumeLogicalOp(commitIntentOp(txn2, hlc.Timestamp{WallTime: 35})) + fwd = rts.ConsumeLogicalOp(ctx, commitIntentOp(txn2, hlc.Timestamp{WallTime: 35})) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 30}, rts.Get()) @@ -304,22 +306,22 @@ func TestResolvedTimestamp(t *testing.T) { // Second transaction observes another intent being resolved at timestamp // below closed time. Still one intent left. - fwd = rts.ConsumeLogicalOp(commitIntentOp(txn2, hlc.Timestamp{WallTime: 35})) + fwd = rts.ConsumeLogicalOp(ctx, commitIntentOp(txn2, hlc.Timestamp{WallTime: 35})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 34}, rts.Get()) // Second transaction observes final intent being resolved at timestamp // below closed time. Resolved timestamp moves to closed timestamp. - fwd = rts.ConsumeLogicalOp(commitIntentOp(txn2, hlc.Timestamp{WallTime: 35})) + fwd = rts.ConsumeLogicalOp(ctx, commitIntentOp(txn2, hlc.Timestamp{WallTime: 35})) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 40}, rts.Get()) // Fifth transaction at higher timestamp. No effect. txn5 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn5, hlc.Timestamp{WallTime: 45})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn5, hlc.Timestamp{WallTime: 45})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 40}, rts.Get()) - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn5, hlc.Timestamp{WallTime: 46})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn5, hlc.Timestamp{WallTime: 46})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 40}, rts.Get()) @@ -330,7 +332,7 @@ func TestResolvedTimestamp(t *testing.T) { // Fifth transaction bumps epoch and re-writes one of its intents. Resolved // timestamp moves to the new transaction timestamp. 
- fwd = rts.ConsumeLogicalOp(updateIntentOp(txn5, hlc.Timestamp{WallTime: 47})) + fwd = rts.ConsumeLogicalOp(ctx, updateIntentOp(txn5, hlc.Timestamp{WallTime: 47})) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 46}, rts.Get()) @@ -338,48 +340,49 @@ func TestResolvedTimestamp(t *testing.T) { // its final epoch. Resolved timestamp moves forward after observing the // first intent committing at a higher timestamp and moves to the closed // timestamp after observing the second intent aborting. - fwd = rts.ConsumeLogicalOp(commitIntentOp(txn5, hlc.Timestamp{WallTime: 49})) + fwd = rts.ConsumeLogicalOp(ctx, commitIntentOp(txn5, hlc.Timestamp{WallTime: 49})) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 48}, rts.Get()) - fwd = rts.ConsumeLogicalOp(abortIntentOp(txn5)) + fwd = rts.ConsumeLogicalOp(ctx, abortIntentOp(txn5)) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 50}, rts.Get()) } func TestResolvedTimestampNoClosedTimestamp(t *testing.T) { defer leaktest.AfterTest(t)() + ctx := context.Background() rts := makeResolvedTimestamp() rts.Init() // Add a value. No closed timestamp so no resolved timestamp. - fwd := rts.ConsumeLogicalOp(writeValueOp(hlc.Timestamp{WallTime: 1})) + fwd := rts.ConsumeLogicalOp(ctx, writeValueOp(hlc.Timestamp{WallTime: 1})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) // Add an intent. No closed timestamp so no resolved timestamp. txn1 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 1})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn1, hlc.Timestamp{WallTime: 1})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) // Update intent. No closed timestamp so no resolved timestamp. 
- fwd = rts.ConsumeLogicalOp(updateIntentOp(txn1, hlc.Timestamp{WallTime: 2})) + fwd = rts.ConsumeLogicalOp(ctx, updateIntentOp(txn1, hlc.Timestamp{WallTime: 2})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) // Add another intent. No closed timestamp so no resolved timestamp. txn2 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn2, hlc.Timestamp{WallTime: 3})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn2, hlc.Timestamp{WallTime: 3})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) // Abort the first intent. No closed timestamp so no resolved timestamp. - fwd = rts.ConsumeLogicalOp(abortIntentOp(txn1)) + fwd = rts.ConsumeLogicalOp(ctx, abortIntentOp(txn1)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) // Commit the second intent. No closed timestamp so no resolved timestamp. - fwd = rts.ConsumeLogicalOp(commitIntentOp(txn2, hlc.Timestamp{WallTime: 3})) + fwd = rts.ConsumeLogicalOp(ctx, commitIntentOp(txn2, hlc.Timestamp{WallTime: 3})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) } @@ -418,6 +421,8 @@ func TestResolvedTimestampNoIntents(t *testing.T) { func TestResolvedTimestampInit(t *testing.T) { defer leaktest.AfterTest(t)() + ctx := context.Background() + t.Run("CT Before Init", func(t *testing.T) { rts := makeResolvedTimestamp() @@ -436,7 +441,7 @@ func TestResolvedTimestampInit(t *testing.T) { // Add an intent. Not initialized so no resolved timestamp. txn1 := uuid.MakeV4() - fwd := rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 3})) + fwd := rts.ConsumeLogicalOp(ctx, writeIntentOp(txn1, hlc.Timestamp{WallTime: 3})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) @@ -450,7 +455,7 @@ func TestResolvedTimestampInit(t *testing.T) { // Add an intent. Not initialized so no resolved timestamp. 
txn1 := uuid.MakeV4() - fwd := rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 3})) + fwd := rts.ConsumeLogicalOp(ctx, writeIntentOp(txn1, hlc.Timestamp{WallTime: 3})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) @@ -469,12 +474,12 @@ func TestResolvedTimestampInit(t *testing.T) { // Abort an intent. Not initialized so no resolved timestamp. txn1 := uuid.MakeV4() - fwd := rts.ConsumeLogicalOp(abortIntentOp(txn1)) + fwd := rts.ConsumeLogicalOp(ctx, abortIntentOp(txn1)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) // Abort that intent's transaction. Not initialized so no-op. - fwd = rts.ConsumeLogicalOp(abortTxnOp(txn1)) + fwd = rts.ConsumeLogicalOp(ctx, abortTxnOp(txn1)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) @@ -482,7 +487,7 @@ func TestResolvedTimestampInit(t *testing.T) { // out with the out-of-order intent abort operation. If this abort hadn't // allowed the unresolvedTxn's ref count to drop below 0, this would // have created a reference that would never be cleaned up. - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 3})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn1, hlc.Timestamp{WallTime: 3})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) @@ -501,7 +506,7 @@ func TestResolvedTimestampInit(t *testing.T) { // Abort an intent. Not initialized so no resolved timestamp. txn1 := uuid.MakeV4() - fwd := rts.ConsumeLogicalOp(abortIntentOp(txn1)) + fwd := rts.ConsumeLogicalOp(ctx, abortIntentOp(txn1)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) @@ -513,6 +518,7 @@ func TestResolvedTimestampInit(t *testing.T) { func TestResolvedTimestampTxnAborted(t *testing.T) { defer leaktest.AfterTest(t)() + ctx := context.Background() rts := makeResolvedTimestamp() rts.Init() @@ -523,7 +529,7 @@ func TestResolvedTimestampTxnAborted(t *testing.T) { // Add an intent for a new transaction. 
txn1 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 10})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn1, hlc.Timestamp{WallTime: 10})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 5}, rts.Get()) @@ -533,23 +539,23 @@ func TestResolvedTimestampTxnAborted(t *testing.T) { require.Equal(t, hlc.Timestamp{WallTime: 9}, rts.Get()) // Abort txn1 after a periodic txn push. Resolved timestamp advances. - fwd = rts.ConsumeLogicalOp(abortTxnOp(txn1)) + fwd = rts.ConsumeLogicalOp(ctx, abortTxnOp(txn1)) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 15}, rts.Get()) // Update one of txn1's intents. Should be ignored. - fwd = rts.ConsumeLogicalOp(updateIntentOp(txn1, hlc.Timestamp{WallTime: 20})) + fwd = rts.ConsumeLogicalOp(ctx, updateIntentOp(txn1, hlc.Timestamp{WallTime: 20})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 15}, rts.Get()) // Abort one of txn1's intents. Should be ignored. - fwd = rts.ConsumeLogicalOp(abortIntentOp(txn1)) + fwd = rts.ConsumeLogicalOp(ctx, abortIntentOp(txn1)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 15}, rts.Get()) // Write another intent as txn1. Should add txn1 back into queue. // This will eventually require another txn push to evict. - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 20})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn1, hlc.Timestamp{WallTime: 20})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 15}, rts.Get()) @@ -560,7 +566,7 @@ func TestResolvedTimestampTxnAborted(t *testing.T) { require.Equal(t, hlc.Timestamp{WallTime: 19}, rts.Get()) // Abort txn1 again after another periodic push. Resolved timestamp advances. 
- fwd = rts.ConsumeLogicalOp(abortTxnOp(txn1)) + fwd = rts.ConsumeLogicalOp(ctx, abortTxnOp(txn1)) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 25}, rts.Get()) } @@ -569,6 +575,7 @@ func TestResolvedTimestampTxnAborted(t *testing.T) { func TestClosedTimestampLogicalPart(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + ctx := context.Background() rts := makeResolvedTimestamp() rts.Init() @@ -579,7 +586,7 @@ func TestClosedTimestampLogicalPart(t *testing.T) { // Add an intent for a new transaction. txn1 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 10, Logical: 4})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn1, hlc.Timestamp{WallTime: 10, Logical: 4})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 10, Logical: 0}, rts.Get()) @@ -591,7 +598,7 @@ func TestClosedTimestampLogicalPart(t *testing.T) { require.Equal(t, hlc.Timestamp{WallTime: 10, Logical: 0}, rts.Get()) // Abort txn1. Resolved timestamp advances. - fwd = rts.ConsumeLogicalOp(abortTxnOp(txn1)) + fwd = rts.ConsumeLogicalOp(ctx, abortTxnOp(txn1)) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 11, Logical: 0}, rts.Get()) @@ -600,7 +607,7 @@ func TestClosedTimestampLogicalPart(t *testing.T) { // and an intent is in the next wall tick; this used to cause an issue because // of the rounding logic. txn2 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn2, hlc.Timestamp{WallTime: 12, Logical: 7})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn2, hlc.Timestamp{WallTime: 12, Logical: 7})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 11, Logical: 0}, rts.Get()) }