diff --git a/pkg/sql/gcjob/gc_job.go b/pkg/sql/gcjob/gc_job.go index a81ad42d15c3..80c590304813 100644 --- a/pkg/sql/gcjob/gc_job.go +++ b/pkg/sql/gcjob/gc_job.go @@ -180,9 +180,16 @@ func (r schemaChangeGCResumer) Resume(ctx context.Context, execCtx interface{}) }() p := execCtx.(sql.JobExecContext) // TODO(pbardea): Wait for no versions. - execCfg := p.ExecCfg() - if err := p.ExecCfg().JobRegistry.CheckPausepoint("gcjob.before_resume"); err != nil { + // Clone the ExecConfig so that fields can be overwritten for testing knobs. + execCfg := *p.ExecCfg() + if n := execCfg.GCJobTestingKnobs.Notifier; n != nil { + execCfg.GCJobNotifier = n + } + // Use the same SystemConfigProvider as the notifier. + execCfg.SystemConfig = execCfg.GCJobNotifier.SystemConfigProvider() + + if err := execCfg.JobRegistry.CheckPausepoint("gcjob.before_resume"); err != nil { return err } @@ -191,12 +198,12 @@ func (r schemaChangeGCResumer) Resume(ctx context.Context, execCtx interface{}) return err } } - details, progress, err := initDetailsAndProgress(ctx, execCfg, r.job.ID()) + details, progress, err := initDetailsAndProgress(ctx, &execCfg, r.job.ID()) if err != nil { return err } - if err := maybeUnsplitRanges(ctx, execCfg, r.job.ID(), details, progress); err != nil { + if err := maybeUnsplitRanges(ctx, &execCfg, r.job.ID(), details, progress); err != nil { return err } @@ -221,10 +228,10 @@ func (r schemaChangeGCResumer) Resume(ctx context.Context, execCtx interface{}) if details.Tenant == nil { remainingTables := getAllTablesWaitingForGC(details, progress) expired, earliestDeadline = refreshTables( - ctx, execCfg, remainingTables, tableDropTimes, indexDropTimes, r.job.ID(), progress, + ctx, &execCfg, remainingTables, tableDropTimes, indexDropTimes, r.job.ID(), progress, ) } else { - expired, earliestDeadline, err = refreshTenant(ctx, execCfg, details.Tenant.DropTime, details, progress) + expired, earliestDeadline, err = refreshTenant(ctx, &execCfg, details.Tenant.DropTime, details, progress) if err != nil { return err } @@ -233,16 +240,16 @@ func (r schemaChangeGCResumer) Resume(ctx context.Context, execCtx interface{}) if expired { // Some elements have been marked as DELETING so save the progress. - persistProgress(ctx, execCfg, r.job.ID(), progress, runningStatusGC(progress)) + persistProgress(ctx, &execCfg, r.job.ID(), progress, runningStatusGC(progress)) if fn := execCfg.GCJobTestingKnobs.RunBeforePerformGC; fn != nil { if err := fn(r.job.ID()); err != nil { return err } } - if err := performGC(ctx, execCfg, details, progress); err != nil { + if err := performGC(ctx, &execCfg, details, progress); err != nil { return err } - persistProgress(ctx, execCfg, r.job.ID(), progress, sql.RunningStatusWaitingGC) + persistProgress(ctx, &execCfg, r.job.ID(), progress, sql.RunningStatusWaitingGC) // Trigger immediate re-run in case of more expired elements. timerDuration = 0 diff --git a/pkg/sql/gcjob/gcjobnotifier/notifier.go b/pkg/sql/gcjob/gcjobnotifier/notifier.go index 2fdda49708b1..17d5da8f8a8d 100644 --- a/pkg/sql/gcjob/gcjobnotifier/notifier.go +++ b/pkg/sql/gcjob/gcjobnotifier/notifier.go @@ -60,6 +60,12 @@ func New( return n } +// SystemConfigProvider provides access to the notifier's underlying +// SystemConfigProvider. +func (n *Notifier) SystemConfigProvider() config.SystemConfigProvider { + return n.provider +} + func noopFunc() {} // AddNotifyee should be called prior to the first reading of the system config. diff --git a/pkg/sql/gcjob/refresh_statuses.go b/pkg/sql/gcjob/refresh_statuses.go index c42630571c57..fdd028663070 100644 --- a/pkg/sql/gcjob/refresh_statuses.go +++ b/pkg/sql/gcjob/refresh_statuses.go @@ -92,8 +92,15 @@ func updateStatusForGCElements( ) (expired, missing bool, timeToNextTrigger time.Time) { defTTL := execCfg.DefaultZoneConfig.GC.TTLSeconds cfg := execCfg.SystemConfig.GetSystemConfig() + // If the system config is nil, it means we have not seen an initial system + // config. Because we register for notifications when the system config + // changes before we get here, we'll get notified to update statuses as soon + // a new configuration is available. If we were to proceed, we'd hit a nil + // pointer panic. + if cfg == nil { + return false, false, maxDeadline + } protectedtsCache := execCfg.ProtectedTimestampProvider - earliestDeadline := timeutil.Unix(0, int64(math.MaxInt64)) if err := sql.DescsTxn(ctx, execCfg, func(ctx context.Context, txn *kv.Txn, col *descs.Collection) error { diff --git a/pkg/sql/gcjob_test/BUILD.bazel b/pkg/sql/gcjob_test/BUILD.bazel index e702ec76b4f9..94330318f6eb 100644 --- a/pkg/sql/gcjob_test/BUILD.bazel +++ b/pkg/sql/gcjob_test/BUILD.bazel @@ -9,6 +9,8 @@ go_test( ], deps = [ "//pkg/base", + "//pkg/config", + "//pkg/config/zonepb", "//pkg/jobs", "//pkg/jobs/jobspb", "//pkg/keys", @@ -18,6 +20,7 @@ go_test( "//pkg/security", "//pkg/security/securitytest", "//pkg/server", + "//pkg/settings/cluster", "//pkg/sql", "//pkg/sql/catalog/bootstrap", "//pkg/sql/catalog/catalogkeys", @@ -25,6 +28,7 @@ go_test( "//pkg/sql/catalog/descs", "//pkg/sql/catalog/tabledesc", "//pkg/sql/gcjob", + "//pkg/sql/gcjob/gcjobnotifier", "//pkg/testutils", "//pkg/testutils/jobutils", "//pkg/testutils/serverutils", @@ -34,6 +38,8 @@ go_test( "//pkg/util/leaktest", "//pkg/util/log", "//pkg/util/randutil", + "//pkg/util/stop", + "//pkg/util/syncutil", "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", "@com_github_stretchr_testify//require", diff --git a/pkg/sql/gcjob_test/gc_job_test.go b/pkg/sql/gcjob_test/gc_job_test.go index 4d66690983c5..2eee542fdaa7 100644 --- a/pkg/sql/gcjob_test/gc_job_test.go +++ b/pkg/sql/gcjob_test/gc_job_test.go @@ -19,6 +19,8 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/config" + "github.com/cockroachdb/cockroach/pkg/config/zonepb" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" @@ -26,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys" @@ -33,6 +36,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/gcjob" + "github.com/cockroachdb/cockroach/pkg/sql/gcjob/gcjobnotifier" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/jobutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" @@ -41,6 +45,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" @@ -582,3 +588,139 @@ func TestGCTenant(t *testing.T) { require.True(t, nil == r.Value) }) } + +// TestGCJobNoSystemConfig tests that the GC job is robust to running with +// no system config provided by the SystemConfigProvider. It is a regression +// test for a panic which could occur due to a slow systemconfigwatcher +// initialization. +func TestGCJobNoSystemConfig(t *testing.T) { + defer leaktest.AfterTest(t)() + + provider := fakeSystemConfigProvider{} + settings := cluster.MakeTestingClusterSettings() + stopper := stop.NewStopper() + ctx := context.Background() + + gcKnobs := &sql.GCJobTestingKnobs{} + s, sqlDB, kvDB := serverutils.StartServer(t, base.TestServerArgs{ + Settings: settings, + Stopper: stopper, + Knobs: base.TestingKnobs{ + GCJob: gcKnobs, + }, + }) + defer stopper.Stop(ctx) + codec := s.ExecutorConfig().(sql.ExecutorConfig).Codec + n := gcjobnotifier.New(settings, &provider, codec, stopper) + n.Start(ctx) + gcKnobs.Notifier = n + + tdb := sqlutils.MakeSQLRunner(sqlDB) + tdb.Exec(t, "CREATE TABLE foo (i INT PRIMARY KEY)") + tdb.Exec(t, "SET CLUSTER SETTING kv.protectedts.poll_interval = '10ms'") + var id uint32 + tdb.QueryRow(t, "SELECT 'foo'::regclass::int").Scan(&id) + tdb.Exec(t, "DROP TABLE foo") + // We want to make sure there's a notifyee and that the job attempted + // to read the status twice. We expect it once for the notifier and + // once for the job itself. + testutils.SucceedsSoon(t, func() error { + if n := provider.numNotifyees(); n == 0 { + return errors.Errorf("expected 1 notifyee, got %d", n) + } + if n := provider.numCalls(); n < 2 { + return errors.Errorf("expected at least 2 calls, got %d", n) + } + return nil + }) + cfgProto := &zonepb.ZoneConfig{ + GC: &zonepb.GCPolicy{TTLSeconds: 0}, + } + cfg := config.NewSystemConfig(cfgProto) + descKV, err := kvDB.Get(ctx, codec.DescMetadataKey(id)) + require.NoError(t, err) + var zoneKV roachpb.KeyValue + zoneKV.Key = config.MakeZoneKey(codec, descpb.ID(id)) + require.NoError(t, zoneKV.Value.SetProto(cfgProto)) + defaultKV := zoneKV + defaultKV.Key = config.MakeZoneKey(codec, 0) + // We need to put in an entry for the descriptor both so that the notifier + // fires and so that we don't think the descriptor is missing. We also + // need a zone config KV to make the delta filter happy. + cfg.Values = []roachpb.KeyValue{ + {Key: descKV.Key, Value: *descKV.Value}, + defaultKV, + zoneKV, + } + + provider.setConfig(cfg) + tdb.CheckQueryResultsRetry(t, ` +SELECT status + FROM crdb_internal.jobs + WHERE description = 'GC for DROP TABLE defaultdb.public.foo'`, + [][]string{{"succeeded"}}) +} + +type fakeSystemConfigProvider struct { + mu struct { + syncutil.Mutex + + calls int + n int + config *config.SystemConfig + notifyees map[int]chan struct{} + } +} + +func (f *fakeSystemConfigProvider) GetSystemConfig() *config.SystemConfig { + f.mu.Lock() + defer f.mu.Unlock() + f.mu.calls++ + return f.mu.config +} + +func (f *fakeSystemConfigProvider) RegisterSystemConfigChannel() ( + _ <-chan struct{}, + unregister func(), +) { + f.mu.Lock() + defer f.mu.Unlock() + ch := make(chan struct{}, 1) + n := f.mu.n + f.mu.n++ + if f.mu.notifyees == nil { + f.mu.notifyees = map[int]chan struct{}{} + } + f.mu.notifyees[n] = ch + return ch, func() { + f.mu.Lock() + defer f.mu.Unlock() + delete(f.mu.notifyees, n) + } +} + +func (f *fakeSystemConfigProvider) setConfig(c *config.SystemConfig) { + f.mu.Lock() + defer f.mu.Unlock() + f.mu.config = c + for _, ch := range f.mu.notifyees { + select { + case ch <- struct{}{}: + default: + } + } +} + +func (f *fakeSystemConfigProvider) numNotifyees() int { + f.mu.Lock() + defer f.mu.Unlock() + return len(f.mu.notifyees) +} + +func (f *fakeSystemConfigProvider) numCalls() int { + f.mu.Lock() + defer f.mu.Unlock() + return f.mu.calls +} + +var _ config.SystemConfigProvider = (*fakeSystemConfigProvider)(nil) diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go index b78c4403162a..ae081e7535c3 100644 --- a/pkg/sql/schema_changer.go +++ b/pkg/sql/schema_changer.go @@ -42,6 +42,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/faketreeeval" "github.com/cockroachdb/cockroach/pkg/sql/flowinfra" + "github.com/cockroachdb/cockroach/pkg/sql/gcjob/gcjobnotifier" "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" @@ -2379,6 +2380,9 @@ type GCJobTestingKnobs struct { // protected timestamp status of a table or an index. The protection status is // passed in along with the jobID. RunAfterIsProtectedCheck func(jobID jobspb.JobID, isProtected bool) + + // Notifier is used to optionally inject a new gcjobnotifier.Notifier. + Notifier *gcjobnotifier.Notifier } // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.