From d9c9bbef8ad1f31f2a91a147f8e9cb43a43d00eb Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Wed, 16 Oct 2024 13:40:01 -0700 Subject: [PATCH] otelgrpc: Fix NewClientHandler to emit proper request/response metrics (#6250) Tested by removing `metricdatatest.IgnoreValue()` from stas_handler tests, and see that only the duration metrics differ. --------- Signed-off-by: Bogdan Drutu --- CHANGELOG.md | 1 + .../google.golang.org/grpc/otelgrpc/config.go | 52 ++++++--- .../grpc/otelgrpc/config_test.go | 8 +- .../grpc/otelgrpc/stats_handler.go | 23 ++-- .../otelgrpc/test/grpc_stats_handler_test.go | 101 +++++++++--------- 5 files changed, 102 insertions(+), 83 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a933db4e781..93bb4251ccc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Transform nil attribute values to `log.Value` zero value instead of panicking in `go.opentelemetry.io/contrib/bridges/otellogrus`. (#6237) - Transform nil attribute values to `log.Value` zero value instead of panicking in `go.opentelemetry.io/contrib/bridges/otelzap`. (#6237) - Transform nil attribute values to `log.Value` zero value instead of `log.StringValue("")` in `go.opentelemetry.io/contrib/bridges/otelslog`. (#6246) +- Fix `NewClientHandler` so that `rpc.client.request.*` metrics measure requests instead of responses and `rpc.client.responses.*` metrics measure responses instead of requests in `go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc`. (#6250) diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/config.go b/instrumentation/google.golang.org/grpc/otelgrpc/config.go index 18436eaedff..9e87fb4bb19 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/config.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/config.go @@ -51,11 +51,11 @@ type config struct { tracer trace.Tracer meter metric.Meter - rpcDuration metric.Float64Histogram - rpcRequestSize metric.Int64Histogram - rpcResponseSize metric.Int64Histogram - rpcRequestsPerRPC metric.Int64Histogram - rpcResponsesPerRPC metric.Int64Histogram + rpcDuration metric.Float64Histogram + rpcInBytes metric.Int64Histogram + rpcOutBytes metric.Int64Histogram + rpcInMessages metric.Int64Histogram + rpcOutMessages metric.Int64Histogram } // Option applies an option value for a config. @@ -96,46 +96,64 @@ func newConfig(opts []Option, role string) *config { } } - c.rpcRequestSize, err = c.meter.Int64Histogram("rpc."+role+".request.size", + rpcRequestSize, err := c.meter.Int64Histogram("rpc."+role+".request.size", metric.WithDescription("Measures size of RPC request messages (uncompressed)."), metric.WithUnit("By")) if err != nil { otel.Handle(err) - if c.rpcRequestSize == nil { - c.rpcRequestSize = noop.Int64Histogram{} + if rpcRequestSize == nil { + rpcRequestSize = noop.Int64Histogram{} } } - c.rpcResponseSize, err = c.meter.Int64Histogram("rpc."+role+".response.size", + rpcResponseSize, err := c.meter.Int64Histogram("rpc."+role+".response.size", metric.WithDescription("Measures size of RPC response messages (uncompressed)."), metric.WithUnit("By")) if err != nil { otel.Handle(err) - if c.rpcResponseSize == nil { - c.rpcResponseSize = noop.Int64Histogram{} + if rpcResponseSize == nil { + rpcResponseSize = noop.Int64Histogram{} } } - c.rpcRequestsPerRPC, err = c.meter.Int64Histogram("rpc."+role+".requests_per_rpc", + rpcRequestsPerRPC, err := c.meter.Int64Histogram("rpc."+role+".requests_per_rpc", metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."), metric.WithUnit("{count}")) if err != nil { otel.Handle(err) - if c.rpcRequestsPerRPC == nil { - c.rpcRequestsPerRPC = noop.Int64Histogram{} + if rpcRequestsPerRPC == nil { + rpcRequestsPerRPC = noop.Int64Histogram{} } } - c.rpcResponsesPerRPC, err = c.meter.Int64Histogram("rpc."+role+".responses_per_rpc", + rpcResponsesPerRPC, err := c.meter.Int64Histogram("rpc."+role+".responses_per_rpc", metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."), metric.WithUnit("{count}")) if err != nil { otel.Handle(err) - if c.rpcResponsesPerRPC == nil { - c.rpcResponsesPerRPC = noop.Int64Histogram{} + if rpcResponsesPerRPC == nil { + rpcResponsesPerRPC = noop.Int64Histogram{} } } + switch role { + case "client": + c.rpcInBytes = rpcResponseSize + c.rpcInMessages = rpcResponsesPerRPC + c.rpcOutBytes = rpcRequestSize + c.rpcOutMessages = rpcRequestsPerRPC + case "server": + c.rpcInBytes = rpcRequestSize + c.rpcInMessages = rpcRequestsPerRPC + c.rpcOutBytes = rpcResponseSize + c.rpcOutMessages = rpcResponsesPerRPC + default: + c.rpcInBytes = noop.Int64Histogram{} + c.rpcInMessages = noop.Int64Histogram{} + c.rpcOutBytes = noop.Int64Histogram{} + c.rpcOutMessages = noop.Int64Histogram{} + } + return c } diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/config_test.go b/instrumentation/google.golang.org/grpc/otelgrpc/config_test.go index 4924abc9370..4d6192dcdca 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/config_test.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/config_test.go @@ -19,10 +19,10 @@ func TestNilInstruments(t *testing.T) { ctx := context.Background() assert.NotPanics(t, func() { c.rpcDuration.Record(ctx, 0) }, "rpcDuration") - assert.NotPanics(t, func() { c.rpcRequestSize.Record(ctx, 0) }, "rpcRequestSize") - assert.NotPanics(t, func() { c.rpcResponseSize.Record(ctx, 0) }, "rpcResponseSize") - assert.NotPanics(t, func() { c.rpcRequestsPerRPC.Record(ctx, 0) }, "rpcRequestsPerRPC") - assert.NotPanics(t, func() { c.rpcResponsesPerRPC.Record(ctx, 0) }, "rpcResponsesPerRPC") + assert.NotPanics(t, func() { c.rpcInBytes.Record(ctx, 0) }, "rpcInBytes") + assert.NotPanics(t, func() { c.rpcOutBytes.Record(ctx, 0) }, "rpcOutBytes") + assert.NotPanics(t, func() { c.rpcInMessages.Record(ctx, 0) }, "rpcInMessages") + assert.NotPanics(t, func() { c.rpcOutMessages.Record(ctx, 0) }, "rpcOutMessages") } type meterProvider struct { diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go b/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go index fbcbfb84e04..c01cb897cd3 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go @@ -13,21 +13,22 @@ import ( "google.golang.org/grpc/stats" "google.golang.org/grpc/status" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/internal" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/metric" semconv "go.opentelemetry.io/otel/semconv/v1.17.0" "go.opentelemetry.io/otel/trace" + + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/internal" ) type gRPCContextKey struct{} type gRPCContext struct { - messagesReceived int64 - messagesSent int64 - metricAttrs []attribute.KeyValue - record bool + inMessages int64 + outMessages int64 + metricAttrs []attribute.KeyValue + record bool } type serverHandler struct { @@ -150,8 +151,8 @@ func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats, isServer bool case *stats.Begin: case *stats.InPayload: if gctx != nil { - messageId = atomic.AddInt64(&gctx.messagesReceived, 1) - c.rpcRequestSize.Record(ctx, int64(rs.Length), metric.WithAttributeSet(attribute.NewSet(metricAttrs...))) + messageId = atomic.AddInt64(&gctx.inMessages, 1) + c.rpcInBytes.Record(ctx, int64(rs.Length), metric.WithAttributeSet(attribute.NewSet(metricAttrs...))) } if c.ReceivedEvent { @@ -166,8 +167,8 @@ func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats, isServer bool } case *stats.OutPayload: if gctx != nil { - messageId = atomic.AddInt64(&gctx.messagesSent, 1) - c.rpcResponseSize.Record(ctx, int64(rs.Length), metric.WithAttributeSet(attribute.NewSet(metricAttrs...))) + messageId = atomic.AddInt64(&gctx.outMessages, 1) + c.rpcOutBytes.Record(ctx, int64(rs.Length), metric.WithAttributeSet(attribute.NewSet(metricAttrs...))) } if c.SentEvent { @@ -213,8 +214,8 @@ func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats, isServer bool c.rpcDuration.Record(ctx, elapsedTime, recordOpts...) if gctx != nil { - c.rpcRequestsPerRPC.Record(ctx, atomic.LoadInt64(&gctx.messagesReceived), recordOpts...) - c.rpcResponsesPerRPC.Record(ctx, atomic.LoadInt64(&gctx.messagesSent), recordOpts...) + c.rpcInMessages.Record(ctx, atomic.LoadInt64(&gctx.inMessages), recordOpts...) + c.rpcOutMessages.Record(ctx, atomic.LoadInt64(&gctx.outMessages), recordOpts...) } default: return diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go index 96a257c8c07..884c60c1c21 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go @@ -16,22 +16,21 @@ import ( "github.com/stretchr/testify/require" "google.golang.org/grpc" "google.golang.org/grpc/codes" + testpb "google.golang.org/grpc/interop/grpc_testing" "google.golang.org/grpc/status" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/filters" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/internal/test" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/instrumentation" "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" - "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" semconv "go.opentelemetry.io/otel/semconv/v1.17.0" - testpb "google.golang.org/grpc/interop/grpc_testing" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/filters" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/internal/test" ) var ( @@ -803,10 +802,10 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) { testMetricAttr), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, BucketCounts: []uint64{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}, - Max: metricdata.NewExtrema(int64(314167)), - Min: metricdata.NewExtrema(int64(314167)), + Max: metricdata.NewExtrema(int64(271840)), + Min: metricdata.NewExtrema(int64(271840)), Count: 1, - Sum: 314167, + Sum: 271840, }, { Attributes: attribute.NewSet( @@ -815,11 +814,11 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) { semconv.RPCSystemGRPC, testMetricAttr), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, - BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, - Max: metricdata.NewExtrema(int64(4)), - Min: metricdata.NewExtrema(int64(4)), - Count: 1, - Sum: 4, + BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2}, + Max: metricdata.NewExtrema(int64(45912)), + Min: metricdata.NewExtrema(int64(12)), + Count: 4, + Sum: 74948, }, { Attributes: attribute.NewSet( @@ -828,11 +827,11 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) { semconv.RPCSystemGRPC, testMetricAttr), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, - BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 2}, - Max: metricdata.NewExtrema(int64(58987)), - Min: metricdata.NewExtrema(int64(13)), - Count: 4, - Sum: 93082, + BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Max: metricdata.NewExtrema(int64(21)), + Min: metricdata.NewExtrema(int64(21)), + Count: 1, + Sum: 21, }, { Attributes: attribute.NewSet( @@ -841,11 +840,11 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) { semconv.RPCSystemGRPC, testMetricAttr), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, - BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 2}, - Max: metricdata.NewExtrema(int64(58987)), - Min: metricdata.NewExtrema(int64(13)), + BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2}, + Max: metricdata.NewExtrema(int64(45918)), + Min: metricdata.NewExtrema(int64(16)), Count: 4, - Sum: 93082, + Sum: 74969, }, }, }, @@ -878,10 +877,10 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) { testMetricAttr), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, BucketCounts: []uint64{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}, - Max: metricdata.NewExtrema(int64(271840)), - Min: metricdata.NewExtrema(int64(271840)), + Max: metricdata.NewExtrema(int64(314167)), + Min: metricdata.NewExtrema(int64(314167)), Count: 1, - Sum: 271840, + Sum: 314167, }, { Attributes: attribute.NewSet( @@ -890,11 +889,11 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) { semconv.RPCSystemGRPC, testMetricAttr), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, - BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2}, - Max: metricdata.NewExtrema(int64(45912)), - Min: metricdata.NewExtrema(int64(12)), - Count: 4, - Sum: 74948, + BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Max: metricdata.NewExtrema(int64(4)), + Min: metricdata.NewExtrema(int64(4)), + Count: 1, + Sum: 4, }, { Attributes: attribute.NewSet( @@ -903,11 +902,11 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) { semconv.RPCSystemGRPC, testMetricAttr), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, - BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, - Max: metricdata.NewExtrema(int64(21)), - Min: metricdata.NewExtrema(int64(21)), - Count: 1, - Sum: 21, + BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 2}, + Max: metricdata.NewExtrema(int64(58987)), + Min: metricdata.NewExtrema(int64(13)), + Count: 4, + Sum: 93082, }, { Attributes: attribute.NewSet( @@ -916,11 +915,11 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) { semconv.RPCSystemGRPC, testMetricAttr), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, - BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2}, - Max: metricdata.NewExtrema(int64(45918)), - Min: metricdata.NewExtrema(int64(16)), + BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 2}, + Max: metricdata.NewExtrema(int64(58987)), + Min: metricdata.NewExtrema(int64(13)), Count: 4, - Sum: 74969, + Sum: 93082, }, }, }, @@ -969,10 +968,10 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) { testMetricAttr), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, - Max: metricdata.NewExtrema(int64(1)), - Min: metricdata.NewExtrema(int64(1)), + Max: metricdata.NewExtrema(int64(4)), + Min: metricdata.NewExtrema(int64(4)), Count: 1, - Sum: 1, + Sum: 4, }, { Attributes: attribute.NewSet( @@ -983,10 +982,10 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) { testMetricAttr), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, - Max: metricdata.NewExtrema(int64(4)), - Min: metricdata.NewExtrema(int64(4)), + Max: metricdata.NewExtrema(int64(1)), + Min: metricdata.NewExtrema(int64(1)), Count: 1, - Sum: 4, + Sum: 1, }, { Attributes: attribute.NewSet( @@ -1049,10 +1048,10 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) { testMetricAttr), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, - Max: metricdata.NewExtrema(int64(4)), - Min: metricdata.NewExtrema(int64(4)), + Max: metricdata.NewExtrema(int64(1)), + Min: metricdata.NewExtrema(int64(1)), Count: 1, - Sum: 4, + Sum: 1, }, { Attributes: attribute.NewSet( @@ -1063,10 +1062,10 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) { testMetricAttr), Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, - Max: metricdata.NewExtrema(int64(1)), - Min: metricdata.NewExtrema(int64(1)), + Max: metricdata.NewExtrema(int64(4)), + Min: metricdata.NewExtrema(int64(4)), Count: 1, - Sum: 1, + Sum: 4, }, { Attributes: attribute.NewSet(