Skip to content

Commit

Permalink
changefeedccl: Fix error handling in mixed version state
Browse files Browse the repository at this point in the history
Prior PR #90810 changed error handling approach used
by CDC to be "retry by default".

The above PR failed to account for some of the
situations that may arrise in mixed version state.
In particular, when upgrading, if the aggregator node
gets restarted with the new version, while the coordinator
still runs old binary, *and* the new aggregator encounters
a retryable error, that error would not be explicitly marked
as retryable, causing the old coordinator binary to treat
the error as a terminal error.

Other combination (new coordinator, old aggregator) is not
succeptible to this situation.

This PR partially reverts changes in the #90810 so previously
retryable errors continue to be explicitly marked as retryable.

There is no need to introduce version gates since the error
handling should be backward compatible, and with this PR, operate
correctly in the mixed version state.

Epic: None

Release note: none
  • Loading branch information
Yevgeniy Miretskiy committed Dec 7, 2022
1 parent d64538f commit 9f8281c
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 6 deletions.
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func refreshUDT(
}); err != nil {
// Manager can return all kinds of errors during chaos, but based on
// its usage, none of them should ever be terminal.
return nil, err
return nil, changefeedbase.MarkRetryableError(err)
}
// Immediately release the lease, since we only need it for the exact
// timestamp requested.
Expand Down Expand Up @@ -144,7 +144,7 @@ func (c *rowFetcherCache) tableDescForKey(
if err != nil {
// Manager can return all kinds of errors during chaos, but based on
// its usage, none of them should ever be terminal.
return nil, family, err
return nil, family, changefeedbase.MarkRetryableError(err)
}
tableDesc = desc.Underlying().(catalog.TableDescriptor)
// Immediately release the lease, since we only need it for the exact
Expand Down
11 changes: 9 additions & 2 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ func (ca *changeAggregator) Start(ctx context.Context) {
ca.spec.User(), ca.spec.JobID, ca.sliMetrics)

if err != nil {
err = changefeedbase.MarkRetryableError(err)
// Early abort in the case that there is an error creating the sink.
ca.MoveToDraining(err)
ca.cancel()
Expand Down Expand Up @@ -279,7 +280,7 @@ func (ca *changeAggregator) Start(ctx context.Context) {
ca.cancel()
return
}

ca.sink = &errorWrapperSink{wrapped: ca.sink}
ca.eventConsumer, ca.sink, err = newEventConsumer(
ctx, ca.flowCtx, feed, ca.frontier.SpanFrontier(), kvFeedHighWater,
ca.sink, feed, ca.spec.Select, ca.knobs, ca.metrics, ca.isSinkless())
Expand Down Expand Up @@ -916,6 +917,7 @@ func (cf *changeFrontier) Start(ctx context.Context) {
cf.spec.User(), cf.spec.JobID, sli)

if err != nil {
err = changefeedbase.MarkRetryableError(err)
cf.MoveToDraining(err)
return
}
Expand All @@ -924,6 +926,8 @@ func (cf *changeFrontier) Start(ctx context.Context) {
cf.resolvedBuf = &b.buf
}

cf.sink = &errorWrapperSink{wrapped: cf.sink}

cf.highWaterAtStart = cf.spec.Feed.StatementTime
if cf.spec.JobID != 0 {
job, err := cf.flowCtx.Cfg.JobRegistry.LoadClaimedJob(ctx, cf.spec.JobID)
Expand Down Expand Up @@ -1039,6 +1043,8 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad
"shut down due to schema change and %s=%q",
changefeedbase.OptSchemaChangePolicy,
changefeedbase.OptSchemaChangePolicyStop))
} else {
err = changefeedbase.MarkRetryableError(err)
}
}

Expand Down Expand Up @@ -1283,7 +1289,8 @@ func (cf *changeFrontier) checkpointJobProgress(

if cf.knobs.RaiseRetryableError != nil {
if err := cf.knobs.RaiseRetryableError(); err != nil {
return false, err
return false, changefeedbase.MarkRetryableError(
errors.New("cf.knobs.RaiseRetryableError"))
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3425,7 +3425,7 @@ func TestChangefeedRetryableError(t *testing.T) {
knobs.BeforeEmitRow = func(_ context.Context) error {
switch atomic.LoadInt64(&failEmit) {
case 1:
return errors.New("synthetic retryable error")
return changefeedbase.MarkRetryableError(fmt.Errorf("synthetic retryable error"))
case 2:
return changefeedbase.WithTerminalError(errors.New("synthetic terminal error"))
default:
Expand Down
16 changes: 16 additions & 0 deletions pkg/ccl/changefeedccl/changefeedbase/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@ func (e *terminalError) Error() string {
return "terminal changefeed error"
}

// TODO(yevgeniy): retryableError and all machinery related
// to MarkRetryableError maybe removed once 23.1 is released.
type retryableError struct{}

func (e *retryableError) Error() string {
return "retryable changefeed error"
}

// WithTerminalError decorates underlying error to indicate
// that the error is a terminal changefeed error.
func WithTerminalError(cause error) error {
Expand All @@ -86,6 +94,14 @@ func WithTerminalError(cause error) error {
return errors.Mark(cause, &terminalError{})
}

// MarkRetryableError wraps the given error, marking it as retryable.s
func MarkRetryableError(cause error) error {
if cause == nil {
return nil
}
return errors.Mark(cause, &retryableError{})
}

// AsTerminalError determines if the cause error is a terminal changefeed
// error. Returns non-nil error if changefeed should terminate with the
// returned error.
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/schema_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (r *confluentSchemaRegistry) doWithRetry(ctx context.Context, fn func() err
}
log.VInfof(ctx, 2, "retrying schema registry operation: %s", err.Error())
}
return err
return changefeedbase.MarkRetryableError(err)
}

func gracefulClose(ctx context.Context, toClose io.ReadCloser) {
Expand Down
53 changes: 53 additions & 0 deletions pkg/ccl/changefeedccl/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,59 @@ func (u *sinkURL) String() string {
return u.URL.String()
}

// errorWrapperSink delegates to another sink and marks all returned errors as
// retryable. During changefeed setup, we use the sink once without this to
// verify configuration, but in the steady state, no sink error should be
// terminal.
type errorWrapperSink struct {
wrapped externalResource
}

// EmitRow implements Sink interface.
func (s errorWrapperSink) EmitRow(
ctx context.Context,
topic TopicDescriptor,
key, value []byte,
updated, mvcc hlc.Timestamp,
alloc kvevent.Alloc,
) error {
if err := s.wrapped.(EventSink).EmitRow(ctx, topic, key, value, updated, mvcc, alloc); err != nil {
return changefeedbase.MarkRetryableError(err)
}
return nil
}

// EmitResolvedTimestamp implements Sink interface.
func (s errorWrapperSink) EmitResolvedTimestamp(
ctx context.Context, encoder Encoder, resolved hlc.Timestamp,
) error {
if err := s.wrapped.(ResolvedTimestampSink).EmitResolvedTimestamp(ctx, encoder, resolved); err != nil {
return changefeedbase.MarkRetryableError(err)
}
return nil
}

// Flush implements Sink interface.
func (s errorWrapperSink) Flush(ctx context.Context) error {
if err := s.wrapped.(EventSink).Flush(ctx); err != nil {
return changefeedbase.MarkRetryableError(err)
}
return nil
}

// Close implements Sink interface.
func (s errorWrapperSink) Close() error {
if err := s.wrapped.Close(); err != nil {
return changefeedbase.MarkRetryableError(err)
}
return nil
}

// Dial implements Sink interface.
func (s errorWrapperSink) Dial() error {
return s.wrapped.Dial()
}

// encDatumRowBuffer is a FIFO of `EncDatumRow`s.
//
// TODO(dan): There's some potential allocation savings here by reusing the same
Expand Down

0 comments on commit 9f8281c

Please sign in to comment.