Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support gRPC status for versions >=1.40 #1235

Merged
merged 4 commits into from
Nov 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ OpenTelemetry Go Automatic Instrumentation adheres to [Semantic Versioning](http
### Fixed

- Sporadic shutdown deadlock. ([#1220](https://github.com/open-telemetry/opentelemetry-go-instrumentation/pull/1220))
- Only support gRPC status codes for gRPC >= 1.40. ([#1235](https://github.com/open-telemetry/opentelemetry-go-instrumentation/pull/1235))

## [v0.16.0-alpha] - 2024-10-22

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ volatile const u64 error_status_pos;
volatile const u64 status_s_pos;
volatile const u64 status_code_pos;

volatile const bool write_status_supported;

// This instrumentation attaches uprobe to the following function:
// func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error
SEC("uprobe/ClientConn_Invoke")
Expand Down Expand Up @@ -122,6 +124,9 @@ int uprobe_ClientConn_Invoke_Returns(struct pt_regs *ctx) {
return 0;
}

if(!write_status_supported) {
goto done;
}
// Getting the returned response (error)
// The status code is embedded 3 layers deep:
// Invoke() error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,18 @@ import (
"strings"

"github.com/cilium/ebpf"
"github.com/hashicorp/go-version"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sys/unix"

"go.opentelemetry.io/auto/internal/pkg/inject"
"go.opentelemetry.io/auto/internal/pkg/instrumentation/context"
"go.opentelemetry.io/auto/internal/pkg/instrumentation/probe"
"go.opentelemetry.io/auto/internal/pkg/instrumentation/utils"
"go.opentelemetry.io/auto/internal/pkg/process"
"go.opentelemetry.io/auto/internal/pkg/structfield"
)

Expand All @@ -29,6 +32,24 @@ const (
pkg = "google.golang.org/grpc"
)

var (
writeStatus = false
writeStatusMinVersion = version.Must(version.NewVersion("1.40.0"))
)

type writeStatusConst struct{}

func (w writeStatusConst) InjectOption(td *process.TargetDetails) (inject.Option, error) {
ver, ok := td.Libraries[pkg]
if !ok {
return nil, fmt.Errorf("unknown module version: %s", pkg)
}
if ver.GreaterThanOrEqual(writeStatusMinVersion) {
writeStatus = true
}
return inject.WithKeyValue("write_status_supported", writeStatus), nil
}

// New returns a new [probe.Probe].
func New(logger *slog.Logger) probe.Probe {
id := probe.ID{
Expand All @@ -41,6 +62,7 @@ func New(logger *slog.Logger) probe.Probe {
Consts: []probe.Const{
probe.RegistersABIConst{},
probe.AllocationConst{},
writeStatusConst{},
probe.StructFieldConst{
Key: "clientconn_target_ptr_pos",
Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/grpc", "ClientConn", "target"),
Expand All @@ -57,17 +79,26 @@ func New(logger *slog.Logger) probe.Probe {
Key: "headerFrame_streamid_pos",
Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/grpc/internal/transport", "headerFrame", "streamID"),
},
probe.StructFieldConst{
Key: "error_status_pos",
Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/grpc/internal/status", "Error", "s"),
probe.StructFieldConstMinVersion{
StructField: probe.StructFieldConst{
Key: "error_status_pos",
Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/grpc/internal/status", "Error", "s"),
},
MinVersion: writeStatusMinVersion,
},
probe.StructFieldConst{
Key: "status_s_pos",
Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/grpc/internal/status", "Status", "s"),
probe.StructFieldConstMinVersion{
StructField: probe.StructFieldConst{
Key: "status_s_pos",
Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/grpc/internal/status", "Status", "s"),
},
MinVersion: writeStatusMinVersion,
},
probe.StructFieldConst{
Key: "status_code_pos",
Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/genproto/googleapis/rpc/status", "Status", "Code"),
probe.StructFieldConstMinVersion{
StructField: probe.StructFieldConst{
Key: "status_code_pos",
Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/genproto/googleapis/rpc/status", "Status", "Code"),
},
MinVersion: writeStatusMinVersion,
},
},
Uprobes: []probe.Uprobe{
Expand Down Expand Up @@ -124,8 +155,6 @@ func convertEvent(e *event) []*probe.SpanEvent {
semconv.RPCServiceKey.String(method),
semconv.ServerAddress(target))

attrs = append(attrs, semconv.RPCGRPCStatusCodeKey.Int(int(e.StatusCode)))

sc := trace.NewSpanContext(trace.SpanContextConfig{
TraceID: e.SpanContext.TraceID,
SpanID: e.SpanContext.SpanID,
Expand Down Expand Up @@ -155,8 +184,12 @@ func convertEvent(e *event) []*probe.SpanEvent {
TracerSchema: semconv.SchemaURL,
}

if e.StatusCode > 0 {
event.Status = probe.Status{Code: codes.Error}
if writeStatus {
event.Attributes = append(event.Attributes, semconv.RPCGRPCStatusCodeKey.Int(int(e.StatusCode)))

if e.StatusCode > 0 {
event.Status = probe.Status{Code: codes.Error}
}
}

return []*probe.SpanEvent{event}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ volatile const bool is_new_frame_pos;
volatile const u64 status_s_pos;
volatile const u64 status_code_pos;

volatile const bool write_status_supported;

static __always_inline long dummy_extract_span_context_from_headers(void *stream_id, struct span_context *parent_span_context) {
return 0;
}
Expand Down Expand Up @@ -175,6 +177,10 @@ int uprobe_http2Server_operateHeader(struct pt_regs *ctx)
// https://github.com/grpc/grpc-go/blob/bcf9171a20e44ed81a6eb152e3ca9e35b2c02c5d/internal/transport/http2_server.go#L1049
SEC("uprobe/http2Server_WriteStatus")
int uprobe_http2Server_WriteStatus(struct pt_regs *ctx) {
if(!write_status_supported) {
bpf_printk("status probe not supported for this version of gRPC");
return 0;
}
struct go_iface go_context = {0};
get_Go_context(ctx, 2, stream_ctx_pos, true, &go_context);
void *key = get_consistent_key(ctx, go_context.data);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func New(logger *slog.Logger) probe.Probe {
Consts: []probe.Const{
probe.RegistersABIConst{},
probe.AllocationConst{},
writeStatusConst{},
probe.StructFieldConst{
Key: "stream_method_ptr_pos",
Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/grpc/internal/transport", "Stream", "method"),
Expand All @@ -63,13 +64,19 @@ func New(logger *slog.Logger) probe.Probe {
Key: "frame_stream_id_pod",
Val: structfield.NewID("golang.org/x/net", "golang.org/x/net/http2", "FrameHeader", "StreamID"),
},
probe.StructFieldConst{
Key: "status_s_pos",
Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/grpc/internal/status", "Status", "s"),
probe.StructFieldConstMinVersion{
StructField: probe.StructFieldConst{
Key: "status_s_pos",
Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/grpc/internal/status", "Status", "s"),
},
MinVersion: writeStatusMinVersion,
},
probe.StructFieldConst{
Key: "status_code_pos",
Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/genproto/googleapis/rpc/status", "Status", "Code"),
probe.StructFieldConstMinVersion{
StructField: probe.StructFieldConst{
Key: "status_code_pos",
Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/genproto/googleapis/rpc/status", "Status", "Code"),
},
MinVersion: writeStatusMinVersion,
},
framePosConst{},
},
Expand Down Expand Up @@ -112,6 +119,24 @@ func (c framePosConst) InjectOption(td *process.TargetDetails) (inject.Option, e
return inject.WithKeyValue("is_new_frame_pos", ver.GreaterThanOrEqual(paramChangeVer)), nil
}

type writeStatusConst struct{}

var (
writeStatus = false
writeStatusMinVersion = version.Must(version.NewVersion("1.40.0"))
)

func (w writeStatusConst) InjectOption(td *process.TargetDetails) (inject.Option, error) {
ver, ok := td.Libraries[pkg]
if !ok {
return nil, fmt.Errorf("unknown module version: %s", pkg)
}
if ver.GreaterThanOrEqual(writeStatusMinVersion) {
writeStatus = true
}
return inject.WithKeyValue("write_status_supported", writeStatus), nil
}

// event represents an event in the gRPC server during a gRPC request.
type event struct {
context.BaseSpanProperties
Expand Down Expand Up @@ -141,29 +166,33 @@ func convertEvent(e *event) []*probe.SpanEvent {
pscPtr = nil
}

attrs := []attribute.KeyValue{
semconv.RPCSystemKey.String("grpc"),
semconv.RPCServiceKey.String(method),
}
event := &probe.SpanEvent{
SpanName: method,
StartTime: utils.BootOffsetToTime(e.StartTime),
EndTime: utils.BootOffsetToTime(e.EndTime),
Attributes: []attribute.KeyValue{
semconv.RPCSystemKey.String("grpc"),
semconv.RPCServiceKey.String(method),
semconv.RPCGRPCStatusCodeKey.Int(int(e.StatusCode)),
},
SpanName: method,
StartTime: utils.BootOffsetToTime(e.StartTime),
EndTime: utils.BootOffsetToTime(e.EndTime),
Attributes: attrs,
ParentSpanContext: pscPtr,
SpanContext: &sc,
TracerSchema: semconv.SchemaURL,
}

// Set server status codes per semconv:
// See https://github.com/open-telemetry/semantic-conventions/blob/02ecf0c71e9fa74d09d81c48e04a132db2b7060b/docs/rpc/grpc.md#grpc-status
if e.StatusCode == int32(codes.Unknown) ||
e.StatusCode == int32(codes.DeadlineExceeded) ||
e.StatusCode == int32(codes.Unimplemented) ||
e.StatusCode == int32(codes.Internal) ||
e.StatusCode == int32(codes.Unavailable) ||
e.StatusCode == int32(codes.DataLoss) {
event.Status = probe.Status{Code: otelcodes.Error}
if writeStatus {
event.Attributes = append(event.Attributes, semconv.RPCGRPCStatusCodeKey.Int(int(e.StatusCode)))
// Set server status codes per semconv:
// See https://github.com/open-telemetry/semantic-conventions/blob/02ecf0c71e9fa74d09d81c48e04a132db2b7060b/docs/rpc/grpc.md#grpc-status
if e.StatusCode == int32(codes.Unknown) ||
e.StatusCode == int32(codes.DeadlineExceeded) ||
e.StatusCode == int32(codes.Unimplemented) ||
e.StatusCode == int32(codes.Internal) ||
e.StatusCode == int32(codes.Unavailable) ||
e.StatusCode == int32(codes.DataLoss) {
event.Status = probe.Status{Code: otelcodes.Error}
}
}

return []*probe.SpanEvent{event}
}
Loading