[receiver/statsd] Add self-telemetry #31839

Closed
4 changes: 4 additions & 0 deletions receiver/statsdreceiver/internal/transport/mock_reporter.go
@@ -25,6 +25,10 @@ func NewMockReporter(expectedOnMetricsProcessedCalls int) *MockReporter {
 func (m *MockReporter) OnDebugf(_ string, _ ...any) {
 }
 
+func (m *MockReporter) RecordReceivedMetric(_ error) {}
+
+func (m *MockReporter) RecordFlushedMetrics(_ int64, _ error) {}
+
 // WaitAllOnMetricsProcessedCalls blocks until the number of expected calls
 // specified at creation of the reporter is completed.
 func (m *MockReporter) WaitAllOnMetricsProcessedCalls() {
3 changes: 3 additions & 0 deletions receiver/statsdreceiver/internal/transport/server.go
@@ -41,4 +41,7 @@ type Reporter interface {
 	OnDebugf(
 		template string,
 		args ...any)
+
+	RecordReceivedMetric(err error)
+	RecordFlushedMetrics(count int64, err error)
 }
8 changes: 6 additions & 2 deletions receiver/statsdreceiver/receiver.go
@@ -111,12 +111,16 @@ func (r *statsdReceiver) Start(ctx context.Context, _ component.Host) error {
 			for _, batch := range batchMetrics {
 				batchCtx := client.NewContext(ctx, batch.Info)
 
-				if err := r.Flush(batchCtx, batch.Metrics, r.nextConsumer); err != nil {
+				err := r.Flush(batchCtx, batch.Metrics, r.nextConsumer)
+				r.reporter.RecordFlushedMetrics(int64(batch.Metrics.DataPointCount()), err)
+				if err != nil {
 					r.reporter.OnDebugf("Error flushing metrics", zap.Error(err))
 				}
 			}
 		case metric := <-transferChan:
-			if err := r.parser.Aggregate(metric.Raw, metric.Addr); err != nil {
+			err := r.parser.Aggregate(metric.Raw, metric.Addr)
+			r.reporter.RecordReceivedMetric(err)
+			if err != nil {
 				r.reporter.OnDebugf("Error aggregating metric", zap.Error(err))
 			}
 		case <-ctx.Done():
94 changes: 89 additions & 5 deletions receiver/statsdreceiver/reporter.go
@@ -4,19 +4,29 @@
 package statsdreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver"
 
 import (
+	"context"
+
 	"go.opentelemetry.io/collector/receiver"
 	"go.opentelemetry.io/collector/receiver/receiverhelper"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/metric"
 	"go.uber.org/zap"
 
+	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/metadata"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport"
 )
 
 // reporter struct implements the transport.Reporter interface to give consistent
 // observability per Collector metric observability package.
 type reporter struct {
-	logger *zap.Logger
-	sugaredLogger *zap.SugaredLogger // Used for generic debug logging
-	obsrecv *receiverhelper.ObsReport
+	logger               *zap.Logger
+	sugaredLogger        *zap.SugaredLogger // Used for generic debug logging
+	obsrecv              *receiverhelper.ObsReport
+	staticAttrs          []attribute.KeyValue
+	acceptedMetricPoints metric.Int64Counter
+	refusedMetricPoints  metric.Int64Counter
+	flushedMetricPoints  metric.Int64Counter
+	flushCount           metric.Int64Counter
 }
 
 var _ transport.Reporter = (*reporter)(nil)
@@ -30,15 +40,89 @@ func newReporter(set receiver.CreateSettings) (transport.Reporter, error) {
 	if err != nil {
 		return nil, err
 	}
-	return &reporter{
+
+	r := &reporter{
 		logger:        set.Logger,
 		sugaredLogger: set.Logger.Sugar(),
 		obsrecv:       obsrecv,
-	}, nil
+		staticAttrs: []attribute.KeyValue{
+			attribute.String("receiver", set.ID.String()),
+		},
+	}
+
+	// See https://github.com/open-telemetry/opentelemetry-collector/blob/241334609fc47927b4a8533dfca28e0f65dad9fe/receiver/receiverhelper/obsreport.go#L104
+	// for the metric naming conventions
+
+	r.acceptedMetricPoints, err = metadata.Meter(set.TelemetrySettings).Int64Counter(
+		"receiver/accepted_metric_points",
— Review thread on "receiver/accepted_metric_points" —

@TylerHelmuth (Member): Does this receiver not use receiverhelper? If it does, it should already be producing this metric.

Member: I'm with @TylerHelmuth here. I've read the PR description but still don't understand why we cannot use receiverhelper.NewObsReport as other receivers do. Excessive tracing is a concern for any receiver, so the collector's tracing is expected to be sampled.

@sirianni (Contributor, Author):

> Does this receiver not use receiverhelper? If it does, it should already be producing this metric.

It does not use receiverhelper. It previously used the obsreport package, but that was dead code, as mentioned in issue #24278 (comment).

@sirianni (Contributor, Author), May 6, 2024:

> don't understand why we cannot use receiverhelper.NewObsReport as other receivers.

It's messy to use obsreport because the asynchronous nature of this receiver (the flush happening in the background) does not align with the StartMetricsOp() / EndMetricsOp() pattern of obsreport. This is likely why the obsreport integration was abandoned in #1670, when aggregation / async flush was implemented in this receiver.

> Excessive tracing is a concern for any receiver, so collector's tracing is expected to be sampled.

I'm more concerned with the performance overhead of the tracing instrumentation code than with the volume of traces. Since submitting this PR, we actually had to rewrite this instrumentation because even the overhead of incrementing the metric counter synchronously was causing statsd packet loss.

You can see in the attached flame graph that simply calling metric.(*int64Inst).Add() is significant overhead (12% of overall CPU).

[flame graph screenshots omitted]

(Sketches of the StartMetricsOp/EndMetricsOp pattern and of a lower-overhead counting approach follow the full diff below.)

— End of thread —
+		metric.WithDescription("Number of metric data points accepted"),
+		metric.WithUnit("1"),
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	r.refusedMetricPoints, err = metadata.Meter(set.TelemetrySettings).Int64Counter(
+		"receiver/refused_metric_points",
+		metric.WithDescription("Number of metric data points refused"),
+		metric.WithUnit("1"),
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	r.flushedMetricPoints, err = metadata.Meter(set.TelemetrySettings).Int64Counter(
+		"receiver/statsd.flushed_metric_points",
+		metric.WithDescription("Number of metric data points flushed"),
+		metric.WithUnit("1"),
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	r.flushCount, err = metadata.Meter(set.TelemetrySettings).Int64Counter(
+		"receiver/statsd.flushes",
+		metric.WithDescription("Number of flush operations"),
+		metric.WithUnit("1"),
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	return r, nil
 }
 
 func (r *reporter) OnDebugf(template string, args ...any) {
 	if r.logger.Check(zap.DebugLevel, "debug") != nil {
 		r.sugaredLogger.Debugf(template, args...)
 	}
 }
 
+func (r *reporter) RecordReceivedMetric(err error) {
+	if err == nil {
+		r.acceptedMetricPoints.Add(context.Background(), 1, metric.WithAttributes(r.staticAttrs...))
+	} else {
+		r.refusedMetricPoints.Add(context.Background(), 1, metric.WithAttributes(r.staticAttrs...))
+	}
+}
+
+func (r *reporter) RecordFlushedMetrics(count int64, err error) {
— Review thread on RecordFlushedMetrics —

@hardproblems (Contributor), Mar 19, 2024: Again, compared to the prometheusreceiver, I think the flushed metrics should be the receiver accepted/refused metrics. The parser should be the place where the instrumentation for received and parse-error counts is added.

@sirianni (Contributor, Author), Mar 20, 2024: Hmm, I'm not sure about this one. In our monitoring, we compare the received datapoint count to the sent-metrics count from statsd clients. See https://github.com/DataDog/datadog-go/blob/9d920252c04efe4e5f4d3a283002b72253a7da47/statsd/telemetry.go#L271

Is prometheus a valid comparison? In prometheus there is no aggregation in the receiver itself, so there's only one measure for a "received metric", right?

@sirianni (Contributor, Author): I'm OK with flipping the logic if that's the consensus. The naming becomes a bit harder, though 🙂

— End of thread —

+	var status string
+	if err == nil {
+		status = "success"
+	} else {
+		status = "failure"
+	}
+	r.flushedMetricPoints.Add(
+		context.Background(),
+		count,
+		metric.WithAttributes(r.staticAttrs...),
+		metric.WithAttributes(attribute.String("status", status)),
+	)
+	r.flushCount.Add(
+		context.Background(),
+		1,
+		metric.WithAttributes(r.staticAttrs...),
+		metric.WithAttributes(attribute.String("status", status)),
+	)
+}
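
For context on the receiverhelper discussion in the first review thread above: below is a minimal sketch (not code from the collector or from this PR; the function consumeWithObsReport is hypothetical) of the StartMetricsOp / EndMetricsOp pattern that sirianni argues does not fit this receiver. The pattern brackets one synchronous receive operation, whereas the statsd receiver aggregates a packet now and flushes its data points later in the background.

```go
package main

import (
	"context"

	"go.opentelemetry.io/collector/consumer"
	"go.opentelemetry.io/collector/pdata/pmetric"
	"go.opentelemetry.io/collector/receiver/receiverhelper"
)

// consumeWithObsReport (hypothetical) shows the synchronous bracket that
// receiverhelper.ObsReport expects around a single receive operation.
func consumeWithObsReport(ctx context.Context, obsrecv *receiverhelper.ObsReport, next consumer.Metrics, md pmetric.Metrics) error {
	// Opens the obsreport span/context for this operation.
	ctx = obsrecv.StartMetricsOp(ctx)
	// The "operation" itself: hand the metrics to the next consumer.
	err := next.ConsumeMetrics(ctx, md)
	// Records accepted/refused data points and closes the span.
	obsrecv.EndMetricsOp(ctx, "statsd", md.DataPointCount(), err)
	return err
}
```

In the statsd receiver there is no such single operation: a packet is parsed and aggregated long before its data points are flushed, so the accepted/refused and flushed accounting must be recorded at two separate points, which is what the reporter methods in this PR do.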
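The packet-loss remark in the same thread (synchronous counter increments showing up at 12% of CPU) points toward moving instrument calls off the hot path. The following is a hedged sketch of one way to do that; it assumes nothing about the rewrite sirianni mentions, and all names here are hypothetical: one atomic add per packet, with cumulative totals reported through asynchronous instruments once per collection cycle.

```go
package main

import (
	"context"
	"sync/atomic"

	"go.opentelemetry.io/otel/metric"
)

// hotPathCounter (hypothetical) accumulates counts with atomics so the
// per-packet path never touches an OTel instrument.
type hotPathCounter struct {
	accepted atomic.Int64
	refused  atomic.Int64
}

// recordReceived is the only thing called per packet: one atomic add.
func (c *hotPathCounter) recordReceived(err error) {
	if err == nil {
		c.accepted.Add(1)
	} else {
		c.refused.Add(1)
	}
}

// register wires the atomics to observable counters; the SDK invokes the
// callback once per collection cycle rather than once per packet. Observable
// counters expect cumulative totals, which Load() provides.
func (c *hotPathCounter) register(meter metric.Meter) error {
	accepted, err := meter.Int64ObservableCounter("receiver/accepted_metric_points")
	if err != nil {
		return err
	}
	refused, err := meter.Int64ObservableCounter("receiver/refused_metric_points")
	if err != nil {
		return err
	}
	_, err = meter.RegisterCallback(func(_ context.Context, o metric.Observer) error {
		o.ObserveInt64(accepted, c.accepted.Load())
		o.ObserveInt64(refused, c.refused.Load())
		return nil
	}, accepted, refused)
	return err
}
```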
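Finally, a hypothetical verification sketch (not part of this PR) showing how the counters added in reporter.go above could be read back through an in-memory reader. receivertest.NewNopCreateSettings and the sdkmetric manual reader are standard collector/OTel test utilities; the assertion at the end only checks that something was recorded.

```go
package statsdreceiver

import (
	"context"
	"testing"

	"go.opentelemetry.io/collector/receiver/receivertest"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

func TestReporterSelfTelemetry(t *testing.T) {
	// Route the reporter's meter provider to a manual reader we can drain.
	reader := sdkmetric.NewManualReader()
	set := receivertest.NewNopCreateSettings()
	set.TelemetrySettings.MeterProvider = sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))

	rep, err := newReporter(set)
	if err != nil {
		t.Fatal(err)
	}

	rep.RecordReceivedMetric(nil)     // counts toward receiver/accepted_metric_points
	rep.RecordFlushedMetrics(10, nil) // counts flushed points and one flush, status="success"

	var rm metricdata.ResourceMetrics
	if err := reader.Collect(context.Background(), &rm); err != nil {
		t.Fatal(err)
	}
	if len(rm.ScopeMetrics) == 0 {
		t.Fatal("expected self-telemetry metrics to be recorded")
	}
}
```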