Showing 5 changed files with 571 additions and 15 deletions.
2 changes: 1 addition & 1 deletion xds/internal/balancer/clusterresolver/clusterresolver.go
@@ -85,7 +85,7 @@ func (bb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Bal
b.logger = prefixLogger(b)
b.logger.Infof("Created")

b.resourceWatcher = newResourceResolver(b)
b.resourceWatcher = newResourceResolver(b, b.logger)
b.cc = &ccWrapper{
ClientConn: cc,
resourceWatcher: b.resourceWatcher,
Original file line number Diff line number Diff line change
@@ -27,13 +27,21 @@ import (
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/status"
xdstestutils "google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
"google.golang.org/protobuf/types/known/wrapperspb"

v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
@@ -771,3 +779,289 @@ func (s) TestAggregateCluster_BadEDS_BadDNS(t *testing.T) {
t.Fatalf("EmptyCall() failed with error %v, want %v", err, dnsErr)
}
}

// TestAggregateCluster_NoFallback_EDSNackedWithPreviousGoodUpdate tests the
// scenario where the top-level cluster is an aggregate cluster that resolves to
// an EDS and LOGICAL_DNS cluster. The management server first sends a good EDS
// response for the EDS cluster and the test verifies that RPCs get routed to
// the EDS cluster. The management server then sends a bad EDS response. The
// test verifies that the cluster_resolver LB policy continues to use the
// previously received good update and that RPCs still get routed to the EDS
// cluster.
func (s) TestAggregateCluster_NoFallback_EDSNackedWithPreviousGoodUpdate(t *testing.T) {
dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
defer cleanup1()

// Start an xDS management server.
mgmtServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
defer cleanup2()

// Start two test backends and extract their host and port. The first
// backend is used for the EDS cluster and the second backend is used for
// the LOGICAL_DNS cluster.
servers, cleanup3 := startTestServiceBackends(t, 2)
defer cleanup3()
addrs, ports := backendAddressesAndPorts(t, servers)

// Configure an aggregate cluster pointing to an EDS and DNS cluster. Also
// configure an endpoints resource for the EDS cluster.
const (
edsClusterName = clusterName + "-eds"
dnsClusterName = clusterName + "-dns"
dnsHostName = "dns_host"
dnsPort = uint32(8080)
)
resources := e2e.UpdateOptions{
NodeID: nodeID,
Clusters: []*v3clusterpb.Cluster{
makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}),
e2e.DefaultCluster(edsClusterName, "", e2e.SecurityLevelNone),
makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
},
Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsClusterName, "localhost", []uint32{uint32(ports[0])})},
SkipValidation: true,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

// Create xDS client, configure cds_experimental LB policy with a manual
// resolver, and dial the test backends.
cc, cleanup := setupAndDial(t, bootstrapContents)
defer cleanup()

// Ensure that the DNS resolver is started for the expected target.
select {
case <-ctx.Done():
t.Fatal("Timeout when waiting for DNS resolver to be started")
case target := <-dnsTargetCh:
got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort)
if got != want {
t.Fatalf("DNS resolution started for target %q, want %q", got, want)
}
}

// Update DNS resolver with test backend addresses.
dnsR.UpdateState(resolver.State{Addresses: addrs[1:]})

// Make an RPC and ensure that it gets routed to the first backend since the
// EDS cluster is of higher priority than the LOGICAL_DNS cluster.
client := testgrpc.NewTestServiceClient(cc)
peer := &peer.Peer{}
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
t.Fatalf("EmptyCall() failed: %v", err)
}
if peer.Addr.String() != addrs[0].Addr {
t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, addrs[0].Addr)
}

// Push an EDS resource from the management server that is expected to be
// NACKed by the xDS client. Since the cluster_resolver LB policy has a
// previously received good EDS resource, it will continue to use that.
resources.Endpoints[0].Endpoints[0].LbEndpoints[0].LoadBalancingWeight = &wrapperspb.UInt32Value{Value: 0}
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

// Ensure that RPCs continue to get routed to the EDS cluster for the next
// second.
for end := time.Now().Add(time.Second); time.Now().Before(end); <-time.After(defaultTestShortTimeout) {
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer)); err != nil {
t.Fatalf("EmptyCall() failed: %v", err)
}
if peer.Addr.String() != addrs[0].Addr {
t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, addrs[0].Addr)
}
}
}

// TestAggregateCluster_Fallback_EDSNackedWithoutPreviousGoodUpdate tests the
// scenario where the top-level cluster is an aggregate cluster that resolves to
// an EDS and LOGICAL_DNS cluster. The management server sends a bad EDS
// response. The test verifies that the cluster_resolver LB policy falls back to
// the LOGICAL_DNS cluster, because it is supposed to treat the bad EDS response
// as though it received an update with no endpoints.
func (s) TestAggregateCluster_Fallback_EDSNackedWithoutPreviousGoodUpdate(t *testing.T) {
dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
defer cleanup1()

// Start an xDS management server.
mgmtServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
defer cleanup2()

// Start two test backends and extract their host and port. The first
// backend is used for the EDS cluster and the second backend is used for
// the LOGICAL_DNS cluster.
servers, cleanup3 := startTestServiceBackends(t, 2)
defer cleanup3()
addrs, ports := backendAddressesAndPorts(t, servers)

// Configure an aggregate cluster pointing to an EDS and DNS cluster.
const (
edsClusterName = clusterName + "-eds"
dnsClusterName = clusterName + "-dns"
dnsHostName = "dns_host"
dnsPort = uint32(8080)
)
resources := e2e.UpdateOptions{
NodeID: nodeID,
Clusters: []*v3clusterpb.Cluster{
makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}),
e2e.DefaultCluster(edsClusterName, "", e2e.SecurityLevelNone),
makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
},
Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsClusterName, "localhost", []uint32{uint32(ports[0])})},
SkipValidation: true,
}

// Set a load balancing weight of 0 for the backend in the EDS resource.
// This is expected to be NACKed by the xDS client. Since the
// cluster_resolver LB policy has no previously received good EDS resource,
// it will treat this as though it received an update with no endpoints.
resources.Endpoints[0].Endpoints[0].LbEndpoints[0].LoadBalancingWeight = &wrapperspb.UInt32Value{Value: 0}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

// Create xDS client, configure cds_experimental LB policy with a manual
// resolver, and dial the test backends.
cc, cleanup := setupAndDial(t, bootstrapContents)
defer cleanup()

// Ensure that the DNS resolver is started for the expected target.
select {
case <-ctx.Done():
t.Fatal("Timeout when waiting for DNS resolver to be started")
case target := <-dnsTargetCh:
got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort)
if got != want {
t.Fatalf("DNS resolution started for target %q, want %q", got, want)
}
}

// Update DNS resolver with test backend addresses.
dnsR.UpdateState(resolver.State{Addresses: addrs[1:]})

// Make an RPC and ensure that it gets routed to the LOGICAL_DNS cluster.
peer := &peer.Peer{}
client := testgrpc.NewTestServiceClient(cc)
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
t.Fatalf("EmptyCall() failed: %v", err)
}
if peer.Addr.String() != addrs[1].Addr {
t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, addrs[1].Addr)
}
}

// TestAggregateCluster_Fallback_EDS_ResourceNotFound tests the scenario where
// the top-level cluster is an aggregate cluster that resolves to an EDS and
// LOGICAL_DNS cluster. The management server does not respond with the EDS
// cluster. The test verifies that the cluster_resolver LB policy falls back to
// the LOGICAL_DNS cluster in this case.
func (s) TestAggregateCluster_Fallback_EDS_ResourceNotFound(t *testing.T) {
dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
defer cleanup1()

// Start an xDS management server.
mgmtServer, nodeID, _, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
defer cleanup2()

// Start a test backend for the LOGICAL_DNS cluster.
server := stubserver.StartTestService(t, nil)
defer server.Stop()

// Configure an aggregate cluster pointing to an EDS and DNS cluster. No
// endpoints are configured for the EDS cluster.
const (
edsClusterName = clusterName + "-eds"
dnsClusterName = clusterName + "-dns"
dnsHostName = "dns_host"
dnsPort = uint32(8080)
)
resources := e2e.UpdateOptions{
NodeID: nodeID,
Clusters: []*v3clusterpb.Cluster{
makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}),
e2e.DefaultCluster(edsClusterName, "", e2e.SecurityLevelNone),
makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
},
SkipValidation: true,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

// Create an xDS client talking to the above management server, configured
// with a short watch expiry timeout.
xdsClient, close, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{
XDSServer: xdstestutils.ServerConfigForAddress(t, mgmtServer.Address),
NodeProto: &v3corepb.Node{Id: nodeID},
}, defaultTestWatchExpiryTimeout, time.Duration(0))
if err != nil {
t.Fatalf("failed to create xds client: %v", err)
}
defer close()

// Create a manual resolver and push a service config specifying the use of
// the cds LB policy as the top-level LB policy, and a corresponding config
// with a single cluster.
r := manual.NewBuilderWithScheme("whatever")
jsonSC := fmt.Sprintf(`{
"loadBalancingConfig":[{
"cds_experimental":{
"cluster": "%s"
}
}]
}`, clusterName)
scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC)
r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, xdsClient))

// Create a ClientConn.
cc, err := grpc.Dial(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
if err != nil {
t.Fatalf("failed to dial local test server: %v", err)
}
defer cc.Close()

// Make an RPC with a short deadline. We expect this RPC to not succeed
// because the DNS resolver has not responded with endpoint addresses.
client := testgrpc.NewTestServiceClient(cc)
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if _, err := client.EmptyCall(sCtx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
t.Fatalf("EmptyCall() code %s, want %s", status.Code(err), codes.DeadlineExceeded)
}

// Ensure that the DNS resolver is started for the expected target.
select {
case <-ctx.Done():
t.Fatal("Timeout when waiting for DNS resolver to be started")
case target := <-dnsTargetCh:
got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort)
if got != want {
t.Fatalf("DNS resolution started for target %q, want %q", got, want)
}
}

// Update DNS resolver with test backend addresses.
dnsR.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: server.Address}}})

// Make an RPC and ensure that it gets routed to the LOGICAL_DNS cluster.
// Even though the EDS cluster is of higher priority, since the management
// server does not respond with an EDS resource, the cluster_resolver LB
policy is expected to fall back to the LOGICAL_DNS cluster once the watch
// timeout expires.
peer := &peer.Peer{}
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
t.Fatalf("EmptyCall() failed: %v", err)
}
if peer.Addr.String() != server.Address {
t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, server.Address)
}
}
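
Note: makeAggregateClusterResource and makeLogicalDNSClusterResource are helpers defined elsewhere in this test package and are not part of this diff. As a rough, hedged sketch only, assuming the standard envoy.clusters.aggregate cluster extension (the real helper may differ in details such as the configured LB policy), the aggregate-cluster helper likely constructs something along these lines:

// Sketch of a makeAggregateClusterResource-style helper. The exact contents
// are an assumption; only the use of the envoy.clusters.aggregate extension
// with the child cluster names in priority order is essential to these tests.
import (
	v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
	v3aggregateclusterpb "github.com/envoyproxy/go-control-plane/envoy/extensions/clusters/aggregate/v3"

	"google.golang.org/grpc/internal/testutils"
)

func makeAggregateClusterResource(name string, childNames []string) *v3clusterpb.Cluster {
	return &v3clusterpb.Cluster{
		Name: name,
		ClusterDiscoveryType: &v3clusterpb.Cluster_ClusterType{
			ClusterType: &v3clusterpb.Cluster_CustomClusterType{
				Name: "envoy.clusters.aggregate",
				TypedConfig: testutils.MarshalAny(&v3aggregateclusterpb.ClusterConfig{
					Clusters: childNames, // child clusters, listed in priority order
				}),
			},
		},
		LbPolicy: v3clusterpb.Cluster_ROUND_ROBIN,
	}
}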
227 changes: 225 additions & 2 deletions xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go
@@ -25,6 +25,7 @@ import (
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
@@ -41,7 +42,9 @@ import (
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/status"
"google.golang.org/grpc/xds/internal/balancer/priority"
xdstestutils "google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
"google.golang.org/protobuf/types/known/wrapperspb"

@@ -62,8 +65,9 @@ const (
localityName2 = "my-locality-2"
localityName3 = "my-locality-3"

defaultTestTimeout = 5 * time.Second
defaultTestShortTimeout = 10 * time.Millisecond
defaultTestTimeout = 5 * time.Second
defaultTestShortTimeout = 10 * time.Millisecond
defaultTestWatchExpiryTimeout = 500 * time.Millisecond
)

type s struct {
@@ -852,6 +856,225 @@ func (s) TestEDS_ClusterResourceUpdates(t *testing.T) {
}
}

// TestEDS_BadUpdateWithoutPreviousGoodUpdate tests the case where the
// management server sends a bad update (one that is NACKed by the xDS client).
// Since the cluster_resolver LB policy does not have a previously received good
// update, it is expected to treat this bad update as though it received an
update with no endpoints. Hence RPCs are expected to fail with an "all
// priorities removed" error.
func (s) TestEDS_BadUpdateWithoutPreviousGoodUpdate(t *testing.T) {
// Spin up a management server to receive xDS resources from.
mgmtServer, nodeID, bootstrapContents, _, cleanup1 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
defer cleanup1()

// Start a backend server that implements the TestService.
server := stubserver.StartTestService(t, nil)
defer server.Stop()

// Create an EDS resource with a load balancing weight of 0. This will
// result in the resource being NACKed by the xDS client. Since the
// cluster_resolver LB policy does not have a previously received good EDS
// update, it should treat this update as an empty EDS update.
resources := clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{{
Name: localityName1,
Weight: 1,
Backends: []e2e.BackendOptions{{Port: testutils.ParsePort(t, server.Address)}},
}})
resources.Endpoints[0].Endpoints[0].LbEndpoints[0].LoadBalancingWeight = &wrapperspb.UInt32Value{Value: 0}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

// Create an xDS client for use by the cluster_resolver LB policy.
xdsClient, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer close()

// Create a manual resolver and push a service config specifying the use of
// the cluster_resolver LB policy with a single discovery mechanism.
r := manual.NewBuilderWithScheme("whatever")
jsonSC := fmt.Sprintf(`{
"loadBalancingConfig":[{
"cluster_resolver_experimental":{
"discoveryMechanisms": [{
"cluster": "%s",
"type": "EDS",
"edsServiceName": "%s",
"outlierDetection": {}
}],
"xdsLbPolicy":[{"round_robin":{}}]
}
}]
}`, clusterName, edsServiceName)
scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC)
r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, xdsClient))

// Create a ClientConn and verify that RPCs fail with "all priorities
// removed" error.
cc, err := grpc.Dial(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
if err != nil {
t.Fatalf("failed to dial local test server: %v", err)
}
defer cc.Close()
client := testgrpc.NewTestServiceClient(cc)
if err := waitForAllPrioritiesRemovedError(ctx, t, client); err != nil {
t.Fatal(err)
}
}

// TestEDS_BadUpdateWithPreviousGoodUpdate tests the case where the
// cluster_resolver LB policy receives a good EDS update from the management
// server and the test verifies that RPCs are successful. Then, a bad update is
// received from the management server (one that is NACKed by the xDS client).
// The test verifies that the previously received good update is still being
// used and that RPCs are still successful.
func (s) TestEDS_BadUpdateWithPreviousGoodUpdate(t *testing.T) {
// Spin up a management server to receive xDS resources from.
mgmtServer, nodeID, bootstrapContents, _, cleanup1 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
defer cleanup1()

// Start a backend server that implements the TestService.
server := stubserver.StartTestService(t, nil)
defer server.Stop()

// Create an EDS resource for consumption by the test.
resources := clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{{
Name: localityName1,
Weight: 1,
Backends: []e2e.BackendOptions{{Port: testutils.ParsePort(t, server.Address)}},
}})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

// Create an xDS client for use by the cluster_resolver LB policy.
xdsClient, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer close()

// Create a manual resolver and push a service config specifying the use of
// the cluster_resolver LB policy with a single discovery mechanism.
r := manual.NewBuilderWithScheme("whatever")
jsonSC := fmt.Sprintf(`{
"loadBalancingConfig":[{
"cluster_resolver_experimental":{
"discoveryMechanisms": [{
"cluster": "%s",
"type": "EDS",
"edsServiceName": "%s",
"outlierDetection": {}
}],
"xdsLbPolicy":[{"round_robin":{}}]
}
}]
}`, clusterName, edsServiceName)
scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC)
r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, xdsClient))

// Create a ClientConn and make a successful RPC.
cc, err := grpc.Dial(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
if err != nil {
t.Fatalf("failed to dial local test server: %v", err)
}
defer cc.Close()

// Ensure RPCs are being roundrobined across the single backend.
client := testgrpc.NewTestServiceClient(cc)
if err := rrutil.CheckRoundRobinRPCs(ctx, client, []resolver.Address{{Addr: server.Address}}); err != nil {
t.Fatal(err)
}

// Update the endpoints resource in the management server with a load
// balancing weight of 0. This will result in the resource being NACKed by
// the xDS client. But since the cluster_resolver LB policy has a previously
// received good EDS update, it should continue using it.
resources.Endpoints[0].Endpoints[0].LbEndpoints[0].LoadBalancingWeight = &wrapperspb.UInt32Value{Value: 0}
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

// Ensure that RPCs continue to succeed for the next second.
for end := time.Now().Add(time.Second); time.Now().Before(end); <-time.After(defaultTestShortTimeout) {
if err := rrutil.CheckRoundRobinRPCs(ctx, client, []resolver.Address{{Addr: server.Address}}); err != nil {
t.Fatal(err)
}
}
}

// TestEDS_ResourceNotFound tests the case where the requested EDS resource does
// not exist on the management server. Once the watch timer associated with the
// requested resource expires, the cluster_resolver LB policy receives a
// "resource-not-found" callback from the xDS client and is expected to treat it
// as though it received an update with no endpoints. Hence RPCs are expected to
fail with an "all priorities removed" error.
func (s) TestEDS_ResourceNotFound(t *testing.T) {
// Spin up a management server to receive xDS resources from.
mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{})
if err != nil {
t.Fatalf("Failed to spin up the xDS management server: %v", err)
}
defer mgmtServer.Stop()

// Create an xDS client talking to the above management server, configured
// with a short watch expiry timeout.
nodeID := uuid.New().String()
xdsClient, close, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{
XDSServer: xdstestutils.ServerConfigForAddress(t, mgmtServer.Address),
NodeProto: &v3corepb.Node{Id: nodeID},
}, defaultTestWatchExpiryTimeout, time.Duration(0))
if err != nil {
t.Fatalf("failed to create xds client: %v", err)
}
defer close()

// Configure no resources on the management server.
resources := e2e.UpdateOptions{NodeID: nodeID, SkipValidation: true}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
}

// Create a manual resolver and push a service config specifying the use of
// the cluster_resolver LB policy with a single discovery mechanism.
r := manual.NewBuilderWithScheme("whatever")
jsonSC := fmt.Sprintf(`{
"loadBalancingConfig":[{
"cluster_resolver_experimental":{
"discoveryMechanisms": [{
"cluster": "%s",
"type": "EDS",
"edsServiceName": "%s",
"outlierDetection": {}
}],
"xdsLbPolicy":[{"round_robin":{}}]
}
}]
}`, clusterName, edsServiceName)
scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC)
r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, xdsClient))

// Create a ClientConn and verify that RPCs fail with "all priorities
// removed" error.
cc, err := grpc.Dial(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
if err != nil {
t.Fatalf("failed to dial local test server: %v", err)
}
defer cc.Close()
client := testgrpc.NewTestServiceClient(cc)
if err := waitForAllPrioritiesRemovedError(ctx, t, client); err != nil {
t.Fatal(err)
}
}

// waitForAllPrioritiesRemovedError repeatedly makes RPCs using the
// TestServiceClient until they fail with an error which indicates that all
// priorities have been removed. A non-nil error is returned if the context
7 changes: 5 additions & 2 deletions xds/internal/balancer/clusterresolver/resource_resolver.go
@@ -21,6 +21,7 @@ package clusterresolver
import (
"sync"

"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)

@@ -84,6 +85,7 @@ type discoveryMechanismAndResolver struct {

type resourceResolver struct {
parent *clusterResolverBalancer
logger *grpclog.PrefixLogger
updateChannel chan *resourceUpdate

// mu protects the slice and map, and content of the resolvers in the slice.
@@ -104,9 +106,10 @@ type resourceResolver struct {
childNameGeneratorSeqID uint64
}

func newResourceResolver(parent *clusterResolverBalancer) *resourceResolver {
func newResourceResolver(parent *clusterResolverBalancer, logger *grpclog.PrefixLogger) *resourceResolver {
return &resourceResolver{
parent: parent,
logger: logger,
updateChannel: make(chan *resourceUpdate, 1),
childrenMap: make(map[discoveryMechanismKey]discoveryMechanismAndResolver),
}
@@ -172,7 +175,7 @@ func (rr *resourceResolver) updateMechanisms(mechanisms []DiscoveryMechanism) {
var resolver endpointsResolver
switch dm.Type {
case DiscoveryMechanismTypeEDS:
resolver = newEDSResolver(dmKey.name, rr.parent.xdsClient, rr)
resolver = newEDSResolver(dmKey.name, rr.parent.xdsClient, rr, rr.logger)
case DiscoveryMechanismTypeLogicalDNS:
resolver = newDNSResolver(dmKey.name, rr)
}
56 changes: 46 additions & 10 deletions xds/internal/balancer/clusterresolver/resource_resolver_eds.go
@@ -21,6 +21,7 @@ package clusterresolver
import (
"sync"

"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)
@@ -30,20 +31,20 @@ type edsDiscoveryMechanism struct {
cancelWatch func()
topLevelResolver topLevelResolver
stopped *grpcsync.Event
logger *grpclog.PrefixLogger

mu sync.Mutex
update xdsresource.EndpointsUpdate
updateReceived bool
mu sync.Mutex
update *xdsresource.EndpointsUpdate // Nil indicates no update received so far.
}

func (er *edsDiscoveryMechanism) lastUpdate() (interface{}, bool) {
er.mu.Lock()
defer er.mu.Unlock()

if !er.updateReceived {
if er.update == nil {
return nil, false
}
return er.update, true
return *er.update, true
}

func (er *edsDiscoveryMechanism) resolveNow() {
@@ -63,10 +64,11 @@ func (er *edsDiscoveryMechanism) stop() {

// newEDSResolver returns an implementation of the endpointsResolver interface
// that uses EDS to resolve the given name to endpoints.
func newEDSResolver(nameToWatch string, producer xdsresource.Producer, topLevelResolver topLevelResolver) *edsDiscoveryMechanism {
func newEDSResolver(nameToWatch string, producer xdsresource.Producer, topLevelResolver topLevelResolver, logger *grpclog.PrefixLogger) *edsDiscoveryMechanism {
ret := &edsDiscoveryMechanism{
nameToWatch: nameToWatch,
topLevelResolver: topLevelResolver,
logger: logger,
stopped: grpcsync.NewEvent(),
}
ret.cancelWatch = xdsresource.WatchEndpoints(producer, nameToWatch, ret)
@@ -80,8 +82,7 @@ func (er *edsDiscoveryMechanism) OnUpdate(update *xdsresource.EndpointsResourceD
}

er.mu.Lock()
er.update = update.Resource
er.updateReceived = true
er.update = &update.Resource
er.mu.Unlock()

er.topLevelResolver.onUpdate()
@@ -92,13 +93,48 @@ func (er *edsDiscoveryMechanism) OnError(err error) {
return
}

er.topLevelResolver.onError(err)
if er.logger.V(2) {
er.logger.Infof("EDS discovery mechanism for resource %q reported error: %v", er.nameToWatch, err)
}

er.mu.Lock()
if er.update != nil {
// Continue using a previously received good configuration if one
// exists.
er.mu.Unlock()
return
}

// Else report an empty update that would result in no priority child being
// created for this discovery mechanism. This would result in the priority
// LB policy reporting TRANSIENT_FAILURE (as there would be no priorities or
// localities) if this was the only discovery mechanism, or would result in
// the priority LB policy using a lower priority discovery mechanism when
// that becomes available.
er.update = &xdsresource.EndpointsUpdate{}
er.mu.Unlock()

er.topLevelResolver.onUpdate()
}

func (er *edsDiscoveryMechanism) OnResourceDoesNotExist() {
if er.stopped.HasFired() {
return
}

er.topLevelResolver.onError(xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type Endpoints not found in received response", er.nameToWatch))
if er.logger.V(2) {
er.logger.Infof("EDS discovery mechanism for resource %q reported resource-does-not-exist error", er.nameToWatch)
}

// Report an empty update that would result in no priority child being
// created for this discovery mechanism. This would result in the priority
// LB policy reporting TRANSIENT_FAILURE (as there would be no priorities or
// localities) if this was the only discovery mechanism, or would result in
// the priority LB policy using a lower priority discovery mechanism when
// that becomes available.
er.mu.Lock()
er.update = &xdsresource.EndpointsUpdate{}
er.mu.Unlock()

er.topLevelResolver.onUpdate()
}
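
Taken together, the behavioral change in resource_resolver_eds.go is: on a NACK (OnError), keep a previously received good EDS update if one exists; on resource-not-found, or on a NACK with no prior good update, report an empty update so that no priority child is created for this mechanism and the priority LB policy can fall back to a lower-priority discovery mechanism (or report TRANSIENT_FAILURE if none exists). A minimal, standalone sketch of the OnError half of that pattern, using simplified stand-in types rather than the real clusterresolver internals:

package main

import (
	"fmt"
	"sync"
)

// endpointsUpdate is a simplified stand-in for xdsresource.EndpointsUpdate.
type endpointsUpdate struct{ localities []string }

type edsMechanism struct {
	mu     sync.Mutex
	update *endpointsUpdate // nil means no good update has been received yet
}

// onError mirrors the new OnError handling: keep the last good update if
// there is one; otherwise record an empty update, which results in no
// priority child for this mechanism and lets lower priorities take over.
func (m *edsMechanism) onError(err error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.update != nil {
		return // continue using the previously received good configuration
	}
	m.update = &endpointsUpdate{} // empty update => fall back to a lower priority
}

func main() {
	m := &edsMechanism{}
	m.onError(fmt.Errorf("resource NACKed"))
	fmt.Printf("have update: %t, localities: %d\n", m.update != nil, len(m.update.localities))
	// Prints: have update: true, localities: 0
}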
