From bfdf2412267ba5fa3b107338fcd83d4ee8fb8105 Mon Sep 17 00:00:00 2001 From: Carrie Edwards Date: Mon, 9 Sep 2024 00:42:48 -0700 Subject: [PATCH] [receiver/datadog] Add support for sketches (#34662) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit **Description:** This PR adds support for translating Datadog sketches into Exponential Histograms. Follow up of #33631, #33957 and #34180. The full version of the code can be found in the `cedwards/datadog-metrics-receiver-full` branch, or in Grafana Alloy: https://github.com/grafana/alloy/tree/main/internal/etc/datadogreceiver **Link to tracking Issue:** #18278 **Testing:** Unit tests, as well as an end-to-end test, have been added. --------- Signed-off-by: Federico Torres Signed-off-by: György Krajcsovits Co-authored-by: Federico Torres Co-authored-by: György Krajcsovits --- .chloggen/datadogreceiver-sketches.yaml | 27 + .../internal/translator/sketches.go | 294 +++++++ .../internal/translator/sketches_test.go | 721 ++++++++++++++++++ .../internal/translator/testutil.go | 13 + receiver/datadogreceiver/receiver.go | 24 +- receiver/datadogreceiver/receiver_test.go | 87 +++ 6 files changed, 1163 insertions(+), 3 deletions(-) create mode 100644 .chloggen/datadogreceiver-sketches.yaml create mode 100644 receiver/datadogreceiver/internal/translator/sketches.go create mode 100644 receiver/datadogreceiver/internal/translator/sketches_test.go diff --git a/.chloggen/datadogreceiver-sketches.yaml b/.chloggen/datadogreceiver-sketches.yaml new file mode 100644 index 000000000000..76a07480626b --- /dev/null +++ b/.chloggen/datadogreceiver-sketches.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: datadogreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: add support for sketch metrics in Datadog receiver + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [18278] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/receiver/datadogreceiver/internal/translator/sketches.go b/receiver/datadogreceiver/internal/translator/sketches.go new file mode 100644 index 000000000000..cbe31c9e3e0a --- /dev/null +++ b/receiver/datadogreceiver/internal/translator/sketches.go @@ -0,0 +1,294 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package translator // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver/internal/translator" + +import ( + "fmt" + "io" + "math" + "net/http" + "sort" + "time" + + "github.com/DataDog/agent-payload/v5/gogen" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" +) + +const ( + // The relativeAccuracy (also called epsilon or eps) comes from DDSketch's logarithmic mapping, which is used for sketches + // in the Datadog agent. The Datadog agent uses the default value from opentelemetry-go-mapping configuration + // See: + // https://github.com/DataDog/opentelemetry-mapping-go/blob/4a6d530273741c84fe2d8f76c55c514cd5eb7488/pkg/quantile/config.go#L15 + relativeAccuracy = 1.0 / 128 + + // The gamma value comes from the default values of the epsilon/relative accuracy from opentelemetry-go-mapping. This value is used for + // finding the lower boundary of the bucket at a specific index + // See: + // https://github.com/DataDog/opentelemetry-mapping-go/blob/4a6d530273741c84fe2d8f76c55c514cd5eb7488/pkg/quantile/config.go#L138 + gamma = 1 + 2*relativeAccuracy + + // Since the default bucket factor for Sketches (gamma value) is 1.015625, this corresponds to a scale between 5 (2^2^-5=1.0219) + // and 6 (2^2^-6=1.01088928605). However, the lower resolution of 5 will produce larger buckets which allows for easier mapping + scale = 5 + + // The agentSketchOffset value comes from the following calculation: + // min = 1e-9 + // emin = math.Floor((math.Log(min)/math.Log1p(2*relativeAccuracy)) + // offset = -emin + 1 + // The resulting value is 1338. + // See: https://github.com/DataDog/opentelemetry-mapping-go/blob/4a6d530273741c84fe2d8f76c55c514cd5eb7488/pkg/quantile/config.go#L154 + // (Note: in Datadog's code, it is referred to as 'bias') + agentSketchOffset int32 = 1338 + + // The max limit for the index of a sketch bucket + // See https://github.com/DataDog/opentelemetry-mapping-go/blob/00c3f838161a00de395d7d0ed44d967ac71e43b9/pkg/quantile/ddsketch.go#L21 + // and https://github.com/DataDog/opentelemetry-mapping-go/blob/00c3f838161a00de395d7d0ed44d967ac71e43b9/pkg/quantile/ddsketch.go#L127 + maxIndex = math.MaxInt16 +) + +// Unmarshal the sketch payload, which contains the underlying Dogsketch structure used for the translation +func (mt *MetricsTranslator) HandleSketchesPayload(req *http.Request) (sp []gogen.SketchPayload_Sketch, err error) { + buf := GetBuffer() + defer PutBuffer(buf) + if _, err := io.Copy(buf, req.Body); err != nil { + return sp, err + } + + pl := new(gogen.SketchPayload) + if err := pl.Unmarshal(buf.Bytes()); err != nil { + return sp, err + } + + return pl.GetSketches(), nil +} + +func (mt *MetricsTranslator) TranslateSketches(sketches []gogen.SketchPayload_Sketch) pmetric.Metrics { + bt := newBatcher() + bt.Metrics = pmetric.NewMetrics() + + for _, sketch := range sketches { + dimensions := parseSeriesProperties(sketch.Metric, "sketch", sketch.Tags, sketch.Host, mt.buildInfo.Version, mt.stringPool) + metric, metricID := bt.Lookup(dimensions) + metric.ExponentialHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + dps := metric.ExponentialHistogram().DataPoints() + + dps.EnsureCapacity(len(sketch.Dogsketches)) + + // The dogsketches field of the payload contains the sketch data + for i := range sketch.Dogsketches { + dp := dps.AppendEmpty() + + err := sketchToDatapoint(sketch.Dogsketches[i], dp, dimensions.dpAttrs) + if err != nil { + // If a sketch is invalid, remove this datapoint + metric.ExponentialHistogram().DataPoints().RemoveIf(func(dp pmetric.ExponentialHistogramDataPoint) bool { + if dp.Positive().BucketCounts().Len() == 0 && dp.Negative().BucketCounts().Len() == 0 { + return true + } + return false + }) + continue + } + stream := identity.OfStream(metricID, dp) + if ts, ok := mt.streamHasTimestamp(stream); ok { + dp.SetStartTimestamp(ts) + } + mt.updateLastTsForStream(stream, dp.Timestamp()) + } + } + + return bt.Metrics +} + +func sketchToDatapoint(sketch gogen.SketchPayload_Sketch_Dogsketch, dp pmetric.ExponentialHistogramDataPoint, attributes pcommon.Map) error { + dp.SetTimestamp(pcommon.Timestamp(sketch.Ts * time.Second.Nanoseconds())) // OTel uses nanoseconds, while Datadog uses seconds + + dp.SetCount(uint64(sketch.Cnt)) + dp.SetSum(sketch.Sum) + dp.SetMin(sketch.Min) + dp.SetMax(sketch.Max) + dp.SetScale(scale) + dp.SetZeroThreshold(math.Exp(float64(1-agentSketchOffset) / (1 / math.Log(gamma)))) // See https://github.com/DataDog/sketches-go/blob/7546f8f95179bb41d334d35faa281bfe97812a86/ddsketch/mapping/logarithmic_mapping.go#L48 + + attributes.CopyTo(dp.Attributes()) + + negativeBuckets, positiveBuckets, zeroCount, err := mapSketchBucketsToHistogramBuckets(sketch.K, sketch.N) + if err != nil { + return err + } + + dp.SetZeroCount(zeroCount) + + convertBucketLayout(positiveBuckets, dp.Positive()) + convertBucketLayout(negativeBuckets, dp.Negative()) + + return nil +} + +// mapSketchBucketsToHistogramBuckets attempts to map the counts in each Sketch bucket to the closest equivalent Exponential Histogram +// bucket(s). It works by first calculating an Exponential Histogram key that corresponds most closely with the Sketch key (using the lower +// bound of the sketch bucket the key corresponds to), calculates differences in the range of the Sketch bucket and exponential histogram bucket, +// and distributes the count to the corresponding bucket, and the bucket(s) after it, based on the proportion of overlap between the +// exponential histogram buckets and the Sketch bucket. Note that the Sketch buckets are not separated into positive and negative buckets, but exponential +// histograms store positive and negative buckets separately. Negative buckets in exponential histograms are mapped in the same way as positive buckets. +// Note that negative indices in exponential histograms do not necessarily correspond to negative values; they correspond with values between 0 and 1, +// on either the negative or positive side +func mapSketchBucketsToHistogramBuckets(sketchKeys []int32, sketchCounts []uint32) (map[int]uint64, map[int]uint64, uint64, error) { + var zeroCount uint64 + + var positiveBuckets = make(map[int]uint64) + var negativeBuckets = make(map[int]uint64) + + // The data format for the sketch received from the sketch payload does not have separate positive and negative buckets, + // and instead just uses a single list of sketch keys that are in order by increasing bucket index, starting with negative indices, + // which correspond to negative buckets + for i := range sketchKeys { + if sketchKeys[i] == 0 { // A sketch key of 0 corresponds to the zero bucket + zeroCount += uint64(sketchCounts[i]) + continue + } + if sketchKeys[i] >= maxIndex { + // This should not happen, as sketches that contain bucket(s) with an index higher than the max + // limit should have already been discarded. However, if there happens to be an index > maxIndex, + // it can cause an infinite loop within the below inner for loop on some operating systems. Therefore, + // throw an error for sketches that have an index above the max limit + return nil, nil, 0, fmt.Errorf("Sketch contains bucket index %d which exceeds maximum supported index value %d", sketchKeys[i], maxIndex) + } + + // The approach here is to use the Datadog sketch index's lower bucket boundary to find the + // OTel exponential histogram bucket that with the closest range to the sketch bucket. Then, + // the buckets before and after that bucket are also checked for overlap with the sketch bucket. + // A count proportional to the intersection of the sketch bucket with the OTel bucket(s) is then + // added to the OTel bucket(s). After looping through all possible buckets that are within the Sketch + // bucket range, the bucket with the highest proportion of overlap is given the remaining count + sketchLowerBound, sketchUpperBound := getSketchBounds(sketchKeys[i]) + sketchBucketSize := sketchUpperBound - sketchLowerBound + histogramKey := sketchLowerBoundToHistogramIndex(sketchLowerBound) + highestCountProportion := 0.0 + highestCountIdx := 0 + targetBucketCount := uint64(sketchCounts[i]) + var currentAssignedCount uint64 + + //TODO: look into better algorithms for applying fractional counts + for outIndex := histogramKey; histogramLowerBound(outIndex) < sketchUpperBound; outIndex++ { + histogramLowerBound, histogramUpperBound := getHistogramBounds(outIndex) + lowerIntersection := math.Max(histogramLowerBound, sketchLowerBound) + higherIntersection := math.Min(histogramUpperBound, sketchUpperBound) + + intersectionSize := higherIntersection - lowerIntersection + proportion := intersectionSize / sketchBucketSize + if proportion <= 0 { + continue // In this case, the bucket does not overlap with the sketch bucket, so continue to the next bucket + } + if proportion > highestCountProportion { + highestCountProportion = proportion + highestCountIdx = outIndex + } + // OTel exponential histograms only support integer bucket counts, so rounding needs to be done here + roundedCount := uint64(proportion * float64(sketchCounts[i])) + if sketchKeys[i] < 0 { + negativeBuckets[outIndex] += roundedCount + } else { + positiveBuckets[outIndex] += roundedCount + } + currentAssignedCount += roundedCount + } + // Add the difference between the original sketch bucket's count and the total count that has been + // added to the matching OTel bucket(s) thus far to the bucket that had the highest proportion of + // overlap between the original sketch bucket and the corresponding exponential histogram buckets + if highestCountProportion > 0 { + additionalCount := targetBucketCount - currentAssignedCount + if sketchKeys[i] < 0 { + negativeBuckets[highestCountIdx] += additionalCount + } else { + positiveBuckets[highestCountIdx] += additionalCount + } + } + + } + + return negativeBuckets, positiveBuckets, zeroCount, nil +} + +// convertBucketLayout populates the count for positive or negative buckets in the resulting OTel +// exponential histogram structure. The bucket layout is dense and consists of an offset, which is the +// index of the first populated bucket, and a list of counts, which correspond to the counts at the offset +// bucket's index, and the counts of each bucket after. Unpopulated/empty buckets must be represented with +// a count of 0. After assigning bucket counts, it sets the offset for the bucket layout +func convertBucketLayout(inputBuckets map[int]uint64, outputBuckets pmetric.ExponentialHistogramDataPointBuckets) { + if len(inputBuckets) == 0 { + return + } + bucketIdxs := make([]int, 0, len(inputBuckets)) + for k := range inputBuckets { + bucketIdxs = append(bucketIdxs, k) + } + sort.Ints(bucketIdxs) + + bucketsSize := bucketIdxs[len(bucketIdxs)-1] - bucketIdxs[0] + 1 // find total number of buckets needed + outputBuckets.BucketCounts().EnsureCapacity(bucketsSize) + outputBuckets.BucketCounts().Append(make([]uint64, bucketsSize)...) + + offset := bucketIdxs[0] + outputBuckets.SetOffset(int32(offset)) + + for _, idx := range bucketIdxs { + delta := idx - offset + outputBuckets.BucketCounts().SetAt(delta, inputBuckets[idx]) + } +} + +// getSketchBounds calculates the lower and upper bounds of a sketch bucket based on the index of the bucket. +// This is based on sketch buckets placing values in bucket so that γ^k <= v < γ^(k+1) +// See https://github.com/DataDog/datadog-agent/blob/0ada7a97fed6727838a6f4d9c87123d2aafde735/pkg/quantile/config.go#L83 +// and https://github.com/DataDog/sketches-go/blob/8a1961cf57f80fbbe26e7283464fcc01ebf17d5c/ddsketch/ddsketch.go#L468 +func getSketchBounds(index int32) (float64, float64) { + if index < 0 { + index = -index + } + return sketchLowerBound(index), sketchLowerBound(index + 1) +} + +// sketchLowerBound calculates the lower bound of a sketch bucket based on the index of the bucket. +// It uses the index offset and multiplier (represented by (1 / math.Log(gamma))). The logic behind this +// is based on the DD agent using logarithmic mapping for definition DD agent sketches +// See: +// https://github.com/DataDog/opentelemetry-mapping-go/blob/4a6d530273741c84fe2d8f76c55c514cd5eb7488/pkg/quantile/config.go#L54 +// https://github.com/DataDog/sketches-go/blob/8a1961cf57f80fbbe26e7283464fcc01ebf17d5c/ddsketch/mapping/logarithmic_mapping.go#L39 +func sketchLowerBound(index int32) float64 { + if index < 0 { + index = -index + } + return math.Exp((float64(index-agentSketchOffset) / (1 / math.Log(gamma)))) +} + +// getHistogramBounds returns the lower and upper boundaries of the histogram bucket that +// corresponds to the specified bucket index +func getHistogramBounds(histIndex int) (float64, float64) { + return histogramLowerBound(histIndex), histogramLowerBound(histIndex + 1) +} + +// This equation for finding the lower bound of the exponential histogram bucket +// Based on: https://github.com/open-telemetry/opentelemetry-go/blob/3a72c5ea94bf843beeaa044b0dda2ce4d627bb7b/sdk/metric/internal/aggregate/exponential_histogram.go#L122 +// See also: https://github.com/open-telemetry/opentelemetry-go/blob/3a72c5ea94bf843beeaa044b0dda2ce4d627bb7b/sdk/metric/internal/aggregate/exponential_histogram.go#L139 +func histogramLowerBound(histIndex int) float64 { + inverseFactor := math.Ldexp(math.Ln2, -scale) + return math.Exp(float64(histIndex) * inverseFactor) +} + +// sketchLowerBoundToHistogramIndex takes the lower boundary of a sketch bucket and computes the +// closest equivalent exponential histogram index that corresponds to an exponential histogram +// bucket that has a range covering that lower bound +// See: https://opentelemetry.io/docs/specs/otel/metrics/data-model/#all-scales-use-the-logarithm-function +func sketchLowerBoundToHistogramIndex(value float64) int { + if frac, exp := math.Frexp(value); frac == 0.5 { + return ((exp - 1) << scale) - 1 + } + scaleFactor := math.Ldexp(math.Log2E, scale) + + return int(math.Floor(math.Log(value) * scaleFactor)) +} diff --git a/receiver/datadogreceiver/internal/translator/sketches_test.go b/receiver/datadogreceiver/internal/translator/sketches_test.go new file mode 100644 index 000000000000..7603e80efb7f --- /dev/null +++ b/receiver/datadogreceiver/internal/translator/sketches_test.go @@ -0,0 +1,721 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package translator // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver/internal/translator" + +import ( + "bytes" + "io" + "net/http" + "testing" + "time" + + "github.com/DataDog/agent-payload/v5/gogen" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" +) + +func TestHandleSketchPayload(t *testing.T) { + tests := []struct { + name string + sketchPayload gogen.SketchPayload + expectedSketchesCount int + expectedDogsketchesCounts []int + }{ + { + name: "Test simple sketch payload with single sketch", + sketchPayload: gogen.SketchPayload{ + Sketches: []gogen.SketchPayload_Sketch{ + { + Metric: "Test1", + Host: "Host1", + Tags: []string{"env:tag1", "version:tag2"}, + Distributions: []gogen.SketchPayload_Sketch_Distribution{}, + Dogsketches: []gogen.SketchPayload_Sketch_Dogsketch{ + { + Ts: 400, + Cnt: 6, + Min: 1, + Max: 3, + Avg: 2.3333, + Sum: 14.0, + K: []int32{1338, 1383, 1409}, + N: []uint32{1, 2, 3}, + }, + }, + }, + }, + }, + expectedSketchesCount: 1, + expectedDogsketchesCounts: []int{1}, + }, + { + name: "Test simple sketch payload with multiple dogsketches", + sketchPayload: gogen.SketchPayload{ + Sketches: []gogen.SketchPayload_Sketch{ + { + Metric: "Test1", + Host: "Host1", + Tags: []string{"env:tag1", "version:tag2"}, + Distributions: []gogen.SketchPayload_Sketch_Distribution{}, + Dogsketches: []gogen.SketchPayload_Sketch_Dogsketch{ + { + Ts: 400, + Cnt: 6, + Min: 1, + Max: 3, + Avg: 2.3333, + Sum: 14.0, + K: []int32{1338, 1383, 1409}, + N: []uint32{1, 2, 3}, + }, + { + Ts: 500, + Cnt: 15, + Min: 4, + Max: 5, + Avg: 4.7333, + Sum: 71.0, + K: []int32{1427, 1442, 1454}, + N: []uint32{4, 5, 6}, + }, + }, + }, + }, + }, + expectedSketchesCount: 1, + expectedDogsketchesCounts: []int{2}, + }, + { + name: "Test sketch payload with multiple sketches", + sketchPayload: gogen.SketchPayload{ + Sketches: []gogen.SketchPayload_Sketch{ + { + Metric: "Test1", + Host: "Host1", + Tags: []string{"env:tag1", "version:tag2"}, + Distributions: []gogen.SketchPayload_Sketch_Distribution{}, + Dogsketches: []gogen.SketchPayload_Sketch_Dogsketch{ + { + Ts: 400, + Cnt: 6, + Min: 1, + Max: 3, + Avg: 2.3333, + Sum: 14.0, + K: []int32{1338, 1383, 1409}, + N: []uint32{1, 2, 3}, + }, + }, + }, + { + Metric: "Test2", + Host: "Host1", + Tags: []string{"env:tag1", "version:tag2"}, + Distributions: []gogen.SketchPayload_Sketch_Distribution{}, + Dogsketches: []gogen.SketchPayload_Sketch_Dogsketch{ + { + Ts: 400, + Cnt: 6, + Min: 1, + Max: 3, + Avg: 2.3333, + Sum: 14.0, + K: []int32{1338, 1383, 1409}, + N: []uint32{1, 2, 3}, + }, + }, + }, + }, + }, + expectedSketchesCount: 2, + expectedDogsketchesCounts: []int{1, 1}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + pb, err := tt.sketchPayload.Marshal() + require.NoError(t, err) + + req, err := http.NewRequest(http.MethodPost, "/api/beta/sketches", io.NopCloser(bytes.NewReader(pb))) + require.NoError(t, err) + + mt := createMetricsTranslator() + metrics, err := mt.HandleSketchesPayload(req) + require.NoError(t, err) + require.Len(t, metrics, tt.expectedSketchesCount) + for i, metric := range metrics { + require.Len(t, metric.Dogsketches, tt.expectedDogsketchesCounts[i]) + } + }) + } +} + +func TestTranslateSketches(t *testing.T) { + tests := []struct { + name string + sketches []gogen.SketchPayload_Sketch + }{ + { + name: "Single sketch with only positive buckets and no zero bucket", + sketches: []gogen.SketchPayload_Sketch{ + { + Metric: "Test1", + Host: "Host1", + Tags: []string{"env:tag1", "version:tag2"}, + Distributions: []gogen.SketchPayload_Sketch_Distribution{}, + Dogsketches: []gogen.SketchPayload_Sketch_Dogsketch{ + { + Ts: 400, + Cnt: 1029, + Min: 1.0, + Max: 6.0, + Avg: 3.0, + Sum: 2038.0, + K: []int32{0, 1338, 1345, 1383, 1409, 1427, 1442, 1454, 1464}, + N: []uint32{13, 152, 75, 231, 97, 55, 101, 239, 66}, + }, + }, + }, + }, + }, + { + name: "Single sketch with only negative buckets and no zero bucket", + sketches: []gogen.SketchPayload_Sketch{ + { + Metric: "Test1", + Host: "Host1", + Tags: []string{"env:tag1", "version:tag2"}, + Distributions: []gogen.SketchPayload_Sketch_Distribution{}, + Dogsketches: []gogen.SketchPayload_Sketch_Dogsketch{ + { + Ts: 400, + Cnt: 941, + Min: -6.0, + Max: -1.0, + Avg: -3.0, + Sum: 2038.0, + K: []int32{-1464, -1454, -1442, -1427, -1409, -1383, -1338}, + N: []uint32{152, 231, 97, 55, 101, 239, 66}, + }, + }, + }, + }, + }, + { + name: "Single sketch with negative and positive buckets and no zero bucket", + sketches: []gogen.SketchPayload_Sketch{ + { + Metric: "Test1", + Host: "Host1", + Tags: []string{"env:tag1", "version:tag2"}, + Distributions: []gogen.SketchPayload_Sketch_Distribution{}, + Dogsketches: []gogen.SketchPayload_Sketch_Dogsketch{ + { + Ts: 400, + Cnt: 1952, + Min: 1.0, + Max: 6.0, + Avg: 3.0, + Sum: 1019.0, + K: []int32{-1464, -1454, -1442, -1427, -1409, -1383, -1338, 1338, 1383, 1409, 1427, 1442, 1454, 1464}, + N: []uint32{152, 231, 97, 55, 101, 239, 66, 43, 99, 123, 62, 194, 251, 239}, + }, + }, + }, + }, + }, + { + name: "Single sketch with only positive buckets and zero bucket", + sketches: []gogen.SketchPayload_Sketch{ + { + Metric: "Test1", + Host: "Host1", + Tags: []string{"env:tag1", "version:tag2"}, + Distributions: []gogen.SketchPayload_Sketch_Distribution{}, + Dogsketches: []gogen.SketchPayload_Sketch_Dogsketch{ + { + Ts: 400, + Cnt: 954, + Min: 1.0, + Max: 6.0, + Avg: 3.0, + Sum: 2049.0, + K: []int32{0, 1338, 1383, 1409, 1427, 1442, 1454, 1464}, + N: []uint32{13, 152, 231, 97, 55, 101, 239, 66}, + }, + }, + }, + }, + }, + { + name: "Single sketch with only negative buckets and no zero bucket", + sketches: []gogen.SketchPayload_Sketch{ + { + Metric: "Test1", + Host: "Host1", + Tags: []string{"env:tag1", "version:tag2"}, + Distributions: []gogen.SketchPayload_Sketch_Distribution{}, + Dogsketches: []gogen.SketchPayload_Sketch_Dogsketch{ + { + Ts: 400, + Cnt: 941, + Min: -6.0, + Max: -1.0, + Avg: -3.0, + Sum: -2049, + K: []int32{-1464, -1454, -1442, -1427, -1409, -1383, -1338}, + N: []uint32{152, 231, 97, 55, 101, 239, 66}, + }, + }, + }, + }, + }, + { + name: "Single sketch with negative and positive buckets and zero bucket", + sketches: []gogen.SketchPayload_Sketch{ + { + Metric: "Test1", + Host: "Host1", + Tags: []string{"env:tag1", "version:tag2"}, + Distributions: []gogen.SketchPayload_Sketch_Distribution{}, + Dogsketches: []gogen.SketchPayload_Sketch_Dogsketch{ + { + Ts: 400, + Cnt: 1964, + Min: 1.0, + Max: 6.0, + Avg: 3.0, + Sum: 1589.0, + K: []int32{-1464, -1454, -1442, -1427, -1409, -1383, -1338, 0, 1338, 1383, 1409, 1427, 1442, 1454, 1464}, + N: []uint32{152, 231, 97, 55, 101, 239, 66, 12, 43, 99, 123, 62, 194, 251, 239}, + }, + }, + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mt := createMetricsTranslator() + result := mt.TranslateSketches(tt.sketches) + require.Equal(t, 1, result.MetricCount()) + require.Equal(t, 1, result.DataPointCount()) + metrics := result.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics() + require.Equal(t, 1, result.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().Len()) + + metric := metrics.At(0) + require.Equal(t, pmetric.MetricTypeExponentialHistogram, metric.Type()) + + for _, sketch := range tt.sketches { + require.Equal(t, sketch.GetMetric(), metric.Name()) + for i, dogsketch := range sketch.Dogsketches { + m := metric.ExponentialHistogram().DataPoints().At(i) + require.Equal(t, pcommon.Timestamp(dogsketch.Ts*time.Second.Nanoseconds()), m.Timestamp()) + require.Equal(t, uint64(dogsketch.Cnt), m.Count()) + require.Equal(t, dogsketch.Sum, m.Sum()) + require.Equal(t, dogsketch.Min, m.Min()) + require.Equal(t, dogsketch.Max, m.Max()) + require.Equal(t, m.Count(), totalHistBucketCounts(m)) // Ensure that buckets contain same number of counts as total count + } + } + }) + } +} + +func TestHandleInvalidBuckets(t *testing.T) { + tests := []struct { + name string + sketches []gogen.SketchPayload_Sketch + }{ + { + name: "Sketch that contains invalid index is excluded", + sketches: []gogen.SketchPayload_Sketch{ + { + Metric: "Test1", + Host: "Host1", + Tags: []string{"version:tag1"}, + Distributions: []gogen.SketchPayload_Sketch_Distribution{}, + Dogsketches: []gogen.SketchPayload_Sketch_Dogsketch{ + { + Ts: 100, + Cnt: 1029, + Min: 1.0, + Max: 6.0, + Avg: 3.0, + Sum: 2038.0, + K: []int32{0, 1338, 1345, 1383, 1409, 1427, 1442, 1454, 1464}, + N: []uint32{13, 152, 75, 231, 97, 55, 101, 239, 66}, + }, + { + Ts: 200, + Cnt: 1029, + Min: 1.0, + Max: 6.0, + Avg: 3.0, + Sum: 2038.0, + K: []int32{0, 1338, 1345, 1383, 50000}, + N: []uint32{13, 152, 75, 231, 97}, + }, + { + Ts: 300, + Cnt: 1029, + Min: 1.0, + Max: 6.0, + Avg: 3.0, + Sum: 2038.0, + K: []int32{0, 1338, 1345, 1383, 1409, 1427, 1442, 1454, 1464}, + N: []uint32{13, 152, 75, 231, 97, 55, 101, 239, 66}, + }, + }, + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mt := createMetricsTranslator() + result := mt.TranslateSketches(tt.sketches) + require.Equal(t, 1, result.ResourceMetrics().Len()) + require.Equal(t, 1, result.MetricCount()) + require.Equal(t, 2, result.DataPointCount()) + + requireScope(t, result, pcommon.NewMap(), component.NewDefaultBuildInfo().Version) + + metric := result.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics() + require.Equal(t, 1, result.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().Len()) + require.Equal(t, pmetric.MetricTypeExponentialHistogram, metric.At(0).Type()) + + // While the input was 3 sketches, the result should exclude the 2nd input due to an invalid bucket + require.Equal(t, 2, metric.At(0).ExponentialHistogram().DataPoints().Len()) + + var lastTimestamp pcommon.Timestamp + for i := 0; i < metric.At(0).ExponentialHistogram().DataPoints().Len(); i++ { + m := metric.At(0).ExponentialHistogram().DataPoints().At(i) + if i == 0 { + require.Equal(t, m.StartTimestamp(), pcommon.Timestamp(0)) + } else { + require.Equal(t, m.StartTimestamp(), lastTimestamp) + } + lastTimestamp = m.Timestamp() + } + }) + } +} + +func TestSketchTemporality(t *testing.T) { + tests := []struct { + name string + sketches []gogen.SketchPayload_Sketch + }{ + { + name: "Two metrics with multiple data points", + sketches: []gogen.SketchPayload_Sketch{ + { + Metric: "Test1", + Host: "Host1", + Tags: []string{"version:tag1"}, + Distributions: []gogen.SketchPayload_Sketch_Distribution{}, + Dogsketches: []gogen.SketchPayload_Sketch_Dogsketch{ + { + Ts: 100, + Cnt: 1029, + Min: 1.0, + Max: 6.0, + Avg: 3.0, + Sum: 2038.0, + K: []int32{0, 1338, 1345, 1383, 1409, 1427, 1442, 1454, 1464}, + N: []uint32{13, 152, 75, 231, 97, 55, 101, 239, 66}, + }, + { + Ts: 200, + Cnt: 1029, + Min: 1.0, + Max: 6.0, + Avg: 3.0, + Sum: 2038.0, + K: []int32{0, 1338, 1345, 1383, 1409, 1427, 1442, 1454, 1464}, + N: []uint32{13, 152, 75, 231, 97, 55, 101, 239, 66}, + }, + { + Ts: 300, + Cnt: 1029, + Min: 1.0, + Max: 6.0, + Avg: 3.0, + Sum: 2038.0, + K: []int32{0, 1338, 1345, 1383, 1409, 1427, 1442, 1454, 1464}, + N: []uint32{13, 152, 75, 231, 97, 55, 101, 239, 66}, + }, + }, + }, + { + Metric: "Test2", + Host: "Host2", + Tags: []string{"env:tag1", "version:tag2"}, + Distributions: []gogen.SketchPayload_Sketch_Distribution{}, + Dogsketches: []gogen.SketchPayload_Sketch_Dogsketch{ + { + Ts: 20, + Cnt: 1029, + Min: 1.0, + Max: 6.0, + Avg: 3.0, + Sum: 2038.0, + K: []int32{0, 1338, 1345, 1383, 1409, 1427, 1442, 1454, 1464}, + N: []uint32{13, 152, 75, 231, 97, 55, 101, 239, 66}, + }, + { + Ts: 30, + Cnt: 1029, + Min: 1.0, + Max: 6.0, + Avg: 3.0, + Sum: 2038.0, + K: []int32{0, 1338, 1345, 1383, 1409, 1427, 1442, 1454, 1464}, + N: []uint32{13, 152, 75, 231, 97, 55, 101, 239, 66}, + }, + { + Ts: 40, + Cnt: 1029, + Min: 1.0, + Max: 6.0, + Avg: 3.0, + Sum: 2038.0, + K: []int32{0, 1338, 1345, 1383, 1409, 1427, 1442, 1454, 1464}, + N: []uint32{13, 152, 75, 231, 97, 55, 101, 239, 66}, + }, + }, + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mt := createMetricsTranslator() + result := mt.TranslateSketches(tt.sketches) + require.Equal(t, 2, result.ResourceMetrics().Len()) + require.Equal(t, 2, result.MetricCount()) + require.Equal(t, 6, result.DataPointCount()) + + requireScope(t, result, pcommon.NewMap(), component.NewDefaultBuildInfo().Version) + + metric1 := result.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics() + require.Equal(t, 1, result.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().Len()) + require.Equal(t, pmetric.MetricTypeExponentialHistogram, metric1.At(0).Type()) + + metric2 := result.ResourceMetrics().At(1).ScopeMetrics().At(0).Metrics() + require.Equal(t, 1, result.ResourceMetrics().At(1).ScopeMetrics().At(0).Metrics().Len()) + require.Equal(t, pmetric.MetricTypeExponentialHistogram, metric2.At(0).Type()) + + var lastTimestamp pcommon.Timestamp + for i := 0; i < metric1.At(0).ExponentialHistogram().DataPoints().Len(); i++ { + m := metric1.At(0).ExponentialHistogram().DataPoints().At(i) + if i == 0 { + require.Equal(t, m.StartTimestamp(), pcommon.Timestamp(0)) + } else { + require.Equal(t, m.StartTimestamp(), lastTimestamp) + } + lastTimestamp = m.Timestamp() + } + for i := 0; i < metric2.At(0).ExponentialHistogram().DataPoints().Len(); i++ { + m := metric2.At(0).ExponentialHistogram().DataPoints().At(i) + if i == 0 { + require.Equal(t, m.StartTimestamp(), pcommon.Timestamp(0)) + } else { + require.Equal(t, m.StartTimestamp(), lastTimestamp) + } + lastTimestamp = m.Timestamp() + } + }) + } +} + +func TestConvertBucketLayout(t *testing.T) { + tests := []struct { + name string + inputBuckets map[int]uint64 + expectedOffset int32 + expectedBucketCountsLen int + expectedBucketCounts map[int]uint64 + }{ + { + name: "Empty input buckets", + inputBuckets: map[int]uint64{}, + expectedOffset: 0, + expectedBucketCountsLen: 0, + expectedBucketCounts: map[int]uint64{}, + }, + { + name: "Non-empty input buckets and no offset", + inputBuckets: map[int]uint64{5: 75, 64: 33, 83: 239, 0: 152, 32: 231, 50: 24, 51: 73, 63: 22, 74: 79, 75: 22, 90: 66}, + expectedOffset: 0, + expectedBucketCountsLen: 91, + expectedBucketCounts: map[int]uint64{0: 152, 5: 75, 32: 231, 50: 24, 51: 73, 63: 22, 64: 33, 74: 79, 75: 22, 83: 239, 90: 66}, + }, + { + name: "Non-empty input buckets with offset", + inputBuckets: map[int]uint64{5: 75, 64: 33, 83: 239, 32: 231, 50: 24, 51: 73, 63: 22, 74: 79, 75: 22, 90: 66}, + expectedOffset: 5, + expectedBucketCountsLen: 86, + expectedBucketCounts: map[int]uint64{0: 75, 27: 231, 45: 24, 46: 73, 58: 22, 59: 33, 69: 79, 70: 22, 78: 239, 85: 66}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + outputBuckets := pmetric.NewExponentialHistogramDataPointBuckets() + + convertBucketLayout(tt.inputBuckets, outputBuckets) + + require.Equal(t, tt.expectedOffset, outputBuckets.Offset()) + require.Equal(t, tt.expectedBucketCountsLen, outputBuckets.BucketCounts().Len()) + + for k, v := range outputBuckets.BucketCounts().AsRaw() { + require.Equal(t, tt.expectedBucketCounts[k], v) + } + }) + } +} + +func TestMapSketchBucketsToHistogramBuckets(t *testing.T) { + tests := []struct { + name string + sketchKeys []int32 + sketchCounts []uint32 + expectedNegativeBuckets map[int]uint64 + expectedPositiveBuckets map[int]uint64 + expectedZeroCount uint64 + }{ + { + name: "Empty sketch buckets", + sketchKeys: []int32{}, + sketchCounts: []uint32{}, + expectedNegativeBuckets: map[int]uint64{}, + expectedPositiveBuckets: map[int]uint64{}, + expectedZeroCount: 0, + }, + { + name: "Zero bucket only", + sketchKeys: []int32{0}, + sketchCounts: []uint32{100}, + expectedNegativeBuckets: map[int]uint64{}, + expectedPositiveBuckets: map[int]uint64{}, + expectedZeroCount: 100, + }, + { + name: "Single positive bucket covered by single exponential bucket", + sketchKeys: []int32{1338}, // Key-offset=0, bucket [1, 1.015625) + sketchCounts: []uint32{100}, + expectedNegativeBuckets: map[int]uint64{}, + expectedPositiveBuckets: map[int]uint64{0: 100}, + expectedZeroCount: 0, // At zero offset, bucket (1, 1.0219] + }, + { + name: "Single positive bucket covered by multiple exponential buckets", + sketchKeys: []int32{1339}, // Key-offset=1, bucket [1.015625, 1.031494140625) + sketchCounts: []uint32{100}, + expectedNegativeBuckets: map[int]uint64{}, + expectedPositiveBuckets: map[int]uint64{0: 39, 1: 61}, // At zero offset, buckets (1, 1.0219] and (1.0219, 1.044] + expectedZeroCount: 0, + }, + { + name: "Higher single positive bucket covered by multiple exponential buckets", + sketchKeys: []int32{1340}, // Key-offset=2, bucket [1.031494140625, 1.0476112365722656) + sketchCounts: []uint32{100}, + expectedNegativeBuckets: map[int]uint64{}, + expectedPositiveBuckets: map[int]uint64{1: 80, 2: 20}, // At zero offset, buckets (1.0219, 1.044] and (1.044, 1.067] + expectedZeroCount: 0, + }, + { + name: "Two positive buckets covered by multiple overlapping exponential buckets", + // The superposition of the two previous cases. + sketchKeys: []int32{1339, 1340}, + sketchCounts: []uint32{100, 100}, + expectedNegativeBuckets: map[int]uint64{}, + expectedPositiveBuckets: map[int]uint64{0: 39, 1: 141, 2: 20}, + expectedZeroCount: 0, + }, + { + name: "Single negative bucket covered by single exponential bucket", + sketchKeys: []int32{-1338}, // (-Key)-offset=0, bucket (-1.015625, -1] + sketchCounts: []uint32{100}, + expectedNegativeBuckets: map[int]uint64{0: 100}, // At zero offset, bucket (-1.0219, -1] + expectedPositiveBuckets: map[int]uint64{}, + expectedZeroCount: 0, + }, + { + name: "Lowest possible positive bucket", + sketchKeys: []int32{1}, // Key-offset=-1337, bucket [9.941854089121418e-10, 1.0097195559263958e-09) + sketchCounts: []uint32{100}, + expectedNegativeBuckets: map[int]uint64{}, + expectedPositiveBuckets: map[int]uint64{-957: 100}, // At zero offset, bucket (9.938519454345803e-10, 1.0156144692239443e-09] + expectedZeroCount: 0, + }, + { + name: "Only positive buckets and no zero bucket", + sketchKeys: []int32{1338, 1345, 1383, 1409, 1427, 1442, 1454, 1464}, + sketchCounts: []uint32{152, 75, 231, 97, 55, 101, 239, 66}, + expectedNegativeBuckets: map[int]uint64{}, + expectedPositiveBuckets: map[int]uint64{0: 152, 5: 75, 32: 231, 50: 24, 51: 73, 63: 22, 64: 33, 74: 79, 75: 22, 83: 239, 90: 66}, + expectedZeroCount: 0, + }, + { + name: "Only negative buckets and no zero bucket", + sketchKeys: []int32{-1464, -1454, -1442, -1427, -1409, -1383, -1338}, + sketchCounts: []uint32{152, 231, 97, 55, 101, 239, 66}, + expectedNegativeBuckets: map[int]uint64{0: 66, 32: 239, 50: 25, 51: 76, 63: 22, 64: 33, 74: 76, 75: 21, 83: 231, 90: 152}, + expectedPositiveBuckets: map[int]uint64{}, + expectedZeroCount: 0, + }, + { + name: "Negative and positive buckets and no zero bucket", + sketchKeys: []int32{-1464, -1454, -1442, -1427, -1409, -1383, -1338, 1338, 1383, 1409, 1427, 1442, 1454, 1464}, + sketchCounts: []uint32{152, 231, 97, 55, 101, 239, 66, 43, 99, 123, 62, 194, 251, 239}, + expectedNegativeBuckets: map[int]uint64{0: 66, 32: 239, 50: 25, 51: 76, 63: 22, 64: 33, 74: 76, 75: 21, 83: 231, 90: 152}, + expectedPositiveBuckets: map[int]uint64{0: 43, 32: 99, 50: 30, 51: 93, 63: 25, 64: 37, 74: 152, 75: 42, 83: 251, 90: 239}, + expectedZeroCount: 0, + }, + { + name: "Only positive buckets and zero bucket", + sketchKeys: []int32{0, 1338, 1383, 1409, 1427, 1442, 1454, 1464}, + sketchCounts: []uint32{13, 152, 231, 97, 55, 101, 239, 66}, + expectedNegativeBuckets: map[int]uint64{}, + expectedPositiveBuckets: map[int]uint64{0: 152, 32: 231, 50: 24, 51: 73, 63: 22, 64: 33, 74: 79, 75: 22, 83: 239, 90: 66}, + expectedZeroCount: 13, + }, + { + name: "Only negative buckets and zero bucket", + sketchKeys: []int32{-1464, -1454, -1442, -1427, -1409, -1383, -1338, 0}, + sketchCounts: []uint32{152, 231, 97, 55, 101, 239, 66, 13}, + expectedNegativeBuckets: map[int]uint64{0: 66, 32: 239, 50: 25, 51: 76, 63: 22, 64: 33, 74: 76, 75: 21, 83: 231, 90: 152}, + expectedPositiveBuckets: map[int]uint64{}, + expectedZeroCount: 13, + }, + { + name: "Negative and positive buckets and zero bucket", + sketchKeys: []int32{-1464, -1454, -1442, -1427, -1409, -1383, -1338, 0, 1338, 1383, 1409, 1427, 1442, 1454, 1464}, + sketchCounts: []uint32{152, 231, 97, 55, 101, 239, 66, 12, 43, 99, 123, 62, 194, 251, 239}, + expectedNegativeBuckets: map[int]uint64{0: 66, 32: 239, 50: 25, 51: 76, 63: 22, 64: 33, 74: 76, 75: 21, 83: 231, 90: 152}, + expectedPositiveBuckets: map[int]uint64{0: 43, 32: 99, 50: 30, 51: 93, 63: 25, 64: 37, 74: 152, 75: 42, 83: 251, 90: 239}, + expectedZeroCount: 12, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + negativeBuckets, positiveBuckets, zeroCount, err := mapSketchBucketsToHistogramBuckets(tt.sketchKeys, tt.sketchCounts) + require.NoError(t, err) + + require.Equal(t, tt.expectedNegativeBuckets, negativeBuckets) + require.Equal(t, tt.expectedPositiveBuckets, positiveBuckets) + require.Equal(t, tt.expectedZeroCount, zeroCount) + }) + } +} diff --git a/receiver/datadogreceiver/internal/translator/testutil.go b/receiver/datadogreceiver/internal/translator/testutil.go index 060e3030a07d..254a509b8178 100644 --- a/receiver/datadogreceiver/internal/translator/testutil.go +++ b/receiver/datadogreceiver/internal/translator/testutil.go @@ -68,3 +68,16 @@ func requireDp(t *testing.T, dp pmetric.NumberDataPoint, expectedAttrs pcommon.M require.Equal(t, expectedValue, dp.DoubleValue()) require.Equal(t, expectedAttrs, dp.Attributes()) } + +func totalHistBucketCounts(hist pmetric.ExponentialHistogramDataPoint) uint64 { + var totalCount uint64 + for i := 0; i < hist.Negative().BucketCounts().Len(); i++ { + totalCount += hist.Negative().BucketCounts().At(i) + } + + totalCount += hist.ZeroCount() + for i := 0; i < hist.Positive().BucketCounts().Len(); i++ { + totalCount += hist.Positive().BucketCounts().At(i) + } + return totalCount +} diff --git a/receiver/datadogreceiver/receiver.go b/receiver/datadogreceiver/receiver.go index f605d53c6125..bdfb6b9c53a8 100644 --- a/receiver/datadogreceiver/receiver.go +++ b/receiver/datadogreceiver/receiver.go @@ -11,6 +11,7 @@ import ( "io" "net/http" + "github.com/DataDog/agent-payload/v5/gogen" pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace" "github.com/tinylib/msgp/msgp" "go.opentelemetry.io/collector/component" @@ -364,9 +365,26 @@ func (ddr *datadogReceiver) handleSketches(w http.ResponseWriter, req *http.Requ ddr.tReceiver.EndMetricsOp(obsCtx, "datadog", *metricsCount, err) }(&metricsCount) - err = fmt.Errorf("sketches endpoint not implemented") - http.Error(w, err.Error(), http.StatusMethodNotAllowed) - ddr.params.Logger.Warn("metrics consumer errored out", zap.Error(err)) + var ddSketches []gogen.SketchPayload_Sketch + ddSketches, err = ddr.metricsTranslator.HandleSketchesPayload(req) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + ddr.params.Logger.Error(err.Error()) + return + } + + metrics := ddr.metricsTranslator.TranslateSketches(ddSketches) + metricsCount = metrics.DataPointCount() + + err = ddr.nextMetricsConsumer.ConsumeMetrics(obsCtx, metrics) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + ddr.params.Logger.Error("metrics consumer errored out", zap.Error(err)) + return + } + + w.WriteHeader(http.StatusAccepted) + _, _ = w.Write([]byte("OK")) } // handleIntake handles operational calls made by the agent to submit host tags and other metadata to the backend. diff --git a/receiver/datadogreceiver/receiver_test.go b/receiver/datadogreceiver/receiver_test.go index e352a82851f1..f983010c96a8 100644 --- a/receiver/datadogreceiver/receiver_test.go +++ b/receiver/datadogreceiver/receiver_test.go @@ -395,6 +395,93 @@ func TestDatadogMetricsV2_EndToEnd(t *testing.T) { assert.Equal(t, pcommon.Timestamp(1636629071*1_000_000_000), metric.Sum().DataPoints().At(1).StartTimestamp()) } +func TestDatadogSketches_EndToEnd(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.Endpoint = "localhost:0" // Using a randomly assigned address + sink := new(consumertest.MetricsSink) + + dd, err := newDataDogReceiver( + cfg, + receivertest.NewNopSettings(), + ) + require.NoError(t, err, "Must not error when creating receiver") + dd.(*datadogReceiver).nextMetricsConsumer = sink + + require.NoError(t, dd.Start(context.Background(), componenttest.NewNopHost())) + defer func() { + require.NoError(t, dd.Shutdown(context.Background())) + }() + + sketchPayload := gogen.SketchPayload{ + Sketches: []gogen.SketchPayload_Sketch{ + { + Metric: "Test1", + Host: "Host1", + Tags: []string{"env:tag1", "version:tag2"}, + Distributions: []gogen.SketchPayload_Sketch_Distribution{}, + Dogsketches: []gogen.SketchPayload_Sketch_Dogsketch{ + { + Ts: 400, + Cnt: 13, + Min: -6.0, + Max: 6.0, + Avg: 1.0, + Sum: 11.0, + K: []int32{-1442, -1427, -1409, -1383, -1338, 0, 1338, 1383, 1409, 1427, 1442, 1454, 1464}, + N: []uint32{152, 124, 68, 231, 97, 55, 101, 239, 66, 43, 167, 209, 154}, + }, + }, + }, + }, + } + + pb, err := sketchPayload.Marshal() + assert.NoError(t, err) + + req, err := http.NewRequest( + http.MethodPost, + fmt.Sprintf("http://%s/api/beta/sketches", dd.(*datadogReceiver).address), + io.NopCloser(bytes.NewReader(pb)), + ) + require.NoError(t, err, "Must not error when creating request") + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err, "Must not error performing request") + + body, err := io.ReadAll(resp.Body) + require.NoError(t, multierr.Combine(err, resp.Body.Close()), "Must not error when reading body") + require.Equal(t, "OK", string(body), "Expected response to be 'OK', got %s", string(body)) + require.Equal(t, http.StatusAccepted, resp.StatusCode) + + mds := sink.AllMetrics() + require.Len(t, mds, 1) + got := mds[0] + require.Equal(t, 1, got.ResourceMetrics().Len()) + metrics := got.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics() + assert.Equal(t, 1, metrics.Len()) + metric := metrics.At(0) + assert.Equal(t, pmetric.MetricTypeExponentialHistogram, metric.Type()) + assert.Equal(t, "Test1", metric.Name()) + assert.Equal(t, pmetric.AggregationTemporalityDelta, metric.ExponentialHistogram().AggregationTemporality()) + assert.Equal(t, pcommon.Timestamp(400*1_000_000_000), metric.ExponentialHistogram().DataPoints().At(0).Timestamp()) + assert.Equal(t, uint64(13), metric.ExponentialHistogram().DataPoints().At(0).Count()) + assert.Equal(t, 11.0, metric.ExponentialHistogram().DataPoints().At(0).Sum()) + assert.Equal(t, -6.0, metric.ExponentialHistogram().DataPoints().At(0).Min()) + assert.Equal(t, 6.0, metric.ExponentialHistogram().DataPoints().At(0).Max()) + assert.Equal(t, int32(5), metric.ExponentialHistogram().DataPoints().At(0).Scale()) + assert.Equal(t, uint64(55), metric.ExponentialHistogram().DataPoints().At(0).ZeroCount()) + assert.Equal(t, 91, metric.ExponentialHistogram().DataPoints().At(0).Positive().BucketCounts().Len()) + expectedPositiveInputBuckets := map[int]uint64{64: 26, 74: 131, 75: 36, 0: 101, 32: 239, 50: 16, 51: 50, 63: 17, 83: 209, 90: 154} + for k, v := range metric.ExponentialHistogram().DataPoints().At(0).Positive().BucketCounts().AsRaw() { + assert.Equal(t, expectedPositiveInputBuckets[k], v) + } + assert.Equal(t, 76, metric.ExponentialHistogram().DataPoints().At(0).Negative().BucketCounts().Len()) + expectedNegativeInputBuckets := map[int]uint64{74: 119, 75: 33, 63: 51, 64: 73, 50: 17, 51: 51, 32: 231, 0: 97} + for k, v := range metric.ExponentialHistogram().DataPoints().At(0).Negative().BucketCounts().AsRaw() { + assert.Equal(t, expectedNegativeInputBuckets[k], v) + } +} + func TestStats_EndToEnd(t *testing.T) { cfg := createDefaultConfig().(*Config) cfg.Endpoint = "localhost:0" // Using a randomly assigned address