diff --git a/.chloggen/feature-statsdreceiver-metrics.yaml b/.chloggen/feature-statsdreceiver-metrics.yaml new file mode 100644 index 000000000000..925087229ba3 --- /dev/null +++ b/.chloggen/feature-statsdreceiver-metrics.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: statsdreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Added received/accepted/refused metrics + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [24278] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/receiver/statsdreceiver/receiver.go b/receiver/statsdreceiver/receiver.go index 1fea65fae7f1..8000eee422ad 100644 --- a/receiver/statsdreceiver/receiver.go +++ b/receiver/statsdreceiver/receiver.go @@ -16,8 +16,10 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/receiver/receiverhelper" "go.uber.org/zap" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/protocol" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport" ) @@ -30,7 +32,8 @@ type statsdReceiver struct { config *Config server transport.Server - reporter transport.Reporter + reporter *reporter + obsrecv *receiverhelper.ObsReport parser protocol.Parser nextConsumer consumer.Metrics cancel context.CancelFunc @@ -52,10 +55,22 @@ func newReceiver( return nil, err } + trans := transport.NewTransport(strings.ToLower(string(config.NetAddr.Transport))) + obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ + LongLivedCtx: true, + ReceiverID: set.ID, + ReceiverCreateSettings: set, + Transport: trans.String(), + }) + if err != nil { + return nil, err + } + r := &statsdReceiver{ settings: set, config: &config, nextConsumer: nextConsumer, + obsrecv: obsrecv, reporter: rep, parser: &protocol.StatsDParser{ BuildInfo: set.BuildInfo, @@ -104,20 +119,33 @@ func (r *statsdReceiver) Start(ctx context.Context, _ component.Host) error { } }() go func() { + var successCnt int64 for { select { case <-ticker.C: batchMetrics := r.parser.GetMetrics() for _, batch := range batchMetrics { batchCtx := client.NewContext(ctx, batch.Info) - - if err := r.Flush(batchCtx, batch.Metrics, r.nextConsumer); err != nil { + numPoints := batch.Metrics.DataPointCount() + flushCtx := r.obsrecv.StartMetricsOp(batchCtx) + err := r.Flush(flushCtx, batch.Metrics, r.nextConsumer) + if err != nil { r.reporter.OnDebugf("Error flushing metrics", zap.Error(err)) } + r.obsrecv.EndMetricsOp(flushCtx, metadata.Type.String(), numPoints, err) } case metric := <-transferChan: - if err := r.parser.Aggregate(metric.Raw, metric.Addr); err != nil { - r.reporter.OnDebugf("Error aggregating metric", zap.Error(err)) + err := r.parser.Aggregate(metric.Raw, metric.Addr) + if err != nil { + r.reporter.RecordParseFailure() + r.reporter.OnDebugf("Error aggregating pmetric", zap.Error(err)) + } else { + successCnt++ + // Record every 100 to reduce overhead + if successCnt%100 == 0 { + r.reporter.RecordParseSuccess(successCnt) + successCnt = 0 + } } case <-ctx.Done(): ticker.Stop() diff --git a/receiver/statsdreceiver/receiver_test.go b/receiver/statsdreceiver/receiver_test.go index a98149882e55..166a6d8775a1 100644 --- a/receiver/statsdreceiver/receiver_test.go +++ b/receiver/statsdreceiver/receiver_test.go @@ -19,7 +19,6 @@ import ( "go.opentelemetry.io/collector/receiver/receivertest" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport/client" ) @@ -117,9 +116,6 @@ func Test_statsdreceiver_EndToEnd(t *testing.T) { require.NoError(t, err) r := rcv.(*statsdReceiver) - mr := transport.NewMockReporter(1) - r.reporter = mr - require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost())) defer func() { assert.NoError(t, r.Shutdown(context.Background())) diff --git a/receiver/statsdreceiver/reporter.go b/receiver/statsdreceiver/reporter.go index 07f2969aadf5..28d4ec836d27 100644 --- a/receiver/statsdreceiver/reporter.go +++ b/receiver/statsdreceiver/reporter.go @@ -4,11 +4,14 @@ package statsdreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver" import ( + "context" + "go.opentelemetry.io/collector/receiver" - "go.opentelemetry.io/collector/receiver/receiverhelper" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" "go.uber.org/zap" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/metadata" ) // reporter struct implements the transport.Reporter interface to give consistent @@ -16,24 +19,29 @@ import ( type reporter struct { logger *zap.Logger sugaredLogger *zap.SugaredLogger // Used for generic debug logging - obsrecv *receiverhelper.ObsReport + receiverAttr attribute.KeyValue + receivedCount metric.Int64Counter } -var _ transport.Reporter = (*reporter)(nil) +var ( + parseSuccessAttr = attribute.String("parse_success", "true") + parseFailureAttr = attribute.String("parse_success", "false") +) -func newReporter(set receiver.Settings) (transport.Reporter, error) { - obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ - ReceiverID: set.ID, - Transport: "tcp", - ReceiverCreateSettings: set, - }) +func newReporter(set receiver.Settings) (*reporter, error) { + receivedCount, err := metadata.Meter(set.TelemetrySettings).Int64Counter( + "receiver/received_statsd_metrics", + metric.WithDescription("Number of statsd metrics received."), + metric.WithUnit("1"), + ) if err != nil { return nil, err } return &reporter{ logger: set.Logger, sugaredLogger: set.Logger.Sugar(), - obsrecv: obsrecv, + receiverAttr: attribute.String("receiver", set.ID.String()), + receivedCount: receivedCount, }, nil } @@ -42,3 +50,23 @@ func (r *reporter) OnDebugf(template string, args ...any) { r.sugaredLogger.Debugf(template, args...) } } + +func (r *reporter) RecordParseFailure() { + r.receivedCount.Add( + context.Background(), + 1, + metric.WithAttributes( + r.receiverAttr, + parseFailureAttr), + ) +} + +func (r *reporter) RecordParseSuccess(count int64) { + r.receivedCount.Add( + context.Background(), + count, + metric.WithAttributes( + r.receiverAttr, + parseSuccessAttr), + ) +}