From 89fcd1ab4c549797f567a41da9a2b411426ba33d Mon Sep 17 00:00:00 2001 From: Sam Xie Date: Wed, 29 Jul 2020 14:44:39 +0800 Subject: [PATCH] Add propagator option for gRPC instrumentation --- instrumentation/grpctrace/interceptor.go | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/instrumentation/grpctrace/interceptor.go b/instrumentation/grpctrace/interceptor.go index 7a8d195e21d0..90d6d84a4347 100644 --- a/instrumentation/grpctrace/interceptor.go +++ b/instrumentation/grpctrace/interceptor.go @@ -69,14 +69,14 @@ var ( // s := grpc.NewServer( // grpc.WithUnaryInterceptor(grpctrace.UnaryClientInterceptor(tracer)), // ..., // (existing DialOptions)) -func UnaryClientInterceptor(tracer trace.Tracer) grpc.UnaryClientInterceptor { +func UnaryClientInterceptor(tracer trace.Tracer, opts ...Option) grpc.UnaryClientInterceptor { return func( ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, - opts ...grpc.CallOption, + callOpts ...grpc.CallOption, ) error { requestMetadata, _ := metadata.FromOutgoingContext(ctx) metadataCopy := requestMetadata.Copy() @@ -91,12 +91,12 @@ func UnaryClientInterceptor(tracer trace.Tracer) grpc.UnaryClientInterceptor { ) defer span.End() - Inject(ctx, &metadataCopy) + Inject(ctx, &metadataCopy, opts...) ctx = metadata.NewOutgoingContext(ctx, metadataCopy) messageSent.Event(ctx, 1, req) - err := invoker(ctx, method, req, reply, cc, opts...) + err := invoker(ctx, method, req, reply, cc, callOpts...) messageReceived.Event(ctx, 1, reply) @@ -248,14 +248,14 @@ func (w *clientStream) sendStreamEvent(eventType streamEventType, err error) { // s := grpc.Dial( // grpc.WithStreamInterceptor(grpctrace.StreamClientInterceptor(tracer)), // ..., // (existing DialOptions)) -func StreamClientInterceptor(tracer trace.Tracer) grpc.StreamClientInterceptor { +func StreamClientInterceptor(tracer trace.Tracer, opts ...Option) grpc.StreamClientInterceptor { return func( ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, - opts ...grpc.CallOption, + callOpts ...grpc.CallOption, ) (grpc.ClientStream, error) { requestMetadata, _ := metadata.FromOutgoingContext(ctx) metadataCopy := requestMetadata.Copy() @@ -269,10 +269,10 @@ func StreamClientInterceptor(tracer trace.Tracer) grpc.StreamClientInterceptor { trace.WithAttributes(attr...), ) - Inject(ctx, &metadataCopy) + Inject(ctx, &metadataCopy, opts...) ctx = metadata.NewOutgoingContext(ctx, metadataCopy) - s, err := streamer(ctx, desc, cc, method, opts...) + s, err := streamer(ctx, desc, cc, method, callOpts...) stream := wrapClientStream(s, desc) go func() { @@ -300,7 +300,7 @@ func StreamClientInterceptor(tracer trace.Tracer) grpc.StreamClientInterceptor { // s := grpc.Dial( // grpc.UnaryInterceptor(grpctrace.UnaryServerInterceptor(tracer)), // ..., // (existing ServerOptions)) -func UnaryServerInterceptor(tracer trace.Tracer) grpc.UnaryServerInterceptor { +func UnaryServerInterceptor(tracer trace.Tracer, opts ...Option) grpc.UnaryServerInterceptor { return func( ctx context.Context, req interface{}, @@ -310,7 +310,7 @@ func UnaryServerInterceptor(tracer trace.Tracer) grpc.UnaryServerInterceptor { requestMetadata, _ := metadata.FromIncomingContext(ctx) metadataCopy := requestMetadata.Copy() - entries, spanCtx := Extract(ctx, &metadataCopy) + entries, spanCtx := Extract(ctx, &metadataCopy, opts...) ctx = correlation.ContextWithMap(ctx, correlation.NewMap(correlation.MapUpdate{ MultiKV: entries, })) @@ -388,7 +388,7 @@ func wrapServerStream(ctx context.Context, ss grpc.ServerStream) *serverStream { // s := grpc.Dial( // grpc.StreamInterceptor(grpctrace.StreamServerInterceptor(tracer)), // ..., // (existing ServerOptions)) -func StreamServerInterceptor(tracer trace.Tracer) grpc.StreamServerInterceptor { +func StreamServerInterceptor(tracer trace.Tracer, opts ...Option) grpc.StreamServerInterceptor { return func( srv interface{}, ss grpc.ServerStream, @@ -400,7 +400,7 @@ func StreamServerInterceptor(tracer trace.Tracer) grpc.StreamServerInterceptor { requestMetadata, _ := metadata.FromIncomingContext(ctx) metadataCopy := requestMetadata.Copy() - entries, spanCtx := Extract(ctx, &metadataCopy) + entries, spanCtx := Extract(ctx, &metadataCopy, opts...) ctx = correlation.ContextWithMap(ctx, correlation.NewMap(correlation.MapUpdate{ MultiKV: entries, }))