diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 3f015970f5c9..5cccc7d139bf 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -20,6 +20,7 @@ go_library( "metrics.go", "name.go", "parquet_sink_cloudstorage.go", + "retry.go", "schema_registry.go", "scram_client.go", "sink.go", @@ -80,7 +81,6 @@ go_library( "//pkg/sql/execinfra", "//pkg/sql/execinfrapb", "//pkg/sql/exprutil", - "//pkg/sql/flowinfra", "//pkg/sql/importer", "//pkg/sql/parser", "//pkg/sql/pgwire/pgcode", @@ -234,6 +234,8 @@ go_test( "//pkg/sql/flowinfra", "//pkg/sql/importer", "//pkg/sql/parser", + "//pkg/sql/pgwire/pgcode", + "//pkg/sql/pgwire/pgerror", "//pkg/sql/randgen", "//pkg/sql/rowenc", "//pkg/sql/rowenc/keyside", diff --git a/pkg/ccl/changefeedccl/avro.go b/pkg/ccl/changefeedccl/avro.go index e6a81150daeb..eb559a85988e 100644 --- a/pkg/ccl/changefeedccl/avro.go +++ b/pkg/ccl/changefeedccl/avro.go @@ -15,6 +15,7 @@ import ( "github.com/cockroachdb/apd/v3" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/geo" "github.com/cockroachdb/cockroach/pkg/geo/geopb" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" @@ -408,8 +409,8 @@ func typeToAvroSchema(typ *types.T) (*avroSchemaField, error) { func(d tree.Datum, _ interface{}) (interface{}, error) { date := *d.(*tree.DDate) if !date.IsFinite() { - return nil, errors.Errorf( - `infinite date not yet supported with avro`) + return nil, changefeedbase.WithTerminalError(errors.Errorf( + `infinite date not yet supported with avro`)) } // The avro library requires us to return this as a time.Time. return date.ToTime() @@ -498,8 +499,8 @@ func typeToAvroSchema(typ *types.T) (*avroSchemaField, error) { ) case types.DecimalFamily: if typ.Precision() == 0 { - return nil, errors.Errorf( - `decimal with no precision not yet supported with avro`) + return nil, changefeedbase.WithTerminalError(errors.Errorf( + `decimal with no precision not yet supported with avro`)) } width := int(typ.Width()) @@ -595,8 +596,8 @@ func typeToAvroSchema(typ *types.T) (*avroSchemaField, error) { case types.ArrayFamily: itemSchema, err := typeToAvroSchema(typ.ArrayContents()) if err != nil { - return nil, errors.Wrapf(err, `could not create item schema for %s`, - typ) + return nil, changefeedbase.WithTerminalError( + errors.Wrapf(err, `could not create item schema for %s`, typ)) } itemUnionKey := avroUnionKey(itemSchema.SchemaType.([]avroSchemaType)[1]) @@ -676,8 +677,8 @@ func typeToAvroSchema(typ *types.T) (*avroSchemaField, error) { ) default: - return nil, errors.Errorf(`type %s not yet supported with avro`, - typ.SQLString()) + return nil, changefeedbase.WithTerminalError( + errors.Errorf(`type %s not yet supported with avro`, typ.SQLString())) } return schema, nil @@ -688,7 +689,7 @@ func typeToAvroSchema(typ *types.T) (*avroSchemaField, error) { func columnToAvroSchema(col cdcevent.ResultColumn) (*avroSchemaField, error) { schema, err := typeToAvroSchema(col.Typ) if err != nil { - return nil, errors.Wrapf(err, "column %s", col.Name) + return nil, changefeedbase.WithTerminalError(errors.Wrapf(err, "column %s", col.Name)) } schema.Name = SQLNameToAvroName(col.Name) schema.Metadata = col.SQLStringNotHumanReadable() @@ -790,7 +791,7 @@ func (r *avroDataRecord) rowFromTextual(buf []byte) (rowenc.EncDatumRow, error) return nil, err } if len(newBuf) > 0 { - return nil, errors.New(`only one row was expected`) + return nil, changefeedbase.WithTerminalError(errors.New(`only one row was expected`)) } return r.rowFromNative(native) } @@ -811,7 +812,7 @@ func (r *avroDataRecord) RowFromBinary(buf []byte) (rowenc.EncDatumRow, error) { return nil, err } if len(newBuf) > 0 { - return nil, errors.New(`only one row was expected`) + return nil, changefeedbase.WithTerminalError(errors.New(`only one row was expected`)) } return r.rowFromNative(native) } @@ -826,7 +827,8 @@ func (r *avroDataRecord) nativeFromRow(it cdcevent.Iterator) (interface{}, error if err := it.Datum(func(d tree.Datum, col cdcevent.ResultColumn) (err error) { fieldIdx, ok := r.fieldIdxByName[col.Name] if !ok { - return errors.AssertionFailedf("could not find avro field for column %s", col.Name) + return changefeedbase.WithTerminalError( + errors.AssertionFailedf("could not find avro field for column %s", col.Name)) } r.native[col.Name], err = r.Fields[fieldIdx].encodeFn(d) return err @@ -840,11 +842,12 @@ func (r *avroDataRecord) nativeFromRow(it cdcevent.Iterator) (interface{}, error func (r *avroDataRecord) rowFromNative(native interface{}) (rowenc.EncDatumRow, error) { avroDatums, ok := native.(map[string]interface{}) if !ok { - return nil, errors.Errorf(`unknown avro native type: %T`, native) + return nil, changefeedbase.WithTerminalError( + errors.Errorf(`unknown avro native type: %T`, native)) } if len(r.Fields) != len(avroDatums) { - return nil, errors.Errorf( - `expected row with %d columns got %d`, len(r.Fields), len(avroDatums)) + return nil, changefeedbase.WithTerminalError(errors.Errorf( + `expected row with %d columns got %d`, len(r.Fields), len(avroDatums))) } row := make(rowenc.EncDatumRow, len(r.Fields)) @@ -978,7 +981,8 @@ func (r *avroEnvelopeRecord) BinaryFromRow( delete(meta, `updated`) ts, ok := u.(hlc.Timestamp) if !ok { - return nil, errors.Errorf(`unknown metadata timestamp type: %T`, u) + return nil, changefeedbase.WithTerminalError( + errors.Errorf(`unknown metadata timestamp type: %T`, u)) } native[`updated`] = goavro.Union(avroUnionKey(avroSchemaString), ts.AsOfSystemTime()) } @@ -989,13 +993,14 @@ func (r *avroEnvelopeRecord) BinaryFromRow( delete(meta, `resolved`) ts, ok := u.(hlc.Timestamp) if !ok { - return nil, errors.Errorf(`unknown metadata timestamp type: %T`, u) + return nil, changefeedbase.WithTerminalError( + errors.Errorf(`unknown metadata timestamp type: %T`, u)) } native[`resolved`] = goavro.Union(avroUnionKey(avroSchemaString), ts.AsOfSystemTime()) } } for k := range meta { - return nil, errors.AssertionFailedf(`unhandled meta key: %s`, k) + return nil, changefeedbase.WithTerminalError(errors.AssertionFailedf(`unhandled meta key: %s`, k)) } return r.codec.BinaryFromNative(buf, native) } @@ -1016,10 +1021,12 @@ func (r *avroDataRecord) refreshTypeMetadata(row cdcevent.Row) error { // precision is set) this is roundtripable without information loss. func decimalToRat(dec apd.Decimal, scale int32) (big.Rat, error) { if dec.Form != apd.Finite { - return big.Rat{}, errors.Errorf(`cannot convert %s form decimal`, dec.Form) + return big.Rat{}, changefeedbase.WithTerminalError( + errors.Errorf(`cannot convert %s form decimal`, dec.Form)) } if scale > 0 && scale != -dec.Exponent { - return big.Rat{}, errors.Errorf(`%s will not roundtrip at scale %d`, &dec, scale) + return big.Rat{}, changefeedbase.WithTerminalError( + errors.Errorf(`%s will not roundtrip at scale %d`, &dec, scale)) } var r big.Rat if dec.Exponent >= 0 { diff --git a/pkg/ccl/changefeedccl/cdceval/BUILD.bazel b/pkg/ccl/changefeedccl/cdceval/BUILD.bazel index fc42be8203a6..af9c2910fd9c 100644 --- a/pkg/ccl/changefeedccl/cdceval/BUILD.bazel +++ b/pkg/ccl/changefeedccl/cdceval/BUILD.bazel @@ -16,6 +16,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/ccl/changefeedccl/cdcevent", + "//pkg/ccl/changefeedccl/changefeedbase", "//pkg/clusterversion", "//pkg/jobs/jobspb", "//pkg/keys", diff --git a/pkg/ccl/changefeedccl/cdceval/expr_eval.go b/pkg/ccl/changefeedccl/cdceval/expr_eval.go index 73cd97a7530e..2f79c992c98b 100644 --- a/pkg/ccl/changefeedccl/cdceval/expr_eval.go +++ b/pkg/ccl/changefeedccl/cdceval/expr_eval.go @@ -13,6 +13,7 @@ import ( "fmt" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/catalog/schemaexpr" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" @@ -73,7 +74,8 @@ func (e *Evaluator) MatchesFilter( ) (_ bool, err error) { defer func() { if pan := recover(); pan != nil { - err = errors.Newf("error while evaluating WHERE clause: %s", pan) + err = changefeedbase.WithTerminalError( + errors.Newf("error while evaluating WHERE clause: %s", pan)) } }() if e.where == nil { @@ -96,7 +98,8 @@ func (e *Evaluator) Projection( ) (_ cdcevent.Row, err error) { defer func() { if pan := recover(); pan != nil { - err = errors.Newf("error while evaluating SELECT clause: %s", pan) + err = changefeedbase.WithTerminalError( + errors.Newf("error while evaluating SELECT clause: %s", pan)) } }() if len(e.selectors) == 0 { @@ -114,7 +117,8 @@ func (e *Evaluator) Projection( func (e *Evaluator) initSelectClause(ctx context.Context, sc *tree.SelectClause) (err error) { defer func() { if pan := recover(); pan != nil { - err = errors.Newf("error while validating CHANGEFEED expression: %s", pan) + err = changefeedbase.WithTerminalError( + errors.Newf("error while validating CHANGEFEED expression: %s", pan)) } }() if len(sc.Exprs) == 0 { // Shouldn't happen, but be defensive. diff --git a/pkg/ccl/changefeedccl/cdceval/validation.go b/pkg/ccl/changefeedccl/cdceval/validation.go index 7d6942171bf4..919e39ea4777 100644 --- a/pkg/ccl/changefeedccl/cdceval/validation.go +++ b/pkg/ccl/changefeedccl/cdceval/validation.go @@ -12,6 +12,7 @@ import ( "context" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/sql" @@ -47,7 +48,8 @@ func NormalizeAndValidateSelectForTarget( ) (n NormalizedSelectClause, _ jobspb.ChangefeedTargetSpecification, retErr error) { defer func() { if pan := recover(); pan != nil { - retErr = errors.Newf("low-level error while normalizing expression, probably syntax is unsupported in CREATE CHANGEFEED: %s", pan) + retErr = changefeedbase.WithTerminalError( + errors.Newf("expression currently unsupported in CREATE CHANGEFEED: %s", pan)) } }() execCtx.SemaCtx() diff --git a/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go b/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go index 230b680c615f..52cadff8bc08 100644 --- a/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go +++ b/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go @@ -110,7 +110,7 @@ func refreshUDT( }); err != nil { // Manager can return all kinds of errors during chaos, but based on // its usage, none of them should ever be terminal. - return nil, changefeedbase.MarkRetryableError(err) + return nil, err } // Immediately release the lease, since we only need it for the exact // timestamp requested. @@ -144,7 +144,7 @@ func (c *rowFetcherCache) tableDescForKey( if err != nil { // Manager can return all kinds of errors during chaos, but based on // its usage, none of them should ever be terminal. - return nil, family, changefeedbase.MarkRetryableError(err) + return nil, family, err } tableDesc = desc.Underlying().(catalog.TableDescriptor) // Immediately release the lease, since we only need it for the exact diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 3c5698025c7a..101d1ecff4e9 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -250,7 +250,6 @@ func (ca *changeAggregator) Start(ctx context.Context) { ca.spec.User(), ca.spec.JobID, ca.sliMetrics) if err != nil { - err = changefeedbase.MarkRetryableError(err) // Early abort in the case that there is an error creating the sink. ca.MoveToDraining(err) ca.cancel() @@ -263,8 +262,6 @@ func (ca *changeAggregator) Start(ctx context.Context) { ca.changedRowBuf = &b.buf } - ca.sink = &errorWrapperSink{wrapped: ca.sink} - // If the initial scan was disabled the highwater would've already been forwarded needsInitialScan := ca.frontier.Frontier().IsEmpty() @@ -483,7 +480,6 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMet err = nil } } else { - select { // If the poller errored first, that's the // interesting one, so overwrite `err`. @@ -921,7 +917,6 @@ func (cf *changeFrontier) Start(ctx context.Context) { cf.spec.User(), cf.spec.JobID, sli) if err != nil { - err = changefeedbase.MarkRetryableError(err) cf.MoveToDraining(err) return } @@ -930,8 +925,6 @@ func (cf *changeFrontier) Start(ctx context.Context) { cf.resolvedBuf = &b.buf } - cf.sink = &errorWrapperSink{wrapped: cf.sink} - cf.highWaterAtStart = cf.spec.Feed.StatementTime if cf.spec.JobID != 0 { job, err := cf.flowCtx.Cfg.JobRegistry.LoadClaimedJob(ctx, cf.spec.JobID) @@ -1000,9 +993,8 @@ func (cf *changeFrontier) close() { cf.closeMetrics() } if cf.sink != nil { - if err := cf.sink.Close(); err != nil { - log.Warningf(cf.Ctx, `error closing sink. goroutines may have leaked: %v`, err) - } + // Best effort: context is often cancel by now, so we expect to see an error + _ = cf.sink.Close() } cf.memAcc.Close(cf.Ctx) cf.MemMonitor.Stop(cf.Ctx) @@ -1043,8 +1035,11 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad // Detect whether this boundary should be used to kill or restart the // changefeed. - if cf.frontier.boundaryType == jobspb.ResolvedSpan_RESTART { - err = changefeedbase.MarkRetryableError(err) + if cf.frontier.boundaryType == jobspb.ResolvedSpan_EXIT { + err = changefeedbase.WithTerminalError(errors.Wrapf(err, + "shut down due to schema change and %s=%q", + changefeedbase.OptSchemaChangePolicy, + changefeedbase.OptSchemaChangePolicyStop)) } } @@ -1289,7 +1284,7 @@ func (cf *changeFrontier) checkpointJobProgress( if cf.knobs.RaiseRetryableError != nil { if err := cf.knobs.RaiseRetryableError(); err != nil { - return false, changefeedbase.MarkRetryableError(errors.New("cf.knobs.RaiseRetryableError")) + return false, err } } diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index 18a813be66bf..dc796396a190 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -39,7 +39,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/resolver" "github.com/cockroachdb/cockroach/pkg/sql/exprutil" - "github.com/cockroachdb/cockroach/pkg/sql/flowinfra" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgnotice" @@ -51,19 +50,12 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" - "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" ) -var changefeedRetryOptions = retry.Options{ - InitialBackoff: 5 * time.Millisecond, - Multiplier: 2, - MaxBackoff: 10 * time.Second, -} - // featureChangefeedEnabled is used to enable and disable the CHANGEFEED feature. var featureChangefeedEnabled = settings.RegisterBoolSetting( settings.TenantWritable, @@ -213,7 +205,6 @@ func changefeedPlanHook( } if details.SinkURI == `` { - p.ExtendedEvalContext().ChangefeedState = &coreChangefeedProgress{ progress: progress, } @@ -231,7 +222,7 @@ func changefeedPlanHook( logChangefeedCreateTelemetry(ctx, jr) var err error - for r := retry.StartWithCtx(ctx, changefeedRetryOptions); r.Next(); { + for r := getRetry(ctx); r.Next(); { if err = distChangefeedFlow(ctx, p, 0 /* jobID */, details, progress, resultsCh); err == nil { return nil } @@ -242,15 +233,18 @@ func changefeedPlanHook( } } - if !changefeedbase.IsRetryableError(err) { - log.Warningf(ctx, `CHANGEFEED returning with error: %+v`, err) - return err + if err = changefeedbase.AsTerminalError(ctx, p.ExecCfg().LeaseManager, err); err != nil { + break } + // All other errors retry. progress = p.ExtendedEvalContext().ChangefeedState.(*coreChangefeedProgress).progress } + // TODO(yevgeniy): This seems wrong -- core changefeeds always terminate + // with an error. Perhaps rename this telemetry to indicate number of + // completed feeds. telemetry.Count(`changefeed.core.error`) - return changefeedbase.MaybeStripRetryableErrorMarker(err) + return err } // The below block creates the job and protects the data required for the @@ -794,7 +788,7 @@ func validateSink( canarySink, err := getSink(ctx, &p.ExecCfg().DistSQLSrv.ServerConfig, details, nilOracle, p.User(), jobID, sli) if err != nil { - return changefeedbase.MaybeStripRetryableErrorMarker(err) + return err } if err := canarySink.Close(); err != nil { return err @@ -1002,18 +996,18 @@ func (b *changefeedResumer) resumeWithRetries( // bubbles up to this level, we'd like to "retry" the flow if possible. This // could be because the sink is down or because a cockroach node has crashed // or for many other reasons. - var err error var lastRunStatusUpdate time.Time - for r := retry.StartWithCtx(ctx, changefeedRetryOptions); r.Next(); { + for r := getRetry(ctx); r.Next(); { // startedCh is normally used to signal back to the creator of the job that // the job has started; however, in this case nothing will ever receive // on the channel, causing the changefeed flow to block. Replace it with // a dummy channel. startedCh := make(chan tree.Datums, 1) - if err = distChangefeedFlow(ctx, jobExec, jobID, details, progress, startedCh); err == nil { - return nil + err := distChangefeedFlow(ctx, jobExec, jobID, details, progress, startedCh) + if err == nil { + return nil // Changefeed completed -- e.g. due to initial_scan=only mode. } if knobs, ok := execCfg.DistSQLSrv.TestingKnobs.Changefeed.(*TestingKnobs); ok { @@ -1022,30 +1016,16 @@ func (b *changefeedResumer) resumeWithRetries( } } - // Retry changefeed if error is retryable. In addition, we want to handle - // context cancellation as retryable, but only if the resumer context has not been cancelled. - // (resumer context is canceled by the jobs framework -- so we should respect it). - isRetryableErr := changefeedbase.IsRetryableError(err) || - (ctx.Err() == nil && errors.Is(err, context.Canceled)) - if !isRetryableErr { - if ctx.Err() != nil { - return ctx.Err() - } - - if flowinfra.IsFlowRetryableError(err) { - // We don't want to retry flowinfra retryable error in the retry loop above. - // This error currently indicates that this node is being drained. As such, - // retries will not help. - // Instead, we want to make sure that the changefeed job is not marked failed - // due to a transient, retryable error. - err = jobs.MarkAsRetryJobError(err) - _ = b.setJobRunningStatus(ctx, lastRunStatusUpdate, "retryable flow error: %s", err) - } - - log.Warningf(ctx, `CHANGEFEED job %d returning with error: %+v`, jobID, err) + // Terminate changefeed if needed. + if err := changefeedbase.AsTerminalError(ctx, jobExec.ExecCfg().LeaseManager, err); err != nil { + log.Infof(ctx, "CHANGEFEED %d shutting down (cause: %v)", jobID, err) + // Best effort -- update job status to make it clear why changefeed shut down. + // This won't always work if this node is being shutdown/drained. + b.setJobRunningStatus(ctx, time.Time{}, "shutdown due to %s", err) return err } + // All other errors retry. log.Warningf(ctx, `WARNING: CHANGEFEED job %d encountered retryable error: %v`, jobID, err) lastRunStatusUpdate = b.setJobRunningStatus(ctx, lastRunStatusUpdate, "retryable error: %s", err) if metrics, ok := execCfg.JobRegistry.MetricsStruct().Changefeed.(*Metrics); ok { @@ -1069,7 +1049,7 @@ func (b *changefeedResumer) resumeWithRetries( progress = reloadedJob.Progress() } } - return errors.Wrap(err, `ran out of retries`) + return errors.Wrap(ctx.Err(), `ran out of retries`) } // OnFailOrCancel is part of the jobs.Resumer interface. diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index b771986e3853..04ca32255167 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -65,6 +65,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/flowinfra" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" @@ -3552,9 +3554,9 @@ func TestChangefeedRetryableError(t *testing.T) { knobs.BeforeEmitRow = func(_ context.Context) error { switch atomic.LoadInt64(&failEmit) { case 1: - return changefeedbase.MarkRetryableError(fmt.Errorf("synthetic retryable error")) + return errors.New("synthetic retryable error") case 2: - return fmt.Errorf("synthetic terminal error") + return changefeedbase.WithTerminalError(errors.New("synthetic terminal error")) default: return nil } @@ -4539,7 +4541,7 @@ func TestChangefeedPanicRecovery(t *testing.T) { prep(t, sqlDB) // Check that disallowed expressions have a good error message. // Also regression test for https://github.com/cockroachdb/cockroach/issues/90416 - sqlDB.ExpectErr(t, "syntax is unsupported in CREATE CHANGEFEED", + sqlDB.ExpectErr(t, "expression currently unsupported in CREATE CHANGEFEED", `CREATE CHANGEFEED WITH schema_change_policy='stop' AS SELECT 1 FROM foo WHERE EXISTS (SELECT true)`) }) @@ -5486,8 +5488,9 @@ func TestChangefeedPropagatesTerminalError(t *testing.T) { opts := makeOptions() defer addCloudStorageOptions(t, &opts)() defer changefeedbase.TestingSetDefaultMinCheckpointFrequency(testSinkFlushFrequency)() - + defer testingUseFastRetry()() const numNodes = 3 + perServerKnobs := make(map[int]base.TestServerArgs, numNodes) for i := 0; i < numNodes; i++ { perServerKnobs[i] = base.TestServerArgs{ @@ -5536,10 +5539,18 @@ func TestChangefeedPropagatesTerminalError(t *testing.T) { // Configure changefeed to emit fatal error on the specified nodes. distSQLKnobs := perServerKnobs[n].Knobs.DistSQL.(*execinfra.TestingKnobs) var numEmitted int32 + nodeToFail := n distSQLKnobs.Changefeed.(*TestingKnobs).BeforeEmitRow = func(ctx context.Context) error { // Emit few rows before returning an error. if atomic.AddInt32(&numEmitted, 1) > 10 { - err := errors.Newf("synthetic fatal error from node %d", n) + // Mark error as terminal, but make it a bit more + // interesting by wrapping it few times. + err := errors.Wrap( + changefeedbase.WithTerminalError( + pgerror.Wrapf( + errors.Newf("synthetic fatal error from node %d", nodeToFail), + pgcode.Io, "something happened with IO")), + "while doing something") log.Errorf(ctx, "BeforeEmitRow returning error %s", err) return err } @@ -6252,7 +6263,7 @@ func TestChangefeedOnErrorOption(t *testing.T) { DistSQL.(*execinfra.TestingKnobs). Changefeed.(*TestingKnobs) knobs.BeforeEmitRow = func(_ context.Context) error { - return errors.Errorf("should fail with custom error") + return changefeedbase.WithTerminalError(errors.New("should fail with custom error")) } foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH on_error='pause'`) @@ -6295,7 +6306,7 @@ func TestChangefeedOnErrorOption(t *testing.T) { DistSQL.(*execinfra.TestingKnobs). Changefeed.(*TestingKnobs) knobs.BeforeEmitRow = func(_ context.Context) error { - return errors.Errorf("should fail with custom error") + return changefeedbase.WithTerminalError(errors.New("should fail with custom error")) } foo := feed(t, f, `CREATE CHANGEFEED FOR bar WITH on_error = 'fail'`) @@ -6315,7 +6326,7 @@ func TestChangefeedOnErrorOption(t *testing.T) { DistSQL.(*execinfra.TestingKnobs). Changefeed.(*TestingKnobs) knobs.BeforeEmitRow = func(_ context.Context) error { - return errors.Errorf("should fail with custom error") + return changefeedbase.WithTerminalError(errors.New("should fail with custom error")) } foo := feed(t, f, `CREATE CHANGEFEED FOR quux`) @@ -7405,7 +7416,7 @@ func TestChangefeedFailedTelemetryLogs(t *testing.T) { DistSQL.(*execinfra.TestingKnobs). Changefeed.(*TestingKnobs) knobs.BeforeEmitRow = func(_ context.Context) error { - return errors.Errorf("should fail") + return changefeedbase.WithTerminalError(errors.New("should fail")) } beforeCreate := timeutil.Now() diff --git a/pkg/ccl/changefeedccl/changefeedbase/BUILD.bazel b/pkg/ccl/changefeedccl/changefeedbase/BUILD.bazel index 4bb125e722a1..e55f7cb0ae78 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/BUILD.bazel +++ b/pkg/ccl/changefeedccl/changefeedbase/BUILD.bazel @@ -13,14 +13,12 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase", visibility = ["//visibility:public"], deps = [ - "//pkg/jobs/joberror", + "//pkg/jobs", "//pkg/jobs/jobspb", - "//pkg/kv/kvclient/kvcoord", "//pkg/roachpb", "//pkg/settings", - "//pkg/sql", "//pkg/sql/catalog/descpb", - "//pkg/sql/flowinfra", + "//pkg/sql/catalog/lease", "//pkg/util", "@com_github_cockroachdb_errors//:errors", ], diff --git a/pkg/ccl/changefeedccl/changefeedbase/errors.go b/pkg/ccl/changefeedccl/changefeedbase/errors.go index 0c1384cb8bfe..54507c7b0c48 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/errors.go +++ b/pkg/ccl/changefeedccl/changefeedbase/errors.go @@ -9,15 +9,11 @@ package changefeedbase import ( - "fmt" - "reflect" - "strings" + "context" - "github.com/cockroachdb/cockroach/pkg/jobs/joberror" - "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" + "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/sql" - "github.com/cockroachdb/cockroach/pkg/sql/flowinfra" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" "github.com/cockroachdb/errors" ) @@ -75,81 +71,53 @@ func (e *taggedError) Cause() error { return e.wrapped } // planned to be moved to the stdlib in go 1.13. func (e *taggedError) Unwrap() error { return e.wrapped } -const retryableErrorString = "retryable changefeed error" +type terminalError struct{} -type retryableError struct { - wrapped error -} - -// MarkRetryableError wraps the given error, marking it as retryable to -// changefeeds. -func MarkRetryableError(e error) error { - return &retryableError{wrapped: e} +func (e *terminalError) Error() string { + return "terminal changefeed error" } -// Error implements the error interface. -func (e *retryableError) Error() string { - return fmt.Sprintf("%s: %s", retryableErrorString, e.wrapped.Error()) +// WithTerminalError decorates underlying error to indicate +// that the error is a terminal changefeed error. +func WithTerminalError(cause error) error { + if cause == nil { + return nil + } + return errors.Mark(cause, &terminalError{}) } -// Cause implements the github.com/pkg/errors.causer interface. -func (e *retryableError) Cause() error { return e.wrapped } - -// Unwrap implements the github.com/golang/xerrors.Wrapper interface, which is -// planned to be moved to the stdlib in go 1.13. -func (e *retryableError) Unwrap() error { return e.wrapped } - -// IsRetryableError returns true if the supplied error, or any of its parent -// causes, is a IsRetryableError. -func IsRetryableError(err error) bool { - if err == nil { - return false - } - if errors.HasType(err, (*retryableError)(nil)) { - return true +// AsTerminalError determines if the cause error is a terminal changefeed +// error. Returns non-nil error if changefeed should terminate with the +// returned error. +func AsTerminalError(ctx context.Context, lm *lease.Manager, cause error) (termErr error) { + if cause == nil { + return nil } - // During node shutdown it is possible for all outgoing transports used by - // the kvfeed to expire, producing a SendError that the node is still able - // to propagate to the frontier. This has been known to happen during - // cluster upgrades. This scenario should not fail the changefeed. - if kvcoord.IsSendError(err) { - return true + if err := ctx.Err(); err != nil { + // If context has been cancelled, we must respect that; this happens + // if, e.g. this changefeed is being cancelled. + return err } - // TODO(knz): this is a bad implementation. Make it go away - // by avoiding string comparisons. - - // If a RetryableError occurs on a remote node, DistSQL serializes it such - // that we can't recover the structure and we have to rely on this - // unfortunate string comparison. - errStr := err.Error() - if strings.Contains(errStr, retryableErrorString) || - strings.Contains(errStr, kvcoord.SendErrorString) || - strings.Contains(errStr, "draining") { - return true + if lm.IsDraining() { + // This node is being drained. It's safe to propagate this error (to the + // job registry) since job registry should not be able to commit this error + // to the jobs table; but to be safe, make sure this error is marked as jobs + // retryable error to ensure that some other node retries this changefeed. + return jobs.MarkAsRetryJobError(cause) } - return (joberror.IsDistSQLRetryableError(err) || - flowinfra.IsNoInboundStreamConnectionError(err) || - errors.HasType(err, (*roachpb.NodeUnavailableError)(nil)) || - errors.Is(err, sql.ErrPlanChanged)) -} + // GC TTL errors are always fatal. + if errors.HasType(cause, (*roachpb.BatchTimestampBeforeGCError)(nil)) { + return WithTerminalError(cause) + } -// MaybeStripRetryableErrorMarker performs some minimal attempt to clean the -// RetryableError marker out. This won't do anything if the RetryableError -// itself has been wrapped, but that's okay, we'll just have an uglier string. -func MaybeStripRetryableErrorMarker(err error) error { - // The following is a hack to work around the error cast linter. - // What we're doing here is really not kosher; this function - // has no business in assuming that the retryableError{} wrapper - // has not been wrapped already. We could even expect that - // it gets wrapped in the common case. - // TODO(knz): Remove/replace this. - if reflect.TypeOf(err) == retryableErrorType { - err = errors.UnwrapOnce(err) + // Explicitly marked terminal errors are terminal. + if errors.Is(cause, &terminalError{}) { + return cause } - return err -} -var retryableErrorType = reflect.TypeOf((*retryableError)(nil)) + // All other errors retry. + return nil +} diff --git a/pkg/ccl/changefeedccl/changefeedvalidators/table_validator.go b/pkg/ccl/changefeedccl/changefeedvalidators/table_validator.go index e7502ac1b0ff..84583aabb32c 100644 --- a/pkg/ccl/changefeedccl/changefeedvalidators/table_validator.go +++ b/pkg/ccl/changefeedccl/changefeedvalidators/table_validator.go @@ -20,6 +20,17 @@ func ValidateTable( targets changefeedbase.Targets, tableDesc catalog.TableDescriptor, canHandle changefeedbase.CanHandle, +) error { + if err := validateTable(targets, tableDesc, canHandle); err != nil { + return changefeedbase.WithTerminalError(err) + } + return nil +} + +func validateTable( + targets changefeedbase.Targets, + tableDesc catalog.TableDescriptor, + canHandle changefeedbase.CanHandle, ) error { // Technically, the only non-user table known not to work is system.jobs // (which creates a cycle since the resolved timestamp high-water mark is diff --git a/pkg/ccl/changefeedccl/encoder_test.go b/pkg/ccl/changefeedccl/encoder_test.go index 132f01c0fe13..4415aebac0ee 100644 --- a/pkg/ccl/changefeedccl/encoder_test.go +++ b/pkg/ccl/changefeedccl/encoder_test.go @@ -410,9 +410,7 @@ func TestAvroEncoderWithTLS(t *testing.T) { enc, err := getEncoder(opts, targets) require.NoError(t, err) _, err = enc.EncodeKey(context.Background(), rowInsert) - require.EqualError(t, err, fmt.Sprintf("retryable changefeed error: "+ - `contacting confluent schema registry: Post "%s/subjects/foo-key/versions": x509: certificate signed by unknown authority`, - opts.SchemaRegistryURI)) + require.Regexp(t, "x509", err) wrongCert, _, err := cdctest.NewCACertBase64Encoded() require.NoError(t, err) @@ -425,9 +423,7 @@ func TestAvroEncoderWithTLS(t *testing.T) { enc, err = getEncoder(opts, targets) require.NoError(t, err) _, err = enc.EncodeKey(context.Background(), rowInsert) - require.EqualError(t, err, fmt.Sprintf("retryable changefeed error: "+ - `contacting confluent schema registry: Post "%s/subjects/foo-key/versions": x509: certificate signed by unknown authority`, - opts.SchemaRegistryURI)) + require.Regexp(t, `contacting confluent schema registry.*: x509`, err) }) } diff --git a/pkg/ccl/changefeedccl/helpers_test.go b/pkg/ccl/changefeedccl/helpers_test.go index 6577bab4998b..70716810cc55 100644 --- a/pkg/ccl/changefeedccl/helpers_test.go +++ b/pkg/ccl/changefeedccl/helpers_test.go @@ -389,13 +389,15 @@ func startTestFullServer( options.argsFn(&args) } - ctx := context.Background() + resetRetry := testingUseFastRetry() resetFlushFrequency := changefeedbase.TestingSetDefaultMinCheckpointFrequency(testSinkFlushFrequency) s, db, _ := serverutils.StartServer(t, args) + ctx := context.Background() cleanup := func() { s.Stopper().Stop(ctx) resetFlushFrequency() + resetRetry() } var err error defer func() { @@ -429,6 +431,7 @@ func startTestCluster(t testing.TB) (serverutils.TestClusterInterface, *gosql.DB JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), } + resetRetry := testingUseFastRetry() resetFlushFrequency := changefeedbase.TestingSetDefaultMinCheckpointFrequency(testSinkFlushFrequency) cluster, db, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster( t, 3 /* numServers */, knobs, @@ -437,6 +440,7 @@ func startTestCluster(t testing.TB) (serverutils.TestClusterInterface, *gosql.DB cleanupAndReset := func() { cleanup() resetFlushFrequency() + resetRetry() } var err error @@ -497,9 +501,10 @@ func startTestTenant( tenantRunner.ExecMultiple(t, strings.Split(serverSetupStatements, ";")...) waitForTenantPodsActive(t, tenantServer, 1) - + resetRetry := testingUseFastRetry() return tenantID, tenantServer, tenantDB, func() { tenantServer.Stopper().Stop(context.Background()) + resetRetry() } } diff --git a/pkg/ccl/changefeedccl/kvevent/BUILD.bazel b/pkg/ccl/changefeedccl/kvevent/BUILD.bazel index 35052ab74d33..a0710fb93de8 100644 --- a/pkg/ccl/changefeedccl/kvevent/BUILD.bazel +++ b/pkg/ccl/changefeedccl/kvevent/BUILD.bazel @@ -8,7 +8,6 @@ go_library( "blocking_buffer.go", "chan_buffer.go", "chunked_event_queue.go", - "err_buffer.go", "event.go", "metrics.go", "throttling_buffer.go", diff --git a/pkg/ccl/changefeedccl/kvevent/err_buffer.go b/pkg/ccl/changefeedccl/kvevent/err_buffer.go deleted file mode 100644 index 06953d63111e..000000000000 --- a/pkg/ccl/changefeedccl/kvevent/err_buffer.go +++ /dev/null @@ -1,35 +0,0 @@ -// Copyright 2021 The Cockroach Authors. -// -// Licensed as a CockroachDB Enterprise file under the Cockroach Community -// License (the "License"); you may not use this file except in compliance with -// the License. You may obtain a copy of the License at -// -// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt - -package kvevent - -import ( - "context" - - "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" -) - -type errorWrapperEventBuffer struct { - Buffer -} - -// NewErrorWrapperEventBuffer returns kvevent Buffer which treats any errors -// as retryable. -func NewErrorWrapperEventBuffer(b Buffer) Buffer { - return &errorWrapperEventBuffer{b} -} - -// Add implements Writer interface. -func (e errorWrapperEventBuffer) Add(ctx context.Context, event Event) error { - if err := e.Buffer.Add(ctx, event); err != nil { - return changefeedbase.MarkRetryableError(err) - } - return nil -} - -var _ Buffer = (*errorWrapperEventBuffer)(nil) diff --git a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go index 1725aee5ca0d..1920b5fe99e3 100644 --- a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go +++ b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go @@ -98,8 +98,7 @@ func Run(ctx context.Context, cfg Config) error { } bf := func() kvevent.Buffer { - return kvevent.NewErrorWrapperEventBuffer( - kvevent.NewMemBuffer(cfg.MM.MakeBoundAccount(), &cfg.Settings.SV, cfg.Metrics)) + return kvevent.NewMemBuffer(cfg.MM.MakeBoundAccount(), &cfg.Settings.SV, cfg.Metrics) } f := newKVFeed( diff --git a/pkg/ccl/changefeedccl/metrics.go b/pkg/ccl/changefeedccl/metrics.go index 3496573e5a61..60e53536f226 100644 --- a/pkg/ccl/changefeedccl/metrics.go +++ b/pkg/ccl/changefeedccl/metrics.go @@ -15,6 +15,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcutils" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/schemafeed" "github.com/cockroachdb/cockroach/pkg/jobs" @@ -529,10 +530,11 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) { if scope != defaultSLIScope { if !enableSLIMetrics { - return nil, errors.WithHint( - pgerror.Newf(pgcode.ConfigurationLimitExceeded, "cannot create metrics scope %q", scope), - "try restarting with COCKROACH_EXPERIMENTAL_ENABLE_PER_CHANGEFEED_METRICS=true", - ) + return nil, changefeedbase.WithTerminalError( + errors.WithHint( + pgerror.Newf(pgcode.ConfigurationLimitExceeded, "cannot create metrics scope %q", scope), + "try restarting with COCKROACH_EXPERIMENTAL_ENABLE_PER_CHANGEFEED_METRICS=true", + )) } const failSafeMax = 1024 if len(a.mu.sliMetrics) == failSafeMax { diff --git a/pkg/ccl/changefeedccl/retry.go b/pkg/ccl/changefeedccl/retry.go new file mode 100644 index 000000000000..7537fa19c723 --- /dev/null +++ b/pkg/ccl/changefeedccl/retry.go @@ -0,0 +1,71 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package changefeedccl + +import ( + "context" + "time" + + "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" +) + +var useFastRetry = false + +// getRetry returns retry object for changefeed. +func getRetry(ctx context.Context) Retry { + opts := retry.Options{ + InitialBackoff: 5 * time.Second, + Multiplier: 2, + MaxBackoff: 10 * time.Minute, + } + + if useFastRetry { + opts = retry.Options{ + InitialBackoff: 5 * time.Millisecond, + Multiplier: 2, + MaxBackoff: 250 * time.Minute, + } + } + + return Retry{Retry: retry.StartWithCtx(ctx, opts)} +} + +func testingUseFastRetry() func() { + useFastRetry = true + return func() { + useFastRetry = false + } +} + +// reset retry state after changefeed ran for that much time +// without errors. +const resetRetryAfter = 10 * time.Minute + +// Retry is a thin wrapper around retry.Retry which +// resets retry state if changefeed been running for sufficiently +// long time. +type Retry struct { + retry.Retry + lastRetry time.Time +} + +// Next returns whether the retry loop should continue, and blocks for the +// appropriate length of time before yielding back to the caller. +// If the last call to Next() happened long time ago, the amount of time +// to wait gets reset. +func (r *Retry) Next() bool { + defer func() { + r.lastRetry = timeutil.Now() + }() + if timeutil.Since(r.lastRetry) > resetRetryAfter { + r.Reset() + } + return r.Retry.Next() +} diff --git a/pkg/ccl/changefeedccl/schema_registry.go b/pkg/ccl/changefeedccl/schema_registry.go index be35c75d7539..bb230dd8c4d3 100644 --- a/pkg/ccl/changefeedccl/schema_registry.go +++ b/pkg/ccl/changefeedccl/schema_registry.go @@ -199,7 +199,7 @@ func (r *confluentSchemaRegistry) doWithRetry(ctx context.Context, fn func() err } log.VInfof(ctx, 2, "retrying schema registry operation: %s", err.Error()) } - return changefeedbase.MarkRetryableError(err) + return err } func gracefulClose(ctx context.Context, toClose io.ReadCloser) { diff --git a/pkg/ccl/changefeedccl/schemafeed/schema_feed.go b/pkg/ccl/changefeedccl/schemafeed/schema_feed.go index fd8575cb7614..808aff84d97e 100644 --- a/pkg/ccl/changefeedccl/schemafeed/schema_feed.go +++ b/pkg/ccl/changefeedccl/schemafeed/schema_feed.go @@ -559,7 +559,7 @@ func (tf *schemaFeed) validateDescriptor( shouldFilter, err := tf.filter.shouldFilter(ctx, e, tf.targets) log.VEventf(ctx, 1, "validate shouldFilter %v %v", formatEvent(e), shouldFilter) if err != nil { - return err + return changefeedbase.WithTerminalError(err) } if !shouldFilter { // Only sort the tail of the events from earliestTsBeingIngested. diff --git a/pkg/ccl/changefeedccl/sink.go b/pkg/ccl/changefeedccl/sink.go index ec405c6ca15a..48817fd1050f 100644 --- a/pkg/ccl/changefeedccl/sink.go +++ b/pkg/ccl/changefeedccl/sink.go @@ -299,78 +299,6 @@ func (u *sinkURL) String() string { return u.URL.String() } -// errorWrapperSink delegates to another sink and marks all returned errors as -// retryable. During changefeed setup, we use the sink once without this to -// verify configuration, but in the steady state, no sink error should be -// terminal. -type errorWrapperSink struct { - wrapped externalResource -} - -// EmitRow implements Sink interface. -func (s errorWrapperSink) EmitRow( - ctx context.Context, - topic TopicDescriptor, - key, value []byte, - updated, mvcc hlc.Timestamp, - alloc kvevent.Alloc, -) error { - if err := s.wrapped.(EventSink).EmitRow(ctx, topic, key, value, updated, mvcc, alloc); err != nil { - return changefeedbase.MarkRetryableError(err) - } - return nil -} - -func (s errorWrapperSink) EncodeAndEmitRow( - ctx context.Context, - updatedRow cdcevent.Row, - prevRow cdcevent.Row, - topic TopicDescriptor, - updated, mvcc hlc.Timestamp, - alloc kvevent.Alloc, -) error { - - if sinkWithEncoder, ok := s.wrapped.(SinkWithEncoder); ok { - if err := sinkWithEncoder.EncodeAndEmitRow(ctx, updatedRow, prevRow, topic, updated, mvcc, alloc); err != nil { - return changefeedbase.MarkRetryableError(err) - } - } else { - return errors.AssertionFailedf("Expected a sink with encoder for, found %T", s.wrapped) - } - return nil -} - -// EmitResolvedTimestamp implements Sink interface. -func (s errorWrapperSink) EmitResolvedTimestamp( - ctx context.Context, encoder Encoder, resolved hlc.Timestamp, -) error { - if err := s.wrapped.(ResolvedTimestampSink).EmitResolvedTimestamp(ctx, encoder, resolved); err != nil { - return changefeedbase.MarkRetryableError(err) - } - return nil -} - -// Flush implements Sink interface. -func (s errorWrapperSink) Flush(ctx context.Context) error { - if err := s.wrapped.(EventSink).Flush(ctx); err != nil { - return changefeedbase.MarkRetryableError(err) - } - return nil -} - -// Close implements Sink interface. -func (s errorWrapperSink) Close() error { - if err := s.wrapped.Close(); err != nil { - return changefeedbase.MarkRetryableError(err) - } - return nil -} - -// Dial implements Sink interface. -func (s errorWrapperSink) Dial() error { - return s.wrapped.Dial() -} - // encDatumRowBuffer is a FIFO of `EncDatumRow`s. // // TODO(dan): There's some potential allocation savings here by reusing the same diff --git a/pkg/ccl/changefeedccl/sink_webhook_test.go b/pkg/ccl/changefeedccl/sink_webhook_test.go index 85b8ce91ede7..199734be6bf5 100644 --- a/pkg/ccl/changefeedccl/sink_webhook_test.go +++ b/pkg/ccl/changefeedccl/sink_webhook_test.go @@ -171,8 +171,7 @@ func TestWebhookSink(t *testing.T) { // now sink's client accepts no custom certs, should reject the server's cert and fail require.NoError(t, sinkSrcNoCert.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, zeroAlloc)) - require.EqualError(t, sinkSrcNoCert.Flush(context.Background()), - fmt.Sprintf(`Post "%s": x509: certificate signed by unknown authority`, sinkDest.URL())) + require.Regexp(t, "x509", sinkSrcNoCert.Flush(context.Background())) require.EqualError(t, sinkSrcNoCert.EmitRow(context.Background(), nil, nil, nil, zeroTS, zeroTS, zeroAlloc), `context canceled`) diff --git a/pkg/sql/catalog/lease/lease.go b/pkg/sql/catalog/lease/lease.go index 8cb4b2457ced..2afa0a77b9ab 100644 --- a/pkg/sql/catalog/lease/lease.go +++ b/pkg/sql/catalog/lease/lease.go @@ -502,7 +502,7 @@ func acquireNodeLease(ctx context.Context, m *Manager, id descpb.ID) (bool, erro lt := logtags.FromContext(ctx) ctx, cancel := m.stopper.WithCancelOnQuiesce(logtags.AddTags(m.ambientCtx.AnnotateCtx(context.Background()), lt)) defer cancel() - if m.isDraining() { + if m.IsDraining() { return nil, errors.New("cannot acquire lease when draining") } newest := m.findNewest(id) @@ -545,7 +545,7 @@ func acquireNodeLease(ctx context.Context, m *Manager, id descpb.ID) (bool, erro // releaseLease from store. func releaseLease(ctx context.Context, lease *storedLease, m *Manager) { - if m.isDraining() { + if m.IsDraining() { // Release synchronously to guarantee release before exiting. m.storage.release(ctx, m.stopper, lease) return @@ -1010,10 +1010,11 @@ func (m *Manager) Acquire( func (m *Manager) removeOnceDereferenced() bool { return m.storage.testingKnobs.RemoveOnceDereferenced || // Release from the store if the Manager is draining. - m.isDraining() + m.IsDraining() } -func (m *Manager) isDraining() bool { +// IsDraining returns true if this node's lease manager is draining. +func (m *Manager) IsDraining() bool { return m.draining.Load().(bool) } diff --git a/pkg/testutils/lint/lint_test.go b/pkg/testutils/lint/lint_test.go index 900ce4c359bc..396edf376a0c 100644 --- a/pkg/testutils/lint/lint_test.go +++ b/pkg/testutils/lint/lint_test.go @@ -1197,6 +1197,7 @@ func TestLint(t *testing.T) { "*.go", ":!*.pb.go", ":!*.pb.gw.go", + ":!ccl/changefeedccl/changefeedbase/errors.go", ":!kv/kvclient/kvcoord/lock_spans_over_budget_error.go", ":!spanconfig/errors.go", ":!roachpb/replica_unavailable_error.go",