diff --git a/pkg/kv/kvclient/rangefeed/BUILD.bazel b/pkg/kv/kvclient/rangefeed/BUILD.bazel index e3dad86f78db..3187eefb0d8b 100644 --- a/pkg/kv/kvclient/rangefeed/BUILD.bazel +++ b/pkg/kv/kvclient/rangefeed/BUILD.bazel @@ -71,6 +71,8 @@ go_test( "//pkg/kv/kvclient/kvcoord", "//pkg/kv/kvpb", "//pkg/kv/kvserver", + "//pkg/kv/kvserver/closedts", + "//pkg/kv/kvserver/kvserverbase", "//pkg/roachpb", "//pkg/security/securityassets", "//pkg/security/securitytest", @@ -82,14 +84,17 @@ go_test( "//pkg/storage", "//pkg/testutils", "//pkg/testutils/serverutils", + "//pkg/testutils/skip", "//pkg/testutils/sqlutils", "//pkg/testutils/storageutils", "//pkg/testutils/testcluster", "//pkg/util", "//pkg/util/ctxgroup", "//pkg/util/encoding", + "//pkg/util/future", "//pkg/util/hlc", "//pkg/util/leaktest", + "//pkg/util/log", "//pkg/util/mon", "//pkg/util/retry", "//pkg/util/stop", diff --git a/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go b/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go index 5e677b2cc4b1..b96110936675 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go +++ b/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go @@ -14,26 +14,34 @@ import ( "context" "runtime/pprof" "sync" + "sync/atomic" "testing" "time" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigptsreader" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/storageutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/encoding" + "github.com/cockroachdb/cockroach/pkg/util/future" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" "github.com/stretchr/testify/assert" @@ -1155,3 +1163,321 @@ func TestRangeFeedStartTimeExclusive(t *testing.T) { t.Fatal("timed out waiting for event") } } + +// TestRangeFeedIntentResolutionRace is a regression test for +// https://github.com/cockroachdb/cockroach/issues/104309, i.e. the +// following scenario: +// +// - A rangefeed is running on a lagging follower. +// - A txn writes an intent, which is replicated to the follower. +// - The closed timestamp advances past the intent. +// - The txn commits and resolves the intent at the original write timestamp, +// then GCs its txn record. This is not yet applied on the follower. +// - The rangefeed pushes the txn to advance its resolved timestamp. 
+// - The txn is GCed, so the push returns ABORTED (it can't know whether the +// txn was committed or aborted after its record is GCed). +// - The rangefeed disregards the "aborted" txn and advances the resolved +// timestamp, emitting a checkpoint. +// - The follower applies the resolved intent and emits an event below +// the checkpoint, violating the checkpoint guarantee. +// +// This scenario is addressed by running a Barrier request through Raft and +// waiting for it to apply locally before removing the txn from resolved ts +// tracking. This ensures the pending intent resolution is applied before +// the resolved ts can advance. +func TestRangeFeedIntentResolutionRace(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + skip.UnderRace(t) // too slow, times out + skip.UnderDeadlock(t) + + // Use a timeout, to prevent a hung test. + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + // defer cancel() after Stopper.Stop(), so the context cancels first. + // Otherwise, the stopper will hang waiting for a rangefeed whose context is + // not yet cancelled. + + // Set up an apply filter that blocks Raft application on n3 (follower) for + // the given range. + var blockRangeOnN3 atomic.Int64 + unblockRangeC := make(chan struct{}) + applyFilter := func(args kvserverbase.ApplyFilterArgs) (int, *kvpb.Error) { + if args.StoreID == 3 { + if rangeID := blockRangeOnN3.Load(); rangeID > 0 && rangeID == int64(args.RangeID) { + t.Logf("blocked r%d on s%d", args.RangeID, args.StoreID) + select { + case <-unblockRangeC: + t.Logf("unblocked r%d on s%d", args.RangeID, args.StoreID) + case <-ctx.Done(): + return 0, kvpb.NewError(ctx.Err()) + } + } + } + return 0, nil + } + + // Set up a request filter that blocks transaction pushes for a specific key. + // This is used to prevent the rangefeed txn pusher from pushing the txn + // timestamp above the closed timestamp before it commits, only allowing the + // push to happen after the transaction has committed and GCed below the + // closed timestamp. + var blockPush atomic.Pointer[roachpb.Key] + unblockPushC := make(chan struct{}) + reqFilter := func(ctx context.Context, br *kvpb.BatchRequest) *kvpb.Error { + if br.IsSinglePushTxnRequest() { + req := br.Requests[0].GetPushTxn() + if key := blockPush.Load(); key != nil && req.Key.Equal(*key) { + t.Logf("blocked push for txn %s", req.PusheeTxn) + select { + case <-unblockPushC: + t.Logf("unblocked push for txn %s", req.PusheeTxn) + case <-ctx.Done(): + return kvpb.NewError(ctx.Err()) + } + } + } + return nil + } + + // Speed up the test by reducing various closed/resolved timestamp intervals. + const interval = 100 * time.Millisecond + st := cluster.MakeTestingClusterSettings() + kvserver.RangeFeedRefreshInterval.Override(ctx, &st.SV, interval) + closedts.SideTransportCloseInterval.Override(ctx, &st.SV, interval) + closedts.TargetDuration.Override(ctx, &st.SV, interval) + + // Start a cluster with 3 nodes. 
+ tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Settings: st, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + TestingRequestFilter: reqFilter, + TestingApplyCalledTwiceFilter: applyFilter, + RangeFeedPushTxnsInterval: interval, + RangeFeedPushTxnsAge: interval, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + defer cancel() + + n1 := tc.Server(0) + s3 := tc.GetFirstStoreFromServer(t, 2) + clock := n1.Clock() + + // Determine the key/value we're going to write. + prefix := append(n1.Codec().TenantPrefix(), keys.ScratchRangeMin...) + key := append(prefix.Clone(), []byte("/foo")...) + value := []byte("value") + + // Split off a range and upreplicate it, with leaseholder on n1. + _, _, err := n1.SplitRange(prefix) + require.NoError(t, err) + desc := tc.AddVotersOrFatal(t, prefix, tc.Targets(1, 2)...) + t.Logf("split off range %s", desc) + + repl1 := tc.GetFirstStoreFromServer(t, 0).LookupReplica(roachpb.RKey(prefix)) // leaseholder + repl3 := tc.GetFirstStoreFromServer(t, 2).LookupReplica(roachpb.RKey(prefix)) // lagging follower + + require.True(t, repl1.OwnsValidLease(ctx, clock.NowAsClockTimestamp())) + + // Block pushes of the txn, to ensure it can write at a fixed timestamp. + // Otherwise, the rangefeed or someone else may push it beyond the closed + // timestamp. + blockPush.Store(&key) + + // We'll use n3 as our lagging follower. Start a rangefeed on it directly. + req := kvpb.RangeFeedRequest{ + Header: kvpb.Header{ + RangeID: desc.RangeID, + }, + Span: desc.RSpan().AsRawSpanWithNoLocals(), + } + eventC := make(chan *kvpb.RangeFeedEvent) + sink := newChannelSink(ctx, eventC) + fErr := future.MakeAwaitableFuture(s3.RangeFeed(&req, sink)) + require.NoError(t, fErr.Get()) // check if we've errored yet + t.Logf("started rangefeed on %s", repl3) + + // Spawn a rangefeed monitor, which posts checkpoint updates to checkpointC. + // This uses a buffered channel of size 1, and empties it out before posting a + // new update, such that it contains the latest known checkpoint and does not + // block the rangefeed. It also posts emitted values for our key to valueC, + // which should only happen once. + checkpointC := make(chan *kvpb.RangeFeedCheckpoint, 1) + valueC := make(chan *kvpb.RangeFeedValue, 1) + go func() { + defer close(checkpointC) + defer close(valueC) + for { + select { + case e := <-eventC: + switch { + case e.Checkpoint != nil: + // Clear checkpointC such that it always contains the latest. + select { + case <-checkpointC: + default: + } + checkpointC <- e.Checkpoint + case e.Val != nil && e.Val.Key.Equal(key): + select { + case valueC <- e.Val: + default: + t.Errorf("duplicate value event for %s: %s", key, e) + } + } + case <-ctx.Done(): + return + } + } + }() + + waitForCheckpoint := func(t *testing.T, ts hlc.Timestamp) hlc.Timestamp { + t.Helper() + timeoutC := time.After(3 * time.Second) + for { + select { + case c := <-checkpointC: + require.NotNil(t, c, "nil checkpoint") + if ts.LessEq(c.ResolvedTS) { + t.Logf("rangefeed checkpoint at %s >= %s", c.ResolvedTS, ts) + return c.ResolvedTS + } + case <-timeoutC: + require.Fail(t, "timed out waiting for checkpoint", "wanted %s", ts) + } + } + } + + // Wait for the initial checkpoint. + rts := waitForCheckpoint(t, clock.Now()) + t.Logf("initial checkpoint at %s", rts) + + // Start a new transaction on n1 with a fixed timestamp (to make sure it + // remains below the closed timestamp). 
Write an intent, and read it back to + // make sure it has applied. + writeTS := clock.Now() + txn := n1.DB().NewTxn(ctx, "test") + require.NoError(t, txn.SetFixedTimestamp(ctx, writeTS)) + require.NoError(t, txn.Put(ctx, key, value)) + _, err = txn.Get(ctx, key) + require.NoError(t, err) + t.Logf("wrote %s", key) + + // Wait for both the leaseholder and the follower to close the transaction's + // write timestamp. + waitForClosedTimestamp := func(t *testing.T, repl *kvserver.Replica, ts hlc.Timestamp) hlc.Timestamp { + t.Helper() + timeoutC := time.After(3 * time.Second) + for { + if closedTS := repl.GetCurrentClosedTimestamp(ctx); ts.LessEq(closedTS) { + t.Logf("replica %s closed timestamp at %s >= %s", repl, closedTS, ts) + return closedTS + } + select { + case <-time.After(10 * time.Millisecond): + case <-timeoutC: + require.Fail(t, "timeout out waiting for closed timestamp", "wanted %s", ts) + } + } + } + cts := waitForClosedTimestamp(t, repl1, writeTS) + _ = waitForClosedTimestamp(t, repl3, writeTS) + t.Logf("closed timestamp %s is above txn write timestamp %s", cts, writeTS) + + // Wait for the rangefeed checkpoint to reach writeTS.Prev(). This ensures the + // rangefeed's view of the closed timestamp has been updated, and is now only + // blocked by the intent. + waitTS := writeTS.Prev() + waitTS.Logical = 0 + rts = waitForCheckpoint(t, waitTS) + t.Logf("rangefeed caught up to txn write timestamp at %s", rts) + + // Block Raft application on repl3. + blockRangeOnN3.Store(int64(desc.RangeID)) + + // Commit the transaction, and check its commit timestamp. + require.NoError(t, txn.Commit(ctx)) + commitTS := txn.CommitTimestamp() + require.Equal(t, commitTS, writeTS) + t.Logf("txn committed at %s", writeTS) + + // Unblock transaction pushes. Since repl3 won't apply the intent resolution + // yet, the rangefeed will keep trying to push the transaction. Once the + // transaction record is GCed (which happens async), the rangefeed will see an + // ABORTED status. + // + // It may see the intermediate COMMITTED state too, but at the time of writing + // that does not matter, since the rangefeed needs to keep tracking the + // intent until it applies the resolution, and it will also see the ABORTED + // status before that happens. + blockPush.Store(nil) + close(unblockPushC) + + // Make sure repl3 does not emit a checkpoint beyond the write timestamp. Its + // closed timestamp has already advanced past it, but the unresolved intent + // should prevent the resolved timestamp from advancing, despite the false + // ABORTED state. We also make sure no value has been emitted. + waitC := time.After(3 * time.Second) + for done := false; !done; { + select { + case c := <-checkpointC: + require.NotNil(t, c) + require.False(t, commitTS.LessEq(c.ResolvedTS), + "repl %s emitted checkpoint %s beyond write timestamp %s", repl3, c.ResolvedTS, commitTS) + case v := <-valueC: + require.Fail(t, "repl3 emitted premature value %s", v) + case <-waitC: + done = true + } + } + t.Logf("checkpoint still below write timestamp") + + // Unblock application on repl3. Wait for the checkpoint to catch up to the + // commit timestamp, and the committed value to be emitted. 
+ blockRangeOnN3.Store(0) + close(unblockRangeC) + + rts = waitForCheckpoint(t, writeTS) + t.Logf("checkpoint %s caught up to write timestamp %s", rts, writeTS) + + select { + case v := <-valueC: + require.Equal(t, v.Key, key) + require.Equal(t, v.Value.Timestamp, writeTS) + t.Logf("received value %s", v) + case <-time.After(3 * time.Second): + require.Fail(t, "timed out waiting for event") + } + + // The rangefeed should still be running. + require.NoError(t, fErr.Get()) +} + +// channelSink is a rangefeed sink which posts events to a channel. +type channelSink struct { + ctx context.Context + ch chan<- *kvpb.RangeFeedEvent +} + +func newChannelSink(ctx context.Context, ch chan<- *kvpb.RangeFeedEvent) *channelSink { + return &channelSink{ctx: ctx, ch: ch} +} + +func (c *channelSink) Context() context.Context { + return c.ctx +} + +func (c *channelSink) Send(e *kvpb.RangeFeedEvent) error { + select { + case c.ch <- e: + return nil + case <-c.ctx.Done(): + return c.ctx.Err() + } +} diff --git a/pkg/kv/kvserver/rangefeed/BUILD.bazel b/pkg/kv/kvserver/rangefeed/BUILD.bazel index d72732096ce6..841b41ba1edb 100644 --- a/pkg/kv/kvserver/rangefeed/BUILD.bazel +++ b/pkg/kv/kvserver/rangefeed/BUILD.bazel @@ -24,6 +24,7 @@ go_library( "//pkg/storage/enginepb", "//pkg/util/admission", "//pkg/util/bufalloc", + "//pkg/util/contextutil", "//pkg/util/envutil", "//pkg/util/future", "//pkg/util/hlc", diff --git a/pkg/kv/kvserver/rangefeed/processor.go b/pkg/kv/kvserver/rangefeed/processor.go index bfec74e901e8..b817df38c985 100644 --- a/pkg/kv/kvserver/rangefeed/processor.go +++ b/pkg/kv/kvserver/rangefeed/processor.go @@ -53,6 +53,19 @@ var ( "periodically push txn write timestamps to advance rangefeed resolved timestamps", true, ) + + // PushTxnsBarrierEnabled is an escape hatch to disable the txn push barrier + // command in case it causes unexpected problems. This can result in + // violations of the rangefeed checkpoint guarantee, emitting premature + // checkpoints before all writes below it have been emitted in rare cases. + // See: https://github.com/cockroachdb/cockroach/issues/104309 + PushTxnsBarrierEnabled = settings.RegisterBoolSetting( + settings.SystemOnly, + "kv.rangefeed.push_txns.barrier.enabled", + "flush and apply prior writes when a txn push returns an ambiguous abort "+ + "(disabling may emit premature checkpoints before writes in rare cases)", + true, + ) ) // newErrBufferCapacityExceeded creates an error that is returned to subscribers @@ -777,7 +790,7 @@ func (p *Processor) consumeLogicalOps( // Determine whether the operation caused the resolved timestamp to // move forward. If so, publish a RangeFeedCheckpoint notification. - if p.rts.ConsumeLogicalOp(op) { + if p.rts.ConsumeLogicalOp(ctx, op) { p.publishCheckpoint(ctx) } } diff --git a/pkg/kv/kvserver/rangefeed/processor_test.go b/pkg/kv/kvserver/rangefeed/processor_test.go index 8af6c86094c8..6fe28a43d0e5 100644 --- a/pkg/kv/kvserver/rangefeed/processor_test.go +++ b/pkg/kv/kvserver/rangefeed/processor_test.go @@ -833,7 +833,7 @@ func TestProcessorTxnPushAttempt(t *testing.T) { var tp testTxnPusher tp.mockPushTxns(func( ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp, - ) ([]*roachpb.Transaction, error) { + ) ([]*roachpb.Transaction, bool, error) { // The txns are not in a sorted order. Enforce one. 
sort.Slice(txns, func(i, j int) bool { return bytes.Compare(txns[i].Key, txns[j].Key) < 0 @@ -847,34 +847,34 @@ func TestProcessorTxnPushAttempt(t *testing.T) { assert.Equal(t, txn2Meta, txns[1]) assert.Equal(t, txn3Meta, txns[2]) if t.Failed() { - return nil, errors.New("test failed") + return nil, false, errors.New("test failed") } // Push does not succeed. Protos not at larger ts. - return []*roachpb.Transaction{txn1Proto, txn2Proto, txn3Proto}, nil + return []*roachpb.Transaction{txn1Proto, txn2Proto, txn3Proto}, false, nil case 2: assert.Equal(t, 3, len(txns)) assert.Equal(t, txn1MetaT2Pre, txns[0]) assert.Equal(t, txn2Meta, txns[1]) assert.Equal(t, txn3Meta, txns[2]) if t.Failed() { - return nil, errors.New("test failed") + return nil, false, errors.New("test failed") } // Push succeeds. Return new protos. - return []*roachpb.Transaction{txn1ProtoT2, txn2ProtoT2, txn3ProtoT2}, nil + return []*roachpb.Transaction{txn1ProtoT2, txn2ProtoT2, txn3ProtoT2}, false, nil case 3: assert.Equal(t, 2, len(txns)) assert.Equal(t, txn2MetaT2Post, txns[0]) assert.Equal(t, txn3MetaT2Post, txns[1]) if t.Failed() { - return nil, errors.New("test failed") + return nil, false, errors.New("test failed") } // Push succeeds. Return new protos. - return []*roachpb.Transaction{txn2ProtoT3, txn3ProtoT3}, nil + return []*roachpb.Transaction{txn2ProtoT3, txn3ProtoT3}, false, nil default: - return nil, nil + return nil, false, nil } }) tp.mockResolveIntentsFn(func(ctx context.Context, intents []roachpb.LockUpdate) error { @@ -992,10 +992,10 @@ func TestProcessorTxnPushDisabled(t *testing.T) { var tp testTxnPusher tp.mockPushTxns(func( ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp, - ) ([]*roachpb.Transaction, error) { + ) ([]*roachpb.Transaction, bool, error) { err := errors.Errorf("unexpected txn push for txns=%v ts=%s", txns, ts) t.Errorf("%v", err) - return nil, err + return nil, false, err }) p, stopper := newTestProcessorWithTxnPusher(t, nil /* rtsIter */, &tp, st) @@ -1547,11 +1547,11 @@ func TestProcessorContextCancellation(t *testing.T) { var pusher testTxnPusher pusher.mockPushTxns(func( ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp, - ) ([]*roachpb.Transaction, error) { + ) ([]*roachpb.Transaction, bool, error) { pushReadyC <- struct{}{} <-ctx.Done() close(pushDoneC) - return nil, ctx.Err() + return nil, false, ctx.Err() }) pusher.mockResolveIntentsFn(func(ctx context.Context, intents []roachpb.LockUpdate) error { return nil diff --git a/pkg/kv/kvserver/rangefeed/resolved_timestamp.go b/pkg/kv/kvserver/rangefeed/resolved_timestamp.go index f1f9e8d57a8f..c6793040e9b0 100644 --- a/pkg/kv/kvserver/rangefeed/resolved_timestamp.go +++ b/pkg/kv/kvserver/rangefeed/resolved_timestamp.go @@ -13,11 +13,13 @@ package rangefeed import ( "bytes" "container/heap" + "context" "fmt" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" ) @@ -128,32 +130,40 @@ func (rts *resolvedTimestamp) ForwardClosedTS(newClosedTS hlc.Timestamp) bool { // operation within its range of tracked keys. This allows the structure to // update its internal intent tracking to reflect the change. The method returns // whether this caused the resolved timestamp to move forward. 
-func (rts *resolvedTimestamp) ConsumeLogicalOp(op enginepb.MVCCLogicalOp) bool { - if rts.consumeLogicalOp(op) { +func (rts *resolvedTimestamp) ConsumeLogicalOp( + ctx context.Context, op enginepb.MVCCLogicalOp, +) bool { + if rts.consumeLogicalOp(ctx, op) { return rts.recompute() } rts.assertNoChange() return false } -func (rts *resolvedTimestamp) consumeLogicalOp(op enginepb.MVCCLogicalOp) bool { +func (rts *resolvedTimestamp) consumeLogicalOp( + ctx context.Context, op enginepb.MVCCLogicalOp, +) bool { switch t := op.GetValue().(type) { case *enginepb.MVCCWriteValueOp: - rts.assertOpAboveRTS(op, t.Timestamp) + rts.assertOpAboveRTS(ctx, op, t.Timestamp, true /* fatal */) return false case *enginepb.MVCCDeleteRangeOp: - rts.assertOpAboveRTS(op, t.Timestamp) + rts.assertOpAboveRTS(ctx, op, t.Timestamp, true /* fatal */) return false case *enginepb.MVCCWriteIntentOp: - rts.assertOpAboveRTS(op, t.Timestamp) + rts.assertOpAboveRTS(ctx, op, t.Timestamp, true /* fatal */) return rts.intentQ.IncRef(t.TxnID, t.TxnKey, t.TxnMinTimestamp, t.Timestamp) case *enginepb.MVCCUpdateIntentOp: return rts.intentQ.UpdateTS(t.TxnID, t.Timestamp) case *enginepb.MVCCCommitIntentOp: + // This assertion can be violated in mixed-version clusters prior + // to 24.1, so make it non-fatal for now. See: + // https://github.com/cockroachdb/cockroach/issues/104309 + rts.assertOpAboveRTS(ctx, op, t.Timestamp, false /* fatal */) return rts.intentQ.DecrRef(t.TxnID, t.Timestamp) case *enginepb.MVCCAbortIntentOp: @@ -264,10 +274,22 @@ func (rts *resolvedTimestamp) assertNoChange() { // assertOpAboveTimestamp asserts that this operation is at a larger timestamp // than the current resolved timestamp. A violation of this assertion would // indicate a failure of the closed timestamp mechanism. -func (rts *resolvedTimestamp) assertOpAboveRTS(op enginepb.MVCCLogicalOp, opTS hlc.Timestamp) { +func (rts *resolvedTimestamp) assertOpAboveRTS( + ctx context.Context, op enginepb.MVCCLogicalOp, opTS hlc.Timestamp, fatal bool, +) { if opTS.LessEq(rts.resolvedTS) { - panic(fmt.Sprintf("resolved timestamp %s equal to or above timestamp of operation %v", - rts.resolvedTS, op)) + // NB: MVCCLogicalOp.String() is only implemented for pointer receiver. + // We shadow the variable to avoid it escaping to the heap. + op := op + err := errors.AssertionFailedf( + "resolved timestamp %s equal to or above timestamp of operation %v", rts.resolvedTS, &op) + if fatal { + // TODO(erikgrinaker): use log.Fatalf. Panic for now, since tests expect + // it and to minimize code churn for backports. + panic(err) + } else { + log.Errorf(ctx, "%v", err) + } } } diff --git a/pkg/kv/kvserver/rangefeed/resolved_timestamp_test.go b/pkg/kv/kvserver/rangefeed/resolved_timestamp_test.go index ff5f9795131d..7496c7475bd9 100644 --- a/pkg/kv/kvserver/rangefeed/resolved_timestamp_test.go +++ b/pkg/kv/kvserver/rangefeed/resolved_timestamp_test.go @@ -11,6 +11,7 @@ package rangefeed import ( + "context" "testing" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -176,6 +177,7 @@ func TestUnresolvedIntentQueue(t *testing.T) { func TestResolvedTimestamp(t *testing.T) { defer leaktest.AfterTest(t)() + ctx := context.Background() rts := makeResolvedTimestamp() rts.Init() @@ -184,13 +186,13 @@ func TestResolvedTimestamp(t *testing.T) { // Add an intent. No closed timestamp so no resolved timestamp. 
txn1 := uuid.MakeV4() - fwd := rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 10})) + fwd := rts.ConsumeLogicalOp(ctx, writeIntentOp(txn1, hlc.Timestamp{WallTime: 10})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) // Add another intent. No closed timestamp so no resolved timestamp. txn2 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn2, hlc.Timestamp{WallTime: 12})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn2, hlc.Timestamp{WallTime: 12})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) @@ -201,16 +203,16 @@ func TestResolvedTimestamp(t *testing.T) { // Write intent at earlier timestamp. Assertion failure. require.Panics(t, func() { - rts.ConsumeLogicalOp(writeIntentOp(uuid.MakeV4(), hlc.Timestamp{WallTime: 3})) + rts.ConsumeLogicalOp(ctx, writeIntentOp(uuid.MakeV4(), hlc.Timestamp{WallTime: 3})) }) // Write value at earlier timestamp. Assertion failure. require.Panics(t, func() { - rts.ConsumeLogicalOp(writeValueOp(hlc.Timestamp{WallTime: 4})) + rts.ConsumeLogicalOp(ctx, writeValueOp(hlc.Timestamp{WallTime: 4})) }) // Write value at later timestamp. No effect on resolved timestamp. - fwd = rts.ConsumeLogicalOp(writeValueOp(hlc.Timestamp{WallTime: 6})) + fwd = rts.ConsumeLogicalOp(ctx, writeValueOp(hlc.Timestamp{WallTime: 6})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 5}, rts.Get()) @@ -221,12 +223,12 @@ func TestResolvedTimestamp(t *testing.T) { require.Equal(t, hlc.Timestamp{WallTime: 9}, rts.Get()) // Update the timestamp of txn2. No effect on the resolved timestamp. - fwd = rts.ConsumeLogicalOp(updateIntentOp(txn2, hlc.Timestamp{WallTime: 18})) + fwd = rts.ConsumeLogicalOp(ctx, updateIntentOp(txn2, hlc.Timestamp{WallTime: 18})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 9}, rts.Get()) // Update the timestamp of txn1. Resolved timestamp moves forward. - fwd = rts.ConsumeLogicalOp(updateIntentOp(txn1, hlc.Timestamp{WallTime: 20})) + fwd = rts.ConsumeLogicalOp(ctx, updateIntentOp(txn1, hlc.Timestamp{WallTime: 20})) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 15}, rts.Get()) @@ -236,13 +238,13 @@ func TestResolvedTimestamp(t *testing.T) { require.Equal(t, hlc.Timestamp{WallTime: 17}, rts.Get()) // Write intent for earliest txn at same timestamp. No change. - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn2, hlc.Timestamp{WallTime: 18})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn2, hlc.Timestamp{WallTime: 18})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 17}, rts.Get()) // Write intent for earliest txn at later timestamp. Resolved // timestamp moves forward. - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn2, hlc.Timestamp{WallTime: 25})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn2, hlc.Timestamp{WallTime: 25})) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 18}, rts.Get()) @@ -253,47 +255,47 @@ func TestResolvedTimestamp(t *testing.T) { // First transaction aborted. Resolved timestamp moves to next earliest // intent. - fwd = rts.ConsumeLogicalOp(abortTxnOp(txn1)) + fwd = rts.ConsumeLogicalOp(ctx, abortTxnOp(txn1)) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) - fwd = rts.ConsumeLogicalOp(abortIntentOp(txn1)) + fwd = rts.ConsumeLogicalOp(ctx, abortIntentOp(txn1)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) // Third transaction at higher timestamp. No effect. 
txn3 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn3, hlc.Timestamp{WallTime: 30})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn3, hlc.Timestamp{WallTime: 30})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn3, hlc.Timestamp{WallTime: 31})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn3, hlc.Timestamp{WallTime: 31})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) // Third transaction aborted. No effect. - fwd = rts.ConsumeLogicalOp(abortTxnOp(txn3)) + fwd = rts.ConsumeLogicalOp(ctx, abortTxnOp(txn3)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) - fwd = rts.ConsumeLogicalOp(abortIntentOp(txn3)) + fwd = rts.ConsumeLogicalOp(ctx, abortIntentOp(txn3)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) - fwd = rts.ConsumeLogicalOp(abortIntentOp(txn3)) + fwd = rts.ConsumeLogicalOp(ctx, abortIntentOp(txn3)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) // Fourth transaction at higher timestamp. No effect. txn4 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn4, hlc.Timestamp{WallTime: 45})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn4, hlc.Timestamp{WallTime: 45})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) // Fourth transaction committed. No effect. - fwd = rts.ConsumeLogicalOp(commitIntentOp(txn4, hlc.Timestamp{WallTime: 45})) + fwd = rts.ConsumeLogicalOp(ctx, commitIntentOp(txn4, hlc.Timestamp{WallTime: 45})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 24}, rts.Get()) // Second transaction observes one intent being resolved at timestamp // above closed time. Resolved timestamp moves to closed timestamp. - fwd = rts.ConsumeLogicalOp(commitIntentOp(txn2, hlc.Timestamp{WallTime: 35})) + fwd = rts.ConsumeLogicalOp(ctx, commitIntentOp(txn2, hlc.Timestamp{WallTime: 35})) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 30}, rts.Get()) @@ -304,22 +306,22 @@ func TestResolvedTimestamp(t *testing.T) { // Second transaction observes another intent being resolved at timestamp // below closed time. Still one intent left. - fwd = rts.ConsumeLogicalOp(commitIntentOp(txn2, hlc.Timestamp{WallTime: 35})) + fwd = rts.ConsumeLogicalOp(ctx, commitIntentOp(txn2, hlc.Timestamp{WallTime: 35})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 34}, rts.Get()) // Second transaction observes final intent being resolved at timestamp // below closed time. Resolved timestamp moves to closed timestamp. - fwd = rts.ConsumeLogicalOp(commitIntentOp(txn2, hlc.Timestamp{WallTime: 35})) + fwd = rts.ConsumeLogicalOp(ctx, commitIntentOp(txn2, hlc.Timestamp{WallTime: 35})) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 40}, rts.Get()) // Fifth transaction at higher timestamp. No effect. 
txn5 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn5, hlc.Timestamp{WallTime: 45})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn5, hlc.Timestamp{WallTime: 45})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 40}, rts.Get()) - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn5, hlc.Timestamp{WallTime: 46})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn5, hlc.Timestamp{WallTime: 46})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 40}, rts.Get()) @@ -330,7 +332,7 @@ func TestResolvedTimestamp(t *testing.T) { // Fifth transaction bumps epoch and re-writes one of its intents. Resolved // timestamp moves to the new transaction timestamp. - fwd = rts.ConsumeLogicalOp(updateIntentOp(txn5, hlc.Timestamp{WallTime: 47})) + fwd = rts.ConsumeLogicalOp(ctx, updateIntentOp(txn5, hlc.Timestamp{WallTime: 47})) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 46}, rts.Get()) @@ -338,48 +340,49 @@ func TestResolvedTimestamp(t *testing.T) { // its final epoch. Resolved timestamp moves forward after observing the // first intent committing at a higher timestamp and moves to the closed // timestamp after observing the second intent aborting. - fwd = rts.ConsumeLogicalOp(commitIntentOp(txn5, hlc.Timestamp{WallTime: 49})) + fwd = rts.ConsumeLogicalOp(ctx, commitIntentOp(txn5, hlc.Timestamp{WallTime: 49})) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 48}, rts.Get()) - fwd = rts.ConsumeLogicalOp(abortIntentOp(txn5)) + fwd = rts.ConsumeLogicalOp(ctx, abortIntentOp(txn5)) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 50}, rts.Get()) } func TestResolvedTimestampNoClosedTimestamp(t *testing.T) { defer leaktest.AfterTest(t)() + ctx := context.Background() rts := makeResolvedTimestamp() rts.Init() // Add a value. No closed timestamp so no resolved timestamp. - fwd := rts.ConsumeLogicalOp(writeValueOp(hlc.Timestamp{WallTime: 1})) + fwd := rts.ConsumeLogicalOp(ctx, writeValueOp(hlc.Timestamp{WallTime: 1})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) // Add an intent. No closed timestamp so no resolved timestamp. txn1 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 1})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn1, hlc.Timestamp{WallTime: 1})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) // Update intent. No closed timestamp so no resolved timestamp. - fwd = rts.ConsumeLogicalOp(updateIntentOp(txn1, hlc.Timestamp{WallTime: 2})) + fwd = rts.ConsumeLogicalOp(ctx, updateIntentOp(txn1, hlc.Timestamp{WallTime: 2})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) // Add another intent. No closed timestamp so no resolved timestamp. txn2 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn2, hlc.Timestamp{WallTime: 3})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn2, hlc.Timestamp{WallTime: 3})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) // Abort the first intent. No closed timestamp so no resolved timestamp. - fwd = rts.ConsumeLogicalOp(abortIntentOp(txn1)) + fwd = rts.ConsumeLogicalOp(ctx, abortIntentOp(txn1)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) // Commit the second intent. No closed timestamp so no resolved timestamp. 
- fwd = rts.ConsumeLogicalOp(commitIntentOp(txn2, hlc.Timestamp{WallTime: 3})) + fwd = rts.ConsumeLogicalOp(ctx, commitIntentOp(txn2, hlc.Timestamp{WallTime: 3})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) } @@ -418,6 +421,8 @@ func TestResolvedTimestampNoIntents(t *testing.T) { func TestResolvedTimestampInit(t *testing.T) { defer leaktest.AfterTest(t)() + ctx := context.Background() + t.Run("CT Before Init", func(t *testing.T) { rts := makeResolvedTimestamp() @@ -436,7 +441,7 @@ func TestResolvedTimestampInit(t *testing.T) { // Add an intent. Not initialized so no resolved timestamp. txn1 := uuid.MakeV4() - fwd := rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 3})) + fwd := rts.ConsumeLogicalOp(ctx, writeIntentOp(txn1, hlc.Timestamp{WallTime: 3})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) @@ -450,7 +455,7 @@ func TestResolvedTimestampInit(t *testing.T) { // Add an intent. Not initialized so no resolved timestamp. txn1 := uuid.MakeV4() - fwd := rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 3})) + fwd := rts.ConsumeLogicalOp(ctx, writeIntentOp(txn1, hlc.Timestamp{WallTime: 3})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) @@ -469,12 +474,12 @@ func TestResolvedTimestampInit(t *testing.T) { // Abort an intent. Not initialized so no resolved timestamp. txn1 := uuid.MakeV4() - fwd := rts.ConsumeLogicalOp(abortIntentOp(txn1)) + fwd := rts.ConsumeLogicalOp(ctx, abortIntentOp(txn1)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) // Abort that intent's transaction. Not initialized so no-op. - fwd = rts.ConsumeLogicalOp(abortTxnOp(txn1)) + fwd = rts.ConsumeLogicalOp(ctx, abortTxnOp(txn1)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) @@ -482,7 +487,7 @@ func TestResolvedTimestampInit(t *testing.T) { // out with the out-of-order intent abort operation. If this abort hadn't // allowed the unresolvedTxn's ref count to drop below 0, this would // have created a reference that would never be cleaned up. - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 3})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn1, hlc.Timestamp{WallTime: 3})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) @@ -501,7 +506,7 @@ func TestResolvedTimestampInit(t *testing.T) { // Abort an intent. Not initialized so no resolved timestamp. txn1 := uuid.MakeV4() - fwd := rts.ConsumeLogicalOp(abortIntentOp(txn1)) + fwd := rts.ConsumeLogicalOp(ctx, abortIntentOp(txn1)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{}, rts.Get()) @@ -513,6 +518,7 @@ func TestResolvedTimestampInit(t *testing.T) { func TestResolvedTimestampTxnAborted(t *testing.T) { defer leaktest.AfterTest(t)() + ctx := context.Background() rts := makeResolvedTimestamp() rts.Init() @@ -523,7 +529,7 @@ func TestResolvedTimestampTxnAborted(t *testing.T) { // Add an intent for a new transaction. txn1 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 10})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn1, hlc.Timestamp{WallTime: 10})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 5}, rts.Get()) @@ -533,23 +539,23 @@ func TestResolvedTimestampTxnAborted(t *testing.T) { require.Equal(t, hlc.Timestamp{WallTime: 9}, rts.Get()) // Abort txn1 after a periodic txn push. Resolved timestamp advances. 
- fwd = rts.ConsumeLogicalOp(abortTxnOp(txn1)) + fwd = rts.ConsumeLogicalOp(ctx, abortTxnOp(txn1)) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 15}, rts.Get()) // Update one of txn1's intents. Should be ignored. - fwd = rts.ConsumeLogicalOp(updateIntentOp(txn1, hlc.Timestamp{WallTime: 20})) + fwd = rts.ConsumeLogicalOp(ctx, updateIntentOp(txn1, hlc.Timestamp{WallTime: 20})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 15}, rts.Get()) // Abort one of txn1's intents. Should be ignored. - fwd = rts.ConsumeLogicalOp(abortIntentOp(txn1)) + fwd = rts.ConsumeLogicalOp(ctx, abortIntentOp(txn1)) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 15}, rts.Get()) // Write another intent as txn1. Should add txn1 back into queue. // This will eventually require another txn push to evict. - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 20})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn1, hlc.Timestamp{WallTime: 20})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 15}, rts.Get()) @@ -560,7 +566,7 @@ func TestResolvedTimestampTxnAborted(t *testing.T) { require.Equal(t, hlc.Timestamp{WallTime: 19}, rts.Get()) // Abort txn1 again after another periodic push. Resolved timestamp advances. - fwd = rts.ConsumeLogicalOp(abortTxnOp(txn1)) + fwd = rts.ConsumeLogicalOp(ctx, abortTxnOp(txn1)) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 25}, rts.Get()) } @@ -569,6 +575,7 @@ func TestResolvedTimestampTxnAborted(t *testing.T) { func TestClosedTimestampLogicalPart(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + ctx := context.Background() rts := makeResolvedTimestamp() rts.Init() @@ -579,7 +586,7 @@ func TestClosedTimestampLogicalPart(t *testing.T) { // Add an intent for a new transaction. txn1 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 10, Logical: 4})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn1, hlc.Timestamp{WallTime: 10, Logical: 4})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 10, Logical: 0}, rts.Get()) @@ -591,7 +598,7 @@ func TestClosedTimestampLogicalPart(t *testing.T) { require.Equal(t, hlc.Timestamp{WallTime: 10, Logical: 0}, rts.Get()) // Abort txn1. Resolved timestamp advances. - fwd = rts.ConsumeLogicalOp(abortTxnOp(txn1)) + fwd = rts.ConsumeLogicalOp(ctx, abortTxnOp(txn1)) require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 11, Logical: 0}, rts.Get()) @@ -600,7 +607,7 @@ func TestClosedTimestampLogicalPart(t *testing.T) { // and an intent is in the next wall tick; this used to cause an issue because // of the rounding logic. 
txn2 := uuid.MakeV4() - fwd = rts.ConsumeLogicalOp(writeIntentOp(txn2, hlc.Timestamp{WallTime: 12, Logical: 7})) + fwd = rts.ConsumeLogicalOp(ctx, writeIntentOp(txn2, hlc.Timestamp{WallTime: 12, Logical: 7})) require.False(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 11, Logical: 0}, rts.Get()) } diff --git a/pkg/kv/kvserver/rangefeed/task.go b/pkg/kv/kvserver/rangefeed/task.go index 2b0fad18a659..99b9e890ef7f 100644 --- a/pkg/kv/kvserver/rangefeed/task.go +++ b/pkg/kv/kvserver/rangefeed/task.go @@ -12,12 +12,14 @@ package rangefeed import ( "context" + "time" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" + "github.com/cockroachdb/cockroach/pkg/util/contextutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" @@ -223,10 +225,17 @@ func (l *LegacyIntentScanner) Close() { l.iter.Close() } // cleaning up the intents of transactions that are found to be committed. type TxnPusher interface { // PushTxns attempts to push the specified transactions to a new - // timestamp. It returns the resulting transaction protos. - PushTxns(context.Context, []enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error) + // timestamp. It returns the resulting transaction protos, and a + // bool indicating whether any txn aborts were ambiguous (see + // PushTxnResponse.AmbiguousAbort). + // + // NB: anyAmbiguousAbort may be false with nodes <24.1. + PushTxns(context.Context, []enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, bool, error) // ResolveIntents resolves the specified intents. ResolveIntents(ctx context.Context, intents []roachpb.LockUpdate) error + // Barrier waits for all past and ongoing write commands in the range to have + // applied on the leaseholder and the local replica. + Barrier(ctx context.Context) error } // txnPushAttempt pushes all old transactions that have unresolved intents on @@ -276,7 +285,7 @@ func (a *txnPushAttempt) pushOldTxns(ctx context.Context) error { // This may cause transaction restarts, but span refreshing should // prevent a restart for any transaction that has not been written // over at a larger timestamp. - pushedTxns, err := a.p.TxnPusher.PushTxns(ctx, a.txns, a.ts) + pushedTxns, anyAmbiguousAbort, err := a.p.TxnPusher.PushTxns(ctx, a.txns, a.ts) if err != nil { return err } @@ -349,6 +358,49 @@ func (a *txnPushAttempt) pushOldTxns(ctx context.Context) error { } } + // It's possible that the ABORTED state is a false negative, where the + // transaction was in fact committed but the txn record has been removed after + // resolving all intents (see batcheval.SynthesizeTxnFromMeta and + // Replica.CanCreateTxnRecord). If this replica has not applied the intent + // resolution yet, we may prematurely emit an MVCCAbortTxnOp and advance + // the resolved ts before emitting the committed intents. This violates the + // rangefeed checkpoint guarantee, and will at the time of writing cause the + // changefeed to drop these events entirely. See: + // https://github.com/cockroachdb/cockroach/issues/104309 + // + // PushTxns will let us know if it found such an ambiguous abort. 
To guarantee + // that we've applied all resolved intents in this case, submit a Barrier + // command to the leaseholder and wait for it to apply on the local replica. + // + // By the time the local replica applies the barrier it will have enqueued the + // resolved intents in the rangefeed processor's queue. These updates may not + // yet have been applied to the resolved timestamp intent tracker, but that's + // ok -- our MVCCAbortTxnOp will be enqueued and processed after them. + // + // This incurs an additional Raft write, but so would PushTxns() if we hadn't + // hit the ambiguous abort case. This will also block until ongoing writes + // have completed and applied, but that's fine since we currently run on our + // own goroutine (as opposed to on a rangefeed scheduler goroutine). + // + // NB: We can't try to reduce the span of the barrier, because LockSpans may + // not have the full set of intents. + // + // NB: PushTxnResponse.AmbiguousAbort and BarrierResponse.LeaseAppliedIndex + // are not guaranteed to be populated prior to 24.1. In that case, we degrade + // to the old (buggy) behavior. + if anyAmbiguousAbort && PushTxnsBarrierEnabled.Get(&a.p.Settings.SV) { + // The barrier will error out if our context is cancelled (which happens on + // processor shutdown) or if the replica is destroyed. Regardless, use a 1 + // minute backstop to prevent getting wedged. + // + // TODO(erikgrinaker): consider removing this once we have some confidence + // that it won't get wedged. + err := contextutil.RunWithTimeout(ctx, "pushtxns barrier", time.Minute, a.p.TxnPusher.Barrier) + if err != nil { + return err + } + } + // Inform the processor of all logical ops. a.p.sendEvent(ctx, event{ops: ops}, 0) diff --git a/pkg/kv/kvserver/rangefeed/task_test.go b/pkg/kv/kvserver/rangefeed/task_test.go index 94b3c95102db..3a60f27d5bd8 100644 --- a/pkg/kv/kvserver/rangefeed/task_test.go +++ b/pkg/kv/kvserver/rangefeed/task_test.go @@ -369,13 +369,13 @@ func TestInitResolvedTSScan(t *testing.T) { } type testTxnPusher struct { - pushTxnsFn func(context.Context, []enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error) + pushTxnsFn func(context.Context, []enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, bool, error) resolveIntentsFn func(ctx context.Context, intents []roachpb.LockUpdate) error } func (tp *testTxnPusher) PushTxns( ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp, -) ([]*roachpb.Transaction, error) { +) ([]*roachpb.Transaction, bool, error) { return tp.pushTxnsFn(ctx, txns, ts) } @@ -383,8 +383,12 @@ func (tp *testTxnPusher) ResolveIntents(ctx context.Context, intents []roachpb.L return tp.resolveIntentsFn(ctx, intents) } +func (tp *testTxnPusher) Barrier(ctx context.Context) error { + return nil +} + func (tp *testTxnPusher) mockPushTxns( - fn func(context.Context, []enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error), + fn func(context.Context, []enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, bool, error), ) { tp.pushTxnsFn = fn } @@ -443,7 +447,7 @@ func TestTxnPushAttempt(t *testing.T) { var tp testTxnPusher tp.mockPushTxns(func( ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp, - ) ([]*roachpb.Transaction, error) { + ) ([]*roachpb.Transaction, bool, error) { require.Equal(t, 4, len(txns)) require.Equal(t, txn1Meta, txns[0]) require.Equal(t, txn2Meta, txns[1]) @@ -454,7 +458,7 @@ func TestTxnPushAttempt(t *testing.T) { // Return all four protos. The PENDING txn is pushed. 
txn1ProtoPushed := txn1Proto.Clone() txn1ProtoPushed.WriteTimestamp = ts - return []*roachpb.Transaction{txn1ProtoPushed, txn2Proto, txn3Proto, txn4Proto}, nil + return []*roachpb.Transaction{txn1ProtoPushed, txn2Proto, txn3Proto, txn4Proto}, false, nil }) tp.mockResolveIntentsFn(func(ctx context.Context, intents []roachpb.LockUpdate) error { require.Len(t, intents, 7) diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index b19f63b634f6..410ea9444aee 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -101,8 +101,9 @@ func (s *lockedRangefeedStream) Send(e *kvpb.RangeFeedEvent) error { // rangefeedTxnPusher is a shim around intentResolver that implements the // rangefeed.TxnPusher interface. type rangefeedTxnPusher struct { - ir *intentresolver.IntentResolver - r *Replica + ir *intentresolver.IntentResolver + r *Replica + span roachpb.RSpan } // PushTxns is part of the rangefeed.TxnPusher interface. It performs a @@ -110,7 +111,7 @@ type rangefeedTxnPusher struct { // transactions. func (tp *rangefeedTxnPusher) PushTxns( ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp, -) ([]*roachpb.Transaction, error) { +) ([]*roachpb.Transaction, bool, error) { pushTxnMap := make(map[uuid.UUID]*enginepb.TxnMeta, len(txns)) for i := range txns { txn := &txns[i] @@ -126,18 +127,18 @@ func (tp *rangefeedTxnPusher) PushTxns( }, } - pushedTxnMap, _, pErr := tp.ir.MaybePushTransactions( + pushedTxnMap, anyAmbiguousAbort, pErr := tp.ir.MaybePushTransactions( ctx, pushTxnMap, h, kvpb.PUSH_TIMESTAMP, false, /* skipIfInFlight */ ) if pErr != nil { - return nil, pErr.GoError() + return nil, false, pErr.GoError() } pushedTxns := make([]*roachpb.Transaction, 0, len(pushedTxnMap)) for _, txn := range pushedTxnMap { pushedTxns = append(pushedTxns, txn) } - return pushedTxns, nil + return pushedTxns, anyAmbiguousAbort, nil } // ResolveIntents is part of the rangefeed.TxnPusher interface. @@ -150,6 +151,40 @@ func (tp *rangefeedTxnPusher) ResolveIntents( ).GoError() } +// Barrier is part of the rangefeed.TxnPusher interface. +func (tp *rangefeedTxnPusher) Barrier(ctx context.Context) error { + // Execute a Barrier on the leaseholder, and obtain its LAI. Error out on any + // range changes (e.g. splits/merges) that we haven't applied yet. + lai, desc, err := tp.r.store.db.BarrierWithLAI(ctx, tp.span.Key, tp.span.EndKey) + if err != nil { + if errors.HasType(err, &kvpb.RangeKeyMismatchError{}) { + return errors.Wrap(err, "range barrier failed, range split") + } + return errors.Wrap(err, "range barrier failed") + } + if lai == 0 { + // We may be talking to a binary which doesn't support + // BarrierRequest.WithLeaseAppliedIndex, so just degrade to the previous + // (buggy) behavior. + return nil + } + if desc.RangeID != tp.r.RangeID { + return errors.Errorf("range barrier failed, range ID changed: %d -> %s", tp.r.RangeID, desc) + } + if !desc.RSpan().Equal(tp.span) { + return errors.Errorf("range barrier failed, range span changed: %s -> %s", tp.span, desc) + } + + // Wait for the local replica to apply it. In the common case where we are the + // leaseholder, the Barrier call will already have waited for application, so + // this succeeds immediately. + if _, err = tp.r.WaitForLeaseAppliedIndex(ctx, lai); err != nil { + return errors.Wrap(err, "range barrier failed") + } + + return nil +} + // RangeFeed registers a rangefeed over the specified span. 
It sends updates to // the provided stream and returns with a future error when the rangefeed is // complete. The surrounding store's ConcurrentRequestLimiter is used to limit @@ -369,7 +404,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( // Create a new rangefeed. desc := r.Desc() - tp := rangefeedTxnPusher{ir: r.store.intentResolver, r: r} + tp := rangefeedTxnPusher{ir: r.store.intentResolver, r: r, span: desc.RSpan()} cfg := rangefeed.Config{ AmbientContext: r.AmbientContext, Clock: r.Clock(),
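For readers skimming the diff, the following stand-alone Go sketch condenses the push/barrier interplay that the rangefeed/task.go comment above describes. It is an illustration only, not code from the change: txnPusher and pushThenMaybeBarrier are hypothetical stand-ins for the rangefeed.TxnPusher interface and pushOldTxns, and the real code additionally resolves intents, enqueues MVCCAbortTxnOps, and reads the barrier escape hatch from cluster settings.

// Editor's sketch (not part of the patch): a minimal, compilable rendering of
// the new control flow. All identifiers here are hypothetical stand-ins for
// the types shown in the diff above.
package sketch

import (
	"context"
	"fmt"
	"time"
)

// txnPusher captures the two calls this change cares about: PushTxns now also
// reports whether any pushee came back as an ambiguous ABORTED (its txn record
// may have been GCed after a commit), and Barrier waits for all prior writes
// on the range to apply on the local replica.
type txnPusher interface {
	PushTxns(ctx context.Context) (anyAmbiguousAbort bool, err error)
	Barrier(ctx context.Context) error
}

// pushThenMaybeBarrier pushes old txns and, on an ambiguous abort, flushes
// pending intent resolution through Raft before the caller is allowed to
// advance the resolved timestamp. The one-minute timeout mirrors the backstop
// applied via contextutil.RunWithTimeout in task.go.
func pushThenMaybeBarrier(ctx context.Context, p txnPusher, barrierEnabled bool) error {
	anyAmbiguousAbort, err := p.PushTxns(ctx)
	if err != nil {
		return err
	}
	if !anyAmbiguousAbort || !barrierEnabled {
		// No ambiguous abort (or the escape hatch is off): nothing to flush.
		return nil
	}
	barrierCtx, cancel := context.WithTimeout(ctx, time.Minute)
	defer cancel()
	if err := p.Barrier(barrierCtx); err != nil {
		return fmt.Errorf("pushtxns barrier: %w", err)
	}
	return nil
}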