diff --git a/orca/call_metric_recorder_test.go b/orca/call_metric_recorder_test.go index 25d4af371d08..43d0e45291e2 100644 --- a/orca/call_metric_recorder_test.go +++ b/orca/call_metric_recorder_test.go @@ -34,6 +34,7 @@ import ( "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/metadata" "google.golang.org/grpc/orca" + "google.golang.org/grpc/orca/internal" v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3" testgrpc "google.golang.org/grpc/interop/grpc_testing" @@ -58,7 +59,6 @@ func (s) TestE2ECallMetricsUnary(t *testing.T) { desc string injectMetrics bool wantProto *v3orcapb.OrcaLoadReport - wantErr error }{ { desc: "with custom backend metrics", @@ -73,7 +73,6 @@ func (s) TestE2ECallMetricsUnary(t *testing.T) { { desc: "with no custom backend metrics", injectMetrics: false, - wantErr: orca.ErrLoadReportMissing, }, } @@ -146,9 +145,9 @@ func (s) TestE2ECallMetricsUnary(t *testing.T) { t.Fatalf("EmptyCall failed: %v", err) } - gotProto, err := orca.ToLoadReport(trailer) - if test.wantErr != nil && !errors.Is(err, test.wantErr) { - t.Fatalf("When retrieving load report, got error: %v, want: %v", err, orca.ErrLoadReportMissing) + gotProto, err := internal.ToLoadReport(trailer) + if err != nil { + t.Fatalf("When retrieving load report, got error: %v, want: ", err) } if test.wantProto != nil && !cmp.Equal(gotProto, test.wantProto, cmp.Comparer(proto.Equal)) { t.Fatalf("Received load report in trailer: %s, want: %s", pretty.ToJSON(gotProto), pretty.ToJSON(test.wantProto)) @@ -165,7 +164,6 @@ func (s) TestE2ECallMetricsStreaming(t *testing.T) { desc string injectMetrics bool wantProto *v3orcapb.OrcaLoadReport - wantErr error }{ { desc: "with custom backend metrics", @@ -180,7 +178,6 @@ func (s) TestE2ECallMetricsStreaming(t *testing.T) { { desc: "with no custom backend metrics", injectMetrics: false, - wantErr: orca.ErrLoadReportMissing, }, } @@ -288,9 +285,9 @@ func (s) TestE2ECallMetricsStreaming(t *testing.T) { } } - gotProto, err := orca.ToLoadReport(stream.Trailer()) - if test.wantErr != nil && !errors.Is(err, test.wantErr) { - t.Fatalf("When retrieving load report, got error: %v, want: %v", err, orca.ErrLoadReportMissing) + gotProto, err := internal.ToLoadReport(stream.Trailer()) + if err != nil { + t.Fatalf("When retrieving load report, got error: %v, want: ", err) } if test.wantProto != nil && !cmp.Equal(gotProto, test.wantProto, cmp.Comparer(proto.Equal)) { t.Fatalf("Received load report in trailer: %s, want: %s", pretty.ToJSON(gotProto), pretty.ToJSON(test.wantProto)) diff --git a/orca/internal/internal.go b/orca/internal/internal.go index 865d94d86945..35b899d9e877 100644 --- a/orca/internal/internal.go +++ b/orca/internal/internal.go @@ -20,7 +20,16 @@ // avoid polluting the godoc of the top-level orca package. package internal -import ibackoff "google.golang.org/grpc/internal/backoff" +import ( + "errors" + "fmt" + + ibackoff "google.golang.org/grpc/internal/backoff" + "google.golang.org/grpc/metadata" + "google.golang.org/protobuf/proto" + + v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3" +) // AllowAnyMinReportingInterval prevents clamping of the MinReportingInterval // configured via ServiceOptions, to a minimum of 30s. @@ -32,3 +41,31 @@ var AllowAnyMinReportingInterval interface{} // func(*ServiceOptions) // // For testing purposes only. var DefaultBackoffFunc = ibackoff.DefaultExponential.Backoff + +// TrailerMetadataKey is the key in which the per-call backend metrics are +// transmitted. +const TrailerMetadataKey = "endpoint-load-metrics-bin" + +// ToLoadReport unmarshals a binary encoded [ORCA LoadReport] protobuf message +// from md and returns the corresponding struct. The load report is expected to +// be stored as the value for key "endpoint-load-metrics-bin". +// +// If no load report was found in the provided metadata, if multiple load +// reports are found, or if the load report found cannot be parsed, an error is +// returned. +// +// [ORCA LoadReport]: (https://github.com/cncf/xds/blob/main/xds/data/orca/v3/orca_load_report.proto#L15) +func ToLoadReport(md metadata.MD) (*v3orcapb.OrcaLoadReport, error) { + vs := md.Get(TrailerMetadataKey) + if len(vs) == 0 { + return nil, nil + } + if len(vs) != 1 { + return nil, errors.New("multiple orca load reports found in provided metadata") + } + ret := new(v3orcapb.OrcaLoadReport) + if err := proto.Unmarshal([]byte(vs[0]), ret); err != nil { + return nil, fmt.Errorf("failed to unmarshal load report found in metadata: %v", err) + } + return ret, nil +} diff --git a/orca/orca.go b/orca/orca.go index bacc4a89ab0b..2c958b6902e9 100644 --- a/orca/orca.go +++ b/orca/orca.go @@ -29,21 +29,19 @@ package orca import ( "context" "errors" - "fmt" "google.golang.org/grpc" "google.golang.org/grpc/grpclog" - "google.golang.org/grpc/internal" + igrpc "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/balancerload" "google.golang.org/grpc/metadata" + "google.golang.org/grpc/orca/internal" "google.golang.org/protobuf/proto" - - v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3" ) var ( logger = grpclog.Component("orca-backend-metrics") - joinServerOptions = internal.JoinServerOptions.(func(...grpc.ServerOption) grpc.ServerOption) + joinServerOptions = igrpc.JoinServerOptions.(func(...grpc.ServerOption) grpc.ServerOption) ) const trailerMetadataKey = "endpoint-load-metrics-bin" @@ -144,26 +142,6 @@ func (w *wrappedStream) Context() context.Context { // ErrLoadReportMissing indicates no ORCA load report was found in trailers. var ErrLoadReportMissing = errors.New("orca load report missing in provided metadata") -// ToLoadReport unmarshals a binary encoded [ORCA LoadReport] protobuf message -// from md and returns the corresponding struct. The load report is expected to -// be stored as the value for key "endpoint-load-metrics-bin". -// -// If no load report was found in the provided metadata, ErrLoadReportMissing is -// returned. -// -// [ORCA LoadReport]: (https://github.com/cncf/xds/blob/main/xds/data/orca/v3/orca_load_report.proto#L15) -func ToLoadReport(md metadata.MD) (*v3orcapb.OrcaLoadReport, error) { - vs := md.Get(trailerMetadataKey) - if len(vs) == 0 { - return nil, ErrLoadReportMissing - } - ret := new(v3orcapb.OrcaLoadReport) - if err := proto.Unmarshal([]byte(vs[0]), ret); err != nil { - return nil, fmt.Errorf("failed to unmarshal load report found in metadata: %v", err) - } - return ret, nil -} - // loadParser implements the Parser interface defined in `internal/balancerload` // package. This interface is used by the client stream to parse load reports // sent by the server in trailer metadata. The parsed loads are then sent to @@ -174,9 +152,12 @@ func ToLoadReport(md metadata.MD) (*v3orcapb.OrcaLoadReport, error) { type loadParser struct{} func (loadParser) Parse(md metadata.MD) interface{} { - lr, err := ToLoadReport(md) + lr, err := internal.ToLoadReport(md) if err != nil { - logger.Errorf("Parse(%v) failed: %v", err) + logger.Infof("Parse failed: %v", err) + } + if lr == nil && logger.V(2) { + logger.Infof("Missing ORCA load report data") } return lr } diff --git a/orca/orca_test.go b/orca/orca_test.go index fd356cfba437..096b54907148 100644 --- a/orca/orca_test.go +++ b/orca/orca_test.go @@ -25,12 +25,18 @@ import ( "github.com/google/go-cmp/cmp" "google.golang.org/grpc/internal/pretty" "google.golang.org/grpc/metadata" - "google.golang.org/grpc/orca" + "google.golang.org/grpc/orca/internal" v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3" ) func TestToLoadReport(t *testing.T) { + goodReport := &v3orcapb.OrcaLoadReport{ + CpuUtilization: 1.0, + MemUtilization: 50.0, + RequestCost: map[string]float64{"queryCost": 25.0}, + Utilization: map[string]float64{"queueSize": 75.0}, + } tests := []struct { name string md metadata.MD @@ -40,7 +46,7 @@ func TestToLoadReport(t *testing.T) { { name: "no load report in metadata", md: metadata.MD{}, - wantErr: true, + wantErr: false, }, { name: "badly marshaled load report", @@ -49,29 +55,27 @@ func TestToLoadReport(t *testing.T) { }(), wantErr: true, }, + { + name: "multiple load reports", + md: func() metadata.MD { + b, _ := proto.Marshal(goodReport) + return metadata.Pairs("endpoint-load-metrics-bin", string(b), "endpoint-load-metrics-bin", string(b)) + }(), + wantErr: true, + }, { name: "good load report", md: func() metadata.MD { - b, _ := proto.Marshal(&v3orcapb.OrcaLoadReport{ - CpuUtilization: 1.0, - MemUtilization: 50.0, - RequestCost: map[string]float64{"queryCost": 25.0}, - Utilization: map[string]float64{"queueSize": 75.0}, - }) + b, _ := proto.Marshal(goodReport) return metadata.Pairs("endpoint-load-metrics-bin", string(b)) }(), - want: &v3orcapb.OrcaLoadReport{ - CpuUtilization: 1.0, - MemUtilization: 50.0, - RequestCost: map[string]float64{"queryCost": 25.0}, - Utilization: map[string]float64{"queueSize": 75.0}, - }, + want: goodReport, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - got, err := orca.ToLoadReport(test.md) + got, err := internal.ToLoadReport(test.md) if (err != nil) != test.wantErr { t.Fatalf("orca.ToLoadReport(%v) = %v, wantErr: %v", test.md, err, test.wantErr) } diff --git a/orca/service.go b/orca/service.go index 9400ae0c7e64..ae011fd9a9d2 100644 --- a/orca/service.go +++ b/orca/service.go @@ -120,7 +120,7 @@ func (s *Service) determineReportingInterval(req *v3orcaservicepb.OrcaLoadReport } dur := req.GetReportInterval().AsDuration() if dur < s.minReportingInterval { - logger.Warningf("Received reporting interval %q is less than configured minimum: %v. Using default: %s", dur, s.minReportingInterval) + logger.Warningf("Received reporting interval %q is less than configured minimum: %v. Using minimum", dur, s.minReportingInterval) return s.minReportingInterval } return dur