diff --git a/src/cmd/services/m3coordinator/downsample/downsampler.go b/src/cmd/services/m3coordinator/downsample/downsampler.go index df8e4da8d7..f2734e57b8 100644 --- a/src/cmd/services/m3coordinator/downsample/downsampler.go +++ b/src/cmd/services/m3coordinator/downsample/downsampler.go @@ -137,6 +137,7 @@ func defaultMetricsAppenderOptions(opts DownsamplerOptions, agg agg) metricsAppe metricTagsIteratorPool: agg.pools.metricTagsIteratorPool, debugLogging: debugLogging, logger: logger, + untimedRollups: agg.untimedRollups, } } diff --git a/src/cmd/services/m3coordinator/downsample/downsampler_test.go b/src/cmd/services/m3coordinator/downsample/downsampler_test.go index 979ef48cae..043c9af3d0 100644 --- a/src/cmd/services/m3coordinator/downsample/downsampler_test.go +++ b/src/cmd/services/m3coordinator/downsample/downsampler_test.go @@ -1813,7 +1813,321 @@ func TestDownsamplerAggregationWithRulesConfigRollupRuleAndDropPolicyAndDropTime "status_code": "500", "endpoint": "/foo/bar", }, - values: []expectedValue{{value: 55}, {value: 39}}, + values: []expectedValue{{value: 94}}, + attributes: &storagemetadata.Attributes{ + MetricsType: storagemetadata.AggregatedMetricsType, + Resolution: res, + Retention: ret, + }, + }, + }, + }, + }) + + // Test expected output + testDownsamplerAggregation(t, testDownsampler) +} + +func TestDownsamplerAggregationWithRulesConfigRollupRuleUntimedRollups(t *testing.T) { + t.Parallel() + + gaugeMetrics := []testGaugeMetric{ + { + tags: map[string]string{ + nameTag: "http_requests", + "app": "nginx_edge", + "status_code": "500", + "endpoint": "/foo/bar", + "not_rolled_up": "not_rolled_up_value_1", + }, + timedSamples: []testGaugeMetricTimedSample{ + {value: 42}, + {value: 12, offset: 2 * time.Second}, + }, + expectDropTimestamp: true, + }, + { + tags: map[string]string{ + nameTag: "http_requests", + "app": "nginx_edge", + "status_code": "500", + "endpoint": "/foo/bar", + "not_rolled_up": "not_rolled_up_value_2", + }, + timedSamples: 
[]testGaugeMetricTimedSample{ + {value: 13}, + {value: 27, offset: 2 * time.Second}, + }, + expectDropTimestamp: true, + }, + } + res := 1 * time.Second + ret := 30 * 24 * time.Hour + filter := fmt.Sprintf("%s:http_requests app:* status_code:* endpoint:*", nameTag) + testDownsampler := newTestDownsampler(t, testDownsamplerOptions{ + untimedRollups: true, + rulesConfig: &RulesConfiguration{ + MappingRules: []MappingRuleConfiguration{ + { + Filter: "app:nginx*", + Aggregations: []aggregation.Type{aggregation.Max}, + StoragePolicies: []StoragePolicyConfiguration{ + { + Resolution: 1 * time.Second, + Retention: 30 * 24 * time.Hour, + }, + }, + }, + }, + RollupRules: []RollupRuleConfiguration{ + { + Filter: filter, + Transforms: []TransformConfiguration{ + { + Rollup: &RollupOperationConfiguration{ + MetricName: "http_requests_by_status_code", + GroupBy: []string{"app", "status_code", "endpoint"}, + Aggregations: []aggregation.Type{aggregation.Sum}, + }, + }, + }, + StoragePolicies: []StoragePolicyConfiguration{ + { + Resolution: res, + Retention: ret, + }, + }, + }, + }, + }, + ingest: &testDownsamplerOptionsIngest{ + gaugeMetrics: gaugeMetrics, + }, + expect: &testDownsamplerOptionsExpect{ + writes: []testExpectedWrite{ + { + tags: map[string]string{ + nameTag: "http_requests_by_status_code", + string(rollupTagName): string(rollupTagValue), + "app": "nginx_edge", + "status_code": "500", + "endpoint": "/foo/bar", + }, + values: []expectedValue{{value: 94}}, + attributes: &storagemetadata.Attributes{ + MetricsType: storagemetadata.AggregatedMetricsType, + Resolution: res, + Retention: ret, + }, + }, + }, + }, + }) + + // Test expected output + testDownsamplerAggregation(t, testDownsampler) +} + +func TestDownsamplerAggregationWithRulesConfigRollupRuleUntimedRollupsWaitForOffset(t *testing.T) { + t.Parallel() + + gaugeMetrics := []testGaugeMetric{ + { + tags: map[string]string{ + nameTag: "http_requests", + "app": "nginx_edge", + "status_code": "500", + "endpoint": 
"/foo/bar", + "not_rolled_up": "not_rolled_up_value_1", + }, + timedSamples: []testGaugeMetricTimedSample{ + {value: 42}, + }, + expectDropPolicyApplied: true, + expectDropTimestamp: true, + }, + { + tags: map[string]string{ + nameTag: "http_requests", + "app": "nginx_edge", + "status_code": "500", + "endpoint": "/foo/bar", + "not_rolled_up": "not_rolled_up_value_2", + }, + timedSamples: []testGaugeMetricTimedSample{ + {value: 12, offset: 2 * time.Second}, + }, + expectDropPolicyApplied: true, + expectDropTimestamp: true, + }, + { + tags: map[string]string{ + nameTag: "http_requests", + "app": "nginx_edge", + "status_code": "500", + "endpoint": "/foo/bar", + "not_rolled_up": "not_rolled_up_value_3", + }, + timedSamples: []testGaugeMetricTimedSample{ + {value: 13}, + }, + expectDropPolicyApplied: true, + expectDropTimestamp: true, + }, + } + res := 1 * time.Second + ret := 30 * 24 * time.Hour + filter := fmt.Sprintf("%s:http_requests app:* status_code:* endpoint:*", nameTag) + testDownsampler := newTestDownsampler(t, testDownsamplerOptions{ + waitForOffset: true, + untimedRollups: true, + rulesConfig: &RulesConfiguration{ + MappingRules: []MappingRuleConfiguration{ + { + Filter: filter, + Drop: true, + }, + }, + RollupRules: []RollupRuleConfiguration{ + { + Filter: filter, + Transforms: []TransformConfiguration{ + { + Rollup: &RollupOperationConfiguration{ + MetricName: "http_requests_by_status_code", + GroupBy: []string{"app", "status_code", "endpoint"}, + Aggregations: []aggregation.Type{aggregation.Sum}, + }, + }, + }, + StoragePolicies: []StoragePolicyConfiguration{ + { + Resolution: res, + Retention: ret, + }, + }, + }, + }, + }, + ingest: &testDownsamplerOptionsIngest{ + gaugeMetrics: gaugeMetrics, + }, + expect: &testDownsamplerOptionsExpect{ + writes: []testExpectedWrite{ + { + tags: map[string]string{ + nameTag: "http_requests_by_status_code", + string(rollupTagName): string(rollupTagValue), + "app": "nginx_edge", + "status_code": "500", + "endpoint": 
"/foo/bar", + }, + values: []expectedValue{{value: 42}, {value: 25}}, + attributes: &storagemetadata.Attributes{ + MetricsType: storagemetadata.AggregatedMetricsType, + Resolution: res, + Retention: ret, + }, + }, + }, + }, + }) + + // Test expected output + testDownsamplerAggregation(t, testDownsampler) +} + +func TestDownsamplerAggregationWithRulesConfigRollupRuleRollupLaterUntimedRollups(t *testing.T) { + t.Parallel() + + gaugeMetrics := []testGaugeMetric{ + { + tags: map[string]string{ + nameTag: "http_requests", + "app": "nginx_edge", + "status_code": "500", + "endpoint": "/foo/bar", + "not_rolled_up": "not_rolled_up_value_1", + }, + timedSamples: []testGaugeMetricTimedSample{ + {value: 42}, + {value: 12, offset: 2 * time.Second}, + }, + expectDropTimestamp: true, + }, + { + tags: map[string]string{ + nameTag: "http_requests", + "app": "nginx_edge", + "status_code": "500", + "endpoint": "/foo/bar", + "not_rolled_up": "not_rolled_up_value_2", + }, + timedSamples: []testGaugeMetricTimedSample{ + {value: 13}, + {value: 27, offset: 2 * time.Second}, + }, + expectDropTimestamp: true, + }, + } + res := 1 * time.Second + ret := 30 * 24 * time.Hour + filter := fmt.Sprintf("%s:http_requests app:* status_code:* endpoint:*", nameTag) + testDownsampler := newTestDownsampler(t, testDownsamplerOptions{ + untimedRollups: true, + rulesConfig: &RulesConfiguration{ + MappingRules: []MappingRuleConfiguration{ + { + Filter: "app:nginx*", + Aggregations: []aggregation.Type{aggregation.Max}, + StoragePolicies: []StoragePolicyConfiguration{ + { + Resolution: 1 * time.Second, + Retention: 30 * 24 * time.Hour, + }, + }, + }, + }, + RollupRules: []RollupRuleConfiguration{ + { + Filter: filter, + Transforms: []TransformConfiguration{ + { + Transform: &TransformOperationConfiguration{ + Type: transformation.Add, + }, + }, + { + Rollup: &RollupOperationConfiguration{ + MetricName: "http_requests_by_status_code", + GroupBy: []string{"app", "status_code", "endpoint"}, + Aggregations: 
[]aggregation.Type{aggregation.Sum}, + }, + }, + }, + StoragePolicies: []StoragePolicyConfiguration{ + { + Resolution: res, + Retention: ret, + }, + }, + }, + }, + }, + ingest: &testDownsamplerOptionsIngest{ + gaugeMetrics: gaugeMetrics, + }, + expect: &testDownsamplerOptionsExpect{ + writes: []testExpectedWrite{ + { + tags: map[string]string{ + nameTag: "http_requests_by_status_code", + string(rollupTagName): string(rollupTagValue), + "app": "nginx_edge", + "status_code": "500", + "endpoint": "/foo/bar", + }, + values: []expectedValue{{value: 39}}, attributes: &storagemetadata.Attributes{ MetricsType: storagemetadata.AggregatedMetricsType, Resolution: res, @@ -2644,7 +2958,14 @@ func testDownsamplerAggregationIngest( if sample.offset > 0 { sample.time = sample.time.Add(sample.offset) } - err = samplesAppender.AppendCounterSample(sample.time, sample.value, nil) + if testOpts.waitForOffset { + time.Sleep(sample.offset) + } + if samplesAppenderResult.ShouldDropTimestamp { + err = samplesAppender.AppendUntimedCounterSample(sample.value, nil) + } else { + err = samplesAppender.AppendCounterSample(sample.time, sample.value, nil) + } require.NoError(t, err) } } @@ -2674,7 +2995,14 @@ func testDownsamplerAggregationIngest( if sample.offset > 0 { sample.time = sample.time.Add(sample.offset) } - err = samplesAppender.AppendGaugeSample(sample.time, sample.value, nil) + if testOpts.waitForOffset { + time.Sleep(sample.offset) + } + if samplesAppenderResult.ShouldDropTimestamp { + err = samplesAppender.AppendUntimedGaugeSample(sample.value, nil) + } else { + err = samplesAppender.AppendGaugeSample(sample.time, sample.value, nil) + } require.NoError(t, err) } } @@ -2718,6 +3046,8 @@ type testDownsamplerOptions struct { clockOpts clock.Options instrumentOpts instrument.Options identTag string + untimedRollups bool + waitForOffset bool // Options for the test autoMappingRules []m3.ClusterNamespaceOptions @@ -2808,6 +3138,7 @@ func newTestDownsampler(t *testing.T, opts 
testDownsamplerOptions) testDownsampl cfg.Rules = opts.rulesConfig } cfg.Matcher = opts.matcherConfig + cfg.UntimedRollups = opts.untimedRollups instance, err := cfg.NewDownsampler(DownsamplerOptions{ Storage: storage, diff --git a/src/cmd/services/m3coordinator/downsample/metrics_appender.go b/src/cmd/services/m3coordinator/downsample/metrics_appender.go index 428272e3ab..b3eb2c7828 100644 --- a/src/cmd/services/m3coordinator/downsample/metrics_appender.go +++ b/src/cmd/services/m3coordinator/downsample/metrics_appender.go @@ -102,6 +102,7 @@ type metricsAppenderOptions struct { matcher matcher.Matcher tagEncoderPool serialize.TagEncoderPool metricTagsIteratorPool serialize.MetricTagsIteratorPool + untimedRollups bool clockOpts clock.Options debugLogging bool @@ -377,7 +378,10 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp // Apply the custom tags first so that they apply even if mapping // rules drop the metric. - dropTimestamp = a.curr.Pipelines.ApplyCustomTags() + dropTimestamp = a.curr.Pipelines.ShouldDropTimestamp( + metadata.ShouldDropTimestampOptions{ + UntimedRollups: a.untimedRollups, + }) // Apply drop policies results a.curr.Pipelines, dropApplyResult = a.curr.Pipelines.ApplyOrRemoveDropPolicies() @@ -404,6 +408,9 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp unownedID: rollup.ID, stagedMetadatas: rollup.Metadatas, }) + if a.untimedRollups { + dropTimestamp = true + } } dropPolicyApplied := dropApplyResult != metadata.NoDropPolicyPresentResult diff --git a/src/cmd/services/m3coordinator/downsample/options.go b/src/cmd/services/m3coordinator/downsample/options.go index 88a1d1fa48..a7296ebff8 100644 --- a/src/cmd/services/m3coordinator/downsample/options.go +++ b/src/cmd/services/m3coordinator/downsample/options.go @@ -234,9 +234,10 @@ type agg struct { aggregator aggregator.Aggregator clientRemote client.Client - clockOpts clock.Options - matcher matcher.Matcher - pools aggPools + 
clockOpts clock.Options + matcher matcher.Matcher + pools aggPools + untimedRollups bool } // Configuration configurates a downsampler. @@ -268,8 +269,12 @@ type Configuration struct { // BufferPastLimits specifies the buffer past limits. BufferPastLimits []BufferPastLimitConfiguration `yaml:"bufferPastLimits"` - // EntryTTL determines how long an entry remains alive before it may be expired due to inactivity. + // EntryTTL determines how long an entry remains alive before it may be + // expired due to inactivity. EntryTTL time.Duration `yaml:"entryTTL"` + + // UntimedRollups indicates rollup rules should be untimed. + UntimedRollups bool `yaml:"untimedRollups"` } // MatcherConfiguration is the configuration for the rule matcher. @@ -808,9 +813,10 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) { } return agg{ - clientRemote: client, - matcher: matcher, - pools: pools, + clientRemote: client, + matcher: matcher, + pools: pools, + untimedRollups: cfg.UntimedRollups, }, nil } @@ -972,9 +978,10 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) { } return agg{ - aggregator: aggregatorInstance, - matcher: matcher, - pools: pools, + aggregator: aggregatorInstance, + matcher: matcher, + pools: pools, + untimedRollups: cfg.UntimedRollups, }, nil } diff --git a/src/metrics/metadata/metadata.go b/src/metrics/metadata/metadata.go index 94aa0275be..474eda6c48 100644 --- a/src/metrics/metadata/metadata.go +++ b/src/metrics/metadata/metadata.go @@ -115,6 +115,11 @@ func (m PipelineMetadata) IsMappingRule() bool { return m.Pipeline.IsMappingRule() } +// IsAnyRollupRules returns whether any of the rules have rollups. +func (m PipelineMetadata) IsAnyRollupRules() bool { + return !m.Pipeline.IsMappingRule() +} + // IsDropPolicyApplied returns whether this is the default standard pipeline // but with the drop policy applied. 
func (m PipelineMetadata) IsDropPolicyApplied() bool { @@ -280,21 +285,29 @@ func (metadatas PipelineMetadatas) ApplyOrRemoveDropPolicies() ( return result, RemovedIneffectiveDropPoliciesResult } -// ApplyCustomTags applies custom M3 tags. -func (metadatas PipelineMetadatas) ApplyCustomTags() ( - dropTimestamp bool, -) { - // Go over metadatas and process M3 custom tags. +// ShouldDropTimestampOptions are options for the should drop timestamp method. +type ShouldDropTimestampOptions struct { + UntimedRollups bool +} + +// ShouldDropTimestamp returns whether the timestamp should be dropped for these pipelines. +func (metadatas PipelineMetadatas) ShouldDropTimestamp(opts ShouldDropTimestampOptions) bool { + // Go over metadatas and look for the drop timestamp tag. for i := range metadatas { + if opts.UntimedRollups { + if metadatas[i].IsAnyRollupRules() { + return true + } + } for j := range metadatas[i].Tags { // If any metadata has the drop timestamp tag, then return that we // should send untimed metrics to the aggregator. if bytes.Equal(metadatas[i].Tags[j].Name, metric.M3MetricsDropTimestamp) { - dropTimestamp = true + return true } } } - return + return false } // Metadata represents the metadata associated with a metric.