diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 102a7f855a94..6852cf9f8e52 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -51,7 +51,8 @@ type changeAggregator struct { spec execinfrapb.ChangeAggregatorSpec memAcc mon.BoundAccount - // cancel shuts down the processor, both the `Next()` flow and the kvfeed. + // cancel cancels the context passed to all resources created while starting + // this aggregator. cancel func() // errCh contains the return values of the kvfeed. errCh chan error @@ -59,9 +60,12 @@ type changeAggregator struct { kvFeedDoneCh chan struct{} kvFeedMemMon *mon.BytesMonitor - // drainWatchCh is signaled if the registry on this node is being drained. + // drainWatchCh is signaled if the job registry on this node is being + // drained, which is a proxy for the node being drained. If a drain occurs, + // it will be blocked until we allow it to proceed by calling drainDone(). + // This gives the aggregator time to checkpoint before shutting down. drainWatchCh <-chan struct{} - drainDone func() // Cleanup function for drain watch. + drainDone func() // sink is the Sink to write rows to. Resolved timestamps are never written // by changeAggregator. @@ -287,7 +291,6 @@ func (ca *changeAggregator) Start(ctx context.Context) { ca.spec.User(), ca.spec.JobID, recorder) if err != nil { err = changefeedbase.MarkRetryableError(err) - // Early abort in the case that there is an error creating the sink. ca.MoveToDraining(err) ca.cancel() return @@ -312,7 +315,6 @@ func (ca *changeAggregator) Start(ctx context.Context) { ca.eventProducer, err = ca.startKVFeed(ctx, spans, kvFeedHighWater, needsInitialScan, feed) if err != nil { - // Early abort in the case that there is an error creating the sink. 
ca.MoveToDraining(err) ca.cancel() return @@ -321,9 +323,7 @@ func (ca *changeAggregator) Start(ctx context.Context) { ca.eventConsumer, ca.sink, err = newEventConsumer( ctx, ca.flowCtx.Cfg, ca.spec, feed, ca.frontier.SpanFrontier(), kvFeedHighWater, ca.sink, ca.metrics, ca.sliMetrics, ca.knobs) - if err != nil { - // Early abort in the case that there is an error setting up the consumption. ca.MoveToDraining(err) ca.cancel() return @@ -334,6 +334,26 @@ func (ca *changeAggregator) Start(ctx context.Context) { // Generate expensive checkpoint only after we ran for a while. ca.lastSpanFlush = timeutil.Now() + + if ca.knobs.OnDrain != nil { + ca.drainWatchCh = ca.knobs.OnDrain() + } else { + ca.drainWatchCh, ca.drainDone = ca.flowCtx.Cfg.JobRegistry.OnDrain() + } +} + +// checkForNodeDrain returns an error if the node is draining. +func (ca *changeAggregator) checkForNodeDrain() error { + if ca.drainWatchCh == nil { + return errors.AssertionFailedf("cannot check for node drain if" + + " watch channel is nil") + } + select { + case <-ca.drainWatchCh: + return changefeedbase.ErrNodeDraining + default: + return nil + } } func (ca *changeAggregator) startKVFeed( @@ -355,21 +375,6 @@ func (ca *changeAggregator) startKVFeed( return nil, err } - ca.drainWatchCh, ca.drainDone = ca.flowCtx.Cfg.JobRegistry.OnDrain() - // Arrange for kvFeed to terminate if the job registry is being drained. - kvfeedCfg.FeedWatcher = func(ctx context.Context) error { - if ca.knobs.OnDrain != nil { - ca.drainWatchCh = ca.knobs.OnDrain() - } - - select { - case <-ctx.Done(): - return ctx.Err() - case <-ca.drainWatchCh: - return changefeedbase.ErrNodeDraining - } - } - // Give errCh enough buffer both possible errors from supporting goroutines, // but only the first one is ever used. 
ca.errCh = make(chan error, 2) @@ -575,21 +580,27 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMet } } + // As the last gasp before shutdown, transmit up-to-date frontier + // information to the coordinator. We expect to get this signal via the + // polling below before the drain actually occurs and starts tearing + // things down. + if err := ca.checkForNodeDrain(); err != nil { + nodeID, _ := ca.FlowCtx.Cfg.NodeID.OptionalNodeID() + meta := &execinfrapb.ChangefeedMeta{ + DrainInfo: &execinfrapb.ChangefeedMeta_DrainInfo{NodeID: nodeID}, + Checkpoint: getFrontierSpans(), + } + + ca.AppendTrailingMeta(execinfrapb.ProducerMetadata{Changefeed: meta}) + ca.shutdownCheckpointEmitted = true + ca.cancel() + ca.MoveToDraining(err) + break + } + if err := ca.tick(); err != nil { var e kvevent.ErrBufferClosed - if errors.Is(err, changefeedbase.ErrNodeDraining) { - err = changefeedbase.ErrNodeDraining - // As the last gasp before shutdown, transmit an up-to-date frontier - // information to the coordinator. - nodeID, _ := ca.FlowCtx.Cfg.NodeID.OptionalNodeID() - meta := &execinfrapb.ChangefeedMeta{ - DrainInfo: &execinfrapb.ChangefeedMeta_DrainInfo{NodeID: nodeID}, - Checkpoint: getFrontierSpans(), - } - - ca.AppendTrailingMeta(execinfrapb.ProducerMetadata{Changefeed: meta}) - ca.shutdownCheckpointEmitted = true - } else if errors.As(err, &e) { + if errors.As(err, &e) { // ErrBufferClosed is a signal that our kvfeed has exited expectedly. err = e.Unwrap() if errors.Is(err, kvevent.ErrNormalRestartReason) { diff --git a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go index 18f99e47567b..742dabbb4425 100644 --- a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go +++ b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go @@ -53,10 +53,6 @@ type Config struct { SchemaChangePolicy changefeedbase.SchemaChangePolicy SchemaFeed schemafeed.SchemaFeed - // FeedWatcher function is invoked along with the kv/schema feed. 
- // It may return an error which will cause kv feed to exit. - FeedWatcher func(ctx context.Context) error - // If true, the feed will begin with a dump of data at exactly the // InitialHighWater. This is a peculiar behavior. In general the // InitialHighWater is a point in time at which all data is known to have @@ -115,10 +111,8 @@ func Run(ctx context.Context, cfg Config) error { g := ctxgroup.WithContext(ctx) g.GoCtx(cfg.SchemaFeed.Run) g.GoCtx(f.run) - if cfg.FeedWatcher != nil { - g.GoCtx(cfg.FeedWatcher) - } err := g.Wait() + log.Errorf(ctx, "kvfeed OVERALL err %v\n", err) // NB: The higher layers of the changefeed should detect the boundary and the // policy and tear everything down. Returning before the higher layers tear down