Skip to content

Commit

Permalink
xds: Add support for multiple addresses per endpoint (#7858)
Browse files Browse the repository at this point in the history
  • Loading branch information
arjan-bal authored Dec 16, 2024
1 parent 3f76275 commit cc161de
Show file tree
Hide file tree
Showing 22 changed files with 632 additions and 244 deletions.
7 changes: 7 additions & 0 deletions balancer/weightedroundrobin/weightedroundrobin.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ func SetAddrInfo(addr resolver.Address, addrInfo AddrInfo) resolver.Address {
return addr
}

// SetAddrInfoInEndpoint returns a copy of endpoint in which the Attributes
// field is updated with addrInfo.
func SetAddrInfoInEndpoint(endpoint resolver.Endpoint, addrInfo AddrInfo) resolver.Endpoint {
endpoint.Attributes = endpoint.Attributes.WithValue(attributeKey{}, addrInfo)
return endpoint
}

// GetAddrInfo returns the AddrInfo stored in the BalancerAttributes field of
// addr.
func GetAddrInfo(addr resolver.Address) AddrInfo {
Expand Down
6 changes: 6 additions & 0 deletions internal/envconfig/xds.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,10 @@ var (

// C2PResolverTestOnlyTrafficDirectorURI is the TD URI for testing.
C2PResolverTestOnlyTrafficDirectorURI = os.Getenv("GRPC_TEST_ONLY_GOOGLE_C2P_RESOLVER_TRAFFIC_DIRECTOR_URI")

// XDSDualstackEndpointsEnabled is true if gRPC should read the
// "additional addresses" in the xDS endpoint resource.
// TODO: https://github.com/grpc/grpc-go/issues/7866 - Control this using
// an env variable when all LB policies handle endpoints.
XDSDualstackEndpointsEnabled = false
)
21 changes: 17 additions & 4 deletions internal/testutils/xds/e2e/clientresources.go
Original file line number Diff line number Diff line change
Expand Up @@ -692,9 +692,9 @@ type LocalityOptions struct {
// BackendOptions contains options to configure individual backends in a
// locality.
type BackendOptions struct {
// Port number on which the backend is accepting connections. All backends
// Ports on which the backend is accepting connections. All backends
// are expected to run on localhost, hence host name is not stored here.
Port uint32
Ports []uint32
// Health status of the backend. Default is UNKNOWN which is treated the
// same as HEALTHY.
HealthStatus v3corepb.HealthStatus
Expand Down Expand Up @@ -722,7 +722,7 @@ type EndpointOptions struct {
func DefaultEndpoint(clusterName string, host string, ports []uint32) *v3endpointpb.ClusterLoadAssignment {
var bOpts []BackendOptions
for _, p := range ports {
bOpts = append(bOpts, BackendOptions{Port: p, Weight: 1})
bOpts = append(bOpts, BackendOptions{Ports: []uint32{p}, Weight: 1})
}
return EndpointResourceWithOptions(EndpointOptions{
ClusterName: clusterName,
Expand All @@ -747,15 +747,28 @@ func EndpointResourceWithOptions(opts EndpointOptions) *v3endpointpb.ClusterLoad
if b.Weight == 0 {
b.Weight = 1
}
additionalAddresses := make([]*v3endpointpb.Endpoint_AdditionalAddress, len(b.Ports)-1)
for i, p := range b.Ports[1:] {
additionalAddresses[i] = &v3endpointpb.Endpoint_AdditionalAddress{
Address: &v3corepb.Address{Address: &v3corepb.Address_SocketAddress{
SocketAddress: &v3corepb.SocketAddress{
Protocol: v3corepb.SocketAddress_TCP,
Address: opts.Host,
PortSpecifier: &v3corepb.SocketAddress_PortValue{PortValue: p},
}},
},
}
}
lbEndpoints = append(lbEndpoints, &v3endpointpb.LbEndpoint{
HostIdentifier: &v3endpointpb.LbEndpoint_Endpoint{Endpoint: &v3endpointpb.Endpoint{
Address: &v3corepb.Address{Address: &v3corepb.Address_SocketAddress{
SocketAddress: &v3corepb.SocketAddress{
Protocol: v3corepb.SocketAddress_TCP,
Address: opts.Host,
PortSpecifier: &v3corepb.SocketAddress_PortValue{PortValue: b.Port},
PortSpecifier: &v3corepb.SocketAddress_PortValue{PortValue: b.Ports[0]},
},
}},
AdditionalAddresses: additionalAddresses,
}},
HealthStatus: b.HealthStatus,
LoadBalancingWeight: &wrapperspb.UInt32Value{Value: b.Weight},
Expand Down
4 changes: 2 additions & 2 deletions stats/opentelemetry/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ func (s) TestWRRMetrics(t *testing.T) {
Host: "localhost",
Localities: []e2e.LocalityOptions{
{
Backends: []e2e.BackendOptions{{Port: port1}, {Port: port2}},
Backends: []e2e.BackendOptions{{Ports: []uint32{port1}}, {Ports: []uint32{port2}}},
Weight: 1,
},
},
Expand Down Expand Up @@ -473,7 +473,7 @@ func (s) TestWRRMetrics(t *testing.T) {
// scheduler.
receivedExpectedMetrics := grpcsync.NewEvent()
go func() {
for !receivedExpectedMetrics.HasFired() {
for !receivedExpectedMetrics.HasFired() && ctx.Err() == nil {
client.EmptyCall(ctx, &testpb.Empty{})
time.Sleep(2 * time.Millisecond)
}
Expand Down
4 changes: 2 additions & 2 deletions test/xds/xds_client_custom_lb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,11 +238,11 @@ func (s) TestWrrLocality(t *testing.T) {
Host: "localhost",
Localities: []e2e.LocalityOptions{
{
Backends: []e2e.BackendOptions{{Port: port1}, {Port: port2}},
Backends: []e2e.BackendOptions{{Ports: []uint32{port1}}, {Ports: []uint32{port2}}},
Weight: 1,
},
{
Backends: []e2e.BackendOptions{{Port: port3}, {Port: port4}, {Port: port5}},
Backends: []e2e.BackendOptions{{Ports: []uint32{port3}}, {Ports: []uint32{port4}}, {Ports: []uint32{port5}}},
Weight: 2,
},
},
Expand Down
14 changes: 7 additions & 7 deletions test/xds/xds_client_priority_locality_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,14 @@ func (s) TestClientSideXDS_LocalityChangesPriority(t *testing.T) {
Name: "my-locality-1",
Weight: 1000000,
Priority: 0,
Backends: []e2e.BackendOptions{{Port: ports[0]}},
Backends: []e2e.BackendOptions{{Ports: []uint32{ports[0]}}},
Locality: locality1,
},
{
Name: "my-locality-2",
Weight: 1000000,
Priority: 1,
Backends: []e2e.BackendOptions{{Port: ports[1]}},
Backends: []e2e.BackendOptions{{Ports: []uint32{ports[1]}}},
Locality: locality2,
},
},
Expand Down Expand Up @@ -138,14 +138,14 @@ func (s) TestClientSideXDS_LocalityChangesPriority(t *testing.T) {
Name: "my-locality-1",
Weight: 500000,
Priority: 0,
Backends: []e2e.BackendOptions{{Port: testutils.ParsePort(t, backend0.Address)}},
Backends: []e2e.BackendOptions{{Ports: []uint32{testutils.ParsePort(t, backend0.Address)}}},
Locality: locality1,
},
{
Name: "my-locality-2",
Weight: 500000,
Priority: 0,
Backends: []e2e.BackendOptions{{Port: testutils.ParsePort(t, backend1.Address)}},
Backends: []e2e.BackendOptions{{Ports: []uint32{testutils.ParsePort(t, backend1.Address)}}},
Locality: locality2,
},
},
Expand All @@ -167,14 +167,14 @@ func (s) TestClientSideXDS_LocalityChangesPriority(t *testing.T) {
Name: "my-locality-1",
Weight: 499884,
Priority: 0,
Backends: []e2e.BackendOptions{{Port: testutils.ParsePort(t, backend0.Address)}},
Backends: []e2e.BackendOptions{{Ports: []uint32{testutils.ParsePort(t, backend0.Address)}}},
Locality: locality1,
},
{
Name: "my-locality-2",
Weight: 500115,
Priority: 0,
Backends: []e2e.BackendOptions{{Port: testutils.ParsePort(t, backend1.Address)}},
Backends: []e2e.BackendOptions{{Ports: []uint32{testutils.ParsePort(t, backend1.Address)}}},
Locality: locality2,
},
},
Expand All @@ -197,7 +197,7 @@ func (s) TestClientSideXDS_LocalityChangesPriority(t *testing.T) {
Name: "my-locality-2",
Weight: 1000000,
Priority: 0,
Backends: []e2e.BackendOptions{{Port: testutils.ParsePort(t, backend1.Address)}},
Backends: []e2e.BackendOptions{{Ports: []uint32{testutils.ParsePort(t, backend1.Address)}}},
Locality: locality2,
},
},
Expand Down
6 changes: 3 additions & 3 deletions xds/internal/balancer/clusterimpl/tests/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (s) TestConfigUpdateWithSameLoadReportingServerConfig(t *testing.T) {
Host: "localhost",
Localities: []e2e.LocalityOptions{
{
Backends: []e2e.BackendOptions{{Port: testutils.ParsePort(t, server.Address)}},
Backends: []e2e.BackendOptions{{Ports: []uint32{testutils.ParsePort(t, server.Address)}}},
Weight: 1,
},
},
Expand Down Expand Up @@ -252,13 +252,13 @@ func (s) TestLoadReportingPickFirstMultiLocality(t *testing.T) {
Localities: []e2e.LocalityOptions{
{
Backends: []e2e.BackendOptions{
{Port: testutils.ParsePort(t, server1.Address)},
{Ports: []uint32{testutils.ParsePort(t, server1.Address)}},
},
Weight: 1,
},
{
Backends: []e2e.BackendOptions{
{Port: testutils.ParsePort(t, server2.Address)},
{Ports: []uint32{testutils.ParsePort(t, server2.Address)}},
},
Weight: 2,
},
Expand Down
30 changes: 24 additions & 6 deletions xds/internal/balancer/clusterresolver/clusterresolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (b *clusterResolverBalancer) updateChildConfig() {
b.child = newChildBalancer(b.priorityBuilder, b.cc, b.bOpts)
}

childCfgBytes, addrs, err := buildPriorityConfigJSON(b.priorities, &b.config.xdsLBPolicy)
childCfgBytes, endpoints, err := buildPriorityConfigJSON(b.priorities, &b.config.xdsLBPolicy)
if err != nil {
b.logger.Warningf("Failed to build child policy config: %v", err)
return
Expand All @@ -248,15 +248,33 @@ func (b *clusterResolverBalancer) updateChildConfig() {
b.logger.Infof("Built child policy config: %s", pretty.ToJSON(childCfg))
}

endpoints := make([]resolver.Endpoint, len(addrs))
for i, a := range addrs {
endpoints[i].Attributes = a.BalancerAttributes
endpoints[i].Addresses = []resolver.Address{a}
flattenedAddrs := make([]resolver.Address, len(endpoints))
for i := range endpoints {
for j := range endpoints[i].Addresses {
addr := endpoints[i].Addresses[j]
addr.BalancerAttributes = endpoints[i].Attributes
// If the endpoint has multiple addresses, only the first is added
// to the flattened address list. This ensures that LB policies
// that don't support endpoints create only one subchannel to a
// backend.
if j == 0 {
flattenedAddrs[i] = addr
}
// BalancerAttributes need to be present in endpoint addresses. This
// temporary workaround is required to make load reporting work
// with the old pickfirst policy which creates SubConns with multiple
// addresses. Since the addresses can be from different localities,
// an Address.BalancerAttribute is used to identify the locality of the
// address used by the transport. This workaround can be removed once
// the old pickfirst is removed.
// See https://github.com/grpc/grpc-go/issues/7339
endpoints[i].Addresses[j] = addr
}
}
if err := b.child.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{
Endpoints: endpoints,
Addresses: addrs,
Addresses: flattenedAddrs,
ServiceConfig: b.configRaw,
Attributes: b.attrsWithClient,
},
Expand Down
Loading

0 comments on commit cc161de

Please sign in to comment.