From 662680343c0e0837f80a1e8aa21bc650b473bc7b Mon Sep 17 00:00:00 2001 From: Jason Anderson Date: Wed, 10 Apr 2024 15:56:02 -0700 Subject: [PATCH] [statsdreceiver] fixup metamonitoring the statsd receiver does not currently emit metrics or traces about its own internal operations, making it difficult to understand if it's working properly. this introduces an integration w/ the receiverhelper.ObsReport helper, which is the standard way to emit this information. this also changes how the transport debug logs are emitted; they are now logged via a new transport.Logger interface; the primary reason for this is to avoid confusion w/ the naming--the 'reporter' is now the ObsReport instance, and what the transport.Reporter really was doing was simply emitting debug logs. --- .chloggen/f_statsd-metamon.yaml | 27 +++++++++ .../internal/transport/mock_reporter.go | 32 ----------- .../internal/transport/noop_logger.go | 17 ++++++ .../internal/transport/server.go | 11 +--- .../internal/transport/server_test.go | 4 +- .../internal/transport/tcp_server.go | 12 ++-- .../internal/transport/transport.go | 11 +--- .../internal/transport/udp_server.go | 6 +- .../internal/transport/zap_logger.go | 28 ++++++++++ receiver/statsdreceiver/receiver.go | 55 +++++++++++-------- receiver/statsdreceiver/receiver_test.go | 7 +-- receiver/statsdreceiver/reporter.go | 44 --------------- 12 files changed, 124 insertions(+), 130 deletions(-) create mode 100644 .chloggen/f_statsd-metamon.yaml delete mode 100644 receiver/statsdreceiver/internal/transport/mock_reporter.go create mode 100644 receiver/statsdreceiver/internal/transport/noop_logger.go create mode 100644 receiver/statsdreceiver/internal/transport/zap_logger.go delete mode 100644 receiver/statsdreceiver/reporter.go diff --git a/.chloggen/f_statsd-metamon.yaml b/.chloggen/f_statsd-metamon.yaml new file mode 100644 index 000000000000..9b75f46f94cd --- /dev/null +++ b/.chloggen/f_statsd-metamon.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: statsdreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Emit standard receiver metrics for datapoints passing through the statsdreceiver + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/receiver/statsdreceiver/internal/transport/mock_reporter.go b/receiver/statsdreceiver/internal/transport/mock_reporter.go deleted file mode 100644 index c7a0947309b5..000000000000 --- a/receiver/statsdreceiver/internal/transport/mock_reporter.go +++ /dev/null @@ -1,32 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package transport // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport" - -import ( - "sync" -) - -// MockReporter provides a Reporter that provides some useful functionalities for -// tests (eg.: wait for certain number of messages). -type MockReporter struct { - wgMetricsProcessed sync.WaitGroup -} - -var _ Reporter = (*MockReporter)(nil) - -// NewMockReporter returns a new instance of a MockReporter. -func NewMockReporter(expectedOnMetricsProcessedCalls int) *MockReporter { - m := MockReporter{} - m.wgMetricsProcessed.Add(expectedOnMetricsProcessedCalls) - return &m -} - -func (m *MockReporter) OnDebugf(_ string, _ ...any) { -} - -// WaitAllOnMetricsProcessedCalls blocks until the number of expected calls -// specified at creation of the reporter is completed. -func (m *MockReporter) WaitAllOnMetricsProcessedCalls() { - m.wgMetricsProcessed.Wait() -} diff --git a/receiver/statsdreceiver/internal/transport/noop_logger.go b/receiver/statsdreceiver/internal/transport/noop_logger.go new file mode 100644 index 000000000000..cf68139e3c09 --- /dev/null +++ b/receiver/statsdreceiver/internal/transport/noop_logger.go @@ -0,0 +1,17 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package transport // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport" + +// NoopLogger provides a Logger that does not emit any messages +type NoopLogger struct{} + +var _ Logger = (*NoopLogger)(nil) + +// NewNoopLogger returns a new instance of a NoopLogger. +func NewNoopLogger() Logger { + return &NoopLogger{} +} + +func (m *NoopLogger) OnDebugf(_ string, _ ...any) { +} diff --git a/receiver/statsdreceiver/internal/transport/server.go b/receiver/statsdreceiver/internal/transport/server.go index 0466d2861a2a..7e85bab57848 100644 --- a/receiver/statsdreceiver/internal/transport/server.go +++ b/receiver/statsdreceiver/internal/transport/server.go @@ -20,7 +20,7 @@ type Server interface { // the Parser and passed to the next consumer. ListenAndServe( mc consumer.Metrics, - r Reporter, + l Logger, transferChan chan<- Metric, ) error @@ -34,11 +34,6 @@ type Metric struct { Addr net.Addr } -// Reporter is used to report (via zPages, logs, metrics, etc) the events -// happening when the Server is receiving and processing data. -type Reporter interface { - // OnDebugf allows less structured reporting for debugging scenarios. - OnDebugf( - template string, - args ...any) +type Logger interface { + OnDebugf(template string, args ...any) } diff --git a/receiver/statsdreceiver/internal/transport/server_test.go b/receiver/statsdreceiver/internal/transport/server_test.go index a774c0fd4154..84419c77eaf6 100644 --- a/receiver/statsdreceiver/internal/transport/server_test.go +++ b/receiver/statsdreceiver/internal/transport/server_test.go @@ -54,14 +54,14 @@ func Test_Server_ListenAndServe(t *testing.T) { mc := new(consumertest.MetricsSink) require.NoError(t, err) - mr := NewMockReporter(1) + nl := NewNoopLogger() transferChan := make(chan Metric, 10) wgListenAndServe := sync.WaitGroup{} wgListenAndServe.Add(1) go func() { defer wgListenAndServe.Done() - assert.Error(t, srv.ListenAndServe(mc, mr, transferChan)) + assert.Error(t, srv.ListenAndServe(mc, nl, transferChan)) }() runtime.Gosched() diff --git a/receiver/statsdreceiver/internal/transport/tcp_server.go b/receiver/statsdreceiver/internal/transport/tcp_server.go index f776bcd88f04..262e3f23ab1c 100644 --- a/receiver/statsdreceiver/internal/transport/tcp_server.go +++ b/receiver/statsdreceiver/internal/transport/tcp_server.go @@ -19,7 +19,7 @@ var errTCPServerDone = errors.New("server stopped") type tcpServer struct { listener net.Listener - reporter Reporter + logger Logger wg sync.WaitGroup transport Transport stopChan chan struct{} @@ -48,19 +48,19 @@ func NewTCPServer(transport Transport, address string) (Server, error) { } // ListenAndServe starts the server ready to receive metrics. -func (t *tcpServer) ListenAndServe(nextConsumer consumer.Metrics, reporter Reporter, transferChan chan<- Metric) error { - if nextConsumer == nil || reporter == nil { +func (t *tcpServer) ListenAndServe(nextConsumer consumer.Metrics, logger Logger, transferChan chan<- Metric) error { + if nextConsumer == nil || logger == nil { return errNilListenAndServeParameters } - t.reporter = reporter + t.logger = logger LOOP: for { connChan := make(chan net.Conn, 1) go func() { c, err := t.listener.Accept() if err != nil { - t.reporter.OnDebugf("TCP Transport - Accept error: %v", + t.logger.OnDebugf("TCP Transport - Accept error: %v", err) } else { connChan <- c @@ -85,7 +85,7 @@ func (t *tcpServer) handleConn(c net.Conn, transferChan chan<- Metric) { for { n, err := c.Read(payload) if err != nil { - t.reporter.OnDebugf("TCP transport (%s) Error reading payload: %v", c.LocalAddr(), err) + t.logger.OnDebugf("TCP transport (%s) Error reading payload: %v", c.LocalAddr(), err) t.wg.Done() return } diff --git a/receiver/statsdreceiver/internal/transport/transport.go b/receiver/statsdreceiver/internal/transport/transport.go index c065e30c746f..63a033a5d334 100644 --- a/receiver/statsdreceiver/internal/transport/transport.go +++ b/receiver/statsdreceiver/internal/transport/transport.go @@ -23,16 +23,9 @@ const ( TCP6 Transport = "tcp6" ) -// NewTransport creates a Transport based on the transport string or returns an empty Transport. +// NewTransport creates a Transport based on the transport string. func NewTransport(ts string) Transport { - trans := Transport(ts) - switch trans { - case UDP, UDP4, UDP6: - return trans - case TCP, TCP4, TCP6: - return trans - } - return Transport("") + return Transport(ts) } // String casts the transport to a String if the Transport is supported. Return an empty Transport overwise. diff --git a/receiver/statsdreceiver/internal/transport/udp_server.go b/receiver/statsdreceiver/internal/transport/udp_server.go index 19ad0803d8fe..788813d1b978 100644 --- a/receiver/statsdreceiver/internal/transport/udp_server.go +++ b/receiver/statsdreceiver/internal/transport/udp_server.go @@ -42,10 +42,10 @@ func NewUDPServer(transport Transport, address string) (Server, error) { // ListenAndServe starts the server ready to receive metrics. func (u *udpServer) ListenAndServe( nextConsumer consumer.Metrics, - reporter Reporter, + logger Logger, transferChan chan<- Metric, ) error { - if nextConsumer == nil || reporter == nil { + if nextConsumer == nil || logger == nil { return errNilListenAndServeParameters } @@ -58,7 +58,7 @@ func (u *udpServer) ListenAndServe( u.handlePacket(bufCopy, addr, transferChan) } if err != nil { - reporter.OnDebugf("%s Transport (%s) - ReadFrom error: %v", + logger.OnDebugf("%s Transport (%s) - ReadFrom error: %v", u.transport, u.packetConn.LocalAddr(), err) diff --git a/receiver/statsdreceiver/internal/transport/zap_logger.go b/receiver/statsdreceiver/internal/transport/zap_logger.go new file mode 100644 index 000000000000..2b9940639c65 --- /dev/null +++ b/receiver/statsdreceiver/internal/transport/zap_logger.go @@ -0,0 +1,28 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package transport // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport" + +import "go.uber.org/zap" + +// ZapLogger provides a Logger that emits debug logs on a zap sugared logger +type ZapLogger struct { + logger *zap.Logger + sugaredLogger *zap.SugaredLogger +} + +var _ Logger = (*ZapLogger)(nil) + +// NewZapLogger returns a new instance of a ZapLogger given an existing zap Logger instance +func NewZapLogger(logger *zap.Logger) Logger { + return &ZapLogger{ + logger: logger, + sugaredLogger: logger.Sugar(), + } +} + +func (zl *ZapLogger) OnDebugf(template string, args ...any) { + if zl.logger.Check(zap.DebugLevel, "debug") != nil { + zl.sugaredLogger.Debugf(template, args...) + } +} diff --git a/receiver/statsdreceiver/receiver.go b/receiver/statsdreceiver/receiver.go index 3e05ca47eb2a..e0ee2944ebb4 100644 --- a/receiver/statsdreceiver/receiver.go +++ b/receiver/statsdreceiver/receiver.go @@ -6,7 +6,6 @@ package statsdreceiver // import "github.com/open-telemetry/opentelemetry-collec import ( "context" "errors" - "fmt" "net" "strings" "time" @@ -16,6 +15,7 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/receiver/receiverhelper" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/protocol" @@ -29,11 +29,13 @@ type statsdReceiver struct { settings receiver.CreateSettings config *Config - server transport.Server - reporter transport.Reporter - parser protocol.Parser - nextConsumer consumer.Metrics - cancel context.CancelFunc + trans transport.Transport + server transport.Server + reporter *receiverhelper.ObsReport + transportLogger transport.Logger + parser protocol.Parser + nextConsumer consumer.Metrics + cancel context.CancelFunc } // newReceiver creates the StatsD receiver with the given parameters. @@ -42,21 +44,28 @@ func newReceiver( config Config, nextConsumer consumer.Metrics, ) (receiver.Metrics, error) { - if config.NetAddr.Endpoint == "" { config.NetAddr.Endpoint = "localhost:8125" } - rep, err := newReporter(set) + trans := transport.NewTransport(strings.ToLower(string(config.NetAddr.Transport))) + + rep, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ + ReceiverID: set.ID, + Transport: trans.String(), + ReceiverCreateSettings: set, + }) if err != nil { return nil, err } r := &statsdReceiver{ - settings: set, - config: &config, - nextConsumer: nextConsumer, - reporter: rep, + settings: set, + config: &config, + nextConsumer: nextConsumer, + trans: trans, + reporter: rep, + transportLogger: transport.NewZapLogger(set.Logger), parser: &protocol.StatsDParser{ BuildInfo: set.BuildInfo, }, @@ -64,23 +73,22 @@ func newReceiver( return r, nil } -func buildTransportServer(config Config) (transport.Server, error) { +func buildTransportServer(trans transport.Transport, endpoint string) (transport.Server, error) { // TODO: Add unix socket transport implementations - trans := transport.NewTransport(strings.ToLower(string(config.NetAddr.Transport))) switch trans { case transport.UDP, transport.UDP4, transport.UDP6: - return transport.NewUDPServer(trans, config.NetAddr.Endpoint) + return transport.NewUDPServer(trans, endpoint) case transport.TCP, transport.TCP4, transport.TCP6: - return transport.NewTCPServer(trans, config.NetAddr.Endpoint) + return transport.NewTCPServer(trans, endpoint) } - return nil, fmt.Errorf("unsupported transport %q", string(config.NetAddr.Transport)) + return nil, transport.ErrUnsupportedTransport } // Start starts a UDP server that can process StatsD messages. func (r *statsdReceiver) Start(ctx context.Context, _ component.Host) error { ctx, r.cancel = context.WithCancel(ctx) - server, err := buildTransportServer(*r.config) + server, err := buildTransportServer(r.trans, r.config.NetAddr.Endpoint) if err != nil { return err } @@ -97,7 +105,7 @@ func (r *statsdReceiver) Start(ctx context.Context, _ component.Host) error { return err } go func() { - if err := r.server.ListenAndServe(r.nextConsumer, r.reporter, transferChan); err != nil { + if err := r.server.ListenAndServe(r.nextConsumer, r.transportLogger, transferChan); err != nil { if !errors.Is(err, net.ErrClosed) { r.settings.TelemetrySettings.ReportStatus(component.NewFatalErrorEvent(err)) } @@ -112,12 +120,12 @@ func (r *statsdReceiver) Start(ctx context.Context, _ component.Host) error { batchCtx := client.NewContext(ctx, batch.Info) if err := r.Flush(batchCtx, batch.Metrics, r.nextConsumer); err != nil { - r.reporter.OnDebugf("Error flushing metrics", zap.Error(err)) + r.settings.Logger.Debug("Error flushing metrics", zap.Error(err)) } } case metric := <-transferChan: if err := r.parser.Aggregate(metric.Raw, metric.Addr); err != nil { - r.reporter.OnDebugf("Error aggregating metric", zap.Error(err)) + r.settings.Logger.Debug("Error aggregating metric", zap.Error(err)) } case <-ctx.Done(): ticker.Stop() @@ -140,5 +148,8 @@ func (r *statsdReceiver) Shutdown(context.Context) error { } func (r *statsdReceiver) Flush(ctx context.Context, metrics pmetric.Metrics, nextConsumer consumer.Metrics) error { - return nextConsumer.ConsumeMetrics(ctx, metrics) + r.reporter.StartMetricsOp(ctx) + err := nextConsumer.ConsumeMetrics(ctx, metrics) + r.reporter.EndMetricsOp(ctx, r.settings.ID.Type().String(), metrics.DataPointCount(), err) + return err } diff --git a/receiver/statsdreceiver/receiver_test.go b/receiver/statsdreceiver/receiver_test.go index b55456f80004..6808b954dd19 100644 --- a/receiver/statsdreceiver/receiver_test.go +++ b/receiver/statsdreceiver/receiver_test.go @@ -5,7 +5,6 @@ package statsdreceiver import ( "context" - "errors" "testing" "time" @@ -44,7 +43,7 @@ func Test_statsdreceiver_Start(t *testing.T) { }, nextConsumer: consumertest.NewNop(), }, - wantErr: errors.New("unsupported transport \"unknown\""), + wantErr: transport.ErrUnsupportedTransport, }, } for _, tt := range tests { @@ -117,8 +116,8 @@ func Test_statsdreceiver_EndToEnd(t *testing.T) { require.NoError(t, err) r := rcv.(*statsdReceiver) - mr := transport.NewMockReporter(1) - r.reporter = mr + nl := transport.NewNoopLogger() + r.transportLogger = nl require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost())) defer func() { diff --git a/receiver/statsdreceiver/reporter.go b/receiver/statsdreceiver/reporter.go deleted file mode 100644 index 6eb5407b88b8..000000000000 --- a/receiver/statsdreceiver/reporter.go +++ /dev/null @@ -1,44 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package statsdreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver" - -import ( - "go.opentelemetry.io/collector/receiver" - "go.opentelemetry.io/collector/receiver/receiverhelper" - "go.uber.org/zap" - - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport" -) - -// reporter struct implements the transport.Reporter interface to give consistent -// observability per Collector metric observability package. -type reporter struct { - logger *zap.Logger - sugaredLogger *zap.SugaredLogger // Used for generic debug logging - obsrecv *receiverhelper.ObsReport -} - -var _ transport.Reporter = (*reporter)(nil) - -func newReporter(set receiver.CreateSettings) (transport.Reporter, error) { - obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ - ReceiverID: set.ID, - Transport: "tcp", - ReceiverCreateSettings: set, - }) - if err != nil { - return nil, err - } - return &reporter{ - logger: set.Logger, - sugaredLogger: set.Logger.Sugar(), - obsrecv: obsrecv, - }, nil -} - -func (r *reporter) OnDebugf(template string, args ...any) { - if r.logger.Check(zap.DebugLevel, "debug") != nil { - r.sugaredLogger.Debugf(template, args...) - } -}