Skip to content

Commit

Permalink
Support gRPC status for versions >=1.40 (#1235)
Browse files (browse the repository at this point in the history)
* Support gRPC status for versions >=1.40

* Add changelog entry

* Switch to StructFieldConstMinVersion

---------

Co-authored-by: Ron Federman <[email protected]>
  • Loading branch information
damemi and RonFed authored Nov 1, 2024
1 parent 7889fea commit 1151ce9
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 36 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ OpenTelemetry Go Automatic Instrumentation adheres to [Semantic Versioning](http
### Fixed

- Sporadic shutdown deadlock. ([#1220](https://github.com/open-telemetry/opentelemetry-go-instrumentation/pull/1220))
- Only support gRPC status codes for gRPC >= 1.40. ([#1235](https://github.com/open-telemetry/opentelemetry-go-instrumentation/pull/1235))

## [v0.16.0-alpha] - 2024-10-22

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ volatile const u64 error_status_pos;
volatile const u64 status_s_pos;
volatile const u64 status_code_pos;

// Gate for the status-reading path of the return probe: when false, the probe
// jumps straight to `done` without reading the returned error's status.
// NOTE(review): presumably populated at load time by the Go side through the
// "write_status_supported" injected key — confirm against the loader.
volatile const bool write_status_supported;

// This instrumentation attaches uprobe to the following function:
// func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error
SEC("uprobe/ClientConn_Invoke")
Expand Down Expand Up @@ -122,6 +124,9 @@ int uprobe_ClientConn_Invoke_Returns(struct pt_regs *ctx) {
return 0;
}

if(!write_status_supported) {
goto done;
}
// Getting the returned response (error)
// The status code is embedded 3 layers deep:
// Invoke() error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,18 @@ import (
"strings"

"github.com/cilium/ebpf"
"github.com/hashicorp/go-version"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sys/unix"

"go.opentelemetry.io/auto/internal/pkg/inject"
"go.opentelemetry.io/auto/internal/pkg/instrumentation/context"
"go.opentelemetry.io/auto/internal/pkg/instrumentation/probe"
"go.opentelemetry.io/auto/internal/pkg/instrumentation/utils"
"go.opentelemetry.io/auto/internal/pkg/process"
"go.opentelemetry.io/auto/internal/pkg/structfield"
)

Expand All @@ -29,6 +32,24 @@ const (
pkg = "google.golang.org/grpc"
)

var (
// writeStatus is set to true by writeStatusConst.InjectOption when the
// target's gRPC version is at least writeStatusMinVersion. convertEvent
// reads it to decide whether to attach the gRPC status code attribute
// and the span status.
writeStatus = false
// writeStatusMinVersion is the lowest gRPC version for which the status
// struct-field offsets are requested and status reporting is enabled.
writeStatusMinVersion = version.Must(version.NewVersion("1.40.0"))
)

// writeStatusConst is registered in the probe's Consts list; its InjectOption
// method supplies the "write_status_supported" constant.
type writeStatusConst struct{}

// InjectOption returns the inject.Option that sets the "write_status_supported"
// constant for the target described by td.
//
// The value is true only when the gRPC module version found in td.Libraries is
// at least writeStatusMinVersion. The same value is stored in the package-level
// writeStatus flag, which convertEvent consults when building span events.
//
// An error is returned when td.Libraries has no entry for the gRPC module.
func (w writeStatusConst) InjectOption(td *process.TargetDetails) (inject.Option, error) {
	ver, ok := td.Libraries[pkg]
	if !ok {
		return nil, fmt.Errorf("unknown module version: %s", pkg)
	}

	// Assign unconditionally rather than only ever flipping to true: the flag
	// must reflect the target being processed now, so a previously seen newer
	// gRPC version cannot leave status support enabled for an older one.
	writeStatus = ver.GreaterThanOrEqual(writeStatusMinVersion)

	return inject.WithKeyValue("write_status_supported", writeStatus), nil
}

// New returns a new [probe.Probe].
func New(logger *slog.Logger) probe.Probe {
id := probe.ID{
Expand All @@ -41,6 +62,7 @@ func New(logger *slog.Logger) probe.Probe {
Consts: []probe.Const{
probe.RegistersABIConst{},
probe.AllocationConst{},
writeStatusConst{},
probe.StructFieldConst{
Key: "clientconn_target_ptr_pos",
Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/grpc", "ClientConn", "target"),
Expand All @@ -57,17 +79,26 @@ func New(logger *slog.Logger) probe.Probe {
Key: "headerFrame_streamid_pos",
Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/grpc/internal/transport", "headerFrame", "streamID"),
},
probe.StructFieldConst{
Key: "error_status_pos",
Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/grpc/internal/status", "Error", "s"),
probe.StructFieldConstMinVersion{
StructField: probe.StructFieldConst{
Key: "error_status_pos",
Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/grpc/internal/status", "Error", "s"),
},
MinVersion: writeStatusMinVersion,
},
probe.StructFieldConst{
Key: "status_s_pos",
Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/grpc/internal/status", "Status", "s"),
probe.StructFieldConstMinVersion{
StructField: probe.StructFieldConst{
Key: "status_s_pos",
Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/grpc/internal/status", "Status", "s"),
},
MinVersion: writeStatusMinVersion,
},
probe.StructFieldConst{
Key: "status_code_pos",
Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/genproto/googleapis/rpc/status", "Status", "Code"),
probe.StructFieldConstMinVersion{
StructField: probe.StructFieldConst{
Key: "status_code_pos",
Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/genproto/googleapis/rpc/status", "Status", "Code"),
},
MinVersion: writeStatusMinVersion,
},
},
Uprobes: []probe.Uprobe{
Expand Down Expand Up @@ -124,8 +155,6 @@ func convertEvent(e *event) []*probe.SpanEvent {
semconv.RPCServiceKey.String(method),
semconv.ServerAddress(target))

attrs = append(attrs, semconv.RPCGRPCStatusCodeKey.Int(int(e.StatusCode)))

sc := trace.NewSpanContext(trace.SpanContextConfig{
TraceID: e.SpanContext.TraceID,
SpanID: e.SpanContext.SpanID,
Expand Down Expand Up @@ -155,8 +184,12 @@ func convertEvent(e *event) []*probe.SpanEvent {
TracerSchema: semconv.SchemaURL,
}

if e.StatusCode > 0 {
event.Status = probe.Status{Code: codes.Error}
if writeStatus {
event.Attributes = append(event.Attributes, semconv.RPCGRPCStatusCodeKey.Int(int(e.StatusCode)))

if e.StatusCode > 0 {
event.Status = probe.Status{Code: codes.Error}
}
}

return []*probe.SpanEvent{event}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ volatile const bool is_new_frame_pos;
volatile const u64 status_s_pos;
volatile const u64 status_code_pos;

// Gate for uprobe_http2Server_WriteStatus: when false, that probe logs a
// message and returns without reading the status.
// NOTE(review): presumably populated at load time by the Go side through the
// "write_status_supported" injected key — confirm against the loader.
volatile const bool write_status_supported;

// Stub that ignores both arguments and always returns 0.
// NOTE(review): presumably a no-op stand-in with the same signature as a real
// header-based span-context extractor — confirm where it is referenced.
static __always_inline long dummy_extract_span_context_from_headers(void *stream_id, struct span_context *parent_span_context) {
return 0;
}
Expand Down Expand Up @@ -175,6 +177,10 @@ int uprobe_http2Server_operateHeader(struct pt_regs *ctx)
// https://github.com/grpc/grpc-go/blob/bcf9171a20e44ed81a6eb152e3ca9e35b2c02c5d/internal/transport/http2_server.go#L1049
SEC("uprobe/http2Server_WriteStatus")
int uprobe_http2Server_WriteStatus(struct pt_regs *ctx) {
if(!write_status_supported) {
bpf_printk("status probe not supported for this version of gRPC");
return 0;
}
struct go_iface go_context = {0};
get_Go_context(ctx, 2, stream_ctx_pos, true, &go_context);
void *key = get_consistent_key(ctx, go_context.data);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func New(logger *slog.Logger) probe.Probe {
Consts: []probe.Const{
probe.RegistersABIConst{},
probe.AllocationConst{},
writeStatusConst{},
probe.StructFieldConst{
Key: "stream_method_ptr_pos",
Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/grpc/internal/transport", "Stream", "method"),
Expand All @@ -63,13 +64,19 @@ func New(logger *slog.Logger) probe.Probe {
Key: "frame_stream_id_pod",
Val: structfield.NewID("golang.org/x/net", "golang.org/x/net/http2", "FrameHeader", "StreamID"),
},
probe.StructFieldConst{
Key: "status_s_pos",
Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/grpc/internal/status", "Status", "s"),
probe.StructFieldConstMinVersion{
StructField: probe.StructFieldConst{
Key: "status_s_pos",
Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/grpc/internal/status", "Status", "s"),
},
MinVersion: writeStatusMinVersion,
},
probe.StructFieldConst{
Key: "status_code_pos",
Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/genproto/googleapis/rpc/status", "Status", "Code"),
probe.StructFieldConstMinVersion{
StructField: probe.StructFieldConst{
Key: "status_code_pos",
Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/genproto/googleapis/rpc/status", "Status", "Code"),
},
MinVersion: writeStatusMinVersion,
},
framePosConst{},
},
Expand Down Expand Up @@ -112,6 +119,24 @@ func (c framePosConst) InjectOption(td *process.TargetDetails) (inject.Option, e
return inject.WithKeyValue("is_new_frame_pos", ver.GreaterThanOrEqual(paramChangeVer)), nil
}

// writeStatusConst is registered in the probe's Consts list; its InjectOption
// method supplies the "write_status_supported" constant.
type writeStatusConst struct{}

var (
// writeStatus is set to true by writeStatusConst.InjectOption when the
// target's gRPC version is at least writeStatusMinVersion. convertEvent
// reads it to decide whether to attach the gRPC status code attribute
// and the span status.
writeStatus = false
// writeStatusMinVersion is the lowest gRPC version for which the status
// struct-field offsets are requested and status reporting is enabled.
writeStatusMinVersion = version.Must(version.NewVersion("1.40.0"))
)

// InjectOption returns the inject.Option that sets the "write_status_supported"
// constant for the target described by td.
//
// The value is true only when the gRPC module version found in td.Libraries is
// at least writeStatusMinVersion. The same value is stored in the package-level
// writeStatus flag, which convertEvent consults when building span events.
//
// An error is returned when td.Libraries has no entry for the gRPC module.
func (w writeStatusConst) InjectOption(td *process.TargetDetails) (inject.Option, error) {
	ver, ok := td.Libraries[pkg]
	if !ok {
		return nil, fmt.Errorf("unknown module version: %s", pkg)
	}

	// Assign unconditionally rather than only ever flipping to true: the flag
	// must reflect the target being processed now, so a previously seen newer
	// gRPC version cannot leave status support enabled for an older one.
	writeStatus = ver.GreaterThanOrEqual(writeStatusMinVersion)

	return inject.WithKeyValue("write_status_supported", writeStatus), nil
}

// event represents an event in the gRPC server during a gRPC request.
type event struct {
context.BaseSpanProperties
Expand Down Expand Up @@ -141,29 +166,33 @@ func convertEvent(e *event) []*probe.SpanEvent {
pscPtr = nil
}

attrs := []attribute.KeyValue{
semconv.RPCSystemKey.String("grpc"),
semconv.RPCServiceKey.String(method),
}
event := &probe.SpanEvent{
SpanName: method,
StartTime: utils.BootOffsetToTime(e.StartTime),
EndTime: utils.BootOffsetToTime(e.EndTime),
Attributes: []attribute.KeyValue{
semconv.RPCSystemKey.String("grpc"),
semconv.RPCServiceKey.String(method),
semconv.RPCGRPCStatusCodeKey.Int(int(e.StatusCode)),
},
SpanName: method,
StartTime: utils.BootOffsetToTime(e.StartTime),
EndTime: utils.BootOffsetToTime(e.EndTime),
Attributes: attrs,
ParentSpanContext: pscPtr,
SpanContext: &sc,
TracerSchema: semconv.SchemaURL,
}

// Set server status codes per semconv:
// See https://github.com/open-telemetry/semantic-conventions/blob/02ecf0c71e9fa74d09d81c48e04a132db2b7060b/docs/rpc/grpc.md#grpc-status
if e.StatusCode == int32(codes.Unknown) ||
e.StatusCode == int32(codes.DeadlineExceeded) ||
e.StatusCode == int32(codes.Unimplemented) ||
e.StatusCode == int32(codes.Internal) ||
e.StatusCode == int32(codes.Unavailable) ||
e.StatusCode == int32(codes.DataLoss) {
event.Status = probe.Status{Code: otelcodes.Error}
if writeStatus {
event.Attributes = append(event.Attributes, semconv.RPCGRPCStatusCodeKey.Int(int(e.StatusCode)))
// Set server status codes per semconv:
// See https://github.com/open-telemetry/semantic-conventions/blob/02ecf0c71e9fa74d09d81c48e04a132db2b7060b/docs/rpc/grpc.md#grpc-status
if e.StatusCode == int32(codes.Unknown) ||
e.StatusCode == int32(codes.DeadlineExceeded) ||
e.StatusCode == int32(codes.Unimplemented) ||
e.StatusCode == int32(codes.Internal) ||
e.StatusCode == int32(codes.Unavailable) ||
e.StatusCode == int32(codes.DataLoss) {
event.Status = probe.Status{Code: otelcodes.Error}
}
}

return []*probe.SpanEvent{event}
}

0 comments on commit 1151ce9

Please sign in to comment.