Skip to content

Commit

Permalink
sql/gcjob: deal with a missing system config span
Browse files Browse the repository at this point in the history
This can cause panics if not handled. It must be quite rare that the jobs
subsystem gets going before we have a system config, but clearly it's
possible.

See https://teamcityartifacts.crdb.io/presignedTokenAuth/158ce0ab-b5b6-4cc1-9f1b-672d20d0a56e/repository/download/Cockroach_Ci_Tests_LocalRoachtest/5545173:id/acceptance/version-upgrade/run_1/artifacts.zip!/logs/2.unredacted/cockroach-stderr.05b0f904debc.roach.2022-06-23T14_53_26Z.103197.log

Release note (bug fix): Fixed a bug where a panic could occur during server
startup when restarting a node which is running a GC job.
  • Loading branch information
ajwerner committed Jun 24, 2022
1 parent 324c837 commit 472d193
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 10 deletions.
25 changes: 16 additions & 9 deletions pkg/sql/gcjob/gc_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,16 @@ func (r schemaChangeGCResumer) Resume(ctx context.Context, execCtx interface{})
}()
p := execCtx.(sql.JobExecContext)
// TODO(pbardea): Wait for no versions.
execCfg := p.ExecCfg()

if err := p.ExecCfg().JobRegistry.CheckPausepoint("gcjob.before_resume"); err != nil {
// Clone the ExecConfig so that fields can be overwritten for testing knobs.
execCfg := *p.ExecCfg()
if n := execCfg.GCJobTestingKnobs.Notifier; n != nil {
execCfg.GCJobNotifier = n
}
// Use the same SystemConfigProvider as the notifier.
execCfg.SystemConfig = execCfg.GCJobNotifier.SystemConfigProvider()

if err := execCfg.JobRegistry.CheckPausepoint("gcjob.before_resume"); err != nil {
return err
}

Expand All @@ -191,12 +198,12 @@ func (r schemaChangeGCResumer) Resume(ctx context.Context, execCtx interface{})
return err
}
}
details, progress, err := initDetailsAndProgress(ctx, execCfg, r.job.ID())
details, progress, err := initDetailsAndProgress(ctx, &execCfg, r.job.ID())
if err != nil {
return err
}

if err := maybeUnsplitRanges(ctx, execCfg, r.job.ID(), details, progress); err != nil {
if err := maybeUnsplitRanges(ctx, &execCfg, r.job.ID(), details, progress); err != nil {
return err
}

Expand All @@ -221,10 +228,10 @@ func (r schemaChangeGCResumer) Resume(ctx context.Context, execCtx interface{})
if details.Tenant == nil {
remainingTables := getAllTablesWaitingForGC(details, progress)
expired, earliestDeadline = refreshTables(
ctx, execCfg, remainingTables, tableDropTimes, indexDropTimes, r.job.ID(), progress,
ctx, &execCfg, remainingTables, tableDropTimes, indexDropTimes, r.job.ID(), progress,
)
} else {
expired, earliestDeadline, err = refreshTenant(ctx, execCfg, details.Tenant.DropTime, details, progress)
expired, earliestDeadline, err = refreshTenant(ctx, &execCfg, details.Tenant.DropTime, details, progress)
if err != nil {
return err
}
Expand All @@ -233,16 +240,16 @@ func (r schemaChangeGCResumer) Resume(ctx context.Context, execCtx interface{})

if expired {
// Some elements have been marked as DELETING so save the progress.
persistProgress(ctx, execCfg, r.job.ID(), progress, runningStatusGC(progress))
persistProgress(ctx, &execCfg, r.job.ID(), progress, runningStatusGC(progress))
if fn := execCfg.GCJobTestingKnobs.RunBeforePerformGC; fn != nil {
if err := fn(r.job.ID()); err != nil {
return err
}
}
if err := performGC(ctx, execCfg, details, progress); err != nil {
if err := performGC(ctx, &execCfg, details, progress); err != nil {
return err
}
persistProgress(ctx, execCfg, r.job.ID(), progress, sql.RunningStatusWaitingGC)
persistProgress(ctx, &execCfg, r.job.ID(), progress, sql.RunningStatusWaitingGC)

// Trigger immediate re-run in case of more expired elements.
timerDuration = 0
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/gcjob/gcjobnotifier/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ func New(
return n
}

// SystemConfigProvider provides access to the notifier's underlying
// SystemConfigProvider.
func (n *Notifier) SystemConfigProvider() config.SystemConfigProvider {
return n.provider
}

func noopFunc() {}

// AddNotifyee should be called prior to the first reading of the system config.
Expand Down
9 changes: 8 additions & 1 deletion pkg/sql/gcjob/refresh_statuses.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,15 @@ func updateStatusForGCElements(
) (expired, missing bool, timeToNextTrigger time.Time) {
defTTL := execCfg.DefaultZoneConfig.GC.TTLSeconds
cfg := execCfg.SystemConfig.GetSystemConfig()
// If the system config is nil, it means we have not seen an initial system
// config. Because we register for notifications when the system config
// changes before we get here, we'll get notified to update statuses as soon
// a new configuration is available. If we were to proceed, we'd hit a nil
// pointer panic.
if cfg == nil {
return false, false, maxDeadline
}
protectedtsCache := execCfg.ProtectedTimestampProvider

earliestDeadline := timeutil.Unix(0, int64(math.MaxInt64))

if err := sql.DescsTxn(ctx, execCfg, func(ctx context.Context, txn *kv.Txn, col *descs.Collection) error {
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/gcjob_test/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ go_test(
],
deps = [
"//pkg/base",
"//pkg/config",
"//pkg/config/zonepb",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/keys",
Expand All @@ -19,13 +21,15 @@ go_test(
"//pkg/security/securitytest",
"//pkg/security/username",
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/catalog/bootstrap",
"//pkg/sql/catalog/catalogkeys",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/descs",
"//pkg/sql/catalog/tabledesc",
"//pkg/sql/gcjob",
"//pkg/sql/gcjob/gcjobnotifier",
"//pkg/testutils",
"//pkg/testutils/jobutils",
"//pkg/testutils/serverutils",
Expand All @@ -35,6 +39,8 @@ go_test(
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/randutil",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
Expand Down
145 changes: 145 additions & 0 deletions pkg/sql/gcjob_test/gc_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,24 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/gcjob"
"github.com/cockroachdb/cockroach/pkg/sql/gcjob/gcjobnotifier"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
Expand All @@ -41,6 +45,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -564,3 +570,142 @@ func TestGCTenant(t *testing.T) {
require.True(t, nil == r.Value)
})
}

// TestGCJobNoSystemConfig tests that the GC job is robust to running with
// no system config provided by the SystemConfigProvider. It is a regression
// test for a panic which could occur due to a slow systemconfigwatcher
// initialization.
func TestGCJobNoSystemConfig(t *testing.T) {
defer leaktest.AfterTest(t)()

provider := fakeSystemConfigProvider{}
settings := cluster.MakeTestingClusterSettings()
stopper := stop.NewStopper()
ctx := context.Background()

gcKnobs := &sql.GCJobTestingKnobs{}
s, sqlDB, kvDB := serverutils.StartServer(t, base.TestServerArgs{
Settings: settings,
Stopper: stopper,
Knobs: base.TestingKnobs{
GCJob: gcKnobs,
},
})
defer stopper.Stop(ctx)
codec := s.ExecutorConfig().(sql.ExecutorConfig).Codec
n := gcjobnotifier.New(settings, &provider, codec, stopper)
n.Start(ctx)
gcKnobs.Notifier = n

tdb := sqlutils.MakeSQLRunner(sqlDB)
tdb.Exec(t, "CREATE TABLE foo (i INT PRIMARY KEY)")
tdb.Exec(t, "SET CLUSTER SETTING kv.protectedts.poll_interval = '10ms'")
var id uint32
tdb.QueryRow(t, "SELECT 'foo'::regclass::int").Scan(&id)
tdb.Exec(t, "DROP TABLE foo")
// We want to make sure there's a notifyee and then we want to
// make sure the job makes it to idle.
testutils.SucceedsSoon(t, func() error {
if n := provider.numNotifyees(); n == 0 {
return errors.Errorf("expected 1 notifyee, got %d", n)
}
if n := provider.numCalls(); n < 2 {
return errors.Errorf("expected at least 2 calls, got %d", n)
}
jr := s.JobRegistry().(*jobs.Registry)
if v := jr.MetricsStruct().RunningNonIdleJobs.Value(); v > 0 {
return errors.Errorf("expected 0 non-idle jobs, got %d", v)
}
return nil
})
cfgProto := &zonepb.ZoneConfig{
GC: &zonepb.GCPolicy{TTLSeconds: 0},
}
cfg := config.NewSystemConfig(cfgProto)
descKV, err := kvDB.Get(ctx, codec.DescMetadataKey(id))
require.NoError(t, err)
var zoneKV roachpb.KeyValue
zoneKV.Key = config.MakeZoneKey(codec, descpb.ID(id))
require.NoError(t, zoneKV.Value.SetProto(cfgProto))
defaultKV := zoneKV
defaultKV.Key = config.MakeZoneKey(codec, 0)
// We need to put in an entry for the descriptor both so that the notifier
// fires and so that we don't think the descriptor is missing. We also
// need a zone config KV to make the delta filter happy.
cfg.Values = []roachpb.KeyValue{
{Key: descKV.Key, Value: *descKV.Value},
defaultKV,
zoneKV,
}

provider.setConfig(cfg)
tdb.CheckQueryResultsRetry(t, `
SELECT status
FROM crdb_internal.jobs
WHERE description = 'GC for DROP TABLE defaultdb.public.foo'`,
[][]string{{"succeeded"}})
}

type fakeSystemConfigProvider struct {
mu struct {
syncutil.Mutex

calls int
n int
config *config.SystemConfig
notifyees map[int]chan struct{}
}
}

func (f *fakeSystemConfigProvider) GetSystemConfig() *config.SystemConfig {
f.mu.Lock()
defer f.mu.Unlock()
f.mu.calls++
return f.mu.config
}

func (f *fakeSystemConfigProvider) RegisterSystemConfigChannel() (
_ <-chan struct{},
unregister func(),
) {
f.mu.Lock()
defer f.mu.Unlock()
ch := make(chan struct{}, 1)
n := f.mu.n
f.mu.n++
if f.mu.notifyees == nil {
f.mu.notifyees = map[int]chan struct{}{}
}
f.mu.notifyees[n] = ch
return ch, func() {
f.mu.Lock()
defer f.mu.Unlock()
delete(f.mu.notifyees, n)
}
}

func (f *fakeSystemConfigProvider) setConfig(c *config.SystemConfig) {
f.mu.Lock()
defer f.mu.Unlock()
f.mu.config = c
for _, ch := range f.mu.notifyees {
select {
case ch <- struct{}{}:
default:
}
}
}

func (f *fakeSystemConfigProvider) numNotifyees() int {
f.mu.Lock()
defer f.mu.Unlock()
return len(f.mu.notifyees)
}

func (f *fakeSystemConfigProvider) numCalls() int {
f.mu.Lock()
defer f.mu.Unlock()
return f.mu.calls
}

var _ config.SystemConfigProvider = (*fakeSystemConfigProvider)(nil)
4 changes: 4 additions & 0 deletions pkg/sql/schema_changer.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/faketreeeval"
"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/gcjob/gcjobnotifier"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
Expand Down Expand Up @@ -2381,6 +2382,9 @@ type GCJobTestingKnobs struct {
// protected timestamp status of a table or an index. The protection status is
// passed in along with the jobID.
RunAfterIsProtectedCheck func(jobID jobspb.JobID, isProtected bool)

// Notifier is used to optionally inject a new gcjobnotifier.Notifier.
Notifier *gcjobnotifier.Notifier
}

// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
Expand Down

0 comments on commit 472d193

Please sign in to comment.