Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: delta to cumulative prometheus #9919

Merged

Conversation

locmai
Copy link
Contributor

@locmai locmai commented May 10, 2022

Description:

An attempt to fix issue #4153 by adding the delta-to-cumulative code for the prometheus exporter.

Converting from delta points to cumulative point is inherently a stateful operation. To successfully translate, we need all incoming delta points to reach one destination which can keep the current counter state.

So for the prometheus exporter and the accumulateSum, the stateful operations are registered as maps in the lastValueAccumulator and the destination for the delta points to come is the accumulateSum function as well.

I handle the first drop case with unspecified aggregations and drop all non-monotonic delta aggregation next. So there will be no weird adding non-monotonic to the cumulative sum. Follow the step-by-step:

Upon receiving the first Delta point for a given counter we set up the following:

  • A new counter which stores the cumulative sum, set to the initial counter.
  • A start time that aligns with the start time of the first point.
  • A "last seen" time that aligns with the time of the first point.

So this has already been done with the line where we load the registeredMetrics, if none existed, then we created a new one exactly like defined.

v, ok := a.registeredMetrics.Load(signature)
if !ok {
m := createMetric(metric)
m.Sum().SetIsMonotonic(metric.Sum().IsMonotonic())
m.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityCumulative)
ip.CopyTo(m.Sum().DataPoints().AppendEmpty())
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, resourceAttrs: resourceAttrs, instrumentationLibrary: il, updated: now})
n++
continue
}

The next 3 conditions are for handling the follow-up data points:

If the next point aligns with the expected next-time window (see detecting delta restarts):

  • Update the "last seen" time to align with the time of the current point.
  • Add the current value to the cumulative counter
  • Output a new cumulative point with the original start time and current last seen time and count.

The lines I added did these things: update the last seen time (~ Timestamp) with the new value added.

// Delta-to-Cumulative
if doubleSum.AggregationTemporality() == pmetric.MetricAggregationTemporalityDelta {
ip.SetStartTimestamp(mv.value.Sum().DataPoints().At(0).StartTimestamp())
ip.SetIntVal(ip.IntVal() + mv.value.Sum().DataPoints().At(0).IntVal())
}

if the current point precedes the start time, then drop this point.

This has already been done by the line which has if ip.Timestamp().AsTime().Before(…) , then it’s gonna drop the point.

if ip.Timestamp().AsTime().Before(mv.value.Sum().DataPoints().At(0).Timestamp().AsTime()) {
// only keep datapoint with latest timestamp
continue
}

if the next point does NOT align with the expected next-time window, then reset the counter following the same steps performed as if the current point was the first point seen.

This last one is tricky because I couldn’t fine any definition of the “expected next-time windows”. As for the prometheus exporter, we have the expirationTime for the metrics that would automatically delete them from the registeredMetrics map, I’m hoping that could be a way to reset the cumulative sum back to the new start time. Then let Prometheus scrape handle the rest.

Testing:

I have a simple config.yaml to set the metrics pipeline up:

receivers:
  statsd:
    endpoint: "0.0.0.0:8127"
    aggregation_interval: 5s
    enable_metric_type: true
    is_monotonic_counter: true
    timer_histogram_mapping:
      - statsd_type: "histogram"
        observer_type: "summary"
      - statsd_type: "timing"
        observer_type: "summary"
processors:
  batch:
exporters:
  prometheus:
    endpoint: "0.0.0.0:9090"
    metric_expiration: 180m
  file:
    path: ./metrics.json
service:
  pipelines:
    metrics:
      receivers: [statsd]
      processors: [batch]
      exporters: [prometheus]
    metrics/file:
      receivers: [statsd]
      processors: [batch]
      exporters: [file]

Then I built from source into the binary and run it with the config.yaml from bin directory:

make otelcontribcol
otelcontribcol --config config.yaml

Then test with a few nc (to the statsd receiver) to see if they could accumulate the sum correctly:

echo "test.metric:10|c|#myKey:myVal" | nc -w 1 -u localhost 8127
echo "test.metric:10|c|#myKey:myVal" | nc -w 1 -u localhost 8127
echo "test.metric:20|c|#myKey:myVal" | nc -w 1 -u localhost 8127

Also tested with different type of statsd, gauge:

echo "test.gauge:20|g|#myKey:myVal" | nc -w 1 -u localhost 8127
echo "test.gauge:10|g|#myKey:myVal" | nc -w 1 -u localhost 8127
echo "test.gauge:30|g|#myKey:myVal" | nc -w 1 -u localhost 8127

Documentation: Updated CHANGELOG.md

Link to tracking Issue: #4153

Some discussion in the previous PR: #7156

@locmai locmai requested a review from a team May 10, 2022 17:50
@locmai locmai requested a review from Aneurysm9 as a code owner May 10, 2022 17:50
Signed-off-by: Loc Mai <[email protected]>
@locmai locmai force-pushed the feat/delta-to-cumulative-prometheus branch from f89d604 to e5a80db Compare May 10, 2022 18:12
locmai added 2 commits May 11, 2022 01:24
Signed-off-by: Loc Mai <[email protected]>
Signed-off-by: Loc Mai <[email protected]>
// Delta-to-Cumulative
if doubleSum.AggregationTemporality() == pmetric.MetricAggregationTemporalityDelta {
ip.SetStartTimestamp(mv.value.Sum().DataPoints().At(0).StartTimestamp())
ip.SetIntVal(ip.IntVal() + mv.value.Sum().DataPoints().At(0).IntVal())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this line need to support float64-valued counters separately?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah the Prometheus counter is float64-based, so I think we should do that.

Should it be:

ip.SetIntVal(ip.IntVal() + mv.value.Sum().DataPoints().At(0).IntVal())
ip.SetDoubleVal(ip.DoubleVal() + mv.value.Sum().DataPoints().At(0).DoubleVal())

Or we must parse the IntVal to the DoubleVal type then add them up?

Or just do a simple if/else to check the type of the current delta value/last cumulative value was Int/Double?

Regarding to this, let me find if any other receiver produced the delta counter as StatsD receiver will always parse it's counters to Int.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see -- what you have is correct for statsd to PRW, but I was thinking of other OTLP senders (e.g., an SDK) configured for delta temporality that might use floating point. There is a pdata.MetricValueType that indicates what the incoming point has. I would say that since PRW exports floating points always, possibly the best solution is to convert points from integer (if present) to double somewhere above the accumulator, so that the stored point is always a floating point.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Roger. Working on this :loading:

exporter/prometheusexporter/accumulator.go Show resolved Hide resolved
exporter/prometheusexporter/accumulator.go Outdated Show resolved Hide resolved
@locmai locmai requested a review from dmitryax as a code owner May 12, 2022 09:43
Comment on lines +215 to +219
switch ip.ValueType() {
case pmetric.NumberDataPointValueTypeInt:
ip.SetIntVal(ip.IntVal() + mv.value.Sum().DataPoints().At(0).IntVal())
case pmetric.NumberDataPointValueTypeDouble:
ip.SetDoubleVal(ip.DoubleVal() + mv.value.Sum().DataPoints().At(0).DoubleVal())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The final value will be converted into float64 at the convertSum():

switch ip.ValueType() {
case pmetric.NumberDataPointValueTypeInt:
value = float64(ip.IntVal())
case pmetric.NumberDataPointValueTypeDouble:
value = ip.DoubleVal()
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a client were to mix number types, something irregular happens here, e.g., if the stored point has a floating point value and the new point has an integer value. I'm OK ignoring this case, since there are already caveats about what we're doing here. Instead of changing the code for corner cases, I recommend documenting what this will do.

As it stands, your change means that a Prometheus exporter can aggregate a single stream of delta temporality counter data into a single cumulative metric. This will be the case when there is one statsd receiver. If a single OTLP SDK exports delta temporality to this OTC, a single delta temporality counter metric will be correctly aggregated here.

However, if multiple statsd receivers or OTel SDKs generate the same stream using delta temporality, this code will not be able to correctly aggregate; the same is true if one stream contains mixed number types, but that hardly seems important given this other limitation.

To overcome the "Single stream" limitation, the exporter can either:

  1. Blindly apply all deltas, regardless of timing. As long as all the producers behave correctly, there is little opportunity for incorrectness except due to replayed data.
  2. Maintain a map of start-times already applied, possibly with resource information. If the same resource updates a cumulative metric repeatedly, that's when the start_time==end_time test adds correctness.

I think that this change is useful even with the caveat that it only supports one stream, for now. OTOH, blindly applying all deltas isn't very wrong and is very simple. What do you think?

Copy link
Contributor Author

@locmai locmai May 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the specification of the single-writer principle:

Multiple writers for a metric stream is considered an error state, or misbehaving system. Receivers SHOULD presume a single writer was intended and eliminate overlap / deduplicate.

If I understood that correctly, I would consider that case is an error in the configuration and it should be the responsible of the receivers to handle that probably.

In this case, I would prefer to assume that all deltas comes to the exporter would be from a single stream and with correct timing.

Instead of blindly applying all the deltas, we could handle it:

OpenTelemetry collectors SHOULD export telemetry when they observe overlapping points in data streams, so that the user can monitor for erroneous configurations.

So if any data points fall into the overlapped case, we report it?

Copy link
Contributor

@jmacd jmacd May 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like your reasoning.
I was under the impression that the exporter at this point does not keep the Resource to distinguish the sender of the stream; it may be that two senders with different resources are producing the same metric and attributes--that's the case I was thinking of. In any case, you've done a good thing here and I don't want to block it, let's document what it does and move on!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice nice nice! thank you for helping me out with this one!

Hi @Aneurysm9 , could you also take a look? This is for the very old one that we have discussed before to fix the dropping metrics issue.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that documentation of the expected behavior here would be helpful. Also some tests that exercise this capability.

As for the case of two producers of the same metric with different resources, we do have the resource attributes available at this point but don't seem to include them in the timeseries signature. Would doing that remove the concern about improper accumulation? What knock-on effects might that have?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By documenting the expected behavior, I believe I should make another PR to update the Prometheus data model specification here for this change?

With the resource attributes regardless the single write principle, we could identify the 2 producers and therefore could handle the accumulation probably for overlapping case or when the nextDataPoint.startTimestamp > lastDataPoint.Timestamp. I see no side-effect yet.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding the spec update here: open-telemetry/opentelemetry-specification#2570

I'll start working the tests this week soon.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi folks @jmacd @Aneurysm9 , I just updated the specification (merged open-telemetry/opentelemetry-specification#2570) and the tests for this PR.

Bumped the coverage from 97.7% of statements to 99.3% of statements.

The original TestAccumulateDeltaAggregation simply test the cases where metrics got dropped for both detal aggregation for Sum and Histogram (since they are non-monotonic by default) so I moved some of them to a new one TestAccumulateDroppedMetrics alongside with the MetricAggregationTemporalityUnspecified cases.

@jmacd
Copy link
Contributor

jmacd commented Jun 2, 2022

I'm still enthusiastic about this PR. I would like to see the same functional change in the prometheusremotewriteexporter.

@leocavalcante
Copy link

Is this working already?

I've the following at 0.82.0:

receivers:
  statsd:
    endpoint: 0.0.0.0:8125
    aggregation_interval: 5s
    is_monotonic_counter: true

exporters:
  logging:
    verbosity: detailed
  prometheus:
    endpoint: 0.0.0.0:9090

service:
  telemetry:
    logs:
      level: "debug"
  pipelines:
    metrics:
      receivers: [statsd]
      exporters: [logging, prometheus]

When I send 3 delta counters do Statsd:

foo:1|c
foo:1|c
foo:1|c

I expect to see it accumulated at /metrics but I see 1 instead of 3:

# HELP foo 
# TYPE foo counter
foo 1

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
comp:prometheus Prometheus related issues
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants