Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[coordinator] Enable running M3 coordinator without Etcd #3814

Merged
merged 16 commits into from
Oct 6, 2021
6 changes: 6 additions & 0 deletions src/cluster/kv/mem/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ type store struct {
watchables map[string]kv.ValueWatchable
}

// IsMem lets asserting if given store is an in memory one.
func IsMem(s kv.Store) bool {
_, ok := s.(*store)
return ok
}

func (s *store) Get(key string) (kv.Value, error) {
s.RLock()
defer s.RUnlock()
Expand Down
68 changes: 67 additions & 1 deletion src/cmd/services/m3coordinator/downsample/downsampler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

"github.com/m3db/m3/src/aggregator/client"
clusterclient "github.com/m3db/m3/src/cluster/client"
"github.com/m3db/m3/src/cluster/kv"
"github.com/m3db/m3/src/cluster/kv/mem"
dbclient "github.com/m3db/m3/src/dbnode/client"
"github.com/m3db/m3/src/metrics/aggregation"
Expand Down Expand Up @@ -2841,6 +2842,53 @@ func TestDownsamplerWithOverrideNamespace(t *testing.T) {
testDownsamplerAggregation(t, testDownsampler)
}

func TestSafeguardInProcessDownsampler(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
store := kv.NewMockStore(ctrl)
store.EXPECT().SetIfNotExists(gomock.Eq(matcher.NewOptions().NamespacesKey()), gomock.Any()).Return(0, nil).Times(1)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason for Times(1) here, isn't it the default behavior?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really, I will remove.

store.EXPECT().Set(gomock.Any(), gomock.Any()).Times(0)
store.EXPECT().CheckAndSet(gomock.Any(), gomock.Any(), gomock.Any()).Times(0)
store.EXPECT().Delete(gomock.Any()).Times(0)
Comment on lines +2853 to +2855
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Times(0) - isn't it the same as not defining those method calls?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By having them here explicitly I am trying to communicate that I am trying to ensure no mutation methods are invoked. And if in the future test fails with unexpected invocation someone will think twice before adding it here. Maybe I need to add comment for that? Or can you suggest some other approach?

_ = newTestDownsampler(t, testDownsamplerOptions{
remoteClientMock: nil,
kvStore: store,
})
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: slightly easier to read test code when you separate controller setup / mock setup / invocation with empty lines.

}

func TestDownsamplerNamespacesEtcdInit(t *testing.T) {
t.Run("does not reset namespaces key", func(t *testing.T) {
Comment on lines +2863 to +2864
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it be cleaner to have separate test methods instead of all those t.Run? Or was your goal to have nice test descriptions? I wish testing.T simply had some kind of SetName for this.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was trying to group those namespace init related tests. This test file is 3k lines long. So different test methods can just get lost.

store := mem.NewStore()
initialNamespaces := rulepb.Namespaces{Namespaces: []*rulepb.Namespace{{Name: "testNamespace"}}}
_, err := store.Set(matcher.NewOptions().NamespacesKey(), &initialNamespaces)
require.NoError(t, err)

_ = newTestDownsampler(t, testDownsamplerOptions{kvStore: store})

assert.Equal(t, initialNamespaces, readNamespacesKey(t, store), 1)
})

t.Run("initializes namespace key", func(t *testing.T) {
store := mem.NewStore()

_ = newTestDownsampler(t, testDownsamplerOptions{kvStore: store})

ns := readNamespacesKey(t, store)
require.NotNil(t, ns)
assert.Len(t, ns.Namespaces, 0)
})

t.Run("do not initialize namespaces when RequireNamespaceWatchOnInit is true", func(t *testing.T) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit for consistency:

Suggested change
t.Run("do not initialize namespaces when RequireNamespaceWatchOnInit is true", func(t *testing.T) {
t.Run("does not initialize namespaces when RequireNamespaceWatchOnInit is true", func(t *testing.T) {

store := mem.NewStore()

matcherConfig := MatcherConfiguration{RequireNamespaceWatchOnInit: true}
_ = newTestDownsampler(t, testDownsamplerOptions{kvStore: store, matcherConfig: matcherConfig})

_, err := store.Get(matcher.NewOptions().NamespacesKey())
require.Error(t, err)
})
}

func originalStagedMetadata(t *testing.T, testDownsampler testDownsampler) []metricpb.StagedMetadatas {
ds, ok := testDownsampler.downsampler.(*downsampler)
require.True(t, ok)
Expand Down Expand Up @@ -3428,6 +3476,8 @@ type testDownsamplerOptions struct {
// Test ingest and expectations overrides
ingest *testDownsamplerOptionsIngest
expect *testDownsamplerOptionsExpect

kvStore kv.Store
}

type testDownsamplerOptionsIngest struct {
Expand Down Expand Up @@ -3510,9 +3560,15 @@ func newTestDownsampler(t *testing.T, opts testDownsamplerOptions) testDownsampl
cfg.Matcher = opts.matcherConfig
cfg.UntimedRollups = opts.untimedRollups

clusterClient := clusterclient.NewMockClient(gomock.NewController(t))
kvStore := opts.kvStore
if kvStore == nil {
kvStore = mem.NewStore()
}
clusterClient.EXPECT().KV().Return(kvStore, nil).AnyTimes()
instance, err := cfg.NewDownsampler(DownsamplerOptions{
Storage: storage,
ClusterClient: clusterclient.NewMockClient(gomock.NewController(t)),
ClusterClient: clusterClient,
RulesKVStore: rulesKVStore,
ClusterNamespacesWatcher: m3.NewClusterNamespacesWatcher(),
ClockOptions: clockOpts,
Expand Down Expand Up @@ -3617,3 +3673,13 @@ func findWrites(
func testUpdateMetadata() rules.UpdateMetadata {
return rules.NewRuleSetUpdateHelper(0).NewUpdateMetadata(time.Now().UnixNano(), "test")
}

func readNamespacesKey(t *testing.T, store kv.Store) rulepb.Namespaces {
v, err := store.Get(matcher.NewOptions().NamespacesKey())
require.NoError(t, err)
var ns rulepb.Namespaces
err = v.Unmarshal(&ns)
require.NoError(t, err)
require.NotNil(t, ns)
return ns
}
33 changes: 30 additions & 3 deletions src/cmd/services/m3coordinator/downsample/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -737,7 +737,7 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) {
kvTxnMemStore := mem.NewStore()

// Initialize the namespaces
_, err := kvTxnMemStore.Set(matcherOpts.NamespacesKey(), &rulepb.Namespaces{})
err := initStoreNamespaces(kvTxnMemStore, matcherOpts.NamespacesKey())
if err != nil {
return agg{}, err
}
Expand Down Expand Up @@ -805,6 +805,21 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) {
matcherCacheCapacity = *v
}

kvStore, err := o.ClusterClient.KV()
if err != nil {
return agg{}, err
}

// NB(antanas): matcher registers watcher on namespaces key. Making sure it is set, otherwise watcher times out.
// With RequireNamespaceWatchOnInit being true we expect namespaces to be set upfront
// so we do not initialize them here at all because it might potentially hide human error.
if !matcherOpts.RequireNamespaceWatchOnInit() {
err = initStoreNamespaces(kvStore, matcherOpts.NamespacesKey())
if err != nil {
return agg{}, err
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit:

Suggested change
err = initStoreNamespaces(kvStore, matcherOpts.NamespacesKey())
if err != nil {
return agg{}, err
}
if err := initStoreNamespaces(kvStore, matcherOpts.NamespacesKey()); err != nil {
return agg{}, err
}

}

matcher, err := o.newAggregatorMatcher(matcherOpts, matcherCacheCapacity)
if err != nil {
return agg{}, err
Expand Down Expand Up @@ -838,13 +853,17 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) {
}, nil
}

localKVStore := kvStore
// NB(antanas): to protect against running with real Etcd and overriding existing placements.
if !mem.IsMem(localKVStore) {
localKVStore = mem.NewStore()
}

serviceID := services.NewServiceID().
SetEnvironment("production").
SetName("downsampler").
SetZone("embedded")

localKVStore := mem.NewStore()

placementManager, err := o.newAggregatorPlacementManager(serviceID, localKVStore)
if err != nil {
return agg{}, err
Expand Down Expand Up @@ -978,6 +997,14 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) {
}, nil
}

func initStoreNamespaces(store kv.Store, nsKey string) error {
_, err := store.SetIfNotExists(nsKey, &rulepb.Namespaces{})
Copy link
Collaborator Author

@Antanukas Antanukas Oct 5, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be a subject to a race condition "set if not exist" when multiple writes might be happening to the store? It's probably a corner case and might be an issue only when Etcd is in uninitialized state.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You'll get back a kv.ErrAlreadyExists if there is a race. Etcd guarantees consistency since it each transaction uses raft consensus, so only one will win the race and the others will get back kv.ErrAlreadyExists.

if errors.Is(err, kv.ErrAlreadyExists) {
return nil
}
return err
}

type aggPools struct {
tagEncoderPool serialize.TagEncoderPool
tagDecoderPool serialize.TagDecoderPool
Expand Down
20 changes: 11 additions & 9 deletions src/query/server/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
clusterclient "github.com/m3db/m3/src/cluster/client"
etcdclient "github.com/m3db/m3/src/cluster/client/etcd"
"github.com/m3db/m3/src/cluster/kv"
memcluster "github.com/m3db/m3/src/cluster/mem"
handleroptions3 "github.com/m3db/m3/src/cluster/placementhandler/handleroptions"
"github.com/m3db/m3/src/cmd/services/m3aggregator/serve"
"github.com/m3db/m3/src/cmd/services/m3coordinator/downsample"
Expand Down Expand Up @@ -827,6 +828,12 @@ func newDownsamplerAsync(
if err != nil {
return nil, nil, errors.Wrap(err, "unable to create cluster management etcd client")
}
} else if cfg.RemoteAggregator == nil {
// NB(antanas): M3 Coordinator with in process aggregator can run with in memory cluster client.
instrumentOptions.Logger().Info("no etcd config and no remote aggregator - will run with in memory cluster client")
clusterClient = memcluster.New(kv.NewOverrideOptions())
} else {
return nil, nil, fmt.Errorf("no configured cluster management config, must set this config for remote aggregator")
}

newDownsamplerFn := func() (downsample.Downsampler, error) {
Expand Down Expand Up @@ -866,7 +873,7 @@ func newDownsamplerAsync(

func newDownsampler(
cfg downsample.Configuration,
clusterManagementClient clusterclient.Client,
clusterClient clusterclient.Client,
storage storage.Appender,
clusterNamespacesWatcher m3.ClusterNamespacesWatcher,
tagOptions models.TagOptions,
Expand All @@ -880,22 +887,17 @@ func newDownsampler(
instrumentOpts = instrumentOpts.SetMetricsScope(
instrumentOpts.MetricsScope().SubScope("downsampler"))

if clusterManagementClient == nil {
return nil, fmt.Errorf("no configured cluster management config, " +
"must set this config for downsampler")
}

var kvStore kv.Store
var err error

if applyCustomRuleStore == nil {
kvStore, err = clusterManagementClient.KV()
kvStore, err = clusterClient.KV()
if err != nil {
return nil, errors.Wrap(err, "unable to create KV store from the "+
"cluster management config client")
}
} else {
kvStore, err = applyCustomRuleStore(clusterManagementClient, instrumentOpts)
kvStore, err = applyCustomRuleStore(clusterClient, instrumentOpts)
if err != nil {
return nil, errors.Wrap(err, "unable to apply custom rule store")
}
Expand All @@ -918,7 +920,7 @@ func newDownsampler(

downsampler, err := cfg.NewDownsampler(downsample.DownsamplerOptions{
Storage: storage,
ClusterClient: clusterManagementClient,
ClusterClient: clusterClient,
RulesKVStore: kvStore,
ClusterNamespacesWatcher: clusterNamespacesWatcher,
ClockOptions: clockOpts,
Expand Down
14 changes: 7 additions & 7 deletions src/query/server/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
"github.com/m3db/m3/src/cmd/services/m3query/config"
"github.com/m3db/m3/src/dbnode/client"
"github.com/m3db/m3/src/metrics/generated/proto/metricpb"
"github.com/m3db/m3/src/metrics/generated/proto/rulepb"
"github.com/m3db/m3/src/metrics/policy"
"github.com/m3db/m3/src/msg/generated/proto/msgpb"
m3msgproto "github.com/m3db/m3/src/msg/protocol/proto"
Expand Down Expand Up @@ -538,14 +537,15 @@ func runServer(t *testing.T, opts runServerOpts) (string, closeFn) {
doneCh = make(chan struct{})
listenerCh = make(chan net.Listener, 1)
clusterClient = clusterclient.NewMockClient(opts.ctrl)
clusterClientCh = make(chan clusterclient.Client, 1)
clusterClientCh chan clusterclient.Client
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the purpose of this channel?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a away to pass in a mocked instance for cases when Etcd is required. Probably not a best thing to do but thats how it was before. With my change I just added case when no Etcd is needed I pass this channel as nil and expect server startup to suceed.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, didn't notice it was there before the change, thought you had introduced it.

)

store := mem.NewStore()
_, err := store.Set("/namespaces", &rulepb.Namespaces{})
require.NoError(t, err)
clusterClient.EXPECT().KV().Return(store, nil).MaxTimes(1)
clusterClientCh <- clusterClient
if len(opts.cfg.Clusters) > 0 || opts.cfg.ClusterManagement.Etcd != nil {
clusterClientCh = make(chan clusterclient.Client, 1)
store := mem.NewStore()
clusterClient.EXPECT().KV().Return(store, nil).MaxTimes(2)
clusterClientCh <- clusterClient
}

go func() {
r := Run(RunOptions{
Expand Down