Skip to content

Commit

Permalink
Revert "[exporter/awsemfexporter]Split EMF log with larger than 100 b…
Browse files Browse the repository at this point in the history
…uckets." (#36763)

Reverts #36336


leads to test failures, see
#36727
  • Loading branch information
songy23 authored Dec 10, 2024
1 parent 6c82b0c commit ffd031a
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 871 deletions.
27 changes: 0 additions & 27 deletions .chloggen/split-emf-log-when-buckets-larger-than-100.yaml

This file was deleted.

221 changes: 42 additions & 179 deletions exporter/awsemfexporter/datapoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,33 +109,6 @@ type summaryMetricEntry struct {
count uint64
}

// dataPointSplit is a structure used to manage segments of data points split from a histogram.
// It is not safe for concurrent use.
type dataPointSplit struct {
cWMetricHistogram *cWMetricHistogram
length int
capacity int
}

func (split *dataPointSplit) isFull() bool {
return split.length >= split.capacity
}

func (split *dataPointSplit) setMax(maxVal float64) {
split.cWMetricHistogram.Max = maxVal
}

func (split *dataPointSplit) setMin(minVal float64) {
split.cWMetricHistogram.Min = minVal
}

func (split *dataPointSplit) appendMetricData(metricVal float64, count uint64) {
split.cWMetricHistogram.Values = append(split.cWMetricHistogram.Values, metricVal)
split.cWMetricHistogram.Counts = append(split.cWMetricHistogram.Counts, float64(count))
split.length++
split.cWMetricHistogram.Count += count
}

// CalculateDeltaDatapoints retrieves the NumberDataPoint at the given index and performs rate/delta calculation if necessary.
func (dps numberDataPointSlice) CalculateDeltaDatapoints(i int, instrumentationScopeName string, _ bool, calculators *emfCalculators) ([]dataPoint, bool) {
metric := dps.NumberDataPointSlice.At(i)
Expand Down Expand Up @@ -220,195 +193,85 @@ func (dps histogramDataPointSlice) IsStaleNaNInf(i int) (bool, pcommon.Map) {
}

// CalculateDeltaDatapoints retrieves the ExponentialHistogramDataPoint at the given index.
// As CloudWatch EMF logs allows in maximum of 100 target members, the exponential histogram metric are split into multiple data points as needed,
// each containing a maximum of 100 buckets, to comply with CloudWatch EMF log constraints.
// Note that the number of values and counts in each split may not be less than splitThreshold as we are only adding non-zero bucket counts.
//
// For each split data point:
// - Min and Max values are recalculated based on the bucket boundary within that specific split.
// - Sum is only assigned to the first split to ensure the total sum of the datapoints after aggregation is correct.
// - Count is accumulated based on the bucket counts within each split.
func (dps exponentialHistogramDataPointSlice) CalculateDeltaDatapoints(idx int, instrumentationScopeName string, _ bool, _ *emfCalculators) ([]dataPoint, bool) {
metric := dps.ExponentialHistogramDataPointSlice.At(idx)

const splitThreshold = 100
currentBucketIndex := 0
currentPositiveIndex := metric.Positive().BucketCounts().Len() - 1
currentZeroIndex := 0
currentNegativeIndex := 0
var datapoints []dataPoint
totalBucketLen := metric.Positive().BucketCounts().Len() + metric.Negative().BucketCounts().Len()
if metric.ZeroCount() > 0 {
totalBucketLen++
}

for currentBucketIndex < totalBucketLen {
// Create a new dataPointSplit with a capacity of up to splitThreshold buckets
capacity := min(splitThreshold, totalBucketLen-currentBucketIndex)

sum := 0.0
// Only assign `Sum` if this is the first split to make sure the total sum of the datapoints after aggregation is correct.
if currentBucketIndex == 0 {
sum = metric.Sum()
}

split := dataPointSplit{
cWMetricHistogram: &cWMetricHistogram{
Values: []float64{},
Counts: []float64{},
Max: metric.Max(),
Min: metric.Min(),
Count: 0,
Sum: sum,
},
length: 0,
capacity: capacity,
}

// Set collect values from positive buckets and save into split.
currentBucketIndex, currentPositiveIndex = collectDatapointsWithPositiveBuckets(&split, metric, currentBucketIndex, currentPositiveIndex)
// Set collect values from zero buckets and save into split.
currentBucketIndex, currentZeroIndex = collectDatapointsWithZeroBucket(&split, metric, currentBucketIndex, currentZeroIndex)
// Set collect values from negative buckets and save into split.
currentBucketIndex, currentNegativeIndex = collectDatapointsWithNegativeBuckets(&split, metric, currentBucketIndex, currentNegativeIndex)

if split.length > 0 {
// Add the current split to the datapoints list
datapoints = append(datapoints, dataPoint{
name: dps.metricName,
value: split.cWMetricHistogram,
labels: createLabels(metric.Attributes(), instrumentationScopeName),
timestampMs: unixNanoToMilliseconds(metric.Timestamp()),
})
}
}

if len(datapoints) == 0 {
return []dataPoint{{
name: dps.metricName,
value: &cWMetricHistogram{
Values: []float64{},
Counts: []float64{},
Count: metric.Count(),
Sum: metric.Sum(),
Max: metric.Max(),
Min: metric.Min(),
},
labels: createLabels(metric.Attributes(), instrumentationScopeName),
timestampMs: unixNanoToMilliseconds(metric.Timestamp()),
}}, true
}

// Override the min and max values of the first and last splits with the raw data of the metric.
datapoints[0].value.(*cWMetricHistogram).Max = metric.Max()
datapoints[len(datapoints)-1].value.(*cWMetricHistogram).Min = metric.Min()

return datapoints, true
}

func collectDatapointsWithPositiveBuckets(split *dataPointSplit, metric pmetric.ExponentialHistogramDataPoint, currentBucketIndex int, currentPositiveIndex int) (int, int) {
if split.isFull() || currentPositiveIndex < 0 {
return currentBucketIndex, currentPositiveIndex
}

scale := metric.Scale()
base := math.Pow(2, math.Pow(2, float64(-scale)))
arrayValues := []float64{}
arrayCounts := []float64{}
var bucketBegin float64
var bucketEnd float64

// Set mid-point of positive buckets in values/counts array.
positiveBuckets := metric.Positive()
positiveOffset := positiveBuckets.Offset()
positiveBucketCounts := positiveBuckets.BucketCounts()
bucketBegin := 0.0
bucketEnd := 0.0

for !split.isFull() && currentPositiveIndex >= 0 {
index := currentPositiveIndex + int(positiveOffset)
if bucketEnd == 0 {
bucketEnd = math.Pow(base, float64(index+1))
bucketBegin = 0
bucketEnd = 0
for i := 0; i < positiveBucketCounts.Len(); i++ {
index := i + int(positiveOffset)
if bucketBegin == 0 {
bucketBegin = math.Pow(base, float64(index))
} else {
bucketEnd = bucketBegin
bucketBegin = bucketEnd
}
bucketBegin = math.Pow(base, float64(index))
bucketEnd = math.Pow(base, float64(index+1))
metricVal := (bucketBegin + bucketEnd) / 2
count := positiveBucketCounts.At(currentPositiveIndex)
count := positiveBucketCounts.At(i)
if count > 0 {
split.appendMetricData(metricVal, count)

// The value are append from high to low, set Max from the first bucket (highest value) and Min from the last bucket (lowest value)
if split.length == 1 {
split.setMax(bucketEnd)
}
if split.isFull() {
split.setMin(bucketBegin)
}
arrayValues = append(arrayValues, metricVal)
arrayCounts = append(arrayCounts, float64(count))
}
currentBucketIndex++
currentPositiveIndex--
}

return currentBucketIndex, currentPositiveIndex
}

func collectDatapointsWithZeroBucket(split *dataPointSplit, metric pmetric.ExponentialHistogramDataPoint, currentBucketIndex int, currentZeroIndex int) (int, int) {
if metric.ZeroCount() > 0 && !split.isFull() && currentZeroIndex == 0 {
split.appendMetricData(0, metric.ZeroCount())

// The value are append from high to low, set Max from the first bucket (highest value) and Min from the last bucket (lowest value)
if split.length == 1 {
split.setMax(0)
}
if split.isFull() {
split.setMin(0)
}
currentZeroIndex++
currentBucketIndex++
// Set count of zero bucket in values/counts array.
if metric.ZeroCount() > 0 {
arrayValues = append(arrayValues, 0)
arrayCounts = append(arrayCounts, float64(metric.ZeroCount()))
}

return currentBucketIndex, currentZeroIndex
}

func collectDatapointsWithNegativeBuckets(split *dataPointSplit, metric pmetric.ExponentialHistogramDataPoint, currentBucketIndex int, currentNegativeIndex int) (int, int) {
// Set mid-point of negative buckets in values/counts array.
// According to metrics spec, the value in histogram is expected to be non-negative.
// https://opentelemetry.io/docs/specs/otel/metrics/api/#histogram
// However, the negative support is defined in metrics data model.
// https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram
// The negative is also supported but only verified with unit test.
if split.isFull() || currentNegativeIndex >= metric.Negative().BucketCounts().Len() {
return currentBucketIndex, currentNegativeIndex
}

scale := metric.Scale()
base := math.Pow(2, math.Pow(2, float64(-scale)))
negativeBuckets := metric.Negative()
negativeOffset := negativeBuckets.Offset()
negativeBucketCounts := negativeBuckets.BucketCounts()
bucketBegin := 0.0
bucketEnd := 0.0

for !split.isFull() && currentNegativeIndex < metric.Negative().BucketCounts().Len() {
index := currentNegativeIndex + int(negativeOffset)
bucketBegin = 0
bucketEnd = 0
for i := 0; i < negativeBucketCounts.Len(); i++ {
index := i + int(negativeOffset)
if bucketEnd == 0 {
bucketEnd = -math.Pow(base, float64(index))
} else {
bucketEnd = bucketBegin
}
bucketBegin = -math.Pow(base, float64(index+1))
metricVal := (bucketBegin + bucketEnd) / 2
count := negativeBucketCounts.At(currentNegativeIndex)
count := negativeBucketCounts.At(i)
if count > 0 {
split.appendMetricData(metricVal, count)

// The value are append from high to low, set Max from the first bucket (highest value) and Min from the last bucket (lowest value)
if split.length == 1 {
split.setMax(bucketEnd)
}
if split.isFull() {
split.setMin(bucketBegin)
}
arrayValues = append(arrayValues, metricVal)
arrayCounts = append(arrayCounts, float64(count))
}
currentBucketIndex++
currentNegativeIndex++
}

return currentBucketIndex, currentNegativeIndex
return []dataPoint{{
name: dps.metricName,
value: &cWMetricHistogram{
Values: arrayValues,
Counts: arrayCounts,
Count: metric.Count(),
Sum: metric.Sum(),
Max: metric.Max(),
Min: metric.Min(),
},
labels: createLabels(metric.Attributes(), instrumentationScopeName),
timestampMs: unixNanoToMilliseconds(metric.Timestamp()),
}}, true
}

func (dps exponentialHistogramDataPointSlice) IsStaleNaNInf(i int) (bool, pcommon.Map) {
Expand Down
Loading

0 comments on commit ffd031a

Please sign in to comment.