From 24d010ff66b96ea5ebbd06e55b798c63cf01d074 Mon Sep 17 00:00:00 2001 From: Eric Sirianni Date: Sat, 9 Mar 2024 16:01:13 -0500 Subject: [PATCH 1/3] Add accepted/refused points metric to statsdreceiver --- .../internal/transport/mock_reporter.go | 4 ++ .../internal/transport/server.go | 3 ++ receiver/statsdreceiver/receiver.go | 6 ++- receiver/statsdreceiver/reporter.go | 53 +++++++++++++++++-- 4 files changed, 60 insertions(+), 6 deletions(-) diff --git a/receiver/statsdreceiver/internal/transport/mock_reporter.go b/receiver/statsdreceiver/internal/transport/mock_reporter.go index c7a0947309b5..273b6ec3465d 100644 --- a/receiver/statsdreceiver/internal/transport/mock_reporter.go +++ b/receiver/statsdreceiver/internal/transport/mock_reporter.go @@ -25,6 +25,10 @@ func NewMockReporter(expectedOnMetricsProcessedCalls int) *MockReporter { func (m *MockReporter) OnDebugf(_ string, _ ...any) { } +func (m *MockReporter) RecordAcceptedMetric() {} + +func (m *MockReporter) RecordRefusedMetric() {} + // WaitAllOnMetricsProcessedCalls blocks until the number of expected calls // specified at creation of the reporter is completed. func (m *MockReporter) WaitAllOnMetricsProcessedCalls() { diff --git a/receiver/statsdreceiver/internal/transport/server.go b/receiver/statsdreceiver/internal/transport/server.go index 0466d2861a2a..75d76ddd4d69 100644 --- a/receiver/statsdreceiver/internal/transport/server.go +++ b/receiver/statsdreceiver/internal/transport/server.go @@ -41,4 +41,7 @@ type Reporter interface { OnDebugf( template string, args ...any) + + RecordAcceptedMetric() + RecordRefusedMetric() } diff --git a/receiver/statsdreceiver/receiver.go b/receiver/statsdreceiver/receiver.go index 3e05ca47eb2a..b2a509e8fd2f 100644 --- a/receiver/statsdreceiver/receiver.go +++ b/receiver/statsdreceiver/receiver.go @@ -116,8 +116,12 @@ func (r *statsdReceiver) Start(ctx context.Context, _ component.Host) error { } } case metric := <-transferChan: - if err := r.parser.Aggregate(metric.Raw, metric.Addr); err != nil { + err := r.parser.Aggregate(metric.Raw, metric.Addr) + if err != nil { r.reporter.OnDebugf("Error aggregating metric", zap.Error(err)) + r.reporter.RecordRefusedMetric() + } else { + r.reporter.RecordAcceptedMetric() } case <-ctx.Done(): ticker.Stop() diff --git a/receiver/statsdreceiver/reporter.go b/receiver/statsdreceiver/reporter.go index 6eb5407b88b8..f24720dacb9a 100644 --- a/receiver/statsdreceiver/reporter.go +++ b/receiver/statsdreceiver/reporter.go @@ -4,19 +4,27 @@ package statsdreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver" import ( + "context" + "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receiverhelper" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" "go.uber.org/zap" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport" ) // reporter struct implements the transport.Reporter interface to give consistent // observability per Collector metric observability package. type reporter struct { - logger *zap.Logger - sugaredLogger *zap.SugaredLogger // Used for generic debug logging - obsrecv *receiverhelper.ObsReport + logger *zap.Logger + sugaredLogger *zap.SugaredLogger // Used for generic debug logging + obsrecv *receiverhelper.ObsReport + staticAttrs []attribute.KeyValue + acceptedMetricPoints metric.Int64Counter + refusedMetricPoints metric.Int64Counter } var _ transport.Reporter = (*reporter)(nil) @@ -30,11 +38,38 @@ func newReporter(set receiver.CreateSettings) (transport.Reporter, error) { if err != nil { return nil, err } - return &reporter{ + + r := &reporter{ logger: set.Logger, sugaredLogger: set.Logger.Sugar(), obsrecv: obsrecv, - }, nil + staticAttrs: []attribute.KeyValue{ + attribute.String("receiver", set.ID.String()), + }, + } + + // See https://github.com/open-telemetry/opentelemetry-collector/blob/241334609fc47927b4a8533dfca28e0f65dad9fe/receiver/receiverhelper/obsreport.go#L104 + // for the metric naming conventions + + r.acceptedMetricPoints, err = metadata.Meter(set.TelemetrySettings).Int64Counter( + "receiver/accepted_metric_points", + metric.WithDescription("Number of metric data points accepted"), + metric.WithUnit("1"), + ) + if err != nil { + return nil, err + } + + r.refusedMetricPoints, err = metadata.Meter(set.TelemetrySettings).Int64Counter( + "receiver/refused_metric_points", + metric.WithDescription("Number of metric data points refused"), + metric.WithUnit("1"), + ) + if err != nil { + return nil, err + } + + return r, nil } func (r *reporter) OnDebugf(template string, args ...any) { @@ -42,3 +77,11 @@ func (r *reporter) OnDebugf(template string, args ...any) { r.sugaredLogger.Debugf(template, args...) } } + +func (r *reporter) RecordAcceptedMetric() { + r.acceptedMetricPoints.Add(context.Background(), 1, metric.WithAttributes(r.staticAttrs...)) +} + +func (r *reporter) RecordRefusedMetric() { + r.refusedMetricPoints.Add(context.Background(), 1, metric.WithAttributes(r.staticAttrs...)) +} From aa7058e1bb7b42c5433f79f3a778dfd39668f245 Mon Sep 17 00:00:00 2001 From: Eric Sirianni Date: Sun, 10 Mar 2024 11:19:42 -0400 Subject: [PATCH 2/3] Add flush metrics to statsdreceiver --- .../internal/transport/mock_reporter.go | 4 +- .../internal/transport/server.go | 4 +- receiver/statsdreceiver/receiver.go | 8 +-- receiver/statsdreceiver/reporter.go | 49 +++++++++++++++++-- 4 files changed, 53 insertions(+), 12 deletions(-) diff --git a/receiver/statsdreceiver/internal/transport/mock_reporter.go b/receiver/statsdreceiver/internal/transport/mock_reporter.go index 273b6ec3465d..fd2a5156ebcb 100644 --- a/receiver/statsdreceiver/internal/transport/mock_reporter.go +++ b/receiver/statsdreceiver/internal/transport/mock_reporter.go @@ -25,9 +25,9 @@ func NewMockReporter(expectedOnMetricsProcessedCalls int) *MockReporter { func (m *MockReporter) OnDebugf(_ string, _ ...any) { } -func (m *MockReporter) RecordAcceptedMetric() {} +func (m *MockReporter) RecordReceivedMetric(_ error) {} -func (m *MockReporter) RecordRefusedMetric() {} +func (m *MockReporter) RecordFlushedMetrics(_ int64, _ error) {} // WaitAllOnMetricsProcessedCalls blocks until the number of expected calls // specified at creation of the reporter is completed. diff --git a/receiver/statsdreceiver/internal/transport/server.go b/receiver/statsdreceiver/internal/transport/server.go index 75d76ddd4d69..a45eda730f8d 100644 --- a/receiver/statsdreceiver/internal/transport/server.go +++ b/receiver/statsdreceiver/internal/transport/server.go @@ -42,6 +42,6 @@ type Reporter interface { template string, args ...any) - RecordAcceptedMetric() - RecordRefusedMetric() + RecordReceivedMetric(err error) + RecordFlushedMetrics(count int64, err error) } diff --git a/receiver/statsdreceiver/receiver.go b/receiver/statsdreceiver/receiver.go index b2a509e8fd2f..760440451bdb 100644 --- a/receiver/statsdreceiver/receiver.go +++ b/receiver/statsdreceiver/receiver.go @@ -111,17 +111,17 @@ func (r *statsdReceiver) Start(ctx context.Context, _ component.Host) error { for _, batch := range batchMetrics { batchCtx := client.NewContext(ctx, batch.Info) - if err := r.Flush(batchCtx, batch.Metrics, r.nextConsumer); err != nil { + err := r.Flush(batchCtx, batch.Metrics, r.nextConsumer) + r.reporter.RecordFlushedMetrics(int64(batch.Metrics.MetricCount()), err) + if err != nil { r.reporter.OnDebugf("Error flushing metrics", zap.Error(err)) } } case metric := <-transferChan: err := r.parser.Aggregate(metric.Raw, metric.Addr) + r.reporter.RecordReceivedMetric(err) if err != nil { r.reporter.OnDebugf("Error aggregating metric", zap.Error(err)) - r.reporter.RecordRefusedMetric() - } else { - r.reporter.RecordAcceptedMetric() } case <-ctx.Done(): ticker.Stop() diff --git a/receiver/statsdreceiver/reporter.go b/receiver/statsdreceiver/reporter.go index f24720dacb9a..73cd1625381e 100644 --- a/receiver/statsdreceiver/reporter.go +++ b/receiver/statsdreceiver/reporter.go @@ -25,6 +25,8 @@ type reporter struct { staticAttrs []attribute.KeyValue acceptedMetricPoints metric.Int64Counter refusedMetricPoints metric.Int64Counter + flushedMetricPoints metric.Int64Counter + flushCount metric.Int64Counter } var _ transport.Reporter = (*reporter)(nil) @@ -69,6 +71,24 @@ func newReporter(set receiver.CreateSettings) (transport.Reporter, error) { return nil, err } + r.flushedMetricPoints, err = metadata.Meter(set.TelemetrySettings).Int64Counter( + "receiver/statsd.flushed_metric_points", + metric.WithDescription("Number of metric data points flushed"), + metric.WithUnit("1"), + ) + if err != nil { + return nil, err + } + + r.flushCount, err = metadata.Meter(set.TelemetrySettings).Int64Counter( + "receiver/statsd.flushes", + metric.WithDescription("Number of flush operations"), + metric.WithUnit("1"), + ) + if err != nil { + return nil, err + } + return r, nil } @@ -78,10 +98,31 @@ func (r *reporter) OnDebugf(template string, args ...any) { } } -func (r *reporter) RecordAcceptedMetric() { - r.acceptedMetricPoints.Add(context.Background(), 1, metric.WithAttributes(r.staticAttrs...)) +func (r *reporter) RecordReceivedMetric(err error) { + if err == nil { + r.acceptedMetricPoints.Add(context.Background(), 1, metric.WithAttributes(r.staticAttrs...)) + } else { + r.refusedMetricPoints.Add(context.Background(), 1, metric.WithAttributes(r.staticAttrs...)) + } } -func (r *reporter) RecordRefusedMetric() { - r.refusedMetricPoints.Add(context.Background(), 1, metric.WithAttributes(r.staticAttrs...)) +func (r *reporter) RecordFlushedMetrics(count int64, err error) { + var status string + if err == nil { + status = "success" + } else { + status = "failure" + } + r.flushedMetricPoints.Add( + context.Background(), + count, + metric.WithAttributes(r.staticAttrs...), + metric.WithAttributes(attribute.String("status", status)), + ) + r.flushCount.Add( + context.Background(), + 1, + metric.WithAttributes(r.staticAttrs...), + metric.WithAttributes(attribute.String("status", status)), + ) } From 2ec5c8065460a167c9802d08b1f736c030d06243 Mon Sep 17 00:00:00 2001 From: Eric Sirianni Date: Tue, 19 Mar 2024 21:20:54 -0400 Subject: [PATCH 3/3] Use datapoint count instead of metric count --- receiver/statsdreceiver/receiver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/receiver/statsdreceiver/receiver.go b/receiver/statsdreceiver/receiver.go index 760440451bdb..835405681c2f 100644 --- a/receiver/statsdreceiver/receiver.go +++ b/receiver/statsdreceiver/receiver.go @@ -112,7 +112,7 @@ func (r *statsdReceiver) Start(ctx context.Context, _ component.Host) error { batchCtx := client.NewContext(ctx, batch.Info) err := r.Flush(batchCtx, batch.Metrics, r.nextConsumer) - r.reporter.RecordFlushedMetrics(int64(batch.Metrics.MetricCount()), err) + r.reporter.RecordFlushedMetrics(int64(batch.Metrics.DataPointCount()), err) if err != nil { r.reporter.OnDebugf("Error flushing metrics", zap.Error(err)) }