From ee0439badc3d8dbe1e099f1d0b4b2e9e584f4ad8 Mon Sep 17 00:00:00 2001 From: Arthur Silva Sens Date: Wed, 21 Aug 2024 20:07:56 -0300 Subject: [PATCH 1/9] [processor/interval]: Only keep last received gauge for each interval Signed-off-by: Arthur Silva Sens --- processor/intervalprocessor/processor.go | 6 +++++- processor/intervalprocessor/processor_test.go | 1 + .../input.yaml | 2 ++ .../output.yaml => gauges_are_aggregated/next.yaml} | 0 .../next.yaml => gauges_are_aggregated/output.yaml} | 13 +------------ 5 files changed, 9 insertions(+), 13 deletions(-) rename processor/intervalprocessor/testdata/{gauges_are_passed_through => gauges_are_aggregated}/input.yaml (89%) rename processor/intervalprocessor/testdata/{gauges_are_passed_through/output.yaml => gauges_are_aggregated/next.yaml} (100%) rename processor/intervalprocessor/testdata/{gauges_are_passed_through/next.yaml => gauges_are_aggregated/output.yaml} (65%) diff --git a/processor/intervalprocessor/processor.go b/processor/intervalprocessor/processor.go index 6960472e5395..ccb0744d9a3c 100644 --- a/processor/intervalprocessor/processor.go +++ b/processor/intervalprocessor/processor.go @@ -102,8 +102,12 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) erro rm.ScopeMetrics().RemoveIf(func(sm pmetric.ScopeMetrics) bool { sm.Metrics().RemoveIf(func(m pmetric.Metric) bool { switch m.Type() { - case pmetric.MetricTypeGauge, pmetric.MetricTypeSummary: + case pmetric.MetricTypeSummary: return false + case pmetric.MetricTypeGauge: + mClone, metricID := p.getOrCloneMetric(rm, sm, m) + aggregateDataPoints(m.Gauge().DataPoints(), mClone.Gauge().DataPoints(), metricID, p.numberLookup) + return true case pmetric.MetricTypeSum: // Check if we care about this value sum := m.Sum() diff --git a/processor/intervalprocessor/processor_test.go b/processor/intervalprocessor/processor_test.go index 39cb953d2310..ea4d63fc5220 100644 --- a/processor/intervalprocessor/processor_test.go +++ b/processor/intervalprocessor/processor_test.go @@ -27,6 +27,7 @@ func TestAggregation(t *testing.T) { "summaries_are_passed_through", "histograms_are_aggregated", "exp_histograms_are_aggregated", + "gauges_are_aggregated", "all_delta_metrics_are_passed_through", } diff --git a/processor/intervalprocessor/testdata/gauges_are_passed_through/input.yaml b/processor/intervalprocessor/testdata/gauges_are_aggregated/input.yaml similarity index 89% rename from processor/intervalprocessor/testdata/gauges_are_passed_through/input.yaml rename to processor/intervalprocessor/testdata/gauges_are_aggregated/input.yaml index a3d65c2986e0..727a3ec724ab 100644 --- a/processor/intervalprocessor/testdata/gauges_are_passed_through/input.yaml +++ b/processor/intervalprocessor/testdata/gauges_are_aggregated/input.yaml @@ -31,9 +31,11 @@ resourceMetrics: - key: aaa value: stringValue: bbb + # For interval processor point of view, only the last datapoint should be passed through. - timeUnixNano: 80 asDouble: 178 attributes: - key: aaa value: stringValue: bbb + \ No newline at end of file diff --git a/processor/intervalprocessor/testdata/gauges_are_passed_through/output.yaml b/processor/intervalprocessor/testdata/gauges_are_aggregated/next.yaml similarity index 100% rename from processor/intervalprocessor/testdata/gauges_are_passed_through/output.yaml rename to processor/intervalprocessor/testdata/gauges_are_aggregated/next.yaml diff --git a/processor/intervalprocessor/testdata/gauges_are_passed_through/next.yaml b/processor/intervalprocessor/testdata/gauges_are_aggregated/output.yaml similarity index 65% rename from processor/intervalprocessor/testdata/gauges_are_passed_through/next.yaml rename to processor/intervalprocessor/testdata/gauges_are_aggregated/output.yaml index a3d65c2986e0..9ed7c9da208c 100644 --- a/processor/intervalprocessor/testdata/gauges_are_passed_through/next.yaml +++ b/processor/intervalprocessor/testdata/gauges_are_aggregated/output.yaml @@ -19,21 +19,10 @@ resourceMetrics: gauge: aggregationTemporality: 2 dataPoints: - - timeUnixNano: 50 - asDouble: 345 - attributes: - - key: aaa - value: - stringValue: bbb - - timeUnixNano: 20 - asDouble: 258 - attributes: - - key: aaa - value: - stringValue: bbb - timeUnixNano: 80 asDouble: 178 attributes: - key: aaa value: stringValue: bbb + \ No newline at end of file From 76977d669f8f5d9dd4a42a2b489f8679a267a279 Mon Sep 17 00:00:00 2001 From: Arthur Silva Sens Date: Wed, 21 Aug 2024 20:13:49 -0300 Subject: [PATCH 2/9] [processor/interval]: Only keep last received summary for each interval Signed-off-by: Arthur Silva Sens --- processor/intervalprocessor/processor.go | 27 +++++++++++++++++- processor/intervalprocessor/processor_test.go | 5 ++-- .../input.yaml | 1 + .../summaries_are_aggregated/next.yaml | 1 + .../output.yaml} | 28 ------------------- .../summaries_are_passed_through/output.yaml | 1 - 6 files changed, 31 insertions(+), 32 deletions(-) rename processor/intervalprocessor/testdata/{summaries_are_passed_through => summaries_are_aggregated}/input.yaml (97%) create mode 100644 processor/intervalprocessor/testdata/summaries_are_aggregated/next.yaml rename processor/intervalprocessor/testdata/{summaries_are_passed_through/next.yaml => summaries_are_aggregated/output.yaml} (50%) delete mode 100644 processor/intervalprocessor/testdata/summaries_are_passed_through/output.yaml diff --git a/processor/intervalprocessor/processor.go b/processor/intervalprocessor/processor.go index ccb0744d9a3c..fb9854f61c38 100644 --- a/processor/intervalprocessor/processor.go +++ b/processor/intervalprocessor/processor.go @@ -36,6 +36,7 @@ type Processor struct { numberLookup map[identity.Stream]pmetric.NumberDataPoint histogramLookup map[identity.Stream]pmetric.HistogramDataPoint expHistogramLookup map[identity.Stream]pmetric.ExponentialHistogramDataPoint + summaryLookup map[identity.Stream]pmetric.SummaryDataPoint exportInterval time.Duration @@ -59,6 +60,7 @@ func newProcessor(config *Config, log *zap.Logger, nextConsumer consumer.Metrics numberLookup: map[identity.Stream]pmetric.NumberDataPoint{}, histogramLookup: map[identity.Stream]pmetric.HistogramDataPoint{}, expHistogramLookup: map[identity.Stream]pmetric.ExponentialHistogramDataPoint{}, + summaryLookup: map[identity.Stream]pmetric.SummaryDataPoint{}, exportInterval: config.Interval, @@ -103,7 +105,29 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) erro sm.Metrics().RemoveIf(func(m pmetric.Metric) bool { switch m.Type() { case pmetric.MetricTypeSummary: - return false + mClone, metricID := p.getOrCloneMetric(rm, sm, m) + + // The ideal scenario is that we would re-use `aggregateDataPoints()` here, but we can't + // because the SummaryDataPoint does not fufill the DataPoint interface. + datapoints := m.Summary().DataPoints() + for i := 0; i < datapoints.Len(); i++ { + dp := datapoints.At(i) + + streamID := identity.OfStream(metricID, dp) + existingDP, ok := p.summaryLookup[streamID] + if !ok { + dpClone := mClone.Summary().DataPoints().AppendEmpty() + dp.CopyTo(dpClone) + p.summaryLookup[streamID] = dpClone + continue + } + + // Check if the datapoint is newer and replace it + if dp.Timestamp() > existingDP.Timestamp() { + dp.CopyTo(existingDP) + } + } + return true case pmetric.MetricTypeGauge: mClone, metricID := p.getOrCloneMetric(rm, sm, m) aggregateDataPoints(m.Gauge().DataPoints(), mClone.Gauge().DataPoints(), metricID, p.numberLookup) @@ -206,6 +230,7 @@ func (p *Processor) exportMetrics() { clear(p.numberLookup) clear(p.histogramLookup) clear(p.expHistogramLookup) + clear(p.summaryLookup) return out }() diff --git a/processor/intervalprocessor/processor_test.go b/processor/intervalprocessor/processor_test.go index ea4d63fc5220..0a03b44815b2 100644 --- a/processor/intervalprocessor/processor_test.go +++ b/processor/intervalprocessor/processor_test.go @@ -23,12 +23,12 @@ func TestAggregation(t *testing.T) { testCases := []string{ "basic_aggregation", - "non_monotonic_sums_are_passed_through", - "summaries_are_passed_through", "histograms_are_aggregated", "exp_histograms_are_aggregated", "gauges_are_aggregated", + "summaries_are_aggregated", "all_delta_metrics_are_passed_through", + "non_monotonic_sums_are_passed_through", } ctx, cancel := context.WithCancel(context.Background()) @@ -76,6 +76,7 @@ func TestAggregation(t *testing.T) { require.Empty(t, processor.numberLookup) require.Empty(t, processor.histogramLookup) require.Empty(t, processor.expHistogramLookup) + require.Empty(t, processor.summaryLookup) // Exporting again should return nothing processor.exportMetrics() diff --git a/processor/intervalprocessor/testdata/summaries_are_passed_through/input.yaml b/processor/intervalprocessor/testdata/summaries_are_aggregated/input.yaml similarity index 97% rename from processor/intervalprocessor/testdata/summaries_are_passed_through/input.yaml rename to processor/intervalprocessor/testdata/summaries_are_aggregated/input.yaml index 15862ceb73e8..c0190dd5c614 100644 --- a/processor/intervalprocessor/testdata/summaries_are_passed_through/input.yaml +++ b/processor/intervalprocessor/testdata/summaries_are_aggregated/input.yaml @@ -46,6 +46,7 @@ resourceMetrics: - key: aaa value: stringValue: bbb + # Only last summary should pass through - timeUnixNano: 80 quantileValues: - quantile: 0.25 diff --git a/processor/intervalprocessor/testdata/summaries_are_aggregated/next.yaml b/processor/intervalprocessor/testdata/summaries_are_aggregated/next.yaml new file mode 100644 index 000000000000..d2e76ef0f16b --- /dev/null +++ b/processor/intervalprocessor/testdata/summaries_are_aggregated/next.yaml @@ -0,0 +1 @@ +resourceMetrics: [] \ No newline at end of file diff --git a/processor/intervalprocessor/testdata/summaries_are_passed_through/next.yaml b/processor/intervalprocessor/testdata/summaries_are_aggregated/output.yaml similarity index 50% rename from processor/intervalprocessor/testdata/summaries_are_passed_through/next.yaml rename to processor/intervalprocessor/testdata/summaries_are_aggregated/output.yaml index 15862ceb73e8..75b8475e9ba7 100644 --- a/processor/intervalprocessor/testdata/summaries_are_passed_through/next.yaml +++ b/processor/intervalprocessor/testdata/summaries_are_aggregated/output.yaml @@ -18,34 +18,6 @@ resourceMetrics: - name: summary.test summary: dataPoints: - - timeUnixNano: 50 - quantileValues: - - quantile: 0.25 - value: 50 - - quantile: 0.5 - value: 20 - - quantile: 0.75 - value: 75 - - quantile: 0.95 - value: 10 - attributes: - - key: aaa - value: - stringValue: bbb - - timeUnixNano: 20 - quantileValues: - - quantile: 0.25 - value: 40 - - quantile: 0.5 - value: 10 - - quantile: 0.75 - value: 60 - - quantile: 0.95 - value: 5 - attributes: - - key: aaa - value: - stringValue: bbb - timeUnixNano: 80 quantileValues: - quantile: 0.25 diff --git a/processor/intervalprocessor/testdata/summaries_are_passed_through/output.yaml b/processor/intervalprocessor/testdata/summaries_are_passed_through/output.yaml deleted file mode 100644 index 3949e7c54ded..000000000000 --- a/processor/intervalprocessor/testdata/summaries_are_passed_through/output.yaml +++ /dev/null @@ -1 +0,0 @@ -resourceMetrics: [] From 63bb73ded109173b708d2adc25171d662270426f Mon Sep 17 00:00:00 2001 From: Arthur Silva Sens Date: Wed, 21 Aug 2024 20:28:11 -0300 Subject: [PATCH 3/9] chore: Update changelog and README Signed-off-by: Arthur Silva Sens --- .../intervalprocessor_gauge_summary.yaml | 27 +++++++++++++++++++ processor/intervalprocessor/README.md | 4 +-- 2 files changed, 29 insertions(+), 2 deletions(-) create mode 100644 .chloggen/intervalprocessor_gauge_summary.yaml diff --git a/.chloggen/intervalprocessor_gauge_summary.yaml b/.chloggen/intervalprocessor_gauge_summary.yaml new file mode 100644 index 000000000000..96894ec68371 --- /dev/null +++ b/.chloggen/intervalprocessor_gauge_summary.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: processor/interval + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Support for gauge and summary metrics. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [34803] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: Only the last value of a gauge or summary metric is reported in the interval processor, instead of all values. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/processor/intervalprocessor/README.md b/processor/intervalprocessor/README.md index 0c4971e73566..f1b123a43c97 100644 --- a/processor/intervalprocessor/README.md +++ b/processor/intervalprocessor/README.md @@ -19,13 +19,13 @@ The interval processor (`intervalprocessor`) aggregates metrics and periodically * Monotonically increasing, cumulative sums * Monotonically increasing, cumulative histograms * Monotonically increasing, cumulative exponential histograms +* Gauges +* Summaries The following metric types will *not* be aggregated, and will instead be passed, unchanged, to the next component in the pipeline: * All delta metrics * Non-monotonically increasing sums -* Gauges -* Summaries ## Configuration From 131e812fe99becfb46b76651cd5c0f0513ca2240 Mon Sep 17 00:00:00 2001 From: Arthur Silva Sens Date: Thu, 22 Aug 2024 08:13:52 -0300 Subject: [PATCH 4/9] chore: Fix linting issues Signed-off-by: Arthur Silva Sens --- processor/intervalprocessor/processor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processor/intervalprocessor/processor.go b/processor/intervalprocessor/processor.go index fb9854f61c38..3a7bef01d4a6 100644 --- a/processor/intervalprocessor/processor.go +++ b/processor/intervalprocessor/processor.go @@ -108,7 +108,7 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) erro mClone, metricID := p.getOrCloneMetric(rm, sm, m) // The ideal scenario is that we would re-use `aggregateDataPoints()` here, but we can't - // because the SummaryDataPoint does not fufill the DataPoint interface. + // because the SummaryDataPoint does not fulfill the DataPoint interface. datapoints := m.Summary().DataPoints() for i := 0; i < datapoints.Len(); i++ { dp := datapoints.At(i) From 2a2f9ffa1f7b725a7af135b5aaf37deb8ee88c44 Mon Sep 17 00:00:00 2001 From: Arthur Silva Sens Date: Fri, 23 Aug 2024 15:05:15 -0300 Subject: [PATCH 5/9] Make gauge/summary optinally pass through Signed-off-by: Arthur Silva Sens --- processor/intervalprocessor/README.md | 6 +- processor/intervalprocessor/config.go | 8 ++- processor/intervalprocessor/factory.go | 4 +- processor/intervalprocessor/processor.go | 16 ++++- processor/intervalprocessor/processor_test.go | 32 +++++----- .../testdata/gauges_passes_through/input.yaml | 41 ++++++++++++ .../testdata/gauges_passes_through/next.yaml | 40 ++++++++++++ .../gauges_passes_through/output.yaml | 2 + .../summaries_passes_through/input.yaml | 63 +++++++++++++++++++ .../summaries_passes_through/next.yaml | 62 ++++++++++++++++++ .../summaries_passes_through/output.yaml | 1 + 11 files changed, 255 insertions(+), 20 deletions(-) create mode 100644 processor/intervalprocessor/testdata/gauges_passes_through/input.yaml create mode 100644 processor/intervalprocessor/testdata/gauges_passes_through/next.yaml create mode 100644 processor/intervalprocessor/testdata/gauges_passes_through/output.yaml create mode 100644 processor/intervalprocessor/testdata/summaries_passes_through/input.yaml create mode 100644 processor/intervalprocessor/testdata/summaries_passes_through/next.yaml create mode 100644 processor/intervalprocessor/testdata/summaries_passes_through/output.yaml diff --git a/processor/intervalprocessor/README.md b/processor/intervalprocessor/README.md index f1b123a43c97..41887c7f4951 100644 --- a/processor/intervalprocessor/README.md +++ b/processor/intervalprocessor/README.md @@ -19,7 +19,7 @@ The interval processor (`intervalprocessor`) aggregates metrics and periodically * Monotonically increasing, cumulative sums * Monotonically increasing, cumulative histograms * Monotonically increasing, cumulative exponential histograms -* Gauges +* Gauges * Summaries The following metric types will *not* be aggregated, and will instead be passed, unchanged, to the next component in the pipeline: @@ -27,11 +27,15 @@ The following metric types will *not* be aggregated, and will instead be passed, * All delta metrics * Non-monotonically increasing sums +While sending only the last for cumulative metrics doesn't represent data loss, it's arguable if we do the same thing for Gauges and Summaries. In a push-based model, one can tell its instrumentation library to push metrics in important situations and the `intervalprocessor` might discard the data point if it was not the last in a given interval. For this reason, `Gauge` and `Summary` metric types can be configured to be passed through as they are. Copying the same behavior as delta metrics. + ## Configuration The following settings can be optionally configured: * `interval`: The interval in which the processor should export the aggregated metrics. Default: 60s +* `gauge_pass_through`: Whether gauges should pass through as they are to the next component or be aggregated. Default: false +* `summary_pass_through`: Whether summaries should pass through as they are to the next component or be aggregated. Default: false ## Example of metric flows diff --git a/processor/intervalprocessor/config.go b/processor/intervalprocessor/config.go index 1967afc972bb..96ad36189f80 100644 --- a/processor/intervalprocessor/config.go +++ b/processor/intervalprocessor/config.go @@ -18,8 +18,14 @@ var _ component.Config = (*Config)(nil) // Config defines the configuration for the processor. type Config struct { - // Interval is the time + // Interval is the time interval at which the processor will aggregate metrics. Interval time.Duration `mapstructure:"interval"` + // GaugePassThrough is a flag that determines whether gauge metrics should be passed through + // as they are or aggregated. + GaugePassThrough bool `mapstructure:"gauge_pass_through"` + // SummaryPassThrough is a flag that determines whether summary metrics should be passed through + // as they are or aggregated. + SummaryPassThrough bool `mapstructure:"summary_pass_through"` } // Validate checks whether the input configuration has all of the required fields for the processor. diff --git a/processor/intervalprocessor/factory.go b/processor/intervalprocessor/factory.go index 87a1278cbc0b..981cc63f29a2 100644 --- a/processor/intervalprocessor/factory.go +++ b/processor/intervalprocessor/factory.go @@ -25,7 +25,9 @@ func NewFactory() processor.Factory { func createDefaultConfig() component.Config { return &Config{ - Interval: 60 * time.Second, + Interval: 60 * time.Second, + GaugePassThrough: false, + SummaryPassThrough: false, } } diff --git a/processor/intervalprocessor/processor.go b/processor/intervalprocessor/processor.go index 3a7bef01d4a6..241c829c4c61 100644 --- a/processor/intervalprocessor/processor.go +++ b/processor/intervalprocessor/processor.go @@ -38,7 +38,9 @@ type Processor struct { expHistogramLookup map[identity.Stream]pmetric.ExponentialHistogramDataPoint summaryLookup map[identity.Stream]pmetric.SummaryDataPoint - exportInterval time.Duration + exportInterval time.Duration + gaugePassThrough bool + summaryPassThrough bool nextConsumer consumer.Metrics } @@ -62,7 +64,9 @@ func newProcessor(config *Config, log *zap.Logger, nextConsumer consumer.Metrics expHistogramLookup: map[identity.Stream]pmetric.ExponentialHistogramDataPoint{}, summaryLookup: map[identity.Stream]pmetric.SummaryDataPoint{}, - exportInterval: config.Interval, + exportInterval: config.Interval, + gaugePassThrough: config.GaugePassThrough, + summaryPassThrough: config.SummaryPassThrough, nextConsumer: nextConsumer, } @@ -105,6 +109,10 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) erro sm.Metrics().RemoveIf(func(m pmetric.Metric) bool { switch m.Type() { case pmetric.MetricTypeSummary: + if p.summaryPassThrough { + return false + } + mClone, metricID := p.getOrCloneMetric(rm, sm, m) // The ideal scenario is that we would re-use `aggregateDataPoints()` here, but we can't @@ -129,6 +137,10 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) erro } return true case pmetric.MetricTypeGauge: + if p.gaugePassThrough { + return false + } + mClone, metricID := p.getOrCloneMetric(rm, sm, m) aggregateDataPoints(m.Gauge().DataPoints(), mClone.Gauge().DataPoints(), metricID, p.numberLookup) return true diff --git a/processor/intervalprocessor/processor_test.go b/processor/intervalprocessor/processor_test.go index 0a03b44815b2..131ecc53e48f 100644 --- a/processor/intervalprocessor/processor_test.go +++ b/processor/intervalprocessor/processor_test.go @@ -21,27 +21,29 @@ import ( func TestAggregation(t *testing.T) { t.Parallel() - testCases := []string{ - "basic_aggregation", - "histograms_are_aggregated", - "exp_histograms_are_aggregated", - "gauges_are_aggregated", - "summaries_are_aggregated", - "all_delta_metrics_are_passed_through", - "non_monotonic_sums_are_passed_through", + testCases := []struct { + name string + passThrough bool + }{ + {name: "basic_aggregation"}, + {name: "histograms_are_aggregated"}, + {name: "exp_histograms_are_aggregated"}, + {name: "gauges_are_aggregated"}, + {name: "summaries_are_aggregated"}, + {name: "all_delta_metrics_are_passed_through"}, // Deltas are passed through even when aggregation is enabled + {name: "non_monotonic_sums_are_passed_through"}, // Non-monotonic sums are passed through even when aggregation is enabled + {name: "gauges_passes_through", passThrough: true}, + {name: "summaries_passes_through", passThrough: true}, } ctx, cancel := context.WithCancel(context.Background()) defer cancel() - config := &Config{Interval: time.Second} - + var config *Config for _, tc := range testCases { - testName := tc - - t.Run(testName, func(t *testing.T) { - t.Parallel() + config = &Config{Interval: time.Second, GaugePassThrough: tc.passThrough, SummaryPassThrough: tc.passThrough} + t.Run(tc.name, func(t *testing.T) { // next stores the results of the filter metric processor next := &consumertest.MetricsSink{} @@ -54,7 +56,7 @@ func TestAggregation(t *testing.T) { ) require.NoError(t, err) - dir := filepath.Join("testdata", testName) + dir := filepath.Join("testdata", tc.name) md, err := golden.ReadMetrics(filepath.Join(dir, "input.yaml")) require.NoError(t, err) diff --git a/processor/intervalprocessor/testdata/gauges_passes_through/input.yaml b/processor/intervalprocessor/testdata/gauges_passes_through/input.yaml new file mode 100644 index 000000000000..727a3ec724ab --- /dev/null +++ b/processor/intervalprocessor/testdata/gauges_passes_through/input.yaml @@ -0,0 +1,41 @@ +resourceMetrics: + - schemaUrl: https://test-res-schema.com/schema + resource: + attributes: + - key: asdf + value: + stringValue: foo + scopeMetrics: + - schemaUrl: https://test-scope-schema.com/schema + scope: + name: MyTestInstrument + version: "1.2.3" + attributes: + - key: foo + value: + stringValue: bar + metrics: + - name: test.gauge + gauge: + aggregationTemporality: 2 + dataPoints: + - timeUnixNano: 50 + asDouble: 345 + attributes: + - key: aaa + value: + stringValue: bbb + - timeUnixNano: 20 + asDouble: 258 + attributes: + - key: aaa + value: + stringValue: bbb + # For interval processor point of view, only the last datapoint should be passed through. + - timeUnixNano: 80 + asDouble: 178 + attributes: + - key: aaa + value: + stringValue: bbb + \ No newline at end of file diff --git a/processor/intervalprocessor/testdata/gauges_passes_through/next.yaml b/processor/intervalprocessor/testdata/gauges_passes_through/next.yaml new file mode 100644 index 000000000000..a6c2b9cbe362 --- /dev/null +++ b/processor/intervalprocessor/testdata/gauges_passes_through/next.yaml @@ -0,0 +1,40 @@ +resourceMetrics: + - schemaUrl: https://test-res-schema.com/schema + resource: + attributes: + - key: asdf + value: + stringValue: foo + scopeMetrics: + - schemaUrl: https://test-scope-schema.com/schema + scope: + name: MyTestInstrument + version: "1.2.3" + attributes: + - key: foo + value: + stringValue: bar + metrics: + - name: test.gauge + gauge: + aggregationTemporality: 2 + dataPoints: + - timeUnixNano: 50 + asDouble: 345 + attributes: + - key: aaa + value: + stringValue: bbb + - timeUnixNano: 20 + asDouble: 258 + attributes: + - key: aaa + value: + stringValue: bbb + - timeUnixNano: 80 + asDouble: 178 + attributes: + - key: aaa + value: + stringValue: bbb + \ No newline at end of file diff --git a/processor/intervalprocessor/testdata/gauges_passes_through/output.yaml b/processor/intervalprocessor/testdata/gauges_passes_through/output.yaml new file mode 100644 index 000000000000..0d5f877dc7b8 --- /dev/null +++ b/processor/intervalprocessor/testdata/gauges_passes_through/output.yaml @@ -0,0 +1,2 @@ +resourceMetrics: [] + \ No newline at end of file diff --git a/processor/intervalprocessor/testdata/summaries_passes_through/input.yaml b/processor/intervalprocessor/testdata/summaries_passes_through/input.yaml new file mode 100644 index 000000000000..c0190dd5c614 --- /dev/null +++ b/processor/intervalprocessor/testdata/summaries_passes_through/input.yaml @@ -0,0 +1,63 @@ +resourceMetrics: + - schemaUrl: https://test-res-schema.com/schema + resource: + attributes: + - key: asdf + value: + stringValue: foo + scopeMetrics: + - schemaUrl: https://test-scope-schema.com/schema + scope: + name: MyTestInstrument + version: "1.2.3" + attributes: + - key: foo + value: + stringValue: bar + metrics: + - name: summary.test + summary: + dataPoints: + - timeUnixNano: 50 + quantileValues: + - quantile: 0.25 + value: 50 + - quantile: 0.5 + value: 20 + - quantile: 0.75 + value: 75 + - quantile: 0.95 + value: 10 + attributes: + - key: aaa + value: + stringValue: bbb + - timeUnixNano: 20 + quantileValues: + - quantile: 0.25 + value: 40 + - quantile: 0.5 + value: 10 + - quantile: 0.75 + value: 60 + - quantile: 0.95 + value: 5 + attributes: + - key: aaa + value: + stringValue: bbb + # Only last summary should pass through + - timeUnixNano: 80 + quantileValues: + - quantile: 0.25 + value: 80 + - quantile: 0.5 + value: 35 + - quantile: 0.75 + value: 90 + - quantile: 0.95 + value: 15 + attributes: + - key: aaa + value: + stringValue: bbb diff --git a/processor/intervalprocessor/testdata/summaries_passes_through/next.yaml b/processor/intervalprocessor/testdata/summaries_passes_through/next.yaml new file mode 100644 index 000000000000..15862ceb73e8 --- /dev/null +++ b/processor/intervalprocessor/testdata/summaries_passes_through/next.yaml @@ -0,0 +1,62 @@ +resourceMetrics: + - schemaUrl: https://test-res-schema.com/schema + resource: + attributes: + - key: asdf + value: + stringValue: foo + scopeMetrics: + - schemaUrl: https://test-scope-schema.com/schema + scope: + name: MyTestInstrument + version: "1.2.3" + attributes: + - key: foo + value: + stringValue: bar + metrics: + - name: summary.test + summary: + dataPoints: + - timeUnixNano: 50 + quantileValues: + - quantile: 0.25 + value: 50 + - quantile: 0.5 + value: 20 + - quantile: 0.75 + value: 75 + - quantile: 0.95 + value: 10 + attributes: + - key: aaa + value: + stringValue: bbb + - timeUnixNano: 20 + quantileValues: + - quantile: 0.25 + value: 40 + - quantile: 0.5 + value: 10 + - quantile: 0.75 + value: 60 + - quantile: 0.95 + value: 5 + attributes: + - key: aaa + value: + stringValue: bbb + - timeUnixNano: 80 + quantileValues: + - quantile: 0.25 + value: 80 + - quantile: 0.5 + value: 35 + - quantile: 0.75 + value: 90 + - quantile: 0.95 + value: 15 + attributes: + - key: aaa + value: + stringValue: bbb diff --git a/processor/intervalprocessor/testdata/summaries_passes_through/output.yaml b/processor/intervalprocessor/testdata/summaries_passes_through/output.yaml new file mode 100644 index 000000000000..d2e76ef0f16b --- /dev/null +++ b/processor/intervalprocessor/testdata/summaries_passes_through/output.yaml @@ -0,0 +1 @@ +resourceMetrics: [] \ No newline at end of file From e19542a1d44f82d7248ff29e93469e174a4f2ee1 Mon Sep 17 00:00:00 2001 From: Arthur Silva Sens Date: Fri, 23 Aug 2024 15:07:53 -0300 Subject: [PATCH 6/9] Reuse 'aggregateDataPoints' for summaries Signed-off-by: Arthur Silva Sens --- .../internal/metrics/metrics.go | 3 --- processor/intervalprocessor/processor.go | 22 +------------------ 2 files changed, 1 insertion(+), 24 deletions(-) diff --git a/processor/intervalprocessor/internal/metrics/metrics.go b/processor/intervalprocessor/internal/metrics/metrics.go index c3febf1a173a..f06a91a8bc06 100644 --- a/processor/intervalprocessor/internal/metrics/metrics.go +++ b/processor/intervalprocessor/internal/metrics/metrics.go @@ -5,7 +5,6 @@ package metrics // import "github.com/open-telemetry/opentelemetry-collector-con import ( "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/pmetric" ) type DataPointSlice[DP DataPoint[DP]] interface { @@ -15,8 +14,6 @@ type DataPointSlice[DP DataPoint[DP]] interface { } type DataPoint[Self any] interface { - pmetric.NumberDataPoint | pmetric.HistogramDataPoint | pmetric.ExponentialHistogramDataPoint - Timestamp() pcommon.Timestamp Attributes() pcommon.Map CopyTo(dest Self) diff --git a/processor/intervalprocessor/processor.go b/processor/intervalprocessor/processor.go index 241c829c4c61..fa49a04211d8 100644 --- a/processor/intervalprocessor/processor.go +++ b/processor/intervalprocessor/processor.go @@ -114,27 +114,7 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) erro } mClone, metricID := p.getOrCloneMetric(rm, sm, m) - - // The ideal scenario is that we would re-use `aggregateDataPoints()` here, but we can't - // because the SummaryDataPoint does not fulfill the DataPoint interface. - datapoints := m.Summary().DataPoints() - for i := 0; i < datapoints.Len(); i++ { - dp := datapoints.At(i) - - streamID := identity.OfStream(metricID, dp) - existingDP, ok := p.summaryLookup[streamID] - if !ok { - dpClone := mClone.Summary().DataPoints().AppendEmpty() - dp.CopyTo(dpClone) - p.summaryLookup[streamID] = dpClone - continue - } - - // Check if the datapoint is newer and replace it - if dp.Timestamp() > existingDP.Timestamp() { - dp.CopyTo(existingDP) - } - } + aggregateDataPoints(m.Summary().DataPoints(), mClone.Summary().DataPoints(), metricID, p.summaryLookup) return true case pmetric.MetricTypeGauge: if p.gaugePassThrough { From 5671166c12ac4818687dabafb70084396439cdb6 Mon Sep 17 00:00:00 2001 From: Arthur Silva Sens Date: Fri, 23 Aug 2024 15:14:04 -0300 Subject: [PATCH 7/9] Remove empty lines Signed-off-by: Arthur Silva Sens --- .../testdata/gauges_are_aggregated/input.yaml | 3 +-- .../intervalprocessor/testdata/gauges_are_aggregated/next.yaml | 2 +- .../testdata/gauges_are_aggregated/output.yaml | 3 +-- .../testdata/gauges_passes_through/input.yaml | 3 +-- .../intervalprocessor/testdata/gauges_passes_through/next.yaml | 3 +-- .../testdata/gauges_passes_through/output.yaml | 3 +-- .../testdata/summaries_passes_through/input.yaml | 2 +- .../testdata/summaries_passes_through/next.yaml | 2 +- 8 files changed, 8 insertions(+), 13 deletions(-) diff --git a/processor/intervalprocessor/testdata/gauges_are_aggregated/input.yaml b/processor/intervalprocessor/testdata/gauges_are_aggregated/input.yaml index 727a3ec724ab..019dd6dd8511 100644 --- a/processor/intervalprocessor/testdata/gauges_are_aggregated/input.yaml +++ b/processor/intervalprocessor/testdata/gauges_are_aggregated/input.yaml @@ -37,5 +37,4 @@ resourceMetrics: attributes: - key: aaa value: - stringValue: bbb - \ No newline at end of file + stringValue: bbb \ No newline at end of file diff --git a/processor/intervalprocessor/testdata/gauges_are_aggregated/next.yaml b/processor/intervalprocessor/testdata/gauges_are_aggregated/next.yaml index 3949e7c54ded..d2e76ef0f16b 100644 --- a/processor/intervalprocessor/testdata/gauges_are_aggregated/next.yaml +++ b/processor/intervalprocessor/testdata/gauges_are_aggregated/next.yaml @@ -1 +1 @@ -resourceMetrics: [] +resourceMetrics: [] \ No newline at end of file diff --git a/processor/intervalprocessor/testdata/gauges_are_aggregated/output.yaml b/processor/intervalprocessor/testdata/gauges_are_aggregated/output.yaml index 9ed7c9da208c..fe0b264bd1db 100644 --- a/processor/intervalprocessor/testdata/gauges_are_aggregated/output.yaml +++ b/processor/intervalprocessor/testdata/gauges_are_aggregated/output.yaml @@ -24,5 +24,4 @@ resourceMetrics: attributes: - key: aaa value: - stringValue: bbb - \ No newline at end of file + stringValue: bbb \ No newline at end of file diff --git a/processor/intervalprocessor/testdata/gauges_passes_through/input.yaml b/processor/intervalprocessor/testdata/gauges_passes_through/input.yaml index 727a3ec724ab..53e18776b063 100644 --- a/processor/intervalprocessor/testdata/gauges_passes_through/input.yaml +++ b/processor/intervalprocessor/testdata/gauges_passes_through/input.yaml @@ -37,5 +37,4 @@ resourceMetrics: attributes: - key: aaa value: - stringValue: bbb - \ No newline at end of file + stringValue: bbb \ No newline at end of file diff --git a/processor/intervalprocessor/testdata/gauges_passes_through/next.yaml b/processor/intervalprocessor/testdata/gauges_passes_through/next.yaml index a6c2b9cbe362..c1e8b3add92e 100644 --- a/processor/intervalprocessor/testdata/gauges_passes_through/next.yaml +++ b/processor/intervalprocessor/testdata/gauges_passes_through/next.yaml @@ -36,5 +36,4 @@ resourceMetrics: attributes: - key: aaa value: - stringValue: bbb - \ No newline at end of file + stringValue: bbb \ No newline at end of file diff --git a/processor/intervalprocessor/testdata/gauges_passes_through/output.yaml b/processor/intervalprocessor/testdata/gauges_passes_through/output.yaml index 0d5f877dc7b8..d2e76ef0f16b 100644 --- a/processor/intervalprocessor/testdata/gauges_passes_through/output.yaml +++ b/processor/intervalprocessor/testdata/gauges_passes_through/output.yaml @@ -1,2 +1 @@ -resourceMetrics: [] - \ No newline at end of file +resourceMetrics: [] \ No newline at end of file diff --git a/processor/intervalprocessor/testdata/summaries_passes_through/input.yaml b/processor/intervalprocessor/testdata/summaries_passes_through/input.yaml index c0190dd5c614..e2cbc1ea9128 100644 --- a/processor/intervalprocessor/testdata/summaries_passes_through/input.yaml +++ b/processor/intervalprocessor/testdata/summaries_passes_through/input.yaml @@ -60,4 +60,4 @@ resourceMetrics: attributes: - key: aaa value: - stringValue: bbb + stringValue: bbb \ No newline at end of file diff --git a/processor/intervalprocessor/testdata/summaries_passes_through/next.yaml b/processor/intervalprocessor/testdata/summaries_passes_through/next.yaml index 15862ceb73e8..7d9cdfd5b6fd 100644 --- a/processor/intervalprocessor/testdata/summaries_passes_through/next.yaml +++ b/processor/intervalprocessor/testdata/summaries_passes_through/next.yaml @@ -59,4 +59,4 @@ resourceMetrics: attributes: - key: aaa value: - stringValue: bbb + stringValue: bbb \ No newline at end of file From 9254ffbbabde301f3da2172d0cd7007f6d38fda7 Mon Sep 17 00:00:00 2001 From: Arthur Silva Sens Date: Fri, 23 Aug 2024 15:22:17 -0300 Subject: [PATCH 8/9] Restore original dir names in test data Signed-off-by: Arthur Silva Sens --- processor/intervalprocessor/processor_test.go | 4 ++-- .../input.yaml | 1 - .../next.yaml | 0 .../output.yaml | 0 .../next.yaml => summaries_are_passed_through/input.yaml} | 0 .../input.yaml => summaries_are_passed_through/next.yaml} | 1 - .../output.yaml | 0 7 files changed, 2 insertions(+), 4 deletions(-) rename processor/intervalprocessor/testdata/{gauges_passes_through => gauges_are_passed_through}/input.yaml (91%) rename processor/intervalprocessor/testdata/{gauges_passes_through => gauges_are_passed_through}/next.yaml (100%) rename processor/intervalprocessor/testdata/{gauges_passes_through => gauges_are_passed_through}/output.yaml (100%) rename processor/intervalprocessor/testdata/{summaries_passes_through/next.yaml => summaries_are_passed_through/input.yaml} (100%) rename processor/intervalprocessor/testdata/{summaries_passes_through/input.yaml => summaries_are_passed_through/next.yaml} (97%) rename processor/intervalprocessor/testdata/{summaries_passes_through => summaries_are_passed_through}/output.yaml (100%) diff --git a/processor/intervalprocessor/processor_test.go b/processor/intervalprocessor/processor_test.go index 131ecc53e48f..cda18e561b5d 100644 --- a/processor/intervalprocessor/processor_test.go +++ b/processor/intervalprocessor/processor_test.go @@ -32,8 +32,8 @@ func TestAggregation(t *testing.T) { {name: "summaries_are_aggregated"}, {name: "all_delta_metrics_are_passed_through"}, // Deltas are passed through even when aggregation is enabled {name: "non_monotonic_sums_are_passed_through"}, // Non-monotonic sums are passed through even when aggregation is enabled - {name: "gauges_passes_through", passThrough: true}, - {name: "summaries_passes_through", passThrough: true}, + {name: "gauges_are_passed_through", passThrough: true}, + {name: "summaries_are_passed_through", passThrough: true}, } ctx, cancel := context.WithCancel(context.Background()) diff --git a/processor/intervalprocessor/testdata/gauges_passes_through/input.yaml b/processor/intervalprocessor/testdata/gauges_are_passed_through/input.yaml similarity index 91% rename from processor/intervalprocessor/testdata/gauges_passes_through/input.yaml rename to processor/intervalprocessor/testdata/gauges_are_passed_through/input.yaml index 53e18776b063..89b1879ee4d8 100644 --- a/processor/intervalprocessor/testdata/gauges_passes_through/input.yaml +++ b/processor/intervalprocessor/testdata/gauges_are_passed_through/input.yaml @@ -31,7 +31,6 @@ resourceMetrics: - key: aaa value: stringValue: bbb - # For interval processor point of view, only the last datapoint should be passed through. - timeUnixNano: 80 asDouble: 178 attributes: diff --git a/processor/intervalprocessor/testdata/gauges_passes_through/next.yaml b/processor/intervalprocessor/testdata/gauges_are_passed_through/next.yaml similarity index 100% rename from processor/intervalprocessor/testdata/gauges_passes_through/next.yaml rename to processor/intervalprocessor/testdata/gauges_are_passed_through/next.yaml diff --git a/processor/intervalprocessor/testdata/gauges_passes_through/output.yaml b/processor/intervalprocessor/testdata/gauges_are_passed_through/output.yaml similarity index 100% rename from processor/intervalprocessor/testdata/gauges_passes_through/output.yaml rename to processor/intervalprocessor/testdata/gauges_are_passed_through/output.yaml diff --git a/processor/intervalprocessor/testdata/summaries_passes_through/next.yaml b/processor/intervalprocessor/testdata/summaries_are_passed_through/input.yaml similarity index 100% rename from processor/intervalprocessor/testdata/summaries_passes_through/next.yaml rename to processor/intervalprocessor/testdata/summaries_are_passed_through/input.yaml diff --git a/processor/intervalprocessor/testdata/summaries_passes_through/input.yaml b/processor/intervalprocessor/testdata/summaries_are_passed_through/next.yaml similarity index 97% rename from processor/intervalprocessor/testdata/summaries_passes_through/input.yaml rename to processor/intervalprocessor/testdata/summaries_are_passed_through/next.yaml index e2cbc1ea9128..7d9cdfd5b6fd 100644 --- a/processor/intervalprocessor/testdata/summaries_passes_through/input.yaml +++ b/processor/intervalprocessor/testdata/summaries_are_passed_through/next.yaml @@ -46,7 +46,6 @@ resourceMetrics: - key: aaa value: stringValue: bbb - # Only last summary should pass through - timeUnixNano: 80 quantileValues: - quantile: 0.25 diff --git a/processor/intervalprocessor/testdata/summaries_passes_through/output.yaml b/processor/intervalprocessor/testdata/summaries_are_passed_through/output.yaml similarity index 100% rename from processor/intervalprocessor/testdata/summaries_passes_through/output.yaml rename to processor/intervalprocessor/testdata/summaries_are_passed_through/output.yaml From 88448a0e73cf791c881011db415562275e27d7f2 Mon Sep 17 00:00:00 2001 From: Arthur Silva Sens Date: Fri, 23 Aug 2024 15:23:44 -0300 Subject: [PATCH 9/9] Update README Signed-off-by: Arthur Silva Sens --- processor/intervalprocessor/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processor/intervalprocessor/README.md b/processor/intervalprocessor/README.md index 41887c7f4951..857bcf6c7d58 100644 --- a/processor/intervalprocessor/README.md +++ b/processor/intervalprocessor/README.md @@ -27,7 +27,7 @@ The following metric types will *not* be aggregated, and will instead be passed, * All delta metrics * Non-monotonically increasing sums -While sending only the last for cumulative metrics doesn't represent data loss, it's arguable if we do the same thing for Gauges and Summaries. In a push-based model, one can tell its instrumentation library to push metrics in important situations and the `intervalprocessor` might discard the data point if it was not the last in a given interval. For this reason, `Gauge` and `Summary` metric types can be configured to be passed through as they are. Copying the same behavior as delta metrics. +> NOTE: Aggregating data over an interval is an inherently "lossy" process. For monotonically increasing, cumulative sums, histograms, and exponential histograms, you "lose" precision, but you don't lose overall data. But for non-monotonically increasing sums, gauges, and summaries, aggregation represents actual data loss. IE you could "lose" that a value increased and then decreased back to the original value. In most cases, this data "loss" is ok. However, if you would rather these values be passed through, and *not* aggregated, you can set that in the configuration ## Configuration