From 50ba981d69e10abfce705d2e4050d76f15c49f4b Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Wed, 20 Nov 2024 14:19:54 +0530 Subject: [PATCH 1/8] Add support for multiple addresses per endpoint --- .../weightedroundrobin/weightedroundrobin.go | 7 + internal/envconfig/xds.go | 4 + internal/testutils/xds/e2e/clientresources.go | 17 ++ stats/opentelemetry/e2e_test.go | 2 +- .../clusterresolver/clusterresolver.go | 30 ++- .../balancer/clusterresolver/configbuilder.go | 62 ++--- .../clusterresolver/configbuilder_test.go | 109 +++++---- .../clusterresolver/e2e_test/eds_impl_test.go | 170 ++++++++++++++ xds/internal/balancer/wrrlocality/balancer.go | 7 + xds/internal/internal.go | 6 + .../xdsclient/xdsresource/type_eds.go | 7 +- .../xdsclient/xdsresource/unmarshal_eds.go | 24 +- .../xdsresource/unmarshal_eds_test.go | 213 ++++++++++++++---- 13 files changed, 526 insertions(+), 132 deletions(-) diff --git a/balancer/weightedroundrobin/weightedroundrobin.go b/balancer/weightedroundrobin/weightedroundrobin.go index 8741fdad19dc..258cdd5db280 100644 --- a/balancer/weightedroundrobin/weightedroundrobin.go +++ b/balancer/weightedroundrobin/weightedroundrobin.go @@ -56,6 +56,13 @@ func SetAddrInfo(addr resolver.Address, addrInfo AddrInfo) resolver.Address { return addr } +// SetAddrInfoInEndpoint returns a copy of endpoint in which the Attributes +// field is updated with addrInfo. +func SetAddrInfoInEndpoint(endpoint resolver.Endpoint, addrInfo AddrInfo) resolver.Endpoint { + endpoint.Attributes = endpoint.Attributes.WithValue(attributeKey{}, addrInfo) + return endpoint +} + // GetAddrInfo returns the AddrInfo stored in the BalancerAttributes field of // addr. func GetAddrInfo(addr resolver.Address) AddrInfo { diff --git a/internal/envconfig/xds.go b/internal/envconfig/xds.go index 29f234acb1b9..5c0d9d26833b 100644 --- a/internal/envconfig/xds.go +++ b/internal/envconfig/xds.go @@ -53,4 +53,8 @@ var ( // C2PResolverTestOnlyTrafficDirectorURI is the TD URI for testing. 
C2PResolverTestOnlyTrafficDirectorURI = os.Getenv("GRPC_TEST_ONLY_GOOGLE_C2P_RESOLVER_TRAFFIC_DIRECTOR_URI") + + // XDSDualstackEndpointsEnabled is true if gRPC should read the + // "additional addresses" in the xDS endpoint resource. + XDSDualstackEndpointsEnabled = boolFromEnv("GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS", false) ) diff --git a/internal/testutils/xds/e2e/clientresources.go b/internal/testutils/xds/e2e/clientresources.go index 526f22aeab71..d369e8f19f81 100644 --- a/internal/testutils/xds/e2e/clientresources.go +++ b/internal/testutils/xds/e2e/clientresources.go @@ -700,6 +700,10 @@ type BackendOptions struct { HealthStatus v3corepb.HealthStatus // Weight sets the backend weight. Defaults to 1. Weight uint32 + // Additional port number on which the backend is accepting connections. + // All backends are expected to run on localhost, hence host name is not + // stored here. + AdditionalPorts []uint32 } // EndpointOptions contains options to configure an Endpoint (or @@ -747,6 +751,18 @@ func EndpointResourceWithOptions(opts EndpointOptions) *v3endpointpb.ClusterLoad if b.Weight == 0 { b.Weight = 1 } + additionalAddresses := make([]*v3endpointpb.Endpoint_AdditionalAddress, len(b.AdditionalPorts)) + for i, p := range b.AdditionalPorts { + additionalAddresses[i] = &v3endpointpb.Endpoint_AdditionalAddress{ + Address: &v3corepb.Address{Address: &v3corepb.Address_SocketAddress{ + SocketAddress: &v3corepb.SocketAddress{ + Protocol: v3corepb.SocketAddress_TCP, + Address: opts.Host, + PortSpecifier: &v3corepb.SocketAddress_PortValue{PortValue: p}, + }}, + }, + } + } lbEndpoints = append(lbEndpoints, &v3endpointpb.LbEndpoint{ HostIdentifier: &v3endpointpb.LbEndpoint_Endpoint{Endpoint: &v3endpointpb.Endpoint{ Address: &v3corepb.Address{Address: &v3corepb.Address_SocketAddress{ @@ -756,6 +772,7 @@ func EndpointResourceWithOptions(opts EndpointOptions) *v3endpointpb.ClusterLoad PortSpecifier: &v3corepb.SocketAddress_PortValue{PortValue: b.Port}, }, }}, + 
AdditionalAddresses: additionalAddresses, }}, HealthStatus: b.HealthStatus, LoadBalancingWeight: &wrapperspb.UInt32Value{Value: b.Weight}, diff --git a/stats/opentelemetry/e2e_test.go b/stats/opentelemetry/e2e_test.go index e56c0fe94805..5a5ecbecd47c 100644 --- a/stats/opentelemetry/e2e_test.go +++ b/stats/opentelemetry/e2e_test.go @@ -473,7 +473,7 @@ func (s) TestWRRMetrics(t *testing.T) { // scheduler. receivedExpectedMetrics := grpcsync.NewEvent() go func() { - for !receivedExpectedMetrics.HasFired() { + for !receivedExpectedMetrics.HasFired() && ctx.Err() == nil { client.EmptyCall(ctx, &testpb.Empty{}) time.Sleep(2 * time.Millisecond) } diff --git a/xds/internal/balancer/clusterresolver/clusterresolver.go b/xds/internal/balancer/clusterresolver/clusterresolver.go index ae2c5fe957a2..b052157d69c4 100644 --- a/xds/internal/balancer/clusterresolver/clusterresolver.go +++ b/xds/internal/balancer/clusterresolver/clusterresolver.go @@ -234,7 +234,7 @@ func (b *clusterResolverBalancer) updateChildConfig() { b.child = newChildBalancer(b.priorityBuilder, b.cc, b.bOpts) } - childCfgBytes, addrs, err := buildPriorityConfigJSON(b.priorities, &b.config.xdsLBPolicy) + childCfgBytes, endpoints, err := buildPriorityConfigJSON(b.priorities, &b.config.xdsLBPolicy) if err != nil { b.logger.Warningf("Failed to build child policy config: %v", err) return @@ -248,15 +248,33 @@ func (b *clusterResolverBalancer) updateChildConfig() { b.logger.Infof("Built child policy config: %s", pretty.ToJSON(childCfg)) } - endpoints := make([]resolver.Endpoint, len(addrs)) - for i, a := range addrs { - endpoints[i].Attributes = a.BalancerAttributes - endpoints[i].Addresses = []resolver.Address{a} + flattenedAddrs := make([]resolver.Address, len(endpoints)) + for i := range endpoints { + for j := range endpoints[i].Addresses { + addr := endpoints[i].Addresses[j] + addr.BalancerAttributes = endpoints[i].Attributes + // If the endpoint has multiple addresses, only the first is added + // to the 
flattened address list. This ensures that LB policies + // that doesn't support endpoints, create only one subchannel to a + // backend. + if j == 0 { + flattenedAddrs[i] = addr + } + // BalancerAttributes need to be present in endpoint addresses. This + // temporary workaround is required to make load reporting work + // with the old pickfirst policy which creates SubConns with multiple + // addresses. Since the addresses can be from different localities, + // an Address.BalancerAttribute is used to identify the locality of the + // address used by the transport. This workaround can be removed once + // the old pickfirst is removed. + // See https://github.com/grpc/grpc-go/issues/7339 + endpoints[i].Addresses[j] = addr + } } if err := b.child.UpdateClientConnState(balancer.ClientConnState{ ResolverState: resolver.State{ Endpoints: endpoints, - Addresses: addrs, + Addresses: flattenedAddrs, ServiceConfig: b.configRaw, Attributes: b.attrsWithClient, }, diff --git a/xds/internal/balancer/clusterresolver/configbuilder.go b/xds/internal/balancer/clusterresolver/configbuilder.go index f62b8e6c8eb5..f2c0089b161e 100644 --- a/xds/internal/balancer/clusterresolver/configbuilder.go +++ b/xds/internal/balancer/clusterresolver/configbuilder.go @@ -71,8 +71,8 @@ type priorityConfig struct { // ┌──────▼─────┐ ┌─────▼──────┐ // │xDSLBPolicy │ │xDSLBPolicy │ (Locality and Endpoint picking layer) // └────────────┘ └────────────┘ -func buildPriorityConfigJSON(priorities []priorityConfig, xdsLBPolicy *internalserviceconfig.BalancerConfig) ([]byte, []resolver.Address, error) { - pc, addrs, err := buildPriorityConfig(priorities, xdsLBPolicy) +func buildPriorityConfigJSON(priorities []priorityConfig, xdsLBPolicy *internalserviceconfig.BalancerConfig) ([]byte, []resolver.Endpoint, error) { + pc, endpoints, err := buildPriorityConfig(priorities, xdsLBPolicy) if err != nil { return nil, nil, fmt.Errorf("failed to build priority config: %v", err) } @@ -80,23 +80,23 @@ func 
buildPriorityConfigJSON(priorities []priorityConfig, xdsLBPolicy *internals if err != nil { return nil, nil, fmt.Errorf("failed to marshal built priority config struct into json: %v", err) } - return ret, addrs, nil + return ret, endpoints, nil } -func buildPriorityConfig(priorities []priorityConfig, xdsLBPolicy *internalserviceconfig.BalancerConfig) (*priority.LBConfig, []resolver.Address, error) { +func buildPriorityConfig(priorities []priorityConfig, xdsLBPolicy *internalserviceconfig.BalancerConfig) (*priority.LBConfig, []resolver.Endpoint, error) { var ( - retConfig = &priority.LBConfig{Children: make(map[string]*priority.Child)} - retAddrs []resolver.Address + retConfig = &priority.LBConfig{Children: make(map[string]*priority.Child)} + retEndpoints []resolver.Endpoint ) for _, p := range priorities { switch p.mechanism.Type { case DiscoveryMechanismTypeEDS: - names, configs, addrs, err := buildClusterImplConfigForEDS(p.childNameGen, p.edsResp, p.mechanism, xdsLBPolicy) + names, configs, endpoints, err := buildClusterImplConfigForEDS(p.childNameGen, p.edsResp, p.mechanism, xdsLBPolicy) if err != nil { return nil, nil, err } retConfig.Priorities = append(retConfig.Priorities, names...) - retAddrs = append(retAddrs, addrs...) + retEndpoints = append(retEndpoints, endpoints...) odCfgs := convertClusterImplMapToOutlierDetection(configs, p.mechanism.outlierDetection) for n, c := range odCfgs { retConfig.Children[n] = &priority.Child{ @@ -107,9 +107,9 @@ func buildPriorityConfig(priorities []priorityConfig, xdsLBPolicy *internalservi } continue case DiscoveryMechanismTypeLogicalDNS: - name, config, addrs := buildClusterImplConfigForDNS(p.childNameGen, p.addresses, p.mechanism) + name, config, endpoints := buildClusterImplConfigForDNS(p.childNameGen, p.addresses, p.mechanism) retConfig.Priorities = append(retConfig.Priorities, name) - retAddrs = append(retAddrs, addrs...) + retEndpoints = append(retEndpoints, endpoints...) 
odCfg := makeClusterImplOutlierDetectionChild(config, p.mechanism.outlierDetection) retConfig.Children[name] = &priority.Child{ Config: &internalserviceconfig.BalancerConfig{Name: outlierdetection.Name, Config: odCfg}, @@ -120,7 +120,7 @@ func buildPriorityConfig(priorities []priorityConfig, xdsLBPolicy *internalservi continue } } - return retConfig, retAddrs, nil + return retConfig, retEndpoints, nil } func convertClusterImplMapToOutlierDetection(ciCfgs map[string]*clusterimpl.LBConfig, odCfg outlierdetection.LBConfig) map[string]*outlierdetection.LBConfig { @@ -137,19 +137,20 @@ func makeClusterImplOutlierDetectionChild(ciCfg *clusterimpl.LBConfig, odCfg out return &odCfgRet } -func buildClusterImplConfigForDNS(g *nameGenerator, addrStrs []string, mechanism DiscoveryMechanism) (string, *clusterimpl.LBConfig, []resolver.Address) { +func buildClusterImplConfigForDNS(g *nameGenerator, addrStrs []string, mechanism DiscoveryMechanism) (string, *clusterimpl.LBConfig, []resolver.Endpoint) { // Endpoint picking policy for DNS is hardcoded to pick_first. 
const childPolicy = "pick_first" - retAddrs := make([]resolver.Address, 0, len(addrStrs)) + retEndpoints := make([]resolver.Endpoint, 0, len(addrStrs)) pName := fmt.Sprintf("priority-%v", g.prefix) for _, addrStr := range addrStrs { - retAddrs = append(retAddrs, hierarchy.Set(resolver.Address{Addr: addrStr}, []string{pName})) + endpoint := resolver.Endpoint{Addresses: []resolver.Address{{Addr: addrStr}}} + retEndpoints = append(retEndpoints, hierarchy.SetInEndpoint(endpoint, []string{pName})) } return pName, &clusterimpl.LBConfig{ Cluster: mechanism.Cluster, TelemetryLabels: mechanism.TelemetryLabels, ChildPolicy: &internalserviceconfig.BalancerConfig{Name: childPolicy}, - }, retAddrs + }, retEndpoints } // buildClusterImplConfigForEDS returns a list of cluster_impl configs, one for @@ -161,7 +162,7 @@ func buildClusterImplConfigForDNS(g *nameGenerator, addrStrs []string, mechanism // - map{"p0":p0_config, "p1":p1_config} // - [p0_address_0, p0_address_1, p1_address_0, p1_address_1] // - p0 addresses' hierarchy attributes are set to p0 -func buildClusterImplConfigForEDS(g *nameGenerator, edsResp xdsresource.EndpointsUpdate, mechanism DiscoveryMechanism, xdsLBPolicy *internalserviceconfig.BalancerConfig) ([]string, map[string]*clusterimpl.LBConfig, []resolver.Address, error) { +func buildClusterImplConfigForEDS(g *nameGenerator, edsResp xdsresource.EndpointsUpdate, mechanism DiscoveryMechanism, xdsLBPolicy *internalserviceconfig.BalancerConfig) ([]string, map[string]*clusterimpl.LBConfig, []resolver.Endpoint, error) { drops := make([]clusterimpl.DropConfig, 0, len(edsResp.Drops)) for _, d := range edsResp.Drops { drops = append(drops, clusterimpl.DropConfig{ @@ -183,17 +184,17 @@ func buildClusterImplConfigForEDS(g *nameGenerator, edsResp xdsresource.Endpoint } retNames := g.generate(priorities) retConfigs := make(map[string]*clusterimpl.LBConfig, len(retNames)) - var retAddrs []resolver.Address + var retEndpoints []resolver.Endpoint for i, pName := range retNames { 
priorityLocalities := priorities[i] - cfg, addrs, err := priorityLocalitiesToClusterImpl(priorityLocalities, pName, mechanism, drops, xdsLBPolicy) + cfg, endpoints, err := priorityLocalitiesToClusterImpl(priorityLocalities, pName, mechanism, drops, xdsLBPolicy) if err != nil { return nil, nil, nil, err } retConfigs[pName] = cfg - retAddrs = append(retAddrs, addrs...) + retEndpoints = append(retEndpoints, endpoints...) } - return retNames, retConfigs, retAddrs, nil + return retNames, retConfigs, retEndpoints, nil } // groupLocalitiesByPriority returns the localities grouped by priority. @@ -244,8 +245,8 @@ func dedupSortedIntSlice(a []int) []int { // priority), and generates a cluster impl policy config, and a list of // addresses with their path hierarchy set to [priority-name, locality-name], so // priority and the xDS LB Policy know which child policy each address is for. -func priorityLocalitiesToClusterImpl(localities []xdsresource.Locality, priorityName string, mechanism DiscoveryMechanism, drops []clusterimpl.DropConfig, xdsLBPolicy *internalserviceconfig.BalancerConfig) (*clusterimpl.LBConfig, []resolver.Address, error) { - var addrs []resolver.Address +func priorityLocalitiesToClusterImpl(localities []xdsresource.Locality, priorityName string, mechanism DiscoveryMechanism, drops []clusterimpl.DropConfig, xdsLBPolicy *internalserviceconfig.BalancerConfig) (*clusterimpl.LBConfig, []resolver.Endpoint, error) { + var retEndpoints []resolver.Endpoint for _, locality := range localities { var lw uint32 = 1 if locality.Weight != 0 { @@ -262,21 +263,24 @@ func priorityLocalitiesToClusterImpl(localities []xdsresource.Locality, priority if endpoint.HealthStatus != xdsresource.EndpointHealthStatusHealthy && endpoint.HealthStatus != xdsresource.EndpointHealthStatusUnknown { continue } - addr := resolver.Address{Addr: endpoint.Address} - addr = hierarchy.Set(addr, []string{priorityName, localityStr}) - addr = internal.SetLocalityID(addr, locality.ID) + resolverEndpoint 
:= resolver.Endpoint{} + for _, as := range append([]string{endpoint.Address}, endpoint.AdditionalAddress...) { + resolverEndpoint.Addresses = append(resolverEndpoint.Addresses, resolver.Address{Addr: as}) + } + resolverEndpoint = hierarchy.SetInEndpoint(resolverEndpoint, []string{priorityName, localityStr}) + resolverEndpoint = internal.SetLocalityIDInEndpoint(resolverEndpoint, locality.ID) // "To provide the xds_wrr_locality load balancer information about // locality weights received from EDS, the cluster resolver will // populate a new locality weight attribute for each address The // attribute will have the weight (as an integer) of the locality // the address is part of." - A52 - addr = wrrlocality.SetAddrInfo(addr, wrrlocality.AddrInfo{LocalityWeight: lw}) + resolverEndpoint = wrrlocality.SetAddrInfoInEndpoint(resolverEndpoint, wrrlocality.AddrInfo{LocalityWeight: lw}) var ew uint32 = 1 if endpoint.Weight != 0 { ew = endpoint.Weight } - addr = weightedroundrobin.SetAddrInfo(addr, weightedroundrobin.AddrInfo{Weight: lw * ew}) - addrs = append(addrs, addr) + resolverEndpoint = weightedroundrobin.SetAddrInfoInEndpoint(resolverEndpoint, weightedroundrobin.AddrInfo{Weight: lw * ew}) + retEndpoints = append(retEndpoints, resolverEndpoint) } } return &clusterimpl.LBConfig{ @@ -287,5 +291,5 @@ func priorityLocalitiesToClusterImpl(localities []xdsresource.Locality, priority TelemetryLabels: mechanism.TelemetryLabels, DropCategories: drops, ChildPolicy: xdsLBPolicy, - }, addrs, nil + }, retEndpoints, nil } diff --git a/xds/internal/balancer/clusterresolver/configbuilder_test.go b/xds/internal/balancer/clusterresolver/configbuilder_test.go index 01055886865c..a9a6a802aa84 100644 --- a/xds/internal/balancer/clusterresolver/configbuilder_test.go +++ b/xds/internal/balancer/clusterresolver/configbuilder_test.go @@ -51,8 +51,8 @@ const ( testDropCategory = "test-drops" testDropOverMillion = 1 - localityCount = 5 - addressPerLocality = 2 + localityCount = 5 + 
endpointPerLocality = 2 ) var ( @@ -62,12 +62,12 @@ var ( testLocalitiesP0, testLocalitiesP1 []xdsresource.Locality - addrCmpOpts = cmp.Options{ + endpointCmpOpts = cmp.Options{ cmp.AllowUnexported(attributes.Attributes{}), - cmp.Transformer("SortAddrs", func(in []resolver.Address) []resolver.Address { - out := append([]resolver.Address(nil), in...) // Copy input to avoid mutating it + cmp.Transformer("SortEndpoints", func(in []resolver.Endpoint) []resolver.Endpoint { + out := append([]resolver.Endpoint(nil), in...) // Copy input to avoid mutating it sort.Slice(out, func(i, j int) bool { - return out[i].Addr < out[j].Addr + return out[i].Addresses[0].Addr < out[j].Addresses[0].Addr }) return out }), @@ -88,12 +88,16 @@ func init() { addrs []string ends []xdsresource.Endpoint ) - for j := 0; j < addressPerLocality; j++ { + for j := 0; j < endpointPerLocality; j++ { addr := fmt.Sprintf("addr-%d-%d", i, j) addrs = append(addrs, addr) ends = append(ends, xdsresource.Endpoint{ Address: addr, HealthStatus: xdsresource.EndpointHealthStatusHealthy, + AdditionalAddress: []string{ + fmt.Sprintf("addr-%d-%d-additional-1", i, j), + fmt.Sprintf("addr-%d-%d-additional-2", i, j), + }, }) } testAddressStrs = append(testAddressStrs, addrs) @@ -309,9 +313,11 @@ func TestBuildClusterImplConfigForDNS(t *testing.T) { Name: "pick_first", }, } - wantAddrs := []resolver.Address{ - hierarchy.Set(resolver.Address{Addr: testAddressStrs[0][0]}, []string{"priority-3"}), - hierarchy.Set(resolver.Address{Addr: testAddressStrs[0][1]}, []string{"priority-3"}), + e1 := resolver.Endpoint{Addresses: []resolver.Address{{Addr: testEndpoints[0][0].Address}}} + e2 := resolver.Endpoint{Addresses: []resolver.Address{{Addr: testEndpoints[0][1].Address}}} + wantEndpoints := []resolver.Endpoint{ + hierarchy.SetInEndpoint(e1, []string{"priority-3"}), + hierarchy.SetInEndpoint(e2, []string{"priority-3"}), } if diff := cmp.Diff(gotName, wantName); diff != "" { @@ -320,7 +326,7 @@ func 
TestBuildClusterImplConfigForDNS(t *testing.T) { if diff := cmp.Diff(gotConfig, wantConfig); diff != "" { t.Errorf("buildClusterImplConfigForDNS() diff (-got +want) %v", diff) } - if diff := cmp.Diff(gotAddrs, wantAddrs, addrCmpOpts); diff != "" { + if diff := cmp.Diff(gotAddrs, wantEndpoints, endpointCmpOpts); diff != "" { t.Errorf("buildClusterImplConfigForDNS() diff (-got +want) %v", diff) } } @@ -334,7 +340,7 @@ func TestBuildClusterImplConfigForEDS(t *testing.T) { t.Fatalf("Failed to create LRS server config for testing: %v", err) } - gotNames, gotConfigs, gotAddrs, _ := buildClusterImplConfigForEDS( + gotNames, gotConfigs, gotEndpoints, _ := buildClusterImplConfigForEDS( newNameGenerator(2), xdsresource.EndpointsUpdate{ Drops: []xdsresource.OverloadDropConfig{ @@ -408,15 +414,15 @@ func TestBuildClusterImplConfigForEDS(t *testing.T) { }, }, } - wantAddrs := []resolver.Address{ - testAddrWithAttrs(testAddressStrs[0][0], 20, 1, "priority-2-0", &testLocalityIDs[0]), - testAddrWithAttrs(testAddressStrs[0][1], 20, 1, "priority-2-0", &testLocalityIDs[0]), - testAddrWithAttrs(testAddressStrs[1][0], 80, 1, "priority-2-0", &testLocalityIDs[1]), - testAddrWithAttrs(testAddressStrs[1][1], 80, 1, "priority-2-0", &testLocalityIDs[1]), - testAddrWithAttrs(testAddressStrs[2][0], 20, 1, "priority-2-1", &testLocalityIDs[2]), - testAddrWithAttrs(testAddressStrs[2][1], 20, 1, "priority-2-1", &testLocalityIDs[2]), - testAddrWithAttrs(testAddressStrs[3][0], 80, 1, "priority-2-1", &testLocalityIDs[3]), - testAddrWithAttrs(testAddressStrs[3][1], 80, 1, "priority-2-1", &testLocalityIDs[3]), + wantEndpoints := []resolver.Endpoint{ + testEndpointWithAttrs(allAddrs(testEndpoints[0][0]), 20, 1, "priority-2-0", &testLocalityIDs[0]), + testEndpointWithAttrs(allAddrs(testEndpoints[0][1]), 20, 1, "priority-2-0", &testLocalityIDs[0]), + testEndpointWithAttrs(allAddrs(testEndpoints[1][0]), 80, 1, "priority-2-0", &testLocalityIDs[1]), + testEndpointWithAttrs(allAddrs(testEndpoints[1][1]), 80, 
1, "priority-2-0", &testLocalityIDs[1]), + testEndpointWithAttrs(allAddrs(testEndpoints[2][0]), 20, 1, "priority-2-1", &testLocalityIDs[2]), + testEndpointWithAttrs(allAddrs(testEndpoints[2][1]), 20, 1, "priority-2-1", &testLocalityIDs[2]), + testEndpointWithAttrs(allAddrs(testEndpoints[3][0]), 80, 1, "priority-2-1", &testLocalityIDs[3]), + testEndpointWithAttrs(allAddrs(testEndpoints[3][1]), 80, 1, "priority-2-1", &testLocalityIDs[3]), } if diff := cmp.Diff(gotNames, wantNames); diff != "" { @@ -425,12 +431,16 @@ func TestBuildClusterImplConfigForEDS(t *testing.T) { if diff := cmp.Diff(gotConfigs, wantConfigs); diff != "" { t.Errorf("buildClusterImplConfigForEDS() diff (-got +want) %v", diff) } - if diff := cmp.Diff(gotAddrs, wantAddrs, addrCmpOpts); diff != "" { + if diff := cmp.Diff(gotEndpoints, wantEndpoints, endpointCmpOpts); diff != "" { t.Errorf("buildClusterImplConfigForEDS() diff (-got +want) %v", diff) } } +func allAddrs(endpoint xdsresource.Endpoint) []string { + return append([]string{endpoint.Address}, endpoint.AdditionalAddress...) 
+} + func TestGroupLocalitiesByPriority(t *testing.T) { tests := []struct { name string @@ -526,14 +536,14 @@ func TestDedupSortedIntSlice(t *testing.T) { func TestPriorityLocalitiesToClusterImpl(t *testing.T) { tests := []struct { - name string - localities []xdsresource.Locality - priorityName string - mechanism DiscoveryMechanism - childPolicy *iserviceconfig.BalancerConfig - wantConfig *clusterimpl.LBConfig - wantAddrs []resolver.Address - wantErr bool + name string + localities []xdsresource.Locality + priorityName string + mechanism DiscoveryMechanism + childPolicy *iserviceconfig.BalancerConfig + wantConfig *clusterimpl.LBConfig + wantEndpoints []resolver.Endpoint + wantErr bool }{{ name: "round robin as child, no LRS", localities: []xdsresource.Locality{ @@ -567,11 +577,11 @@ func TestPriorityLocalitiesToClusterImpl(t *testing.T) { EDSServiceName: testEDSService, ChildPolicy: &iserviceconfig.BalancerConfig{Name: roundrobin.Name}, }, - wantAddrs: []resolver.Address{ - testAddrWithAttrs("addr-1-1", 20, 90, "test-priority", &internal.LocalityID{Zone: "test-zone-1"}), - testAddrWithAttrs("addr-1-2", 20, 10, "test-priority", &internal.LocalityID{Zone: "test-zone-1"}), - testAddrWithAttrs("addr-2-1", 80, 90, "test-priority", &internal.LocalityID{Zone: "test-zone-2"}), - testAddrWithAttrs("addr-2-2", 80, 10, "test-priority", &internal.LocalityID{Zone: "test-zone-2"}), + wantEndpoints: []resolver.Endpoint{ + testEndpointWithAttrs([]string{"addr-1-1"}, 20, 90, "test-priority", &internal.LocalityID{Zone: "test-zone-1"}), + testEndpointWithAttrs([]string{"addr-1-2"}, 20, 10, "test-priority", &internal.LocalityID{Zone: "test-zone-1"}), + testEndpointWithAttrs([]string{"addr-2-1"}, 80, 90, "test-priority", &internal.LocalityID{Zone: "test-zone-2"}), + testEndpointWithAttrs([]string{"addr-2-2"}, 80, 10, "test-priority", &internal.LocalityID{Zone: "test-zone-2"}), }, }, { @@ -603,11 +613,11 @@ func TestPriorityLocalitiesToClusterImpl(t *testing.T) { Config: 
&ringhash.LBConfig{MinRingSize: 1, MaxRingSize: 2}, }, }, - wantAddrs: []resolver.Address{ - testAddrWithAttrs("addr-1-1", 20, 90, "test-priority", &internal.LocalityID{Zone: "test-zone-1"}), - testAddrWithAttrs("addr-1-2", 20, 10, "test-priority", &internal.LocalityID{Zone: "test-zone-1"}), - testAddrWithAttrs("addr-2-1", 80, 90, "test-priority", &internal.LocalityID{Zone: "test-zone-2"}), - testAddrWithAttrs("addr-2-2", 80, 10, "test-priority", &internal.LocalityID{Zone: "test-zone-2"}), + wantEndpoints: []resolver.Endpoint{ + testEndpointWithAttrs([]string{"addr-1-1"}, 20, 90, "test-priority", &internal.LocalityID{Zone: "test-zone-1"}), + testEndpointWithAttrs([]string{"addr-1-2"}, 20, 10, "test-priority", &internal.LocalityID{Zone: "test-zone-1"}), + testEndpointWithAttrs([]string{"addr-2-1"}, 80, 90, "test-priority", &internal.LocalityID{Zone: "test-zone-2"}), + testEndpointWithAttrs([]string{"addr-2-2"}, 80, 10, "test-priority", &internal.LocalityID{Zone: "test-zone-2"}), }, }, } @@ -620,7 +630,7 @@ func TestPriorityLocalitiesToClusterImpl(t *testing.T) { if diff := cmp.Diff(got, tt.wantConfig); diff != "" { t.Errorf("localitiesToWeightedTarget() diff (-got +want) %v", diff) } - if diff := cmp.Diff(got1, tt.wantAddrs, cmp.AllowUnexported(attributes.Attributes{})); diff != "" { + if diff := cmp.Diff(got1, tt.wantEndpoints, cmp.AllowUnexported(attributes.Attributes{})); diff != "" { t.Errorf("localitiesToWeightedTarget() diff (-got +want) %v", diff) } }) @@ -635,17 +645,20 @@ func assertString(f func() (string, error)) string { return s } -func testAddrWithAttrs(addrStr string, localityWeight, endpointWeight uint32, priority string, lID *internal.LocalityID) resolver.Address { - addr := resolver.Address{Addr: addrStr} +func testEndpointWithAttrs(addrStrs []string, localityWeight, endpointWeight uint32, priority string, lID *internal.LocalityID) resolver.Endpoint { + endpoint := resolver.Endpoint{} + for _, a := range addrStrs { + endpoint.Addresses = 
append(endpoint.Addresses, resolver.Address{Addr: a}) + } path := []string{priority} if lID != nil { path = append(path, assertString(lID.ToString)) - addr = internal.SetLocalityID(addr, *lID) + endpoint = internal.SetLocalityIDInEndpoint(endpoint, *lID) } - addr = hierarchy.Set(addr, path) - addr = wrrlocality.SetAddrInfo(addr, wrrlocality.AddrInfo{LocalityWeight: localityWeight}) - addr = weightedroundrobin.SetAddrInfo(addr, weightedroundrobin.AddrInfo{Weight: localityWeight * endpointWeight}) - return addr + endpoint = hierarchy.SetInEndpoint(endpoint, path) + endpoint = wrrlocality.SetAddrInfoInEndpoint(endpoint, wrrlocality.AddrInfo{LocalityWeight: localityWeight}) + endpoint = weightedroundrobin.SetAddrInfoInEndpoint(endpoint, weightedroundrobin.AddrInfo{Weight: localityWeight * endpointWeight}) + return endpoint } func TestConvertClusterImplMapToOutlierDetection(t *testing.T) { diff --git a/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go b/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go index af6224585aca..828a1a516369 100644 --- a/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go +++ b/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "net" "strings" "testing" "time" @@ -27,9 +28,13 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/uuid" "google.golang.org/grpc" + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/roundrobin" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/internal" + "google.golang.org/grpc/internal/balancer/stub" + "google.golang.org/grpc/internal/envconfig" "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils" @@ -1138,3 +1143,168 @@ func waitForProducedZeroAddressesError(ctx context.Context, t *testing.T, client } return errors.New("timeout when 
waiting for RPCs to fail with UNAVAILABLE status and produced zero addresses") } + +// Test runs a server which listens on multiple ports. The test updates xds resource +// cache to contain a single endpoint with multiple addresses. The test intercepts +// the resolver updates sent to the petiole policy and verifies that the +// additional endpoint addresses are correctly propagated. +func (s) TestEDS_EndpointWithMultipleAddresses(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + // Start a backend server that listens on multiple ports to simulate + // a backend with multiple addresses. + servers, cleanup2 := startTestServiceBackends(t, 1) + defer cleanup2() + lis1, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("Failed to create listener: %v", err) + } + defer lis1.Close() + lis2, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("Failed to create listener: %v", err) + } + defer lis2.Close() + + go servers[0].S.Serve(lis1) + go servers[0].S.Serve(lis2) + + addrs, ports := backendAddressesAndPorts(t, servers) + additionalPorts := []uint32{ + testutils.ParsePort(t, lis1.Addr().String()), + testutils.ParsePort(t, lis2.Addr().String()), + } + + testCases := []struct { + name string + dualstackEndpointsEnabled bool + wantEndpointPorts []uint32 + wantAddrPorts []uint32 + }{ + { + name: "flag_enabled", + dualstackEndpointsEnabled: true, + wantEndpointPorts: append([]uint32{ports[0]}, additionalPorts...), + wantAddrPorts: ports, + }, + { + name: "flag_disabled", + wantEndpointPorts: ports, + wantAddrPorts: ports, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + origDualstackEndpointsEnabled := envconfig.XDSDualstackEndpointsEnabled + defer func() { + envconfig.XDSDualstackEndpointsEnabled = origDualstackEndpointsEnabled + }() + envconfig.XDSDualstackEndpointsEnabled = tc.dualstackEndpointsEnabled + + // Wrap the round 
robin balancer to intercept resolver updates. + originalRRBuilder := balancer.Get(roundrobin.Name) + defer func() { + balancer.Register(originalRRBuilder) + }() + resolverUpdateCh := make(chan resolver.State, 1) + stub.Register(roundrobin.Name, stub.BalancerFuncs{ + Init: func(bd *stub.BalancerData) { + bd.Data = originalRRBuilder.Build(bd.ClientConn, bd.BuildOptions) + }, + Close: func(bd *stub.BalancerData) { + bd.Data.(balancer.Balancer).Close() + }, + UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { + resolverUpdateCh <- ccs.ResolverState + return bd.Data.(balancer.Balancer).UpdateClientConnState(ccs) + }, + }) + + // Spin up a management server to receive xDS resources from. + mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{}) + + // Create bootstrap configuration pointing to the above management server. + nodeID := uuid.New().String() + bootstrapContents := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) + + // Create xDS resources for consumption by the test. We start off with a + // single backend in a single EDS locality. + resources := clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{{ + Name: localityName1, + Weight: 1, + Backends: []e2e.BackendOptions{{ + Port: ports[0], + AdditionalPorts: additionalPorts}}, + }}) + + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Create an xDS client talking to the above management server, configured + // with a short watch expiry timeout. + xdsClient, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bootstrapContents, + WatchExpiryTimeout: defaultTestWatchExpiryTimeout, + }) + if err != nil { + t.Fatalf("Failed to create an xDS client: %v", err) + } + defer close() + + // Create a manual resolver and push a service config specifying the use of + // the cluster_resolver LB policy with a single discovery mechanism. 
+ r := manual.NewBuilderWithScheme("whatever") + jsonSC := fmt.Sprintf(`{ + "loadBalancingConfig":[{ + "cluster_resolver_experimental":{ + "discoveryMechanisms": [{ + "cluster": "%s", + "type": "EDS", + "edsServiceName": "%s", + "outlierDetection": {} + }], + "xdsLbPolicy":[{"round_robin":{}}] + } + }] + }`, clusterName, edsServiceName) + scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC) + r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, xdsClient)) + + cc, err := grpc.NewClient(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r)) + if err != nil { + t.Fatalf("failed to create new client for local test server: %v", err) + } + defer cc.Close() + client := testgrpc.NewTestServiceClient(cc) + if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs); err != nil { + t.Fatal(err) + } + + var rs resolver.State + select { + case rs = <-resolverUpdateCh: + case <-ctx.Done(): + t.Fatalf("Context timed out waiting for resolver update.") + } + + gotEndpointPorts := []uint32{} + for _, a := range rs.Endpoints[0].Addresses { + gotEndpointPorts = append(gotEndpointPorts, testutils.ParsePort(t, a.Addr)) + } + if diff := cmp.Diff(gotEndpointPorts, tc.wantEndpointPorts); diff != "" { + t.Errorf("Unexpected endpoint address ports in resolver update, diff (-got +want): %v", diff) + } + + gotAddrPorts := []uint32{} + for _, a := range rs.Addresses { + gotAddrPorts = append(gotAddrPorts, testutils.ParsePort(t, a.Addr)) + } + if diff := cmp.Diff(gotAddrPorts, tc.wantAddrPorts); diff != "" { + t.Errorf("Unexpected address ports in resolver update, diff (-got +want): %v", diff) + } + }) + } +} diff --git a/xds/internal/balancer/wrrlocality/balancer.go b/xds/internal/balancer/wrrlocality/balancer.go index 943ee7806ba1..2b289a81143c 100644 --- a/xds/internal/balancer/wrrlocality/balancer.go +++ b/xds/internal/balancer/wrrlocality/balancer.go @@ -120,6 +120,13 @@ func 
SetAddrInfo(addr resolver.Address, addrInfo AddrInfo) resolver.Address { return addr } +// SetAddrInfoInEndpoint returns a copy of endpoint in which the Attributes +// field is updated with AddrInfo. +func SetAddrInfoInEndpoint(endpoint resolver.Endpoint, addrInfo AddrInfo) resolver.Endpoint { + endpoint.Attributes = endpoint.Attributes.WithValue(attributeKey{}, addrInfo) + return endpoint +} + func (a AddrInfo) String() string { return fmt.Sprintf("Locality Weight: %d", a.LocalityWeight) } diff --git a/xds/internal/internal.go b/xds/internal/internal.go index 1d8a6b03f1b3..74c919521551 100644 --- a/xds/internal/internal.go +++ b/xds/internal/internal.go @@ -86,6 +86,12 @@ func SetLocalityID(addr resolver.Address, l LocalityID) resolver.Address { return addr } +// SetLocalityIDInEndpoint sets locality ID in endpoint to l. +func SetLocalityIDInEndpoint(endpoint resolver.Endpoint, l LocalityID) resolver.Endpoint { + endpoint.Attributes = endpoint.Attributes.WithValue(localityKey, l) + return endpoint +} + // ResourceTypeMapForTesting maps TypeUrl to corresponding ResourceType. var ResourceTypeMapForTesting map[string]any diff --git a/xds/internal/xdsclient/xdsresource/type_eds.go b/xds/internal/xdsclient/xdsresource/type_eds.go index 1254d250c99b..8b14219b4ade 100644 --- a/xds/internal/xdsclient/xdsresource/type_eds.go +++ b/xds/internal/xdsclient/xdsresource/type_eds.go @@ -49,9 +49,10 @@ const ( // Endpoint contains information of an endpoint. type Endpoint struct { - Address string - HealthStatus EndpointHealthStatus - Weight uint32 + Address string + AdditionalAddress []string + HealthStatus EndpointHealthStatus + Weight uint32 } // Locality contains information of a locality. 
diff --git a/xds/internal/xdsclient/xdsresource/unmarshal_eds.go b/xds/internal/xdsclient/xdsresource/unmarshal_eds.go index f65845b702c8..6b318aef7658 100644 --- a/xds/internal/xdsclient/xdsresource/unmarshal_eds.go +++ b/xds/internal/xdsclient/xdsresource/unmarshal_eds.go @@ -26,6 +26,7 @@ import ( v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" v3typepb "github.com/envoyproxy/go-control-plane/envoy/type/v3" + "google.golang.org/grpc/internal/envconfig" "google.golang.org/grpc/internal/pretty" "google.golang.org/grpc/xds/internal" "google.golang.org/protobuf/proto" @@ -94,14 +95,25 @@ func parseEndpoints(lbEndpoints []*v3endpointpb.LbEndpoint, uniqueEndpointAddrs weight = w.GetValue() } addr := parseAddress(lbEndpoint.GetEndpoint().GetAddress().GetSocketAddress()) - if uniqueEndpointAddrs[addr] { - return nil, fmt.Errorf("duplicate endpoint with the same address %s", addr) + var additionalAddrs []string + if envconfig.XDSDualstackEndpointsEnabled { + additionalAddrs = make([]string, len(lbEndpoint.GetEndpoint().GetAdditionalAddresses())) + for i, sa := range lbEndpoint.GetEndpoint().GetAdditionalAddresses() { + additionalAddrs[i] = parseAddress(sa.GetAddress().GetSocketAddress()) + } + } + + for _, a := range append([]string{addr}, additionalAddrs...) 
{ + if uniqueEndpointAddrs[a] { + return nil, fmt.Errorf("duplicate endpoint with the same address %s", a) + } + uniqueEndpointAddrs[a] = true } - uniqueEndpointAddrs[addr] = true endpoints = append(endpoints, Endpoint{ - HealthStatus: EndpointHealthStatus(lbEndpoint.GetHealthStatus()), - Address: addr, - Weight: weight, + HealthStatus: EndpointHealthStatus(lbEndpoint.GetHealthStatus()), + Address: addr, + Weight: weight, + AdditionalAddress: additionalAddrs, }) } return endpoints, nil diff --git a/xds/internal/xdsclient/xdsresource/unmarshal_eds_test.go b/xds/internal/xdsclient/xdsresource/unmarshal_eds_test.go index f2419eac55a4..8a7f8e7014de 100644 --- a/xds/internal/xdsclient/xdsresource/unmarshal_eds_test.go +++ b/xds/internal/xdsclient/xdsresource/unmarshal_eds_test.go @@ -28,6 +28,8 @@ import ( v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" v3typepb "github.com/envoyproxy/go-control-plane/envoy/type/v3" "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "google.golang.org/grpc/internal/envconfig" "google.golang.org/grpc/internal/pretty" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/xds/internal" @@ -47,8 +49,8 @@ func (s) TestEDSParseRespProto(t *testing.T) { name: "missing-priority", m: func() *v3endpointpb.ClusterLoadAssignment { clab0 := newClaBuilder("test", nil) - clab0.addLocality("locality-1", 1, 0, []string{"addr1:314"}, nil) - clab0.addLocality("locality-2", 1, 2, []string{"addr2:159"}, nil) + clab0.addLocality("locality-1", 1, 0, []endpointOpts{{addrWithPort: "addr1:314"}}, nil) + clab0.addLocality("locality-2", 1, 2, []endpointOpts{{addrWithPort: "addr2:159"}}, nil) return clab0.Build() }(), want: EndpointsUpdate{}, @@ -58,7 +60,7 @@ func (s) TestEDSParseRespProto(t *testing.T) { name: "missing-locality-ID", m: func() *v3endpointpb.ClusterLoadAssignment { clab0 := newClaBuilder("test", nil) - clab0.addLocality("", 1, 0, 
[]string{"addr1:314"}, nil) + clab0.addLocality("", 1, 0, []endpointOpts{{addrWithPort: "addr1:314"}}, nil) return clab0.Build() }(), want: EndpointsUpdate{}, @@ -68,7 +70,7 @@ func (s) TestEDSParseRespProto(t *testing.T) { name: "zero-endpoint-weight", m: func() *v3endpointpb.ClusterLoadAssignment { clab0 := newClaBuilder("test", nil) - clab0.addLocality("locality-0", 1, 0, []string{"addr1:314"}, &addLocalityOptions{Weight: []uint32{0}}) + clab0.addLocality("locality-0", 1, 0, []endpointOpts{{addrWithPort: "addr1:314"}}, &addLocalityOptions{Weight: []uint32{0}}) return clab0.Build() }(), want: EndpointsUpdate{}, @@ -78,8 +80,8 @@ func (s) TestEDSParseRespProto(t *testing.T) { name: "duplicate-locality-in-the-same-priority", m: func() *v3endpointpb.ClusterLoadAssignment { clab0 := newClaBuilder("test", nil) - clab0.addLocality("locality-0", 1, 0, []string{"addr1:314"}, nil) - clab0.addLocality("locality-0", 1, 0, []string{"addr1:314"}, nil) // Duplicate locality with the same priority. + clab0.addLocality("locality-0", 1, 0, []endpointOpts{{addrWithPort: "addr1:314"}}, nil) + clab0.addLocality("locality-0", 1, 0, []endpointOpts{{addrWithPort: "addr1:314"}}, nil) // Duplicate locality with the same priority. 
return clab0.Build() }(), want: EndpointsUpdate{}, @@ -89,10 +91,10 @@ func (s) TestEDSParseRespProto(t *testing.T) { name: "missing locality weight", m: func() *v3endpointpb.ClusterLoadAssignment { clab0 := newClaBuilder("test", nil) - clab0.addLocality("locality-1", 0, 1, []string{"addr1:314"}, &addLocalityOptions{ + clab0.addLocality("locality-1", 0, 1, []endpointOpts{{addrWithPort: "addr1:314"}}, &addLocalityOptions{ Health: []v3corepb.HealthStatus{v3corepb.HealthStatus_HEALTHY}, }) - clab0.addLocality("locality-2", 0, 0, []string{"addr2:159"}, &addLocalityOptions{ + clab0.addLocality("locality-2", 0, 0, []endpointOpts{{addrWithPort: "addr2:159"}}, &addLocalityOptions{ Health: []v3corepb.HealthStatus{v3corepb.HealthStatus_HEALTHY}, }) return clab0.Build() @@ -103,9 +105,9 @@ func (s) TestEDSParseRespProto(t *testing.T) { name: "max sum of weights at the same priority exceeded", m: func() *v3endpointpb.ClusterLoadAssignment { clab0 := newClaBuilder("test", nil) - clab0.addLocality("locality-1", 1, 0, []string{"addr1:314"}, nil) - clab0.addLocality("locality-2", 4294967295, 1, []string{"addr2:159"}, nil) - clab0.addLocality("locality-3", 1, 1, []string{"addr2:88"}, nil) + clab0.addLocality("locality-1", 1, 0, []endpointOpts{{addrWithPort: "addr1:314"}}, nil) + clab0.addLocality("locality-2", 4294967295, 1, []endpointOpts{{addrWithPort: "addr2:159"}}, nil) + clab0.addLocality("locality-3", 1, 1, []endpointOpts{{addrWithPort: "addr2:88"}}, nil) return clab0.Build() }(), want: EndpointsUpdate{}, @@ -115,8 +117,8 @@ func (s) TestEDSParseRespProto(t *testing.T) { name: "duplicate endpoint address", m: func() *v3endpointpb.ClusterLoadAssignment { clab0 := newClaBuilder("test", nil) - clab0.addLocality("locality-1", 1, 1, []string{"addr:997"}, nil) - clab0.addLocality("locality-2", 1, 0, []string{"addr:997"}, nil) + clab0.addLocality("locality-1", 1, 1, []endpointOpts{{addrWithPort: "addr:997"}}, nil) + clab0.addLocality("locality-2", 1, 0, []endpointOpts{{addrWithPort: 
"addr:997"}}, nil) return clab0.Build() }(), want: EndpointsUpdate{}, @@ -126,11 +128,11 @@ func (s) TestEDSParseRespProto(t *testing.T) { name: "good", m: func() *v3endpointpb.ClusterLoadAssignment { clab0 := newClaBuilder("test", nil) - clab0.addLocality("locality-1", 1, 1, []string{"addr1:314"}, &addLocalityOptions{ + clab0.addLocality("locality-1", 1, 1, []endpointOpts{{addrWithPort: "addr1:314"}}, &addLocalityOptions{ Health: []v3corepb.HealthStatus{v3corepb.HealthStatus_UNHEALTHY}, Weight: []uint32{271}, }) - clab0.addLocality("locality-2", 1, 0, []string{"addr2:159"}, &addLocalityOptions{ + clab0.addLocality("locality-2", 1, 0, []endpointOpts{{addrWithPort: "addr2:159"}}, &addLocalityOptions{ Health: []v3corepb.HealthStatus{v3corepb.HealthStatus_DRAINING}, Weight: []uint32{828}, }) @@ -167,12 +169,12 @@ func (s) TestEDSParseRespProto(t *testing.T) { name: "good duplicate locality with different priority", m: func() *v3endpointpb.ClusterLoadAssignment { clab0 := newClaBuilder("test", nil) - clab0.addLocality("locality-1", 1, 1, []string{"addr1:314"}, &addLocalityOptions{ + clab0.addLocality("locality-1", 1, 1, []endpointOpts{{addrWithPort: "addr1:314"}}, &addLocalityOptions{ Health: []v3corepb.HealthStatus{v3corepb.HealthStatus_UNHEALTHY}, Weight: []uint32{271}, }) // Same locality name, but with different priority. 
- clab0.addLocality("locality-1", 1, 0, []string{"addr2:159"}, &addLocalityOptions{ + clab0.addLocality("locality-1", 1, 0, []endpointOpts{{addrWithPort: "addr2:159"}}, &addLocalityOptions{ Health: []v3corepb.HealthStatus{v3corepb.HealthStatus_DRAINING}, Weight: []uint32{828}, }) @@ -213,7 +215,120 @@ func (s) TestEDSParseRespProto(t *testing.T) { t.Errorf("parseEDSRespProto() error = %v, wantErr %v", err, tt.wantErr) return } - if d := cmp.Diff(got, tt.want); d != "" { + if d := cmp.Diff(got, tt.want, cmpopts.EquateEmpty()); d != "" { + t.Errorf("parseEDSRespProto() got = %v, want %v, diff: %v", got, tt.want, d) + } + }) + } +} + +func (s) TestEDSParseRespProtoAdditionalAddrs(t *testing.T) { + origDualstackEndpointsEnabled := envconfig.XDSDualstackEndpointsEnabled + defer func() { + envconfig.XDSDualstackEndpointsEnabled = origDualstackEndpointsEnabled + }() + envconfig.XDSDualstackEndpointsEnabled = true + + tests := []struct { + name string + m *v3endpointpb.ClusterLoadAssignment + want EndpointsUpdate + wantErr bool + }{ + { + name: "duplicate primary address in self additional addresses", + m: func() *v3endpointpb.ClusterLoadAssignment { + clab0 := newClaBuilder("test", nil) + clab0.addLocality("locality-1", 1, 0, []endpointOpts{{addrWithPort: "addr:998", additionalAddrWithPorts: []string{"addr:998"}}}, nil) + return clab0.Build() + }(), + want: EndpointsUpdate{}, + wantErr: true, + }, + { + name: "duplicate primary address in other locality additional addresses", + m: func() *v3endpointpb.ClusterLoadAssignment { + clab0 := newClaBuilder("test", nil) + clab0.addLocality("locality-1", 1, 1, []endpointOpts{{addrWithPort: "addr:997"}}, nil) + clab0.addLocality("locality-2", 1, 0, []endpointOpts{{addrWithPort: "addr:998", additionalAddrWithPorts: []string{"addr:997"}}}, nil) + return clab0.Build() + }(), + want: EndpointsUpdate{}, + wantErr: true, + }, + { + name: "duplicate additional address in self additional addresses", + m: func() 
*v3endpointpb.ClusterLoadAssignment { + clab0 := newClaBuilder("test", nil) + clab0.addLocality("locality-1", 1, 0, []endpointOpts{{addrWithPort: "addr:998", additionalAddrWithPorts: []string{"addr:999", "addr:999"}}}, nil) + return clab0.Build() + }(), + want: EndpointsUpdate{}, + wantErr: true, + }, + { + name: "duplicate additional address in other locality additional addresses", + m: func() *v3endpointpb.ClusterLoadAssignment { + clab0 := newClaBuilder("test", nil) + clab0.addLocality("locality-1", 1, 1, []endpointOpts{{addrWithPort: "addr:997", additionalAddrWithPorts: []string{"addr:1000"}}}, nil) + clab0.addLocality("locality-2", 1, 0, []endpointOpts{{addrWithPort: "addr:998", additionalAddrWithPorts: []string{"addr:1000"}}}, nil) + return clab0.Build() + }(), + want: EndpointsUpdate{}, + wantErr: true, + }, + { + name: "multiple localities", + m: func() *v3endpointpb.ClusterLoadAssignment { + clab0 := newClaBuilder("test", nil) + clab0.addLocality("locality-1", 1, 1, []endpointOpts{{addrWithPort: "addr1:997", additionalAddrWithPorts: []string{"addr1:1000"}}}, &addLocalityOptions{ + Health: []v3corepb.HealthStatus{v3corepb.HealthStatus_UNHEALTHY}, + Weight: []uint32{271}, + }) + clab0.addLocality("locality-2", 1, 0, []endpointOpts{{addrWithPort: "addr2:998", additionalAddrWithPorts: []string{"addr2:1000"}}}, &addLocalityOptions{ + Health: []v3corepb.HealthStatus{v3corepb.HealthStatus_HEALTHY}, + Weight: []uint32{828}, + }) + return clab0.Build() + }(), + want: EndpointsUpdate{ + Drops: nil, + Localities: []Locality{ + { + Endpoints: []Endpoint{{ + Address: "addr1:997", + HealthStatus: EndpointHealthStatusUnhealthy, + Weight: 271, + AdditionalAddress: []string{"addr1:1000"}, + }}, + ID: internal.LocalityID{SubZone: "locality-1"}, + Priority: 1, + Weight: 1, + }, + { + Endpoints: []Endpoint{{ + Address: "addr2:998", + AdditionalAddress: []string{"addr2:1000"}, + HealthStatus: EndpointHealthStatusHealthy, + Weight: 828, + }}, + ID: internal.LocalityID{SubZone: 
"locality-2"}, + Priority: 0, + Weight: 1, + }, + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := parseEDSRespProto(tt.m) + if (err != nil) != tt.wantErr { + t.Errorf("parseEDSRespProto() error = %v, wantErr %v", err, tt.wantErr) + return + } + if d := cmp.Diff(got, tt.want, cmpopts.EquateEmpty()); d != "" { t.Errorf("parseEDSRespProto() got = %v, want %v, diff: %v", got, tt.want, d) } }) @@ -223,11 +338,11 @@ func (s) TestEDSParseRespProto(t *testing.T) { func (s) TestUnmarshalEndpoints(t *testing.T) { var v3EndpointsAny = testutils.MarshalAny(t, func() *v3endpointpb.ClusterLoadAssignment { clab0 := newClaBuilder("test", nil) - clab0.addLocality("locality-1", 1, 1, []string{"addr1:314"}, &addLocalityOptions{ + clab0.addLocality("locality-1", 1, 1, []endpointOpts{{addrWithPort: "addr1:314"}}, &addLocalityOptions{ Health: []v3corepb.HealthStatus{v3corepb.HealthStatus_UNHEALTHY}, Weight: []uint32{271}, }) - clab0.addLocality("locality-2", 1, 0, []string{"addr2:159"}, &addLocalityOptions{ + clab0.addLocality("locality-2", 1, 0, []endpointOpts{{addrWithPort: "addr2:159"}}, &addLocalityOptions{ Health: []v3corepb.HealthStatus{v3corepb.HealthStatus_DRAINING}, Weight: []uint32{828}, }) @@ -258,8 +373,8 @@ func (s) TestUnmarshalEndpoints(t *testing.T) { name: "bad endpoints resource", resource: testutils.MarshalAny(t, func() *v3endpointpb.ClusterLoadAssignment { clab0 := newClaBuilder("test", nil) - clab0.addLocality("locality-1", 1, 0, []string{"addr1:314"}, nil) - clab0.addLocality("locality-2", 1, 2, []string{"addr2:159"}, nil) + clab0.addLocality("locality-1", 1, 0, []endpointOpts{{addrWithPort: "addr1:314"}}, nil) + clab0.addLocality("locality-2", 1, 2, []endpointOpts{{addrWithPort: "addr2:159"}}, nil) return clab0.Build() }()), wantName: "test", @@ -379,29 +494,49 @@ type addLocalityOptions struct { Weight []uint32 } +type endpointOpts struct { + addrWithPort string + additionalAddrWithPorts []string +} + +func 
addressFromStr(addrWithPort string) *v3corepb.Address { + host, portStr, err := net.SplitHostPort(addrWithPort) + if err != nil { + panic("failed to split " + addrWithPort) + } + port, err := strconv.Atoi(portStr) + if err != nil { + panic("failed to atoi " + portStr) + } + + return &v3corepb.Address{ + Address: &v3corepb.Address_SocketAddress{ + SocketAddress: &v3corepb.SocketAddress{ + Protocol: v3corepb.SocketAddress_TCP, + Address: host, + PortSpecifier: &v3corepb.SocketAddress_PortValue{PortValue: uint32(port)}, + }, + }, + } +} + // addLocality adds a locality to the builder. -func (clab *claBuilder) addLocality(subzone string, weight uint32, priority uint32, addrsWithPort []string, opts *addLocalityOptions) { +func (clab *claBuilder) addLocality(subzone string, weight uint32, priority uint32, endpoints []endpointOpts, opts *addLocalityOptions) { var lbEndPoints []*v3endpointpb.LbEndpoint - for i, a := range addrsWithPort { - host, portStr, err := net.SplitHostPort(a) - if err != nil { - panic("failed to split " + a) - } - port, err := strconv.Atoi(portStr) - if err != nil { - panic("failed to atoi " + portStr) + for i, e := range endpoints { + var additionalAddrs []*v3endpointpb.Endpoint_AdditionalAddress + for _, a := range e.additionalAddrWithPorts { + additionalAddrs = append(additionalAddrs, &v3endpointpb.Endpoint_AdditionalAddress{ + Address: addressFromStr(a), + }) } - lbe := &v3endpointpb.LbEndpoint{ HostIdentifier: &v3endpointpb.LbEndpoint_Endpoint{ Endpoint: &v3endpointpb.Endpoint{ - Address: &v3corepb.Address{ - Address: &v3corepb.Address_SocketAddress{ - SocketAddress: &v3corepb.SocketAddress{ - Protocol: v3corepb.SocketAddress_TCP, - Address: host, - PortSpecifier: &v3corepb.SocketAddress_PortValue{ - PortValue: uint32(port)}}}}}}, + Address: addressFromStr(e.addrWithPort), + AdditionalAddresses: additionalAddrs, + }, + }, } if opts != nil { if i < len(opts.Health) { From cc60f74562dbba80e65a614147b4e91e3a460970 Mon Sep 17 00:00:00 2001 From: 
Arjan Bal Date: Wed, 20 Nov 2024 17:00:24 +0530 Subject: [PATCH 2/8] Use endpint slice for dns resolver --- .../balancer/clusterresolver/configbuilder.go | 15 ++++++----- .../clusterresolver/configbuilder_test.go | 22 ++++++++-------- .../clusterresolver/resource_resolver.go | 5 ++-- .../clusterresolver/resource_resolver_dns.go | 25 ++++++------------- 4 files changed, 29 insertions(+), 38 deletions(-) diff --git a/xds/internal/balancer/clusterresolver/configbuilder.go b/xds/internal/balancer/clusterresolver/configbuilder.go index f2c0089b161e..b93ada5c0e14 100644 --- a/xds/internal/balancer/clusterresolver/configbuilder.go +++ b/xds/internal/balancer/clusterresolver/configbuilder.go @@ -48,8 +48,8 @@ type priorityConfig struct { mechanism DiscoveryMechanism // edsResp is set only if type is EDS. edsResp xdsresource.EndpointsUpdate - // addresses is set only if type is DNS. - addresses []string + // endpoints is set only if type is DNS. + endpoints []resolver.Endpoint // Each discovery mechanism has a name generator so that the child policies // can reuse names between updates (EDS updates for example). childNameGen *nameGenerator @@ -107,7 +107,7 @@ func buildPriorityConfig(priorities []priorityConfig, xdsLBPolicy *internalservi } continue case DiscoveryMechanismTypeLogicalDNS: - name, config, endpoints := buildClusterImplConfigForDNS(p.childNameGen, p.addresses, p.mechanism) + name, config, endpoints := buildClusterImplConfigForDNS(p.childNameGen, p.endpoints, p.mechanism) retConfig.Priorities = append(retConfig.Priorities, name) retEndpoints = append(retEndpoints, endpoints...) 
odCfg := makeClusterImplOutlierDetectionChild(config, p.mechanism.outlierDetection) @@ -137,14 +137,13 @@ func makeClusterImplOutlierDetectionChild(ciCfg *clusterimpl.LBConfig, odCfg out return &odCfgRet } -func buildClusterImplConfigForDNS(g *nameGenerator, addrStrs []string, mechanism DiscoveryMechanism) (string, *clusterimpl.LBConfig, []resolver.Endpoint) { +func buildClusterImplConfigForDNS(g *nameGenerator, endpoints []resolver.Endpoint, mechanism DiscoveryMechanism) (string, *clusterimpl.LBConfig, []resolver.Endpoint) { // Endpoint picking policy for DNS is hardcoded to pick_first. const childPolicy = "pick_first" - retEndpoints := make([]resolver.Endpoint, 0, len(addrStrs)) + retEndpoints := make([]resolver.Endpoint, len(endpoints)) pName := fmt.Sprintf("priority-%v", g.prefix) - for _, addrStr := range addrStrs { - endpoint := resolver.Endpoint{Addresses: []resolver.Address{{Addr: addrStr}}} - retEndpoints = append(retEndpoints, hierarchy.SetInEndpoint(endpoint, []string{pName})) + for i, e := range endpoints { + retEndpoints[i] = hierarchy.SetInEndpoint(e, []string{pName}) } return pName, &clusterimpl.LBConfig{ Cluster: mechanism.Cluster, diff --git a/xds/internal/balancer/clusterresolver/configbuilder_test.go b/xds/internal/balancer/clusterresolver/configbuilder_test.go index a9a6a802aa84..cbdbb30ba0a7 100644 --- a/xds/internal/balancer/clusterresolver/configbuilder_test.go +++ b/xds/internal/balancer/clusterresolver/configbuilder_test.go @@ -56,9 +56,9 @@ const ( ) var ( - testLocalityIDs []internal.LocalityID - testAddressStrs [][]string - testEndpoints [][]xdsresource.Endpoint + testLocalityIDs []internal.LocalityID + testResolverEndpoints [][]resolver.Endpoint + testEndpoints [][]xdsresource.Endpoint testLocalitiesP0, testLocalitiesP1 []xdsresource.Locality @@ -85,12 +85,12 @@ func init() { for i := 0; i < localityCount; i++ { testLocalityIDs = append(testLocalityIDs, internal.LocalityID{Zone: fmt.Sprintf("test-zone-%d", i)}) var ( - addrs []string - 
ends []xdsresource.Endpoint + endpoints []resolver.Endpoint + ends []xdsresource.Endpoint ) for j := 0; j < endpointPerLocality; j++ { addr := fmt.Sprintf("addr-%d-%d", i, j) - addrs = append(addrs, addr) + endpoints = append(endpoints, resolver.Endpoint{Addresses: []resolver.Address{{Addr: addr}}}) ends = append(ends, xdsresource.Endpoint{ Address: addr, HealthStatus: xdsresource.EndpointHealthStatusHealthy, @@ -100,7 +100,7 @@ func init() { }, }) } - testAddressStrs = append(testAddressStrs, addrs) + testResolverEndpoints = append(testResolverEndpoints, endpoints) testEndpoints = append(testEndpoints, ends) } @@ -175,7 +175,7 @@ func TestBuildPriorityConfigJSON(t *testing.T) { mechanism: DiscoveryMechanism{ Type: DiscoveryMechanismTypeLogicalDNS, }, - addresses: testAddressStrs[4], + endpoints: testResolverEndpoints[4], childNameGen: newNameGenerator(1), }, }, nil) @@ -230,7 +230,7 @@ func TestBuildPriorityConfig(t *testing.T) { Type: DiscoveryMechanismTypeLogicalDNS, outlierDetection: noopODCfg, }, - addresses: testAddressStrs[4], + endpoints: testResolverEndpoints[4], childNameGen: newNameGenerator(1), }, }, nil) @@ -305,7 +305,7 @@ func TestBuildPriorityConfig(t *testing.T) { } func TestBuildClusterImplConfigForDNS(t *testing.T) { - gotName, gotConfig, gotAddrs := buildClusterImplConfigForDNS(newNameGenerator(3), testAddressStrs[0], DiscoveryMechanism{Cluster: testClusterName2, Type: DiscoveryMechanismTypeLogicalDNS}) + gotName, gotConfig, gotEndpoints := buildClusterImplConfigForDNS(newNameGenerator(3), testResolverEndpoints[0], DiscoveryMechanism{Cluster: testClusterName2, Type: DiscoveryMechanismTypeLogicalDNS}) wantName := "priority-3" wantConfig := &clusterimpl.LBConfig{ Cluster: testClusterName2, @@ -326,7 +326,7 @@ func TestBuildClusterImplConfigForDNS(t *testing.T) { if diff := cmp.Diff(gotConfig, wantConfig); diff != "" { t.Errorf("buildClusterImplConfigForDNS() diff (-got +want) %v", diff) } - if diff := cmp.Diff(gotAddrs, wantEndpoints, 
endpointCmpOpts); diff != "" { + if diff := cmp.Diff(gotEndpoints, wantEndpoints, endpointCmpOpts); diff != "" { t.Errorf("buildClusterImplConfigForDNS() diff (-got +want) %v", diff) } } diff --git a/xds/internal/balancer/clusterresolver/resource_resolver.go b/xds/internal/balancer/clusterresolver/resource_resolver.go index 3bcfba8732a3..d9315c3acef5 100644 --- a/xds/internal/balancer/clusterresolver/resource_resolver.go +++ b/xds/internal/balancer/clusterresolver/resource_resolver.go @@ -24,6 +24,7 @@ import ( "google.golang.org/grpc/internal/grpclog" "google.golang.org/grpc/internal/grpcsync" + "google.golang.org/grpc/resolver" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" ) @@ -294,8 +295,8 @@ func (rr *resourceResolver) generateLocked(onDone xdsresource.OnDoneFunc) { switch uu := u.(type) { case xdsresource.EndpointsUpdate: ret = append(ret, priorityConfig{mechanism: rDM.dm, edsResp: uu, childNameGen: rDM.childNameGen}) - case []string: - ret = append(ret, priorityConfig{mechanism: rDM.dm, addresses: uu, childNameGen: rDM.childNameGen}) + case []resolver.Endpoint: + ret = append(ret, priorityConfig{mechanism: rDM.dm, endpoints: uu, childNameGen: rDM.childNameGen}) } } select { diff --git a/xds/internal/balancer/clusterresolver/resource_resolver_dns.go b/xds/internal/balancer/clusterresolver/resource_resolver_dns.go index cfc871d3b59d..e5c381354d85 100644 --- a/xds/internal/balancer/clusterresolver/resource_resolver_dns.go +++ b/xds/internal/balancer/clusterresolver/resource_resolver_dns.go @@ -47,7 +47,7 @@ type dnsDiscoveryMechanism struct { logger *grpclog.PrefixLogger mu sync.Mutex - addrs []string + endpoints []resolver.Endpoint updateReceived bool } @@ -103,7 +103,7 @@ func (dr *dnsDiscoveryMechanism) lastUpdate() (any, bool) { if !dr.updateReceived { return nil, false } - return dr.addrs, true + return dr.endpoints, true } func (dr *dnsDiscoveryMechanism) resolveNow() { @@ -133,23 +133,14 @@ func (dr *dnsDiscoveryMechanism) UpdateState(state 
resolver.State) error { } dr.mu.Lock() - var addrs []string - if len(state.Endpoints) > 0 { - // Assume 1 address per endpoint, which is how DNS is expected to - // behave. The slice will grow as needed, however. - addrs = make([]string, 0, len(state.Endpoints)) - for _, e := range state.Endpoints { - for _, a := range e.Addresses { - addrs = append(addrs, a.Addr) - } - } - } else { - addrs = make([]string, len(state.Addresses)) + var endpoints = state.Endpoints + if len(endpoints) == 0 { + endpoints = make([]resolver.Endpoint, len(state.Addresses)) for i, a := range state.Addresses { - addrs[i] = a.Addr + endpoints[i] = resolver.Endpoint{Addresses: []resolver.Address{a}} } } - dr.addrs = addrs + dr.endpoints = endpoints dr.updateReceived = true dr.mu.Unlock() @@ -172,7 +163,7 @@ func (dr *dnsDiscoveryMechanism) ReportError(err error) { dr.mu.Unlock() return } - dr.addrs = nil + dr.endpoints = nil dr.updateReceived = true dr.mu.Unlock() From cffba1de71b85ef819adb0c88a29eb3b6fe40b62 Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Wed, 20 Nov 2024 23:12:06 +0530 Subject: [PATCH 3/8] Remove env var support --- internal/envconfig/xds.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/internal/envconfig/xds.go b/internal/envconfig/xds.go index 5c0d9d26833b..273cdf1f1fdf 100644 --- a/internal/envconfig/xds.go +++ b/internal/envconfig/xds.go @@ -56,5 +56,7 @@ var ( // XDSDualstackEndpointsEnabled is true if gRPC should read the // "additional addresses" in the xDS endpoint resource. - XDSDualstackEndpointsEnabled = boolFromEnv("GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS", false) + // TODO: Control this using an env variable when all LB policies + // handle endpoints. 
+ XDSDualstackEndpointsEnabled = false ) From 213b7671f72d9cc92db8ee68c00f10f6542f98e6 Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Fri, 22 Nov 2024 15:13:25 +0530 Subject: [PATCH 4/8] Add todo in comment --- internal/envconfig/xds.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/envconfig/xds.go b/internal/envconfig/xds.go index 273cdf1f1fdf..9afeb444d453 100644 --- a/internal/envconfig/xds.go +++ b/internal/envconfig/xds.go @@ -56,7 +56,7 @@ var ( // XDSDualstackEndpointsEnabled is true if gRPC should read the // "additional addresses" in the xDS endpoint resource. - // TODO: Control this using an env variable when all LB policies - // handle endpoints. + // TODO: https://github.com/grpc/grpc-go/issues/7866 - Control this using + // an env variable when all LB policies handle endpoints. XDSDualstackEndpointsEnabled = false ) From 71ad5196af6731eb194ed2897c998a11e9ca91a5 Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Wed, 11 Dec 2024 02:11:42 +0530 Subject: [PATCH 5/8] Make a single addresses list in the Endpoint resource --- .../balancer/clusterresolver/configbuilder.go | 2 +- .../clusterresolver/configbuilder_test.go | 44 +++++++++---------- .../xdsclient/tests/eds_watchers_test.go | 20 ++++----- .../tests/federation_watchers_test.go | 2 +- .../xdsclient/tests/resource_update_test.go | 8 ++-- .../xdsclient/xdsresource/type_eds.go | 7 ++- .../xdsclient/xdsresource/unmarshal_eds.go | 17 +++---- .../xdsresource/unmarshal_eds_test.go | 30 ++++++------- 8 files changed, 60 insertions(+), 70 deletions(-) diff --git a/xds/internal/balancer/clusterresolver/configbuilder.go b/xds/internal/balancer/clusterresolver/configbuilder.go index b93ada5c0e14..683f973b3fbf 100644 --- a/xds/internal/balancer/clusterresolver/configbuilder.go +++ b/xds/internal/balancer/clusterresolver/configbuilder.go @@ -263,7 +263,7 @@ func priorityLocalitiesToClusterImpl(localities []xdsresource.Locality, priority continue } resolverEndpoint := 
resolver.Endpoint{} - for _, as := range append([]string{endpoint.Address}, endpoint.AdditionalAddress...) { + for _, as := range endpoint.Addresses { resolverEndpoint.Addresses = append(resolverEndpoint.Addresses, resolver.Address{Addr: as}) } resolverEndpoint = hierarchy.SetInEndpoint(resolverEndpoint, []string{priorityName, localityStr}) diff --git a/xds/internal/balancer/clusterresolver/configbuilder_test.go b/xds/internal/balancer/clusterresolver/configbuilder_test.go index cbdbb30ba0a7..e7dc2b781b5e 100644 --- a/xds/internal/balancer/clusterresolver/configbuilder_test.go +++ b/xds/internal/balancer/clusterresolver/configbuilder_test.go @@ -92,9 +92,9 @@ func init() { addr := fmt.Sprintf("addr-%d-%d", i, j) endpoints = append(endpoints, resolver.Endpoint{Addresses: []resolver.Address{{Addr: addr}}}) ends = append(ends, xdsresource.Endpoint{ - Address: addr, HealthStatus: xdsresource.EndpointHealthStatusHealthy, - AdditionalAddress: []string{ + Addresses: []string{ + addr, fmt.Sprintf("addr-%d-%d-additional-1", i, j), fmt.Sprintf("addr-%d-%d-additional-2", i, j), }, @@ -313,8 +313,8 @@ func TestBuildClusterImplConfigForDNS(t *testing.T) { Name: "pick_first", }, } - e1 := resolver.Endpoint{Addresses: []resolver.Address{{Addr: testEndpoints[0][0].Address}}} - e2 := resolver.Endpoint{Addresses: []resolver.Address{{Addr: testEndpoints[0][1].Address}}} + e1 := resolver.Endpoint{Addresses: []resolver.Address{{Addr: testEndpoints[0][0].Addresses[0]}}} + e2 := resolver.Endpoint{Addresses: []resolver.Address{{Addr: testEndpoints[0][1].Addresses[0]}}} wantEndpoints := []resolver.Endpoint{ hierarchy.SetInEndpoint(e1, []string{"priority-3"}), hierarchy.SetInEndpoint(e2, []string{"priority-3"}), @@ -415,14 +415,14 @@ func TestBuildClusterImplConfigForEDS(t *testing.T) { }, } wantEndpoints := []resolver.Endpoint{ - testEndpointWithAttrs(allAddrs(testEndpoints[0][0]), 20, 1, "priority-2-0", &testLocalityIDs[0]), - testEndpointWithAttrs(allAddrs(testEndpoints[0][1]), 20, 1, 
"priority-2-0", &testLocalityIDs[0]), - testEndpointWithAttrs(allAddrs(testEndpoints[1][0]), 80, 1, "priority-2-0", &testLocalityIDs[1]), - testEndpointWithAttrs(allAddrs(testEndpoints[1][1]), 80, 1, "priority-2-0", &testLocalityIDs[1]), - testEndpointWithAttrs(allAddrs(testEndpoints[2][0]), 20, 1, "priority-2-1", &testLocalityIDs[2]), - testEndpointWithAttrs(allAddrs(testEndpoints[2][1]), 20, 1, "priority-2-1", &testLocalityIDs[2]), - testEndpointWithAttrs(allAddrs(testEndpoints[3][0]), 80, 1, "priority-2-1", &testLocalityIDs[3]), - testEndpointWithAttrs(allAddrs(testEndpoints[3][1]), 80, 1, "priority-2-1", &testLocalityIDs[3]), + testEndpointWithAttrs(testEndpoints[0][0].Addresses, 20, 1, "priority-2-0", &testLocalityIDs[0]), + testEndpointWithAttrs(testEndpoints[0][1].Addresses, 20, 1, "priority-2-0", &testLocalityIDs[0]), + testEndpointWithAttrs(testEndpoints[1][0].Addresses, 80, 1, "priority-2-0", &testLocalityIDs[1]), + testEndpointWithAttrs(testEndpoints[1][1].Addresses, 80, 1, "priority-2-0", &testLocalityIDs[1]), + testEndpointWithAttrs(testEndpoints[2][0].Addresses, 20, 1, "priority-2-1", &testLocalityIDs[2]), + testEndpointWithAttrs(testEndpoints[2][1].Addresses, 20, 1, "priority-2-1", &testLocalityIDs[2]), + testEndpointWithAttrs(testEndpoints[3][0].Addresses, 80, 1, "priority-2-1", &testLocalityIDs[3]), + testEndpointWithAttrs(testEndpoints[3][1].Addresses, 80, 1, "priority-2-1", &testLocalityIDs[3]), } if diff := cmp.Diff(gotNames, wantNames); diff != "" { @@ -437,10 +437,6 @@ func TestBuildClusterImplConfigForEDS(t *testing.T) { } -func allAddrs(endpoint xdsresource.Endpoint) []string { - return append([]string{endpoint.Address}, endpoint.AdditionalAddress...) 
-} - func TestGroupLocalitiesByPriority(t *testing.T) { tests := []struct { name string @@ -549,16 +545,16 @@ func TestPriorityLocalitiesToClusterImpl(t *testing.T) { localities: []xdsresource.Locality{ { Endpoints: []xdsresource.Endpoint{ - {Address: "addr-1-1", HealthStatus: xdsresource.EndpointHealthStatusHealthy, Weight: 90}, - {Address: "addr-1-2", HealthStatus: xdsresource.EndpointHealthStatusHealthy, Weight: 10}, + {Addresses: []string{"addr-1-1"}, HealthStatus: xdsresource.EndpointHealthStatusHealthy, Weight: 90}, + {Addresses: []string{"addr-1-2"}, HealthStatus: xdsresource.EndpointHealthStatusHealthy, Weight: 10}, }, ID: internal.LocalityID{Zone: "test-zone-1"}, Weight: 20, }, { Endpoints: []xdsresource.Endpoint{ - {Address: "addr-2-1", HealthStatus: xdsresource.EndpointHealthStatusHealthy, Weight: 90}, - {Address: "addr-2-2", HealthStatus: xdsresource.EndpointHealthStatusHealthy, Weight: 10}, + {Addresses: []string{"addr-2-1"}, HealthStatus: xdsresource.EndpointHealthStatusHealthy, Weight: 90}, + {Addresses: []string{"addr-2-2"}, HealthStatus: xdsresource.EndpointHealthStatusHealthy, Weight: 10}, }, ID: internal.LocalityID{Zone: "test-zone-2"}, Weight: 80, @@ -589,16 +585,16 @@ func TestPriorityLocalitiesToClusterImpl(t *testing.T) { localities: []xdsresource.Locality{ { Endpoints: []xdsresource.Endpoint{ - {Address: "addr-1-1", HealthStatus: xdsresource.EndpointHealthStatusHealthy, Weight: 90}, - {Address: "addr-1-2", HealthStatus: xdsresource.EndpointHealthStatusHealthy, Weight: 10}, + {Addresses: []string{"addr-1-1"}, HealthStatus: xdsresource.EndpointHealthStatusHealthy, Weight: 90}, + {Addresses: []string{"addr-1-2"}, HealthStatus: xdsresource.EndpointHealthStatusHealthy, Weight: 10}, }, ID: internal.LocalityID{Zone: "test-zone-1"}, Weight: 20, }, { Endpoints: []xdsresource.Endpoint{ - {Address: "addr-2-1", HealthStatus: xdsresource.EndpointHealthStatusHealthy, Weight: 90}, - {Address: "addr-2-2", HealthStatus: 
xdsresource.EndpointHealthStatusHealthy, Weight: 10}, + {Addresses: []string{"addr-2-1"}, HealthStatus: xdsresource.EndpointHealthStatusHealthy, Weight: 90}, + {Addresses: []string{"addr-2-2"}, HealthStatus: xdsresource.EndpointHealthStatusHealthy, Weight: 10}, }, ID: internal.LocalityID{Zone: "test-zone-2"}, Weight: 80, diff --git a/xds/internal/xdsclient/tests/eds_watchers_test.go b/xds/internal/xdsclient/tests/eds_watchers_test.go index ef27178fac22..21021b8992bb 100644 --- a/xds/internal/xdsclient/tests/eds_watchers_test.go +++ b/xds/internal/xdsclient/tests/eds_watchers_test.go @@ -176,7 +176,7 @@ func (s) TestEDSWatch(t *testing.T) { update: xdsresource.EndpointsUpdate{ Localities: []xdsresource.Locality{ { - Endpoints: []xdsresource.Endpoint{{Address: fmt.Sprintf("%s:%d", edsHost1, edsPort1), Weight: 1}}, + Endpoints: []xdsresource.Endpoint{{Addresses: []string{fmt.Sprintf("%s:%d", edsHost1, edsPort1)}, Weight: 1}}, ID: internal.LocalityID{ Region: "region-1", Zone: "zone-1", @@ -199,7 +199,7 @@ func (s) TestEDSWatch(t *testing.T) { update: xdsresource.EndpointsUpdate{ Localities: []xdsresource.Locality{ { - Endpoints: []xdsresource.Endpoint{{Address: fmt.Sprintf("%s:%d", edsHost1, edsPort1), Weight: 1}}, + Endpoints: []xdsresource.Endpoint{{Addresses: []string{fmt.Sprintf("%s:%d", edsHost1, edsPort1)}, Weight: 1}}, ID: internal.LocalityID{ Region: "region-1", Zone: "zone-1", @@ -335,7 +335,7 @@ func (s) TestEDSWatch_TwoWatchesForSameResourceName(t *testing.T) { update: xdsresource.EndpointsUpdate{ Localities: []xdsresource.Locality{ { - Endpoints: []xdsresource.Endpoint{{Address: fmt.Sprintf("%s:%d", edsHost1, edsPort1), Weight: 1}}, + Endpoints: []xdsresource.Endpoint{{Addresses: []string{fmt.Sprintf("%s:%d", edsHost1, edsPort1)}, Weight: 1}}, ID: internal.LocalityID{ Region: "region-1", Zone: "zone-1", @@ -351,7 +351,7 @@ func (s) TestEDSWatch_TwoWatchesForSameResourceName(t *testing.T) { update: xdsresource.EndpointsUpdate{ Localities: 
[]xdsresource.Locality{ { - Endpoints: []xdsresource.Endpoint{{Address: fmt.Sprintf("%s:%d", edsHost2, edsPort2), Weight: 1}}, + Endpoints: []xdsresource.Endpoint{{Addresses: []string{fmt.Sprintf("%s:%d", edsHost2, edsPort2)}, Weight: 1}}, ID: internal.LocalityID{ Region: "region-1", Zone: "zone-1", @@ -373,7 +373,7 @@ func (s) TestEDSWatch_TwoWatchesForSameResourceName(t *testing.T) { update: xdsresource.EndpointsUpdate{ Localities: []xdsresource.Locality{ { - Endpoints: []xdsresource.Endpoint{{Address: fmt.Sprintf("%s:%d", edsHost1, edsPort1), Weight: 1}}, + Endpoints: []xdsresource.Endpoint{{Addresses: []string{fmt.Sprintf("%s:%d", edsHost1, edsPort1)}, Weight: 1}}, ID: internal.LocalityID{ Region: "region-1", Zone: "zone-1", @@ -389,7 +389,7 @@ func (s) TestEDSWatch_TwoWatchesForSameResourceName(t *testing.T) { update: xdsresource.EndpointsUpdate{ Localities: []xdsresource.Locality{ { - Endpoints: []xdsresource.Endpoint{{Address: fmt.Sprintf("%s:%d", edsHost2, edsPort2), Weight: 1}}, + Endpoints: []xdsresource.Endpoint{{Addresses: []string{fmt.Sprintf("%s:%d", edsHost2, edsPort2)}, Weight: 1}}, ID: internal.LocalityID{ Region: "region-1", Zone: "zone-1", @@ -581,7 +581,7 @@ func (s) TestEDSWatch_ThreeWatchesForDifferentResourceNames(t *testing.T) { update: xdsresource.EndpointsUpdate{ Localities: []xdsresource.Locality{ { - Endpoints: []xdsresource.Endpoint{{Address: fmt.Sprintf("%s:%d", edsHost1, edsPort1), Weight: 1}}, + Endpoints: []xdsresource.Endpoint{{Addresses: []string{fmt.Sprintf("%s:%d", edsHost1, edsPort1)}, Weight: 1}}, ID: internal.LocalityID{ Region: "region-1", Zone: "zone-1", @@ -669,7 +669,7 @@ func (s) TestEDSWatch_ResourceCaching(t *testing.T) { update: xdsresource.EndpointsUpdate{ Localities: []xdsresource.Locality{ { - Endpoints: []xdsresource.Endpoint{{Address: fmt.Sprintf("%s:%d", edsHost1, edsPort1), Weight: 1}}, + Endpoints: []xdsresource.Endpoint{{Addresses: []string{fmt.Sprintf("%s:%d", edsHost1, edsPort1)}, Weight: 1}}, ID: 
internal.LocalityID{ Region: "region-1", Zone: "zone-1", @@ -795,7 +795,7 @@ func (s) TestEDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) { update: xdsresource.EndpointsUpdate{ Localities: []xdsresource.Locality{ { - Endpoints: []xdsresource.Endpoint{{Address: fmt.Sprintf("%s:%d", edsHost1, edsPort1), Weight: 1}}, + Endpoints: []xdsresource.Endpoint{{Addresses: []string{fmt.Sprintf("%s:%d", edsHost1, edsPort1)}, Weight: 1}}, ID: internal.LocalityID{ Region: "region-1", Zone: "zone-1", @@ -952,7 +952,7 @@ func (s) TestEDSWatch_PartialValid(t *testing.T) { update: xdsresource.EndpointsUpdate{ Localities: []xdsresource.Locality{ { - Endpoints: []xdsresource.Endpoint{{Address: fmt.Sprintf("%s:%d", edsHost1, edsPort1), Weight: 1}}, + Endpoints: []xdsresource.Endpoint{{Addresses: []string{fmt.Sprintf("%s:%d", edsHost1, edsPort1)}, Weight: 1}}, ID: internal.LocalityID{ Region: "region-1", Zone: "zone-1", diff --git a/xds/internal/xdsclient/tests/federation_watchers_test.go b/xds/internal/xdsclient/tests/federation_watchers_test.go index 78f69518cd6a..65233c344733 100644 --- a/xds/internal/xdsclient/tests/federation_watchers_test.go +++ b/xds/internal/xdsclient/tests/federation_watchers_test.go @@ -287,7 +287,7 @@ func (s) TestFederation_EndpointsResourceContextParamOrder(t *testing.T) { update: xdsresource.EndpointsUpdate{ Localities: []xdsresource.Locality{ { - Endpoints: []xdsresource.Endpoint{{Address: "localhost:666", Weight: 1}}, + Endpoints: []xdsresource.Endpoint{{Addresses: []string{"localhost:666"}, Weight: 1}}, Weight: 1, ID: internal.LocalityID{ Region: "region-1", diff --git a/xds/internal/xdsclient/tests/resource_update_test.go b/xds/internal/xdsclient/tests/resource_update_test.go index b493c820c774..0460385d0fb7 100644 --- a/xds/internal/xdsclient/tests/resource_update_test.go +++ b/xds/internal/xdsclient/tests/resource_update_test.go @@ -1077,13 +1077,13 @@ func (s) TestHandleEndpointsResponseFromManagementServer(t *testing.T) { wantUpdate: 
xdsresource.EndpointsUpdate{ Localities: []xdsresource.Locality{ { - Endpoints: []xdsresource.Endpoint{{Address: "addr1:314", Weight: 1}}, + Endpoints: []xdsresource.Endpoint{{Addresses: []string{"addr1:314"}, Weight: 1}}, ID: internal.LocalityID{SubZone: "locality-1"}, Priority: 1, Weight: 1, }, { - Endpoints: []xdsresource.Endpoint{{Address: "addr2:159", Weight: 1}}, + Endpoints: []xdsresource.Endpoint{{Addresses: []string{"addr2:159"}, Weight: 1}}, ID: internal.LocalityID{SubZone: "locality-2"}, Priority: 0, Weight: 1, @@ -1111,13 +1111,13 @@ func (s) TestHandleEndpointsResponseFromManagementServer(t *testing.T) { wantUpdate: xdsresource.EndpointsUpdate{ Localities: []xdsresource.Locality{ { - Endpoints: []xdsresource.Endpoint{{Address: "addr1:314", Weight: 1}}, + Endpoints: []xdsresource.Endpoint{{Addresses: []string{"addr1:314"}, Weight: 1}}, ID: internal.LocalityID{SubZone: "locality-1"}, Priority: 1, Weight: 1, }, { - Endpoints: []xdsresource.Endpoint{{Address: "addr2:159", Weight: 1}}, + Endpoints: []xdsresource.Endpoint{{Addresses: []string{"addr2:159"}, Weight: 1}}, ID: internal.LocalityID{SubZone: "locality-2"}, Priority: 0, Weight: 1, diff --git a/xds/internal/xdsclient/xdsresource/type_eds.go b/xds/internal/xdsclient/xdsresource/type_eds.go index 8b14219b4ade..f94a17e7c66a 100644 --- a/xds/internal/xdsclient/xdsresource/type_eds.go +++ b/xds/internal/xdsclient/xdsresource/type_eds.go @@ -49,10 +49,9 @@ const ( // Endpoint contains information of an endpoint. type Endpoint struct { - Address string - AdditionalAddress []string - HealthStatus EndpointHealthStatus - Weight uint32 + Addresses []string + HealthStatus EndpointHealthStatus + Weight uint32 } // Locality contains information of a locality. 
diff --git a/xds/internal/xdsclient/xdsresource/unmarshal_eds.go b/xds/internal/xdsclient/xdsresource/unmarshal_eds.go index 6b318aef7658..fd780d6632d2 100644 --- a/xds/internal/xdsclient/xdsresource/unmarshal_eds.go +++ b/xds/internal/xdsclient/xdsresource/unmarshal_eds.go @@ -94,26 +94,23 @@ func parseEndpoints(lbEndpoints []*v3endpointpb.LbEndpoint, uniqueEndpointAddrs } weight = w.GetValue() } - addr := parseAddress(lbEndpoint.GetEndpoint().GetAddress().GetSocketAddress()) - var additionalAddrs []string + addrs := []string{parseAddress(lbEndpoint.GetEndpoint().GetAddress().GetSocketAddress())} if envconfig.XDSDualstackEndpointsEnabled { - additionalAddrs = make([]string, len(lbEndpoint.GetEndpoint().GetAdditionalAddresses())) - for i, sa := range lbEndpoint.GetEndpoint().GetAdditionalAddresses() { - additionalAddrs[i] = parseAddress(sa.GetAddress().GetSocketAddress()) + for _, sa := range lbEndpoint.GetEndpoint().GetAdditionalAddresses() { + addrs = append(addrs, parseAddress(sa.GetAddress().GetSocketAddress())) } } - for _, a := range append([]string{addr}, additionalAddrs...) 
{ + for _, a := range addrs { if uniqueEndpointAddrs[a] { return nil, fmt.Errorf("duplicate endpoint with the same address %s", a) } uniqueEndpointAddrs[a] = true } endpoints = append(endpoints, Endpoint{ - HealthStatus: EndpointHealthStatus(lbEndpoint.GetHealthStatus()), - Address: addr, - Weight: weight, - AdditionalAddress: additionalAddrs, + HealthStatus: EndpointHealthStatus(lbEndpoint.GetHealthStatus()), + Addresses: addrs, + Weight: weight, }) } return endpoints, nil diff --git a/xds/internal/xdsclient/xdsresource/unmarshal_eds_test.go b/xds/internal/xdsclient/xdsresource/unmarshal_eds_test.go index 8a7f8e7014de..e8df0c3c3593 100644 --- a/xds/internal/xdsclient/xdsresource/unmarshal_eds_test.go +++ b/xds/internal/xdsclient/xdsresource/unmarshal_eds_test.go @@ -143,7 +143,7 @@ func (s) TestEDSParseRespProto(t *testing.T) { Localities: []Locality{ { Endpoints: []Endpoint{{ - Address: "addr1:314", + Addresses: []string{"addr1:314"}, HealthStatus: EndpointHealthStatusUnhealthy, Weight: 271, }}, @@ -153,7 +153,7 @@ func (s) TestEDSParseRespProto(t *testing.T) { }, { Endpoints: []Endpoint{{ - Address: "addr2:159", + Addresses: []string{"addr2:159"}, HealthStatus: EndpointHealthStatusDraining, Weight: 828, }}, @@ -185,7 +185,7 @@ func (s) TestEDSParseRespProto(t *testing.T) { Localities: []Locality{ { Endpoints: []Endpoint{{ - Address: "addr1:314", + Addresses: []string{"addr1:314"}, HealthStatus: EndpointHealthStatusUnhealthy, Weight: 271, }}, @@ -195,7 +195,7 @@ func (s) TestEDSParseRespProto(t *testing.T) { }, { Endpoints: []Endpoint{{ - Address: "addr2:159", + Addresses: []string{"addr2:159"}, HealthStatus: EndpointHealthStatusDraining, Weight: 828, }}, @@ -296,10 +296,9 @@ func (s) TestEDSParseRespProtoAdditionalAddrs(t *testing.T) { Localities: []Locality{ { Endpoints: []Endpoint{{ - Address: "addr1:997", - HealthStatus: EndpointHealthStatusUnhealthy, - Weight: 271, - AdditionalAddress: []string{"addr1:1000"}, + Addresses: []string{"addr1:997", "addr1:1000"}, 
+ HealthStatus: EndpointHealthStatusUnhealthy, + Weight: 271, }}, ID: internal.LocalityID{SubZone: "locality-1"}, Priority: 1, @@ -307,10 +306,9 @@ func (s) TestEDSParseRespProtoAdditionalAddrs(t *testing.T) { }, { Endpoints: []Endpoint{{ - Address: "addr2:998", - AdditionalAddress: []string{"addr2:1000"}, - HealthStatus: EndpointHealthStatusHealthy, - Weight: 828, + Addresses: []string{"addr2:998", "addr2:1000"}, + HealthStatus: EndpointHealthStatusHealthy, + Weight: 828, }}, ID: internal.LocalityID{SubZone: "locality-2"}, Priority: 0, @@ -389,7 +387,7 @@ func (s) TestUnmarshalEndpoints(t *testing.T) { Localities: []Locality{ { Endpoints: []Endpoint{{ - Address: "addr1:314", + Addresses: []string{"addr1:314"}, HealthStatus: EndpointHealthStatusUnhealthy, Weight: 271, }}, @@ -399,7 +397,7 @@ func (s) TestUnmarshalEndpoints(t *testing.T) { }, { Endpoints: []Endpoint{{ - Address: "addr2:159", + Addresses: []string{"addr2:159"}, HealthStatus: EndpointHealthStatusDraining, Weight: 828, }}, @@ -420,7 +418,7 @@ func (s) TestUnmarshalEndpoints(t *testing.T) { Localities: []Locality{ { Endpoints: []Endpoint{{ - Address: "addr1:314", + Addresses: []string{"addr1:314"}, HealthStatus: EndpointHealthStatusUnhealthy, Weight: 271, }}, @@ -430,7 +428,7 @@ func (s) TestUnmarshalEndpoints(t *testing.T) { }, { Endpoints: []Endpoint{{ - Address: "addr2:159", + Addresses: []string{"addr2:159"}, HealthStatus: EndpointHealthStatusDraining, Weight: 828, }}, From 837e8c73fbc5ef7b0a7af07a25185eb33499727c Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Wed, 11 Dec 2024 03:35:08 +0530 Subject: [PATCH 6/8] Address review comments --- .../clusterresolver/e2e_test/eds_impl_test.go | 74 +++++++++++-------- .../clusterresolver/resource_resolver_dns.go | 1 + 2 files changed, 45 insertions(+), 30 deletions(-) diff --git a/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go b/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go index 828a1a516369..315809174928 100644 --- 
a/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go +++ b/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go @@ -1151,10 +1151,15 @@ func waitForProducedZeroAddressesError(ctx context.Context, t *testing.T, client func (s) TestEDS_EndpointWithMultipleAddresses(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - // Start a backend server which provide that listens to multiple ports to simulate - // a backend with multiple addresses. - servers, cleanup2 := startTestServiceBackends(t, 1) - defer cleanup2() + + // Start a backend server which listens to multiple ports to simulate a + // backend with multiple addresses. + server := &stubserver.StubServer{ + EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil }, + UnaryCallF: func(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + return &testpb.SimpleResponse{}, nil + }, + } lis1, err := net.Listen("tcp", "localhost:0") if err != nil { t.Fatalf("Failed to create listener: %v", err) @@ -1165,14 +1170,25 @@ func (s) TestEDS_EndpointWithMultipleAddresses(t *testing.T) { t.Fatalf("Failed to create listener: %v", err) } defer lis2.Close() + lis3, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("Failed to create listener: %v", err) + } + defer lis3.Close() - go servers[0].S.Serve(lis1) - go servers[0].S.Serve(lis2) + server.Listener = lis1 + if err := server.StartServer(); err != nil { + t.Fatalf("Failed to start stub server: %v", err) + } + go server.S.Serve(lis2) + go server.S.Serve(lis3) - addrs, ports := backendAddressesAndPorts(t, servers) + t.Logf("Started test service backend at addresses %q, %q, %q", lis1.Addr(), lis2.Addr(), lis3.Addr()) + + port := testutils.ParsePort(t, lis1.Addr().String()) additionalPorts := []uint32{ - testutils.ParsePort(t, lis1.Addr().String()), testutils.ParsePort(t, lis2.Addr().String()), + testutils.ParsePort(t, 
lis3.Addr().String()), } testCases := []struct { @@ -1184,13 +1200,13 @@ func (s) TestEDS_EndpointWithMultipleAddresses(t *testing.T) { { name: "flag_enabled", dualstackEndpointsEnabled: true, - wantEndpointPorts: append([]uint32{ports[0]}, additionalPorts...), - wantAddrPorts: ports, + wantEndpointPorts: append([]uint32{port}, additionalPorts...), + wantAddrPorts: []uint32{port}, }, { name: "flag_disabled", - wantEndpointPorts: ports, - wantAddrPorts: ports, + wantEndpointPorts: []uint32{port}, + wantAddrPorts: []uint32{port}, }, } @@ -1234,10 +1250,9 @@ func (s) TestEDS_EndpointWithMultipleAddresses(t *testing.T) { Name: localityName1, Weight: 1, Backends: []e2e.BackendOptions{{ - Port: ports[0], + Port: port, AdditionalPorts: additionalPorts}}, }}) - if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) } @@ -1245,9 +1260,8 @@ func (s) TestEDS_EndpointWithMultipleAddresses(t *testing.T) { // Create an xDS client talking to the above management server, configured // with a short watch expiry timeout. xdsClient, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ - Name: t.Name(), - Contents: bootstrapContents, - WatchExpiryTimeout: defaultTestWatchExpiryTimeout, + Name: t.Name(), + Contents: bootstrapContents, }) if err != nil { t.Fatalf("Failed to create an xDS client: %v", err) @@ -1258,18 +1272,18 @@ func (s) TestEDS_EndpointWithMultipleAddresses(t *testing.T) { // the cluster_resolver LB policy with a single discovery mechanism. 
r := manual.NewBuilderWithScheme("whatever") jsonSC := fmt.Sprintf(`{ - "loadBalancingConfig":[{ - "cluster_resolver_experimental":{ - "discoveryMechanisms": [{ - "cluster": "%s", - "type": "EDS", - "edsServiceName": "%s", - "outlierDetection": {} - }], - "xdsLbPolicy":[{"round_robin":{}}] - } - }] - }`, clusterName, edsServiceName) + "loadBalancingConfig":[{ + "cluster_resolver_experimental":{ + "discoveryMechanisms": [{ + "cluster": "%s", + "type": "EDS", + "edsServiceName": "%s", + "outlierDetection": {} + }], + "xdsLbPolicy":[{"round_robin":{}}] + } + }] + }`, clusterName, edsServiceName) scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC) r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, xdsClient)) @@ -1279,7 +1293,7 @@ func (s) TestEDS_EndpointWithMultipleAddresses(t *testing.T) { } defer cc.Close() client := testgrpc.NewTestServiceClient(cc) - if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs); err != nil { + if err := rrutil.CheckRoundRobinRPCs(ctx, client, []resolver.Address{{Addr: lis1.Addr().String()}}); err != nil { t.Fatal(err) } diff --git a/xds/internal/balancer/clusterresolver/resource_resolver_dns.go b/xds/internal/balancer/clusterresolver/resource_resolver_dns.go index e5c381354d85..5f7a21153057 100644 --- a/xds/internal/balancer/clusterresolver/resource_resolver_dns.go +++ b/xds/internal/balancer/clusterresolver/resource_resolver_dns.go @@ -138,6 +138,7 @@ func (dr *dnsDiscoveryMechanism) UpdateState(state resolver.State) error { endpoints = make([]resolver.Endpoint, len(state.Addresses)) for i, a := range state.Addresses { endpoints[i] = resolver.Endpoint{Addresses: []resolver.Address{a}} + endpoints[i].Attributes = a.BalancerAttributes } } dr.endpoints = endpoints From 005b99b40d7f472ee2e2f61555cd1890da39a0fe Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Wed, 11 Dec 2024 03:58:02 +0530 Subject: [PATCH 7/8] Remove AdditionalPorts --- internal/testutils/xds/e2e/clientresources.go | 
16 ++--- stats/opentelemetry/e2e_test.go | 2 +- test/xds/xds_client_custom_lb_test.go | 4 +- test/xds/xds_client_priority_locality_test.go | 14 ++-- .../clusterimpl/tests/balancer_test.go | 6 +- .../clusterresolver/e2e_test/eds_impl_test.go | 72 +++++++++---------- .../ringhash/e2e/ringhash_balancer_test.go | 12 ++-- 7 files changed, 61 insertions(+), 65 deletions(-) diff --git a/internal/testutils/xds/e2e/clientresources.go b/internal/testutils/xds/e2e/clientresources.go index d369e8f19f81..bf063a2e3bea 100644 --- a/internal/testutils/xds/e2e/clientresources.go +++ b/internal/testutils/xds/e2e/clientresources.go @@ -692,18 +692,14 @@ type LocalityOptions struct { // BackendOptions contains options to configure individual backends in a // locality. type BackendOptions struct { - // Port number on which the backend is accepting connections. All backends + // Ports on which the backend is accepting connections. All backends // are expected to run on localhost, hence host name is not stored here. - Port uint32 + Ports []uint32 // Health status of the backend. Default is UNKNOWN which is treated the // same as HEALTHY. HealthStatus v3corepb.HealthStatus // Weight sets the backend weight. Defaults to 1. Weight uint32 - // Additional port number on which the backend is accepting connections. - // All backends are expected to run on localhost, hence host name is not - // stored here. 
- AdditionalPorts []uint32 } // EndpointOptions contains options to configure an Endpoint (or @@ -726,7 +722,7 @@ type EndpointOptions struct { func DefaultEndpoint(clusterName string, host string, ports []uint32) *v3endpointpb.ClusterLoadAssignment { var bOpts []BackendOptions for _, p := range ports { - bOpts = append(bOpts, BackendOptions{Port: p, Weight: 1}) + bOpts = append(bOpts, BackendOptions{Ports: []uint32{p}, Weight: 1}) } return EndpointResourceWithOptions(EndpointOptions{ ClusterName: clusterName, @@ -751,8 +747,8 @@ func EndpointResourceWithOptions(opts EndpointOptions) *v3endpointpb.ClusterLoad if b.Weight == 0 { b.Weight = 1 } - additionalAddresses := make([]*v3endpointpb.Endpoint_AdditionalAddress, len(b.AdditionalPorts)) - for i, p := range b.AdditionalPorts { + additionalAddresses := make([]*v3endpointpb.Endpoint_AdditionalAddress, len(b.Ports)-1) + for i, p := range b.Ports[1:] { additionalAddresses[i] = &v3endpointpb.Endpoint_AdditionalAddress{ Address: &v3corepb.Address{Address: &v3corepb.Address_SocketAddress{ SocketAddress: &v3corepb.SocketAddress{ @@ -769,7 +765,7 @@ func EndpointResourceWithOptions(opts EndpointOptions) *v3endpointpb.ClusterLoad SocketAddress: &v3corepb.SocketAddress{ Protocol: v3corepb.SocketAddress_TCP, Address: opts.Host, - PortSpecifier: &v3corepb.SocketAddress_PortValue{PortValue: b.Port}, + PortSpecifier: &v3corepb.SocketAddress_PortValue{PortValue: b.Ports[0]}, }, }}, AdditionalAddresses: additionalAddresses, diff --git a/stats/opentelemetry/e2e_test.go b/stats/opentelemetry/e2e_test.go index 5a5ecbecd47c..ac671e2982a3 100644 --- a/stats/opentelemetry/e2e_test.go +++ b/stats/opentelemetry/e2e_test.go @@ -436,7 +436,7 @@ func (s) TestWRRMetrics(t *testing.T) { Host: "localhost", Localities: []e2e.LocalityOptions{ { - Backends: []e2e.BackendOptions{{Port: port1}, {Port: port2}}, + Backends: []e2e.BackendOptions{{Ports: []uint32{port1}}, {Ports: []uint32{port2}}}, Weight: 1, }, }, diff --git 
a/test/xds/xds_client_custom_lb_test.go b/test/xds/xds_client_custom_lb_test.go index 8d87a89753c7..2c881301cca2 100644 --- a/test/xds/xds_client_custom_lb_test.go +++ b/test/xds/xds_client_custom_lb_test.go @@ -238,11 +238,11 @@ func (s) TestWrrLocality(t *testing.T) { Host: "localhost", Localities: []e2e.LocalityOptions{ { - Backends: []e2e.BackendOptions{{Port: port1}, {Port: port2}}, + Backends: []e2e.BackendOptions{{Ports: []uint32{port1}}, {Ports: []uint32{port2}}}, Weight: 1, }, { - Backends: []e2e.BackendOptions{{Port: port3}, {Port: port4}, {Port: port5}}, + Backends: []e2e.BackendOptions{{Ports: []uint32{port3}}, {Ports: []uint32{port4}}, {Ports: []uint32{port5}}}, Weight: 2, }, }, diff --git a/test/xds/xds_client_priority_locality_test.go b/test/xds/xds_client_priority_locality_test.go index 0f7c5e21f3ba..607e6ea859b6 100644 --- a/test/xds/xds_client_priority_locality_test.go +++ b/test/xds/xds_client_priority_locality_test.go @@ -95,14 +95,14 @@ func (s) TestClientSideXDS_LocalityChangesPriority(t *testing.T) { Name: "my-locality-1", Weight: 1000000, Priority: 0, - Backends: []e2e.BackendOptions{{Port: ports[0]}}, + Backends: []e2e.BackendOptions{{Ports: []uint32{ports[0]}}}, Locality: locality1, }, { Name: "my-locality-2", Weight: 1000000, Priority: 1, - Backends: []e2e.BackendOptions{{Port: ports[1]}}, + Backends: []e2e.BackendOptions{{Ports: []uint32{ports[1]}}}, Locality: locality2, }, }, @@ -138,14 +138,14 @@ func (s) TestClientSideXDS_LocalityChangesPriority(t *testing.T) { Name: "my-locality-1", Weight: 500000, Priority: 0, - Backends: []e2e.BackendOptions{{Port: testutils.ParsePort(t, backend0.Address)}}, + Backends: []e2e.BackendOptions{{Ports: []uint32{testutils.ParsePort(t, backend0.Address)}}}, Locality: locality1, }, { Name: "my-locality-2", Weight: 500000, Priority: 0, - Backends: []e2e.BackendOptions{{Port: testutils.ParsePort(t, backend1.Address)}}, + Backends: []e2e.BackendOptions{{Ports: []uint32{testutils.ParsePort(t, 
backend1.Address)}}}, Locality: locality2, }, }, @@ -167,14 +167,14 @@ func (s) TestClientSideXDS_LocalityChangesPriority(t *testing.T) { Name: "my-locality-1", Weight: 499884, Priority: 0, - Backends: []e2e.BackendOptions{{Port: testutils.ParsePort(t, backend0.Address)}}, + Backends: []e2e.BackendOptions{{Ports: []uint32{testutils.ParsePort(t, backend0.Address)}}}, Locality: locality1, }, { Name: "my-locality-2", Weight: 500115, Priority: 0, - Backends: []e2e.BackendOptions{{Port: testutils.ParsePort(t, backend1.Address)}}, + Backends: []e2e.BackendOptions{{Ports: []uint32{testutils.ParsePort(t, backend1.Address)}}}, Locality: locality2, }, }, @@ -197,7 +197,7 @@ func (s) TestClientSideXDS_LocalityChangesPriority(t *testing.T) { Name: "my-locality-2", Weight: 1000000, Priority: 0, - Backends: []e2e.BackendOptions{{Port: testutils.ParsePort(t, backend1.Address)}}, + Backends: []e2e.BackendOptions{{Ports: []uint32{testutils.ParsePort(t, backend1.Address)}}}, Locality: locality2, }, }, diff --git a/xds/internal/balancer/clusterimpl/tests/balancer_test.go b/xds/internal/balancer/clusterimpl/tests/balancer_test.go index f7d91d19597a..a8e7ac7a10ad 100644 --- a/xds/internal/balancer/clusterimpl/tests/balancer_test.go +++ b/xds/internal/balancer/clusterimpl/tests/balancer_test.go @@ -141,7 +141,7 @@ func (s) TestConfigUpdateWithSameLoadReportingServerConfig(t *testing.T) { Host: "localhost", Localities: []e2e.LocalityOptions{ { - Backends: []e2e.BackendOptions{{Port: testutils.ParsePort(t, server.Address)}}, + Backends: []e2e.BackendOptions{{Ports: []uint32{testutils.ParsePort(t, server.Address)}}}, Weight: 1, }, }, @@ -252,13 +252,13 @@ func (s) TestLoadReportingPickFirstMultiLocality(t *testing.T) { Localities: []e2e.LocalityOptions{ { Backends: []e2e.BackendOptions{ - {Port: testutils.ParsePort(t, server1.Address)}, + {Ports: []uint32{testutils.ParsePort(t, server1.Address)}}, }, Weight: 1, }, { Backends: []e2e.BackendOptions{ - {Port: testutils.ParsePort(t, 
server2.Address)}, + {Ports: []uint32{testutils.ParsePort(t, server2.Address)}}, }, Weight: 2, }, diff --git a/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go b/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go index 315809174928..89760f6fd23e 100644 --- a/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go +++ b/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go @@ -146,7 +146,7 @@ func (s) TestEDS_OneLocality(t *testing.T) { resources := clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{{ Name: localityName1, Weight: 1, - Backends: []e2e.BackendOptions{{Port: ports[0]}}, + Backends: []e2e.BackendOptions{{Ports: []uint32{ports[0]}}}, }}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -201,7 +201,7 @@ func (s) TestEDS_OneLocality(t *testing.T) { resources = clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{{ Name: localityName1, Weight: 1, - Backends: []e2e.BackendOptions{{Port: ports[0]}, {Port: ports[1]}}, + Backends: []e2e.BackendOptions{{Ports: []uint32{ports[0]}}, {Ports: []uint32{ports[1]}}}, }}) if err := managementServer.Update(ctx, resources); err != nil { t.Fatal(err) @@ -215,7 +215,7 @@ func (s) TestEDS_OneLocality(t *testing.T) { resources = clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{{ Name: localityName1, Weight: 1, - Backends: []e2e.BackendOptions{{Port: ports[1]}}, + Backends: []e2e.BackendOptions{{Ports: []uint32{ports[1]}}}, }}) if err := managementServer.Update(ctx, resources); err != nil { t.Fatal(err) @@ -228,7 +228,7 @@ func (s) TestEDS_OneLocality(t *testing.T) { resources = clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{{ Name: localityName1, Weight: 1, - Backends: []e2e.BackendOptions{{Port: ports[2]}}, + Backends: []e2e.BackendOptions{{Ports: []uint32{ports[2]}}}, }}) if err := managementServer.Update(ctx, resources); err != nil { t.Fatal(err) @@ 
-275,12 +275,12 @@ func (s) TestEDS_MultipleLocalities(t *testing.T) { { Name: localityName1, Weight: 1, - Backends: []e2e.BackendOptions{{Port: ports[0]}}, + Backends: []e2e.BackendOptions{{Ports: []uint32{ports[0]}}}, }, { Name: localityName2, Weight: 1, - Backends: []e2e.BackendOptions{{Port: ports[1]}}, + Backends: []e2e.BackendOptions{{Ports: []uint32{ports[1]}}}, }, }) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) @@ -337,17 +337,17 @@ func (s) TestEDS_MultipleLocalities(t *testing.T) { { Name: localityName1, Weight: 1, - Backends: []e2e.BackendOptions{{Port: ports[0]}}, + Backends: []e2e.BackendOptions{{Ports: []uint32{ports[0]}}}, }, { Name: localityName2, Weight: 1, - Backends: []e2e.BackendOptions{{Port: ports[1]}}, + Backends: []e2e.BackendOptions{{Ports: []uint32{ports[1]}}}, }, { Name: localityName3, Weight: 1, - Backends: []e2e.BackendOptions{{Port: ports[2]}}, + Backends: []e2e.BackendOptions{{Ports: []uint32{ports[2]}}}, }, }) if err := managementServer.Update(ctx, resources); err != nil { @@ -363,12 +363,12 @@ func (s) TestEDS_MultipleLocalities(t *testing.T) { { Name: localityName2, Weight: 1, - Backends: []e2e.BackendOptions{{Port: ports[1]}}, + Backends: []e2e.BackendOptions{{Ports: []uint32{ports[1]}}}, }, { Name: localityName3, Weight: 1, - Backends: []e2e.BackendOptions{{Port: ports[2]}}, + Backends: []e2e.BackendOptions{{Ports: []uint32{ports[2]}}}, }, }) if err := managementServer.Update(ctx, resources); err != nil { @@ -385,12 +385,12 @@ func (s) TestEDS_MultipleLocalities(t *testing.T) { { Name: localityName2, Weight: 1, - Backends: []e2e.BackendOptions{{Port: ports[1]}}, + Backends: []e2e.BackendOptions{{Ports: []uint32{ports[1]}}}, }, { Name: localityName3, Weight: 1, - Backends: []e2e.BackendOptions{{Port: ports[2]}, {Port: ports[3]}}, + Backends: []e2e.BackendOptions{{Ports: []uint32{ports[2]}}, {Ports: []uint32{ports[3]}}}, }, }) if err := managementServer.Update(ctx, resources); err != nil { @@ -426,24 
+426,24 @@ func (s) TestEDS_EndpointsHealth(t *testing.T) { Name: localityName1, Weight: 1, Backends: []e2e.BackendOptions{ - {Port: ports[0], HealthStatus: v3corepb.HealthStatus_UNKNOWN}, - {Port: ports[1], HealthStatus: v3corepb.HealthStatus_HEALTHY}, - {Port: ports[2], HealthStatus: v3corepb.HealthStatus_UNHEALTHY}, - {Port: ports[3], HealthStatus: v3corepb.HealthStatus_DRAINING}, - {Port: ports[4], HealthStatus: v3corepb.HealthStatus_TIMEOUT}, - {Port: ports[5], HealthStatus: v3corepb.HealthStatus_DEGRADED}, + {Ports: []uint32{ports[0]}, HealthStatus: v3corepb.HealthStatus_UNKNOWN}, + {Ports: []uint32{ports[1]}, HealthStatus: v3corepb.HealthStatus_HEALTHY}, + {Ports: []uint32{ports[2]}, HealthStatus: v3corepb.HealthStatus_UNHEALTHY}, + {Ports: []uint32{ports[3]}, HealthStatus: v3corepb.HealthStatus_DRAINING}, + {Ports: []uint32{ports[4]}, HealthStatus: v3corepb.HealthStatus_TIMEOUT}, + {Ports: []uint32{ports[5]}, HealthStatus: v3corepb.HealthStatus_DEGRADED}, }, }, { Name: localityName2, Weight: 1, Backends: []e2e.BackendOptions{ - {Port: ports[6], HealthStatus: v3corepb.HealthStatus_UNKNOWN}, - {Port: ports[7], HealthStatus: v3corepb.HealthStatus_HEALTHY}, - {Port: ports[8], HealthStatus: v3corepb.HealthStatus_UNHEALTHY}, - {Port: ports[9], HealthStatus: v3corepb.HealthStatus_DRAINING}, - {Port: ports[10], HealthStatus: v3corepb.HealthStatus_TIMEOUT}, - {Port: ports[11], HealthStatus: v3corepb.HealthStatus_DEGRADED}, + {Ports: []uint32{ports[6]}, HealthStatus: v3corepb.HealthStatus_UNKNOWN}, + {Ports: []uint32{ports[7]}, HealthStatus: v3corepb.HealthStatus_HEALTHY}, + {Ports: []uint32{ports[8]}, HealthStatus: v3corepb.HealthStatus_UNHEALTHY}, + {Ports: []uint32{ports[9]}, HealthStatus: v3corepb.HealthStatus_DRAINING}, + {Ports: []uint32{ports[10]}, HealthStatus: v3corepb.HealthStatus_TIMEOUT}, + {Ports: []uint32{ports[11]}, HealthStatus: v3corepb.HealthStatus_DEGRADED}, }, }, }) @@ -572,7 +572,7 @@ func (s) TestEDS_EmptyUpdate(t *testing.T) { resources = 
clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{{ Name: localityName1, Weight: 1, - Backends: []e2e.BackendOptions{{Port: ports[0]}}, + Backends: []e2e.BackendOptions{{Ports: []uint32{ports[0]}}}, }}) if err := managementServer.Update(ctx, resources); err != nil { t.Fatal(err) @@ -916,7 +916,7 @@ func (s) TestEDS_BadUpdateWithoutPreviousGoodUpdate(t *testing.T) { resources := clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{{ Name: localityName1, Weight: 1, - Backends: []e2e.BackendOptions{{Port: testutils.ParsePort(t, server.Address)}}, + Backends: []e2e.BackendOptions{{Ports: []uint32{testutils.ParsePort(t, server.Address)}}}, }}) resources.Endpoints[0].Endpoints[0].LbEndpoints[0].LoadBalancingWeight = &wrapperspb.UInt32Value{Value: 0} ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) @@ -989,7 +989,7 @@ func (s) TestEDS_BadUpdateWithPreviousGoodUpdate(t *testing.T) { resources := clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{{ Name: localityName1, Weight: 1, - Backends: []e2e.BackendOptions{{Port: testutils.ParsePort(t, server.Address)}}, + Backends: []e2e.BackendOptions{{Ports: []uint32{testutils.ParsePort(t, server.Address)}}}, }}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -1185,8 +1185,8 @@ func (s) TestEDS_EndpointWithMultipleAddresses(t *testing.T) { t.Logf("Started test service backend at addresses %q, %q, %q", lis1.Addr(), lis2.Addr(), lis3.Addr()) - port := testutils.ParsePort(t, lis1.Addr().String()) - additionalPorts := []uint32{ + ports := []uint32{ + testutils.ParsePort(t, lis1.Addr().String()), testutils.ParsePort(t, lis2.Addr().String()), testutils.ParsePort(t, lis3.Addr().String()), } @@ -1200,13 +1200,13 @@ func (s) TestEDS_EndpointWithMultipleAddresses(t *testing.T) { { name: "flag_enabled", dualstackEndpointsEnabled: true, - wantEndpointPorts: append([]uint32{port}, additionalPorts...), - 
wantAddrPorts: []uint32{port}, + wantEndpointPorts: ports, + wantAddrPorts: ports[:1], }, { name: "flag_disabled", - wantEndpointPorts: []uint32{port}, - wantAddrPorts: []uint32{port}, + wantEndpointPorts: ports[:1], + wantAddrPorts: ports[:1], }, } @@ -1250,8 +1250,8 @@ func (s) TestEDS_EndpointWithMultipleAddresses(t *testing.T) { Name: localityName1, Weight: 1, Backends: []e2e.BackendOptions{{ - Port: port, - AdditionalPorts: additionalPorts}}, + Ports: ports, + }}, }}) if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) diff --git a/xds/internal/balancer/ringhash/e2e/ringhash_balancer_test.go b/xds/internal/balancer/ringhash/e2e/ringhash_balancer_test.go index 94efa99dcdf0..afc60924b3f8 100644 --- a/xds/internal/balancer/ringhash/e2e/ringhash_balancer_test.go +++ b/xds/internal/balancer/ringhash/e2e/ringhash_balancer_test.go @@ -182,7 +182,7 @@ func backendOptions(t *testing.T, serverAddrs []string) []e2e.BackendOptions { var backendOpts []e2e.BackendOptions for _, addr := range serverAddrs { - backendOpts = append(backendOpts, e2e.BackendOptions{Port: testutils.ParsePort(t, addr)}) + backendOpts = append(backendOpts, e2e.BackendOptions{Ports: []uint32{testutils.ParsePort(t, addr)}}) } return backendOpts } @@ -872,9 +872,9 @@ func (s) TestRingHash_EndpointWeights(t *testing.T) { const clusterName = "cluster" backendOpts := []e2e.BackendOptions{ - {Port: testutils.ParsePort(t, backends[0])}, - {Port: testutils.ParsePort(t, backends[1])}, - {Port: testutils.ParsePort(t, backends[2]), Weight: 2}, + {Ports: []uint32{testutils.ParsePort(t, backends[0])}}, + {Ports: []uint32{testutils.ParsePort(t, backends[1])}}, + {Ports: []uint32{testutils.ParsePort(t, backends[2])}, Weight: 2}, } endpoints := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{ @@ -1209,14 +1209,14 @@ func (s) TestRingHash_RandomHashingDistributionAccordingToLocalityAndEndpointWei Localities: []e2e.LocalityOptions{ { Backends: []e2e.BackendOptions{{ - Port: 
testutils.ParsePort(t, backends[0]), + Ports: []uint32{testutils.ParsePort(t, backends[0])}, Weight: endpoint1Weight, }}, Weight: locality1Weight, }, { Backends: []e2e.BackendOptions{{ - Port: testutils.ParsePort(t, backends[1]), + Ports: []uint32{testutils.ParsePort(t, backends[1])}, Weight: endpoint2Weight, }}, Weight: locality2Weight, From 64c5642ae95f3cd69de020d59a102e0ba3efb1b2 Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Mon, 16 Dec 2024 10:10:10 +0530 Subject: [PATCH 8/8] Fix comment typo --- xds/internal/balancer/clusterresolver/clusterresolver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xds/internal/balancer/clusterresolver/clusterresolver.go b/xds/internal/balancer/clusterresolver/clusterresolver.go index b052157d69c4..f0a8905d374b 100644 --- a/xds/internal/balancer/clusterresolver/clusterresolver.go +++ b/xds/internal/balancer/clusterresolver/clusterresolver.go @@ -255,7 +255,7 @@ func (b *clusterResolverBalancer) updateChildConfig() { addr.BalancerAttributes = endpoints[i].Attributes // If the endpoint has multiple addresses, only the first is added // to the flattened address list. This ensures that LB policies - // that doesn't support endpoints, create only one subchannel to a + // that don't support endpoints create only one subchannel to a // backend. if j == 0 { flattenedAddrs[i] = addr