From 856576282672dd6f110026f14617c1cf0fe4f3eb Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Fri, 10 Jun 2022 19:00:34 -0400 Subject: [PATCH] changefeedccl: Do not inhibit server shutdown. Informs #82765 Permanent fix is being tracked by the above issue. This is a temporary fix to ensure that change aggregators cancel their context when the server quiesces so that the server shutdown is not inhibited by the running changefeeds. The test for this functionality is not being merged due to the fact that it takes too long to run; however, the test can be seen here: https://github.com/cockroachdb/cockroach/pull/82767 Release Notes (bug fix): Ensure running changefeeds do not inhibit node shutdown. --- pkg/ccl/changefeedccl/changefeed_processors.go | 9 +++------ pkg/ccl/changefeedccl/changefeed_stmt.go | 7 ++++++- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index ab8879cade1c..f729b60d857a 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -208,17 +208,14 @@ func (ca *changeAggregator) MustBeStreaming() bool { // Start is part of the RowSource interface. func (ca *changeAggregator) Start(ctx context.Context) { + // Derive a separate context so that we can shutdown the poller. + ctx, ca.cancel = ca.flowCtx.Stopper().WithCancelOnQuiesce(ctx) + if ca.spec.JobID != 0 { ctx = logtags.AddTag(ctx, "job", ca.spec.JobID) } ctx = ca.StartInternal(ctx, changeAggregatorProcName) - // Derive a separate context so that we can shutdown the poller. Note that - // we need to update both ctx (used throughout this function) and - // ProcessorBase.Ctx (used in all other methods) to the new context. 
- ctx, ca.cancel = context.WithCancel(ctx) - ca.Ctx = ctx - spans, err := ca.setupSpansAndFrontier() endTime := ca.spec.Feed.EndTime diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index d14042ce636b..3bead972b153 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -1030,7 +1030,12 @@ func (b *changefeedResumer) resumeWithRetries( } } - if !changefeedbase.IsRetryableError(err) { + // Retry changefeed if the error is retryable. In addition, we want to handle + // context cancellation as retryable, but only if the resumer context has not been canceled. + // (resumer context is canceled by the jobs framework -- so we should respect it). + isRetryableErr := changefeedbase.IsRetryableError(err) || + (ctx.Err() == nil && errors.Is(err, context.Canceled)) + if !isRetryableErr { if ctx.Err() != nil { return ctx.Err() }