diff --git a/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go b/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go index a031a8f170f0..7d29b5af9c91 100644 --- a/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go +++ b/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go @@ -110,7 +110,7 @@ func refreshUDT( }); err != nil { // Manager can return all kinds of errors during chaos, but based on // its usage, none of them should ever be terminal. - return nil, err + return nil, changefeedbase.MarkRetryableError(err) } // Immediately release the lease, since we only need it for the exact // timestamp requested. @@ -144,7 +144,7 @@ func (c *rowFetcherCache) tableDescForKey( if err != nil { // Manager can return all kinds of errors during chaos, but based on // its usage, none of them should ever be terminal. - return nil, family, err + return nil, family, changefeedbase.MarkRetryableError(err) } tableDesc = desc.Underlying().(catalog.TableDescriptor) // Immediately release the lease, since we only need it for the exact diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index f4f0435f0145..e57f6cba5939 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -249,6 +249,7 @@ func (ca *changeAggregator) Start(ctx context.Context) { ca.spec.User(), ca.spec.JobID, ca.sliMetrics) if err != nil { + err = changefeedbase.MarkRetryableError(err) // Early abort in the case that there is an error creating the sink. 
ca.MoveToDraining(err) ca.cancel() @@ -279,7 +280,7 @@ func (ca *changeAggregator) Start(ctx context.Context) { ca.cancel() return } - + ca.sink = &errorWrapperSink{wrapped: ca.sink} ca.eventConsumer, ca.sink, err = newEventConsumer( ctx, ca.flowCtx, feed, ca.frontier.SpanFrontier(), kvFeedHighWater, ca.sink, feed, ca.spec.Select, ca.knobs, ca.metrics, ca.isSinkless()) @@ -916,6 +917,7 @@ func (cf *changeFrontier) Start(ctx context.Context) { cf.spec.User(), cf.spec.JobID, sli) if err != nil { + err = changefeedbase.MarkRetryableError(err) cf.MoveToDraining(err) return } @@ -924,6 +926,8 @@ func (cf *changeFrontier) Start(ctx context.Context) { cf.resolvedBuf = &b.buf } + cf.sink = &errorWrapperSink{wrapped: cf.sink} + cf.highWaterAtStart = cf.spec.Feed.StatementTime if cf.spec.JobID != 0 { job, err := cf.flowCtx.Cfg.JobRegistry.LoadClaimedJob(ctx, cf.spec.JobID) @@ -1039,6 +1043,8 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad "shut down due to schema change and %s=%q", changefeedbase.OptSchemaChangePolicy, changefeedbase.OptSchemaChangePolicyStop)) + } else { + err = changefeedbase.MarkRetryableError(err) } } @@ -1283,7 +1289,8 @@ func (cf *changeFrontier) checkpointJobProgress( if cf.knobs.RaiseRetryableError != nil { if err := cf.knobs.RaiseRetryableError(); err != nil { - return false, err + return false, changefeedbase.MarkRetryableError( + errors.New("cf.knobs.RaiseRetryableError")) } } diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 86c2c8bf593c..763b8e77b25f 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -3425,7 +3425,7 @@ func TestChangefeedRetryableError(t *testing.T) { knobs.BeforeEmitRow = func(_ context.Context) error { switch atomic.LoadInt64(&failEmit) { case 1: - return errors.New("synthetic retryable error") + return changefeedbase.MarkRetryableError(fmt.Errorf("synthetic retryable error")) case 2: 
return changefeedbase.WithTerminalError(errors.New("synthetic terminal error")) default: diff --git a/pkg/ccl/changefeedccl/changefeedbase/errors.go b/pkg/ccl/changefeedccl/changefeedbase/errors.go index 54507c7b0c48..3aaeea2493f0 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/errors.go +++ b/pkg/ccl/changefeedccl/changefeedbase/errors.go @@ -77,6 +77,14 @@ func (e *terminalError) Error() string { return "terminal changefeed error" } +// TODO(yevgeniy): retryableError and all machinery related +// to MarkRetryableError may be removed once 23.1 is released. +type retryableError struct{} + +func (e *retryableError) Error() string { + return "retryable changefeed error" +} + // WithTerminalError decorates underlying error to indicate // that the error is a terminal changefeed error. func WithTerminalError(cause error) error { @@ -86,6 +94,14 @@ func WithTerminalError(cause error) error { return errors.Mark(cause, &terminalError{}) } +// MarkRetryableError wraps the given error, marking it as retryable. +func MarkRetryableError(cause error) error { + if cause == nil { + return nil + } + return errors.Mark(cause, &retryableError{}) +} + // AsTerminalError determines if the cause error is a terminal changefeed // error. Returns non-nil error if changefeed should terminate with the // returned error. 
diff --git a/pkg/ccl/changefeedccl/schema_registry.go b/pkg/ccl/changefeedccl/schema_registry.go index bb230dd8c4d3..be35c75d7539 100644 --- a/pkg/ccl/changefeedccl/schema_registry.go +++ b/pkg/ccl/changefeedccl/schema_registry.go @@ -199,7 +199,7 @@ func (r *confluentSchemaRegistry) doWithRetry(ctx context.Context, fn func() err } log.VInfof(ctx, 2, "retrying schema registry operation: %s", err.Error()) } - return err + return changefeedbase.MarkRetryableError(err) } func gracefulClose(ctx context.Context, toClose io.ReadCloser) { diff --git a/pkg/ccl/changefeedccl/sink.go b/pkg/ccl/changefeedccl/sink.go index 804bdf1218b5..6b9d367d8a91 100644 --- a/pkg/ccl/changefeedccl/sink.go +++ b/pkg/ccl/changefeedccl/sink.go @@ -298,6 +298,59 @@ func (u *sinkURL) String() string { return u.URL.String() } +// errorWrapperSink delegates to another sink and marks all returned errors as +// retryable. During changefeed setup, we use the sink once without this to +// verify configuration, but in the steady state, no sink error should be +// terminal. +type errorWrapperSink struct { + wrapped externalResource +} + +// EmitRow implements Sink interface. +func (s errorWrapperSink) EmitRow( + ctx context.Context, + topic TopicDescriptor, + key, value []byte, + updated, mvcc hlc.Timestamp, + alloc kvevent.Alloc, +) error { + if err := s.wrapped.(EventSink).EmitRow(ctx, topic, key, value, updated, mvcc, alloc); err != nil { + return changefeedbase.MarkRetryableError(err) + } + return nil +} + +// EmitResolvedTimestamp implements Sink interface. +func (s errorWrapperSink) EmitResolvedTimestamp( + ctx context.Context, encoder Encoder, resolved hlc.Timestamp, +) error { + if err := s.wrapped.(ResolvedTimestampSink).EmitResolvedTimestamp(ctx, encoder, resolved); err != nil { + return changefeedbase.MarkRetryableError(err) + } + return nil +} + +// Flush implements Sink interface. 
+func (s errorWrapperSink) Flush(ctx context.Context) error { + if err := s.wrapped.(EventSink).Flush(ctx); err != nil { + return changefeedbase.MarkRetryableError(err) + } + return nil +} + +// Close implements Sink interface. +func (s errorWrapperSink) Close() error { + if err := s.wrapped.Close(); err != nil { + return changefeedbase.MarkRetryableError(err) + } + return nil +} + +// Dial implements Sink interface. +func (s errorWrapperSink) Dial() error { + return s.wrapped.Dial() +} + // encDatumRowBuffer is a FIFO of `EncDatumRow`s. // // TODO(dan): There's some potential allocation savings here by reusing the same