From 7e5ef1c5e35178e64a62957791ae013146b5ae4c Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Mon, 29 Jan 2024 12:56:18 +0000 Subject: [PATCH] rangefeed: add `TestProcessorContextCancellation` This tests that async tasks spawned by the rangefeed processor are cancelled when the processor shuts down, via context cancellation. Epic: none Release note: None --- pkg/kv/kvserver/rangefeed/processor_test.go | 71 ++++++++++++++++++++- pkg/kv/kvserver/rangefeed/task_test.go | 10 +-- 2 files changed, 75 insertions(+), 6 deletions(-) diff --git a/pkg/kv/kvserver/rangefeed/processor_test.go b/pkg/kv/kvserver/rangefeed/processor_test.go index ea3768c61a82..8af6c86094c8 100644 --- a/pkg/kv/kvserver/rangefeed/processor_test.go +++ b/pkg/kv/kvserver/rangefeed/processor_test.go @@ -831,7 +831,9 @@ func TestProcessorTxnPushAttempt(t *testing.T) { // Create a TxnPusher that performs assertions during the first 3 uses. var tp testTxnPusher - tp.mockPushTxns(func(txns []enginepb.TxnMeta, ts hlc.Timestamp) ([]*roachpb.Transaction, error) { + tp.mockPushTxns(func( + ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp, + ) ([]*roachpb.Transaction, error) { // The txns are not in a sorted order. Enforce one. sort.Slice(txns, func(i, j int) bool { return bytes.Compare(txns[i].Key, txns[j].Key) < 0 @@ -988,7 +990,9 @@ func TestProcessorTxnPushDisabled(t *testing.T) { // Set up a txn pusher and processor that errors on any pushes. var tp testTxnPusher - tp.mockPushTxns(func(txns []enginepb.TxnMeta, ts hlc.Timestamp) ([]*roachpb.Transaction, error) { + tp.mockPushTxns(func( + ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp, + ) ([]*roachpb.Transaction, error) { err := errors.Errorf("unexpected txn push for txns=%v ts=%s", txns, ts) t.Errorf("%v", err) return nil, err @@ -1520,3 +1524,66 @@ func TestSizeOfEvent(t *testing.T) { size := int(unsafe.Sizeof(e)) require.Equal(t, 72, size) } + +// TestProcessorContextCancellation tests that the processor cancels the +// contexts of async tasks when stopped. It does not, however, cancel the +// process() context -- it probably should, but this should arguably also be +// handled by the scheduler. +func TestProcessorContextCancellation(t *testing.T) { + defer leaktest.AfterTest(t)() + + // Try stopping both via the stopper and via Processor.Stop(). + testutils.RunTrueAndFalse(t, "stopper", func(t *testing.T, useStopper bool) { + + // Set up a transaction to push. + txnTS := hlc.Timestamp{WallTime: 10} // after resolved timestamp + txnMeta := enginepb.TxnMeta{ + ID: uuid.MakeV4(), Key: keyA, WriteTimestamp: txnTS, MinTimestamp: txnTS} + + // Set up a transaction pusher that will block until the context cancels. + pushReadyC := make(chan struct{}) + pushDoneC := make(chan struct{}) + + var pusher testTxnPusher + pusher.mockPushTxns(func( + ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp, + ) ([]*roachpb.Transaction, error) { + pushReadyC <- struct{}{} + <-ctx.Done() + close(pushDoneC) + return nil, ctx.Err() + }) + pusher.mockResolveIntentsFn(func(ctx context.Context, intents []roachpb.LockUpdate) error { + return nil + }) + + // Start a test processor. + st := cluster.MakeTestingClusterSettings() + p, stopper := newTestProcessorWithTxnPusher(t, nil /* rtsIter */, &pusher, st) + ctx := context.Background() + defer stopper.Stop(ctx) + + // Add an intent and move the closed timestamp past it. This should trigger a + // txn push attempt, wait for that to happen. + p.ConsumeLogicalOps(ctx, writeIntentOpFromMeta(txnMeta)) + p.ForwardClosedTS(ctx, txnTS.Add(1, 0)) + p.syncEventC() + select { + case <-pushReadyC: + case <-time.After(3 * time.Second): + t.Fatal("txn push did not arrive") + } + + // Now, stop the processor, and wait for the txn pusher to exit. + if useStopper { + stopper.Stop(ctx) + } else { + p.Stop() + } + select { + case <-pushDoneC: + case <-time.After(3 * time.Second): + t.Fatal("txn pusher did not exit") + } + }) +} diff --git a/pkg/kv/kvserver/rangefeed/task_test.go b/pkg/kv/kvserver/rangefeed/task_test.go index e40d9addd9d4..94b3c95102db 100644 --- a/pkg/kv/kvserver/rangefeed/task_test.go +++ b/pkg/kv/kvserver/rangefeed/task_test.go @@ -369,14 +369,14 @@ func TestInitResolvedTSScan(t *testing.T) { } type testTxnPusher struct { - pushTxnsFn func([]enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error) + pushTxnsFn func(context.Context, []enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error) resolveIntentsFn func(ctx context.Context, intents []roachpb.LockUpdate) error } func (tp *testTxnPusher) PushTxns( ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp, ) ([]*roachpb.Transaction, error) { - return tp.pushTxnsFn(txns, ts) + return tp.pushTxnsFn(ctx, txns, ts) } func (tp *testTxnPusher) ResolveIntents(ctx context.Context, intents []roachpb.LockUpdate) error { @@ -384,7 +384,7 @@ func (tp *testTxnPusher) ResolveIntents(ctx context.Context, intents []roachpb.L } func (tp *testTxnPusher) mockPushTxns( - fn func([]enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error), + fn func(context.Context, []enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error), ) { tp.pushTxnsFn = fn } @@ -441,7 +441,9 @@ func TestTxnPushAttempt(t *testing.T) { // Run a txnPushAttempt. var tp testTxnPusher - tp.mockPushTxns(func(txns []enginepb.TxnMeta, ts hlc.Timestamp) ([]*roachpb.Transaction, error) { + tp.mockPushTxns(func( + ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp, + ) ([]*roachpb.Transaction, error) { require.Equal(t, 4, len(txns)) require.Equal(t, txn1Meta, txns[0]) require.Equal(t, txn2Meta, txns[1])