Skip to content

Commit

Permalink
[coordinator] Add ability to turn off rule matching cache (#3059)
Browse files Browse the repository at this point in the history
  • Loading branch information
robskillington authored Jan 4, 2021
1 parent b2280cb commit 877fa32
Show file tree
Hide file tree
Showing 10 changed files with 334 additions and 66 deletions.
29 changes: 17 additions & 12 deletions src/cmd/services/m3coordinator/downsample/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,8 @@ type MatcherConfiguration struct {

// MatcherCacheConfiguration is the configuration for the rule matcher cache.
type MatcherCacheConfiguration struct {
// Capacity if non-zero will set the capacity of the rules matching cache.
Capacity int `yaml:"capacity"`
// Capacity if set the capacity of the rules matching cache.
Capacity *int `yaml:"capacity"`
}

// RulesConfiguration is a set of rules configuration to use for downsampling.
Expand Down Expand Up @@ -767,8 +767,8 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) {
}

matcherCacheCapacity := defaultMatcherCacheCapacity
if v := cfg.Matcher.Cache.Capacity; v > 0 {
matcherCacheCapacity = v
if v := cfg.Matcher.Cache.Capacity; v != nil {
matcherCacheCapacity = *v
}

matcher, err := o.newAggregatorMatcher(matcherOpts, matcherCacheCapacity)
Expand Down Expand Up @@ -1055,14 +1055,19 @@ func (o DownsamplerOptions) newAggregatorMatcher(
opts matcher.Options,
capacity int,
) (matcher.Matcher, error) {
cacheOpts := cache.NewOptions().
SetCapacity(capacity).
SetClockOptions(opts.ClockOptions()).
SetInstrumentOptions(opts.InstrumentOptions().
SetMetricsScope(opts.InstrumentOptions().MetricsScope().SubScope("matcher-cache")))

cache := cache.NewCache(cacheOpts)
return matcher.NewMatcher(cache, opts)
var matcherCache cache.Cache
if capacity > 0 {
scope := opts.InstrumentOptions().MetricsScope().SubScope("matcher-cache")
instrumentOpts := opts.InstrumentOptions().
SetMetricsScope(scope)
cacheOpts := cache.NewOptions().
SetCapacity(capacity).
SetClockOptions(opts.ClockOptions()).
SetInstrumentOptions(instrumentOpts)
matcherCache = cache.NewCache(cacheOpts)
}

return matcher.NewMatcher(matcherCache, opts)
}

func (o DownsamplerOptions) newAggregatorPlacementManager(
Expand Down
2 changes: 1 addition & 1 deletion src/m3ninx/index/segment/builder/fields_map_new.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2020 Uber Technologies, Inc.
// Copyright (c) 2021 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
Expand Down
2 changes: 1 addition & 1 deletion src/m3ninx/index/segment/builder/ids_map_new.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2020 Uber Technologies, Inc.
// Copyright (c) 2021 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
Expand Down
2 changes: 1 addition & 1 deletion src/m3ninx/index/segment/builder/postings_map_new.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2020 Uber Technologies, Inc.
// Copyright (c) 2021 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
Expand Down
2 changes: 1 addition & 1 deletion src/m3ninx/index/segment/mem/fields_map_new.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2020 Uber Technologies, Inc.
// Copyright (c) 2021 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
Expand Down
2 changes: 1 addition & 1 deletion src/metrics/generated/mocks/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
// THE SOFTWARE.

// mockgen rules for generating mocks for exported interfaces (reflection mode).
//go:generate sh -c "mockgen -package=id github.com/m3db/m3/src/metrics/metric/id ID | genclean -pkg github.com/m3db/m3/src/metrics/metric/id -out $GOPATH/src/github.com/m3db/m3/src/metrics/metric/id/id_mock.go"
//go:generate sh -c "mockgen -package=id github.com/m3db/m3/src/metrics/metric/id ID,SortedTagIterator | genclean -pkg github.com/m3db/m3/src/metrics/metric/id -out $GOPATH/src/github.com/m3db/m3/src/metrics/metric/id/id_mock.go"
//go:generate sh -c "mockgen -package=matcher github.com/m3db/m3/src/metrics/matcher Matcher | genclean -pkg github.com/m3db/m3/src/metrics/matcher -out $GOPATH/src/github.com/m3db/m3/src/metrics/matcher/matcher_mock.go"
//go:generate sh -c "mockgen -package=protobuf github.com/m3db/m3/src/metrics/encoding/protobuf UnaggregatedEncoder | genclean -pkg github.com/m3db/m3/src/metrics/encoding/protobuf -out $GOPATH/src/github.com/m3db/m3/src/metrics/encoding/protobuf/protobuf_mock.go"
//go:generate sh -c "mockgen -package=rules github.com/m3db/m3/src/metrics/rules Store | genclean -pkg github.com/m3db/m3/src/metrics/rules -out $GOPATH/src/github.com/m3db/m3/src/metrics/rules/rules_mock.go"
Expand Down
96 changes: 69 additions & 27 deletions src/metrics/matcher/match.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,53 +37,95 @@ type Matcher interface {
}

type matcher struct {
opts Options
namespaceResolver namespaceResolver
namespaces Namespaces
cache cache.Cache
}

type namespaceResolver struct {
namespaceTag []byte
defaultNamespace []byte
}

namespaces Namespaces
cache cache.Cache
func (r namespaceResolver) Resolve(id id.ID) []byte {
ns, found := id.TagValue(r.namespaceTag)
if !found {
ns = r.defaultNamespace
}
return ns
}

// NewMatcher creates a new rule matcher.
// NewMatcher creates a new rule matcher, optionally with a cache.
func NewMatcher(cache cache.Cache, opts Options) (Matcher, error) {
nsResolver := namespaceResolver{
namespaceTag: opts.NamespaceTag(),
defaultNamespace: opts.DefaultNamespace(),
}

instrumentOpts := opts.InstrumentOptions()
scope := instrumentOpts.MetricsScope()
iOpts := instrumentOpts.SetMetricsScope(scope.SubScope("namespaces"))
namespacesOpts := opts.SetInstrumentOptions(iOpts).
SetOnNamespaceAddedFn(func(namespace []byte, ruleSet RuleSet) {
cache.Register(namespace, ruleSet)
}).
SetOnNamespaceRemovedFn(func(namespace []byte) {
cache.Unregister(namespace)
}).
SetOnRuleSetUpdatedFn(func(namespace []byte, ruleSet RuleSet) {
cache.Refresh(namespace, ruleSet)
})
key := opts.NamespacesKey()
namespaces := NewNamespaces(key, namespacesOpts)
namespacesOpts := opts.SetInstrumentOptions(iOpts)

if cache != nil {
namespacesOpts = namespacesOpts.
SetOnNamespaceAddedFn(func(namespace []byte, ruleSet RuleSet) {
cache.Register(namespace, ruleSet)
}).
SetOnNamespaceRemovedFn(func(namespace []byte) {
cache.Unregister(namespace)
}).
SetOnRuleSetUpdatedFn(func(namespace []byte, ruleSet RuleSet) {
cache.Refresh(namespace, ruleSet)
})
}

namespaces := NewNamespaces(opts.NamespacesKey(), namespacesOpts)
if err := namespaces.Open(); err != nil {
return nil, err
}

if cache == nil {
return &noCacheMatcher{
namespaceResolver: nsResolver,
namespaces: namespaces,
}, nil
}

return &matcher{
opts: opts,
namespaceTag: opts.NamespaceTag(),
defaultNamespace: opts.DefaultNamespace(),
namespaces: namespaces,
cache: cache,
namespaceResolver: nsResolver,
namespaces: namespaces,
cache: cache,
}, nil
}

func (m *matcher) ForwardMatch(id id.ID, fromNanos, toNanos int64) rules.MatchResult {
ns, found := id.TagValue(m.namespaceTag)
if !found {
ns = m.defaultNamespace
}
return m.cache.ForwardMatch(ns, id.Bytes(), fromNanos, toNanos)
func (m *matcher) ForwardMatch(
id id.ID,
fromNanos, toNanos int64,
) rules.MatchResult {
return m.cache.ForwardMatch(m.namespaceResolver.Resolve(id),
id.Bytes(), fromNanos, toNanos)
}

func (m *matcher) Close() error {
m.namespaces.Close()
return m.cache.Close()
}

type noCacheMatcher struct {
namespaces Namespaces
namespaceResolver namespaceResolver
}

func (m *noCacheMatcher) ForwardMatch(
id id.ID,
fromNanos, toNanos int64,
) rules.MatchResult {
return m.namespaces.ForwardMatch(m.namespaceResolver.Resolve(id),
id.Bytes(), fromNanos, toNanos)
}

func (m *noCacheMatcher) Close() error {
m.namespaces.Close()
return nil
}
Loading

0 comments on commit 877fa32

Please sign in to comment.