Skip to content

Commit

Permalink
[coordinator] Add config option to make rollup rules untimed (#3343)
Browse files Browse the repository at this point in the history
  • Loading branch information
shreyassrivatsan authored Mar 10, 2021
1 parent 079ac7d commit d086cc5
Show file tree
Hide file tree
Showing 5 changed files with 380 additions and 21 deletions.
1 change: 1 addition & 0 deletions src/cmd/services/m3coordinator/downsample/downsampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ func defaultMetricsAppenderOptions(opts DownsamplerOptions, agg agg) metricsAppe
metricTagsIteratorPool: agg.pools.metricTagsIteratorPool,
debugLogging: debugLogging,
logger: logger,
untimedRollups: agg.untimedRollups,
}
}

Expand Down
337 changes: 334 additions & 3 deletions src/cmd/services/m3coordinator/downsample/downsampler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1813,7 +1813,321 @@ func TestDownsamplerAggregationWithRulesConfigRollupRuleAndDropPolicyAndDropTime
"status_code": "500",
"endpoint": "/foo/bar",
},
values: []expectedValue{{value: 55}, {value: 39}},
values: []expectedValue{{value: 94}},
attributes: &storagemetadata.Attributes{
MetricsType: storagemetadata.AggregatedMetricsType,
Resolution: res,
Retention: ret,
},
},
},
},
})

// Test expected output
testDownsamplerAggregation(t, testDownsampler)
}

func TestDownsamplerAggregationWithRulesConfigRollupRuleUntimedRollups(t *testing.T) {
t.Parallel()

gaugeMetrics := []testGaugeMetric{
{
tags: map[string]string{
nameTag: "http_requests",
"app": "nginx_edge",
"status_code": "500",
"endpoint": "/foo/bar",
"not_rolled_up": "not_rolled_up_value_1",
},
timedSamples: []testGaugeMetricTimedSample{
{value: 42},
{value: 12, offset: 2 * time.Second},
},
expectDropTimestamp: true,
},
{
tags: map[string]string{
nameTag: "http_requests",
"app": "nginx_edge",
"status_code": "500",
"endpoint": "/foo/bar",
"not_rolled_up": "not_rolled_up_value_2",
},
timedSamples: []testGaugeMetricTimedSample{
{value: 13},
{value: 27, offset: 2 * time.Second},
},
expectDropTimestamp: true,
},
}
res := 1 * time.Second
ret := 30 * 24 * time.Hour
filter := fmt.Sprintf("%s:http_requests app:* status_code:* endpoint:*", nameTag)
testDownsampler := newTestDownsampler(t, testDownsamplerOptions{
untimedRollups: true,
rulesConfig: &RulesConfiguration{
MappingRules: []MappingRuleConfiguration{
{
Filter: "app:nginx*",
Aggregations: []aggregation.Type{aggregation.Max},
StoragePolicies: []StoragePolicyConfiguration{
{
Resolution: 1 * time.Second,
Retention: 30 * 24 * time.Hour,
},
},
},
},
RollupRules: []RollupRuleConfiguration{
{
Filter: filter,
Transforms: []TransformConfiguration{
{
Rollup: &RollupOperationConfiguration{
MetricName: "http_requests_by_status_code",
GroupBy: []string{"app", "status_code", "endpoint"},
Aggregations: []aggregation.Type{aggregation.Sum},
},
},
},
StoragePolicies: []StoragePolicyConfiguration{
{
Resolution: res,
Retention: ret,
},
},
},
},
},
ingest: &testDownsamplerOptionsIngest{
gaugeMetrics: gaugeMetrics,
},
expect: &testDownsamplerOptionsExpect{
writes: []testExpectedWrite{
{
tags: map[string]string{
nameTag: "http_requests_by_status_code",
string(rollupTagName): string(rollupTagValue),
"app": "nginx_edge",
"status_code": "500",
"endpoint": "/foo/bar",
},
values: []expectedValue{{value: 94}},
attributes: &storagemetadata.Attributes{
MetricsType: storagemetadata.AggregatedMetricsType,
Resolution: res,
Retention: ret,
},
},
},
},
})

// Test expected output
testDownsamplerAggregation(t, testDownsampler)
}

func TestDownsamplerAggregationWithRulesConfigRollupRuleUntimedRollupsWaitForOffset(t *testing.T) {
t.Parallel()

gaugeMetrics := []testGaugeMetric{
{
tags: map[string]string{
nameTag: "http_requests",
"app": "nginx_edge",
"status_code": "500",
"endpoint": "/foo/bar",
"not_rolled_up": "not_rolled_up_value_1",
},
timedSamples: []testGaugeMetricTimedSample{
{value: 42},
},
expectDropPolicyApplied: true,
expectDropTimestamp: true,
},
{
tags: map[string]string{
nameTag: "http_requests",
"app": "nginx_edge",
"status_code": "500",
"endpoint": "/foo/bar",
"not_rolled_up": "not_rolled_up_value_2",
},
timedSamples: []testGaugeMetricTimedSample{
{value: 12, offset: 2 * time.Second},
},
expectDropPolicyApplied: true,
expectDropTimestamp: true,
},
{
tags: map[string]string{
nameTag: "http_requests",
"app": "nginx_edge",
"status_code": "500",
"endpoint": "/foo/bar",
"not_rolled_up": "not_rolled_up_value_3",
},
timedSamples: []testGaugeMetricTimedSample{
{value: 13},
},
expectDropPolicyApplied: true,
expectDropTimestamp: true,
},
}
res := 1 * time.Second
ret := 30 * 24 * time.Hour
filter := fmt.Sprintf("%s:http_requests app:* status_code:* endpoint:*", nameTag)
testDownsampler := newTestDownsampler(t, testDownsamplerOptions{
waitForOffset: true,
untimedRollups: true,
rulesConfig: &RulesConfiguration{
MappingRules: []MappingRuleConfiguration{
{
Filter: filter,
Drop: true,
},
},
RollupRules: []RollupRuleConfiguration{
{
Filter: filter,
Transforms: []TransformConfiguration{
{
Rollup: &RollupOperationConfiguration{
MetricName: "http_requests_by_status_code",
GroupBy: []string{"app", "status_code", "endpoint"},
Aggregations: []aggregation.Type{aggregation.Sum},
},
},
},
StoragePolicies: []StoragePolicyConfiguration{
{
Resolution: res,
Retention: ret,
},
},
},
},
},
ingest: &testDownsamplerOptionsIngest{
gaugeMetrics: gaugeMetrics,
},
expect: &testDownsamplerOptionsExpect{
writes: []testExpectedWrite{
{
tags: map[string]string{
nameTag: "http_requests_by_status_code",
string(rollupTagName): string(rollupTagValue),
"app": "nginx_edge",
"status_code": "500",
"endpoint": "/foo/bar",
},
values: []expectedValue{{value: 42}, {value: 25}},
attributes: &storagemetadata.Attributes{
MetricsType: storagemetadata.AggregatedMetricsType,
Resolution: res,
Retention: ret,
},
},
},
},
})

// Test expected output
testDownsamplerAggregation(t, testDownsampler)
}

func TestDownsamplerAggregationWithRulesConfigRollupRuleRollupLaterUntimedRollups(t *testing.T) {
t.Parallel()

gaugeMetrics := []testGaugeMetric{
{
tags: map[string]string{
nameTag: "http_requests",
"app": "nginx_edge",
"status_code": "500",
"endpoint": "/foo/bar",
"not_rolled_up": "not_rolled_up_value_1",
},
timedSamples: []testGaugeMetricTimedSample{
{value: 42},
{value: 12, offset: 2 * time.Second},
},
expectDropTimestamp: true,
},
{
tags: map[string]string{
nameTag: "http_requests",
"app": "nginx_edge",
"status_code": "500",
"endpoint": "/foo/bar",
"not_rolled_up": "not_rolled_up_value_2",
},
timedSamples: []testGaugeMetricTimedSample{
{value: 13},
{value: 27, offset: 2 * time.Second},
},
expectDropTimestamp: true,
},
}
res := 1 * time.Second
ret := 30 * 24 * time.Hour
filter := fmt.Sprintf("%s:http_requests app:* status_code:* endpoint:*", nameTag)
testDownsampler := newTestDownsampler(t, testDownsamplerOptions{
untimedRollups: true,
rulesConfig: &RulesConfiguration{
MappingRules: []MappingRuleConfiguration{
{
Filter: "app:nginx*",
Aggregations: []aggregation.Type{aggregation.Max},
StoragePolicies: []StoragePolicyConfiguration{
{
Resolution: 1 * time.Second,
Retention: 30 * 24 * time.Hour,
},
},
},
},
RollupRules: []RollupRuleConfiguration{
{
Filter: filter,
Transforms: []TransformConfiguration{
{
Transform: &TransformOperationConfiguration{
Type: transformation.Add,
},
},
{
Rollup: &RollupOperationConfiguration{
MetricName: "http_requests_by_status_code",
GroupBy: []string{"app", "status_code", "endpoint"},
Aggregations: []aggregation.Type{aggregation.Sum},
},
},
},
StoragePolicies: []StoragePolicyConfiguration{
{
Resolution: res,
Retention: ret,
},
},
},
},
},
ingest: &testDownsamplerOptionsIngest{
gaugeMetrics: gaugeMetrics,
},
expect: &testDownsamplerOptionsExpect{
writes: []testExpectedWrite{
{
tags: map[string]string{
nameTag: "http_requests_by_status_code",
string(rollupTagName): string(rollupTagValue),
"app": "nginx_edge",
"status_code": "500",
"endpoint": "/foo/bar",
},
values: []expectedValue{{value: 39}},
attributes: &storagemetadata.Attributes{
MetricsType: storagemetadata.AggregatedMetricsType,
Resolution: res,
Expand Down Expand Up @@ -2644,7 +2958,14 @@ func testDownsamplerAggregationIngest(
if sample.offset > 0 {
sample.time = sample.time.Add(sample.offset)
}
err = samplesAppender.AppendCounterSample(sample.time, sample.value, nil)
if testOpts.waitForOffset {
time.Sleep(sample.offset)
}
if samplesAppenderResult.ShouldDropTimestamp {
err = samplesAppender.AppendUntimedCounterSample(sample.value, nil)
} else {
err = samplesAppender.AppendCounterSample(sample.time, sample.value, nil)
}
require.NoError(t, err)
}
}
Expand Down Expand Up @@ -2674,7 +2995,14 @@ func testDownsamplerAggregationIngest(
if sample.offset > 0 {
sample.time = sample.time.Add(sample.offset)
}
err = samplesAppender.AppendGaugeSample(sample.time, sample.value, nil)
if testOpts.waitForOffset {
time.Sleep(sample.offset)
}
if samplesAppenderResult.ShouldDropTimestamp {
err = samplesAppender.AppendUntimedGaugeSample(sample.value, nil)
} else {
err = samplesAppender.AppendGaugeSample(sample.time, sample.value, nil)
}
require.NoError(t, err)
}
}
Expand Down Expand Up @@ -2718,6 +3046,8 @@ type testDownsamplerOptions struct {
clockOpts clock.Options
instrumentOpts instrument.Options
identTag string
untimedRollups bool
waitForOffset bool

// Options for the test
autoMappingRules []m3.ClusterNamespaceOptions
Expand Down Expand Up @@ -2808,6 +3138,7 @@ func newTestDownsampler(t *testing.T, opts testDownsamplerOptions) testDownsampl
cfg.Rules = opts.rulesConfig
}
cfg.Matcher = opts.matcherConfig
cfg.UntimedRollups = opts.untimedRollups

instance, err := cfg.NewDownsampler(DownsamplerOptions{
Storage: storage,
Expand Down
Loading

0 comments on commit d086cc5

Please sign in to comment.