Skip to content

Commit

Permalink
gcjobnotifier: unify gcjobs around a single copy of the system config
Browse files Browse the repository at this point in the history
Prior to this change, each GC job would hold on to a complete copy of the
system config. This was problematic in cases where there were a large number
of GC jobs. This PR introduces a new object in a new package to coordinate
the relevant notifications.

Release note (bug fix): Fixed a bug which could lead to out of memory errors
when dropping large numbers of tables at high frequency.
  • Loading branch information
ajwerner authored and pbardea committed Sep 11, 2020
1 parent 53bc491 commit ea13c91
Show file tree
Hide file tree
Showing 7 changed files with 347 additions and 34 deletions.
6 changes: 6 additions & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
_ "github.com/cockroachdb/cockroach/pkg/sql/gcjob" // register jobs declared outside of pkg/sql
"github.com/cockroachdb/cockroach/pkg/sql/gcjob/gcjobnotifier"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire"
"github.com/cockroachdb/cockroach/pkg/sql/querycache"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -616,6 +617,8 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
distSQLMetrics := execinfra.MakeDistSQLMetrics(cfg.HistogramWindowInterval())
s.registry.AddMetricStruct(distSQLMetrics)

gcJobNotifier := gcjobnotifier.New(cfg.Settings, s.gossip, s.stopper)

// Set up Lease Manager
var lmKnobs sql.LeaseManagerTestingKnobs
if leaseManagerTestingKnobs := cfg.TestingKnobs.SQLLeaseManager; leaseManagerTestingKnobs != nil {
Expand Down Expand Up @@ -820,6 +823,8 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {

QueryCache: querycache.New(s.cfg.SQLQueryCacheSize),
ProtectedTimestampProvider: s.protectedtsProvider,

GCJobNotifier: gcJobNotifier,
}

s.stopper.AddCloser(execCfg.ExecLogger)
Expand Down Expand Up @@ -1622,6 +1627,7 @@ func (s *Server) Start(ctx context.Context) error {
}
})

s.execCfg.GCJobNotifier.Start(ctx)
s.distSQLServer.Start()
s.pgServer.Start(ctx, s.stopper)

Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/colexec"
"github.com/cockroachdb/cockroach/pkg/sql/distsql"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/gcjob/gcjobnotifier"
"github.com/cockroachdb/cockroach/pkg/sql/opt"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
Expand Down Expand Up @@ -593,6 +594,8 @@ type ExecutorConfig struct {

// StmtDiagnosticsRecorder deals with recording statement diagnostics.
StmtDiagnosticsRecorder StmtDiagnosticsRecorder

GCJobNotifier *gcjobnotifier.Notifier
}

// Organization returns the value of cluster.organization.
Expand Down
19 changes: 3 additions & 16 deletions pkg/sql/gcjob/gc_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (

"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -96,7 +95,9 @@ func (r schemaChangeGCResumer) Resume(
if err != nil {
return err
}
zoneCfgFilter, gossipUpdateC := setupConfigWatcher(execCfg)

gossipUpdateC, cleanup := execCfg.GCJobNotifier.AddNotifyee(ctx)
defer cleanup()
tableDropTimes, indexDropTimes := getDropTimes(details)

allTables := getAllTablesWaitingForGC(details, progress)
Expand All @@ -121,20 +122,6 @@ func (r schemaChangeGCResumer) Resume(
if log.V(2) {
log.Info(ctx, "received a new system config")
}
// TODO (lucy): Currently we're calling refreshTables on every zone config
// update to any table. We should really be only updating a cached
// TTL whenever we get an update on one of the tables/indexes (or the db)
// that this job is responsible for, and computing the earliest deadline
// from our set of cached TTL values.
cfg := execCfg.Gossip.GetSystemConfig()
zoneConfigUpdated := false
zoneCfgFilter.ForModified(cfg, func(kv roachpb.KeyValue) {
zoneConfigUpdated = true
})
if !zoneConfigUpdated {
log.VEventf(ctx, 2, "no zone config updates, continuing")
continue
}
remainingTables := getAllTablesWaitingForGC(details, progress)
if len(remainingTables) == 0 {
return nil
Expand Down
173 changes: 173 additions & 0 deletions pkg/sql/gcjob/gcjobnotifier/notifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

// Package gcjobnotifier provides a mechanism to share a SystemConfigDeltaFilter
// among all gc jobs.
//
// It exists in a separate package to avoid import cycles between sql and gcjob.
package gcjobnotifier

import (
"context"

"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)

// systemConfigProvider is the narrow view of the system config source that the
// Notifier depends on. The Notifier reads the config through it and learns
// from it when the config may have changed.
type systemConfigProvider interface {
// GetSystemConfig returns the provider's current system config. The Notifier
// feeds the result into its delta filter to detect modified zone config keys.
GetSystemConfig() *config.SystemConfig
// RegisterSystemConfigChannel returns a channel that the Notifier's run loop
// receives from; each receive triggers a check for zone config changes.
// NOTE(review): presumably signaled on every system config update — confirm
// against the implementation.
RegisterSystemConfigChannel() <-chan struct{}
}

// Notifier tells GC jobs when zone configs may have changed so that they can
// re-evaluate their GC TTLs. All GC jobs share this one object, and with it a
// single delta filter over the system config, rather than each job holding
// on to its own copy.
type Notifier struct {
	// provider supplies the system config and the channel signaling updates.
	provider systemConfigProvider
	// prefix is the key prefix of the zones table; only changes under it are
	// reported to notifyees.
	prefix roachpb.Key
	// stopper runs the background task and signals when it should quiesce.
	stopper *stop.Stopper
	// settings is handed to log.ReportOrPanic when the Notifier is misused.
	settings *cluster.Settings

	mu struct {
		syncutil.Mutex

		// started is set by the first call to Start.
		started bool
		// stopped is set once the background task has exited or could not be
		// launched; no notifyees are accepted afterwards.
		stopped bool

		// deltaFilter is created when the first notifyee is added and dropped
		// again when the last notifyee is removed.
		deltaFilter *gossip.SystemConfigDeltaFilter
		// notifyees is the set of channels to signal on a zone config change.
		notifyees map[chan struct{}]struct{}
	}
}

// New constructs a new Notifier which watches the zones table through the
// given provider. The returned Notifier is idle until Start is called.
func New(
	settings *cluster.Settings, provider systemConfigProvider, stopper *stop.Stopper,
) *Notifier {
	notifier := new(Notifier)
	notifier.provider = provider
	// Only keys belonging to the zones table are of interest to GC jobs.
	notifier.prefix = keys.MakeTablePrefix(uint32(keys.ZonesTableID))
	notifier.stopper = stopper
	notifier.settings = settings
	notifier.mu.notifyees = map[chan struct{}]struct{}{}
	return notifier
}

// noopFunc does nothing. AddNotifyee hands it out as the cleanup function when
// the Notifier has already stopped and there is nothing to unregister.
func noopFunc() {
	// Intentionally empty.
}

// AddNotifyee registers a new listener and should be called prior to the
// first reading of the system config.
//
// The returned onChange channel has a buffer of one and is sent an empty
// struct whenever a zone config change is detected. The returned cleanup
// function unregisters the channel and must be called once the caller is no
// longer interested. If the Notifier has already stopped, onChange is nil
// (receiving from it blocks forever) and cleanup is a no-op.
//
// Calling AddNotifyee before Start is a programming error and is reported via
// log.ReportOrPanic.
//
// TODO(lucy,ajwerner): Currently we're calling refreshTables on every zone
// config update to any table. We should really only be updating a cached TTL
// whenever we get an update on one of the tables/indexes (or the db) that
// this job is responsible for, and computing the earliest deadline from our
// set of cached TTL values. To do this we'd need to know the full set of zone
// object IDs which may be relevant for a given table. In general that should
// be fine, as the relevant IDs should be stable for the life of anything
// being deleted. If we did this, we'd associate a set of keys with each
// notifyee.
func (n *Notifier) AddNotifyee(ctx context.Context) (onChange <-chan struct{}, cleanup func()) {
	n.mu.Lock()
	defer n.mu.Unlock()

	if started := n.mu.started; !started {
		log.ReportOrPanic(ctx, &n.settings.SV,
			"adding a notifyee to a Notifier before starting")
	}
	if n.mu.stopped {
		return nil, noopFunc
	}

	// The filter only exists while somebody is listening; create it for the
	// first notifyee.
	if n.mu.deltaFilter == nil {
		filter := gossip.MakeSystemConfigDeltaFilter(n.prefix)
		n.mu.deltaFilter = &filter
		// Prime the filter with the current values so that only subsequent
		// changes are treated as modifications.
		current := n.provider.GetSystemConfig()
		n.mu.deltaFilter.ForModified(current, func(roachpb.KeyValue) {})
	}

	// A buffer of one lets maybeNotify signal without blocking.
	notifyCh := make(chan struct{}, 1)
	n.mu.notifyees[notifyCh] = struct{}{}
	unregister := func() { n.cleanup(notifyCh) }
	return notifyCh, unregister
}

// cleanup unregisters the notifyee channel c. When the last notifyee goes
// away the delta filter is released as well, so that no system config data
// is retained while nobody is listening.
func (n *Notifier) cleanup(c chan struct{}) {
	n.mu.Lock()
	defer n.mu.Unlock()

	delete(n.mu.notifyees, c)
	if len(n.mu.notifyees) > 0 {
		// Other notifyees still rely on the filter.
		return
	}
	n.mu.deltaFilter = nil
}

// markStopped records, under the mutex, that the Notifier is no longer
// running. Subsequent calls to AddNotifyee observe the flag and return a nil
// channel.
func (n *Notifier) markStopped() {
	n.mu.Lock()
	n.mu.stopped = true
	n.mu.Unlock()
}

// markStarted flags the Notifier as started and reports whether it had
// already been flagged before this call.
func (n *Notifier) markStarted() (alreadyStarted bool) {
	n.mu.Lock()
	defer n.mu.Unlock()

	// Read the previous value and set the flag in one step.
	alreadyStarted, n.mu.started = n.mu.started, true
	return alreadyStarted
}

// Start launches the Notifier's background task on its stopper. It must be
// called before AddNotifyee and must not be called more than once; a second
// call is reported via log.ReportOrPanic and otherwise ignored.
func (n *Notifier) Start(ctx context.Context) {
	if n.markStarted() {
		log.ReportOrPanic(ctx, &n.settings.SV, "started Notifier more than once")
		return
	}

	err := n.stopper.RunAsyncTask(ctx, "gcjob.Notifier", n.run)
	if err == nil {
		return
	}
	// The task could not be launched (presumably because the stopper is
	// already shutting down), so nothing will ever deliver notifications.
	// Mark the Notifier stopped so AddNotifyee does not register listeners.
	n.markStopped()
}

// run is the body of the background task. It waits for signals on the
// provider's system config channel and checks for zone config changes on each
// one, until the stopper asks it to quiesce. On exit the Notifier is marked
// stopped.
func (n *Notifier) run(_ context.Context) {
	defer n.markStopped()

	updates := n.provider.RegisterSystemConfigChannel()
	for {
		select {
		case <-updates:
			n.maybeNotify()
		case <-n.stopper.ShouldQuiesce():
			return
		}
	}
}

// maybeNotify signals every registered notifyee if any key under the zones
// table prefix was modified since the delta filter last saw the system
// config. It does nothing when there are no notifyees.
func (n *Notifier) maybeNotify() {
	n.mu.Lock()
	defer n.mu.Unlock()

	// With no listeners there is no filter to consult and nobody to signal.
	if len(n.mu.notifyees) == 0 {
		return
	}

	changed := false
	n.mu.deltaFilter.ForModified(n.provider.GetSystemConfig(), func(roachpb.KeyValue) {
		changed = true
	})
	if !changed {
		// None of the modified keys concern zone configs.
		return
	}

	for ch := range n.mu.notifyees {
		// Never block while holding the mutex: if a notification is already
		// pending in the channel's buffer, this one is dropped.
		select {
		case ch <- struct{}{}:
		default:
		}
	}
}
Loading

0 comments on commit ea13c91

Please sign in to comment.