Skip to content

Commit

Permalink
Revert "statsdreceiver observability (open-telemetry#33415)"
Browse files Browse the repository at this point in the history
This reverts commit e5a02fa.
  • Loading branch information
TylerHelmuth committed Jun 12, 2024
1 parent 2a3c493 commit 4a297a3
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 99 deletions.
27 changes: 0 additions & 27 deletions .chloggen/feature-statsdreceiver-metrics.yaml

This file was deleted.

38 changes: 5 additions & 33 deletions receiver/statsdreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/receiverhelper"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/protocol"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport"
)
Expand All @@ -32,8 +30,7 @@ type statsdReceiver struct {
config *Config

server transport.Server
reporter *reporter
obsrecv *receiverhelper.ObsReport
reporter transport.Reporter
parser protocol.Parser
nextConsumer consumer.Metrics
cancel context.CancelFunc
Expand All @@ -55,22 +52,10 @@ func newReceiver(
return nil, err
}

trans := transport.NewTransport(strings.ToLower(string(config.NetAddr.Transport)))
obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
LongLivedCtx: true,
ReceiverID: set.ID,
ReceiverCreateSettings: set,
Transport: trans.String(),
})
if err != nil {
return nil, err
}

r := &statsdReceiver{
settings: set,
config: &config,
nextConsumer: nextConsumer,
obsrecv: obsrecv,
reporter: rep,
parser: &protocol.StatsDParser{
BuildInfo: set.BuildInfo,
Expand Down Expand Up @@ -119,33 +104,20 @@ func (r *statsdReceiver) Start(ctx context.Context, _ component.Host) error {
}
}()
go func() {
var successCnt int64
for {
select {
case <-ticker.C:
batchMetrics := r.parser.GetMetrics()
for _, batch := range batchMetrics {
batchCtx := client.NewContext(ctx, batch.Info)
numPoints := batch.Metrics.DataPointCount()
flushCtx := r.obsrecv.StartMetricsOp(batchCtx)
err := r.Flush(flushCtx, batch.Metrics, r.nextConsumer)
if err != nil {

if err := r.Flush(batchCtx, batch.Metrics, r.nextConsumer); err != nil {
r.reporter.OnDebugf("Error flushing metrics", zap.Error(err))
}
r.obsrecv.EndMetricsOp(flushCtx, metadata.Type.String(), numPoints, err)
}
case metric := <-transferChan:
err := r.parser.Aggregate(metric.Raw, metric.Addr)
if err != nil {
r.reporter.RecordParseFailure()
r.reporter.OnDebugf("Error aggregating pmetric", zap.Error(err))
} else {
successCnt++
// Record every 100 to reduce overhead
if successCnt%100 == 0 {
r.reporter.RecordParseSuccess(successCnt)
successCnt = 0
}
if err := r.parser.Aggregate(metric.Raw, metric.Addr); err != nil {
r.reporter.OnDebugf("Error aggregating metric", zap.Error(err))
}
case <-ctx.Done():
ticker.Stop()
Expand Down
4 changes: 4 additions & 0 deletions receiver/statsdreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"go.opentelemetry.io/collector/receiver/receivertest"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport/client"
)

Expand Down Expand Up @@ -116,6 +117,9 @@ func Test_statsdreceiver_EndToEnd(t *testing.T) {
require.NoError(t, err)
r := rcv.(*statsdReceiver)

mr := transport.NewMockReporter(1)
r.reporter = mr

require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost()))
defer func() {
assert.NoError(t, r.Shutdown(context.Background()))
Expand Down
50 changes: 11 additions & 39 deletions receiver/statsdreceiver/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,44 +4,36 @@
package statsdreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver"

import (
"context"

"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/collector/receiver/receiverhelper"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport"
)

// reporter struct implements the transport.Reporter interface to give consistent
// observability per Collector metric observability package.
type reporter struct {
logger *zap.Logger
sugaredLogger *zap.SugaredLogger // Used for generic debug logging
receiverAttr attribute.KeyValue
receivedCount metric.Int64Counter
obsrecv *receiverhelper.ObsReport
}

var (
parseSuccessAttr = attribute.String("parse_success", "true")
parseFailureAttr = attribute.String("parse_success", "false")
)
var _ transport.Reporter = (*reporter)(nil)

func newReporter(set receiver.Settings) (*reporter, error) {
receivedCount, err := metadata.Meter(set.TelemetrySettings).Int64Counter(
"receiver/received_statsd_metrics",
metric.WithDescription("Number of statsd metrics received."),
metric.WithUnit("1"),
)
func newReporter(set receiver.Settings) (transport.Reporter, error) {
obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
ReceiverID: set.ID,
Transport: "tcp",
ReceiverCreateSettings: set,
})
if err != nil {
return nil, err
}
return &reporter{
logger: set.Logger,
sugaredLogger: set.Logger.Sugar(),
receiverAttr: attribute.String("receiver", set.ID.String()),
receivedCount: receivedCount,
obsrecv: obsrecv,
}, nil
}

Expand All @@ -50,23 +42,3 @@ func (r *reporter) OnDebugf(template string, args ...any) {
r.sugaredLogger.Debugf(template, args...)
}
}

func (r *reporter) RecordParseFailure() {
r.receivedCount.Add(
context.Background(),
1,
metric.WithAttributes(
r.receiverAttr,
parseFailureAttr),
)
}

func (r *reporter) RecordParseSuccess(count int64) {
r.receivedCount.Add(
context.Background(),
count,
metric.WithAttributes(
r.receiverAttr,
parseSuccessAttr),
)
}

0 comments on commit 4a297a3

Please sign in to comment.