
[coordinator] Only honor default aggregation policies if not matched by mapping rule #2203

Merged
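
In brief: when a metric matches a configured mapping rule, any default (auto) mapping rule storage policy that is equivalent to one already applied by the mapping rule is now skipped, so the two rules no longer write duplicate aggregations for the same storage policy. Below is a condensed sketch of the filtering logic added in metrics_appender.go; it is illustrative, not the PR's literal code, and the helper name filterDefaults is made up for this sketch:

	package main

	import (
		"fmt"

		"github.com/m3db/m3/src/metrics/policy"
	)

	// filterDefaults drops every default storage policy already claimed by a
	// mapping rule, mirroring the in-place filtering in metrics_appender.go.
	func filterDefaults(defaults, matched []policy.StoragePolicy) []policy.StoragePolicy {
		kept := defaults[:0] // reuse the backing array, as the PR does
		for _, sp := range defaults {
			matchedByMappingRule := false
			for _, existing := range matched {
				if sp.Equivalent(existing) {
					matchedByMappingRule = true
					break
				}
			}
			if !matchedByMappingRule {
				kept = append(kept, sp)
			}
		}
		return kept
	}

	func main() {
		defaults := []policy.StoragePolicy{
			policy.MustParseStoragePolicy("2s:24h"),
			policy.MustParseStoragePolicy("4s:48h"),
		}
		matched := []policy.StoragePolicy{
			policy.MustParseStoragePolicy("2s:24h"),
		}
		// Only the 4s:48h policy survives the filter; 2s:24h is
		// already handled by the mapping rule.
		fmt.Println(filterDefaults(defaults, matched))
	}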
147 changes: 141 additions & 6 deletions src/cmd/services/m3coordinator/downsample/downsampler_test.go
@@ -176,6 +176,132 @@ func TestDownsamplerAggregationWithRulesConfigMappingRules(t *testing.T) {
testDownsamplerAggregation(t, testDownsampler)
}

func TestDownsamplerAggregationWithRulesConfigMappingRulesPartialReplaceAutoMappingRule(t *testing.T) {
gaugeMetric := testGaugeMetric{
tags: map[string]string{
nameTag: "foo_metric",
"app": "nginx_edge",
},
timedSamples: []testGaugeMetricTimedSample{
{value: 15}, {value: 10}, {value: 30}, {value: 5}, {value: 0},
},
}
testDownsampler := newTestDownsampler(t, testDownsamplerOptions{
autoMappingRules: []AutoMappingRule{
{
Aggregations: []aggregation.Type{aggregation.Sum},
Policies: policy.StoragePolicies{
policy.MustParseStoragePolicy("2s:24h"),
policy.MustParseStoragePolicy("4s:48h"),
},
},
},
rulesConfig: &RulesConfiguration{
MappingRules: []MappingRuleConfiguration{
{
Filter: "app:nginx*",
Aggregations: []aggregation.Type{aggregation.Max},
StoragePolicies: []StoragePolicyConfiguration{
{
Resolution: 2 * time.Second,
Retention: 24 * time.Hour,
},
},
},
},
},
ingest: &testDownsamplerOptionsIngest{
gaugeMetrics: []testGaugeMetric{gaugeMetric},
},
expect: &testDownsamplerOptionsExpect{
writes: []testExpectedWrite{
// Expect the max to be used and override the default auto
// mapping rule for the storage policy 2s:24h.
{
tags: gaugeMetric.tags,
value: 30,
attributes: &storage.Attributes{
MetricsType: storage.AggregatedMetricsType,
Resolution: 2 * time.Second,
Retention: 24 * time.Hour,
},
},
// Expect the sum to still be used for the storage
// policy 4s:48h.
{
tags: gaugeMetric.tags,
value: 60,
attributes: &storage.Attributes{
MetricsType: storage.AggregatedMetricsType,
Resolution: 4 * time.Second,
Retention: 48 * time.Hour,
},
},
},
},
})

// Test expected output
testDownsamplerAggregation(t, testDownsampler)
}
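
(For the ingested samples 15, 10, 30, 5, 0: the mapping rule's max is 30, stored at 2s:24h, while the auto mapping rule's sum 15+10+30+5+0 = 60 still lands at 4s:48h, matching the two expected writes above.)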

func TestDownsamplerAggregationWithRulesConfigMappingRulesReplaceAutoMappingRule(t *testing.T) {
gaugeMetric := testGaugeMetric{
tags: map[string]string{
nameTag: "foo_metric",
"app": "nginx_edge",
},
timedSamples: []testGaugeMetricTimedSample{
{value: 15}, {value: 10}, {value: 30}, {value: 5}, {value: 0},
},
}
testDownsampler := newTestDownsampler(t, testDownsamplerOptions{
autoMappingRules: []AutoMappingRule{
{
Aggregations: []aggregation.Type{aggregation.Sum},
Policies: policy.StoragePolicies{
policy.MustParseStoragePolicy("2s:24h"),
},
},
},
rulesConfig: &RulesConfiguration{
MappingRules: []MappingRuleConfiguration{
{
Filter: "app:nginx*",
Aggregations: []aggregation.Type{aggregation.Max},
StoragePolicies: []StoragePolicyConfiguration{
{
Resolution: 2 * time.Second,
Retention: 24 * time.Hour,
},
},
},
},
},
ingest: &testDownsamplerOptionsIngest{
gaugeMetrics: []testGaugeMetric{gaugeMetric},
},
expect: &testDownsamplerOptionsExpect{
writes: []testExpectedWrite{
// Expect the max to be used and override the default auto
// mapping rule for the storage policy 2s:24h.
{
tags: gaugeMetric.tags,
value: 30,
attributes: &storage.Attributes{
MetricsType: storage.AggregatedMetricsType,
Resolution: 2 * time.Second,
Retention: 24 * time.Hour,
},
},
},
},
})

// Test expected output
testDownsamplerAggregation(t, testDownsampler)
}

func TestDownsamplerAggregationWithRulesConfigRollupRules(t *testing.T) {
gaugeMetric := testGaugeMetric{
tags: map[string]string{
@@ -197,7 +323,6 @@ func TestDownsamplerAggregationWithRulesConfigRollupRules(t *testing.T) {
res := 5 * time.Second
ret := 30 * 24 * time.Hour
testDownsampler := newTestDownsampler(t, testDownsamplerOptions{
instrumentOpts: instrument.NewTestOptions(t),
rulesConfig: &RulesConfiguration{
RollupRules: []RollupRuleConfiguration{
{
@@ -472,7 +597,8 @@ CheckAllWritesArrivedLoop:

for _, expectedWrite := range expectedWrites {
name := expectedWrite.tags[nameTag]
if _, ok := findWrite(t, writes, name); !ok {
_, ok := findWrite(t, writes, name, expectedWrite.attributes)
if !ok {
time.Sleep(100 * time.Millisecond)
continue CheckAllWritesArrivedLoop
}
@@ -497,7 +623,7 @@ CheckAllWritesArrivedLoop:
name := expectedWrite.tags[nameTag]
value := expectedWrite.value

write, found := findWrite(t, writes, name)
write, found := findWrite(t, writes, name, expectedWrite.attributes)
require.True(t, found)
assert.Equal(t, expectedWrite.tags, tagsToStringMap(write.Tags))
require.Equal(t, 1, len(write.Datapoints))
@@ -724,7 +850,8 @@ func newTestDownsampler(t *testing.T, opts testDownsamplerOptions) testDownsampl
clockOpts = opts.clockOpts
}

instrumentOpts := instrument.NewOptions()
// Use test instrument options by default so that debug logs are on by default.
instrumentOpts := instrument.NewTestOptions(t)
if opts.instrumentOpts != nil {
instrumentOpts = opts.instrumentOpts
}
@@ -827,12 +954,20 @@ func findWrite(
t *testing.T,
writes []*storage.WriteQuery,
name string,
optionalMatchAttrs *storage.Attributes,
) (*storage.WriteQuery, bool) {
for _, w := range writes {
if t, ok := w.Tags.Get([]byte(nameTag)); ok {
if bytes.Equal(t, []byte(name)) {
return w, true
if !bytes.Equal(t, []byte(name)) {
// Does not match name.
continue
}
if optionalMatchAttrs != nil && w.Attributes != *optionalMatchAttrs {
// Tried to match attributes and not matched.
continue
}
// Matches name and all optional lookups.
return w, true
}
}
return nil, false
106 changes: 97 additions & 9 deletions src/cmd/services/m3coordinator/downsample/metrics_appender.go
@@ -32,6 +32,7 @@ import (
"github.com/m3db/m3/src/metrics/generated/proto/metricpb"
"github.com/m3db/m3/src/metrics/matcher"
"github.com/m3db/m3/src/metrics/metadata"
"github.com/m3db/m3/src/metrics/policy"
"github.com/m3db/m3/src/x/clock"
"github.com/m3db/m3/src/x/serialize"

@@ -57,6 +58,8 @@ type metricsAppenderOptions struct {
matcher matcher.Matcher
metricTagsIteratorPool serialize.MetricTagsIteratorPool

mappingRuleStoragePolicies []policy.StoragePolicy

clockOpts clock.Options
debugLogging bool
logger *zap.Logger
@@ -120,11 +123,30 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp
})
}
} else {
// Always aggregate any default staged metadatas
for _, stagedMetadatas := range a.defaultStagedMetadatas {
a.debugLogMatch("downsampler applying default mapping rule",
// NB(r): First apply mapping rules to see which storage policies
// have been applied, any that have been applied as part of
// mapping rules that exact match a default storage policy will
// skipped when applying default rules so to avoid storing

[review comment, @arnikola, Mar 11, 2020] nit; be skipped, rules, so as

// the same metrics in the same namespace with the same metric
// name and tags (i.e. overwriting each other).
a.mappingRuleStoragePolicies = a.mappingRuleStoragePolicies[:0]

stagedMetadatas := matchResult.ForExistingIDAt(nowNanos)
if !stagedMetadatas.IsDefault() && len(stagedMetadatas) != 0 {
a.debugLogMatch("downsampler applying matched mapping rule",
debugLogMatchOptions{Meta: stagedMetadatas})

// Collect all the current active mapping rules
for _, stagedMetadata := range stagedMetadatas {
for _, pipe := range stagedMetadata.Pipelines {
for _, sp := range pipe.StoragePolicies {
a.mappingRuleStoragePolicies =
append(a.mappingRuleStoragePolicies, sp)
}
}
}

// Only sample if going to actually aggregate
a.multiSamplesAppender.addSamplesAppender(samplesAppender{
agg: a.agg,
clientRemote: a.clientRemote,
Expand All @@ -133,12 +155,74 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp
})
}

stagedMetadatas := matchResult.ForExistingIDAt(nowNanos)
if !stagedMetadatas.IsDefault() && len(stagedMetadatas) != 0 {
a.debugLogMatch("downsampler applying matched mapping rule",
// Always aggregate any default staged metadatas (unless
// mapping rule has provided an override for a storage policy,
// if so then skip aggregating for that storage policy).
for _, stagedMetadatas := range a.defaultStagedMetadatas {
a.debugLogMatch("downsampler applying default mapping rule",
debugLogMatchOptions{Meta: stagedMetadatas})

// Only sample if going to actually aggregate
stagedMetadatsBeforeFilter := stagedMetadatas[:]

[review comment, collaborator] nit: stagedMetadataBeforeFilter

if len(a.mappingRuleStoragePolicies) != 0 {
// If mapping rules have applied aggregations for
// storage policies then de-dupe so we don't have two
// active aggregations for the same storage policy.
stagedMetadatasAfterFilter := stagedMetadatas[:0]
for _, stagedMetadata := range stagedMetadatas {
pipesAfterFilter := stagedMetadata.Pipelines[:0]
for _, pipe := range stagedMetadata.Pipelines {
storagePoliciesAfterFilter := pipe.StoragePolicies[:0]
for _, sp := range pipe.StoragePolicies {
// Check aggregation for storage policy not already
// set by a mapping rule.
matchedByMappingRule := false
for _, existing := range a.mappingRuleStoragePolicies {
if sp.Equivalent(existing) {
matchedByMappingRule = true
a.debugLogMatch("downsampler skipping default mapping rule storage policy",

[review comment, collaborator] Should this also log exactly which rule was skipped?

[reply, author] Might get confusing since it could be a subsection of it - thankfully we print that out just as we start matching it (so it's contextually close in the logs):

	a.debugLogMatch("downsampler applying default mapping rule",
		debugLogMatchOptions{Meta: stagedMetadatas})

[reply, collaborator] Fair

debugLogMatchOptions{Meta: stagedMetadatsBeforeFilter})
break
}
}
if !matchedByMappingRule {
// Keep storage policy if not matched by mapping rule.
storagePoliciesAfterFilter =
append(storagePoliciesAfterFilter, sp)
}
}

// Update storage policies slice after filtering.
pipe.StoragePolicies = storagePoliciesAfterFilter

if len(pipe.StoragePolicies) != 0 {
// Keep storage policy if still has some storage policies.
pipesAfterFilter = append(pipesAfterFilter, pipe)
}
}

// Update pipelines after filtering.
stagedMetadata.Pipelines = pipesAfterFilter

if len(stagedMetadata.Pipelines) != 0 {
// Keep staged metadata if still has some pipelines.
stagedMetadatasAfterFilter =
append(stagedMetadatasAfterFilter, stagedMetadata)
}
}

// Finally set the staged metadatas we're keeping
// as those that were kept after filtering.
stagedMetadatas = stagedMetadatasAfterFilter
}

// Skip appending entirely if no staged metadatas remain after filtering.
if len(stagedMetadatas) == 0 {
a.debugLogMatch("downsampler skipping default mapping rule completely",
debugLogMatchOptions{Meta: stagedMetadatsBeforeFilter})
continue
}

a.multiSamplesAppender.addSamplesAppender(samplesAppender{
agg: a.agg,
clientRemote: a.clientRemote,
@@ -167,8 +251,9 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp
}

type debugLogMatchOptions struct {
Meta metadata.StagedMetadatas
RollupID []byte
Meta metadata.StagedMetadatas
StoragePolicy policy.StoragePolicy
RollupID []byte
}

func (a *metricsAppender) debugLogMatch(str string, opts debugLogMatchOptions) {
@@ -184,6 +269,9 @@ func (a *metricsAppender) debugLogMatch(str string, opts debugLogMatchOptions) {
if v := opts.Meta; v != nil {
fields = append(fields, stagedMetadatasLogField(v))
}
if v := opts.StoragePolicy; v != policy.EmptyStoragePolicy {
fields = append(fields, zap.Stringer("storagePolicy", v))
}
a.logger.Debug(str, fields...)
}

8 changes: 8 additions & 0 deletions src/metrics/policy/storage_policy.go
@@ -73,6 +73,14 @@ func NewStoragePolicyFromProto(pb *policypb.StoragePolicy) (StoragePolicy, error
return sp, nil
}

// Equivalent returns whether two storage policies are equivalent by their
// resolution window and retention. The resolution precision is ignored
// for equivalence (hence why the method is not named Equal).
func (p StoragePolicy) Equivalent(other StoragePolicy) bool {
return p.resolution.Window == other.resolution.Window &&
p.retention == other.retention
}

// String is the string representation of a storage policy.
func (p StoragePolicy) String() string {
return fmt.Sprintf("%s%s%s", p.resolution.String(), resolutionRetentionSeparator, p.retention.String())
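
A minimal usage sketch of the new Equivalent method, assuming the NewStoragePolicy(window, precision, retention) constructor and the xtime unit constants from this repo (neither appears in the diff above):

	package main

	import (
		"fmt"
		"time"

		"github.com/m3db/m3/src/metrics/policy"
		xtime "github.com/m3db/m3/src/x/time"
	)

	func main() {
		// Same 2s resolution window and 24h retention, different precision.
		a := policy.NewStoragePolicy(2*time.Second, xtime.Second, 24*time.Hour)
		b := policy.NewStoragePolicy(2*time.Second, xtime.Millisecond, 24*time.Hour)

		fmt.Println(a == b)          // false: exact equality includes precision
		fmt.Println(a.Equivalent(b)) // true: precision is ignored
	}

This is why the filtering in metrics_appender.go uses Equivalent rather than exact equality: a mapping rule and a default policy that aggregate at the same window and retention should de-dupe even if their parsed precisions differ.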