From f249c01cbfd697a09b0c7e4fa36b5a38ce0c9eb7 Mon Sep 17 00:00:00 2001 From: Prateek Rungta Date: Fri, 11 Oct 2024 17:37:54 -0400 Subject: [PATCH] [DO NOT LAND] trying amortizing allocs in ReverseMatch --- src/metrics/filters/filter.go | 9 +- src/metrics/filters/mock_filter.go | 9 +- src/metrics/filters/tags_filter.go | 7 +- src/metrics/rules/active_ruleset.go | 82 ++++++++++++------- .../rules/active_ruleset_bench_test.go | 7 ++ 5 files changed, 75 insertions(+), 39 deletions(-) create mode 100644 src/metrics/rules/active_ruleset_bench_test.go diff --git a/src/metrics/filters/filter.go b/src/metrics/filters/filter.go index 69ecfe80b3..9cc550bdee 100644 --- a/src/metrics/filters/filter.go +++ b/src/metrics/filters/filter.go @@ -92,10 +92,11 @@ type TagMatchOptions struct { // Function to extract name and tags from an id. NameAndTagsFn id.NameAndTagsFn - // Function to get a sorted tag iterator from id tags. - // The caller of Matches is the owner of the Iterator and is responsible for closing it, this allows reusing the - // same Iterator across many Matches. - SortedTagIteratorFn id.SortedTagIteratorFn + // SortedTagIterator is an iterator for use in interpreting an []byte ID. + // Match functions will mutate this iterator. The caller passes this down in order to allow reuse of + // the iterator across calls, without pooling (which was expensive in previous versions of this code). + // N.B.: this is an iteration of https://github.com/m3db/m3/pull/3988. + SortedTagIterator id.SortedTagIterator } type filter interface { diff --git a/src/metrics/filters/mock_filter.go b/src/metrics/filters/mock_filter.go index 738302258f..2fea929d93 100644 --- a/src/metrics/filters/mock_filter.go +++ b/src/metrics/filters/mock_filter.go @@ -54,13 +54,18 @@ func tagsToPairs(tags []byte) []mockTagPair { // NewMockSortedTagIterator creates a mock SortedTagIterator based on given ID. func NewMockSortedTagIterator(tags []byte) id.SortedTagIterator { - pairs := tagsToPairs(tags) - return &mockSortedTagIterator{idx: -1, pairs: pairs} + iter := &mockSortedTagIterator{} + iter.Reset(tags) + return iter } func (it *mockSortedTagIterator) Reset(tags []byte) { it.idx = -1 it.err = nil + it.pairs = nil + if len(tags) == 0 { + return + } it.pairs = tagsToPairs(tags) } diff --git a/src/metrics/filters/tags_filter.go b/src/metrics/filters/tags_filter.go index 3f7eead844..59b028ebd8 100644 --- a/src/metrics/filters/tags_filter.go +++ b/src/metrics/filters/tags_filter.go @@ -186,12 +186,12 @@ func (f *tagsFilter) String() string { return buf.String() } -func (f *tagsFilter) Matches(id []byte, opts TagMatchOptions) (bool, error) { +func (f *tagsFilter) Matches(idBytes []byte, opts TagMatchOptions) (bool, error) { if f.nameFilter == nil && len(f.tagFilters) == 0 { return true, nil } - name, tags, err := opts.NameAndTagsFn(id) + name, tags, err := opts.NameAndTagsFn(idBytes) if err != nil { return false, err } @@ -205,7 +205,8 @@ func (f *tagsFilter) Matches(id []byte, opts TagMatchOptions) (bool, error) { } } - iter := opts.SortedTagIteratorFn(tags) + iter := opts.SortedTagIterator + iter.Reset(tags) currIdx := 0 diff --git a/src/metrics/rules/active_ruleset.go b/src/metrics/rules/active_ruleset.go index 851f73b8cf..5733732c34 100644 --- a/src/metrics/rules/active_ruleset.go +++ b/src/metrics/rules/active_ruleset.go @@ -50,6 +50,11 @@ type activeRuleSet struct { includeTagKeys map[uint64]struct{} } +type optimizedMatchOptions struct { + NameAndTagsFn metricid.NameAndTagsFn + SortedTagIterator metricid.SortedTagIterator +} + func newActiveRuleSet( version int, mappingRules []*mappingRule, @@ -111,7 +116,12 @@ func (as *activeRuleSet) ForwardMatch( fromNanos, toNanos int64, opts MatchOptions, ) (MatchResult, error) { - currMatchRes, err := as.forwardMatchAt(id.Bytes(), fromNanos, opts) + optimizedOpts := optimizedMatchOptions{ + NameAndTagsFn: opts.NameAndTagsFn, + SortedTagIterator: opts.SortedTagIteratorFn(nil), + } + + currMatchRes, err := as.forwardMatchAt(id.Bytes(), fromNanos, optimizedOpts) if err != nil { return MatchResult{}, err } @@ -124,7 +134,7 @@ func (as *activeRuleSet) ForwardMatch( ) for nextIdx < len(as.cutoverTimesAsc) && nextCutoverNanos < toNanos { - nextMatchRes, err := as.forwardMatchAt(id.Bytes(), nextCutoverNanos, opts) + nextMatchRes, err := as.forwardMatchAt(id.Bytes(), nextCutoverNanos, optimizedOpts) if err != nil { return MatchResult{}, err } @@ -164,6 +174,11 @@ func (as *activeRuleSet) ReverseMatch( keepOriginal bool ) + optimizedMatchOpts := optimizedMatchOptions{ + NameAndTagsFn: as.tagsFilterOpts.NameAndTagsFn, + SortedTagIterator: as.tagsFilterOpts.SortedTagIteratorFn(nil), + } + // Determine whether the ID is a rollup metric ID. name, tags, err := as.tagsFilterOpts.NameAndTagsFn(id.Bytes()) if err == nil { @@ -180,6 +195,7 @@ func (as *activeRuleSet) ReverseMatch( at, isMultiAggregationTypesAllowed, aggTypesOpts, + optimizedMatchOpts, ) if err != nil { return MatchResult{}, err @@ -202,6 +218,7 @@ func (as *activeRuleSet) ReverseMatch( at, isMultiAggregationTypesAllowed, aggTypesOpts, + optimizedMatchOpts, ) if err != nil { return MatchResult{}, err @@ -229,13 +246,13 @@ func (as *activeRuleSet) ReverseMatch( func (as *activeRuleSet) forwardMatchAt( id []byte, timeNanos int64, - matchOpts MatchOptions, + optimizedMatchOpts optimizedMatchOptions, ) (forwardMatchResult, error) { - mappingResults, err := as.mappingsForNonRollupID(id, timeNanos, matchOpts) + mappingResults, err := as.mappingsForNonRollupID(id, timeNanos, optimizedMatchOpts) if err != nil { return forwardMatchResult{}, err } - rollupResults, err := as.rollupResultsFor(id, timeNanos, matchOpts) + rollupResults, err := as.rollupResultsFor(id, timeNanos, optimizedMatchOpts) if err != nil { return forwardMatchResult{}, err } @@ -263,7 +280,7 @@ func (as *activeRuleSet) forwardMatchAt( func (as *activeRuleSet) mappingsForNonRollupID( id []byte, timeNanos int64, - matchOpts MatchOptions, + optimizedMatchOpts optimizedMatchOptions, ) (mappingResults, error) { var ( cutoverNanos int64 @@ -275,8 +292,9 @@ func (as *activeRuleSet) mappingsForNonRollupID( continue } matches, err := snapshot.filter.Matches(id, filters.TagMatchOptions{ - SortedTagIteratorFn: matchOpts.SortedTagIteratorFn, - NameAndTagsFn: matchOpts.NameAndTagsFn, + NameAndTagsFn: optimizedMatchOpts.NameAndTagsFn, + // TODO: this needs the change to not use the alloc + SortedTagIterator: optimizedMatchOpts.SortedTagIterator, }) if err != nil { return mappingResults{}, err @@ -336,7 +354,11 @@ func (as *activeRuleSet) LatestRollupRules(_ []byte, timeNanos int64) ([]view.Ro return out, nil } -func (as *activeRuleSet) rollupResultsFor(id []byte, timeNanos int64, matchOpts MatchOptions) (rollupResults, error) { +func (as *activeRuleSet) rollupResultsFor( + id []byte, + timeNanos int64, + optimizedMatchOpts optimizedMatchOptions, +) (rollupResults, error) { var ( cutoverNanos int64 rollupTargets []rollupTarget @@ -350,8 +372,9 @@ func (as *activeRuleSet) rollupResultsFor(id []byte, timeNanos int64, matchOpts continue } match, err := snapshot.filter.Matches(id, filters.TagMatchOptions{ - NameAndTagsFn: matchOpts.NameAndTagsFn, - SortedTagIteratorFn: matchOpts.SortedTagIteratorFn, + // TODO: same comment, different line + NameAndTagsFn: optimizedMatchOpts.NameAndTagsFn, + SortedTagIterator: optimizedMatchOpts.SortedTagIterator, }) if err != nil { return rollupResults{}, err @@ -382,7 +405,7 @@ func (as *activeRuleSet) rollupResultsFor(id []byte, timeNanos int64, matchOpts } } // NB: could log the matching error here if needed. - res, _ := as.toRollupResults(id, cutoverNanos, rollupTargets, keepOriginal, tags, matchOpts) + res, _ := as.toRollupResults(id, cutoverNanos, rollupTargets, keepOriginal, tags, optimizedMatchOpts) return res, nil } @@ -399,7 +422,7 @@ func (as *activeRuleSet) toRollupResults( targets []rollupTarget, keepOriginal bool, tags [][]models.Tag, - matchOpts MatchOptions, + optimizedMatchOpts optimizedMatchOptions, ) (rollupResults, error) { if len(targets) == 0 { return rollupResults{}, nil @@ -407,7 +430,7 @@ func (as *activeRuleSet) toRollupResults( // If we cannot extract tags from the id, this is likely an invalid // metric and we bail early. - _, sortedTagPairBytes, err := matchOpts.NameAndTagsFn(id) + _, sortedTagPairBytes, err := optimizedMatchOpts.NameAndTagsFn(id) if err != nil { return rollupResults{}, err } @@ -456,7 +479,7 @@ func (as *activeRuleSet) toRollupResults( tagPairs, tags[idx], matchRollupTargetOptions{generateRollupID: true}, - matchOpts) + optimizedMatchOpts) if err != nil { multiErr = multiErr.Add(err) continue @@ -473,7 +496,7 @@ func (as *activeRuleSet) toRollupResults( continue } tagPairs = tagPairs[:0] - applied, err := as.applyIDToPipeline(sortedTagPairBytes, toApply, tagPairs, tags[idx], matchOpts) + applied, err := as.applyIDToPipeline(sortedTagPairBytes, toApply, tagPairs, tags[idx], optimizedMatchOpts) if err != nil { err = fmt.Errorf("failed to apply id %s to pipeline %v: %v", id, toApply, err) multiErr = multiErr.Add(err) @@ -518,7 +541,7 @@ func (as *activeRuleSet) matchRollupTarget( tagPairs []metricid.TagPair, // buffer for reuse to generate rollup ID across calls tags []models.Tag, targetOpts matchRollupTargetOptions, - matchOpts MatchOptions, + optimizedMatchOpts optimizedMatchOptions, ) ([]byte, bool, error) { if rollupOp.Type == mpipeline.ExcludeByRollupType && !targetOpts.generateRollupID { // Exclude by tag always matches, if not generating rollup ID @@ -528,12 +551,13 @@ func (as *activeRuleSet) matchRollupTarget( var ( rollupTags = rollupOp.Tags - sortedTagIter = matchOpts.SortedTagIteratorFn(sortedTagPairBytes) matchTagIdx = 0 + sortedTagIter = optimizedMatchOpts.SortedTagIterator nameTagName = as.tagsFilterOpts.NameTagKey nameTagValue []byte includeTagNames = as.includeTagKeys ) + sortedTagIter.Reset(sortedTagPairBytes) switch rollupOp.Type { case mpipeline.GroupByRollupType: @@ -653,7 +677,7 @@ func (as *activeRuleSet) applyIDToPipeline( pipeline mpipeline.Pipeline, tagPairs []metricid.TagPair, // buffer for reuse across calls tags []models.Tag, - matchOpts MatchOptions, + optimizedMatchOpts optimizedMatchOptions, ) (applied.Pipeline, error) { operations := make([]applied.OpUnion, 0, pipeline.Len()) for i := 0; i < pipeline.Len(); i++ { @@ -674,7 +698,7 @@ func (as *activeRuleSet) applyIDToPipeline( tagPairs, tags, matchRollupTargetOptions{generateRollupID: true}, - matchOpts) + optimizedMatchOpts) if err != nil { return applied.Pipeline{}, err } @@ -694,6 +718,7 @@ func (as *activeRuleSet) applyIDToPipeline( return applied.NewPipeline(operations), nil } +// amainsd: hi func (as *activeRuleSet) reverseMappingsFor( id, name, tags []byte, isRollupID bool, @@ -702,11 +727,12 @@ func (as *activeRuleSet) reverseMappingsFor( at aggregation.Type, isMultiAggregationTypesAllowed bool, aggTypesOpts aggregation.TypesOptions, + optimizedMatchOpts optimizedMatchOptions, ) (reverseMatchResult, bool, error) { if !isRollupID { - return as.reverseMappingsForNonRollupID(id, timeNanos, mt, at, aggTypesOpts) + return as.reverseMappingsForNonRollupID(id, timeNanos, mt, at, aggTypesOpts, optimizedMatchOpts) } - return as.reverseMappingsForRollupID(name, tags, timeNanos, mt, at, isMultiAggregationTypesAllowed, aggTypesOpts) + return as.reverseMappingsForRollupID(name, tags, timeNanos, mt, at, isMultiAggregationTypesAllowed, aggTypesOpts, optimizedMatchOpts) } type reverseMatchResult struct { @@ -722,11 +748,9 @@ func (as *activeRuleSet) reverseMappingsForNonRollupID( mt metric.Type, at aggregation.Type, aggTypesOpts aggregation.TypesOptions, + optimizedMatchOpts optimizedMatchOptions, ) (reverseMatchResult, bool, error) { - mapping, err := as.mappingsForNonRollupID(id, timeNanos, MatchOptions{ - NameAndTagsFn: as.tagsFilterOpts.NameAndTagsFn, - SortedTagIteratorFn: as.tagsFilterOpts.SortedTagIteratorFn, - }) + mapping, err := as.mappingsForNonRollupID(id, timeNanos, optimizedMatchOpts) if err != nil { return reverseMatchResult{}, false, err } @@ -769,6 +793,7 @@ func (as *activeRuleSet) reverseMappingsForRollupID( at aggregation.Type, isMultiAggregationTypesAllowed bool, aggTypesOpts aggregation.TypesOptions, + optimizedMatchOpts optimizedMatchOptions, ) (reverseMatchResult, bool, error) { for _, rollupRule := range as.rollupRules { snapshot := rollupRule.activeSnapshot(timeNanos) @@ -792,10 +817,7 @@ func (as *activeRuleSet) reverseMappingsForRollupID( nil, nil, matchRollupTargetOptions{generateRollupID: false}, - MatchOptions{ - NameAndTagsFn: as.tagsFilterOpts.NameAndTagsFn, - SortedTagIteratorFn: as.tagsFilterOpts.SortedTagIteratorFn, - }, + optimizedMatchOpts, ) if err != nil { return reverseMatchResult{}, false, err diff --git a/src/metrics/rules/active_ruleset_bench_test.go b/src/metrics/rules/active_ruleset_bench_test.go new file mode 100644 index 0000000000..64a30cac45 --- /dev/null +++ b/src/metrics/rules/active_ruleset_bench_test.go @@ -0,0 +1,7 @@ +package rules + +import "testing" + +func BenchmarkActiveRuleSet(b *testing.B) { + +}