From 1a93051a35d93916c1fd4b09a8638c9f09727e1c Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Sun, 23 Oct 2022 14:04:14 -0400 Subject: [PATCH] changefeedccl: Rework error handling Prior to this PR, changefeeds would rely on a white list approach in order to determine which errors were retryable. All other errors would be deemed terminal, causing the changefeed to fail. The above approach is brittle, and causes unwanted changefeed termination. This PR changes this approach to treat all errors as retryable, unless otherwise indicated. Errors that are known by changefeed to be fatal are handled explicitly, by marking such errors as terminal. For example, changefeeds would exit if the targeted table is dropped. On the other hand, inability to read this table for any reason would not be treated as terminal. Fixes #90320 Fixes #77549 Fixes #63317 Fixes #71341 Fixes #73016 Informs CRDB-6788 Informs CRDB-7581 Release note (enterprise change): Changefeed will now treat all errors, unless otherwise indicated, as retryable errors. 
--- pkg/ccl/changefeedccl/BUILD.bazel | 4 +- pkg/ccl/changefeedccl/avro.go | 47 ++++--- pkg/ccl/changefeedccl/cdceval/BUILD.bazel | 1 + pkg/ccl/changefeedccl/cdceval/expr_eval.go | 10 +- pkg/ccl/changefeedccl/cdceval/validation.go | 4 +- .../cdcevent/rowfetcher_cache.go | 4 +- .../changefeedccl/changefeed_processors.go | 21 ++-- pkg/ccl/changefeedccl/changefeed_stmt.go | 62 ++++------ pkg/ccl/changefeedccl/changefeed_test.go | 29 +++-- .../changefeedccl/changefeedbase/BUILD.bazel | 7 +- .../changefeedccl/changefeedbase/errors.go | 115 +++++++----------- .../changefeedvalidators/table_validator.go | 11 ++ pkg/ccl/changefeedccl/encoder_test.go | 8 +- pkg/ccl/changefeedccl/helpers_test.go | 9 +- pkg/ccl/changefeedccl/kvevent/BUILD.bazel | 1 - pkg/ccl/changefeedccl/kvevent/err_buffer.go | 35 ------ pkg/ccl/changefeedccl/kvfeed/kv_feed.go | 5 +- pkg/ccl/changefeedccl/metrics.go | 10 +- pkg/ccl/changefeedccl/retry.go | 67 ++++++++++ pkg/ccl/changefeedccl/schema_registry.go | 2 +- .../changefeedccl/schemafeed/schema_feed.go | 2 +- pkg/ccl/changefeedccl/sink.go | 72 ----------- pkg/ccl/changefeedccl/sink_webhook_test.go | 3 +- pkg/sql/catalog/lease/lease.go | 9 +- pkg/testutils/lint/lint_test.go | 1 + 25 files changed, 246 insertions(+), 293 deletions(-) delete mode 100644 pkg/ccl/changefeedccl/kvevent/err_buffer.go create mode 100644 pkg/ccl/changefeedccl/retry.go diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 3f015970f5c9..5cccc7d139bf 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -20,6 +20,7 @@ go_library( "metrics.go", "name.go", "parquet_sink_cloudstorage.go", + "retry.go", "schema_registry.go", "scram_client.go", "sink.go", @@ -80,7 +81,6 @@ go_library( "//pkg/sql/execinfra", "//pkg/sql/execinfrapb", "//pkg/sql/exprutil", - "//pkg/sql/flowinfra", "//pkg/sql/importer", "//pkg/sql/parser", "//pkg/sql/pgwire/pgcode", @@ -234,6 +234,8 @@ go_test( "//pkg/sql/flowinfra", 
"//pkg/sql/importer", "//pkg/sql/parser", + "//pkg/sql/pgwire/pgcode", + "//pkg/sql/pgwire/pgerror", "//pkg/sql/randgen", "//pkg/sql/rowenc", "//pkg/sql/rowenc/keyside", diff --git a/pkg/ccl/changefeedccl/avro.go b/pkg/ccl/changefeedccl/avro.go index e6a81150daeb..eb559a85988e 100644 --- a/pkg/ccl/changefeedccl/avro.go +++ b/pkg/ccl/changefeedccl/avro.go @@ -15,6 +15,7 @@ import ( "github.com/cockroachdb/apd/v3" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/geo" "github.com/cockroachdb/cockroach/pkg/geo/geopb" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" @@ -408,8 +409,8 @@ func typeToAvroSchema(typ *types.T) (*avroSchemaField, error) { func(d tree.Datum, _ interface{}) (interface{}, error) { date := *d.(*tree.DDate) if !date.IsFinite() { - return nil, errors.Errorf( - `infinite date not yet supported with avro`) + return nil, changefeedbase.WithTerminalError(errors.Errorf( + `infinite date not yet supported with avro`)) } // The avro library requires us to return this as a time.Time. 
return date.ToTime() @@ -498,8 +499,8 @@ func typeToAvroSchema(typ *types.T) (*avroSchemaField, error) { ) case types.DecimalFamily: if typ.Precision() == 0 { - return nil, errors.Errorf( - `decimal with no precision not yet supported with avro`) + return nil, changefeedbase.WithTerminalError(errors.Errorf( + `decimal with no precision not yet supported with avro`)) } width := int(typ.Width()) @@ -595,8 +596,8 @@ func typeToAvroSchema(typ *types.T) (*avroSchemaField, error) { case types.ArrayFamily: itemSchema, err := typeToAvroSchema(typ.ArrayContents()) if err != nil { - return nil, errors.Wrapf(err, `could not create item schema for %s`, - typ) + return nil, changefeedbase.WithTerminalError( + errors.Wrapf(err, `could not create item schema for %s`, typ)) } itemUnionKey := avroUnionKey(itemSchema.SchemaType.([]avroSchemaType)[1]) @@ -676,8 +677,8 @@ func typeToAvroSchema(typ *types.T) (*avroSchemaField, error) { ) default: - return nil, errors.Errorf(`type %s not yet supported with avro`, - typ.SQLString()) + return nil, changefeedbase.WithTerminalError( + errors.Errorf(`type %s not yet supported with avro`, typ.SQLString())) } return schema, nil @@ -688,7 +689,7 @@ func typeToAvroSchema(typ *types.T) (*avroSchemaField, error) { func columnToAvroSchema(col cdcevent.ResultColumn) (*avroSchemaField, error) { schema, err := typeToAvroSchema(col.Typ) if err != nil { - return nil, errors.Wrapf(err, "column %s", col.Name) + return nil, changefeedbase.WithTerminalError(errors.Wrapf(err, "column %s", col.Name)) } schema.Name = SQLNameToAvroName(col.Name) schema.Metadata = col.SQLStringNotHumanReadable() @@ -790,7 +791,7 @@ func (r *avroDataRecord) rowFromTextual(buf []byte) (rowenc.EncDatumRow, error) return nil, err } if len(newBuf) > 0 { - return nil, errors.New(`only one row was expected`) + return nil, changefeedbase.WithTerminalError(errors.New(`only one row was expected`)) } return r.rowFromNative(native) } @@ -811,7 +812,7 @@ func (r *avroDataRecord) 
RowFromBinary(buf []byte) (rowenc.EncDatumRow, error) { return nil, err } if len(newBuf) > 0 { - return nil, errors.New(`only one row was expected`) + return nil, changefeedbase.WithTerminalError(errors.New(`only one row was expected`)) } return r.rowFromNative(native) } @@ -826,7 +827,8 @@ func (r *avroDataRecord) nativeFromRow(it cdcevent.Iterator) (interface{}, error if err := it.Datum(func(d tree.Datum, col cdcevent.ResultColumn) (err error) { fieldIdx, ok := r.fieldIdxByName[col.Name] if !ok { - return errors.AssertionFailedf("could not find avro field for column %s", col.Name) + return changefeedbase.WithTerminalError( + errors.AssertionFailedf("could not find avro field for column %s", col.Name)) } r.native[col.Name], err = r.Fields[fieldIdx].encodeFn(d) return err @@ -840,11 +842,12 @@ func (r *avroDataRecord) nativeFromRow(it cdcevent.Iterator) (interface{}, error func (r *avroDataRecord) rowFromNative(native interface{}) (rowenc.EncDatumRow, error) { avroDatums, ok := native.(map[string]interface{}) if !ok { - return nil, errors.Errorf(`unknown avro native type: %T`, native) + return nil, changefeedbase.WithTerminalError( + errors.Errorf(`unknown avro native type: %T`, native)) } if len(r.Fields) != len(avroDatums) { - return nil, errors.Errorf( - `expected row with %d columns got %d`, len(r.Fields), len(avroDatums)) + return nil, changefeedbase.WithTerminalError(errors.Errorf( + `expected row with %d columns got %d`, len(r.Fields), len(avroDatums))) } row := make(rowenc.EncDatumRow, len(r.Fields)) @@ -978,7 +981,8 @@ func (r *avroEnvelopeRecord) BinaryFromRow( delete(meta, `updated`) ts, ok := u.(hlc.Timestamp) if !ok { - return nil, errors.Errorf(`unknown metadata timestamp type: %T`, u) + return nil, changefeedbase.WithTerminalError( + errors.Errorf(`unknown metadata timestamp type: %T`, u)) } native[`updated`] = goavro.Union(avroUnionKey(avroSchemaString), ts.AsOfSystemTime()) } @@ -989,13 +993,14 @@ func (r *avroEnvelopeRecord) BinaryFromRow( 
delete(meta, `resolved`) ts, ok := u.(hlc.Timestamp) if !ok { - return nil, errors.Errorf(`unknown metadata timestamp type: %T`, u) + return nil, changefeedbase.WithTerminalError( + errors.Errorf(`unknown metadata timestamp type: %T`, u)) } native[`resolved`] = goavro.Union(avroUnionKey(avroSchemaString), ts.AsOfSystemTime()) } } for k := range meta { - return nil, errors.AssertionFailedf(`unhandled meta key: %s`, k) + return nil, changefeedbase.WithTerminalError(errors.AssertionFailedf(`unhandled meta key: %s`, k)) } return r.codec.BinaryFromNative(buf, native) } @@ -1016,10 +1021,12 @@ func (r *avroDataRecord) refreshTypeMetadata(row cdcevent.Row) error { // precision is set) this is roundtripable without information loss. func decimalToRat(dec apd.Decimal, scale int32) (big.Rat, error) { if dec.Form != apd.Finite { - return big.Rat{}, errors.Errorf(`cannot convert %s form decimal`, dec.Form) + return big.Rat{}, changefeedbase.WithTerminalError( + errors.Errorf(`cannot convert %s form decimal`, dec.Form)) } if scale > 0 && scale != -dec.Exponent { - return big.Rat{}, errors.Errorf(`%s will not roundtrip at scale %d`, &dec, scale) + return big.Rat{}, changefeedbase.WithTerminalError( + errors.Errorf(`%s will not roundtrip at scale %d`, &dec, scale)) } var r big.Rat if dec.Exponent >= 0 { diff --git a/pkg/ccl/changefeedccl/cdceval/BUILD.bazel b/pkg/ccl/changefeedccl/cdceval/BUILD.bazel index fc42be8203a6..af9c2910fd9c 100644 --- a/pkg/ccl/changefeedccl/cdceval/BUILD.bazel +++ b/pkg/ccl/changefeedccl/cdceval/BUILD.bazel @@ -16,6 +16,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/ccl/changefeedccl/cdcevent", + "//pkg/ccl/changefeedccl/changefeedbase", "//pkg/clusterversion", "//pkg/jobs/jobspb", "//pkg/keys", diff --git a/pkg/ccl/changefeedccl/cdceval/expr_eval.go b/pkg/ccl/changefeedccl/cdceval/expr_eval.go index 73cd97a7530e..2f79c992c98b 100644 --- a/pkg/ccl/changefeedccl/cdceval/expr_eval.go +++ b/pkg/ccl/changefeedccl/cdceval/expr_eval.go 
@@ -13,6 +13,7 @@ import ( "fmt" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/catalog/schemaexpr" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" @@ -73,7 +74,8 @@ func (e *Evaluator) MatchesFilter( ) (_ bool, err error) { defer func() { if pan := recover(); pan != nil { - err = errors.Newf("error while evaluating WHERE clause: %s", pan) + err = changefeedbase.WithTerminalError( + errors.Newf("error while evaluating WHERE clause: %s", pan)) } }() if e.where == nil { @@ -96,7 +98,8 @@ func (e *Evaluator) Projection( ) (_ cdcevent.Row, err error) { defer func() { if pan := recover(); pan != nil { - err = errors.Newf("error while evaluating SELECT clause: %s", pan) + err = changefeedbase.WithTerminalError( + errors.Newf("error while evaluating SELECT clause: %s", pan)) } }() if len(e.selectors) == 0 { @@ -114,7 +117,8 @@ func (e *Evaluator) Projection( func (e *Evaluator) initSelectClause(ctx context.Context, sc *tree.SelectClause) (err error) { defer func() { if pan := recover(); pan != nil { - err = errors.Newf("error while validating CHANGEFEED expression: %s", pan) + err = changefeedbase.WithTerminalError( + errors.Newf("error while validating CHANGEFEED expression: %s", pan)) } }() if len(sc.Exprs) == 0 { // Shouldn't happen, but be defensive. 
diff --git a/pkg/ccl/changefeedccl/cdceval/validation.go b/pkg/ccl/changefeedccl/cdceval/validation.go index 7d6942171bf4..919e39ea4777 100644 --- a/pkg/ccl/changefeedccl/cdceval/validation.go +++ b/pkg/ccl/changefeedccl/cdceval/validation.go @@ -12,6 +12,7 @@ import ( "context" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/sql" @@ -47,7 +48,8 @@ func NormalizeAndValidateSelectForTarget( ) (n NormalizedSelectClause, _ jobspb.ChangefeedTargetSpecification, retErr error) { defer func() { if pan := recover(); pan != nil { - retErr = errors.Newf("low-level error while normalizing expression, probably syntax is unsupported in CREATE CHANGEFEED: %s", pan) + retErr = changefeedbase.WithTerminalError( + errors.Newf("expression currently unsupported in CREATE CHANGEFEED: %s", pan)) } }() execCtx.SemaCtx() diff --git a/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go b/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go index 230b680c615f..52cadff8bc08 100644 --- a/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go +++ b/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go @@ -110,7 +110,7 @@ func refreshUDT( }); err != nil { // Manager can return all kinds of errors during chaos, but based on // its usage, none of them should ever be terminal. - return nil, changefeedbase.MarkRetryableError(err) + return nil, err } // Immediately release the lease, since we only need it for the exact // timestamp requested. @@ -144,7 +144,7 @@ func (c *rowFetcherCache) tableDescForKey( if err != nil { // Manager can return all kinds of errors during chaos, but based on // its usage, none of them should ever be terminal. 
- return nil, family, changefeedbase.MarkRetryableError(err) + return nil, family, err } tableDesc = desc.Underlying().(catalog.TableDescriptor) // Immediately release the lease, since we only need it for the exact diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 3c5698025c7a..101d1ecff4e9 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -250,7 +250,6 @@ func (ca *changeAggregator) Start(ctx context.Context) { ca.spec.User(), ca.spec.JobID, ca.sliMetrics) if err != nil { - err = changefeedbase.MarkRetryableError(err) // Early abort in the case that there is an error creating the sink. ca.MoveToDraining(err) ca.cancel() @@ -263,8 +262,6 @@ func (ca *changeAggregator) Start(ctx context.Context) { ca.changedRowBuf = &b.buf } - ca.sink = &errorWrapperSink{wrapped: ca.sink} - // If the initial scan was disabled the highwater would've already been forwarded needsInitialScan := ca.frontier.Frontier().IsEmpty() @@ -483,7 +480,6 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMet err = nil } } else { - select { // If the poller errored first, that's the // interesting one, so overwrite `err`. @@ -921,7 +917,6 @@ func (cf *changeFrontier) Start(ctx context.Context) { cf.spec.User(), cf.spec.JobID, sli) if err != nil { - err = changefeedbase.MarkRetryableError(err) cf.MoveToDraining(err) return } @@ -930,8 +925,6 @@ func (cf *changeFrontier) Start(ctx context.Context) { cf.resolvedBuf = &b.buf } - cf.sink = &errorWrapperSink{wrapped: cf.sink} - cf.highWaterAtStart = cf.spec.Feed.StatementTime if cf.spec.JobID != 0 { job, err := cf.flowCtx.Cfg.JobRegistry.LoadClaimedJob(ctx, cf.spec.JobID) @@ -1000,9 +993,8 @@ func (cf *changeFrontier) close() { cf.closeMetrics() } if cf.sink != nil { - if err := cf.sink.Close(); err != nil { - log.Warningf(cf.Ctx, `error closing sink. 
goroutines may have leaked: %v`, err) - } + // Best effort: context is often cancel by now, so we expect to see an error + _ = cf.sink.Close() } cf.memAcc.Close(cf.Ctx) cf.MemMonitor.Stop(cf.Ctx) @@ -1043,8 +1035,11 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad // Detect whether this boundary should be used to kill or restart the // changefeed. - if cf.frontier.boundaryType == jobspb.ResolvedSpan_RESTART { - err = changefeedbase.MarkRetryableError(err) + if cf.frontier.boundaryType == jobspb.ResolvedSpan_EXIT { + err = changefeedbase.WithTerminalError(errors.Wrapf(err, + "shut down due to schema change and %s=%q", + changefeedbase.OptSchemaChangePolicy, + changefeedbase.OptSchemaChangePolicyStop)) } } @@ -1289,7 +1284,7 @@ func (cf *changeFrontier) checkpointJobProgress( if cf.knobs.RaiseRetryableError != nil { if err := cf.knobs.RaiseRetryableError(); err != nil { - return false, changefeedbase.MarkRetryableError(errors.New("cf.knobs.RaiseRetryableError")) + return false, err } } diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index 18a813be66bf..dc796396a190 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -39,7 +39,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/resolver" "github.com/cockroachdb/cockroach/pkg/sql/exprutil" - "github.com/cockroachdb/cockroach/pkg/sql/flowinfra" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgnotice" @@ -51,19 +50,12 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" - "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/timeutil" 
"github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" ) -var changefeedRetryOptions = retry.Options{ - InitialBackoff: 5 * time.Millisecond, - Multiplier: 2, - MaxBackoff: 10 * time.Second, -} - // featureChangefeedEnabled is used to enable and disable the CHANGEFEED feature. var featureChangefeedEnabled = settings.RegisterBoolSetting( settings.TenantWritable, @@ -213,7 +205,6 @@ func changefeedPlanHook( } if details.SinkURI == `` { - p.ExtendedEvalContext().ChangefeedState = &coreChangefeedProgress{ progress: progress, } @@ -231,7 +222,7 @@ func changefeedPlanHook( logChangefeedCreateTelemetry(ctx, jr) var err error - for r := retry.StartWithCtx(ctx, changefeedRetryOptions); r.Next(); { + for r := getRetry(ctx); r.Next(); { if err = distChangefeedFlow(ctx, p, 0 /* jobID */, details, progress, resultsCh); err == nil { return nil } @@ -242,15 +233,18 @@ func changefeedPlanHook( } } - if !changefeedbase.IsRetryableError(err) { - log.Warningf(ctx, `CHANGEFEED returning with error: %+v`, err) - return err + if err = changefeedbase.AsTerminalError(ctx, p.ExecCfg().LeaseManager, err); err != nil { + break } + // All other errors retry. progress = p.ExtendedEvalContext().ChangefeedState.(*coreChangefeedProgress).progress } + // TODO(yevgeniy): This seems wrong -- core changefeeds always terminate + // with an error. Perhaps rename this telemetry to indicate number of + // completed feeds. 
telemetry.Count(`changefeed.core.error`) - return changefeedbase.MaybeStripRetryableErrorMarker(err) + return err } // The below block creates the job and protects the data required for the @@ -794,7 +788,7 @@ func validateSink( canarySink, err := getSink(ctx, &p.ExecCfg().DistSQLSrv.ServerConfig, details, nilOracle, p.User(), jobID, sli) if err != nil { - return changefeedbase.MaybeStripRetryableErrorMarker(err) + return err } if err := canarySink.Close(); err != nil { return err @@ -1002,18 +996,18 @@ func (b *changefeedResumer) resumeWithRetries( // bubbles up to this level, we'd like to "retry" the flow if possible. This // could be because the sink is down or because a cockroach node has crashed // or for many other reasons. - var err error var lastRunStatusUpdate time.Time - for r := retry.StartWithCtx(ctx, changefeedRetryOptions); r.Next(); { + for r := getRetry(ctx); r.Next(); { // startedCh is normally used to signal back to the creator of the job that // the job has started; however, in this case nothing will ever receive // on the channel, causing the changefeed flow to block. Replace it with // a dummy channel. startedCh := make(chan tree.Datums, 1) - if err = distChangefeedFlow(ctx, jobExec, jobID, details, progress, startedCh); err == nil { - return nil + err := distChangefeedFlow(ctx, jobExec, jobID, details, progress, startedCh) + if err == nil { + return nil // Changefeed completed -- e.g. due to initial_scan=only mode. } if knobs, ok := execCfg.DistSQLSrv.TestingKnobs.Changefeed.(*TestingKnobs); ok { @@ -1022,30 +1016,16 @@ func (b *changefeedResumer) resumeWithRetries( } } - // Retry changefeed if error is retryable. In addition, we want to handle - // context cancellation as retryable, but only if the resumer context has not been cancelled. - // (resumer context is canceled by the jobs framework -- so we should respect it). 
- isRetryableErr := changefeedbase.IsRetryableError(err) || - (ctx.Err() == nil && errors.Is(err, context.Canceled)) - if !isRetryableErr { - if ctx.Err() != nil { - return ctx.Err() - } - - if flowinfra.IsFlowRetryableError(err) { - // We don't want to retry flowinfra retryable error in the retry loop above. - // This error currently indicates that this node is being drained. As such, - // retries will not help. - // Instead, we want to make sure that the changefeed job is not marked failed - // due to a transient, retryable error. - err = jobs.MarkAsRetryJobError(err) - _ = b.setJobRunningStatus(ctx, lastRunStatusUpdate, "retryable flow error: %s", err) - } - - log.Warningf(ctx, `CHANGEFEED job %d returning with error: %+v`, jobID, err) + // Terminate changefeed if needed. + if err := changefeedbase.AsTerminalError(ctx, jobExec.ExecCfg().LeaseManager, err); err != nil { + log.Infof(ctx, "CHANGEFEED %d shutting down (cause: %v)", jobID, err) + // Best effort -- update job status to make it clear why changefeed shut down. + // This won't always work if this node is being shutdown/drained. + b.setJobRunningStatus(ctx, time.Time{}, "shutdown due to %s", err) return err } + // All other errors retry. log.Warningf(ctx, `WARNING: CHANGEFEED job %d encountered retryable error: %v`, jobID, err) lastRunStatusUpdate = b.setJobRunningStatus(ctx, lastRunStatusUpdate, "retryable error: %s", err) if metrics, ok := execCfg.JobRegistry.MetricsStruct().Changefeed.(*Metrics); ok { @@ -1069,7 +1049,7 @@ func (b *changefeedResumer) resumeWithRetries( progress = reloadedJob.Progress() } } - return errors.Wrap(err, `ran out of retries`) + return errors.Wrap(ctx.Err(), `ran out of retries`) } // OnFailOrCancel is part of the jobs.Resumer interface. 
diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index b771986e3853..04ca32255167 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -65,6 +65,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/flowinfra" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" @@ -3552,9 +3554,9 @@ func TestChangefeedRetryableError(t *testing.T) { knobs.BeforeEmitRow = func(_ context.Context) error { switch atomic.LoadInt64(&failEmit) { case 1: - return changefeedbase.MarkRetryableError(fmt.Errorf("synthetic retryable error")) + return errors.New("synthetic retryable error") case 2: - return fmt.Errorf("synthetic terminal error") + return changefeedbase.WithTerminalError(errors.New("synthetic terminal error")) default: return nil } @@ -4539,7 +4541,7 @@ func TestChangefeedPanicRecovery(t *testing.T) { prep(t, sqlDB) // Check that disallowed expressions have a good error message. 
// Also regression test for https://github.com/cockroachdb/cockroach/issues/90416 - sqlDB.ExpectErr(t, "syntax is unsupported in CREATE CHANGEFEED", + sqlDB.ExpectErr(t, "expression currently unsupported in CREATE CHANGEFEED", `CREATE CHANGEFEED WITH schema_change_policy='stop' AS SELECT 1 FROM foo WHERE EXISTS (SELECT true)`) }) @@ -5486,8 +5488,9 @@ func TestChangefeedPropagatesTerminalError(t *testing.T) { opts := makeOptions() defer addCloudStorageOptions(t, &opts)() defer changefeedbase.TestingSetDefaultMinCheckpointFrequency(testSinkFlushFrequency)() - + defer testingUseFastRetry()() const numNodes = 3 + perServerKnobs := make(map[int]base.TestServerArgs, numNodes) for i := 0; i < numNodes; i++ { perServerKnobs[i] = base.TestServerArgs{ @@ -5536,10 +5539,18 @@ func TestChangefeedPropagatesTerminalError(t *testing.T) { // Configure changefeed to emit fatal error on the specified nodes. distSQLKnobs := perServerKnobs[n].Knobs.DistSQL.(*execinfra.TestingKnobs) var numEmitted int32 + nodeToFail := n distSQLKnobs.Changefeed.(*TestingKnobs).BeforeEmitRow = func(ctx context.Context) error { // Emit few rows before returning an error. if atomic.AddInt32(&numEmitted, 1) > 10 { - err := errors.Newf("synthetic fatal error from node %d", n) + // Mark error as terminal, but make it a bit more + // interesting by wrapping it few times. + err := errors.Wrap( + changefeedbase.WithTerminalError( + pgerror.Wrapf( + errors.Newf("synthetic fatal error from node %d", nodeToFail), + pgcode.Io, "something happened with IO")), + "while doing something") log.Errorf(ctx, "BeforeEmitRow returning error %s", err) return err } @@ -6252,7 +6263,7 @@ func TestChangefeedOnErrorOption(t *testing.T) { DistSQL.(*execinfra.TestingKnobs). 
Changefeed.(*TestingKnobs) knobs.BeforeEmitRow = func(_ context.Context) error { - return errors.Errorf("should fail with custom error") + return changefeedbase.WithTerminalError(errors.New("should fail with custom error")) } foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH on_error='pause'`) @@ -6295,7 +6306,7 @@ func TestChangefeedOnErrorOption(t *testing.T) { DistSQL.(*execinfra.TestingKnobs). Changefeed.(*TestingKnobs) knobs.BeforeEmitRow = func(_ context.Context) error { - return errors.Errorf("should fail with custom error") + return changefeedbase.WithTerminalError(errors.New("should fail with custom error")) } foo := feed(t, f, `CREATE CHANGEFEED FOR bar WITH on_error = 'fail'`) @@ -6315,7 +6326,7 @@ func TestChangefeedOnErrorOption(t *testing.T) { DistSQL.(*execinfra.TestingKnobs). Changefeed.(*TestingKnobs) knobs.BeforeEmitRow = func(_ context.Context) error { - return errors.Errorf("should fail with custom error") + return changefeedbase.WithTerminalError(errors.New("should fail with custom error")) } foo := feed(t, f, `CREATE CHANGEFEED FOR quux`) @@ -7405,7 +7416,7 @@ func TestChangefeedFailedTelemetryLogs(t *testing.T) { DistSQL.(*execinfra.TestingKnobs). 
Changefeed.(*TestingKnobs) knobs.BeforeEmitRow = func(_ context.Context) error { - return errors.Errorf("should fail") + return changefeedbase.WithTerminalError(errors.New("should fail")) } beforeCreate := timeutil.Now() diff --git a/pkg/ccl/changefeedccl/changefeedbase/BUILD.bazel b/pkg/ccl/changefeedccl/changefeedbase/BUILD.bazel index 4bb125e722a1..ec97a6c5db1d 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/BUILD.bazel +++ b/pkg/ccl/changefeedccl/changefeedbase/BUILD.bazel @@ -13,15 +13,14 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase", visibility = ["//visibility:public"], deps = [ - "//pkg/jobs/joberror", + "//pkg/jobs", "//pkg/jobs/jobspb", - "//pkg/kv/kvclient/kvcoord", "//pkg/roachpb", "//pkg/settings", - "//pkg/sql", "//pkg/sql/catalog/descpb", - "//pkg/sql/flowinfra", + "//pkg/sql/catalog/lease", "//pkg/util", + "//pkg/util/log", "@com_github_cockroachdb_errors//:errors", ], ) diff --git a/pkg/ccl/changefeedccl/changefeedbase/errors.go b/pkg/ccl/changefeedccl/changefeedbase/errors.go index 0c1384cb8bfe..e9c4102fa01d 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/errors.go +++ b/pkg/ccl/changefeedccl/changefeedbase/errors.go @@ -9,15 +9,12 @@ package changefeedbase import ( - "fmt" - "reflect" - "strings" + "context" - "github.com/cockroachdb/cockroach/pkg/jobs/joberror" - "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" + "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/sql" - "github.com/cockroachdb/cockroach/pkg/sql/flowinfra" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" ) @@ -75,81 +72,63 @@ func (e *taggedError) Cause() error { return e.wrapped } // planned to be moved to the stdlib in go 1.13. 
func (e *taggedError) Unwrap() error { return e.wrapped } -const retryableErrorString = "retryable changefeed error" +type terminalError struct{} -type retryableError struct { - wrapped error -} - -// MarkRetryableError wraps the given error, marking it as retryable to -// changefeeds. -func MarkRetryableError(e error) error { - return &retryableError{wrapped: e} +func (e *terminalError) Error() string { + return "terminal changefeed error" } -// Error implements the error interface. -func (e *retryableError) Error() string { - return fmt.Sprintf("%s: %s", retryableErrorString, e.wrapped.Error()) +// WithTerminalError decorates underlying error to indicate +// that the error is a terminal changefeed error. +func WithTerminalError(cause error) error { + if cause == nil { + return nil + } + return errors.Mark(cause, &terminalError{}) } -// Cause implements the github.com/pkg/errors.causer interface. -func (e *retryableError) Cause() error { return e.wrapped } - -// Unwrap implements the github.com/golang/xerrors.Wrapper interface, which is -// planned to be moved to the stdlib in go 1.13. -func (e *retryableError) Unwrap() error { return e.wrapped } - -// IsRetryableError returns true if the supplied error, or any of its parent -// causes, is a IsRetryableError. -func IsRetryableError(err error) bool { - if err == nil { - return false +// AsTerminalError determines if the cause error is a terminal changefeed +// error. Returns non-nil error if changefeed should terminate with the +// returned error. +func AsTerminalError(ctx context.Context, lm *lease.Manager, cause error) (termErr error) { + defer func() { + log.Infof(ctx, "AsTerminalError for %v returning %v", cause, termErr) + }() + if cause == nil { + return nil } - if errors.HasType(err, (*retryableError)(nil)) { - return true + + if err := ctx.Err(); err != nil { + // If context has been cancelled, we must respect that; this happens + // if, e.g. this changefeed is being cancelled.
+ return err } - // During node shutdown it is possible for all outgoing transports used by - // the kvfeed to expire, producing a SendError that the node is still able - // to propagate to the frontier. This has been known to happen during - // cluster upgrades. This scenario should not fail the changefeed. - if kvcoord.IsSendError(err) { - return true + if lm.IsDraining() { + // This node is being drained. It's safe to propagate this error (to the + // job registry) since job registry should not be able to commit this error + // to the jobs table; but to be safe, make sure this error is marked as jobs + // retryable error to ensure that some other node retries this changefeed. + return jobs.MarkAsRetryJobError(cause) } - // TODO(knz): this is a bad implementation. Make it go away - // by avoiding string comparisons. - - // If a RetryableError occurs on a remote node, DistSQL serializes it such - // that we can't recover the structure and we have to rely on this - // unfortunate string comparison. - errStr := err.Error() - if strings.Contains(errStr, retryableErrorString) || - strings.Contains(errStr, kvcoord.SendErrorString) || - strings.Contains(errStr, "draining") { - return true + if errors.Is(cause, &terminalError{}) { + return cause } - return (joberror.IsDistSQLRetryableError(err) || - flowinfra.IsNoInboundStreamConnectionError(err) || - errors.HasType(err, (*roachpb.NodeUnavailableError)(nil)) || - errors.Is(err, sql.ErrPlanChanged)) + // All other errors retry. + return nil } -// MaybeStripRetryableErrorMarker performs some minimal attempt to clean the -// RetryableError marker out. This won't do anything if the RetryableError -// itself has been wrapped, but that's okay, we'll just have an uglier string. -func MaybeStripRetryableErrorMarker(err error) error { - // The following is a hack to work around the error cast linter. 
- // What we're doing here is really not kosher; this function - // has no business in assuming that the retryableError{} wrapper - // has not been wrapped already. We could even expect that - // it gets wrapped in the common case. - // TODO(knz): Remove/replace this. - if reflect.TypeOf(err) == retryableErrorType { - err = errors.UnwrapOnce(err) +// CategorizeError tries to determine if the passed in error should +// be treated as a terminal error. +func CategorizeError(err error) error { + if err == nil { + return err + } + + if errors.HasType(err, (*roachpb.BatchTimestampBeforeGCError)(nil)) { + return WithTerminalError(err) } return err } - -var retryableErrorType = reflect.TypeOf((*retryableError)(nil)) diff --git a/pkg/ccl/changefeedccl/changefeedvalidators/table_validator.go b/pkg/ccl/changefeedccl/changefeedvalidators/table_validator.go index e7502ac1b0ff..84583aabb32c 100644 --- a/pkg/ccl/changefeedccl/changefeedvalidators/table_validator.go +++ b/pkg/ccl/changefeedccl/changefeedvalidators/table_validator.go @@ -20,6 +20,17 @@ func ValidateTable( targets changefeedbase.Targets, tableDesc catalog.TableDescriptor, canHandle changefeedbase.CanHandle, +) error { + if err := validateTable(targets, tableDesc, canHandle); err != nil { + return changefeedbase.WithTerminalError(err) + } + return nil +} + +func validateTable( + targets changefeedbase.Targets, + tableDesc catalog.TableDescriptor, + canHandle changefeedbase.CanHandle, ) error { // Technically, the only non-user table known not to work is system.jobs // (which creates a cycle since the resolved timestamp high-water mark is diff --git a/pkg/ccl/changefeedccl/encoder_test.go b/pkg/ccl/changefeedccl/encoder_test.go index 132f01c0fe13..4415aebac0ee 100644 --- a/pkg/ccl/changefeedccl/encoder_test.go +++ b/pkg/ccl/changefeedccl/encoder_test.go @@ -410,9 +410,7 @@ func TestAvroEncoderWithTLS(t *testing.T) { enc, err := getEncoder(opts, targets) require.NoError(t, err) _, err = 
enc.EncodeKey(context.Background(), rowInsert) - require.EqualError(t, err, fmt.Sprintf("retryable changefeed error: "+ - `contacting confluent schema registry: Post "%s/subjects/foo-key/versions": x509: certificate signed by unknown authority`, - opts.SchemaRegistryURI)) + require.Regexp(t, "x509", err) wrongCert, _, err := cdctest.NewCACertBase64Encoded() require.NoError(t, err) @@ -425,9 +423,7 @@ func TestAvroEncoderWithTLS(t *testing.T) { enc, err = getEncoder(opts, targets) require.NoError(t, err) _, err = enc.EncodeKey(context.Background(), rowInsert) - require.EqualError(t, err, fmt.Sprintf("retryable changefeed error: "+ - `contacting confluent schema registry: Post "%s/subjects/foo-key/versions": x509: certificate signed by unknown authority`, - opts.SchemaRegistryURI)) + require.Regexp(t, `contacting confluent schema registry.*: x509`, err) }) } diff --git a/pkg/ccl/changefeedccl/helpers_test.go b/pkg/ccl/changefeedccl/helpers_test.go index 6577bab4998b..70716810cc55 100644 --- a/pkg/ccl/changefeedccl/helpers_test.go +++ b/pkg/ccl/changefeedccl/helpers_test.go @@ -389,13 +389,15 @@ func startTestFullServer( options.argsFn(&args) } - ctx := context.Background() + resetRetry := testingUseFastRetry() resetFlushFrequency := changefeedbase.TestingSetDefaultMinCheckpointFrequency(testSinkFlushFrequency) s, db, _ := serverutils.StartServer(t, args) + ctx := context.Background() cleanup := func() { s.Stopper().Stop(ctx) resetFlushFrequency() + resetRetry() } var err error defer func() { @@ -429,6 +431,7 @@ func startTestCluster(t testing.TB) (serverutils.TestClusterInterface, *gosql.DB JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), } + resetRetry := testingUseFastRetry() resetFlushFrequency := changefeedbase.TestingSetDefaultMinCheckpointFrequency(testSinkFlushFrequency) cluster, db, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster( t, 3 /* numServers */, knobs, @@ -437,6 +440,7 @@ func startTestCluster(t testing.TB) 
(serverutils.TestClusterInterface, *gosql.DB cleanupAndReset := func() { cleanup() resetFlushFrequency() + resetRetry() } var err error @@ -497,9 +501,10 @@ func startTestTenant( tenantRunner.ExecMultiple(t, strings.Split(serverSetupStatements, ";")...) waitForTenantPodsActive(t, tenantServer, 1) - + resetRetry := testingUseFastRetry() return tenantID, tenantServer, tenantDB, func() { tenantServer.Stopper().Stop(context.Background()) + resetRetry() } } diff --git a/pkg/ccl/changefeedccl/kvevent/BUILD.bazel b/pkg/ccl/changefeedccl/kvevent/BUILD.bazel index 35052ab74d33..a0710fb93de8 100644 --- a/pkg/ccl/changefeedccl/kvevent/BUILD.bazel +++ b/pkg/ccl/changefeedccl/kvevent/BUILD.bazel @@ -8,7 +8,6 @@ go_library( "blocking_buffer.go", "chan_buffer.go", "chunked_event_queue.go", - "err_buffer.go", "event.go", "metrics.go", "throttling_buffer.go", diff --git a/pkg/ccl/changefeedccl/kvevent/err_buffer.go b/pkg/ccl/changefeedccl/kvevent/err_buffer.go deleted file mode 100644 index 06953d63111e..000000000000 --- a/pkg/ccl/changefeedccl/kvevent/err_buffer.go +++ /dev/null @@ -1,35 +0,0 @@ -// Copyright 2021 The Cockroach Authors. -// -// Licensed as a CockroachDB Enterprise file under the Cockroach Community -// License (the "License"); you may not use this file except in compliance with -// the License. You may obtain a copy of the License at -// -// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt - -package kvevent - -import ( - "context" - - "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" -) - -type errorWrapperEventBuffer struct { - Buffer -} - -// NewErrorWrapperEventBuffer returns kvevent Buffer which treats any errors -// as retryable. -func NewErrorWrapperEventBuffer(b Buffer) Buffer { - return &errorWrapperEventBuffer{b} -} - -// Add implements Writer interface. 
-func (e errorWrapperEventBuffer) Add(ctx context.Context, event Event) error { - if err := e.Buffer.Add(ctx, event); err != nil { - return changefeedbase.MarkRetryableError(err) - } - return nil -} - -var _ Buffer = (*errorWrapperEventBuffer)(nil) diff --git a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go index 1725aee5ca0d..cde4c65069a4 100644 --- a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go +++ b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go @@ -98,8 +98,7 @@ func Run(ctx context.Context, cfg Config) error { } bf := func() kvevent.Buffer { - return kvevent.NewErrorWrapperEventBuffer( - kvevent.NewMemBuffer(cfg.MM.MakeBoundAccount(), &cfg.Settings.SV, cfg.Metrics)) + return kvevent.NewMemBuffer(cfg.MM.MakeBoundAccount(), &cfg.Settings.SV, cfg.Metrics) } f := newKVFeed( @@ -115,7 +114,7 @@ func Run(ctx context.Context, cfg Config) error { g := ctxgroup.WithContext(ctx) g.GoCtx(cfg.SchemaFeed.Run) g.GoCtx(f.run) - err := g.Wait() + err := changefeedbase.CategorizeError(g.Wait()) // NB: The higher layers of the changefeed should detect the boundary and the // policy and tear everything down. 
Returning before the higher layers tear down diff --git a/pkg/ccl/changefeedccl/metrics.go b/pkg/ccl/changefeedccl/metrics.go index 3496573e5a61..60e53536f226 100644 --- a/pkg/ccl/changefeedccl/metrics.go +++ b/pkg/ccl/changefeedccl/metrics.go @@ -15,6 +15,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcutils" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/schemafeed" "github.com/cockroachdb/cockroach/pkg/jobs" @@ -529,10 +530,11 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) { if scope != defaultSLIScope { if !enableSLIMetrics { - return nil, errors.WithHint( - pgerror.Newf(pgcode.ConfigurationLimitExceeded, "cannot create metrics scope %q", scope), - "try restarting with COCKROACH_EXPERIMENTAL_ENABLE_PER_CHANGEFEED_METRICS=true", - ) + return nil, changefeedbase.WithTerminalError( + errors.WithHint( + pgerror.Newf(pgcode.ConfigurationLimitExceeded, "cannot create metrics scope %q", scope), + "try restarting with COCKROACH_EXPERIMENTAL_ENABLE_PER_CHANGEFEED_METRICS=true", + )) } const failSafeMax = 1024 if len(a.mu.sliMetrics) == failSafeMax { diff --git a/pkg/ccl/changefeedccl/retry.go b/pkg/ccl/changefeedccl/retry.go new file mode 100644 index 000000000000..dd708a24c124 --- /dev/null +++ b/pkg/ccl/changefeedccl/retry.go @@ -0,0 +1,67 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package changefeedccl + +import ( + "context" + "time" + + "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" +) + +var useFastRetry = false + +// getRetry returns retry object for changefeed. +func getRetry(ctx context.Context) Retry { + opts := retry.Options{ + InitialBackoff: 5 * time.Second, + Multiplier: 2, + MaxBackoff: 10 * time.Minute, + } + + if useFastRetry { + opts = retry.Options{ + InitialBackoff: 5 * time.Millisecond, + Multiplier: 2, + MaxBackoff: 250 * time.Millisecond, + } + } + + return Retry{Retry: retry.StartWithCtx(ctx, opts)} +} + +func testingUseFastRetry() func() { + useFastRetry = true + return func() { + useFastRetry = false + } +} + +// resetRetryAfter is how long a changefeed must run without +// errors before its retry state is reset. +const resetRetryAfter = 10 * time.Minute + +// Retry is a thin wrapper around retry.Retry which +// resets retry state if changefeed has been running for sufficiently +// long time.
+type Retry struct { + retry.Retry + lastRetry time.Time +} + +func (r *Retry) Next() bool { + defer func() { + r.lastRetry = timeutil.Now() + }() + if timeutil.Since(r.lastRetry) > resetRetryAfter { + r.Reset() + } + return r.Retry.Next() +} diff --git a/pkg/ccl/changefeedccl/schema_registry.go b/pkg/ccl/changefeedccl/schema_registry.go index be35c75d7539..bb230dd8c4d3 100644 --- a/pkg/ccl/changefeedccl/schema_registry.go +++ b/pkg/ccl/changefeedccl/schema_registry.go @@ -199,7 +199,7 @@ func (r *confluentSchemaRegistry) doWithRetry(ctx context.Context, fn func() err } log.VInfof(ctx, 2, "retrying schema registry operation: %s", err.Error()) } - return changefeedbase.MarkRetryableError(err) + return err } func gracefulClose(ctx context.Context, toClose io.ReadCloser) { diff --git a/pkg/ccl/changefeedccl/schemafeed/schema_feed.go b/pkg/ccl/changefeedccl/schemafeed/schema_feed.go index fd8575cb7614..808aff84d97e 100644 --- a/pkg/ccl/changefeedccl/schemafeed/schema_feed.go +++ b/pkg/ccl/changefeedccl/schemafeed/schema_feed.go @@ -559,7 +559,7 @@ func (tf *schemaFeed) validateDescriptor( shouldFilter, err := tf.filter.shouldFilter(ctx, e, tf.targets) log.VEventf(ctx, 1, "validate shouldFilter %v %v", formatEvent(e), shouldFilter) if err != nil { - return err + return changefeedbase.WithTerminalError(err) } if !shouldFilter { // Only sort the tail of the events from earliestTsBeingIngested. diff --git a/pkg/ccl/changefeedccl/sink.go b/pkg/ccl/changefeedccl/sink.go index ec405c6ca15a..48817fd1050f 100644 --- a/pkg/ccl/changefeedccl/sink.go +++ b/pkg/ccl/changefeedccl/sink.go @@ -299,78 +299,6 @@ func (u *sinkURL) String() string { return u.URL.String() } -// errorWrapperSink delegates to another sink and marks all returned errors as -// retryable. During changefeed setup, we use the sink once without this to -// verify configuration, but in the steady state, no sink error should be -// terminal. 
-type errorWrapperSink struct { - wrapped externalResource -} - -// EmitRow implements Sink interface. -func (s errorWrapperSink) EmitRow( - ctx context.Context, - topic TopicDescriptor, - key, value []byte, - updated, mvcc hlc.Timestamp, - alloc kvevent.Alloc, -) error { - if err := s.wrapped.(EventSink).EmitRow(ctx, topic, key, value, updated, mvcc, alloc); err != nil { - return changefeedbase.MarkRetryableError(err) - } - return nil -} - -func (s errorWrapperSink) EncodeAndEmitRow( - ctx context.Context, - updatedRow cdcevent.Row, - prevRow cdcevent.Row, - topic TopicDescriptor, - updated, mvcc hlc.Timestamp, - alloc kvevent.Alloc, -) error { - - if sinkWithEncoder, ok := s.wrapped.(SinkWithEncoder); ok { - if err := sinkWithEncoder.EncodeAndEmitRow(ctx, updatedRow, prevRow, topic, updated, mvcc, alloc); err != nil { - return changefeedbase.MarkRetryableError(err) - } - } else { - return errors.AssertionFailedf("Expected a sink with encoder for, found %T", s.wrapped) - } - return nil -} - -// EmitResolvedTimestamp implements Sink interface. -func (s errorWrapperSink) EmitResolvedTimestamp( - ctx context.Context, encoder Encoder, resolved hlc.Timestamp, -) error { - if err := s.wrapped.(ResolvedTimestampSink).EmitResolvedTimestamp(ctx, encoder, resolved); err != nil { - return changefeedbase.MarkRetryableError(err) - } - return nil -} - -// Flush implements Sink interface. -func (s errorWrapperSink) Flush(ctx context.Context) error { - if err := s.wrapped.(EventSink).Flush(ctx); err != nil { - return changefeedbase.MarkRetryableError(err) - } - return nil -} - -// Close implements Sink interface. -func (s errorWrapperSink) Close() error { - if err := s.wrapped.Close(); err != nil { - return changefeedbase.MarkRetryableError(err) - } - return nil -} - -// Dial implements Sink interface. -func (s errorWrapperSink) Dial() error { - return s.wrapped.Dial() -} - // encDatumRowBuffer is a FIFO of `EncDatumRow`s. 
// // TODO(dan): There's some potential allocation savings here by reusing the same diff --git a/pkg/ccl/changefeedccl/sink_webhook_test.go b/pkg/ccl/changefeedccl/sink_webhook_test.go index 85b8ce91ede7..199734be6bf5 100644 --- a/pkg/ccl/changefeedccl/sink_webhook_test.go +++ b/pkg/ccl/changefeedccl/sink_webhook_test.go @@ -171,8 +171,7 @@ func TestWebhookSink(t *testing.T) { // now sink's client accepts no custom certs, should reject the server's cert and fail require.NoError(t, sinkSrcNoCert.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, zeroAlloc)) - require.EqualError(t, sinkSrcNoCert.Flush(context.Background()), - fmt.Sprintf(`Post "%s": x509: certificate signed by unknown authority`, sinkDest.URL())) + require.Regexp(t, "x509", sinkSrcNoCert.Flush(context.Background())) require.EqualError(t, sinkSrcNoCert.EmitRow(context.Background(), nil, nil, nil, zeroTS, zeroTS, zeroAlloc), `context canceled`) diff --git a/pkg/sql/catalog/lease/lease.go b/pkg/sql/catalog/lease/lease.go index 8cb4b2457ced..2afa0a77b9ab 100644 --- a/pkg/sql/catalog/lease/lease.go +++ b/pkg/sql/catalog/lease/lease.go @@ -502,7 +502,7 @@ func acquireNodeLease(ctx context.Context, m *Manager, id descpb.ID) (bool, erro lt := logtags.FromContext(ctx) ctx, cancel := m.stopper.WithCancelOnQuiesce(logtags.AddTags(m.ambientCtx.AnnotateCtx(context.Background()), lt)) defer cancel() - if m.isDraining() { + if m.IsDraining() { return nil, errors.New("cannot acquire lease when draining") } newest := m.findNewest(id) @@ -545,7 +545,7 @@ func acquireNodeLease(ctx context.Context, m *Manager, id descpb.ID) (bool, erro // releaseLease from store. func releaseLease(ctx context.Context, lease *storedLease, m *Manager) { - if m.isDraining() { + if m.IsDraining() { // Release synchronously to guarantee release before exiting. 
m.storage.release(ctx, m.stopper, lease) return @@ -1010,10 +1010,11 @@ func (m *Manager) Acquire( func (m *Manager) removeOnceDereferenced() bool { return m.storage.testingKnobs.RemoveOnceDereferenced || // Release from the store if the Manager is draining. - m.isDraining() + m.IsDraining() } -func (m *Manager) isDraining() bool { +// IsDraining returns true if this node's lease manager is draining. +func (m *Manager) IsDraining() bool { return m.draining.Load().(bool) } diff --git a/pkg/testutils/lint/lint_test.go b/pkg/testutils/lint/lint_test.go index 900ce4c359bc..396edf376a0c 100644 --- a/pkg/testutils/lint/lint_test.go +++ b/pkg/testutils/lint/lint_test.go @@ -1197,6 +1197,7 @@ func TestLint(t *testing.T) { "*.go", ":!*.pb.go", ":!*.pb.gw.go", + ":!ccl/changefeedccl/changefeedbase/errors.go", ":!kv/kvclient/kvcoord/lock_spans_over_budget_error.go", ":!spanconfig/errors.go", ":!roachpb/replica_unavailable_error.go",