Commit
90520: clisqlshell: make the input to `\set` more flexible r=rafiss a=knz

Fixes cockroachdb#88270.

Prior to this change, `\set a = b` reported an error, `\set a =b` did not work, and it was not possible to include spaces in the prompt with, e.g., `\set prompt1 "a b"`.

Release note (cli change): The input syntax of `\set` is now more flexible: it is now more accepting of space characters in various positions of the syntax, and it supports quoted values, e.g. via `\set prompt1 "a b c"`.
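For illustration, a hypothetical interactive shell session showing inputs of the kind described above that previously failed and are now accepted (the variable names and values are placeholders, not part of the change):

```
\set a = b
\set a =b
\set prompt1 "a b"
```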

90767: changefeedccl: Correctly shut down changefeed r=miretskiy a=miretskiy

When one of the aggregators exits with a terminal error, ensure that the coordinator
notices this and shuts down the other aggregators in the flow.

Normally, changefeeds run for a very long time.
There are only a few cases in which a changefeed must exit: for example, an explicit user-initiated termination,
or an explicit user configuration such as `schema_change_policy='stop'`.
When such an explicit event occurs, all aggregators detect it and exit,
ensuring that the changefeed shuts down appropriately.

However, what happens if only one aggregator notices an error?
Perhaps that aggregator has a localized error condition -- for example, its node is being
drained or decommissioned. In those cases, any number of errors could be generated,
all of which are propagated up the flow to the changefeed coordinator.


The coordinator notices this error and then proceeds to drain all
remaining aggregators. But the remaining aggregators never experienced
any errors, so this drain may take a long time.
The drain time is variable: in theory, as soon as a remaining
aggregator attempts to emit anything into the flow, it should notice
the error. However, aggregators emit checkpoint records to the
coordinator only when their local frontier advances, and only if
this advance is slower than some threshold (30 seconds by default, but
user-controlled). It is therefore preferable to shut down the
flow as soon as we know we should.

This PR fixes the above issue by ensuring that the changefeed shuts down correctly
when one of its aggregators exits with a non-retryable error.
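For illustration only, here is a minimal, self-contained Go sketch of the shutdown mechanism (the type and variable names are simplified stand-ins, not the actual CockroachDB code): the coordinator's result writer holds the flow's cancel function, so recording a terminal error cancels the derived context, and an otherwise-idle aggregator observes the shutdown immediately instead of waiting for its next checkpoint.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// resultWriter is a simplified stand-in for the changefeed result writer:
// recording a terminal error also cancels the flow context.
type resultWriter struct {
	err    error
	cancel context.CancelFunc
}

func (w *resultWriter) SetError(err error) {
	w.err = err
	w.cancel() // tear down the whole flow as soon as a terminal error arrives
}

func main() {
	// Derive a cancelable context for the flow (the real code derives it from
	// the stopper via WithCancelOnQuiesce).
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	w := &resultWriter{cancel: cancel}

	// A stand-in aggregator that has seen no error of its own and would
	// otherwise sit idle until its next checkpoint (~30s by default).
	done := make(chan struct{})
	go func() {
		defer close(done)
		select {
		case <-ctx.Done():
			fmt.Println("aggregator: flow context canceled, exiting promptly")
		case <-time.After(30 * time.Second):
			fmt.Println("aggregator: idled until the next checkpoint")
		}
	}()

	// The coordinator records a terminal error reported by one aggregator.
	w.SetError(errors.New("synthetic fatal error from one node"))
	<-done
}
```

In the actual change (see the `changefeed_dist.go` diff below), the cancel function obtained from `WithCancelOnQuiesce` is threaded into `makeChangefeedResultWriter`, and `SetError` invokes it.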

Informs CRDB-7581

Release note (enterprise change): Ensure changefeeds shut down when
one of the aggregator nodes returns an error.

Co-authored-by: Raphael 'kena' Poss <[email protected]>
Co-authored-by: Yevgeniy Miretskiy <[email protected]>
3 people committed Nov 1, 2022
3 parents 9fa9374 + 617a758 + 17e33ef commit 4815c5f
Showing 8 changed files with 412 additions and 132 deletions.
15 changes: 12 additions & 3 deletions pkg/ccl/changefeedccl/changefeed_dist.go
@@ -272,8 +272,12 @@ func startDistChangefeed(

execPlan := func(ctx context.Context) error {
defer stopReplanner()
// Derive a separate context so that we can shut down the changefeed
// as soon as we see an error.
ctx, cancel := execCtx.ExecCfg().DistSQLSrv.Stopper.WithCancelOnQuiesce(ctx)
defer cancel()

resultRows := makeChangefeedResultWriter(resultsCh)
resultRows := makeChangefeedResultWriter(resultsCh, cancel)
recv := sql.MakeDistSQLReceiver(
ctx,
resultRows,
@@ -446,10 +450,13 @@ type changefeedResultWriter struct {
rowsCh chan<- tree.Datums
rowsAffected int
err error
cancel context.CancelFunc
}

func makeChangefeedResultWriter(rowsCh chan<- tree.Datums) *changefeedResultWriter {
return &changefeedResultWriter{rowsCh: rowsCh}
func makeChangefeedResultWriter(
rowsCh chan<- tree.Datums, cancel context.CancelFunc,
) *changefeedResultWriter {
return &changefeedResultWriter{rowsCh: rowsCh, cancel: cancel}
}

func (w *changefeedResultWriter) AddRow(ctx context.Context, row tree.Datums) error {
@@ -469,7 +476,9 @@ func (w *changefeedResultWriter) IncrementRowsAffected(ctx context.Context, n in
}
func (w *changefeedResultWriter) SetError(err error) {
w.err = err
w.cancel()
}

func (w *changefeedResultWriter) Err() error {
return w.err
}
5 changes: 2 additions & 3 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -454,9 +454,8 @@ func (ca *changeAggregator) close() {
}

if ca.sink != nil {
if err := ca.sink.Close(); err != nil {
log.Warningf(ca.Ctx, `error closing sink. goroutines may have leaked: %v`, err)
}
// Best effort: the context is often canceled by now, so we expect to see an error
_ = ca.sink.Close()
}
ca.memAcc.Close(ca.Ctx)
if ca.kvFeedMemMon != nil {
147 changes: 134 additions & 13 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -121,7 +121,6 @@ func TestChangefeedReplanning(t *testing.T) {
}

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {

ctx := context.Background()

numNodes := 3
@@ -172,7 +171,7 @@ func TestChangefeedReplanning(t *testing.T) {
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY);`)
sqlDB.Exec(t, `INSERT INTO foo VALUES (0);`)

feedFactory := makeKafkaFeedFactoryForCluster(tc, db)
feedFactory := makeKafkaFeedFactory(tc, db)

cf := feed(t, feedFactory, "CREATE CHANGEFEED FOR d.foo")
defer closeFeed(t, cf)
@@ -1081,7 +1080,7 @@ func TestNoStopAfterNonTargetColumnDrop(t *testing.T) {
// Check that dropping a watched column still stops the changefeed.
sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN b`)
if _, err := cf.Next(); !testutils.IsError(err, `schema change occurred at`) {
t.Errorf(`expected "schema change occurred at ..." got: %+v`, err.Error())
require.Regexp(t, `expected "schema change occurred at ..." got: %+v`, err)
}
}

@@ -2769,7 +2768,7 @@ func TestChangefeedRestartMultiNode(t *testing.T) {
db = cluster.ServerConn(feedServerID)
sqlDB = sqlutils.MakeSQLRunner(db)

f := makeKafkaFeedFactoryForCluster(cluster, db)
f := makeKafkaFeedFactory(cluster, db)
feed := feed(t, f, "CREATE CHANGEFEED FOR test_tab WITH updated")
defer closeFeed(t, feed)
assertPayloadsStripTs(t, feed, []string{
@@ -2827,7 +2826,7 @@ func TestChangefeedStopPolicyMultiNode(t *testing.T) {
db = cluster.ServerConn(feedServerID)
sqlDB = sqlutils.MakeSQLRunner(db)

f := makeKafkaFeedFactoryForCluster(cluster, db)
f := makeKafkaFeedFactory(cluster, db)
feed := feed(t, f, "CREATE CHANGEFEED FOR test_tab WITH schema_change_policy='stop'")
defer closeFeed(t, feed)
sqlDB.Exec(t, `INSERT INTO test_tab VALUES (1)`)
@@ -2943,7 +2942,7 @@ func TestChangefeedRBRAvroAddRegion(t *testing.T) {
cluster, db, cleanup := startTestCluster(t)
defer cleanup()

f := makeKafkaFeedFactoryForCluster(cluster, db)
f := makeKafkaFeedFactory(cluster, db)
sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, `CREATE TABLE rbr (a INT PRIMARY KEY)`)
waitForSchemaChange(t, sqlDB, `ALTER TABLE rbr SET LOCALITY REGIONAL BY ROW`)
@@ -3753,8 +3752,10 @@ func TestChangefeedDataTTL(t *testing.T) {
}
}()
go func() {
changefeed := feed(t, f, "CREATE CHANGEFEED FOR TABLE foo")
changefeedInit <- changefeed
feed, err := f.Feed("CREATE CHANGEFEED FOR TABLE foo")
if err == nil {
changefeedInit <- feed
}
close(changefeedInit)
}()

@@ -3776,6 +3777,7 @@ func TestChangefeedDataTTL(t *testing.T) {
atomic.StoreInt32(&shouldWait, 0)
resume <- struct{}{}
dataExpiredRows = <-changefeedInit
require.NotNil(t, dataExpiredRows)

// Verify that, at some point, Next() returns a "must
// be after replica GC threshold" error. In the common
@@ -5352,7 +5354,7 @@ func TestChangefeedRestartDuringBackfill(t *testing.T) {
require.NoError(t, feedJob.Pause())

// Make extra sure that the zombie changefeed can't write any more data.
beforeEmitRowCh <- changefeedbase.MarkRetryableError(errors.New(`nope don't write it`))
beforeEmitRowCh <- errors.New(`nope don't write it`)

// Insert some data that we should only see out of the changefeed after it
// re-runs the backfill.
@@ -5477,6 +5479,127 @@ func TestChangefeedHandlesDrainingNodes(t *testing.T) {
})
}

func TestChangefeedPropagatesTerminalError(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

opts := makeOptions()
defer addCloudStorageOptions(t, &opts)()
defer changefeedbase.TestingSetDefaultMinCheckpointFrequency(testSinkFlushFrequency)()

const numNodes = 3
perServerKnobs := make(map[int]base.TestServerArgs, numNodes)
for i := 0; i < numNodes; i++ {
perServerKnobs[i] = base.TestServerArgs{
// Test uses SPLIT AT, which isn't currently supported for
// secondary tenants. Tracked with #76378.
DisableDefaultTestTenant: true,
Knobs: base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{
DrainFast: true,
Changefeed: &TestingKnobs{},
},
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
},
ExternalIODir: opts.externalIODir,
UseDatabase: "d",
}
}

tc := serverutils.StartNewTestCluster(t, numNodes,
base.TestClusterArgs{
ServerArgsPerNode: perServerKnobs,
ReplicationMode: base.ReplicationManual,
})
defer tc.Stopper().Stop(context.Background())

{
db := tc.ServerConn(1)
sqlDB := sqlutils.MakeSQLRunner(db)
serverutils.SetClusterSetting(t, tc, "kv.rangefeed.enabled", true)

sqlDB.ExecMultiple(t,
`CREATE DATABASE d;`,
`CREATE TABLE foo (k INT PRIMARY KEY);`,
`INSERT INTO foo (k) SELECT * FROM generate_series(1, 1000);`,
`ALTER TABLE foo SPLIT AT (SELECT * FROM generate_series(1, 1000, 50));`,
)
for i := 1; i <= 1000; i += 50 {
sqlDB.ExecSucceedsSoon(t, "ALTER TABLE foo EXPERIMENTAL_RELOCATE VALUES (ARRAY[$1], $2)", 1+(i%numNodes), i)
}
}
// changefeed coordinator will run on this node.
const coordinatorID = 0

testFn := func(t *testing.T, nodesToFail []int, opts feedTestOptions) {
for _, n := range nodesToFail {
// Configure changefeed to emit fatal error on the specified nodes.
distSQLKnobs := perServerKnobs[n].Knobs.DistSQL.(*execinfra.TestingKnobs)
var numEmitted int32
distSQLKnobs.Changefeed.(*TestingKnobs).BeforeEmitRow = func(ctx context.Context) error {
// Emit a few rows before returning an error.
if atomic.AddInt32(&numEmitted, 1) > 10 {
err := errors.Newf("synthetic fatal error from node %d", n)
log.Errorf(ctx, "BeforeEmitRow returning error %s", err)
return err
}
return nil
}
}

defer func() {
// Reset all changefeed knobs.
for i := 0; i < numNodes; i++ {
perServerKnobs[i].Knobs.DistSQL.(*execinfra.TestingKnobs).Changefeed = &TestingKnobs{}
}
}()

sinkType := randomSinkTypeWithOptions(opts)
f, closeSink := makeFeedFactoryWithOptions(t, sinkType, tc, tc.ServerConn(coordinatorID), opts)
defer closeSink()
feed := feed(t, f, "CREATE CHANGEFEED FOR foo")
defer closeFeed(t, feed)

// We don't know if we picked enterprise or core feed; regardless, consuming
// from feed should eventually return an error.
var feedErr error
for feedErr == nil {
_, feedErr = feed.Next()
}
log.Errorf(context.Background(), "feedErr=%s", feedErr)
require.Regexp(t, "synthetic fatal error", feedErr)

// enterprise feeds should also have the job marked failed.
if jobFeed, ok := feed.(cdctest.EnterpriseTestFeed); ok {
require.NoError(t, jobFeed.WaitForStatus(func(s jobs.Status) bool { return s == jobs.StatusFailed }))
}
}

for _, tc := range []struct {
name string
nodesToFail []int
opts feedTestOptions
}{
{
name: "coordinator",
nodesToFail: []int{coordinatorID},
opts: opts,
},
{
name: "aggregator",
nodesToFail: []int{2},
opts: opts.omitSinks("sinkless"), // Sinkless run on coordinator only.
},
{
name: "many aggregators",
nodesToFail: []int{0, 2},
opts: opts.omitSinks("sinkless"), // Sinkless run on coordinator only.
},
} {
t.Run(tc.name, func(t *testing.T) { testFn(t, tc.nodesToFail, tc.opts) })
}
}

// Primary key changes are supported by changefeeds starting in 21.1. This tests
// that basic behavior works.
func TestChangefeedPrimaryKeyChangeWorks(t *testing.T) {
@@ -5990,7 +6113,7 @@ func TestCoreChangefeedBackfillScanCheckpoint(t *testing.T) {
knobs.RaiseRetryableError = func() error {
emittedCount++
if emittedCount%200 == 0 {
return changefeedbase.MarkRetryableError(errors.New("test transient error"))
return errors.New("test transient error")
}
return nil
}
@@ -6098,9 +6221,7 @@ func TestChangefeedOrderingWithErrors(t *testing.T) {
if err != nil {
return err
}
if status != "retryable error: retryable changefeed error: 500 Internal Server Error: " {
return errors.Errorf("expected retryable error: retryable changefeed error: 500 Internal Server Error:, got: %v", status)
}
require.Regexp(t, "500 Internal Server Error", status)
return nil
})

42 changes: 28 additions & 14 deletions pkg/ccl/changefeedccl/helpers_test.go
@@ -539,6 +539,12 @@ var feedTestOmitSinks = func(sinkTypes ...string) feedTestOption {
return func(opts *feedTestOptions) { opts.disabledSinkTypes = append(opts.disabledSinkTypes, sinkTypes...) }
}

func (opts feedTestOptions) omitSinks(sinks ...string) feedTestOptions {
res := opts
res.disabledSinkTypes = append(opts.disabledSinkTypes, sinks...)
return res
}

// withArgsFn is a feedTestOption that allow the caller to modify the
// TestServerArgs before they are used to create the test server. Note
// that in multi-tenant tests, these will only apply to the kvServer
@@ -751,15 +757,15 @@ func randomSinkType(opts ...feedTestOption) string {

func randomSinkTypeWithOptions(options feedTestOptions) string {
sinkWeights := map[string]int{
"kafka": 2, // run kafka a bit more often
"kafka": 3,
"enterprise": 1,
"webhook": 1,
"pubsub": 1,
"sinkless": 1,
"cloudstorage": 0, // requires externalIODir set
"sinkless": 2,
"cloudstorage": 0,
}
if options.externalIODir != "" {
sinkWeights["cloudstorage"] = 1
sinkWeights["cloudstorage"] = 3
}
if options.allowedSinkTypes != nil {
sinkWeights = map[string]int{}
@@ -821,13 +827,21 @@ func makeFeedFactory(
}

func makeFeedFactoryWithOptions(
t *testing.T,
sinkType string,
s serverutils.TestTenantInterface,
db *gosql.DB,
options feedTestOptions,
t *testing.T, sinkType string, srvOrCluster interface{}, db *gosql.DB, options feedTestOptions,
) (factory cdctest.TestFeedFactory, sinkCleanup func()) {
t.Logf("making %s feed factory", sinkType)
s := func() serverutils.TestTenantInterface {
switch s := srvOrCluster.(type) {
case serverutils.TestTenantInterface:
return s
case serverutils.TestClusterInterface:
return s.Server(0)
default:
t.Fatalf("unexpected argument type %T", s)
return nil
}
}()

pgURLForUser := func(u string, pass ...string) (url.URL, func()) {
t.Logf("pgURL %s %s", sinkType, u)
if len(pass) < 1 {
@@ -840,25 +854,25 @@ func makeFeedFactoryWithOptions(
}
switch sinkType {
case "kafka":
f := makeKafkaFeedFactory(s, db)
f := makeKafkaFeedFactory(srvOrCluster, db)
return f, func() {}
case "cloudstorage":
if options.externalIODir == "" {
t.Fatalf("expected externalIODir option to be set")
}
f := makeCloudFeedFactory(s, db, options.externalIODir)
f := makeCloudFeedFactory(srvOrCluster, db, options.externalIODir)
return f, func() {
TestingSetIncludeParquetMetadata()()
}
case "enterprise":
sink, cleanup := pgURLForUser(username.RootUser)
f := makeTableFeedFactory(s, db, sink)
f := makeTableFeedFactory(srvOrCluster, db, sink)
return f, cleanup
case "webhook":
f := makeWebhookFeedFactory(s, db)
f := makeWebhookFeedFactory(srvOrCluster, db)
return f, func() {}
case "pubsub":
f := makePubsubFeedFactory(s, db)
f := makePubsubFeedFactory(srvOrCluster, db)
return f, func() {}
case "sinkless":
sink, cleanup := pgURLForUser(username.RootUser)