Skip to content

Commit

Permalink
[matcher/coordinator] Add latency metrics to rule matching (#3083)
Browse files Browse the repository at this point in the history
  • Loading branch information
wesleyk authored Jan 12, 2021
1 parent f26ccea commit 60e8e70
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 14 deletions.
34 changes: 30 additions & 4 deletions src/metrics/matcher/match.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
package matcher

import (
"time"

"github.com/uber-go/tally"

"github.com/m3db/m3/src/metrics/matcher/cache"
"github.com/m3db/m3/src/metrics/metric/id"
"github.com/m3db/m3/src/metrics/rules"
Expand All @@ -40,6 +44,7 @@ type matcher struct {
namespaceResolver namespaceResolver
namespaces Namespaces
cache cache.Cache
metrics matcherMetrics
}

type namespaceResolver struct {
Expand Down Expand Up @@ -89,22 +94,25 @@ func NewMatcher(cache cache.Cache, opts Options) (Matcher, error) {
return &noCacheMatcher{
namespaceResolver: nsResolver,
namespaces: namespaces,
metrics: newMatcherMetrics(scope.SubScope("matcher")),
}, nil
}

return &matcher{
namespaceResolver: nsResolver,
namespaces: namespaces,
cache: cache,
metrics: newMatcherMetrics(scope.SubScope("cached-matcher")),
}, nil
}

func (m *matcher) ForwardMatch(
id id.ID,
fromNanos, toNanos int64,
) rules.MatchResult {
return m.cache.ForwardMatch(m.namespaceResolver.Resolve(id),
id.Bytes(), fromNanos, toNanos)
sw := m.metrics.matchLatency.Start()
defer sw.Stop()
return m.cache.ForwardMatch(m.namespaceResolver.Resolve(id), id.Bytes(), fromNanos, toNanos)
}

func (m *matcher) Close() error {
Expand All @@ -115,14 +123,32 @@ func (m *matcher) Close() error {
type noCacheMatcher struct {
namespaces Namespaces
namespaceResolver namespaceResolver
metrics matcherMetrics
}

type matcherMetrics struct {
matchLatency tally.Histogram
}

func newMatcherMetrics(scope tally.Scope) matcherMetrics {
return matcherMetrics{
matchLatency: scope.Histogram(
"match-latency",
append(
tally.DurationBuckets{0},
tally.MustMakeExponentialDurationBuckets(time.Millisecond, 1.5, 15)...,
),
),
}
}

func (m *noCacheMatcher) ForwardMatch(
id id.ID,
fromNanos, toNanos int64,
) rules.MatchResult {
return m.namespaces.ForwardMatch(m.namespaceResolver.Resolve(id),
id.Bytes(), fromNanos, toNanos)
sw := m.metrics.matchLatency.Start()
defer sw.Stop()
return m.namespaces.ForwardMatch(m.namespaceResolver.Resolve(id), id.Bytes(), fromNanos, toNanos)
}

func (m *noCacheMatcher) Close() error {
Expand Down
41 changes: 31 additions & 10 deletions src/metrics/matcher/match_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ import (
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"github.com/uber-go/tally"

"github.com/m3db/m3/src/cluster/kv"
"github.com/m3db/m3/src/cluster/kv/mem"
"github.com/m3db/m3/src/metrics/aggregation"
Expand All @@ -42,9 +46,6 @@ import (
"github.com/m3db/m3/src/x/instrument"
xtest "github.com/m3db/m3/src/x/test"
"github.com/m3db/m3/src/x/watch"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
)

func TestMatcherCreateWatchError(t *testing.T) {
Expand Down Expand Up @@ -82,10 +83,12 @@ func TestMatcherMatchDoesNotExist(t *testing.T) {
tagValueFn: func(tagName []byte) ([]byte, bool) { return nil, false },
}
now := time.Now()
matcher := testMatcher(t, testMatcherOptions{
matcher, testScope := testMatcher(t, testMatcherOptions{
cache: newMemCache(),
})
require.Equal(t, rules.EmptyMatchResult, matcher.ForwardMatch(id, now.UnixNano(), now.UnixNano()))

requireLatencyMetrics(t, "cached-matcher", testScope)
}

func TestMatcherMatchExists(t *testing.T) {
Expand All @@ -100,7 +103,7 @@ func TestMatcherMatchExists(t *testing.T) {
memRes = memResults{results: map[string]rules.MatchResult{"foo": res}}
)
cache := newMemCache()
matcher := testMatcher(t, testMatcherOptions{
matcher, _ := testMatcher(t, testMatcherOptions{
cache: cache,
})
c := cache.(*memCache)
Expand All @@ -125,7 +128,7 @@ func TestMatcherMatchExistsNoCache(t *testing.T) {
}
now = time.Now()
)
matcher := testMatcher(t, testMatcherOptions{
matcher, testScope := testMatcher(t, testMatcherOptions{
tagFilterOptions: filters.TagsFilterOptions{
NameAndTagsFn: func(id []byte) (name []byte, tags []byte, err error) {
name = metric.id
Expand Down Expand Up @@ -210,10 +213,13 @@ func TestMatcherMatchExistsNoCache(t *testing.T) {
result := matcher.ForwardMatch(metric, now.UnixNano(), now.UnixNano())

require.Equal(t, expected, result)

// Check that latency was measured
requireLatencyMetrics(t, "matcher", testScope)
}

func TestMatcherClose(t *testing.T) {
matcher := testMatcher(t, testMatcherOptions{
matcher, _ := testMatcher(t, testMatcherOptions{
cache: newMemCache(),
})
require.NoError(t, matcher.Close())
Expand All @@ -225,12 +231,13 @@ type testMatcherOptions struct {
tagFilterOptions filters.TagsFilterOptions
}

func testMatcher(t *testing.T, opts testMatcherOptions) Matcher {
func testMatcher(t *testing.T, opts testMatcherOptions) (Matcher, tally.TestScope) {
scope := tally.NewTestScope("", nil)
var (
store = mem.NewStore()
matcherOpts = NewOptions().
SetClockOptions(clock.NewOptions()).
SetInstrumentOptions(instrument.NewOptions()).
SetInstrumentOptions(instrument.NewOptions().SetMetricsScope(scope)).
SetInitWatchTimeout(100 * time.Millisecond).
SetKVStore(store).
SetNamespacesKey(testNamespacesKey).
Expand Down Expand Up @@ -264,7 +271,21 @@ func testMatcher(t *testing.T, opts testMatcherOptions) Matcher {

m, err := NewMatcher(opts.cache, matcherOpts)
require.NoError(t, err)
return m
return m, scope
}

func requireLatencyMetrics(t *testing.T, metricScope string, testScope tally.TestScope) {
// Check that latency was measured
values, found := testScope.Snapshot().Histograms()[metricScope+".match-latency+"]
require.True(t, found)
latencyMeasured := false
for _, valuesInBucket := range values.Durations() {
if valuesInBucket > 0 {
latencyMeasured = true
break
}
}
require.True(t, latencyMeasured)
}

type tagValueFn func(tagName []byte) ([]byte, bool)
Expand Down

0 comments on commit 60e8e70

Please sign in to comment.