diff --git a/experimental/opentelemetry/trace_options.go b/experimental/opentelemetry/trace_options.go
new file mode 100644
index 000000000000..9dc0079a6407
--- /dev/null
+++ b/experimental/opentelemetry/trace_options.go
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2024 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package opentelemetry is EXPERIMENTAL and will be moved to stats/opentelemetry
+// package in a later release.
+package opentelemetry
+
+import (
+	"go.opentelemetry.io/otel/propagation"
+	"go.opentelemetry.io/otel/trace"
+)
+
+// TraceOptions contains the tracing settings for OpenTelemetry instrumentation.
+type TraceOptions struct {
+	// TracerProvider is the OpenTelemetry TracerProvider used to create the
+	// tracers that record trace spans for instrumentation. If unset, tracing
+	// will not be recorded.
+	TracerProvider trace.TracerProvider
+
+	// TextMapPropagator propagates span context through text map carrier.
+	// If unset, tracing will not be recorded.
+	TextMapPropagator propagation.TextMapPropagator
+}
diff --git a/stats/opentelemetry/client_metrics.go b/stats/opentelemetry/client_metrics.go
index 265791e5a261..4fffba60fb33 100644
--- a/stats/opentelemetry/client_metrics.go
+++ b/stats/opentelemetry/client_metrics.go
@@ -21,7 +21,10 @@ import (
 	"sync/atomic"
 	"time"
 
+	otelcodes "go.opentelemetry.io/otel/codes"
+	"go.opentelemetry.io/otel/trace"
 	"google.golang.org/grpc"
+	grpccodes "google.golang.org/grpc/codes"
 	estats "google.golang.org/grpc/experimental/stats"
 	istats "google.golang.org/grpc/internal/stats"
 	"google.golang.org/grpc/metadata"
@@ -85,8 +88,12 @@ func (h *clientStatsHandler) unaryInterceptor(ctx context.Context, method string
 	}
 
 	startTime := time.Now()
+	var span trace.Span
+	if h.options.isTracingEnabled() {
+		ctx, span = h.createCallTraceSpan(ctx, method)
+	}
 	err := invoker(ctx, method, req, reply, cc, opts...)
-	h.perCallMetrics(ctx, err, startTime, ci)
+	h.perCallTracesAndMetrics(ctx, err, startTime, ci, span)
 	return err
 }
 
@@ -119,22 +126,45 @@ func (h *clientStatsHandler) streamInterceptor(ctx context.Context, desc *grpc.S
 	}
 
 	startTime := time.Now()
-
+	var span trace.Span
+	if h.options.isTracingEnabled() {
+		ctx, span = h.createCallTraceSpan(ctx, method)
+	}
 	callback := func(err error) {
-		h.perCallMetrics(ctx, err, startTime, ci)
+		h.perCallTracesAndMetrics(ctx, err, startTime, ci, span)
 	}
 	opts = append([]grpc.CallOption{grpc.OnFinish(callback)}, opts...)
 	return streamer(ctx, desc, cc, method, opts...)
 }
 
-func (h *clientStatsHandler) perCallMetrics(ctx context.Context, err error, startTime time.Time, ci *callInfo) {
-	callLatency := float64(time.Since(startTime)) / float64(time.Second) // calculate ASAP
-	attrs := otelmetric.WithAttributeSet(otelattribute.NewSet(
-		otelattribute.String("grpc.method", ci.method),
-		otelattribute.String("grpc.target", ci.target),
-		otelattribute.String("grpc.status", canonicalString(status.Code(err))),
-	))
-	h.clientMetrics.callDuration.Record(ctx, callLatency, attrs)
+// perCallTracesAndMetrics finishes the per-call trace span and records the
+// per-call duration metric once the RPC has completed.
+//
+// err is the final RPC error (nil on success); it determines the span status
+// and the "grpc.status" metric attribute. ts is the call span created by the
+// interceptor and is nil when the interceptor did not create one, in which
+// case span handling is skipped.
+func (h *clientStatsHandler) perCallTracesAndMetrics(ctx context.Context, err error, startTime time.Time, ci *callInfo, ts trace.Span) {
+	// Measure latency before any span bookkeeping so that the recorded call
+	// duration is not inflated by it.
+	callLatency := float64(time.Since(startTime)) / float64(time.Second)
+	if h.options.isTracingEnabled() && ts != nil {
+		s := status.Convert(err)
+		if s.Code() == grpccodes.OK {
+			ts.SetStatus(otelcodes.Ok, s.Message())
+		} else {
+			ts.SetStatus(otelcodes.Error, s.Message())
+		}
+		ts.End()
+	}
+	if h.options.isMetricsEnabled() {
+		attrs := otelmetric.WithAttributeSet(otelattribute.NewSet(
+			otelattribute.String("grpc.method", ci.method),
+			otelattribute.String("grpc.target", ci.target),
+			otelattribute.String("grpc.status", canonicalString(status.Code(err))),
+		))
+		h.clientMetrics.callDuration.Record(ctx, callLatency, attrs)
+	}
 }
 
 // TagConn exists to satisfy stats.Handler.
@@ -163,15 +193,17 @@ func (h *clientStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo)
 		}
 		ctx = istats.SetLabels(ctx, labels)
 	}
-	ai := &attemptInfo{ // populates information about RPC start.
+	ai := &attemptInfo{
 		startTime: time.Now(),
 		xdsLabels: labels.TelemetryLabels,
-		method:    info.FullMethodName,
+		method:    removeLeadingSlash(info.FullMethodName),
 	}
-	ri := &rpcInfo{
-		ai: ai,
+	if h.options.isTracingEnabled() {
+		ctx, ai = h.traceTagRPC(ctx, ai)
 	}
-	return setRPCInfo(ctx, ri)
+	return setRPCInfo(ctx, &rpcInfo{
+		ai: ai,
+	})
 }
 
 func (h *clientStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
@@ -180,7 +212,12 @@ func (h *clientStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
 		logger.Error("ctx passed into client side stats handler metrics event handling has no client attempt data present")
 		return
 	}
-	h.processRPCEvent(ctx, rs, ri.ai)
+	if h.options.isMetricsEnabled() {
+		h.processRPCEvent(ctx, rs, ri.ai)
+	}
+	if h.options.isTracingEnabled() {
+		populateSpan(rs, ri.ai)
+	}
 }
 
 func (h *clientStatsHandler) processRPCEvent(ctx context.Context, s stats.RPCStats, ai *attemptInfo) {
diff --git a/stats/opentelemetry/client_tracing.go b/stats/opentelemetry/client_tracing.go
new file mode 100644
index 000000000000..075f401588a1
--- /dev/null
+++ b/stats/opentelemetry/client_tracing.go
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2024 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package opentelemetry
+
+import (
+	"context"
+	"strings"
+
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/trace"
+	otelinternaltracing "google.golang.org/grpc/stats/opentelemetry/internal/tracing"
+)
+
+// traceTagRPC starts an attempt span named "Attempt." followed by ai.method
+// (with "/" replaced by "."), stores it in ai.traceSpan, and injects its span
+// context into an outgoing carrier built from ctx.
+//
+// The TracerProvider and TextMapPropagator from the handler's trace options
+// are used when set. When either is unset, the corresponding OpenTelemetry
+// global is used instead, so the call never dereferences a nil option.
+//
+// It returns the carrier's context and ai.
+func (h *clientStatsHandler) traceTagRPC(ctx context.Context, ai *attemptInfo) (context.Context, *attemptInfo) {
+	tp := h.options.TraceOptions.TracerProvider
+	if tp == nil {
+		tp = otel.GetTracerProvider()
+	}
+	propagator := h.options.TraceOptions.TextMapPropagator
+	if propagator == nil {
+		propagator = otel.GetTextMapPropagator()
+	}
+
+	mn := "Attempt." + strings.ReplaceAll(ai.method, "/", ".")
+	ctx, span := tp.Tracer("grpc-open-telemetry").Start(ctx, mn)
+	carrier := otelinternaltracing.NewOutgoingCarrier(ctx)
+	propagator.Inject(ctx, carrier)
+	ai.traceSpan = span
+	return carrier.Context(), ai
+}
+
+// createCallTraceSpan starts a client-kind call span using the TracerProvider
+// from the handler's trace options and returns the context carrying it. The
+// span name is method passed through removeLeadingSlash with "/" replaced by
+// ".".
+//
+// If no TracerProvider is configured, an error is logged and ctx is returned
+// unchanged together with a no-op span, so callers can always set a status on
+// and end the returned span.
+func (h *clientStatsHandler) createCallTraceSpan(ctx context.Context, method string) (context.Context, trace.Span) {
+	tp := h.options.TraceOptions.TracerProvider
+	if tp == nil {
+		logger.Error("TraceProvider is not provided in trace options")
+		// SpanFromContext yields a no-op span for a context without a span.
+		return ctx, trace.SpanFromContext(context.Background())
+	}
+	mn := strings.ReplaceAll(removeLeadingSlash(method), "/", ".")
+	return tp.Tracer("grpc-open-telemetry").Start(ctx, mn, trace.WithSpanKind(trace.SpanKindClient))
+}
diff --git a/stats/opentelemetry/e2e_test.go b/stats/opentelemetry/e2e_test.go index ac671e2982a3..1250237b37b6 100644 --- a/stats/opentelemetry/e2e_test.go +++ b/stats/opentelemetry/e2e_test.go @@ -19,11 +19,14 @@ package opentelemetry_test import ( "context" "fmt" - "io" "testing" "time" + "go.opentelemetry.io/otel" + otelcodes "go.opentelemetry.io/otel/codes" + oteltrace "go.opentelemetry.io/otel/trace" + v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" @@ -35,9 +38,17 @@ import ( "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/wrapperspb" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" + "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/encoding/gzip" + experimental "google.golang.org/grpc/experimental/opentelemetry" "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/stubserver" @@ -49,11 +60,6 @@ import ( "google.golang.org/grpc/orca" "google.golang.org/grpc/stats/opentelemetry"
"google.golang.org/grpc/stats/opentelemetry/internal/testutils" - - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/sdk/metric" - "go.opentelemetry.io/otel/sdk/metric/metricdata" - "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" ) var defaultTestTimeout = 5 * time.Second @@ -66,14 +72,48 @@ func Test(t *testing.T) { grpctest.RunSubTests(t, s{}) } -// setupStubServer creates a stub server with OpenTelemetry component configured on client -// and server side. It returns a reader for metrics emitted from OpenTelemetry -// component and the server. -func setupStubServer(t *testing.T, methodAttributeFilter func(string) bool) (*metric.ManualReader, *stubserver.StubServer) { +// traceSpanInfo is the information received about the trace span. It contains +// subset of information that is needed to verify if correct trace is being +// attributed to the rpc. +type traceSpanInfo struct { + spanKind string + name string + events []trace.Event + attributes []attribute.KeyValue +} + +// defaultMetricsOptions creates default metrics options +func defaultMetricsOptions(_ *testing.T, methodAttributeFilter func(string) bool) (*opentelemetry.MetricsOptions, *metric.ManualReader) { reader := metric.NewManualReader() provider := metric.NewMeterProvider(metric.WithReader(reader)) + metricsOptions := &opentelemetry.MetricsOptions{ + MeterProvider: provider, + Metrics: opentelemetry.DefaultMetrics(), + MethodAttributeFilter: methodAttributeFilter, + } + return metricsOptions, reader +} + +// defaultTraceOptions function to create default trace options +func defaultTraceOptions(_ *testing.T) (*experimental.TraceOptions, *tracetest.InMemoryExporter) { + spanExporter := tracetest.NewInMemoryExporter() + spanProcessor := trace.NewSimpleSpanProcessor(spanExporter) + tracerProvider := trace.NewTracerProvider(trace.WithSpanProcessor(spanProcessor)) + textMapPropagator := propagation.NewCompositeTextMapPropagator(opentelemetry.GRPCTraceBinPropagator{}) + 
otel.SetTextMapPropagator(textMapPropagator) + otel.SetTracerProvider(tracerProvider) + traceOptions := &experimental.TraceOptions{ + TracerProvider: tracerProvider, + TextMapPropagator: textMapPropagator, + } + return traceOptions, spanExporter +} + +// setupStubServer creates a stub server with OpenTelemetry component configured on client +// and server side and returns the server. +func setupStubServer(t *testing.T, metricsOptions *opentelemetry.MetricsOptions, traceOptions *experimental.TraceOptions) *stubserver.StubServer { ss := &stubserver.StubServer{ - UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + UnaryCallF: func(_ context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return &testpb.SimpleResponse{Payload: &testpb.Payload{ Body: make([]byte, len(in.GetPayload().GetBody())), }}, nil @@ -88,20 +128,19 @@ func setupStubServer(t *testing.T, methodAttributeFilter func(string) bool) (*me }, } - if err := ss.Start([]grpc.ServerOption{opentelemetry.ServerOption(opentelemetry.Options{ - MetricsOptions: opentelemetry.MetricsOptions{ - MeterProvider: provider, - Metrics: opentelemetry.DefaultMetrics(), - MethodAttributeFilter: methodAttributeFilter, - }})}, opentelemetry.DialOption(opentelemetry.Options{ - MetricsOptions: opentelemetry.MetricsOptions{ - MeterProvider: provider, - Metrics: opentelemetry.DefaultMetrics(), - }, - })); err != nil { + otelOptions := opentelemetry.Options{} + if metricsOptions != nil { + otelOptions.MetricsOptions = *metricsOptions + } + if traceOptions != nil { + otelOptions.TraceOptions = *traceOptions + } + + if err := ss.Start([]grpc.ServerOption{opentelemetry.ServerOption(otelOptions)}, + opentelemetry.DialOption(otelOptions)); err != nil { t.Fatalf("Error starting endpoint server: %v", err) } - return reader, ss + return ss } // TestMethodAttributeFilter tests the method attribute filter. 
The method @@ -112,7 +151,8 @@ func (s) TestMethodAttributeFilter(t *testing.T) { // Will allow duplex/any other type of RPC. return str != testgrpc.TestService_UnaryCall_FullMethodName } - reader, ss := setupStubServer(t, maf) + mo, reader := defaultMetricsOptions(t, maf) + ss := setupStubServer(t, mo, nil) defer ss.Stop() // Make a Unary and Streaming RPC. The Unary RPC should be filtered by the @@ -197,13 +237,14 @@ func (s) TestMethodAttributeFilter(t *testing.T) { // on the Client (no StaticMethodCallOption set) and Server. The method // attribute on subsequent metrics should be bucketed in "other". func (s) TestAllMetricsOneFunction(t *testing.T) { - reader, ss := setupStubServer(t, nil) + mo, reader := defaultMetricsOptions(t, nil) + ss := setupStubServer(t, mo, nil) defer ss.Stop() ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() // Make two RPC's, a unary RPC and a streaming RPC. These should cause - // certain metrics to be emitted, which should be able to be observed - // through the Metric Reader. + // certain metrics to be emitted, which should be observed through the + // Metric Reader. if _, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{Payload: &testpb.Payload{ Body: make([]byte, 10000), }}, grpc.UseCompressor(gzip.Name)); err != nil { // Deterministic compression. @@ -362,7 +403,7 @@ func metricsDataFromReader(ctx context.Context, reader *metric.ManualReader) map func (s) TestWRRMetrics(t *testing.T) { cmr := orca.NewServerMetricsRecorder().(orca.CallMetricsRecorder) backend1 := stubserver.StartTestService(t, &stubserver.StubServer{ - EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + EmptyCallF: func(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) { if r := orca.CallMetricsRecorderFromContext(ctx); r != nil { // Copy metrics from what the test set in cmr into r. 
sm := cmr.(orca.ServerMetricsProvider).ServerMetrics() @@ -380,7 +421,7 @@ func (s) TestWRRMetrics(t *testing.T) { cmr.SetApplicationUtilization(1.0) backend2 := stubserver.StartTestService(t, &stubserver.StubServer{ - EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + EmptyCallF: func(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) { if r := orca.CallMetricsRecorderFromContext(ctx); r != nil { // Copy metrics from what the test set in cmr into r. sm := cmr.(orca.ServerMetricsProvider).ServerMetrics() @@ -582,3 +623,959 @@ func pollForWantMetrics(ctx context.Context, t *testing.T, reader *metric.Manual return fmt.Errorf("error waiting for metrics %v: %v", wantMetrics, ctx.Err()) } + +// TestMetricsAndTracesOptionEnabled verifies the integration of metrics and traces +// emitted by the OpenTelemetry instrumentation in a gRPC environment. It sets up a +// stub server with both metrics and traces enabled, and tests the correct emission +// of metrics and traces during a Unary RPC and a Streaming RPC. The test ensures +// that the emitted metrics reflect the operations performed, including the size of +// the compressed message, and verifies that tracing information is correctly recorded. +func (s) TestMetricsAndTracesOptionEnabled(t *testing.T) { + // Create default metrics options + mo, reader := defaultMetricsOptions(t, nil) + // Create default trace options + to, exporter := defaultTraceOptions(t) + + ss := setupStubServer(t, mo, to) + defer ss.Stop() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout*2) + defer cancel() + + // Make two RPC's, a unary RPC and a streaming RPC. These should cause + // certain metrics and traces to be emitted which should be observed + // through metrics reader and span exporter respectively. 
+ if _, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{Payload: &testpb.Payload{ + Body: make([]byte, 10000), + }}, grpc.UseCompressor(gzip.Name)); err != nil { // Deterministic compression. + t.Fatalf("Unexpected error from UnaryCall: %v", err) + } + stream, err := ss.Client.FullDuplexCall(ctx) + if err != nil { + t.Fatalf("ss.Client.FullDuplexCall failed: %f", err) + } + + stream.CloseSend() + if _, err = stream.Recv(); err != io.EOF { + t.Fatalf("stream.Recv received an unexpected error: %v, expected an EOF error", err) + } + + // Verify metrics + rm := &metricdata.ResourceMetrics{} + reader.Collect(ctx, rm) + + gotMetrics := map[string]metricdata.Metrics{} + for _, sm := range rm.ScopeMetrics { + for _, m := range sm.Metrics { + gotMetrics[m.Name] = m + } + } + + wantMetrics := testutils.MetricData(testutils.MetricDataOptions{ + Target: ss.Target, + UnaryCompressedMessageSize: float64(57), + }) + testutils.CompareMetrics(ctx, t, reader, gotMetrics, wantMetrics) + + // Verify traces + spans := exporter.GetSpans() + if got, want := len(spans), 6; got != want { + t.Fatalf("got %d spans, want %d", got, want) + } + + wantSI := []traceSpanInfo{ + { + name: "grpc.testing.TestService.UnaryCall", + spanKind: oteltrace.SpanKindServer.String(), + attributes: []attribute.KeyValue{ + { + Key: "Client", + Value: attribute.IntValue(0), + }, + { + Key: "FailFast", + Value: attribute.IntValue(0), + }, + { + Key: "previous-rpc-attempts", + Value: attribute.IntValue(0), + }, + { + Key: "transparent-retry", + Value: attribute.IntValue(0), + }, + }, + events: []trace.Event{ + { + Name: "Inbound compressed message", + Attributes: []attribute.KeyValue{ + { + Key: "sequence-number", + Value: attribute.IntValue(1), + }, + { + Key: "message-size", + Value: attribute.IntValue(10006), + }, + { + Key: "message-size-compressed", + Value: attribute.IntValue(57), + }, + }, + }, + { + Name: "Outbound compressed message", + Attributes: []attribute.KeyValue{ + { + Key: "sequence-number", 
+ Value: attribute.IntValue(1), + }, + { + Key: "message-size", + Value: attribute.IntValue(10006), + }, + { + Key: "message-size-compressed", + Value: attribute.IntValue(57), + }, + }, + }, + }, + }, + { + name: "Attempt.grpc.testing.TestService.UnaryCall", + spanKind: oteltrace.SpanKindInternal.String(), + attributes: []attribute.KeyValue{ + { + Key: "Client", + Value: attribute.IntValue(1), + }, + { + Key: "FailFast", + Value: attribute.IntValue(1), + }, + { + Key: "previous-rpc-attempts", + Value: attribute.IntValue(0), + }, + { + Key: "transparent-retry", + Value: attribute.IntValue(0), + }, + }, + events: []trace.Event{ + { + Name: "Outbound compressed message", + Attributes: []attribute.KeyValue{ + { + Key: "sequence-number", + Value: attribute.IntValue(1), + }, + { + Key: "message-size", + Value: attribute.IntValue(10006), + }, + { + Key: "message-size-compressed", + Value: attribute.IntValue(57), + }, + }, + }, + { + Name: "Inbound compressed message", + Attributes: []attribute.KeyValue{ + { + Key: "sequence-number", + Value: attribute.IntValue(1), + }, + { + Key: "message-size", + Value: attribute.IntValue(10006), + }, + { + Key: "message-size-compressed", + Value: attribute.IntValue(57), + }, + }, + }, + }, + }, + { + name: "grpc.testing.TestService.UnaryCall", + spanKind: oteltrace.SpanKindClient.String(), + attributes: []attribute.KeyValue{}, + events: []trace.Event{}, + }, + { + name: "grpc.testing.TestService.FullDuplexCall", + spanKind: oteltrace.SpanKindServer.String(), + attributes: []attribute.KeyValue{ + { + Key: "Client", + Value: attribute.IntValue(0), + }, + { + Key: "FailFast", + Value: attribute.IntValue(0), + }, + { + Key: "previous-rpc-attempts", + Value: attribute.IntValue(0), + }, + { + Key: "transparent-retry", + Value: attribute.IntValue(0), + }, + }, + events: []trace.Event{}, + }, + { + name: "grpc.testing.TestService.FullDuplexCall", + spanKind: oteltrace.SpanKindClient.String(), + attributes: []attribute.KeyValue{}, + events: 
[]trace.Event{}, + }, + { + name: "Attempt.grpc.testing.TestService.FullDuplexCall", + spanKind: oteltrace.SpanKindInternal.String(), + attributes: []attribute.KeyValue{ + { + Key: "Client", + Value: attribute.IntValue(1), + }, + { + Key: "FailFast", + Value: attribute.IntValue(1), + }, + { + Key: "previous-rpc-attempts", + Value: attribute.IntValue(0), + }, + { + Key: "transparent-retry", + Value: attribute.IntValue(0), + }, + }, + events: []trace.Event{}, + }, + } + + // Check that same traceID is used in client and server for unary RPC call. + if got, want := spans[0].SpanContext.TraceID(), spans[2].SpanContext.TraceID(); got != want { + t.Fatal("TraceID mismatch in client span and server span.") + } + // Check that the attempt span id of client matches the span id of server + // SpanContext. + if got, want := spans[0].Parent.SpanID(), spans[1].SpanContext.SpanID(); got != want { + t.Fatal("SpanID mismatch in client span and server span.") + } + + // Check that same traceID is used in client and server for streaming RPC call. + if got, want := spans[3].SpanContext.TraceID(), spans[4].SpanContext.TraceID(); got != want { + t.Fatal("TraceID mismatch in client span and server span.") + } + // Check that the attempt span id of client matches the span id of server + // SpanContext. 
+ if got, want := spans[3].Parent.SpanID(), spans[5].SpanContext.SpanID(); got != want { + t.Fatal("SpanID mismatch in client span and server span.") + } + + for index, span := range spans { + // Check that the attempt span has the correct status + if got, want := spans[index].Status.Code, otelcodes.Ok; got != want { + t.Errorf("Got status code %v, want %v", got, want) + } + // name + if got, want := span.Name, wantSI[index].name; got != want { + t.Errorf("Span name is %q, want %q", got, want) + } + // spanKind + if got, want := span.SpanKind.String(), wantSI[index].spanKind; got != want { + t.Errorf("Got span kind %q, want %q", got, want) + } + // attributes + if got, want := len(span.Attributes), len(wantSI[index].attributes); got != want { + t.Errorf("Got attributes list of size %q, want %q", got, want) + } + for idx, att := range span.Attributes { + if got, want := att.Key, wantSI[index].attributes[idx].Key; got != want { + t.Errorf("Got attribute key for span name %v as %v, want %v", span.Name, got, want) + } + } + // events + if got, want := len(span.Events), len(wantSI[index].events); got != want { + t.Errorf("Event length is %q, want %q", got, want) + } + for eventIdx, event := range span.Events { + if got, want := event.Name, wantSI[index].events[eventIdx].Name; got != want { + t.Errorf("Got event name for span name %q as %q, want %q", span.Name, got, want) + } + for idx, att := range event.Attributes { + if got, want := att.Key, wantSI[index].events[eventIdx].Attributes[idx].Key; got != want { + t.Errorf("Got attribute key for span name %q with event name %v, as %v, want %v", span.Name, event.Name, got, want) + } + if got, want := att.Value, wantSI[index].events[eventIdx].Attributes[idx].Value; got != want { + t.Errorf("Got attribute value for span name %v with event name %v, as %v, want %v", span.Name, event.Name, got, want) + } + } + } + } +} + +// TestSpan verifies that the gRPC Trace Binary propagator correctly +// propagates span context between a 
client and server using the grpc- +// trace-bin header. It sets up a stub server with OpenTelemetry tracing +// enabled, makes a unary RPC, and streaming RPC as well. +// +// Verification: +// - Verifies that the span context is correctly propagated from the client +// to the server, including the trace ID and span ID. +// - Verifies that the server can access the span context and create +// child spans as expected during the RPC calls. +// - Verifies that the tracing information is recorded accurately in +// the OpenTelemetry backend. +func (s) TestSpan(t *testing.T) { + mo, _ := defaultMetricsOptions(t, nil) + // Using defaultTraceOptions to set up OpenTelemetry with an in-memory exporter. + to, spanExporter := defaultTraceOptions(t) + // Start the server with trace options. + ss := setupStubServer(t, mo, to) + defer ss.Stop() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Make two RPC's, a unary RPC and a streaming RPC. These should cause + // certain traces to be emitted, which should be observed through the + // span exporter. 
+ if _, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{Payload: &testpb.Payload{ + Body: make([]byte, 10000), + }}); err != nil { + t.Fatalf("Unexpected error from UnaryCall: %v", err) + } + stream, err := ss.Client.FullDuplexCall(ctx) + if err != nil { + t.Fatalf("ss.Client.FullDuplexCall failed: %f", err) + } + stream.CloseSend() + if _, err = stream.Recv(); err != io.EOF { + t.Fatalf("stream.Recv received an unexpected error: %v, expected an EOF error", err) + } + + // Get the spans from the exporter + spans := spanExporter.GetSpans() + if got, want := len(spans), 6; got != want { + t.Fatalf("got %d spans, want %d", got, want) + } + + wantSI := []traceSpanInfo{ + { + name: "grpc.testing.TestService.UnaryCall", + spanKind: oteltrace.SpanKindServer.String(), + attributes: []attribute.KeyValue{ + { + Key: "Client", + Value: attribute.IntValue(0), + }, + { + Key: "FailFast", + Value: attribute.IntValue(0), + }, + { + Key: "previous-rpc-attempts", + Value: attribute.IntValue(0), + }, + { + Key: "transparent-retry", + Value: attribute.IntValue(0), + }, + }, + events: []trace.Event{ + { + Name: "Inbound compressed message", + Attributes: []attribute.KeyValue{ + { + Key: "sequence-number", + Value: attribute.IntValue(1), + }, + { + Key: "message-size", + Value: attribute.IntValue(10006), + }, + { + Key: "message-size-compressed", + Value: attribute.IntValue(10006), + }, + }, + }, + { + Name: "Outbound compressed message", + Attributes: []attribute.KeyValue{ + { + Key: "sequence-number", + Value: attribute.IntValue(1), + }, + { + Key: "message-size", + Value: attribute.IntValue(10006), + }, + { + Key: "message-size-compressed", + Value: attribute.IntValue(10006), + }, + }, + }, + }, + }, + { + name: "Attempt.grpc.testing.TestService.UnaryCall", + spanKind: oteltrace.SpanKindInternal.String(), + attributes: []attribute.KeyValue{ + { + Key: "Client", + Value: attribute.IntValue(1), + }, + { + Key: "FailFast", + Value: attribute.IntValue(1), + }, + { + Key: 
"previous-rpc-attempts", + Value: attribute.IntValue(0), + }, + { + Key: "transparent-retry", + Value: attribute.IntValue(0), + }, + }, + events: []trace.Event{ + { + Name: "Outbound compressed message", + Attributes: []attribute.KeyValue{ + { + Key: "sequence-number", + Value: attribute.IntValue(1), + }, + { + Key: "message-size", + Value: attribute.IntValue(10006), + }, + { + Key: "message-size-compressed", + Value: attribute.IntValue(10006), + }, + }, + }, + { + Name: "Inbound compressed message", + Attributes: []attribute.KeyValue{ + { + Key: "sequence-number", + Value: attribute.IntValue(1), + }, + { + Key: "message-size", + Value: attribute.IntValue(10006), + }, + { + Key: "message-size-compressed", + Value: attribute.IntValue(10006), + }, + }, + }, + }, + }, + { + name: "grpc.testing.TestService.UnaryCall", + spanKind: oteltrace.SpanKindClient.String(), + attributes: []attribute.KeyValue{}, + events: []trace.Event{}, + }, + { + name: "grpc.testing.TestService.FullDuplexCall", + spanKind: oteltrace.SpanKindServer.String(), + attributes: []attribute.KeyValue{ + { + Key: "Client", + Value: attribute.IntValue(0), + }, + { + Key: "FailFast", + Value: attribute.IntValue(0), + }, + { + Key: "previous-rpc-attempts", + Value: attribute.IntValue(0), + }, + { + Key: "transparent-retry", + Value: attribute.IntValue(0), + }, + }, + events: []trace.Event{}, + }, + { + name: "grpc.testing.TestService.FullDuplexCall", + spanKind: oteltrace.SpanKindClient.String(), + attributes: []attribute.KeyValue{}, + events: []trace.Event{}, + }, + { + name: "Attempt.grpc.testing.TestService.FullDuplexCall", + spanKind: oteltrace.SpanKindInternal.String(), + attributes: []attribute.KeyValue{ + { + Key: "Client", + Value: attribute.IntValue(1), + }, + { + Key: "FailFast", + Value: attribute.IntValue(1), + }, + { + Key: "previous-rpc-attempts", + Value: attribute.IntValue(0), + }, + { + Key: "transparent-retry", + Value: attribute.IntValue(0), + }, + }, + events: []trace.Event{}, + }, + } 
+ + // Check that same traceID is used in client and server for unary RPC call. + if got, want := spans[0].SpanContext.TraceID(), spans[2].SpanContext.TraceID(); got != want { + t.Fatal("TraceID mismatch in client span and server span.") + } + // Check that the attempt span id of client matches the span id of server + // SpanContext. + if got, want := spans[0].Parent.SpanID(), spans[1].SpanContext.SpanID(); got != want { + t.Fatal("SpanID mismatch in client span and server span.") + } + + // Check that same traceID is used in client and server for streaming RPC call. + if got, want := spans[3].SpanContext.TraceID(), spans[4].SpanContext.TraceID(); got != want { + t.Fatal("TraceID mismatch in client span and server span.") + } + // Check that the attempt span id of client matches the span id of server + // SpanContext. + if got, want := spans[3].Parent.SpanID(), spans[5].SpanContext.SpanID(); got != want { + t.Fatal("SpanID mismatch in client span and server span.") + } + + for index, span := range spans { + // Check that the attempt span has the correct status + if got, want := spans[index].Status.Code, otelcodes.Ok; got != want { + t.Errorf("Got status code %v, want %v", got, want) + } + // name + if got, want := span.Name, wantSI[index].name; got != want { + t.Errorf("Span name is %q, want %q", got, want) + } + // spanKind + if got, want := span.SpanKind.String(), wantSI[index].spanKind; got != want { + t.Errorf("Got span kind %q, want %q", got, want) + } + // attributes + if got, want := len(span.Attributes), len(wantSI[index].attributes); got != want { + t.Errorf("Got attributes list of size %q, want %q", got, want) + } + for idx, att := range span.Attributes { + if got, want := att.Key, wantSI[index].attributes[idx].Key; got != want { + t.Errorf("Got attribute key for span name %v as %v, want %v", span.Name, got, want) + } + } + // events + if got, want := len(span.Events), len(wantSI[index].events); got != want { + t.Errorf("Event length is %q, want %q", got, 
want) + } + for eventIdx, event := range span.Events { + if got, want := event.Name, wantSI[index].events[eventIdx].Name; got != want { + t.Errorf("Got event name for span name %q as %q, want %q", span.Name, got, want) + } + for idx, att := range event.Attributes { + if got, want := att.Key, wantSI[index].events[eventIdx].Attributes[idx].Key; got != want { + t.Errorf("Got attribute key for span name %q with event name %v, as %v, want %v", span.Name, event.Name, got, want) + } + if got, want := att.Value, wantSI[index].events[eventIdx].Attributes[idx].Value; got != want { + t.Errorf("Got attribute value for span name %v with event name %v, as %v, want %v", span.Name, event.Name, got, want) + } + } + } + } +} + +// TestSpan_WithW3CContextPropagator sets up a stub server with OpenTelemetry tracing +// enabled, makes a unary and a streaming RPC, and then asserts that the correct +// number of spans are created with the expected spans. +// +// Verification: +// - Verifies that the correct number of spans are created for both unary and +// streaming RPCs. +// - Verifies that the spans have the expected names and attributes, ensuring +// they accurately reflect the operations performed. +// - Verifies that the trace ID and span ID are correctly assigned and accessible +// in the OpenTelemetry backend. +func (s) TestSpan_WithW3CContextPropagator(t *testing.T) { + mo, _ := defaultMetricsOptions(t, nil) + // Using defaultTraceOptions to set up OpenTelemetry with an in-memory exporter + to, spanExporter := defaultTraceOptions(t) + // Set the W3CContextPropagator as part of TracingOptions. + to.TextMapPropagator = propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}) + // Start the server with OpenTelemetry options + ss := setupStubServer(t, mo, to) + defer ss.Stop() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Make two RPC's, a unary RPC and a streaming RPC. 
These should cause + // certain traces to be emitted, which should be observed through the + // span exporter. + if _, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{Payload: &testpb.Payload{ + Body: make([]byte, 10000), + }}); err != nil { + t.Fatalf("Unexpected error from UnaryCall: %v", err) + } + stream, err := ss.Client.FullDuplexCall(ctx) + if err != nil { + t.Fatalf("ss.Client.FullDuplexCall failed: %f", err) + } + + stream.CloseSend() + if _, err = stream.Recv(); err != io.EOF { + t.Fatalf("stream.Recv received an unexpected error: %v, expected an EOF error", err) + } + // Get the spans from the exporter + spans := spanExporter.GetSpans() + if got, want := len(spans), 6; got != want { + t.Fatalf("Got %d spans, want %d", got, want) + } + + wantSI := []traceSpanInfo{ + { + name: "grpc.testing.TestService.UnaryCall", + spanKind: oteltrace.SpanKindServer.String(), + attributes: []attribute.KeyValue{ + { + Key: "Client", + Value: attribute.IntValue(0), + }, + { + Key: "FailFast", + Value: attribute.IntValue(0), + }, + { + Key: "previous-rpc-attempts", + Value: attribute.IntValue(0), + }, + { + Key: "transparent-retry", + Value: attribute.IntValue(0), + }, + }, + events: []trace.Event{ + { + Name: "Inbound compressed message", + Attributes: []attribute.KeyValue{ + { + Key: "sequence-number", + Value: attribute.IntValue(1), + }, + { + Key: "message-size", + Value: attribute.IntValue(10006), + }, + { + Key: "message-size-compressed", + Value: attribute.IntValue(10006), + }, + }, + }, + { + Name: "Outbound compressed message", + Attributes: []attribute.KeyValue{ + { + Key: "sequence-number", + Value: attribute.IntValue(1), + }, + { + Key: "message-size", + Value: attribute.IntValue(10006), + }, + { + Key: "message-size-compressed", + Value: attribute.IntValue(10006), + }, + }, + }, + }, + }, + { + name: "Attempt.grpc.testing.TestService.UnaryCall", + spanKind: oteltrace.SpanKindInternal.String(), + attributes: []attribute.KeyValue{ + { + Key: "Client", + 
Value: attribute.IntValue(1), + }, + { + Key: "FailFast", + Value: attribute.IntValue(1), + }, + { + Key: "previous-rpc-attempts", + Value: attribute.IntValue(0), + }, + { + Key: "transparent-retry", + Value: attribute.IntValue(0), + }, + }, + events: []trace.Event{ + { + Name: "Outbound compressed message", + Attributes: []attribute.KeyValue{ + { + Key: "sequence-number", + Value: attribute.IntValue(1), + }, + { + Key: "message-size", + Value: attribute.IntValue(10006), + }, + { + Key: "message-size-compressed", + Value: attribute.IntValue(10006), + }, + }, + }, + { + Name: "Inbound compressed message", + Attributes: []attribute.KeyValue{ + { + Key: "sequence-number", + Value: attribute.IntValue(1), + }, + { + Key: "message-size", + Value: attribute.IntValue(10006), + }, + { + Key: "message-size-compressed", + Value: attribute.IntValue(10006), + }, + }, + }, + }, + }, + { + name: "grpc.testing.TestService.UnaryCall", + spanKind: oteltrace.SpanKindClient.String(), + attributes: []attribute.KeyValue{}, + events: []trace.Event{}, + }, + { + name: "grpc.testing.TestService.FullDuplexCall", + spanKind: oteltrace.SpanKindServer.String(), + attributes: []attribute.KeyValue{ + { + Key: "Client", + Value: attribute.IntValue(0), + }, + { + Key: "FailFast", + Value: attribute.IntValue(0), + }, + { + Key: "previous-rpc-attempts", + Value: attribute.IntValue(0), + }, + { + Key: "transparent-retry", + Value: attribute.IntValue(0), + }, + }, + events: []trace.Event{}, + }, + { + name: "grpc.testing.TestService.FullDuplexCall", + spanKind: oteltrace.SpanKindClient.String(), + attributes: []attribute.KeyValue{}, + events: []trace.Event{}, + }, + { + name: "Attempt.grpc.testing.TestService.FullDuplexCall", + spanKind: oteltrace.SpanKindInternal.String(), + attributes: []attribute.KeyValue{ + { + Key: "Client", + Value: attribute.IntValue(1), + }, + { + Key: "FailFast", + Value: attribute.IntValue(1), + }, + { + Key: "previous-rpc-attempts", + Value: attribute.IntValue(0), + }, + { 
+ Key: "transparent-retry", + Value: attribute.IntValue(0), + }, + }, + events: []trace.Event{}, + }, + } + + // Check that same traceID is used in client and server. + if got, want := spans[0].SpanContext.TraceID(), spans[2].SpanContext.TraceID(); got != want { + t.Fatal("TraceID mismatch in client span and server span.") + } + // Check that the attempt span id of client matches the span id of server + // SpanContext. + if got, want := spans[0].Parent.SpanID(), spans[1].SpanContext.SpanID(); got != want { + t.Fatal("SpanID mismatch in client span and server span.") + } + + // Check that same traceID is used in client and server. + if got, want := spans[3].SpanContext.TraceID(), spans[4].SpanContext.TraceID(); got != want { + t.Fatal("TraceID mismatch in client span and server span.") + } + // Check that the attempt span id of client matches the span id of server + // SpanContext. + if got, want := spans[3].Parent.SpanID(), spans[5].SpanContext.SpanID(); got != want { + t.Fatal("SpanID mismatch in client span and server span.") + } + for index, span := range spans { + // Check that the attempt span has the correct status + if got, want := spans[index].Status.Code, otelcodes.Ok; got != want { + t.Errorf("Got status code %v, want %v", got, want) + } + // name + if got, want := span.Name, wantSI[index].name; got != want { + t.Errorf("Span name is %q, want %q", got, want) + } + // spanKind + if got, want := span.SpanKind.String(), wantSI[index].spanKind; got != want { + t.Errorf("Got span kind %q, want %q", got, want) + } + // attributes + if got, want := len(span.Attributes), len(wantSI[index].attributes); got != want { + t.Errorf("Got attributes list of size %q, want %q", got, want) + } + for idx, att := range span.Attributes { + if got, want := att.Key, wantSI[index].attributes[idx].Key; got != want { + t.Errorf("Got attribute key for span name %v as %v, want %v", span.Name, got, want) + } + } + // events + if got, want := len(span.Events), 
len(wantSI[index].events); got != want { + t.Errorf("Event length is %q, want %q", got, want) + } + for eventIdx, event := range span.Events { + if got, want := event.Name, wantSI[index].events[eventIdx].Name; got != want { + t.Errorf("Got event name for span name %q as %q, want %q", span.Name, got, want) + } + for idx, att := range event.Attributes { + if got, want := att.Key, wantSI[index].events[eventIdx].Attributes[idx].Key; got != want { + t.Errorf("Got attribute key for span name %q with event name %v, as %v, want %v", span.Name, event.Name, got, want) + } + if got, want := att.Value, wantSI[index].events[eventIdx].Attributes[idx].Value; got != want { + t.Errorf("Got attribute value for span name %v with event name %v, as %v, want %v", span.Name, event.Name, got, want) + } + } + } + } +} + +// TestMetricsAndTracesDisabled verifies that RPCs call succeed as expected +// when metrics and traces are disabled in the OpenTelemetry instrumentation. +func (s) TestMetricsAndTracesDisabled(t *testing.T) { + ss := &stubserver.StubServer{ + UnaryCallF: func(_ context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + return &testpb.SimpleResponse{Payload: &testpb.Payload{ + Body: make([]byte, len(in.GetPayload().GetBody())), + }}, nil + }, + FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error { + for { + _, err := stream.Recv() + if err == io.EOF { + return nil + } + } + }, + } + + if err := ss.Start(nil); err != nil { + t.Fatalf("Error starting endpoint server: %v", err) + } + defer ss.Stop() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Make two RPCs, a unary RPC and a streaming RPC. 
+ if _, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{Payload: &testpb.Payload{ + Body: make([]byte, 10000), + }}); err != nil { + t.Fatalf("Unexpected error from UnaryCall: %v", err) + } + stream, err := ss.Client.FullDuplexCall(ctx) + if err != nil { + t.Fatalf("ss.Client.FullDuplexCall failed: %v", err) + } + + stream.CloseSend() + if _, err = stream.Recv(); err != io.EOF { + t.Fatalf("stream.Recv received an unexpected error: %v, expected an EOF error", err) + } +} + +// TestRPCSpanErrorStatus verifies that errors during RPC calls are correctly +// reflected in the span status. It simulates a unary RPC that returns an error +// and checks that the span's status is set to error with the appropriate message. +func (s) TestRPCSpanErrorStatus(t *testing.T) { + mo, _ := defaultMetricsOptions(t, nil) + // Using defaultTraceOptions to set up OpenTelemetry with an in-memory exporter + to, exporter := defaultTraceOptions(t) + const rpcErrorMsg = "unary call: internal server error" + ss := &stubserver.StubServer{ + UnaryCallF: func(_ context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + return nil, fmt.Errorf("%v", rpcErrorMsg) + }, + } + + otelOptions := opentelemetry.Options{ + MetricsOptions: *mo, + TraceOptions: *to, + } + + if err := ss.Start([]grpc.ServerOption{opentelemetry.ServerOption(otelOptions)}, + opentelemetry.DialOption(otelOptions)); err != nil { + t.Fatalf("Error starting endpoint server: %v", err) + } + defer ss.Stop() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{Payload: &testpb.Payload{ + Body: make([]byte, 10000), + }}) + + // Verify traces + spans := exporter.GetSpans() + if got, want := len(spans), 3; got != want { + t.Fatalf("got %d spans, want %d", got, want) + } + + // Verify spans has error status with rpcErrorMsg as error message. 
+ if got, want := spans[0].Status.Description, rpcErrorMsg; got != want { + t.Fatalf("got rpc error %s, want %s", spans[0].Status.Description, rpcErrorMsg) + } +} diff --git a/stats/opentelemetry/opentelemetry.go b/stats/opentelemetry/opentelemetry.go index dcc424775f14..d99169e2da67 100644 --- a/stats/opentelemetry/opentelemetry.go +++ b/stats/opentelemetry/opentelemetry.go @@ -27,35 +27,50 @@ import ( "strings" "time" + otelattribute "go.opentelemetry.io/otel/attribute" + otelmetric "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/noop" + "go.opentelemetry.io/otel/trace" "google.golang.org/grpc" "google.golang.org/grpc/codes" + experimental "google.golang.org/grpc/experimental/opentelemetry" estats "google.golang.org/grpc/experimental/stats" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/internal" "google.golang.org/grpc/stats" otelinternal "google.golang.org/grpc/stats/opentelemetry/internal" - - otelattribute "go.opentelemetry.io/otel/attribute" - otelmetric "go.opentelemetry.io/otel/metric" - "go.opentelemetry.io/otel/metric/noop" ) func init() { otelinternal.SetPluginOption = func(o *Options, po otelinternal.PluginOption) { o.MetricsOptions.pluginOption = po + // Log an error if one of the options is missing. 
+ if (o.TraceOptions.TextMapPropagator == nil) != (o.TraceOptions.TracerProvider == nil) { + logger.Warning("Tracing will not be recorded because traceOptions are not set properly: one of TextMapPropagator or TracerProvider is missing") + } } } -var logger = grpclog.Component("otel-plugin") - -var canonicalString = internal.CanonicalString.(func(codes.Code) string) - -var joinDialOptions = internal.JoinDialOptions.(func(...grpc.DialOption) grpc.DialOption) +var ( + logger = grpclog.Component("otel-plugin") + canonicalString = internal.CanonicalString.(func(codes.Code) string) + joinDialOptions = internal.JoinDialOptions.(func(...grpc.DialOption) grpc.DialOption) +) // Options are the options for OpenTelemetry instrumentation. type Options struct { // MetricsOptions are the metrics options for OpenTelemetry instrumentation. MetricsOptions MetricsOptions + // TraceOptions are the tracing options for OpenTelemetry instrumentation. + TraceOptions experimental.TraceOptions +} + +func (o *Options) isMetricsEnabled() bool { + return o.MetricsOptions.MeterProvider != nil +} + +func (o *Options) isTracingEnabled() bool { + return o.TraceOptions.TracerProvider != nil } // MetricsOptions are the metrics options for OpenTelemetry instrumentation. @@ -187,6 +202,15 @@ type attemptInfo struct { pluginOptionLabels map[string]string // pluginOptionLabels to attach to metrics emitted xdsLabels map[string]string + + // traceSpan is data used for recording traces. + traceSpan trace.Span + // message counters for sent and received messages (used for + // generating message IDs), and the number of previous RPC attempts for the + // associated call. 
+ countSentMsg uint32 + countRecvMsg uint32 + previousRPCAttempts uint32 } type clientMetrics struct { diff --git a/stats/opentelemetry/server_metrics.go b/stats/opentelemetry/server_metrics.go index 4765afa8ed53..da3f60a9ebe5 100644 --- a/stats/opentelemetry/server_metrics.go +++ b/stats/opentelemetry/server_metrics.go @@ -201,10 +201,12 @@ func (h *serverStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) startTime: time.Now(), method: removeLeadingSlash(method), } - ri := &rpcInfo{ - ai: ai, + if h.options.isTracingEnabled() { + ctx, ai = h.traceTagRPC(ctx, ai) } - return setRPCInfo(ctx, ri) + return setRPCInfo(ctx, &rpcInfo{ + ai: ai, + }) } // HandleRPC implements per RPC tracing and stats implementation. @@ -214,7 +216,12 @@ func (h *serverStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { logger.Error("ctx passed into server side stats handler metrics event handling has no server call data present") return } - h.processRPCData(ctx, rs, ri.ai) + if h.options.isTracingEnabled() { + populateSpan(rs, ri.ai) + } + if h.options.isMetricsEnabled() { + h.processRPCData(ctx, rs, ri.ai) + } } func (h *serverStatsHandler) processRPCData(ctx context.Context, s stats.RPCStats, ai *attemptInfo) { diff --git a/stats/opentelemetry/server_tracing.go b/stats/opentelemetry/server_tracing.go new file mode 100644 index 000000000000..c55e03dcaab6 --- /dev/null +++ b/stats/opentelemetry/server_tracing.go @@ -0,0 +1,46 @@ +/* + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package opentelemetry + +import ( + "context" + "strings" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/trace" + otelinternaltracing "google.golang.org/grpc/stats/opentelemetry/internal/tracing" +) + +// traceTagRPC populates context with new span data using the TextMapPropagator +// supplied in trace options and internal itracing.Carrier. It creates a new +// incoming carrier which extracts an existing span context (if present) by +// deserializing from provided context. If valid span context is extracted, it +// is set as parent of the new span otherwise new span remains the root span. +// If TextMapPropagator is not provided in the trace options, it returns context +// as is. +func (h *serverStatsHandler) traceTagRPC(ctx context.Context, ai *attemptInfo) (context.Context, *attemptInfo) { + mn := strings.Replace(ai.method, "/", ".", -1) + var span trace.Span + tracer := otel.Tracer("grpc-open-telemetry") + ctx = otel.GetTextMapPropagator().Extract(ctx, otelinternaltracing.NewIncomingCarrier(ctx)) + // If the context.Context provided in `ctx` to tracer.Start(), contains a + // span then the newly-created Span will be a child of that span, + // otherwise it will be a root span. + ctx, span = tracer.Start(ctx, mn, trace.WithSpanKind(trace.SpanKindServer)) + ai.traceSpan = span + return ctx, ai +} diff --git a/stats/opentelemetry/trace.go b/stats/opentelemetry/trace.go new file mode 100644 index 000000000000..cd5c23cd3b23 --- /dev/null +++ b/stats/opentelemetry/trace.go @@ -0,0 +1,82 @@ +/* + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package opentelemetry + +import ( + "sync/atomic" + + "go.opentelemetry.io/otel/attribute" + otelcodes "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" + "google.golang.org/grpc/stats" + "google.golang.org/grpc/status" +) + +// populateSpan populates span information based on stats passed in, representing +// invariants of the RPC lifecycle. It ends the span, triggering its export. +// This function handles attempt spans on the client-side and call spans on the +// server-side. +func populateSpan(rs stats.RPCStats, ai *attemptInfo) { + if ai == nil || ai.traceSpan == nil { + // Shouldn't happen, tagRPC call comes before this function gets called + // which populates this information. + logger.Error("ctx passed into stats handler tracing event handling has no traceSpan present") + return + } + span := ai.traceSpan + + switch rs := rs.(type) { + case *stats.Begin: + // Note: Go always added Client and FailFast attributes even though they are not + // defined by the OpenCensus gRPC spec. Thus, they are unimportant for + // correctness. 
+ span.SetAttributes( + attribute.Bool("Client", rs.Client), + attribute.Bool("FailFast", rs.Client), + attribute.Int64("previous-rpc-attempts", int64(ai.previousRPCAttempts)), + attribute.Bool("transparent-retry", rs.IsTransparentRetryAttempt), + ) + // increment previous rpc attempts applicable for next attempt + atomic.AddUint32(&ai.previousRPCAttempts, 1) + case *stats.PickerUpdated: + span.AddEvent("Delayed LB pick complete") + case *stats.InPayload: + // message id - "must be calculated as two different counters starting + // from one for sent messages and one for received messages." + ai.countRecvMsg++ + span.AddEvent("Inbound compressed message", trace.WithAttributes( + attribute.Int64("sequence-number", int64(ai.countRecvMsg)), + attribute.Int64("message-size", int64(rs.Length)), + attribute.Int64("message-size-compressed", int64(rs.CompressedLength)), + )) + case *stats.OutPayload: + ai.countSentMsg++ + span.AddEvent("Outbound compressed message", trace.WithAttributes( + attribute.Int64("sequence-number", int64(ai.countSentMsg)), + attribute.Int64("message-size", int64(rs.Length)), + attribute.Int64("message-size-compressed", int64(rs.CompressedLength)), + )) + case *stats.End: + if rs.Error != nil { + s := status.Convert(rs.Error) + span.SetStatus(otelcodes.Error, s.Message()) + } else { + span.SetStatus(otelcodes.Ok, "Ok") + } + span.End() + } +}