diff --git a/src/cmd/services/m3coordinator/downsample/downsampler_test.go b/src/cmd/services/m3coordinator/downsample/downsampler_test.go index c70ce6345d..23d74efd7d 100644 --- a/src/cmd/services/m3coordinator/downsample/downsampler_test.go +++ b/src/cmd/services/m3coordinator/downsample/downsampler_test.go @@ -176,6 +176,132 @@ func TestDownsamplerAggregationWithRulesConfigMappingRules(t *testing.T) { testDownsamplerAggregation(t, testDownsampler) } +func TestDownsamplerAggregationWithRulesConfigMappingRulesPartialReplaceAutoMappingRule(t *testing.T) { + gaugeMetric := testGaugeMetric{ + tags: map[string]string{ + nameTag: "foo_metric", + "app": "nginx_edge", + }, + timedSamples: []testGaugeMetricTimedSample{ + {value: 15}, {value: 10}, {value: 30}, {value: 5}, {value: 0}, + }, + } + testDownsampler := newTestDownsampler(t, testDownsamplerOptions{ + autoMappingRules: []AutoMappingRule{ + { + Aggregations: []aggregation.Type{aggregation.Sum}, + Policies: policy.StoragePolicies{ + policy.MustParseStoragePolicy("2s:24h"), + policy.MustParseStoragePolicy("4s:48h"), + }, + }, + }, + rulesConfig: &RulesConfiguration{ + MappingRules: []MappingRuleConfiguration{ + { + Filter: "app:nginx*", + Aggregations: []aggregation.Type{aggregation.Max}, + StoragePolicies: []StoragePolicyConfiguration{ + { + Resolution: 2 * time.Second, + Retention: 24 * time.Hour, + }, + }, + }, + }, + }, + ingest: &testDownsamplerOptionsIngest{ + gaugeMetrics: []testGaugeMetric{gaugeMetric}, + }, + expect: &testDownsamplerOptionsExpect{ + writes: []testExpectedWrite{ + // Expect the max to be used and override the default auto + // mapping rule for the storage policy 2s:24h. + { + tags: gaugeMetric.tags, + value: 30, + attributes: &storage.Attributes{ + MetricsType: storage.AggregatedMetricsType, + Resolution: 2 * time.Second, + Retention: 24 * time.Hour, + }, + }, + // Expect the sum to still be used for the storage + // policy 4s:48h. + { + tags: gaugeMetric.tags, + value: 60, + attributes: &storage.Attributes{ + MetricsType: storage.AggregatedMetricsType, + Resolution: 4 * time.Second, + Retention: 48 * time.Hour, + }, + }, + }, + }, + }) + + // Test expected output + testDownsamplerAggregation(t, testDownsampler) +} + +func TestDownsamplerAggregationWithRulesConfigMappingRulesReplaceAutoMappingRule(t *testing.T) { + gaugeMetric := testGaugeMetric{ + tags: map[string]string{ + nameTag: "foo_metric", + "app": "nginx_edge", + }, + timedSamples: []testGaugeMetricTimedSample{ + {value: 15}, {value: 10}, {value: 30}, {value: 5}, {value: 0}, + }, + } + testDownsampler := newTestDownsampler(t, testDownsamplerOptions{ + autoMappingRules: []AutoMappingRule{ + { + Aggregations: []aggregation.Type{aggregation.Sum}, + Policies: policy.StoragePolicies{ + policy.MustParseStoragePolicy("2s:24h"), + }, + }, + }, + rulesConfig: &RulesConfiguration{ + MappingRules: []MappingRuleConfiguration{ + { + Filter: "app:nginx*", + Aggregations: []aggregation.Type{aggregation.Max}, + StoragePolicies: []StoragePolicyConfiguration{ + { + Resolution: 2 * time.Second, + Retention: 24 * time.Hour, + }, + }, + }, + }, + }, + ingest: &testDownsamplerOptionsIngest{ + gaugeMetrics: []testGaugeMetric{gaugeMetric}, + }, + expect: &testDownsamplerOptionsExpect{ + writes: []testExpectedWrite{ + // Expect the max to be used and override the default auto + // mapping rule for the storage policy 2s:24h. + { + tags: gaugeMetric.tags, + value: 30, + attributes: &storage.Attributes{ + MetricsType: storage.AggregatedMetricsType, + Resolution: 2 * time.Second, + Retention: 24 * time.Hour, + }, + }, + }, + }, + }) + + // Test expected output + testDownsamplerAggregation(t, testDownsampler) +} + func TestDownsamplerAggregationWithRulesConfigRollupRules(t *testing.T) { gaugeMetric := testGaugeMetric{ tags: map[string]string{ @@ -197,7 +323,6 @@ func TestDownsamplerAggregationWithRulesConfigRollupRules(t *testing.T) { res := 5 * time.Second ret := 30 * 24 * time.Hour testDownsampler := newTestDownsampler(t, testDownsamplerOptions{ - instrumentOpts: instrument.NewTestOptions(t), rulesConfig: &RulesConfiguration{ RollupRules: []RollupRuleConfiguration{ { @@ -472,7 +597,8 @@ CheckAllWritesArrivedLoop: for _, expectedWrite := range expectedWrites { name := expectedWrite.tags[nameTag] - if _, ok := findWrite(t, writes, name); !ok { + _, ok := findWrite(t, writes, name, expectedWrite.attributes) + if !ok { time.Sleep(100 * time.Millisecond) continue CheckAllWritesArrivedLoop } @@ -497,7 +623,7 @@ CheckAllWritesArrivedLoop: name := expectedWrite.tags[nameTag] value := expectedWrite.value - write, found := findWrite(t, writes, name) + write, found := findWrite(t, writes, name, expectedWrite.attributes) require.True(t, found) assert.Equal(t, expectedWrite.tags, tagsToStringMap(write.Tags)) require.Equal(t, 1, len(write.Datapoints)) @@ -724,7 +850,8 @@ func newTestDownsampler(t *testing.T, opts testDownsamplerOptions) testDownsampl clockOpts = opts.clockOpts } - instrumentOpts := instrument.NewOptions() + // Use a test instrument options by default to get the debug logs on by default. + instrumentOpts := instrument.NewTestOptions(t) if opts.instrumentOpts != nil { instrumentOpts = opts.instrumentOpts } @@ -827,12 +954,20 @@ func findWrite( t *testing.T, writes []*storage.WriteQuery, name string, + optionalMatchAttrs *storage.Attributes, ) (*storage.WriteQuery, bool) { for _, w := range writes { if t, ok := w.Tags.Get([]byte(nameTag)); ok { - if bytes.Equal(t, []byte(name)) { - return w, true + if !bytes.Equal(t, []byte(name)) { + // Does not match name. + continue + } + if optionalMatchAttrs != nil && w.Attributes != *optionalMatchAttrs { + // Tried to match attributes and not matched. + continue } + // Matches name and all optional lookups. + return w, true } } return nil, false diff --git a/src/cmd/services/m3coordinator/downsample/metrics_appender.go b/src/cmd/services/m3coordinator/downsample/metrics_appender.go index 4cf106732b..3bde367251 100644 --- a/src/cmd/services/m3coordinator/downsample/metrics_appender.go +++ b/src/cmd/services/m3coordinator/downsample/metrics_appender.go @@ -32,6 +32,7 @@ import ( "github.com/m3db/m3/src/metrics/generated/proto/metricpb" "github.com/m3db/m3/src/metrics/matcher" "github.com/m3db/m3/src/metrics/metadata" + "github.com/m3db/m3/src/metrics/policy" "github.com/m3db/m3/src/x/clock" "github.com/m3db/m3/src/x/serialize" @@ -57,6 +58,8 @@ type metricsAppenderOptions struct { matcher matcher.Matcher metricTagsIteratorPool serialize.MetricTagsIteratorPool + mappingRuleStoragePolicies []policy.StoragePolicy + clockOpts clock.Options debugLogging bool logger *zap.Logger @@ -120,11 +123,30 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp }) } } else { - // Always aggregate any default staged metadats - for _, stagedMetadatas := range a.defaultStagedMetadatas { - a.debugLogMatch("downsampler applying default mapping rule", + // NB(r): First apply mapping rules to see which storage policies + // have been applied, any that have been applied as part of + // mapping rules that exact match a default storage policy will be + // skipped when applying default rules, so as to avoid storing + // the same metrics in the same namespace with the same metric + // name and tags (i.e. overwriting each other). + a.mappingRuleStoragePolicies = a.mappingRuleStoragePolicies[:0] + + stagedMetadatas := matchResult.ForExistingIDAt(nowNanos) + if !stagedMetadatas.IsDefault() && len(stagedMetadatas) != 0 { + a.debugLogMatch("downsampler applying matched mapping rule", debugLogMatchOptions{Meta: stagedMetadatas}) + // Collect all the current active mapping rules + for _, stagedMetadata := range stagedMetadatas { + for _, pipe := range stagedMetadata.Pipelines { + for _, sp := range pipe.StoragePolicies { + a.mappingRuleStoragePolicies = + append(a.mappingRuleStoragePolicies, sp) + } + } + } + + // Only sample if going to actually aggregate a.multiSamplesAppender.addSamplesAppender(samplesAppender{ agg: a.agg, clientRemote: a.clientRemote, @@ -133,12 +155,74 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp }) } - stagedMetadatas := matchResult.ForExistingIDAt(nowNanos) - if !stagedMetadatas.IsDefault() && len(stagedMetadatas) != 0 { - a.debugLogMatch("downsampler applying matched mapping rule", + // Always aggregate any default staged metadatas (unless + // mapping rule has provided an override for a storage policy, + // if so then skip aggregating for that storage policy). + for _, stagedMetadatas := range a.defaultStagedMetadatas { + a.debugLogMatch("downsampler applying default mapping rule", debugLogMatchOptions{Meta: stagedMetadatas}) - // Only sample if going to actually aggregate + stagedMetadataBeforeFilter := stagedMetadatas[:] + if len(a.mappingRuleStoragePolicies) != 0 { + // If mapping rules have applied aggregations for + // storage policies then de-dupe so we don't have two + // active aggregations for the same storage policy. + stagedMetadatasAfterFilter := stagedMetadatas[:0] + for _, stagedMetadata := range stagedMetadatas { + pipesAfterFilter := stagedMetadata.Pipelines[:0] + for _, pipe := range stagedMetadata.Pipelines { + storagePoliciesAfterFilter := pipe.StoragePolicies[:0] + for _, sp := range pipe.StoragePolicies { + // Check aggregation for storage policy not already + // set by a mapping rule. + matchedByMappingRule := false + for _, existing := range a.mappingRuleStoragePolicies { + if sp.Equivalent(existing) { + matchedByMappingRule = true + a.debugLogMatch("downsampler skipping default mapping rule storage policy", + debugLogMatchOptions{Meta: stagedMetadataBeforeFilter}) + break + } + } + if !matchedByMappingRule { + // Keep storage policy if not matched by mapping rule. + storagePoliciesAfterFilter = + append(storagePoliciesAfterFilter, sp) + } + } + + // Update storage policies slice after filtering. + pipe.StoragePolicies = storagePoliciesAfterFilter + + if len(pipe.StoragePolicies) != 0 { + // Keep storage policy if still has some storage policies. + pipesAfterFilter = append(pipesAfterFilter, pipe) + } + } + + // Update pipelnes after filtering. + stagedMetadata.Pipelines = pipesAfterFilter + + if len(stagedMetadata.Pipelines) != 0 { + // Keep staged metadata if still has some pipelines. + stagedMetadatasAfterFilter = + append(stagedMetadatasAfterFilter, stagedMetadata) + } + } + + // Finally set the staged metadatas we're keeping + // as those that were kept after filtering. + stagedMetadatas = stagedMetadatasAfterFilter + } + + // Now skip appending if after filtering there's no staged metadatas + // after any filtering that's applied. + if len(stagedMetadatas) == 0 { + a.debugLogMatch("downsampler skipping default mapping rule completely", + debugLogMatchOptions{Meta: stagedMetadataBeforeFilter}) + continue + } + a.multiSamplesAppender.addSamplesAppender(samplesAppender{ agg: a.agg, clientRemote: a.clientRemote, @@ -167,8 +251,9 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp } type debugLogMatchOptions struct { - Meta metadata.StagedMetadatas - RollupID []byte + Meta metadata.StagedMetadatas + StoragePolicy policy.StoragePolicy + RollupID []byte } func (a *metricsAppender) debugLogMatch(str string, opts debugLogMatchOptions) { @@ -184,6 +269,9 @@ func (a *metricsAppender) debugLogMatch(str string, opts debugLogMatchOptions) { if v := opts.Meta; v != nil { fields = append(fields, stagedMetadatasLogField(v)) } + if v := opts.StoragePolicy; v != policy.EmptyStoragePolicy { + fields = append(fields, zap.Stringer("storagePolicy", v)) + } a.logger.Debug(str, fields...) } diff --git a/src/metrics/policy/storage_policy.go b/src/metrics/policy/storage_policy.go index 7c409bac78..1c4221a22e 100644 --- a/src/metrics/policy/storage_policy.go +++ b/src/metrics/policy/storage_policy.go @@ -73,6 +73,14 @@ func NewStoragePolicyFromProto(pb *policypb.StoragePolicy) (StoragePolicy, error return sp, nil } +// Equivalent returns whether two storage policies are equal by their +// retention width and resolution. The resolution precision is ignored +// for equivalency (hence why the method is not named Equal). +func (p StoragePolicy) Equivalent(other StoragePolicy) bool { + return p.resolution.Window == other.resolution.Window && + p.retention == other.retention +} + // String is the string representation of a storage policy. func (p StoragePolicy) String() string { return fmt.Sprintf("%s%s%s", p.resolution.String(), resolutionRetentionSeparator, p.retention.String())