Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: Add support for multiple addresses per endpoint #7858

Merged
merged 8 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions balancer/weightedroundrobin/weightedroundrobin.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ func SetAddrInfo(addr resolver.Address, addrInfo AddrInfo) resolver.Address {
return addr
}

// SetAddrInfoInEndpoint returns a copy of endpoint in which the Attributes
// field is updated with addrInfo.
func SetAddrInfoInEndpoint(endpoint resolver.Endpoint, addrInfo AddrInfo) resolver.Endpoint {
endpoint.Attributes = endpoint.Attributes.WithValue(attributeKey{}, addrInfo)
easwars marked this conversation as resolved.
Show resolved Hide resolved
return endpoint
}

// GetAddrInfo returns the AddrInfo stored in the BalancerAttributes field of
// addr.
func GetAddrInfo(addr resolver.Address) AddrInfo {
Expand Down
6 changes: 6 additions & 0 deletions internal/envconfig/xds.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,10 @@ var (

// C2PResolverTestOnlyTrafficDirectorURI is the TD URI for testing.
C2PResolverTestOnlyTrafficDirectorURI = os.Getenv("GRPC_TEST_ONLY_GOOGLE_C2P_RESOLVER_TRAFFIC_DIRECTOR_URI")

// XDSDualstackEndpointsEnabled is true if gRPC should read the
// "additional addresses" in the xDS endpoint resource.
// TODO: https://github.com/grpc/grpc-go/issues/7866 - Control this using
// an env variable when all LB policies handle endpoints.
XDSDualstackEndpointsEnabled = false
)
21 changes: 17 additions & 4 deletions internal/testutils/xds/e2e/clientresources.go
Original file line number Diff line number Diff line change
Expand Up @@ -692,9 +692,9 @@ type LocalityOptions struct {
// BackendOptions contains options to configure individual backends in a
// locality.
type BackendOptions struct {
// Port number on which the backend is accepting connections. All backends
// Ports on which the backend is accepting connections. All backends
// are expected to run on localhost, hence host name is not stored here.
Port uint32
Ports []uint32
// Health status of the backend. Default is UNKNOWN which is treated the
// same as HEALTHY.
HealthStatus v3corepb.HealthStatus
Expand Down Expand Up @@ -722,7 +722,7 @@ type EndpointOptions struct {
func DefaultEndpoint(clusterName string, host string, ports []uint32) *v3endpointpb.ClusterLoadAssignment {
var bOpts []BackendOptions
for _, p := range ports {
bOpts = append(bOpts, BackendOptions{Port: p, Weight: 1})
bOpts = append(bOpts, BackendOptions{Ports: []uint32{p}, Weight: 1})
}
return EndpointResourceWithOptions(EndpointOptions{
ClusterName: clusterName,
Expand All @@ -747,15 +747,28 @@ func EndpointResourceWithOptions(opts EndpointOptions) *v3endpointpb.ClusterLoad
if b.Weight == 0 {
b.Weight = 1
}
additionalAddresses := make([]*v3endpointpb.Endpoint_AdditionalAddress, len(b.Ports)-1)
for i, p := range b.Ports[1:] {
additionalAddresses[i] = &v3endpointpb.Endpoint_AdditionalAddress{
Address: &v3corepb.Address{Address: &v3corepb.Address_SocketAddress{
SocketAddress: &v3corepb.SocketAddress{
Protocol: v3corepb.SocketAddress_TCP,
Address: opts.Host,
PortSpecifier: &v3corepb.SocketAddress_PortValue{PortValue: p},
}},
},
}
}
lbEndpoints = append(lbEndpoints, &v3endpointpb.LbEndpoint{
HostIdentifier: &v3endpointpb.LbEndpoint_Endpoint{Endpoint: &v3endpointpb.Endpoint{
Address: &v3corepb.Address{Address: &v3corepb.Address_SocketAddress{
SocketAddress: &v3corepb.SocketAddress{
Protocol: v3corepb.SocketAddress_TCP,
Address: opts.Host,
PortSpecifier: &v3corepb.SocketAddress_PortValue{PortValue: b.Port},
PortSpecifier: &v3corepb.SocketAddress_PortValue{PortValue: b.Ports[0]},
},
}},
AdditionalAddresses: additionalAddresses,
}},
HealthStatus: b.HealthStatus,
LoadBalancingWeight: &wrapperspb.UInt32Value{Value: b.Weight},
Expand Down
4 changes: 2 additions & 2 deletions stats/opentelemetry/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ func (s) TestWRRMetrics(t *testing.T) {
Host: "localhost",
Localities: []e2e.LocalityOptions{
{
Backends: []e2e.BackendOptions{{Port: port1}, {Port: port2}},
Backends: []e2e.BackendOptions{{Ports: []uint32{port1}}, {Ports: []uint32{port2}}},
Weight: 1,
},
},
Expand Down Expand Up @@ -473,7 +473,7 @@ func (s) TestWRRMetrics(t *testing.T) {
// scheduler.
receivedExpectedMetrics := grpcsync.NewEvent()
go func() {
for !receivedExpectedMetrics.HasFired() {
for !receivedExpectedMetrics.HasFired() && ctx.Err() == nil {
client.EmptyCall(ctx, &testpb.Empty{})
time.Sleep(2 * time.Millisecond)
}
Expand Down
4 changes: 2 additions & 2 deletions test/xds/xds_client_custom_lb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,11 +238,11 @@ func (s) TestWrrLocality(t *testing.T) {
Host: "localhost",
Localities: []e2e.LocalityOptions{
{
Backends: []e2e.BackendOptions{{Port: port1}, {Port: port2}},
Backends: []e2e.BackendOptions{{Ports: []uint32{port1}}, {Ports: []uint32{port2}}},
Weight: 1,
},
{
Backends: []e2e.BackendOptions{{Port: port3}, {Port: port4}, {Port: port5}},
Backends: []e2e.BackendOptions{{Ports: []uint32{port3}}, {Ports: []uint32{port4}}, {Ports: []uint32{port5}}},
Weight: 2,
},
},
Expand Down
14 changes: 7 additions & 7 deletions test/xds/xds_client_priority_locality_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,14 @@ func (s) TestClientSideXDS_LocalityChangesPriority(t *testing.T) {
Name: "my-locality-1",
Weight: 1000000,
Priority: 0,
Backends: []e2e.BackendOptions{{Port: ports[0]}},
Backends: []e2e.BackendOptions{{Ports: []uint32{ports[0]}}},
Locality: locality1,
},
{
Name: "my-locality-2",
Weight: 1000000,
Priority: 1,
Backends: []e2e.BackendOptions{{Port: ports[1]}},
Backends: []e2e.BackendOptions{{Ports: []uint32{ports[1]}}},
Locality: locality2,
},
},
Expand Down Expand Up @@ -138,14 +138,14 @@ func (s) TestClientSideXDS_LocalityChangesPriority(t *testing.T) {
Name: "my-locality-1",
Weight: 500000,
Priority: 0,
Backends: []e2e.BackendOptions{{Port: testutils.ParsePort(t, backend0.Address)}},
Backends: []e2e.BackendOptions{{Ports: []uint32{testutils.ParsePort(t, backend0.Address)}}},
Locality: locality1,
},
{
Name: "my-locality-2",
Weight: 500000,
Priority: 0,
Backends: []e2e.BackendOptions{{Port: testutils.ParsePort(t, backend1.Address)}},
Backends: []e2e.BackendOptions{{Ports: []uint32{testutils.ParsePort(t, backend1.Address)}}},
Locality: locality2,
},
},
Expand All @@ -167,14 +167,14 @@ func (s) TestClientSideXDS_LocalityChangesPriority(t *testing.T) {
Name: "my-locality-1",
Weight: 499884,
Priority: 0,
Backends: []e2e.BackendOptions{{Port: testutils.ParsePort(t, backend0.Address)}},
Backends: []e2e.BackendOptions{{Ports: []uint32{testutils.ParsePort(t, backend0.Address)}}},
Locality: locality1,
},
{
Name: "my-locality-2",
Weight: 500115,
Priority: 0,
Backends: []e2e.BackendOptions{{Port: testutils.ParsePort(t, backend1.Address)}},
Backends: []e2e.BackendOptions{{Ports: []uint32{testutils.ParsePort(t, backend1.Address)}}},
Locality: locality2,
},
},
Expand All @@ -197,7 +197,7 @@ func (s) TestClientSideXDS_LocalityChangesPriority(t *testing.T) {
Name: "my-locality-2",
Weight: 1000000,
Priority: 0,
Backends: []e2e.BackendOptions{{Port: testutils.ParsePort(t, backend1.Address)}},
Backends: []e2e.BackendOptions{{Ports: []uint32{testutils.ParsePort(t, backend1.Address)}}},
Locality: locality2,
},
},
Expand Down
6 changes: 3 additions & 3 deletions xds/internal/balancer/clusterimpl/tests/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (s) TestConfigUpdateWithSameLoadReportingServerConfig(t *testing.T) {
Host: "localhost",
Localities: []e2e.LocalityOptions{
{
Backends: []e2e.BackendOptions{{Port: testutils.ParsePort(t, server.Address)}},
Backends: []e2e.BackendOptions{{Ports: []uint32{testutils.ParsePort(t, server.Address)}}},
Weight: 1,
},
},
Expand Down Expand Up @@ -252,13 +252,13 @@ func (s) TestLoadReportingPickFirstMultiLocality(t *testing.T) {
Localities: []e2e.LocalityOptions{
{
Backends: []e2e.BackendOptions{
{Port: testutils.ParsePort(t, server1.Address)},
{Ports: []uint32{testutils.ParsePort(t, server1.Address)}},
},
Weight: 1,
},
{
Backends: []e2e.BackendOptions{
{Port: testutils.ParsePort(t, server2.Address)},
{Ports: []uint32{testutils.ParsePort(t, server2.Address)}},
},
Weight: 2,
},
Expand Down
30 changes: 24 additions & 6 deletions xds/internal/balancer/clusterresolver/clusterresolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (b *clusterResolverBalancer) updateChildConfig() {
b.child = newChildBalancer(b.priorityBuilder, b.cc, b.bOpts)
}

childCfgBytes, addrs, err := buildPriorityConfigJSON(b.priorities, &b.config.xdsLBPolicy)
childCfgBytes, endpoints, err := buildPriorityConfigJSON(b.priorities, &b.config.xdsLBPolicy)
if err != nil {
b.logger.Warningf("Failed to build child policy config: %v", err)
return
Expand All @@ -248,15 +248,33 @@ func (b *clusterResolverBalancer) updateChildConfig() {
b.logger.Infof("Built child policy config: %s", pretty.ToJSON(childCfg))
}

endpoints := make([]resolver.Endpoint, len(addrs))
for i, a := range addrs {
endpoints[i].Attributes = a.BalancerAttributes
endpoints[i].Addresses = []resolver.Address{a}
flattenedAddrs := make([]resolver.Address, len(endpoints))
for i := range endpoints {
for j := range endpoints[i].Addresses {
easwars marked this conversation as resolved.
Show resolved Hide resolved
addr := endpoints[i].Addresses[j]
addr.BalancerAttributes = endpoints[i].Attributes
// If the endpoint has multiple addresses, only the first is added
// to the flattened address list. This ensures that LB policies
// that doesn't support endpoints, create only one subchannel to a
// backend.
if j == 0 {
flattenedAddrs[i] = addr
}
// BalancerAttributes need to be present in endpoint addresses. This
// temporary workaround is required to make load reporting work
// with the old pickfirst policy which creates SubConns with multiple
// addresses. Since the addresses can be from different localities,
// an Address.BalancerAttribute is used to identify the locality of the
// address used by the transport. This workaround can be removed once
// the old pickfirst is removed.
// See https://github.com/grpc/grpc-go/issues/7339
endpoints[i].Addresses[j] = addr
}
}
if err := b.child.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{
Endpoints: endpoints,
Addresses: addrs,
Addresses: flattenedAddrs,
ServiceConfig: b.configRaw,
Attributes: b.attrsWithClient,
},
Expand Down
Loading
Loading