Skip to content

Commit

Permalink
Review comments #1.
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars committed Nov 6, 2019
1 parent 9d08e07 commit 99c2b9d
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 40 deletions.
30 changes: 14 additions & 16 deletions xds/internal/client/lds.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,12 @@ import (
"github.com/golang/protobuf/ptypes"
"google.golang.org/grpc/grpclog"

discoverypb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
ldspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
httppb "github.com/envoyproxy/go-control-plane/envoy/config/filter/network/http_connection_manager/v2"
)

func (v2c *v2Client) newLDSRequest(target string) *discoverypb.DiscoveryRequest {
return &discoverypb.DiscoveryRequest{
func (v2c *v2Client) newLDSRequest(target string) *xdspb.DiscoveryRequest {
return &xdspb.DiscoveryRequest{
Node: v2c.nodeProto,
TypeUrl: listenerURL,
ResourceNames: []string{target},
Expand All @@ -45,18 +44,24 @@ func (v2c *v2Client) sendLDS(stream adsStream, target string) bool {
return true
}

func (v2c *v2Client) handleLDSResponse(resp *discoverypb.DiscoveryResponse) error {
func (v2c *v2Client) handleLDSResponse(resp *xdspb.DiscoveryResponse) error {
routeName := ""
v2c.mu.Lock()
defer v2c.mu.Unlock()

for _, r := range resp.GetResources() {
var resource ptypes.DynamicAny
if err := ptypes.UnmarshalAny(r, &resource); err != nil {
return fmt.Errorf("xds: failed to unmarshal resource in LDS response: %v", err)
}
lis, ok := resource.Message.(*ldspb.Listener)
lis, ok := resource.Message.(*xdspb.Listener)
if !ok {
return fmt.Errorf("xds: unexpected resource type: %T in LDS response", resource.Message)
}
if !v2c.isListenerProtoInteresting(lis) {
// TODO: We might have to cache the results even if the listener is
// not interesting at the moment. It might become interesting later
// on, and at that time, the server might not send an update.
continue
}
if lis.GetApiListener() == nil {
Expand Down Expand Up @@ -89,20 +94,13 @@ func (v2c *v2Client) handleLDSResponse(resp *discoverypb.DiscoveryResponse) erro
err = fmt.Errorf("xds: LDS response %+v does not contain route config name", resp)
}

v2c.mu.Lock()
if v2c.ldsWatch != nil {
v2c.ldsWatch.callback(ldsUpdate{routeName: routeName}, err)
}
v2c.mu.Unlock()
return err
}

func (v2c *v2Client) isListenerProtoInteresting(lis *ldspb.Listener) bool {
interesting := false
v2c.mu.Lock()
if v2c.ldsWatch != nil && v2c.ldsWatch.target == lis.GetName() {
interesting = true
}
v2c.mu.Unlock()
return interesting
// Caller should hold v2c.mu
func (v2c *v2Client) isListenerProtoInteresting(lis *xdspb.Listener) bool {
return v2c.ldsWatch != nil && v2c.ldsWatch.target == lis.GetName()
}
42 changes: 23 additions & 19 deletions xds/internal/client/rds.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,16 @@ package client

import (
"fmt"
"strings"

"github.com/golang/protobuf/ptypes"
"google.golang.org/grpc/grpclog"

discoverypb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
rdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
)

func (v2c *v2Client) newRDSRequest(routeName string) *discoverypb.DiscoveryRequest {
return &discoverypb.DiscoveryRequest{
func (v2c *v2Client) newRDSRequest(routeName string) *xdspb.DiscoveryRequest {
return &xdspb.DiscoveryRequest{
Node: v2c.nodeProto,
TypeUrl: routeURL,
ResourceNames: []string{routeName},
Expand All @@ -44,7 +44,7 @@ func (v2c *v2Client) sendRDS(stream adsStream, routeName string) bool {
return true
}

func (v2c *v2Client) handleRDSResponse(resp *discoverypb.DiscoveryResponse) error {
func (v2c *v2Client) handleRDSResponse(resp *xdspb.DiscoveryResponse) error {
cluster := ""
v2c.mu.Lock()
defer v2c.mu.Unlock()
Expand All @@ -54,14 +54,14 @@ func (v2c *v2Client) handleRDSResponse(resp *discoverypb.DiscoveryResponse) erro
if err := ptypes.UnmarshalAny(r, &resource); err != nil {
return fmt.Errorf("xds: failed to unmarshal resource in RDS response: %v", err)
}
rc, ok := resource.Message.(*rdspb.RouteConfiguration)
rc, ok := resource.Message.(*xdspb.RouteConfiguration)
if !ok {
return fmt.Errorf("xds: unexpected resource type: %T in RDS response", resource.Message)
}
if !v2c.isRouteConfigurationInteresting(rc) {
continue
}
cluster = v2c.getClusterFromRouteConfiguration(rc, v2c.ldsWatch.target)
cluster = getClusterFromRouteConfiguration(rc, v2c.ldsWatch.target)
if cluster != "" {
break
}
Expand All @@ -77,22 +77,18 @@ func (v2c *v2Client) handleRDSResponse(resp *discoverypb.DiscoveryResponse) erro
return nil
}

func (v2c *v2Client) isRouteConfigurationInteresting(rc *rdspb.RouteConfiguration) bool {
interesting := false
v2c.mu.Lock()
if v2c.rdsWatch != nil && v2c.rdsWatch.routeName == rc.GetName() {
interesting = true
}
v2c.mu.Unlock()
return interesting
// Caller should hold v2c.mu
func (v2c *v2Client) isRouteConfigurationInteresting(rc *xdspb.RouteConfiguration) bool {
return v2c.rdsWatch != nil && v2c.rdsWatch.routeName == rc.GetName()
}

func (v2c *v2Client) getClusterFromRouteConfiguration(rc *rdspb.RouteConfiguration, target string) string {
func getClusterFromRouteConfiguration(rc *xdspb.RouteConfiguration, target string) string {
host := stripPort(target)
for _, vh := range rc.GetVirtualHosts() {
for _, domain := range vh.GetDomains() {
// TODO: strip port from target before comparison
if target == domain {
if vh.GetRoutes() != nil {
// TODO: Add support for wildcard matching here.
if domain == host {
if len(vh.GetRoutes()) > 0 {
// The last route is the default route.
dr := vh.Routes[len(vh.Routes)-1]
if dr.GetMatch() == nil && dr.GetRoute() != nil {
Expand All @@ -104,3 +100,11 @@ func (v2c *v2Client) getClusterFromRouteConfiguration(rc *rdspb.RouteConfigurati
}
return ""
}

func stripPort(host string) string {
colon := strings.LastIndexByte(host, ':')
if colon == -1 {
return host
}
return host[:colon]
}
6 changes: 3 additions & 3 deletions xds/internal/client/v2client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"

basepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
adsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
)

Expand All @@ -43,7 +43,7 @@ type v2Client struct {

// ClientConn to the xDS gRPC server. Owned by the parent xdsClient.
cc *grpc.ClientConn
nodeProto *basepb.Node
nodeProto *corepb.Node
backoff func(int) time.Duration

// Message specific channels onto which, corresponding watch information is
Expand All @@ -64,7 +64,7 @@ type v2Client struct {
// newV2Client creates a new v2Client object initialized with the passed
// arguments. It also spawns a long running goroutine to send and receive xDS
// messages.
func newV2Client(cc *grpc.ClientConn, nodeProto *basepb.Node, backoff func(int) time.Duration) *v2Client {
func newV2Client(cc *grpc.ClientConn, nodeProto *corepb.Node, backoff func(int) time.Duration) *v2Client {
v2c := &v2Client{
cc: cc,
nodeProto: nodeProto,
Expand Down
9 changes: 7 additions & 2 deletions xds/internal/client/v2client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import (
)

const (
defaultTestTimeout = 2 * time.Second
defaultTestTimeout = 5 * time.Second
goodLDSTarget1 = "GoodListener1"
goodLDSTarget2 = "GoodListener2"
uninterestingLDSTarget = "UninterestingListener"
Expand Down Expand Up @@ -252,7 +252,7 @@ func testLDS(t *testing.T, ldsOps chan ldsTestOp) {

// Override the v2Client backoff function with this, so that we can verify
// that a backoff actually was triggerred.
boCh := make(chan int)
boCh := make(chan int, 1)
clientBackoff := func(v int) time.Duration {
boCh <- v
return 0
Expand Down Expand Up @@ -290,12 +290,14 @@ func testLDS(t *testing.T, ldsOps chan ldsTestOp) {
errCh <- fmt.Errorf("got error while processing LDS request: %v, want: %v", got.Err, ldsOp.wantRequest.Err)
return
}
t.Log("FakeServer received expected request...")
}

// if a response is specified in the testOp, push it to the
// fakeserver.
if ldsOp.responseToSend != nil {
fakeServer.ResponseChan <- ldsOp.responseToSend
t.Log("Response pushed to fakeServer...")
}

// Make sure the update callback was invoked, if specified in the
Expand All @@ -311,13 +313,16 @@ func testLDS(t *testing.T, ldsOps chan ldsTestOp) {
errCh <- fmt.Errorf("received error {%v} in lds callback, wantErr: %v", err, ldsOp.wantUpdateErr)
return
}
t.Log("LDS watch callback received expected update...")
}

// Make sure the stream was retried, if specified in the testOp.
if ldsOp.wantRetry {
<-boCh
t.Log("v2Client backed off before retrying...")
}
}
t.Log("Completed all test ops successfully...")
errCh <- nil
}()

Expand Down

0 comments on commit 99c2b9d

Please sign in to comment.