Skip to content
This repository has been archived by the owner on Jul 11, 2023. It is now read-only.

Commit

Permalink
Merge pull request #3803 from eduser25/requestedResources
Browse files Browse the repository at this point in the history
ads: fix wrong accountability of sent resources
  • Loading branch information
eduser25 authored Jul 29, 2021
2 parents f258b86 + 0cf7acf commit 0fc7e5e
Show file tree
Hide file tree
Showing 7 changed files with 156 additions and 25 deletions.
42 changes: 30 additions & 12 deletions pkg/envoy/ads/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,24 +49,27 @@ func (s *Server) getTypeResources(proxy *envoy.Proxy, request *xds_discovery.Dis
func (s *Server) sendResponse(proxy *envoy.Proxy, server *xds_discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer, request *xds_discovery.DiscoveryRequest, cfg configurator.Configurator, typeURIsToSend ...envoy.TypeURI) error {
thereWereErrors := false

// A nil request indicates a request for all SDS responses
fullUpdateRequested := request == nil
// A nil request indicates a change on mesh configuration, OSM will trigger an update
// for all proxy config (we generate a response with no direct request from envoy)
osmDrivenUpdate := request == nil
cacheResourceMap := map[envoy.TypeURI][]types.Resource{}

// Order is important: CDS, EDS, LDS, RDS
// See: https://github.com/envoyproxy/go-control-plane/issues/59
for _, typeURI := range typeURIsToSend {
// Handle request when is not provided, and the SDS case
var finalReq *xds_discovery.DiscoveryRequest
if fullUpdateRequested {
if typeURI == envoy.TypeSDS {
finalReq = makeRequestForAllSecrets(proxy, s.catalog)
if finalReq == nil {
continue
}
} else {
finalReq = &xds_discovery.DiscoveryRequest{TypeUrl: typeURI.String()}
}
if osmDrivenUpdate {
finalReq = &xds_discovery.DiscoveryRequest{TypeUrl: typeURI.String()}

// Fill the request resources with subscribed resources from the proxy thus far.
// Verticals should be held accountable for generating the requested resources.
// Any additional resources generated by the verticals that have not been requested/subscribed to,
// will be silently ignored by envoy.
// For CDS and LDS, this is always an empty slice (wildcard)
// For other verticals (RDS, EDS, SDS), this is the list of subscribed resources that we have last received
// from the TypeURL at hand.
finalReq.ResourceNames = getResourceSliceFromMapset(proxy.GetSubscribedResources(typeURI))
} else {
finalReq = request
}
Expand Down Expand Up @@ -122,15 +125,30 @@ func (s *Server) SendDiscoveryResponse(proxy *envoy.Proxy, request *xds_discover
}

resourcesSent := mapset.NewSet()
subscribedResources := proxy.GetSubscribedResources(typeURI)
for _, res := range resourcesToSend {
proto, err := ptypes.MarshalAny(res)
if err != nil {
log.Error().Err(err).Str(errcode.Kind, errcode.ErrMarshallingXDSResource.String()).
Msgf("Error marshalling resource %s for proxy %s", typeURI, proxy.GetCertificateSerialNumber())
continue
}
// Add resource to response
response.Resources = append(response.Resources, proto)
resourcesSent.Add(cache.GetResourceName(res))

// Only track as resources sent if they are subscribed resources.
// By doing so, we are making sure a legitimate request down the line is not treated as an ACK just because
// a vertical had potentially sent more resources when they had not been requested yet by the proxy.
// For wildcard TypeURI resources, subscribed resources will purposefully remain empty at all times.
if !envoy.IsWildcardTypeURI(typeURI) {
currentResponseResourceName := cache.GetResourceName(res)
if subscribedResources.Contains(currentResponseResourceName) {
resourcesSent.Add(currentResponseResourceName)
} else {
log.Debug().Msgf("Proxy %s TypeURI %s - sending unsubscribed/unrequested resource %s",
proxy.String(), typeURI.Short(), currentResponseResourceName)
}
}
}

// NOTE: Never log entire 'response' - will contain secrets!
Expand Down
9 changes: 9 additions & 0 deletions pkg/envoy/ads/response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"time"

mapset "github.com/deckarep/golang-set"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

Expand Down Expand Up @@ -148,6 +149,10 @@ var _ = Describe("Test ADS response functions", func() {
Expect(s).ToNot(BeNil())

mockCertManager.EXPECT().IssueCertificate(gomock.Any(), certDuration).Return(certPEM, nil).Times(1)

// Set subscribed resources for SDS
proxy.SetSubscribedResources(envoy.TypeSDS, mapset.NewSetWith("service-cert:default/bookstore", "root-cert-for-mtls-inbound:default/bookstore"))

err := s.sendResponse(proxy, &server, nil, mockConfigurator, envoy.XDSResponseOrder...)
Expect(err).To(BeNil())
Expect(actualResponses).ToNot(BeNil())
Expand Down Expand Up @@ -221,6 +226,10 @@ var _ = Describe("Test ADS response functions", func() {
Expect(s).ToNot(BeNil())

mockCertManager.EXPECT().IssueCertificate(gomock.Any(), certDuration).Return(certPEM, nil).Times(1)

// Set subscribed resources
proxy.SetSubscribedResources(envoy.TypeSDS, mapset.NewSetWith("service-cert:default/bookstore", "root-cert-for-mtls-inbound:default/bookstore"))

err := s.sendResponse(proxy, &server, nil, mockConfigurator, envoy.TypeSDS)
Expect(err).To(BeNil())
Expect(actualResponses).ToNot(BeNil())
Expand Down
52 changes: 39 additions & 13 deletions pkg/envoy/ads/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ func respondToRequest(proxy *envoy.Proxy, discoveryRequest *xds_discovery.Discov
if requestNonce == "" {
log.Debug().Msgf("Proxy %s: Empty nonce for %s, should be first message on stream (req resources: %v)",
proxy.String(), typeURL.Short(), discoveryRequest.ResourceNames)
proxy.SetSubscribedResources(typeURL, getRequestedResourceNamesSet(discoveryRequest))
return true
}

Expand All @@ -235,6 +236,7 @@ func respondToRequest(proxy *envoy.Proxy, discoveryRequest *xds_discovery.Discov
proxy.String(), typeURL.Short(), requestNonce, requestVersion)
proxy.SetLastSentVersion(typeURL, requestVersion)
proxy.SetLastAppliedVersion(typeURL, requestVersion)
proxy.SetSubscribedResources(typeURL, getRequestedResourceNamesSet(discoveryRequest))
metricsstore.DefaultMetricsStore.ProxyReconnectCount.Inc()
return true
}
Expand All @@ -247,26 +249,33 @@ func respondToRequest(proxy *envoy.Proxy, discoveryRequest *xds_discovery.Discov
return false
}

// Nonces match
// At this point, there is no error and nonces match, it is guaranteed an ACK with last sent version.
// At this point, there is no error and nonces match. It can either be an ACK or envoy could still be
// requesting a different set of resources on the current version for non-wildcard TypeURIs.
proxy.SetLastAppliedVersion(typeURL, requestVersion)

// ----
// What's left is to check if the resources listed are the same. If they are not, we must respond
// with the new resources requested.
//
// In case of LDS and CDS, "Envoy will always use wildcard mode for Listener and Cluster resources".
// The following logic is not needed (though correct) for LDS and CDS as request resources are also empty in ACK case.
//
// For Wildcard TypeURIs we are done. Resource names in requests are always empty, nonce alone is enough
// to ACK wildcard types.
// This is the case for LDS and CDS, "Envoy will always use wildcard mode for Listener and Cluster resources".
if envoy.IsWildcardTypeURI(typeURL) {
log.Debug().Msgf("Proxy %s: ACK received for %s, version: %d nonce: %s",
proxy.String(), typeURL.Short(), requestVersion, requestNonce)
return false
}

// For non-wildcard types, what's left is to check if the resources requested are the same as the ones we last sent.
// If they are not, we must respond to the request for the requested resources.
// This part of the code was inspired by Istio's `shouldRespond` handling of request resource difference
// https://github.com/istio/istio/blob/da6178604559bdf2c707a57f452d16bee0de90c8/pilot/pkg/xds/ads.go#L347
// ----
resourcesLastSent := proxy.GetLastResourcesSent(typeURL)

// Update subscribed resources first
resourcesRequested := getRequestedResourceNamesSet(discoveryRequest)
proxy.SetSubscribedResources(typeURL, resourcesRequested)
// Get resources last sent prior to this request
resourcesLastSent := proxy.GetLastResourcesSent(typeURL)

// If what we last sent is a superset of what the
// requests resources subscribes to, it's ACK and nothing needs to be done.
// Otherwise, envoy might be asking us for additional resources that have to be sent along last time.
// Otherwise, envoy might be asking us for additional resources that have to be sent along last time's resources.
// Difference returns elemenets of <requested> that are not part of elements of <last sent>

requestedResourcesDifference := resourcesRequested.Difference(resourcesLastSent)
Expand All @@ -281,7 +290,8 @@ func respondToRequest(proxy *envoy.Proxy, discoveryRequest *xds_discovery.Discov
return false
}

// Helper to turn the resource names on a discovery request to a Set for later efficient intersection
// getRequestedResourceNamesSet is a helper to convert the resource names on a discovery request
// to a Set for later efficient intersection
func getRequestedResourceNamesSet(discoveryRequest *xds_discovery.DiscoveryRequest) mapset.Set {
resourcesRequested := mapset.NewSet()
for idx := range discoveryRequest.ResourceNames {
Expand All @@ -290,6 +300,22 @@ func getRequestedResourceNamesSet(discoveryRequest *xds_discovery.DiscoveryReque
return resourcesRequested
}

// getResourceSliceFromMapset is a helper to convert a mapset of resource names to a string slice
func getResourceSliceFromMapset(resourceMap mapset.Set) []string {
resourceSlice := []string{}
it := resourceMap.Iterator()

for elem := range it.C {
resString, ok := elem.(string)
if !ok {
log.Error().Msgf("Failed to cast resource name to string: %v", elem)
continue
}
resourceSlice = append(resourceSlice, resString)
}
return resourceSlice
}

// isCNforProxy returns true if the given CN for the workload certificate matches the given proxy's identity.
// Proxy identity corresponds to the k8s service account, while the workload certificate is of the form
// <svc-account>.<namespace>.<trust-domain>.
Expand Down
31 changes: 31 additions & 0 deletions pkg/envoy/ads/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"testing"

xds_discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/google/uuid"
tassert "github.com/stretchr/testify/assert"

Expand Down Expand Up @@ -51,3 +52,33 @@ func TestIsCNForProxy(t *testing.T) {
})
}
}

func findSliceElem(slice []string, elem string) bool {
for _, v := range slice {
if v == elem {
return true
}
}
return false
}

func TestMapsetToSliceConvFunctions(t *testing.T) {
assert := tassert.New(t)

discRequest := &xds_discovery.DiscoveryRequest{TypeUrl: "TestTypeurl"}
discRequest.ResourceNames = []string{"A", "B", "C"}

nameSet := getRequestedResourceNamesSet(discRequest)

assert.True(nameSet.Contains("A"))
assert.True(nameSet.Contains("B"))
assert.True(nameSet.Contains("C"))
assert.False(nameSet.Contains("D"))

nameSlice := getResourceSliceFromMapset(nameSet)

assert.True(findSliceElem(nameSlice, "A"))
assert.True(findSliceElem(nameSlice, "B"))
assert.True(findSliceElem(nameSlice, "C"))
assert.False(findSliceElem(nameSlice, "D"))
}
19 changes: 19 additions & 0 deletions pkg/envoy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ type Proxy struct {
// Contains the last resource names sent for a given proxy and TypeURL
lastxDSResourcesSent map[TypeURI]mapset.Set

// Contains the last requested resource names (and therefore, subscribed) for a given TypeURI
subscribedResources map[TypeURI]mapset.Set

// hash is based on CommonName
hash uint64

Expand Down Expand Up @@ -207,6 +210,21 @@ func (p *Proxy) SetLastResourcesSent(typeURI TypeURI, resourcesSet mapset.Set) {
p.lastxDSResourcesSent[typeURI] = resourcesSet
}

// GetSubscribedResources returns a set of resources subscribed for a proxy given a TypeURL
// If none were subscribed, empty set is returned
func (p *Proxy) GetSubscribedResources(typeURI TypeURI) mapset.Set {
sentResources, ok := p.subscribedResources[typeURI]
if !ok {
return mapset.NewSet()
}
return sentResources
}

// SetSubscribedResources sets the input resources as subscribed resources given a proxy for a TypeURL
func (p *Proxy) SetSubscribedResources(typeURI TypeURI, resourcesSet mapset.Set) {
p.subscribedResources[typeURI] = resourcesSet
}

// Kind return the proxy's kind
func (p *Proxy) Kind() ProxyKind {
return p.kind
Expand Down Expand Up @@ -238,6 +256,7 @@ func NewProxy(certCommonName certificate.CommonName, certSerialNumber certificat
lastSentVersion: make(map[TypeURI]uint64),
lastAppliedVersion: make(map[TypeURI]uint64),
lastxDSResourcesSent: make(map[TypeURI]mapset.Set),
subscribedResources: make(map[TypeURI]mapset.Set),

kind: cnMeta.ProxyKind,
}, nil
Expand Down
20 changes: 20 additions & 0 deletions pkg/envoy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"testing"
"time"

mapset "github.com/deckarep/golang-set"
"github.com/golang/mock/gomock"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -479,3 +480,22 @@ func TestPodMetadataString(t *testing.T) {
})
}
}

func TestSubscribedResources(t *testing.T) {
assert := tassert.New(t)

p := Proxy{
subscribedResources: make(map[TypeURI]mapset.Set),
}

res := p.GetSubscribedResources("test")
assert.Zero(res.Cardinality())

p.SetSubscribedResources(TypeRDS, mapset.NewSetWith("A", "B", "C"))

res = p.GetSubscribedResources(TypeRDS)
assert.Equal(res.Cardinality(), 3)
assert.True(res.Contains("A"))
assert.True(res.Contains("B"))
assert.True(res.Contains("C"))
}
8 changes: 8 additions & 0 deletions pkg/envoy/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ var (
// TypeURI is a string describing the Envoy xDS payload.
type TypeURI string

// IsWildcardTypeURI returns if a given TypeURI is an expected wildcard TypeURI or not.
// XDS proto defines general client behavior as:
// "Envoy will always use wildcard subscriptions for Listener and Cluster resources"
// https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#client-behavior
func IsWildcardTypeURI(t TypeURI) bool {
return t == TypeCDS || t == TypeLDS
}

func (t TypeURI) String() string {
return string(t)
}
Expand Down

0 comments on commit 0fc7e5e

Please sign in to comment.