Merge #117859
117859: rangefeed: cancel async tasks on processor stop r=erikgrinaker a=erikgrinaker

Previously, async tasks spawned by the rangefeed processor (typically txn pushes and resolved timestamp scans) were not cancelled when the processor was stopped or the stopper quiesced. If these operations stalled, this could lead to goroutine leaks and node shutdown stalls. However, this was mitigated to some extent by the intent resolver itself detecting stopper quiescence.

This patch uses a separate task context for async tasks, which is cancelled either when the processor is stopped or the stopper quiesces.

In general, the rangefeed scheduler shutdown logic could use some improvement, but this patch does not attempt a broader cleanup in the interest of backportability.
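The fix described above boils down to giving async tasks their own context that is tied to both processor shutdown and stopper quiescence. Below is a minimal, standard-library Go sketch of that pattern; the processor type, its start/stop/runAsyncTask helpers, and the quiesceC channel are illustrative stand-ins, not the actual ScheduledProcessor or stop.Stopper API.

package main

import (
	"context"
	"fmt"
	"time"
)

// processor is a toy stand-in for the rangefeed processor: async tasks run on
// taskCtx, and taskCancel fires when the processor stops or quiesces.
type processor struct {
	taskCtx    context.Context
	taskCancel context.CancelFunc
}

// start derives the task context and wires it to the quiesce signal: closing
// quiesceC (the stopper quiescing) or calling stop() both cancel taskCtx.
func (p *processor) start(quiesceC <-chan struct{}) {
	p.taskCtx, p.taskCancel = context.WithCancel(context.Background())
	go func() {
		select {
		case <-quiesceC:
			p.taskCancel()
		case <-p.taskCtx.Done():
		}
	}()
}

// stop mirrors the processor's cleanup(): cancel any outstanding async tasks.
func (p *processor) stop() { p.taskCancel() }

// runAsyncTask spawns the task on taskCtx rather than the caller's context, so
// the task is cancelled on processor stop even if the caller's context lives on.
func (p *processor) runAsyncTask(task func(ctx context.Context)) {
	go task(p.taskCtx)
}

func main() {
	quiesceC := make(chan struct{})
	p := &processor{}
	p.start(quiesceC)

	done := make(chan struct{})
	p.runAsyncTask(func(ctx context.Context) {
		<-ctx.Done() // a stalled txn push would unblock here
		close(done)
	})

	p.stop() // or close(quiesceC) to simulate stopper quiescence
	select {
	case <-done:
		fmt.Println("async task observed cancellation")
	case <-time.After(time.Second):
		fmt.Println("async task leaked")
	}
}

Either path (stop() or closing quiesceC) unblocks the task, mirroring how the patch cancels taskCtx from both cleanup() and the WithCancelOnQuiesce registration in Start().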

Epic: none
Release note: None

Co-authored-by: Erik Grinaker <[email protected]>
craig[bot] and erikgrinaker committed Jan 20, 2024
2 parents c0cdacd + 2520ea7 commit f3c6a84
Showing 4 changed files with 110 additions and 29 deletions.
89 changes: 76 additions & 13 deletions pkg/kv/kvserver/rangefeed/processor_test.go
@@ -191,20 +191,23 @@ func (h *processorTestHelper) syncEventAndRegistrationsSpan(span roachpb.Span) {
h.sendSpanSync(&span)
}

func (h *processorTestHelper) triggerTxnPushUntilPushed(
t *testing.T, ackC chan struct{}, timeout time.Duration,
) {
deadline := time.After(timeout)
// triggerTxnPushUntilPushed will schedule PushTxnQueued events until pushedC
// indicates that a transaction push attempt has started by posting an event.
// If a push does not happen in 10 seconds, the attempt fails.
func (h *processorTestHelper) triggerTxnPushUntilPushed(t *testing.T, pushedC <-chan struct{}) {
timeoutC := time.After(10 * time.Second)
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for {
h.scheduler.Enqueue(PushTxnQueued)
select {
case <-deadline:
t.Fatal("failed to get txn push notification")
case ackC <- struct{}{}:
case <-pushedC:
return
case <-time.After(100 * time.Millisecond):
case <-ticker.C:
// Keep sending events to avoid the case where an event arrives but the
// flag indicating that a push is still running has not been reset.
case <-timeoutC:
t.Fatal("failed to get txn push notification")
}
}
}
@@ -995,7 +998,9 @@ func TestProcessorTxnPushAttempt(t *testing.T) {

// Create a TxnPusher that performs assertions during the first 3 uses.
var tp testTxnPusher
tp.mockPushTxns(func(txns []enginepb.TxnMeta, ts hlc.Timestamp) ([]*roachpb.Transaction, error) {
tp.mockPushTxns(func(
ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp,
) ([]*roachpb.Transaction, error) {
// The txns are not in a sorted order. Enforce one.
sort.Slice(txns, func(i, j int) bool {
return bytes.Compare(txns[i].Key, txns[j].Key) < 0
@@ -1048,7 +1053,7 @@ func TestProcessorTxnPushAttempt(t *testing.T) {
return nil
}

<-pausePushAttemptsC
pausePushAttemptsC <- struct{}{}
<-resumePushAttemptsC
return nil
})
@@ -1069,7 +1074,7 @@ func TestProcessorTxnPushAttempt(t *testing.T) {
require.Equal(t, hlc.Timestamp{WallTime: 9}, h.rts.Get())

// Wait for the first txn push attempt to complete.
h.triggerTxnPushUntilPushed(t, pausePushAttemptsC, 30*time.Second)
h.triggerTxnPushUntilPushed(t, pausePushAttemptsC)

// The resolved timestamp hasn't moved.
h.syncEventC()
@@ -1083,7 +1088,7 @@ func TestProcessorTxnPushAttempt(t *testing.T) {

// Unblock the second txn push attempt and wait for it to complete.
resumePushAttemptsC <- struct{}{}
h.triggerTxnPushUntilPushed(t, pausePushAttemptsC, 30*time.Second)
h.triggerTxnPushUntilPushed(t, pausePushAttemptsC)

// The resolved timestamp should have moved forwards to the closed
// timestamp.
@@ -1107,7 +1112,7 @@ func TestProcessorTxnPushAttempt(t *testing.T) {

// Unblock the third txn push attempt and wait for it to complete.
resumePushAttemptsC <- struct{}{}
h.triggerTxnPushUntilPushed(t, pausePushAttemptsC, 30*time.Second)
h.triggerTxnPushUntilPushed(t, pausePushAttemptsC)

// The resolved timestamp should have moved forwards to the closed
// timestamp.
@@ -1637,3 +1642,61 @@ func TestProcessorBackpressure(t *testing.T) {
},
}, events[len(events)-1])
}

// TestProcessorContextCancellation tests that the processor cancels the
// contexts of async tasks when stopped. It does not, however, cancel the
// process() context -- it probably should, but this should arguably also be
// handled by the scheduler.
func TestProcessorContextCancellation(t *testing.T) {
defer leaktest.AfterTest(t)()

// Try stopping both via the stopper and via Processor.Stop().
testutils.RunTrueAndFalse(t, "stopper", func(t *testing.T, useStopper bool) {

// Set up a transaction to push.
txnTS := hlc.Timestamp{WallTime: 10} // after resolved timestamp
txnMeta := enginepb.TxnMeta{
ID: uuid.MakeV4(), Key: keyA, WriteTimestamp: txnTS, MinTimestamp: txnTS}

// Set up a transaction pusher that will block until the context cancels.
pushReadyC := make(chan struct{})
pushDoneC := make(chan struct{})

var pusher testTxnPusher
pusher.mockPushTxns(func(
ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp,
) ([]*roachpb.Transaction, error) {
pushReadyC <- struct{}{}
<-ctx.Done()
close(pushDoneC)
return nil, ctx.Err()
})
pusher.mockResolveIntentsFn(func(ctx context.Context, intents []roachpb.LockUpdate) error {
return nil
})

// Start a test processor.
p, h, stopper := newTestProcessor(t, withPusher(&pusher))
ctx := context.Background()
defer stopper.Stop(ctx)

// Add an intent and move the closed timestamp past it. This should trigger a
// txn push attempt; wait for that to happen.
p.ConsumeLogicalOps(ctx, writeIntentOpFromMeta(txnMeta))
p.ForwardClosedTS(ctx, txnTS.Add(1, 0))
h.syncEventC()
h.triggerTxnPushUntilPushed(t, pushReadyC)

// Now, stop the processor, and wait for the txn pusher to exit.
if useStopper {
stopper.Stop(ctx)
} else {
p.Stop()
}
select {
case <-pushDoneC:
case <-time.After(3 * time.Second):
t.Fatal("txn pusher did not exit")
}
})
}
32 changes: 22 additions & 10 deletions pkg/kv/kvserver/rangefeed/scheduled_processor.go
@@ -47,17 +47,27 @@ type ScheduledProcessor struct {

// processCtx is the annotated background context used for process(). It is
// stored here to avoid reconstructing it on every call.
processCtx context.Context
processCtx context.Context
// taskCtx is the context used to spawn async tasks (e.g. the txn pusher),
// along with its cancel function which is called when the processor stops or
// the stopper quiesces. It is independent of processCtx, and constructed
// during Start().
//
// TODO(erikgrinaker): the context handling here should be cleaned up.
// processCtx should be passed in from the scheduler and propagate stopper
// quiescence, and the async tasks should probably be run on scheduler
// threads or at least a separate bounded worker pool. But this will do for
// now.
taskCtx context.Context
taskCancel func()

requestQueue chan request
eventC chan *event
// If true, processor is not processing data anymore and waiting for registrations
// to be complete.
stopping bool
stoppedC chan struct{}

// Processor startup runs background tasks to scan intents. If processor is
// stopped early, this task needs to be terminated to avoid resource waste.
startupCancel func()
// stopper passed by start that is used for firing up async work from scheduler.
stopper *stop.Stopper
txnPushActive bool
@@ -94,9 +104,9 @@ func NewScheduledProcessor(cfg Config) *ScheduledProcessor {
func (p *ScheduledProcessor) Start(
stopper *stop.Stopper, rtsIterFunc IntentScannerConstructor,
) error {
ctx := p.Config.AmbientContext.AnnotateCtx(context.Background())
ctx, p.startupCancel = context.WithCancel(ctx)
p.stopper = stopper
p.taskCtx, p.taskCancel = p.stopper.WithCancelOnQuiesce(
p.Config.AmbientContext.AnnotateCtx(context.Background()))

// Note that callback registration must be performed before starting resolved
// timestamp init because resolution posts resolvedTS event when it is done.
@@ -112,13 +122,14 @@ func (p *ScheduledProcessor) Start(
initScan := newInitResolvedTSScan(p.Span, p, rtsIter)
// TODO(oleg): we need to cap number of tasks that we can fire up across
// all feeds as they could potentially generate O(n) tasks during start.
if err := stopper.RunAsyncTask(ctx, "rangefeed: init resolved ts", initScan.Run); err != nil {
err := stopper.RunAsyncTask(p.taskCtx, "rangefeed: init resolved ts", initScan.Run)
if err != nil {
initScan.Cancel()
p.scheduler.StopProcessor()
return err
}
} else {
p.initResolvedTS(ctx)
p.initResolvedTS(p.taskCtx)
}

p.Metrics.RangeFeedProcessorsScheduler.Inc(1)
@@ -203,7 +214,7 @@ func (p *ScheduledProcessor) processPushTxn(ctx context.Context) {
p.txnPushActive = true
// TODO(oleg): we need to cap number of tasks that we can fire up across
// all feeds as they could potentially generate O(n) tasks for push.
err := p.stopper.RunAsyncTask(ctx, "rangefeed: pushing old txns", pushTxns.Run)
err := p.stopper.RunAsyncTask(p.taskCtx, "rangefeed: pushing old txns", pushTxns.Run)
if err != nil {
pushTxns.Cancel()
}
@@ -231,7 +242,7 @@ func (p *ScheduledProcessor) cleanup() {
// Unregister callback from scheduler
p.scheduler.Unregister()

p.startupCancel()
p.taskCancel()
close(p.stoppedC)
p.MemBudget.Close(context.Background())
}
@@ -343,6 +354,7 @@ func (p *ScheduledProcessor) Register(
}
}
}
// NB: use ctx, not p.taskCtx, as the registry handles teardown itself.
if err := p.Stopper.RunAsyncTask(ctx, "rangefeed: output loop", runOutputLoop); err != nil {
// If we can't schedule internally, processor is already stopped which
// could only happen on shutdown. Disconnect stream and just remove
8 changes: 6 additions & 2 deletions pkg/kv/kvserver/rangefeed/task.go
@@ -61,7 +61,9 @@ func (s *initResolvedTSScan) Run(ctx context.Context) {
defer s.Cancel()
if err := s.iterateAndConsume(ctx); err != nil {
err = errors.Wrap(err, "initial resolved timestamp scan failed")
log.Errorf(ctx, "%v", err)
if ctx.Err() == nil { // cancellation probably caused the error
log.Errorf(ctx, "%v", err)
}
s.p.StopWithErr(kvpb.NewError(err))
} else {
// Inform the processor that its resolved timestamp can be initialized.
@@ -238,7 +240,9 @@ func newTxnPushAttempt(
func (a *txnPushAttempt) Run(ctx context.Context) {
defer a.Cancel()
if err := a.pushOldTxns(ctx); err != nil {
log.Errorf(ctx, "pushing old intents failed: %v", err)
if ctx.Err() == nil { // cancellation probably caused the error
log.Errorf(ctx, "pushing old intents failed: %v", err)
}
}
}

10 changes: 6 additions & 4 deletions pkg/kv/kvserver/rangefeed/task_test.go
@@ -360,22 +360,22 @@ func TestInitResolvedTSScan(t *testing.T) {
}

type testTxnPusher struct {
pushTxnsFn func([]enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error)
pushTxnsFn func(context.Context, []enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error)
resolveIntentsFn func(ctx context.Context, intents []roachpb.LockUpdate) error
}

func (tp *testTxnPusher) PushTxns(
ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp,
) ([]*roachpb.Transaction, error) {
return tp.pushTxnsFn(txns, ts)
return tp.pushTxnsFn(ctx, txns, ts)
}

func (tp *testTxnPusher) ResolveIntents(ctx context.Context, intents []roachpb.LockUpdate) error {
return tp.resolveIntentsFn(ctx, intents)
}

func (tp *testTxnPusher) mockPushTxns(
fn func([]enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error),
fn func(context.Context, []enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error),
) {
tp.pushTxnsFn = fn
}
@@ -432,7 +432,9 @@ func TestTxnPushAttempt(t *testing.T) {

// Run a txnPushAttempt.
var tp testTxnPusher
tp.mockPushTxns(func(txns []enginepb.TxnMeta, ts hlc.Timestamp) ([]*roachpb.Transaction, error) {
tp.mockPushTxns(func(
ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp,
) ([]*roachpb.Transaction, error) {
require.Equal(t, 4, len(txns))
require.Equal(t, txn1Meta, txns[0])
require.Equal(t, txn2Meta, txns[1])