Skip to content

Commit

Permalink
[cluster] Add config wiring for etcd timeouts, retries, serialized ge…
Browse files Browse the repository at this point in the history
…ts (#3035)
  • Loading branch information
vdarulis authored Dec 22, 2020
1 parent e8996ba commit bcf6fc9
Show file tree
Hide file tree
Showing 10 changed files with 292 additions and 17 deletions.
10 changes: 8 additions & 2 deletions src/cluster/client/etcd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,12 +182,18 @@ func (c *csclient) newkvOptions(
cacheFileFn cacheFileForZoneFn,
) etcdkv.Options {
kvOpts := etcdkv.NewOptions().
SetInstrumentsOptions(instrument.NewOptions().
SetInstrumentsOptions(c.opts.InstrumentOptions().
SetLogger(c.logger).
SetMetricsScope(c.kvScope)).
SetCacheFileFn(cacheFileFn(opts.Zone())).
SetWatchWithRevision(c.opts.WatchWithRevision()).
SetNewDirectoryMode(c.opts.NewDirectoryMode())
SetNewDirectoryMode(c.opts.NewDirectoryMode()).
SetEnableFastGets(c.opts.EnableFastGets()).
SetRetryOptions(c.opts.RetryOptions()).
SetRequestTimeout(c.opts.RequestTimeout()).
SetWatchChanInitTimeout(c.opts.WatchChanInitTimeout()).
SetWatchChanCheckInterval(c.opts.WatchChanCheckInterval()).
SetWatchChanResetInterval(c.opts.WatchChanResetInterval())

if ns := opts.Namespace(); ns != "" {
kvOpts = kvOpts.SetPrefix(kvOpts.ApplyPrefix(ns))
Expand Down
32 changes: 31 additions & 1 deletion src/cluster/client/etcd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,12 @@ import (
"os"
"time"

"github.com/uber-go/tally"

"github.com/m3db/m3/src/cluster/client"
"github.com/m3db/m3/src/cluster/services"
"github.com/m3db/m3/src/x/instrument"
"github.com/m3db/m3/src/x/retry"
)

// ClusterConfig is the config for a zoned etcd cluster.
Expand Down Expand Up @@ -98,6 +101,15 @@ type Configuration struct {
SDConfig services.Configuration `yaml:"m3sd"`
WatchWithRevision int64 `yaml:"watchWithRevision"`
NewDirectoryMode *os.FileMode `yaml:"newDirectoryMode"`

Retry retry.Configuration `yaml:"retry"`
RequestTimeout time.Duration `yaml:"requestTimeout"`
WatchChanInitTimeout time.Duration `yaml:"watchChanInitTimeout"`
WatchChanCheckInterval time.Duration `yaml:"watchChanCheckInterval"`
WatchChanResetInterval time.Duration `yaml:"watchChanResetInterval"`
// EnableFastGets trades consistency for latency and throughput using clientv3.WithSerializable()
// on etcd ops.
EnableFastGets bool `yaml:"enableFastGets"`
}

// NewClient creates a new config service client.
Expand All @@ -114,7 +126,25 @@ func (cfg Configuration) NewOptions() Options {
SetCacheDir(cfg.CacheDir).
SetClusters(cfg.etcdClusters()).
SetServicesOptions(cfg.SDConfig.NewOptions()).
SetWatchWithRevision(cfg.WatchWithRevision)
SetWatchWithRevision(cfg.WatchWithRevision).
SetEnableFastGets(cfg.EnableFastGets).
SetRetryOptions(cfg.Retry.NewOptions(tally.NoopScope))

if cfg.RequestTimeout > 0 {
opts = opts.SetRequestTimeout(cfg.RequestTimeout)
}

if cfg.WatchChanInitTimeout > 0 {
opts = opts.SetWatchChanInitTimeout(cfg.WatchChanInitTimeout)
}

if cfg.WatchChanCheckInterval > 0 {
opts = opts.SetWatchChanCheckInterval(cfg.WatchChanCheckInterval)
}

if cfg.WatchChanResetInterval > 0 {
opts = opts.SetWatchChanResetInterval(cfg.WatchChanResetInterval)
}

if v := cfg.NewDirectoryMode; v != nil {
opts = opts.SetNewDirectoryMode(*v)
Expand Down
109 changes: 97 additions & 12 deletions src/cluster/client/etcd/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ const (
defaultKeepAlivePeriodMaxJitter = 10 * time.Second
defaultKeepAliveTimeout = 10 * time.Second

defaultRequestTimeout = 10 * time.Second
defaultWatchChanCheckInterval = 10 * time.Second
defaultWatchChanResetInterval = 10 * time.Second
defaultWatchChanInitTimeout = 10 * time.Second

defaultRetryInitialBackoff = 2 * time.Second
defaultRetryBackoffFactor = 2.0
defaultRetryMaxRetries = 3
Expand Down Expand Up @@ -171,8 +176,12 @@ func (o tlsOptions) Config() (*tls.Config, error) {
// NewOptions creates a set of Options.
func NewOptions() Options {
return options{
sdOpts: services.NewOptions(),
iopts: instrument.NewOptions(),
sdOpts: services.NewOptions(),
iopts: instrument.NewOptions(),
requestTimeout: defaultRequestTimeout,
watchChanInitTimeout: defaultWatchChanInitTimeout,
watchChanCheckInterval: defaultWatchChanCheckInterval,
watchChanResetInterval: defaultWatchChanResetInterval,
// NB(r): Set some default retry options so changes to retry
// option defaults don't change behavior of this client's retry options
retryOpts: retry.NewOptions().
Expand All @@ -186,16 +195,21 @@ func NewOptions() Options {
}

type options struct {
env string
zone string
service string
cacheDir string
watchWithRevision int64
sdOpts services.Options
clusters map[string]Cluster
iopts instrument.Options
retryOpts retry.Options
newDirectoryMode os.FileMode
requestTimeout time.Duration
env string
zone string
service string
cacheDir string
watchChanCheckInterval time.Duration
watchChanResetInterval time.Duration
watchChanInitTimeout time.Duration
watchWithRevision int64
enableFastGets bool
sdOpts services.Options
clusters map[string]Cluster
iopts instrument.Options
retryOpts retry.Options
newDirectoryMode os.FileMode
}

func (o options) Validate() error {
Expand All @@ -211,6 +225,22 @@ func (o options) Validate() error {
return errors.New("invalid options, no instrument options set")
}

if o.watchChanCheckInterval <= 0 {
return errors.New("invalid watch channel check interval")
}

if o.watchChanResetInterval <= 0 {
return errors.New("invalid watch reset interval")
}

if o.watchChanInitTimeout <= 0 {
return errors.New("invalid watch init interval")
}

if o.requestTimeout <= 0 {
return errors.New("invalid request timeout")
}

return nil
}

Expand Down Expand Up @@ -289,6 +319,17 @@ func (o options) SetInstrumentOptions(iopts instrument.Options) Options {
return o
}

//nolint:gocritic
func (o options) RequestTimeout() time.Duration {
return o.requestTimeout
}

//nolint:gocritic
func (o options) SetRequestTimeout(t time.Duration) Options {
o.requestTimeout = t
return o
}

func (o options) RetryOptions() retry.Options {
return o.retryOpts
}
Expand All @@ -298,6 +339,39 @@ func (o options) SetRetryOptions(retryOpts retry.Options) Options {
return o
}

//nolint:gocritic
func (o options) WatchChanCheckInterval() time.Duration {
return o.watchChanCheckInterval
}

//nolint:gocritic
func (o options) SetWatchChanCheckInterval(t time.Duration) Options {
o.watchChanCheckInterval = t
return o
}

//nolint:gocritic
func (o options) WatchChanResetInterval() time.Duration {
return o.watchChanResetInterval
}

//nolint:gocritic
func (o options) SetWatchChanResetInterval(t time.Duration) Options {
o.watchChanResetInterval = t
return o
}

//nolint:gocritic
func (o options) WatchChanInitTimeout() time.Duration {
return o.watchChanInitTimeout
}

//nolint:gocritic
func (o options) SetWatchChanInitTimeout(t time.Duration) Options {
o.watchChanInitTimeout = t
return o
}

func (o options) WatchWithRevision() int64 {
return o.watchWithRevision
}
Expand All @@ -316,6 +390,17 @@ func (o options) NewDirectoryMode() os.FileMode {
return o.newDirectoryMode
}

//nolint:gocritic
func (o options) EnableFastGets() bool {
return o.enableFastGets
}

//nolint:gocritic
func (o options) SetEnableFastGets(enabled bool) Options {
o.enableFastGets = enabled
return o
}

// NewCluster creates a Cluster.
func NewCluster() Cluster {
return cluster{
Expand Down
12 changes: 12 additions & 0 deletions src/cluster/client/etcd/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func TestTLSOptions(t *testing.T) {
assert.Equal(t, "key", aOpts.KeyPath())
assert.Equal(t, "ca", aOpts.CACrtPath())
}

func TestOptions(t *testing.T) {
opts := NewOptions()
assert.Equal(t, "", opts.Zone())
Expand All @@ -111,6 +112,17 @@ func TestOptions(t *testing.T) {
_, ok := opts.ClusterForZone("z")
assert.False(t, ok)
assert.NotNil(t, opts.InstrumentOptions())
assert.Equal(t, defaultRequestTimeout, opts.RequestTimeout())
assert.Equal(t, defaultWatchChanCheckInterval, opts.WatchChanCheckInterval())
assert.Equal(t, defaultWatchChanResetInterval, opts.WatchChanCheckInterval())
assert.Equal(t, defaultWatchChanInitTimeout, opts.WatchChanInitTimeout())
assert.False(t, opts.EnableFastGets())
ropts := opts.RetryOptions()
assert.Equal(t, defaultRetryJitter, ropts.Jitter())
assert.Equal(t, defaultRetryInitialBackoff, ropts.InitialBackoff())
assert.Equal(t, defaultRetryBackoffFactor, ropts.BackoffFactor())
assert.Equal(t, defaultRetryMaxRetries, ropts.MaxRetries())
assert.Equal(t, defaultRetryMaxBackoff, ropts.MaxBackoff())

c1 := NewCluster().SetZone("z1")
c2 := NewCluster().SetZone("z2")
Expand Down
26 changes: 26 additions & 0 deletions src/cluster/client/etcd/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ import (

// Options is the Options to create a config service client.
type Options interface {
// RequestTimeout is the timeout for etcd requests
RequestTimeout() time.Duration
// SetRequestTimeout sets the RequestTimeout
SetRequestTimeout(t time.Duration) Options

Env() string
SetEnv(e string) Options

Expand All @@ -58,9 +63,30 @@ type Options interface {
RetryOptions() retry.Options
SetRetryOptions(retryOpts retry.Options) Options

// WatchChanCheckInterval will be used to periodically check if a watch chan
// is no longer being subscribed and should be closed
WatchChanCheckInterval() time.Duration
// SetWatchChanCheckInterval sets the WatchChanCheckInterval
SetWatchChanCheckInterval(t time.Duration) Options

// WatchChanResetInterval is the delay before resetting the etcd watch chan
WatchChanResetInterval() time.Duration
// SetWatchChanResetInterval sets the WatchChanResetInterval
SetWatchChanResetInterval(t time.Duration) Options

// WatchChanInitTimeout is the timeout for a watchChan initialization
WatchChanInitTimeout() time.Duration
// SetWatchChanInitTimeout sets the WatchChanInitTimeout
SetWatchChanInitTimeout(t time.Duration) Options

WatchWithRevision() int64
SetWatchWithRevision(rev int64) Options

// EnableFastGets returns whether to use clientv3.WithSerializable() option to speed up gets.
EnableFastGets() bool
// SetEnableFastGets sets clientv3.WithSerializable() to speed up gets, but can fetch stale data.
SetEnableFastGets(enabled bool) Options

SetNewDirectoryMode(fm os.FileMode) Options
NewDirectoryMode() os.FileMode

Expand Down
29 changes: 29 additions & 0 deletions src/cluster/kv/etcd/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ type Options interface {
// WatchWithRevision is the revision that watch requests will start from.
WatchWithRevision() int64

// EnableFastGets returns whether to use clientv3.WithSerializable() option to speed up gets.
EnableFastGets() bool
// SetEnableFastGets sets clientv3.WithSerializable() to speed up gets, but can fetch stale data.
SetEnableFastGets(enabled bool) Options

// SetWatchWithRevision sets the revision that watch requests will start
// from.
SetWatchWithRevision(rev int64) Options
Expand Down Expand Up @@ -111,6 +116,7 @@ type options struct {
watchChanResetInterval time.Duration
watchChanInitTimeout time.Duration
watchWithRevision int64
enableFastGets bool
cacheFileFn CacheFileFn
newDirectoryMode os.FileMode
}
Expand Down Expand Up @@ -141,6 +147,18 @@ func (o options) Validate() error {
return errors.New("invalid watch channel check interval")
}

if o.watchChanResetInterval <= 0 {
return errors.New("invalid watch reset interval")
}

if o.watchChanInitTimeout <= 0 {
return errors.New("invalid watch init interval")
}

if o.requestTimeout <= 0 {
return errors.New("invalid request timeout")
}

return nil
}

Expand Down Expand Up @@ -207,6 +225,17 @@ func (o options) SetWatchWithRevision(rev int64) Options {
return o
}

//nolint:gocritic
func (o options) EnableFastGets() bool {
return o.enableFastGets
}

//nolint:gocritic
func (o options) SetEnableFastGets(enabled bool) Options {
o.enableFastGets = enabled
return o
}

func (o options) CacheFileFn() CacheFileFn {
return o.cacheFileFn
}
Expand Down
Loading

0 comments on commit bcf6fc9

Please sign in to comment.