Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Locality routing xDS #17826

Merged
merged 1 commit into from
Jun 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions agent/proxycfg/proxycfg.deepcopy.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ import (
// DeepCopy generates a deep copy of *ConfigSnapshot
func (o *ConfigSnapshot) DeepCopy() *ConfigSnapshot {
var cp ConfigSnapshot = *o
if o.ServiceLocality != nil {
cp.ServiceLocality = new(structs.Locality)
*cp.ServiceLocality = *o.ServiceLocality
}
if o.ServiceMeta != nil {
cp.ServiceMeta = make(map[string]string, len(o.ServiceMeta))
for k2, v2 := range o.ServiceMeta {
Expand Down
1 change: 1 addition & 0 deletions agent/proxycfg/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -901,6 +901,7 @@ func IngressListenerKeyFromListener(l structs.IngressListener) IngressListenerKe
type ConfigSnapshot struct {
Kind structs.ServiceKind
Service string
ServiceLocality *structs.Locality
ProxyID ProxyID
Address string
Port int
Expand Down
3 changes: 3 additions & 0 deletions agent/proxycfg/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ type serviceInstance struct {
taggedAddresses map[string]structs.ServiceAddress
proxyCfg structs.ConnectProxyConfig
token string
locality *structs.Locality
}

func copyProxyConfig(ns *structs.NodeService) (structs.ConnectProxyConfig, error) {
Expand Down Expand Up @@ -244,6 +245,7 @@ func newServiceInstanceFromNodeService(id ProxyID, ns *structs.NodeService, toke
return serviceInstance{
kind: ns.Kind,
service: ns.Service,
locality: ns.Locality,
proxyID: id,
address: ns.Address,
port: ns.Port,
Expand Down Expand Up @@ -303,6 +305,7 @@ func newConfigSnapshotFromServiceInstance(s serviceInstance, config stateConfig)
return ConfigSnapshot{
Kind: s.kind,
Service: s.service,
ServiceLocality: s.locality,
ProxyID: s.proxyID,
Address: s.address,
Port: s.port,
Expand Down
12 changes: 12 additions & 0 deletions agent/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2095,6 +2095,18 @@ func (csn *CheckServiceNode) CanRead(authz acl.Authorizer) acl.EnforcementDecisi
return acl.Allow
}

// Locality returns the locality to use for this entry: the service's
// locality when the service has one, otherwise the node's locality, and
// nil when neither is set.
func (csn *CheckServiceNode) Locality() *Locality {
	// The service-level locality takes precedence over the node-level one.
	if svc := csn.Service; svc != nil && svc.Locality != nil {
		return svc.Locality
	}

	if node := csn.Node; node != nil && node.Locality != nil {
		return node.Locality
	}

	return nil
}

// CheckServiceNodes is a slice of CheckServiceNode values.
type CheckServiceNodes []CheckServiceNode

func (csns CheckServiceNodes) DeepCopy() CheckServiceNodes {
Expand Down
77 changes: 53 additions & 24 deletions agent/xds/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,9 @@ func (s *ResourceGenerator) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.
endpoints, ok := cfgSnap.ConnectProxy.PreparedQueryEndpoints[uid]
if ok {
la := makeLoadAssignment(
cfgSnap,
clusterName,
nil,
[]loadAssignmentEndpointGroup{
{Endpoints: endpoints},
},
Expand All @@ -158,7 +160,9 @@ func (s *ResourceGenerator) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.
endpoints, ok := cfgSnap.ConnectProxy.DestinationGateways.Get(uid)
if ok {
la := makeLoadAssignment(
cfgSnap,
name,
nil,
[]loadAssignmentEndpointGroup{
{Endpoints: endpoints},
},
Expand Down Expand Up @@ -224,7 +228,9 @@ func (s *ResourceGenerator) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.C
clusterName := connect.GatewaySNI(key.Datacenter, key.Partition, cfgSnap.Roots.TrustDomain)

la := makeLoadAssignment(
cfgSnap,
clusterName,
nil,
[]loadAssignmentEndpointGroup{
{Endpoints: endpoints},
},
Expand All @@ -239,7 +245,9 @@ func (s *ResourceGenerator) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.C

clusterName := cfgSnap.ServerSNIFn(key.Datacenter, "")
la := makeLoadAssignment(
cfgSnap,
clusterName,
nil,
[]loadAssignmentEndpointGroup{
{Endpoints: endpoints},
},
Expand Down Expand Up @@ -409,7 +417,9 @@ func (s *ResourceGenerator) endpointsFromServicesAndResolvers(
for subsetName, groups := range clusterEndpoints {
clusterName := connect.ServiceSNI(svc.Name, subsetName, svc.NamespaceOrDefault(), svc.PartitionOrDefault(), cfgSnap.Datacenter, cfgSnap.Roots.TrustDomain)
la := makeLoadAssignment(
cfgSnap,
clusterName,
nil,
groups,
cfgSnap.Locality,
)
Expand Down Expand Up @@ -444,7 +454,9 @@ func (s *ResourceGenerator) makeEndpointsForOutgoingPeeredServices(
groups := []loadAssignmentEndpointGroup{{Endpoints: serviceGroup.Nodes, OnlyPassing: false}}

la := makeLoadAssignment(
cfgSnap,
clusterName,
nil,
groups,
// Use an empty key here so that it never matches. This will force the mesh gateway to always
// reference the remote mesh gateway's wan addr.
Expand Down Expand Up @@ -606,7 +618,9 @@ func (s *ResourceGenerator) makeUpstreamLoadAssignmentForPeerService(
return la, nil
}
la = makeLoadAssignment(
cfgSnap,
clusterName,
nil,
[]loadAssignmentEndpointGroup{
{Endpoints: localGw},
},
Expand All @@ -626,7 +640,9 @@ func (s *ResourceGenerator) makeUpstreamLoadAssignmentForPeerService(
return nil, nil
}
la = makeLoadAssignment(
cfgSnap,
clusterName,
nil,
[]loadAssignmentEndpointGroup{
{Endpoints: endpoints},
},
Expand Down Expand Up @@ -756,7 +772,9 @@ func (s *ResourceGenerator) endpointsFromDiscoveryChain(
}

la := makeLoadAssignment(
cfgSnap,
clusterName,
ti.PrioritizeByLocality,
[]loadAssignmentEndpointGroup{endpointGroup},
gatewayKey,
)
Expand Down Expand Up @@ -842,7 +860,7 @@ type loadAssignmentEndpointGroup struct {
OverrideHealth envoy_core_v3.HealthStatus
}

func makeLoadAssignment(clusterName string, endpointGroups []loadAssignmentEndpointGroup, localKey proxycfg.GatewayKey) *envoy_endpoint_v3.ClusterLoadAssignment {
func makeLoadAssignment(cfgSnap *proxycfg.ConfigSnapshot, clusterName string, policy *structs.DiscoveryPrioritizeByLocality, endpointGroups []loadAssignmentEndpointGroup, localKey proxycfg.GatewayKey) *envoy_endpoint_v3.ClusterLoadAssignment {
cla := &envoy_endpoint_v3.ClusterLoadAssignment{
ClusterName: clusterName,
Endpoints: make([]*envoy_endpoint_v3.LocalityLbEndpoints, 0, len(endpointGroups)),
Expand All @@ -856,35 +874,46 @@ func makeLoadAssignment(clusterName string, endpointGroups []loadAssignmentEndpo
}
}

for priority, endpointGroup := range endpointGroups {
endpoints := endpointGroup.Endpoints
es := make([]*envoy_endpoint_v3.LbEndpoint, 0, len(endpoints))
var priority uint32

for _, ep := range endpoints {
// TODO (mesh-gateway) - should we respect the translate_wan_addrs configuration here or just always use the wan for cross-dc?
_, addr, port := ep.BestAddress(!localKey.Matches(ep.Node.Datacenter, ep.Node.PartitionOrDefault()))
healthStatus, weight := calculateEndpointHealthAndWeight(ep, endpointGroup.OnlyPassing)
for _, endpointGroup := range endpointGroups {
endpointsByLocality, err := groupedEndpoints(cfgSnap.ServiceLocality, policy, endpointGroup.Endpoints)

if endpointGroup.OverrideHealth != envoy_core_v3.HealthStatus_UNKNOWN {
healthStatus = endpointGroup.OverrideHealth
}
if err != nil {
continue
}

endpoint := &envoy_endpoint_v3.Endpoint{
Address: makeAddress(addr, port),
for _, endpoints := range endpointsByLocality {
es := make([]*envoy_endpoint_v3.LbEndpoint, 0, len(endpointGroup.Endpoints))

for _, ep := range endpoints {
// TODO (mesh-gateway) - should we respect the translate_wan_addrs configuration here or just always use the wan for cross-dc?
_, addr, port := ep.BestAddress(!localKey.Matches(ep.Node.Datacenter, ep.Node.PartitionOrDefault()))
healthStatus, weight := calculateEndpointHealthAndWeight(ep, endpointGroup.OnlyPassing)

if endpointGroup.OverrideHealth != envoy_core_v3.HealthStatus_UNKNOWN {
healthStatus = endpointGroup.OverrideHealth
}

endpoint := &envoy_endpoint_v3.Endpoint{
Address: makeAddress(addr, port),
}
es = append(es, &envoy_endpoint_v3.LbEndpoint{
HostIdentifier: &envoy_endpoint_v3.LbEndpoint_Endpoint{
Endpoint: endpoint,
},
HealthStatus: healthStatus,
LoadBalancingWeight: makeUint32Value(weight),
})
}
es = append(es, &envoy_endpoint_v3.LbEndpoint{
HostIdentifier: &envoy_endpoint_v3.LbEndpoint_Endpoint{
Endpoint: endpoint,
},
HealthStatus: healthStatus,
LoadBalancingWeight: makeUint32Value(weight),

cla.Endpoints = append(cla.Endpoints, &envoy_endpoint_v3.LocalityLbEndpoints{
Priority: priority,
LbEndpoints: es,
})
}

cla.Endpoints = append(cla.Endpoints, &envoy_endpoint_v3.LocalityLbEndpoints{
Priority: uint32(priority),
LbEndpoints: es,
})
priority++
}
}

return cla
Expand Down
14 changes: 14 additions & 0 deletions agent/xds/endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func Test_makeLoadAssignment(t *testing.T) {
tests := []struct {
name string
clusterName string
locality *structs.Locality
endpoints []loadAssignmentEndpointGroup
want *envoy_endpoint_v3.ClusterLoadAssignment
}{
Expand Down Expand Up @@ -211,11 +212,24 @@ func Test_makeLoadAssignment(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := makeLoadAssignment(
&proxycfg.ConfigSnapshot{ServiceLocality: tt.locality},
tt.clusterName,
nil,
tt.endpoints,
proxycfg.GatewayKey{Datacenter: "dc1"},
)
require.Equal(t, tt.want, got)

if tt.locality == nil {
got := makeLoadAssignment(
&proxycfg.ConfigSnapshot{ServiceLocality: &structs.Locality{Region: "us-west-1", Zone: "us-west-1a"}},
tt.clusterName,
nil,
tt.endpoints,
proxycfg.GatewayKey{Datacenter: "dc1"},
)
require.Equal(t, tt.want, got)
}
})
}
}
Expand Down
4 changes: 3 additions & 1 deletion agent/xds/failover_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type targetInfo struct {
// Region is the region from the failover target's Locality. nil means the
// target is in the local Consul cluster.
Region *string

PrioritizeByLocality *structs.DiscoveryPrioritizeByLocality
}

type discoChainTargetGroup struct {
Expand Down Expand Up @@ -87,7 +89,7 @@ func (s *ResourceGenerator) mapDiscoChainTargets(cfgSnap *proxycfg.ConfigSnapsho
var sni, rootPEMs string
var spiffeIDs []string
targetUID := proxycfg.NewUpstreamIDFromTargetID(tid)
ti := targetInfo{TargetID: tid}
ti := targetInfo{TargetID: tid, PrioritizeByLocality: target.PrioritizeByLocality}

configureTLS := true
if forMeshGateway {
Expand Down
21 changes: 21 additions & 0 deletions agent/xds/locality_policy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package xds

import (
"fmt"

"github.com/hashicorp/consul/agent/structs"
)

// groupedEndpoints splits csns into an ordered list of endpoint groups
// according to the prioritize-by-locality policy.
//
// A nil policy, or a policy whose Mode is "" or "none", yields a single group
// holding csns unchanged. Mode "failover" delegates the grouping to
// prioritizeByLocalityFailover using the supplied locality. Any other mode
// is rejected with an error and a nil result.
func groupedEndpoints(locality *structs.Locality, policy *structs.DiscoveryPrioritizeByLocality, csns structs.CheckServiceNodes) ([]structs.CheckServiceNodes, error) {
	// No policy configured: keep every endpoint in one group.
	if policy == nil {
		return []structs.CheckServiceNodes{csns}, nil
	}

	switch policy.Mode {
	case "", "none":
		return []structs.CheckServiceNodes{csns}, nil
	case "failover":
		return prioritizeByLocalityFailover(locality, csns), nil
	default:
		return nil, fmt.Errorf("unexpected prioritize-by-locality mode %q", policy.Mode)
	}
}
15 changes: 15 additions & 0 deletions agent/xds/locality_policy_oss.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

//go:build !consulent
// +build !consulent

package xds

import (
"github.com/hashicorp/consul/agent/structs"
)

// prioritizeByLocalityFailover is the implementation used in builds without
// the consulent tag. It ignores both arguments and always returns a nil
// slice of groups.
func prioritizeByLocalityFailover(locality *structs.Locality, csns structs.CheckServiceNodes) []structs.CheckServiceNodes {
	var noGroups []structs.CheckServiceNodes
	return noGroups
}
14 changes: 14 additions & 0 deletions test/integration/consul-container/libs/assert/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,20 @@ func CatalogServiceExists(t *testing.T, c *api.Client, svc string, opts *api.Que
})
}

// CatalogServiceHasInstanceCount verifies the service name exists in the Consul catalog and has the specified
// number of instances. The check runs inside retry.Run; each failed attempt reports the underlying
// lookup error or the number of instances actually found.
func CatalogServiceHasInstanceCount(t *testing.T, c *api.Client, svc string, count int, opts *api.QueryOptions) {
	t.Helper()

	retry.Run(t, func(r *retry.R) {
		services, _, err := c.Catalog().Service(svc, "", opts)
		if err != nil {
			// Surface the real error so a failing lookup can be diagnosed.
			r.Fatalf("error reading service data for %s: %v", svc, err)
		}
		if got := len(services); got != count {
			r.Fatalf("did not find %d catalog entries for %s (found %d)", count, svc, got)
		}
	})
}

// CatalogNodeExists verifies the node name exists in the Consul catalog
func CatalogNodeExists(t *testing.T, c *api.Client, nodeName string) {
retry.Run(t, func(r *retry.R) {
Expand Down
2 changes: 2 additions & 0 deletions test/integration/consul-container/libs/service/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type ServiceOpts struct {
Checks Checks
Connect SidecarService
Namespace string
Locality *api.Locality
}

// createAndRegisterStaticServerAndSidecar registers the services and launches static-server containers
Expand Down Expand Up @@ -119,6 +120,7 @@ func CreateAndRegisterStaticServerAndSidecar(node libcluster.Agent, serviceOpts
Namespace: serviceOpts.Namespace,
Meta: serviceOpts.Meta,
Check: &agentCheck,
Locality: serviceOpts.Locality,
}
return createAndRegisterStaticServerAndSidecar(node, serviceOpts.HTTPPort, serviceOpts.GRPCPort, req, containerArgs...)
}
Expand Down