changefeedccl: Rework error handling #90810

Merged on Nov 6, 2022 (1 commit)
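The diff below replaces ad-hoc `MarkRetryableError` calls with a single terminal-error marker, `changefeedbase.WithTerminalError`, so errors are retryable by default and only explicitly marked ones fail the changefeed. The new `retry.go` is not shown on this page, so the following is only a minimal sketch of how such a marker can work in Go; every name other than `WithTerminalError` is an assumption for illustration, not the actual CockroachDB implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// terminalError marks an error as permanent: the changefeed should fail
// rather than restart. The real marker lives in the (unshown) retry.go;
// this type is illustrative only.
type terminalError struct{ cause error }

func (e *terminalError) Error() string { return e.cause.Error() }
func (e *terminalError) Unwrap() error { return e.cause }

// WithTerminalError wraps err so that IsTerminalError reports true for it.
func WithTerminalError(err error) error {
	if err == nil {
		return nil
	}
	return &terminalError{cause: err}
}

// IsTerminalError walks the wrap chain looking for the marker.
func IsTerminalError(err error) bool {
	var t *terminalError
	return errors.As(err, &t)
}

func main() {
	terminal := WithTerminalError(fmt.Errorf("decimal with no precision not yet supported with avro"))
	transient := errors.New("connection reset")

	fmt.Println(IsTerminalError(terminal))  // true: fail the changefeed
	fmt.Println(IsTerminalError(transient)) // false: retryable by default
}
```

Under that model, the callers changed in avro.go and cdceval below only classify the few cases that can never succeed; everything else is retried without an explicit mark.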
4 changes: 3 additions & 1 deletion pkg/ccl/changefeedccl/BUILD.bazel
@@ -20,6 +20,7 @@ go_library(
"metrics.go",
"name.go",
"parquet_sink_cloudstorage.go",
"retry.go",
"schema_registry.go",
"scram_client.go",
"sink.go",
@@ -80,7 +81,6 @@ go_library(
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
"//pkg/sql/exprutil",
"//pkg/sql/flowinfra",
"//pkg/sql/importer",
"//pkg/sql/parser",
"//pkg/sql/pgwire/pgcode",
@@ -234,6 +234,8 @@ go_test(
"//pkg/sql/flowinfra",
"//pkg/sql/importer",
"//pkg/sql/parser",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/randgen",
"//pkg/sql/rowenc",
"//pkg/sql/rowenc/keyside",
47 changes: 27 additions & 20 deletions pkg/ccl/changefeedccl/avro.go
@@ -15,6 +15,7 @@ import (

"github.com/cockroachdb/apd/v3"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/geo"
"github.com/cockroachdb/cockroach/pkg/geo/geopb"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
@@ -408,8 +409,8 @@ func typeToAvroSchema(typ *types.T) (*avroSchemaField, error) {
func(d tree.Datum, _ interface{}) (interface{}, error) {
date := *d.(*tree.DDate)
if !date.IsFinite() {
return nil, errors.Errorf(
`infinite date not yet supported with avro`)
return nil, changefeedbase.WithTerminalError(errors.Errorf(
`infinite date not yet supported with avro`))
}
// The avro library requires us to return this as a time.Time.
return date.ToTime()
@@ -498,8 +499,8 @@ func typeToAvroSchema(typ *types.T) (*avroSchemaField, error) {
)
case types.DecimalFamily:
if typ.Precision() == 0 {
return nil, errors.Errorf(
`decimal with no precision not yet supported with avro`)
return nil, changefeedbase.WithTerminalError(errors.Errorf(
`decimal with no precision not yet supported with avro`))
}

width := int(typ.Width())
@@ -595,8 +596,8 @@ func typeToAvroSchema(typ *types.T) (*avroSchemaField, error) {
case types.ArrayFamily:
itemSchema, err := typeToAvroSchema(typ.ArrayContents())
if err != nil {
return nil, errors.Wrapf(err, `could not create item schema for %s`,
typ)
return nil, changefeedbase.WithTerminalError(
errors.Wrapf(err, `could not create item schema for %s`, typ))
}
itemUnionKey := avroUnionKey(itemSchema.SchemaType.([]avroSchemaType)[1])

@@ -676,8 +677,8 @@ func typeToAvroSchema(typ *types.T) (*avroSchemaField, error) {
)

default:
return nil, errors.Errorf(`type %s not yet supported with avro`,
typ.SQLString())
return nil, changefeedbase.WithTerminalError(
errors.Errorf(`type %s not yet supported with avro`, typ.SQLString()))
}

return schema, nil
@@ -688,7 +689,7 @@ func typeToAvroSchema(typ *types.T) (*avroSchemaField, error) {
func columnToAvroSchema(col cdcevent.ResultColumn) (*avroSchemaField, error) {
schema, err := typeToAvroSchema(col.Typ)
if err != nil {
return nil, errors.Wrapf(err, "column %s", col.Name)
return nil, changefeedbase.WithTerminalError(errors.Wrapf(err, "column %s", col.Name))
}
schema.Name = SQLNameToAvroName(col.Name)
schema.Metadata = col.SQLStringNotHumanReadable()
@@ -790,7 +791,7 @@ func (r *avroDataRecord) rowFromTextual(buf []byte) (rowenc.EncDatumRow, error)
return nil, err
}
if len(newBuf) > 0 {
return nil, errors.New(`only one row was expected`)
return nil, changefeedbase.WithTerminalError(errors.New(`only one row was expected`))
}
return r.rowFromNative(native)
}
@@ -811,7 +812,7 @@ func (r *avroDataRecord) RowFromBinary(buf []byte) (rowenc.EncDatumRow, error) {
return nil, err
}
if len(newBuf) > 0 {
return nil, errors.New(`only one row was expected`)
return nil, changefeedbase.WithTerminalError(errors.New(`only one row was expected`))
}
return r.rowFromNative(native)
}
@@ -826,7 +827,8 @@ func (r *avroDataRecord) nativeFromRow(it cdcevent.Iterator) (interface{}, error
if err := it.Datum(func(d tree.Datum, col cdcevent.ResultColumn) (err error) {
fieldIdx, ok := r.fieldIdxByName[col.Name]
if !ok {
return errors.AssertionFailedf("could not find avro field for column %s", col.Name)
return changefeedbase.WithTerminalError(
errors.AssertionFailedf("could not find avro field for column %s", col.Name))
}
r.native[col.Name], err = r.Fields[fieldIdx].encodeFn(d)
return err
@@ -840,11 +842,12 @@ func (r *avroDataRecord) nativeFromRow(it cdcevent.Iterator) (interface{}, error
func (r *avroDataRecord) rowFromNative(native interface{}) (rowenc.EncDatumRow, error) {
avroDatums, ok := native.(map[string]interface{})
if !ok {
return nil, errors.Errorf(`unknown avro native type: %T`, native)
return nil, changefeedbase.WithTerminalError(
errors.Errorf(`unknown avro native type: %T`, native))
}
if len(r.Fields) != len(avroDatums) {
return nil, errors.Errorf(
`expected row with %d columns got %d`, len(r.Fields), len(avroDatums))
return nil, changefeedbase.WithTerminalError(errors.Errorf(
`expected row with %d columns got %d`, len(r.Fields), len(avroDatums)))
}

row := make(rowenc.EncDatumRow, len(r.Fields))
@@ -978,7 +981,8 @@ func (r *avroEnvelopeRecord) BinaryFromRow(
delete(meta, `updated`)
ts, ok := u.(hlc.Timestamp)
if !ok {
return nil, errors.Errorf(`unknown metadata timestamp type: %T`, u)
return nil, changefeedbase.WithTerminalError(
errors.Errorf(`unknown metadata timestamp type: %T`, u))
}
native[`updated`] = goavro.Union(avroUnionKey(avroSchemaString), ts.AsOfSystemTime())
}
@@ -989,13 +993,14 @@ func (r *avroEnvelopeRecord) BinaryFromRow(
delete(meta, `resolved`)
ts, ok := u.(hlc.Timestamp)
if !ok {
return nil, errors.Errorf(`unknown metadata timestamp type: %T`, u)
return nil, changefeedbase.WithTerminalError(
errors.Errorf(`unknown metadata timestamp type: %T`, u))
}
native[`resolved`] = goavro.Union(avroUnionKey(avroSchemaString), ts.AsOfSystemTime())
}
}
for k := range meta {
return nil, errors.AssertionFailedf(`unhandled meta key: %s`, k)
return nil, changefeedbase.WithTerminalError(errors.AssertionFailedf(`unhandled meta key: %s`, k))
}
return r.codec.BinaryFromNative(buf, native)
}
@@ -1016,10 +1021,12 @@ func (r *avroDataRecord) refreshTypeMetadata(row cdcevent.Row) error {
// precision is set) this is roundtripable without information loss.
func decimalToRat(dec apd.Decimal, scale int32) (big.Rat, error) {
if dec.Form != apd.Finite {
return big.Rat{}, errors.Errorf(`cannot convert %s form decimal`, dec.Form)
return big.Rat{}, changefeedbase.WithTerminalError(
errors.Errorf(`cannot convert %s form decimal`, dec.Form))
}
if scale > 0 && scale != -dec.Exponent {
return big.Rat{}, errors.Errorf(`%s will not roundtrip at scale %d`, &dec, scale)
return big.Rat{}, changefeedbase.WithTerminalError(
errors.Errorf(`%s will not roundtrip at scale %d`, &dec, scale))
}
var r big.Rat
if dec.Exponent >= 0 {
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/cdceval/BUILD.bazel
@@ -16,6 +16,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/ccl/changefeedccl/cdcevent",
"//pkg/ccl/changefeedccl/changefeedbase",
"//pkg/clusterversion",
"//pkg/jobs/jobspb",
"//pkg/keys",
10 changes: 7 additions & 3 deletions pkg/ccl/changefeedccl/cdceval/expr_eval.go
@@ -13,6 +13,7 @@ import (
"fmt"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/schemaexpr"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
@@ -73,7 +74,8 @@ func (e *Evaluator) MatchesFilter(
) (_ bool, err error) {
defer func() {
if pan := recover(); pan != nil {
err = errors.Newf("error while evaluating WHERE clause: %s", pan)
err = changefeedbase.WithTerminalError(
errors.Newf("error while evaluating WHERE clause: %s", pan))
}
}()
if e.where == nil {
@@ -96,7 +98,8 @@ func (e *Evaluator) Projection(
) (_ cdcevent.Row, err error) {
defer func() {
if pan := recover(); pan != nil {
err = errors.Newf("error while evaluating SELECT clause: %s", pan)
err = changefeedbase.WithTerminalError(
errors.Newf("error while evaluating SELECT clause: %s", pan))
}
}()
if len(e.selectors) == 0 {
@@ -114,7 +117,8 @@ func (e *Evaluator) initSelectClause(ctx context.Context, sc *tree.SelectClause) (err error) {
func (e *Evaluator) initSelectClause(ctx context.Context, sc *tree.SelectClause) (err error) {
defer func() {
if pan := recover(); pan != nil {
err = errors.Newf("error while validating CHANGEFEED expression: %s", pan)
err = changefeedbase.WithTerminalError(
errors.Newf("error while validating CHANGEFEED expression: %s", pan))
}
}()
if len(sc.Exprs) == 0 { // Shouldn't happen, but be defensive.
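The three hunks above share one pattern: a deferred `recover` converts any panic raised during expression evaluation into a terminal error instead of crashing the processor. Below is a minimal, self-contained sketch of that pattern; the function and type names are illustrative, not the real cdceval API. The named return value is what lets the deferred closure overwrite the error after a panic.

```go
package main

import (
	"errors"
	"fmt"
)

// terminalError is a stand-in for the WithTerminalError marker used in the diff.
type terminalError struct{ cause error }

func (e *terminalError) Error() string { return e.cause.Error() }
func (e *terminalError) Unwrap() error { return e.cause }

func withTerminalError(err error) error { return &terminalError{cause: err} }

// evalFilter stands in for Evaluator.MatchesFilter: a panic deep inside
// expression evaluation is converted into a terminal error via the named
// return value err.
func evalFilter(eval func() bool) (matches bool, err error) {
	defer func() {
		if pan := recover(); pan != nil {
			err = withTerminalError(fmt.Errorf("error while evaluating WHERE clause: %v", pan))
		}
	}()
	return eval(), nil
}

func main() {
	_, err := evalFilter(func() bool { panic("unsupported expression") })
	var t *terminalError
	fmt.Println(errors.As(err, &t), err) // true, wrapped panic message
}
```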
4 changes: 3 additions & 1 deletion pkg/ccl/changefeedccl/cdceval/validation.go
@@ -12,6 +12,7 @@ import (
"context"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/sql"
@@ -47,7 +48,8 @@ func NormalizeAndValidateSelectForTarget(
) (n NormalizedSelectClause, _ jobspb.ChangefeedTargetSpecification, retErr error) {
defer func() {
if pan := recover(); pan != nil {
retErr = errors.Newf("low-level error while normalizing expression, probably syntax is unsupported in CREATE CHANGEFEED: %s", pan)
retErr = changefeedbase.WithTerminalError(
errors.Newf("expression currently unsupported in CREATE CHANGEFEED: %s", pan))
}
}()
execCtx.SemaCtx()
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go
@@ -110,7 +110,7 @@ func refreshUDT(
}); err != nil {
// Manager can return all kinds of errors during chaos, but based on
// its usage, none of them should ever be terminal.
return nil, changefeedbase.MarkRetryableError(err)
return nil, err
}
// Immediately release the lease, since we only need it for the exact
// timestamp requested.
@@ -144,7 +144,7 @@ func (c *rowFetcherCache) tableDescForKey(
if err != nil {
// Manager can return all kinds of errors during chaos, but based on
// its usage, none of them should ever be terminal.
return nil, family, changefeedbase.MarkRetryableError(err)
return nil, family, err
}
tableDesc = desc.Underlying().(catalog.TableDescriptor)
// Immediately release the lease, since we only need it for the exact
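With `MarkRetryableError` removed here (and in changefeed_processors.go below), transient failures such as lease-manager errors during chaos are retried simply because they carry no terminal mark. A rough sketch of what a consuming retry loop might look like under that default is shown below; the helper names and the backoff are assumptions for illustration, not the actual changefeed job logic.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// terminalError mirrors the marker sketched earlier in this PR; illustrative only.
type terminalError struct{ cause error }

func (e *terminalError) Error() string { return e.cause.Error() }
func (e *terminalError) Unwrap() error { return e.cause }

func isTerminal(err error) bool {
	var t *terminalError
	return errors.As(err, &t)
}

// runWithRetry restarts the changefeed flow on any error that is not marked
// terminal, which is the default this PR establishes.
func runWithRetry(ctx context.Context, run func(context.Context) error) error {
	for {
		err := run(ctx)
		if err == nil || isTerminal(err) || ctx.Err() != nil {
			return err
		}
		fmt.Printf("restarting after transient error: %v\n", err)
		time.Sleep(100 * time.Millisecond) // real code would use a backoff policy
	}
}

func main() {
	attempts := 0
	err := runWithRetry(context.Background(), func(ctx context.Context) error {
		attempts++
		if attempts < 3 {
			return errors.New("lease manager hiccup") // unmarked: retried
		}
		return &terminalError{cause: errors.New("unsupported avro type")} // stops the loop
	})
	fmt.Printf("done after %d attempts: %v\n", attempts, err)
}
```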
21 changes: 8 additions & 13 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -250,7 +250,6 @@ func (ca *changeAggregator) Start(ctx context.Context) {
ca.spec.User(), ca.spec.JobID, ca.sliMetrics)

if err != nil {
err = changefeedbase.MarkRetryableError(err)
// Early abort in the case that there is an error creating the sink.
ca.MoveToDraining(err)
ca.cancel()
@@ -263,8 +262,6 @@ func (ca *changeAggregator) Start(ctx context.Context) {
ca.changedRowBuf = &b.buf
}

ca.sink = &errorWrapperSink{wrapped: ca.sink}

// If the initial scan was disabled the highwater would've already been forwarded
needsInitialScan := ca.frontier.Frontier().IsEmpty()

@@ -483,7 +480,6 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMet
err = nil
}
} else {

select {
// If the poller errored first, that's the
// interesting one, so overwrite `err`.
@@ -921,7 +917,6 @@ func (cf *changeFrontier) Start(ctx context.Context) {
cf.spec.User(), cf.spec.JobID, sli)

if err != nil {
err = changefeedbase.MarkRetryableError(err)
cf.MoveToDraining(err)
return
}
@@ -930,8 +925,6 @@ func (cf *changeFrontier) Start(ctx context.Context) {
cf.resolvedBuf = &b.buf
}

cf.sink = &errorWrapperSink{wrapped: cf.sink}

cf.highWaterAtStart = cf.spec.Feed.StatementTime
if cf.spec.JobID != 0 {
job, err := cf.flowCtx.Cfg.JobRegistry.LoadClaimedJob(ctx, cf.spec.JobID)
@@ -1000,9 +993,8 @@ func (cf *changeFrontier) close() {
cf.closeMetrics()
}
if cf.sink != nil {
if err := cf.sink.Close(); err != nil {
log.Warningf(cf.Ctx, `error closing sink. goroutines may have leaked: %v`, err)
}
// Best effort: the context is often canceled by now, so we expect to see an error.
_ = cf.sink.Close()
}
cf.memAcc.Close(cf.Ctx)
cf.MemMonitor.Stop(cf.Ctx)
@@ -1043,8 +1035,8 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad

// Detect whether this boundary should be used to kill or restart the
// changefeed.
if cf.frontier.boundaryType == jobspb.ResolvedSpan_RESTART {
err = changefeedbase.MarkRetryableError(err)
if cf.frontier.boundaryType == jobspb.ResolvedSpan_EXIT {
err = changefeedbase.WithTerminalError(errors.Wrapf(err,
"shut down due to schema change and %s=%q",
changefeedbase.OptSchemaChangePolicy,
changefeedbase.OptSchemaChangePolicyStop))
}
}

@@ -1289,7 +1284,7 @@ func (cf *changeFrontier) checkpointJobProgress(

if cf.knobs.RaiseRetryableError != nil {
if err := cf.knobs.RaiseRetryableError(); err != nil {
return false, changefeedbase.MarkRetryableError(errors.New("cf.knobs.RaiseRetryableError"))
return false, err
}
}
