From bcf6fc921211da5d765d82f12d516988e4d685a3 Mon Sep 17 00:00:00 2001 From: Vytenis Darulis Date: Mon, 21 Dec 2020 21:34:50 -0500 Subject: [PATCH] [cluster] Add config wiring for etcd timeouts, retries, serialized gets (#3035) --- src/cluster/client/etcd/client.go | 10 +- src/cluster/client/etcd/config.go | 32 ++++- src/cluster/client/etcd/options.go | 109 ++++++++++++++++-- src/cluster/client/etcd/options_test.go | 12 ++ src/cluster/client/etcd/types.go | 26 +++++ src/cluster/kv/etcd/options.go | 29 +++++ src/cluster/kv/etcd/options_test.go | 45 ++++++++ src/cluster/kv/etcd/store.go | 6 +- src/cluster/kv/etcd/store_test.go | 26 +++++ .../services/m3dbnode/config/config_test.go | 14 ++- 10 files changed, 292 insertions(+), 17 deletions(-) create mode 100644 src/cluster/kv/etcd/options_test.go diff --git a/src/cluster/client/etcd/client.go b/src/cluster/client/etcd/client.go index 381b41935d..dc4f717328 100644 --- a/src/cluster/client/etcd/client.go +++ b/src/cluster/client/etcd/client.go @@ -182,12 +182,18 @@ func (c *csclient) newkvOptions( cacheFileFn cacheFileForZoneFn, ) etcdkv.Options { kvOpts := etcdkv.NewOptions(). - SetInstrumentsOptions(instrument.NewOptions(). + SetInstrumentsOptions(c.opts.InstrumentOptions(). SetLogger(c.logger). SetMetricsScope(c.kvScope)). SetCacheFileFn(cacheFileFn(opts.Zone())). SetWatchWithRevision(c.opts.WatchWithRevision()). - SetNewDirectoryMode(c.opts.NewDirectoryMode()) + SetNewDirectoryMode(c.opts.NewDirectoryMode()). + SetEnableFastGets(c.opts.EnableFastGets()). + SetRetryOptions(c.opts.RetryOptions()). + SetRequestTimeout(c.opts.RequestTimeout()). + SetWatchChanInitTimeout(c.opts.WatchChanInitTimeout()). + SetWatchChanCheckInterval(c.opts.WatchChanCheckInterval()). 
+ SetWatchChanResetInterval(c.opts.WatchChanResetInterval()) if ns := opts.Namespace(); ns != "" { kvOpts = kvOpts.SetPrefix(kvOpts.ApplyPrefix(ns)) diff --git a/src/cluster/client/etcd/config.go b/src/cluster/client/etcd/config.go index 539ca48387..49232be416 100644 --- a/src/cluster/client/etcd/config.go +++ b/src/cluster/client/etcd/config.go @@ -24,9 +24,12 @@ import ( "os" "time" + "github.com/uber-go/tally" + "github.com/m3db/m3/src/cluster/client" "github.com/m3db/m3/src/cluster/services" "github.com/m3db/m3/src/x/instrument" + "github.com/m3db/m3/src/x/retry" ) // ClusterConfig is the config for a zoned etcd cluster. @@ -98,6 +101,15 @@ type Configuration struct { SDConfig services.Configuration `yaml:"m3sd"` WatchWithRevision int64 `yaml:"watchWithRevision"` NewDirectoryMode *os.FileMode `yaml:"newDirectoryMode"` + + Retry retry.Configuration `yaml:"retry"` + RequestTimeout time.Duration `yaml:"requestTimeout"` + WatchChanInitTimeout time.Duration `yaml:"watchChanInitTimeout"` + WatchChanCheckInterval time.Duration `yaml:"watchChanCheckInterval"` + WatchChanResetInterval time.Duration `yaml:"watchChanResetInterval"` + // EnableFastGets trades consistency for latency and throughput using clientv3.WithSerializable() + // on etcd ops. + EnableFastGets bool `yaml:"enableFastGets"` } // NewClient creates a new config service client. @@ -114,7 +126,25 @@ func (cfg Configuration) NewOptions() Options { SetCacheDir(cfg.CacheDir). SetClusters(cfg.etcdClusters()). SetServicesOptions(cfg.SDConfig.NewOptions()). - SetWatchWithRevision(cfg.WatchWithRevision) + SetWatchWithRevision(cfg.WatchWithRevision). + SetEnableFastGets(cfg.EnableFastGets). 
+ SetRetryOptions(cfg.Retry.NewOptions(tally.NoopScope)) + + if cfg.RequestTimeout > 0 { + opts = opts.SetRequestTimeout(cfg.RequestTimeout) + } + + if cfg.WatchChanInitTimeout > 0 { + opts = opts.SetWatchChanInitTimeout(cfg.WatchChanInitTimeout) + } + + if cfg.WatchChanCheckInterval > 0 { + opts = opts.SetWatchChanCheckInterval(cfg.WatchChanCheckInterval) + } + + if cfg.WatchChanResetInterval > 0 { + opts = opts.SetWatchChanResetInterval(cfg.WatchChanResetInterval) + } if v := cfg.NewDirectoryMode; v != nil { opts = opts.SetNewDirectoryMode(*v) diff --git a/src/cluster/client/etcd/options.go b/src/cluster/client/etcd/options.go index 566d240e10..8d88985e52 100644 --- a/src/cluster/client/etcd/options.go +++ b/src/cluster/client/etcd/options.go @@ -44,6 +44,11 @@ const ( defaultKeepAlivePeriodMaxJitter = 10 * time.Second defaultKeepAliveTimeout = 10 * time.Second + defaultRequestTimeout = 10 * time.Second + defaultWatchChanCheckInterval = 10 * time.Second + defaultWatchChanResetInterval = 10 * time.Second + defaultWatchChanInitTimeout = 10 * time.Second + defaultRetryInitialBackoff = 2 * time.Second defaultRetryBackoffFactor = 2.0 defaultRetryMaxRetries = 3 @@ -171,8 +176,12 @@ func (o tlsOptions) Config() (*tls.Config, error) { // NewOptions creates a set of Options. func NewOptions() Options { return options{ - sdOpts: services.NewOptions(), - iopts: instrument.NewOptions(), + sdOpts: services.NewOptions(), + iopts: instrument.NewOptions(), + requestTimeout: defaultRequestTimeout, + watchChanInitTimeout: defaultWatchChanInitTimeout, + watchChanCheckInterval: defaultWatchChanCheckInterval, + watchChanResetInterval: defaultWatchChanResetInterval, // NB(r): Set some default retry options so changes to retry // option defaults don't change behavior of this client's retry options retryOpts: retry.NewOptions(). 
@@ -186,16 +195,21 @@ func NewOptions() Options { } type options struct { - env string - zone string - service string - cacheDir string - watchWithRevision int64 - sdOpts services.Options - clusters map[string]Cluster - iopts instrument.Options - retryOpts retry.Options - newDirectoryMode os.FileMode + requestTimeout time.Duration + env string + zone string + service string + cacheDir string + watchChanCheckInterval time.Duration + watchChanResetInterval time.Duration + watchChanInitTimeout time.Duration + watchWithRevision int64 + enableFastGets bool + sdOpts services.Options + clusters map[string]Cluster + iopts instrument.Options + retryOpts retry.Options + newDirectoryMode os.FileMode } func (o options) Validate() error { @@ -211,6 +225,22 @@ func (o options) Validate() error { return errors.New("invalid options, no instrument options set") } + if o.watchChanCheckInterval <= 0 { + return errors.New("invalid watch channel check interval") + } + + if o.watchChanResetInterval <= 0 { + return errors.New("invalid watch reset interval") + } + + if o.watchChanInitTimeout <= 0 { + return errors.New("invalid watch init interval") + } + + if o.requestTimeout <= 0 { + return errors.New("invalid request timeout") + } + return nil } @@ -289,6 +319,17 @@ func (o options) SetInstrumentOptions(iopts instrument.Options) Options { return o } +//nolint:gocritic +func (o options) RequestTimeout() time.Duration { + return o.requestTimeout +} + +//nolint:gocritic +func (o options) SetRequestTimeout(t time.Duration) Options { + o.requestTimeout = t + return o +} + func (o options) RetryOptions() retry.Options { return o.retryOpts } @@ -298,6 +339,39 @@ func (o options) SetRetryOptions(retryOpts retry.Options) Options { return o } +//nolint:gocritic +func (o options) WatchChanCheckInterval() time.Duration { + return o.watchChanCheckInterval +} + +//nolint:gocritic +func (o options) SetWatchChanCheckInterval(t time.Duration) Options { + o.watchChanCheckInterval = t + return o +} + 
+//nolint:gocritic +func (o options) WatchChanResetInterval() time.Duration { + return o.watchChanResetInterval +} + +//nolint:gocritic +func (o options) SetWatchChanResetInterval(t time.Duration) Options { + o.watchChanResetInterval = t + return o +} + +//nolint:gocritic +func (o options) WatchChanInitTimeout() time.Duration { + return o.watchChanInitTimeout +} + +//nolint:gocritic +func (o options) SetWatchChanInitTimeout(t time.Duration) Options { + o.watchChanInitTimeout = t + return o +} + func (o options) WatchWithRevision() int64 { return o.watchWithRevision } @@ -316,6 +390,17 @@ func (o options) NewDirectoryMode() os.FileMode { return o.newDirectoryMode } +//nolint:gocritic +func (o options) EnableFastGets() bool { + return o.enableFastGets +} + +//nolint:gocritic +func (o options) SetEnableFastGets(enabled bool) Options { + o.enableFastGets = enabled + return o +} + // NewCluster creates a Cluster. func NewCluster() Cluster { return cluster{ diff --git a/src/cluster/client/etcd/options_test.go b/src/cluster/client/etcd/options_test.go index befd638960..235ea13d46 100644 --- a/src/cluster/client/etcd/options_test.go +++ b/src/cluster/client/etcd/options_test.go @@ -98,6 +98,7 @@ func TestTLSOptions(t *testing.T) { assert.Equal(t, "key", aOpts.KeyPath()) assert.Equal(t, "ca", aOpts.CACrtPath()) } + func TestOptions(t *testing.T) { opts := NewOptions() assert.Equal(t, "", opts.Zone()) @@ -111,6 +112,17 @@ func TestOptions(t *testing.T) { _, ok := opts.ClusterForZone("z") assert.False(t, ok) assert.NotNil(t, opts.InstrumentOptions()) + assert.Equal(t, defaultRequestTimeout, opts.RequestTimeout()) + assert.Equal(t, defaultWatchChanCheckInterval, opts.WatchChanCheckInterval()) + assert.Equal(t, defaultWatchChanResetInterval, opts.WatchChanResetInterval()) + assert.Equal(t, defaultWatchChanInitTimeout, opts.WatchChanInitTimeout()) + assert.False(t, opts.EnableFastGets()) + ropts := opts.RetryOptions() + assert.Equal(t, defaultRetryJitter, ropts.Jitter()) + 
assert.Equal(t, defaultRetryInitialBackoff, ropts.InitialBackoff()) + assert.Equal(t, defaultRetryBackoffFactor, ropts.BackoffFactor()) + assert.Equal(t, defaultRetryMaxRetries, ropts.MaxRetries()) + assert.Equal(t, defaultRetryMaxBackoff, ropts.MaxBackoff()) c1 := NewCluster().SetZone("z1") c2 := NewCluster().SetZone("z2") diff --git a/src/cluster/client/etcd/types.go b/src/cluster/client/etcd/types.go index a387357890..151c5eccda 100644 --- a/src/cluster/client/etcd/types.go +++ b/src/cluster/client/etcd/types.go @@ -33,6 +33,11 @@ import ( // Options is the Options to create a config service client. type Options interface { + // RequestTimeout is the timeout for etcd requests + RequestTimeout() time.Duration + // SetRequestTimeout sets the RequestTimeout + SetRequestTimeout(t time.Duration) Options + Env() string SetEnv(e string) Options @@ -58,9 +63,30 @@ type Options interface { RetryOptions() retry.Options SetRetryOptions(retryOpts retry.Options) Options + // WatchChanCheckInterval will be used to periodically check if a watch chan + // is no longer being subscribed and should be closed + WatchChanCheckInterval() time.Duration + // SetWatchChanCheckInterval sets the WatchChanCheckInterval + SetWatchChanCheckInterval(t time.Duration) Options + + // WatchChanResetInterval is the delay before resetting the etcd watch chan + WatchChanResetInterval() time.Duration + // SetWatchChanResetInterval sets the WatchChanResetInterval + SetWatchChanResetInterval(t time.Duration) Options + + // WatchChanInitTimeout is the timeout for a watchChan initialization + WatchChanInitTimeout() time.Duration + // SetWatchChanInitTimeout sets the WatchChanInitTimeout + SetWatchChanInitTimeout(t time.Duration) Options + WatchWithRevision() int64 SetWatchWithRevision(rev int64) Options + // EnableFastGets returns whether to use clientv3.WithSerializable() option to speed up gets. 
+ EnableFastGets() bool + // SetEnableFastGets sets clientv3.WithSerializable() to speed up gets, but can fetch stale data. + SetEnableFastGets(enabled bool) Options + SetNewDirectoryMode(fm os.FileMode) Options NewDirectoryMode() os.FileMode diff --git a/src/cluster/kv/etcd/options.go b/src/cluster/kv/etcd/options.go index 7b2caa23f0..78700c5865 100644 --- a/src/cluster/kv/etcd/options.go +++ b/src/cluster/kv/etcd/options.go @@ -79,6 +79,11 @@ type Options interface { // WatchWithRevision is the revision that watch requests will start from. WatchWithRevision() int64 + // EnableFastGets returns whether to use clientv3.WithSerializable() option to speed up gets. + EnableFastGets() bool + // SetEnableFastGets sets clientv3.WithSerializable() to speed up gets, but can fetch stale data. + SetEnableFastGets(enabled bool) Options + // SetWatchWithRevision sets the revision that watch requests will start // from. SetWatchWithRevision(rev int64) Options @@ -111,6 +116,7 @@ type options struct { watchChanResetInterval time.Duration watchChanInitTimeout time.Duration watchWithRevision int64 + enableFastGets bool cacheFileFn CacheFileFn newDirectoryMode os.FileMode } @@ -141,6 +147,18 @@ func (o options) Validate() error { return errors.New("invalid watch channel check interval") } + if o.watchChanResetInterval <= 0 { + return errors.New("invalid watch reset interval") + } + + if o.watchChanInitTimeout <= 0 { + return errors.New("invalid watch init interval") + } + + if o.requestTimeout <= 0 { + return errors.New("invalid request timeout") + } + return nil } @@ -207,6 +225,17 @@ func (o options) SetWatchWithRevision(rev int64) Options { return o } +//nolint:gocritic +func (o options) EnableFastGets() bool { + return o.enableFastGets +} + +//nolint:gocritic +func (o options) SetEnableFastGets(enabled bool) Options { + o.enableFastGets = enabled + return o +} + func (o options) CacheFileFn() CacheFileFn { return o.cacheFileFn } diff --git a/src/cluster/kv/etcd/options_test.go 
b/src/cluster/kv/etcd/options_test.go new file mode 100644 index 0000000000..60b3dabeb3 --- /dev/null +++ b/src/cluster/kv/etcd/options_test.go @@ -0,0 +1,45 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package etcd + +import ( + "math" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestOptions(t *testing.T) { + opts := NewOptions() + assert.NoError(t, opts.Validate()) + assert.Equal(t, defaultRequestTimeout, opts.RequestTimeout()) + assert.Equal(t, defaultWatchChanCheckInterval, opts.WatchChanCheckInterval()) + assert.Equal(t, defaultWatchChanResetInterval, opts.WatchChanResetInterval()) + assert.Equal(t, defaultWatchChanInitTimeout, opts.WatchChanInitTimeout()) + assert.False(t, opts.EnableFastGets()) + ropts := opts.RetryOptions() + assert.Equal(t, true, ropts.Jitter()) + assert.Equal(t, time.Second, ropts.InitialBackoff()) + assert.EqualValues(t, 2, ropts.BackoffFactor()) + assert.EqualValues(t, 5, ropts.MaxRetries()) + assert.Equal(t, time.Duration(math.MaxInt64), ropts.MaxBackoff()) +} diff --git a/src/cluster/kv/etcd/store.go b/src/cluster/kv/etcd/store.go index efbd3643e6..cf0f6b4540 100644 --- a/src/cluster/kv/etcd/store.go +++ b/src/cluster/kv/etcd/store.go @@ -153,7 +153,11 @@ func (c *client) get(key string) (kv.Value, error) { ctx, cancel := c.context() defer cancel() - r, err := c.kv.Get(ctx, key) + var opts []clientv3.OpOption + if c.opts.EnableFastGets() { + opts = append(opts, clientv3.WithSerializable()) + } + r, err := c.kv.Get(ctx, key, opts...) 
if err != nil { c.m.etcdGetError.Inc(1) cachedV, ok := c.getCache(key) diff --git a/src/cluster/kv/etcd/store_test.go b/src/cluster/kv/etcd/store_test.go index bbba9f9966..70fb5e10a9 100644 --- a/src/cluster/kv/etcd/store_test.go +++ b/src/cluster/kv/etcd/store_test.go @@ -1141,7 +1141,33 @@ func TestWatchWithStartRevision(t *testing.T) { verifyValue(t, w1.Get(), "bar-50", 50) }) } +} + +func TestSerializedGets(t *testing.T) { + ec, opts, closeFn := testStore(t) + defer closeFn() + + opts = opts.SetEnableFastGets(true) + require.NoError(t, opts.Validate()) + + store, err := NewStore(ec, opts) + require.NoError(t, err) + v, err := store.Set("foo", genProto("bar")) + require.EqualValues(t, 1, v) + require.NoError(t, err) + + val, err := store.Get("foo") + verifyValue(t, val, "bar", 1) + require.NoError(t, err) + + v, err = store.Set("foo", genProto("42")) + require.EqualValues(t, 2, v) + require.NoError(t, err) + + val, err = store.Get("foo") + verifyValue(t, val, "42", 2) + require.NoError(t, err) } func verifyValue(t *testing.T, v kv.Value, value string, version int) { diff --git a/src/cmd/services/m3dbnode/config/config_test.go b/src/cmd/services/m3dbnode/config/config_test.go index 8218580a33..02ca66b97e 100644 --- a/src/cmd/services/m3dbnode/config/config_test.go +++ b/src/cmd/services/m3dbnode/config/config_test.go @@ -287,7 +287,7 @@ db: - 1.1.1.1:2379 - 1.1.1.2:2379 - 1.1.1.3:2379 - + seedNodes: listenPeerUrls: - http://0.0.0.0:2380 @@ -634,6 +634,18 @@ func TestConfiguration(t *testing.T) { initTimeout: null watchWithRevision: 0 newDirectoryMode: null + retry: + initialBackoff: 0s + backoffFactor: 0 + maxBackoff: 0s + maxRetries: 0 + forever: null + jitter: null + requestTimeout: 0s + watchChanInitTimeout: 0s + watchChanCheckInterval: 0s + watchChanResetInterval: 0s + enableFastGets: false statics: [] seedNodes: rootDir: /var/lib/etcd