From 99c2b9d6be18bed9455b90143ff5cfc2d7a9e098 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Wed, 6 Nov 2019 02:19:55 +0000 Subject: [PATCH] Review comments #1. --- xds/internal/client/lds.go | 30 ++++++++++---------- xds/internal/client/rds.go | 42 +++++++++++++++------------- xds/internal/client/v2client.go | 6 ++-- xds/internal/client/v2client_test.go | 9 ++++-- 4 files changed, 47 insertions(+), 40 deletions(-) diff --git a/xds/internal/client/lds.go b/xds/internal/client/lds.go index a9ac9103124d..072bbf1b5825 100644 --- a/xds/internal/client/lds.go +++ b/xds/internal/client/lds.go @@ -24,13 +24,12 @@ import ( "github.com/golang/protobuf/ptypes" "google.golang.org/grpc/grpclog" - discoverypb "github.com/envoyproxy/go-control-plane/envoy/api/v2" - ldspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" + xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" httppb "github.com/envoyproxy/go-control-plane/envoy/config/filter/network/http_connection_manager/v2" ) -func (v2c *v2Client) newLDSRequest(target string) *discoverypb.DiscoveryRequest { - return &discoverypb.DiscoveryRequest{ +func (v2c *v2Client) newLDSRequest(target string) *xdspb.DiscoveryRequest { + return &xdspb.DiscoveryRequest{ Node: v2c.nodeProto, TypeUrl: listenerURL, ResourceNames: []string{target}, @@ -45,18 +44,24 @@ func (v2c *v2Client) sendLDS(stream adsStream, target string) bool { return true } -func (v2c *v2Client) handleLDSResponse(resp *discoverypb.DiscoveryResponse) error { +func (v2c *v2Client) handleLDSResponse(resp *xdspb.DiscoveryResponse) error { routeName := "" + v2c.mu.Lock() + defer v2c.mu.Unlock() + for _, r := range resp.GetResources() { var resource ptypes.DynamicAny if err := ptypes.UnmarshalAny(r, &resource); err != nil { return fmt.Errorf("xds: failed to unmarshal resource in LDS response: %v", err) } - lis, ok := resource.Message.(*ldspb.Listener) + lis, ok := resource.Message.(*xdspb.Listener) if !ok { return fmt.Errorf("xds: unexpected resource type: %T in LDS response", resource.Message) } if !v2c.isListenerProtoInteresting(lis) { + // TODO: We might have to cache the results even if the listener is + // not interesting at the moment. It might become interesting later + // on, and at that time, the server might not send an update. continue } if lis.GetApiListener() == nil { @@ -89,20 +94,13 @@ func (v2c *v2Client) handleLDSResponse(resp *discoverypb.DiscoveryResponse) erro err = fmt.Errorf("xds: LDS response %+v does not contain route config name", resp) } - v2c.mu.Lock() if v2c.ldsWatch != nil { v2c.ldsWatch.callback(ldsUpdate{routeName: routeName}, err) } - v2c.mu.Unlock() return err } -func (v2c *v2Client) isListenerProtoInteresting(lis *ldspb.Listener) bool { - interesting := false - v2c.mu.Lock() - if v2c.ldsWatch != nil && v2c.ldsWatch.target == lis.GetName() { - interesting = true - } - v2c.mu.Unlock() - return interesting +// Caller should hold v2c.mu +func (v2c *v2Client) isListenerProtoInteresting(lis *xdspb.Listener) bool { + return v2c.ldsWatch != nil && v2c.ldsWatch.target == lis.GetName() } diff --git a/xds/internal/client/rds.go b/xds/internal/client/rds.go index 7342fbe93d92..f3151756a378 100644 --- a/xds/internal/client/rds.go +++ b/xds/internal/client/rds.go @@ -20,16 +20,16 @@ package client import ( "fmt" + "strings" "github.com/golang/protobuf/ptypes" "google.golang.org/grpc/grpclog" - discoverypb "github.com/envoyproxy/go-control-plane/envoy/api/v2" - rdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" + xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" ) -func (v2c *v2Client) newRDSRequest(routeName string) *discoverypb.DiscoveryRequest { - return &discoverypb.DiscoveryRequest{ +func (v2c *v2Client) newRDSRequest(routeName string) *xdspb.DiscoveryRequest { + return &xdspb.DiscoveryRequest{ Node: v2c.nodeProto, TypeUrl: routeURL, ResourceNames: []string{routeName}, @@ -44,7 +44,7 @@ func (v2c *v2Client) sendRDS(stream adsStream, routeName string) bool { return true } -func (v2c *v2Client) handleRDSResponse(resp *discoverypb.DiscoveryResponse) error { +func (v2c *v2Client) handleRDSResponse(resp *xdspb.DiscoveryResponse) error { cluster := "" v2c.mu.Lock() defer v2c.mu.Unlock() @@ -54,14 +54,14 @@ func (v2c *v2Client) handleRDSResponse(resp *discoverypb.DiscoveryResponse) erro if err := ptypes.UnmarshalAny(r, &resource); err != nil { return fmt.Errorf("xds: failed to unmarshal resource in RDS response: %v", err) } - rc, ok := resource.Message.(*rdspb.RouteConfiguration) + rc, ok := resource.Message.(*xdspb.RouteConfiguration) if !ok { return fmt.Errorf("xds: unexpected resource type: %T in RDS response", resource.Message) } if !v2c.isRouteConfigurationInteresting(rc) { continue } - cluster = v2c.getClusterFromRouteConfiguration(rc, v2c.ldsWatch.target) + cluster = getClusterFromRouteConfiguration(rc, v2c.ldsWatch.target) if cluster != "" { break } @@ -77,22 +77,18 @@ func (v2c *v2Client) handleRDSResponse(resp *discoverypb.DiscoveryResponse) erro return nil } -func (v2c *v2Client) isRouteConfigurationInteresting(rc *rdspb.RouteConfiguration) bool { - interesting := false - v2c.mu.Lock() - if v2c.rdsWatch != nil && v2c.rdsWatch.routeName == rc.GetName() { - interesting = true - } - v2c.mu.Unlock() - return interesting +// Caller should hold v2c.mu +func (v2c *v2Client) isRouteConfigurationInteresting(rc *xdspb.RouteConfiguration) bool { + return v2c.rdsWatch != nil && v2c.rdsWatch.routeName == rc.GetName() } -func (v2c *v2Client) getClusterFromRouteConfiguration(rc *rdspb.RouteConfiguration, target string) string { +func getClusterFromRouteConfiguration(rc *xdspb.RouteConfiguration, target string) string { + host := stripPort(target) for _, vh := range rc.GetVirtualHosts() { for _, domain := range vh.GetDomains() { - // TODO: strip port from target before comparison - if target == domain { - if vh.GetRoutes() != nil { + // TODO: Add support for wildcard matching here. + if domain == host { + if len(vh.GetRoutes()) > 0 { // The last route is the default route. dr := vh.Routes[len(vh.Routes)-1] if dr.GetMatch() == nil && dr.GetRoute() != nil { @@ -104,3 +100,11 @@ func (v2c *v2Client) getClusterFromRouteConfiguration(rc *rdspb.RouteConfigurati } return "" } + +func stripPort(host string) string { + colon := strings.LastIndexByte(host, ':') + if colon == -1 { + return host + } + return host[:colon] +} diff --git a/xds/internal/client/v2client.go b/xds/internal/client/v2client.go index 91af573c6323..453fcc4530cb 100644 --- a/xds/internal/client/v2client.go +++ b/xds/internal/client/v2client.go @@ -27,7 +27,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/grpclog" - basepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" + corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" adsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" ) @@ -43,7 +43,7 @@ type v2Client struct { // ClientConn to the xDS gRPC server. Owned by the parent xdsClient. cc *grpc.ClientConn - nodeProto *basepb.Node + nodeProto *corepb.Node backoff func(int) time.Duration // Message specific channels onto which, corresponding watch information is @@ -64,7 +64,7 @@ type v2Client struct { // newV2Client creates a new v2Client object initialized with the passed // arguments. It also spawns a long running goroutine to send and receive xDS // messages. -func newV2Client(cc *grpc.ClientConn, nodeProto *basepb.Node, backoff func(int) time.Duration) *v2Client { +func newV2Client(cc *grpc.ClientConn, nodeProto *corepb.Node, backoff func(int) time.Duration) *v2Client { v2c := &v2Client{ cc: cc, nodeProto: nodeProto, diff --git a/xds/internal/client/v2client_test.go b/xds/internal/client/v2client_test.go index 84a128ac8510..4bdd6c34db62 100644 --- a/xds/internal/client/v2client_test.go +++ b/xds/internal/client/v2client_test.go @@ -42,7 +42,7 @@ import ( ) const ( - defaultTestTimeout = 2 * time.Second + defaultTestTimeout = 5 * time.Second goodLDSTarget1 = "GoodListener1" goodLDSTarget2 = "GoodListener2" uninterestingLDSTarget = "UninterestingListener" @@ -252,7 +252,7 @@ func testLDS(t *testing.T, ldsOps chan ldsTestOp) { // Override the v2Client backoff function with this, so that we can verify // that a backoff actually was triggerred. - boCh := make(chan int) + boCh := make(chan int, 1) clientBackoff := func(v int) time.Duration { boCh <- v return 0 @@ -290,12 +290,14 @@ func testLDS(t *testing.T, ldsOps chan ldsTestOp) { errCh <- fmt.Errorf("got error while processing LDS request: %v, want: %v", got.Err, ldsOp.wantRequest.Err) return } + t.Log("FakeServer received expected request...") } // if a response is specified in the testOp, push it to the // fakeserver. if ldsOp.responseToSend != nil { fakeServer.ResponseChan <- ldsOp.responseToSend + t.Log("Response pushed to fakeServer...") } // Make sure the update callback was invoked, if specified in the @@ -311,13 +313,16 @@ func testLDS(t *testing.T, ldsOps chan ldsTestOp) { errCh <- fmt.Errorf("received error {%v} in lds callback, wantErr: %v", err, ldsOp.wantUpdateErr) return } + t.Log("LDS watch callback received expected update...") } // Make sure the stream was retried, if specified in the testOp. if ldsOp.wantRetry { <-boCh + t.Log("v2Client backed off before retrying...") } } + t.Log("Completed all test ops successfully...") errCh <- nil }()