From 48717bbf594cbf1aca5d43ec1d332a4c299bc57a Mon Sep 17 00:00:00 2001 From: "R.B. Boyer" <4903+rboyer@users.noreply.github.com> Date: Tue, 25 Jan 2022 11:24:27 -0600 Subject: [PATCH] xds: fix for delta xDS reconnect bug in LDS/CDS (#12174) When a wildcard xDS type (LDS/CDS/SRDS) reconnects from a delta xDS stream, prior to envoy `1.19.0` it would populate the `ResourceNamesSubscribe` field with the full list of currently subscribed items, instead of simply omitting it to infer that it wanted everything (which is what wildcard mode means). This upstream issue was filed in envoyproxy/envoy#16063 and fixed in envoyproxy/envoy#16153 which went out in Envoy `1.19.0` and is fixed in later versions (later refactored in envoyproxy/envoy#16855). This PR conditionally forces LDS/CDS to be wildcard-only even when the connected Envoy requests a non-wildcard subscription, but only does so on versions prior to `1.19.0`, as we should not need to do this on later versions. This fixes the failure case as described here: #11833 (comment) Co-authored-by: Huan Wang --- .changelog/12174.txt | 3 ++ agent/xds/delta.go | 24 +++++++---- agent/xds/delta_test.go | 66 ++++++++++++++++++++++++++++++ agent/xds/envoy_versioning.go | 21 +++++++--- agent/xds/envoy_versioning_test.go | 5 ++- agent/xds/testing.go | 11 ++++- 6 files changed, 115 insertions(+), 15 deletions(-) create mode 100644 .changelog/12174.txt diff --git a/.changelog/12174.txt b/.changelog/12174.txt new file mode 100644 index 000000000000..b839df4eca48 --- /dev/null +++ b/.changelog/12174.txt @@ -0,0 +1,3 @@ +```release-note:bug +xds: fix for delta xDS reconnect bug in LDS/CDS +``` diff --git a/agent/xds/delta.go b/agent/xds/delta.go index 4a8370a5571f..c7fcb3592bf0 100644 --- a/agent/xds/delta.go +++ b/agent/xds/delta.go @@ -165,12 +165,6 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove return status.Errorf(codes.InvalidArgument, "type URL is required for ADS") } - if handler, ok := handlers[req.TypeUrl]; ok { - if handler.Recv(req) { - generator.Logger.Trace("subscribing to type", "typeUrl", req.TypeUrl) - } - } - if node == nil && req.Node != nil { node = req.Node var err error @@ -180,6 +174,12 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove } } + if handler, ok := handlers[req.TypeUrl]; ok { + if handler.Recv(req, generator.ProxyFeatures) { + generator.Logger.Trace("subscribing to type", "typeUrl", req.TypeUrl) + } + } + case cfgSnap = <-stateCh: newRes, err := generator.allResourcesFromSnapshot(cfgSnap) if err != nil { @@ -440,7 +440,7 @@ func newDeltaType( // Recv handles new discovery requests from envoy. // // Returns true the first time a type receives a request. -func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest) bool { +func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest, sf supportedProxyFeatures) bool { if t == nil { return false // not something we care about } @@ -453,6 +453,16 @@ func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest) bool t.wildcard = len(req.ResourceNamesSubscribe) == 0 t.registered = true registeredThisTime = true + + if sf.ForceLDSandCDSToAlwaysUseWildcardsOnReconnect { + switch t.typeURL { + case ListenerType, ClusterType: + if !t.wildcard { + t.wildcard = true + logger.Trace("fixing Envoy bug fixed in 1.19.0 by inferring wildcard mode for type") + } + } + } } /* diff --git a/agent/xds/delta_test.go b/agent/xds/delta_test.go index 573caa74cf92..14dac655b106 100644 --- a/agent/xds/delta_test.go +++ b/agent/xds/delta_test.go @@ -554,6 +554,72 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T) } } +func TestServer_DeltaAggregatedResources_v3_GetAllClusterAfterConsulRestarted(t *testing.T) { + // This illustrates a scenario related to https://github.com/hashicorp/consul/issues/11833 + + aclResolve := func(id string) (acl.Authorizer, error) { + // Allow all + return acl.RootAuthorizer("manage"), nil + } + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0) + _, mgr, errCh, envoy := scenario.server, scenario.mgr, scenario.errCh, scenario.envoy + envoy.EnvoyVersion = "1.18.0" + + sid := structs.NewServiceID("web-sidecar-proxy", nil) + + // Register the proxy to create state needed to Watch() on + mgr.RegisterProxy(t, sid) + + var snap *proxycfg.ConfigSnapshot + runStep(t, "get into state after consul restarted", func(t *testing.T) { + snap = newTestSnapshot(t, nil, "") + + // Send initial cluster discover. + // This is to simulate the discovery request call from envoy after disconnected from consul ads stream. + // + // We need to force it to be an older version of envoy so that the logic shifts. + envoy.SendDeltaReq(t, ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{ + ResourceNamesSubscribe: []string{ + "local_app", + "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul", + }, + InitialResourceVersions: map[string]string{ + "local_app": "a948904f2f0f479b8f8197694b30184b0d2ed1c1cd2a1ec0fb85d299a192a447", + "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul": "5891b5b522d5df086d0ff0b110fbd9d21bb4fc7163af34d08286a2e846f6be03", + }, + }) + + // Check no response sent yet + assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) + + requireProtocolVersionGauge(t, scenario, "v3", 1) + + // Deliver a new snapshot + // the config contains 3 clusters: local_app, db, geo-cache. + // this is to simulate the fact that there is one additional (upstream) cluster gets added to the sidecar service + // during the time xds disconnected (consul restarted). + mgr.DeliverConfig(t, sid, snap) + + assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ + TypeUrl: ClusterType, + Nonce: hexString(1), + Resources: makeTestResources(t, + makeTestCluster(t, snap, "tcp:local_app"), + makeTestCluster(t, snap, "tcp:db"), + makeTestCluster(t, snap, "tcp:geo-cache"), + ), + }) + }) + + envoy.Close() + select { + case err := <-errCh: + require.NoError(t, err) + case <-time.After(50 * time.Millisecond): + t.Fatalf("timed out waiting for handler to finish") + } +} + func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpactEndpoints(t *testing.T) { aclResolve := func(id string) (acl.Authorizer, error) { // Allow all diff --git a/agent/xds/envoy_versioning.go b/agent/xds/envoy_versioning.go index 0e218ad65941..0af284956040 100644 --- a/agent/xds/envoy_versioning.go +++ b/agent/xds/envoy_versioning.go @@ -26,11 +26,6 @@ type unsupportedVersion struct { } type supportedProxyFeatures struct { - // add version dependent feature flags here - - // GatewaysNeedStubClusterWhenEmptyWithIncrementalXDS is needed to paper - // over some weird envoy behavior. - // // For some reason Envoy versions prior to 1.16.0 when sent an empty CDS // list via the incremental xDS protocol will correctly ack the message and // just never request LDS resources. @@ -45,6 +40,19 @@ type supportedProxyFeatures struct { // issue: https://github.com/envoyproxy/envoy/issues/11877 // PR: https://github.com/envoyproxy/envoy/pull/12069 IncrementalXDSUpdatesMustBeSerial bool + + // Older versions of Envoy incorrectly exploded a wildcard subscription for + // LDS and CDS into specific line items on incremental xDS reconnect. They + // would populate both InitialResourceVersions and ResourceNamesSubscribe + // when they SHOULD have left ResourceNamesSubscribe empty (or used an + // explicit "*" in later Envoy versions) to imply wildcard mode. On + // reconnect, Consul interpreted the lack of the wildcard attribute as + // implying that the Envoy instance should not receive updates for any + // newly created listeners and clusters for the remaining life of that + // Envoy sidecar process. + // see: https://github.com/envoyproxy/envoy/issues/16063 + // see: https://github.com/envoyproxy/envoy/pull/16153 + ForceLDSandCDSToAlwaysUseWildcardsOnReconnect bool } func determineSupportedProxyFeatures(node *envoy_core_v3.Node) (supportedProxyFeatures, error) { @@ -90,6 +98,9 @@ func determineSupportedProxyFeaturesFromVersion(version *version.Version) (suppo sf.IncrementalXDSUpdatesMustBeSerial = true } + // All envoy versions available in Consul 1.10.x need this fix. + sf.ForceLDSandCDSToAlwaysUseWildcardsOnReconnect = true + return sf, nil } diff --git a/agent/xds/envoy_versioning_test.go b/agent/xds/envoy_versioning_test.go index 007f328fe94f..d5fd8a2b6d3b 100644 --- a/agent/xds/envoy_versioning_test.go +++ b/agent/xds/envoy_versioning_test.go @@ -115,6 +115,7 @@ func TestDetermineSupportedProxyFeaturesFromString(t *testing.T) { cases[v] = testcase{expect: supportedProxyFeatures{ GatewaysNeedStubClusterWhenEmptyWithIncrementalXDS: true, IncrementalXDSUpdatesMustBeSerial: true, + ForceLDSandCDSToAlwaysUseWildcardsOnReconnect: true, }} } for _, v := range []string{ @@ -122,7 +123,9 @@ func TestDetermineSupportedProxyFeaturesFromString(t *testing.T) { "1.17.0", "1.17.1", "1.17.2", "1.17.3", "1.17.4", "1.18.0", "1.18.1", "1.18.2", "1.18.3", "1.18.4", } { - cases[v] = testcase{expect: supportedProxyFeatures{}} + cases[v] = testcase{expect: supportedProxyFeatures{ + ForceLDSandCDSToAlwaysUseWildcardsOnReconnect: true, + }} } for name, tc := range cases { diff --git a/agent/xds/testing.go b/agent/xds/testing.go index 1ec7b8f65992..81cd835b04c5 100644 --- a/agent/xds/testing.go +++ b/agent/xds/testing.go @@ -136,6 +136,8 @@ type TestEnvoy struct { proxyID string token string + EnvoyVersion string + stream *TestADSStream // SoTW v2 deltaStream *TestADSDeltaStream // Incremental v3 } @@ -275,9 +277,14 @@ func (e *TestEnvoy) sendDeltaReq( e.mu.Lock() defer e.mu.Unlock() - ev, valid := stringToEnvoyVersion(proxysupport.EnvoyVersions[0]) + stringVersion := e.EnvoyVersion + if stringVersion == "" { + stringVersion = proxysupport.EnvoyVersions[0] + } + + ev, valid := stringToEnvoyVersion(stringVersion) if !valid { - t.Fatal("envoy version is not valid: %s", proxysupport.EnvoyVersions[0]) + t.Fatal("envoy version is not valid: %s", stringVersion) } if req == nil {