From 0f87c56bbd62ea283ebc91dc51aa2ba42568a3db Mon Sep 17 00:00:00 2001 From: Ruslan Kovalov Date: Fri, 6 Jan 2023 18:42:12 +0100 Subject: [PATCH] [pkg/translator/prw] Export _created times series if possible. Relevant to Summaries, Histograms and Monotonic Sum metrics. See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/17412 --- .chloggen/create_create-time-series-prw.yaml | 16 + .../exporter_test.go | 2 +- .../prometheusremotewrite/helper.go | 96 ++++- .../prometheusremotewrite/helper_test.go | 337 ++++++++++++++++++ 4 files changed, 434 insertions(+), 17 deletions(-) create mode 100644 .chloggen/create_create-time-series-prw.yaml diff --git a/.chloggen/create_create-time-series-prw.yaml b/.chloggen/create_create-time-series-prw.yaml new file mode 100644 index 000000000000..b4c2b13a1ae4 --- /dev/null +++ b/.chloggen/create_create-time-series-prw.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: prometheusremotewriteexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Export `_created` metric for Summary, Histogram and Monotonic Sum metric points if `StartTimeUnixNano` is set. + +# One or more tracking issues related to the change +issues: [17412, 12426] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: diff --git a/exporter/prometheusremotewriteexporter/exporter_test.go b/exporter/prometheusremotewriteexporter/exporter_test.go index 5dd8c56eb1b2..12dc58a3e042 100644 --- a/exporter/prometheusremotewriteexporter/exporter_test.go +++ b/exporter/prometheusremotewriteexporter/exporter_test.go @@ -460,7 +460,7 @@ func Test_PushMetrics(t *testing.T) { name: "intSum_case", metrics: intSumBatch, reqTestFunc: checkFunc, - expectedTimeSeries: 3, + expectedTimeSeries: 5, httpResponseCode: http.StatusAccepted, }, { diff --git a/pkg/translator/prometheusremotewrite/helper.go b/pkg/translator/prometheusremotewrite/helper.go index 9fe5c1790a6b..ba7b7917c984 100644 --- a/pkg/translator/prometheusremotewrite/helper.go +++ b/pkg/translator/prometheusremotewrite/helper.go @@ -37,13 +37,14 @@ import ( ) const ( - nameStr = "__name__" - sumStr = "_sum" - countStr = "_count" - bucketStr = "_bucket" - leStr = "le" - quantileStr = "quantile" - pInfStr = "+Inf" + nameStr = "__name__" + sumStr = "_sum" + countStr = "_count" + bucketStr = "_bucket" + leStr = "le" + quantileStr = "quantile" + pInfStr = "+Inf" + createdPrefix = "_created" // maxExemplarRunes is the maximum number of UTF-8 exemplar characters // according to the prometheus specification // https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#exemplars @@ -277,12 +278,27 @@ func addSingleNumberDataPoint(pt pmetric.NumberDataPoint, resource pcommon.Resou sample.Value = math.Float64frombits(value.StaleNaN) } addSample(tsMap, sample, labels, metric.Type().String()) + + // add _created time series if needed + if metric.Type() == pmetric.MetricTypeSum && metric.Sum().IsMonotonic() { + startTimestamp := pt.StartTimestamp() + if startTimestamp != 0 { + createdLabels := createAttributes( + resource, + pt.Attributes(), + settings.ExternalLabels, + nameStr, + name+createdPrefix, + ) + addCreatedTimeSeriesIfNeeded(tsMap, createdLabels, startTimestamp, metric.Type().String()) + } + } } // addSingleHistogramDataPoint converts pt to 2 + min(len(ExplicitBounds), len(BucketCount)) + 1 samples. It // ignore extra buckets if len(ExplicitBounds) > len(BucketCounts) func addSingleHistogramDataPoint(pt pmetric.HistogramDataPoint, resource pcommon.Resource, metric pmetric.Metric, settings Settings, tsMap map[string]*prompb.TimeSeries) { - time := convertTimeStamp(pt.Timestamp()) + timestamp := convertTimeStamp(pt.Timestamp()) // sum, count, and buckets of the histogram should append suffix to baseName baseName := prometheustranslator.BuildPromCompliantName(metric, settings.Namespace) @@ -292,7 +308,7 @@ func addSingleHistogramDataPoint(pt pmetric.HistogramDataPoint, resource pcommon // treat sum as a sample in an individual TimeSeries sum := &prompb.Sample{ Value: pt.Sum(), - Timestamp: time, + Timestamp: timestamp, } if pt.Flags().NoRecordedValue() { sum.Value = math.Float64frombits(value.StaleNaN) @@ -300,12 +316,13 @@ func addSingleHistogramDataPoint(pt pmetric.HistogramDataPoint, resource pcommon sumlabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nameStr, baseName+sumStr) addSample(tsMap, sum, sumlabels, metric.Type().String()) + } // treat count as a sample in an individual TimeSeries count := &prompb.Sample{ Value: float64(pt.Count()), - Timestamp: time, + Timestamp: timestamp, } if pt.Flags().NoRecordedValue() { count.Value = math.Float64frombits(value.StaleNaN) @@ -327,7 +344,7 @@ func addSingleHistogramDataPoint(pt pmetric.HistogramDataPoint, resource pcommon cumulativeCount += pt.BucketCounts().At(i) bucket := &prompb.Sample{ Value: float64(cumulativeCount), - Timestamp: time, + Timestamp: timestamp, } if pt.Flags().NoRecordedValue() { bucket.Value = math.Float64frombits(value.StaleNaN) @@ -340,7 +357,7 @@ func addSingleHistogramDataPoint(pt pmetric.HistogramDataPoint, resource pcommon } // add le=+Inf bucket infBucket := &prompb.Sample{ - Timestamp: time, + Timestamp: timestamp, } if pt.Flags().NoRecordedValue() { infBucket.Value = math.Float64frombits(value.StaleNaN) @@ -352,6 +369,19 @@ func addSingleHistogramDataPoint(pt pmetric.HistogramDataPoint, resource pcommon bucketBounds = append(bucketBounds, bucketBoundsData{sig: sig, bound: math.Inf(1)}) addExemplars(tsMap, promExemplars, bucketBounds) + + // add _created time series if needed + startTimestamp := pt.StartTimestamp() + if startTimestamp != 0 { + createdLabels := createAttributes( + resource, + pt.Attributes(), + settings.ExternalLabels, + nameStr, + baseName+createdPrefix, + ) + addCreatedTimeSeriesIfNeeded(tsMap, createdLabels, startTimestamp, metric.Type().String()) + } } func getPromExemplars(pt pmetric.HistogramDataPoint) []prompb.Exemplar { @@ -448,13 +478,13 @@ func maxTimestamp(a, b pcommon.Timestamp) pcommon.Timestamp { // addSingleSummaryDataPoint converts pt to len(QuantileValues) + 2 samples. func addSingleSummaryDataPoint(pt pmetric.SummaryDataPoint, resource pcommon.Resource, metric pmetric.Metric, settings Settings, tsMap map[string]*prompb.TimeSeries) { - time := convertTimeStamp(pt.Timestamp()) + timestamp := convertTimeStamp(pt.Timestamp()) // sum and count of the summary should append suffix to baseName baseName := prometheustranslator.BuildPromCompliantName(metric, settings.Namespace) // treat sum as a sample in an individual TimeSeries sum := &prompb.Sample{ Value: pt.Sum(), - Timestamp: time, + Timestamp: timestamp, } if pt.Flags().NoRecordedValue() { sum.Value = math.Float64frombits(value.StaleNaN) @@ -465,7 +495,7 @@ func addSingleSummaryDataPoint(pt pmetric.SummaryDataPoint, resource pcommon.Res // treat count as a sample in an individual TimeSeries count := &prompb.Sample{ Value: float64(pt.Count()), - Timestamp: time, + Timestamp: timestamp, } if pt.Flags().NoRecordedValue() { count.Value = math.Float64frombits(value.StaleNaN) @@ -478,7 +508,7 @@ func addSingleSummaryDataPoint(pt pmetric.SummaryDataPoint, resource pcommon.Res qt := pt.QuantileValues().At(i) quantile := &prompb.Sample{ Value: qt.Value(), - Timestamp: time, + Timestamp: timestamp, } if pt.Flags().NoRecordedValue() { quantile.Value = math.Float64frombits(value.StaleNaN) @@ -487,6 +517,40 @@ func addSingleSummaryDataPoint(pt pmetric.SummaryDataPoint, resource pcommon.Res qtlabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nameStr, baseName, quantileStr, percentileStr) addSample(tsMap, quantile, qtlabels, metric.Type().String()) } + + // add _created time series if needed + startTimestamp := pt.StartTimestamp() + if startTimestamp != 0 { + createdLabels := createAttributes( + resource, + pt.Attributes(), + settings.ExternalLabels, + nameStr, + baseName+createdPrefix, + ) + addCreatedTimeSeriesIfNeeded(tsMap, createdLabels, startTimestamp, metric.Type().String()) + } +} + +// addCreatedTimeSeriesIfNeeded adds {name}_created time series with a single +// sample. If the series exists, then new samples won't be added. +func addCreatedTimeSeriesIfNeeded( + series map[string]*prompb.TimeSeries, + labels []prompb.Label, + startTimestamp pcommon.Timestamp, + metricType string, +) { + sig := timeSeriesSignature(metricType, &labels) + if _, ok := series[sig]; !ok { + series[sig] = &prompb.TimeSeries{ + Labels: labels, + Samples: []prompb.Sample{ + { // convert ns to ms + Value: float64(convertTimeStamp(startTimestamp)), + }, + }, + } + } } // addResourceTargetInfo converts the resource to the target info metric diff --git a/pkg/translator/prometheusremotewrite/helper_test.go b/pkg/translator/prometheusremotewrite/helper_test.go index b932a6b420a5..b25b3bc377fc 100644 --- a/pkg/translator/prometheusremotewrite/helper_test.go +++ b/pkg/translator/prometheusremotewrite/helper_test.go @@ -19,6 +19,7 @@ import ( "testing" "time" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/prompb" "github.com/stretchr/testify/assert" @@ -611,3 +612,339 @@ func TestMostRecentTimestampInMetric(t *testing.T) { }) } } + +func TestAddSingleNumberDataPoint(t *testing.T) { + ts := pcommon.Timestamp(time.Now().UnixNano()) + tests := []struct { + name string + metric func() pmetric.Metric + want func() map[string]*prompb.TimeSeries + }{ + { + name: "monotonic cumulative sum with start timestamp", + metric: func() pmetric.Metric { + metric := pmetric.NewMetric() + metric.SetName("test_sum") + metric.SetEmptySum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + metric.SetEmptySum().SetIsMonotonic(true) + + dp := metric.Sum().DataPoints().AppendEmpty() + dp.SetDoubleValue(1) + dp.SetTimestamp(ts) + dp.SetStartTimestamp(ts) + + return metric + }, + want: func() map[string]*prompb.TimeSeries { + labels := []prompb.Label{ + {Name: model.MetricNameLabel, Value: "test_sum"}, + } + createdLabels := []prompb.Label{ + {Name: model.MetricNameLabel, Value: "test_sum" + createdPrefix}, + } + return map[string]*prompb.TimeSeries{ + timeSeriesSignature(pmetric.MetricTypeSum.String(), &labels): { + Labels: labels, + Samples: []prompb.Sample{ + {Value: 1, Timestamp: convertTimeStamp(ts)}, + }, + }, + timeSeriesSignature(pmetric.MetricTypeSum.String(), &createdLabels): { + Labels: createdLabels, + Samples: []prompb.Sample{ + {Value: float64(convertTimeStamp(ts))}, + }, + }, + } + }, + }, + { + name: "monotonic cumulative sum with no start time", + metric: func() pmetric.Metric { + metric := pmetric.NewMetric() + metric.SetName("test_sum") + metric.SetEmptySum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + metric.SetEmptySum().SetIsMonotonic(true) + + dp := metric.Sum().DataPoints().AppendEmpty() + dp.SetTimestamp(ts) + + return metric + }, + want: func() map[string]*prompb.TimeSeries { + labels := []prompb.Label{ + {Name: model.MetricNameLabel, Value: "test_sum"}, + } + return map[string]*prompb.TimeSeries{ + timeSeriesSignature(pmetric.MetricTypeSum.String(), &labels): { + Labels: labels, + Samples: []prompb.Sample{ + {Value: 0, Timestamp: convertTimeStamp(ts)}, + }, + }, + } + }, + }, + { + name: "non-monotonic cumulative sum with start time", + metric: func() pmetric.Metric { + metric := pmetric.NewMetric() + metric.SetName("test_sum") + metric.SetEmptySum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + metric.SetEmptySum().SetIsMonotonic(false) + + dp := metric.Sum().DataPoints().AppendEmpty() + dp.SetTimestamp(ts) + + return metric + }, + want: func() map[string]*prompb.TimeSeries { + labels := []prompb.Label{ + {Name: model.MetricNameLabel, Value: "test_sum"}, + } + return map[string]*prompb.TimeSeries{ + timeSeriesSignature(pmetric.MetricTypeSum.String(), &labels): { + Labels: labels, + Samples: []prompb.Sample{ + {Value: 0, Timestamp: convertTimeStamp(ts)}, + }, + }, + } + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + metric := tt.metric() + + got := make(map[string]*prompb.TimeSeries) + for x := 0; x < metric.Sum().DataPoints().Len(); x++ { + addSingleNumberDataPoint( + metric.Sum().DataPoints().At(x), + pcommon.NewResource(), + metric, + Settings{}, + got, + ) + } + + assert.Equal(t, tt.want(), got) + }) + } +} + +func TestAddSingleSummaryDataPoint(t *testing.T) { + ts := pcommon.Timestamp(time.Now().UnixNano()) + tests := []struct { + name string + metric func() pmetric.Metric + want func() map[string]*prompb.TimeSeries + }{ + { + name: "summary with start time", + metric: func() pmetric.Metric { + metric := pmetric.NewMetric() + metric.SetName("test_summary") + metric.SetEmptySummary() + + dp := metric.Summary().DataPoints().AppendEmpty() + dp.SetTimestamp(ts) + dp.SetStartTimestamp(ts) + + return metric + }, + want: func() map[string]*prompb.TimeSeries { + labels := []prompb.Label{ + {Name: model.MetricNameLabel, Value: "test_summary" + countStr}, + } + createdLabels := []prompb.Label{ + {Name: model.MetricNameLabel, Value: "test_summary" + createdPrefix}, + } + sumLabels := []prompb.Label{ + {Name: model.MetricNameLabel, Value: "test_summary" + sumStr}, + } + return map[string]*prompb.TimeSeries{ + timeSeriesSignature(pmetric.MetricTypeSummary.String(), &labels): { + Labels: labels, + Samples: []prompb.Sample{ + {Value: 0, Timestamp: convertTimeStamp(ts)}, + }, + }, + timeSeriesSignature(pmetric.MetricTypeSummary.String(), &sumLabels): { + Labels: sumLabels, + Samples: []prompb.Sample{ + {Value: 0, Timestamp: convertTimeStamp(ts)}, + }, + }, + timeSeriesSignature(pmetric.MetricTypeSummary.String(), &createdLabels): { + Labels: createdLabels, + Samples: []prompb.Sample{ + {Value: float64(convertTimeStamp(ts))}, + }, + }, + } + }, + }, + { + name: "summary without start time", + metric: func() pmetric.Metric { + metric := pmetric.NewMetric() + metric.SetName("test_summary") + metric.SetEmptySummary() + + dp := metric.Summary().DataPoints().AppendEmpty() + dp.SetTimestamp(ts) + + return metric + }, + want: func() map[string]*prompb.TimeSeries { + labels := []prompb.Label{ + {Name: model.MetricNameLabel, Value: "test_summary" + countStr}, + } + sumLabels := []prompb.Label{ + {Name: model.MetricNameLabel, Value: "test_summary" + sumStr}, + } + return map[string]*prompb.TimeSeries{ + timeSeriesSignature(pmetric.MetricTypeSummary.String(), &labels): { + Labels: labels, + Samples: []prompb.Sample{ + {Value: 0, Timestamp: convertTimeStamp(ts)}, + }, + }, + timeSeriesSignature(pmetric.MetricTypeSummary.String(), &sumLabels): { + Labels: sumLabels, + Samples: []prompb.Sample{ + {Value: 0, Timestamp: convertTimeStamp(ts)}, + }, + }, + } + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + metric := tt.metric() + + got := make(map[string]*prompb.TimeSeries) + for x := 0; x < metric.Summary().DataPoints().Len(); x++ { + addSingleSummaryDataPoint( + metric.Summary().DataPoints().At(x), + pcommon.NewResource(), + metric, + Settings{}, + got, + ) + } + assert.Equal(t, tt.want(), got) + }) + } +} + +func TestAddSingleHistogramDataPoint(t *testing.T) { + ts := pcommon.Timestamp(time.Now().UnixNano()) + tests := []struct { + name string + metric func() pmetric.Metric + want func() map[string]*prompb.TimeSeries + }{ + { + name: "histogram with start time", + metric: func() pmetric.Metric { + metric := pmetric.NewMetric() + metric.SetName("test_hist") + metric.SetEmptyHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + + pt := metric.Histogram().DataPoints().AppendEmpty() + pt.SetTimestamp(ts) + pt.SetStartTimestamp(ts) + + return metric + }, + want: func() map[string]*prompb.TimeSeries { + labels := []prompb.Label{ + {Name: model.MetricNameLabel, Value: "test_hist" + countStr}, + } + createdLabels := []prompb.Label{ + {Name: model.MetricNameLabel, Value: "test_hist" + createdPrefix}, + } + infLabels := []prompb.Label{ + {Name: model.MetricNameLabel, Value: "test_hist_bucket"}, + {Name: model.BucketLabel, Value: "+Inf"}, + } + return map[string]*prompb.TimeSeries{ + timeSeriesSignature(pmetric.MetricTypeHistogram.String(), &infLabels): { + Labels: infLabels, + Samples: []prompb.Sample{ + {Value: 0, Timestamp: convertTimeStamp(ts)}, + }, + }, + timeSeriesSignature(pmetric.MetricTypeHistogram.String(), &labels): { + Labels: labels, + Samples: []prompb.Sample{ + {Value: 0, Timestamp: convertTimeStamp(ts)}, + }, + }, + timeSeriesSignature(pmetric.MetricTypeHistogram.String(), &createdLabels): { + Labels: createdLabels, + Samples: []prompb.Sample{ + {Value: float64(convertTimeStamp(ts))}, + }, + }, + } + }, + }, + { + name: "histogram without start time", + metric: func() pmetric.Metric { + metric := pmetric.NewMetric() + metric.SetName("test_hist") + metric.SetEmptyHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + + pt := metric.Histogram().DataPoints().AppendEmpty() + pt.SetTimestamp(ts) + + return metric + }, + want: func() map[string]*prompb.TimeSeries { + labels := []prompb.Label{ + {Name: model.MetricNameLabel, Value: "test_hist" + countStr}, + } + infLabels := []prompb.Label{ + {Name: model.MetricNameLabel, Value: "test_hist_bucket"}, + {Name: model.BucketLabel, Value: "+Inf"}, + } + return map[string]*prompb.TimeSeries{ + timeSeriesSignature(pmetric.MetricTypeHistogram.String(), &infLabels): { + Labels: infLabels, + Samples: []prompb.Sample{ + {Value: 0, Timestamp: convertTimeStamp(ts)}, + }, + }, + timeSeriesSignature(pmetric.MetricTypeHistogram.String(), &labels): { + Labels: labels, + Samples: []prompb.Sample{ + {Value: 0, Timestamp: convertTimeStamp(ts)}, + }, + }, + } + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + metric := tt.metric() + + got := make(map[string]*prompb.TimeSeries) + for x := 0; x < metric.Histogram().DataPoints().Len(); x++ { + addSingleHistogramDataPoint( + metric.Histogram().DataPoints().At(x), + pcommon.NewResource(), + metric, + Settings{}, + got, + ) + } + assert.Equal(t, tt.want(), got) + }) + } +}