Skip to content

Commit

Permalink
Make locality aware routing xDS changes (#17826)
Browse files Browse the repository at this point in the history
  • Loading branch information
erichaberkorn authored Jun 21, 2023
1 parent 500dcb1 commit a3ba559
Show file tree
Hide file tree
Showing 11 changed files with 142 additions and 25 deletions.
4 changes: 4 additions & 0 deletions agent/proxycfg/proxycfg.deepcopy.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ import (
// DeepCopy generates a deep copy of *ConfigSnapshot
func (o *ConfigSnapshot) DeepCopy() *ConfigSnapshot {
var cp ConfigSnapshot = *o
if o.ServiceLocality != nil {
cp.ServiceLocality = new(structs.Locality)
*cp.ServiceLocality = *o.ServiceLocality
}
if o.ServiceMeta != nil {
cp.ServiceMeta = make(map[string]string, len(o.ServiceMeta))
for k2, v2 := range o.ServiceMeta {
Expand Down
1 change: 1 addition & 0 deletions agent/proxycfg/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -901,6 +901,7 @@ func IngressListenerKeyFromListener(l structs.IngressListener) IngressListenerKe
type ConfigSnapshot struct {
Kind structs.ServiceKind
Service string
ServiceLocality *structs.Locality
ProxyID ProxyID
Address string
Port int
Expand Down
3 changes: 3 additions & 0 deletions agent/proxycfg/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ type serviceInstance struct {
taggedAddresses map[string]structs.ServiceAddress
proxyCfg structs.ConnectProxyConfig
token string
locality *structs.Locality
}

func copyProxyConfig(ns *structs.NodeService) (structs.ConnectProxyConfig, error) {
Expand Down Expand Up @@ -244,6 +245,7 @@ func newServiceInstanceFromNodeService(id ProxyID, ns *structs.NodeService, toke
return serviceInstance{
kind: ns.Kind,
service: ns.Service,
locality: ns.Locality,
proxyID: id,
address: ns.Address,
port: ns.Port,
Expand Down Expand Up @@ -303,6 +305,7 @@ func newConfigSnapshotFromServiceInstance(s serviceInstance, config stateConfig)
return ConfigSnapshot{
Kind: s.kind,
Service: s.service,
ServiceLocality: s.locality,
ProxyID: s.proxyID,
Address: s.address,
Port: s.port,
Expand Down
12 changes: 12 additions & 0 deletions agent/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2095,6 +2095,18 @@ func (csn *CheckServiceNode) CanRead(authz acl.Authorizer) acl.EnforcementDecisi
return acl.Allow
}

func (csn *CheckServiceNode) Locality() *Locality {
if csn.Service != nil && csn.Service.Locality != nil {
return csn.Service.Locality
}

if csn.Node != nil && csn.Node.Locality != nil {
return csn.Node.Locality
}

return nil
}

type CheckServiceNodes []CheckServiceNode

func (csns CheckServiceNodes) DeepCopy() CheckServiceNodes {
Expand Down
77 changes: 53 additions & 24 deletions agent/xds/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,9 @@ func (s *ResourceGenerator) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.
endpoints, ok := cfgSnap.ConnectProxy.PreparedQueryEndpoints[uid]
if ok {
la := makeLoadAssignment(
cfgSnap,
clusterName,
nil,
[]loadAssignmentEndpointGroup{
{Endpoints: endpoints},
},
Expand All @@ -158,7 +160,9 @@ func (s *ResourceGenerator) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.
endpoints, ok := cfgSnap.ConnectProxy.DestinationGateways.Get(uid)
if ok {
la := makeLoadAssignment(
cfgSnap,
name,
nil,
[]loadAssignmentEndpointGroup{
{Endpoints: endpoints},
},
Expand Down Expand Up @@ -224,7 +228,9 @@ func (s *ResourceGenerator) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.C
clusterName := connect.GatewaySNI(key.Datacenter, key.Partition, cfgSnap.Roots.TrustDomain)

la := makeLoadAssignment(
cfgSnap,
clusterName,
nil,
[]loadAssignmentEndpointGroup{
{Endpoints: endpoints},
},
Expand All @@ -239,7 +245,9 @@ func (s *ResourceGenerator) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.C

clusterName := cfgSnap.ServerSNIFn(key.Datacenter, "")
la := makeLoadAssignment(
cfgSnap,
clusterName,
nil,
[]loadAssignmentEndpointGroup{
{Endpoints: endpoints},
},
Expand Down Expand Up @@ -409,7 +417,9 @@ func (s *ResourceGenerator) endpointsFromServicesAndResolvers(
for subsetName, groups := range clusterEndpoints {
clusterName := connect.ServiceSNI(svc.Name, subsetName, svc.NamespaceOrDefault(), svc.PartitionOrDefault(), cfgSnap.Datacenter, cfgSnap.Roots.TrustDomain)
la := makeLoadAssignment(
cfgSnap,
clusterName,
nil,
groups,
cfgSnap.Locality,
)
Expand Down Expand Up @@ -444,7 +454,9 @@ func (s *ResourceGenerator) makeEndpointsForOutgoingPeeredServices(
groups := []loadAssignmentEndpointGroup{{Endpoints: serviceGroup.Nodes, OnlyPassing: false}}

la := makeLoadAssignment(
cfgSnap,
clusterName,
nil,
groups,
// Use an empty key here so that it never matches. This will force the mesh gateway to always
// reference the remote mesh gateway's wan addr.
Expand Down Expand Up @@ -606,7 +618,9 @@ func (s *ResourceGenerator) makeUpstreamLoadAssignmentForPeerService(
return la, nil
}
la = makeLoadAssignment(
cfgSnap,
clusterName,
nil,
[]loadAssignmentEndpointGroup{
{Endpoints: localGw},
},
Expand All @@ -626,7 +640,9 @@ func (s *ResourceGenerator) makeUpstreamLoadAssignmentForPeerService(
return nil, nil
}
la = makeLoadAssignment(
cfgSnap,
clusterName,
nil,
[]loadAssignmentEndpointGroup{
{Endpoints: endpoints},
},
Expand Down Expand Up @@ -756,7 +772,9 @@ func (s *ResourceGenerator) endpointsFromDiscoveryChain(
}

la := makeLoadAssignment(
cfgSnap,
clusterName,
ti.PrioritizeByLocality,
[]loadAssignmentEndpointGroup{endpointGroup},
gatewayKey,
)
Expand Down Expand Up @@ -842,7 +860,7 @@ type loadAssignmentEndpointGroup struct {
OverrideHealth envoy_core_v3.HealthStatus
}

func makeLoadAssignment(clusterName string, endpointGroups []loadAssignmentEndpointGroup, localKey proxycfg.GatewayKey) *envoy_endpoint_v3.ClusterLoadAssignment {
func makeLoadAssignment(cfgSnap *proxycfg.ConfigSnapshot, clusterName string, policy *structs.DiscoveryPrioritizeByLocality, endpointGroups []loadAssignmentEndpointGroup, localKey proxycfg.GatewayKey) *envoy_endpoint_v3.ClusterLoadAssignment {
cla := &envoy_endpoint_v3.ClusterLoadAssignment{
ClusterName: clusterName,
Endpoints: make([]*envoy_endpoint_v3.LocalityLbEndpoints, 0, len(endpointGroups)),
Expand All @@ -856,35 +874,46 @@ func makeLoadAssignment(clusterName string, endpointGroups []loadAssignmentEndpo
}
}

for priority, endpointGroup := range endpointGroups {
endpoints := endpointGroup.Endpoints
es := make([]*envoy_endpoint_v3.LbEndpoint, 0, len(endpoints))
var priority uint32

for _, ep := range endpoints {
// TODO (mesh-gateway) - should we respect the translate_wan_addrs configuration here or just always use the wan for cross-dc?
_, addr, port := ep.BestAddress(!localKey.Matches(ep.Node.Datacenter, ep.Node.PartitionOrDefault()))
healthStatus, weight := calculateEndpointHealthAndWeight(ep, endpointGroup.OnlyPassing)
for _, endpointGroup := range endpointGroups {
endpointsByLocality, err := groupedEndpoints(cfgSnap.ServiceLocality, policy, endpointGroup.Endpoints)

if endpointGroup.OverrideHealth != envoy_core_v3.HealthStatus_UNKNOWN {
healthStatus = endpointGroup.OverrideHealth
}
if err != nil {
continue
}

endpoint := &envoy_endpoint_v3.Endpoint{
Address: makeAddress(addr, port),
for _, endpoints := range endpointsByLocality {
es := make([]*envoy_endpoint_v3.LbEndpoint, 0, len(endpointGroup.Endpoints))

for _, ep := range endpoints {
// TODO (mesh-gateway) - should we respect the translate_wan_addrs configuration here or just always use the wan for cross-dc?
_, addr, port := ep.BestAddress(!localKey.Matches(ep.Node.Datacenter, ep.Node.PartitionOrDefault()))
healthStatus, weight := calculateEndpointHealthAndWeight(ep, endpointGroup.OnlyPassing)

if endpointGroup.OverrideHealth != envoy_core_v3.HealthStatus_UNKNOWN {
healthStatus = endpointGroup.OverrideHealth
}

endpoint := &envoy_endpoint_v3.Endpoint{
Address: makeAddress(addr, port),
}
es = append(es, &envoy_endpoint_v3.LbEndpoint{
HostIdentifier: &envoy_endpoint_v3.LbEndpoint_Endpoint{
Endpoint: endpoint,
},
HealthStatus: healthStatus,
LoadBalancingWeight: makeUint32Value(weight),
})
}
es = append(es, &envoy_endpoint_v3.LbEndpoint{
HostIdentifier: &envoy_endpoint_v3.LbEndpoint_Endpoint{
Endpoint: endpoint,
},
HealthStatus: healthStatus,
LoadBalancingWeight: makeUint32Value(weight),

cla.Endpoints = append(cla.Endpoints, &envoy_endpoint_v3.LocalityLbEndpoints{
Priority: priority,
LbEndpoints: es,
})
}

cla.Endpoints = append(cla.Endpoints, &envoy_endpoint_v3.LocalityLbEndpoints{
Priority: uint32(priority),
LbEndpoints: es,
})
priority++
}
}

return cla
Expand Down
14 changes: 14 additions & 0 deletions agent/xds/endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func Test_makeLoadAssignment(t *testing.T) {
tests := []struct {
name string
clusterName string
locality *structs.Locality
endpoints []loadAssignmentEndpointGroup
want *envoy_endpoint_v3.ClusterLoadAssignment
}{
Expand Down Expand Up @@ -211,11 +212,24 @@ func Test_makeLoadAssignment(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := makeLoadAssignment(
&proxycfg.ConfigSnapshot{ServiceLocality: tt.locality},
tt.clusterName,
nil,
tt.endpoints,
proxycfg.GatewayKey{Datacenter: "dc1"},
)
require.Equal(t, tt.want, got)

if tt.locality == nil {
got := makeLoadAssignment(
&proxycfg.ConfigSnapshot{ServiceLocality: &structs.Locality{Region: "us-west-1", Zone: "us-west-1a"}},
tt.clusterName,
nil,
tt.endpoints,
proxycfg.GatewayKey{Datacenter: "dc1"},
)
require.Equal(t, tt.want, got)
}
})
}
}
Expand Down
4 changes: 3 additions & 1 deletion agent/xds/failover_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type targetInfo struct {
// Region is the region from the failover target's Locality. nil means the
// target is in the local Consul cluster.
Region *string

PrioritizeByLocality *structs.DiscoveryPrioritizeByLocality
}

type discoChainTargetGroup struct {
Expand Down Expand Up @@ -87,7 +89,7 @@ func (s *ResourceGenerator) mapDiscoChainTargets(cfgSnap *proxycfg.ConfigSnapsho
var sni, rootPEMs string
var spiffeIDs []string
targetUID := proxycfg.NewUpstreamIDFromTargetID(tid)
ti := targetInfo{TargetID: tid}
ti := targetInfo{TargetID: tid, PrioritizeByLocality: target.PrioritizeByLocality}

configureTLS := true
if forMeshGateway {
Expand Down
21 changes: 21 additions & 0 deletions agent/xds/locality_policy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package xds

import (
"fmt"

"github.com/hashicorp/consul/agent/structs"
)

func groupedEndpoints(locality *structs.Locality, policy *structs.DiscoveryPrioritizeByLocality, csns structs.CheckServiceNodes) ([]structs.CheckServiceNodes, error) {
switch {
case policy == nil || policy.Mode == "" || policy.Mode == "none":
return []structs.CheckServiceNodes{csns}, nil
case policy.Mode == "failover":
return prioritizeByLocalityFailover(locality, csns), nil
default:
return nil, fmt.Errorf("unexpected priortize-by-locality mode %q", policy.Mode)
}
}
15 changes: 15 additions & 0 deletions agent/xds/locality_policy_oss.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

//go:build !consulent
// +build !consulent

package xds

import (
"github.com/hashicorp/consul/agent/structs"
)

func prioritizeByLocalityFailover(locality *structs.Locality, csns structs.CheckServiceNodes) []structs.CheckServiceNodes {
return nil
}
14 changes: 14 additions & 0 deletions test/integration/consul-container/libs/assert/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,20 @@ func CatalogServiceExists(t *testing.T, c *api.Client, svc string, opts *api.Que
})
}

// CatalogServiceHasInstanceCount verifies the service name exists in the Consul catalog and has the specified
// number of instances.
func CatalogServiceHasInstanceCount(t *testing.T, c *api.Client, svc string, count int, opts *api.QueryOptions) {
retry.Run(t, func(r *retry.R) {
services, _, err := c.Catalog().Service(svc, "", opts)
if err != nil {
r.Fatal("error reading service data")
}
if len(services) != count {
r.Fatalf("did not find %d catalog entries for %s", count, svc)
}
})
}

// CatalogServiceExists verifies the node name exists in the Consul catalog
func CatalogNodeExists(t *testing.T, c *api.Client, nodeName string) {
retry.Run(t, func(r *retry.R) {
Expand Down
2 changes: 2 additions & 0 deletions test/integration/consul-container/libs/service/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type ServiceOpts struct {
Checks Checks
Connect SidecarService
Namespace string
Locality *api.Locality
}

// createAndRegisterStaticServerAndSidecar register the services and launch static-server containers
Expand Down Expand Up @@ -119,6 +120,7 @@ func CreateAndRegisterStaticServerAndSidecar(node libcluster.Agent, serviceOpts
Namespace: serviceOpts.Namespace,
Meta: serviceOpts.Meta,
Check: &agentCheck,
Locality: serviceOpts.Locality,
}
return createAndRegisterStaticServerAndSidecar(node, serviceOpts.HTTPPort, serviceOpts.GRPCPort, req, containerArgs...)
}
Expand Down

0 comments on commit a3ba559

Please sign in to comment.