diff --git a/pkg/envoy/ads/response.go b/pkg/envoy/ads/response.go index 3bb283edc7..caf4a81e49 100644 --- a/pkg/envoy/ads/response.go +++ b/pkg/envoy/ads/response.go @@ -49,8 +49,9 @@ func (s *Server) getTypeResources(proxy *envoy.Proxy, request *xds_discovery.Dis func (s *Server) sendResponse(proxy *envoy.Proxy, server *xds_discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer, request *xds_discovery.DiscoveryRequest, cfg configurator.Configurator, typeURIsToSend ...envoy.TypeURI) error { thereWereErrors := false - // A nil request indicates a request for all SDS responses - fullUpdateRequested := request == nil + // A nil request indicates a change on mesh configuration, OSM will trigger an update + // for all proxy config (we generate a response with no direct request from envoy) + osmDrivenUpdate := request == nil cacheResourceMap := map[envoy.TypeURI][]types.Resource{} // Order is important: CDS, EDS, LDS, RDS @@ -58,15 +59,17 @@ func (s *Server) sendResponse(proxy *envoy.Proxy, server *xds_discovery.Aggregat for _, typeURI := range typeURIsToSend { // Handle request when is not provided, and the SDS case var finalReq *xds_discovery.DiscoveryRequest - if fullUpdateRequested { - if typeURI == envoy.TypeSDS { - finalReq = makeRequestForAllSecrets(proxy, s.catalog) - if finalReq == nil { - continue - } - } else { - finalReq = &xds_discovery.DiscoveryRequest{TypeUrl: typeURI.String()} - } + if osmDrivenUpdate { + finalReq = &xds_discovery.DiscoveryRequest{TypeUrl: typeURI.String()} + + // Fill the request resources with subscribed resources from the proxy thus far. + // Verticals should be held accountable for generating the requested resources. + // Any additional resources generated by the verticals that have not been requested/subscribed to, + // will be silently ignored by envoy. + // For CDS and LDS, this is always an empty slice (wildcard) + // For other verticals (RDS, EDS, SDS), this is the list of subscribed resources that we have last received + // from the TypeURL at hand. + finalReq.ResourceNames = getResourceSliceFromMapset(proxy.GetSubscribedResources(typeURI)) } else { finalReq = request } @@ -122,6 +125,7 @@ func (s *Server) SendDiscoveryResponse(proxy *envoy.Proxy, request *xds_discover } resourcesSent := mapset.NewSet() + subscribedResources := proxy.GetSubscribedResources(typeURI) for _, res := range resourcesToSend { proto, err := ptypes.MarshalAny(res) if err != nil { @@ -129,8 +133,22 @@ func (s *Server) SendDiscoveryResponse(proxy *envoy.Proxy, request *xds_discover Msgf("Error marshalling resource %s for proxy %s", typeURI, proxy.GetCertificateSerialNumber()) continue } + // Add resource to response response.Resources = append(response.Resources, proto) - resourcesSent.Add(cache.GetResourceName(res)) + + // Only track as resources sent if they are subscribed resources. + // By doing so, we are making sure a legitimate request down the line is not treated as an ACK just because + // a vertical had potentially sent more resources when they had not been requested yet by the proxy. + // For wildcard TypeURI resources, subscribed resources will purposefully remain empty at all times. + if !envoy.IsWildcardTypeURI(typeURI) { + currentResponseResourceName := cache.GetResourceName(res) + if subscribedResources.Contains(currentResponseResourceName) { + resourcesSent.Add(currentResponseResourceName) + } else { + log.Debug().Msgf("Proxy %s TypeURI %s - sending unsubscribed/unrequested resource %s", + proxy.String(), typeURI.Short(), currentResponseResourceName) + } + } } // NOTE: Never log entire 'response' - will contain secrets! diff --git a/pkg/envoy/ads/response_test.go b/pkg/envoy/ads/response_test.go index 421ad4d898..39697c6ae7 100644 --- a/pkg/envoy/ads/response_test.go +++ b/pkg/envoy/ads/response_test.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + mapset "github.com/deckarep/golang-set" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -148,6 +149,10 @@ var _ = Describe("Test ADS response functions", func() { Expect(s).ToNot(BeNil()) mockCertManager.EXPECT().IssueCertificate(gomock.Any(), certDuration).Return(certPEM, nil).Times(1) + + // Set subscribed resources for SDS + proxy.SetSubscribedResources(envoy.TypeSDS, mapset.NewSetWith("service-cert:default/bookstore", "root-cert-for-mtls-inbound:default/bookstore")) + err := s.sendResponse(proxy, &server, nil, mockConfigurator, envoy.XDSResponseOrder...) Expect(err).To(BeNil()) Expect(actualResponses).ToNot(BeNil()) @@ -221,6 +226,10 @@ var _ = Describe("Test ADS response functions", func() { Expect(s).ToNot(BeNil()) mockCertManager.EXPECT().IssueCertificate(gomock.Any(), certDuration).Return(certPEM, nil).Times(1) + + // Set subscribed resources + proxy.SetSubscribedResources(envoy.TypeSDS, mapset.NewSetWith("service-cert:default/bookstore", "root-cert-for-mtls-inbound:default/bookstore")) + err := s.sendResponse(proxy, &server, nil, mockConfigurator, envoy.TypeSDS) Expect(err).To(BeNil()) Expect(actualResponses).ToNot(BeNil()) diff --git a/pkg/envoy/ads/stream.go b/pkg/envoy/ads/stream.go index 78c4812ef9..42decf28c5 100644 --- a/pkg/envoy/ads/stream.go +++ b/pkg/envoy/ads/stream.go @@ -220,6 +220,7 @@ func respondToRequest(proxy *envoy.Proxy, discoveryRequest *xds_discovery.Discov if requestNonce == "" { log.Debug().Msgf("Proxy %s: Empty nonce for %s, should be first message on stream (req resources: %v)", proxy.String(), typeURL.Short(), discoveryRequest.ResourceNames) + proxy.SetSubscribedResources(typeURL, getRequestedResourceNamesSet(discoveryRequest)) return true } @@ -235,6 +236,7 @@ func respondToRequest(proxy *envoy.Proxy, discoveryRequest *xds_discovery.Discov proxy.String(), typeURL.Short(), requestNonce, requestVersion) proxy.SetLastSentVersion(typeURL, requestVersion) proxy.SetLastAppliedVersion(typeURL, requestVersion) + proxy.SetSubscribedResources(typeURL, getRequestedResourceNamesSet(discoveryRequest)) metricsstore.DefaultMetricsStore.ProxyReconnectCount.Inc() return true } @@ -247,26 +249,33 @@ func respondToRequest(proxy *envoy.Proxy, discoveryRequest *xds_discovery.Discov return false } - // Nonces match - // At this point, there is no error and nonces match, it is guaranteed an ACK with last sent version. + // At this point, there is no error and nonces match. It can either be an ACK or envoy could still be + // requesting a different set of resources on the current version for non-wildcard TypeURIs. proxy.SetLastAppliedVersion(typeURL, requestVersion) - // ---- - // What's left is to check if the resources listed are the same. If they are not, we must respond - // with the new resources requested. - // - // In case of LDS and CDS, "Envoy will always use wildcard mode for Listener and Cluster resources". - // The following logic is not needed (though correct) for LDS and CDS as request resources are also empty in ACK case. - // + // For Wildcard TypeURIs we are done. Resource names in requests are always empty, nonce alone is enough + // to ACK wildcard types. + // This is the case for LDS and CDS, "Envoy will always use wildcard mode for Listener and Cluster resources". + if envoy.IsWildcardTypeURI(typeURL) { + log.Debug().Msgf("Proxy %s: ACK received for %s, version: %d nonce: %s", + proxy.String(), typeURL.Short(), requestVersion, requestNonce) + return false + } + + // For non-wildcard types, what's left is to check if the resources requested are the same as the ones we last sent. + // If they are not, we must respond to the request for the requested resources. // This part of the code was inspired by Istio's `shouldRespond` handling of request resource difference // https://github.com/istio/istio/blob/da6178604559bdf2c707a57f452d16bee0de90c8/pilot/pkg/xds/ads.go#L347 - // ---- - resourcesLastSent := proxy.GetLastResourcesSent(typeURL) + + // Update subscribed resources first resourcesRequested := getRequestedResourceNamesSet(discoveryRequest) + proxy.SetSubscribedResources(typeURL, resourcesRequested) + // Get resources last sent prior to this request + resourcesLastSent := proxy.GetLastResourcesSent(typeURL) // If what we last sent is a superset of what the // requests resources subscribes to, it's ACK and nothing needs to be done. - // Otherwise, envoy might be asking us for additional resources that have to be sent along last time. + // Otherwise, envoy might be asking us for additional resources that have to be sent along last time's resources. // Difference returns elemenets of that are not part of elements of requestedResourcesDifference := resourcesRequested.Difference(resourcesLastSent) @@ -281,7 +290,8 @@ func respondToRequest(proxy *envoy.Proxy, discoveryRequest *xds_discovery.Discov return false } -// Helper to turn the resource names on a discovery request to a Set for later efficient intersection +// getRequestedResourceNamesSet is a helper to convert the resource names on a discovery request +// to a Set for later efficient intersection func getRequestedResourceNamesSet(discoveryRequest *xds_discovery.DiscoveryRequest) mapset.Set { resourcesRequested := mapset.NewSet() for idx := range discoveryRequest.ResourceNames { @@ -290,6 +300,22 @@ func getRequestedResourceNamesSet(discoveryRequest *xds_discovery.DiscoveryReque return resourcesRequested } +// getResourceSliceFromMapset is a helper to convert a mapset of resource names to a string slice +func getResourceSliceFromMapset(resourceMap mapset.Set) []string { + resourceSlice := []string{} + it := resourceMap.Iterator() + + for elem := range it.C { + resString, ok := elem.(string) + if !ok { + log.Error().Msgf("Failed to cast resource name to string: %v", elem) + continue + } + resourceSlice = append(resourceSlice, resString) + } + return resourceSlice +} + // isCNforProxy returns true if the given CN for the workload certificate matches the given proxy's identity. // Proxy identity corresponds to the k8s service account, while the workload certificate is of the form // ... diff --git a/pkg/envoy/ads/stream_test.go b/pkg/envoy/ads/stream_test.go index a7a239eced..39d549744a 100644 --- a/pkg/envoy/ads/stream_test.go +++ b/pkg/envoy/ads/stream_test.go @@ -4,6 +4,7 @@ import ( "fmt" "testing" + xds_discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "github.com/google/uuid" tassert "github.com/stretchr/testify/assert" @@ -51,3 +52,33 @@ func TestIsCNForProxy(t *testing.T) { }) } } + +func findSliceElem(slice []string, elem string) bool { + for _, v := range slice { + if v == elem { + return true + } + } + return false +} + +func TestMapsetToSliceConvFunctions(t *testing.T) { + assert := tassert.New(t) + + discRequest := &xds_discovery.DiscoveryRequest{TypeUrl: "TestTypeurl"} + discRequest.ResourceNames = []string{"A", "B", "C"} + + nameSet := getRequestedResourceNamesSet(discRequest) + + assert.True(nameSet.Contains("A")) + assert.True(nameSet.Contains("B")) + assert.True(nameSet.Contains("C")) + assert.False(nameSet.Contains("D")) + + nameSlice := getResourceSliceFromMapset(nameSet) + + assert.True(findSliceElem(nameSlice, "A")) + assert.True(findSliceElem(nameSlice, "B")) + assert.True(findSliceElem(nameSlice, "C")) + assert.False(findSliceElem(nameSlice, "D")) +} diff --git a/pkg/envoy/proxy.go b/pkg/envoy/proxy.go index c776a8babc..67e0486ae9 100644 --- a/pkg/envoy/proxy.go +++ b/pkg/envoy/proxy.go @@ -36,6 +36,9 @@ type Proxy struct { // Contains the last resource names sent for a given proxy and TypeURL lastxDSResourcesSent map[TypeURI]mapset.Set + // Contains the last requested resource names (and therefore, subscribed) for a given TypeURI + subscribedResources map[TypeURI]mapset.Set + // hash is based on CommonName hash uint64 @@ -207,6 +210,21 @@ func (p *Proxy) SetLastResourcesSent(typeURI TypeURI, resourcesSet mapset.Set) { p.lastxDSResourcesSent[typeURI] = resourcesSet } +// GetSubscribedResources returns a set of resources subscribed for a proxy given a TypeURL +// If none were subscribed, empty set is returned +func (p *Proxy) GetSubscribedResources(typeURI TypeURI) mapset.Set { + sentResources, ok := p.subscribedResources[typeURI] + if !ok { + return mapset.NewSet() + } + return sentResources +} + +// SetSubscribedResources sets the input resources as subscribed resources given a proxy for a TypeURL +func (p *Proxy) SetSubscribedResources(typeURI TypeURI, resourcesSet mapset.Set) { + p.subscribedResources[typeURI] = resourcesSet +} + // Kind return the proxy's kind func (p *Proxy) Kind() ProxyKind { return p.kind @@ -238,6 +256,7 @@ func NewProxy(certCommonName certificate.CommonName, certSerialNumber certificat lastSentVersion: make(map[TypeURI]uint64), lastAppliedVersion: make(map[TypeURI]uint64), lastxDSResourcesSent: make(map[TypeURI]mapset.Set), + subscribedResources: make(map[TypeURI]mapset.Set), kind: cnMeta.ProxyKind, }, nil diff --git a/pkg/envoy/proxy_test.go b/pkg/envoy/proxy_test.go index 3ac79729b1..5651ad9c9b 100644 --- a/pkg/envoy/proxy_test.go +++ b/pkg/envoy/proxy_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + mapset "github.com/deckarep/golang-set" "github.com/golang/mock/gomock" "github.com/google/uuid" "github.com/stretchr/testify/assert" @@ -479,3 +480,22 @@ func TestPodMetadataString(t *testing.T) { }) } } + +func TestSubscribedResources(t *testing.T) { + assert := tassert.New(t) + + p := Proxy{ + subscribedResources: make(map[TypeURI]mapset.Set), + } + + res := p.GetSubscribedResources("test") + assert.Zero(res.Cardinality()) + + p.SetSubscribedResources(TypeRDS, mapset.NewSetWith("A", "B", "C")) + + res = p.GetSubscribedResources(TypeRDS) + assert.Equal(res.Cardinality(), 3) + assert.True(res.Contains("A")) + assert.True(res.Contains("B")) + assert.True(res.Contains("C")) +} diff --git a/pkg/envoy/types.go b/pkg/envoy/types.go index 60884d4e18..17a0dda269 100644 --- a/pkg/envoy/types.go +++ b/pkg/envoy/types.go @@ -17,6 +17,14 @@ var ( // TypeURI is a string describing the Envoy xDS payload. type TypeURI string +// IsWildcardTypeURI returns if a given TypeURI is an expected wildcard TypeURI or not. +// XDS proto defines general client behavior as: +// "Envoy will always use wildcard subscriptions for Listener and Cluster resources" +// https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#client-behavior +func IsWildcardTypeURI(t TypeURI) bool { + return t == TypeCDS || t == TypeLDS +} + func (t TypeURI) String() string { return string(t) }