Skip to content

Commit

Permalink
Merge pull request #83474 from ajwerner/backport22.1-83280
Browse files Browse the repository at this point in the history
  • Loading branch information
ajwerner authored Jun 28, 2022
2 parents cc498d1 + bf3a796 commit 0a6b5fa
Show file tree
Hide file tree
Showing 6 changed files with 182 additions and 10 deletions.
25 changes: 16 additions & 9 deletions pkg/sql/gcjob/gc_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,16 @@ func (r schemaChangeGCResumer) Resume(ctx context.Context, execCtx interface{})
}()
p := execCtx.(sql.JobExecContext)
// TODO(pbardea): Wait for no versions.
execCfg := p.ExecCfg()

if err := p.ExecCfg().JobRegistry.CheckPausepoint("gcjob.before_resume"); err != nil {
// Clone the ExecConfig so that fields can be overwritten for testing knobs.
execCfg := *p.ExecCfg()
if n := execCfg.GCJobTestingKnobs.Notifier; n != nil {
execCfg.GCJobNotifier = n
}
// Use the same SystemConfigProvider as the notifier.
execCfg.SystemConfig = execCfg.GCJobNotifier.SystemConfigProvider()

if err := execCfg.JobRegistry.CheckPausepoint("gcjob.before_resume"); err != nil {
return err
}

Expand All @@ -191,12 +198,12 @@ func (r schemaChangeGCResumer) Resume(ctx context.Context, execCtx interface{})
return err
}
}
details, progress, err := initDetailsAndProgress(ctx, execCfg, r.job.ID())
details, progress, err := initDetailsAndProgress(ctx, &execCfg, r.job.ID())
if err != nil {
return err
}

if err := maybeUnsplitRanges(ctx, execCfg, r.job.ID(), details, progress); err != nil {
if err := maybeUnsplitRanges(ctx, &execCfg, r.job.ID(), details, progress); err != nil {
return err
}

Expand All @@ -221,10 +228,10 @@ func (r schemaChangeGCResumer) Resume(ctx context.Context, execCtx interface{})
if details.Tenant == nil {
remainingTables := getAllTablesWaitingForGC(details, progress)
expired, earliestDeadline = refreshTables(
ctx, execCfg, remainingTables, tableDropTimes, indexDropTimes, r.job.ID(), progress,
ctx, &execCfg, remainingTables, tableDropTimes, indexDropTimes, r.job.ID(), progress,
)
} else {
expired, earliestDeadline, err = refreshTenant(ctx, execCfg, details.Tenant.DropTime, details, progress)
expired, earliestDeadline, err = refreshTenant(ctx, &execCfg, details.Tenant.DropTime, details, progress)
if err != nil {
return err
}
Expand All @@ -233,16 +240,16 @@ func (r schemaChangeGCResumer) Resume(ctx context.Context, execCtx interface{})

if expired {
// Some elements have been marked as DELETING so save the progress.
persistProgress(ctx, execCfg, r.job.ID(), progress, runningStatusGC(progress))
persistProgress(ctx, &execCfg, r.job.ID(), progress, runningStatusGC(progress))
if fn := execCfg.GCJobTestingKnobs.RunBeforePerformGC; fn != nil {
if err := fn(r.job.ID()); err != nil {
return err
}
}
if err := performGC(ctx, execCfg, details, progress); err != nil {
if err := performGC(ctx, &execCfg, details, progress); err != nil {
return err
}
persistProgress(ctx, execCfg, r.job.ID(), progress, sql.RunningStatusWaitingGC)
persistProgress(ctx, &execCfg, r.job.ID(), progress, sql.RunningStatusWaitingGC)

// Trigger immediate re-run in case of more expired elements.
timerDuration = 0
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/gcjob/gcjobnotifier/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ func New(
return n
}

// SystemConfigProvider provides access to the notifier's underlying
// SystemConfigProvider.
func (n *Notifier) SystemConfigProvider() config.SystemConfigProvider {
return n.provider
}

func noopFunc() {}

// AddNotifyee should be called prior to the first reading of the system config.
Expand Down
9 changes: 8 additions & 1 deletion pkg/sql/gcjob/refresh_statuses.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,15 @@ func updateStatusForGCElements(
) (expired, missing bool, timeToNextTrigger time.Time) {
defTTL := execCfg.DefaultZoneConfig.GC.TTLSeconds
cfg := execCfg.SystemConfig.GetSystemConfig()
// If the system config is nil, it means we have not seen an initial system
// config. Because we register for notifications when the system config
// changes before we get here, we'll get notified to update statuses as soon
// a new configuration is available. If we were to proceed, we'd hit a nil
// pointer panic.
if cfg == nil {
return false, false, maxDeadline
}
protectedtsCache := execCfg.ProtectedTimestampProvider

earliestDeadline := timeutil.Unix(0, int64(math.MaxInt64))

if err := sql.DescsTxn(ctx, execCfg, func(ctx context.Context, txn *kv.Txn, col *descs.Collection) error {
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/gcjob_test/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ go_test(
],
deps = [
"//pkg/base",
"//pkg/config",
"//pkg/config/zonepb",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/keys",
Expand All @@ -18,13 +20,15 @@ go_test(
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/catalog/bootstrap",
"//pkg/sql/catalog/catalogkeys",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/descs",
"//pkg/sql/catalog/tabledesc",
"//pkg/sql/gcjob",
"//pkg/sql/gcjob/gcjobnotifier",
"//pkg/testutils",
"//pkg/testutils/jobutils",
"//pkg/testutils/serverutils",
Expand All @@ -34,6 +38,8 @@ go_test(
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/randutil",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
Expand Down
142 changes: 142 additions & 0 deletions pkg/sql/gcjob_test/gc_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,24 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/gcjob"
"github.com/cockroachdb/cockroach/pkg/sql/gcjob/gcjobnotifier"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
Expand All @@ -41,6 +45,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -582,3 +588,139 @@ func TestGCTenant(t *testing.T) {
require.True(t, nil == r.Value)
})
}

// TestGCJobNoSystemConfig tests that the GC job is robust to running with
// no system config provided by the SystemConfigProvider. It is a regression
// test for a panic which could occur due to a slow systemconfigwatcher
// initialization.
func TestGCJobNoSystemConfig(t *testing.T) {
defer leaktest.AfterTest(t)()

provider := fakeSystemConfigProvider{}
settings := cluster.MakeTestingClusterSettings()
stopper := stop.NewStopper()
ctx := context.Background()

gcKnobs := &sql.GCJobTestingKnobs{}
s, sqlDB, kvDB := serverutils.StartServer(t, base.TestServerArgs{
Settings: settings,
Stopper: stopper,
Knobs: base.TestingKnobs{
GCJob: gcKnobs,
},
})
defer stopper.Stop(ctx)
codec := s.ExecutorConfig().(sql.ExecutorConfig).Codec
n := gcjobnotifier.New(settings, &provider, codec, stopper)
n.Start(ctx)
gcKnobs.Notifier = n

tdb := sqlutils.MakeSQLRunner(sqlDB)
tdb.Exec(t, "CREATE TABLE foo (i INT PRIMARY KEY)")
tdb.Exec(t, "SET CLUSTER SETTING kv.protectedts.poll_interval = '10ms'")
var id uint32
tdb.QueryRow(t, "SELECT 'foo'::regclass::int").Scan(&id)
tdb.Exec(t, "DROP TABLE foo")
// We want to make sure there's a notifyee and that the job attempted
// to read the status twice. We expect it once for the notifier and
// once for the job itself.
testutils.SucceedsSoon(t, func() error {
if n := provider.numNotifyees(); n == 0 {
return errors.Errorf("expected 1 notifyee, got %d", n)
}
if n := provider.numCalls(); n < 2 {
return errors.Errorf("expected at least 2 calls, got %d", n)
}
return nil
})
cfgProto := &zonepb.ZoneConfig{
GC: &zonepb.GCPolicy{TTLSeconds: 0},
}
cfg := config.NewSystemConfig(cfgProto)
descKV, err := kvDB.Get(ctx, codec.DescMetadataKey(id))
require.NoError(t, err)
var zoneKV roachpb.KeyValue
zoneKV.Key = config.MakeZoneKey(codec, descpb.ID(id))
require.NoError(t, zoneKV.Value.SetProto(cfgProto))
defaultKV := zoneKV
defaultKV.Key = config.MakeZoneKey(codec, 0)
// We need to put in an entry for the descriptor both so that the notifier
// fires and so that we don't think the descriptor is missing. We also
// need a zone config KV to make the delta filter happy.
cfg.Values = []roachpb.KeyValue{
{Key: descKV.Key, Value: *descKV.Value},
defaultKV,
zoneKV,
}

provider.setConfig(cfg)
tdb.CheckQueryResultsRetry(t, `
SELECT status
FROM crdb_internal.jobs
WHERE description = 'GC for DROP TABLE defaultdb.public.foo'`,
[][]string{{"succeeded"}})
}

type fakeSystemConfigProvider struct {
mu struct {
syncutil.Mutex

calls int
n int
config *config.SystemConfig
notifyees map[int]chan struct{}
}
}

func (f *fakeSystemConfigProvider) GetSystemConfig() *config.SystemConfig {
f.mu.Lock()
defer f.mu.Unlock()
f.mu.calls++
return f.mu.config
}

func (f *fakeSystemConfigProvider) RegisterSystemConfigChannel() (
_ <-chan struct{},
unregister func(),
) {
f.mu.Lock()
defer f.mu.Unlock()
ch := make(chan struct{}, 1)
n := f.mu.n
f.mu.n++
if f.mu.notifyees == nil {
f.mu.notifyees = map[int]chan struct{}{}
}
f.mu.notifyees[n] = ch
return ch, func() {
f.mu.Lock()
defer f.mu.Unlock()
delete(f.mu.notifyees, n)
}
}

func (f *fakeSystemConfigProvider) setConfig(c *config.SystemConfig) {
f.mu.Lock()
defer f.mu.Unlock()
f.mu.config = c
for _, ch := range f.mu.notifyees {
select {
case ch <- struct{}{}:
default:
}
}
}

func (f *fakeSystemConfigProvider) numNotifyees() int {
f.mu.Lock()
defer f.mu.Unlock()
return len(f.mu.notifyees)
}

func (f *fakeSystemConfigProvider) numCalls() int {
f.mu.Lock()
defer f.mu.Unlock()
return f.mu.calls
}

var _ config.SystemConfigProvider = (*fakeSystemConfigProvider)(nil)
4 changes: 4 additions & 0 deletions pkg/sql/schema_changer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/faketreeeval"
"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/gcjob/gcjobnotifier"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
Expand Down Expand Up @@ -2379,6 +2380,9 @@ type GCJobTestingKnobs struct {
// protected timestamp status of a table or an index. The protection status is
// passed in along with the jobID.
RunAfterIsProtectedCheck func(jobID jobspb.JobID, isProtected bool)

// Notifier is used to optionally inject a new gcjobnotifier.Notifier.
Notifier *gcjobnotifier.Notifier
}

// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
Expand Down

0 comments on commit 0a6b5fa

Please sign in to comment.