Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql/gcjob: deal with a missing system config span #83280

Merged
merged 1 commit into from
Jun 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 16 additions & 9 deletions pkg/sql/gcjob/gc_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,16 @@ func (r schemaChangeGCResumer) Resume(ctx context.Context, execCtx interface{})
}()
p := execCtx.(sql.JobExecContext)
// TODO(pbardea): Wait for no versions.
execCfg := p.ExecCfg()

if err := p.ExecCfg().JobRegistry.CheckPausepoint("gcjob.before_resume"); err != nil {
// Clone the ExecConfig so that fields can be overwritten for testing knobs.
execCfg := *p.ExecCfg()
if n := execCfg.GCJobTestingKnobs.Notifier; n != nil {
execCfg.GCJobNotifier = n
}
// Use the same SystemConfigProvider as the notifier.
execCfg.SystemConfig = execCfg.GCJobNotifier.SystemConfigProvider()

if err := execCfg.JobRegistry.CheckPausepoint("gcjob.before_resume"); err != nil {
return err
}

Expand All @@ -191,12 +198,12 @@ func (r schemaChangeGCResumer) Resume(ctx context.Context, execCtx interface{})
return err
}
}
details, progress, err := initDetailsAndProgress(ctx, execCfg, r.job.ID())
details, progress, err := initDetailsAndProgress(ctx, &execCfg, r.job.ID())
if err != nil {
return err
}

if err := maybeUnsplitRanges(ctx, execCfg, r.job.ID(), details, progress); err != nil {
if err := maybeUnsplitRanges(ctx, &execCfg, r.job.ID(), details, progress); err != nil {
return err
}

Expand All @@ -221,10 +228,10 @@ func (r schemaChangeGCResumer) Resume(ctx context.Context, execCtx interface{})
if details.Tenant == nil {
remainingTables := getAllTablesWaitingForGC(details, progress)
expired, earliestDeadline = refreshTables(
ctx, execCfg, remainingTables, tableDropTimes, indexDropTimes, r.job.ID(), progress,
ctx, &execCfg, remainingTables, tableDropTimes, indexDropTimes, r.job.ID(), progress,
)
} else {
expired, earliestDeadline, err = refreshTenant(ctx, execCfg, details.Tenant.DropTime, details, progress)
expired, earliestDeadline, err = refreshTenant(ctx, &execCfg, details.Tenant.DropTime, details, progress)
if err != nil {
return err
}
Expand All @@ -233,16 +240,16 @@ func (r schemaChangeGCResumer) Resume(ctx context.Context, execCtx interface{})

if expired {
// Some elements have been marked as DELETING so save the progress.
persistProgress(ctx, execCfg, r.job.ID(), progress, runningStatusGC(progress))
persistProgress(ctx, &execCfg, r.job.ID(), progress, runningStatusGC(progress))
if fn := execCfg.GCJobTestingKnobs.RunBeforePerformGC; fn != nil {
if err := fn(r.job.ID()); err != nil {
return err
}
}
if err := performGC(ctx, execCfg, details, progress); err != nil {
if err := performGC(ctx, &execCfg, details, progress); err != nil {
return err
}
persistProgress(ctx, execCfg, r.job.ID(), progress, sql.RunningStatusWaitingGC)
persistProgress(ctx, &execCfg, r.job.ID(), progress, sql.RunningStatusWaitingGC)

// Trigger immediate re-run in case of more expired elements.
timerDuration = 0
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/gcjob/gcjobnotifier/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ func New(
return n
}

// SystemConfigProvider provides access to the notifier's underlying
// SystemConfigProvider.
func (n *Notifier) SystemConfigProvider() config.SystemConfigProvider {
return n.provider
}

func noopFunc() {}

// AddNotifyee should be called prior to the first reading of the system config.
Expand Down
9 changes: 8 additions & 1 deletion pkg/sql/gcjob/refresh_statuses.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,15 @@ func updateStatusForGCElements(
) (expired, missing bool, timeToNextTrigger time.Time) {
defTTL := execCfg.DefaultZoneConfig.GC.TTLSeconds
cfg := execCfg.SystemConfig.GetSystemConfig()
// If the system config is nil, it means we have not seen an initial system
// config. Because we register for notifications when the system config
// changes before we get here, we'll get notified to update statuses as soon
// a new configuration is available. If we were to proceed, we'd hit a nil
// pointer panic.
if cfg == nil {
return false, false, maxDeadline
}
protectedtsCache := execCfg.ProtectedTimestampProvider

earliestDeadline := timeutil.Unix(0, int64(math.MaxInt64))

if err := sql.DescsTxn(ctx, execCfg, func(ctx context.Context, txn *kv.Txn, col *descs.Collection) error {
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/gcjob_test/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ go_test(
],
deps = [
"//pkg/base",
"//pkg/config",
"//pkg/config/zonepb",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/keys",
Expand All @@ -19,13 +21,15 @@ go_test(
"//pkg/security/securitytest",
"//pkg/security/username",
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/catalog/bootstrap",
"//pkg/sql/catalog/catalogkeys",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/descs",
"//pkg/sql/catalog/tabledesc",
"//pkg/sql/gcjob",
"//pkg/sql/gcjob/gcjobnotifier",
"//pkg/testutils",
"//pkg/testutils/jobutils",
"//pkg/testutils/serverutils",
Expand All @@ -35,6 +39,8 @@ go_test(
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/randutil",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
Expand Down
142 changes: 142 additions & 0 deletions pkg/sql/gcjob_test/gc_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,24 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/gcjob"
"github.com/cockroachdb/cockroach/pkg/sql/gcjob/gcjobnotifier"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
Expand All @@ -41,6 +45,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -564,3 +570,139 @@ func TestGCTenant(t *testing.T) {
require.True(t, nil == r.Value)
})
}

// TestGCJobNoSystemConfig tests that the GC job is robust to running with
// no system config provided by the SystemConfigProvider. It is a regression
// test for a panic which could occur due to a slow systemconfigwatcher
// initialization.
func TestGCJobNoSystemConfig(t *testing.T) {
defer leaktest.AfterTest(t)()

provider := fakeSystemConfigProvider{}
settings := cluster.MakeTestingClusterSettings()
stopper := stop.NewStopper()
ctx := context.Background()

gcKnobs := &sql.GCJobTestingKnobs{}
s, sqlDB, kvDB := serverutils.StartServer(t, base.TestServerArgs{
Settings: settings,
Stopper: stopper,
Knobs: base.TestingKnobs{
GCJob: gcKnobs,
},
})
defer stopper.Stop(ctx)
codec := s.ExecutorConfig().(sql.ExecutorConfig).Codec
n := gcjobnotifier.New(settings, &provider, codec, stopper)
n.Start(ctx)
gcKnobs.Notifier = n

tdb := sqlutils.MakeSQLRunner(sqlDB)
tdb.Exec(t, "CREATE TABLE foo (i INT PRIMARY KEY)")
tdb.Exec(t, "SET CLUSTER SETTING kv.protectedts.poll_interval = '10ms'")
var id uint32
tdb.QueryRow(t, "SELECT 'foo'::regclass::int").Scan(&id)
tdb.Exec(t, "DROP TABLE foo")
// We want to make sure there's a notifyee and that the job attempted
// to read the status twice. We expect it once for the notifier and
// once for the job itself.
testutils.SucceedsSoon(t, func() error {
if n := provider.numNotifyees(); n == 0 {
return errors.Errorf("expected 1 notifyee, got %d", n)
}
if n := provider.numCalls(); n < 2 {
return errors.Errorf("expected at least 2 calls, got %d", n)
}
return nil
})
cfgProto := &zonepb.ZoneConfig{
GC: &zonepb.GCPolicy{TTLSeconds: 0},
}
cfg := config.NewSystemConfig(cfgProto)
descKV, err := kvDB.Get(ctx, codec.DescMetadataKey(id))
require.NoError(t, err)
var zoneKV roachpb.KeyValue
zoneKV.Key = config.MakeZoneKey(codec, descpb.ID(id))
require.NoError(t, zoneKV.Value.SetProto(cfgProto))
defaultKV := zoneKV
defaultKV.Key = config.MakeZoneKey(codec, 0)
// We need to put in an entry for the descriptor both so that the notifier
// fires and so that we don't think the descriptor is missing. We also
// need a zone config KV to make the delta filter happy.
cfg.Values = []roachpb.KeyValue{
{Key: descKV.Key, Value: *descKV.Value},
defaultKV,
zoneKV,
}

provider.setConfig(cfg)
tdb.CheckQueryResultsRetry(t, `
SELECT status
FROM crdb_internal.jobs
WHERE description = 'GC for DROP TABLE defaultdb.public.foo'`,
[][]string{{"succeeded"}})
}

type fakeSystemConfigProvider struct {
mu struct {
syncutil.Mutex

calls int
n int
config *config.SystemConfig
notifyees map[int]chan struct{}
}
}

func (f *fakeSystemConfigProvider) GetSystemConfig() *config.SystemConfig {
f.mu.Lock()
defer f.mu.Unlock()
f.mu.calls++
return f.mu.config
}

func (f *fakeSystemConfigProvider) RegisterSystemConfigChannel() (
_ <-chan struct{},
unregister func(),
) {
f.mu.Lock()
defer f.mu.Unlock()
ch := make(chan struct{}, 1)
n := f.mu.n
f.mu.n++
if f.mu.notifyees == nil {
f.mu.notifyees = map[int]chan struct{}{}
}
f.mu.notifyees[n] = ch
return ch, func() {
f.mu.Lock()
defer f.mu.Unlock()
delete(f.mu.notifyees, n)
}
}

func (f *fakeSystemConfigProvider) setConfig(c *config.SystemConfig) {
f.mu.Lock()
defer f.mu.Unlock()
f.mu.config = c
for _, ch := range f.mu.notifyees {
select {
case ch <- struct{}{}:
default:
}
}
}

func (f *fakeSystemConfigProvider) numNotifyees() int {
f.mu.Lock()
defer f.mu.Unlock()
return len(f.mu.notifyees)
}

func (f *fakeSystemConfigProvider) numCalls() int {
f.mu.Lock()
defer f.mu.Unlock()
return f.mu.calls
}

var _ config.SystemConfigProvider = (*fakeSystemConfigProvider)(nil)
4 changes: 4 additions & 0 deletions pkg/sql/schema_changer.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/faketreeeval"
"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/gcjob/gcjobnotifier"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
Expand Down Expand Up @@ -2381,6 +2382,9 @@ type GCJobTestingKnobs struct {
// protected timestamp status of a table or an index. The protection status is
// passed in along with the jobID.
RunAfterIsProtectedCheck func(jobID jobspb.JobID, isProtected bool)

// Notifier is used to optionally inject a new gcjobnotifier.Notifier.
Notifier *gcjobnotifier.Notifier
}

// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
Expand Down