Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

contrib/google.golang.org/{grpc, grpc.v12}: add support for WithSpanOptions #1159

Merged
merged 8 commits into from
Mar 20, 2023
33 changes: 15 additions & 18 deletions contrib/google.golang.org/grpc.v12/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package grpc // import "gopkg.in/DataDog/dd-trace-go.v1/contrib/google.golang.org/grpc.v12"

import (
"math"
"net"

"gopkg.in/DataDog/dd-trace-go.v1/contrib/google.golang.org/internal/grpcutil"
Expand Down Expand Up @@ -41,31 +40,32 @@ func UnaryServerInterceptor(opts ...InterceptorOption) grpc.UnaryServerIntercept

log.Debug("contrib/google.golang.org/grpc.v12: Configuring UnaryServerInterceptor: %#v", cfg)
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
span, ctx := startSpanFromContext(ctx, info.FullMethod, cfg.serviceName, cfg.analyticsRate)
span, ctx := startSpanFromContext(ctx, info.FullMethod, cfg.serviceName, cfg.spanOpts...)
resp, err := handler(ctx, req)
span.Finish(tracer.WithError(err))
return resp, err
}
}

func startSpanFromContext(ctx context.Context, method, service string, rate float64) (ddtrace.Span, context.Context) {
opts := []ddtrace.StartSpanOption{
func startSpanFromContext(ctx context.Context, method, service string, opts ...tracer.StartSpanOption) (ddtrace.Span, context.Context) {
// copy opts in case the caller reuses the slice in parallel
// we will add at least 5, at most 6 items
optsLocal := make([]tracer.StartSpanOption, len(opts), len(opts)+6)
copy(optsLocal, opts)
optsLocal = append(optsLocal,
tracer.ServiceName(service),
tracer.ResourceName(method),
tracer.Tag(tagMethod, method),
tracer.SpanType(ext.AppTypeRPC),
tracer.Measured(),
tracer.Tag(ext.Component, "google.golang.org/grpc.v12"),
tracer.Tag(ext.SpanKind, ext.SpanKindServer),
}
if !math.IsNaN(rate) {
opts = append(opts, tracer.Tag(ext.EventSampleRate, rate))
}
)
md, _ := metadata.FromContext(ctx) // nil is ok
if sctx, err := tracer.Extract(grpcutil.MDCarrier(md)); err == nil {
opts = append(opts, tracer.ChildOf(sctx))
optsLocal = append(optsLocal, tracer.ChildOf(sctx))
}
return tracer.StartSpanFromContext(ctx, "grpc.server", opts...)
return tracer.StartSpanFromContext(ctx, "grpc.server", optsLocal...)
}

// UnaryClientInterceptor will add tracing to a grpc client.
Expand All @@ -84,17 +84,14 @@ func UnaryClientInterceptor(opts ...InterceptorOption) grpc.UnaryClientIntercept
span ddtrace.Span
p peer.Peer
)
spanopts := []ddtrace.StartSpanOption{
spanopts := cfg.spanOpts
spanopts = append(spanopts,
tracer.Tag(tagMethod, method),
tracer.SpanType(ext.AppTypeRPC),
}
if !math.IsNaN(cfg.analyticsRate) {
spanopts = append(spanopts, tracer.Tag(ext.EventSampleRate, cfg.analyticsRate))
}
spanopts = append(spanopts, tracer.Tag(ext.Component, "google.golang.org/grpc.v12"))
spanopts = append(spanopts, tracer.Tag(ext.SpanKind, ext.SpanKindClient))
tracer.Tag(ext.Component, "google.golang.org/grpc.v12"),
tracer.Tag(ext.SpanKind, ext.SpanKindClient),
)
span, ctx = tracer.StartSpanFromContext(ctx, "grpc.client", spanopts...)

md, ok := metadata.FromContext(ctx)
if !ok {
md = metadata.MD{}
Expand Down
31 changes: 31 additions & 0 deletions contrib/google.golang.org/grpc.v12/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,4 +295,35 @@ func TestAnalyticsSettings(t *testing.T) {

assertRate(t, mt, 0.23, WithAnalyticsRate(0.23))
})

t.Run("spanOpts", func(t *testing.T) {
mt := mocktracer.Start()
defer mt.Stop()

assertRate(t, mt, 0.23, WithAnalyticsRate(0.33), WithSpanOptions(tracer.AnalyticsRate(0.23)))
})
}

func TestSpanOpts(t *testing.T) {
assert := assert.New(t)
mt := mocktracer.Start()
defer mt.Stop()

rig, err := newRigWithOpts(true, WithSpanOptions(tracer.Tag("foo", "bar")))
if err != nil {
t.Fatalf("error setting up rig: %s", err)
}
defer rig.Close()
client := rig.client

resp, err := client.Ping(context.Background(), &FixtureRequest{Name: "pass"})
assert.Nil(err)
assert.Equal(resp.Message, "passed")

spans := mt.FinishedSpans()
assert.Len(spans, 2)

for _, s := range spans {
assert.Equal(s.Tags()["foo"], "bar")
}
}
30 changes: 16 additions & 14 deletions contrib/google.golang.org/grpc.v12/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@
package grpc

import (
"math"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
"gopkg.in/DataDog/dd-trace-go.v1/internal"
)

type interceptorConfig struct {
serviceName string
analyticsRate float64
serviceName string
spanOpts []ddtrace.StartSpanOption
}

// InterceptorOption represents an option that can be passed to the grpc unary
Expand All @@ -22,11 +22,9 @@ type InterceptorOption func(*interceptorConfig)

func defaults(cfg *interceptorConfig) {
// cfg.serviceName default set in interceptor
// cfg.analyticsRate = globalconfig.AnalyticsRate()
// cfg.spanOpts = append(cfg.spanOpts, tracer.AnalyticsRate(globalconfig.AnalyticsRate()))
if internal.BoolEnv("DD_TRACE_GRPC_ANALYTICS_ENABLED", false) {
cfg.analyticsRate = 1.0
} else {
cfg.analyticsRate = math.NaN()
cfg.spanOpts = append(cfg.spanOpts, tracer.AnalyticsRate(1.0))
}
}

Expand All @@ -41,9 +39,7 @@ func WithServiceName(name string) InterceptorOption {
func WithAnalytics(on bool) InterceptorOption {
return func(cfg *interceptorConfig) {
if on {
cfg.analyticsRate = 1.0
} else {
cfg.analyticsRate = math.NaN()
WithSpanOptions(tracer.AnalyticsRate(1.0))(cfg)
}
}
}
Expand All @@ -53,9 +49,15 @@ func WithAnalytics(on bool) InterceptorOption {
func WithAnalyticsRate(rate float64) InterceptorOption {
return func(cfg *interceptorConfig) {
if rate >= 0.0 && rate <= 1.0 {
cfg.analyticsRate = rate
} else {
cfg.analyticsRate = math.NaN()
WithSpanOptions(tracer.AnalyticsRate(rate))(cfg)
}
}
}

// WithSpanOptions defines a set of additional ddtrace.StartSpanOption to be added
// to spans started by the integration.
func WithSpanOptions(opts ...ddtrace.StartSpanOption) InterceptorOption {
return func(cfg *interceptorConfig) {
cfg.spanOpts = append(cfg.spanOpts, opts...)
}
}
7 changes: 4 additions & 3 deletions contrib/google.golang.org/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ package grpc // import "gopkg.in/DataDog/dd-trace-go.v1/contrib/google.golang.or
import (
"errors"
"io"
"math"

"gopkg.in/DataDog/dd-trace-go.v1/contrib/google.golang.org/internal/grpcutil"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
Expand All @@ -28,15 +27,17 @@ import (
var spanTypeRPC = tracer.SpanType(ext.AppTypeRPC)

func (cfg *config) startSpanOptions(opts ...tracer.StartSpanOption) []tracer.StartSpanOption {
if len(cfg.tags) == 0 && math.IsNaN(cfg.analyticsRate) {
if len(cfg.tags) == 0 && len(cfg.spanOpts) == 0 {
return opts
}

ret := make([]tracer.StartSpanOption, 0, 1+len(cfg.tags)+len(opts))
ret = append(ret, tracer.AnalyticsRate(cfg.analyticsRate))
for _, opt := range opts {
ret = append(ret, opt)
}
for _, opt := range cfg.spanOpts {
ret = append(ret, opt)
}
for key, tag := range cfg.tags {
ret = append(ret, tracer.Tag(key, tag))
}
Expand Down
65 changes: 65 additions & 0 deletions contrib/google.golang.org/grpc/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,13 @@ func TestAnalyticsSettings(t *testing.T) {

assertRate(t, mt, 0.23, WithAnalyticsRate(0.23))
})

t.Run("spanOpts", func(t *testing.T) {
mt := mocktracer.Start()
defer mt.Stop()

assertRate(t, mt, 0.23, WithAnalyticsRate(0.33), WithSpanOptions(tracer.AnalyticsRate(0.23)))
})
}

func TestIgnoredMethods(t *testing.T) {
Expand Down Expand Up @@ -910,6 +917,64 @@ func TestIgnoredMetadata(t *testing.T) {
}
}

func TestSpanOpts(t *testing.T) {
t.Run("unary", func(t *testing.T) {
mt := mocktracer.Start()
defer mt.Stop()
rig, err := newRig(true, WithSpanOptions(tracer.Tag("foo", "bar")))
if err != nil {
t.Fatalf("error setting up rig: %s", err)
}
client := rig.client
resp, err := client.Ping(context.Background(), &FixtureRequest{Name: "pass"})
assert.Nil(t, err)
assert.Equal(t, resp.Message, "passed")

spans := mt.FinishedSpans()
assert.Len(t, spans, 2)

for _, span := range spans {
assert.Equal(t, span.Tags()["foo"], "bar")
}
rig.Close()
mt.Reset()
})

t.Run("stream", func(t *testing.T) {
mt := mocktracer.Start()
defer mt.Stop()
rig, err := newRig(true, WithSpanOptions(tracer.Tag("foo", "bar")))
if err != nil {
t.Fatalf("error setting up rig: %s", err)
}

ctx, done := context.WithCancel(context.Background())
client := rig.client
stream, err := client.StreamPing(ctx)
assert.NoError(t, err)

err = stream.Send(&FixtureRequest{Name: "pass"})
assert.NoError(t, err)

resp, err := stream.Recv()
assert.NoError(t, err)
assert.Equal(t, resp.Message, "passed")

assert.NoError(t, stream.CloseSend())
done() // close stream from client side
rig.Close()

waitForSpans(mt, 7, 5*time.Second)

spans := mt.FinishedSpans()
assert.Len(t, spans, 7)
for _, span := range spans {
assert.Equal(t, span.Tags()["foo"], "bar")
}
mt.Reset()
})
}

func TestCustomTag(t *testing.T) {
mt := mocktracer.Start()
defer mt.Stop()
Expand Down
28 changes: 15 additions & 13 deletions contrib/google.golang.org/grpc/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
package grpc

import (
"math"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
"gopkg.in/DataDog/dd-trace-go.v1/internal"
"gopkg.in/DataDog/dd-trace-go.v1/internal/globalconfig"

Expand All @@ -21,7 +21,6 @@ type Option func(*config)
type config struct {
serviceName string
nonErrorCodes map[codes.Code]bool
analyticsRate float64
traceStreamCalls bool
traceStreamMessages bool
noDebugStack bool
Expand All @@ -30,6 +29,7 @@ type config struct {
withMetadataTags bool
ignoredMetadata map[string]struct{}
withRequestTags bool
spanOpts []ddtrace.StartSpanOption
tags map[string]interface{}
}

Expand Down Expand Up @@ -60,11 +60,9 @@ func defaults(cfg *config) {
cfg.traceStreamCalls = true
cfg.traceStreamMessages = true
cfg.nonErrorCodes = map[codes.Code]bool{codes.Canceled: true}
// cfg.analyticsRate = globalconfig.AnalyticsRate()
// cfg.spanOpts = append(cfg.spanOpts, tracer.AnalyticsRate(globalconfig.AnalyticsRate()))
ajgajg1134 marked this conversation as resolved.
Show resolved Hide resolved
if internal.BoolEnv("DD_TRACE_GRPC_ANALYTICS_ENABLED", false) {
cfg.analyticsRate = 1.0
} else {
cfg.analyticsRate = math.NaN()
cfg.spanOpts = append(cfg.spanOpts, tracer.AnalyticsRate(1.0))
}
cfg.ignoredMetadata = map[string]struct{}{
"x-datadog-trace-id": {},
Expand Down Expand Up @@ -119,9 +117,7 @@ func NonErrorCodes(cs ...codes.Code) InterceptorOption {
func WithAnalytics(on bool) Option {
return func(cfg *config) {
if on {
cfg.analyticsRate = 1.0
} else {
cfg.analyticsRate = math.NaN()
WithSpanOptions(tracer.AnalyticsRate(1.0))(cfg)
}
}
}
Expand All @@ -131,9 +127,7 @@ func WithAnalytics(on bool) Option {
func WithAnalyticsRate(rate float64) Option {
return func(cfg *config) {
if rate >= 0.0 && rate <= 1.0 {
cfg.analyticsRate = rate
} else {
cfg.analyticsRate = math.NaN()
WithSpanOptions(tracer.AnalyticsRate(rate))(cfg)
}
}
}
Expand Down Expand Up @@ -198,3 +192,11 @@ func WithCustomTag(key string, value interface{}) Option {
cfg.tags[key] = value
}
}

// WithSpanOptions defines a set of additional ddtrace.StartSpanOption to be added
// to spans started by the integration.
func WithSpanOptions(opts ...ddtrace.StartSpanOption) Option {
return func(cfg *config) {
cfg.spanOpts = append(cfg.spanOpts, opts...)
}
}
2 changes: 1 addition & 1 deletion contrib/google.golang.org/grpc/stats_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (h *clientStatsHandler) TagRPC(ctx context.Context, rti *stats.RPCTagInfo)
rti.FullMethodName,
"grpc.client",
h.cfg.clientServiceName(),
tracer.AnalyticsRate(h.cfg.analyticsRate),
h.cfg.spanOpts...,
)
ctx = injectSpanIntoContext(ctx)
return ctx
Expand Down
3 changes: 2 additions & 1 deletion contrib/google.golang.org/grpc/stats_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestClientStatsHandler(t *testing.T) {
assert := assert.New(t)

serviceName := "grpc-service"
statsHandler := NewClientStatsHandler(WithServiceName(serviceName))
statsHandler := NewClientStatsHandler(WithServiceName(serviceName), WithSpanOptions(tracer.Tag("foo", "bar")))
server, err := newClientStatsHandlerTestServer(statsHandler)
if err != nil {
t.Fatalf("failed to start test server: %s", err)
Expand Down Expand Up @@ -56,6 +56,7 @@ func TestClientStatsHandler(t *testing.T) {
assert.Equal("/grpc.Fixture/Ping", tags[tagMethodName])
assert.Equal("127.0.0.1", tags[ext.TargetHost])
assert.Equal(server.port, tags[ext.TargetPort])
assert.Equal("bar", tags["foo"])
}

func newClientStatsHandlerTestServer(statsHandler stats.Handler) (*rig, error) {
Expand Down
4 changes: 2 additions & 2 deletions contrib/google.golang.org/grpc/stats_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ type serverStatsHandler struct {

// TagRPC starts a new span for the initiated RPC request.
func (h *serverStatsHandler) TagRPC(ctx context.Context, rti *stats.RPCTagInfo) context.Context {
h.cfg.spanOpts = append(h.cfg.spanOpts, tracer.Measured())
_, ctx = startSpanFromContext(
ctx,
rti.FullMethodName,
"grpc.server",
h.cfg.serverServiceName(),
tracer.AnalyticsRate(h.cfg.analyticsRate),
tracer.Measured(),
h.cfg.spanOpts...,
)
return ctx
}
Expand Down
Loading