diff --git a/.changelog/12174.txt b/.changelog/12174.txt new file mode 100644 index 000000000000..b839df4eca48 --- /dev/null +++ b/.changelog/12174.txt @@ -0,0 +1,3 @@ +```release-note:bug +xds: fix for delta xDS reconnect bug in LDS/CDS +``` diff --git a/agent/xds/delta.go b/agent/xds/delta.go index 71c97af93396..b0a4ccafcba0 100644 --- a/agent/xds/delta.go +++ b/agent/xds/delta.go @@ -165,12 +165,6 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove return status.Errorf(codes.InvalidArgument, "type URL is required for ADS") } - if handler, ok := handlers[req.TypeUrl]; ok { - if handler.Recv(req) { - generator.Logger.Trace("subscribing to type", "typeUrl", req.TypeUrl) - } - } - if node == nil && req.Node != nil { node = req.Node var err error @@ -180,6 +174,12 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove } } + if handler, ok := handlers[req.TypeUrl]; ok { + if handler.Recv(req, generator.ProxyFeatures) { + generator.Logger.Trace("subscribing to type", "typeUrl", req.TypeUrl) + } + } + case cfgSnap = <-stateCh: newRes, err := generator.allResourcesFromSnapshot(cfgSnap) if err != nil { @@ -434,7 +434,7 @@ func newDeltaType( // Recv handles new discovery requests from envoy. // // Returns true the first time a type receives a request. -func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest) bool { +func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest, sf supportedProxyFeatures) bool { if t == nil { return false // not something we care about } @@ -447,6 +447,16 @@ func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest) bool t.wildcard = len(req.ResourceNamesSubscribe) == 0 t.registered = true registeredThisTime = true + + if sf.ForceLDSandCDSToAlwaysUseWildcardsOnReconnect { + switch t.typeURL { + case ListenerType, ClusterType: + if !t.wildcard { + t.wildcard = true + logger.Trace("fixing Envoy bug fixed in 1.19.0 by inferring wildcard mode for type") + } + } + } } /* diff --git a/agent/xds/delta_test.go b/agent/xds/delta_test.go index d5b1a575bab4..eca97e36eef7 100644 --- a/agent/xds/delta_test.go +++ b/agent/xds/delta_test.go @@ -554,6 +554,72 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T) } } +func TestServer_DeltaAggregatedResources_v3_GetAllClusterAfterConsulRestarted(t *testing.T) { + // This illustrates a scenario related to https://github.com/hashicorp/consul/issues/11833 + + aclResolve := func(id string) (acl.Authorizer, error) { + // Allow all + return acl.RootAuthorizer("manage"), nil + } + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0) + _, mgr, errCh, envoy := scenario.server, scenario.mgr, scenario.errCh, scenario.envoy + envoy.EnvoyVersion = "1.18.0" + + sid := structs.NewServiceID("web-sidecar-proxy", nil) + + // Register the proxy to create state needed to Watch() on + mgr.RegisterProxy(t, sid) + + var snap *proxycfg.ConfigSnapshot + runStep(t, "get into state after consul restarted", func(t *testing.T) { + snap = newTestSnapshot(t, nil, "") + + // Send initial cluster discover. + // This is to simulate the discovery request call from envoy after disconnected from consul ads stream. + // + // We need to force it to be an older version of envoy so that the logic shifts. + envoy.SendDeltaReq(t, ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{ + ResourceNamesSubscribe: []string{ + "local_app", + "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul", + }, + InitialResourceVersions: map[string]string{ + "local_app": "a948904f2f0f479b8f8197694b30184b0d2ed1c1cd2a1ec0fb85d299a192a447", + "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul": "5891b5b522d5df086d0ff0b110fbd9d21bb4fc7163af34d08286a2e846f6be03", + }, + }) + + // Check no response sent yet + assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) + + requireProtocolVersionGauge(t, scenario, "v3", 1) + + // Deliver a new snapshot + // the config contains 3 clusters: local_app, db, geo-cache. + // this is to simulate the fact that there is one additional (upstream) cluster gets added to the sidecar service + // during the time xds disconnected (consul restarted). + mgr.DeliverConfig(t, sid, snap) + + assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ + TypeUrl: ClusterType, + Nonce: hexString(1), + Resources: makeTestResources(t, + makeTestCluster(t, snap, "tcp:local_app"), + makeTestCluster(t, snap, "tcp:db"), + makeTestCluster(t, snap, "tcp:geo-cache"), + ), + }) + }) + + envoy.Close() + select { + case err := <-errCh: + require.NoError(t, err) + case <-time.After(50 * time.Millisecond): + t.Fatalf("timed out waiting for handler to finish") + } +} + func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpactEndpoints(t *testing.T) { aclResolve := func(id string) (acl.Authorizer, error) { // Allow all diff --git a/agent/xds/envoy_versioning.go b/agent/xds/envoy_versioning.go index 27df44f170b8..be0a770fe889 100644 --- a/agent/xds/envoy_versioning.go +++ b/agent/xds/envoy_versioning.go @@ -13,8 +13,7 @@ var ( // the zero'th point release of the last element of proxysupport.EnvoyVersions. minSupportedVersion = version.Must(version.NewVersion("1.17.0")) - // add min version constraints for associated feature flags when necessary, for example: - // minVersionAllowingEmptyGatewayClustersWithIncrementalXDS = version.Must(version.NewVersion("1.16.0")) + minVersionToForceLDSandCDSToAlwaysUseWildcardsOnReconnect = version.Must(version.NewVersion("1.19.0")) specificUnsupportedVersions = []unsupportedVersion{} ) @@ -26,16 +25,19 @@ type unsupportedVersion struct { } type supportedProxyFeatures struct { - // add version dependent feature flags here + // Older versions of Envoy incorrectly exploded a wildcard subscription for + // LDS and CDS into specific line items on incremental xDS reconnect. They + // would populate both InitialResourceVersions and ResourceNamesSubscribe + // when they SHOULD have left ResourceNamesSubscribe empty (or used an + // explicit "*" in later Envoy versions) to imply wildcard mode. On + // reconnect, Consul interpreted the lack of the wildcard attribute as + // implying that the Envoy instance should not receive updates for any + // newly created listeners and clusters for the remaining life of that + // Envoy sidecar process. // - // For example, we previously had flags for Envoy < 1.16 called: - // - // GatewaysNeedStubClusterWhenEmptyWithIncrementalXDS - // IncrementalXDSUpdatesMustBeSerial - // - // Which then manifested in the code for checks with this struct populated. - // By dropping support for 1.15, we no longer have any special flags here - // but leaving this flagging functionality for future one-offs. + // see: https://github.com/envoyproxy/envoy/issues/16063 + // see: https://github.com/envoyproxy/envoy/pull/16153 + ForceLDSandCDSToAlwaysUseWildcardsOnReconnect bool } func determineSupportedProxyFeatures(node *envoy_core_v3.Node) (supportedProxyFeatures, error) { @@ -73,12 +75,9 @@ func determineSupportedProxyFeaturesFromVersion(version *version.Version) (suppo sf := supportedProxyFeatures{} - // add version constraints to populate feature flags here when necessary, for example: - /* - if version.LessThan(minVersionAllowingEmptyGatewayClustersWithIncrementalXDS) { - sf.GatewaysNeedStubClusterWhenEmptyWithIncrementalXDS = true - } - */ + if version.LessThan(minVersionToForceLDSandCDSToAlwaysUseWildcardsOnReconnect) { + sf.ForceLDSandCDSToAlwaysUseWildcardsOnReconnect = true + } return sf, nil } diff --git a/agent/xds/envoy_versioning_test.go b/agent/xds/envoy_versioning_test.go index 75211ecc5b12..47622ab73776 100644 --- a/agent/xds/envoy_versioning_test.go +++ b/agent/xds/envoy_versioning_test.go @@ -125,6 +125,12 @@ func TestDetermineSupportedProxyFeaturesFromString(t *testing.T) { for _, v := range []string{ "1.17.0", "1.17.1", "1.17.2", "1.17.3", "1.17.4", "1.18.0", "1.18.1", "1.18.2", "1.18.3", "1.18.4", + } { + cases[v] = testcase{expect: supportedProxyFeatures{ + ForceLDSandCDSToAlwaysUseWildcardsOnReconnect: true, + }} + } + for _, v := range []string{ "1.19.0", "1.19.1", "1.20.0", "1.20.1", } { diff --git a/agent/xds/testing.go b/agent/xds/testing.go index fc2e97baf33c..90ea093c73af 100644 --- a/agent/xds/testing.go +++ b/agent/xds/testing.go @@ -83,6 +83,8 @@ type TestEnvoy struct { proxyID string token string + EnvoyVersion string + deltaStream *TestADSDeltaStream // Incremental v3 } @@ -182,9 +184,14 @@ func (e *TestEnvoy) sendDeltaReq( e.mu.Lock() defer e.mu.Unlock() - ev, valid := stringToEnvoyVersion(proxysupport.EnvoyVersions[0]) + stringVersion := e.EnvoyVersion + if stringVersion == "" { + stringVersion = proxysupport.EnvoyVersions[0] + } + + ev, valid := stringToEnvoyVersion(stringVersion) if !valid { - t.Fatal("envoy version is not valid: %s", proxysupport.EnvoyVersions[0]) + t.Fatal("envoy version is not valid: %s", stringVersion) } if req == nil {