Skip to content

Commit

Permalink
statsdreceiver observability (#33415)
Browse files Browse the repository at this point in the history
Description:
Add received statsd / accepted/refused metrics to statsdreceiver

Resurrecting
#31822
since #31839 is closed.

Link to tracking Issue:

#24278

Testing:
This works in our internal testing environments. Took a profile and
didn't see metric recording taking much cpu
  • Loading branch information
hardproblems authored Jun 11, 2024
1 parent 43aee3b commit e5a02fa
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 20 deletions.
27 changes: 27 additions & 0 deletions .chloggen/feature-statsdreceiver-metrics.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: statsdreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Added received/accepted/refused metrics

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [24278]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
38 changes: 33 additions & 5 deletions receiver/statsdreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/receiverhelper"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/protocol"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport"
)
Expand All @@ -30,7 +32,8 @@ type statsdReceiver struct {
config *Config

server transport.Server
reporter transport.Reporter
reporter *reporter
obsrecv *receiverhelper.ObsReport
parser protocol.Parser
nextConsumer consumer.Metrics
cancel context.CancelFunc
Expand All @@ -52,10 +55,22 @@ func newReceiver(
return nil, err
}

trans := transport.NewTransport(strings.ToLower(string(config.NetAddr.Transport)))
obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
LongLivedCtx: true,
ReceiverID: set.ID,
ReceiverCreateSettings: set,
Transport: trans.String(),
})
if err != nil {
return nil, err
}

r := &statsdReceiver{
settings: set,
config: &config,
nextConsumer: nextConsumer,
obsrecv: obsrecv,
reporter: rep,
parser: &protocol.StatsDParser{
BuildInfo: set.BuildInfo,
Expand Down Expand Up @@ -104,20 +119,33 @@ func (r *statsdReceiver) Start(ctx context.Context, _ component.Host) error {
}
}()
go func() {
var successCnt int64
for {
select {
case <-ticker.C:
batchMetrics := r.parser.GetMetrics()
for _, batch := range batchMetrics {
batchCtx := client.NewContext(ctx, batch.Info)

if err := r.Flush(batchCtx, batch.Metrics, r.nextConsumer); err != nil {
numPoints := batch.Metrics.DataPointCount()
flushCtx := r.obsrecv.StartMetricsOp(batchCtx)
err := r.Flush(flushCtx, batch.Metrics, r.nextConsumer)
if err != nil {
r.reporter.OnDebugf("Error flushing metrics", zap.Error(err))
}
r.obsrecv.EndMetricsOp(flushCtx, metadata.Type.String(), numPoints, err)
}
case metric := <-transferChan:
if err := r.parser.Aggregate(metric.Raw, metric.Addr); err != nil {
r.reporter.OnDebugf("Error aggregating metric", zap.Error(err))
err := r.parser.Aggregate(metric.Raw, metric.Addr)
if err != nil {
r.reporter.RecordParseFailure()
r.reporter.OnDebugf("Error aggregating pmetric", zap.Error(err))
} else {
successCnt++
// Record every 100 to reduce overhead
if successCnt%100 == 0 {
r.reporter.RecordParseSuccess(successCnt)
successCnt = 0
}
}
case <-ctx.Done():
ticker.Stop()
Expand Down
4 changes: 0 additions & 4 deletions receiver/statsdreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"go.opentelemetry.io/collector/receiver/receivertest"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport/client"
)

Expand Down Expand Up @@ -117,9 +116,6 @@ func Test_statsdreceiver_EndToEnd(t *testing.T) {
require.NoError(t, err)
r := rcv.(*statsdReceiver)

mr := transport.NewMockReporter(1)
r.reporter = mr

require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost()))
defer func() {
assert.NoError(t, r.Shutdown(context.Background()))
Expand Down
50 changes: 39 additions & 11 deletions receiver/statsdreceiver/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,36 +4,44 @@
package statsdreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver"

import (
"context"

"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/receiverhelper"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/metadata"
)

// reporter struct implements the transport.Reporter interface to give consistent
// observability per Collector metric observability package.
type reporter struct {
logger *zap.Logger
sugaredLogger *zap.SugaredLogger // Used for generic debug logging
obsrecv *receiverhelper.ObsReport
receiverAttr attribute.KeyValue
receivedCount metric.Int64Counter
}

var _ transport.Reporter = (*reporter)(nil)
var (
parseSuccessAttr = attribute.String("parse_success", "true")
parseFailureAttr = attribute.String("parse_success", "false")
)

func newReporter(set receiver.Settings) (transport.Reporter, error) {
obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
ReceiverID: set.ID,
Transport: "tcp",
ReceiverCreateSettings: set,
})
func newReporter(set receiver.Settings) (*reporter, error) {
receivedCount, err := metadata.Meter(set.TelemetrySettings).Int64Counter(
"receiver/received_statsd_metrics",
metric.WithDescription("Number of statsd metrics received."),
metric.WithUnit("1"),
)
if err != nil {
return nil, err
}
return &reporter{
logger: set.Logger,
sugaredLogger: set.Logger.Sugar(),
obsrecv: obsrecv,
receiverAttr: attribute.String("receiver", set.ID.String()),
receivedCount: receivedCount,
}, nil
}

Expand All @@ -42,3 +50,23 @@ func (r *reporter) OnDebugf(template string, args ...any) {
r.sugaredLogger.Debugf(template, args...)
}
}

func (r *reporter) RecordParseFailure() {
r.receivedCount.Add(
context.Background(),
1,
metric.WithAttributes(
r.receiverAttr,
parseFailureAttr),
)
}

func (r *reporter) RecordParseSuccess(count int64) {
r.receivedCount.Add(
context.Background(),
count,
metric.WithAttributes(
r.receiverAttr,
parseSuccessAttr),
)
}

0 comments on commit e5a02fa

Please sign in to comment.