106458: sql: Fix TestCancelQueriesRace r=rafiss a=rimadeodhar

TestCancelQueriesRace attempts to run two query cancellations at the same time in order to reproduce a potential data race. However, it makes the invalid assumption that both query cancellations will always succeed, which does not hold when the two cancellations race with each other. This PR updates the check to ensure that at least one of the two query cancellations succeeds.
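A minimal sketch of the relaxed assertion, assuming a `*testing.T`, a `context.Context`, and an open `*sql.DB` handle to the cluster; the helper name and testify import path here are illustrative, not taken from the actual patch (the real change is in `pkg/sql/show_test.go` below):

```go
package sqltest

import (
	"context"
	"database/sql"
	"testing"

	"github.com/stretchr/testify/require"
)

// requireAtLeastOneCancelSucceeds issues the same CANCEL QUERIES statement
// twice, concurrently with a long-running query started elsewhere, and asserts
// that at least one attempt succeeded. The other attempt may legitimately fail
// if it races with the first cancellation.
func requireAtLeastOneCancelSucceeds(ctx context.Context, t *testing.T, db *sql.DB) {
	const cancelStmt = `CANCEL QUERIES (
		SELECT query_id FROM [SHOW QUERIES] WHERE query LIKE 'SELECT pg_sleep%'
	)`
	_, err1 := db.ExecContext(ctx, cancelStmt)
	_, err2 := db.ExecContext(ctx, cancelStmt)
	require.Truef(t, err1 == nil || err2 == nil,
		"both query cancellations failed: %v and %v", err1, err2)
}
```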

I stress tested this locally using `./dev test ./pkg/sql -f TestCancelQueriesRace --stress --ignore-cache --cpus 16 --timeout 5m`. The test passed consistently across more than 1,000 runs.

Epic: none
Fixes: #105853
Release note: none

106595: changefeedccl: remove changefeed replanning r=miretskiy a=jayshrivastava

This commit removes changefeed replanning functionality.

Initially, this change was meant to improve the replanning test, which takes 10 seconds to run on average and therefore cannot be run under deadlock (see #101894) or stress race. It turns out that the test did not even exercise replanning based on range distribution, because it used the `ShouldReplan` testing knob to override replanning decisions (see the sketch after the list below). After some discussion with `@miretskiy`, it was decided that it would be better to remove replanning entirely, for the following reasons:
- It is disabled by default.
- It is not known to be used by any major production clusters, nor is it known to yield a significant performance improvement.
- Replanning an entire changefeed at this level is a large hammer: restarting a large changefeed this way will likely cause rows to be re-emitted, which may outweigh the benefit of replanning. If we decide to add replanning back, it should be more graceful.
- The benefit of replanning is not clear compared to scalability improvements such as reducing CPU usage, goroutine count, etc.
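For context on why the knob hides the distribution heuristic: when `ShouldReplan` is set, it replaces the threshold-based decision entirely, as the removed code in `changefeed_dist.go` below shows. Here is a minimal, self-contained sketch of that override pattern; the types and names are simplified stand-ins, not the actual changefeed APIs:

```go
package main

import "fmt"

// replanOracle reports whether a flow should be replanned, given the number of
// nodes in the old and new physical plans (a simplification of comparing plans).
type replanOracle func(oldNodes, newNodes int) bool

// replanOnChangedFraction replans when the fraction of nodes that changed
// exceeds the threshold; a threshold of 0 disables replanning.
func replanOnChangedFraction(threshold float64) replanOracle {
	return func(oldNodes, newNodes int) bool {
		if threshold == 0 || oldNodes == 0 {
			return false
		}
		changed := float64(newNodes-oldNodes) / float64(oldNodes)
		if changed < 0 {
			changed = -changed
		}
		return changed > threshold
	}
}

func main() {
	// Default behavior: threshold-based oracle driven by a (hypothetical) setting.
	oracle := replanOnChangedFraction(0.5)

	// Testing knob: when set, it replaces the oracle entirely, so the
	// distribution-based heuristic above is never consulted by the test.
	var shouldReplanKnob replanOracle = func(oldNodes, newNodes int) bool { return true }
	if shouldReplanKnob != nil {
		oracle = shouldReplanKnob
	}

	// Prints "true" even though the plan did not change at all.
	fmt.Println(oracle(3, 3))
}
```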

Epic: None
Closes: #101894

Release note (enterprise change): This change reverts #83143. This removes the settings `changefeed.replan_flow_frequency` and `changefeed.replan_flow_threshold`. These settings were disabled by default and should not affect existing changefeeds in any way.
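The removed setting names are also recorded in the `retiredSettings` map in `pkg/settings/registry.go` (visible in the diff below). A rough sketch of that general pattern, with simplified names rather than CockroachDB's actual registry code:

```go
package main

import "fmt"

// retiredSettings records the names of cluster settings that have been
// removed. Keeping the names around lets a registry reject accidental reuse
// of a retired name while ignoring stale values persisted by older nodes.
var retiredSettings = map[string]struct{}{
	"changefeed.replan_flow_frequency": {},
	"changefeed.replan_flow_threshold": {},
}

// register simulates registering a new setting name.
func register(name string) error {
	if _, ok := retiredSettings[name]; ok {
		return fmt.Errorf("setting name %q was retired and cannot be reused", name)
	}
	// ...actual registration elided...
	return nil
}

func main() {
	fmt.Println(register("changefeed.replan_flow_threshold")) // rejected: retired name
	fmt.Println(register("changefeed.some_new_setting"))      // ok (nil)
}
```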


Co-authored-by: rimadeodhar <[email protected]>
Co-authored-by: Jayant Shrivastava <[email protected]>
3 people committed Jul 11, 2023
3 parents 65cc261 + 1cdfc5f + c436ec5 commit 92d5d0e
Showing 6 changed files with 11 additions and 168 deletions.
44 changes: 1 addition & 43 deletions pkg/ccl/changefeedccl/changefeed_dist.go
@@ -11,7 +11,6 @@ package changefeedccl
import (
"context"
"sort"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdceval"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
@@ -209,21 +208,6 @@ func fetchSpansForTables(
sd, tableDescs[0], initialHighwater, target, sc)
}

var replanChangefeedThreshold = settings.RegisterFloatSetting(
settings.TenantWritable,
"changefeed.replan_flow_threshold",
"fraction of initial flow instances that would be added or updated above which a redistribution would occur (0=disabled)",
0.0,
)

var replanChangefeedFrequency = settings.RegisterDurationSetting(
settings.TenantWritable,
"changefeed.replan_flow_frequency",
"frequency at which changefeed checks to see if redistributing would change its physical execution plan",
10*time.Minute,
settings.PositiveDuration,
)

// startDistChangefeed starts distributed changefeed execution.
func startDistChangefeed(
ctx context.Context,
@@ -249,7 +233,6 @@ func startDistChangefeed(
return err
}
localState.trackedSpans = trackedSpans
cfKnobs := execCfg.DistSQLSrv.TestingKnobs.Changefeed

// Changefeed flows handle transactional consistency themselves.
var noTxn *kv.Txn
@@ -267,28 +250,7 @@
return err
}

replanOracle := sql.ReplanOnChangedFraction(
func() float64 {
return replanChangefeedThreshold.Get(execCtx.ExecCfg().SV())
},
)
if knobs, ok := cfKnobs.(*TestingKnobs); ok && knobs != nil && knobs.ShouldReplan != nil {
replanOracle = knobs.ShouldReplan
}

var replanNoCheckpoint *jobspb.ChangefeedProgress_Checkpoint
var replanNoDrainingNodes []roachpb.NodeID
replanner, stopReplanner := sql.PhysicalPlanChangeChecker(ctx,
p,
makePlan(execCtx, jobID, details, initialHighWater,
trackedSpans, replanNoCheckpoint, replanNoDrainingNodes),
execCtx,
replanOracle,
func() time.Duration { return replanChangefeedFrequency.Get(execCtx.ExecCfg().SV()) },
)

execPlan := func(ctx context.Context) error {
defer stopReplanner()
// Derive a separate context so that we can shut down the changefeed
// as soon as we see an error.
ctx, cancel := execCtx.ExecCfg().DistSQLSrv.Stopper.WithCancelOnQuiesce(ctx)
Expand Down Expand Up @@ -343,11 +305,7 @@ func startDistChangefeed(
return resultRows.Err()
}

if err = ctxgroup.GoAndWait(ctx, execPlan, replanner); errors.Is(err, sql.ErrPlanChanged) {
execCtx.ExecCfg().JobRegistry.MetricsStruct().Changefeed.(*Metrics).ReplanCount.Inc(1)
}

return err
return ctxgroup.GoAndWait(ctx, execPlan)
}

var enableBalancedRangeDistribution = settings.RegisterBoolSetting(
110 changes: 0 additions & 110 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -105,116 +105,6 @@ import (

var testServerRegion = "us-east-1"

func TestChangefeedReplanning(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
skip.UnderStressRace(t, "multinode setup doesn't work under testrace")

assertReplanCounter := func(t *testing.T, m *Metrics, exp int64) {
t.Helper()
// If this changefeed is running as a job, we anticipate that it will move
// through the failed state and will increment the metric. Sinkless feeds
// don't contribute to the failures counter.
if strings.Contains(t.Name(), `sinkless`) {
return
}
testutils.SucceedsSoon(t, func() error {
if got := m.ReplanCount.Count(); got != exp {
return errors.Errorf("expected %d failures, got %d", exp, got)
}
return nil
})
}

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
ctx := context.Background()

numNodes := 3
errChan := make(chan error, 1)
readyChan := make(chan struct{})
defaultServerArgs := base.TestServerArgs{
Knobs: base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{
Changefeed: &TestingKnobs{
HandleDistChangefeedError: func(err error) error {
if errors.Is(err, sql.ErrPlanChanged) {
select {
case errChan <- err:
return err
default:
return nil
}
}
return nil
},
ShouldReplan: func(ctx context.Context, oldPlan, newPlan *sql.PhysicalPlan) bool {
select {
case <-readyChan:
return true
default:
return false
}
},
},
},
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
},
UseDatabase: "d",
DefaultTestTenant: base.TestTenantDisabled,
}

tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{
ServerArgs: defaultServerArgs,
})
defer tc.Stopper().Stop(ctx)

registry := tc.Server(0).JobRegistry().(*jobs.Registry)
metrics := registry.MetricsStruct().Changefeed.(*Metrics)

db := tc.ServerConn(0)
serverutils.SetClusterSetting(t, tc, "changefeed.replan_flow_frequency", time.Millisecond*100)

sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.ExecMultiple(t, strings.Split(serverSetupStatements, ";")...)

sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY);`)
sqlDB.Exec(t, `INSERT INTO foo VALUES (0);`)

feedFactory := makeKafkaFeedFactory(tc, db)

cf := feed(t, feedFactory, "CREATE CHANGEFEED FOR d.foo")
defer closeFeed(t, cf)

feed, ok := cf.(cdctest.EnterpriseTestFeed)
require.True(t, ok)

require.NoError(t, feed.TickHighWaterMark(tc.Server(0).Clock().Now()))

sqlDB.ExecMultiple(t,
`INSERT INTO foo (a) SELECT * FROM generate_series(1, 1000);`,
`ALTER TABLE foo SPLIT AT (SELECT * FROM generate_series(1, 1000, 50));`,
`ALTER TABLE foo SCATTER;`,
)

timeout := 20 * time.Second
if util.RaceEnabled {
timeout *= 3
}

readyChan <- struct{}{}

select {
case err := <-errChan:
require.Regexp(t, "physical plan has changed", err)
assertReplanCounter(t, metrics, 1)
log.Info(ctx, "replan triggered")
case <-time.After(timeout):
t.Fatal("expected distflow to error but hasn't after 20 seconds")
}
}
cdcTest(t, testFn, feedTestForceSink("kafka"))
}

func TestChangefeedBasics(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
9 changes: 0 additions & 9 deletions pkg/ccl/changefeedccl/metrics.go
@@ -394,13 +394,6 @@ var (
Measurement: "Updates",
Unit: metric.Unit_COUNT,
}

metaChangefeedReplanCount = metric.Metadata{
Name: "changefeed.replan_count",
Help: "Number of replans triggered across all feeds",
Measurement: "Replans",
Unit: metric.Unit_COUNT,
}
metaChangefeedEventConsumerFlushNanos = metric.Metadata{
Name: "changefeed.nprocs_flush_nanos",
Help: "Total time spent idle waiting for the parallel consumer to flush",
@@ -689,7 +682,6 @@ type Metrics struct {
CheckpointHistNanos metric.IHistogram
FrontierUpdates *metric.Counter
ThrottleMetrics cdcutils.Metrics
ReplanCount *metric.Counter
ParallelConsumerFlushNanos metric.IHistogram
ParallelConsumerConsumeNanos metric.IHistogram
ParallelConsumerInFlightEvents *metric.Gauge
Expand Down Expand Up @@ -728,7 +720,6 @@ func MakeMetrics(histogramWindow time.Duration) metric.Struct {
}),
FrontierUpdates: metric.NewCounter(metaChangefeedFrontierUpdates),
ThrottleMetrics: cdcutils.MakeMetrics(histogramWindow),
ReplanCount: metric.NewCounter(metaChangefeedReplanCount),
// Below two metrics were never implemented using the hdr histogram. Set ForceUsePrometheus
// to true.
ParallelConsumerFlushNanos: metric.NewHistogram(metric.HistogramOptions{
2 changes: 0 additions & 2 deletions pkg/ccl/changefeedccl/testing_knobs.go
@@ -50,8 +50,6 @@ type TestingKnobs struct {
NullSinkIsExternalIOAccounted bool
// OnDistflowSpec is called when specs for distflow planning have been created
OnDistflowSpec func(aggregatorSpecs []*execinfrapb.ChangeAggregatorSpec, frontierSpec *execinfrapb.ChangeFrontierSpec)
// ShouldReplan is used to see if a replan for a changefeed should be triggered
ShouldReplan func(ctx context.Context, oldPlan, newPlan *sql.PhysicalPlan) bool
// RaiseRetryableError is a knob used to possibly return an error.
RaiseRetryableError func() error

2 changes: 2 additions & 0 deletions pkg/settings/registry.go
@@ -169,6 +169,8 @@ var retiredSettings = map[string]struct{}{
// removed as of 23.2.
"sql.log.unstructured_entries.enabled": {},
"sql.auth.createrole_allows_grant_role_membership.enabled": {},
"changefeed.replan_flow_frequency": {},
"changefeed.replan_flow_threshold": {},
}

// sqlDefaultSettings is the list of "grandfathered" existing sql.defaults
12 changes: 8 additions & 4 deletions pkg/sql/show_test.go
@@ -1341,14 +1341,18 @@ func TestCancelQueriesRace(t *testing.T) {
_, _ = sqlDB.ExecContext(ctx, `SELECT pg_sleep(10)`)
close(waiter)
}()
_, err := sqlDB.ExecContext(ctx, `CANCEL QUERIES (
_, err1 := sqlDB.ExecContext(ctx, `CANCEL QUERIES (
SELECT query_id FROM [SHOW QUERIES] WHERE query LIKE 'SELECT pg_sleep%'
)`)
require.NoError(t, err)
_, err = sqlDB.ExecContext(ctx, `CANCEL QUERIES (

_, err2 := sqlDB.ExecContext(ctx, `CANCEL QUERIES (
SELECT query_id FROM [SHOW QUERIES] WHERE query LIKE 'SELECT pg_sleep%'
)`)
require.NoError(t, err)
// At least one query cancellation is expected to succeed.
require.Truef(
t,
err1 == nil || err2 == nil,
"Both query cancellations failed with errors: %v and %v", err1, err2)

cancel()
<-waiter
