Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changefeedccl: remove changefeed replanning #106595

Merged
merged 1 commit into from
Jul 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 1 addition & 43 deletions pkg/ccl/changefeedccl/changefeed_dist.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ package changefeedccl
import (
"context"
"sort"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdceval"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
Expand Down Expand Up @@ -209,21 +208,6 @@ func fetchSpansForTables(
sd, tableDescs[0], initialHighwater, target, sc)
}

// replanChangefeedThreshold is the tenant-writable cluster setting
// "changefeed.replan_flow_threshold". startDistChangefeed reads it on every
// evaluation through the closure it hands to sql.ReplanOnChangedFraction.
// The registered default is 0.0, which the setting description documents as
// "disabled".
var replanChangefeedThreshold = settings.RegisterFloatSetting(
settings.TenantWritable,
"changefeed.replan_flow_threshold",
"fraction of initial flow instances that would be added or updated above which a redistribution would occur (0=disabled)",
0.0,
)

// replanChangefeedFrequency is the tenant-writable cluster setting
// "changefeed.replan_flow_frequency". startDistChangefeed passes its current
// value to sql.PhysicalPlanChangeChecker as the interval between plan checks.
// The registered default is 10 minutes; settings.PositiveDuration is supplied
// as the validator (presumably rejecting non-positive values — confirm
// against the settings package).
var replanChangefeedFrequency = settings.RegisterDurationSetting(
settings.TenantWritable,
"changefeed.replan_flow_frequency",
"frequency at which changefeed checks to see if redistributing would change its physical execution plan",
10*time.Minute,
settings.PositiveDuration,
)

// startDistChangefeed starts distributed changefeed execution.
func startDistChangefeed(
ctx context.Context,
Expand All @@ -249,7 +233,6 @@ func startDistChangefeed(
return err
}
localState.trackedSpans = trackedSpans
cfKnobs := execCfg.DistSQLSrv.TestingKnobs.Changefeed

// Changefeed flows handle transactional consistency themselves.
var noTxn *kv.Txn
Expand All @@ -267,28 +250,7 @@ func startDistChangefeed(
return err
}

replanOracle := sql.ReplanOnChangedFraction(
func() float64 {
return replanChangefeedThreshold.Get(execCtx.ExecCfg().SV())
},
)
if knobs, ok := cfKnobs.(*TestingKnobs); ok && knobs != nil && knobs.ShouldReplan != nil {
replanOracle = knobs.ShouldReplan
}

var replanNoCheckpoint *jobspb.ChangefeedProgress_Checkpoint
var replanNoDrainingNodes []roachpb.NodeID
replanner, stopReplanner := sql.PhysicalPlanChangeChecker(ctx,
p,
makePlan(execCtx, jobID, details, initialHighWater,
trackedSpans, replanNoCheckpoint, replanNoDrainingNodes),
execCtx,
replanOracle,
func() time.Duration { return replanChangefeedFrequency.Get(execCtx.ExecCfg().SV()) },
)

execPlan := func(ctx context.Context) error {
defer stopReplanner()
// Derive a separate context so that we can shut down the changefeed
// as soon as we see an error.
ctx, cancel := execCtx.ExecCfg().DistSQLSrv.Stopper.WithCancelOnQuiesce(ctx)
Expand Down Expand Up @@ -343,11 +305,7 @@ func startDistChangefeed(
return resultRows.Err()
}

if err = ctxgroup.GoAndWait(ctx, execPlan, replanner); errors.Is(err, sql.ErrPlanChanged) {
execCtx.ExecCfg().JobRegistry.MetricsStruct().Changefeed.(*Metrics).ReplanCount.Inc(1)
}

return err
return ctxgroup.GoAndWait(ctx, execPlan)
}

var enableBalancedRangeDistribution = settings.RegisterBoolSetting(
Expand Down
110 changes: 0 additions & 110 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,116 +103,6 @@ import (

var testServerRegion = "us-east-1"

// TestChangefeedReplanning starts a three-node cluster, runs a kafka
// changefeed on table foo, splits and scatters the table, and then flips the
// ShouldReplan testing knob to true. It expects the distributed flow to fail
// with sql.ErrPlanChanged (observed through the HandleDistChangefeedError
// knob) and the ReplanCount metric to reach 1.
func TestChangefeedReplanning(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)
	skip.UnderStressRace(t, "multinode setup doesn't work under testrace")

	// assertReplanCounter waits until the ReplanCount metric equals exp.
	assertReplanCounter := func(t *testing.T, m *Metrics, exp int64) {
		t.Helper()
		// Tests whose name contains `sinkless` are not checked.
		// NOTE(review): presumably sinkless feeds do not contribute to this
		// counter — confirm.
		if strings.Contains(t.Name(), `sinkless`) {
			return
		}
		testutils.SucceedsSoon(t, func() error {
			if got := m.ReplanCount.Count(); got != exp {
				return errors.Errorf("expected %d replans, got %d", exp, got)
			}
			return nil
		})
	}

	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
		ctx := context.Background()

		numNodes := 3
		// errChan is buffered so that the first ErrPlanChanged can be handed
		// over without blocking the changefeed; later ones are dropped.
		errChan := make(chan error, 1)
		// readyChan is unbuffered: a send completes only when the
		// ShouldReplan knob polls it, so exactly one poll returns true.
		readyChan := make(chan struct{})
		defaultServerArgs := base.TestServerArgs{
			Knobs: base.TestingKnobs{
				DistSQL: &execinfra.TestingKnobs{
					Changefeed: &TestingKnobs{
						HandleDistChangefeedError: func(err error) error {
							if errors.Is(err, sql.ErrPlanChanged) {
								select {
								case errChan <- err:
									return err
								default:
									return nil
								}
							}
							return nil
						},
						ShouldReplan: func(ctx context.Context, oldPlan, newPlan *sql.PhysicalPlan) bool {
							select {
							case <-readyChan:
								return true
							default:
								return false
							}
						},
					},
				},
				JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
			},
			UseDatabase:       "d",
			DefaultTestTenant: base.TestTenantDisabled,
		}

		tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{
			ServerArgs: defaultServerArgs,
		})
		defer tc.Stopper().Stop(ctx)

		registry := tc.Server(0).JobRegistry().(*jobs.Registry)
		metrics := registry.MetricsStruct().Changefeed.(*Metrics)

		db := tc.ServerConn(0)
		serverutils.SetClusterSetting(t, tc, "changefeed.replan_flow_frequency", time.Millisecond*100)

		sqlDB := sqlutils.MakeSQLRunner(db)
		sqlDB.ExecMultiple(t, strings.Split(serverSetupStatements, ";")...)

		sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY);`)
		sqlDB.Exec(t, `INSERT INTO foo VALUES (0);`)

		feedFactory := makeKafkaFeedFactory(tc, db)

		cf := feed(t, feedFactory, "CREATE CHANGEFEED FOR d.foo")
		defer closeFeed(t, cf)

		// Named enterpriseFeed so the package-level feed helper is not
		// shadowed for the rest of the function.
		enterpriseFeed, ok := cf.(cdctest.EnterpriseTestFeed)
		require.True(t, ok)

		require.NoError(t, enterpriseFeed.TickHighWaterMark(tc.Server(0).Clock().Now()))

		sqlDB.ExecMultiple(t,
			`INSERT INTO foo (a) SELECT * FROM generate_series(1, 1000);`,
			`ALTER TABLE foo SPLIT AT (SELECT * FROM generate_series(1, 1000, 50));`,
			`ALTER TABLE foo SCATTER;`,
		)

		timeout := 20 * time.Second
		if util.RaceEnabled {
			timeout *= 3
		}

		// Hand the "ready" signal to the ShouldReplan knob. The knob only
		// polls readyChan, so bound the wait: a bare send would hang the test
		// forever if the knob were never consulted.
		select {
		case readyChan <- struct{}{}:
		case <-time.After(timeout):
			t.Fatalf("replan check was not invoked within %s", timeout)
		}

		select {
		case err := <-errChan:
			require.Regexp(t, "physical plan has changed", err)
			assertReplanCounter(t, metrics, 1)
			log.Info(ctx, "replan triggered")
		case <-time.After(timeout):
			// Report the effective timeout; it is tripled under race.
			t.Fatalf("expected distflow to error but hasn't after %s", timeout)
		}
	}
	cdcTest(t, testFn, feedTestForceSink("kafka"))
}

func TestChangefeedBasics(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
9 changes: 0 additions & 9 deletions pkg/ccl/changefeedccl/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,13 +394,6 @@ var (
Measurement: "Updates",
Unit: metric.Unit_COUNT,
}

// metaChangefeedReplanCount is the metadata for the "changefeed.replan_count"
// counter, which MakeMetrics uses to construct Metrics.ReplanCount.
metaChangefeedReplanCount = metric.Metadata{
Name: "changefeed.replan_count",
Help: "Number of replans triggered across all feeds",
Measurement: "Replans",
Unit: metric.Unit_COUNT,
}
metaChangefeedEventConsumerFlushNanos = metric.Metadata{
Name: "changefeed.nprocs_flush_nanos",
Help: "Total time spent idle waiting for the parallel consumer to flush",
Expand Down Expand Up @@ -689,7 +682,6 @@ type Metrics struct {
CheckpointHistNanos metric.IHistogram
FrontierUpdates *metric.Counter
ThrottleMetrics cdcutils.Metrics
ReplanCount *metric.Counter
ParallelConsumerFlushNanos metric.IHistogram
ParallelConsumerConsumeNanos metric.IHistogram
ParallelConsumerInFlightEvents *metric.Gauge
Expand Down Expand Up @@ -728,7 +720,6 @@ func MakeMetrics(histogramWindow time.Duration) metric.Struct {
}),
FrontierUpdates: metric.NewCounter(metaChangefeedFrontierUpdates),
ThrottleMetrics: cdcutils.MakeMetrics(histogramWindow),
ReplanCount: metric.NewCounter(metaChangefeedReplanCount),
// Below two metrics were never implemented using the hdr histogram. Set ForceUsePrometheus
// to true.
ParallelConsumerFlushNanos: metric.NewHistogram(metric.HistogramOptions{
Expand Down
2 changes: 0 additions & 2 deletions pkg/ccl/changefeedccl/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ type TestingKnobs struct {
NullSinkIsExternalIOAccounted bool
// OnDistflowSpec is called when specs for distflow planning have been created
OnDistflowSpec func(aggregatorSpecs []*execinfrapb.ChangeAggregatorSpec, frontierSpec *execinfrapb.ChangeFrontierSpec)
// ShouldReplan is used to see if a replan for a changefeed should be triggered
ShouldReplan func(ctx context.Context, oldPlan, newPlan *sql.PhysicalPlan) bool
// RaiseRetryableError is a knob used to possibly return an error.
RaiseRetryableError func() error

Expand Down
2 changes: 2 additions & 0 deletions pkg/settings/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ var retiredSettings = map[string]struct{}{
// removed as of 23.2.
"sql.log.unstructured_entries.enabled": {},
"sql.auth.createrole_allows_grant_role_membership.enabled": {},
"changefeed.replan_flow_frequency": {},
"changefeed.replan_flow_threshold": {},
}

// sqlDefaultSettings is the list of "grandfathered" existing sql.defaults
Expand Down