Skip to content

Commit

Permalink
changefeedccl: Do not inhibit server shutdown.
Browse files Browse the repository at this point in the history
Informs cockroachdb#82765

Permanet fix is being tracked by the above issue.
This is a temporary fix to ensure that change aggregators
cancel their context when the server quiesces so that the
server shutdown is not inhibited by the running changefeeds.

The test for this functionality is not being merged due
to the fact that it takes too long to run; however, the test
can be seen here: cockroachdb#82767

Release Notes (bug fix): Ensure running changefeeds do
not inhibit node shutdown.
  • Loading branch information
Yevgeniy Miretskiy committed Jun 13, 2022
1 parent 9064a0c commit 8565762
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 7 deletions.
9 changes: 3 additions & 6 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,17 +208,14 @@ func (ca *changeAggregator) MustBeStreaming() bool {

// Start is part of the RowSource interface.
func (ca *changeAggregator) Start(ctx context.Context) {
// Derive a separate context so that we can shutdown the poller.
ctx, ca.cancel = ca.flowCtx.Stopper().WithCancelOnQuiesce(ctx)

if ca.spec.JobID != 0 {
ctx = logtags.AddTag(ctx, "job", ca.spec.JobID)
}
ctx = ca.StartInternal(ctx, changeAggregatorProcName)

// Derive a separate context so that we can shutdown the poller. Note that
// we need to update both ctx (used throughout this function) and
// ProcessorBase.Ctx (used in all other methods) to the new context.
ctx, ca.cancel = context.WithCancel(ctx)
ca.Ctx = ctx

spans, err := ca.setupSpansAndFrontier()

endTime := ca.spec.Feed.EndTime
Expand Down
7 changes: 6 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -1030,7 +1030,12 @@ func (b *changefeedResumer) resumeWithRetries(
}
}

if !changefeedbase.IsRetryableError(err) {
// Retry changefeed is error is retryable. In addition, we want to handle
// context cancellation as retryable, but only if the resumer context has not been cancelled.
// (resumer context is canceled by the jobs framework -- so we should respect it).
isRetryableErr := changefeedbase.IsRetryableError(err) ||
(ctx.Err() == nil && errors.Is(err, context.Canceled))
if !isRetryableErr {
if ctx.Err() != nil {
return ctx.Err()
}
Expand Down

0 comments on commit 8565762

Please sign in to comment.