diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index ab8879cade1c..3ef8481d0b7e 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -208,15 +208,15 @@ func (ca *changeAggregator) MustBeStreaming() bool { // Start is part of the RowSource interface. func (ca *changeAggregator) Start(ctx context.Context) { + // Derive a separate context so that we can shutdown the poller. Note that + // we need to update both ctx (used throughout this function) and + // ProcessorBase.Ctx (used in all other methods) to the new context. + ctx, ca.cancel = ca.flowCtx.Stopper().WithCancelOnQuiesce(ctx) + if ca.spec.JobID != 0 { ctx = logtags.AddTag(ctx, "job", ca.spec.JobID) } ctx = ca.StartInternal(ctx, changeAggregatorProcName) - - // Derive a separate context so that we can shutdown the poller. Note that - // we need to update both ctx (used throughout this function) and - // ProcessorBase.Ctx (used in all other methods) to the new context. - ctx, ca.cancel = context.WithCancel(ctx) ca.Ctx = ctx spans, err := ca.setupSpansAndFrontier()