Skip to content

Commit

Permalink
xds: fix for delta xDS reconnect bug in LDS/CDS (#12174)
Browse files Browse the repository at this point in the history
When a wildcard xDS type (LDS/CDS/SRDS) reconnects from a delta xDS stream,
prior to envoy `1.19.0` it would populate the `ResourceNamesSubscribe` field
with the full list of currently subscribed items, instead of simply omitting it
to infer that it wanted everything (which is what wildcard mode means).

This upstream issue was filed in envoyproxy/envoy#16063 and fixed in
envoyproxy/envoy#16153 which went out in Envoy `1.19.0` and is fixed in later
versions (later refactored in envoyproxy/envoy#16855).

This PR conditionally forces LDS/CDS to be wildcard-only even when the
connected Envoy requests a non-wildcard subscription, but only does so on
versions prior to `1.19.0`, as we should not need to do this on later versions.

This fixes the failure case as described here: #11833 (comment)

Co-authored-by: Huan Wang <[email protected]>
  • Loading branch information
rboyer and fredwangwang committed Jan 25, 2022
1 parent 44153cb commit 48717bb
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 15 deletions.
3 changes: 3 additions & 0 deletions .changelog/12174.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
xds: fix for delta xDS reconnect bug in LDS/CDS
```
24 changes: 17 additions & 7 deletions agent/xds/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,12 +165,6 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
return status.Errorf(codes.InvalidArgument, "type URL is required for ADS")
}

if handler, ok := handlers[req.TypeUrl]; ok {
if handler.Recv(req) {
generator.Logger.Trace("subscribing to type", "typeUrl", req.TypeUrl)
}
}

if node == nil && req.Node != nil {
node = req.Node
var err error
Expand All @@ -180,6 +174,12 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
}
}

if handler, ok := handlers[req.TypeUrl]; ok {
if handler.Recv(req, generator.ProxyFeatures) {
generator.Logger.Trace("subscribing to type", "typeUrl", req.TypeUrl)
}
}

case cfgSnap = <-stateCh:
newRes, err := generator.allResourcesFromSnapshot(cfgSnap)
if err != nil {
Expand Down Expand Up @@ -440,7 +440,7 @@ func newDeltaType(
// Recv handles new discovery requests from envoy.
//
// Returns true the first time a type receives a request.
func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest) bool {
func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest, sf supportedProxyFeatures) bool {
if t == nil {
return false // not something we care about
}
Expand All @@ -453,6 +453,16 @@ func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest) bool
t.wildcard = len(req.ResourceNamesSubscribe) == 0
t.registered = true
registeredThisTime = true

if sf.ForceLDSandCDSToAlwaysUseWildcardsOnReconnect {
switch t.typeURL {
case ListenerType, ClusterType:
if !t.wildcard {
t.wildcard = true
logger.Trace("fixing Envoy bug fixed in 1.19.0 by inferring wildcard mode for type")
}
}
}
}

/*
Expand Down
66 changes: 66 additions & 0 deletions agent/xds/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,72 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T)
}
}

func TestServer_DeltaAggregatedResources_v3_GetAllClusterAfterConsulRestarted(t *testing.T) {
// This illustrates a scenario related to https://github.com/hashicorp/consul/issues/11833

aclResolve := func(id string) (acl.Authorizer, error) {
// Allow all
return acl.RootAuthorizer("manage"), nil
}
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0)
_, mgr, errCh, envoy := scenario.server, scenario.mgr, scenario.errCh, scenario.envoy
envoy.EnvoyVersion = "1.18.0"

sid := structs.NewServiceID("web-sidecar-proxy", nil)

// Register the proxy to create state needed to Watch() on
mgr.RegisterProxy(t, sid)

var snap *proxycfg.ConfigSnapshot
runStep(t, "get into state after consul restarted", func(t *testing.T) {
snap = newTestSnapshot(t, nil, "")

// Send initial cluster discover.
// This is to simulate the discovery request call from envoy after disconnected from consul ads stream.
//
// We need to force it to be an older version of envoy so that the logic shifts.
envoy.SendDeltaReq(t, ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{
ResourceNamesSubscribe: []string{
"local_app",
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
},
InitialResourceVersions: map[string]string{
"local_app": "a948904f2f0f479b8f8197694b30184b0d2ed1c1cd2a1ec0fb85d299a192a447",
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul": "5891b5b522d5df086d0ff0b110fbd9d21bb4fc7163af34d08286a2e846f6be03",
},
})

// Check no response sent yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)

requireProtocolVersionGauge(t, scenario, "v3", 1)

// Deliver a new snapshot
// the config contains 3 clusters: local_app, db, geo-cache.
// this is to simulate the fact that there is one additional (upstream) cluster gets added to the sidecar service
// during the time xds disconnected (consul restarted).
mgr.DeliverConfig(t, sid, snap)

assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: ClusterType,
Nonce: hexString(1),
Resources: makeTestResources(t,
makeTestCluster(t, snap, "tcp:local_app"),
makeTestCluster(t, snap, "tcp:db"),
makeTestCluster(t, snap, "tcp:geo-cache"),
),
})
})

envoy.Close()
select {
case err := <-errCh:
require.NoError(t, err)
case <-time.After(50 * time.Millisecond):
t.Fatalf("timed out waiting for handler to finish")
}
}

func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpactEndpoints(t *testing.T) {
aclResolve := func(id string) (acl.Authorizer, error) {
// Allow all
Expand Down
21 changes: 16 additions & 5 deletions agent/xds/envoy_versioning.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,6 @@ type unsupportedVersion struct {
}

type supportedProxyFeatures struct {
// add version dependent feature flags here

// GatewaysNeedStubClusterWhenEmptyWithIncrementalXDS is needed to paper
// over some weird envoy behavior.
//
// For some reason Envoy versions prior to 1.16.0 when sent an empty CDS
// list via the incremental xDS protocol will correctly ack the message and
// just never request LDS resources.
Expand All @@ -45,6 +40,19 @@ type supportedProxyFeatures struct {
// issue: https://github.com/envoyproxy/envoy/issues/11877
// PR: https://github.com/envoyproxy/envoy/pull/12069
IncrementalXDSUpdatesMustBeSerial bool

// Older versions of Envoy incorrectly exploded a wildcard subscription for
// LDS and CDS into specific line items on incremental xDS reconnect. They
// would populate both InitialResourceVersions and ResourceNamesSubscribe
// when they SHOULD have left ResourceNamesSubscribe empty (or used an
// explicit "*" in later Envoy versions) to imply wildcard mode. On
// reconnect, Consul interpreted the lack of the wildcard attribute as
// implying that the Envoy instance should not receive updates for any
// newly created listeners and clusters for the remaining life of that
// Envoy sidecar process.
// see: https://github.com/envoyproxy/envoy/issues/16063
// see: https://github.com/envoyproxy/envoy/pull/16153
ForceLDSandCDSToAlwaysUseWildcardsOnReconnect bool
}

func determineSupportedProxyFeatures(node *envoy_core_v3.Node) (supportedProxyFeatures, error) {
Expand Down Expand Up @@ -90,6 +98,9 @@ func determineSupportedProxyFeaturesFromVersion(version *version.Version) (suppo
sf.IncrementalXDSUpdatesMustBeSerial = true
}

// All envoy versions available in Consul 1.10.x need this fix.
sf.ForceLDSandCDSToAlwaysUseWildcardsOnReconnect = true

return sf, nil
}

Expand Down
5 changes: 4 additions & 1 deletion agent/xds/envoy_versioning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,17 @@ func TestDetermineSupportedProxyFeaturesFromString(t *testing.T) {
cases[v] = testcase{expect: supportedProxyFeatures{
GatewaysNeedStubClusterWhenEmptyWithIncrementalXDS: true,
IncrementalXDSUpdatesMustBeSerial: true,
ForceLDSandCDSToAlwaysUseWildcardsOnReconnect: true,
}}
}
for _, v := range []string{
"1.16.0", "1.16.1", "1.16.2", "1.16.3", "1.16.4", "1.16.5",
"1.17.0", "1.17.1", "1.17.2", "1.17.3", "1.17.4",
"1.18.0", "1.18.1", "1.18.2", "1.18.3", "1.18.4",
} {
cases[v] = testcase{expect: supportedProxyFeatures{}}
cases[v] = testcase{expect: supportedProxyFeatures{
ForceLDSandCDSToAlwaysUseWildcardsOnReconnect: true,
}}
}

for name, tc := range cases {
Expand Down
11 changes: 9 additions & 2 deletions agent/xds/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ type TestEnvoy struct {
proxyID string
token string

EnvoyVersion string

stream *TestADSStream // SoTW v2
deltaStream *TestADSDeltaStream // Incremental v3
}
Expand Down Expand Up @@ -275,9 +277,14 @@ func (e *TestEnvoy) sendDeltaReq(
e.mu.Lock()
defer e.mu.Unlock()

ev, valid := stringToEnvoyVersion(proxysupport.EnvoyVersions[0])
stringVersion := e.EnvoyVersion
if stringVersion == "" {
stringVersion = proxysupport.EnvoyVersions[0]
}

ev, valid := stringToEnvoyVersion(stringVersion)
if !valid {
t.Fatal("envoy version is not valid: %s", proxysupport.EnvoyVersions[0])
t.Fatal("envoy version is not valid: %s", stringVersion)
}

if req == nil {
Expand Down

0 comments on commit 48717bb

Please sign in to comment.