From 924457101bdb60cd3daeb38b43690f3bccb95853 Mon Sep 17 00:00:00 2001 From: Yangmin Zhu Date: Tue, 2 Apr 2019 10:36:15 -0700 Subject: [PATCH 1/7] syscall: remove logging in init(). (#2734) The same as https://github.com/grpc/grpc-go/pull/2373. Signed-off-by: Yangmin Zhu --- internal/syscall/syscall_nonlinux.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/internal/syscall/syscall_nonlinux.go b/internal/syscall/syscall_nonlinux.go index 61678feb0044..d3fd9dab3331 100644 --- a/internal/syscall/syscall_nonlinux.go +++ b/internal/syscall/syscall_nonlinux.go @@ -22,18 +22,24 @@ package syscall import ( "net" + "sync" "time" "google.golang.org/grpc/grpclog" ) -func init() { - grpclog.Info("CPU time info is unavailable on non-linux or appengine environment.") +var once sync.Once + +func log() { + once.Do(func() { + grpclog.Info("CPU time info is unavailable on non-linux or appengine environment.") + }) } // GetCPUTime returns the how much CPU time has passed since the start of this process. // It always returns 0 under non-linux or appengine environment. func GetCPUTime() int64 { + log() return 0 } @@ -42,22 +48,26 @@ type Rusage struct{} // GetRusage is a no-op function under non-linux or appengine environment. func GetRusage() (rusage *Rusage) { + log() return nil } // CPUTimeDiff returns the differences of user CPU time and system CPU time used // between two Rusage structs. It a no-op function for non-linux or appengine environment. func CPUTimeDiff(first *Rusage, latest *Rusage) (float64, float64) { + log() return 0, 0 } // SetTCPUserTimeout is a no-op function under non-linux or appengine environments func SetTCPUserTimeout(conn net.Conn, timeout time.Duration) error { + log() return nil } // GetTCPUserTimeout is a no-op function under non-linux or appengine environments // a negative return value indicates the operation is not supported func GetTCPUserTimeout(conn net.Conn) (int, error) { + log() return -1, nil } From d389f9fac68eea0dcc49957d0b4cca5b3a0a7171 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Tue, 2 Apr 2019 11:15:36 -0700 Subject: [PATCH 2/7] balancer: add server loads from RPC trailers to DoneInfo (#2641) --- balancer/balancer.go | 5 + internal/balancerload/load.go | 46 +++ internal/balancerload/orca/orca.go | 84 +++++ internal/balancerload/orca/orca_test.go | 88 ++++++ internal/balancerload/orca/orca_v1/orca.pb.go | 293 ++++++++++++++++++ internal/balancerload/orca/orca_v1/orca.proto | 60 ++++ stream.go | 2 + test/balancer_test.go | 58 ++++ 8 files changed, 636 insertions(+) create mode 100644 internal/balancerload/load.go create mode 100644 internal/balancerload/orca/orca.go create mode 100644 internal/balancerload/orca/orca_test.go create mode 100644 internal/balancerload/orca/orca_v1/orca.pb.go create mode 100644 internal/balancerload/orca/orca_v1/orca.proto diff --git a/balancer/balancer.go b/balancer/balancer.go index 801ca69b9a23..fafede238c13 100644 --- a/balancer/balancer.go +++ b/balancer/balancer.go @@ -183,6 +183,11 @@ type DoneInfo struct { BytesSent bool // BytesReceived indicates if any byte has been received from the server. BytesReceived bool + // ServerLoad is the load received from server. It's usually sent as part of + // trailing metadata. + // + // The only supported type now is *orca_v1.LoadReport. + ServerLoad interface{} } var ( diff --git a/internal/balancerload/load.go b/internal/balancerload/load.go new file mode 100644 index 000000000000..3a905d96657e --- /dev/null +++ b/internal/balancerload/load.go @@ -0,0 +1,46 @@ +/* + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Package balancerload defines APIs to parse server loads in trailers. The +// parsed loads are sent to balancers in DoneInfo. +package balancerload + +import ( + "google.golang.org/grpc/metadata" +) + +// Parser converts loads from metadata into a concrete type. +type Parser interface { + // Parse parses loads from metadata. + Parse(md metadata.MD) interface{} +} + +var parser Parser + +// SetParser sets the load parser. +// +// Not mutex-protected, should be called before any gRPC functions. +func SetParser(lr Parser) { + parser = lr +} + +// Parse calls parser.Read(). +func Parse(md metadata.MD) interface{} { + if parser == nil { + return nil + } + return parser.Parse(md) +} diff --git a/internal/balancerload/orca/orca.go b/internal/balancerload/orca/orca.go new file mode 100644 index 000000000000..8b8a1f17860a --- /dev/null +++ b/internal/balancerload/orca/orca.go @@ -0,0 +1,84 @@ +/* + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//go:generate protoc -I ./orca_v1 --go_out=plugins=grpc:./orca_v1 ./orca_v1/orca.proto + +// Package orca implements Open Request Cost Aggregation. +package orca + +import ( + "github.com/golang/protobuf/proto" + "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/internal/balancerload" + orcapb "google.golang.org/grpc/internal/balancerload/orca/orca_v1" + "google.golang.org/grpc/metadata" +) + +const mdKey = "X-Endpoint-Load-Metrics-Bin" + +// toBytes converts a orca load report into bytes. +func toBytes(r *orcapb.LoadReport) []byte { + if r == nil { + return nil + } + + b, err := proto.Marshal(r) + if err != nil { + grpclog.Warningf("orca: failed to marshal load report: %v", err) + return nil + } + return b +} + +// ToMetadata converts a orca load report into grpc metadata. +func ToMetadata(r *orcapb.LoadReport) metadata.MD { + b := toBytes(r) + if b == nil { + return nil + } + return metadata.Pairs(mdKey, string(b)) +} + +// fromBytes reads load report bytes and converts it to orca. +func fromBytes(b []byte) *orcapb.LoadReport { + ret := new(orcapb.LoadReport) + if err := proto.Unmarshal(b, ret); err != nil { + grpclog.Warningf("orca: failed to unmarshal load report: %v", err) + return nil + } + return ret +} + +// FromMetadata reads load report from metadata and converts it to orca. +// +// It returns nil if report is not found in metadata. +func FromMetadata(md metadata.MD) *orcapb.LoadReport { + vs := md.Get(mdKey) + if len(vs) == 0 { + return nil + } + return fromBytes([]byte(vs[0])) +} + +type loadParser struct{} + +func (*loadParser) Parse(md metadata.MD) interface{} { + return FromMetadata(md) +} + +func init() { + balancerload.SetParser(&loadParser{}) +} diff --git a/internal/balancerload/orca/orca_test.go b/internal/balancerload/orca/orca_test.go new file mode 100644 index 000000000000..10a16085b84e --- /dev/null +++ b/internal/balancerload/orca/orca_test.go @@ -0,0 +1,88 @@ +/* + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package orca + +import ( + "reflect" + "strings" + "testing" + + "github.com/golang/protobuf/proto" + "google.golang.org/grpc/internal/balancerload/orca/orca_v1" + "google.golang.org/grpc/metadata" +) + +var ( + testMessage = &orca_v1.LoadReport{ + CpuUtilization: 0.1, + MemUtilization: 0.2, + NicInUtilization: 0, + NicOutUtilization: 0, + RequestCostOrUtilization: map[string]float64{"ttt": 0.4}, + } + testBytes, _ = proto.Marshal(testMessage) +) + +func TestToMetadata(t *testing.T) { + tests := []struct { + name string + r *orca_v1.LoadReport + want metadata.MD + }{{ + name: "nil", + r: nil, + want: nil, + }, { + name: "valid", + r: testMessage, + want: metadata.MD{ + strings.ToLower(mdKey): []string{string(testBytes)}, + }, + }} + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := ToMetadata(tt.r); !reflect.DeepEqual(got, tt.want) { + t.Errorf("ToMetadata() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestFromMetadata(t *testing.T) { + tests := []struct { + name string + md metadata.MD + want *orca_v1.LoadReport + }{{ + name: "nil", + md: nil, + want: nil, + }, { + name: "valid", + md: metadata.MD{ + strings.ToLower(mdKey): []string{string(testBytes)}, + }, + want: testMessage, + }} + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := FromMetadata(tt.md); !proto.Equal(got, tt.want) { + t.Errorf("FromMetadata() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/internal/balancerload/orca/orca_v1/orca.pb.go b/internal/balancerload/orca/orca_v1/orca.pb.go new file mode 100644 index 000000000000..1c21ddf6f738 --- /dev/null +++ b/internal/balancerload/orca/orca_v1/orca.pb.go @@ -0,0 +1,293 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: orca.proto + +package orca_v1 + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" +import duration "github.com/golang/protobuf/ptypes/duration" + +import ( + context "golang.org/x/net/context" + grpc "google.golang.org/grpc" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +type LoadReport struct { + // CPU utilization expressed as a fraction of available CPU resources. This + // should be derived from a sample or measurement taken during the request. + CpuUtilization float64 `protobuf:"fixed64,1,opt,name=cpu_utilization,json=cpuUtilization,proto3" json:"cpu_utilization,omitempty"` + // Memory utilization expressed as a fraction of available memory + // resources. This should be derived from a sample or measurement taken + // during the request. + MemUtilization float64 `protobuf:"fixed64,2,opt,name=mem_utilization,json=memUtilization,proto3" json:"mem_utilization,omitempty"` + // NIC inbound/outbound utilization expressed as a fraction of available NIC + // bandwidth. The request in/out bytes can be inferred by Envoy, but not the + // NIC availability at the endpoint, hence reporting + NicInUtilization float64 `protobuf:"fixed64,3,opt,name=nic_in_utilization,json=nicInUtilization,proto3" json:"nic_in_utilization,omitempty"` + NicOutUtilization float64 `protobuf:"fixed64,4,opt,name=nic_out_utilization,json=nicOutUtilization,proto3" json:"nic_out_utilization,omitempty"` + // Application specific requests costs. Values may be absolute costs (e.g. + // 3487 bytes of storage) associated with the cost or utilization, + // expressed as a fraction of total resources available. Utilization + // metrics should be derived from a sample or measurement taken + // during the request. + RequestCostOrUtilization map[string]float64 `protobuf:"bytes,5,rep,name=request_cost_or_utilization,json=requestCostOrUtilization,proto3" json:"request_cost_or_utilization,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"fixed64,2,opt,name=value,proto3"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *LoadReport) Reset() { *m = LoadReport{} } +func (m *LoadReport) String() string { return proto.CompactTextString(m) } +func (*LoadReport) ProtoMessage() {} +func (*LoadReport) Descriptor() ([]byte, []int) { + return fileDescriptor_orca_542539e3bf435293, []int{0} +} +func (m *LoadReport) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_LoadReport.Unmarshal(m, b) +} +func (m *LoadReport) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_LoadReport.Marshal(b, m, deterministic) +} +func (dst *LoadReport) XXX_Merge(src proto.Message) { + xxx_messageInfo_LoadReport.Merge(dst, src) +} +func (m *LoadReport) XXX_Size() int { + return xxx_messageInfo_LoadReport.Size(m) +} +func (m *LoadReport) XXX_DiscardUnknown() { + xxx_messageInfo_LoadReport.DiscardUnknown(m) +} + +var xxx_messageInfo_LoadReport proto.InternalMessageInfo + +func (m *LoadReport) GetCpuUtilization() float64 { + if m != nil { + return m.CpuUtilization + } + return 0 +} + +func (m *LoadReport) GetMemUtilization() float64 { + if m != nil { + return m.MemUtilization + } + return 0 +} + +func (m *LoadReport) GetNicInUtilization() float64 { + if m != nil { + return m.NicInUtilization + } + return 0 +} + +func (m *LoadReport) GetNicOutUtilization() float64 { + if m != nil { + return m.NicOutUtilization + } + return 0 +} + +func (m *LoadReport) GetRequestCostOrUtilization() map[string]float64 { + if m != nil { + return m.RequestCostOrUtilization + } + return nil +} + +type LoadReportRequest struct { + // Interval for generating Open RCA core metric responses. + ReportInterval *duration.Duration `protobuf:"bytes,1,opt,name=report_interval,json=reportInterval,proto3" json:"report_interval,omitempty"` + // Request costs to collect. If this is empty, all known requests costs tracked by + // the load reporting agent will be returned. This provides an opportunity for + // the client to selectively obtain a subset of tracked costs. + RequestCostNames []string `protobuf:"bytes,2,rep,name=request_cost_names,json=requestCostNames,proto3" json:"request_cost_names,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *LoadReportRequest) Reset() { *m = LoadReportRequest{} } +func (m *LoadReportRequest) String() string { return proto.CompactTextString(m) } +func (*LoadReportRequest) ProtoMessage() {} +func (*LoadReportRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_orca_542539e3bf435293, []int{1} +} +func (m *LoadReportRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_LoadReportRequest.Unmarshal(m, b) +} +func (m *LoadReportRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_LoadReportRequest.Marshal(b, m, deterministic) +} +func (dst *LoadReportRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_LoadReportRequest.Merge(dst, src) +} +func (m *LoadReportRequest) XXX_Size() int { + return xxx_messageInfo_LoadReportRequest.Size(m) +} +func (m *LoadReportRequest) XXX_DiscardUnknown() { + xxx_messageInfo_LoadReportRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_LoadReportRequest proto.InternalMessageInfo + +func (m *LoadReportRequest) GetReportInterval() *duration.Duration { + if m != nil { + return m.ReportInterval + } + return nil +} + +func (m *LoadReportRequest) GetRequestCostNames() []string { + if m != nil { + return m.RequestCostNames + } + return nil +} + +func init() { + proto.RegisterType((*LoadReport)(nil), "orca.v1.LoadReport") + proto.RegisterMapType((map[string]float64)(nil), "orca.v1.LoadReport.RequestCostOrUtilizationEntry") + proto.RegisterType((*LoadReportRequest)(nil), "orca.v1.LoadReportRequest") +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// OpenRCAServiceClient is the client API for OpenRCAService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type OpenRCAServiceClient interface { + StreamCoreMetrics(ctx context.Context, in *LoadReportRequest, opts ...grpc.CallOption) (OpenRCAService_StreamCoreMetricsClient, error) +} + +type openRCAServiceClient struct { + cc *grpc.ClientConn +} + +func NewOpenRCAServiceClient(cc *grpc.ClientConn) OpenRCAServiceClient { + return &openRCAServiceClient{cc} +} + +func (c *openRCAServiceClient) StreamCoreMetrics(ctx context.Context, in *LoadReportRequest, opts ...grpc.CallOption) (OpenRCAService_StreamCoreMetricsClient, error) { + stream, err := c.cc.NewStream(ctx, &_OpenRCAService_serviceDesc.Streams[0], "/orca.v1.OpenRCAService/StreamCoreMetrics", opts...) + if err != nil { + return nil, err + } + x := &openRCAServiceStreamCoreMetricsClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type OpenRCAService_StreamCoreMetricsClient interface { + Recv() (*LoadReport, error) + grpc.ClientStream +} + +type openRCAServiceStreamCoreMetricsClient struct { + grpc.ClientStream +} + +func (x *openRCAServiceStreamCoreMetricsClient) Recv() (*LoadReport, error) { + m := new(LoadReport) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// OpenRCAServiceServer is the server API for OpenRCAService service. +type OpenRCAServiceServer interface { + StreamCoreMetrics(*LoadReportRequest, OpenRCAService_StreamCoreMetricsServer) error +} + +func RegisterOpenRCAServiceServer(s *grpc.Server, srv OpenRCAServiceServer) { + s.RegisterService(&_OpenRCAService_serviceDesc, srv) +} + +func _OpenRCAService_StreamCoreMetrics_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(LoadReportRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(OpenRCAServiceServer).StreamCoreMetrics(m, &openRCAServiceStreamCoreMetricsServer{stream}) +} + +type OpenRCAService_StreamCoreMetricsServer interface { + Send(*LoadReport) error + grpc.ServerStream +} + +type openRCAServiceStreamCoreMetricsServer struct { + grpc.ServerStream +} + +func (x *openRCAServiceStreamCoreMetricsServer) Send(m *LoadReport) error { + return x.ServerStream.SendMsg(m) +} + +var _OpenRCAService_serviceDesc = grpc.ServiceDesc{ + ServiceName: "orca.v1.OpenRCAService", + HandlerType: (*OpenRCAServiceServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "StreamCoreMetrics", + Handler: _OpenRCAService_StreamCoreMetrics_Handler, + ServerStreams: true, + }, + }, + Metadata: "orca.proto", +} + +func init() { proto.RegisterFile("orca.proto", fileDescriptor_orca_542539e3bf435293) } + +var fileDescriptor_orca_542539e3bf435293 = []byte{ + // 373 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x91, 0xcd, 0x6b, 0xe3, 0x30, + 0x10, 0xc5, 0xd7, 0xf6, 0x66, 0x97, 0x28, 0x90, 0x0f, 0x65, 0x0f, 0x59, 0x2f, 0xbb, 0x84, 0x5c, + 0x36, 0x87, 0x45, 0xd9, 0xa4, 0x97, 0xd2, 0x5b, 0x9b, 0x16, 0x1a, 0xfa, 0x11, 0x50, 0xe8, 0xa5, + 0x17, 0xe3, 0x28, 0xd3, 0x20, 0x6a, 0x4b, 0xae, 0x2c, 0x19, 0xd2, 0x7b, 0xff, 0xea, 0x5e, 0x8a, + 0x65, 0x97, 0xd8, 0x90, 0xf6, 0x26, 0xbd, 0xf9, 0xbd, 0x61, 0xe6, 0x0d, 0x42, 0x52, 0xb1, 0x90, + 0x24, 0x4a, 0x6a, 0x89, 0xbf, 0xdb, 0x77, 0x36, 0xf5, 0xff, 0x6c, 0xa5, 0xdc, 0x46, 0x30, 0xb1, + 0xf2, 0xda, 0x3c, 0x4c, 0x36, 0x46, 0x85, 0x9a, 0x4b, 0x51, 0x80, 0xa3, 0x57, 0x17, 0xa1, 0x6b, + 0x19, 0x6e, 0x28, 0x24, 0x52, 0x69, 0xfc, 0x17, 0x75, 0x58, 0x62, 0x02, 0xa3, 0x79, 0xc4, 0x9f, + 0x2d, 0x37, 0x70, 0x86, 0xce, 0xd8, 0xa1, 0x6d, 0x96, 0x98, 0xbb, 0xbd, 0x9a, 0x83, 0x31, 0xc4, + 0x35, 0xd0, 0x2d, 0xc0, 0x18, 0xe2, 0x2a, 0xf8, 0x0f, 0x61, 0xc1, 0x59, 0xc0, 0x45, 0x8d, 0xf5, + 0x2c, 0xdb, 0x15, 0x9c, 0x2d, 0x44, 0x95, 0x26, 0xa8, 0x9f, 0xd3, 0xd2, 0xe8, 0x1a, 0xfe, 0xd5, + 0xe2, 0x3d, 0xc1, 0xd9, 0xd2, 0xe8, 0x2a, 0x9f, 0xa0, 0x5f, 0x0a, 0x9e, 0x0c, 0xa4, 0x3a, 0x60, + 0x32, 0xd5, 0x81, 0x54, 0x35, 0x5f, 0x63, 0xe8, 0x8d, 0x5b, 0xb3, 0x29, 0x29, 0xd3, 0x20, 0xfb, + 0x4d, 0x09, 0x2d, 0x6c, 0x73, 0x99, 0xea, 0xa5, 0xaa, 0xb4, 0xbc, 0x10, 0x5a, 0xed, 0xe8, 0x40, + 0x7d, 0x50, 0xf6, 0xaf, 0xd0, 0xef, 0x4f, 0xad, 0xb8, 0x8b, 0xbc, 0x47, 0xd8, 0xd9, 0xd8, 0x9a, + 0x34, 0x7f, 0xe2, 0x1f, 0xa8, 0x91, 0x85, 0x91, 0x81, 0x32, 0xa1, 0xe2, 0x73, 0xe2, 0x1e, 0x3b, + 0xa3, 0x17, 0x07, 0xf5, 0xf6, 0x33, 0x95, 0x7d, 0xf1, 0x19, 0xea, 0x28, 0x2b, 0x04, 0x5c, 0x68, + 0x50, 0x59, 0x18, 0xd9, 0x6e, 0xad, 0xd9, 0x4f, 0x52, 0x5c, 0x93, 0xbc, 0x5f, 0x93, 0x9c, 0x97, + 0xd7, 0xa4, 0xed, 0xc2, 0xb1, 0x28, 0x0d, 0x79, 0xec, 0xb5, 0x60, 0x44, 0x18, 0x43, 0x3a, 0x70, + 0x87, 0xde, 0xb8, 0x49, 0xbb, 0x95, 0xe5, 0x6e, 0x73, 0x7d, 0x76, 0x8f, 0xda, 0xcb, 0x04, 0x04, + 0x9d, 0x9f, 0xae, 0x40, 0x65, 0x9c, 0x01, 0xbe, 0x44, 0xbd, 0x95, 0x56, 0x10, 0xc6, 0x73, 0xa9, + 0xe0, 0x06, 0xb4, 0xe2, 0x2c, 0xc5, 0xfe, 0x81, 0x20, 0xcb, 0xa1, 0xfd, 0xfe, 0x81, 0xda, 0xe8, + 0xcb, 0x7f, 0x67, 0xfd, 0xcd, 0x0e, 0x7b, 0xf4, 0x16, 0x00, 0x00, 0xff, 0xff, 0xc0, 0xda, 0x2d, + 0xb7, 0x9f, 0x02, 0x00, 0x00, +} diff --git a/internal/balancerload/orca/orca_v1/orca.proto b/internal/balancerload/orca/orca_v1/orca.proto new file mode 100644 index 000000000000..33d835d1bef8 --- /dev/null +++ b/internal/balancerload/orca/orca_v1/orca.proto @@ -0,0 +1,60 @@ +/* + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +syntax = "proto3"; + +package orca.v1; + +import "google/protobuf/duration.proto"; + +message LoadReport { + // CPU utilization expressed as a fraction of available CPU resources. This + // should be derived from a sample or measurement taken during the request. + double cpu_utilization = 1; + + // Memory utilization expressed as a fraction of available memory + // resources. This should be derived from a sample or measurement taken + // during the request. + double mem_utilization = 2; + + // NIC inbound/outbound utilization expressed as a fraction of available NIC + // bandwidth. The request in/out bytes can be inferred by Envoy, but not the + // NIC availability at the endpoint, hence reporting + double nic_in_utilization = 3; + double nic_out_utilization = 4; + + // Application specific requests costs. Values may be absolute costs (e.g. + // 3487 bytes of storage) associated with the cost or utilization, + // expressed as a fraction of total resources available. Utilization + // metrics should be derived from a sample or measurement taken + // during the request. + map request_cost_or_utilization = 5; +} + +message LoadReportRequest { + // Interval for generating Open RCA core metric responses. + google.protobuf.Duration report_interval = 1; + // Request costs to collect. If this is empty, all known requests costs tracked by + // the load reporting agent will be returned. This provides an opportunity for + // the client to selectively obtain a subset of tracked costs. + repeated string request_cost_names = 2; +} + +service OpenRCAService { + rpc StreamCoreMetrics(LoadReportRequest) returns (stream LoadReport) { + } +} + diff --git a/stream.go b/stream.go index 4e00cc08d462..6e2bf51e0a09 100644 --- a/stream.go +++ b/stream.go @@ -33,6 +33,7 @@ import ( "google.golang.org/grpc/connectivity" "google.golang.org/grpc/encoding" "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/internal/balancerload" "google.golang.org/grpc/internal/binarylog" "google.golang.org/grpc/internal/channelz" "google.golang.org/grpc/internal/grpcrand" @@ -940,6 +941,7 @@ func (a *csAttempt) finish(err error) { Trailer: tr, BytesSent: a.s != nil, BytesReceived: br, + ServerLoad: balancerload.Parse(tr), }) } if a.statsHandler != nil { diff --git a/test/balancer_test.go b/test/balancer_test.go index 4d8b4f9cafa2..e8e1e0ea34cf 100644 --- a/test/balancer_test.go +++ b/test/balancer_test.go @@ -24,14 +24,19 @@ import ( "testing" "time" + "github.com/golang/protobuf/proto" "google.golang.org/grpc" "google.golang.org/grpc/balancer" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/internal/balancerload/orca" + orcapb "google.golang.org/grpc/internal/balancerload/orca/orca_v1" "google.golang.org/grpc/resolver" testpb "google.golang.org/grpc/test/grpc_testing" "google.golang.org/grpc/testdata" + + _ "google.golang.org/grpc/internal/balancerload/orca" ) const testBalancerName = "testbalancer" @@ -194,3 +199,56 @@ func testDoneInfo(t *testing.T, e env) { t.Fatalf("Got %d picks, %d doneInfo, want equal amount", len(b.pickOptions), len(b.doneInfo)) } } + +func (s) TestDoneLoads(t *testing.T) { + for _, e := range listTestEnv() { + testDoneLoads(t, e) + } +} + +func testDoneLoads(t *testing.T, e env) { + b := &testBalancer{} + balancer.Register(b) + + testLoad := &orcapb.LoadReport{ + CpuUtilization: 0.31, + MemUtilization: 0.41, + NicInUtilization: 0.59, + NicOutUtilization: 0.26, + RequestCostOrUtilization: nil, + } + + ss := &stubServer{ + emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + grpc.SetTrailer(ctx, orca.ToMetadata(testLoad)) + return &testpb.Empty{}, nil + }, + } + if err := ss.Start(nil, grpc.WithBalancerName(testBalancerName)); err != nil { + t.Fatalf("error starting testing server: %v", err) + } + defer ss.Stop() + + tc := testpb.NewTestServiceClient(ss.cc) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil { + t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %v", err, nil) + } + + poWant := []balancer.PickOptions{ + {FullMethodName: "/grpc.testing.TestService/EmptyCall"}, + } + if !reflect.DeepEqual(b.pickOptions, poWant) { + t.Fatalf("b.pickOptions = %v; want %v", b.pickOptions, poWant) + } + + if len(b.doneInfo) < 1 { + t.Fatalf("b.doneInfo = %v, want length 1", b.doneInfo) + } + gotLoad, _ := b.doneInfo[0].ServerLoad.(*orcapb.LoadReport) + if !proto.Equal(gotLoad, testLoad) { + t.Fatalf("b.doneInfo[0].ServerLoad = %v; want = %v", b.doneInfo[0].ServerLoad, testLoad) + } +} From 955eb8a3c829b92e7c850d8c78946735d852eeaa Mon Sep 17 00:00:00 2001 From: lyuxuan Date: Tue, 2 Apr 2019 15:42:35 -0700 Subject: [PATCH 3/7] channelz: cleanup channel registration if Dial fails (#2733) --- clientconn.go | 26 +++++++++++++------------- test/channelz_test.go | 12 ++++++++++++ 2 files changed, 25 insertions(+), 13 deletions(-) diff --git a/clientconn.go b/clientconn.go index a1e1a98006a7..bad91069f8af 100644 --- a/clientconn.go +++ b/clientconn.go @@ -134,6 +134,18 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * opt.apply(&cc.dopts) } + defer func() { + select { + case <-ctx.Done(): + conn, err = nil, ctx.Err() + default: + } + + if err != nil { + cc.Close() + } + }() + if channelz.IsOn() { if cc.dopts.channelzParentID != 0 { cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target) @@ -196,18 +208,6 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * defer cancel() } - defer func() { - select { - case <-ctx.Done(): - conn, err = nil, ctx.Err() - default: - } - - if err != nil { - cc.Close() - } - }() - scSet := false if cc.dopts.scChan != nil { // Try to get an initial service config. @@ -820,7 +820,7 @@ func (cc *ClientConn) Close() error { } channelz.AddTraceEvent(cc.channelzID, ted) // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to - // the entity beng deleted, and thus prevent it from being deleted right away. + // the entity being deleted, and thus prevent it from being deleted right away. channelz.RemoveEntry(cc.channelzID) } return nil diff --git a/test/channelz_test.go b/test/channelz_test.go index c5dcff39f703..4d8c0cc3e0b6 100644 --- a/test/channelz_test.go +++ b/test/channelz_test.go @@ -190,6 +190,18 @@ func (s) TestCZTopChannelRegistrationAndDeletion(t *testing.T) { } } +func (s) TestCZTopChannelRegistrationAndDeletionWhenDialFail(t *testing.T) { + channelz.NewChannelzStorage() + // Make dial fails (due to no transport security specified) + _, err := grpc.Dial("fake.addr") + if err == nil { + t.Fatal("expecting dial to fail") + } + if tcs, end := channelz.GetTopChannels(0, 0); tcs != nil || !end { + t.Fatalf("GetTopChannels(0, 0) = %v, %v, want , true", tcs, end) + } +} + func (s) TestCZNestedChannelRegistrationAndDeletion(t *testing.T) { channelz.NewChannelzStorage() e := tcpClearRREnv From 4745f6ae0ddd06cf0892fd56a3ca478af1f70b6a Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Tue, 2 Apr 2019 16:27:52 -0700 Subject: [PATCH 4/7] grpclb: fallback after init (#2681) regenerate picker when switching between fallback/non-fallback, because new SubConn state might not be updated for cached SubConns --- balancer/grpclb/grpclb.go | 95 +++++++++++++++++------ balancer/grpclb/grpclb_remote_balancer.go | 42 ++++++---- balancer/grpclb/grpclb_test.go | 80 ++++++++++++++++--- balancer/grpclb/grpclb_test_util_test.go | 85 ++++++++++++++++++++ 4 files changed, 252 insertions(+), 50 deletions(-) create mode 100644 balancer/grpclb/grpclb_test_util_test.go diff --git a/balancer/grpclb/grpclb.go b/balancer/grpclb/grpclb.go index e385d02d119b..a1123cedc7f8 100644 --- a/balancer/grpclb/grpclb.go +++ b/balancer/grpclb/grpclb.go @@ -172,7 +172,6 @@ func (b *lbBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) bal doneCh: make(chan struct{}), manualResolver: r, - csEvltr: &balancer.ConnectivityStateEvaluator{}, subConns: make(map[resolver.Address]balancer.SubConn), scStates: make(map[balancer.SubConn]connectivity.State), picker: &errPicker{err: balancer.ErrNoSubConnAvailable}, @@ -238,15 +237,15 @@ type lbBalancer struct { // but with only READY SCs will be gerenated. backendAddrs []resolver.Address // Roundrobin functionalities. - csEvltr *balancer.ConnectivityStateEvaluator state connectivity.State subConns map[resolver.Address]balancer.SubConn // Used to new/remove SubConn. scStates map[balancer.SubConn]connectivity.State // Used to filter READY SubConns. picker balancer.Picker // Support fallback to resolved backend addresses if there's no response // from remote balancer within fallbackTimeout. - fallbackTimerExpired bool - serverListReceived bool + remoteBalancerConnected bool + serverListReceived bool + inFallback bool // resolvedBackendAddrs is resolvedAddrs minus remote balancers. It's set // when resolved address updates are received, and read in the goroutine // handling fallback. @@ -264,13 +263,16 @@ func (lb *lbBalancer) regeneratePicker(resetDrop bool) { return } + if lb.state == connectivity.Connecting { + lb.picker = &errPicker{err: balancer.ErrNoSubConnAvailable} + return + } + var readySCs []balancer.SubConn if lb.usePickFirst { - if lb.state == connectivity.Ready || lb.state == connectivity.Idle { - for _, sc := range lb.subConns { - readySCs = append(readySCs, sc) - break - } + for _, sc := range lb.subConns { + readySCs = append(readySCs, sc) + break } } else { for _, a := range lb.backendAddrs { @@ -286,10 +288,13 @@ func (lb *lbBalancer) regeneratePicker(resetDrop bool) { // If there's no ready SubConns, always re-pick. This is to avoid drops // unless at least one SubConn is ready. Otherwise we may drop more // often than want because of drops + re-picks(which become re-drops). + // + // This doesn't seem to be necessary after the connecting check above. + // Kept for safety. lb.picker = &errPicker{err: balancer.ErrNoSubConnAvailable} return } - if len(lb.fullServerList) <= 0 { + if lb.inFallback { lb.picker = newRRPicker(readySCs) return } @@ -305,6 +310,34 @@ func (lb *lbBalancer) regeneratePicker(resetDrop bool) { prevLBPicker.updateReadySCs(readySCs) } +// aggregateSubConnStats calculate the aggregated state of SubConns in +// lb.SubConns. These SubConns are subconns in use (when switching between +// fallback and grpclb). lb.scState contains states for all SubConns, including +// those in cache (SubConns are cached for 10 seconds after remove). +// +// The aggregated state is: +// - If at least one SubConn in Ready, the aggregated state is Ready; +// - Else if at least one SubConn in Connecting, the aggregated state is Connecting; +// - Else the aggregated state is TransientFailure. +func (lb *lbBalancer) aggregateSubConnStates() connectivity.State { + var numConnecting uint64 + + for _, sc := range lb.subConns { + if state, ok := lb.scStates[sc]; ok { + switch state { + case connectivity.Ready: + return connectivity.Ready + case connectivity.Connecting: + numConnecting++ + } + } + } + if numConnecting > 0 { + return connectivity.Connecting + } + return connectivity.TransientFailure +} + func (lb *lbBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) { if grpclog.V(2) { grpclog.Infof("lbBalancer: handle SubConn state change: %p, %v", sc, s) @@ -328,18 +361,33 @@ func (lb *lbBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivi // kept the sc's state in scStates. Remove state for this sc here. delete(lb.scStates, sc) } + // Force regenerate picker if + // - this sc became ready from not-ready + // - this sc became not-ready from ready + lb.updateStateAndPicker((oldS == connectivity.Ready) != (s == connectivity.Ready), false) + + // Enter fallback when the aggregated state is not Ready and the connection + // to remote balancer is lost. + if lb.state != connectivity.Ready { + if !lb.inFallback && !lb.remoteBalancerConnected { + // Enter fallback. + lb.refreshSubConns(lb.resolvedBackendAddrs, false) + } + } +} +// updateStateAndPicker re-calculate the aggregated state, and regenerate picker +// if overall state is changed. +// +// If forceRegeneratePicker is true, picker will be regenerated. +func (lb *lbBalancer) updateStateAndPicker(forceRegeneratePicker bool, resetDrop bool) { oldAggrState := lb.state - lb.state = lb.csEvltr.RecordTransition(oldS, s) - + lb.state = lb.aggregateSubConnStates() // Regenerate picker when one of the following happens: - // - this sc became ready from not-ready - // - this sc became not-ready from ready - // - the aggregated state of balancer became TransientFailure from non-TransientFailure - // - the aggregated state of balancer became non-TransientFailure from TransientFailure - if (oldS == connectivity.Ready) != (s == connectivity.Ready) || - (lb.state == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) { - lb.regeneratePicker(false) + // - caller wants to regenerate + // - the aggregated state changed + if forceRegeneratePicker || (lb.state != oldAggrState) { + lb.regeneratePicker(resetDrop) } lb.cc.UpdateBalancerState(lb.state, lb.picker) @@ -357,11 +405,11 @@ func (lb *lbBalancer) fallbackToBackendsAfter(fallbackTimeout time.Duration) { return } lb.mu.Lock() - if lb.serverListReceived { + if lb.inFallback || lb.serverListReceived { lb.mu.Unlock() return } - lb.fallbackTimerExpired = true + // Enter fallback. lb.refreshSubConns(lb.resolvedBackendAddrs, false) lb.mu.Unlock() } @@ -405,10 +453,7 @@ func (lb *lbBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) { lb.mu.Lock() lb.resolvedBackendAddrs = backendAddrs - // If serverListReceived is true, connection to remote balancer was - // successful and there's no need to do fallback anymore. - // If fallbackTimerExpired is false, fallback hasn't happened yet. - if !lb.serverListReceived && lb.fallbackTimerExpired { + if lb.inFallback { // This means we received a new list of resolved backends, and we are // still in fallback mode. Need to update the list of backends we are // using to the new list of backends. diff --git a/balancer/grpclb/grpclb_remote_balancer.go b/balancer/grpclb/grpclb_remote_balancer.go index e7e927087800..7ed886f060af 100644 --- a/balancer/grpclb/grpclb_remote_balancer.go +++ b/balancer/grpclb/grpclb_remote_balancer.go @@ -85,24 +85,26 @@ func (lb *lbBalancer) processServerList(l *lbpb.ServerList) { backendAddrs = append(backendAddrs, addr) } - // Call refreshSubConns to create/remove SubConns. + // Call refreshSubConns to create/remove SubConns. If we are in fallback, + // this is also exiting fallback. lb.refreshSubConns(backendAddrs, true) - // Regenerate and update picker no matter if there's update on backends (if - // any SubConn will be newed/removed). Because since the full serverList was - // different, there might be updates in drops or pick weights(different - // number of duplicates). We need to update picker with the fulllist. - // - // Now with cache, even if SubConn was newed/removed, there might be no - // state changes. - lb.regeneratePicker(true) - lb.cc.UpdateBalancerState(lb.state, lb.picker) } -// refreshSubConns creates/removes SubConns with backendAddrs. It returns a bool -// indicating whether the backendAddrs are different from the cached -// backendAddrs (whether any SubConn was newed/removed). +// refreshSubConns creates/removes SubConns with backendAddrs, and refreshes +// balancer state and picker. +// // Caller must hold lb.mu. func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fromGRPCLBServer bool) { + defer func() { + // Regenerate and update picker after refreshing subconns because with + // cache, even if SubConn was newed/removed, there might be no state + // changes (the subconn will be kept in cache, not actually + // newed/removed). + lb.updateStateAndPicker(true, true) + }() + + lb.inFallback = !fromGRPCLBServer + opts := balancer.NewSubConnOptions{} if fromGRPCLBServer { opts.CredsBundle = lb.grpclbBackendCreds @@ -218,6 +220,9 @@ func (lb *lbBalancer) callRemoteBalancer() (backoff bool, _ error) { if err != nil { return true, fmt.Errorf("grpclb: failed to perform RPC to the remote balancer %v", err) } + lb.mu.Lock() + lb.remoteBalancerConnected = true + lb.mu.Unlock() // grpclb handshake on the stream. initReq := &lbpb.LoadBalanceRequest{ @@ -270,6 +275,17 @@ func (lb *lbBalancer) watchRemoteBalancer() { // Trigger a re-resolve when the stream errors. lb.cc.cc.ResolveNow(resolver.ResolveNowOption{}) + lb.mu.Lock() + lb.remoteBalancerConnected = false + lb.fullServerList = nil + // Enter fallback when connection to remote balancer is lost, and the + // aggregated state is not Ready. + if !lb.inFallback && lb.state != connectivity.Ready { + // Entering fallback. + lb.refreshSubConns(lb.resolvedBackendAddrs, false) + } + lb.mu.Unlock() + if !doBackoff { retryCount = 0 continue diff --git a/balancer/grpclb/grpclb_test.go b/balancer/grpclb/grpclb_test.go index 0cdb5c375c7c..2f4e3e01eede 100644 --- a/balancer/grpclb/grpclb_test.go +++ b/balancer/grpclb/grpclb_test.go @@ -230,18 +230,21 @@ func (b *remoteBalancer) BalanceLoad(stream lbgrpc.LoadBalancer_BalanceLoadServe b.stats.merge(req.GetClientStats()) } }() - for v := range b.sls { - resp = &lbpb.LoadBalanceResponse{ - LoadBalanceResponseType: &lbpb.LoadBalanceResponse_ServerList{ - ServerList: v, - }, + for { + select { + case v := <-b.sls: + resp = &lbpb.LoadBalanceResponse{ + LoadBalanceResponseType: &lbpb.LoadBalanceResponse_ServerList{ + ServerList: v, + }, + } + case <-stream.Context().Done(): + return stream.Context().Err() } if err := stream.Send(resp); err != nil { return err } } - <-b.done - return nil } type testServer struct { @@ -297,6 +300,9 @@ type testServers struct { backends []*grpc.Server beIPs []net.IP bePorts []int + + lbListener net.Listener + beListeners []net.Listener } func newLoadBalancer(numberOfBackends int) (tss *testServers, cleanup func(), err error) { @@ -317,7 +323,7 @@ func newLoadBalancer(numberOfBackends int) (tss *testServers, cleanup func(), er beIPs = append(beIPs, beLis.Addr().(*net.TCPAddr).IP) bePorts = append(bePorts, beLis.Addr().(*net.TCPAddr).Port) - beListeners = append(beListeners, beLis) + beListeners = append(beListeners, newRestartableListener(beLis)) } backends := startBackends(beServerName, false, beListeners...) @@ -327,6 +333,7 @@ func newLoadBalancer(numberOfBackends int) (tss *testServers, cleanup func(), er err = fmt.Errorf("failed to create the listener for the load balancer %v", err) return } + lbLis = newRestartableListener(lbLis) lbCreds := &serverNameCheckCreds{ sn: lbServerName, } @@ -344,6 +351,9 @@ func newLoadBalancer(numberOfBackends int) (tss *testServers, cleanup func(), er backends: backends, beIPs: beIPs, bePorts: bePorts, + + lbListener: lbLis, + beListeners: beListeners, } cleanup = func() { defer stopBackends(backends) @@ -712,7 +722,7 @@ func TestFallback(t *testing.T) { testC := testpb.NewTestServiceClient(cc) r.UpdateState(resolver.State{Addresses: []resolver.Address{{ - Addr: "", + Addr: "invalid.address", Type: resolver.GRPCLB, ServerName: lbServerName, }, { @@ -723,7 +733,7 @@ func TestFallback(t *testing.T) { var p peer.Peer if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { - t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, ", testC, err) + t.Fatalf("_.EmptyCall(_, _) = _, %v, want _, ", err) } if p.Addr.String() != beLis.Addr().String() { t.Fatalf("got peer: %v, want peer: %v", p.Addr, beLis.Addr()) @@ -739,16 +749,62 @@ func TestFallback(t *testing.T) { ServerName: beServerName, }}}) + var backendUsed bool for i := 0; i < 1000; i++ { if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, ", testC, err) } if p.Addr.(*net.TCPAddr).Port == tss.bePorts[0] { - return + backendUsed = true + break } time.Sleep(time.Millisecond) } - t.Fatalf("No RPC sent to backend behind remote balancer after 1 second") + if !backendUsed { + t.Fatalf("No RPC sent to backend behind remote balancer after 1 second") + } + + // Close backend and remote balancer connections, should use fallback. + tss.beListeners[0].(*restartableListener).stopPreviousConns() + tss.lbListener.(*restartableListener).stopPreviousConns() + time.Sleep(time.Second) + + var fallbackUsed bool + for i := 0; i < 1000; i++ { + if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { + t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, ", testC, err) + } + if p.Addr.String() == beLis.Addr().String() { + fallbackUsed = true + break + } + time.Sleep(time.Millisecond) + } + if !fallbackUsed { + t.Fatalf("No RPC sent to fallback after 1 second") + } + + // Restart backend and remote balancer, should not use backends. + tss.beListeners[0].(*restartableListener).restart() + tss.lbListener.(*restartableListener).restart() + tss.ls.sls <- sl + + time.Sleep(time.Second) + + var backendUsed2 bool + for i := 0; i < 1000; i++ { + if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { + t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, ", testC, err) + } + if p.Addr.(*net.TCPAddr).Port == tss.bePorts[0] { + backendUsed2 = true + break + } + time.Sleep(time.Millisecond) + } + if !backendUsed2 { + t.Fatalf("No RPC sent to backend behind remote balancer after 1 second") + } } // The remote balancer sends response with duplicates to grpclb client. diff --git a/balancer/grpclb/grpclb_test_util_test.go b/balancer/grpclb/grpclb_test_util_test.go new file mode 100644 index 000000000000..5d3e6ba7fed9 --- /dev/null +++ b/balancer/grpclb/grpclb_test_util_test.go @@ -0,0 +1,85 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package grpclb + +import ( + "net" + "sync" +) + +type tempError struct{} + +func (*tempError) Error() string { + return "grpclb test temporary error" +} +func (*tempError) Temporary() bool { + return true +} + +type restartableListener struct { + net.Listener + addr string + + mu sync.Mutex + closed bool + conns []net.Conn +} + +func newRestartableListener(l net.Listener) *restartableListener { + return &restartableListener{ + Listener: l, + addr: l.Addr().String(), + } +} + +func (l *restartableListener) Accept() (conn net.Conn, err error) { + conn, err = l.Listener.Accept() + if err == nil { + l.mu.Lock() + if l.closed { + conn.Close() + l.mu.Unlock() + return nil, &tempError{} + } + l.conns = append(l.conns, conn) + l.mu.Unlock() + } + return +} + +func (l *restartableListener) Close() error { + return l.Listener.Close() +} + +func (l *restartableListener) stopPreviousConns() { + l.mu.Lock() + l.closed = true + tmp := l.conns + l.conns = nil + l.mu.Unlock() + for _, conn := range tmp { + conn.Close() + } +} + +func (l *restartableListener) restart() { + l.mu.Lock() + l.closed = false + l.mu.Unlock() +} From ea5e6da287dc46f91538f761e87d9f01fd474105 Mon Sep 17 00:00:00 2001 From: lyuxuan Date: Wed, 3 Apr 2019 10:50:28 -0700 Subject: [PATCH 5/7] service config: default service config (#2686) --- clientconn.go | 97 ++++++++++++++++++++++++++++++---------- clientconn_test.go | 88 ++++++++++++++++++++++++++++++++++++ dialoptions.go | 33 ++++++++++---- resolver_conn_wrapper.go | 4 +- service_config.go | 19 ++++---- service_config_test.go | 48 ++++++++++++-------- 6 files changed, 229 insertions(+), 60 deletions(-) diff --git a/clientconn.go b/clientconn.go index bad91069f8af..e12ea3479f94 100644 --- a/clientconn.go +++ b/clientconn.go @@ -68,6 +68,9 @@ var ( errConnClosing = errors.New("grpc: the connection is closing") // errBalancerClosed indicates that the balancer is closed. errBalancerClosed = errors.New("grpc: balancer is closed") + // invalidDefaultServiceConfigErrPrefix is used to prefix the json parsing error for the default + // service config. + invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid" ) // The following errors are returned from Dial and DialContext @@ -185,6 +188,13 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * } } + if cc.dopts.defaultServiceConfigRawJSON != nil { + sc, err := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON) + if err != nil { + return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, err) + } + cc.dopts.defaultServiceConfig = sc + } cc.mkp = cc.dopts.copts.KeepaliveParams if cc.dopts.copts.Dialer == nil { @@ -214,7 +224,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * select { case sc, ok := <-cc.dopts.scChan: if ok { - cc.sc = sc + cc.sc = &sc scSet = true } default: @@ -260,7 +270,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * select { case sc, ok := <-cc.dopts.scChan: if ok { - cc.sc = sc + cc.sc = &sc } case <-ctx.Done(): return nil, ctx.Err() @@ -382,8 +392,7 @@ type ClientConn struct { mu sync.RWMutex resolverWrapper *ccResolverWrapper - sc ServiceConfig - scRaw string + sc *ServiceConfig conns map[*addrConn]struct{} // Keepalive parameter can be updated if a GoAway is received. mkp keepalive.ClientParameters @@ -429,8 +438,7 @@ func (cc *ClientConn) scWatcher() { cc.mu.Lock() // TODO: load balance policy runtime change is ignored. // We may revisit this decision in the future. - cc.sc = sc - cc.scRaw = "" + cc.sc = &sc cc.mu.Unlock() case <-cc.ctx.Done(): return @@ -457,6 +465,24 @@ func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error { } } +// gRPC should resort to default service config when: +// * resolver service config is disabled +// * or, resolver does not return a service config or returns an invalid one. +func (cc *ClientConn) fallbackToDefaultServiceConfig(sc string) bool { + if cc.dopts.disableServiceConfig { + return true + } + // The logic below is temporary, will be removed once we change the resolver.State ServiceConfig field type. + // Right now, we assume that empty service config string means resolver does not return a config. + if sc == "" { + return true + } + // TODO: the logic below is temporary. Once we finish the logic to validate service config + // in resolver, we will replace the logic below. + _, err := parseServiceConfig(sc) + return err != nil +} + func (cc *ClientConn) updateResolverState(s resolver.State) error { cc.mu.Lock() defer cc.mu.Unlock() @@ -467,29 +493,26 @@ func (cc *ClientConn) updateResolverState(s resolver.State) error { return nil } - if !cc.dopts.disableServiceConfig && cc.scRaw != s.ServiceConfig { - // New service config; apply it. + if cc.fallbackToDefaultServiceConfig(s.ServiceConfig) { + if cc.dopts.defaultServiceConfig != nil && cc.sc == nil { + cc.applyServiceConfig(cc.dopts.defaultServiceConfig) + } + } else { + // TODO: the parsing logic below will be moved inside resolver. sc, err := parseServiceConfig(s.ServiceConfig) if err != nil { - fmt.Println("error parsing config: ", err) return err } - cc.scRaw = s.ServiceConfig - cc.sc = sc - - if cc.sc.retryThrottling != nil { - newThrottler := &retryThrottler{ - tokens: cc.sc.retryThrottling.MaxTokens, - max: cc.sc.retryThrottling.MaxTokens, - thresh: cc.sc.retryThrottling.MaxTokens / 2, - ratio: cc.sc.retryThrottling.TokenRatio, - } - cc.retryThrottler.Store(newThrottler) - } else { - cc.retryThrottler.Store((*retryThrottler)(nil)) + if cc.sc == nil || cc.sc.rawJSONString != s.ServiceConfig { + cc.applyServiceConfig(sc) } } + // update the service config that will be sent to balancer. + if cc.sc != nil { + s.ServiceConfig = cc.sc.rawJSONString + } + if cc.dopts.balancerBuilder == nil { // Only look at balancer types and switch balancer if balancer dial // option is not set. @@ -504,7 +527,7 @@ func (cc *ClientConn) updateResolverState(s resolver.State) error { // TODO: use new loadBalancerConfig field with appropriate priority. if isGRPCLB { newBalancerName = grpclbName - } else if cc.sc.LB != nil { + } else if cc.sc != nil && cc.sc.LB != nil { newBalancerName = *cc.sc.LB } else { newBalancerName = PickFirstBalancerName @@ -724,6 +747,9 @@ func (cc *ClientConn) GetMethodConfig(method string) MethodConfig { // TODO: Avoid the locking here. cc.mu.RLock() defer cc.mu.RUnlock() + if cc.sc == nil { + return MethodConfig{} + } m, ok := cc.sc.Methods[method] if !ok { i := strings.LastIndex(method, "/") @@ -735,6 +761,9 @@ func (cc *ClientConn) GetMethodConfig(method string) MethodConfig { func (cc *ClientConn) healthCheckConfig() *healthCheckConfig { cc.mu.RLock() defer cc.mu.RUnlock() + if cc.sc == nil { + return nil + } return cc.sc.healthCheckConfig } @@ -748,6 +777,28 @@ func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method st return t, done, nil } +func (cc *ClientConn) applyServiceConfig(sc *ServiceConfig) error { + if sc == nil { + // should never reach here. + return fmt.Errorf("got nil pointer for service config") + } + cc.sc = sc + + if cc.sc.retryThrottling != nil { + newThrottler := &retryThrottler{ + tokens: cc.sc.retryThrottling.MaxTokens, + max: cc.sc.retryThrottling.MaxTokens, + thresh: cc.sc.retryThrottling.MaxTokens / 2, + ratio: cc.sc.retryThrottling.TokenRatio, + } + cc.retryThrottler.Store(newThrottler) + } else { + cc.retryThrottler.Store((*retryThrottler)(nil)) + } + + return nil +} + func (cc *ClientConn) resolveNow(o resolver.ResolveNowOption) { cc.mu.RLock() r := cc.resolverWrapper diff --git a/clientconn_test.go b/clientconn_test.go index 9b5ac03547b6..ab1db5664049 100644 --- a/clientconn_test.go +++ b/clientconn_test.go @@ -24,6 +24,7 @@ import ( "fmt" "math" "net" + "strings" "sync/atomic" "testing" "time" @@ -1232,3 +1233,90 @@ func (s) TestUpdateAddresses_RetryFromFirstAddr(t *testing.T) { t.Fatal("timed out waiting for any server to be contacted after tryUpdateAddrs") } } + +func (s) TestDefaultServiceConfig(t *testing.T) { + r, cleanup := manual.GenerateAndRegisterManualResolver() + defer cleanup() + addr := r.Scheme() + ":///non.existent" + js := `{ + "methodConfig": [ + { + "name": [ + { + "service": "foo", + "method": "bar" + } + ], + "waitForReady": true + } + ] +}` + testInvalidDefaultServiceConfig(t) + testDefaultServiceConfigWhenResolverServiceConfigDisabled(t, r, addr, js) + testDefaultServiceConfigWhenResolverDoesNotReturnServiceConfig(t, r, addr, js) + testDefaultServiceConfigWhenResolverReturnInvalidServiceConfig(t, r, addr, js) +} + +func verifyWaitForReadyEqualsTrue(cc *ClientConn) bool { + var i int + for i = 0; i < 10; i++ { + mc := cc.GetMethodConfig("/foo/bar") + if mc.WaitForReady != nil && *mc.WaitForReady == true { + break + } + time.Sleep(100 * time.Millisecond) + } + return i != 10 +} + +func testInvalidDefaultServiceConfig(t *testing.T) { + _, err := Dial("fake.com", WithInsecure(), WithDefaultServiceConfig("")) + if !strings.Contains(err.Error(), invalidDefaultServiceConfigErrPrefix) { + t.Fatalf("Dial got err: %v, want err contains: %v", err, invalidDefaultServiceConfigErrPrefix) + } +} + +func testDefaultServiceConfigWhenResolverServiceConfigDisabled(t *testing.T, r resolver.Resolver, addr string, js string) { + cc, err := Dial(addr, WithInsecure(), WithDisableServiceConfig(), WithDefaultServiceConfig(js)) + if err != nil { + t.Fatalf("Dial(%s, _) = _, %v, want _, ", addr, err) + } + defer cc.Close() + // Resolver service config gets ignored since resolver service config is disabled. + r.(*manual.Resolver).UpdateState(resolver.State{ + Addresses: []resolver.Address{{Addr: addr}}, + ServiceConfig: "{}", + }) + if !verifyWaitForReadyEqualsTrue(cc) { + t.Fatal("default service config failed to be applied after 1s") + } +} + +func testDefaultServiceConfigWhenResolverDoesNotReturnServiceConfig(t *testing.T, r resolver.Resolver, addr string, js string) { + cc, err := Dial(addr, WithInsecure(), WithDefaultServiceConfig(js)) + if err != nil { + t.Fatalf("Dial(%s, _) = _, %v, want _, ", addr, err) + } + defer cc.Close() + r.(*manual.Resolver).UpdateState(resolver.State{ + Addresses: []resolver.Address{{Addr: addr}}, + }) + if !verifyWaitForReadyEqualsTrue(cc) { + t.Fatal("default service config failed to be applied after 1s") + } +} + +func testDefaultServiceConfigWhenResolverReturnInvalidServiceConfig(t *testing.T, r resolver.Resolver, addr string, js string) { + cc, err := Dial(addr, WithInsecure(), WithDefaultServiceConfig(js)) + if err != nil { + t.Fatalf("Dial(%s, _) = _, %v, want _, ", addr, err) + } + defer cc.Close() + r.(*manual.Resolver).UpdateState(resolver.State{ + Addresses: []resolver.Address{{Addr: addr}}, + ServiceConfig: "{something wrong,}", + }) + if !verifyWaitForReadyEqualsTrue(cc) { + t.Fatal("default service config failed to be applied after 1s") + } +} diff --git a/dialoptions.go b/dialoptions.go index a0743a9e75fa..e114fecbb7b4 100644 --- a/dialoptions.go +++ b/dialoptions.go @@ -55,14 +55,16 @@ type dialOptions struct { // balancer, and also by WithBalancerName dial option. balancerBuilder balancer.Builder // This is to support grpclb. - resolverBuilder resolver.Builder - reqHandshake envconfig.RequireHandshakeSetting - channelzParentID int64 - disableServiceConfig bool - disableRetry bool - disableHealthCheck bool - healthCheckFunc internal.HealthChecker - minConnectTimeout func() time.Duration + resolverBuilder resolver.Builder + reqHandshake envconfig.RequireHandshakeSetting + channelzParentID int64 + disableServiceConfig bool + disableRetry bool + disableHealthCheck bool + healthCheckFunc internal.HealthChecker + minConnectTimeout func() time.Duration + defaultServiceConfig *ServiceConfig // defaultServiceConfig is parsed from defaultServiceConfigRawJSON. + defaultServiceConfigRawJSON *string } // DialOption configures how we set up the connection. @@ -441,12 +443,27 @@ func WithChannelzParentID(id int64) DialOption { // WithDisableServiceConfig returns a DialOption that causes grpc to ignore any // service config provided by the resolver and provides a hint to the resolver // to not fetch service configs. +// +// Note that, this dial option only disables service config from resolver. If +// default service config is provided, grpc will use the default service config. func WithDisableServiceConfig() DialOption { return newFuncDialOption(func(o *dialOptions) { o.disableServiceConfig = true }) } +// WithDefaultServiceConfig returns a DialOption that configures the default +// service config, which will be used in cases where: +// 1. WithDisableServiceConfig is called. +// 2. Resolver does not return service config or if the resolver gets and invalid config. +// +// This API is EXPERIMENTAL. +func WithDefaultServiceConfig(s string) DialOption { + return newFuncDialOption(func(o *dialOptions) { + o.defaultServiceConfigRawJSON = &s + }) +} + // WithDisableRetry returns a DialOption that disables retries, even if the // service config enables them. This does not impact transparent retries, which // will happen automatically if no data is written to the wire or if the RPC is diff --git a/resolver_conn_wrapper.go b/resolver_conn_wrapper.go index 176da7bd3c48..e9cef3a92b55 100644 --- a/resolver_conn_wrapper.go +++ b/resolver_conn_wrapper.go @@ -118,7 +118,7 @@ func (ccr *ccResolverWrapper) UpdateState(s resolver.State) { ccr.curState = s } -// NewAddress is called by the resolver implemenetion to send addresses to gRPC. +// NewAddress is called by the resolver implementation to send addresses to gRPC. func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) { if ccr.isDone() { return @@ -131,7 +131,7 @@ func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) { ccr.cc.updateResolverState(ccr.curState) } -// NewServiceConfig is called by the resolver implemenetion to send service +// NewServiceConfig is called by the resolver implementation to send service // configs to gRPC. func (ccr *ccResolverWrapper) NewServiceConfig(sc string) { if ccr.isDone() { diff --git a/service_config.go b/service_config.go index 982a3bf21e65..1c5227426f49 100644 --- a/service_config.go +++ b/service_config.go @@ -99,6 +99,9 @@ type ServiceConfig struct { // healthCheckConfig must be set as one of the requirement to enable LB channel // health check. healthCheckConfig *healthCheckConfig + // rawJSONString stores service config json string that get parsed into + // this service config struct. + rawJSONString string } // healthCheckConfig defines the go-native version of the LB channel health check config. @@ -238,24 +241,22 @@ type jsonSC struct { HealthCheckConfig *healthCheckConfig } -func parseServiceConfig(js string) (ServiceConfig, error) { - if len(js) == 0 { - return ServiceConfig{}, fmt.Errorf("no JSON service config provided") - } +func parseServiceConfig(js string) (*ServiceConfig, error) { var rsc jsonSC err := json.Unmarshal([]byte(js), &rsc) if err != nil { grpclog.Warningf("grpc: parseServiceConfig error unmarshaling %s due to %v", js, err) - return ServiceConfig{}, err + return nil, err } sc := ServiceConfig{ LB: rsc.LoadBalancingPolicy, Methods: make(map[string]MethodConfig), retryThrottling: rsc.RetryThrottling, healthCheckConfig: rsc.HealthCheckConfig, + rawJSONString: js, } if rsc.MethodConfig == nil { - return sc, nil + return &sc, nil } for _, m := range *rsc.MethodConfig { @@ -265,7 +266,7 @@ func parseServiceConfig(js string) (ServiceConfig, error) { d, err := parseDuration(m.Timeout) if err != nil { grpclog.Warningf("grpc: parseServiceConfig error unmarshaling %s due to %v", js, err) - return ServiceConfig{}, err + return nil, err } mc := MethodConfig{ @@ -274,7 +275,7 @@ func parseServiceConfig(js string) (ServiceConfig, error) { } if mc.retryPolicy, err = convertRetryPolicy(m.RetryPolicy); err != nil { grpclog.Warningf("grpc: parseServiceConfig error unmarshaling %s due to %v", js, err) - return ServiceConfig{}, err + return nil, err } if m.MaxRequestMessageBytes != nil { if *m.MaxRequestMessageBytes > int64(maxInt) { @@ -305,7 +306,7 @@ func parseServiceConfig(js string) (ServiceConfig, error) { sc.retryThrottling = nil } } - return sc, nil + return &sc, nil } func convertRetryPolicy(jrp *jsonRetryPolicy) (p *retryPolicy, err error) { diff --git a/service_config_test.go b/service_config_test.go index 020b643f89c8..a21416303c75 100644 --- a/service_config_test.go +++ b/service_config_test.go @@ -29,7 +29,7 @@ import ( func (s) TestParseLoadBalancer(t *testing.T) { testcases := []struct { scjs string - wantSC ServiceConfig + wantSC *ServiceConfig wantErr bool }{ { @@ -47,7 +47,7 @@ func (s) TestParseLoadBalancer(t *testing.T) { } ] }`, - ServiceConfig{ + &ServiceConfig{ LB: newString("round_robin"), Methods: map[string]MethodConfig{ "/foo/Bar": { @@ -72,14 +72,14 @@ func (s) TestParseLoadBalancer(t *testing.T) { } ] }`, - ServiceConfig{}, + nil, true, }, } for _, c := range testcases { sc, err := parseServiceConfig(c.scjs) - if c.wantErr != (err != nil) || !reflect.DeepEqual(sc, c.wantSC) { + if c.wantErr != (err != nil) || !scCompareWithRawJSONSkipped(sc, c.wantSC) { t.Fatalf("parseServiceConfig(%s) = %+v, %v, want %+v, %v", c.scjs, sc, err, c.wantSC, c.wantErr) } } @@ -88,7 +88,7 @@ func (s) TestParseLoadBalancer(t *testing.T) { func (s) TestParseWaitForReady(t *testing.T) { testcases := []struct { scjs string - wantSC ServiceConfig + wantSC *ServiceConfig wantErr bool }{ { @@ -105,7 +105,7 @@ func (s) TestParseWaitForReady(t *testing.T) { } ] }`, - ServiceConfig{ + &ServiceConfig{ Methods: map[string]MethodConfig{ "/foo/Bar": { WaitForReady: newBool(true), @@ -128,7 +128,7 @@ func (s) TestParseWaitForReady(t *testing.T) { } ] }`, - ServiceConfig{ + &ServiceConfig{ Methods: map[string]MethodConfig{ "/foo/Bar": { WaitForReady: newBool(false), @@ -160,14 +160,14 @@ func (s) TestParseWaitForReady(t *testing.T) { } ] }`, - ServiceConfig{}, + nil, true, }, } for _, c := range testcases { sc, err := parseServiceConfig(c.scjs) - if c.wantErr != (err != nil) || !reflect.DeepEqual(sc, c.wantSC) { + if c.wantErr != (err != nil) || !scCompareWithRawJSONSkipped(sc, c.wantSC) { t.Fatalf("parseServiceConfig(%s) = %+v, %v, want %+v, %v", c.scjs, sc, err, c.wantSC, c.wantErr) } } @@ -176,7 +176,7 @@ func (s) TestParseWaitForReady(t *testing.T) { func (s) TestPraseTimeOut(t *testing.T) { testcases := []struct { scjs string - wantSC ServiceConfig + wantSC *ServiceConfig wantErr bool }{ { @@ -193,7 +193,7 @@ func (s) TestPraseTimeOut(t *testing.T) { } ] }`, - ServiceConfig{ + &ServiceConfig{ Methods: map[string]MethodConfig{ "/foo/Bar": { Timeout: newDuration(time.Second), @@ -216,7 +216,7 @@ func (s) TestPraseTimeOut(t *testing.T) { } ] }`, - ServiceConfig{}, + nil, true, }, { @@ -242,14 +242,14 @@ func (s) TestPraseTimeOut(t *testing.T) { } ] }`, - ServiceConfig{}, + nil, true, }, } for _, c := range testcases { sc, err := parseServiceConfig(c.scjs) - if c.wantErr != (err != nil) || !reflect.DeepEqual(sc, c.wantSC) { + if c.wantErr != (err != nil) || !scCompareWithRawJSONSkipped(sc, c.wantSC) { t.Fatalf("parseServiceConfig(%s) = %+v, %v, want %+v, %v", c.scjs, sc, err, c.wantSC, c.wantErr) } } @@ -258,7 +258,7 @@ func (s) TestPraseTimeOut(t *testing.T) { func (s) TestPraseMsgSize(t *testing.T) { testcases := []struct { scjs string - wantSC ServiceConfig + wantSC *ServiceConfig wantErr bool }{ { @@ -276,7 +276,7 @@ func (s) TestPraseMsgSize(t *testing.T) { } ] }`, - ServiceConfig{ + &ServiceConfig{ Methods: map[string]MethodConfig{ "/foo/Bar": { MaxReqSize: newInt(1024), @@ -311,14 +311,14 @@ func (s) TestPraseMsgSize(t *testing.T) { } ] }`, - ServiceConfig{}, + nil, true, }, } for _, c := range testcases { sc, err := parseServiceConfig(c.scjs) - if c.wantErr != (err != nil) || !reflect.DeepEqual(sc, c.wantSC) { + if c.wantErr != (err != nil) || !scCompareWithRawJSONSkipped(sc, c.wantSC) { t.Fatalf("parseServiceConfig(%s) = %+v, %v, want %+v, %v", c.scjs, sc, err, c.wantSC, c.wantErr) } } @@ -384,3 +384,15 @@ func newDuration(b time.Duration) *time.Duration { func newString(b string) *string { return &b } + +func scCompareWithRawJSONSkipped(s1, s2 *ServiceConfig) bool { + if s1 == nil && s2 == nil { + return true + } + if (s1 == nil) != (s2 == nil) { + return false + } + s1.rawJSONString = "" + s2.rawJSONString = "" + return reflect.DeepEqual(s1, s2) +} From d37bd82db65e7557e0143fbe76f937a35cbff441 Mon Sep 17 00:00:00 2001 From: David Drysdale Date: Thu, 4 Apr 2019 17:58:15 +0100 Subject: [PATCH 6/7] Fix DialContext when using a timeout (#2737) Fixes #2736 --- clientconn.go | 13 +++++++------ clientconn_test.go | 42 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 49 insertions(+), 6 deletions(-) diff --git a/clientconn.go b/clientconn.go index e12ea3479f94..bd2d2b317798 100644 --- a/clientconn.go +++ b/clientconn.go @@ -138,12 +138,6 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * } defer func() { - select { - case <-ctx.Done(): - conn, err = nil, ctx.Err() - default: - } - if err != nil { cc.Close() } @@ -217,6 +211,13 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout) defer cancel() } + defer func() { + select { + case <-ctx.Done(): + conn, err = nil, ctx.Err() + default: + } + }() scSet := false if cc.dopts.scChan != nil { diff --git a/clientconn_test.go b/clientconn_test.go index ab1db5664049..356f4f25af95 100644 --- a/clientconn_test.go +++ b/clientconn_test.go @@ -52,6 +52,48 @@ func assertState(wantState connectivity.State, cc *ClientConn) (connectivity.Sta return state, state == wantState } +func (s) TestDialWithTimeout(t *testing.T) { + lis, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("Error while listening. Err: %v", err) + } + defer lis.Close() + lisAddr := resolver.Address{Addr: lis.Addr().String()} + lisDone := make(chan struct{}) + dialDone := make(chan struct{}) + // 1st listener accepts the connection and then does nothing + go func() { + defer close(lisDone) + conn, err := lis.Accept() + if err != nil { + t.Errorf("Error while accepting. Err: %v", err) + return + } + framer := http2.NewFramer(conn, conn) + if err := framer.WriteSettings(http2.Setting{}); err != nil { + t.Errorf("Error while writing settings. Err: %v", err) + return + } + <-dialDone // Close conn only after dial returns. + }() + + r, cleanup := manual.GenerateAndRegisterManualResolver() + defer cleanup() + r.InitialState(resolver.State{Addresses: []resolver.Address{lisAddr}}) + client, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithTimeout(5*time.Second)) + close(dialDone) + if err != nil { + t.Fatalf("Dial failed. Err: %v", err) + } + defer client.Close() + timeout := time.After(1 * time.Second) + select { + case <-timeout: + t.Fatal("timed out waiting for server to finish") + case <-lisDone: + } +} + func (s) TestDialWithMultipleBackendsNotSendingServerPreface(t *testing.T) { lis1, err := net.Listen("tcp", "localhost:0") if err != nil { From b03f6fd5e3dfa7663a225c36a15d623159f6724b Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Fri, 5 Apr 2019 13:51:39 -0700 Subject: [PATCH 7/7] internal: balancer/xds go1.12 only (#2748) Until https://github.com/envoyproxy/go-control-plane/issues/168 is fixed --- balancer/xds/edsbalancer/balancergroup.go | 2 ++ balancer/xds/edsbalancer/balancergroup_test.go | 2 ++ balancer/xds/edsbalancer/edsbalancer.go | 2 ++ balancer/xds/edsbalancer/edsbalancer_test.go | 2 ++ balancer/xds/edsbalancer/test_util_test.go | 2 ++ balancer/xds/edsbalancer/util.go | 2 ++ balancer/xds/edsbalancer/util_test.go | 2 ++ balancer/xds/xds.go | 2 ++ balancer/xds/xds_client.go | 2 ++ balancer/xds/xds_client_test.go | 2 ++ balancer/xds/xds_test.go | 2 ++ 11 files changed, 22 insertions(+) diff --git a/balancer/xds/edsbalancer/balancergroup.go b/balancer/xds/edsbalancer/balancergroup.go index 10ddf32132cf..77b185c8b543 100644 --- a/balancer/xds/edsbalancer/balancergroup.go +++ b/balancer/xds/edsbalancer/balancergroup.go @@ -1,3 +1,5 @@ +// +build go1.12 + /* * Copyright 2019 gRPC authors. * diff --git a/balancer/xds/edsbalancer/balancergroup_test.go b/balancer/xds/edsbalancer/balancergroup_test.go index 580d9069c2c9..10ed684e599a 100644 --- a/balancer/xds/edsbalancer/balancergroup_test.go +++ b/balancer/xds/edsbalancer/balancergroup_test.go @@ -1,3 +1,5 @@ +// +build go1.12 + /* * Copyright 2019 gRPC authors. * diff --git a/balancer/xds/edsbalancer/edsbalancer.go b/balancer/xds/edsbalancer/edsbalancer.go index ebdde63bff2d..8e4efcb7c02c 100644 --- a/balancer/xds/edsbalancer/edsbalancer.go +++ b/balancer/xds/edsbalancer/edsbalancer.go @@ -1,3 +1,5 @@ +// +build go1.12 + /* * Copyright 2019 gRPC authors. * diff --git a/balancer/xds/edsbalancer/edsbalancer_test.go b/balancer/xds/edsbalancer/edsbalancer_test.go index 40ae93c2daac..592d95da2915 100644 --- a/balancer/xds/edsbalancer/edsbalancer_test.go +++ b/balancer/xds/edsbalancer/edsbalancer_test.go @@ -1,3 +1,5 @@ +// +build go1.12 + /* * Copyright 2019 gRPC authors. * diff --git a/balancer/xds/edsbalancer/test_util_test.go b/balancer/xds/edsbalancer/test_util_test.go index ed0415d47eb9..674a2b4628b9 100644 --- a/balancer/xds/edsbalancer/test_util_test.go +++ b/balancer/xds/edsbalancer/test_util_test.go @@ -1,3 +1,5 @@ +// +build go1.12 + /* * Copyright 2019 gRPC authors. * diff --git a/balancer/xds/edsbalancer/util.go b/balancer/xds/edsbalancer/util.go index 46c6283f7032..0b1a397f2ccd 100644 --- a/balancer/xds/edsbalancer/util.go +++ b/balancer/xds/edsbalancer/util.go @@ -1,3 +1,5 @@ +// +build go1.12 + /* * Copyright 2019 gRPC authors. * diff --git a/balancer/xds/edsbalancer/util_test.go b/balancer/xds/edsbalancer/util_test.go index fa5338f8273e..68471b6c19c8 100644 --- a/balancer/xds/edsbalancer/util_test.go +++ b/balancer/xds/edsbalancer/util_test.go @@ -1,3 +1,5 @@ +// +build go1.12 + /* * Copyright 2019 gRPC authors. * diff --git a/balancer/xds/xds.go b/balancer/xds/xds.go index 6a8488b9774d..1097920b84af 100644 --- a/balancer/xds/xds.go +++ b/balancer/xds/xds.go @@ -1,3 +1,5 @@ +// +build go1.12 + /* * * Copyright 2019 gRPC authors. diff --git a/balancer/xds/xds_client.go b/balancer/xds/xds_client.go index d18d9033abbe..181bc3827209 100644 --- a/balancer/xds/xds_client.go +++ b/balancer/xds/xds_client.go @@ -1,3 +1,5 @@ +// +build go1.12 + /* * * Copyright 2019 gRPC authors. diff --git a/balancer/xds/xds_client_test.go b/balancer/xds/xds_client_test.go index 942f6ae07fcb..409ee099ad4a 100644 --- a/balancer/xds/xds_client_test.go +++ b/balancer/xds/xds_client_test.go @@ -1,3 +1,5 @@ +// +build go1.12 + /* * * Copyright 2019 gRPC authors. diff --git a/balancer/xds/xds_test.go b/balancer/xds/xds_test.go index 21a9fe2e4eb6..c175f086bc36 100644 --- a/balancer/xds/xds_test.go +++ b/balancer/xds/xds_test.go @@ -1,3 +1,5 @@ +// +build go1.12 + /* * * Copyright 2019 gRPC authors.