diff --git a/pkg/server/server.go b/pkg/server/server.go index 9e66bbf66b07..685523303685 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -64,6 +64,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" _ "github.com/cockroachdb/cockroach/pkg/sql/gcjob" // register jobs declared outside of pkg/sql + "github.com/cockroachdb/cockroach/pkg/sql/gcjob/gcjobnotifier" "github.com/cockroachdb/cockroach/pkg/sql/pgwire" "github.com/cockroachdb/cockroach/pkg/sql/querycache" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -616,6 +617,8 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { distSQLMetrics := execinfra.MakeDistSQLMetrics(cfg.HistogramWindowInterval()) s.registry.AddMetricStruct(distSQLMetrics) + gcJobNotifier := gcjobnotifier.New(cfg.Settings, s.gossip, s.stopper) + // Set up Lease Manager var lmKnobs sql.LeaseManagerTestingKnobs if leaseManagerTestingKnobs := cfg.TestingKnobs.SQLLeaseManager; leaseManagerTestingKnobs != nil { @@ -820,6 +823,8 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { QueryCache: querycache.New(s.cfg.SQLQueryCacheSize), ProtectedTimestampProvider: s.protectedtsProvider, + + GCJobNotifier: gcJobNotifier, } s.stopper.AddCloser(execCfg.ExecLogger) @@ -1622,6 +1627,7 @@ func (s *Server) Start(ctx context.Context) error { } }) + s.execCfg.GCJobNotifier.Start(ctx) s.distSQLServer.Start() s.pgServer.Start(ctx, s.stopper) diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index fe137edd2aab..7993a7f7d3b1 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -42,6 +42,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/colexec" "github.com/cockroachdb/cockroach/pkg/sql/distsql" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" + "github.com/cockroachdb/cockroach/pkg/sql/gcjob/gcjobnotifier" "github.com/cockroachdb/cockroach/pkg/sql/opt" "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" @@ -593,6 +594,8 @@ type ExecutorConfig struct { // StmtDiagnosticsRecorder deals with recording statement diagnostics. StmtDiagnosticsRecorder StmtDiagnosticsRecorder + + GCJobNotifier *gcjobnotifier.Notifier } // Organization returns the value of cluster.organization. diff --git a/pkg/sql/gcjob/gc_job.go b/pkg/sql/gcjob/gc_job.go index c0d482037363..049f6b8f873e 100644 --- a/pkg/sql/gcjob/gc_job.go +++ b/pkg/sql/gcjob/gc_job.go @@ -16,7 +16,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" - "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -96,7 +95,9 @@ func (r schemaChangeGCResumer) Resume( if err != nil { return err } - zoneCfgFilter, gossipUpdateC := setupConfigWatcher(execCfg) + + gossipUpdateC, cleanup := execCfg.GCJobNotifier.AddNotifyee(ctx) + defer cleanup() tableDropTimes, indexDropTimes := getDropTimes(details) allTables := getAllTablesWaitingForGC(details, progress) @@ -121,20 +122,6 @@ func (r schemaChangeGCResumer) Resume( if log.V(2) { log.Info(ctx, "received a new system config") } - // TODO (lucy): Currently we're calling refreshTables on every zone config - // update to any table. We should really be only updating a cached - // TTL whenever we get an update on one of the tables/indexes (or the db) - // that this job is responsible for, and computing the earliest deadline - // from our set of cached TTL values. - cfg := execCfg.Gossip.GetSystemConfig() - zoneConfigUpdated := false - zoneCfgFilter.ForModified(cfg, func(kv roachpb.KeyValue) { - zoneConfigUpdated = true - }) - if !zoneConfigUpdated { - log.VEventf(ctx, 2, "no zone config updates, continuing") - continue - } remainingTables := getAllTablesWaitingForGC(details, progress) if len(remainingTables) == 0 { return nil diff --git a/pkg/sql/gcjob/gcjobnotifier/notifier.go b/pkg/sql/gcjob/gcjobnotifier/notifier.go new file mode 100644 index 000000000000..85a0352e76ed --- /dev/null +++ b/pkg/sql/gcjob/gcjobnotifier/notifier.go @@ -0,0 +1,173 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +// Package gcjobnotifier provides a mechanism to share a SystemConfigDeltaFilter +// among all gc jobs. +// +// It exists in a separate package to avoid import cycles between sql and gcjob. +package gcjobnotifier + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/config" + "github.com/cockroachdb/cockroach/pkg/gossip" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" +) + +type systemConfigProvider interface { + GetSystemConfig() *config.SystemConfig + RegisterSystemConfigChannel() <-chan struct{} +} + +// Notifier is used to maybeNotify GC jobs of their need to potentially +// update gc ttls. It exists to be a single copy of the system config rather +// than one per GC job. +type Notifier struct { + provider systemConfigProvider + prefix roachpb.Key + stopper *stop.Stopper + settings *cluster.Settings + mu struct { + syncutil.Mutex + started, stopped bool + deltaFilter *gossip.SystemConfigDeltaFilter + notifyees map[chan struct{}]struct{} + } +} + +// New constructs a new Notifier. +func New( + settings *cluster.Settings, provider systemConfigProvider, stopper *stop.Stopper, +) *Notifier { + n := &Notifier{ + provider: provider, + prefix: keys.MakeTablePrefix(uint32(keys.ZonesTableID)), + stopper: stopper, + settings: settings, + } + n.mu.notifyees = make(map[chan struct{}]struct{}) + return n +} + +func noopFunc() {} + +// AddNotifyee should be called prior to the first reading of the system config. +// +// TODO(lucy,ajwerner): Currently we're calling refreshTables on every zone +// config update to any table. We should really be only updating a cached +// TTL whenever we get an update on one of the tables/indexes (or the db) +// that this job is responsible for, and computing the earliest deadline +// from our set of cached TTL values. To do this we'd need to know the full +// set of zone object IDs which may be relevant for a given table. In general +// that should be fine as a the relevant IDs should be stable for the life of +// anything being deleted. If we did this, we'd associate a set of keys with +// each notifyee. +func (n *Notifier) AddNotifyee(ctx context.Context) (onChange <-chan struct{}, cleanup func()) { + n.mu.Lock() + defer n.mu.Unlock() + if !n.mu.started { + log.ReportOrPanic(ctx, &n.settings.SV, + "adding a notifyee to a Notifier before starting") + } + if n.mu.stopped { + return nil, noopFunc + } + if n.mu.deltaFilter == nil { + zoneCfgFilter := gossip.MakeSystemConfigDeltaFilter(n.prefix) + n.mu.deltaFilter = &zoneCfgFilter + // Initialize the filter with the current values. + n.mu.deltaFilter.ForModified(n.provider.GetSystemConfig(), func(kv roachpb.KeyValue) {}) + } + c := make(chan struct{}, 1) + n.mu.notifyees[c] = struct{}{} + return c, func() { n.cleanup(c) } +} + +func (n *Notifier) cleanup(c chan struct{}) { + n.mu.Lock() + defer n.mu.Unlock() + delete(n.mu.notifyees, c) + if len(n.mu.notifyees) == 0 { + n.mu.deltaFilter = nil + } +} + +func (n *Notifier) markStopped() { + n.mu.Lock() + defer n.mu.Unlock() + n.mu.stopped = true +} + +func (n *Notifier) markStarted() (alreadyStarted bool) { + n.mu.Lock() + defer n.mu.Unlock() + alreadyStarted = n.mu.started + n.mu.started = true + return alreadyStarted +} + +// Start starts the notifier. It must be started before calling AddNotifyee. +// Start must not be called more than once. +func (n *Notifier) Start(ctx context.Context) { + if alreadyStarted := n.markStarted(); alreadyStarted { + log.ReportOrPanic(ctx, &n.settings.SV, "started Notifier more than once") + return + } + if err := n.stopper.RunAsyncTask(ctx, "gcjob.Notifier", n.run); err != nil { + n.markStopped() + } +} + +func (n *Notifier) run(_ context.Context) { + defer n.markStopped() + gossipUpdateCh := n.provider.RegisterSystemConfigChannel() + for { + select { + case <-n.stopper.ShouldQuiesce(): + return + case <-gossipUpdateCh: + n.maybeNotify() + } + } +} + +func (n *Notifier) maybeNotify() { + n.mu.Lock() + defer n.mu.Unlock() + + // Nobody is listening. + if len(n.mu.notifyees) == 0 { + return + } + + cfg := n.provider.GetSystemConfig() + zoneConfigUpdated := false + n.mu.deltaFilter.ForModified(cfg, func(kv roachpb.KeyValue) { + zoneConfigUpdated = true + }) + + // Nothing we care about was updated. + if !zoneConfigUpdated { + return + } + + for c := range n.mu.notifyees { + select { + case c <- struct{}{}: + default: + } + } +} diff --git a/pkg/sql/gcjob/gcjobnotifier/notifier_test.go b/pkg/sql/gcjob/gcjobnotifier/notifier_test.go new file mode 100644 index 000000000000..0b250165202b --- /dev/null +++ b/pkg/sql/gcjob/gcjobnotifier/notifier_test.go @@ -0,0 +1,159 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package gcjobnotifier + +import ( + "context" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/config" + "github.com/cockroachdb/cockroach/pkg/config/zonepb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/stretchr/testify/require" +) + +type testingProvider struct { + syncutil.Mutex + cfg *config.SystemConfig + ch chan struct{} +} + +func (t *testingProvider) GetSystemConfig() *config.SystemConfig { + t.Lock() + defer t.Unlock() + return t.cfg +} + +func (t *testingProvider) setSystemConfig(cfg *config.SystemConfig) { + t.Lock() + defer t.Unlock() + t.cfg = cfg +} + +func (t *testingProvider) RegisterSystemConfigChannel() <-chan struct{} { + return t.ch +} + +var _ systemConfigProvider = (*testingProvider)(nil) + +func TestNotifier(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + settings := cluster.MakeTestingClusterSettings() + t.Run("start with stopped stopper leads to nil being returned", func(t *testing.T) { + stopper := stop.NewStopper() + stopper.Stop(ctx) + n := New(settings, &testingProvider{}, stopper) + n.Start(ctx) + ch, _ := n.AddNotifyee(ctx) + require.Nil(t, ch) + }) + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + t.Run("panic on double start", func(t *testing.T) { + n := New(settings, &testingProvider{ch: make(chan struct{})}, stopper) + n.Start(ctx) + require.Panics(t, func() { + n.Start(ctx) + }) + }) + t.Run("panic on AddNotifyee before start", func(t *testing.T) { + n := New(settings, &testingProvider{ch: make(chan struct{})}, stopper) + require.Panics(t, func() { + n.AddNotifyee(ctx) + }) + }) + t.Run("notifies on changed delta and cleanup", func(t *testing.T) { + cfg := config.NewSystemConfig(zonepb.DefaultSystemZoneConfigRef()) + cfg.Values = []roachpb.KeyValue{ + mkZoneConfigKV(1, 1, "1"), + } + ch := make(chan struct{}, 1) + p := &testingProvider{ + cfg: mkSystemConfig(mkZoneConfigKV(1, 1, "1")), + ch: ch, + } + n := New(settings, p, stopper) + n.Start(ctx) + n1Ch, cleanup1 := n.AddNotifyee(ctx) + + t.Run("don't receive on new notifyee", func(t *testing.T) { + expectNoSend(t, n1Ch) + }) + t.Run("don't receive with no change", func(t *testing.T) { + ch <- struct{}{} + expectNoSend(t, n1Ch) + }) + n2Ch, _ := n.AddNotifyee(ctx) + t.Run("receive from all notifyees when data does change", func(t *testing.T) { + p.setSystemConfig(mkSystemConfig(mkZoneConfigKV(1, 2, "2"))) + ch <- struct{}{} + expectSend(t, n1Ch) + expectSend(t, n2Ch) + }) + t.Run("don't receive after cleanup", func(t *testing.T) { + cleanup1() + p.setSystemConfig(mkSystemConfig(mkZoneConfigKV(1, 3, "3"))) + ch <- struct{}{} + expectSend(t, n2Ch) + expectNoSend(t, n1Ch) + }) + }) +} + +const ( + // used for timeouts of things which should be fast + longTime = time.Second + // used for sanity check of channel sends which shouldn't happen + shortTime = 10 * time.Millisecond +) + +func expectNoSend(t *testing.T, ch <-chan struct{}) { + t.Helper() + select { + case <-ch: + t.Fatal("did not expect to receive") + case <-time.After(shortTime): + } +} + +func expectSend(t *testing.T, ch <-chan struct{}) { + t.Helper() + select { + case <-ch: + case <-time.After(longTime): + t.Fatal("expected to receive") + } +} + +func mkZoneConfigKV(id uint32, ts int64, value string) roachpb.KeyValue { + kv := roachpb.KeyValue{ + Key: config.MakeZoneKey(id), + Value: roachpb.Value{ + Timestamp: hlc.Timestamp{WallTime: ts}, + }, + } + kv.Value.SetString(value) + return kv +} + +func mkSystemConfig(kvs ...roachpb.KeyValue) *config.SystemConfig { + cfg := config.NewSystemConfig(zonepb.DefaultSystemZoneConfigRef()) + cfg.Values = kvs + return cfg +} diff --git a/pkg/sql/gcjob/refresh_statuses.go b/pkg/sql/gcjob/refresh_statuses.go index acfefc2e0be5..514c2d6b183c 100644 --- a/pkg/sql/gcjob/refresh_statuses.go +++ b/pkg/sql/gcjob/refresh_statuses.go @@ -16,16 +16,13 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/config/zonepb" - "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" - "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" - "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" @@ -275,15 +272,3 @@ func isProtected( }) return protected } - -// setupConfigWatcher returns a filter to watch zone config changes and a -// channel that is notified when there are changes. -func setupConfigWatcher( - execCfg *sql.ExecutorConfig, -) (gossip.SystemConfigDeltaFilter, <-chan struct{}) { - k := keys.MakeTablePrefix(uint32(keys.ZonesTableID)) - k = encoding.EncodeUvarintAscending(k, uint64(keys.ZonesTablePrimaryIndexID)) - zoneCfgFilter := gossip.MakeSystemConfigDeltaFilter(k) - gossipUpdateC := execCfg.Gossip.RegisterSystemConfigChannel() - return zoneCfgFilter, gossipUpdateC -} diff --git a/pkg/sql/gcjob_test/gc_job_test.go b/pkg/sql/gcjob_test/gc_job_test.go index e7df2d1c1dbe..632f4fcf3fb7 100644 --- a/pkg/sql/gcjob_test/gc_job_test.go +++ b/pkg/sql/gcjob_test/gc_job_test.go @@ -255,7 +255,7 @@ func TestSchemaChangeGCJobTableGCdWhileWaitingForExpiration(t *testing.T) { sqlDB.Exec(t, "CREATE DATABASE db") sqlDB.Exec(t, "CREATE TABLE db.foo ()") - var dbID, tableID descpb.ID + var dbID, tableID sqlbase.ID sqlDB.QueryRow(t, ` SELECT parent_id, table_id FROM crdb_internal.tables @@ -275,11 +275,11 @@ SELECT job_id, status // Manually delete the table. require.NoError(t, kvDB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - nameKey := sqlbase.MakeNameMetadataKey(keys.SystemSQLCodec, dbID, keys.PublicSchemaID, "foo") + nameKey := sqlbase.MakeNameMetadataKey(dbID, keys.PublicSchemaID, "foo") if err := txn.Del(ctx, nameKey); err != nil { return err } - descKey := sqlbase.MakeDescMetadataKey(keys.SystemSQLCodec, tableID) + descKey := sqlbase.MakeDescMetadataKey(tableID) return txn.Del(ctx, descKey) })) // Update the GC TTL to tickle the job to refresh the status and discover that