diff --git a/pkg/ccl/changefeedccl/changefeed_dist.go b/pkg/ccl/changefeedccl/changefeed_dist.go
index 0e22fda22c95..7ef88c24ad53 100644
--- a/pkg/ccl/changefeedccl/changefeed_dist.go
+++ b/pkg/ccl/changefeedccl/changefeed_dist.go
@@ -11,7 +11,6 @@ package changefeedccl
 import (
 	"context"
 	"sort"
-	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdceval"
 	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
@@ -209,21 +208,6 @@ func fetchSpansForTables(
 		sd, tableDescs[0], initialHighwater, target, sc)
 }
 
-var replanChangefeedThreshold = settings.RegisterFloatSetting(
-	settings.TenantWritable,
-	"changefeed.replan_flow_threshold",
-	"fraction of initial flow instances that would be added or updated above which a redistribution would occur (0=disabled)",
-	0.0,
-)
-
-var replanChangefeedFrequency = settings.RegisterDurationSetting(
-	settings.TenantWritable,
-	"changefeed.replan_flow_frequency",
-	"frequency at which changefeed checks to see if redistributing would change its physical execution plan",
-	10*time.Minute,
-	settings.PositiveDuration,
-)
-
 // startDistChangefeed starts distributed changefeed execution.
 func startDistChangefeed(
 	ctx context.Context,
@@ -249,7 +233,6 @@ func startDistChangefeed(
 		return err
 	}
 	localState.trackedSpans = trackedSpans
-	cfKnobs := execCfg.DistSQLSrv.TestingKnobs.Changefeed
 
 	// Changefeed flows handle transactional consistency themselves.
 	var noTxn *kv.Txn
@@ -267,28 +250,7 @@ func startDistChangefeed(
 		return err
 	}
 
-	replanOracle := sql.ReplanOnChangedFraction(
-		func() float64 {
-			return replanChangefeedThreshold.Get(execCtx.ExecCfg().SV())
-		},
-	)
-	if knobs, ok := cfKnobs.(*TestingKnobs); ok && knobs != nil && knobs.ShouldReplan != nil {
-		replanOracle = knobs.ShouldReplan
-	}
-
-	var replanNoCheckpoint *jobspb.ChangefeedProgress_Checkpoint
-	var replanNoDrainingNodes []roachpb.NodeID
-	replanner, stopReplanner := sql.PhysicalPlanChangeChecker(ctx,
-		p,
-		makePlan(execCtx, jobID, details, initialHighWater,
-			trackedSpans, replanNoCheckpoint, replanNoDrainingNodes),
-		execCtx,
-		replanOracle,
-		func() time.Duration { return replanChangefeedFrequency.Get(execCtx.ExecCfg().SV()) },
-	)
-
 	execPlan := func(ctx context.Context) error {
-		defer stopReplanner()
 		// Derive a separate context so that we can shut down the changefeed
 		// as soon as we see an error.
 		ctx, cancel := execCtx.ExecCfg().DistSQLSrv.Stopper.WithCancelOnQuiesce(ctx)
@@ -343,11 +305,7 @@ func startDistChangefeed(
 		return resultRows.Err()
 	}
 
-	if err = ctxgroup.GoAndWait(ctx, execPlan, replanner); errors.Is(err, sql.ErrPlanChanged) {
-		execCtx.ExecCfg().JobRegistry.MetricsStruct().Changefeed.(*Metrics).ReplanCount.Inc(1)
-	}
-
-	return err
+	return ctxgroup.GoAndWait(ctx, execPlan)
 }
 
 var enableBalancedRangeDistribution = settings.RegisterBoolSetting(
diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go
index e2454fc37072..3b4e02338b87 100644
--- a/pkg/ccl/changefeedccl/changefeed_test.go
+++ b/pkg/ccl/changefeedccl/changefeed_test.go
@@ -105,116 +105,6 @@ import (
 
 var testServerRegion = "us-east-1"
 
-func TestChangefeedReplanning(t *testing.T) {
-	defer leaktest.AfterTest(t)()
-	defer log.Scope(t).Close(t)
-	skip.UnderStressRace(t, "multinode setup doesn't work under testrace")
-
-	assertReplanCounter := func(t *testing.T, m *Metrics, exp int64) {
-		t.Helper()
-		// If this changefeed is running as a job, we anticipate that it will move
-		// through the failed state and will increment the metric. Sinkless feeds
-		// don't contribute to the failures counter.
-		if strings.Contains(t.Name(), `sinkless`) {
-			return
-		}
-		testutils.SucceedsSoon(t, func() error {
-			if got := m.ReplanCount.Count(); got != exp {
-				return errors.Errorf("expected %d failures, got %d", exp, got)
-			}
-			return nil
-		})
-	}
-
-	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
-		ctx := context.Background()
-
-		numNodes := 3
-		errChan := make(chan error, 1)
-		readyChan := make(chan struct{})
-		defaultServerArgs := base.TestServerArgs{
-			Knobs: base.TestingKnobs{
-				DistSQL: &execinfra.TestingKnobs{
-					Changefeed: &TestingKnobs{
-						HandleDistChangefeedError: func(err error) error {
-							if errors.Is(err, sql.ErrPlanChanged) {
-								select {
-								case errChan <- err:
-									return err
-								default:
-									return nil
-								}
-							}
-							return nil
-						},
-						ShouldReplan: func(ctx context.Context, oldPlan, newPlan *sql.PhysicalPlan) bool {
-							select {
-							case <-readyChan:
-								return true
-							default:
-								return false
-							}
-						},
-					},
-				},
-				JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
-			},
-			UseDatabase:       "d",
-			DefaultTestTenant: base.TestTenantDisabled,
-		}
-
-		tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{
-			ServerArgs: defaultServerArgs,
-		})
-		defer tc.Stopper().Stop(ctx)
-
-		registry := tc.Server(0).JobRegistry().(*jobs.Registry)
-		metrics := registry.MetricsStruct().Changefeed.(*Metrics)
-
-		db := tc.ServerConn(0)
-		serverutils.SetClusterSetting(t, tc, "changefeed.replan_flow_frequency", time.Millisecond*100)
-
-		sqlDB := sqlutils.MakeSQLRunner(db)
-		sqlDB.ExecMultiple(t, strings.Split(serverSetupStatements, ";")...)
-
-		sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY);`)
-		sqlDB.Exec(t, `INSERT INTO foo VALUES (0);`)
-
-		feedFactory := makeKafkaFeedFactory(tc, db)
-
-		cf := feed(t, feedFactory, "CREATE CHANGEFEED FOR d.foo")
-		defer closeFeed(t, cf)
-
-		feed, ok := cf.(cdctest.EnterpriseTestFeed)
-		require.True(t, ok)
-
-		require.NoError(t, feed.TickHighWaterMark(tc.Server(0).Clock().Now()))
-
-		sqlDB.ExecMultiple(t,
-			`INSERT INTO foo (a) SELECT * FROM generate_series(1, 1000);`,
-			`ALTER TABLE foo SPLIT AT (SELECT * FROM generate_series(1, 1000, 50));`,
-			`ALTER TABLE foo SCATTER;`,
-		)
-
-		timeout := 20 * time.Second
-		if util.RaceEnabled {
-			timeout *= 3
-		}
-
-		readyChan <- struct{}{}
-
-		select {
-		case err := <-errChan:
-			require.Regexp(t, "physical plan has changed", err)
-			assertReplanCounter(t, metrics, 1)
-			log.Info(ctx, "replan triggered")
-		case <-time.After(timeout):
-			t.Fatal("expected distflow to error but hasn't after 20 seconds")
-		}
-	}
-	cdcTest(t, testFn, feedTestForceSink("kafka"))
-}
-
 func TestChangefeedBasics(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
diff --git a/pkg/ccl/changefeedccl/metrics.go b/pkg/ccl/changefeedccl/metrics.go
index 4bb3458c48e7..060ea6d9bcea 100644
--- a/pkg/ccl/changefeedccl/metrics.go
+++ b/pkg/ccl/changefeedccl/metrics.go
@@ -394,13 +394,6 @@ var (
 		Measurement: "Updates",
 		Unit:        metric.Unit_COUNT,
 	}
-
-	metaChangefeedReplanCount = metric.Metadata{
-		Name:        "changefeed.replan_count",
-		Help:        "Number of replans triggered across all feeds",
-		Measurement: "Replans",
-		Unit:        metric.Unit_COUNT,
-	}
 	metaChangefeedEventConsumerFlushNanos = metric.Metadata{
 		Name:        "changefeed.nprocs_flush_nanos",
 		Help:        "Total time spent idle waiting for the parallel consumer to flush",
@@ -689,7 +682,6 @@ type Metrics struct {
 	CheckpointHistNanos            metric.IHistogram
 	FrontierUpdates                *metric.Counter
 	ThrottleMetrics                cdcutils.Metrics
-	ReplanCount                    *metric.Counter
 	ParallelConsumerFlushNanos     metric.IHistogram
 	ParallelConsumerConsumeNanos   metric.IHistogram
 	ParallelConsumerInFlightEvents *metric.Gauge
@@ -728,7 +720,6 @@ func MakeMetrics(histogramWindow time.Duration) metric.Struct {
 		}),
 		FrontierUpdates: metric.NewCounter(metaChangefeedFrontierUpdates),
 		ThrottleMetrics: cdcutils.MakeMetrics(histogramWindow),
-		ReplanCount:     metric.NewCounter(metaChangefeedReplanCount),
 		// Below two metrics were never implemented using the hdr histogram. Set ForceUsePrometheus
 		// to true.
 		ParallelConsumerFlushNanos: metric.NewHistogram(metric.HistogramOptions{
diff --git a/pkg/ccl/changefeedccl/testing_knobs.go b/pkg/ccl/changefeedccl/testing_knobs.go
index a9dc6367abea..7a5cda3e2c24 100644
--- a/pkg/ccl/changefeedccl/testing_knobs.go
+++ b/pkg/ccl/changefeedccl/testing_knobs.go
@@ -50,8 +50,6 @@ type TestingKnobs struct {
 	NullSinkIsExternalIOAccounted bool
 	// OnDistflowSpec is called when specs for distflow planning have been created
 	OnDistflowSpec func(aggregatorSpecs []*execinfrapb.ChangeAggregatorSpec, frontierSpec *execinfrapb.ChangeFrontierSpec)
-	// ShouldReplan is used to see if a replan for a changefeed should be triggered
-	ShouldReplan func(ctx context.Context, oldPlan, newPlan *sql.PhysicalPlan) bool
 	// RaiseRetryableError is a knob used to possibly return an error.
 	RaiseRetryableError func() error
 
diff --git a/pkg/settings/registry.go b/pkg/settings/registry.go
index f5586bdffba1..522c757836d6 100644
--- a/pkg/settings/registry.go
+++ b/pkg/settings/registry.go
@@ -169,6 +169,8 @@ var retiredSettings = map[string]struct{}{
 	// removed as of 23.2.
 	"sql.log.unstructured_entries.enabled":                     {},
 	"sql.auth.createrole_allows_grant_role_membership.enabled": {},
+	"changefeed.replan_flow_frequency":                          {},
+	"changefeed.replan_flow_threshold":                          {},
 }
 
 // sqlDefaultSettings is the list of "grandfathered" existing sql.defaults
diff --git a/pkg/sql/show_test.go b/pkg/sql/show_test.go
index 613425c2c95a..83874db6be87 100644
--- a/pkg/sql/show_test.go
+++ b/pkg/sql/show_test.go
@@ -1341,14 +1341,18 @@ func TestCancelQueriesRace(t *testing.T) {
 		_, _ = sqlDB.ExecContext(ctx, `SELECT pg_sleep(10)`)
 		close(waiter)
 	}()
-	_, err := sqlDB.ExecContext(ctx, `CANCEL QUERIES (
+	_, err1 := sqlDB.ExecContext(ctx, `CANCEL QUERIES (
 		SELECT query_id FROM [SHOW QUERIES] WHERE query LIKE 'SELECT pg_sleep%'
 	)`)
-	require.NoError(t, err)
-	_, err = sqlDB.ExecContext(ctx, `CANCEL QUERIES (
+
+	_, err2 := sqlDB.ExecContext(ctx, `CANCEL QUERIES (
 		SELECT query_id FROM [SHOW QUERIES] WHERE query LIKE 'SELECT pg_sleep%'
 	)`)
-	require.NoError(t, err)
+	// At least one query cancellation is expected to succeed.
+	require.Truef(
+		t,
+		err1 == nil || err2 == nil,
+		"Both query cancellations failed with errors: %v and %v", err1, err2)
 	cancel()
 	<-waiter