Skip to content

Commit

Permalink
[dbnode,query] Ensure only single listener for interrupt channel
Browse files Browse the repository at this point in the history
Passing the interrupt channel to multiple goroutines could
cause a race where the main goroutine ends up missing the interrupt
that triggers a server shutdown. This commit ensures that only
a single goroutine is listening for the interrupt at a given
time and all other interested parties can check the interrupted
channel. The interrupted channel will be closed as soon as
an interrupt is received. Since a receive on a closed channel returns
immediately, this allows any interested goroutine to know if it should
terminate by simply checking the interrupted channel.
  • Loading branch information
nbroyles committed Sep 24, 2021
1 parent dcfd221 commit 63fde68
Show file tree
Hide file tree
Showing 18 changed files with 224 additions and 94 deletions.
18 changes: 9 additions & 9 deletions src/cluster/kv/util/runtime/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@ type Options interface {
// ProcessFn returns the process function.
ProcessFn() ProcessFn

// InterruptCh returns the interrupt channel.
InterruptCh() <-chan error
// InterruptedCh returns the interrupted channel.
InterruptedCh() <-chan struct{}

// SetInterruptCh sets the interrupt channel.
SetInterruptCh(value <-chan error) Options
// SetInterruptedCh sets the interrupted channel.
SetInterruptedCh(value <-chan struct{}) Options
}

type options struct {
Expand All @@ -76,7 +76,7 @@ type options struct {
kvStore kv.Store
unmarshalFn UnmarshalFn
processFn ProcessFn
interruptCh <-chan error
interruptedCh <-chan struct{}
}

// NewOptions creates a new set of options.
Expand Down Expand Up @@ -137,11 +137,11 @@ func (o *options) ProcessFn() ProcessFn {
return o.processFn
}

func (o *options) SetInterruptCh(ch <-chan error) Options {
o.interruptCh = ch
func (o *options) SetInterruptedCh(ch <-chan struct{}) Options {
o.interruptedCh = ch
return o
}

func (o *options) InterruptCh() <-chan error {
return o.interruptCh
func (o *options) InterruptedCh() <-chan struct{} {
return o.interruptedCh
}
2 changes: 1 addition & 1 deletion src/cluster/kv/util/runtime/value.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (v *value) initValue() {
SetGetUpdateFn(v.getUpdateFn).
SetProcessFn(v.updateFn).
SetKey(v.key).
SetInterruptCh(v.opts.InterruptCh())
SetInterruptedCh(v.opts.InterruptedCh())
v.Value = watch.NewValue(valueOpts)
}

Expand Down
10 changes: 5 additions & 5 deletions src/cluster/services/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,17 +252,17 @@ func NewQueryOptions() QueryOptions { return new(queryOptions) }

type queryOptions struct {
includeUnhealthy bool
interruptCh <-chan error
interruptedCh <-chan struct{}
}

// IncludeUnhealthy returns the current value of the includeUnhealthy flag.
func (qo *queryOptions) IncludeUnhealthy() bool { return qo.includeUnhealthy }
// SetIncludeUnhealthy stores h as the includeUnhealthy flag and returns the
// receiver so that setter calls can be chained.
func (qo *queryOptions) SetIncludeUnhealthy(h bool) QueryOptions { qo.includeUnhealthy = h; return qo }

func (qo *queryOptions) InterruptCh() <-chan error {
return qo.interruptCh
func (qo *queryOptions) InterruptedCh() <-chan struct{} {
return qo.interruptedCh
}

func (qo *queryOptions) SetInterruptCh(ch <-chan error) QueryOptions {
qo.interruptCh = ch
func (qo *queryOptions) SetInterruptedCh(ch <-chan struct{}) QueryOptions {
qo.interruptedCh = ch
return qo
}
19 changes: 10 additions & 9 deletions src/cluster/services/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
ps "github.com/m3db/m3/src/cluster/placement/service"
"github.com/m3db/m3/src/cluster/placement/storage"
"github.com/m3db/m3/src/cluster/shard"
xos "github.com/m3db/m3/src/x/os"
xwatch "github.com/m3db/m3/src/x/watch"

"github.com/uber-go/tally"
Expand Down Expand Up @@ -307,7 +308,7 @@ func (c *client) Watch(sid ServiceID, opts QueryOptions) (Watch, error) {
return nil, err
}

initValue, err := c.waitForInitValue(kvm.kv, placementWatch, sid, c.opts.InitTimeout(), opts.InterruptCh())
initValue, err := c.waitForInitValue(kvm.kv, placementWatch, sid, c.opts.InitTimeout(), opts.InterruptedCh())
if err != nil {
return nil, fmt.Errorf("could not get init value for '%s', err: %w", key, err)
}
Expand Down Expand Up @@ -592,12 +593,12 @@ func (c *client) waitForInitValue(
w kv.ValueWatch,
sid ServiceID,
timeout time.Duration,
interruptCh <-chan error,
interruptedCh <-chan struct{},
) (kv.Value, error) {
if interruptCh == nil {
// NB(nate): if no interrupt channel is provided, then this wait is not
if interruptedCh == nil {
// NB(nate): if no interrupted channel is provided, then this wait is not
// gracefully interruptable.
interruptCh = make(chan error)
interruptedCh = make(chan struct{})
}

if timeout < 0 {
Expand All @@ -607,17 +608,17 @@ func (c *client) waitForInitValue(
select {
case <-w.C():
return w.Get(), nil
case err := <-interruptCh:
return nil, err
case <-interruptedCh:
return nil, xos.ErrInterrupted
}
}
select {
case <-w.C():
return w.Get(), nil
case <-time.After(timeout):
return kvStore.Get(c.placementKeyFn(sid))
case err := <-interruptCh:
return nil, err
case <-interruptedCh:
return nil, xos.ErrInterrupted
}
}

Expand Down
26 changes: 13 additions & 13 deletions src/cluster/services/services_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions src/cluster/services/services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -920,15 +920,15 @@ func TestWatchInterruptedWithTimeout(t *testing.T) {
func testWatchInterrupted(t *testing.T, s Services) {
sid := NewServiceID().SetName("m3db").SetZone("zone1")

interruptCh := make(chan error, 1)
interruptCh <- errors.New("interrupt")
interruptedCh := make(chan struct{})
close(interruptedCh)

qopts := NewQueryOptions().
SetIncludeUnhealthy(true).
SetInterruptCh(interruptCh)
SetInterruptedCh(interruptedCh)
_, err := s.Watch(sid, qopts)
require.Error(t, err)
require.Contains(t, err.Error(), "interrupt")
require.Contains(t, err.Error(), "interrupted")
}

func TestHeartbeatService(t *testing.T) {
Expand Down
8 changes: 4 additions & 4 deletions src/cluster/services/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,11 +288,11 @@ type QueryOptions interface {
// SetIncludeUnhealthy sets the value of IncludeUnhealthy.
SetIncludeUnhealthy(h bool) QueryOptions

// InterruptCh returns the interrupt channel.
InterruptCh() <-chan error
// InterruptedCh returns the interrupted channel.
InterruptedCh() <-chan struct{}

// SetInterruptCh sets the interrupt channel.
SetInterruptCh(value <-chan error) QueryOptions
// SetInterruptedCh sets the interrupted channel.
SetInterruptedCh(value <-chan struct{}) QueryOptions
}

// Metadata contains the metadata for a service.
Expand Down
4 changes: 2 additions & 2 deletions src/cmd/services/m3coordinator/downsample/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ type DownsamplerOptions struct {
TagOptions models.TagOptions
MetricsAppenderPoolOptions pool.ObjectPoolOptions
RWOptions xio.Options
InterruptCh <-chan error
InterruptedCh <-chan struct{}
}

// AutoMappingRule is a mapping rule to apply to metrics.
Expand Down Expand Up @@ -728,7 +728,7 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) {
SetKVStore(o.RulesKVStore).
SetNamespaceTag([]byte(namespaceTag)).
SetRequireNamespaceWatchOnInit(cfg.Matcher.RequireNamespaceWatchOnInit).
SetInterruptCh(o.InterruptCh)
SetInterruptedCh(o.InterruptedCh)

// NB(r): If rules are being explicitly set in config then we are
// going to use an in memory KV store for rules and explicitly set them up.
Expand Down
4 changes: 2 additions & 2 deletions src/dbnode/environment/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func (c ConfigureResults) SyncCluster() (ConfigureResult, error) {

// ConfigurationParameters are options used to create new ConfigureResults
type ConfigurationParameters struct {
InterruptCh <-chan error
InterruptedCh <-chan struct{}
InstrumentOpts instrument.Options
HashingSeed uint32
HostID string
Expand Down Expand Up @@ -316,7 +316,7 @@ func (c Configuration) configureDynamic(cfgParams ConfigurationParameters) (Conf
SetServiceID(serviceID).
SetQueryOptions(services.NewQueryOptions().
SetIncludeUnhealthy(true).
SetInterruptCh(cfgParams.InterruptCh)).
SetInterruptedCh(cfgParams.InterruptedCh)).
SetInstrumentOptions(cfgParams.InstrumentOpts).
SetHashGen(sharding.NewHashGenWithSeed(cfgParams.HashingSeed))
topoInit := topology.NewDynamicInitializer(topoOpts)
Expand Down
23 changes: 17 additions & 6 deletions src/dbnode/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,13 @@ func Run(runOpts RunOptions) {
}()
}

interruptOpts := xos.NewInterruptOptions()
if runOpts.InterruptCh != nil {
interruptOpts.InterruptCh = runOpts.InterruptCh
}
intWatchCancel := xos.WatchForInterrupt(logger, interruptOpts)
defer intWatchCancel()

defer logger.Sync()

cfg.Debug.SetRuntimeValues(logger)
Expand Down Expand Up @@ -746,7 +753,7 @@ func Run(runOpts RunOptions) {
logger.Info("creating dynamic config service client with m3cluster")

envCfgResults, err = envConfig.Configure(environment.ConfigurationParameters{
InterruptCh: runOpts.InterruptCh,
InterruptedCh: interruptOpts.InterruptedCh,
InstrumentOpts: iOpts,
HashingSeed: cfg.Hashing.Seed,
NewDirectoryMode: newDirectoryMode,
Expand All @@ -759,7 +766,7 @@ func Run(runOpts RunOptions) {
logger.Info("creating static config service client with m3cluster")

envCfgResults, err = envConfig.Configure(environment.ConfigurationParameters{
InterruptCh: runOpts.InterruptCh,
InterruptedCh: interruptOpts.InterruptedCh,
InstrumentOpts: iOpts,
HostID: hostID,
ForceColdWritesEnabled: forceColdWrites,
Expand Down Expand Up @@ -1091,10 +1098,14 @@ func Run(runOpts RunOptions) {
)
}()

// Wait for process interrupt.
xos.WaitForInterrupt(logger, xos.InterruptOptions{
InterruptCh: runOpts.InterruptCh,
})
// Stop our async watch and now block waiting for the interrupt.
intWatchCancel()
select {
case <-interruptOpts.InterruptedCh:
logger.Warn("interrupt already received. closing")
default:
xos.WaitForInterrupt(logger, interruptOpts)
}

// Attempt graceful server close.
closedCh := make(chan struct{})
Expand Down
2 changes: 1 addition & 1 deletion src/metrics/matcher/namespaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func NewNamespaces(key string, opts Options) Namespaces {
SetKVStore(n.store).
SetUnmarshalFn(n.toNamespaces).
SetProcessFn(n.process).
SetInterruptCh(opts.InterruptCh())
SetInterruptedCh(opts.InterruptedCh())
n.Value = runtime.NewValue(key, valueOpts)
return n
}
Expand Down
18 changes: 9 additions & 9 deletions src/metrics/matcher/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,11 @@ type Options interface {
// RequireNamespaceWatchOnInit returns the flag to ensure matcher is initialized with a loaded namespace watch.
RequireNamespaceWatchOnInit() bool

// InterruptCh returns the interrupt channel.
InterruptCh() <-chan error
// InterruptedCh returns the interrupted channel.
InterruptedCh() <-chan struct{}

// SetInterruptCh sets the interrupt channel.
SetInterruptCh(value <-chan error) Options
// SetInterruptedCh sets the interrupted channel.
SetInterruptedCh(value <-chan struct{}) Options
}

type options struct {
Expand All @@ -164,7 +164,7 @@ type options struct {
onNamespaceRemovedFn OnNamespaceRemovedFn
onRuleSetUpdatedFn OnRuleSetUpdatedFn
requireNamespaceWatchOnInit bool
interruptCh <-chan error
interruptedCh <-chan struct{}
}

// NewOptions creates a new set of options.
Expand Down Expand Up @@ -325,13 +325,13 @@ func (o *options) RequireNamespaceWatchOnInit() bool {
return o.requireNamespaceWatchOnInit
}

func (o *options) SetInterruptCh(ch <-chan error) Options {
o.interruptCh = ch
func (o *options) SetInterruptedCh(ch <-chan struct{}) Options {
o.interruptedCh = ch
return o
}

func (o *options) InterruptCh() <-chan error {
return o.interruptCh
func (o *options) InterruptedCh() <-chan struct{} {
return o.interruptedCh
}

func defaultRuleSetKeyFn(namespace []byte) string {
Expand Down
Loading

0 comments on commit 63fde68

Please sign in to comment.