diff --git a/src/cmd/services/m3coordinator/downsample/options.go b/src/cmd/services/m3coordinator/downsample/options.go index 1fa70f6555..65e09160f5 100644 --- a/src/cmd/services/m3coordinator/downsample/options.go +++ b/src/cmd/services/m3coordinator/downsample/options.go @@ -292,8 +292,8 @@ type MatcherConfiguration struct { // MatcherCacheConfiguration is the configuration for the rule matcher cache. type MatcherCacheConfiguration struct { - // Capacity if non-zero will set the capacity of the rules matching cache. - Capacity int `yaml:"capacity"` + // Capacity if set the capacity of the rules matching cache. + Capacity *int `yaml:"capacity"` } // RulesConfiguration is a set of rules configuration to use for downsampling. @@ -767,8 +767,8 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) { } matcherCacheCapacity := defaultMatcherCacheCapacity - if v := cfg.Matcher.Cache.Capacity; v > 0 { - matcherCacheCapacity = v + if v := cfg.Matcher.Cache.Capacity; v != nil { + matcherCacheCapacity = *v } matcher, err := o.newAggregatorMatcher(matcherOpts, matcherCacheCapacity) @@ -1055,14 +1055,19 @@ func (o DownsamplerOptions) newAggregatorMatcher( opts matcher.Options, capacity int, ) (matcher.Matcher, error) { - cacheOpts := cache.NewOptions(). - SetCapacity(capacity). - SetClockOptions(opts.ClockOptions()). - SetInstrumentOptions(opts.InstrumentOptions(). - SetMetricsScope(opts.InstrumentOptions().MetricsScope().SubScope("matcher-cache"))) - - cache := cache.NewCache(cacheOpts) - return matcher.NewMatcher(cache, opts) + var matcherCache cache.Cache + if capacity > 0 { + scope := opts.InstrumentOptions().MetricsScope().SubScope("matcher-cache") + instrumentOpts := opts.InstrumentOptions(). + SetMetricsScope(scope) + cacheOpts := cache.NewOptions(). + SetCapacity(capacity). + SetClockOptions(opts.ClockOptions()). + SetInstrumentOptions(instrumentOpts) + matcherCache = cache.NewCache(cacheOpts) + } + + return matcher.NewMatcher(matcherCache, opts) } func (o DownsamplerOptions) newAggregatorPlacementManager( diff --git a/src/m3ninx/index/segment/builder/fields_map_new.go b/src/m3ninx/index/segment/builder/fields_map_new.go index 015a1e21eb..86980955f6 100644 --- a/src/m3ninx/index/segment/builder/fields_map_new.go +++ b/src/m3ninx/index/segment/builder/fields_map_new.go @@ -1,4 +1,4 @@ -// Copyright (c) 2020 Uber Technologies, Inc. +// Copyright (c) 2021 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal diff --git a/src/m3ninx/index/segment/builder/ids_map_new.go b/src/m3ninx/index/segment/builder/ids_map_new.go index 757181e444..cea45969b8 100644 --- a/src/m3ninx/index/segment/builder/ids_map_new.go +++ b/src/m3ninx/index/segment/builder/ids_map_new.go @@ -1,4 +1,4 @@ -// Copyright (c) 2020 Uber Technologies, Inc. +// Copyright (c) 2021 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal diff --git a/src/m3ninx/index/segment/builder/postings_map_new.go b/src/m3ninx/index/segment/builder/postings_map_new.go index 5fb871a420..f986d1345b 100644 --- a/src/m3ninx/index/segment/builder/postings_map_new.go +++ b/src/m3ninx/index/segment/builder/postings_map_new.go @@ -1,4 +1,4 @@ -// Copyright (c) 2020 Uber Technologies, Inc. +// Copyright (c) 2021 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal diff --git a/src/m3ninx/index/segment/mem/fields_map_new.go b/src/m3ninx/index/segment/mem/fields_map_new.go index c13cc1737e..598f09e756 100644 --- a/src/m3ninx/index/segment/mem/fields_map_new.go +++ b/src/m3ninx/index/segment/mem/fields_map_new.go @@ -1,4 +1,4 @@ -// Copyright (c) 2020 Uber Technologies, Inc. +// Copyright (c) 2021 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal diff --git a/src/metrics/generated/mocks/generate.go b/src/metrics/generated/mocks/generate.go index 4574464705..25b6582530 100644 --- a/src/metrics/generated/mocks/generate.go +++ b/src/metrics/generated/mocks/generate.go @@ -19,7 +19,7 @@ // THE SOFTWARE. // mockgen rules for generating mocks for exported interfaces (reflection mode). -//go:generate sh -c "mockgen -package=id github.com/m3db/m3/src/metrics/metric/id ID | genclean -pkg github.com/m3db/m3/src/metrics/metric/id -out $GOPATH/src/github.com/m3db/m3/src/metrics/metric/id/id_mock.go" +//go:generate sh -c "mockgen -package=id github.com/m3db/m3/src/metrics/metric/id ID,SortedTagIterator | genclean -pkg github.com/m3db/m3/src/metrics/metric/id -out $GOPATH/src/github.com/m3db/m3/src/metrics/metric/id/id_mock.go" //go:generate sh -c "mockgen -package=matcher github.com/m3db/m3/src/metrics/matcher Matcher | genclean -pkg github.com/m3db/m3/src/metrics/matcher -out $GOPATH/src/github.com/m3db/m3/src/metrics/matcher/matcher_mock.go" //go:generate sh -c "mockgen -package=protobuf github.com/m3db/m3/src/metrics/encoding/protobuf UnaggregatedEncoder | genclean -pkg github.com/m3db/m3/src/metrics/encoding/protobuf -out $GOPATH/src/github.com/m3db/m3/src/metrics/encoding/protobuf/protobuf_mock.go" //go:generate sh -c "mockgen -package=rules github.com/m3db/m3/src/metrics/rules Store | genclean -pkg github.com/m3db/m3/src/metrics/rules -out $GOPATH/src/github.com/m3db/m3/src/metrics/rules/rules_mock.go" diff --git a/src/metrics/matcher/match.go b/src/metrics/matcher/match.go index b911e823ec..776428feb2 100644 --- a/src/metrics/matcher/match.go +++ b/src/metrics/matcher/match.go @@ -37,53 +37,95 @@ type Matcher interface { } type matcher struct { - opts Options + namespaceResolver namespaceResolver + namespaces Namespaces + cache cache.Cache +} + +type namespaceResolver struct { namespaceTag []byte defaultNamespace []byte +} - namespaces Namespaces - cache cache.Cache +func (r namespaceResolver) Resolve(id id.ID) []byte { + ns, found := id.TagValue(r.namespaceTag) + if !found { + ns = r.defaultNamespace + } + return ns } -// NewMatcher creates a new rule matcher. +// NewMatcher creates a new rule matcher, optionally with a cache. func NewMatcher(cache cache.Cache, opts Options) (Matcher, error) { + nsResolver := namespaceResolver{ + namespaceTag: opts.NamespaceTag(), + defaultNamespace: opts.DefaultNamespace(), + } + instrumentOpts := opts.InstrumentOptions() scope := instrumentOpts.MetricsScope() iOpts := instrumentOpts.SetMetricsScope(scope.SubScope("namespaces")) - namespacesOpts := opts.SetInstrumentOptions(iOpts). - SetOnNamespaceAddedFn(func(namespace []byte, ruleSet RuleSet) { - cache.Register(namespace, ruleSet) - }). - SetOnNamespaceRemovedFn(func(namespace []byte) { - cache.Unregister(namespace) - }). - SetOnRuleSetUpdatedFn(func(namespace []byte, ruleSet RuleSet) { - cache.Refresh(namespace, ruleSet) - }) - key := opts.NamespacesKey() - namespaces := NewNamespaces(key, namespacesOpts) + namespacesOpts := opts.SetInstrumentOptions(iOpts) + + if cache != nil { + namespacesOpts = namespacesOpts. + SetOnNamespaceAddedFn(func(namespace []byte, ruleSet RuleSet) { + cache.Register(namespace, ruleSet) + }). + SetOnNamespaceRemovedFn(func(namespace []byte) { + cache.Unregister(namespace) + }). + SetOnRuleSetUpdatedFn(func(namespace []byte, ruleSet RuleSet) { + cache.Refresh(namespace, ruleSet) + }) + } + + namespaces := NewNamespaces(opts.NamespacesKey(), namespacesOpts) if err := namespaces.Open(); err != nil { return nil, err } + if cache == nil { + return &noCacheMatcher{ + namespaceResolver: nsResolver, + namespaces: namespaces, + }, nil + } + return &matcher{ - opts: opts, - namespaceTag: opts.NamespaceTag(), - defaultNamespace: opts.DefaultNamespace(), - namespaces: namespaces, - cache: cache, + namespaceResolver: nsResolver, + namespaces: namespaces, + cache: cache, }, nil } -func (m *matcher) ForwardMatch(id id.ID, fromNanos, toNanos int64) rules.MatchResult { - ns, found := id.TagValue(m.namespaceTag) - if !found { - ns = m.defaultNamespace - } - return m.cache.ForwardMatch(ns, id.Bytes(), fromNanos, toNanos) +func (m *matcher) ForwardMatch( + id id.ID, + fromNanos, toNanos int64, +) rules.MatchResult { + return m.cache.ForwardMatch(m.namespaceResolver.Resolve(id), + id.Bytes(), fromNanos, toNanos) } func (m *matcher) Close() error { m.namespaces.Close() return m.cache.Close() } + +type noCacheMatcher struct { + namespaces Namespaces + namespaceResolver namespaceResolver +} + +func (m *noCacheMatcher) ForwardMatch( + id id.ID, + fromNanos, toNanos int64, +) rules.MatchResult { + return m.namespaces.ForwardMatch(m.namespaceResolver.Resolve(id), + id.Bytes(), fromNanos, toNanos) +} + +func (m *noCacheMatcher) Close() error { + m.namespaces.Close() + return nil +} diff --git a/src/metrics/matcher/match_test.go b/src/metrics/matcher/match_test.go index f5dc350286..cce42bab56 100644 --- a/src/metrics/matcher/match_test.go +++ b/src/metrics/matcher/match_test.go @@ -27,11 +27,20 @@ import ( "github.com/m3db/m3/src/cluster/kv" "github.com/m3db/m3/src/cluster/kv/mem" + "github.com/m3db/m3/src/metrics/aggregation" + "github.com/m3db/m3/src/metrics/filters" + "github.com/m3db/m3/src/metrics/generated/proto/aggregationpb" + "github.com/m3db/m3/src/metrics/generated/proto/policypb" "github.com/m3db/m3/src/metrics/generated/proto/rulepb" "github.com/m3db/m3/src/metrics/matcher/cache" + "github.com/m3db/m3/src/metrics/metadata" + "github.com/m3db/m3/src/metrics/metric/id" + "github.com/m3db/m3/src/metrics/policy" "github.com/m3db/m3/src/metrics/rules" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/x/clock" "github.com/m3db/m3/src/x/instrument" + xtest "github.com/m3db/m3/src/x/test" "github.com/m3db/m3/src/x/watch" "github.com/golang/mock/gomock" @@ -73,7 +82,9 @@ func TestMatcherMatchDoesNotExist(t *testing.T) { tagValueFn: func(tagName []byte) ([]byte, bool) { return nil, false }, } now := time.Now() - matcher := testMatcher(t, newMemCache()) + matcher := testMatcher(t, testMatcherOptions{ + cache: newMemCache(), + }) require.Equal(t, rules.EmptyMatchResult, matcher.ForwardMatch(id, now.UnixNano(), now.UnixNano())) } @@ -89,37 +100,152 @@ func TestMatcherMatchExists(t *testing.T) { memRes = memResults{results: map[string]rules.MatchResult{"foo": res}} ) cache := newMemCache() - matcher := testMatcher(t, cache) + matcher := testMatcher(t, testMatcherOptions{ + cache: cache, + }) c := cache.(*memCache) c.namespaces[ns] = memRes require.Equal(t, res, matcher.ForwardMatch(id, now.UnixNano(), now.UnixNano())) } +func TestMatcherMatchExistsNoCache(t *testing.T) { + ctrl := xtest.NewController(t) + defer ctrl.Finish() + + var ( + ns = "fooNs" + metric = &testMetricID{ + id: []byte("foo"), + tagValueFn: func(tagName []byte) ([]byte, bool) { + if string(tagName) == "fooTag" { + return []byte("fooValue"), true + } + return []byte(ns), true + }, + } + now = time.Now() + ) + matcher := testMatcher(t, testMatcherOptions{ + tagFilterOptions: filters.TagsFilterOptions{ + NameAndTagsFn: func(id []byte) (name []byte, tags []byte, err error) { + name = metric.id + return + }, + SortedTagIteratorFn: func(tagPairs []byte) id.SortedTagIterator { + iter := id.NewMockSortedTagIterator(ctrl) + iter.EXPECT().Next().Return(true) + iter.EXPECT().Current().Return([]byte("fooTag"), []byte("fooValue")) + iter.EXPECT().Next().Return(false) + iter.EXPECT().Err().Return(nil) + iter.EXPECT().Close() + return iter + }, + }, + storeSetup: func(t *testing.T, store kv.TxnStore) { + _, err := store.Set(testNamespacesKey, &rulepb.Namespaces{ + Namespaces: []*rulepb.Namespace{ + { + Name: ns, + Snapshots: []*rulepb.NamespaceSnapshot{ + { + ForRulesetVersion: 1, + Tombstoned: false, + }, + }, + }, + }, + }) + require.NoError(t, err) + + _, err = store.Set("/ruleset/fooNs", &rulepb.RuleSet{ + Namespace: ns, + MappingRules: []*rulepb.MappingRule{ + { + Snapshots: []*rulepb.MappingRuleSnapshot{ + { + Filter: "fooTag:fooValue", + AggregationTypes: []aggregationpb.AggregationType{ + aggregationpb.AggregationType_LAST, + }, + StoragePolicies: []*policypb.StoragePolicy{ + { + Resolution: policypb.Resolution{ + WindowSize: int64(time.Minute), + Precision: int64(time.Minute), + }, + Retention: policypb.Retention{ + Period: 24 * int64(time.Hour), + }, + }, + }, + }, + }, + }, + }, + }) + require.NoError(t, err) + }, + }) + + forExistingID := metadata.StagedMetadatas{ + metadata.StagedMetadata{ + Metadata: metadata.Metadata{ + Pipelines: metadata.PipelineMetadatas{ + metadata.PipelineMetadata{ + AggregationID: aggregation.MustCompressTypes(aggregation.Last), + StoragePolicies: policy.StoragePolicies{ + policy.MustParseStoragePolicy("1m:1d"), + }, + Tags: []models.Tag{}, + }, + }, + }, + }, + } + forNewRollupIDs := []rules.IDWithMetadatas{} + keepOriginal := false + expected := rules.NewMatchResult(1, math.MaxInt64, + forExistingID, forNewRollupIDs, keepOriginal) + + result := matcher.ForwardMatch(metric, now.UnixNano(), now.UnixNano()) + + require.Equal(t, expected, result) +} + func TestMatcherClose(t *testing.T) { - matcher := testMatcher(t, newMemCache()) + matcher := testMatcher(t, testMatcherOptions{ + cache: newMemCache(), + }) require.NoError(t, matcher.Close()) } -func testMatcher(t *testing.T, cache cache.Cache) Matcher { +type testMatcherOptions struct { + cache cache.Cache + storeSetup func(*testing.T, kv.TxnStore) + tagFilterOptions filters.TagsFilterOptions +} + +func testMatcher(t *testing.T, opts testMatcherOptions) Matcher { var ( - store = mem.NewStore() - opts = NewOptions(). - SetClockOptions(clock.NewOptions()). - SetInstrumentOptions(instrument.NewOptions()). - SetInitWatchTimeout(100 * time.Millisecond). - SetKVStore(store). - SetNamespacesKey(testNamespacesKey). - SetNamespaceTag([]byte("namespace")). - SetDefaultNamespace([]byte("default")). - SetRuleSetKeyFn(defaultRuleSetKeyFn). - SetRuleSetOptions(rules.NewOptions()). - SetMatchRangePast(0) + store = mem.NewStore() + matcherOpts = NewOptions(). + SetClockOptions(clock.NewOptions()). + SetInstrumentOptions(instrument.NewOptions()). + SetInitWatchTimeout(100 * time.Millisecond). + SetKVStore(store). + SetNamespacesKey(testNamespacesKey). + SetNamespaceTag([]byte("namespace")). + SetDefaultNamespace([]byte("default")). + SetRuleSetKeyFn(defaultRuleSetKeyFn). + SetRuleSetOptions(rules.NewOptions(). + SetTagsFilterOptions(opts.tagFilterOptions)). + SetMatchRangePast(0) proto = &rulepb.Namespaces{ Namespaces: []*rulepb.Namespace{ - &rulepb.Namespace{ + { Name: "fooNs", Snapshots: []*rulepb.NamespaceSnapshot{ - &rulepb.NamespaceSnapshot{ + { ForRulesetVersion: 1, Tombstoned: true, }, @@ -128,10 +254,15 @@ func testMatcher(t *testing.T, cache cache.Cache) Matcher { }, } ) + _, err := store.SetIfNotExists(testNamespacesKey, proto) require.NoError(t, err) - m, err := NewMatcher(cache, opts) + if fn := opts.storeSetup; fn != nil { + fn(t, store) + } + + m, err := NewMatcher(opts.cache, matcherOpts) require.NoError(t, err) return m } diff --git a/src/metrics/metric/id/id_mock.go b/src/metrics/metric/id/id_mock.go index a2407de492..bbba1fdf27 100644 --- a/src/metrics/metric/id/id_mock.go +++ b/src/metrics/metric/id/id_mock.go @@ -1,7 +1,7 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/m3db/m3/src/metrics/metric/id (interfaces: ID) +// Source: github.com/m3db/m3/src/metrics/metric/id (interfaces: ID,SortedTagIterator) -// Copyright (c) 2018 Uber Technologies, Inc. +// Copyright (c) 2021 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -81,3 +81,93 @@ func (mr *MockIDMockRecorder) TagValue(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TagValue", reflect.TypeOf((*MockID)(nil).TagValue), arg0) } + +// MockSortedTagIterator is a mock of SortedTagIterator interface +type MockSortedTagIterator struct { + ctrl *gomock.Controller + recorder *MockSortedTagIteratorMockRecorder +} + +// MockSortedTagIteratorMockRecorder is the mock recorder for MockSortedTagIterator +type MockSortedTagIteratorMockRecorder struct { + mock *MockSortedTagIterator +} + +// NewMockSortedTagIterator creates a new mock instance +func NewMockSortedTagIterator(ctrl *gomock.Controller) *MockSortedTagIterator { + mock := &MockSortedTagIterator{ctrl: ctrl} + mock.recorder = &MockSortedTagIteratorMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockSortedTagIterator) EXPECT() *MockSortedTagIteratorMockRecorder { + return m.recorder +} + +// Close mocks base method +func (m *MockSortedTagIterator) Close() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Close") +} + +// Close indicates an expected call of Close +func (mr *MockSortedTagIteratorMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockSortedTagIterator)(nil).Close)) +} + +// Current mocks base method +func (m *MockSortedTagIterator) Current() ([]byte, []byte) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Current") + ret0, _ := ret[0].([]byte) + ret1, _ := ret[1].([]byte) + return ret0, ret1 +} + +// Current indicates an expected call of Current +func (mr *MockSortedTagIteratorMockRecorder) Current() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Current", reflect.TypeOf((*MockSortedTagIterator)(nil).Current)) +} + +// Err mocks base method +func (m *MockSortedTagIterator) Err() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Err") + ret0, _ := ret[0].(error) + return ret0 +} + +// Err indicates an expected call of Err +func (mr *MockSortedTagIteratorMockRecorder) Err() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Err", reflect.TypeOf((*MockSortedTagIterator)(nil).Err)) +} + +// Next mocks base method +func (m *MockSortedTagIterator) Next() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Next") + ret0, _ := ret[0].(bool) + return ret0 +} + +// Next indicates an expected call of Next +func (mr *MockSortedTagIteratorMockRecorder) Next() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Next", reflect.TypeOf((*MockSortedTagIterator)(nil).Next)) +} + +// Reset mocks base method +func (m *MockSortedTagIterator) Reset(arg0 []byte) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Reset", arg0) +} + +// Reset indicates an expected call of Reset +func (mr *MockSortedTagIteratorMockRecorder) Reset(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Reset", reflect.TypeOf((*MockSortedTagIterator)(nil).Reset), arg0) +} diff --git a/src/query/graphite/storage/series_metadata_map_new.go b/src/query/graphite/storage/series_metadata_map_new.go index 6e8c0fefc1..ca10803ac6 100644 --- a/src/query/graphite/storage/series_metadata_map_new.go +++ b/src/query/graphite/storage/series_metadata_map_new.go @@ -1,4 +1,4 @@ -// Copyright (c) 2020 Uber Technologies, Inc. +// Copyright (c) 2021 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal