diff --git a/pkg/jobs/job_scheduler.go b/pkg/jobs/job_scheduler.go index 7ae8eb8775a4..3f6c7df9f4f5 100644 --- a/pkg/jobs/job_scheduler.go +++ b/pkg/jobs/job_scheduler.go @@ -351,30 +351,6 @@ func (s *jobScheduler) executeSchedules(ctx context.Context, maxSchedules int64) return err } -// An internal, safety valve setting to revert scheduler execution to distributed mode. -// This setting should be removed once scheduled job system no longer locks tables for excessive -// periods of time. -var schedulerRunsOnSingleNode = settings.RegisterBoolSetting( - settings.TenantReadOnly, - "jobs.scheduler.single_node_scheduler.enabled", - "execute scheduler on a single node in a cluster", - false, -) - -func (s *jobScheduler) schedulerEnabledOnThisNode(ctx context.Context) bool { - if s.ShouldRunScheduler == nil || !schedulerRunsOnSingleNode.Get(&s.Settings.SV) { - return true - } - - enabled, err := s.ShouldRunScheduler(ctx, s.DB.Clock().NowAsClockTimestamp()) - if err != nil { - log.Errorf(ctx, "error determining if the scheduler enabled: %v; will recheck after %s", - err, recheckEnabledAfterPeriod) - return false - } - return enabled -} - type syncCancelFunc struct { syncutil.Mutex context.CancelFunc @@ -438,12 +414,12 @@ func (s *jobScheduler) runDaemon(ctx context.Context, stopper *stop.Stopper) { whenDisabled := newCancelWhenDisabled(&s.Settings.SV) for timer := time.NewTimer(initialDelay); ; timer.Reset( - getWaitPeriod(ctx, &s.Settings.SV, s.schedulerEnabledOnThisNode, jitter, s.TestingKnobs)) { + getWaitPeriod(ctx, &s.Settings.SV, jitter, s.TestingKnobs)) { select { case <-stopper.ShouldQuiesce(): return case <-timer.C: - if !schedulerEnabledSetting.Get(&s.Settings.SV) || !s.schedulerEnabledOnThisNode(ctx) { + if !schedulerEnabledSetting.Get(&s.Settings.SV) { continue } @@ -508,11 +484,7 @@ type jitterFn func(duration time.Duration) time.Duration // Returns duration to wait before scanning system.scheduled_jobs. func getWaitPeriod( - ctx context.Context, - sv *settings.Values, - enabledOnThisNode func(ctx context.Context) bool, - jitter jitterFn, - knobs base.ModuleTestingKnobs, + ctx context.Context, sv *settings.Values, jitter jitterFn, knobs base.ModuleTestingKnobs, ) time.Duration { if k, ok := knobs.(*TestingKnobs); ok && k.SchedulerDaemonScanDelay != nil { return k.SchedulerDaemonScanDelay() @@ -522,10 +494,6 @@ func getWaitPeriod( return recheckEnabledAfterPeriod } - if enabledOnThisNode != nil && !enabledOnThisNode(ctx) { - return recheckEnabledAfterPeriod - } - pace := schedulerPaceSetting.Get(sv) if pace < minPacePeriod { if warnIfPaceTooLow.ShouldLog() { diff --git a/pkg/jobs/job_scheduler_test.go b/pkg/jobs/job_scheduler_test.go index 7f6460c1f02d..e3dfb2291d59 100644 --- a/pkg/jobs/job_scheduler_test.go +++ b/pkg/jobs/job_scheduler_test.go @@ -36,12 +36,11 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/stop" - "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" "github.com/gogo/protobuf/types" - "github.com/robfig/cron/v3" + cron "github.com/robfig/cron/v3" "github.com/stretchr/testify/require" ) @@ -215,23 +214,22 @@ func TestJobSchedulerDaemonGetWaitPeriod(t *testing.T) { sv, cleanup := getScopedSettings() defer cleanup() - var schedulerEnabled func(context.Context) bool noJitter := func(d time.Duration) time.Duration { return d } schedulerEnabledSetting.Override(ctx, sv, false) // When disabled, we wait 5 minutes before rechecking. - require.EqualValues(t, 5*time.Minute, getWaitPeriod(ctx, sv, schedulerEnabled, noJitter, nil)) + require.EqualValues(t, 5*time.Minute, getWaitPeriod(ctx, sv, noJitter, nil)) schedulerEnabledSetting.Override(ctx, sv, true) // When pace is too low, we use something more reasonable. schedulerPaceSetting.Override(ctx, sv, time.Nanosecond) - require.EqualValues(t, minPacePeriod, getWaitPeriod(ctx, sv, schedulerEnabled, noJitter, nil)) + require.EqualValues(t, minPacePeriod, getWaitPeriod(ctx, sv, noJitter, nil)) // Otherwise, we use user specified setting. pace := 42 * time.Second schedulerPaceSetting.Override(ctx, sv, pace) - require.EqualValues(t, pace, getWaitPeriod(ctx, sv, schedulerEnabled, noJitter, nil)) + require.EqualValues(t, pace, getWaitPeriod(ctx, sv, noJitter, nil)) } type recordScheduleExecutor struct { @@ -755,68 +753,6 @@ INSERT INTO defaultdb.foo VALUES(1, 1) require.Equal(t, "", updated.ScheduleStatus()) } -func TestSchedulerCanBeRestrictedToSingleNode(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - const numNodes = 3 - for _, enableSingleNode := range []bool{true, false} { - t.Run(fmt.Sprintf("runs-on-single-node=%t", enableSingleNode), func(t *testing.T) { - schedulers := struct { - syncutil.Mutex - schedulers []*jobScheduler - }{} - knobs := &TestingKnobs{ - CaptureJobScheduler: func(s interface{}) { - schedulers.Lock() - defer schedulers.Unlock() - schedulers.schedulers = append(schedulers.schedulers, s.(*jobScheduler)) - }, - } - - args := base.TestServerArgs{ - Knobs: base.TestingKnobs{JobsTestingKnobs: knobs}, - } - - tc := serverutils.StartNewTestCluster(t, numNodes, base.TestClusterArgs{ServerArgs: args}) - defer tc.Stopper().Stop(context.Background()) - - testutils.SucceedsSoon(t, func() error { - schedulers.Lock() - defer schedulers.Unlock() - if len(schedulers.schedulers) == numNodes { - return nil - } - return errors.Newf("want %d schedules, got %d", numNodes, len(schedulers.schedulers)) - }) - - sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0)) - sqlDB.Exec(t, "SET CLUSTER SETTING jobs.scheduler.single_node_scheduler.enabled=$1", enableSingleNode) - - schedulers.Lock() - defer schedulers.Unlock() - expectedEnabled := numNodes - if enableSingleNode { - expectedEnabled = 1 - } - - testutils.SucceedsSoon(t, func() error { - numEnabled := 0 - for _, s := range schedulers.schedulers { - if s.schedulerEnabledOnThisNode(context.Background()) { - numEnabled++ - } - } - if numEnabled == expectedEnabled { - return nil - } - return errors.Newf("expecting %d enabled, found %d", expectedEnabled, numEnabled) - }) - - }) - } -} - type blockUntilCancelledExecutor struct { once sync.Once started, done chan struct{} diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index bbf9c16eb4c4..b22cd07af7ec 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -1511,12 +1511,6 @@ func (s *SQLServer) preStart( sessiondatapb.SessionData{}, ) }, - ShouldRunScheduler: func(ctx context.Context, ts hlc.ClockTimestamp) (bool, error) { - if s.execCfg.Codec.ForSystemTenant() { - return s.isMeta1Leaseholder(ctx, ts) - } - return true, nil - }, }, scheduledjobs.ProdJobSchedulerEnv, ) diff --git a/pkg/settings/registry.go b/pkg/settings/registry.go index 93f12d28c494..23679af657a5 100644 --- a/pkg/settings/registry.go +++ b/pkg/settings/registry.go @@ -158,6 +158,7 @@ var retiredSettings = map[string]struct{}{ "sql.distsql.flow_scheduler_queueing.enabled": {}, "sql.distsql.drain.cancel_after_wait.enabled": {}, "changefeed.active_protected_timestamps.enabled": {}, + "jobs.scheduler.single_node_scheduler.enabled": {}, } // sqlDefaultSettings is the list of "grandfathered" existing sql.defaults