From 5eadad327600c2857ff0a716b327661502e5880a Mon Sep 17 00:00:00 2001 From: Antanas Bastys Date: Tue, 5 Oct 2021 13:25:09 +0300 Subject: [PATCH 01/14] [coordinator] enable running M3 coordinator without Etcd --- .../m3coordinator/downsample/options.go | 19 ++++++++++++++++--- src/query/server/query.go | 5 +++++ src/query/server/query_test.go | 16 ++++++++++------ 3 files changed, 31 insertions(+), 9 deletions(-) diff --git a/src/cmd/services/m3coordinator/downsample/options.go b/src/cmd/services/m3coordinator/downsample/options.go index 6d2c83d06e..dafbc86ba8 100644 --- a/src/cmd/services/m3coordinator/downsample/options.go +++ b/src/cmd/services/m3coordinator/downsample/options.go @@ -737,7 +737,7 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) { kvTxnMemStore := mem.NewStore() // Initialize the namespaces - _, err := kvTxnMemStore.Set(matcherOpts.NamespacesKey(), &rulepb.Namespaces{}) + err := initStoreNamespaces(kvTxnMemStore, matcherOpts.NamespacesKey()) if err != nil { return agg{}, err } @@ -805,6 +805,16 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) { matcherCacheCapacity = *v } + localKVStore, err := o.ClusterClient.Txn() + if err != nil { + return agg{}, err + } + + err = initStoreNamespaces(localKVStore, matcherOpts.NamespacesKey()) + if err != nil { + return agg{}, err + } + matcher, err := o.newAggregatorMatcher(matcherOpts, matcherCacheCapacity) if err != nil { return agg{}, err @@ -843,8 +853,6 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) { SetName("downsampler"). SetZone("embedded") - localKVStore := mem.NewStore() - placementManager, err := o.newAggregatorPlacementManager(serviceID, localKVStore) if err != nil { return agg{}, err @@ -978,6 +986,11 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) { }, nil } +func initStoreNamespaces(store kv.TxnStore, nsKey string) error { + _, err := store.CheckAndSet(nsKey, 0, &rulepb.Namespaces{}) + return err +} + type aggPools struct { tagEncoderPool serialize.TagEncoderPool tagDecoderPool serialize.TagDecoderPool diff --git a/src/query/server/query.go b/src/query/server/query.go index 04d8054a60..cd9a6a410a 100644 --- a/src/query/server/query.go +++ b/src/query/server/query.go @@ -34,6 +34,7 @@ import ( clusterclient "github.com/m3db/m3/src/cluster/client" etcdclient "github.com/m3db/m3/src/cluster/client/etcd" "github.com/m3db/m3/src/cluster/kv" + memcluster "github.com/m3db/m3/src/cluster/mem" handleroptions3 "github.com/m3db/m3/src/cluster/placementhandler/handleroptions" "github.com/m3db/m3/src/cmd/services/m3aggregator/serve" "github.com/m3db/m3/src/cmd/services/m3coordinator/downsample" @@ -827,6 +828,10 @@ func newDownsamplerAsync( if err != nil { return nil, nil, errors.Wrap(err, "unable to create cluster management etcd client") } + // NB(antanas): M3 Coordinator with in process aggregator can run with in memory cluster client. + } else if cfg.RemoteAggregator == nil { + instrumentOptions.Logger().Info("no etcd config and no remote aggregator - will run with in memory cluster client.") + clusterClient = memcluster.New(kv.NewOverrideOptions()) } newDownsamplerFn := func() (downsample.Downsampler, error) { diff --git a/src/query/server/query_test.go b/src/query/server/query_test.go index 5a4eb6de97..ff120a288b 100644 --- a/src/query/server/query_test.go +++ b/src/query/server/query_test.go @@ -228,6 +228,7 @@ backend: prom-remote tagOptions: allowTagNameDuplicates: true + `, externalFakePromServer.WriteAddr())) require.Equal(t, config.PromRemoteStorageType, cfg.Backend) @@ -538,14 +539,17 @@ func runServer(t *testing.T, opts runServerOpts) (string, closeFn) { doneCh = make(chan struct{}) listenerCh = make(chan net.Listener, 1) clusterClient = clusterclient.NewMockClient(opts.ctrl) - clusterClientCh = make(chan clusterclient.Client, 1) + clusterClientCh chan clusterclient.Client ) - store := mem.NewStore() - _, err := store.Set("/namespaces", &rulepb.Namespaces{}) - require.NoError(t, err) - clusterClient.EXPECT().KV().Return(store, nil).MaxTimes(1) - clusterClientCh <- clusterClient + if len(opts.cfg.Clusters) > 0 || opts.cfg.ClusterManagement.Etcd != nil { + clusterClientCh = make(chan clusterclient.Client, 1) + store := mem.NewStore() + _, err := store.Set("/namespaces", &rulepb.Namespaces{}) + require.NoError(t, err) + clusterClient.EXPECT().KV().Return(store, nil).MaxTimes(1) + clusterClientCh <- clusterClient + } go func() { r := Run(RunOptions{ From ad1e8d56f25bfaf8b281f46494d6e004a06b23f4 Mon Sep 17 00:00:00 2001 From: Antanas Bastys Date: Tue, 5 Oct 2021 13:33:40 +0300 Subject: [PATCH 02/14] linter --- src/query/server/query.go | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/src/query/server/query.go b/src/query/server/query.go index cd9a6a410a..05a9318990 100644 --- a/src/query/server/query.go +++ b/src/query/server/query.go @@ -828,10 +828,12 @@ func newDownsamplerAsync( if err != nil { return nil, nil, errors.Wrap(err, "unable to create cluster management etcd client") } - // NB(antanas): M3 Coordinator with in process aggregator can run with in memory cluster client. } else if cfg.RemoteAggregator == nil { - instrumentOptions.Logger().Info("no etcd config and no remote aggregator - will run with in memory cluster client.") + // NB(antanas): M3 Coordinator with in process aggregator can run with in memory cluster client. + instrumentOptions.Logger().Info("no etcd config and no remote aggregator - will run with in memory cluster client") clusterClient = memcluster.New(kv.NewOverrideOptions()) + } else { + return nil, nil, fmt.Errorf("no configured cluster management config, must set this config for remote aggregator") } newDownsamplerFn := func() (downsample.Downsampler, error) { @@ -871,7 +873,7 @@ func newDownsamplerAsync( func newDownsampler( cfg downsample.Configuration, - clusterManagementClient clusterclient.Client, + clusterClient clusterclient.Client, storage storage.Appender, clusterNamespacesWatcher m3.ClusterNamespacesWatcher, tagOptions models.TagOptions, @@ -885,22 +887,17 @@ func newDownsampler( instrumentOpts = instrumentOpts.SetMetricsScope( instrumentOpts.MetricsScope().SubScope("downsampler")) - if clusterManagementClient == nil { - return nil, fmt.Errorf("no configured cluster management config, " + - "must set this config for downsampler") - } - var kvStore kv.Store var err error if applyCustomRuleStore == nil { - kvStore, err = clusterManagementClient.KV() + kvStore, err = clusterClient.KV() if err != nil { return nil, errors.Wrap(err, "unable to create KV store from the "+ "cluster management config client") } } else { - kvStore, err = applyCustomRuleStore(clusterManagementClient, instrumentOpts) + kvStore, err = applyCustomRuleStore(clusterClient, instrumentOpts) if err != nil { return nil, errors.Wrap(err, "unable to apply custom rule store") } @@ -923,7 +920,7 @@ func newDownsampler( downsampler, err := cfg.NewDownsampler(downsample.DownsamplerOptions{ Storage: storage, - ClusterClient: clusterManagementClient, + ClusterClient: clusterClient, RulesKVStore: kvStore, ClusterNamespacesWatcher: clusterNamespacesWatcher, ClockOptions: clockOpts, From ecce04a06b344f774d628c0d65487e1e44772e6e Mon Sep 17 00:00:00 2001 From: Antanas Bastys Date: Tue, 5 Oct 2021 14:01:20 +0300 Subject: [PATCH 03/14] rename + comment --- src/cmd/services/m3coordinator/downsample/options.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/cmd/services/m3coordinator/downsample/options.go b/src/cmd/services/m3coordinator/downsample/options.go index dafbc86ba8..953bc06d0e 100644 --- a/src/cmd/services/m3coordinator/downsample/options.go +++ b/src/cmd/services/m3coordinator/downsample/options.go @@ -805,12 +805,13 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) { matcherCacheCapacity = *v } - localKVStore, err := o.ClusterClient.Txn() + kvStore, err := o.ClusterClient.Txn() if err != nil { return agg{}, err } - err = initStoreNamespaces(localKVStore, matcherOpts.NamespacesKey()) + // NB(antanas): matcher registers watcher on namespaces key. Making sure it is set, otherwise watcher times out. + err = initStoreNamespaces(kvStore, matcherOpts.NamespacesKey()) if err != nil { return agg{}, err } @@ -853,14 +854,14 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) { SetName("downsampler"). SetZone("embedded") - placementManager, err := o.newAggregatorPlacementManager(serviceID, localKVStore) + placementManager, err := o.newAggregatorPlacementManager(serviceID, kvStore) if err != nil { return agg{}, err } flushTimesManager := aggregator.NewFlushTimesManager( aggregator.NewFlushTimesManagerOptions(). - SetFlushTimesStore(localKVStore)) + SetFlushTimesStore(kvStore)) electionManager, err := o.newAggregatorElectionManager(serviceID, placementManager, flushTimesManager, clockOpts) From 878c4277e24721f5d1585a20a5d656fc43fe2290 Mon Sep 17 00:00:00 2001 From: Antanas Bastys Date: Tue, 5 Oct 2021 14:07:33 +0300 Subject: [PATCH 04/14] fix test --- src/cmd/services/m3coordinator/downsample/options.go | 4 ++-- src/query/server/query_test.go | 5 +---- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/src/cmd/services/m3coordinator/downsample/options.go b/src/cmd/services/m3coordinator/downsample/options.go index 953bc06d0e..6f4c3158b2 100644 --- a/src/cmd/services/m3coordinator/downsample/options.go +++ b/src/cmd/services/m3coordinator/downsample/options.go @@ -805,7 +805,7 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) { matcherCacheCapacity = *v } - kvStore, err := o.ClusterClient.Txn() + kvStore, err := o.ClusterClient.KV() if err != nil { return agg{}, err } @@ -987,7 +987,7 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) { }, nil } -func initStoreNamespaces(store kv.TxnStore, nsKey string) error { +func initStoreNamespaces(store kv.Store, nsKey string) error { _, err := store.CheckAndSet(nsKey, 0, &rulepb.Namespaces{}) return err } diff --git a/src/query/server/query_test.go b/src/query/server/query_test.go index ff120a288b..a955e083eb 100644 --- a/src/query/server/query_test.go +++ b/src/query/server/query_test.go @@ -36,7 +36,6 @@ import ( "github.com/m3db/m3/src/cmd/services/m3query/config" "github.com/m3db/m3/src/dbnode/client" "github.com/m3db/m3/src/metrics/generated/proto/metricpb" - "github.com/m3db/m3/src/metrics/generated/proto/rulepb" "github.com/m3db/m3/src/metrics/policy" "github.com/m3db/m3/src/msg/generated/proto/msgpb" m3msgproto "github.com/m3db/m3/src/msg/protocol/proto" @@ -545,9 +544,7 @@ func runServer(t *testing.T, opts runServerOpts) (string, closeFn) { if len(opts.cfg.Clusters) > 0 || opts.cfg.ClusterManagement.Etcd != nil { clusterClientCh = make(chan clusterclient.Client, 1) store := mem.NewStore() - _, err := store.Set("/namespaces", &rulepb.Namespaces{}) - require.NoError(t, err) - clusterClient.EXPECT().KV().Return(store, nil).MaxTimes(1) + clusterClient.EXPECT().KV().Return(store, nil).MaxTimes(2) clusterClientCh <- clusterClient } From 59bfd144e3951bce2fe82691254997abd9df9183 Mon Sep 17 00:00:00 2001 From: Antanas Bastys Date: Tue, 5 Oct 2021 15:35:32 +0300 Subject: [PATCH 05/14] test and fix --- .../downsample/downsampler_test.go | 43 ++++++++++++++++++- .../m3coordinator/downsample/options.go | 5 ++- 2 files changed, 46 insertions(+), 2 deletions(-) diff --git a/src/cmd/services/m3coordinator/downsample/downsampler_test.go b/src/cmd/services/m3coordinator/downsample/downsampler_test.go index 1c9db7c027..c7b9248d52 100644 --- a/src/cmd/services/m3coordinator/downsample/downsampler_test.go +++ b/src/cmd/services/m3coordinator/downsample/downsampler_test.go @@ -31,6 +31,7 @@ import ( "github.com/m3db/m3/src/aggregator/client" clusterclient "github.com/m3db/m3/src/cluster/client" + "github.com/m3db/m3/src/cluster/kv" "github.com/m3db/m3/src/cluster/kv/mem" dbclient "github.com/m3db/m3/src/dbnode/client" "github.com/m3db/m3/src/metrics/aggregation" @@ -2841,6 +2842,28 @@ func TestDownsamplerWithOverrideNamespace(t *testing.T) { testDownsamplerAggregation(t, testDownsampler) } +func TestDownsamplerStoreInit(t *testing.T) { + t.Run("does not reset namespaces key", func(t *testing.T) { + store := mem.NewStore() + _, err := store.Set("/namespaces", &rulepb.Namespaces{Namespaces: []*rulepb.Namespace{{}}}) + require.NoError(t, err) + + _ = newTestDownsampler(t, testDownsamplerOptions{kvStore: store}) + + ns := readNamespacesKey(t, store) + assert.Len(t, ns.Namespaces, 1) + }) + + t.Run("initializes namespace key", func(t *testing.T) { + store := mem.NewStore() + + _ = newTestDownsampler(t, testDownsamplerOptions{kvStore: store}) + + ns := readNamespacesKey(t, store) + assert.Len(t, ns.Namespaces, 0) + }) +} + func originalStagedMetadata(t *testing.T, testDownsampler testDownsampler) []metricpb.StagedMetadatas { ds, ok := testDownsampler.downsampler.(*downsampler) require.True(t, ok) @@ -3428,6 +3451,8 @@ type testDownsamplerOptions struct { // Test ingest and expectations overrides ingest *testDownsamplerOptionsIngest expect *testDownsamplerOptionsExpect + + kvStore kv.Store } type testDownsamplerOptionsIngest struct { @@ -3510,9 +3535,15 @@ func newTestDownsampler(t *testing.T, opts testDownsamplerOptions) testDownsampl cfg.Matcher = opts.matcherConfig cfg.UntimedRollups = opts.untimedRollups + clusterClient := clusterclient.NewMockClient(gomock.NewController(t)) + kvStore := opts.kvStore + if kvStore == nil { + kvStore = mem.NewStore() + } + clusterClient.EXPECT().KV().Return(kvStore, nil).AnyTimes() instance, err := cfg.NewDownsampler(DownsamplerOptions{ Storage: storage, - ClusterClient: clusterclient.NewMockClient(gomock.NewController(t)), + ClusterClient: clusterClient, RulesKVStore: rulesKVStore, ClusterNamespacesWatcher: m3.NewClusterNamespacesWatcher(), ClockOptions: clockOpts, @@ -3617,3 +3648,13 @@ func findWrites( func testUpdateMetadata() rules.UpdateMetadata { return rules.NewRuleSetUpdateHelper(0).NewUpdateMetadata(time.Now().UnixNano(), "test") } + +func readNamespacesKey(t *testing.T, store kv.Store) rulepb.Namespaces { + v, err := store.Get("/namespaces") + require.NoError(t, err) + var ns rulepb.Namespaces + err = v.Unmarshal(&ns) + require.NoError(t, err) + require.NotNil(t, ns) + return ns +} diff --git a/src/cmd/services/m3coordinator/downsample/options.go b/src/cmd/services/m3coordinator/downsample/options.go index 6f4c3158b2..3ed258ae24 100644 --- a/src/cmd/services/m3coordinator/downsample/options.go +++ b/src/cmd/services/m3coordinator/downsample/options.go @@ -988,7 +988,10 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) { } func initStoreNamespaces(store kv.Store, nsKey string) error { - _, err := store.CheckAndSet(nsKey, 0, &rulepb.Namespaces{}) + _, err := store.SetIfNotExists(nsKey, &rulepb.Namespaces{}) + if errors.Is(err, kv.ErrAlreadyExists) { + return nil + } return err } From e881014181006683e3a05539d60e0918fed38058 Mon Sep 17 00:00:00 2001 From: Antanas Bastys Date: Tue, 5 Oct 2021 15:49:14 +0300 Subject: [PATCH 06/14] avoid using hardcoded string --- .../m3coordinator/downsample/downsampler_test.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/cmd/services/m3coordinator/downsample/downsampler_test.go b/src/cmd/services/m3coordinator/downsample/downsampler_test.go index c7b9248d52..43ad8ed7f2 100644 --- a/src/cmd/services/m3coordinator/downsample/downsampler_test.go +++ b/src/cmd/services/m3coordinator/downsample/downsampler_test.go @@ -2845,21 +2845,24 @@ func TestDownsamplerWithOverrideNamespace(t *testing.T) { func TestDownsamplerStoreInit(t *testing.T) { t.Run("does not reset namespaces key", func(t *testing.T) { store := mem.NewStore() - _, err := store.Set("/namespaces", &rulepb.Namespaces{Namespaces: []*rulepb.Namespace{{}}}) + initialNamespaces := rulepb.Namespaces{Namespaces: []*rulepb.Namespace{{Name: "testNamespace"}}} + _, err := store.Set(matcher.NewOptions().NamespacesKey(), &initialNamespaces) require.NoError(t, err) _ = newTestDownsampler(t, testDownsamplerOptions{kvStore: store}) - ns := readNamespacesKey(t, store) - assert.Len(t, ns.Namespaces, 1) + assert.Equal(t, initialNamespaces, readNamespacesKey(t, store), 1) }) t.Run("initializes namespace key", func(t *testing.T) { store := mem.NewStore() + _, err := store.Get(matcher.NewOptions().NamespacesKey()) + require.Error(t, err) _ = newTestDownsampler(t, testDownsamplerOptions{kvStore: store}) ns := readNamespacesKey(t, store) + require.NotNil(t, ns) assert.Len(t, ns.Namespaces, 0) }) } @@ -3650,7 +3653,7 @@ func testUpdateMetadata() rules.UpdateMetadata { } func readNamespacesKey(t *testing.T, store kv.Store) rulepb.Namespaces { - v, err := store.Get("/namespaces") + v, err := store.Get(matcher.NewOptions().NamespacesKey()) require.NoError(t, err) var ns rulepb.Namespaces err = v.Unmarshal(&ns) From 20786e3d6b0325a150151d37b18bd2317e7073d1 Mon Sep 17 00:00:00 2001 From: Antanas Bastys Date: Tue, 5 Oct 2021 16:24:58 +0300 Subject: [PATCH 07/14] only init when flag true --- .../m3coordinator/downsample/downsampler_test.go | 14 +++++++++++--- .../services/m3coordinator/downsample/options.go | 10 +++++++--- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/src/cmd/services/m3coordinator/downsample/downsampler_test.go b/src/cmd/services/m3coordinator/downsample/downsampler_test.go index 43ad8ed7f2..89085ba626 100644 --- a/src/cmd/services/m3coordinator/downsample/downsampler_test.go +++ b/src/cmd/services/m3coordinator/downsample/downsampler_test.go @@ -2842,7 +2842,7 @@ func TestDownsamplerWithOverrideNamespace(t *testing.T) { testDownsamplerAggregation(t, testDownsampler) } -func TestDownsamplerStoreInit(t *testing.T) { +func TestDownsamplerNamespacesEtcdInit(t *testing.T) { t.Run("does not reset namespaces key", func(t *testing.T) { store := mem.NewStore() initialNamespaces := rulepb.Namespaces{Namespaces: []*rulepb.Namespace{{Name: "testNamespace"}}} @@ -2856,8 +2856,6 @@ func TestDownsamplerStoreInit(t *testing.T) { t.Run("initializes namespace key", func(t *testing.T) { store := mem.NewStore() - _, err := store.Get(matcher.NewOptions().NamespacesKey()) - require.Error(t, err) _ = newTestDownsampler(t, testDownsamplerOptions{kvStore: store}) @@ -2865,6 +2863,16 @@ func TestDownsamplerStoreInit(t *testing.T) { require.NotNil(t, ns) assert.Len(t, ns.Namespaces, 0) }) + + t.Run("do not initialize namespaces when RequireNamespaceWatchOnInit is true", func(t *testing.T) { + store := mem.NewStore() + + matcherConfig := MatcherConfiguration{RequireNamespaceWatchOnInit: true} + _ = newTestDownsampler(t, testDownsamplerOptions{kvStore: store, matcherConfig: matcherConfig}) + + _, err := store.Get(matcher.NewOptions().NamespacesKey()) + require.Error(t, err) + }) } func originalStagedMetadata(t *testing.T, testDownsampler testDownsampler) []metricpb.StagedMetadatas { diff --git a/src/cmd/services/m3coordinator/downsample/options.go b/src/cmd/services/m3coordinator/downsample/options.go index 3ed258ae24..b0c5cae50b 100644 --- a/src/cmd/services/m3coordinator/downsample/options.go +++ b/src/cmd/services/m3coordinator/downsample/options.go @@ -811,9 +811,13 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) { } // NB(antanas): matcher registers watcher on namespaces key. Making sure it is set, otherwise watcher times out. - err = initStoreNamespaces(kvStore, matcherOpts.NamespacesKey()) - if err != nil { - return agg{}, err + // With RequireNamespaceWatchOnInit being true we expect namespaces to be set upfront + // so we do not initialize them here at all because it might potentially hide human error. + if !cfg.Matcher.RequireNamespaceWatchOnInit { + err = initStoreNamespaces(kvStore, matcherOpts.NamespacesKey()) + if err != nil { + return agg{}, err + } } matcher, err := o.newAggregatorMatcher(matcherOpts, matcherCacheCapacity) From 26a55ebc1430735c27ba3d41813dd55e3d4b4938 Mon Sep 17 00:00:00 2001 From: Antanas Bastys Date: Tue, 5 Oct 2021 16:57:27 +0300 Subject: [PATCH 08/14] InMem safeguard --- src/cluster/kv/mem/store.go | 6 +++++ .../downsample/downsampler_test.go | 22 +++++++++++++++++-- .../m3coordinator/downsample/options.go | 8 ++++++- 3 files changed, 33 insertions(+), 3 deletions(-) diff --git a/src/cluster/kv/mem/store.go b/src/cluster/kv/mem/store.go index 352df906a8..e61c831d3a 100644 --- a/src/cluster/kv/mem/store.go +++ b/src/cluster/kv/mem/store.go @@ -81,6 +81,12 @@ type store struct { watchables map[string]kv.ValueWatchable } +// IsMem lets asserting if given store is an in memory one. +func IsMem(s kv.Store) bool { + _, ok := s.(*store) + return ok +} + func (s *store) Get(key string) (kv.Value, error) { s.RLock() defer s.RUnlock() diff --git a/src/cmd/services/m3coordinator/downsample/downsampler_test.go b/src/cmd/services/m3coordinator/downsample/downsampler_test.go index 89085ba626..6faf6cf628 100644 --- a/src/cmd/services/m3coordinator/downsample/downsampler_test.go +++ b/src/cmd/services/m3coordinator/downsample/downsampler_test.go @@ -2842,6 +2842,18 @@ func TestDownsamplerWithOverrideNamespace(t *testing.T) { testDownsamplerAggregation(t, testDownsampler) } +func TestSafeguardInProcessDownsampelr(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + store := kv.NewMockStore(ctrl) + store.EXPECT().SetIfNotExists(gomock.Any(), gomock.Any()).Return(0, nil).AnyTimes() + _ = newTestDownsampler(t, testDownsamplerOptions{ + remoteClientMock: nil, + expectConstructError: "other store then in memory can yield unexpected side effects", + kvStore: store, + }) +} + func TestDownsamplerNamespacesEtcdInit(t *testing.T) { t.Run("does not reset namespaces key", func(t *testing.T) { store := mem.NewStore() @@ -3460,8 +3472,9 @@ type testDownsamplerOptions struct { matcherConfig MatcherConfiguration // Test ingest and expectations overrides - ingest *testDownsamplerOptionsIngest - expect *testDownsamplerOptionsExpect + ingest *testDownsamplerOptionsIngest + expect *testDownsamplerOptionsExpect + expectConstructError string kvStore kv.Store } @@ -3567,6 +3580,11 @@ func newTestDownsampler(t *testing.T, opts testDownsamplerOptions) testDownsampl RWOptions: xio.NewOptions(), TagOptions: models.NewTagOptions(), }) + if opts.expectConstructError == "" { + require.Error(t, err) + assert.Contains(t, err.Error(), opts.expectConstructError) + return testDownsampler{} + } require.NoError(t, err) if len(opts.autoMappingRules) > 0 { diff --git a/src/cmd/services/m3coordinator/downsample/options.go b/src/cmd/services/m3coordinator/downsample/options.go index b0c5cae50b..f3a664bc84 100644 --- a/src/cmd/services/m3coordinator/downsample/options.go +++ b/src/cmd/services/m3coordinator/downsample/options.go @@ -813,7 +813,7 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) { // NB(antanas): matcher registers watcher on namespaces key. Making sure it is set, otherwise watcher times out. // With RequireNamespaceWatchOnInit being true we expect namespaces to be set upfront // so we do not initialize them here at all because it might potentially hide human error. - if !cfg.Matcher.RequireNamespaceWatchOnInit { + if !matcherOpts.RequireNamespaceWatchOnInit() { err = initStoreNamespaces(kvStore, matcherOpts.NamespacesKey()) if err != nil { return agg{}, err @@ -853,6 +853,12 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) { }, nil } + // NB(antanas): to protect against running with real Etcd and overriding existing placements. + if !mem.IsMem(kvStore) { + return agg{}, errors.New("running in process downsampler with other store " + + "then in memory can yield unexpected side effects") + } + serviceID := services.NewServiceID(). SetEnvironment("production"). SetName("downsampler"). From ec2caa963a92668936454f48a5e8a0d8e60bf18d Mon Sep 17 00:00:00 2001 From: Antanas Bastys Date: Tue, 5 Oct 2021 16:58:30 +0300 Subject: [PATCH 09/14] misstype --- src/cmd/services/m3coordinator/downsample/downsampler_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cmd/services/m3coordinator/downsample/downsampler_test.go b/src/cmd/services/m3coordinator/downsample/downsampler_test.go index 6faf6cf628..1bf2867bde 100644 --- a/src/cmd/services/m3coordinator/downsample/downsampler_test.go +++ b/src/cmd/services/m3coordinator/downsample/downsampler_test.go @@ -2842,7 +2842,7 @@ func TestDownsamplerWithOverrideNamespace(t *testing.T) { testDownsamplerAggregation(t, testDownsampler) } -func TestSafeguardInProcessDownsampelr(t *testing.T) { +func TestSafeguardInProcessDownsampler(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() store := kv.NewMockStore(ctrl) From f668cd12fd619996835a2eb06baa9aa2dbac258a Mon Sep 17 00:00:00 2001 From: Antanas Bastys Date: Tue, 5 Oct 2021 17:01:00 +0300 Subject: [PATCH 10/14] local var --- src/cmd/services/m3coordinator/downsample/options.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/cmd/services/m3coordinator/downsample/options.go b/src/cmd/services/m3coordinator/downsample/options.go index f3a664bc84..fa94d86f35 100644 --- a/src/cmd/services/m3coordinator/downsample/options.go +++ b/src/cmd/services/m3coordinator/downsample/options.go @@ -858,20 +858,21 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) { return agg{}, errors.New("running in process downsampler with other store " + "then in memory can yield unexpected side effects") } + localKVStore := kvStore serviceID := services.NewServiceID(). SetEnvironment("production"). SetName("downsampler"). SetZone("embedded") - placementManager, err := o.newAggregatorPlacementManager(serviceID, kvStore) + placementManager, err := o.newAggregatorPlacementManager(serviceID, localKVStore) if err != nil { return agg{}, err } flushTimesManager := aggregator.NewFlushTimesManager( aggregator.NewFlushTimesManagerOptions(). - SetFlushTimesStore(kvStore)) + SetFlushTimesStore(localKVStore)) electionManager, err := o.newAggregatorElectionManager(serviceID, placementManager, flushTimesManager, clockOpts) From 6ee83974426e6b03d6397dcedb2052c8c7d6f417 Mon Sep 17 00:00:00 2001 From: Antanas Bastys Date: Tue, 5 Oct 2021 17:16:06 +0300 Subject: [PATCH 11/14] brainfart --- src/cmd/services/m3coordinator/downsample/downsampler_test.go | 2 +- src/query/server/query_test.go | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/cmd/services/m3coordinator/downsample/downsampler_test.go b/src/cmd/services/m3coordinator/downsample/downsampler_test.go index 1bf2867bde..6f8e7f1c6e 100644 --- a/src/cmd/services/m3coordinator/downsample/downsampler_test.go +++ b/src/cmd/services/m3coordinator/downsample/downsampler_test.go @@ -3580,7 +3580,7 @@ func newTestDownsampler(t *testing.T, opts testDownsamplerOptions) testDownsampl RWOptions: xio.NewOptions(), TagOptions: models.NewTagOptions(), }) - if opts.expectConstructError == "" { + if opts.expectConstructError != "" { require.Error(t, err) assert.Contains(t, err.Error(), opts.expectConstructError) return testDownsampler{} diff --git a/src/query/server/query_test.go b/src/query/server/query_test.go index a955e083eb..0944ebf2f3 100644 --- a/src/query/server/query_test.go +++ b/src/query/server/query_test.go @@ -227,7 +227,6 @@ backend: prom-remote tagOptions: allowTagNameDuplicates: true - `, externalFakePromServer.WriteAddr())) require.Equal(t, config.PromRemoteStorageType, cfg.Backend) From 39ccce77c2946e9b6e5729136189c27a58b4b323 Mon Sep 17 00:00:00 2001 From: Antanas Bastys Date: Tue, 5 Oct 2021 18:56:46 +0300 Subject: [PATCH 12/14] set in memory store instead of failing --- .../downsample/downsampler_test.go | 20 ++++++++----------- .../m3coordinator/downsample/options.go | 7 +++---- 2 files changed, 11 insertions(+), 16 deletions(-) diff --git a/src/cmd/services/m3coordinator/downsample/downsampler_test.go b/src/cmd/services/m3coordinator/downsample/downsampler_test.go index 6f8e7f1c6e..68c076695d 100644 --- a/src/cmd/services/m3coordinator/downsample/downsampler_test.go +++ b/src/cmd/services/m3coordinator/downsample/downsampler_test.go @@ -2846,11 +2846,13 @@ func TestSafeguardInProcessDownsampler(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() store := kv.NewMockStore(ctrl) - store.EXPECT().SetIfNotExists(gomock.Any(), gomock.Any()).Return(0, nil).AnyTimes() + store.EXPECT().SetIfNotExists(gomock.Eq(matcher.NewOptions().NamespacesKey()), gomock.Any()).Return(0, nil).Times(1) + store.EXPECT().Set(gomock.Any(), gomock.Any()).Times(0) + store.EXPECT().CheckAndSet(gomock.Any(), gomock.Any(), gomock.Any()).Times(0) + store.EXPECT().Delete(gomock.Any()).Times(0) _ = newTestDownsampler(t, testDownsamplerOptions{ - remoteClientMock: nil, - expectConstructError: "other store then in memory can yield unexpected side effects", - kvStore: store, + remoteClientMock: nil, + kvStore: store, }) } @@ -3472,9 +3474,8 @@ type testDownsamplerOptions struct { matcherConfig MatcherConfiguration // Test ingest and expectations overrides - ingest *testDownsamplerOptionsIngest - expect *testDownsamplerOptionsExpect - expectConstructError string + ingest *testDownsamplerOptionsIngest + expect *testDownsamplerOptionsExpect kvStore kv.Store } @@ -3580,11 +3581,6 @@ func newTestDownsampler(t *testing.T, opts testDownsamplerOptions) testDownsampl RWOptions: xio.NewOptions(), TagOptions: models.NewTagOptions(), }) - if opts.expectConstructError != "" { - require.Error(t, err) - assert.Contains(t, err.Error(), opts.expectConstructError) - return testDownsampler{} - } require.NoError(t, err) if len(opts.autoMappingRules) > 0 { diff --git a/src/cmd/services/m3coordinator/downsample/options.go b/src/cmd/services/m3coordinator/downsample/options.go index fa94d86f35..f61c6ca8f3 100644 --- a/src/cmd/services/m3coordinator/downsample/options.go +++ b/src/cmd/services/m3coordinator/downsample/options.go @@ -853,12 +853,11 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) { }, nil } + localKVStore := kvStore // NB(antanas): to protect against running with real Etcd and overriding existing placements. - if !mem.IsMem(kvStore) { - return agg{}, errors.New("running in process downsampler with other store " + - "then in memory can yield unexpected side effects") + if !mem.IsMem(localKVStore) { + localKVStore = mem.NewStore() } - localKVStore := kvStore serviceID := services.NewServiceID(). SetEnvironment("production"). From 5bfd9faa8a9e373de940dda0e2f711afe1fc01c1 Mon Sep 17 00:00:00 2001 From: Antanas Bastys Date: Wed, 6 Oct 2021 11:22:10 +0300 Subject: [PATCH 13/14] review fixes --- .../services/m3coordinator/downsample/downsampler_test.go | 8 ++++++-- src/cmd/services/m3coordinator/downsample/options.go | 6 ++---- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/src/cmd/services/m3coordinator/downsample/downsampler_test.go b/src/cmd/services/m3coordinator/downsample/downsampler_test.go index 68c076695d..6f4d2ee456 100644 --- a/src/cmd/services/m3coordinator/downsample/downsampler_test.go +++ b/src/cmd/services/m3coordinator/downsample/downsampler_test.go @@ -2845,11 +2845,15 @@ func TestDownsamplerWithOverrideNamespace(t *testing.T) { func TestSafeguardInProcessDownsampler(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() + store := kv.NewMockStore(ctrl) - store.EXPECT().SetIfNotExists(gomock.Eq(matcher.NewOptions().NamespacesKey()), gomock.Any()).Return(0, nil).Times(1) + store.EXPECT().SetIfNotExists(gomock.Eq(matcher.NewOptions().NamespacesKey()), gomock.Any()).Return(0, nil) + + // explicitly asserting that no more mutations are done for original store. store.EXPECT().Set(gomock.Any(), gomock.Any()).Times(0) store.EXPECT().CheckAndSet(gomock.Any(), gomock.Any(), gomock.Any()).Times(0) store.EXPECT().Delete(gomock.Any()).Times(0) + _ = newTestDownsampler(t, testDownsamplerOptions{ remoteClientMock: nil, kvStore: store, @@ -2878,7 +2882,7 @@ func TestDownsamplerNamespacesEtcdInit(t *testing.T) { assert.Len(t, ns.Namespaces, 0) }) - t.Run("do not initialize namespaces when RequireNamespaceWatchOnInit is true", func(t *testing.T) { + t.Run("does not initialize namespaces key when RequireNamespaceWatchOnInit is true", func(t *testing.T) { store := mem.NewStore() matcherConfig := MatcherConfiguration{RequireNamespaceWatchOnInit: true} diff --git a/src/cmd/services/m3coordinator/downsample/options.go b/src/cmd/services/m3coordinator/downsample/options.go index f61c6ca8f3..0ef7dcce39 100644 --- a/src/cmd/services/m3coordinator/downsample/options.go +++ b/src/cmd/services/m3coordinator/downsample/options.go @@ -737,8 +737,7 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) { kvTxnMemStore := mem.NewStore() // Initialize the namespaces - err := initStoreNamespaces(kvTxnMemStore, matcherOpts.NamespacesKey()) - if err != nil { + if err := initStoreNamespaces(kvTxnMemStore, matcherOpts.NamespacesKey()); err != nil { return agg{}, err } @@ -814,8 +813,7 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) { // With RequireNamespaceWatchOnInit being true we expect namespaces to be set upfront // so we do not initialize them here at all because it might potentially hide human error. if !matcherOpts.RequireNamespaceWatchOnInit() { - err = initStoreNamespaces(kvStore, matcherOpts.NamespacesKey()) - if err != nil { + if err := initStoreNamespaces(kvStore, matcherOpts.NamespacesKey()); err != nil { return agg{}, err } } From 2be0162a805b8edead8b3ffb7df48f3f010f20e8 Mon Sep 17 00:00:00 2001 From: Antanas Bastys Date: Wed, 6 Oct 2021 11:34:56 +0300 Subject: [PATCH 14/14] move few lines --- src/cmd/services/m3coordinator/downsample/options.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/cmd/services/m3coordinator/downsample/options.go b/src/cmd/services/m3coordinator/downsample/options.go index 0ef7dcce39..0d9dfbcb0d 100644 --- a/src/cmd/services/m3coordinator/downsample/options.go +++ b/src/cmd/services/m3coordinator/downsample/options.go @@ -851,17 +851,17 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) { }, nil } + serviceID := services.NewServiceID(). + SetEnvironment("production"). + SetName("downsampler"). + SetZone("embedded") + localKVStore := kvStore // NB(antanas): to protect against running with real Etcd and overriding existing placements. if !mem.IsMem(localKVStore) { localKVStore = mem.NewStore() } - serviceID := services.NewServiceID(). - SetEnvironment("production"). - SetName("downsampler"). - SetZone("embedded") - placementManager, err := o.newAggregatorPlacementManager(serviceID, localKVStore) if err != nil { return agg{}, err