From 247a9f996e09a83cdc25addf70c05e42b8b30186 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Fri, 12 Apr 2024 13:22:48 +0200 Subject: [PATCH] [exporter/prometheusremotewrite] Hash labels using xxhash for performance (#31385) **Description:** Improve performance in pkg/translator/prometheusremotewrite by using the same [xxhash](https://github.com/cespare/xxhash) based time series signature algorithm as Prometheus itself ([`labels.StableHash()`](https://github.com/prometheus/prometheus/blob/c6c8f63516741fd38ece5387254afee798cfc8d7/model/labels/sharding.go#L24), which is guaranteed not to change over time). I became aware of time series signature calculation being a bit of a bottleneck when profiling Grafana Mimir's OTLP endpoint. This change involves moving from a string hash to a `uint64` one. My thesis is that if Prometheus uses this algorithm to identify time series/label sets, it should be suitable for this translation logic too. #### Hash collisions I've attempted to handle hash collisions in the same way as Prometheus: `PrometheusConverter` has a `unique` field, which is its main `map` from hash to `TimeSeries`, as well as a `conflicts` field, being its secondary `map` from hash to a slice of `TimeSeries` in case of hash collisions. If a label set should hash to an existing entry in `unique`, but not be equal to the existing entry's label set, the label set is instead matched against the entries in the `conflicts` `map`. If an equal label set is not found among the conflicts for the hash in question either, the label set is added to the conflicts slice for that hash. **Testing:** I've run `make test`/`make lint` and run the `BenchmarkFromMetrics` benchmark. Benchmark stats are included below; they show an average speedup of 3.68% and an average memory reduction of 17.13%. **NB:** The benchmark stats reveal performance regressions in a few cases, because of using the `prometheusConverter` API _via_ the `FromMetrics` function. 
--------- Signed-off-by: Arve Knudsen Co-authored-by: Anthony Mirabella --- .chloggen/arve_xxhash.yaml | 27 + pkg/translator/prometheusremotewrite/go.mod | 1 + pkg/translator/prometheusremotewrite/go.sum | 2 + .../prometheusremotewrite/helper.go | 450 ++++++++------- .../prometheusremotewrite/helper_test.go | 517 ++++++++++-------- .../prometheusremotewrite/histograms.go | 53 +- .../prometheusremotewrite/histograms_test.go | 40 +- .../prometheusremotewrite/metrics_to_prw.go | 145 ++++- .../metrics_to_prw_test.go | 43 +- .../number_data_points.go | 152 +++-- .../number_data_points_test.go | 89 ++- .../prometheusremotewrite/testutils_test.go | 62 ++- 12 files changed, 887 insertions(+), 694 deletions(-) create mode 100755 .chloggen/arve_xxhash.yaml diff --git a/.chloggen/arve_xxhash.yaml b/.chloggen/arve_xxhash.yaml new file mode 100755 index 000000000000..86890054293d --- /dev/null +++ b/.chloggen/arve_xxhash.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: prometheusremotewrite + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Optimize the prometheusremotewrite.FromMetrics function, based around more performant metric identifier hashing. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [31385] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. 
+subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/pkg/translator/prometheusremotewrite/go.mod b/pkg/translator/prometheusremotewrite/go.mod index c0f934206d42..58fc56ca4695 100644 --- a/pkg/translator/prometheusremotewrite/go.mod +++ b/pkg/translator/prometheusremotewrite/go.mod @@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/ go 1.21 require ( + github.com/cespare/xxhash/v2 v2.3.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.98.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus v0.98.0 github.com/prometheus/common v0.52.2 diff --git a/pkg/translator/prometheusremotewrite/go.sum b/pkg/translator/prometheusremotewrite/go.sum index c8954c3d7da0..5f11c06547d1 100644 --- a/pkg/translator/prometheusremotewrite/go.sum +++ b/pkg/translator/prometheusremotewrite/go.sum @@ -1,3 +1,5 @@ +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= diff --git a/pkg/translator/prometheusremotewrite/helper.go b/pkg/translator/prometheusremotewrite/helper.go index 
98761617049b..3c44ab467c03 100644 --- a/pkg/translator/prometheusremotewrite/helper.go +++ b/pkg/translator/prometheusremotewrite/helper.go @@ -8,12 +8,13 @@ import ( "fmt" "log" "math" + "slices" "sort" "strconv" - "strings" "time" "unicode/utf8" + "github.com/cespare/xxhash/v2" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/model/value" @@ -46,7 +47,7 @@ const ( ) type bucketBoundsData struct { - sig string + ts *prompb.TimeSeries bound float64 } @@ -64,94 +65,47 @@ func (a ByLabelName) Len() int { return len(a) } func (a ByLabelName) Less(i, j int) bool { return a[i].Name < a[j].Name } func (a ByLabelName) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -// addSample finds a TimeSeries in tsMap that corresponds to the label set labels, and add sample to the TimeSeries; it -// creates a new TimeSeries in the map if not found and returns the time series signature. -// tsMap will be unmodified if either labels or sample is nil, but can still be modified if the exemplar is nil. -func addSample(tsMap map[string]*prompb.TimeSeries, sample *prompb.Sample, labels []prompb.Label, - datatype string) string { - if sample == nil || labels == nil || tsMap == nil { - // This shouldn't happen - return "" - } +// timeSeriesSignature returns a hashed label set signature. +// The label slice should not contain duplicate label names; this method sorts the slice by label name before creating +// the signature. +// The algorithm is the same as in Prometheus' labels.StableHash function. +func timeSeriesSignature(labels []prompb.Label) uint64 { + sort.Sort(ByLabelName(labels)) - sig := timeSeriesSignature(datatype, labels) - ts := tsMap[sig] - if ts != nil { - ts.Samples = append(ts.Samples, *sample) - } else { - newTs := &prompb.TimeSeries{ - Labels: labels, - Samples: []prompb.Sample{*sample}, + // Use xxhash.Sum64(b) for fast path as it's faster. 
+ b := make([]byte, 0, 1024) + for i, v := range labels { + if len(b)+len(v.Name)+len(v.Value)+2 >= cap(b) { + // If labels entry is 1KB+ do not allocate whole entry. + h := xxhash.New() + _, _ = h.Write(b) + for _, v := range labels[i:] { + _, _ = h.WriteString(v.Name) + _, _ = h.Write(seps) + _, _ = h.WriteString(v.Value) + _, _ = h.Write(seps) + } + return h.Sum64() } - tsMap[sig] = newTs - } - - return sig -} - -// addExemplars finds a bucket bound that corresponds to the exemplars value and add the exemplar to the specific sig; -// we only add exemplars if samples are presents -// tsMap is unmodified if either of its parameters is nil and samples are nil. -func addExemplars(tsMap map[string]*prompb.TimeSeries, exemplars []prompb.Exemplar, bucketBoundsData []bucketBoundsData) { - if len(tsMap) == 0 || len(bucketBoundsData) == 0 || len(exemplars) == 0 { - return - } - - sort.Sort(byBucketBoundsData(bucketBoundsData)) - for _, exemplar := range exemplars { - addExemplar(tsMap, bucketBoundsData, exemplar) + b = append(b, v.Name...) + b = append(b, seps[0]) + b = append(b, v.Value...) + b = append(b, seps[0]) } + return xxhash.Sum64(b) } -func addExemplar(tsMap map[string]*prompb.TimeSeries, bucketBounds []bucketBoundsData, exemplar prompb.Exemplar) { - for _, bucketBound := range bucketBounds { - sig := bucketBound.sig - bound := bucketBound.bound - - ts := tsMap[sig] - if ts != nil && len(ts.Samples) > 0 && exemplar.Value <= bound { - ts.Exemplars = append(ts.Exemplars, exemplar) - return - } - } -} - -// timeSeries return a string signature in the form of: -// -// TYPE-label1-value1- ... -labelN-valueN -// -// the label slice should not contain duplicate label names; this method sorts the slice by label name before creating -// the signature. 
-func timeSeriesSignature(datatype string, labels []prompb.Label) string { - length := len(datatype) - - for _, lb := range labels { - length += 2 + len(lb.GetName()) + len(lb.GetValue()) - } - - b := strings.Builder{} - b.Grow(length) - b.WriteString(datatype) - - sort.Sort(ByLabelName(labels)) - - for _, lb := range labels { - b.WriteString("-") - b.WriteString(lb.GetName()) - b.WriteString("-") - b.WriteString(lb.GetValue()) - } - - return b.String() -} +var seps = []byte{'\xff'} // createAttributes creates a slice of Prometheus Labels with OTLP attributes and pairs of string values. -// Unpaired string values are ignored. String pairs overwrite OTLP labels if collisions happen, and overwrites are -// logged. Resulting label names are sanitized. -func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externalLabels map[string]string, extras ...string) []prompb.Label { - serviceName, haveServiceName := resource.Attributes().Get(conventions.AttributeServiceName) - instance, haveInstanceID := resource.Attributes().Get(conventions.AttributeServiceInstanceID) +// Unpaired string values are ignored. String pairs overwrite OTLP labels if collisions happen and +// if logOnOverwrite is true, the overwrite is logged. Resulting label names are sanitized. 
+func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externalLabels map[string]string, + ignoreAttrs []string, logOnOverwrite bool, extras ...string) []prompb.Label { + resourceAttrs := resource.Attributes() + serviceName, haveServiceName := resourceAttrs.Get(conventions.AttributeServiceName) + instance, haveInstanceID := resourceAttrs.Get(conventions.AttributeServiceInstanceID) // Calculate the maximum possible number of labels we could return so we can preallocate l maxLabelCount := attributes.Len() + len(externalLabels) + len(extras)/2 @@ -169,9 +123,13 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externa // Ensure attributes are sorted by key for consistent merging of keys which // collide when sanitized. - labels := make([]prompb.Label, 0, attributes.Len()) + labels := make([]prompb.Label, 0, maxLabelCount) + // XXX: Should we always drop service namespace/service name/service instance ID from the labels + // (as they get mapped to other Prometheus labels)? 
attributes.Range(func(key string, value pcommon.Value) bool { - labels = append(labels, prompb.Label{Name: key, Value: value.AsString()}) + if !slices.Contains(ignoreAttrs, key) { + labels = append(labels, prompb.Label{Name: key, Value: value.AsString()}) + } return true }) sort.Stable(ByLabelName(labels)) @@ -188,7 +146,7 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externa // Map service.name + service.namespace to job if haveServiceName { val := serviceName.AsString() - if serviceNamespace, ok := resource.Attributes().Get(conventions.AttributeServiceNamespace); ok { + if serviceNamespace, ok := resourceAttrs.Get(conventions.AttributeServiceNamespace); ok { val = fmt.Sprintf("%s/%s", serviceNamespace.AsString(), val) } l[model.JobLabel] = val @@ -211,7 +169,7 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externa break } _, found := l[extras[i]] - if found { + if found && logOnOverwrite { log.Println("label " + extras[i] + " is overwritten. Check if Prometheus reserved labels are used.") } // internal labels should be maintained @@ -222,12 +180,12 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externa l[name] = extras[i+1] } - s := make([]prompb.Label, 0, len(l)) + labels = labels[:0] for k, v := range l { - s = append(s, prompb.Label{Name: k, Value: v}) + labels = append(labels, prompb.Label{Name: k, Value: v}) } - return s + return labels } // isValidAggregationTemporality checks whether an OTel metric has a valid @@ -247,85 +205,84 @@ func isValidAggregationTemporality(metric pmetric.Metric) bool { return false } -// addSingleHistogramDataPoint converts pt to 2 + min(len(ExplicitBounds), len(BucketCount)) + 1 samples. 
It -// ignore extra buckets if len(ExplicitBounds) > len(BucketCounts) -func addSingleHistogramDataPoint(pt pmetric.HistogramDataPoint, resource pcommon.Resource, metric pmetric.Metric, settings Settings, tsMap map[string]*prompb.TimeSeries, baseName string) { - timestamp := convertTimeStamp(pt.Timestamp()) - baseLabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels) +func (c *prometheusConverter) addHistogramDataPoints(dataPoints pmetric.HistogramDataPointSlice, + resource pcommon.Resource, settings Settings, baseName string) { + for x := 0; x < dataPoints.Len(); x++ { + pt := dataPoints.At(x) + timestamp := convertTimeStamp(pt.Timestamp()) + baseLabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nil, false) + + // If the sum is unset, it indicates the _sum metric point should be + // omitted + if pt.HasSum() { + // treat sum as a sample in an individual TimeSeries + sum := &prompb.Sample{ + Value: pt.Sum(), + Timestamp: timestamp, + } + if pt.Flags().NoRecordedValue() { + sum.Value = math.Float64frombits(value.StaleNaN) + } - // If the sum is unset, it indicates the _sum metric point should be - // omitted - if pt.HasSum() { - // treat sum as a sample in an individual TimeSeries - sum := &prompb.Sample{ - Value: pt.Sum(), + sumlabels := createLabels(baseName+sumStr, baseLabels) + c.addSample(sum, sumlabels) + + } + + // treat count as a sample in an individual TimeSeries + count := &prompb.Sample{ + Value: float64(pt.Count()), Timestamp: timestamp, } if pt.Flags().NoRecordedValue() { - sum.Value = math.Float64frombits(value.StaleNaN) + count.Value = math.Float64frombits(value.StaleNaN) } - sumlabels := createLabels(baseName+sumStr, baseLabels) - addSample(tsMap, sum, sumlabels, metric.Type().String()) + countlabels := createLabels(baseName+countStr, baseLabels) + c.addSample(count, countlabels) - } + // cumulative count for conversion to cumulative histogram + var cumulativeCount uint64 - // treat count as a 
sample in an individual TimeSeries - count := &prompb.Sample{ - Value: float64(pt.Count()), - Timestamp: timestamp, - } - if pt.Flags().NoRecordedValue() { - count.Value = math.Float64frombits(value.StaleNaN) - } - - countlabels := createLabels(baseName+countStr, baseLabels) - addSample(tsMap, count, countlabels, metric.Type().String()) - - // cumulative count for conversion to cumulative histogram - var cumulativeCount uint64 + var bucketBounds []bucketBoundsData - promExemplars := getPromExemplars[pmetric.HistogramDataPoint](pt) - - var bucketBounds []bucketBoundsData + // process each bound, based on histograms proto definition, # of buckets = # of explicit bounds + 1 + for i := 0; i < pt.ExplicitBounds().Len() && i < pt.BucketCounts().Len(); i++ { + bound := pt.ExplicitBounds().At(i) + cumulativeCount += pt.BucketCounts().At(i) + bucket := &prompb.Sample{ + Value: float64(cumulativeCount), + Timestamp: timestamp, + } + if pt.Flags().NoRecordedValue() { + bucket.Value = math.Float64frombits(value.StaleNaN) + } + boundStr := strconv.FormatFloat(bound, 'f', -1, 64) + labels := createLabels(baseName+bucketStr, baseLabels, leStr, boundStr) + ts := c.addSample(bucket, labels) - // process each bound, based on histograms proto definition, # of buckets = # of explicit bounds + 1 - for i := 0; i < pt.ExplicitBounds().Len() && i < pt.BucketCounts().Len(); i++ { - bound := pt.ExplicitBounds().At(i) - cumulativeCount += pt.BucketCounts().At(i) - bucket := &prompb.Sample{ - Value: float64(cumulativeCount), + bucketBounds = append(bucketBounds, bucketBoundsData{ts: ts, bound: bound}) + } + // add le=+Inf bucket + infBucket := &prompb.Sample{ Timestamp: timestamp, } if pt.Flags().NoRecordedValue() { - bucket.Value = math.Float64frombits(value.StaleNaN) + infBucket.Value = math.Float64frombits(value.StaleNaN) + } else { + infBucket.Value = float64(pt.Count()) } - boundStr := strconv.FormatFloat(bound, 'f', -1, 64) - labels := createLabels(baseName+bucketStr, baseLabels, leStr, 
boundStr) - sig := addSample(tsMap, bucket, labels, metric.Type().String()) - - bucketBounds = append(bucketBounds, bucketBoundsData{sig: sig, bound: bound}) - } - // add le=+Inf bucket - infBucket := &prompb.Sample{ - Timestamp: timestamp, - } - if pt.Flags().NoRecordedValue() { - infBucket.Value = math.Float64frombits(value.StaleNaN) - } else { - infBucket.Value = float64(pt.Count()) - } - infLabels := createLabels(baseName+bucketStr, baseLabels, leStr, pInfStr) - sig := addSample(tsMap, infBucket, infLabels, metric.Type().String()) + infLabels := createLabels(baseName+bucketStr, baseLabels, leStr, pInfStr) + ts := c.addSample(infBucket, infLabels) - bucketBounds = append(bucketBounds, bucketBoundsData{sig: sig, bound: math.Inf(1)}) - addExemplars(tsMap, promExemplars, bucketBounds) + bucketBounds = append(bucketBounds, bucketBoundsData{ts: ts, bound: math.Inf(1)}) + c.addExemplars(pt, bucketBounds) - // add _created time series if needed - startTimestamp := pt.StartTimestamp() - if settings.ExportCreatedMetric && startTimestamp != 0 { - labels := createLabels(baseName+createdSuffix, baseLabels) - addCreatedTimeSeriesIfNeeded(tsMap, labels, startTimestamp, pt.Timestamp(), metric.Type().String()) + startTimestamp := pt.StartTimestamp() + if settings.ExportCreatedMetric && startTimestamp != 0 { + labels := createLabels(baseName+createdSuffix, baseLabels) + c.addTimeSeriesIfNeeded(labels, startTimestamp, pt.Timestamp()) + } } } @@ -431,55 +388,56 @@ func maxTimestamp(a, b pcommon.Timestamp) pcommon.Timestamp { return b } -// addSingleSummaryDataPoint converts pt to len(QuantileValues) + 2 samples. 
-func addSingleSummaryDataPoint(pt pmetric.SummaryDataPoint, resource pcommon.Resource, metric pmetric.Metric, settings Settings, - tsMap map[string]*prompb.TimeSeries, baseName string) { - timestamp := convertTimeStamp(pt.Timestamp()) - baseLabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels) +func (c *prometheusConverter) addSummaryDataPoints(dataPoints pmetric.SummaryDataPointSlice, resource pcommon.Resource, + settings Settings, baseName string) { + for x := 0; x < dataPoints.Len(); x++ { + pt := dataPoints.At(x) + timestamp := convertTimeStamp(pt.Timestamp()) + baseLabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nil, false) - // treat sum as a sample in an individual TimeSeries - sum := &prompb.Sample{ - Value: pt.Sum(), - Timestamp: timestamp, - } - if pt.Flags().NoRecordedValue() { - sum.Value = math.Float64frombits(value.StaleNaN) - } - // sum and count of the summary should append suffix to baseName - sumlabels := createLabels(baseName+sumStr, baseLabels) - addSample(tsMap, sum, sumlabels, metric.Type().String()) - - // treat count as a sample in an individual TimeSeries - count := &prompb.Sample{ - Value: float64(pt.Count()), - Timestamp: timestamp, - } - if pt.Flags().NoRecordedValue() { - count.Value = math.Float64frombits(value.StaleNaN) - } - countlabels := createLabels(baseName+countStr, baseLabels) - addSample(tsMap, count, countlabels, metric.Type().String()) - - // process each percentile/quantile - for i := 0; i < pt.QuantileValues().Len(); i++ { - qt := pt.QuantileValues().At(i) - quantile := &prompb.Sample{ - Value: qt.Value(), + // treat sum as a sample in an individual TimeSeries + sum := &prompb.Sample{ + Value: pt.Sum(), Timestamp: timestamp, } if pt.Flags().NoRecordedValue() { - quantile.Value = math.Float64frombits(value.StaleNaN) + sum.Value = math.Float64frombits(value.StaleNaN) + } + // sum and count of the summary should append suffix to baseName + sumlabels := 
createLabels(baseName+sumStr, baseLabels) + c.addSample(sum, sumlabels) + + // treat count as a sample in an individual TimeSeries + count := &prompb.Sample{ + Value: float64(pt.Count()), + Timestamp: timestamp, + } + if pt.Flags().NoRecordedValue() { + count.Value = math.Float64frombits(value.StaleNaN) + } + countlabels := createLabels(baseName+countStr, baseLabels) + c.addSample(count, countlabels) + + // process each percentile/quantile + for i := 0; i < pt.QuantileValues().Len(); i++ { + qt := pt.QuantileValues().At(i) + quantile := &prompb.Sample{ + Value: qt.Value(), + Timestamp: timestamp, + } + if pt.Flags().NoRecordedValue() { + quantile.Value = math.Float64frombits(value.StaleNaN) + } + percentileStr := strconv.FormatFloat(qt.Quantile(), 'f', -1, 64) + qtlabels := createLabels(baseName, baseLabels, quantileStr, percentileStr) + c.addSample(quantile, qtlabels) } - percentileStr := strconv.FormatFloat(qt.Quantile(), 'f', -1, 64) - qtlabels := createLabels(baseName, baseLabels, quantileStr, percentileStr) - addSample(tsMap, quantile, qtlabels, metric.Type().String()) - } - // add _created time series if needed - startTimestamp := pt.StartTimestamp() - if settings.ExportCreatedMetric && startTimestamp != 0 { - createdLabels := createLabels(baseName+createdSuffix, baseLabels) - addCreatedTimeSeriesIfNeeded(tsMap, createdLabels, startTimestamp, pt.Timestamp(), metric.Type().String()) + startTimestamp := pt.StartTimestamp() + if settings.ExportCreatedMetric && startTimestamp != 0 { + createdLabels := createLabels(baseName+createdSuffix, baseLabels) + c.addTimeSeriesIfNeeded(createdLabels, startTimestamp, pt.Timestamp()) + } } } @@ -501,63 +459,93 @@ func createLabels(name string, baseLabels []prompb.Label, extras ...string) []pr return labels } -// addCreatedTimeSeriesIfNeeded adds {name}_created time series with a single -// sample. If the series exists, then new samples won't be added. 
-func addCreatedTimeSeriesIfNeeded( - series map[string]*prompb.TimeSeries, - labels []prompb.Label, - startTimestamp pcommon.Timestamp, - timestamp pcommon.Timestamp, - metricType string, -) { - sig := timeSeriesSignature(metricType, labels) - if _, ok := series[sig]; !ok { - series[sig] = &prompb.TimeSeries{ - Labels: labels, - Samples: []prompb.Sample{ - { // convert ns to ms - Value: float64(convertTimeStamp(startTimestamp)), - Timestamp: convertTimeStamp(timestamp), - }, +// getOrCreateTimeSeries returns the time series corresponding to the label set if existent, and false. +// Otherwise it creates a new one and returns that, and true. +func (c *prometheusConverter) getOrCreateTimeSeries(lbls []prompb.Label) (*prompb.TimeSeries, bool) { + h := timeSeriesSignature(lbls) + ts := c.unique[h] + if ts != nil { + if isSameMetric(ts, lbls) { + // We already have this metric + return ts, false + } + + // Look for a matching conflict + for _, cTS := range c.conflicts[h] { + if isSameMetric(cTS, lbls) { + // We already have this metric + return cTS, false + } + } + + // New conflict + ts = &prompb.TimeSeries{ + Labels: lbls, + } + c.conflicts[h] = append(c.conflicts[h], ts) + return ts, true + } + + // This metric is new + ts = &prompb.TimeSeries{ + Labels: lbls, + } + c.unique[h] = ts + return ts, true +} + +// addTimeSeriesIfNeeded adds a corresponding time series if it doesn't already exist. +// If the time series doesn't already exist, it gets added with startTimestamp for its value and timestamp for its timestamp, +// both converted to milliseconds. 
+func (c *prometheusConverter) addTimeSeriesIfNeeded(lbls []prompb.Label, startTimestamp pcommon.Timestamp, timestamp pcommon.Timestamp) { + ts, created := c.getOrCreateTimeSeries(lbls) + if created { + ts.Samples = []prompb.Sample{ + { + // convert ns to ms + Value: float64(convertTimeStamp(startTimestamp)), + Timestamp: convertTimeStamp(timestamp), }, } } } -// addResourceTargetInfo converts the resource to the target info metric -func addResourceTargetInfo(resource pcommon.Resource, settings Settings, timestamp pcommon.Timestamp, tsMap map[string]*prompb.TimeSeries) { +// addResourceTargetInfo converts the resource to the target info metric. +func addResourceTargetInfo(resource pcommon.Resource, settings Settings, timestamp pcommon.Timestamp, converter *prometheusConverter) { if settings.DisableTargetInfo { return } - // Use resource attributes (other than those used for job+instance) as the - // metric labels for the target info metric - attributes := pcommon.NewMap() - resource.Attributes().CopyTo(attributes) - attributes.RemoveIf(func(k string, _ pcommon.Value) bool { - switch k { - case conventions.AttributeServiceName, conventions.AttributeServiceNamespace, conventions.AttributeServiceInstanceID: - // Remove resource attributes used for job + instance - return true - default: - return false + + attributes := resource.Attributes() + identifyingAttrs := []string{ + conventions.AttributeServiceNamespace, + conventions.AttributeServiceName, + conventions.AttributeServiceInstanceID, + } + nonIdentifyingAttrsCount := attributes.Len() + for _, a := range identifyingAttrs { + _, haveAttr := attributes.Get(a) + if haveAttr { + nonIdentifyingAttrsCount-- } - }) - if attributes.Len() == 0 { + } + if nonIdentifyingAttrsCount == 0 { // If we only have job + instance, then target_info isn't useful, so don't add it. 
return } - // create parameters for addSample + name := targetMetricName if len(settings.Namespace) > 0 { name = settings.Namespace + "_" + name } - labels := createAttributes(resource, attributes, settings.ExternalLabels, model.MetricNameLabel, name) + + labels := createAttributes(resource, attributes, settings.ExternalLabels, identifyingAttrs, false, model.MetricNameLabel, name) sample := &prompb.Sample{ Value: float64(1), // convert ns to ms Timestamp: convertTimeStamp(timestamp), } - addSample(tsMap, sample, labels, infoType) + converter.addSample(sample, labels) } // convertTimeStamp converts OTLP timestamp in ns to timestamp in ms diff --git a/pkg/translator/prometheusremotewrite/helper_test.go b/pkg/translator/prometheusremotewrite/helper_test.go index e55841911df7..7f54e5eedd9a 100644 --- a/pkg/translator/prometheusremotewrite/helper_test.go +++ b/pkg/translator/prometheusremotewrite/helper_test.go @@ -4,14 +4,18 @@ package prometheusremotewrite import ( + "fmt" "math" + "sort" "testing" "time" + "github.com/cespare/xxhash/v2" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/prompb" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" conventions "go.opentelemetry.io/collector/semconv/v1.6.1" @@ -102,110 +106,135 @@ func Test_isValidAggregationTemporality(t *testing.T) { } } -// Test_addSample checks addSample updates the map it receives correctly based on the sample and Label -// set it receives. -// Test cases are two samples belonging to the same TimeSeries, two samples belong to different TimeSeries, and nil -// case. -func Test_addSample(t *testing.T) { +// TestPrometheusConverter_addSample verifies that prometheusConverter.addSample adds the sample to the correct time series. 
+func TestPrometheusConverter_addSample(t *testing.T) { type testCase struct { metric pmetric.Metric sample prompb.Sample labels []prompb.Label } + t.Run("empty_case", func(t *testing.T) { + converter := newPrometheusConverter() + converter.addSample(nil, nil) + assert.Empty(t, converter.unique) + assert.Empty(t, converter.conflicts) + }) + tests := []struct { name string - orig map[string]*prompb.TimeSeries testCase []testCase - want map[string]*prompb.TimeSeries + want map[uint64]*prompb.TimeSeries }{ { - "two_points_same_ts_same_metric", - map[string]*prompb.TimeSeries{}, - []testCase{ - {validMetrics1[validDoubleGauge], - getSample(floatVal1, msTime1), - promLbs1, + name: "two_points_same_ts_same_metric", + testCase: []testCase{ + { + metric: validMetrics1[validDoubleGauge], + sample: getSample(floatVal1, msTime1), + labels: promLbs1, }, { - validMetrics1[validDoubleGauge], - getSample(floatVal2, msTime2), - promLbs1, + metric: validMetrics1[validDoubleGauge], + sample: getSample(floatVal2, msTime2), + labels: promLbs1, }, }, - twoPointsSameTs, + want: twoPointsSameTs(), }, { - "two_points_different_ts_same_metric", - map[string]*prompb.TimeSeries{}, - []testCase{ - {validMetrics1[validIntGauge], - getSample(float64(intVal1), msTime1), - promLbs1, + name: "two_points_different_ts_same_metric", + testCase: []testCase{ + { + sample: getSample(float64(intVal1), msTime1), + labels: promLbs1, }, - {validMetrics1[validIntGauge], - getSample(float64(intVal1), msTime2), - promLbs2, + { + sample: getSample(float64(intVal1), msTime2), + labels: promLbs2, }, }, - twoPointsDifferentTs, + want: twoPointsDifferentTs(), }, } - t.Run("empty_case", func(t *testing.T) { - tsMap := map[string]*prompb.TimeSeries{} - addSample(tsMap, nil, nil, "") - assert.Exactly(t, tsMap, map[string]*prompb.TimeSeries{}) - }) - // run tests for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - addSample(tt.orig, &tt.testCase[0].sample, tt.testCase[0].labels, 
tt.testCase[0].metric.Type().String()) - addSample(tt.orig, &tt.testCase[1].sample, tt.testCase[1].labels, tt.testCase[1].metric.Type().String()) - assert.Exactly(t, tt.want, tt.orig) + converter := newPrometheusConverter() + converter.addSample(&tt.testCase[0].sample, tt.testCase[0].labels) + converter.addSample(&tt.testCase[1].sample, tt.testCase[1].labels) + assert.Exactly(t, tt.want, converter.unique) + assert.Empty(t, converter.conflicts) }) } } -// Test_timeSeries checks timeSeriesSignature returns consistent and unique signatures for a distinct label set and -// metric type combination. +// Test_timeSeriesSignature checks that timeSeriesSignature returns consistent and unique signatures for a distinct label set. func Test_timeSeriesSignature(t *testing.T) { + var oneKBLabels []prompb.Label + for i := 0; i < 100; i++ { + const name = "12345" + const value = "12345" + oneKBLabels = append(oneKBLabels, prompb.Label{Name: name, Value: value}) + } + tests := []struct { name string lbs []prompb.Label metric pmetric.Metric - want string }{ { "int64_signature", promLbs1, validMetrics1[validIntGauge], - validMetrics1[validIntGauge].Type().String() + lb1Sig, }, { "histogram_signature", promLbs2, validMetrics1[validHistogram], - validMetrics1[validHistogram].Type().String() + lb2Sig, }, { "unordered_signature", getPromLabels(label22, value22, label21, value21), validMetrics1[validHistogram], - validMetrics1[validHistogram].Type().String() + lb2Sig, }, // descriptor type cannot be nil, as checked by validateAggregationTemporality { "nil_case", nil, validMetrics1[validHistogram], - validMetrics1[validHistogram].Type().String(), }, + { + // Case that triggers optimized logic when exceeding 1 kb + "greater_than_1kb_signature", + oneKBLabels, + validMetrics1[validIntGauge], + }, + } + + calcSig := func(labels []prompb.Label) uint64 { + sort.Sort(ByLabelName(labels)) + + h := xxhash.New() + for _, l := range labels { + _, err := h.WriteString(l.Name) + require.NoError(t, 
err) + _, err = h.Write(seps) + require.NoError(t, err) + _, err = h.WriteString(l.Value) + require.NoError(t, err) + _, err = h.Write(seps) + require.NoError(t, err) + } + + return h.Sum64() } // run tests for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - assert.EqualValues(t, tt.want, timeSeriesSignature(tt.metric.Type().String(), tt.lbs)) + exp := calcSig(tt.lbs) + sig := timeSeriesSignature(tt.lbs) + assert.EqualValues(t, exp, sig) }) } } @@ -331,7 +360,7 @@ func Test_createLabelSet(t *testing.T) { // run tests for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - assert.ElementsMatch(t, tt.want, createAttributes(tt.resource, tt.orig, tt.externalLabels, tt.extras...)) + assert.ElementsMatch(t, tt.want, createAttributes(tt.resource, tt.orig, tt.externalLabels, nil, true, tt.extras...)) }) } } @@ -349,79 +378,72 @@ func BenchmarkCreateAttributes(b *testing.B) { b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { - createAttributes(r, m, ext) + createAttributes(r, m, ext, nil, true) } } -// Test_addExemplars checks addExemplars updates the map it receives correctly based on the exemplars and bucket bounds data it receives. -func Test_addExemplars(t *testing.T) { - type testCase struct { - exemplars []prompb.Exemplar - bucketBounds []bucketBoundsData - } - +// TestPrometheusConverter_addExemplars verifies that prometheusConverter.addExemplars adds exemplars correctly given bucket bounds data. 
+func TestPrometheusConverter_addExemplars(t *testing.T) { + ts1 := getTimeSeries( + getPromLabels(label11, value11, label12, value12), + getSample(float64(intVal1), msTime1), + ) + ts2 := getTimeSeries( + getPromLabels(label11, value11, label12, value12), + getSample(float64(intVal1), msTime1), + ) + tsMap1 := tsWithoutSampleAndExemplar() tests := []struct { - name string - orig map[string]*prompb.TimeSeries - testCase []testCase - want map[string]*prompb.TimeSeries + name string + orig map[uint64]*prompb.TimeSeries + dataPoint pmetric.HistogramDataPoint + bucketBounds []bucketBoundsData + want map[uint64]*prompb.TimeSeries }{ { - "timeSeries_is_empty", - map[string]*prompb.TimeSeries{}, - []testCase{ - { - []prompb.Exemplar{getExemplar(float64(intVal1), msTime1)}, - getBucketBoundsData([]float64{1, 2, 3}), - }, - }, - map[string]*prompb.TimeSeries{}, + name: "timeSeries_is_empty", + orig: map[uint64]*prompb.TimeSeries{}, + dataPoint: getHistogramDataPointWithExemplars(t, time.UnixMilli(msTime1), float64(intVal1), traceIDValue1, "", "", ""), + bucketBounds: getBucketBoundsData( + []float64{1, 2, 3}, + getTimeSeries(getPromLabels(label11, value11, label12, value12), getSample(float64(intVal1), msTime1)), + ), + want: map[uint64]*prompb.TimeSeries{}, }, { - "timeSeries_without_sample", - tsWithoutSampleAndExemplar, - []testCase{ - { - []prompb.Exemplar{getExemplar(float64(intVal1), msTime1)}, - getBucketBoundsData([]float64{1, 2, 3}), - }, - }, - tsWithoutSampleAndExemplar, + name: "timeSeries_without_sample", + orig: tsMap1, + dataPoint: getHistogramDataPointWithExemplars(t, time.UnixMilli(msTime1), float64(intVal1), traceIDValue1, "", "", ""), + bucketBounds: getBucketBoundsData([]float64{1, 2, 3}, tsMap1[lb1Sig]), + want: tsWithoutSampleAndExemplar(), }, { - "exemplar_value_less_than_bucket_bound", - map[string]*prompb.TimeSeries{ - lb1Sig: getTimeSeries(getPromLabels(label11, value11, label12, value12), - getSample(float64(intVal1), msTime1)), + name: 
"exemplar_value_less_than_bucket_bound", + orig: map[uint64]*prompb.TimeSeries{ + lb1Sig: ts1, }, - []testCase{ - { - []prompb.Exemplar{getExemplar(floatVal2, msTime1)}, - getBucketBoundsData([]float64{1, 2, 3}), - }, - }, - tsWithSamplesAndExemplars, + dataPoint: getHistogramDataPointWithExemplars(t, time.UnixMilli(msTime1), floatVal2, traceIDValue1, "", "", ""), + bucketBounds: getBucketBoundsData([]float64{1, 2, 3}, ts1), + want: tsWithSamplesAndExemplars(), }, { - "infinite_bucket_bound", - map[string]*prompb.TimeSeries{ - lb1Sig: getTimeSeries(getPromLabels(label11, value11, label12, value12), - getSample(float64(intVal1), msTime1)), - }, - []testCase{ - { - []prompb.Exemplar{getExemplar(math.MaxFloat64, msTime1)}, - getBucketBoundsData([]float64{1, math.Inf(1)}), - }, + name: "infinite_bucket_bound", + orig: map[uint64]*prompb.TimeSeries{ + lb1Sig: ts2, }, - tsWithInfiniteBoundExemplarValue, + dataPoint: getHistogramDataPointWithExemplars(t, time.UnixMilli(msTime1), math.MaxFloat64, traceIDValue1, "", "", ""), + bucketBounds: getBucketBoundsData([]float64{1, math.Inf(1)}, ts2), + want: tsWithInfiniteBoundExemplarValue(), }, } // run tests for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - addExemplars(tt.orig, tt.testCase[0].exemplars, tt.testCase[0].bucketBounds) - assert.Exactly(t, tt.want, tt.orig) + converter := &prometheusConverter{ + unique: tt.orig, + } + converter.addExemplars(tt.dataPoint, tt.bucketBounds) + assert.Exactly(t, tt.want, converter.unique) }) } } @@ -515,45 +537,33 @@ func TestAddResourceTargetInfo(t *testing.T) { resourceWithOnlyServiceAttrs := pcommon.NewResource() assert.NoError(t, resourceWithOnlyServiceAttrs.Attributes().FromRaw(resourceAttrMap)) for _, tc := range []struct { - desc string - resource pcommon.Resource - settings Settings - timestamp pcommon.Timestamp - expected map[string]*prompb.TimeSeries + desc string + resource pcommon.Resource + settings Settings + timestamp pcommon.Timestamp + wantLabels 
[]prompb.Label }{ { desc: "empty resource", resource: pcommon.NewResource(), - expected: map[string]*prompb.TimeSeries{}, }, { desc: "disable target info metric", resource: testdata.GenerateMetricsNoLibraries().ResourceMetrics().At(0).Resource(), settings: Settings{DisableTargetInfo: true}, - expected: map[string]*prompb.TimeSeries{}, }, { desc: "with resource", resource: testdata.GenerateMetricsNoLibraries().ResourceMetrics().At(0).Resource(), timestamp: testdata.TestMetricStartTimestamp, - expected: map[string]*prompb.TimeSeries{ - "info-__name__-target_info-resource_attr-resource-attr-val-1": { - Labels: []prompb.Label{ - { - Name: "__name__", - Value: "target_info", - }, - { - Name: "resource_attr", - Value: "resource-attr-val-1", - }, - }, - Samples: []prompb.Sample{ - { - Value: 1, - Timestamp: 1581452772000, - }, - }, + wantLabels: []prompb.Label{ + { + Name: model.MetricNameLabel, + Value: targetMetricName, + }, + { + Name: "resource_attr", + Value: "resource-attr-val-1", }, }, }, @@ -562,24 +572,14 @@ func TestAddResourceTargetInfo(t *testing.T) { resource: testdata.GenerateMetricsNoLibraries().ResourceMetrics().At(0).Resource(), timestamp: testdata.TestMetricStartTimestamp, settings: Settings{Namespace: "foo"}, - expected: map[string]*prompb.TimeSeries{ - "info-__name__-foo_target_info-resource_attr-resource-attr-val-1": { - Labels: []prompb.Label{ - { - Name: "__name__", - Value: "foo_target_info", - }, - { - Name: "resource_attr", - Value: "resource-attr-val-1", - }, - }, - Samples: []prompb.Sample{ - { - Value: 1, - Timestamp: 1581452772000, - }, - }, + wantLabels: []prompb.Label{ + { + Name: model.MetricNameLabel, + Value: fmt.Sprintf("foo_%s", targetMetricName), + }, + { + Name: "resource_attr", + Value: "resource-attr-val-1", }, }, }, @@ -587,32 +587,22 @@ func TestAddResourceTargetInfo(t *testing.T) { desc: "with resource, with service attributes", resource: resourceWithServiceAttrs, timestamp: testdata.TestMetricStartTimestamp, - expected: 
map[string]*prompb.TimeSeries{ - "info-__name__-target_info-instance-service-instance-id-job-service-namespace/service-name-resource_attr-resource-attr-val-1": { - Labels: []prompb.Label{ - { - Name: "__name__", - Value: "target_info", - }, - { - Name: "instance", - Value: "service-instance-id", - }, - { - Name: "job", - Value: "service-namespace/service-name", - }, - { - Name: "resource_attr", - Value: "resource-attr-val-1", - }, - }, - Samples: []prompb.Sample{ - { - Value: 1, - Timestamp: 1581452772000, - }, - }, + wantLabels: []prompb.Label{ + { + Name: model.MetricNameLabel, + Value: targetMetricName, + }, + { + Name: "instance", + Value: "service-instance-id", + }, + { + Name: "job", + Value: "service-namespace/service-name", + }, + { + Name: "resource_attr", + Value: "resource-attr-val-1", }, }, }, @@ -620,13 +610,31 @@ func TestAddResourceTargetInfo(t *testing.T) { desc: "with resource, with only service attributes", resource: resourceWithOnlyServiceAttrs, timestamp: testdata.TestMetricStartTimestamp, - expected: map[string]*prompb.TimeSeries{}, }, } { t.Run(tc.desc, func(t *testing.T) { - tsMap := map[string]*prompb.TimeSeries{} - addResourceTargetInfo(tc.resource, tc.settings, tc.timestamp, tsMap) - assert.Exactly(t, tc.expected, tsMap) + converter := newPrometheusConverter() + + addResourceTargetInfo(tc.resource, tc.settings, tc.timestamp, converter) + + if len(tc.wantLabels) == 0 || tc.settings.DisableTargetInfo { + assert.Empty(t, converter.timeSeries()) + return + } + + expected := map[uint64]*prompb.TimeSeries{ + timeSeriesSignature(tc.wantLabels): { + Labels: tc.wantLabels, + Samples: []prompb.Sample{ + { + Value: 1, + Timestamp: 1581452772000, + }, + }, + }, + } + assert.Exactly(t, expected, converter.unique) + assert.Empty(t, converter.conflicts) }) } } @@ -659,12 +667,12 @@ func TestMostRecentTimestampInMetric(t *testing.T) { } } -func TestAddSingleSummaryDataPoint(t *testing.T) { +func TestPrometheusConverter_AddSummaryDataPoints(t *testing.T) { 
ts := pcommon.Timestamp(time.Now().UnixNano()) tests := []struct { name string metric func() pmetric.Metric - want func() map[string]*prompb.TimeSeries + want func() map[uint64]*prompb.TimeSeries }{ { name: "summary with start time", @@ -679,7 +687,7 @@ func TestAddSingleSummaryDataPoint(t *testing.T) { return metric }, - want: func() map[string]*prompb.TimeSeries { + want: func() map[uint64]*prompb.TimeSeries { labels := []prompb.Label{ {Name: model.MetricNameLabel, Value: "test_summary" + countStr}, } @@ -689,20 +697,20 @@ func TestAddSingleSummaryDataPoint(t *testing.T) { sumLabels := []prompb.Label{ {Name: model.MetricNameLabel, Value: "test_summary" + sumStr}, } - return map[string]*prompb.TimeSeries{ - timeSeriesSignature(pmetric.MetricTypeSummary.String(), labels): { + return map[uint64]*prompb.TimeSeries{ + timeSeriesSignature(labels): { Labels: labels, Samples: []prompb.Sample{ {Value: 0, Timestamp: convertTimeStamp(ts)}, }, }, - timeSeriesSignature(pmetric.MetricTypeSummary.String(), sumLabels): { + timeSeriesSignature(sumLabels): { Labels: sumLabels, Samples: []prompb.Sample{ {Value: 0, Timestamp: convertTimeStamp(ts)}, }, }, - timeSeriesSignature(pmetric.MetricTypeSummary.String(), createdLabels): { + timeSeriesSignature(createdLabels): { Labels: createdLabels, Samples: []prompb.Sample{ {Value: float64(convertTimeStamp(ts)), Timestamp: convertTimeStamp(ts)}, @@ -723,21 +731,21 @@ func TestAddSingleSummaryDataPoint(t *testing.T) { return metric }, - want: func() map[string]*prompb.TimeSeries { + want: func() map[uint64]*prompb.TimeSeries { labels := []prompb.Label{ {Name: model.MetricNameLabel, Value: "test_summary" + countStr}, } sumLabels := []prompb.Label{ {Name: model.MetricNameLabel, Value: "test_summary" + sumStr}, } - return map[string]*prompb.TimeSeries{ - timeSeriesSignature(pmetric.MetricTypeSummary.String(), labels): { + return map[uint64]*prompb.TimeSeries{ + timeSeriesSignature(labels): { Labels: labels, Samples: []prompb.Sample{ {Value: 0, 
Timestamp: convertTimeStamp(ts)}, }, }, - timeSeriesSignature(pmetric.MetricTypeSummary.String(), sumLabels): { + timeSeriesSignature(sumLabels): { Labels: sumLabels, Samples: []prompb.Sample{ {Value: 0, Timestamp: convertTimeStamp(ts)}, @@ -750,31 +758,29 @@ func TestAddSingleSummaryDataPoint(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { metric := tt.metric() + converter := newPrometheusConverter() - got := make(map[string]*prompb.TimeSeries) - for x := 0; x < metric.Summary().DataPoints().Len(); x++ { - addSingleSummaryDataPoint( - metric.Summary().DataPoints().At(x), - pcommon.NewResource(), - metric, - Settings{ - ExportCreatedMetric: true, - }, - got, - metric.Name(), - ) - } - assert.Equal(t, tt.want(), got) + converter.addSummaryDataPoints( + metric.Summary().DataPoints(), + pcommon.NewResource(), + Settings{ + ExportCreatedMetric: true, + }, + metric.Name(), + ) + + assert.Equal(t, tt.want(), converter.unique) + assert.Empty(t, converter.conflicts) }) } } -func TestAddSingleHistogramDataPoint(t *testing.T) { +func TestPrometheusConverter_AddHistogramDataPoints(t *testing.T) { ts := pcommon.Timestamp(time.Now().UnixNano()) tests := []struct { name string metric func() pmetric.Metric - want func() map[string]*prompb.TimeSeries + want func() map[uint64]*prompb.TimeSeries }{ { name: "histogram with start time", @@ -789,7 +795,7 @@ func TestAddSingleHistogramDataPoint(t *testing.T) { return metric }, - want: func() map[string]*prompb.TimeSeries { + want: func() map[uint64]*prompb.TimeSeries { labels := []prompb.Label{ {Name: model.MetricNameLabel, Value: "test_hist" + countStr}, } @@ -800,20 +806,20 @@ func TestAddSingleHistogramDataPoint(t *testing.T) { {Name: model.MetricNameLabel, Value: "test_hist_bucket"}, {Name: model.BucketLabel, Value: "+Inf"}, } - return map[string]*prompb.TimeSeries{ - timeSeriesSignature(pmetric.MetricTypeHistogram.String(), infLabels): { + return map[uint64]*prompb.TimeSeries{ + 
timeSeriesSignature(infLabels): { Labels: infLabels, Samples: []prompb.Sample{ {Value: 0, Timestamp: convertTimeStamp(ts)}, }, }, - timeSeriesSignature(pmetric.MetricTypeHistogram.String(), labels): { + timeSeriesSignature(labels): { Labels: labels, Samples: []prompb.Sample{ {Value: 0, Timestamp: convertTimeStamp(ts)}, }, }, - timeSeriesSignature(pmetric.MetricTypeHistogram.String(), createdLabels): { + timeSeriesSignature(createdLabels): { Labels: createdLabels, Samples: []prompb.Sample{ {Value: float64(convertTimeStamp(ts)), Timestamp: convertTimeStamp(ts)}, @@ -834,7 +840,7 @@ func TestAddSingleHistogramDataPoint(t *testing.T) { return metric }, - want: func() map[string]*prompb.TimeSeries { + want: func() map[uint64]*prompb.TimeSeries { labels := []prompb.Label{ {Name: model.MetricNameLabel, Value: "test_hist" + countStr}, } @@ -842,14 +848,14 @@ func TestAddSingleHistogramDataPoint(t *testing.T) { {Name: model.MetricNameLabel, Value: "test_hist_bucket"}, {Name: model.BucketLabel, Value: "+Inf"}, } - return map[string]*prompb.TimeSeries{ - timeSeriesSignature(pmetric.MetricTypeHistogram.String(), infLabels): { + return map[uint64]*prompb.TimeSeries{ + timeSeriesSignature(infLabels): { Labels: infLabels, Samples: []prompb.Sample{ {Value: 0, Timestamp: convertTimeStamp(ts)}, }, }, - timeSeriesSignature(pmetric.MetricTypeHistogram.String(), labels): { + timeSeriesSignature(labels): { Labels: labels, Samples: []prompb.Sample{ {Value: 0, Timestamp: convertTimeStamp(ts)}, @@ -862,25 +868,92 @@ func TestAddSingleHistogramDataPoint(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { metric := tt.metric() + converter := newPrometheusConverter() - got := make(map[string]*prompb.TimeSeries) - for x := 0; x < metric.Histogram().DataPoints().Len(); x++ { - addSingleHistogramDataPoint( - metric.Histogram().DataPoints().At(x), - pcommon.NewResource(), - metric, - Settings{ - ExportCreatedMetric: true, - }, - got, - metric.Name(), - ) - } - 
assert.Equal(t, tt.want(), got) + converter.addHistogramDataPoints( + metric.Histogram().DataPoints(), + pcommon.NewResource(), + Settings{ + ExportCreatedMetric: true, + }, + metric.Name(), + ) + + assert.Equal(t, tt.want(), converter.unique) + assert.Empty(t, converter.conflicts) }) } } +func TestPrometheusConverter_getOrCreateTimeSeries(t *testing.T) { + converter := newPrometheusConverter() + lbls := []prompb.Label{ + { + Name: "key1", + Value: "value1", + }, + { + Name: "key2", + Value: "value2", + }, + } + ts, created := converter.getOrCreateTimeSeries(lbls) + require.NotNil(t, ts) + require.True(t, created) + + // Now, get (not create) the unique time series + gotTS, created := converter.getOrCreateTimeSeries(ts.Labels) + require.Same(t, ts, gotTS) + require.False(t, created) + + var keys []uint64 + for k := range converter.unique { + keys = append(keys, k) + } + require.Len(t, keys, 1) + h := keys[0] + + // Make sure that state is correctly set + require.Equal(t, map[uint64]*prompb.TimeSeries{ + h: ts, + }, converter.unique) + require.Empty(t, converter.conflicts) + + // Fake a hash collision, by making this not equal to the next series with the same hash + ts.Labels = append(ts.Labels, prompb.Label{Name: "key3", Value: "value3"}) + + // Make the first hash collision + cTS1, created := converter.getOrCreateTimeSeries(lbls) + require.NotNil(t, cTS1) + require.True(t, created) + require.Equal(t, map[uint64][]*prompb.TimeSeries{ + h: {cTS1}, + }, converter.conflicts) + + // Fake a hash collision, by making this not equal to the next series with the same hash + cTS1.Labels = append(cTS1.Labels, prompb.Label{Name: "key3", Value: "value3"}) + + // Make the second hash collision + cTS2, created := converter.getOrCreateTimeSeries(lbls) + require.NotNil(t, cTS2) + require.True(t, created) + require.Equal(t, map[uint64][]*prompb.TimeSeries{ + h: {cTS1, cTS2}, + }, converter.conflicts) + + // Now, get (not create) the second colliding time series + gotCTS2, created := 
converter.getOrCreateTimeSeries(lbls) + require.Same(t, cTS2, gotCTS2) + require.False(t, created) + require.Equal(t, map[uint64][]*prompb.TimeSeries{ + h: {cTS1, cTS2}, + }, converter.conflicts) + + require.Equal(t, map[uint64]*prompb.TimeSeries{ + h: ts, + }, converter.unique) +} + func TestCreateLabels(t *testing.T) { testCases := []struct { name string diff --git a/pkg/translator/prometheusremotewrite/histograms.go b/pkg/translator/prometheusremotewrite/histograms.go index bae675ef446b..35ec21089177 100644 --- a/pkg/translator/prometheusremotewrite/histograms.go +++ b/pkg/translator/prometheusremotewrite/histograms.go @@ -16,41 +16,30 @@ import ( const defaultZeroThreshold = 1e-128 -func addSingleExponentialHistogramDataPoint( - metric string, - pt pmetric.ExponentialHistogramDataPoint, - resource pcommon.Resource, - settings Settings, - series map[string]*prompb.TimeSeries, -) error { - labels := createAttributes( - resource, - pt.Attributes(), - settings.ExternalLabels, - model.MetricNameLabel, - metric, - ) - - sig := timeSeriesSignature( - pmetric.MetricTypeExponentialHistogram.String(), - labels, - ) - ts, ok := series[sig] - if !ok { - ts = &prompb.TimeSeries{ - Labels: labels, +func (c *prometheusConverter) addExponentialHistogramDataPoints(dataPoints pmetric.ExponentialHistogramDataPointSlice, + resource pcommon.Resource, settings Settings, baseName string) error { + for x := 0; x < dataPoints.Len(); x++ { + pt := dataPoints.At(x) + lbls := createAttributes( + resource, + pt.Attributes(), + settings.ExternalLabels, + nil, + true, + model.MetricNameLabel, + baseName, + ) + ts, _ := c.getOrCreateTimeSeries(lbls) + + histogram, err := exponentialToNativeHistogram(pt) + if err != nil { + return err } - series[sig] = ts - } + ts.Histograms = append(ts.Histograms, histogram) - histogram, err := exponentialToNativeHistogram(pt) - if err != nil { - return err + exemplars := getPromExemplars[pmetric.ExponentialHistogramDataPoint](pt) + ts.Exemplars = 
append(ts.Exemplars, exemplars...) } - ts.Histograms = append(ts.Histograms, histogram) - - exemplars := getPromExemplars[pmetric.ExponentialHistogramDataPoint](pt) - ts.Exemplars = append(ts.Exemplars, exemplars...) return nil } diff --git a/pkg/translator/prometheusremotewrite/histograms_test.go b/pkg/translator/prometheusremotewrite/histograms_test.go index 84261b4e7da4..d2b3cba24ae2 100644 --- a/pkg/translator/prometheusremotewrite/histograms_test.go +++ b/pkg/translator/prometheusremotewrite/histograms_test.go @@ -597,11 +597,11 @@ func validateNativeHistogramCount(t *testing.T, h prompb.Histogram) { assert.Equal(t, want, actualCount, "native histogram count mismatch") } -func TestAddSingleExponentialHistogramDataPoint(t *testing.T) { +func TestPrometheusConverter_addExponentialHistogramDataPoints(t *testing.T) { tests := []struct { name string metric func() pmetric.Metric - wantSeries func() map[string]*prompb.TimeSeries + wantSeries func() map[uint64]*prompb.TimeSeries }{ { name: "histogram data points with same labels", @@ -628,13 +628,13 @@ func TestAddSingleExponentialHistogramDataPoint(t *testing.T) { return metric }, - wantSeries: func() map[string]*prompb.TimeSeries { + wantSeries: func() map[uint64]*prompb.TimeSeries { labels := []prompb.Label{ {Name: model.MetricNameLabel, Value: "test_hist"}, {Name: "attr", Value: "test_attr"}, } - return map[string]*prompb.TimeSeries{ - timeSeriesSignature(pmetric.MetricTypeExponentialHistogram.String(), labels): { + return map[uint64]*prompb.TimeSeries{ + timeSeriesSignature(labels): { Labels: labels, Histograms: []prompb.Histogram{ { @@ -687,7 +687,7 @@ func TestAddSingleExponentialHistogramDataPoint(t *testing.T) { return metric }, - wantSeries: func() map[string]*prompb.TimeSeries { + wantSeries: func() map[uint64]*prompb.TimeSeries { labels := []prompb.Label{ {Name: model.MetricNameLabel, Value: "test_hist"}, {Name: "attr", Value: "test_attr"}, @@ -697,8 +697,8 @@ func 
TestAddSingleExponentialHistogramDataPoint(t *testing.T) { {Name: "attr", Value: "test_attr_two"}, } - return map[string]*prompb.TimeSeries{ - timeSeriesSignature(pmetric.MetricTypeExponentialHistogram.String(), labels): { + return map[uint64]*prompb.TimeSeries{ + timeSeriesSignature(labels): { Labels: labels, Histograms: []prompb.Histogram{ { @@ -714,7 +714,7 @@ func TestAddSingleExponentialHistogramDataPoint(t *testing.T) { {Value: 1}, }, }, - timeSeriesSignature(pmetric.MetricTypeExponentialHistogram.String(), labelsAnother): { + timeSeriesSignature(labelsAnother): { Labels: labelsAnother, Histograms: []prompb.Histogram{ { @@ -738,20 +738,16 @@ func TestAddSingleExponentialHistogramDataPoint(t *testing.T) { t.Run(tt.name, func(t *testing.T) { metric := tt.metric() - gotSeries := make(map[string]*prompb.TimeSeries) - - for x := 0; x < metric.ExponentialHistogram().DataPoints().Len(); x++ { - err := addSingleExponentialHistogramDataPoint( - prometheustranslator.BuildCompliantName(metric, "", true), - metric.ExponentialHistogram().DataPoints().At(x), - pcommon.NewResource(), - Settings{}, - gotSeries, - ) - require.NoError(t, err) - } + converter := newPrometheusConverter() + require.NoError(t, converter.addExponentialHistogramDataPoints( + metric.ExponentialHistogram().DataPoints(), + pcommon.NewResource(), + Settings{}, + prometheustranslator.BuildCompliantName(metric, "", true), + )) - assert.Equal(t, tt.wantSeries(), gotSeries) + assert.Equal(t, tt.wantSeries(), converter.unique) + assert.Empty(t, converter.conflicts) }) } } diff --git a/pkg/translator/prometheusremotewrite/metrics_to_prw.go b/pkg/translator/prometheusremotewrite/metrics_to_prw.go index f048f7534fba..05d62498548a 100644 --- a/pkg/translator/prometheusremotewrite/metrics_to_prw.go +++ b/pkg/translator/prometheusremotewrite/metrics_to_prw.go @@ -6,6 +6,8 @@ package prometheusremotewrite // import "github.com/open-telemetry/opentelemetry import ( "errors" "fmt" + "sort" + "strconv" 
"github.com/prometheus/prometheus/prompb" "go.opentelemetry.io/collector/pdata/pcommon" @@ -25,9 +27,33 @@ type Settings struct { } // FromMetrics converts pmetric.Metrics to Prometheus remote write format. -func FromMetrics(md pmetric.Metrics, settings Settings) (tsMap map[string]*prompb.TimeSeries, errs error) { - tsMap = make(map[string]*prompb.TimeSeries) +func FromMetrics(md pmetric.Metrics, settings Settings) (map[string]*prompb.TimeSeries, error) { + c := newPrometheusConverter() + errs := c.fromMetrics(md, settings) + tss := c.timeSeries() + out := make(map[string]*prompb.TimeSeries, len(tss)) + for i := range tss { + out[strconv.Itoa(i)] = &tss[i] + } + + return out, errs +} + +// prometheusConverter converts from OTel write format to Prometheus write format. +type prometheusConverter struct { + unique map[uint64]*prompb.TimeSeries + conflicts map[uint64][]*prompb.TimeSeries +} +func newPrometheusConverter() *prometheusConverter { + return &prometheusConverter{ + unique: map[uint64]*prompb.TimeSeries{}, + conflicts: map[uint64][]*prompb.TimeSeries{}, + } +} + +// fromMetrics converts pmetric.Metrics to Prometheus remote write format. 
+func (c *prometheusConverter) fromMetrics(md pmetric.Metrics, settings Settings) (errs error) { resourceMetricsSlice := md.ResourceMetrics() for i := 0; i < resourceMetricsSlice.Len(); i++ { resourceMetrics := resourceMetricsSlice.At(i) @@ -37,8 +63,7 @@ func FromMetrics(md pmetric.Metrics, settings Settings) (tsMap map[string]*promp // use with the "target" info metric var mostRecentTimestamp pcommon.Timestamp for j := 0; j < scopeMetricsSlice.Len(); j++ { - scopeMetrics := scopeMetricsSlice.At(j) - metricSlice := scopeMetrics.Metrics() + metricSlice := scopeMetricsSlice.At(j).Metrics() // TODO: decide if instrumentation library information should be exported as labels for k := 0; k < metricSlice.Len(); k++ { @@ -52,65 +77,125 @@ func FromMetrics(md pmetric.Metrics, settings Settings) (tsMap map[string]*promp promName := prometheustranslator.BuildCompliantName(metric, settings.Namespace, settings.AddMetricSuffixes) - // handle individual metric based on type + // handle individual metrics based on type //exhaustive:enforce switch metric.Type() { case pmetric.MetricTypeGauge: dataPoints := metric.Gauge().DataPoints() if dataPoints.Len() == 0 { errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name())) + break } - for x := 0; x < dataPoints.Len(); x++ { - addSingleGaugeNumberDataPoint(dataPoints.At(x), resource, metric, settings, tsMap, promName) - } + c.addGaugeNumberDataPoints(dataPoints, resource, settings, promName) case pmetric.MetricTypeSum: dataPoints := metric.Sum().DataPoints() if dataPoints.Len() == 0 { errs = multierr.Append(errs, fmt.Errorf("empty data points. 
%s is dropped", metric.Name())) + break } - for x := 0; x < dataPoints.Len(); x++ { - addSingleSumNumberDataPoint(dataPoints.At(x), resource, metric, settings, tsMap, promName) - } + c.addSumNumberDataPoints(dataPoints, resource, metric, settings, promName) case pmetric.MetricTypeHistogram: dataPoints := metric.Histogram().DataPoints() if dataPoints.Len() == 0 { errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name())) + break } - for x := 0; x < dataPoints.Len(); x++ { - addSingleHistogramDataPoint(dataPoints.At(x), resource, metric, settings, tsMap, promName) - } + c.addHistogramDataPoints(dataPoints, resource, settings, promName) case pmetric.MetricTypeExponentialHistogram: dataPoints := metric.ExponentialHistogram().DataPoints() if dataPoints.Len() == 0 { errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name())) + break } - for x := 0; x < dataPoints.Len(); x++ { - errs = multierr.Append( - errs, - addSingleExponentialHistogramDataPoint( - promName, - dataPoints.At(x), - resource, - settings, - tsMap, - ), - ) - } + errs = multierr.Append(errs, c.addExponentialHistogramDataPoints( + dataPoints, + resource, + settings, + promName, + )) case pmetric.MetricTypeSummary: dataPoints := metric.Summary().DataPoints() if dataPoints.Len() == 0 { errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name())) + break } - for x := 0; x < dataPoints.Len(); x++ { - addSingleSummaryDataPoint(dataPoints.At(x), resource, metric, settings, tsMap, promName) - } + c.addSummaryDataPoints(dataPoints, resource, settings, promName) default: errs = multierr.Append(errs, errors.New("unsupported metric type")) } } } - addResourceTargetInfo(resource, settings, mostRecentTimestamp, tsMap) + addResourceTargetInfo(resource, settings, mostRecentTimestamp, c) } return } + +// timeSeries returns a slice of the prompb.TimeSeries that were converted from OTel format. 
+func (c *prometheusConverter) timeSeries() []prompb.TimeSeries {
+	conflicts := 0
+	for _, ts := range c.conflicts {
+		conflicts += len(ts)
+	}
+	allTS := make([]prompb.TimeSeries, 0, len(c.unique)+conflicts)
+	for _, ts := range c.unique {
+		allTS = append(allTS, *ts)
+	}
+	for _, cTS := range c.conflicts {
+		for _, ts := range cTS {
+			allTS = append(allTS, *ts)
+		}
+	}
+
+	return allTS
+}
+
+func isSameMetric(ts *prompb.TimeSeries, lbls []prompb.Label) bool {
+	if len(ts.Labels) != len(lbls) {
+		return false
+	}
+	for i, l := range ts.Labels {
+		if l.Name != lbls[i].Name || l.Value != lbls[i].Value {
+			return false
+		}
+	}
+	return true
+}
+
+// addExemplars adds exemplars for the dataPoint. For each exemplar, if it can find a bucket bound corresponding to its value,
+// the exemplar is added to the bucket bound's time series, provided that the time series has samples.
+func (c *prometheusConverter) addExemplars(dataPoint pmetric.HistogramDataPoint, bucketBounds []bucketBoundsData) {
+	if len(bucketBounds) == 0 {
+		return
+	}
+
+	exemplars := getPromExemplars(dataPoint)
+	if len(exemplars) == 0 {
+		return
+	}
+
+	sort.Sort(byBucketBoundsData(bucketBounds))
+	for _, exemplar := range exemplars {
+		for _, bound := range bucketBounds {
+			if len(bound.ts.Samples) > 0 && exemplar.Value <= bound.bound {
+				bound.ts.Exemplars = append(bound.ts.Exemplars, exemplar)
+				break
+			}
+		}
+	}
+}
+
+// addSample finds a TimeSeries that corresponds to lbls, and adds sample to it.
+// If there is no corresponding TimeSeries already, it's created.
+// The corresponding TimeSeries is returned.
+// If either lbls is nil/empty or sample is nil, nothing is done.
+func (c *prometheusConverter) addSample(sample *prompb.Sample, lbls []prompb.Label) *prompb.TimeSeries { + if sample == nil || len(lbls) == 0 { + // This shouldn't happen + return nil + } + + ts, _ := c.getOrCreateTimeSeries(lbls) + ts.Samples = append(ts.Samples, *sample) + return ts +} diff --git a/pkg/translator/prometheusremotewrite/metrics_to_prw_test.go b/pkg/translator/prometheusremotewrite/metrics_to_prw_test.go index 8be6bdcd60a9..f6171b2c2233 100644 --- a/pkg/translator/prometheusremotewrite/metrics_to_prw_test.go +++ b/pkg/translator/prometheusremotewrite/metrics_to_prw_test.go @@ -35,11 +35,46 @@ func BenchmarkFromMetrics(b *testing.B) { payload := createExportRequest(resourceAttributeCount, histogramCount, nonHistogramCount, labelsPerMetric, exemplarsPerSeries) for i := 0; i < b.N; i++ { - _, err := FromMetrics(payload.Metrics(), Settings{}) + tsMap, err := FromMetrics(payload.Metrics(), Settings{}) + require.NoError(b, err) + require.NotNil(b, tsMap) + } + }) + } + }) + } + }) + } + }) + } + }) + } +} + +func BenchmarkPrometheusConverter_FromMetrics(b *testing.B) { + for _, resourceAttributeCount := range []int{0, 5, 50} { + b.Run(fmt.Sprintf("resource attribute count: %v", resourceAttributeCount), func(b *testing.B) { + for _, histogramCount := range []int{0, 1000} { + b.Run(fmt.Sprintf("histogram count: %v", histogramCount), func(b *testing.B) { + nonHistogramCounts := []int{0, 1000} + + if resourceAttributeCount == 0 && histogramCount == 0 { + // Don't bother running a scenario where we'll generate no series. 
+ nonHistogramCounts = []int{1000} + } + + for _, nonHistogramCount := range nonHistogramCounts { + b.Run(fmt.Sprintf("non-histogram count: %v", nonHistogramCount), func(b *testing.B) { + for _, labelsPerMetric := range []int{2, 20} { + b.Run(fmt.Sprintf("labels per metric: %v", labelsPerMetric), func(b *testing.B) { + for _, exemplarsPerSeries := range []int{0, 5, 10} { + b.Run(fmt.Sprintf("exemplars per series: %v", exemplarsPerSeries), func(b *testing.B) { + payload := createExportRequest(resourceAttributeCount, histogramCount, nonHistogramCount, labelsPerMetric, exemplarsPerSeries) - if err != nil { - require.NoError(b, err) - } + for i := 0; i < b.N; i++ { + converter := newPrometheusConverter() + require.NoError(b, converter.fromMetrics(payload.Metrics(), Settings{})) + require.NotNil(b, converter.timeSeries()) } }) } diff --git a/pkg/translator/prometheusremotewrite/number_data_points.go b/pkg/translator/prometheusremotewrite/number_data_points.go index a2d0e33a8ba8..d128359fef80 100644 --- a/pkg/translator/prometheusremotewrite/number_data_points.go +++ b/pkg/translator/prometheusremotewrite/number_data_points.go @@ -13,92 +13,84 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" ) -// addSingleGaugeNumberDataPoint converts the Gauge metric data point to a -// Prometheus time series with samples and labels. The result is stored in the -// series map. 
-func addSingleGaugeNumberDataPoint( - pt pmetric.NumberDataPoint, - resource pcommon.Resource, - metric pmetric.Metric, - settings Settings, - series map[string]*prompb.TimeSeries, - name string, -) { - labels := createAttributes( - resource, - pt.Attributes(), - settings.ExternalLabels, - model.MetricNameLabel, - name, - ) - sample := &prompb.Sample{ - // convert ns to ms - Timestamp: convertTimeStamp(pt.Timestamp()), - } - switch pt.ValueType() { - case pmetric.NumberDataPointValueTypeInt: - sample.Value = float64(pt.IntValue()) - case pmetric.NumberDataPointValueTypeDouble: - sample.Value = pt.DoubleValue() - } - if pt.Flags().NoRecordedValue() { - sample.Value = math.Float64frombits(value.StaleNaN) +func (c *prometheusConverter) addGaugeNumberDataPoints(dataPoints pmetric.NumberDataPointSlice, + resource pcommon.Resource, settings Settings, name string) { + for x := 0; x < dataPoints.Len(); x++ { + pt := dataPoints.At(x) + labels := createAttributes( + resource, + pt.Attributes(), + settings.ExternalLabels, + nil, + true, + model.MetricNameLabel, + name, + ) + sample := &prompb.Sample{ + // convert ns to ms + Timestamp: convertTimeStamp(pt.Timestamp()), + } + switch pt.ValueType() { + case pmetric.NumberDataPointValueTypeInt: + sample.Value = float64(pt.IntValue()) + case pmetric.NumberDataPointValueTypeDouble: + sample.Value = pt.DoubleValue() + } + if pt.Flags().NoRecordedValue() { + sample.Value = math.Float64frombits(value.StaleNaN) + } + c.addSample(sample, labels) } - addSample(series, sample, labels, metric.Type().String()) } -// addSingleSumNumberDataPoint converts the Sum metric data point to a Prometheus -// time series with samples, labels and exemplars. The result is stored in the -// series map. 
-func addSingleSumNumberDataPoint( - pt pmetric.NumberDataPoint, - resource pcommon.Resource, - metric pmetric.Metric, - settings Settings, - series map[string]*prompb.TimeSeries, - name string, -) { - labels := createAttributes( - resource, - pt.Attributes(), - settings.ExternalLabels, - model.MetricNameLabel, name, - ) - sample := &prompb.Sample{ - // convert ns to ms - Timestamp: convertTimeStamp(pt.Timestamp()), - } - switch pt.ValueType() { - case pmetric.NumberDataPointValueTypeInt: - sample.Value = float64(pt.IntValue()) - case pmetric.NumberDataPointValueTypeDouble: - sample.Value = pt.DoubleValue() - } - if pt.Flags().NoRecordedValue() { - sample.Value = math.Float64frombits(value.StaleNaN) - } - sig := addSample(series, sample, labels, metric.Type().String()) - - if ts := series[sig]; sig != "" && ts != nil { - exemplars := getPromExemplars[pmetric.NumberDataPoint](pt) - ts.Exemplars = append(ts.Exemplars, exemplars...) - } - - // add _created time series if needed - if settings.ExportCreatedMetric && metric.Sum().IsMonotonic() { - startTimestamp := pt.StartTimestamp() - if startTimestamp == 0 { - return +func (c *prometheusConverter) addSumNumberDataPoints(dataPoints pmetric.NumberDataPointSlice, + resource pcommon.Resource, metric pmetric.Metric, settings Settings, name string) { + for x := 0; x < dataPoints.Len(); x++ { + pt := dataPoints.At(x) + lbls := createAttributes( + resource, + pt.Attributes(), + settings.ExternalLabels, + nil, + true, + model.MetricNameLabel, + name, + ) + sample := &prompb.Sample{ + // convert ns to ms + Timestamp: convertTimeStamp(pt.Timestamp()), + } + switch pt.ValueType() { + case pmetric.NumberDataPointValueTypeInt: + sample.Value = float64(pt.IntValue()) + case pmetric.NumberDataPointValueTypeDouble: + sample.Value = pt.DoubleValue() } + if pt.Flags().NoRecordedValue() { + sample.Value = math.Float64frombits(value.StaleNaN) + } + ts := c.addSample(sample, lbls) + if ts != nil { + exemplars := 
getPromExemplars[pmetric.NumberDataPoint](pt) + ts.Exemplars = append(ts.Exemplars, exemplars...) + } + + // add created time series if needed + if settings.ExportCreatedMetric && metric.Sum().IsMonotonic() { + startTimestamp := pt.StartTimestamp() + if startTimestamp == 0 { + return + } - createdLabels := make([]prompb.Label, len(labels)) - copy(createdLabels, labels) - for i, l := range createdLabels { - if l.Name == model.MetricNameLabel { - createdLabels[i].Value = name + createdSuffix - break + createdLabels := make([]prompb.Label, len(lbls)) + copy(createdLabels, lbls) + for i, l := range createdLabels { + if l.Name == model.MetricNameLabel { + createdLabels[i].Value = name + createdSuffix + break + } } + c.addTimeSeriesIfNeeded(createdLabels, startTimestamp, pt.Timestamp()) } - addCreatedTimeSeriesIfNeeded(series, createdLabels, startTimestamp, pt.Timestamp(), metric.Type().String()) } } diff --git a/pkg/translator/prometheusremotewrite/number_data_points_test.go b/pkg/translator/prometheusremotewrite/number_data_points_test.go index c9be5bad343b..31dd796ae8d2 100644 --- a/pkg/translator/prometheusremotewrite/number_data_points_test.go +++ b/pkg/translator/prometheusremotewrite/number_data_points_test.go @@ -14,12 +14,12 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" ) -func TestAddSingleGaugeNumberDataPoint(t *testing.T) { +func TestPrometheusConverter_addGaugeNumberDataPoints(t *testing.T) { ts := uint64(time.Now().UnixNano()) tests := []struct { name string metric func() pmetric.Metric - want func() map[string]*prompb.TimeSeries + want func() map[uint64]*prompb.TimeSeries }{ { name: "gauge", @@ -30,12 +30,12 @@ func TestAddSingleGaugeNumberDataPoint(t *testing.T) { 1, ts, ) }, - want: func() map[string]*prompb.TimeSeries { + want: func() map[uint64]*prompb.TimeSeries { labels := []prompb.Label{ {Name: model.MetricNameLabel, Value: "test"}, } - return map[string]*prompb.TimeSeries{ - timeSeriesSignature(pmetric.MetricTypeGauge.String(), labels): 
{ + return map[uint64]*prompb.TimeSeries{ + timeSeriesSignature(labels): { Labels: labels, Samples: []prompb.Sample{ { @@ -50,30 +50,27 @@ func TestAddSingleGaugeNumberDataPoint(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { metric := tt.metric() + converter := newPrometheusConverter() - gotSeries := make(map[string]*prompb.TimeSeries) + converter.addGaugeNumberDataPoints( + metric.Gauge().DataPoints(), + pcommon.NewResource(), + Settings{}, + metric.Name(), + ) - for x := 0; x < metric.Gauge().DataPoints().Len(); x++ { - addSingleGaugeNumberDataPoint( - metric.Gauge().DataPoints().At(x), - pcommon.NewResource(), - metric, - Settings{}, - gotSeries, - metric.Name(), - ) - } - assert.Equal(t, tt.want(), gotSeries) + assert.Equal(t, tt.want(), converter.unique) + assert.Empty(t, converter.conflicts) }) } } -func TestAddSingleSumNumberDataPoint(t *testing.T) { +func TestPrometheusConverter_addSumNumberDataPoints(t *testing.T) { ts := pcommon.Timestamp(time.Now().UnixNano()) tests := []struct { name string metric func() pmetric.Metric - want func() map[string]*prompb.TimeSeries + want func() map[uint64]*prompb.TimeSeries }{ { name: "sum", @@ -85,12 +82,12 @@ func TestAddSingleSumNumberDataPoint(t *testing.T) { 1, uint64(ts.AsTime().UnixNano()), ) }, - want: func() map[string]*prompb.TimeSeries { + want: func() map[uint64]*prompb.TimeSeries { labels := []prompb.Label{ {Name: model.MetricNameLabel, Value: "test"}, } - return map[string]*prompb.TimeSeries{ - timeSeriesSignature(pmetric.MetricTypeSum.String(), labels): { + return map[uint64]*prompb.TimeSeries{ + timeSeriesSignature(labels): { Labels: labels, Samples: []prompb.Sample{ { @@ -113,12 +110,12 @@ func TestAddSingleSumNumberDataPoint(t *testing.T) { m.Sum().DataPoints().At(0).Exemplars().AppendEmpty().SetDoubleValue(2) return m }, - want: func() map[string]*prompb.TimeSeries { + want: func() map[uint64]*prompb.TimeSeries { labels := []prompb.Label{ {Name: model.MetricNameLabel, 
Value: "test"}, } - return map[string]*prompb.TimeSeries{ - timeSeriesSignature(pmetric.MetricTypeSum.String(), labels): { + return map[uint64]*prompb.TimeSeries{ + timeSeriesSignature(labels): { Labels: labels, Samples: []prompb.Sample{{ Value: 1, @@ -146,21 +143,21 @@ func TestAddSingleSumNumberDataPoint(t *testing.T) { return metric }, - want: func() map[string]*prompb.TimeSeries { + want: func() map[uint64]*prompb.TimeSeries { labels := []prompb.Label{ {Name: model.MetricNameLabel, Value: "test_sum"}, } createdLabels := []prompb.Label{ {Name: model.MetricNameLabel, Value: "test_sum" + createdSuffix}, } - return map[string]*prompb.TimeSeries{ - timeSeriesSignature(pmetric.MetricTypeSum.String(), labels): { + return map[uint64]*prompb.TimeSeries{ + timeSeriesSignature(labels): { Labels: labels, Samples: []prompb.Sample{ {Value: 1, Timestamp: convertTimeStamp(ts)}, }, }, - timeSeriesSignature(pmetric.MetricTypeSum.String(), createdLabels): { + timeSeriesSignature(createdLabels): { Labels: createdLabels, Samples: []prompb.Sample{ {Value: float64(convertTimeStamp(ts)), Timestamp: convertTimeStamp(ts)}, @@ -182,12 +179,12 @@ func TestAddSingleSumNumberDataPoint(t *testing.T) { return metric }, - want: func() map[string]*prompb.TimeSeries { + want: func() map[uint64]*prompb.TimeSeries { labels := []prompb.Label{ {Name: model.MetricNameLabel, Value: "test_sum"}, } - return map[string]*prompb.TimeSeries{ - timeSeriesSignature(pmetric.MetricTypeSum.String(), labels): { + return map[uint64]*prompb.TimeSeries{ + timeSeriesSignature(labels): { Labels: labels, Samples: []prompb.Sample{ {Value: 0, Timestamp: convertTimeStamp(ts)}, @@ -209,12 +206,12 @@ func TestAddSingleSumNumberDataPoint(t *testing.T) { return metric }, - want: func() map[string]*prompb.TimeSeries { + want: func() map[uint64]*prompb.TimeSeries { labels := []prompb.Label{ {Name: model.MetricNameLabel, Value: "test_sum"}, } - return map[string]*prompb.TimeSeries{ - 
timeSeriesSignature(pmetric.MetricTypeSum.String(), labels): { + return map[uint64]*prompb.TimeSeries{ + timeSeriesSignature(labels): { Labels: labels, Samples: []prompb.Sample{ {Value: 0, Timestamp: convertTimeStamp(ts)}, @@ -227,20 +224,18 @@ func TestAddSingleSumNumberDataPoint(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { metric := tt.metric() + converter := newPrometheusConverter() - got := make(map[string]*prompb.TimeSeries) + converter.addSumNumberDataPoints( + metric.Sum().DataPoints(), + pcommon.NewResource(), + metric, + Settings{ExportCreatedMetric: true}, + metric.Name(), + ) - for x := 0; x < metric.Sum().DataPoints().Len(); x++ { - addSingleSumNumberDataPoint( - metric.Sum().DataPoints().At(x), - pcommon.NewResource(), - metric, - Settings{ExportCreatedMetric: true}, - got, - metric.Name(), - ) - } - assert.Equal(t, tt.want(), got) + assert.Equal(t, tt.want(), converter.unique) + assert.Empty(t, converter.conflicts) }) } } diff --git a/pkg/translator/prometheusremotewrite/testutils_test.go b/pkg/translator/prometheusremotewrite/testutils_test.go index bdc6fa594648..f0fa3df45358 100644 --- a/pkg/translator/prometheusremotewrite/testutils_test.go +++ b/pkg/translator/prometheusremotewrite/testutils_test.go @@ -67,33 +67,41 @@ var ( promLbs1 = getPromLabels(label11, value11, label12, value12) promLbs2 = getPromLabels(label21, value21, label22, value22) - lb1Sig = "-" + label11 + "-" + value11 + "-" + label12 + "-" + value12 - lb2Sig = "-" + label21 + "-" + value21 + "-" + label22 + "-" + value22 - - twoPointsSameTs = map[string]*prompb.TimeSeries{ - "Gauge" + "-" + label11 + "-" + value11 + "-" + label12 + "-" + value12: getTimeSeries(getPromLabels(label11, value11, label12, value12), - getSample(float64(intVal1), msTime1), - getSample(float64(intVal2), msTime2)), + lb1Sig = timeSeriesSignature(promLbs1) + + twoPointsSameTs = func() map[uint64]*prompb.TimeSeries { + return map[uint64]*prompb.TimeSeries{ + 
timeSeriesSignature(promLbs1): getTimeSeries(promLbs1, + getSample(float64(intVal1), msTime1), + getSample(float64(intVal2), msTime2)), + } } - twoPointsDifferentTs = map[string]*prompb.TimeSeries{ - "Gauge" + "-" + label11 + "-" + value11 + "-" + label12 + "-" + value12: getTimeSeries(getPromLabels(label11, value11, label12, value12), - getSample(float64(intVal1), msTime1)), - "Gauge" + "-" + label21 + "-" + value21 + "-" + label22 + "-" + value22: getTimeSeries(getPromLabels(label21, value21, label22, value22), - getSample(float64(intVal1), msTime2)), + twoPointsDifferentTs = func() map[uint64]*prompb.TimeSeries { + return map[uint64]*prompb.TimeSeries{ + timeSeriesSignature(promLbs1): getTimeSeries(promLbs1, + getSample(float64(intVal1), msTime1)), + timeSeriesSignature(promLbs2): getTimeSeries(promLbs2, + getSample(float64(intVal1), msTime2)), + } } - tsWithSamplesAndExemplars = map[string]*prompb.TimeSeries{ - lb1Sig: getTimeSeriesWithSamplesAndExemplars(getPromLabels(label11, value11, label12, value12), - []prompb.Sample{getSample(float64(intVal1), msTime1)}, - []prompb.Exemplar{getExemplar(floatVal2, msTime1)}), + tsWithSamplesAndExemplars = func() map[uint64]*prompb.TimeSeries { + return map[uint64]*prompb.TimeSeries{ + lb1Sig: getTimeSeriesWithSamplesAndExemplars(promLbs1, + []prompb.Sample{getSample(float64(intVal1), msTime1)}, + []prompb.Exemplar{getExemplar(floatVal2, msTime1)}), + } } - tsWithInfiniteBoundExemplarValue = map[string]*prompb.TimeSeries{ - lb1Sig: getTimeSeriesWithSamplesAndExemplars(getPromLabels(label11, value11, label12, value12), - []prompb.Sample{getSample(float64(intVal1), msTime1)}, - []prompb.Exemplar{getExemplar(math.MaxFloat64, msTime1)}), + tsWithInfiniteBoundExemplarValue = func() map[uint64]*prompb.TimeSeries { + return map[uint64]*prompb.TimeSeries{ + lb1Sig: getTimeSeriesWithSamplesAndExemplars(promLbs1, + []prompb.Sample{getSample(float64(intVal1), msTime1)}, + []prompb.Exemplar{getExemplar(math.MaxFloat64, msTime1)}), + } 
} - tsWithoutSampleAndExemplar = map[string]*prompb.TimeSeries{ - lb1Sig: getTimeSeries(getPromLabels(label11, value11, label12, value12), - nil...), + tsWithoutSampleAndExemplar = func() map[uint64]*prompb.TimeSeries { + return map[uint64]*prompb.TimeSeries{ + lb1Sig: getTimeSeries(promLbs1, nil...), + } } validIntGauge = "valid_IntGauge" @@ -180,7 +188,9 @@ func getHistogramDataPointWithExemplars(t *testing.T, time time.Time, value floa e := h.Exemplars().AppendEmpty() e.SetDoubleValue(value) e.SetTimestamp(pcommon.NewTimestampFromTime(time)) - e.FilteredAttributes().PutStr(attributeKey, attributeValue) + if attributeKey != "" || attributeValue != "" { + e.FilteredAttributes().PutStr(attributeKey, attributeValue) + } if traceID != "" { var traceIDBytes [16]byte @@ -301,11 +311,11 @@ func getSummaryMetric(name string, attributes pcommon.Map, ts uint64, sum float6 return metric } -func getBucketBoundsData(values []float64) []bucketBoundsData { +func getBucketBoundsData(values []float64, timeSeries *prompb.TimeSeries) []bucketBoundsData { b := make([]bucketBoundsData, len(values)) for i, value := range values { - b[i] = bucketBoundsData{sig: lb1Sig, bound: value} + b[i] = bucketBoundsData{ts: timeSeries, bound: value} } return b