From 9f8281ce4d337d3e1642eda0323912dc6da111c0 Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Tue, 6 Dec 2022 15:07:37 -0500 Subject: [PATCH] changefeedccl: Fix error handling in mixed version state Prior PR #90810 changed error handling approach used by CDC to be "retry by default". The above PR failed to account for some of the situations that may arise in mixed version state. In particular, when upgrading, if the aggregator node gets restarted with the new version, while the coordinator still runs the old binary, *and* the new aggregator encounters a retryable error, that error would not be explicitly marked as retryable, causing the old coordinator binary to treat the error as a terminal error. The other combination (new coordinator, old aggregator) is not susceptible to this situation. This PR partially reverts changes in #90810 so previously retryable errors continue to be explicitly marked as retryable. There is no need to introduce version gates since the error handling should be backward compatible, and with this PR, operate correctly in the mixed version state. Epic: None Release note: none --- .../cdcevent/rowfetcher_cache.go | 4 +- .../changefeedccl/changefeed_processors.go | 11 +++- pkg/ccl/changefeedccl/changefeed_test.go | 2 +- .../changefeedccl/changefeedbase/errors.go | 16 ++++++ pkg/ccl/changefeedccl/schema_registry.go | 2 +- pkg/ccl/changefeedccl/sink.go | 53 +++++++++++++++++++ 6 files changed, 82 insertions(+), 6 deletions(-) diff --git a/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go b/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go index a031a8f170f0..7d29b5af9c91 100644 --- a/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go +++ b/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go @@ -110,7 +110,7 @@ func refreshUDT( }); err != nil { // Manager can return all kinds of errors during chaos, but based on // its usage, none of them should ever be terminal. 
- return nil, err + return nil, changefeedbase.MarkRetryableError(err) } // Immediately release the lease, since we only need it for the exact // timestamp requested. @@ -144,7 +144,7 @@ func (c *rowFetcherCache) tableDescForKey( if err != nil { // Manager can return all kinds of errors during chaos, but based on // its usage, none of them should ever be terminal. - return nil, family, err + return nil, family, changefeedbase.MarkRetryableError(err) } tableDesc = desc.Underlying().(catalog.TableDescriptor) // Immediately release the lease, since we only need it for the exact diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index f4f0435f0145..e57f6cba5939 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -249,6 +249,7 @@ func (ca *changeAggregator) Start(ctx context.Context) { ca.spec.User(), ca.spec.JobID, ca.sliMetrics) if err != nil { + err = changefeedbase.MarkRetryableError(err) // Early abort in the case that there is an error creating the sink. 
ca.MoveToDraining(err) ca.cancel() @@ -279,7 +280,7 @@ func (ca *changeAggregator) Start(ctx context.Context) { ca.cancel() return } - + ca.sink = &errorWrapperSink{wrapped: ca.sink} ca.eventConsumer, ca.sink, err = newEventConsumer( ctx, ca.flowCtx, feed, ca.frontier.SpanFrontier(), kvFeedHighWater, ca.sink, feed, ca.spec.Select, ca.knobs, ca.metrics, ca.isSinkless()) @@ -916,6 +917,7 @@ func (cf *changeFrontier) Start(ctx context.Context) { cf.spec.User(), cf.spec.JobID, sli) if err != nil { + err = changefeedbase.MarkRetryableError(err) cf.MoveToDraining(err) return } @@ -924,6 +926,8 @@ func (cf *changeFrontier) Start(ctx context.Context) { cf.resolvedBuf = &b.buf } + cf.sink = &errorWrapperSink{wrapped: cf.sink} + cf.highWaterAtStart = cf.spec.Feed.StatementTime if cf.spec.JobID != 0 { job, err := cf.flowCtx.Cfg.JobRegistry.LoadClaimedJob(ctx, cf.spec.JobID) @@ -1039,6 +1043,8 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad "shut down due to schema change and %s=%q", changefeedbase.OptSchemaChangePolicy, changefeedbase.OptSchemaChangePolicyStop)) + } else { + err = changefeedbase.MarkRetryableError(err) } } @@ -1283,7 +1289,8 @@ func (cf *changeFrontier) checkpointJobProgress( if cf.knobs.RaiseRetryableError != nil { if err := cf.knobs.RaiseRetryableError(); err != nil { - return false, err + return false, changefeedbase.MarkRetryableError( + errors.New("cf.knobs.RaiseRetryableError")) } } diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 86c2c8bf593c..763b8e77b25f 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -3425,7 +3425,7 @@ func TestChangefeedRetryableError(t *testing.T) { knobs.BeforeEmitRow = func(_ context.Context) error { switch atomic.LoadInt64(&failEmit) { case 1: - return errors.New("synthetic retryable error") + return changefeedbase.MarkRetryableError(fmt.Errorf("synthetic retryable error")) case 2: 
 return changefeedbase.WithTerminalError(errors.New("synthetic terminal error")) default: diff --git a/pkg/ccl/changefeedccl/changefeedbase/errors.go b/pkg/ccl/changefeedccl/changefeedbase/errors.go index 54507c7b0c48..3aaeea2493f0 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/errors.go +++ b/pkg/ccl/changefeedccl/changefeedbase/errors.go @@ -77,6 +77,14 @@ func (e *terminalError) Error() string { return "terminal changefeed error" } +// TODO(yevgeniy): retryableError and all machinery related +// to MarkRetryableError may be removed once 23.1 is released. +type retryableError struct{} + +func (e *retryableError) Error() string { + return "retryable changefeed error" +} + // WithTerminalError decorates underlying error to indicate // that the error is a terminal changefeed error. func WithTerminalError(cause error) error { @@ -86,6 +94,14 @@ func WithTerminalError(cause error) error { return errors.Mark(cause, &terminalError{}) } +// MarkRetryableError wraps the given error, marking it as retryable. +func MarkRetryableError(cause error) error { + if cause == nil { + return nil + } + return errors.Mark(cause, &retryableError{}) +} + // AsTerminalError determines if the cause error is a terminal changefeed // error. Returns non-nil error if changefeed should terminate with the // returned error. 
diff --git a/pkg/ccl/changefeedccl/schema_registry.go b/pkg/ccl/changefeedccl/schema_registry.go index bb230dd8c4d3..be35c75d7539 100644 --- a/pkg/ccl/changefeedccl/schema_registry.go +++ b/pkg/ccl/changefeedccl/schema_registry.go @@ -199,7 +199,7 @@ func (r *confluentSchemaRegistry) doWithRetry(ctx context.Context, fn func() err } log.VInfof(ctx, 2, "retrying schema registry operation: %s", err.Error()) } - return err + return changefeedbase.MarkRetryableError(err) } func gracefulClose(ctx context.Context, toClose io.ReadCloser) { diff --git a/pkg/ccl/changefeedccl/sink.go b/pkg/ccl/changefeedccl/sink.go index 804bdf1218b5..6b9d367d8a91 100644 --- a/pkg/ccl/changefeedccl/sink.go +++ b/pkg/ccl/changefeedccl/sink.go @@ -298,6 +298,59 @@ func (u *sinkURL) String() string { return u.URL.String() } +// errorWrapperSink delegates to another sink and marks all returned errors as +// retryable. During changefeed setup, we use the sink once without this to +// verify configuration, but in the steady state, no sink error should be +// terminal. +type errorWrapperSink struct { + wrapped externalResource +} + +// EmitRow implements Sink interface. +func (s errorWrapperSink) EmitRow( + ctx context.Context, + topic TopicDescriptor, + key, value []byte, + updated, mvcc hlc.Timestamp, + alloc kvevent.Alloc, +) error { + if err := s.wrapped.(EventSink).EmitRow(ctx, topic, key, value, updated, mvcc, alloc); err != nil { + return changefeedbase.MarkRetryableError(err) + } + return nil +} + +// EmitResolvedTimestamp implements Sink interface. +func (s errorWrapperSink) EmitResolvedTimestamp( + ctx context.Context, encoder Encoder, resolved hlc.Timestamp, +) error { + if err := s.wrapped.(ResolvedTimestampSink).EmitResolvedTimestamp(ctx, encoder, resolved); err != nil { + return changefeedbase.MarkRetryableError(err) + } + return nil +} + +// Flush implements Sink interface. 
+func (s errorWrapperSink) Flush(ctx context.Context) error { + if err := s.wrapped.(EventSink).Flush(ctx); err != nil { + return changefeedbase.MarkRetryableError(err) + } + return nil +} + +// Close implements Sink interface. +func (s errorWrapperSink) Close() error { + if err := s.wrapped.Close(); err != nil { + return changefeedbase.MarkRetryableError(err) + } + return nil +} + +// Dial implements Sink interface. +func (s errorWrapperSink) Dial() error { + return s.wrapped.Dial() +} + // encDatumRowBuffer is a FIFO of `EncDatumRow`s. // // TODO(dan): There's some potential allocation savings here by reusing the same