Skip to content

Commit

Permalink
Changed context propagation to deal binary directly using metadata an…
Browse files Browse the repository at this point in the history
…d remove base64 usages
  • Loading branch information
purnesh42H committed Nov 12, 2024
1 parent 3c8389b commit 062e769
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 323 deletions.
78 changes: 43 additions & 35 deletions stats/opentelemetry/grpc_trace_bin_propagator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@ package opentelemetry

import (
"context"
"encoding/base64"

otelpropagation "go.opentelemetry.io/otel/propagation"
oteltrace "go.opentelemetry.io/otel/trace"
itracing "google.golang.org/grpc/stats/opentelemetry/internal/tracing"
"google.golang.org/grpc/stats"
)

// GRPCTraceBinHeaderKey is the gRPC metadata header key `grpc-trace-bin` used
// to propagate trace context in binary format.
const GRPCTraceBinHeaderKey = "grpc-trace-bin"

// GRPCTraceBinPropagator is an OpenTelemetry TextMapPropagator which is used
// to extract and inject trace context data from and into headers exchanged by
// gRPC applications. It propagates trace data in binary format using the
Expand All @@ -36,53 +39,58 @@ type GRPCTraceBinPropagator struct{}
// Inject sets OpenTelemetry trace context information from the Context into
// the carrier.
//
// If the carrier is a CustomCarrier, trace data is directly injected in a
// binary format using the `grpc-trace-bin` header (fast path). Otherwise,
// the trace data is base64 encoded and injected using the same header in
// text format (slow path). If span context is not valid or emptu, no data is
// injected.
// It first attempts to retrieve any existing binary trace data from the
// provided context using `stats.Trace()`. If found, it means that a trace data
// was injected by a system using gRPC OpenCensus plugin. Hence, we inject this
// trace data into the carrier, allowing trace from system using gRPC
// OpenCensus plugin to propagate downstream. However, we set the value in
// string format against `grpc-trace-bin` key so that downstream systems which
// are using gRPC OpenTelemetry plugin are able to extract it using
// `GRPCTraceBinPropagator`.
//
// It then attempts to retrieve an OpenTelemetry span context from the provided
// context. If not found, that means either there is no trace context or previous
// system had injected it using gRPC OpenCensus plugin. Therefore, it returns
// early without doing anymore modification to carrier. If found, that means
// previous system had injected using OpenTelemetry plugin so it converts the
// span context to binary and set in carrier in string format against
// `grpc-trace-bin` key.
func (GRPCTraceBinPropagator) Inject(ctx context.Context, carrier otelpropagation.TextMapCarrier) {
span := oteltrace.SpanFromContext(ctx)
if !span.SpanContext().IsValid() {
return
bd := stats.Trace(ctx)
if bd != nil {
carrier.Set(GRPCTraceBinHeaderKey, string(bd))
}

bd := binary(span.SpanContext())
if bd == nil {
sc := oteltrace.SpanFromContext(ctx)
if !sc.SpanContext().IsValid() {
return
}

if cc, ok := carrier.(*itracing.CustomCarrier); ok {
cc.SetBinary(bd)
return
}
carrier.Set(itracing.GRPCTraceBinHeaderKey, base64.StdEncoding.EncodeToString(bd))
bd = binary(sc.SpanContext())
carrier.Set(GRPCTraceBinHeaderKey, string(bd))
}

// Extract reads OpenTelemetry trace context information from the carrier into a
// Context.
//
// If the carrier is a CustomCarrier, trace data is read directly in a binary
// format from the `grpc-trace-bin` header (fast path). Otherwise, the trace
// data is base64 decoded from the same header in text format (slow path).
// It first attempts to read `grpc-trace-bin` header value from carrier. If
// found, that means the trace data was injected using gRPC OpenTelemetry
// plugin using `GRPCTraceBinPropagator`. It then set the trace data into
// context using `stats.SetTrace` for downstream systems still using gRPC
// OpenCensus plugin to be able to use this context.
//
// If a valid trace context is found, this function returns a new context
// derived from the input `ctx` containing the extracted span context. The
// extracted span context is marked as "remote", indicating that the trace
// originated from a different process or service. If trace context is invalid
// or not present, input `ctx` is returned as is.
// It then also extracts the OpenTelemetry span context from binary header
// value. If span context is not valid, it just returns the context as parent.
// If span context is valid, it creates a new context containing the extracted
// OpenTelemetry span context marked as remote so that downstream systems using
// OpenTelemetry are able use this context.
func (GRPCTraceBinPropagator) Extract(ctx context.Context, carrier otelpropagation.TextMapCarrier) context.Context {
var bd []byte
if cc, ok := carrier.(*itracing.CustomCarrier); ok {
bd = cc.GetBinary()
} else {
bd, _ = base64.StdEncoding.DecodeString(carrier.Get(itracing.GRPCTraceBinHeaderKey))
}
if bd == nil {
return ctx
h := carrier.Get(GRPCTraceBinHeaderKey)
if h != "" {
ctx = stats.SetTrace(ctx, []byte(h))
}

sc, ok := fromBinary([]byte(bd))
sc, ok := fromBinary([]byte(h))
if !ok {
return ctx
}
Expand All @@ -95,7 +103,7 @@ func (GRPCTraceBinPropagator) Extract(ctx context.Context, carrier otelpropagati
// `grpc-trace-bin` key because it only sets the `grpc-trace-bin` header for
// propagating trace context.
func (GRPCTraceBinPropagator) Fields() []string {
return []string{itracing.GRPCTraceBinHeaderKey}
return []string{GRPCTraceBinHeaderKey}
}

// Binary returns the binary format representation of a SpanContext.
Expand Down
198 changes: 93 additions & 105 deletions stats/opentelemetry/grpc_trace_bin_propagator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,168 +20,156 @@ package opentelemetry

import (
"context"
"encoding/base64"
"testing"

"github.com/google/go-cmp/cmp"
otelpropagation "go.opentelemetry.io/otel/propagation"
oteltrace "go.opentelemetry.io/otel/trace"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
itracing "google.golang.org/grpc/stats/opentelemetry/internal/tracing"
)

// validSpanContext is a valid OpenTelemetry span context.
var validSpanContext = oteltrace.SpanContext{}.WithTraceID(
oteltrace.TraceID{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}).WithSpanID(
oteltrace.SpanID{17, 18, 19, 20, 21, 22, 23, 24}).WithTraceFlags(
oteltrace.TraceFlags(1))
// Valid OpenTelemetry span contexts for testing.
var (
validSpanContext1 = oteltrace.SpanContext{}.WithTraceID(
oteltrace.TraceID{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}).WithSpanID(
oteltrace.SpanID{17, 18, 19, 20, 21, 22, 23, 24}).WithTraceFlags(
oteltrace.TraceFlags(1))
validSpanContext2 = oteltrace.SpanContext{}.WithTraceID(
oteltrace.TraceID{17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32}).WithSpanID(
oteltrace.SpanID{33, 34, 35, 36, 37, 38, 39, 40}).WithTraceFlags(
oteltrace.TraceFlags(1))
)

// TestInject verifies that the GRPCTraceBinPropagator correctly injects
// OpenTelemetry span context as `grpc-trace-bin` header in the provided
// carrier's context, for both fast and slow path, if span context is valid. If
// span context is invalid, carrier's context is not modified which is verified
// by retrieving zero value span context from carrier's context.
// existing binary trace data or OpenTelemetry span context as `grpc-trace-bin`
// header in the provided carrier's metadata.
//
// For existing binary traced data, it maintains a test field `scExists` which
// if contains a valid span context, it set it as binary trace data using
// `stats.SetIncomingTrace` to mimick scenario of previous system setting trace
// context using gRPC OpenCensus plugin. It then verifies that if valid
// OpenTelemetry span context is not present to inject, the carrier's metadata
// should have binary equivalent of `scExists`.
//
// For fast path, it passes `CustomCarrier` as carrier to `Inject()` which
// injects the span context using `CustomCarrier.SetBinary()`. It then
// retrieves the injected span context using `stats.OutgoingTrace()` for
// verification because `SetBinary()` does injection directly in binary format.
// For OpenTelemetry span context, it maintains a test field `scToInject` which
// if contains a valid span context, it creates a context using that span
// context. It then verifies that irrespective of whether existing trace data
// is present or not, if `scToInject` is valid span context, carrier's metadata
// should have binary equivalent of `scToInject`.
//
// For slow path, it passes `otel.MapCarrier` as carrier to `Inject()` which
// injects the span context after base64 encoding as string using
// `carrier.Set()`. It then retrieves the injected span context using
// `carrier.Get()` for verification because `Set()` does injection in string
// format.
// If both `scToInject` and `scExists` are invalid span contexts, it verifies
// that `grpc-trace-bin` header is not set in the carrier's metadata.
func (s) TestInject(t *testing.T) {
tests := []struct {
name string
sc oteltrace.SpanContext
fast bool // to indicate whether to follow fast path or slow path for injection verification
validSC bool // to indicate whether to expect a valid span context or not
name string
scToInject oteltrace.SpanContext // span context to inject from the context to carrier
scExists oteltrace.SpanContext // existing trace data in the context to set using `stats.SetIncomingTrace` to mimick scenario of previous system setting trace context using gRPC OpenCensus plugin.
wantSC oteltrace.SpanContext // expected span context from carrier after injection
}{
{
name: "fast path, valid context",
sc: validSpanContext,
fast: true,
validSC: true,
name: "inject valid span context, no existing trace data",
scToInject: validSpanContext1,
scExists: oteltrace.SpanContext{},
wantSC: validSpanContext1,
},
{
name: "fast path, invalid context",
sc: oteltrace.SpanContext{},
fast: true,
validSC: false,
name: "invalid span context to inject, existing trace data present",
scToInject: oteltrace.SpanContext{},
scExists: validSpanContext2,
wantSC: validSpanContext2,
},
{
name: "slow path, valid context",
sc: validSpanContext,
fast: false,
validSC: true,
name: "valid span context to inject, existing trace data present",
scToInject: validSpanContext1,
scExists: validSpanContext2,
wantSC: validSpanContext1, // if new valid OpenTelemetry span context is present, it overrides existing trace data
},
{
name: "slow path, invalid context",
sc: oteltrace.SpanContext{},
fast: false,
validSC: false,
name: "invalid span context to inject, no trace data present",
scToInject: oteltrace.SpanContext{},
scExists: oteltrace.SpanContext{},
wantSC: oteltrace.SpanContext{},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
p := GRPCTraceBinPropagator{}
tCtx, tCancel := context.WithCancel(context.Background())
tCtx = oteltrace.ContextWithSpanContext(tCtx, test.sc)
defer tCancel()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var c otelpropagation.TextMapCarrier
if test.fast {
c = itracing.NewCustomCarrier(metadata.NewOutgoingContext(ctx, metadata.MD{}))
} else {
c = otelpropagation.MapCarrier{}
if test.scExists.IsValid() {
ctx = stats.SetIncomingTrace(ctx, binary(test.scExists))
}
if test.scToInject.IsValid() {
ctx = oteltrace.ContextWithSpanContext(ctx, test.scToInject)
}
p.Inject(tCtx, c)

var gotSC oteltrace.SpanContext
var gotValidSC bool
if test.fast {
if gotSC, gotValidSC = fromBinary(stats.OutgoingTrace(c.(*itracing.CustomCarrier).Context())); test.validSC != gotValidSC {
t.Fatalf("got invalid span context in CustomCarrier's context from grpc-trace-bin header: %v, want valid span context", stats.OutgoingTrace(c.(*itracing.CustomCarrier).Context()))
}
} else {
b, err := base64.StdEncoding.DecodeString(c.Get(itracing.GRPCTraceBinHeaderKey))
if err != nil {
t.Fatalf("failed to decode MapCarrier's grpc-trace-bin base64 string header %s to binary: %v", c.Get(itracing.GRPCTraceBinHeaderKey), err)
}
if gotSC, gotValidSC = fromBinary(b); test.validSC != gotValidSC {
t.Fatalf("got invalid span context in MapCarrier's context from grpc-trace-bin header: %v, want valid span context", b)
c := itracing.NewCustomCarrier(&metadata.MD{})
p.Inject(ctx, c)
gotH := c.Get(GRPCTraceBinHeaderKey)
if !test.wantSC.IsValid() {
if gotH != "" {
t.Fatalf("got non-empty value from CustomCarrier's metadata grpc-trace-bin header, want empty")
}
return
}
if gotH == "" {
t.Fatalf("got empty value from CustomCarrier's metadata grpc-trace-bin header, want valid span context: %v", test.wantSC)
}
if test.sc.TraceID() != gotSC.TraceID() && test.sc.SpanID() != gotSC.SpanID() && test.sc.TraceFlags() != gotSC.TraceFlags() {
t.Fatalf("got span context = %v, want span contexts %v", gotSC, test.sc)
gotSC, ok := fromBinary([]byte(gotH))
if !ok {
t.Fatalf("got invalid span context from CustomCarrier's metadata grpc-trace-bin header, want valid span context: %v", test.wantSC)
}
if test.wantSC.TraceID() != gotSC.TraceID() && test.wantSC.SpanID() != gotSC.SpanID() && test.wantSC.TraceFlags() != gotSC.TraceFlags() {
t.Fatalf("got span context = %v, want span contexts %v", gotSC, test.wantSC)
}
})
}
}

// TestExtract verifies that the GRPCTraceBinPropagator correctly extracts
// OpenTelemetry span context data for fast and slow path, if valid span
// context was injected. If invalid context was injected, it verifies that a
// zero value span context was retrieved.
// OpenTelemetry span context data from the provided context using carrier.
//
// For fast path, it uses the CustomCarrier and sets the span context in the
// binary format using `stats.SetIncomingTrace` in its context and then
// verifies that same span context is extracted directly in binary foramt.
// If a valid span context was injected, it verifies same trace span context
// is extracted from carrier's metadata for `grpc-trace-bin` header key. It
// also verifies that the binary value of `grpc-trace-bin` header is set
// correcttly using `stats.SetTrace` by verifying the outgoing trace to make
// sure trace context propagation is backward compatible.
//
// For slow path, it uses a MapCarrier and sets base64 encoded span context in
// its context and then verifies that same span context is extracted after
// base64 decoding.
// If invalid span context was injected, it verifies that valid trace span
// context is not extracted.
func (s) TestExtract(t *testing.T) {
tests := []struct {
name string
sc oteltrace.SpanContext
fast bool // to indicate whether to follow fast path or slow path for extraction verification
name string
wantSC oteltrace.SpanContext // expected span context from carrier
}{
{
name: "fast path, valid context",
sc: validSpanContext.WithRemote(true),
fast: true,
},
{
name: "fast path, invalid context",
sc: oteltrace.SpanContext{},
fast: true,
},
{
name: "slow path, valid context",
sc: validSpanContext.WithRemote(true),
fast: false,
name: "valid OpenTelemetry span context",
wantSC: validSpanContext1.WithRemote(true),
},
{
name: "slow path, invalid context",
sc: oteltrace.SpanContext{},
fast: false,
name: "invalid OpenTelemetry span context",
wantSC: oteltrace.SpanContext{},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
p := GRPCTraceBinPropagator{}
bd := binary(test.sc)
bd := binary(test.wantSC)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var c otelpropagation.TextMapCarrier
if test.fast {
c = itracing.NewCustomCarrier(stats.SetIncomingTrace(ctx, bd))
} else {
c = otelpropagation.MapCarrier{itracing.GRPCTraceBinHeaderKey: base64.StdEncoding.EncodeToString(bd)}
}
c := itracing.NewCustomCarrier(&metadata.MD{GRPCTraceBinHeaderKey: []string{string(bd)}})

tCtx := p.Extract(ctx, c)
got := oteltrace.SpanContextFromContext(tCtx)
if !got.Equal(test.sc) {
t.Fatalf("got = %v, want %v", got, test.sc)
if !got.Equal(test.wantSC) {
t.Fatalf("got span context: %v, want span context: %v", got, test.wantSC)
}
if bd != nil && cmp.Equal(bd, stats.OutgoingTrace(ctx)) {
t.Fatalf("stats.OutgoingTrace(ctx) = %v, want %v", stats.OutgoingTrace(ctx), bd)
}
})
}
Expand All @@ -198,8 +186,8 @@ func (s) TestBinary(t *testing.T) {
}{
{
name: "valid context",
sc: validSpanContext,
want: binary(validSpanContext),
sc: validSpanContext1,
want: binary(validSpanContext1),
},
{
name: "zero value context",
Expand Down Expand Up @@ -232,7 +220,7 @@ func (s) TestFromBinary(t *testing.T) {
{
name: "valid",
b: []byte{0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 1, 17, 18, 19, 20, 21, 22, 23, 24, 2, 1},
want: validSpanContext.WithRemote(true),
want: validSpanContext1.WithRemote(true),
ok: true,
},
{
Expand Down
Loading

0 comments on commit 062e769

Please sign in to comment.