Skip to content

Commit

Permalink
[receiver/datadog] Add support for sketches (#34662)
Browse files Browse the repository at this point in the history
**Description:**
This PR adds support for translating Datadog sketches into Exponential
Histograms.

Follow up of #33631, #33957 and #34180.

The full version of the code can be found in the
`cedwards/datadog-metrics-receiver-full` branch, or in Grafana Alloy:
https://github.com/grafana/alloy/tree/main/internal/etc/datadogreceiver

**Link to tracking Issue:** 
#18278 

**Testing:** 
Unit tests, as well as an end-to-end test, have been added.

---------

Signed-off-by: Federico Torres <[email protected]>
Signed-off-by: György Krajcsovits <[email protected]>
Co-authored-by: Federico Torres <[email protected]>
Co-authored-by: György Krajcsovits <[email protected]>
  • Loading branch information
3 people authored Sep 9, 2024
1 parent 45ffc03 commit 5e26464
Show file tree
Hide file tree
Showing 6 changed files with 1,163 additions and 3 deletions.
27 changes: 27 additions & 0 deletions .chloggen/datadogreceiver-sketches.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: datadogreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: add support for sketch metrics in Datadog receiver

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [18278]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
294 changes: 294 additions & 0 deletions receiver/datadogreceiver/internal/translator/sketches.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,294 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package translator // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver/internal/translator"

import (
"fmt"
"io"
"math"
"net/http"
"sort"
"time"

"github.com/DataDog/agent-payload/v5/gogen"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
)

const (
// The relativeAccuracy (also called epsilon or eps) comes from DDSketch's logarithmic mapping, which is used for sketches
// in the Datadog agent. The Datadog agent uses the default value from opentelemetry-go-mapping configuration
// See:
// https://github.com/DataDog/opentelemetry-mapping-go/blob/4a6d530273741c84fe2d8f76c55c514cd5eb7488/pkg/quantile/config.go#L15
relativeAccuracy = 1.0 / 128

// The gamma value comes from the default values of the epsilon/relative accuracy from opentelemetry-go-mapping. This value is used for
// finding the lower boundary of the bucket at a specific index
// See:
// https://github.com/DataDog/opentelemetry-mapping-go/blob/4a6d530273741c84fe2d8f76c55c514cd5eb7488/pkg/quantile/config.go#L138
gamma = 1 + 2*relativeAccuracy

// Since the default bucket factor for Sketches (gamma value) is 1.015625, this corresponds to a scale between 5 (2^2^-5=1.0219)
// and 6 (2^2^-6=1.01088928605). However, the lower resolution of 5 will produce larger buckets which allows for easier mapping
scale = 5

// The agentSketchOffset value comes from the following calculation:
// min = 1e-9
// emin = math.Floor((math.Log(min)/math.Log1p(2*relativeAccuracy))
// offset = -emin + 1
// The resulting value is 1338.
// See: https://github.com/DataDog/opentelemetry-mapping-go/blob/4a6d530273741c84fe2d8f76c55c514cd5eb7488/pkg/quantile/config.go#L154
// (Note: in Datadog's code, it is referred to as 'bias')
agentSketchOffset int32 = 1338

// The max limit for the index of a sketch bucket
// See https://github.com/DataDog/opentelemetry-mapping-go/blob/00c3f838161a00de395d7d0ed44d967ac71e43b9/pkg/quantile/ddsketch.go#L21
// and https://github.com/DataDog/opentelemetry-mapping-go/blob/00c3f838161a00de395d7d0ed44d967ac71e43b9/pkg/quantile/ddsketch.go#L127
maxIndex = math.MaxInt16
)

// Unmarshal the sketch payload, which contains the underlying Dogsketch structure used for the translation
func (mt *MetricsTranslator) HandleSketchesPayload(req *http.Request) (sp []gogen.SketchPayload_Sketch, err error) {
buf := GetBuffer()
defer PutBuffer(buf)
if _, err := io.Copy(buf, req.Body); err != nil {
return sp, err
}

pl := new(gogen.SketchPayload)
if err := pl.Unmarshal(buf.Bytes()); err != nil {
return sp, err
}

return pl.GetSketches(), nil
}

func (mt *MetricsTranslator) TranslateSketches(sketches []gogen.SketchPayload_Sketch) pmetric.Metrics {
bt := newBatcher()
bt.Metrics = pmetric.NewMetrics()

for _, sketch := range sketches {
dimensions := parseSeriesProperties(sketch.Metric, "sketch", sketch.Tags, sketch.Host, mt.buildInfo.Version, mt.stringPool)
metric, metricID := bt.Lookup(dimensions)
metric.ExponentialHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
dps := metric.ExponentialHistogram().DataPoints()

dps.EnsureCapacity(len(sketch.Dogsketches))

// The dogsketches field of the payload contains the sketch data
for i := range sketch.Dogsketches {
dp := dps.AppendEmpty()

err := sketchToDatapoint(sketch.Dogsketches[i], dp, dimensions.dpAttrs)
if err != nil {
// If a sketch is invalid, remove this datapoint
metric.ExponentialHistogram().DataPoints().RemoveIf(func(dp pmetric.ExponentialHistogramDataPoint) bool {
if dp.Positive().BucketCounts().Len() == 0 && dp.Negative().BucketCounts().Len() == 0 {
return true
}
return false
})
continue
}
stream := identity.OfStream(metricID, dp)
if ts, ok := mt.streamHasTimestamp(stream); ok {
dp.SetStartTimestamp(ts)
}
mt.updateLastTsForStream(stream, dp.Timestamp())
}
}

return bt.Metrics
}

func sketchToDatapoint(sketch gogen.SketchPayload_Sketch_Dogsketch, dp pmetric.ExponentialHistogramDataPoint, attributes pcommon.Map) error {
dp.SetTimestamp(pcommon.Timestamp(sketch.Ts * time.Second.Nanoseconds())) // OTel uses nanoseconds, while Datadog uses seconds

dp.SetCount(uint64(sketch.Cnt))
dp.SetSum(sketch.Sum)
dp.SetMin(sketch.Min)
dp.SetMax(sketch.Max)
dp.SetScale(scale)
dp.SetZeroThreshold(math.Exp(float64(1-agentSketchOffset) / (1 / math.Log(gamma)))) // See https://github.com/DataDog/sketches-go/blob/7546f8f95179bb41d334d35faa281bfe97812a86/ddsketch/mapping/logarithmic_mapping.go#L48

attributes.CopyTo(dp.Attributes())

negativeBuckets, positiveBuckets, zeroCount, err := mapSketchBucketsToHistogramBuckets(sketch.K, sketch.N)
if err != nil {
return err
}

dp.SetZeroCount(zeroCount)

convertBucketLayout(positiveBuckets, dp.Positive())
convertBucketLayout(negativeBuckets, dp.Negative())

return nil
}

// mapSketchBucketsToHistogramBuckets attempts to map the counts in each Sketch bucket to the closest equivalent Exponential Histogram
// bucket(s). It works by first calculating an Exponential Histogram key that corresponds most closely with the Sketch key (using the lower
// bound of the sketch bucket the key corresponds to), calculates differences in the range of the Sketch bucket and exponential histogram bucket,
// and distributes the count to the corresponding bucket, and the bucket(s) after it, based on the proportion of overlap between the
// exponential histogram buckets and the Sketch bucket. Note that the Sketch buckets are not separated into positive and negative buckets, but exponential
// histograms store positive and negative buckets separately. Negative buckets in exponential histograms are mapped in the same way as positive buckets.
// Note that negative indices in exponential histograms do not necessarily correspond to negative values; they correspond with values between 0 and 1,
// on either the negative or positive side
func mapSketchBucketsToHistogramBuckets(sketchKeys []int32, sketchCounts []uint32) (map[int]uint64, map[int]uint64, uint64, error) {
var zeroCount uint64

var positiveBuckets = make(map[int]uint64)
var negativeBuckets = make(map[int]uint64)

// The data format for the sketch received from the sketch payload does not have separate positive and negative buckets,
// and instead just uses a single list of sketch keys that are in order by increasing bucket index, starting with negative indices,
// which correspond to negative buckets
for i := range sketchKeys {
if sketchKeys[i] == 0 { // A sketch key of 0 corresponds to the zero bucket
zeroCount += uint64(sketchCounts[i])
continue
}
if sketchKeys[i] >= maxIndex {
// This should not happen, as sketches that contain bucket(s) with an index higher than the max
// limit should have already been discarded. However, if there happens to be an index > maxIndex,
// it can cause an infinite loop within the below inner for loop on some operating systems. Therefore,
// throw an error for sketches that have an index above the max limit
return nil, nil, 0, fmt.Errorf("Sketch contains bucket index %d which exceeds maximum supported index value %d", sketchKeys[i], maxIndex)
}

// The approach here is to use the Datadog sketch index's lower bucket boundary to find the
// OTel exponential histogram bucket that with the closest range to the sketch bucket. Then,
// the buckets before and after that bucket are also checked for overlap with the sketch bucket.
// A count proportional to the intersection of the sketch bucket with the OTel bucket(s) is then
// added to the OTel bucket(s). After looping through all possible buckets that are within the Sketch
// bucket range, the bucket with the highest proportion of overlap is given the remaining count
sketchLowerBound, sketchUpperBound := getSketchBounds(sketchKeys[i])
sketchBucketSize := sketchUpperBound - sketchLowerBound
histogramKey := sketchLowerBoundToHistogramIndex(sketchLowerBound)
highestCountProportion := 0.0
highestCountIdx := 0
targetBucketCount := uint64(sketchCounts[i])
var currentAssignedCount uint64

//TODO: look into better algorithms for applying fractional counts
for outIndex := histogramKey; histogramLowerBound(outIndex) < sketchUpperBound; outIndex++ {
histogramLowerBound, histogramUpperBound := getHistogramBounds(outIndex)
lowerIntersection := math.Max(histogramLowerBound, sketchLowerBound)
higherIntersection := math.Min(histogramUpperBound, sketchUpperBound)

intersectionSize := higherIntersection - lowerIntersection
proportion := intersectionSize / sketchBucketSize
if proportion <= 0 {
continue // In this case, the bucket does not overlap with the sketch bucket, so continue to the next bucket
}
if proportion > highestCountProportion {
highestCountProportion = proportion
highestCountIdx = outIndex
}
// OTel exponential histograms only support integer bucket counts, so rounding needs to be done here
roundedCount := uint64(proportion * float64(sketchCounts[i]))
if sketchKeys[i] < 0 {
negativeBuckets[outIndex] += roundedCount
} else {
positiveBuckets[outIndex] += roundedCount
}
currentAssignedCount += roundedCount
}
// Add the difference between the original sketch bucket's count and the total count that has been
// added to the matching OTel bucket(s) thus far to the bucket that had the highest proportion of
// overlap between the original sketch bucket and the corresponding exponential histogram buckets
if highestCountProportion > 0 {
additionalCount := targetBucketCount - currentAssignedCount
if sketchKeys[i] < 0 {
negativeBuckets[highestCountIdx] += additionalCount
} else {
positiveBuckets[highestCountIdx] += additionalCount
}
}

}

return negativeBuckets, positiveBuckets, zeroCount, nil
}

// convertBucketLayout populates the count for positive or negative buckets in the resulting OTel
// exponential histogram structure. The bucket layout is dense and consists of an offset, which is the
// index of the first populated bucket, and a list of counts, which correspond to the counts at the offset
// bucket's index, and the counts of each bucket after. Unpopulated/empty buckets must be represented with
// a count of 0. After assigning bucket counts, it sets the offset for the bucket layout
func convertBucketLayout(inputBuckets map[int]uint64, outputBuckets pmetric.ExponentialHistogramDataPointBuckets) {
if len(inputBuckets) == 0 {
return
}
bucketIdxs := make([]int, 0, len(inputBuckets))
for k := range inputBuckets {
bucketIdxs = append(bucketIdxs, k)
}
sort.Ints(bucketIdxs)

bucketsSize := bucketIdxs[len(bucketIdxs)-1] - bucketIdxs[0] + 1 // find total number of buckets needed
outputBuckets.BucketCounts().EnsureCapacity(bucketsSize)
outputBuckets.BucketCounts().Append(make([]uint64, bucketsSize)...)

offset := bucketIdxs[0]
outputBuckets.SetOffset(int32(offset))

for _, idx := range bucketIdxs {
delta := idx - offset
outputBuckets.BucketCounts().SetAt(delta, inputBuckets[idx])
}
}

// getSketchBounds calculates the lower and upper bounds of a sketch bucket based on the index of the bucket.
// This is based on sketch buckets placing values in bucket so that γ^k <= v < γ^(k+1)
// See https://github.com/DataDog/datadog-agent/blob/0ada7a97fed6727838a6f4d9c87123d2aafde735/pkg/quantile/config.go#L83
// and https://github.com/DataDog/sketches-go/blob/8a1961cf57f80fbbe26e7283464fcc01ebf17d5c/ddsketch/ddsketch.go#L468
func getSketchBounds(index int32) (float64, float64) {
if index < 0 {
index = -index
}
return sketchLowerBound(index), sketchLowerBound(index + 1)
}

// sketchLowerBound calculates the lower bound of a sketch bucket based on the index of the bucket.
// It uses the index offset and multiplier (represented by (1 / math.Log(gamma))). The logic behind this
// is based on the DD agent using logarithmic mapping for definition DD agent sketches
// See:
// https://github.com/DataDog/opentelemetry-mapping-go/blob/4a6d530273741c84fe2d8f76c55c514cd5eb7488/pkg/quantile/config.go#L54
// https://github.com/DataDog/sketches-go/blob/8a1961cf57f80fbbe26e7283464fcc01ebf17d5c/ddsketch/mapping/logarithmic_mapping.go#L39
func sketchLowerBound(index int32) float64 {
if index < 0 {
index = -index
}
return math.Exp((float64(index-agentSketchOffset) / (1 / math.Log(gamma))))
}

// getHistogramBounds returns the lower and upper boundaries of the histogram bucket that
// corresponds to the specified bucket index
func getHistogramBounds(histIndex int) (float64, float64) {
return histogramLowerBound(histIndex), histogramLowerBound(histIndex + 1)
}

// This equation for finding the lower bound of the exponential histogram bucket
// Based on: https://github.com/open-telemetry/opentelemetry-go/blob/3a72c5ea94bf843beeaa044b0dda2ce4d627bb7b/sdk/metric/internal/aggregate/exponential_histogram.go#L122
// See also: https://github.com/open-telemetry/opentelemetry-go/blob/3a72c5ea94bf843beeaa044b0dda2ce4d627bb7b/sdk/metric/internal/aggregate/exponential_histogram.go#L139
func histogramLowerBound(histIndex int) float64 {
inverseFactor := math.Ldexp(math.Ln2, -scale)
return math.Exp(float64(histIndex) * inverseFactor)
}

// sketchLowerBoundToHistogramIndex takes the lower boundary of a sketch bucket and computes the
// closest equivalent exponential histogram index that corresponds to an exponential histogram
// bucket that has a range covering that lower bound
// See: https://opentelemetry.io/docs/specs/otel/metrics/data-model/#all-scales-use-the-logarithm-function
func sketchLowerBoundToHistogramIndex(value float64) int {
if frac, exp := math.Frexp(value); frac == 0.5 {
return ((exp - 1) << scale) - 1
}
scaleFactor := math.Ldexp(math.Log2E, scale)

return int(math.Floor(math.Log(value) * scaleFactor))
}
Loading

0 comments on commit 5e26464

Please sign in to comment.