Skip to content

Commit

Permalink
Fix request/response metrics. In StatsHandler In means response for C…
Browse files Browse the repository at this point in the history
…lient

Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Oct 15, 2024
1 parent 9cf5701 commit 2d71950
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 32 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

## [Unreleased]

### Fixed

- Fix request/response metrics. In StatsHandler In means response for Client. (#6250)

<!-- Released section -->
<!-- Don't change this section unless doing release -->

Expand Down
52 changes: 35 additions & 17 deletions instrumentation/google.golang.org/grpc/otelgrpc/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ type config struct {
tracer trace.Tracer
meter metric.Meter

rpcDuration metric.Float64Histogram
rpcRequestSize metric.Int64Histogram
rpcResponseSize metric.Int64Histogram
rpcRequestsPerRPC metric.Int64Histogram
rpcResponsesPerRPC metric.Int64Histogram
rpcDuration metric.Float64Histogram
rpcInBytes metric.Int64Histogram
rpcOutBytes metric.Int64Histogram
rpcInMessages metric.Int64Histogram
rpcOutMessages metric.Int64Histogram
}

// Option applies an option value for a config.
Expand Down Expand Up @@ -96,46 +96,64 @@ func newConfig(opts []Option, role string) *config {
}
}

c.rpcRequestSize, err = c.meter.Int64Histogram("rpc."+role+".request.size",
rpcRequestSize, err := c.meter.Int64Histogram("rpc."+role+".request.size",
metric.WithDescription("Measures size of RPC request messages (uncompressed)."),
metric.WithUnit("By"))
if err != nil {
otel.Handle(err)
if c.rpcRequestSize == nil {
c.rpcRequestSize = noop.Int64Histogram{}
if rpcRequestSize == nil {
rpcRequestSize = noop.Int64Histogram{}
}
}

c.rpcResponseSize, err = c.meter.Int64Histogram("rpc."+role+".response.size",
rpcResponseSize, err := c.meter.Int64Histogram("rpc."+role+".response.size",
metric.WithDescription("Measures size of RPC response messages (uncompressed)."),
metric.WithUnit("By"))
if err != nil {
otel.Handle(err)
if c.rpcResponseSize == nil {
c.rpcResponseSize = noop.Int64Histogram{}
if rpcResponseSize == nil {
rpcResponseSize = noop.Int64Histogram{}
}
}

c.rpcRequestsPerRPC, err = c.meter.Int64Histogram("rpc."+role+".requests_per_rpc",
rpcRequestsPerRPC, err := c.meter.Int64Histogram("rpc."+role+".requests_per_rpc",
metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."),
metric.WithUnit("{count}"))
if err != nil {
otel.Handle(err)
if c.rpcRequestsPerRPC == nil {
c.rpcRequestsPerRPC = noop.Int64Histogram{}
if rpcRequestsPerRPC == nil {
rpcRequestsPerRPC = noop.Int64Histogram{}
}
}

c.rpcResponsesPerRPC, err = c.meter.Int64Histogram("rpc."+role+".responses_per_rpc",
rpcResponsesPerRPC, err := c.meter.Int64Histogram("rpc."+role+".responses_per_rpc",
metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."),
metric.WithUnit("{count}"))
if err != nil {
otel.Handle(err)
if c.rpcResponsesPerRPC == nil {
c.rpcResponsesPerRPC = noop.Int64Histogram{}
if rpcResponsesPerRPC == nil {
rpcResponsesPerRPC = noop.Int64Histogram{}
}
}

switch role {
case "client":
c.rpcInBytes = rpcResponseSize
c.rpcInMessages = rpcResponsesPerRPC
c.rpcOutBytes = rpcRequestSize
c.rpcOutMessages = rpcRequestsPerRPC
case "server":
c.rpcInBytes = rpcRequestSize
c.rpcInMessages = rpcRequestsPerRPC
c.rpcOutBytes = rpcResponseSize
c.rpcOutMessages = rpcResponsesPerRPC
default:
c.rpcInBytes = noop.Int64Histogram{}
c.rpcInMessages = noop.Int64Histogram{}
c.rpcOutBytes = noop.Int64Histogram{}
c.rpcOutMessages = noop.Int64Histogram{}
}

return c
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ func TestNilInstruments(t *testing.T) {

ctx := context.Background()
assert.NotPanics(t, func() { c.rpcDuration.Record(ctx, 0) }, "rpcDuration")
assert.NotPanics(t, func() { c.rpcRequestSize.Record(ctx, 0) }, "rpcRequestSize")
assert.NotPanics(t, func() { c.rpcResponseSize.Record(ctx, 0) }, "rpcResponseSize")
assert.NotPanics(t, func() { c.rpcRequestsPerRPC.Record(ctx, 0) }, "rpcRequestsPerRPC")
assert.NotPanics(t, func() { c.rpcResponsesPerRPC.Record(ctx, 0) }, "rpcResponsesPerRPC")
assert.NotPanics(t, func() { c.rpcInBytes.Record(ctx, 0) }, "rpcInBytes")
assert.NotPanics(t, func() { c.rpcOutBytes.Record(ctx, 0) }, "rpcOutBytes")
assert.NotPanics(t, func() { c.rpcInMessages.Record(ctx, 0) }, "rpcInMessages")
assert.NotPanics(t, func() { c.rpcOutMessages.Record(ctx, 0) }, "rpcOutMessages")
}

type meterProvider struct {
Expand Down
23 changes: 12 additions & 11 deletions instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,22 @@ import (
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"

"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/internal"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/metric"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
"go.opentelemetry.io/otel/trace"

"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/internal"
)

type gRPCContextKey struct{}

type gRPCContext struct {
messagesReceived int64
messagesSent int64
metricAttrs []attribute.KeyValue
record bool
inMessages int64
outMessages int64
metricAttrs []attribute.KeyValue
record bool
}

type serverHandler struct {
Expand Down Expand Up @@ -150,8 +151,8 @@ func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats, isServer bool
case *stats.Begin:
case *stats.InPayload:
if gctx != nil {
messageId = atomic.AddInt64(&gctx.messagesReceived, 1)
c.rpcRequestSize.Record(ctx, int64(rs.Length), metric.WithAttributeSet(attribute.NewSet(metricAttrs...)))
messageId = atomic.AddInt64(&gctx.inMessages, 1)
c.rpcInBytes.Record(ctx, int64(rs.Length), metric.WithAttributeSet(attribute.NewSet(metricAttrs...)))
}

if c.ReceivedEvent {
Expand All @@ -166,8 +167,8 @@ func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats, isServer bool
}
case *stats.OutPayload:
if gctx != nil {
messageId = atomic.AddInt64(&gctx.messagesSent, 1)
c.rpcResponseSize.Record(ctx, int64(rs.Length), metric.WithAttributeSet(attribute.NewSet(metricAttrs...)))
messageId = atomic.AddInt64(&gctx.outMessages, 1)
c.rpcOutBytes.Record(ctx, int64(rs.Length), metric.WithAttributeSet(attribute.NewSet(metricAttrs...)))
}

if c.SentEvent {
Expand Down Expand Up @@ -213,8 +214,8 @@ func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats, isServer bool

c.rpcDuration.Record(ctx, elapsedTime, recordOpts...)
if gctx != nil {
c.rpcRequestsPerRPC.Record(ctx, atomic.LoadInt64(&gctx.messagesReceived), recordOpts...)
c.rpcResponsesPerRPC.Record(ctx, atomic.LoadInt64(&gctx.messagesSent), recordOpts...)
c.rpcInMessages.Record(ctx, atomic.LoadInt64(&gctx.inMessages), recordOpts...)
c.rpcOutMessages.Record(ctx, atomic.LoadInt64(&gctx.outMessages), recordOpts...)
}
default:
return
Expand Down

0 comments on commit 2d71950

Please sign in to comment.