Skip to content

Commit

Permalink
stats/opentelemetry: Introduce Tracing API (grpc#7852)
Browse files Browse the repository at this point in the history
  • Loading branch information
aranjans authored Jan 30, 2025
1 parent 7e1c9b2 commit 78eebff
Show file tree
Hide file tree
Showing 8 changed files with 1,334 additions and 59 deletions.
36 changes: 36 additions & 0 deletions experimental/opentelemetry/trace_options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Package opentelemetry is EXPERIMENTAL and will be moved to stats/opentelemetry
// package in a later release.
package opentelemetry

import (
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
)

// TraceOptions contains the tracing settings for OpenTelemetry instrumentation.
type TraceOptions struct {
// TracerProvider is the OpenTelemetry tracer which is required to
// record traces/trace spans for instrumentation. If unset, tracing
// will not be recorded.
TracerProvider trace.TracerProvider

// TextMapPropagator propagates span context through text map carrier.
// If unset, tracing will not be recorded.
TextMapPropagator propagation.TextMapPropagator
}
63 changes: 46 additions & 17 deletions stats/opentelemetry/client_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ import (
"sync/atomic"
"time"

otelcodes "go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
grpccodes "google.golang.org/grpc/codes"
estats "google.golang.org/grpc/experimental/stats"
istats "google.golang.org/grpc/internal/stats"
"google.golang.org/grpc/metadata"
Expand Down Expand Up @@ -85,8 +88,12 @@ func (h *clientStatsHandler) unaryInterceptor(ctx context.Context, method string
}

startTime := time.Now()
var span trace.Span
if h.options.isTracingEnabled() {
ctx, span = h.createCallTraceSpan(ctx, method)
}
err := invoker(ctx, method, req, reply, cc, opts...)
h.perCallMetrics(ctx, err, startTime, ci)
h.perCallTracesAndMetrics(ctx, err, startTime, ci, span)
return err
}

Expand Down Expand Up @@ -119,22 +126,37 @@ func (h *clientStatsHandler) streamInterceptor(ctx context.Context, desc *grpc.S
}

startTime := time.Now()

var span trace.Span
if h.options.isTracingEnabled() {
ctx, span = h.createCallTraceSpan(ctx, method)
}
callback := func(err error) {
h.perCallMetrics(ctx, err, startTime, ci)
h.perCallTracesAndMetrics(ctx, err, startTime, ci, span)
}
opts = append([]grpc.CallOption{grpc.OnFinish(callback)}, opts...)
return streamer(ctx, desc, cc, method, opts...)
}

func (h *clientStatsHandler) perCallMetrics(ctx context.Context, err error, startTime time.Time, ci *callInfo) {
callLatency := float64(time.Since(startTime)) / float64(time.Second) // calculate ASAP
attrs := otelmetric.WithAttributeSet(otelattribute.NewSet(
otelattribute.String("grpc.method", ci.method),
otelattribute.String("grpc.target", ci.target),
otelattribute.String("grpc.status", canonicalString(status.Code(err))),
))
h.clientMetrics.callDuration.Record(ctx, callLatency, attrs)
// perCallTracesAndMetrics records per call trace spans and metrics.
func (h *clientStatsHandler) perCallTracesAndMetrics(ctx context.Context, err error, startTime time.Time, ci *callInfo, ts trace.Span) {
if h.options.isTracingEnabled() {
s := status.Convert(err)
if s.Code() == grpccodes.OK {
ts.SetStatus(otelcodes.Ok, s.Message())
} else {
ts.SetStatus(otelcodes.Error, s.Message())
}
ts.End()
}
if h.options.isMetricsEnabled() {
callLatency := float64(time.Since(startTime)) / float64(time.Second)
attrs := otelmetric.WithAttributeSet(otelattribute.NewSet(
otelattribute.String("grpc.method", ci.method),
otelattribute.String("grpc.target", ci.target),
otelattribute.String("grpc.status", canonicalString(status.Code(err))),
))
h.clientMetrics.callDuration.Record(ctx, callLatency, attrs)
}
}

// TagConn exists to satisfy stats.Handler.
Expand Down Expand Up @@ -163,15 +185,17 @@ func (h *clientStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo)
}
ctx = istats.SetLabels(ctx, labels)
}
ai := &attemptInfo{ // populates information about RPC start.
ai := &attemptInfo{
startTime: time.Now(),
xdsLabels: labels.TelemetryLabels,
method: info.FullMethodName,
method: removeLeadingSlash(info.FullMethodName),
}
ri := &rpcInfo{
ai: ai,
if h.options.isTracingEnabled() {
ctx, ai = h.traceTagRPC(ctx, ai)
}
return setRPCInfo(ctx, ri)
return setRPCInfo(ctx, &rpcInfo{
ai: ai,
})
}

func (h *clientStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
Expand All @@ -180,7 +204,12 @@ func (h *clientStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
logger.Error("ctx passed into client side stats handler metrics event handling has no client attempt data present")
return
}
h.processRPCEvent(ctx, rs, ri.ai)
if h.options.isMetricsEnabled() {
h.processRPCEvent(ctx, rs, ri.ai)
}
if h.options.isTracingEnabled() {
populateSpan(rs, ri.ai)
}
}

func (h *clientStatsHandler) processRPCEvent(ctx context.Context, s stats.RPCStats, ai *attemptInfo) {
Expand Down
54 changes: 54 additions & 0 deletions stats/opentelemetry/client_tracing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package opentelemetry

import (
"context"
"strings"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
otelinternaltracing "google.golang.org/grpc/stats/opentelemetry/internal/tracing"
)

// traceTagRPC populates provided context with a new span using the
// TextMapPropagator supplied in trace options and internal itracing.carrier.
// It creates a new outgoing carrier which serializes information about this
// span into gRPC Metadata, if TextMapPropagator is provided in the trace
// options. if TextMapPropagator is not provided, it returns the context as is.
func (h *clientStatsHandler) traceTagRPC(ctx context.Context, ai *attemptInfo) (context.Context, *attemptInfo) {
mn := "Attempt." + strings.Replace(ai.method, "/", ".", -1)
tracer := otel.Tracer("grpc-open-telemetry")
ctx, span := tracer.Start(ctx, mn)
carrier := otelinternaltracing.NewOutgoingCarrier(ctx)
otel.GetTextMapPropagator().Inject(ctx, carrier)
ai.traceSpan = span
return carrier.Context(), ai
}

// createCallTraceSpan creates a call span to put in the provided context using
// provided TraceProvider. If TraceProvider is nil, it returns context as is.
func (h *clientStatsHandler) createCallTraceSpan(ctx context.Context, method string) (context.Context, trace.Span) {
if h.options.TraceOptions.TracerProvider == nil {
logger.Error("TraceProvider is not provided in trace options")
return ctx, nil
}
mn := strings.Replace(removeLeadingSlash(method), "/", ".", -1)
tracer := otel.Tracer("grpc-open-telemetry")
ctx, span := tracer.Start(ctx, mn, trace.WithSpanKind(trace.SpanKindClient))
return ctx, span
}
Loading

0 comments on commit 78eebff

Please sign in to comment.