diff --git a/pkg/ccl/changefeedccl/changefeed_dist.go b/pkg/ccl/changefeedccl/changefeed_dist.go index 723f6b8369df..b29b241957eb 100644 --- a/pkg/ccl/changefeedccl/changefeed_dist.go +++ b/pkg/ccl/changefeedccl/changefeed_dist.go @@ -272,8 +272,12 @@ func startDistChangefeed( execPlan := func(ctx context.Context) error { defer stopReplanner() + // Derive a separate context so that we can shut down the changefeed + // as soon as we see an error. + ctx, cancel := execCtx.ExecCfg().DistSQLSrv.Stopper.WithCancelOnQuiesce(ctx) + defer cancel() - resultRows := makeChangefeedResultWriter(resultsCh) + resultRows := makeChangefeedResultWriter(resultsCh, cancel) recv := sql.MakeDistSQLReceiver( ctx, resultRows, @@ -446,10 +450,13 @@ type changefeedResultWriter struct { rowsCh chan<- tree.Datums rowsAffected int err error + cancel context.CancelFunc } -func makeChangefeedResultWriter(rowsCh chan<- tree.Datums) *changefeedResultWriter { - return &changefeedResultWriter{rowsCh: rowsCh} +func makeChangefeedResultWriter( + rowsCh chan<- tree.Datums, cancel context.CancelFunc, +) *changefeedResultWriter { + return &changefeedResultWriter{rowsCh: rowsCh, cancel: cancel} } func (w *changefeedResultWriter) AddRow(ctx context.Context, row tree.Datums) error { @@ -469,7 +476,9 @@ func (w *changefeedResultWriter) IncrementRowsAffected(ctx context.Context, n in } func (w *changefeedResultWriter) SetError(err error) { w.err = err + w.cancel() } + func (w *changefeedResultWriter) Err() error { return w.err } diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 4256f69bb20b..3c5698025c7a 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -454,9 +454,8 @@ func (ca *changeAggregator) close() { } if ca.sink != nil { - if err := ca.sink.Close(); err != nil { - log.Warningf(ca.Ctx, `error closing sink. goroutines may have leaked: %v`, err) - } + // Best effort: context is often cancel by now, so we expect to see an error + _ = ca.sink.Close() } ca.memAcc.Close(ca.Ctx) if ca.kvFeedMemMon != nil { diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 7070cdd97670..b771986e3853 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -121,7 +121,6 @@ func TestChangefeedReplanning(t *testing.T) { } testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { - ctx := context.Background() numNodes := 3 @@ -172,7 +171,7 @@ func TestChangefeedReplanning(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY);`) sqlDB.Exec(t, `INSERT INTO foo VALUES (0);`) - feedFactory := makeKafkaFeedFactoryForCluster(tc, db) + feedFactory := makeKafkaFeedFactory(tc, db) cf := feed(t, feedFactory, "CREATE CHANGEFEED FOR d.foo") defer closeFeed(t, cf) @@ -1081,7 +1080,7 @@ func TestNoStopAfterNonTargetColumnDrop(t *testing.T) { // Check that dropping a watched column still stops the changefeed. sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN b`) if _, err := cf.Next(); !testutils.IsError(err, `schema change occurred at`) { - t.Errorf(`expected "schema change occurred at ..." got: %+v`, err.Error()) + require.Regexp(t, `expected "schema change occurred at ..." got: %+v`, err) } } @@ -2769,7 +2768,7 @@ func TestChangefeedRestartMultiNode(t *testing.T) { db = cluster.ServerConn(feedServerID) sqlDB = sqlutils.MakeSQLRunner(db) - f := makeKafkaFeedFactoryForCluster(cluster, db) + f := makeKafkaFeedFactory(cluster, db) feed := feed(t, f, "CREATE CHANGEFEED FOR test_tab WITH updated") defer closeFeed(t, feed) assertPayloadsStripTs(t, feed, []string{ @@ -2827,7 +2826,7 @@ func TestChangefeedStopPolicyMultiNode(t *testing.T) { db = cluster.ServerConn(feedServerID) sqlDB = sqlutils.MakeSQLRunner(db) - f := makeKafkaFeedFactoryForCluster(cluster, db) + f := makeKafkaFeedFactory(cluster, db) feed := feed(t, f, "CREATE CHANGEFEED FOR test_tab WITH schema_change_policy='stop'") defer closeFeed(t, feed) sqlDB.Exec(t, `INSERT INTO test_tab VALUES (1)`) @@ -2943,7 +2942,7 @@ func TestChangefeedRBRAvroAddRegion(t *testing.T) { cluster, db, cleanup := startTestCluster(t) defer cleanup() - f := makeKafkaFeedFactoryForCluster(cluster, db) + f := makeKafkaFeedFactory(cluster, db) sqlDB := sqlutils.MakeSQLRunner(db) sqlDB.Exec(t, `CREATE TABLE rbr (a INT PRIMARY KEY)`) waitForSchemaChange(t, sqlDB, `ALTER TABLE rbr SET LOCALITY REGIONAL BY ROW`) @@ -3753,8 +3752,10 @@ func TestChangefeedDataTTL(t *testing.T) { } }() go func() { - changefeed := feed(t, f, "CREATE CHANGEFEED FOR TABLE foo") - changefeedInit <- changefeed + feed, err := f.Feed("CREATE CHANGEFEED FOR TABLE foo") + if err == nil { + changefeedInit <- feed + } close(changefeedInit) }() @@ -3776,6 +3777,7 @@ func TestChangefeedDataTTL(t *testing.T) { atomic.StoreInt32(&shouldWait, 0) resume <- struct{}{} dataExpiredRows = <-changefeedInit + require.NotNil(t, dataExpiredRows) // Verify that, at some point, Next() returns a "must // be after replica GC threshold" error. In the common @@ -5352,7 +5354,7 @@ func TestChangefeedRestartDuringBackfill(t *testing.T) { require.NoError(t, feedJob.Pause()) // Make extra sure that the zombie changefeed can't write any more data. - beforeEmitRowCh <- changefeedbase.MarkRetryableError(errors.New(`nope don't write it`)) + beforeEmitRowCh <- errors.New(`nope don't write it`) // Insert some data that we should only see out of the changefeed after it // re-runs the backfill. @@ -5477,6 +5479,127 @@ func TestChangefeedHandlesDrainingNodes(t *testing.T) { }) } +func TestChangefeedPropagatesTerminalError(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + opts := makeOptions() + defer addCloudStorageOptions(t, &opts)() + defer changefeedbase.TestingSetDefaultMinCheckpointFrequency(testSinkFlushFrequency)() + + const numNodes = 3 + perServerKnobs := make(map[int]base.TestServerArgs, numNodes) + for i := 0; i < numNodes; i++ { + perServerKnobs[i] = base.TestServerArgs{ + // Test uses SPLIT AT, which isn't currently supported for + // secondary tenants. Tracked with #76378. + DisableDefaultTestTenant: true, + Knobs: base.TestingKnobs{ + DistSQL: &execinfra.TestingKnobs{ + DrainFast: true, + Changefeed: &TestingKnobs{}, + }, + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), + }, + ExternalIODir: opts.externalIODir, + UseDatabase: "d", + } + } + + tc := serverutils.StartNewTestCluster(t, numNodes, + base.TestClusterArgs{ + ServerArgsPerNode: perServerKnobs, + ReplicationMode: base.ReplicationManual, + }) + defer tc.Stopper().Stop(context.Background()) + + { + db := tc.ServerConn(1) + sqlDB := sqlutils.MakeSQLRunner(db) + serverutils.SetClusterSetting(t, tc, "kv.rangefeed.enabled", true) + + sqlDB.ExecMultiple(t, + `CREATE DATABASE d;`, + `CREATE TABLE foo (k INT PRIMARY KEY);`, + `INSERT INTO foo (k) SELECT * FROM generate_series(1, 1000);`, + `ALTER TABLE foo SPLIT AT (SELECT * FROM generate_series(1, 1000, 50));`, + ) + for i := 1; i <= 1000; i += 50 { + sqlDB.ExecSucceedsSoon(t, "ALTER TABLE foo EXPERIMENTAL_RELOCATE VALUES (ARRAY[$1], $2)", 1+(i%numNodes), i) + } + } + // changefeed coordinator will run on this node. + const coordinatorID = 0 + + testFn := func(t *testing.T, nodesToFail []int, opts feedTestOptions) { + for _, n := range nodesToFail { + // Configure changefeed to emit fatal error on the specified nodes. + distSQLKnobs := perServerKnobs[n].Knobs.DistSQL.(*execinfra.TestingKnobs) + var numEmitted int32 + distSQLKnobs.Changefeed.(*TestingKnobs).BeforeEmitRow = func(ctx context.Context) error { + // Emit few rows before returning an error. + if atomic.AddInt32(&numEmitted, 1) > 10 { + err := errors.Newf("synthetic fatal error from node %d", n) + log.Errorf(ctx, "BeforeEmitRow returning error %s", err) + return err + } + return nil + } + } + + defer func() { + // Reset all changefeed knobs. + for i := 0; i < numNodes; i++ { + perServerKnobs[i].Knobs.DistSQL.(*execinfra.TestingKnobs).Changefeed = &TestingKnobs{} + } + }() + + sinkType := randomSinkTypeWithOptions(opts) + f, closeSink := makeFeedFactoryWithOptions(t, sinkType, tc, tc.ServerConn(coordinatorID), opts) + defer closeSink() + feed := feed(t, f, "CREATE CHANGEFEED FOR foo") + defer closeFeed(t, feed) + + // We don't know if we picked enterprise or core feed; regardless, consuming + // from feed should eventually return an error. + var feedErr error + for feedErr == nil { + _, feedErr = feed.Next() + } + log.Errorf(context.Background(), "feedErr=%s", feedErr) + require.Regexp(t, "synthetic fatal error", feedErr) + + // enterprise feeds should also have the job marked failed. + if jobFeed, ok := feed.(cdctest.EnterpriseTestFeed); ok { + require.NoError(t, jobFeed.WaitForStatus(func(s jobs.Status) bool { return s == jobs.StatusFailed })) + } + } + + for _, tc := range []struct { + name string + nodesToFail []int + opts feedTestOptions + }{ + { + name: "coordinator", + nodesToFail: []int{coordinatorID}, + opts: opts, + }, + { + name: "aggregator", + nodesToFail: []int{2}, + opts: opts.omitSinks("sinkless"), // Sinkless run on coordinator only. + }, + { + name: "many aggregators", + nodesToFail: []int{0, 2}, + opts: opts.omitSinks("sinkless"), // Sinkless run on coordinator only. + }, + } { + t.Run(tc.name, func(t *testing.T) { testFn(t, tc.nodesToFail, tc.opts) }) + } +} + // Primary key changes are supported by changefeeds starting in 21.1. This tests // that basic behavior works. func TestChangefeedPrimaryKeyChangeWorks(t *testing.T) { @@ -5990,7 +6113,7 @@ func TestCoreChangefeedBackfillScanCheckpoint(t *testing.T) { knobs.RaiseRetryableError = func() error { emittedCount++ if emittedCount%200 == 0 { - return changefeedbase.MarkRetryableError(errors.New("test transient error")) + return errors.New("test transient error") } return nil } @@ -6098,9 +6221,7 @@ func TestChangefeedOrderingWithErrors(t *testing.T) { if err != nil { return err } - if status != "retryable error: retryable changefeed error: 500 Internal Server Error: " { - return errors.Errorf("expected retryable error: retryable changefeed error: 500 Internal Server Error:, got: %v", status) - } + require.Regexp(t, "500 Internal Server Error", status) return nil }) diff --git a/pkg/ccl/changefeedccl/helpers_test.go b/pkg/ccl/changefeedccl/helpers_test.go index 4e76d9ea8bef..6577bab4998b 100644 --- a/pkg/ccl/changefeedccl/helpers_test.go +++ b/pkg/ccl/changefeedccl/helpers_test.go @@ -539,6 +539,12 @@ var feedTestOmitSinks = func(sinkTypes ...string) feedTestOption { return func(opts *feedTestOptions) { opts.disabledSinkTypes = append(opts.disabledSinkTypes, sinkTypes...) } } +func (opts feedTestOptions) omitSinks(sinks ...string) feedTestOptions { + res := opts + res.disabledSinkTypes = append(opts.disabledSinkTypes, sinks...) + return res +} + // withArgsFn is a feedTestOption that allow the caller to modify the // TestServerArgs before they are used to create the test server. Note // that in multi-tenant tests, these will only apply to the kvServer @@ -751,15 +757,15 @@ func randomSinkType(opts ...feedTestOption) string { func randomSinkTypeWithOptions(options feedTestOptions) string { sinkWeights := map[string]int{ - "kafka": 2, // run kafka a bit more often + "kafka": 3, "enterprise": 1, "webhook": 1, "pubsub": 1, - "sinkless": 1, - "cloudstorage": 0, // requires externalIODir set + "sinkless": 2, + "cloudstorage": 0, } if options.externalIODir != "" { - sinkWeights["cloudstorage"] = 1 + sinkWeights["cloudstorage"] = 3 } if options.allowedSinkTypes != nil { sinkWeights = map[string]int{} @@ -821,13 +827,21 @@ func makeFeedFactory( } func makeFeedFactoryWithOptions( - t *testing.T, - sinkType string, - s serverutils.TestTenantInterface, - db *gosql.DB, - options feedTestOptions, + t *testing.T, sinkType string, srvOrCluster interface{}, db *gosql.DB, options feedTestOptions, ) (factory cdctest.TestFeedFactory, sinkCleanup func()) { t.Logf("making %s feed factory", sinkType) + s := func() serverutils.TestTenantInterface { + switch s := srvOrCluster.(type) { + case serverutils.TestTenantInterface: + return s + case serverutils.TestClusterInterface: + return s.Server(0) + default: + t.Fatalf("unexpected argument type %T", s) + return nil + } + }() + pgURLForUser := func(u string, pass ...string) (url.URL, func()) { t.Logf("pgURL %s %s", sinkType, u) if len(pass) < 1 { @@ -840,25 +854,25 @@ func makeFeedFactoryWithOptions( } switch sinkType { case "kafka": - f := makeKafkaFeedFactory(s, db) + f := makeKafkaFeedFactory(srvOrCluster, db) return f, func() {} case "cloudstorage": if options.externalIODir == "" { t.Fatalf("expected externalIODir option to be set") } - f := makeCloudFeedFactory(s, db, options.externalIODir) + f := makeCloudFeedFactory(srvOrCluster, db, options.externalIODir) return f, func() { TestingSetIncludeParquetMetadata()() } case "enterprise": sink, cleanup := pgURLForUser(username.RootUser) - f := makeTableFeedFactory(s, db, sink) + f := makeTableFeedFactory(srvOrCluster, db, sink) return f, cleanup case "webhook": - f := makeWebhookFeedFactory(s, db) + f := makeWebhookFeedFactory(srvOrCluster, db) return f, func() {} case "pubsub": - f := makePubsubFeedFactory(s, db) + f := makePubsubFeedFactory(srvOrCluster, db) return f, func() {} case "sinkless": sink, cleanup := pgURLForUser(username.RootUser) diff --git a/pkg/ccl/changefeedccl/testfeed_test.go b/pkg/ccl/changefeedccl/testfeed_test.go index b60b6cd4612e..9db7d592c6d2 100644 --- a/pkg/ccl/changefeedccl/testfeed_test.go +++ b/pkg/ccl/changefeedccl/testfeed_test.go @@ -26,6 +26,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/Shopify/sarama" @@ -371,7 +372,7 @@ func (f *jobFeed) Details() (*jobspb.ChangefeedDetails, error) { if err := f.db.QueryRow( `SELECT payload FROM system.jobs WHERE id=$1`, f.jobID, ).Scan(&payloadBytes); err != nil { - return nil, err + return nil, errors.Wrapf(err, "Details for job %d", f.jobID) } var payload jobspb.Payload if err := protoutil.Unmarshal(payloadBytes, &payload); err != nil { @@ -386,7 +387,7 @@ func (f *jobFeed) HighWaterMark() (hlc.Timestamp, error) { if err := f.db.QueryRow( `SELECT progress FROM system.jobs WHERE id=$1`, f.jobID, ).Scan(&details); err != nil { - return hlc.Timestamp{}, err + return hlc.Timestamp{}, errors.Wrapf(err, "HighWaterMark for job %d", f.jobID) } var progress jobspb.Progress if err := protoutil.Unmarshal(details, &progress); err != nil { @@ -417,10 +418,12 @@ func (f *jobFeed) TickHighWaterMark(minHWM hlc.Timestamp) error { // FetchTerminalJobErr retrieves the error message from changefeed job. func (f *jobFeed) FetchTerminalJobErr() error { var errStr string - if err := f.db.QueryRow( - `SELECT error FROM [SHOW JOBS] WHERE job_id=$1`, f.jobID, - ).Scan(&errStr); err != nil { - return err + if err := testutils.SucceedsSoonError(func() error { + return f.db.QueryRow( + `SELECT error FROM [SHOW JOBS] WHERE job_id=$1`, f.jobID, + ).Scan(&errStr) + }); err != nil { + return errors.Wrapf(err, "FetchTerminalJobErr for job %d", f.jobID) } if errStr != "" { @@ -434,7 +437,7 @@ func (f *jobFeed) FetchRunningStatus() (runningStatusStr string, err error) { if err = f.db.QueryRow( `SELECT running_status FROM [SHOW JOBS] WHERE job_id=$1`, f.jobID, ).Scan(&runningStatusStr); err != nil { - return "", err + return "", errors.Wrapf(err, "FetchRunningStatus for job %d", f.jobID) } return runningStatusStr, err } @@ -692,7 +695,7 @@ func (e enterpriseFeedFactory) startFeedJob(f *jobFeed, create string, args ...i e.di.prepareJob(f) if err := e.db.QueryRow(create, args...).Scan(&f.jobID); err != nil { e.di.pendingJob = nil - return err + return errors.Wrapf(err, "failed to start feed for job %d", f.jobID) } e.di.startJob(f) return nil @@ -705,15 +708,32 @@ type tableFeedFactory struct { uri url.URL } +func getInjectables(srvOrCluster interface{}) (serverutils.TestTenantInterface, []feedInjectable) { + switch t := srvOrCluster.(type) { + case serverutils.TestTenantInterface: + t.PGServer() + return t, []feedInjectable{t} + case serverutils.TestClusterInterface: + servers := make([]feedInjectable, t.NumServers()) + for i := range servers { + servers[i] = t.Server(i) + } + return t.Server(0), servers + default: + panic(errors.AssertionFailedf("unexpected type %T", t)) + } +} + // makeTableFeedFactory returns a TestFeedFactory implementation using the // `experimental-sql` uri. func makeTableFeedFactory( - srv serverutils.TestTenantInterface, db *gosql.DB, sink url.URL, + srvOrCluster interface{}, db *gosql.DB, sink url.URL, ) cdctest.TestFeedFactory { + s, injectables := getInjectables(srvOrCluster) return &tableFeedFactory{ enterpriseFeedFactory: enterpriseFeedFactory{ - s: srv, - di: newDepInjector(srv), + s: s, + di: newDepInjector(injectables...), db: db, }, uri: sink, @@ -809,12 +829,20 @@ func (c *tableFeed) Next() (*cdctest.TestFeedMessage, error) { return toSend, nil } - select { - case <-time.After(timeout()): - return nil, &contextutil.TimeoutError{} - case <-c.ss.eventReady(): - case <-c.shutdown: - return nil, c.terminalJobError() + if err := contextutil.RunWithTimeout( + context.Background(), "tableFeed.Next", timeout(), + func(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-c.ss.eventReady(): + return nil + case <-c.shutdown: + return c.terminalJobError() + } + }, + ); err != nil { + return nil, err } var toSend []*cdctest.TestFeedMessage @@ -882,21 +910,27 @@ func (c *tableFeed) Close() error { var cloudFeedFileRE = regexp.MustCompile(`^\d{33}-(.+?)-(\d+)-(\d+)-([0-9a-fA-F]{8})-(.+?)-`) +var feedIdx int32 + +func feedSubDir() string { + return strconv.Itoa(int(atomic.AddInt32(&feedIdx, 1))) +} + type cloudFeedFactory struct { enterpriseFeedFactory - dir string - feedIdx int + dir string } // makeCloudFeedFactory returns a TestFeedFactory implementation using the cloud // storage uri. func makeCloudFeedFactory( - srv serverutils.TestTenantInterface, db *gosql.DB, dir string, + srvOrCluster interface{}, db *gosql.DB, dir string, ) cdctest.TestFeedFactory { + s, injectables := getInjectables(srvOrCluster) return &cloudFeedFactory{ enterpriseFeedFactory: enterpriseFeedFactory{ - s: srv, - di: newDepInjector(srv), + s: s, + di: newDepInjector(injectables...), db: db, }, dir: dir, @@ -957,8 +991,7 @@ func (f *cloudFeedFactory) Feed( ) } - feedDir := strconv.Itoa(f.feedIdx) - f.feedIdx++ + feedDir := feedSubDir() sinkURI := `experimental-nodelocal://0/` + feedDir // TODO(dan): This is a pretty unsatisfying way to test that the uri passes // through params it doesn't understand to ExternalStorage. @@ -968,7 +1001,7 @@ func (f *cloudFeedFactory) Feed( // Nodelocal puts its dir under `ExternalIODir`, which is passed into // cloudFeedFactory. feedDir = filepath.Join(f.dir, feedDir) - if err := os.Mkdir(feedDir, 0755); err != nil { + if err := os.Mkdir(feedDir, 0755); err != nil && !errors.Is(err, os.ErrExist) { return nil, err } @@ -1301,12 +1334,20 @@ func (c *cloudFeed) Next() (*cdctest.TestFeedMessage, error) { return e, nil } - select { - case <-time.After(timeout()): - return nil, &contextutil.TimeoutError{} - case <-c.ss.eventReady(): - case <-c.shutdown: - return nil, c.terminalJobError() + if err := contextutil.RunWithTimeout( + context.Background(), "cloudFeed.Next", timeout(), + func(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-c.ss.eventReady(): + return nil + case <-c.shutdown: + return c.terminalJobError() + } + }, + ); err != nil { + return nil, err } if err := filepath.Walk(c.dir, c.walkDir); err != nil { @@ -1591,33 +1632,14 @@ type kafkaFeedFactory struct { var _ cdctest.TestFeedFactory = (*kafkaFeedFactory)(nil) // makeKafkaFeedFactory returns a TestFeedFactory implementation using the `kafka` uri. -func makeKafkaFeedFactory( - srv serverutils.TestTenantInterface, db *gosql.DB, -) cdctest.TestFeedFactory { +func makeKafkaFeedFactory(srvOrCluster interface{}, db *gosql.DB) cdctest.TestFeedFactory { + s, injectables := getInjectables(srvOrCluster) return &kafkaFeedFactory{ knobs: &sinkKnobs{}, enterpriseFeedFactory: enterpriseFeedFactory{ - s: srv, + s: s, db: db, - di: newDepInjector(srv), - }, - } -} - -// makeKafkaFeedFactoryForCluster returns a TestFeedFactory -// implementation using the `kafka` uri. -func makeKafkaFeedFactoryForCluster( - c serverutils.TestClusterInterface, db *gosql.DB, -) cdctest.TestFeedFactory { - servers := make([]feedInjectable, c.NumServers()) - for i := 0; i < c.NumServers(); i++ { - servers[i] = c.Server(i) - } - return &kafkaFeedFactory{ - enterpriseFeedFactory: enterpriseFeedFactory{ - s: c.Server(0), - db: db, - di: newDepInjector(servers...), + di: newDepInjector(injectables...), }, } } @@ -1729,12 +1751,20 @@ func (k *kafkaFeed) Partitions() []string { func (k *kafkaFeed) Next() (*cdctest.TestFeedMessage, error) { for { var msg *sarama.ProducerMessage - select { - case <-time.After(timeout()): - return nil, &contextutil.TimeoutError{} - case <-k.shutdown: - return nil, k.terminalJobError() - case msg = <-k.source: + if err := contextutil.RunWithTimeout( + context.Background(), "kafka.Next", timeout(), + func(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-k.shutdown: + return k.terminalJobError() + case msg = <-k.source: + return nil + } + }, + ); err != nil { + return nil, err } fm := &cdctest.TestFeedMessage{ @@ -1802,15 +1832,14 @@ type webhookFeedFactory struct { var _ cdctest.TestFeedFactory = (*webhookFeedFactory)(nil) // makeWebhookFeedFactory returns a TestFeedFactory implementation using the `webhook-webhooks` uri. -func makeWebhookFeedFactory( - srv serverutils.TestTenantInterface, db *gosql.DB, -) cdctest.TestFeedFactory { +func makeWebhookFeedFactory(srvOrCluster interface{}, db *gosql.DB) cdctest.TestFeedFactory { + s, injectables := getInjectables(srvOrCluster) useSecure := rand.Float32() < 0.5 return &webhookFeedFactory{ enterpriseFeedFactory: enterpriseFeedFactory{ - s: srv, + s: s, db: db, - di: newDepInjector(srv), + di: newDepInjector(injectables...), }, useSecureServer: useSecure, } @@ -2006,13 +2035,22 @@ func (f *webhookFeed) Next() (*cdctest.TestFeedMessage, error) { return m, nil } - select { - case <-time.After(timeout()): - return nil, &contextutil.TimeoutError{} - case <-f.ss.eventReady(): - case <-f.mockSink.NotifyMessage(): - case <-f.shutdown: - return nil, f.terminalJobError() + if err := contextutil.RunWithTimeout( + context.Background(), "webhook.Next", timeout(), + func(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-f.ss.eventReady(): + return nil + case <-f.mockSink.NotifyMessage(): + return nil + case <-f.shutdown: + return f.terminalJobError() + } + }, + ); err != nil { + return nil, err } } } @@ -2114,14 +2152,13 @@ type pubsubFeedFactory struct { var _ cdctest.TestFeedFactory = (*pubsubFeedFactory)(nil) // makePubsubFeedFactory returns a TestFeedFactory implementation using the `pubsub` uri. -func makePubsubFeedFactory( - srv serverutils.TestTenantInterface, db *gosql.DB, -) cdctest.TestFeedFactory { +func makePubsubFeedFactory(srvOrCluster interface{}, db *gosql.DB) cdctest.TestFeedFactory { + s, injectables := getInjectables(srvOrCluster) return &pubsubFeedFactory{ enterpriseFeedFactory: enterpriseFeedFactory{ - s: srv, + s: s, db: db, - di: newDepInjector(srv), + di: newDepInjector(injectables...), }, } } @@ -2252,12 +2289,21 @@ func (p *pubsubFeed) Next() (*cdctest.TestFeedMessage, error) { return m, nil } - select { - case <-time.After(timeout()): - return nil, &contextutil.TimeoutError{} - case <-p.ss.eventReady(): - case <-p.shutdown: - return nil, p.terminalJobError() + + if err := contextutil.RunWithTimeout( + context.Background(), "pubsub.Next", timeout(), + func(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-p.ss.eventReady(): + return nil + case <-p.shutdown: + return p.terminalJobError() + } + }, + ); err != nil { + return nil, err } } } diff --git a/pkg/cli/clisqlshell/sql.go b/pkg/cli/clisqlshell/sql.go index dc3fd5ee4798..b3e3c1a9a9a8 100644 --- a/pkg/cli/clisqlshell/sql.go +++ b/pkg/cli/clisqlshell/sql.go @@ -476,6 +476,30 @@ var optionNames = func() []string { return names }() +var setArgsRe = regexp.MustCompile(`^\s*` + // zero or more leading space. + `(?P