Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

connect: rework how the service resolver subset OnlyPassing flag works #6173

Merged
merged 1 commit into from
Jul 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 1 addition & 23 deletions agent/proxycfg/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,8 +502,6 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh
return fmt.Errorf("invalid correlation id %q: %v", u.CorrelationID, err)
}

// TODO(rb): do we have to do onlypassing filters here?

m, ok := snap.ConnectProxy.WatchedUpstreamEndpoints[svc]
if !ok {
m = make(map[structs.DiscoveryTarget]structs.CheckServiceNodes)
Expand Down Expand Up @@ -608,16 +606,8 @@ func (s *state) resetWatchesFromChain(
}
s.logger.Printf("[TRACE] proxycfg: upstream=%q:chain=%q: initializing watch of target %s", id, chain.ServiceName, target)

// snap.WatchedUpstreams[name]

// delete(snap.WatchedUpstreams[name], target)
// delete(snap.WatchedUpstreamEndpoint[name], target)

// TODO(rb): augment the health rpc so we can get the health information to pass to envoy directly

// TODO(rb): make sure the cross-dc request properly fills in the alternate datacenters

// TODO(rb): handle subset.onlypassing
var subset structs.ServiceResolverSubset
if target.ServiceSubset != "" {
var ok bool
Expand Down Expand Up @@ -649,24 +639,12 @@ func (s *state) resetWatchesFromChain(
meshGateway = structs.MeshGatewayModeNone
}

filterExp := subset.Filter
if subset.OnlyPassing {
if filterExp != "" {
// TODO (filtering) - Update to "and all Checks as chk { chk.Status == passing }"
// once the syntax is supported
filterExp = fmt.Sprintf("(%s) and not Checks.Status != passing", filterExp)
} else {
filterExp = "not Checks.Status != passing"
}
}

// TODO(rb): update the health endpoint to allow returning even unhealthy endpoints
err = s.watchConnectProxyService(
ctx,
"upstream-target:"+string(encodedTarget)+":"+id,
target.Service,
target.Datacenter,
filterExp,
subset.Filter,
meshGateway,
)
if err != nil {
Expand Down
18 changes: 17 additions & 1 deletion agent/structs/discovery_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type CompiledDiscoveryChain struct {
// GroupResolverNodes represents all unique service instance groups that
// need to be represented. For envoy these render as Clusters.
//
// Omitted from JSON because DiscoveryTarget is not an encoding.TextMarshaler.
// Omitted from JSON because these already show up under the Node field.
GroupResolverNodes map[DiscoveryTarget]*DiscoveryGraphNode `json:"-"`

// TODO(rb): not sure if these two fields are actually necessary but I'll know when I get into xDS
Expand All @@ -54,6 +54,22 @@ func (c *CompiledDiscoveryChain) IsDefault() bool {
c.Node.GroupResolver.Default
}

// SubsetDefinitionForTarget looks up the service resolver subset definition
// that the provided target refers to.
//
// The zero-value ServiceResolverSubset is returned when the target names no
// subset, when the chain holds no resolver for the target's service, or when
// that resolver has no entry for the named subset.
func (c *CompiledDiscoveryChain) SubsetDefinitionForTarget(t DiscoveryTarget) ServiceResolverSubset {
	var empty ServiceResolverSubset

	subsetName := t.ServiceSubset
	if subsetName == "" {
		// The target addresses the service as a whole, not a subset.
		return empty
	}

	if resolver, found := c.Resolvers[t.Service]; found {
		// A missing key yields the zero value, i.e. an empty definition.
		return resolver.Subsets[subsetName]
	}

	return empty
}

const (
DiscoveryGraphNodeTypeRouter = "router"
DiscoveryGraphNodeTypeSplitter = "splitter"
Expand Down
82 changes: 45 additions & 37 deletions agent/xds/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ func (s *Server) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnaps
la := makeLoadAssignment(
sni,
0,
[]structs.CheckServiceNodes{endpoints},
[]loadAssignmentEndpointGroup{
{Endpoints: endpoints},
},
cfgSnap.Datacenter,
)
resources = append(resources, la)
Expand All @@ -68,7 +70,7 @@ func (s *Server) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnaps

chainEndpointMap, ok := cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[id]
if !ok {
continue // TODO(rb): whaaaa?
continue // skip the upstream (should not happen)
}

for target, node := range chain.GroupResolverNodes {
Expand All @@ -77,18 +79,23 @@ func (s *Server) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnaps

endpoints, ok := chainEndpointMap[target]
if !ok {
continue // TODO(rb): whaaaa?
continue // skip the cluster (should not happen)
}

var (
priorityEndpoints []structs.CheckServiceNodes
endpointGroups []loadAssignmentEndpointGroup
overprovisioningFactor int
)

primaryGroup := loadAssignmentEndpointGroup{
Endpoints: endpoints,
OnlyPassing: chain.SubsetDefinitionForTarget(target).OnlyPassing,
}

if failover != nil && len(failover.Targets) > 0 {
priorityEndpoints = make([]structs.CheckServiceNodes, 0, len(failover.Targets)+1)
endpointGroups = make([]loadAssignmentEndpointGroup, 0, len(failover.Targets)+1)

priorityEndpoints = append(priorityEndpoints, endpoints)
endpointGroups = append(endpointGroups, primaryGroup)

if failover.Definition.OverprovisioningFactor > 0 {
overprovisioningFactor = failover.Definition.OverprovisioningFactor
Expand All @@ -101,22 +108,25 @@ func (s *Server) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnaps

for _, failTarget := range failover.Targets {
failEndpoints, ok := chainEndpointMap[failTarget]
if ok {
priorityEndpoints = append(priorityEndpoints, failEndpoints)
if !ok {
continue // skip the failover target (should not happen)
}

endpointGroups = append(endpointGroups, loadAssignmentEndpointGroup{
Endpoints: failEndpoints,
OnlyPassing: chain.SubsetDefinitionForTarget(failTarget).OnlyPassing,
})
}
} else {
priorityEndpoints = []structs.CheckServiceNodes{
endpoints,
}
endpointGroups = append(endpointGroups, primaryGroup)
}

sni := TargetSNI(target, cfgSnap)

la := makeLoadAssignment(
sni,
overprovisioningFactor,
priorityEndpoints,
endpointGroups,
cfgSnap.Datacenter,
)
resources = append(resources, la)
Expand All @@ -136,8 +146,8 @@ func (s *Server) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.ConfigSnapsh
la := makeLoadAssignment(
clusterName,
0,
[]structs.CheckServiceNodes{
endpoints,
[]loadAssignmentEndpointGroup{
{Endpoints: endpoints},
},
cfgSnap.Datacenter,
)
Expand All @@ -150,8 +160,8 @@ func (s *Server) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.ConfigSnapsh
la := makeLoadAssignment(
clusterName,
0,
[]structs.CheckServiceNodes{
endpoints,
[]loadAssignmentEndpointGroup{
{Endpoints: endpoints},
},
cfgSnap.Datacenter,
)
Expand All @@ -166,20 +176,8 @@ func (s *Server) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.ConfigSnapsh
endpoints := cfgSnap.MeshGateway.ServiceGroups[svc]

// locally execute the subsets filter
filterExp := subset.Filter
if subset.OnlyPassing {
// we could do another filter pass without bexpr but this simplifies things a bit
if filterExp != "" {
// TODO (filtering) - Update to "and all Checks as chk { chk.Status == passing }"
// once the syntax is supported
filterExp = fmt.Sprintf("(%s) and not Checks.Status != passing", filterExp)
} else {
filterExp = "not Checks.Status != passing"
}
}

if filterExp != "" {
filter, err := bexpr.CreateFilter(filterExp, nil, endpoints)
if subset.Filter != "" {
filter, err := bexpr.CreateFilter(subset.Filter, nil, endpoints)
if err != nil {
return nil, err
}
Expand All @@ -194,8 +192,11 @@ func (s *Server) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.ConfigSnapsh
la := makeLoadAssignment(
clusterName,
0,
[]structs.CheckServiceNodes{
endpoints,
[]loadAssignmentEndpointGroup{
{
Endpoints: endpoints,
OnlyPassing: subset.OnlyPassing,
},
},
cfgSnap.Datacenter,
)
Expand All @@ -216,23 +217,29 @@ func makeEndpoint(clusterName, host string, port int) envoyendpoint.LbEndpoint {
}
}

// loadAssignmentEndpointGroup bundles one set of service instances with the
// health policy to apply when makeLoadAssignment renders them. Groups are
// passed to makeLoadAssignment as a slice, and each group's index in that
// slice is iterated there as its priority.
type loadAssignmentEndpointGroup struct {
// Endpoints are the service instances (with their checks) in this group.
Endpoints structs.CheckServiceNodes
// OnlyPassing, when true, causes makeLoadAssignment to mark an endpoint
// UNHEALTHY if any of its checks has a status other than passing.
OnlyPassing bool
}

func makeLoadAssignment(
clusterName string,
overprovisioningFactor int,
priorityEndpoints []structs.CheckServiceNodes,
endpointGroups []loadAssignmentEndpointGroup,
localDatacenter string,
) *envoy.ClusterLoadAssignment {
cla := &envoy.ClusterLoadAssignment{
ClusterName: clusterName,
Endpoints: make([]envoyendpoint.LocalityLbEndpoints, 0, len(priorityEndpoints)),
Endpoints: make([]envoyendpoint.LocalityLbEndpoints, 0, len(endpointGroups)),
}
if overprovisioningFactor > 0 {
cla.Policy = &envoy.ClusterLoadAssignment_Policy{
OverprovisioningFactor: makeUint32Value(overprovisioningFactor),
}
}

for priority, endpoints := range priorityEndpoints {
for priority, endpointGroup := range endpointGroups {
endpoints := endpointGroup.Endpoints
es := make([]envoyendpoint.LbEndpoint, 0, len(endpoints))

for _, ep := range endpoints {
Expand All @@ -246,8 +253,9 @@ func makeLoadAssignment(

for _, chk := range ep.Checks {
if chk.Status == api.HealthCritical {
// This can't actually happen now because health always filters critical
// but in the future it may not so set this correctly!
healthStatus = envoycore.HealthStatus_UNHEALTHY
}
if endpointGroup.OnlyPassing && chk.Status != api.HealthPassing {
healthStatus = envoycore.HealthStatus_UNHEALTHY
}
if chk.Status == api.HealthWarning && ep.Service.Weights != nil {
Expand Down
26 changes: 16 additions & 10 deletions agent/xds/endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,18 +94,19 @@ func Test_makeLoadAssignment(t *testing.T) {
testWarningCheckServiceNodes[0].Checks[0].Status = "warning"
testWarningCheckServiceNodes[1].Checks[0].Status = "warning"

// TODO(rb): test onlypassing
tests := []struct {
name string
clusterName string
overprovisioningFactor int
endpoints []structs.CheckServiceNodes
endpoints []loadAssignmentEndpointGroup
want *envoy.ClusterLoadAssignment
}{
{
name: "no instances",
clusterName: "service:test",
endpoints: []structs.CheckServiceNodes{
{},
endpoints: []loadAssignmentEndpointGroup{
{Endpoints: nil},
},
want: &envoy.ClusterLoadAssignment{
ClusterName: "service:test",
Expand All @@ -117,8 +118,8 @@ func Test_makeLoadAssignment(t *testing.T) {
{
name: "instances, no weights",
clusterName: "service:test",
endpoints: []structs.CheckServiceNodes{
testCheckServiceNodes,
endpoints: []loadAssignmentEndpointGroup{
{Endpoints: testCheckServiceNodes},
},
want: &envoy.ClusterLoadAssignment{
ClusterName: "service:test",
Expand Down Expand Up @@ -147,8 +148,8 @@ func Test_makeLoadAssignment(t *testing.T) {
{
name: "instances, healthy weights",
clusterName: "service:test",
endpoints: []structs.CheckServiceNodes{
testWeightedCheckServiceNodes,
endpoints: []loadAssignmentEndpointGroup{
{Endpoints: testWeightedCheckServiceNodes},
},
want: &envoy.ClusterLoadAssignment{
ClusterName: "service:test",
Expand Down Expand Up @@ -177,8 +178,8 @@ func Test_makeLoadAssignment(t *testing.T) {
{
name: "instances, warning weights",
clusterName: "service:test",
endpoints: []structs.CheckServiceNodes{
testWarningCheckServiceNodes,
endpoints: []loadAssignmentEndpointGroup{
{Endpoints: testWarningCheckServiceNodes},
},
want: &envoy.ClusterLoadAssignment{
ClusterName: "service:test",
Expand Down Expand Up @@ -207,7 +208,12 @@ func Test_makeLoadAssignment(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := makeLoadAssignment(tt.clusterName, tt.overprovisioningFactor, tt.endpoints, "dc1")
got := makeLoadAssignment(
tt.clusterName,
tt.overprovisioningFactor,
tt.endpoints,
"dc1",
)
require.Equal(t, tt.want, got)
})
}
Expand Down
12 changes: 12 additions & 0 deletions agent/xds/testdata/endpoints/mesh-gateway-service-subsets.golden
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,18 @@
},
"healthStatus": "HEALTHY",
"loadBalancingWeight": 1
},
{
"endpoint": {
"address": {
"socketAddress": {
"address": "172.16.1.9",
"portValue": 2222
}
}
},
"healthStatus": "UNHEALTHY",
"loadBalancingWeight": 1
}
]
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
enable_central_service_config = true

config_entries {
bootstrap {
kind = "proxy-defaults"
name = "global"

config {
protocol = "http"
}
}

bootstrap {
kind = "service-resolver"
name = "s2"

default_subset = "test"

subsets = {
"test" = {
only_passing = true
}
}
}
}
Loading