diff --git a/xds/internal/balancer/clusterresolver/clusterresolver.go b/xds/internal/balancer/clusterresolver/clusterresolver.go index 5eadd1ac1d0e..6faf81ab552c 100644 --- a/xds/internal/balancer/clusterresolver/clusterresolver.go +++ b/xds/internal/balancer/clusterresolver/clusterresolver.go @@ -85,7 +85,7 @@ func (bb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Bal b.logger = prefixLogger(b) b.logger.Infof("Created") - b.resourceWatcher = newResourceResolver(b) + b.resourceWatcher = newResourceResolver(b, b.logger) b.cc = &ccWrapper{ ClientConn: cc, resourceWatcher: b.resourceWatcher, diff --git a/xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go b/xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go index ea0cdf6fc257..b4740eea6d0b 100644 --- a/xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go +++ b/xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go @@ -27,13 +27,21 @@ import ( "github.com/google/go-cmp/cmp" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal" + "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" "google.golang.org/grpc/peer" "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" + "google.golang.org/grpc/serviceconfig" "google.golang.org/grpc/status" + xdstestutils "google.golang.org/grpc/xds/internal/testutils" + "google.golang.org/grpc/xds/internal/xdsclient" + "google.golang.org/grpc/xds/internal/xdsclient/bootstrap" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version" + "google.golang.org/protobuf/types/known/wrapperspb" v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" @@ -771,3 +779,289 @@ func (s) TestAggregateCluster_BadEDS_BadDNS(t *testing.T) { t.Fatalf("EmptyCall() failed with error %v, want %v", err, dnsErr) } } + +// TestAggregateCluster_NoFallback_EDSNackedWithPreviousGoodUpdate tests the +// scenario where the top-level cluster is an aggregate cluster that resolves to +// an EDS and LOGICAL_DNS cluster. The management server first sends a good EDS +// response for the EDS cluster and the test verifies that RPCs get routed to +// the EDS cluster. The management server then sends a bad EDS response. The +// test verifies that the cluster_resolver LB policy continues to use the +// previously received good update and that RPCs still get routed to the EDS +// cluster. +func (s) TestAggregateCluster_NoFallback_EDSNackedWithPreviousGoodUpdate(t *testing.T) { + dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS() + defer cleanup1() + + // Start an xDS management server. + mgmtServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true}) + defer cleanup2() + + // Start two test backends and extract their host and port. The first + // backend is used for the EDS cluster and the second backend is used for + // the LOGICAL_DNS cluster. + servers, cleanup3 := startTestServiceBackends(t, 2) + defer cleanup3() + addrs, ports := backendAddressesAndPorts(t, servers) + + // Configure an aggregate cluster pointing to an EDS and DNS cluster. Also + // configure an endpoints resource for the EDS cluster. + const ( + edsClusterName = clusterName + "-eds" + dnsClusterName = clusterName + "-dns" + dnsHostName = "dns_host" + dnsPort = uint32(8080) + ) + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Clusters: []*v3clusterpb.Cluster{ + makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}), + e2e.DefaultCluster(edsClusterName, "", e2e.SecurityLevelNone), + makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort), + }, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsClusterName, "localhost", []uint32{uint32(ports[0])})}, + SkipValidation: true, + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Create xDS client, configure cds_experimental LB policy with a manual + // resolver, and dial the test backends. + cc, cleanup := setupAndDial(t, bootstrapContents) + defer cleanup() + + // Ensure that the DNS resolver is started for the expected target. + select { + case <-ctx.Done(): + t.Fatal("Timeout when waiting for DNS resolver to be started") + case target := <-dnsTargetCh: + got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort) + if got != want { + t.Fatalf("DNS resolution started for target %q, want %q", got, want) + } + } + + // Update DNS resolver with test backend addresses. + dnsR.UpdateState(resolver.State{Addresses: addrs[1:]}) + + // Make an RPC and ensure that it gets routed to the first backend since the + // EDS cluster is of higher priority than the LOGICAL_DNS cluster. + client := testgrpc.NewTestServiceClient(cc) + peer := &peer.Peer{} + if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil { + t.Fatalf("EmptyCall() failed: %v", err) + } + if peer.Addr.String() != addrs[0].Addr { + t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, addrs[0].Addr) + } + + // Push an EDS resource from the management server that is expected to be + // NACKed by the xDS client. Since the cluster_resolver LB policy has a + // previously received good EDS resource, it will continue to use that. + resources.Endpoints[0].Endpoints[0].LbEndpoints[0].LoadBalancingWeight = &wrapperspb.UInt32Value{Value: 0} + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Ensure that RPCs continue to get routed to the EDS cluster for the next + // second. + for end := time.Now().Add(time.Second); time.Now().Before(end); <-time.After(defaultTestShortTimeout) { + if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer)); err != nil { + t.Fatalf("EmptyCall() failed: %v", err) + } + if peer.Addr.String() != addrs[0].Addr { + t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, addrs[0].Addr) + } + } +} + +// TestAggregateCluster_Fallback_EDSNackedWithoutPreviousGoodUpdate tests the +// scenario where the top-level cluster is an aggregate cluster that resolves to +// an EDS and LOGICAL_DNS cluster. The management server sends a bad EDS +// response. The test verifies that the cluster_resolver LB policy falls back to +// the LOGICAL_DNS cluster, because it is supposed to treat the bad EDS response +// as though it received an update with no endpoints. +func (s) TestAggregateCluster_Fallback_EDSNackedWithoutPreviousGoodUpdate(t *testing.T) { + dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS() + defer cleanup1() + + // Start an xDS management server. + mgmtServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true}) + defer cleanup2() + + // Start two test backends and extract their host and port. The first + // backend is used for the EDS cluster and the second backend is used for + // the LOGICAL_DNS cluster. + servers, cleanup3 := startTestServiceBackends(t, 2) + defer cleanup3() + addrs, ports := backendAddressesAndPorts(t, servers) + + // Configure an aggregate cluster pointing to an EDS and DNS cluster. + const ( + edsClusterName = clusterName + "-eds" + dnsClusterName = clusterName + "-dns" + dnsHostName = "dns_host" + dnsPort = uint32(8080) + ) + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Clusters: []*v3clusterpb.Cluster{ + makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}), + e2e.DefaultCluster(edsClusterName, "", e2e.SecurityLevelNone), + makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort), + }, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsClusterName, "localhost", []uint32{uint32(ports[0])})}, + SkipValidation: true, + } + + // Set a load balancing weight of 0 for the backend in the EDS resource. + // This is expected to be NACKed by the xDS client. Since the + // cluster_resolver LB policy has no previously received good EDS resource, + // it will treat this as though it received an update with no endpoints. + resources.Endpoints[0].Endpoints[0].LbEndpoints[0].LoadBalancingWeight = &wrapperspb.UInt32Value{Value: 0} + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Create xDS client, configure cds_experimental LB policy with a manual + // resolver, and dial the test backends. + cc, cleanup := setupAndDial(t, bootstrapContents) + defer cleanup() + + // Ensure that the DNS resolver is started for the expected target. + select { + case <-ctx.Done(): + t.Fatal("Timeout when waiting for DNS resolver to be started") + case target := <-dnsTargetCh: + got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort) + if got != want { + t.Fatalf("DNS resolution started for target %q, want %q", got, want) + } + } + + // Update DNS resolver with test backend addresses. + dnsR.UpdateState(resolver.State{Addresses: addrs[1:]}) + + // Make an RPC and ensure that it gets routed to the LOGICAL_DNS cluster. + peer := &peer.Peer{} + client := testgrpc.NewTestServiceClient(cc) + if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil { + t.Fatalf("EmptyCall() failed: %v", err) + } + if peer.Addr.String() != addrs[1].Addr { + t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, addrs[1].Addr) + } +} + +// TestAggregateCluster_Fallback_EDS_ResourceNotFound tests the scenario where +// the top-level cluster is an aggregate cluster that resolves to an EDS and +// LOGICAL_DNS cluster. The management server does not respond with the EDS +// cluster. The test verifies that the cluster_resolver LB policy falls back to +// the LOGICAL_DNS cluster in this case. +func (s) TestAggregateCluster_Fallback_EDS_ResourceNotFound(t *testing.T) { + dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS() + defer cleanup1() + + // Start an xDS management server. + mgmtServer, nodeID, _, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true}) + defer cleanup2() + + // Start a test backend for the LOGICAL_DNS cluster. + server := stubserver.StartTestService(t, nil) + defer server.Stop() + + // Configure an aggregate cluster pointing to an EDS and DNS cluster. No + // endpoints are configured for the EDS cluster. + const ( + edsClusterName = clusterName + "-eds" + dnsClusterName = clusterName + "-dns" + dnsHostName = "dns_host" + dnsPort = uint32(8080) + ) + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Clusters: []*v3clusterpb.Cluster{ + makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}), + e2e.DefaultCluster(edsClusterName, "", e2e.SecurityLevelNone), + makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort), + }, + SkipValidation: true, + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Create an xDS client talking to the above management server, configured + // with a short watch expiry timeout. + xdsClient, close, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{ + XDSServer: xdstestutils.ServerConfigForAddress(t, mgmtServer.Address), + NodeProto: &v3corepb.Node{Id: nodeID}, + }, defaultTestWatchExpiryTimeout, time.Duration(0)) + if err != nil { + t.Fatalf("failed to create xds client: %v", err) + } + defer close() + + // Create a manual resolver and push a service config specifying the use of + // the cds LB policy as the top-level LB policy, and a corresponding config + // with a single cluster. + r := manual.NewBuilderWithScheme("whatever") + jsonSC := fmt.Sprintf(`{ + "loadBalancingConfig":[{ + "cds_experimental":{ + "cluster": "%s" + } + }] + }`, clusterName) + scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC) + r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, xdsClient)) + + // Create a ClientConn. + cc, err := grpc.Dial(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r)) + if err != nil { + t.Fatalf("failed to dial local test server: %v", err) + } + defer cc.Close() + + // Make an RPC with a short deadline. We expect this RPC to not succeed + // because the DNS resolver has not responded with endpoint addresses. + client := testgrpc.NewTestServiceClient(cc) + sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) + defer sCancel() + if _, err := client.EmptyCall(sCtx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded { + t.Fatalf("EmptyCall() code %s, want %s", status.Code(err), codes.DeadlineExceeded) + } + + // Ensure that the DNS resolver is started for the expected target. + select { + case <-ctx.Done(): + t.Fatal("Timeout when waiting for DNS resolver to be started") + case target := <-dnsTargetCh: + got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort) + if got != want { + t.Fatalf("DNS resolution started for target %q, want %q", got, want) + } + } + + // Update DNS resolver with test backend addresses. + dnsR.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: server.Address}}}) + + // Make an RPC and ensure that it gets routed to the LOGICAL_DNS cluster. + // Even though the EDS cluster is of higher priority, since the management + // server does not respond with an EDS resource, the cluster_resolver LB + // policy is expected to fallback to the LOGICAL_DNS cluster once the watch + // timeout expires. + peer := &peer.Peer{} + if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil { + t.Fatalf("EmptyCall() failed: %v", err) + } + if peer.Addr.String() != server.Address { + t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, server.Address) + } +} diff --git a/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go b/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go index f49528499356..f2089e9640a0 100644 --- a/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go +++ b/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go @@ -25,6 +25,7 @@ import ( "time" "github.com/google/go-cmp/cmp" + "github.com/google/uuid" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" @@ -41,7 +42,9 @@ import ( "google.golang.org/grpc/serviceconfig" "google.golang.org/grpc/status" "google.golang.org/grpc/xds/internal/balancer/priority" + xdstestutils "google.golang.org/grpc/xds/internal/testutils" "google.golang.org/grpc/xds/internal/xdsclient" + "google.golang.org/grpc/xds/internal/xdsclient/bootstrap" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version" "google.golang.org/protobuf/types/known/wrapperspb" @@ -62,8 +65,9 @@ const ( localityName2 = "my-locality-2" localityName3 = "my-locality-3" - defaultTestTimeout = 5 * time.Second - defaultTestShortTimeout = 10 * time.Millisecond + defaultTestTimeout = 5 * time.Second + defaultTestShortTimeout = 10 * time.Millisecond + defaultTestWatchExpiryTimeout = 500 * time.Millisecond ) type s struct { @@ -852,6 +856,225 @@ func (s) TestEDS_ClusterResourceUpdates(t *testing.T) { } } +// TestEDS_BadUpdateWithoutPreviousGoodUpdate tests the case where the +// management server sends a bad update (one that is NACKed by the xDS client). +// Since the cluster_resolver LB policy does not have a previously received good +// update, it is expected to treat this bad update as though it received an +// update with no endpoints. Hence RPCs are expected to fail with "all +// priorities removed" error. +func (s) TestEDS_BadUpdateWithoutPreviousGoodUpdate(t *testing.T) { + // Spin up a management server to receive xDS resources from. + mgmtServer, nodeID, bootstrapContents, _, cleanup1 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{}) + defer cleanup1() + + // Start a backend server that implements the TestService. + server := stubserver.StartTestService(t, nil) + defer server.Stop() + + // Create an EDS resource with a load balancing weight of 0. This will + // result in the resource being NACKed by the xDS client. Since the + // cluster_resolver LB policy does not have a previously received good EDS + // update, it should treat this update as an empty EDS update. + resources := clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{{ + Name: localityName1, + Weight: 1, + Backends: []e2e.BackendOptions{{Port: testutils.ParsePort(t, server.Address)}}, + }}) + resources.Endpoints[0].Endpoints[0].LbEndpoints[0].LoadBalancingWeight = &wrapperspb.UInt32Value{Value: 0} + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Create an xDS client for use by the cluster_resolver LB policy. + xdsClient, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + if err != nil { + t.Fatalf("Failed to create xDS client: %v", err) + } + defer close() + + // Create a manual resolver and push a service config specifying the use of + // the cluster_resolver LB policy with a single discovery mechanism. + r := manual.NewBuilderWithScheme("whatever") + jsonSC := fmt.Sprintf(`{ + "loadBalancingConfig":[{ + "cluster_resolver_experimental":{ + "discoveryMechanisms": [{ + "cluster": "%s", + "type": "EDS", + "edsServiceName": "%s", + "outlierDetection": {} + }], + "xdsLbPolicy":[{"round_robin":{}}] + } + }] + }`, clusterName, edsServiceName) + scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC) + r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, xdsClient)) + + // Create a ClientConn and verify that RPCs fail with "all priorities + // removed" error. + cc, err := grpc.Dial(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r)) + if err != nil { + t.Fatalf("failed to dial local test server: %v", err) + } + defer cc.Close() + client := testgrpc.NewTestServiceClient(cc) + if err := waitForAllPrioritiesRemovedError(ctx, t, client); err != nil { + t.Fatal(err) + } +} + +// TestEDS_BadUpdateWithPreviousGoodUpdate tests the case where the +// cluster_resolver LB policy receives a good EDS update from the management +// server and the test verifies that RPCs are successful. Then, a bad update is +// received from the management server (one that is NACKed by the xDS client). +// The test verifies that the previously received good update is still being +// used and that RPCs are still successful. +func (s) TestEDS_BadUpdateWithPreviousGoodUpdate(t *testing.T) { + // Spin up a management server to receive xDS resources from. + mgmtServer, nodeID, bootstrapContents, _, cleanup1 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{}) + defer cleanup1() + + // Start a backend server that implements the TestService. + server := stubserver.StartTestService(t, nil) + defer server.Stop() + + // Create an EDS resource for consumption by the test. + resources := clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{{ + Name: localityName1, + Weight: 1, + Backends: []e2e.BackendOptions{{Port: testutils.ParsePort(t, server.Address)}}, + }}) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Create an xDS client for use by the cluster_resolver LB policy. + xdsClient, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + if err != nil { + t.Fatalf("Failed to create xDS client: %v", err) + } + defer close() + + // Create a manual resolver and push a service config specifying the use of + // the cluster_resolver LB policy with a single discovery mechanism. + r := manual.NewBuilderWithScheme("whatever") + jsonSC := fmt.Sprintf(`{ + "loadBalancingConfig":[{ + "cluster_resolver_experimental":{ + "discoveryMechanisms": [{ + "cluster": "%s", + "type": "EDS", + "edsServiceName": "%s", + "outlierDetection": {} + }], + "xdsLbPolicy":[{"round_robin":{}}] + } + }] + }`, clusterName, edsServiceName) + scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC) + r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, xdsClient)) + + // Create a ClientConn and make a successful RPC. + cc, err := grpc.Dial(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r)) + if err != nil { + t.Fatalf("failed to dial local test server: %v", err) + } + defer cc.Close() + + // Ensure RPCs are being roundrobined across the single backend. + client := testgrpc.NewTestServiceClient(cc) + if err := rrutil.CheckRoundRobinRPCs(ctx, client, []resolver.Address{{Addr: server.Address}}); err != nil { + t.Fatal(err) + } + + // Update the endpoints resource in the management server with a load + // balancing weight of 0. This will result in the resource being NACKed by + // the xDS client. But since the cluster_resolver LB policy has a previously + // received good EDS update, it should continue using it. + resources.Endpoints[0].Endpoints[0].LbEndpoints[0].LoadBalancingWeight = &wrapperspb.UInt32Value{Value: 0} + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Ensure that RPCs continue to succeed for the next second. + for end := time.Now().Add(time.Second); time.Now().Before(end); <-time.After(defaultTestShortTimeout) { + if err := rrutil.CheckRoundRobinRPCs(ctx, client, []resolver.Address{{Addr: server.Address}}); err != nil { + t.Fatal(err) + } + } +} + +// TestEDS_ResourceNotFound tests the case where the requested EDS resource does +// not exist on the management server. Once the watch timer associated with the +// requested resource expires, the cluster_resolver LB policy receives a +// "resource-not-found" callback from the xDS client and is expected to treat it +// as though it received an update with no endpoints. Hence RPCs are expected to +// fail with "all priorities removed" error. +func (s) TestEDS_ResourceNotFound(t *testing.T) { + // Spin up a management server to receive xDS resources from. + mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{}) + if err != nil { + t.Fatalf("Failed to spin up the xDS management server: %v", err) + } + defer mgmtServer.Stop() + + // Create an xDS client talking to the above management server, configured + // with a short watch expiry timeout. + nodeID := uuid.New().String() + xdsClient, close, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{ + XDSServer: xdstestutils.ServerConfigForAddress(t, mgmtServer.Address), + NodeProto: &v3corepb.Node{Id: nodeID}, + }, defaultTestWatchExpiryTimeout, time.Duration(0)) + if err != nil { + t.Fatalf("failed to create xds client: %v", err) + } + defer close() + + // Configure no resources on the management server. + resources := e2e.UpdateOptions{NodeID: nodeID, SkipValidation: true} + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) + } + + // Create a manual resolver and push a service config specifying the use of + // the cluster_resolver LB policy with a single discovery mechanism. + r := manual.NewBuilderWithScheme("whatever") + jsonSC := fmt.Sprintf(`{ + "loadBalancingConfig":[{ + "cluster_resolver_experimental":{ + "discoveryMechanisms": [{ + "cluster": "%s", + "type": "EDS", + "edsServiceName": "%s", + "outlierDetection": {} + }], + "xdsLbPolicy":[{"round_robin":{}}] + } + }] + }`, clusterName, edsServiceName) + scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC) + r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, xdsClient)) + + // Create a ClientConn and verify that RPCs fail with "all priorities + // removed" error. + cc, err := grpc.Dial(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r)) + if err != nil { + t.Fatalf("failed to dial local test server: %v", err) + } + defer cc.Close() + client := testgrpc.NewTestServiceClient(cc) + if err := waitForAllPrioritiesRemovedError(ctx, t, client); err != nil { + t.Fatal(err) + } +} + // waitForAllPrioritiesRemovedError repeatedly makes RPCs using the // TestServiceClient until they fail with an error which indicates that all // priorities have been removed. A non-nil error is returned if the context diff --git a/xds/internal/balancer/clusterresolver/resource_resolver.go b/xds/internal/balancer/clusterresolver/resource_resolver.go index 580734a02154..142548ab3861 100644 --- a/xds/internal/balancer/clusterresolver/resource_resolver.go +++ b/xds/internal/balancer/clusterresolver/resource_resolver.go @@ -21,6 +21,7 @@ package clusterresolver import ( "sync" + "google.golang.org/grpc/internal/grpclog" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" ) @@ -84,6 +85,7 @@ type discoveryMechanismAndResolver struct { type resourceResolver struct { parent *clusterResolverBalancer + logger *grpclog.PrefixLogger updateChannel chan *resourceUpdate // mu protects the slice and map, and content of the resolvers in the slice. @@ -104,9 +106,10 @@ type resourceResolver struct { childNameGeneratorSeqID uint64 } -func newResourceResolver(parent *clusterResolverBalancer) *resourceResolver { +func newResourceResolver(parent *clusterResolverBalancer, logger *grpclog.PrefixLogger) *resourceResolver { return &resourceResolver{ parent: parent, + logger: logger, updateChannel: make(chan *resourceUpdate, 1), childrenMap: make(map[discoveryMechanismKey]discoveryMechanismAndResolver), } @@ -172,7 +175,7 @@ func (rr *resourceResolver) updateMechanisms(mechanisms []DiscoveryMechanism) { var resolver endpointsResolver switch dm.Type { case DiscoveryMechanismTypeEDS: - resolver = newEDSResolver(dmKey.name, rr.parent.xdsClient, rr) + resolver = newEDSResolver(dmKey.name, rr.parent.xdsClient, rr, rr.logger) case DiscoveryMechanismTypeLogicalDNS: resolver = newDNSResolver(dmKey.name, rr) } diff --git a/xds/internal/balancer/clusterresolver/resource_resolver_eds.go b/xds/internal/balancer/clusterresolver/resource_resolver_eds.go index 6fec20151f42..86af73cbae21 100644 --- a/xds/internal/balancer/clusterresolver/resource_resolver_eds.go +++ b/xds/internal/balancer/clusterresolver/resource_resolver_eds.go @@ -21,6 +21,7 @@ package clusterresolver import ( "sync" + "google.golang.org/grpc/internal/grpclog" "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" ) @@ -30,20 +31,20 @@ type edsDiscoveryMechanism struct { cancelWatch func() topLevelResolver topLevelResolver stopped *grpcsync.Event + logger *grpclog.PrefixLogger - mu sync.Mutex - update xdsresource.EndpointsUpdate - updateReceived bool + mu sync.Mutex + update *xdsresource.EndpointsUpdate // Nil indicates no update received so far. } func (er *edsDiscoveryMechanism) lastUpdate() (interface{}, bool) { er.mu.Lock() defer er.mu.Unlock() - if !er.updateReceived { + if er.update == nil { return nil, false } - return er.update, true + return *er.update, true } func (er *edsDiscoveryMechanism) resolveNow() { @@ -63,10 +64,11 @@ func (er *edsDiscoveryMechanism) stop() { // newEDSResolver returns an implementation of the endpointsResolver interface // that uses EDS to resolve the given name to endpoints. -func newEDSResolver(nameToWatch string, producer xdsresource.Producer, topLevelResolver topLevelResolver) *edsDiscoveryMechanism { +func newEDSResolver(nameToWatch string, producer xdsresource.Producer, topLevelResolver topLevelResolver, logger *grpclog.PrefixLogger) *edsDiscoveryMechanism { ret := &edsDiscoveryMechanism{ nameToWatch: nameToWatch, topLevelResolver: topLevelResolver, + logger: logger, stopped: grpcsync.NewEvent(), } ret.cancelWatch = xdsresource.WatchEndpoints(producer, nameToWatch, ret) @@ -80,8 +82,7 @@ func (er *edsDiscoveryMechanism) OnUpdate(update *xdsresource.EndpointsResourceD } er.mu.Lock() - er.update = update.Resource - er.updateReceived = true + er.update = &update.Resource er.mu.Unlock() er.topLevelResolver.onUpdate() @@ -92,7 +93,28 @@ func (er *edsDiscoveryMechanism) OnError(err error) { return } - er.topLevelResolver.onError(err) + if er.logger.V(2) { + er.logger.Infof("EDS discovery mechanism for resource %q reported error: %v", er.nameToWatch, err) + } + + er.mu.Lock() + if er.update != nil { + // Continue using a previously received good configuration if one + // exists. + er.mu.Unlock() + return + } + + // Else report an empty update that would result in no priority child being + // created for this discovery mechanism. This would result in the priority + // LB policy reporting TRANSIENT_FAILURE (as there would be no priorities or + // localities) if this was the only discovery mechanism, or would result in + // the priority LB policy using a lower priority discovery mechanism when + // that becomes available. + er.update = &xdsresource.EndpointsUpdate{} + er.mu.Unlock() + + er.topLevelResolver.onUpdate() } func (er *edsDiscoveryMechanism) OnResourceDoesNotExist() { @@ -100,5 +122,19 @@ func (er *edsDiscoveryMechanism) OnResourceDoesNotExist() { return } - er.topLevelResolver.onError(xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type Endpoints not found in received response", er.nameToWatch)) + if er.logger.V(2) { + er.logger.Infof("EDS discovery mechanism for resource %q reported resource-does-not-exist error", er.nameToWatch) + } + + // Report an empty update that would result in no priority child being + // created for this discovery mechanism. This would result in the priority + // LB policy reporting TRANSIENT_FAILURE (as there would be no priorities or + // localities) if this was the only discovery mechanism, or would result in + // the priority LB policy using a lower priority discovery mechanism when + // that becomes available. + er.mu.Lock() + er.update = &xdsresource.EndpointsUpdate{} + er.mu.Unlock() + + er.topLevelResolver.onUpdate() }