From 8260318373d7914c30a92b124f178907888660f3 Mon Sep 17 00:00:00 2001 From: Alex Boten <223565+codeboten@users.noreply.github.com> Date: Thu, 4 Jul 2024 14:06:09 -0700 Subject: [PATCH] [receiver/otelarrow] use mdatagen for metrics (#33895) Fixes https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/33666 Signed-off-by: Alex Boten <223565+codeboten@users.noreply.github.com> --- .../otelarrowreceiver/internal/arrow/arrow.go | 90 +++++++------------ 1 file changed, 33 insertions(+), 57 deletions(-) diff --git a/receiver/otelarrowreceiver/internal/arrow/arrow.go b/receiver/otelarrowreceiver/internal/arrow/arrow.go index facd400c29a2..222b9eaa36a3 100644 --- a/receiver/otelarrowreceiver/internal/arrow/arrow.go +++ b/receiver/otelarrowreceiver/internal/arrow/arrow.go @@ -32,7 +32,6 @@ import ( "go.opentelemetry.io/collector/receiver/receiverhelper" "go.opentelemetry.io/otel" otelcodes "go.opentelemetry.io/otel/codes" - "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/trace" "go.uber.org/multierr" @@ -43,12 +42,13 @@ import ( "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" + + internalmetadata "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/metadata" ) const ( streamFormat = "arrow" hpackMaxDynamicSize = 4096 - scopeName = "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver" ) var ( @@ -71,17 +71,15 @@ type Receiver struct { arrowpb.UnsafeArrowLogsServiceServer arrowpb.UnsafeArrowMetricsServiceServer - telemetry component.TelemetrySettings - tracer trace.Tracer - obsrecv *receiverhelper.ObsReport - gsettings configgrpc.ServerConfig - authServer auth.Server - newConsumer func() arrowRecord.ConsumerAPI - netReporter netstats.Interface - recvInFlightBytes metric.Int64UpDownCounter - recvInFlightItems metric.Int64UpDownCounter - recvInFlightRequests metric.Int64UpDownCounter - boundedQueue *admission.BoundedQueue + telemetry component.TelemetrySettings + tracer trace.Tracer + obsrecv *receiverhelper.ObsReport + gsettings configgrpc.ServerConfig + authServer auth.Server + newConsumer func() arrowRecord.ConsumerAPI + netReporter netstats.Interface + telemetryBuilder *internalmetadata.TelemetryBuilder + boundedQueue *admission.BoundedQueue } // receiverStream holds the inFlightWG for a single stream. @@ -102,44 +100,22 @@ func New( netReporter netstats.Interface, ) (*Receiver, error) { tracer := set.TelemetrySettings.TracerProvider.Tracer("otel-arrow-receiver") - var errors, err error - recv := &Receiver{ - Consumers: cs, - obsrecv: obsrecv, - telemetry: set.TelemetrySettings, - tracer: tracer, - authServer: authServer, - newConsumer: newConsumer, - gsettings: gsettings, - netReporter: netReporter, - boundedQueue: bq, - } - - meter := recv.telemetry.MeterProvider.Meter(scopeName) - recv.recvInFlightBytes, err = meter.Int64UpDownCounter( - "otel_arrow_receiver_in_flight_bytes", - metric.WithDescription("Number of bytes in flight"), - metric.WithUnit("By"), - ) - errors = multierr.Append(errors, err) - - recv.recvInFlightItems, err = meter.Int64UpDownCounter( - "otel_arrow_receiver_in_flight_items", - metric.WithDescription("Number of items in flight"), - ) - errors = multierr.Append(errors, err) - - recv.recvInFlightRequests, err = meter.Int64UpDownCounter( - "otel_arrow_receiver_in_flight_requests", - metric.WithDescription("Number of requests in flight"), - ) - errors = multierr.Append(errors, err) - - if errors != nil { - return nil, errors - } - - return recv, nil + telemetryBuilder, err := internalmetadata.NewTelemetryBuilder(set.TelemetrySettings) + if err != nil { + return nil, err + } + return &Receiver{ + Consumers: cs, + obsrecv: obsrecv, + telemetry: set.TelemetrySettings, + tracer: tracer, + authServer: authServer, + newConsumer: newConsumer, + gsettings: gsettings, + netReporter: netReporter, + boundedQueue: bq, + telemetryBuilder: telemetryBuilder, + }, nil } // headerReceiver contains the state necessary to decode per-request metadata @@ -445,7 +421,7 @@ func (r *receiverStream) newInFlightData(ctx context.Context, method string, bat ctx, span := r.tracer.Start(ctx, "otel_arrow_stream_inflight") r.inFlightWG.Add(1) - r.recvInFlightRequests.Add(ctx, 1) + r.telemetryBuilder.OtelArrowReceiverInFlightRequests.Add(ctx, 1) id := &inFlightData{ receiverStream: r, method: method, @@ -525,10 +501,10 @@ func (id *inFlightData) anyDone(ctx context.Context) { } if id.uncompSize != 0 { - id.recvInFlightBytes.Add(ctx, -id.uncompSize) + id.telemetryBuilder.OtelArrowReceiverInFlightBytes.Add(ctx, -id.uncompSize) } if id.numItems != 0 { - id.recvInFlightItems.Add(ctx, int64(-id.numItems)) + id.telemetryBuilder.OtelArrowReceiverInFlightItems.Add(ctx, int64(-id.numItems)) } // The netstats code knows that uncompressed size is @@ -540,7 +516,7 @@ func (id *inFlightData) anyDone(ctx context.Context) { sized.Length = id.uncompSize id.netReporter.CountReceive(ctx, sized) - id.recvInFlightRequests.Add(ctx, -1) + id.telemetryBuilder.OtelArrowReceiverInFlightRequests.Add(ctx, -1) id.inFlightWG.Done() } @@ -638,8 +614,8 @@ func (r *receiverStream) recvOne(streamCtx context.Context, serverStream anyStre flight.uncompSize = uncompSize flight.numItems = numItems - r.recvInFlightBytes.Add(inflightCtx, uncompSize) - r.recvInFlightItems.Add(inflightCtx, int64(numItems)) + r.telemetryBuilder.OtelArrowReceiverInFlightBytes.Add(inflightCtx, uncompSize) + r.telemetryBuilder.OtelArrowReceiverInFlightItems.Add(inflightCtx, int64(numItems)) numAcquired, err := r.acquireAdditionalBytes(inflightCtx, prevAcquiredBytes, uncompSize, hrcv.connInfo.Addr, uncompSizeHeaderFound)