Skip to content

Commit

Permalink
Align gRPC server status code to span status code
Browse files Browse the repository at this point in the history
  • Loading branch information
dragon3 committed Dec 9, 2023
1 parent 4ec047e commit 10beddf
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 4 deletions.
37 changes: 33 additions & 4 deletions interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,11 @@ func (i *Interceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc {
)
}
attributes = attributeFilter(req, attributes...)
span.SetStatus(spanStatus(protocol, err))
if isClient {
span.SetStatus(clientSpanStatus(protocol, err))
} else {
span.SetStatus(serverSpanStatus(protocol, err))
}
span.SetAttributes(attributes...)
instrumentation.duration.Record(ctx, i.config.now().Sub(requestStartTime).Milliseconds(), metric.WithAttributes(attributes...))
instrumentation.requestSize.Record(ctx, int64(requestSize), metric.WithAttributes(attributes...))
Expand Down Expand Up @@ -243,7 +247,7 @@ func (i *Interceptor) WrapStreamingClient(next connect.StreamingClientFunc) conn
}
span.SetAttributes(state.attributes...)
span.SetAttributes(headerAttributes(protocol, responseKey, conn.ResponseHeader(), i.config.responseHeaderKeys)...)
span.SetStatus(spanStatus(protocol, state.error))
span.SetStatus(clientSpanStatus(protocol, state.error))
span.End()
instrumentation.requestsPerRPC.Record(ctx, state.sentCounter,
metric.WithAttributes(state.attributes...))
Expand Down Expand Up @@ -328,7 +332,7 @@ func (i *Interceptor) WrapStreamingHandler(next connect.StreamingHandlerFunc) co
}
span.SetAttributes(state.attributes...)
span.SetAttributes(headerAttributes(protocol, responseKey, conn.ResponseHeader(), i.config.responseHeaderKeys)...)
span.SetStatus(spanStatus(protocol, err))
span.SetStatus(serverSpanStatus(protocol, err))
instrumentation.requestsPerRPC.Record(ctx, state.receivedCounter,
metric.WithAttributes(state.attributes...))
instrumentation.responsesPerRPC.Record(ctx, state.sentCounter,
Expand All @@ -354,7 +358,7 @@ func protocolToSemConv(protocol string) string {
}
}

func spanStatus(protocol string, err error) (codes.Code, string) {
func clientSpanStatus(protocol string, err error) (codes.Code, string) {
if err == nil {
return codes.Unset, ""
}
Expand All @@ -366,3 +370,28 @@ func spanStatus(protocol string, err error) (codes.Code, string) {
}
return codes.Error, err.Error()
}

func serverSpanStatus(protocol string, err error) (codes.Code, string) {
if err == nil {
return codes.Unset, ""
}
if protocol == connectProtocol && connect.IsNotModifiedError(err) {
return codes.Unset, ""
}

if connectErr := new(connect.Error); errors.As(err, &connectErr) {
switch connectErr.Code() {

Check failure on line 383 in interceptor.go

View workflow job for this annotation

GitHub Actions / ci (1.21.x)

missing cases in switch of type connect.Code: connect.CodeCanceled, connect.CodeInvalidArgument, connect.CodeNotFound, connect.CodeAlreadyExists, connect.CodePermissionDenied, connect.CodeResourceExhausted, connect.CodeFailedPrecondition, connect.CodeAborted, connect.CodeOutOfRange, connect.CodeUnauthenticated (exhaustive)
case connect.CodeUnknown,
connect.CodeDeadlineExceeded,
connect.CodeUnimplemented,
connect.CodeInternal,
connect.CodeUnavailable,
connect.CodeDataLoss:
return codes.Error, connectErr.Message()
default:
return codes.Unset, ""
}
}

return codes.Error, err.Error()
}
105 changes: 105 additions & 0 deletions interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package otelconnect

import (
"context"
"errors"
"math/rand"
"net"
"net/http"
Expand Down Expand Up @@ -1970,6 +1971,87 @@ func TestWithoutTraceEventsUnary(t *testing.T) {
}, spanRecorder.Ended())
}

func TestServerSpanStatus(t *testing.T) {
t.Parallel()
var propagator propagation.TraceContext
for _, tc := range serverSpanStatusTestCases {

Check failure on line 1977 in interceptor_test.go

View workflow job for this annotation

GitHub Actions / ci (1.21.x)

variable name 'tc' is too short for the scope of its usage (varnamelen)
spanRecorder := tracetest.NewSpanRecorder()
traceProvider := trace.NewTracerProvider(trace.WithSpanProcessor(spanRecorder))
clientSpanRecorder := tracetest.NewSpanRecorder()
clientTraceProvider := trace.NewTracerProvider(trace.WithSpanProcessor(clientSpanRecorder))
serverInterceptor, err := NewInterceptor(
WithTracerProvider(traceProvider),
WithoutTraceEvents(),
)
require.NoError(t, err)
clientInterceptor, err := NewInterceptor(
WithPropagator(propagator),
WithTracerProvider(clientTraceProvider),
)
require.NoError(t, err)
pingClient, _, _ := startServer([]connect.HandlerOption{
connect.WithInterceptors(serverInterceptor),
}, []connect.ClientOption{
connect.WithInterceptors(clientInterceptor),
}, &pluggablePingServer{
ping: func(ctx context.Context, r *connect.Request[pingv1.PingRequest]) (*connect.Response[pingv1.PingResponse], error) {
return nil, connect.NewError(tc.connectCode, errors.New(tc.connectCode.String()))
},
})
if _, err := pingClient.Ping(context.Background(), requestOfSize(1, 0)); err == nil {
t.Error("want error")
}
require.Len(t, spanRecorder.Ended(), 1)
require.Equal(t, codes.Error, clientSpanRecorder.Ended()[0].Status().Code)
require.Equal(t, tc.wantServerSpanCode, spanRecorder.Ended()[0].Status().Code)
require.Equal(t, tc.wantServerSpanDescription, spanRecorder.Ended()[0].Status().Description)
}
}

func TestStreamingServerSpanStatus(t *testing.T) {
t.Parallel()
var propagator propagation.TraceContext
for _, tc := range serverSpanStatusTestCases {

Check failure on line 2014 in interceptor_test.go

View workflow job for this annotation

GitHub Actions / ci (1.21.x)

variable name 'tc' is too short for the scope of its usage (varnamelen)
handlerSpanRecorder := tracetest.NewSpanRecorder()
handlerTraceProvider := trace.NewTracerProvider(trace.WithSpanProcessor(handlerSpanRecorder))
clientSpanRecorder := tracetest.NewSpanRecorder()
clientTraceProvider := trace.NewTracerProvider(trace.WithSpanProcessor(clientSpanRecorder))
serverInterceptor, err := NewInterceptor(
WithTracerProvider(handlerTraceProvider),
WithoutTraceEvents(),
)
require.NoError(t, err)
clientInterceptor, err := NewInterceptor(
WithPropagator(propagator),
WithTracerProvider(clientTraceProvider),
)
require.NoError(t, err)
client, _, _ := startServer(
[]connect.HandlerOption{
connect.WithInterceptors(serverInterceptor),
}, []connect.ClientOption{
connect.WithInterceptors(clientInterceptor),
}, &pluggablePingServer{
pingStream: func(ctx context.Context, bs *connect.BidiStream[pingv1.PingStreamRequest, pingv1.PingStreamResponse]) error {
return connect.NewError(tc.connectCode, errors.New(tc.connectCode.String()))
},
})
stream := client.PingStream(context.Background())
assert.NoError(t, stream.Send(&pingv1.PingStreamRequest{

Check failure on line 2040 in interceptor_test.go

View workflow job for this annotation

GitHub Actions / ci (1.21.x)

require-error: for error assertions use require (testifylint)
Data: []byte("Hello, otel!"),
}))
_, err = stream.Receive()
assert.Error(t, err)

Check failure on line 2044 in interceptor_test.go

View workflow job for this annotation

GitHub Actions / ci (1.21.x)

require-error: for error assertions use require (testifylint)
assert.NoError(t, stream.CloseRequest())

Check failure on line 2045 in interceptor_test.go

View workflow job for this annotation

GitHub Actions / ci (1.21.x)

require-error: for error assertions use require (testifylint)
assert.NoError(t, stream.CloseResponse())
assert.Equal(t, len(handlerSpanRecorder.Ended()), 1)

Check failure on line 2047 in interceptor_test.go

View workflow job for this annotation

GitHub Actions / ci (1.21.x)

len: use assert.Len (testifylint)
assert.Equal(t, len(clientSpanRecorder.Ended()), 1)

Check failure on line 2048 in interceptor_test.go

View workflow job for this annotation

GitHub Actions / ci (1.21.x)

len: use assert.Len (testifylint)
assert.Equal(t, tc.wantServerSpanCode, handlerSpanRecorder.Ended()[0].Status().Code)
assert.Equal(t, tc.wantServerSpanDescription, handlerSpanRecorder.Ended()[0].Status().Description)
assert.Equal(t, clientSpanRecorder.Ended()[0].Status().Code, codes.Error)

Check failure on line 2051 in interceptor_test.go

View workflow job for this annotation

GitHub Actions / ci (1.21.x)

expected-actual: need to reverse actual and expected values (testifylint)
}
}

// streamingHandlerInterceptorFunc is a simple Interceptor implementation that only
// wraps streaming handler RPCs. It has no effect on unary or streaming client RPCs.
type streamingHandlerInterceptorFunc func(connect.StreamingHandlerFunc) connect.StreamingHandlerFunc
Expand Down Expand Up @@ -2121,3 +2203,26 @@ func metricResource() *resource.Resource {
attribute.String("telemetry.sdk.version", otel.Version()),
)
}

var serverSpanStatusTestCases = []struct {

Check failure on line 2207 in interceptor_test.go

View workflow job for this annotation

GitHub Actions / ci (1.21.x)

serverSpanStatusTestCases is a global variable (gochecknoglobals)
connectCode connect.Code
wantServerSpanCode codes.Code
wantServerSpanDescription string
}{
{connectCode: connect.CodeCanceled, wantServerSpanCode: codes.Unset, wantServerSpanDescription: ""},
{connectCode: connect.CodeUnknown, wantServerSpanCode: codes.Error, wantServerSpanDescription: connect.CodeUnknown.String()},
{connectCode: connect.CodeInvalidArgument, wantServerSpanCode: codes.Unset, wantServerSpanDescription: ""},
{connectCode: connect.CodeDeadlineExceeded, wantServerSpanCode: codes.Error, wantServerSpanDescription: connect.CodeDeadlineExceeded.String()},
{connectCode: connect.CodeNotFound, wantServerSpanCode: codes.Unset, wantServerSpanDescription: ""},
{connectCode: connect.CodeAlreadyExists, wantServerSpanCode: codes.Unset, wantServerSpanDescription: ""},
{connectCode: connect.CodePermissionDenied, wantServerSpanCode: codes.Unset, wantServerSpanDescription: ""},
{connectCode: connect.CodeResourceExhausted, wantServerSpanCode: codes.Unset, wantServerSpanDescription: ""},
{connectCode: connect.CodeFailedPrecondition, wantServerSpanCode: codes.Unset, wantServerSpanDescription: ""},
{connectCode: connect.CodeAborted, wantServerSpanCode: codes.Unset, wantServerSpanDescription: ""},
{connectCode: connect.CodeOutOfRange, wantServerSpanCode: codes.Unset, wantServerSpanDescription: ""},
{connectCode: connect.CodeUnimplemented, wantServerSpanCode: codes.Error, wantServerSpanDescription: connect.CodeUnimplemented.String()},
{connectCode: connect.CodeInternal, wantServerSpanCode: codes.Error, wantServerSpanDescription: connect.CodeInternal.String()},
{connectCode: connect.CodeUnavailable, wantServerSpanCode: codes.Error, wantServerSpanDescription: connect.CodeUnavailable.String()},
{connectCode: connect.CodeDataLoss, wantServerSpanCode: codes.Error, wantServerSpanDescription: connect.CodeDataLoss.String()},
{connectCode: connect.CodeUnauthenticated, wantServerSpanCode: codes.Unset, wantServerSpanDescription: ""},
}

0 comments on commit 10beddf

Please sign in to comment.