From c3522bea45166ceacfeba5fb681d60a576225c7e Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Thu, 23 Jun 2022 11:38:49 -0400 Subject: [PATCH] sql/gcjob: deal with a missing system config span This can cause panics if not handled. It must be quite rare that the jobs subsystem gets going before we have a system config, but clearly it's possible. See https://teamcityartifacts.crdb.io/presignedTokenAuth/158ce0ab-b5b6-4cc1-9f1b-672d20d0a56e/repository/download/Cockroach_Ci_Tests_LocalRoachtest/5545173:id/acceptance/version-upgrade/run_1/artifacts.zip!/logs/2.unredacted/cockroach-stderr.05b0f904debc.roach.2022-06-23T14_53_26Z.103197.log Release note (bug fix): Fixed a bug where a panic could occur during server startup when restarting a node which is running a GC job. --- pkg/sql/gcjob/gc_job.go | 25 ++-- pkg/sql/gcjob/gcjobnotifier/notifier.go | 6 + pkg/sql/gcjob/refresh_statuses.go | 9 +- pkg/sql/gcjob_test/BUILD.bazel | 6 + pkg/sql/gcjob_test/gc_job_test.go | 145 ++++++++++++++++++++++++ pkg/sql/schema_changer.go | 4 + 6 files changed, 185 insertions(+), 10 deletions(-) diff --git a/pkg/sql/gcjob/gc_job.go b/pkg/sql/gcjob/gc_job.go index 85cdd7f7dea3..7098efd62726 100644 --- a/pkg/sql/gcjob/gc_job.go +++ b/pkg/sql/gcjob/gc_job.go @@ -180,9 +180,16 @@ func (r schemaChangeGCResumer) Resume(ctx context.Context, execCtx interface{}) }() p := execCtx.(sql.JobExecContext) // TODO(pbardea): Wait for no versions. - execCfg := p.ExecCfg() - if err := p.ExecCfg().JobRegistry.CheckPausepoint("gcjob.before_resume"); err != nil { + // Clone the ExecConfig so that fields can be overwritten for testing knobs. + execCfg := *p.ExecCfg() + if n := execCfg.GCJobTestingKnobs.Notifier; n != nil { + execCfg.GCJobNotifier = n + } + // Use the same SystemConfigProvider as the notifier. + execCfg.SystemConfig = execCfg.GCJobNotifier.SystemConfigProvider() + + if err := execCfg.JobRegistry.CheckPausepoint("gcjob.before_resume"); err != nil { return err } @@ -191,12 +198,12 @@ func (r schemaChangeGCResumer) Resume(ctx context.Context, execCtx interface{}) return err } } - details, progress, err := initDetailsAndProgress(ctx, execCfg, r.job.ID()) + details, progress, err := initDetailsAndProgress(ctx, &execCfg, r.job.ID()) if err != nil { return err } - if err := maybeUnsplitRanges(ctx, execCfg, r.job.ID(), details, progress); err != nil { + if err := maybeUnsplitRanges(ctx, &execCfg, r.job.ID(), details, progress); err != nil { return err } @@ -221,10 +228,10 @@ func (r schemaChangeGCResumer) Resume(ctx context.Context, execCtx interface{}) if details.Tenant == nil { remainingTables := getAllTablesWaitingForGC(details, progress) expired, earliestDeadline = refreshTables( - ctx, execCfg, remainingTables, tableDropTimes, indexDropTimes, r.job.ID(), progress, + ctx, &execCfg, remainingTables, tableDropTimes, indexDropTimes, r.job.ID(), progress, ) } else { - expired, earliestDeadline, err = refreshTenant(ctx, execCfg, details.Tenant.DropTime, details, progress) + expired, earliestDeadline, err = refreshTenant(ctx, &execCfg, details.Tenant.DropTime, details, progress) if err != nil { return err } @@ -233,16 +240,16 @@ func (r schemaChangeGCResumer) Resume(ctx context.Context, execCtx interface{}) if expired { // Some elements have been marked as DELETING so save the progress. - persistProgress(ctx, execCfg, r.job.ID(), progress, runningStatusGC(progress)) + persistProgress(ctx, &execCfg, r.job.ID(), progress, runningStatusGC(progress)) if fn := execCfg.GCJobTestingKnobs.RunBeforePerformGC; fn != nil { if err := fn(r.job.ID()); err != nil { return err } } - if err := performGC(ctx, execCfg, details, progress); err != nil { + if err := performGC(ctx, &execCfg, details, progress); err != nil { return err } - persistProgress(ctx, execCfg, r.job.ID(), progress, sql.RunningStatusWaitingGC) + persistProgress(ctx, &execCfg, r.job.ID(), progress, sql.RunningStatusWaitingGC) // Trigger immediate re-run in case of more expired elements. timerDuration = 0 diff --git a/pkg/sql/gcjob/gcjobnotifier/notifier.go b/pkg/sql/gcjob/gcjobnotifier/notifier.go index 2fdda49708b1..17d5da8f8a8d 100644 --- a/pkg/sql/gcjob/gcjobnotifier/notifier.go +++ b/pkg/sql/gcjob/gcjobnotifier/notifier.go @@ -60,6 +60,12 @@ func New( return n } +// SystemConfigProvider provides access to the notifier's underlying +// SystemConfigProvider. +func (n *Notifier) SystemConfigProvider() config.SystemConfigProvider { + return n.provider +} + func noopFunc() {} // AddNotifyee should be called prior to the first reading of the system config. diff --git a/pkg/sql/gcjob/refresh_statuses.go b/pkg/sql/gcjob/refresh_statuses.go index c42630571c57..fdd028663070 100644 --- a/pkg/sql/gcjob/refresh_statuses.go +++ b/pkg/sql/gcjob/refresh_statuses.go @@ -92,8 +92,15 @@ func updateStatusForGCElements( ) (expired, missing bool, timeToNextTrigger time.Time) { defTTL := execCfg.DefaultZoneConfig.GC.TTLSeconds cfg := execCfg.SystemConfig.GetSystemConfig() + // If the system config is nil, it means we have not seen an initial system + // config. Because we register for notifications when the system config + // changes before we get here, we'll get notified to update statuses as soon + // a new configuration is available. If we were to proceed, we'd hit a nil + // pointer panic. + if cfg == nil { + return false, false, maxDeadline + } protectedtsCache := execCfg.ProtectedTimestampProvider - earliestDeadline := timeutil.Unix(0, int64(math.MaxInt64)) if err := sql.DescsTxn(ctx, execCfg, func(ctx context.Context, txn *kv.Txn, col *descs.Collection) error { diff --git a/pkg/sql/gcjob_test/BUILD.bazel b/pkg/sql/gcjob_test/BUILD.bazel index c211f45c3901..1cddf13631b2 100644 --- a/pkg/sql/gcjob_test/BUILD.bazel +++ b/pkg/sql/gcjob_test/BUILD.bazel @@ -9,6 +9,8 @@ go_test( ], deps = [ "//pkg/base", + "//pkg/config", + "//pkg/config/zonepb", "//pkg/jobs", "//pkg/jobs/jobspb", "//pkg/keys", @@ -19,6 +21,7 @@ go_test( "//pkg/security/securitytest", "//pkg/security/username", "//pkg/server", + "//pkg/settings/cluster", "//pkg/sql", "//pkg/sql/catalog/bootstrap", "//pkg/sql/catalog/catalogkeys", @@ -26,6 +29,7 @@ go_test( "//pkg/sql/catalog/descs", "//pkg/sql/catalog/tabledesc", "//pkg/sql/gcjob", + "//pkg/sql/gcjob/gcjobnotifier", "//pkg/testutils", "//pkg/testutils/jobutils", "//pkg/testutils/serverutils", @@ -35,6 +39,8 @@ go_test( "//pkg/util/leaktest", "//pkg/util/log", "//pkg/util/randutil", + "//pkg/util/stop", + "//pkg/util/syncutil", "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", "@com_github_stretchr_testify//require", diff --git a/pkg/sql/gcjob_test/gc_job_test.go b/pkg/sql/gcjob_test/gc_job_test.go index d5c2bc54a3ea..7819dcff095c 100644 --- a/pkg/sql/gcjob_test/gc_job_test.go +++ b/pkg/sql/gcjob_test/gc_job_test.go @@ -19,6 +19,8 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/config" + "github.com/cockroachdb/cockroach/pkg/config/zonepb" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" @@ -26,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys" @@ -33,6 +36,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/gcjob" + "github.com/cockroachdb/cockroach/pkg/sql/gcjob/gcjobnotifier" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/jobutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" @@ -41,6 +45,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" @@ -564,3 +570,142 @@ func TestGCTenant(t *testing.T) { require.True(t, nil == r.Value) }) } + +// TestGCJobNoSystemConfig tests that the GC job is robust to running with +// no system config provided by the SystemConfigProvider. It is a regression +// test for a panic which could occur due to a slow systemconfigwatcher +// initialization. +func TestGCJobNoSystemConfig(t *testing.T) { + defer leaktest.AfterTest(t)() + + provider := fakeSystemConfigProvider{} + settings := cluster.MakeTestingClusterSettings() + stopper := stop.NewStopper() + ctx := context.Background() + + gcKnobs := &sql.GCJobTestingKnobs{} + s, sqlDB, kvDB := serverutils.StartServer(t, base.TestServerArgs{ + Settings: settings, + Stopper: stopper, + Knobs: base.TestingKnobs{ + GCJob: gcKnobs, + }, + }) + defer stopper.Stop(ctx) + codec := s.ExecutorConfig().(sql.ExecutorConfig).Codec + n := gcjobnotifier.New(settings, &provider, codec, stopper) + n.Start(ctx) + gcKnobs.Notifier = n + + tdb := sqlutils.MakeSQLRunner(sqlDB) + tdb.Exec(t, "CREATE TABLE foo (i INT PRIMARY KEY)") + tdb.Exec(t, "SET CLUSTER SETTING kv.protectedts.poll_interval = '10ms'") + var id uint32 + tdb.QueryRow(t, "SELECT 'foo'::regclass::int").Scan(&id) + tdb.Exec(t, "DROP TABLE foo") + // We want to make sure there's a notifyee and then we want to + // make sure the job makes it to idle. + testutils.SucceedsSoon(t, func() error { + if n := provider.numNotifyees(); n == 0 { + return errors.Errorf("expected 0, got %d", n) + } + if n := provider.numCalls(); n < 2 { + return errors.Errorf("expected 2, got %d", n) + } + jr := s.JobRegistry().(*jobs.Registry) + if v := jr.MetricsStruct().RunningNonIdleJobs.Value(); v > 0 { + return errors.Errorf("expected 0, got %d", v) + } + return nil + }) + cfgProto := &zonepb.ZoneConfig{ + GC: &zonepb.GCPolicy{TTLSeconds: 0}, + } + cfg := config.NewSystemConfig(cfgProto) + descKV, err := kvDB.Get(ctx, codec.DescMetadataKey(id)) + require.NoError(t, err) + var zoneKV roachpb.KeyValue + zoneKV.Key = config.MakeZoneKey(codec, descpb.ID(id)) + require.NoError(t, zoneKV.Value.SetProto(cfgProto)) + defaultKV := zoneKV + defaultKV.Key = config.MakeZoneKey(codec, 0) + // We need to put in an entry for the descriptor both so that the notifier + // fires and so that we don't think the descriptor is missing. We also + // need a zone config KV to make the delta filter happy. + cfg.Values = []roachpb.KeyValue{ + {Key: descKV.Key, Value: *descKV.Value}, + defaultKV, + zoneKV, + } + + provider.setConfig(cfg) + tdb.CheckQueryResultsRetry(t, ` +SELECT status + FROM crdb_internal.jobs + WHERE description = 'GC for DROP TABLE defaultdb.public.foo'`, + [][]string{{"succeeded"}}) +} + +type fakeSystemConfigProvider struct { + mu struct { + syncutil.Mutex + + calls int + n int + config *config.SystemConfig + notifyees map[int]chan struct{} + } +} + +func (f *fakeSystemConfigProvider) GetSystemConfig() *config.SystemConfig { + f.mu.Lock() + defer f.mu.Unlock() + f.mu.calls++ + return f.mu.config +} + +func (f *fakeSystemConfigProvider) RegisterSystemConfigChannel() ( + _ <-chan struct{}, + unregister func(), +) { + f.mu.Lock() + defer f.mu.Unlock() + ch := make(chan struct{}, 1) + n := f.mu.n + f.mu.n++ + if f.mu.notifyees == nil { + f.mu.notifyees = map[int]chan struct{}{} + } + f.mu.notifyees[n] = ch + return ch, func() { + f.mu.Lock() + defer f.mu.Unlock() + delete(f.mu.notifyees, n) + } +} + +func (f *fakeSystemConfigProvider) setConfig(c *config.SystemConfig) { + f.mu.Lock() + defer f.mu.Unlock() + f.mu.config = c + for _, ch := range f.mu.notifyees { + select { + case ch <- struct{}{}: + default: + } + } +} + +func (f *fakeSystemConfigProvider) numNotifyees() int { + f.mu.Lock() + defer f.mu.Unlock() + return len(f.mu.notifyees) +} + +func (f *fakeSystemConfigProvider) numCalls() int { + f.mu.Lock() + defer f.mu.Unlock() + return f.mu.calls +} + +var _ config.SystemConfigProvider = (*fakeSystemConfigProvider)(nil) diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go index 05a5571b89e9..6acc6dac4373 100644 --- a/pkg/sql/schema_changer.go +++ b/pkg/sql/schema_changer.go @@ -43,6 +43,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/faketreeeval" "github.com/cockroachdb/cockroach/pkg/sql/flowinfra" + "github.com/cockroachdb/cockroach/pkg/sql/gcjob/gcjobnotifier" "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" @@ -2381,6 +2382,9 @@ type GCJobTestingKnobs struct { // protected timestamp status of a table or an index. The protection status is // passed in along with the jobID. RunAfterIsProtectedCheck func(jobID jobspb.JobID, isProtected bool) + + // Notifier is used to optionally inject a new gcjobnotifier.Notifier. + Notifier *gcjobnotifier.Notifier } // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.