From d19404477fb0ab79536db7be8bc685dda576614e Mon Sep 17 00:00:00 2001 From: Matei David Date: Tue, 1 Aug 2023 15:58:28 +0000 Subject: [PATCH 01/15] Introduce an EndpointsWatcher cache structure * EndpointsWatcher cache will be populated with one EndpointsWatcher per cluster (including the local cluster) * Using a namespace scoped informer, it will add/remove watchers based on secrets read from the namespace. Signed-off-by: Matei David --- .../watcher/endpoints_watcher_cache.go | 184 ++++++++++++++++++ controller/k8s/metadata_api.go | 21 ++ 2 files changed, 205 insertions(+) create mode 100644 controller/api/destination/watcher/endpoints_watcher_cache.go diff --git a/controller/api/destination/watcher/endpoints_watcher_cache.go b/controller/api/destination/watcher/endpoints_watcher_cache.go new file mode 100644 index 0000000000000..8a67f5ca8eeb4 --- /dev/null +++ b/controller/api/destination/watcher/endpoints_watcher_cache.go @@ -0,0 +1,184 @@ +package watcher + +import ( + "context" + "errors" + "fmt" + "sync" + + "github.com/linkerd/linkerd2/controller/k8s" + logging "github.com/sirupsen/logrus" + v1 "k8s.io/api/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/clientcmd" + + consts "github.com/linkerd/linkerd2/pkg/k8s" +) + +type ( + EndpointsWatcherCache struct { + sync.RWMutex + + watchers map[string]struct { + store *EndpointsWatcher + trustDomain string + clusterDomain string + } + + enableEndpointSlices bool + log *logging.Entry + } +) + +const ( + clusterNameLabel = "multicluster.linkerd.io/cluster-name" + trustDomainAnnotation = "multicluster.linkerd.io/trust-domain" + clusterDomainAnnotation = "multicluster.linkerd.io/cluster-domain" +) + +func NewEndpointsWatcherCache(k8sAPI *k8s.API, enableEndpointSlices bool) (*EndpointsWatcherCache, error) { + ewc := &EndpointsWatcherCache{ + watchers: make(map[string]struct { + store *EndpointsWatcher + trustDomain string + clusterDomain string + }), + log: logging.WithFields(logging.Fields{ + "component": "endpoints-watcher-cache", + }), + enableEndpointSlices: enableEndpointSlices, + } + + _, err := k8sAPI.Secret().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + secret, ok := obj.(*v1.Secret) + if !ok { + ewc.log.Errorf("Error processing 'Secret' object: got %#v, expected *corev1.Secret", secret) + return + } + + clusterName, found := secret.GetLabels()[clusterNameLabel] + if !found { + ewc.log.Tracef("Skipping Add event for 'Secret' object: missing \"%s\" label", clusterNameLabel) + return + } + + ewc.addWatcher(clusterName, secret) + + }, + DeleteFunc: func(obj interface{}) { + secret, ok := obj.(*v1.Secret) + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + ewc.log.Errorf("unable to get object from DeletedFinalStateUnknown %#v", obj) + return + } + secret, ok = tombstone.Obj.(*v1.Secret) + if !ok { + ewc.log.Errorf("DeletedFinalStateUnknown contained object that is not a Secret %#v", obj) + return + } + } + + clusterName, found := secret.GetLabels()[clusterNameLabel] + if !found { + ewc.log.Tracef("Skipping Delete event for 'Secret' object: missing \"%s\" label", clusterNameLabel) + return + } + + ewc.removeWatcher(clusterName) + + }, + }) + + if err != nil { + return nil, err + } + + return ewc, nil +} + +func (ewc *EndpointsWatcherCache) removeWatcher(clusterName string) { + ewc.Lock() + defer ewc.Unlock() + delete(ewc.watchers, clusterName) +} + +func (ewc *EndpointsWatcherCache) addWatcher(clusterName string, 
secret *v1.Secret) error { + clusterDomain, found := secret.GetAnnotations()[clusterDomainAnnotation] + if !found { + return fmt.Errorf("missing \"%s\" annotation", clusterDomainAnnotation) + } + + trustDomain, found := secret.GetAnnotations()[trustDomainAnnotation] + if !found { + return fmt.Errorf("missing \"%s\" annotation", trustDomainAnnotation) + } + + data, found := secret.Data[consts.ConfigKeyName] + if !found { + return errors.New("missing kubeconfig file") + } + + cfg, err := clientcmd.RESTConfigFromKubeConfig(data) + if err != nil { + return fmt.Errorf("unable to parse kubeconfig: %w", err) + } + + ctx := context.Background() + var remoteAPI *k8s.API + if ewc.enableEndpointSlices { + remoteAPI, err = k8s.InitializeAPIForConfig( + ctx, + cfg, + true, + k8s.Endpoint, k8s.ES, k8s.Pod, k8s.Svc, k8s.SP, k8s.Job, k8s.Srv, + ) + } else { + remoteAPI, err = k8s.InitializeAPIForConfig( + ctx, + cfg, + true, + k8s.Endpoint, k8s.Pod, k8s.Svc, k8s.SP, k8s.Job, k8s.Srv, + ) + } + if err != nil { + return fmt.Errorf("unable to initialize api for remote cluster %s: %w", clusterName, err) + } + + metadataAPI, err := k8s.InitializeMetadataAPIForConfig(cfg, k8s.Node, k8s.RS) + if err != nil { + return fmt.Errorf("unable to initialize metadata api for remote cluster %s: %w", clusterName, err) + } + + watcher, err := NewEndpointsWatcher( + remoteAPI, + metadataAPI, + logging.WithFields(logging.Fields{ + "remote-cluster": clusterName, + }), + ewc.enableEndpointSlices, + ) + if err != nil { + return fmt.Errorf("unable to initialize endpoints watcher for remote cluster %s: %w", clusterName, err) + } + + ewc.Lock() + ewc.watchers[clusterName] = struct { + store *EndpointsWatcher + trustDomain string + clusterDomain string + }{ + store: watcher, + trustDomain: trustDomain, + clusterDomain: clusterDomain, + } + ewc.Unlock() + + remoteAPI.Sync(nil) + metadataAPI.Sync(nil) + + return nil + +} diff --git a/controller/k8s/metadata_api.go b/controller/k8s/metadata_api.go index 600fd4715a396..ce75412fcc235 100644 --- a/controller/k8s/metadata_api.go +++ b/controller/k8s/metadata_api.go @@ -18,6 +18,7 @@ import ( "k8s.io/client-go/informers" "k8s.io/client-go/metadata" "k8s.io/client-go/metadata/metadatainformer" + "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" ) @@ -57,6 +58,26 @@ func InitializeMetadataAPI(kubeConfig string, resources ...APIResource) (*Metada return api, nil } +func InitializeMetadataAPIForConfig(kubeConfig *rest.Config, resources ...APIResource) (*MetadataAPI, error) { + client, err := metadata.NewForConfig(kubeConfig) + if err != nil { + return nil, err + } + + api, err := newClusterScopedMetadataAPI(client, resources...) 
+	if err != nil {
+		return nil, err
+	}
+
+	for _, gauge := range api.gauges {
+		if err := prometheus.Register(gauge); err != nil {
+			log.Warnf("failed to register Prometheus gauge %s: %s", gauge.Desc().String(), err)
+		}
+	}
+	return api, nil
+
+}
+
 func newClusterScopedMetadataAPI(
 	metadataClient metadata.Interface,
 	resources ...APIResource,

From 2e4f1e91f8942b538fd1adb7191f9a2309a5be48 Mon Sep 17 00:00:00 2001
From: Matei David
Date: Tue, 1 Aug 2023 16:30:07 +0000
Subject: [PATCH 02/15] Handle error

Signed-off-by: Matei David
---
 controller/api/destination/watcher/endpoints_watcher_cache.go | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/controller/api/destination/watcher/endpoints_watcher_cache.go b/controller/api/destination/watcher/endpoints_watcher_cache.go
index 8a67f5ca8eeb4..43d5c24283677 100644
--- a/controller/api/destination/watcher/endpoints_watcher_cache.go
+++ b/controller/api/destination/watcher/endpoints_watcher_cache.go
@@ -63,7 +63,9 @@ func NewEndpointsWatcherCache(k8sAPI *k8s.API, enableEndpointSlices bool) (*Endp
 				return
 			}
 
-			ewc.addWatcher(clusterName, secret)
+			if err := ewc.addWatcher(clusterName, secret); err != nil {
+				ewc.log.Errorf("Error processing 'Secret' object: %w", err)
+			}
 
 		},
 		DeleteFunc: func(obj interface{}) {

From 3e1266ba781d3d104d91f1e497420a7f83294b2e Mon Sep 17 00:00:00 2001
From: Matei David
Date: Wed, 2 Aug 2023 17:33:48 +0000
Subject: [PATCH 03/15] Add graceful shutdown for informers

Signed-off-by: Matei David
---
 .../destination/watcher/endpoints_watcher.go  |  55 ++++-
 .../watcher/endpoints_watcher_cache.go        | 231 +++++++++++++-----
 2 files changed, 225 insertions(+), 61 deletions(-)

diff --git a/controller/api/destination/watcher/endpoints_watcher.go b/controller/api/destination/watcher/endpoints_watcher.go
index ca3809cae8dbf..e7cb3749557c2 100644
--- a/controller/api/destination/watcher/endpoints_watcher.go
+++ b/controller/api/destination/watcher/endpoints_watcher.go
@@ -78,6 +78,18 @@ type (
 		log                  *logging.Entry
 		enableEndpointSlices bool
 		sync.RWMutex // This mutex protects modification of the map itself.
+
+		informerHandlers
+	}
+
+	// informerHandlers holds a registration handle for each informer handler
+	// that has been registered for the EndpointsWatcher. The registration
+	// handles are used to de-register informer handlers when the
+	// EndpointsWatcher stops.
+	informerHandlers struct {
+		epHandle  cache.ResourceEventHandlerRegistration
+		svcHandle cache.ResourceEventHandlerRegistration
+		srvHandle cache.ResourceEventHandlerRegistration
+	}
 
 	// servicePublisher represents a service.
It keeps a map of portPublishers @@ -149,7 +161,7 @@ func NewEndpointsWatcher(k8sAPI *k8s.API, metadataAPI *k8s.MetadataAPI, log *log }), } - _, err := k8sAPI.Svc().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + svcHandle, err := k8sAPI.Svc().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: ew.addService, DeleteFunc: ew.deleteService, UpdateFunc: func(_, obj interface{}) { ew.addService(obj) }, @@ -157,8 +169,9 @@ func NewEndpointsWatcher(k8sAPI *k8s.API, metadataAPI *k8s.MetadataAPI, log *log if err != nil { return nil, err } + ew.svcHandle = svcHandle - _, err = k8sAPI.Srv().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + srvHandle, err := k8sAPI.Srv().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: ew.addServer, DeleteFunc: ew.deleteServer, UpdateFunc: func(_, obj interface{}) { ew.addServer(obj) }, @@ -166,10 +179,11 @@ func NewEndpointsWatcher(k8sAPI *k8s.API, metadataAPI *k8s.MetadataAPI, log *log if err != nil { return nil, err } + ew.srvHandle = srvHandle if ew.enableEndpointSlices { ew.log.Debugf("Watching EndpointSlice resources") - _, err := k8sAPI.ES().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + epHandle, err := k8sAPI.ES().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: ew.addEndpointSlice, DeleteFunc: ew.deleteEndpointSlice, UpdateFunc: ew.updateEndpointSlice, @@ -177,9 +191,11 @@ func NewEndpointsWatcher(k8sAPI *k8s.API, metadataAPI *k8s.MetadataAPI, log *log if err != nil { return nil, err } + + ew.epHandle = epHandle } else { ew.log.Debugf("Watching Endpoints resources") - _, err = k8sAPI.Endpoint().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + epHandle, err := k8sAPI.Endpoint().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: ew.addEndpoints, DeleteFunc: ew.deleteEndpoints, UpdateFunc: func(_, obj interface{}) { ew.addEndpoints(obj) }, @@ -187,6 +203,8 @@ func NewEndpointsWatcher(k8sAPI *k8s.API, metadataAPI *k8s.MetadataAPI, log *log if err != nil { return nil, err } + + ew.epHandle = epHandle } return ew, nil } @@ -232,6 +250,35 @@ func (ew *EndpointsWatcher) Unsubscribe(id ServiceID, port Port, hostname string sp.unsubscribe(port, hostname, listener) } +// Stop will terminate an EndpointsWatcher by shutting down its informers. It +// uses the write half of the channel used to sync the informers to signal +// shutdown. It additionally de-registers any event handlers used by its +// informers. 
+func (ew *EndpointsWatcher) Stop(stopCh chan<- struct{}) { + err := ew.k8sAPI.Svc().Informer().RemoveEventHandler(ew.svcHandle) + if err != nil { + ew.log.Errorf("Failed to remove Service informer event handlers: %s", err) + } + + err = ew.k8sAPI.Srv().Informer().RemoveEventHandler(ew.srvHandle) + if err != nil { + ew.log.Errorf("Failed to remove Service informer event handlers: %s", err) + } + + if ew.enableEndpointSlices { + err = ew.k8sAPI.ES().Informer().RemoveEventHandler(ew.epHandle) + } else { + err = ew.k8sAPI.Endpoint().Informer().RemoveEventHandler(ew.epHandle) + } + + if err != nil { + ew.log.Errorf("Failed to remove Service informer event handlers: %s", err) + } + + // Signal informers to stop + stopCh <- struct{}{} +} + func (ew *EndpointsWatcher) addService(obj interface{}) { service := obj.(*corev1.Service) id := ServiceID{ diff --git a/controller/api/destination/watcher/endpoints_watcher_cache.go b/controller/api/destination/watcher/endpoints_watcher_cache.go index 43d5c24283677..4afef63537b77 100644 --- a/controller/api/destination/watcher/endpoints_watcher_cache.go +++ b/controller/api/destination/watcher/endpoints_watcher_cache.go @@ -16,40 +16,72 @@ import ( ) type ( + // EndpointsWatcherCache holds all EndpointsWatchers used by the destination + // service to perform service discovery. Each cluster (including the one the + // controller is running in) that may be looked-up for service discovery has + // an associated EndpointsWatcher in the cache, along with a set of + // immutable cluster configuration primitives (i.e. identity and cluster + // domains). EndpointsWatcherCache struct { + // Protects against illegal accesses sync.RWMutex - watchers map[string]struct { - store *EndpointsWatcher - trustDomain string - clusterDomain string - } - + k8sAPI *k8s.API + store map[string]watchStore enableEndpointSlices bool log *logging.Entry } + + // watchStore is a helper struct that represents a cache item. + watchStore struct { + watcher *EndpointsWatcher + trustDomain string + clusterDomain string + + // Used to signal shutdown to the watcher. + // Warning: it should be the same channel that was used to sync the + // informers, otherwise the informers won't stop. + stopCh chan<- struct{} + } + + configDecoder = func(secret *v1.Secret, enableEndpointSlices bool) (*k8s.API, *k8s.MetadataAPI, error) ) const ( + // LocalClusterKey represents the look-up key that returns an + // EndpointsWatcher associated with the "local" cluster. + LocalClusterKey = "local" clusterNameLabel = "multicluster.linkerd.io/cluster-name" trustDomainAnnotation = "multicluster.linkerd.io/trust-domain" clusterDomainAnnotation = "multicluster.linkerd.io/cluster-domain" ) -func NewEndpointsWatcherCache(k8sAPI *k8s.API, enableEndpointSlices bool) (*EndpointsWatcherCache, error) { - ewc := &EndpointsWatcherCache{ - watchers: make(map[string]struct { - store *EndpointsWatcher - trustDomain string - clusterDomain string - }), +// NewEndpointsWatcherCache creates a new (empty) EndpointsWatcherCache. It +// requires a Kubernetes API Server client instantiated for the local cluster. +// +// Upon creation, the EndpointsWatcherCache will trigger an informer that +// watches multicluster specific Secrets (for remote service discovery). +// +// Valid secrets will create and start a new EndpointsWatcher for a remote +// cluster. When a secret is removed, the watcher is automatically stopped and +// cleaned-up. 
+func NewEndpointsWatcherCache(k8sAPI *k8s.API, enableEndpointSlices bool) *EndpointsWatcherCache { + return &EndpointsWatcherCache{ + store: make(map[string]watchStore), log: logging.WithFields(logging.Fields{ "component": "endpoints-watcher-cache", }), enableEndpointSlices: enableEndpointSlices, + k8sAPI: k8sAPI, } +} - _, err := k8sAPI.Secret().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ +func (ewc *EndpointsWatcherCache) Start() error { + return ewc.startWithDecoder(decodeK8sConfigFromSecret) +} + +func (ewc *EndpointsWatcherCache) startWithDecoder(decodeFn configDecoder) error { + _, err := ewc.k8sAPI.Secret().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { secret, ok := obj.(*v1.Secret) if !ok { @@ -57,35 +89,46 @@ func NewEndpointsWatcherCache(k8sAPI *k8s.API, enableEndpointSlices bool) (*Endp return } + if secret.Type != consts.MirrorSecretType { + ewc.log.Tracef("Skipping Add event for 'Secret' object %s/%s: invalid type %s", secret.Namespace, secret.Name, secret.Type) + return + + } + clusterName, found := secret.GetLabels()[clusterNameLabel] if !found { - ewc.log.Tracef("Skipping Add event for 'Secret' object: missing \"%s\" label", clusterNameLabel) + ewc.log.Tracef("Skipping Add event for 'Secret' object %s/%s: missing \"%s\" label", secret.Namespace, secret.Name, clusterNameLabel) return } - if err := ewc.addWatcher(clusterName, secret); err != nil { - ewc.log.Errorf("Error processing 'Secret' object: %w", err) + if err := ewc.addWatcher(clusterName, secret, decodeFn); err != nil { + ewc.log.Errorf("Error adding watcher for cluster %s: %v", clusterName, err) } }, DeleteFunc: func(obj interface{}) { secret, ok := obj.(*v1.Secret) if !ok { + // If the Secret was deleted when the watch was disconnected + // (for whatever reason) and the event was missed, the object is + // added with 'DeletedFinalStateUnknown'. Its state may be + // stale, so it should be cleaned-up. tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { - ewc.log.Errorf("unable to get object from DeletedFinalStateUnknown %#v", obj) + ewc.log.Debugf("unable to get object from DeletedFinalStateUnknown %#v", obj) return } + // If the zombie object is a `Secret` we can delete it. secret, ok = tombstone.Obj.(*v1.Secret) if !ok { - ewc.log.Errorf("DeletedFinalStateUnknown contained object that is not a Secret %#v", obj) + ewc.log.Debugf("DeletedFinalStateUnknown contained object that is not a Secret %#v", obj) return } } clusterName, found := secret.GetLabels()[clusterNameLabel] if !found { - ewc.log.Tracef("Skipping Delete event for 'Secret' object: missing \"%s\" label", clusterNameLabel) + ewc.log.Tracef("Skipping Delete event for 'Secret' object %s/%s: missing \"%s\" label", secret.Namespace, secret.Name, clusterNameLabel) return } @@ -95,19 +138,80 @@ func NewEndpointsWatcherCache(k8sAPI *k8s.API, enableEndpointSlices bool) (*Endp }) if err != nil { - return nil, err + return err } - return ewc, nil + return err +} + +// Get safely retrieves a watcher from the cache given a cluster name. It +// returns the watcher, cluster configuration, if the entry exists in the cache. +func (ewc *EndpointsWatcherCache) Get(clusterName string) (*EndpointsWatcher, string, string, bool) { + ewc.RLock() + defer ewc.RUnlock() + s, found := ewc.store[clusterName] + return s.watcher, s.trustDomain, s.clusterDomain, found +} + +// GetWatcher is a convenience method that given a cluster name only returns the +// associated EndpointsWatcher if it exists in the cache. 
+func (ewc *EndpointsWatcherCache) GetWatcher(clusterName string) (*EndpointsWatcher, bool) { + ewc.RLock() + defer ewc.RUnlock() + s, found := ewc.store[clusterName] + return s.watcher, found +} + +// GetLocalWatcher is a convenience method that retrieves the watcher associated +// with the local cluster. Its existence is assumed. +func (ewc *EndpointsWatcherCache) GetLocalWatcher() *EndpointsWatcher { + ewc.RLock() + defer ewc.RUnlock() + return ewc.store[LocalClusterKey].watcher +} + +// GetClusterConfig is a convenience method that given a cluster name retrieves +// its associated configuration strings if the entry exists in the cache. +func (ewc *EndpointsWatcherCache) GetClusterConfig(clusterName string) (string, string, bool) { + ewc.RLock() + defer ewc.RUnlock() + s, found := ewc.store[clusterName] + return s.trustDomain, s.clusterDomain, found +} + +// AddLocalWatcher adds a watcher to the cache using the local cluster key. +func (ewc *EndpointsWatcherCache) AddLocalWatcher(stopCh chan<- struct{}, watcher *EndpointsWatcher, trustDomain, clusterDomain string) { + ewc.Lock() + defer ewc.Unlock() + ewc.store[LocalClusterKey] = watchStore{ + watcher, + trustDomain, + clusterDomain, + stopCh, + } } +// removeWatcher is triggered by the cache's Secret informer when a secret is +// removed. Given a cluster name, it removes the entry from the cache after +// stopping the associated watcher. func (ewc *EndpointsWatcherCache) removeWatcher(clusterName string) { ewc.Lock() defer ewc.Unlock() - delete(ewc.watchers, clusterName) + if s, found := ewc.store[clusterName]; found { + // For good measure, close the channel after stopping to ensure + // informers are shut down. + defer close(s.stopCh) + s.watcher.Stop(s.stopCh) + delete(ewc.store, clusterName) + ewc.log.Tracef("Removed cluster %s from EndpointsWatcherCache", clusterName) + } } -func (ewc *EndpointsWatcherCache) addWatcher(clusterName string, secret *v1.Secret) error { +// addWatcher is triggered by the cache's Secret informer when a secret is +// added, or during the initial informer list. Given a cluster name and a Secret +// object, it creates an EndpointsWatcher for a remote cluster and syncs its +// informers before returning. 
+func (ewc *EndpointsWatcherCache) addWatcher(clusterName string, secret *v1.Secret, decodeFn configDecoder) error { clusterDomain, found := secret.GetAnnotations()[clusterDomainAnnotation] if !found { return fmt.Errorf("missing \"%s\" annotation", clusterDomainAnnotation) @@ -118,19 +222,54 @@ func (ewc *EndpointsWatcherCache) addWatcher(clusterName string, secret *v1.Secr return fmt.Errorf("missing \"%s\" annotation", trustDomainAnnotation) } + remoteAPI, metadataAPI, err := decodeFn(secret, ewc.enableEndpointSlices) + if err != nil { + return err + } + + watcher, err := NewEndpointsWatcher( + remoteAPI, + metadataAPI, + logging.WithFields(logging.Fields{ + "remote-cluster": clusterName, + }), + ewc.enableEndpointSlices, + ) + if err != nil { + return err + } + + ewc.Lock() + stopCh := make(chan struct{}, 1) + ewc.store[clusterName] = watchStore{ + watcher, + trustDomain, + clusterDomain, + stopCh, + } + ewc.Unlock() + + remoteAPI.Sync(stopCh) + metadataAPI.Sync(stopCh) + ewc.log.Tracef("Added cluster %s to EndpointsWatcherCache", clusterName) + + return nil +} + +func decodeK8sConfigFromSecret(secret *v1.Secret, enableEndpointSlices bool) (*k8s.API, *k8s.MetadataAPI, error) { data, found := secret.Data[consts.ConfigKeyName] if !found { - return errors.New("missing kubeconfig file") + return nil, nil, errors.New("missing kubeconfig file") } cfg, err := clientcmd.RESTConfigFromKubeConfig(data) if err != nil { - return fmt.Errorf("unable to parse kubeconfig: %w", err) + return nil, nil, err } ctx := context.Background() var remoteAPI *k8s.API - if ewc.enableEndpointSlices { + if enableEndpointSlices { remoteAPI, err = k8s.InitializeAPIForConfig( ctx, cfg, @@ -146,41 +285,19 @@ func (ewc *EndpointsWatcherCache) addWatcher(clusterName string, secret *v1.Secr ) } if err != nil { - return fmt.Errorf("unable to initialize api for remote cluster %s: %w", clusterName, err) + return nil, nil, err } metadataAPI, err := k8s.InitializeMetadataAPIForConfig(cfg, k8s.Node, k8s.RS) if err != nil { - return fmt.Errorf("unable to initialize metadata api for remote cluster %s: %w", clusterName, err) + return nil, nil, err } - watcher, err := NewEndpointsWatcher( - remoteAPI, - metadataAPI, - logging.WithFields(logging.Fields{ - "remote-cluster": clusterName, - }), - ewc.enableEndpointSlices, - ) - if err != nil { - return fmt.Errorf("unable to initialize endpoints watcher for remote cluster %s: %w", clusterName, err) - } - - ewc.Lock() - ewc.watchers[clusterName] = struct { - store *EndpointsWatcher - trustDomain string - clusterDomain string - }{ - store: watcher, - trustDomain: trustDomain, - clusterDomain: clusterDomain, - } - ewc.Unlock() - - remoteAPI.Sync(nil) - metadataAPI.Sync(nil) - - return nil + return remoteAPI, metadataAPI, nil +} +func (ewc *EndpointsWatcherCache) len() int { + ewc.RLock() + defer ewc.RUnlock() + return len(ewc.store) } From ab6d71e30bd3864d104123de984aec6402b2f182 Mon Sep 17 00:00:00 2001 From: Matei David Date: Wed, 2 Aug 2023 17:34:16 +0000 Subject: [PATCH 04/15] Add first tests for endpoints watcher cache Signed-off-by: Matei David --- .../watcher/endpoints_watcher_cache_test.go | 190 ++++++++++++++++++ controller/k8s/test_helper.go | 1 + 2 files changed, 191 insertions(+) create mode 100644 controller/api/destination/watcher/endpoints_watcher_cache_test.go diff --git a/controller/api/destination/watcher/endpoints_watcher_cache_test.go b/controller/api/destination/watcher/endpoints_watcher_cache_test.go new file mode 100644 index 0000000000000..b16c8c6f7152d --- /dev/null 
+++ b/controller/api/destination/watcher/endpoints_watcher_cache_test.go @@ -0,0 +1,190 @@ +package watcher + +import ( + "testing" + "time" + + "github.com/linkerd/linkerd2/controller/k8s" + logging "github.com/sirupsen/logrus" + v1 "k8s.io/api/core/v1" +) + +func CreateMockDecoder() configDecoder { + // Create a mock decoder with some random objs to satisfy client creation + return func(secret *v1.Secret, enableEndpointSlices bool) (*k8s.API, *k8s.MetadataAPI, error) { + metadataAPI, err := k8s.NewFakeMetadataAPI([]string{}) + if err != nil { + return nil, nil, err + } + + var remoteAPI *k8s.API + if enableEndpointSlices { + remoteAPI, err = k8s.NewFakeAPI(endpointSliceAPIObj...) + } else { + remoteAPI, err = k8s.NewFakeAPI(endpointsAPIObj...) + } + + if err != nil { + return nil, nil, err + } + + return remoteAPI, metadataAPI, nil + } + +} + +func TestEndpointsWatcherCacheAddHandler(t *testing.T) { + for _, tt := range []struct { + name string + k8sConfigs []string + enableEndpointSlices bool + expectedClusters map[string]struct{} + deleteClusters map[string]struct{} + }{ + { + name: "should correctly add remote watcher to cache when Secret is valid", + k8sConfigs: []string{ + validRemoteSecret, + }, + enableEndpointSlices: true, + expectedClusters: map[string]struct{}{ + "remote": {}, + LocalClusterKey: {}, + }, + deleteClusters: map[string]struct{}{ + "remote": {}, + }, + }, + } { + // Pin + tt := tt + t.Run(tt.name, func(t *testing.T) { + // TODO (matei): use namespace scoped API here + k8sAPI, err := k8s.NewFakeAPI(tt.k8sConfigs...) + if err != nil { + t.Fatalf("NewFakeAPI returned an error: %s", err) + } + + metadataAPI, err := k8s.NewFakeMetadataAPI(nil) + if err != nil { + t.Fatalf("NewFakeMetadataAPI returned an error: %s", err) + } + + ewc := NewEndpointsWatcherCache(k8sAPI, tt.enableEndpointSlices) + if err := ewc.startWithDecoder(CreateMockDecoder()); err != nil { + t.Fatalf("Unexpected error when starting watcher cache: %s", err) + } + + k8sAPI.Sync(nil) + metadataAPI.Sync(nil) + + watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), tt.enableEndpointSlices) + if err != nil { + t.Fatalf("Unexpected error when creating local watcher: %s", err) + } + + ewc.AddLocalWatcher(nil, watcher, "cluster.local", "cluster.local") + actualLen := ewc.len() + if actualLen != len(tt.expectedClusters) { + t.Fatalf("Unexpected error: expected to see %d cache entries, got: %d", len(tt.expectedClusters), actualLen) + } + for k := range tt.expectedClusters { + if _, found := ewc.GetWatcher(k); !found { + t.Fatalf("Unexpected error: cluster %s is missing from the cache", k) + } + } + + // Handle delete events + if len(tt.deleteClusters) != 0 { + for k := range tt.deleteClusters { + watcher, found := ewc.GetWatcher(k) + if !found { + t.Fatalf("Unexpected error: watcher %s should exist in the cache", k) + } + // Unfortunately, mock k8s client does not propagate + // deletes, so we have to call remove directly. 
+ ewc.removeWatcher(k) + // Leave it to do its thing and gracefully shutdown + time.Sleep(5 * time.Second) + var hasStopped bool + if tt.enableEndpointSlices { + hasStopped = watcher.k8sAPI.ES().Informer().IsStopped() + } else { + hasStopped = watcher.k8sAPI.Endpoint().Informer().IsStopped() + } + if !hasStopped { + t.Fatalf("Unexpected error: informers for watcher %s should be stopped", k) + } + + if _, found := ewc.GetWatcher(k); found { + t.Fatalf("Unexpected error: watcher %s should have been removed from the cache", k) + } + + } + } + }) + } +} + +var validRemoteSecret = ` +apiVersion: v1 +kind: Secret +type: mirror.linkerd.io/remote-kubeconfig +metadata: + namespace: linkerd + name: remote-cluster-credentials + labels: + multicluster.linkerd.io/cluster-name: remote + annotations: + multicluster.linkerd.io/trust-domain: cluster.local + multicluster.linkerd.io/cluster-domain: cluster.local +data: + kubeconfig: dmVyeSB0b3Agc2VjcmV0IGluZm9ybWF0aW9uIGhlcmUK +` + +var endpointsAPIObj = []string{` +apiVersion: v1 +kind: Service +metadata: + name: name1 + namespace: ns +spec: + type: LoadBalancer + ports: + - port: 8989`, + ` +apiVersion: v1 +kind: Endpoints +metadata: + name: remote-service + namespace: ns +subsets: +- addresses: + - ip: 1.2.3.4 + ports: + - port: 80 +`, +} + +var endpointSliceAPIObj = []string{` +apiVersion: v1 +kind: Service +metadata: + name: name1 + namespace: ns +spec: + type: LoadBalancer + ports: + - port: 8989`, ` +apiVersion: discovery.k8s.io/v1 +kind: EndpointSlice +metadata: + labels: + kubernetes.io/service-name: name1 + name: name1-es + namespace: ns +ports: +- name: "" + port: 8989 +`, +} diff --git a/controller/k8s/test_helper.go b/controller/k8s/test_helper.go index 559722dd96098..1276d62124f2d 100644 --- a/controller/k8s/test_helper.go +++ b/controller/k8s/test_helper.go @@ -37,6 +37,7 @@ func NewFakeAPI(configs ...string) (*API, error) { Node, ES, Srv, + Secret, ), nil } From 1362d73e7ed26540004e6001da3d80fb96271052 Mon Sep 17 00:00:00 2001 From: Matei David Date: Wed, 2 Aug 2023 17:41:25 +0000 Subject: [PATCH 05/15] Fix comments Signed-off-by: Matei David --- .../watcher/endpoints_watcher_cache.go | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/controller/api/destination/watcher/endpoints_watcher_cache.go b/controller/api/destination/watcher/endpoints_watcher_cache.go index 4afef63537b77..a4361f5d17bf7 100644 --- a/controller/api/destination/watcher/endpoints_watcher_cache.go +++ b/controller/api/destination/watcher/endpoints_watcher_cache.go @@ -44,6 +44,10 @@ type ( stopCh chan<- struct{} } + // configDecoder is a function type that given a secret, decodes it and + // instantiates the API Server clients. Clients are dynamically created, + // configDecoder allows some degree of isolation between the cache and + // client bootstrapping. configDecoder = func(secret *v1.Secret, enableEndpointSlices bool) (*k8s.API, *k8s.MetadataAPI, error) ) @@ -58,13 +62,6 @@ const ( // NewEndpointsWatcherCache creates a new (empty) EndpointsWatcherCache. It // requires a Kubernetes API Server client instantiated for the local cluster. -// -// Upon creation, the EndpointsWatcherCache will trigger an informer that -// watches multicluster specific Secrets (for remote service discovery). -// -// Valid secrets will create and start a new EndpointsWatcher for a remote -// cluster. When a secret is removed, the watcher is automatically stopped and -// cleaned-up. 
func NewEndpointsWatcherCache(k8sAPI *k8s.API, enableEndpointSlices bool) *EndpointsWatcherCache { return &EndpointsWatcherCache{ store: make(map[string]watchStore), @@ -76,6 +73,14 @@ func NewEndpointsWatcherCache(k8sAPI *k8s.API, enableEndpointSlices bool) *Endpo } } +// Start will register a pair of event handlers against a `Secret` informer. +// +// EndpointsWatcherCache will watch multicluster specific `Secret` objects (to +// create watchers that allow for remote service discovery). +// +// Valid secrets will create and start a new EndpointsWatcher for a remote +// cluster. When a secret is removed, the watcher is automatically stopped and +// cleaned-up. func (ewc *EndpointsWatcherCache) Start() error { return ewc.startWithDecoder(decodeK8sConfigFromSecret) } @@ -256,6 +261,8 @@ func (ewc *EndpointsWatcherCache) addWatcher(clusterName string, secret *v1.Secr return nil } +// decodeK8sConfigFromSecret implements the decoder function type and creates +// the necessary configuration from a secret. func decodeK8sConfigFromSecret(secret *v1.Secret, enableEndpointSlices bool) (*k8s.API, *k8s.MetadataAPI, error) { data, found := secret.Data[consts.ConfigKeyName] if !found { From 213f5dbb50338c562e4cf79300637561a70e6796 Mon Sep 17 00:00:00 2001 From: Matei David Date: Thu, 3 Aug 2023 12:42:55 +0000 Subject: [PATCH 06/15] Add more tests for endpoints watcher cache Signed-off-by: Matei David --- .../destination/watcher/endpoints_watcher.go | 45 ++--- .../watcher/endpoints_watcher_cache.go | 63 ++++--- .../watcher/endpoints_watcher_cache_test.go | 174 ++++++++++++------ 3 files changed, 173 insertions(+), 109 deletions(-) diff --git a/controller/api/destination/watcher/endpoints_watcher.go b/controller/api/destination/watcher/endpoints_watcher.go index e7cb3749557c2..8e70c03f10ee4 100644 --- a/controller/api/destination/watcher/endpoints_watcher.go +++ b/controller/api/destination/watcher/endpoints_watcher.go @@ -161,7 +161,8 @@ func NewEndpointsWatcher(k8sAPI *k8s.API, metadataAPI *k8s.MetadataAPI, log *log }), } - svcHandle, err := k8sAPI.Svc().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + var err error + ew.svcHandle, err = k8sAPI.Svc().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: ew.addService, DeleteFunc: ew.deleteService, UpdateFunc: func(_, obj interface{}) { ew.addService(obj) }, @@ -169,9 +170,8 @@ func NewEndpointsWatcher(k8sAPI *k8s.API, metadataAPI *k8s.MetadataAPI, log *log if err != nil { return nil, err } - ew.svcHandle = svcHandle - srvHandle, err := k8sAPI.Srv().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + ew.srvHandle, err = k8sAPI.Srv().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: ew.addServer, DeleteFunc: ew.deleteServer, UpdateFunc: func(_, obj interface{}) { ew.addServer(obj) }, @@ -179,11 +179,10 @@ func NewEndpointsWatcher(k8sAPI *k8s.API, metadataAPI *k8s.MetadataAPI, log *log if err != nil { return nil, err } - ew.srvHandle = srvHandle if ew.enableEndpointSlices { ew.log.Debugf("Watching EndpointSlice resources") - epHandle, err := k8sAPI.ES().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + ew.epHandle, err = k8sAPI.ES().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: ew.addEndpointSlice, DeleteFunc: ew.deleteEndpointSlice, UpdateFunc: ew.updateEndpointSlice, @@ -192,10 +191,9 @@ func NewEndpointsWatcher(k8sAPI *k8s.API, metadataAPI *k8s.MetadataAPI, log *log return nil, err } - ew.epHandle = epHandle } else { ew.log.Debugf("Watching Endpoints 
resources") - epHandle, err := k8sAPI.Endpoint().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + ew.epHandle, err = k8sAPI.Endpoint().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: ew.addEndpoints, DeleteFunc: ew.deleteEndpoints, UpdateFunc: func(_, obj interface{}) { ew.addEndpoints(obj) }, @@ -203,8 +201,6 @@ func NewEndpointsWatcher(k8sAPI *k8s.API, metadataAPI *k8s.MetadataAPI, log *log if err != nil { return nil, err } - - ew.epHandle = epHandle } return ew, nil } @@ -255,24 +251,29 @@ func (ew *EndpointsWatcher) Unsubscribe(id ServiceID, port Port, hostname string // shutdown. It additionally de-registers any event handlers used by its // informers. func (ew *EndpointsWatcher) Stop(stopCh chan<- struct{}) { - err := ew.k8sAPI.Svc().Informer().RemoveEventHandler(ew.svcHandle) - if err != nil { - ew.log.Errorf("Failed to remove Service informer event handlers: %s", err) + ew.Lock() + defer ew.Unlock() + if ew.svcHandle != nil { + if err := ew.k8sAPI.Svc().Informer().RemoveEventHandler(ew.svcHandle); err != nil { + ew.log.Errorf("Failed to remove Service informer event handlers: %s", err) + } } - err = ew.k8sAPI.Srv().Informer().RemoveEventHandler(ew.srvHandle) - if err != nil { - ew.log.Errorf("Failed to remove Service informer event handlers: %s", err) + if ew.srvHandle != nil { + if err := ew.k8sAPI.Srv().Informer().RemoveEventHandler(ew.srvHandle); err != nil { + ew.log.Errorf("Failed to remove Service informer event handlers: %s", err) + } } - if ew.enableEndpointSlices { - err = ew.k8sAPI.ES().Informer().RemoveEventHandler(ew.epHandle) - } else { - err = ew.k8sAPI.Endpoint().Informer().RemoveEventHandler(ew.epHandle) - } + if ew.enableEndpointSlices && ew.epHandle != nil { + if err := ew.k8sAPI.ES().Informer().RemoveEventHandler(ew.epHandle); err != nil { - if err != nil { - ew.log.Errorf("Failed to remove Service informer event handlers: %s", err) + ew.log.Errorf("Failed to remove Service informer event handlers: %s", err) + } + } else { + if err := ew.k8sAPI.Endpoint().Informer().RemoveEventHandler(ew.epHandle); err != nil { + ew.log.Errorf("Failed to remove Service informer event handlers: %s", err) + } } // Signal informers to stop diff --git a/controller/api/destination/watcher/endpoints_watcher_cache.go b/controller/api/destination/watcher/endpoints_watcher_cache.go index a4361f5d17bf7..6f2e81f57a513 100644 --- a/controller/api/destination/watcher/endpoints_watcher_cache.go +++ b/controller/api/destination/watcher/endpoints_watcher_cache.go @@ -30,6 +30,8 @@ type ( store map[string]watchStore enableEndpointSlices bool log *logging.Entry + + decodeFn configDecoder } // watchStore is a helper struct that represents a cache item. @@ -48,7 +50,7 @@ type ( // instantiates the API Server clients. Clients are dynamically created, // configDecoder allows some degree of isolation between the cache and // client bootstrapping. - configDecoder = func(secret *v1.Secret, enableEndpointSlices bool) (*k8s.API, *k8s.MetadataAPI, error) + configDecoder = func(data []byte, enableEndpointSlices bool) (*k8s.API, *k8s.MetadataAPI, error) ) const ( @@ -62,15 +64,8 @@ const ( // NewEndpointsWatcherCache creates a new (empty) EndpointsWatcherCache. It // requires a Kubernetes API Server client instantiated for the local cluster. 
-func NewEndpointsWatcherCache(k8sAPI *k8s.API, enableEndpointSlices bool) *EndpointsWatcherCache { - return &EndpointsWatcherCache{ - store: make(map[string]watchStore), - log: logging.WithFields(logging.Fields{ - "component": "endpoints-watcher-cache", - }), - enableEndpointSlices: enableEndpointSlices, - k8sAPI: k8sAPI, - } +func NewEndpointsWatcherCache(k8sAPI *k8s.API, enableEndpointSlices bool) (*EndpointsWatcherCache, error) { + return newWatcherCacheWithDecoder(k8sAPI, enableEndpointSlices, decodeK8sConfigFromSecret) } // Start will register a pair of event handlers against a `Secret` informer. @@ -81,14 +76,22 @@ func NewEndpointsWatcherCache(k8sAPI *k8s.API, enableEndpointSlices bool) *Endpo // Valid secrets will create and start a new EndpointsWatcher for a remote // cluster. When a secret is removed, the watcher is automatically stopped and // cleaned-up. -func (ewc *EndpointsWatcherCache) Start() error { - return ewc.startWithDecoder(decodeK8sConfigFromSecret) -} -func (ewc *EndpointsWatcherCache) startWithDecoder(decodeFn configDecoder) error { +func newWatcherCacheWithDecoder(k8sAPI *k8s.API, enableEndpointSlices bool, decodeFn configDecoder) (*EndpointsWatcherCache, error) { + ewc := &EndpointsWatcherCache{ + store: make(map[string]watchStore), + log: logging.WithFields(logging.Fields{ + "component": "endpoints-watcher-cache", + }), + enableEndpointSlices: enableEndpointSlices, + k8sAPI: k8sAPI, + decodeFn: decodeFn, + } + _, err := ewc.k8sAPI.Secret().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { secret, ok := obj.(*v1.Secret) + ewc.log.Infof("GOT SECRET %s", secret.Name) if !ok { ewc.log.Errorf("Error processing 'Secret' object: got %#v, expected *corev1.Secret", secret) return @@ -106,7 +109,7 @@ func (ewc *EndpointsWatcherCache) startWithDecoder(decodeFn configDecoder) error return } - if err := ewc.addWatcher(clusterName, secret, decodeFn); err != nil { + if err := ewc.addWatcher(clusterName, secret); err != nil { ewc.log.Errorf("Error adding watcher for cluster %s: %v", clusterName, err) } @@ -143,10 +146,10 @@ func (ewc *EndpointsWatcherCache) startWithDecoder(decodeFn configDecoder) error }) if err != nil { - return err + return nil, err } - return err + return ewc, nil } // Get safely retrieves a watcher from the cache given a cluster name. It @@ -208,7 +211,7 @@ func (ewc *EndpointsWatcherCache) removeWatcher(clusterName string) { defer close(s.stopCh) s.watcher.Stop(s.stopCh) delete(ewc.store, clusterName) - ewc.log.Tracef("Removed cluster %s from EndpointsWatcherCache", clusterName) + ewc.log.Infof("Removed cluster %s from EndpointsWatcherCache", clusterName) } } @@ -216,7 +219,12 @@ func (ewc *EndpointsWatcherCache) removeWatcher(clusterName string) { // added, or during the initial informer list. Given a cluster name and a Secret // object, it creates an EndpointsWatcher for a remote cluster and syncs its // informers before returning. 
-func (ewc *EndpointsWatcherCache) addWatcher(clusterName string, secret *v1.Secret, decodeFn configDecoder) error { +func (ewc *EndpointsWatcherCache) addWatcher(clusterName string, secret *v1.Secret) error { + data, found := secret.Data[consts.ConfigKeyName] + if !found { + return errors.New("missing kubeconfig file") + } + clusterDomain, found := secret.GetAnnotations()[clusterDomainAnnotation] if !found { return fmt.Errorf("missing \"%s\" annotation", clusterDomainAnnotation) @@ -227,11 +235,13 @@ func (ewc *EndpointsWatcherCache) addWatcher(clusterName string, secret *v1.Secr return fmt.Errorf("missing \"%s\" annotation", trustDomainAnnotation) } - remoteAPI, metadataAPI, err := decodeFn(secret, ewc.enableEndpointSlices) + remoteAPI, metadataAPI, err := ewc.decodeFn(data, ewc.enableEndpointSlices) if err != nil { return err } + stopCh := make(chan struct{}, 1) + watcher, err := NewEndpointsWatcher( remoteAPI, metadataAPI, @@ -245,17 +255,17 @@ func (ewc *EndpointsWatcherCache) addWatcher(clusterName string, secret *v1.Secr } ewc.Lock() - stopCh := make(chan struct{}, 1) + defer ewc.Unlock() ewc.store[clusterName] = watchStore{ watcher, trustDomain, clusterDomain, stopCh, } - ewc.Unlock() remoteAPI.Sync(stopCh) metadataAPI.Sync(stopCh) + ewc.log.Tracef("Added cluster %s to EndpointsWatcherCache", clusterName) return nil @@ -263,11 +273,7 @@ func (ewc *EndpointsWatcherCache) addWatcher(clusterName string, secret *v1.Secr // decodeK8sConfigFromSecret implements the decoder function type and creates // the necessary configuration from a secret. -func decodeK8sConfigFromSecret(secret *v1.Secret, enableEndpointSlices bool) (*k8s.API, *k8s.MetadataAPI, error) { - data, found := secret.Data[consts.ConfigKeyName] - if !found { - return nil, nil, errors.New("missing kubeconfig file") - } +func decodeK8sConfigFromSecret(data []byte, enableEndpointSlices bool) (*k8s.API, *k8s.MetadataAPI, error) { cfg, err := clientcmd.RESTConfigFromKubeConfig(data) if err != nil { @@ -306,5 +312,6 @@ func decodeK8sConfigFromSecret(secret *v1.Secret, enableEndpointSlices bool) (*k func (ewc *EndpointsWatcherCache) len() int { ewc.RLock() defer ewc.RUnlock() - return len(ewc.store) + sz := len(ewc.store) + return sz } diff --git a/controller/api/destination/watcher/endpoints_watcher_cache_test.go b/controller/api/destination/watcher/endpoints_watcher_cache_test.go index b16c8c6f7152d..dc135f11dde49 100644 --- a/controller/api/destination/watcher/endpoints_watcher_cache_test.go +++ b/controller/api/destination/watcher/endpoints_watcher_cache_test.go @@ -6,27 +6,16 @@ import ( "github.com/linkerd/linkerd2/controller/k8s" logging "github.com/sirupsen/logrus" - v1 "k8s.io/api/core/v1" ) func CreateMockDecoder() configDecoder { // Create a mock decoder with some random objs to satisfy client creation - return func(secret *v1.Secret, enableEndpointSlices bool) (*k8s.API, *k8s.MetadataAPI, error) { - metadataAPI, err := k8s.NewFakeMetadataAPI([]string{}) - if err != nil { - return nil, nil, err - } - - var remoteAPI *k8s.API - if enableEndpointSlices { - remoteAPI, err = k8s.NewFakeAPI(endpointSliceAPIObj...) - } else { - remoteAPI, err = k8s.NewFakeAPI(endpointsAPIObj...) - } - + return func(data []byte, enableEndpointSlices bool) (*k8s.API, *k8s.MetadataAPI, error) { + remoteAPI, err := k8s.NewFakeAPI([]string{}...) 
if err != nil { return nil, nil, err } + metadataAPI, err := k8s.NewFakeMetadataAPI(nil) return remoteAPI, metadataAPI, nil } @@ -42,7 +31,7 @@ func TestEndpointsWatcherCacheAddHandler(t *testing.T) { deleteClusters map[string]struct{} }{ { - name: "should correctly add remote watcher to cache when Secret is valid", + name: "add and remove remote watcher when Secret is valid", k8sConfigs: []string{ validRemoteSecret, }, @@ -55,9 +44,42 @@ func TestEndpointsWatcherCacheAddHandler(t *testing.T) { "remote": {}, }, }, + { + name: "add and remove more than one watcher when Secrets are valid", + k8sConfigs: []string{ + validRemoteSecret, + validTargetSecret, + }, + enableEndpointSlices: false, + expectedClusters: map[string]struct{}{ + "remote": {}, + LocalClusterKey: {}, + "target": {}, + }, + deleteClusters: map[string]struct{}{ + "remote": {}, + }, + }, + { + name: "malformed secrets shouldn't result in created watchers", + k8sConfigs: []string{ + validRemoteSecret, + noClusterSecret, + noDomainSecret, + noIdentitySecret, + invalidTypeSecret, + }, + enableEndpointSlices: true, + expectedClusters: map[string]struct{}{ + "remote": {}, + LocalClusterKey: {}, + }, + deleteClusters: map[string]struct{}{ + "remote": {}, + }, + }, } { - // Pin - tt := tt + tt := tt // Pin t.Run(tt.name, func(t *testing.T) { // TODO (matei): use namespace scoped API here k8sAPI, err := k8s.NewFakeAPI(tt.k8sConfigs...) @@ -70,21 +92,27 @@ func TestEndpointsWatcherCacheAddHandler(t *testing.T) { t.Fatalf("NewFakeMetadataAPI returned an error: %s", err) } - ewc := NewEndpointsWatcherCache(k8sAPI, tt.enableEndpointSlices) - if err := ewc.startWithDecoder(CreateMockDecoder()); err != nil { + ewc, err := newWatcherCacheWithDecoder(k8sAPI, tt.enableEndpointSlices, CreateMockDecoder()) + if err != nil { t.Fatalf("Unexpected error when starting watcher cache: %s", err) } k8sAPI.Sync(nil) metadataAPI.Sync(nil) + // Wait for the update to be processed because there is no blocking call currently in k8s that we can wait on + time.Sleep(50 * time.Millisecond) + watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), tt.enableEndpointSlices) if err != nil { t.Fatalf("Unexpected error when creating local watcher: %s", err) } ewc.AddLocalWatcher(nil, watcher, "cluster.local", "cluster.local") - actualLen := ewc.len() + ewc.RLock() + actualLen := len(ewc.store) + ewc.RUnlock() + if actualLen != len(tt.expectedClusters) { t.Fatalf("Unexpected error: expected to see %d cache entries, got: %d", len(tt.expectedClusters), actualLen) } @@ -105,7 +133,7 @@ func TestEndpointsWatcherCacheAddHandler(t *testing.T) { // deletes, so we have to call remove directly. 
 				ewc.removeWatcher(k)
 				// Leave it to do its thing and gracefully shutdown
-				time.Sleep(5 * time.Second)
+				time.Sleep(50 * time.Millisecond)
 				var hasStopped bool
 				if tt.enableEndpointSlices {
 					hasStopped = watcher.k8sAPI.ES().Informer().IsStopped()
@@ -142,49 +170,77 @@ data:
   kubeconfig: dmVyeSB0b3Agc2VjcmV0IGluZm9ybWF0aW9uIGhlcmUK
 `
 
-var endpointsAPIObj = []string{`
-apiVersion: v1
-kind: Service
+var validTargetSecret = `
+apiVersion: v1
+kind: Secret
+type: mirror.linkerd.io/remote-kubeconfig
 metadata:
-  name: name1
-  namespace: ns
-spec:
-  type: LoadBalancer
-  ports:
-  - port: 8989`,
-	`
+  namespace: linkerd
+  name: target-cluster-credentials
+  labels:
+    multicluster.linkerd.io/cluster-name: target
+  annotations:
+    multicluster.linkerd.io/trust-domain: cluster.local
+    multicluster.linkerd.io/cluster-domain: cluster.local
+data:
+  kubeconfig: dmVyeSB0b3Agc2VjcmV0IGluZm9ybWF0aW9uIGhlcmUK
+`
+
+var noDomainSecret = `
 apiVersion: v1
-kind: Endpoints
+kind: Secret
+type: mirror.linkerd.io/remote-kubeconfig
 metadata:
-  name: remote-service
-  namespace: ns
-subsets:
-- addresses:
-  - ip: 1.2.3.4
-  ports:
-  - port: 80
-`,
-}
+  namespace: linkerd
+  name: target-1-cluster-credentials
+  labels:
+    multicluster.linkerd.io/cluster-name: target-1
+  annotations:
+    multicluster.linkerd.io/trust-domain: cluster.local
+data:
+  kubeconfig: dmVyeSB0b3Agc2VjcmV0IGluZm9ybWF0aW9uIGhlcmUK
+`
 
-var endpointSliceAPIObj = []string{`
+var noClusterSecret = `
 apiVersion: v1
-kind: Service
+kind: Secret
+type: mirror.linkerd.io/remote-kubeconfig
 metadata:
-  name: name1
-  namespace: ns
-spec:
-  type: LoadBalancer
-  ports:
-  - port: 8989`, `
-apiVersion: discovery.k8s.io/v1
-kind: EndpointSlice
+  namespace: linkerd
+  name: target-2-cluster-credentials
+  annotations:
+    multicluster.linkerd.io/trust-domain: cluster.local
+    multicluster.linkerd.io/cluster-domain: cluster.local
+data:
+  kubeconfig: dmVyeSB0b3Agc2VjcmV0IGluZm9ybWF0aW9uIGhlcmUK
+`
+
+var noIdentitySecret = `
+apiVersion: v1
+kind: Secret
+type: mirror.linkerd.io/remote-kubeconfig
 metadata:
+  namespace: linkerd
+  name: target-3-cluster-credentials
   labels:
-    kubernetes.io/service-name: name1
-  name: name1-es
-  namespace: ns
-ports:
-- name: ""
-  port: 8989
-`,
-}
+    multicluster.linkerd.io/cluster-name: target-3
+  annotations:
+    multicluster.linkerd.io/cluster-domain: cluster.local
+data:
+  kubeconfig: dmVyeSB0b3Agc2VjcmV0IGluZm9ybWF0aW9uIGhlcmUK
+`
+var invalidTypeSecret = `
+apiVersion: v1
+kind: Secret
+type: kubernetes.io/tls
+metadata:
+  namespace: linkerd
+  name: target-cluster-credentials
+  labels:
+    multicluster.linkerd.io/cluster-name: target
+  annotations:
+    multicluster.linkerd.io/trust-domain: cluster.local
+    multicluster.linkerd.io/cluster-domain: cluster.local
+data:
+  kubeconfig: dmVyeSB0b3Agc2VjcmV0IGluZm9ybWF0aW9uIGhlcmUK
+`

From 1081968414039821ea81093dbbbc083d7b1f8daa Mon Sep 17 00:00:00 2001
From: Matei David
Date: Thu, 3 Aug 2023 13:41:34 +0000
Subject: [PATCH 07/15] Re-visit comments and fix lints

Signed-off-by: Matei David
---
 .../watcher/endpoints_watcher_cache.go        | 75 ++++++++++---------
 .../watcher/endpoints_watcher_cache_test.go   |  8 +-
 2 files changed, 44 insertions(+), 39 deletions(-)

diff --git a/controller/api/destination/watcher/endpoints_watcher_cache.go b/controller/api/destination/watcher/endpoints_watcher_cache.go
index 6f2e81f57a513..d3a177a069752 100644
--- a/controller/api/destination/watcher/endpoints_watcher_cache.go
+++ b/controller/api/destination/watcher/endpoints_watcher_cache.go
@@ -31,25 +31,24 @@ type (
enableEndpointSlices bool log *logging.Entry + // Function used to parse a kubeconfig from a byte buffer. Based on the + // kubeconfig, it creates API Server clients decodeFn configDecoder } - // watchStore is a helper struct that represents a cache item. + // watchStore is a helper struct that represents a cache item watchStore struct { watcher *EndpointsWatcher trustDomain string clusterDomain string - // Used to signal shutdown to the watcher. - // Warning: it should be the same channel that was used to sync the - // informers, otherwise the informers won't stop. + // Used to signal shutdown to the associated watcher's informers stopCh chan<- struct{} } - // configDecoder is a function type that given a secret, decodes it and - // instantiates the API Server clients. Clients are dynamically created, - // configDecoder allows some degree of isolation between the cache and - // client bootstrapping. + // configDecoder is the type of a function that given a byte buffer, returns + // a pair of API Server clients. The cache uses this function to dynamically + // create clients after discovering a Secret. configDecoder = func(data []byte, enableEndpointSlices bool) (*k8s.API, *k8s.MetadataAPI, error) ) @@ -64,19 +63,22 @@ const ( // NewEndpointsWatcherCache creates a new (empty) EndpointsWatcherCache. It // requires a Kubernetes API Server client instantiated for the local cluster. +// +// Upon creation, a pair of event handlers will be registered against the API +// Server client's Secret informer. The event handlers will add, or remove +// watcher entries from the cache by watching Secrets in the namespace the +// controller is running in. +// +// A new watcher is created for a remote cluster when its Secret is valid (contains +// necessary configuration, including a kubeconfig file). When a Secret is +// deleted from the cluster, if there is a corresponding watcher in the cache, +// it will be gracefully shutdown to allow for the memory to be freed. func NewEndpointsWatcherCache(k8sAPI *k8s.API, enableEndpointSlices bool) (*EndpointsWatcherCache, error) { return newWatcherCacheWithDecoder(k8sAPI, enableEndpointSlices, decodeK8sConfigFromSecret) } -// Start will register a pair of event handlers against a `Secret` informer. -// -// EndpointsWatcherCache will watch multicluster specific `Secret` objects (to -// create watchers that allow for remote service discovery). -// -// Valid secrets will create and start a new EndpointsWatcher for a remote -// cluster. When a secret is removed, the watcher is automatically stopped and -// cleaned-up. - +// newWatcherCacheWithDecoder is a helper function that allows the creation of a +// cache with an arbitrary `configDecoder` function. 
func newWatcherCacheWithDecoder(k8sAPI *k8s.API, enableEndpointSlices bool, decodeFn configDecoder) (*EndpointsWatcherCache, error) { ewc := &EndpointsWatcherCache{ store: make(map[string]watchStore), @@ -91,7 +93,6 @@ func newWatcherCacheWithDecoder(k8sAPI *k8s.API, enableEndpointSlices bool, deco _, err := ewc.k8sAPI.Secret().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { secret, ok := obj.(*v1.Secret) - ewc.log.Infof("GOT SECRET %s", secret.Name) if !ok { ewc.log.Errorf("Error processing 'Secret' object: got %#v, expected *corev1.Secret", secret) return @@ -199,24 +200,33 @@ func (ewc *EndpointsWatcherCache) AddLocalWatcher(stopCh chan<- struct{}, watche } } +// Len returns the number of entries in the cache +func (ewc *EndpointsWatcherCache) Len() int { + ewc.RLock() + defer ewc.RUnlock() + return len(ewc.store) +} + // removeWatcher is triggered by the cache's Secret informer when a secret is // removed. Given a cluster name, it removes the entry from the cache after // stopping the associated watcher. func (ewc *EndpointsWatcherCache) removeWatcher(clusterName string) { ewc.Lock() defer ewc.Unlock() - if s, found := ewc.store[clusterName]; found { - // For good measure, close the channel after stopping to ensure - // informers are shut down. - defer close(s.stopCh) - s.watcher.Stop(s.stopCh) - delete(ewc.store, clusterName) - ewc.log.Infof("Removed cluster %s from EndpointsWatcherCache", clusterName) + s, found := ewc.store[clusterName] + if !found { + return } + // For good measure, close the channel after stopping to ensure + // informers are shut down. + defer close(s.stopCh) + s.watcher.Stop(s.stopCh) + delete(ewc.store, clusterName) + ewc.log.Tracef("Removed cluster %s from EndpointsWatcherCache", clusterName) } // addWatcher is triggered by the cache's Secret informer when a secret is -// added, or during the initial informer list. Given a cluster name and a Secret +// discovered for the first time. Given a cluster name and a Secret // object, it creates an EndpointsWatcher for a remote cluster and syncs its // informers before returning. func (ewc *EndpointsWatcherCache) addWatcher(clusterName string, secret *v1.Secret) error { @@ -271,10 +281,10 @@ func (ewc *EndpointsWatcherCache) addWatcher(clusterName string, secret *v1.Secr return nil } -// decodeK8sConfigFromSecret implements the decoder function type and creates -// the necessary configuration from a secret. +// decodeK8sConfigFromSecret implements the decoder function type. Given a byte +// buffer, it attempts to parse it as a kubeconfig file. If successful, returns +// a pair of API Server clients. 
func decodeK8sConfigFromSecret(data []byte, enableEndpointSlices bool) (*k8s.API, *k8s.MetadataAPI, error) { - cfg, err := clientcmd.RESTConfigFromKubeConfig(data) if err != nil { return nil, nil, err @@ -308,10 +318,3 @@ func decodeK8sConfigFromSecret(data []byte, enableEndpointSlices bool) (*k8s.API return remoteAPI, metadataAPI, nil } - -func (ewc *EndpointsWatcherCache) len() int { - ewc.RLock() - defer ewc.RUnlock() - sz := len(ewc.store) - return sz -} diff --git a/controller/api/destination/watcher/endpoints_watcher_cache_test.go b/controller/api/destination/watcher/endpoints_watcher_cache_test.go index dc135f11dde49..20f7d066922b5 100644 --- a/controller/api/destination/watcher/endpoints_watcher_cache_test.go +++ b/controller/api/destination/watcher/endpoints_watcher_cache_test.go @@ -15,7 +15,11 @@ func CreateMockDecoder() configDecoder { if err != nil { return nil, nil, err } + metadataAPI, err := k8s.NewFakeMetadataAPI(nil) + if err != nil { + return nil, nil, err + } return remoteAPI, metadataAPI, nil } @@ -109,9 +113,7 @@ func TestEndpointsWatcherCacheAddHandler(t *testing.T) { } ewc.AddLocalWatcher(nil, watcher, "cluster.local", "cluster.local") - ewc.RLock() - actualLen := len(ewc.store) - ewc.RUnlock() + actualLen := ewc.Len() if actualLen != len(tt.expectedClusters) { t.Fatalf("Unexpected error: expected to see %d cache entries, got: %d", len(tt.expectedClusters), actualLen) From 907c22ee0480526305bb694df5dbb982413d5d47 Mon Sep 17 00:00:00 2001 From: Matei David Date: Thu, 3 Aug 2023 14:39:19 +0000 Subject: [PATCH 08/15] Fix logs for Stop() function in Endpointswatcher Signed-off-by: Matei David --- .../destination/watcher/endpoints_watcher.go | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/controller/api/destination/watcher/endpoints_watcher.go b/controller/api/destination/watcher/endpoints_watcher.go index 8e70c03f10ee4..82bec32e19ccf 100644 --- a/controller/api/destination/watcher/endpoints_watcher.go +++ b/controller/api/destination/watcher/endpoints_watcher.go @@ -261,18 +261,20 @@ func (ew *EndpointsWatcher) Stop(stopCh chan<- struct{}) { if ew.srvHandle != nil { if err := ew.k8sAPI.Srv().Informer().RemoveEventHandler(ew.srvHandle); err != nil { - ew.log.Errorf("Failed to remove Service informer event handlers: %s", err) + ew.log.Errorf("Failed to remove Server informer event handlers: %s", err) } } - if ew.enableEndpointSlices && ew.epHandle != nil { - if err := ew.k8sAPI.ES().Informer().RemoveEventHandler(ew.epHandle); err != nil { + if ew.epHandle != nil { + if ew.enableEndpointSlices { + if err := ew.k8sAPI.ES().Informer().RemoveEventHandler(ew.epHandle); err != nil { - ew.log.Errorf("Failed to remove Service informer event handlers: %s", err) - } - } else { - if err := ew.k8sAPI.Endpoint().Informer().RemoveEventHandler(ew.epHandle); err != nil { - ew.log.Errorf("Failed to remove Service informer event handlers: %s", err) + ew.log.Errorf("Failed to remove EndpointSlice informer event handlers: %s", err) + } + } else { + if err := ew.k8sAPI.Endpoint().Informer().RemoveEventHandler(ew.epHandle); err != nil { + ew.log.Errorf("Failed to remove Endpoints informer event handlers: %s", err) + } } } From 45f8fab61c31b3d913c05786d800e1c311c4eac7 Mon Sep 17 00:00:00 2001 From: Matei David Date: Thu, 3 Aug 2023 14:55:45 +0000 Subject: [PATCH 09/15] Remove Endpoints access when using EndpointSlices Signed-off-by: Matei David --- controller/api/destination/watcher/endpoints_watcher_cache.go | 2 +- 1 file changed, 1 insertion(+), 1 
diff --git a/controller/api/destination/watcher/endpoints_watcher_cache.go b/controller/api/destination/watcher/endpoints_watcher_cache.go
index d3a177a069752..9c7aeefcd6422 100644
--- a/controller/api/destination/watcher/endpoints_watcher_cache.go
+++ b/controller/api/destination/watcher/endpoints_watcher_cache.go
@@ -297,7 +297,7 @@ func decodeK8sConfigFromSecret(data []byte, enableEndpointSlices bool) (*k8s.API
 ctx,
 cfg,
 true,
- k8s.Endpoint, k8s.ES, k8s.Pod, k8s.Svc, k8s.SP, k8s.Job, k8s.Srv,
+ k8s.ES, k8s.Pod, k8s.Svc, k8s.SP, k8s.Job, k8s.Srv,
 )
 } else {
 remoteAPI, err = k8s.InitializeAPIForConfig(
From ce70d12e16e8e92c90170327c37f56b782673295 Mon Sep 17 00:00:00 2001
From: Matei David
Date: Thu, 3 Aug 2023 16:42:20 +0000
Subject: [PATCH 10/15] Rename EndpointsWatcherCache to ClusterStore
Signed-off-by: Matei David
---
 .../destination/watcher/cluster_watcher.go | 320 ++++++++++++++++++
 .../watcher/cluster_watcher_test.go | 248 ++++++++++++++
 2 files changed, 568 insertions(+)
 create mode 100644 controller/api/destination/watcher/cluster_watcher.go
 create mode 100644 controller/api/destination/watcher/cluster_watcher_test.go
diff --git a/controller/api/destination/watcher/cluster_watcher.go b/controller/api/destination/watcher/cluster_watcher.go
new file mode 100644
index 0000000000000..88290e3558125
--- /dev/null
+++ b/controller/api/destination/watcher/cluster_watcher.go
@@ -0,0 +1,320 @@
+package watcher
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "sync"
+
+ "github.com/linkerd/linkerd2/controller/k8s"
+ logging "github.com/sirupsen/logrus"
+ v1 "k8s.io/api/core/v1"
+ "k8s.io/client-go/tools/cache"
+ "k8s.io/client-go/tools/clientcmd"
+
+ consts "github.com/linkerd/linkerd2/pkg/k8s"
+)
+
+type (
+ // ClusterStore holds all EndpointsWatchers used by the destination
+ // service to perform service discovery. Each cluster (including the one the
+ // controller is running in) that may be looked-up for service discovery has
+ // an associated EndpointsWatcher in the cache, along with a set of
+ // immutable cluster configuration primitives (i.e. identity and cluster
+ // domains).
+ ClusterStore struct {
+ // Protects against illegal accesses
+ sync.RWMutex
+
+ k8sAPI *k8s.API
+ store map[string]cluster
+ enableEndpointSlices bool
+ log *logging.Entry
+
+ // Function used to parse a kubeconfig from a byte buffer. Based on the
+ // kubeconfig, it creates API Server clients
+ decodeFn configDecoder
+ }
+
+ // cluster is a helper struct that represents a cache item
+ cluster struct {
+ watcher *EndpointsWatcher
+ trustDomain string
+ clusterDomain string
+
+ // Used to signal shutdown to the associated watcher's informers
+ stopCh chan<- struct{}
+ }
+
+ // configDecoder is the type of a function that given a byte buffer, returns
+ // a pair of API Server clients. The cache uses this function to dynamically
+ // create clients after discovering a Secret.
+ configDecoder = func(data []byte, enableEndpointSlices bool) (*k8s.API, *k8s.MetadataAPI, error)
+)
+
+const (
+ // LocalClusterKey represents the look-up key that returns an
+ // EndpointsWatcher associated with the "local" cluster.
+ LocalClusterKey = "local"
+ clusterNameLabel = "multicluster.linkerd.io/cluster-name"
+ trustDomainAnnotation = "multicluster.linkerd.io/trust-domain"
+ clusterDomainAnnotation = "multicluster.linkerd.io/cluster-domain"
+)
+
+// NewClusterStore creates a new (empty) ClusterStore. It
+// requires a Kubernetes API Server client instantiated for the local cluster.
+//
+// Upon creation, a pair of event handlers will be registered against the API
+// Server client's Secret informer. The event handlers will add, or remove
+// watcher entries from the cache by watching Secrets in the namespace the
+// controller is running in.
+//
+// A new watcher is created for a remote cluster when its Secret is valid (contains
+// necessary configuration, including a kubeconfig file). When a Secret is
+// deleted from the cluster, if there is a corresponding watcher in the cache,
+// it will be gracefully shutdown to allow for the memory to be freed.
+func NewClusterStore(k8sAPI *k8s.API, enableEndpointSlices bool) (*ClusterStore, error) {
+ return newClusterStoreWithDecoder(k8sAPI, enableEndpointSlices, decodeK8sConfigFromSecret)
+}
+
+// newWatcherCacheWithDecoder is a helper function that allows the creation of a
+// cache with an arbitrary `configDecoder` function.
+func newClusterStoreWithDecoder(k8sAPI *k8s.API, enableEndpointSlices bool, decodeFn configDecoder) (*ClusterStore, error) {
+ cs := &ClusterStore{
+ store: make(map[string]cluster),
+ log: logging.WithFields(logging.Fields{
+ "component": "endpoints-watcher-cache",
+ }),
+ enableEndpointSlices: enableEndpointSlices,
+ k8sAPI: k8sAPI,
+ decodeFn: decodeFn,
+ }
+
+ _, err := cs.k8sAPI.Secret().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+ AddFunc: func(obj interface{}) {
+ secret, ok := obj.(*v1.Secret)
+ if !ok {
+ cs.log.Errorf("Error processing 'Secret' object: got %#v, expected *corev1.Secret", secret)
+ return
+ }
+
+ if secret.Type != consts.MirrorSecretType {
+ cs.log.Tracef("Skipping Add event for 'Secret' object %s/%s: invalid type %s", secret.Namespace, secret.Name, secret.Type)
+ return
+
+ }
+
+ clusterName, found := secret.GetLabels()[clusterNameLabel]
+ if !found {
+ cs.log.Tracef("Skipping Add event for 'Secret' object %s/%s: missing \"%s\" label", secret.Namespace, secret.Name, clusterNameLabel)
+ return
+ }
+
+ if err := cs.addWatcher(clusterName, secret); err != nil {
+ cs.log.Errorf("Error adding watcher for cluster %s: %v", clusterName, err)
+ }
+
+ },
+ DeleteFunc: func(obj interface{}) {
+ secret, ok := obj.(*v1.Secret)
+ if !ok {
+ // If the Secret was deleted when the watch was disconnected
+ // (for whatever reason) and the event was missed, the object is
+ // added with 'DeletedFinalStateUnknown'. Its state may be
+ // stale, so it should be cleaned-up.
+ tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
+ if !ok {
+ cs.log.Debugf("unable to get object from DeletedFinalStateUnknown %#v", obj)
+ return
+ }
+ // If the zombie object is a `Secret` we can delete it.
+ secret, ok = tombstone.Obj.(*v1.Secret)
+ if !ok {
+ cs.log.Debugf("DeletedFinalStateUnknown contained object that is not a Secret %#v", obj)
+ return
+ }
+ }
+
+ clusterName, found := secret.GetLabels()[clusterNameLabel]
+ if !found {
+ cs.log.Tracef("Skipping Delete event for 'Secret' object %s/%s: missing \"%s\" label", secret.Namespace, secret.Name, clusterNameLabel)
+ return
+ }
+
+ cs.removeWatcher(clusterName)
+
+ },
+ })
+
+ if err != nil {
+ return nil, err
+ }
+
+ return cs, nil
+}
+
+// Get safely retrieves a watcher from the cache given a cluster name. It
+// returns the watcher, cluster configuration, if the entry exists in the cache.
+func (cs *ClusterStore) Get(clusterName string) (*EndpointsWatcher, string, string, bool) {
+ cs.RLock()
+ defer cs.RUnlock()
+ s, found := cs.store[clusterName]
+ return s.watcher, s.trustDomain, s.clusterDomain, found
+}
+
+// GetWatcher is a convenience method that given a cluster name only returns the
+// associated EndpointsWatcher if it exists in the cache.
+func (cs *ClusterStore) GetWatcher(clusterName string) (*EndpointsWatcher, bool) {
+ cs.RLock()
+ defer cs.RUnlock()
+ s, found := cs.store[clusterName]
+ return s.watcher, found
+}
+
+// GetLocalWatcher is a convenience method that retrieves the watcher associated
+// with the local cluster. Its existence is assumed.
+func (cs *ClusterStore) GetLocalWatcher() *EndpointsWatcher {
+ cs.RLock()
+ defer cs.RUnlock()
+ return cs.store[LocalClusterKey].watcher
+}
+
+// GetClusterConfig is a convenience method that given a cluster name retrieves
+// its associated configuration strings if the entry exists in the cache.
+func (cs *ClusterStore) GetClusterConfig(clusterName string) (string, string, bool) {
+ cs.RLock()
+ defer cs.RUnlock()
+ s, found := cs.store[clusterName]
+ return s.trustDomain, s.clusterDomain, found
+}
+
+// AddLocalWatcher adds a watcher to the cache using the local cluster key.
+func (cs *ClusterStore) AddLocalWatcher(stopCh chan<- struct{}, watcher *EndpointsWatcher, trustDomain, clusterDomain string) {
+ cs.Lock()
+ defer cs.Unlock()
+ cs.store[LocalClusterKey] = cluster{
+ watcher,
+ trustDomain,
+ clusterDomain,
+ stopCh,
+ }
+}
+
+// Len returns the number of entries in the cache
+func (cs *ClusterStore) Len() int {
+ cs.RLock()
+ defer cs.RUnlock()
+ return len(cs.store)
+}
+
+// removeWatcher is triggered by the cache's Secret informer when a secret is
+// removed. Given a cluster name, it removes the entry from the cache after
+// stopping the associated watcher.
+func (cs *ClusterStore) removeWatcher(clusterName string) {
+ cs.Lock()
+ defer cs.Unlock()
+ s, found := cs.store[clusterName]
+ if !found {
+ return
+ }
+ // For good measure, close the channel after stopping to ensure
+ // informers are shut down.
+ defer close(s.stopCh)
+ s.watcher.Stop(s.stopCh)
+ delete(cs.store, clusterName)
+ cs.log.Tracef("Removed cluster %s from ClusterStore", clusterName)
+}
+
+// addWatcher is triggered by the cache's Secret informer when a secret is
+// discovered for the first time. Given a cluster name and a Secret
+// object, it creates an EndpointsWatcher for a remote cluster and syncs its
+// informers before returning.
+func (cs *ClusterStore) addWatcher(clusterName string, secret *v1.Secret) error {
+ data, found := secret.Data[consts.ConfigKeyName]
+ if !found {
+ return errors.New("missing kubeconfig file")
+ }
+
+ clusterDomain, found := secret.GetAnnotations()[clusterDomainAnnotation]
+ if !found {
+ return fmt.Errorf("missing \"%s\" annotation", clusterDomainAnnotation)
+ }
+
+ trustDomain, found := secret.GetAnnotations()[trustDomainAnnotation]
+ if !found {
+ return fmt.Errorf("missing \"%s\" annotation", trustDomainAnnotation)
+ }
+
+ remoteAPI, metadataAPI, err := cs.decodeFn(data, cs.enableEndpointSlices)
+ if err != nil {
+ return err
+ }
+
+ stopCh := make(chan struct{}, 1)
+
+ watcher, err := NewEndpointsWatcher(
+ remoteAPI,
+ metadataAPI,
+ logging.WithFields(logging.Fields{
+ "remote-cluster": clusterName,
+ }),
+ cs.enableEndpointSlices,
+ )
+ if err != nil {
+ return err
+ }
+
+ cs.Lock()
+ defer cs.Unlock()
+ cs.store[clusterName] = cluster{
+ watcher,
+ trustDomain,
+ clusterDomain,
+ stopCh,
+ }
+
+ remoteAPI.Sync(stopCh)
+ metadataAPI.Sync(stopCh)
+
+ cs.log.Tracef("Added cluster %s to ClusterStore", clusterName)
+
+ return nil
+}
+
+// decodeK8sConfigFromSecret implements the decoder function type. Given a byte
+// buffer, it attempts to parse it as a kubeconfig file. If successful, returns
+// a pair of API Server clients.
+func decodeK8sConfigFromSecret(data []byte, enableEndpointSlices bool) (*k8s.API, *k8s.MetadataAPI, error) {
+ cfg, err := clientcmd.RESTConfigFromKubeConfig(data)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ ctx := context.Background()
+ var remoteAPI *k8s.API
+ if enableEndpointSlices {
+ remoteAPI, err = k8s.InitializeAPIForConfig(
+ ctx,
+ cfg,
+ true,
+ k8s.ES, k8s.Pod, k8s.Svc, k8s.SP, k8s.Job, k8s.Srv,
+ )
+ } else {
+ remoteAPI, err = k8s.InitializeAPIForConfig(
+ ctx,
+ cfg,
+ true,
+ k8s.Endpoint, k8s.Pod, k8s.Svc, k8s.SP, k8s.Job, k8s.Srv,
+ )
+ }
+ if err != nil {
+ return nil, nil, err
+ }
+
+ metadataAPI, err := k8s.InitializeMetadataAPIForConfig(cfg, k8s.Node, k8s.RS)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ return remoteAPI, metadataAPI, nil
+}
diff --git a/controller/api/destination/watcher/cluster_watcher_test.go b/controller/api/destination/watcher/cluster_watcher_test.go
new file mode 100644
index 0000000000000..5f6244e8d89c9
--- /dev/null
+++ b/controller/api/destination/watcher/cluster_watcher_test.go
@@ -0,0 +1,248 @@
+package watcher
+
+import (
+ "testing"
+ "time"
+
+ "github.com/linkerd/linkerd2/controller/k8s"
+ logging "github.com/sirupsen/logrus"
+)
+
+func CreateMockDecoder() configDecoder {
+ // Create a mock decoder with some random objs to satisfy client creation
+ return func(data []byte, enableEndpointSlices bool) (*k8s.API, *k8s.MetadataAPI, error) {
+ remoteAPI, err := k8s.NewFakeAPI([]string{}...)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ return remoteAPI, metadataAPI, nil
+ }
+
+}
+
+func TestClusterStoreHandlers(t *testing.T) {
+ for _, tt := range []struct {
+ name string
+ k8sConfigs []string
+ enableEndpointSlices bool
+ expectedClusters map[string]struct{}
+ deleteClusters map[string]struct{}
+ }{
+ {
+ name: "add and remove remote watcher when Secret is valid",
+ k8sConfigs: []string{
+ validRemoteSecret,
+ },
+ enableEndpointSlices: true,
+ expectedClusters: map[string]struct{}{
+ "remote": {},
+ LocalClusterKey: {},
+ },
+ deleteClusters: map[string]struct{}{
+ "remote": {},
+ },
+ },
+ {
+ name: "add and remove more than one watcher when Secrets are valid",
+ k8sConfigs: []string{
+ validRemoteSecret,
+ validTargetSecret,
+ },
+ enableEndpointSlices: false,
+ expectedClusters: map[string]struct{}{
+ "remote": {},
+ LocalClusterKey: {},
+ "target": {},
+ },
+ deleteClusters: map[string]struct{}{
+ "remote": {},
+ },
+ },
+ {
+ name: "malformed secrets shouldn't result in created watchers",
+ k8sConfigs: []string{
+ validRemoteSecret,
+ noClusterSecret,
+ noDomainSecret,
+ noIdentitySecret,
+ invalidTypeSecret,
+ },
+ enableEndpointSlices: true,
+ expectedClusters: map[string]struct{}{
+ "remote": {},
+ LocalClusterKey: {},
+ },
+ deleteClusters: map[string]struct{}{
+ "remote": {},
+ },
+ },
+ } {
+ tt := tt // Pin
+ t.Run(tt.name, func(t *testing.T) {
+ // TODO (matei): use namespace scoped API here
+ k8sAPI, err := k8s.NewFakeAPI(tt.k8sConfigs...)
+ if err != nil {
+ t.Fatalf("NewFakeAPI returned an error: %s", err)
+ }
+
+ metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
+ if err != nil {
+ t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
+ }
+
+ cs, err := newClusterStoreWithDecoder(k8sAPI, tt.enableEndpointSlices, CreateMockDecoder())
+ if err != nil {
+ t.Fatalf("Unexpected error when starting watcher cache: %s", err)
+ }
+
+ k8sAPI.Sync(nil)
+ metadataAPI.Sync(nil)
+
+ // Wait for the update to be processed because there is no blocking call currently in k8s that we can wait on
+ time.Sleep(50 * time.Millisecond)
+
+ watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), tt.enableEndpointSlices)
+ if err != nil {
+ t.Fatalf("Unexpected error when creating local watcher: %s", err)
+ }
+
+ cs.AddLocalWatcher(nil, watcher, "cluster.local", "cluster.local")
+ actualLen := cs.Len()
+
+ if actualLen != len(tt.expectedClusters) {
+ t.Fatalf("Unexpected error: expected to see %d cache entries, got: %d", len(tt.expectedClusters), actualLen)
+ }
+ for k := range tt.expectedClusters {
+ if _, found := cs.GetWatcher(k); !found {
+ t.Fatalf("Unexpected error: cluster %s is missing from the cache", k)
+ }
+ }
+
+ // Handle delete events
+ if len(tt.deleteClusters) != 0 {
+ for k := range tt.deleteClusters {
+ watcher, found := cs.GetWatcher(k)
+ if !found {
+ t.Fatalf("Unexpected error: watcher %s should exist in the cache", k)
+ }
+ // Unfortunately, mock k8s client does not propagate
+ // deletes, so we have to call remove directly.
+ cs.removeWatcher(k)
+ // Leave it to do its thing and gracefully shutdown
+ time.Sleep(50 * time.Millisecond)
+ var hasStopped bool
+ if tt.enableEndpointSlices {
+ hasStopped = watcher.k8sAPI.ES().Informer().IsStopped()
+ } else {
+ hasStopped = watcher.k8sAPI.Endpoint().Informer().IsStopped()
+ }
+ if !hasStopped {
+ t.Fatalf("Unexpected error: informers for watcher %s should be stopped", k)
+ }
+
+ if _, found := cs.GetWatcher(k); found {
+ t.Fatalf("Unexpected error: watcher %s should have been removed from the cache", k)
+ }
+
+ }
+ }
+ })
+ }
+}
+
+var validRemoteSecret = `
+apiVersion: v1
+kind: Secret
+type: mirror.linkerd.io/remote-kubeconfig
+metadata:
+ namespace: linkerd
+ name: remote-cluster-credentials
+ labels:
+ multicluster.linkerd.io/cluster-name: remote
+ annotations:
+ multicluster.linkerd.io/trust-domain: cluster.local
+ multicluster.linkerd.io/cluster-domain: cluster.local
+data:
+ kubeconfig: dmVyeSB0b3Agc2VjcmV0IGluZm9ybWF0aW9uIGhlcmUK
+`
+
+var validTargetSecret = `
+apiversion: v1
+kind: Secret
+type: mirror.linkerd.io/remote-kubeconfig
+metadata:
+ namespace: linkerd
+ name: target-cluster-credentials
+ labels:
+ multicluster.linkerd.io/cluster-name: target
+ annotations:
+ multicluster.linkerd.io/trust-domain: cluster.local
+ multicluster.linkerd.io/cluster-domain: cluster.local
+data:
+ kubeconfig: dmvyesb0b3agc2vjcmv0igluzm9ybwf0aw9uighlcmuk
+`
+
+var noDomainSecret = `
+apiVersion: v1
+kind: Secret
+type: mirror.linkerd.io/remote-kubeconfig
+metadata:
+ namespace: linkerd
+ name: target-1-cluster-credentials
+ labels:
+ multicluster.linkerd.io/cluster-name: target-1
+ annotations:
+ multicluster.linkerd.io/trust-domain: cluster.local
+data:
+ kubeconfig: dmVyeSB0b3Agc2VjcmV0IGluZm9ybWF0aW9uIGhlcmUK
+`
+
+var noClusterSecret = `
+apiVersion: v1
+kind: Secret
+type: mirror.linkerd.io/remote-kubeconfig
+metadata:
+ namespace: linkerd
+ name: target-2-cluster-credentials
+ annotations:
+ multicluster.linkerd.io/trust-domain: cluster.local
+ multicluster.linkerd.io/cluster-domain: cluster.local
+data:
+ kubeconfig: dmVyeSB0b3Agc2VjcmV0IGluZm9ybWF0aW9uIGhlcmUK
+`
+
+var noIdentitySecret = `
+apiversion: v1
+kind: Secret
+type: mirror.linkerd.io/remote-kubeconfig
+metadata:
+ namespace: linkerd
+ name: target-3-cluster-credentials
+ labels:
+ multicluster.linkerd.io/cluster-name: target-3
+ annotations:
+ multicluster.linkerd.io/cluster-domain: cluster.local
+data:
+ kubeconfig: dmvyesb0b3agc2vjcmv0igluzm9ybwf0aw9uighlcmuk
+`
+var invalidTypeSecret = `
+apiversion: v1
+kind: Secret
+type: kubernetes.io/tls
+metadata:
+ namespace: linkerd
+ name: target-cluster-credentials
+ labels:
+ multicluster.linkerd.io/cluster-name: target
+ annotations:
+ multicluster.linkerd.io/trust-domain: cluster.local
+ multicluster.linkerd.io/cluster-domain: cluster.local
+data:
+ kubeconfig: dmvyesb0b3agc2vjcmv0igluzm9ybwf0aw9uighlcmuk
+`
From 3d64ca9faad0fba95d36af4d3938f1173310b0f5 Mon Sep 17 00:00:00 2001
From: Matei David
Date: Thu, 3 Aug 2023 16:57:49 +0000
Subject: [PATCH 11/15] Actually remove old files
Signed-off-by: Matei David
---
 .../watcher/endpoints_watcher_cache.go | 320 ------------------
 .../watcher/endpoints_watcher_cache_test.go | 248 --------------
 2 files changed, 568 deletions(-)
 delete mode 100644 controller/api/destination/watcher/endpoints_watcher_cache.go
 delete mode 100644 controller/api/destination/watcher/endpoints_watcher_cache_test.go
diff --git a/controller/api/destination/watcher/endpoints_watcher_cache.go b/controller/api/destination/watcher/endpoints_watcher_cache.go
deleted file mode 100644
index 9c7aeefcd6422..0000000000000
--- a/controller/api/destination/watcher/endpoints_watcher_cache.go
+++ /dev/null
@@ -1,320 +0,0 @@
-package watcher
-
-import (
- "context"
- "errors"
- "fmt"
- "sync"
-
- "github.com/linkerd/linkerd2/controller/k8s"
- logging "github.com/sirupsen/logrus"
- v1 "k8s.io/api/core/v1"
- "k8s.io/client-go/tools/cache"
- "k8s.io/client-go/tools/clientcmd"
-
- consts "github.com/linkerd/linkerd2/pkg/k8s"
-)
-
-type (
- // EndpointsWatcherCache holds all EndpointsWatchers used by the destination
- // service to perform service discovery. Each cluster (including the one the
- // controller is running in) that may be looked-up for service discovery has
- // an associated EndpointsWatcher in the cache, along with a set of
- // immutable cluster configuration primitives (i.e. identity and cluster
- // domains).
- EndpointsWatcherCache struct {
- // Protects against illegal accesses
- sync.RWMutex
-
- k8sAPI *k8s.API
- store map[string]watchStore
- enableEndpointSlices bool
- log *logging.Entry
-
- // Function used to parse a kubeconfig from a byte buffer. Based on the
- // kubeconfig, it creates API Server clients
- decodeFn configDecoder
- }
-
- // watchStore is a helper struct that represents a cache item
- watchStore struct {
- watcher *EndpointsWatcher
- trustDomain string
- clusterDomain string
-
- // Used to signal shutdown to the associated watcher's informers
- stopCh chan<- struct{}
- }
-
- // configDecoder is the type of a function that given a byte buffer, returns
- // a pair of API Server clients. The cache uses this function to dynamically
- // create clients after discovering a Secret.
- configDecoder = func(data []byte, enableEndpointSlices bool) (*k8s.API, *k8s.MetadataAPI, error)
-)
-
-const (
- // LocalClusterKey represents the look-up key that returns an
- // EndpointsWatcher associated with the "local" cluster.
- LocalClusterKey = "local"
- clusterNameLabel = "multicluster.linkerd.io/cluster-name"
- trustDomainAnnotation = "multicluster.linkerd.io/trust-domain"
- clusterDomainAnnotation = "multicluster.linkerd.io/cluster-domain"
-)
-
-// NewEndpointsWatcherCache creates a new (empty) EndpointsWatcherCache. It
-// requires a Kubernetes API Server client instantiated for the local cluster.
-//
-// Upon creation, a pair of event handlers will be registered against the API
-// Server client's Secret informer. The event handlers will add, or remove
-// watcher entries from the cache by watching Secrets in the namespace the
-// controller is running in.
-//
-// A new watcher is created for a remote cluster when its Secret is valid (contains
-// necessary configuration, including a kubeconfig file). When a Secret is
-// deleted from the cluster, if there is a corresponding watcher in the cache,
-// it will be gracefully shutdown to allow for the memory to be freed.
-func NewEndpointsWatcherCache(k8sAPI *k8s.API, enableEndpointSlices bool) (*EndpointsWatcherCache, error) {
- return newWatcherCacheWithDecoder(k8sAPI, enableEndpointSlices, decodeK8sConfigFromSecret)
-}
-
-// newWatcherCacheWithDecoder is a helper function that allows the creation of a
-// cache with an arbitrary `configDecoder` function.
-func newWatcherCacheWithDecoder(k8sAPI *k8s.API, enableEndpointSlices bool, decodeFn configDecoder) (*EndpointsWatcherCache, error) {
- ewc := &EndpointsWatcherCache{
- store: make(map[string]watchStore),
- log: logging.WithFields(logging.Fields{
- "component": "endpoints-watcher-cache",
- }),
- enableEndpointSlices: enableEndpointSlices,
- k8sAPI: k8sAPI,
- decodeFn: decodeFn,
- }
-
- _, err := ewc.k8sAPI.Secret().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
- AddFunc: func(obj interface{}) {
- secret, ok := obj.(*v1.Secret)
- if !ok {
- ewc.log.Errorf("Error processing 'Secret' object: got %#v, expected *corev1.Secret", secret)
- return
- }
-
- if secret.Type != consts.MirrorSecretType {
- ewc.log.Tracef("Skipping Add event for 'Secret' object %s/%s: invalid type %s", secret.Namespace, secret.Name, secret.Type)
- return
-
- }
-
- clusterName, found := secret.GetLabels()[clusterNameLabel]
- if !found {
- ewc.log.Tracef("Skipping Add event for 'Secret' object %s/%s: missing \"%s\" label", secret.Namespace, secret.Name, clusterNameLabel)
- return
- }
-
- if err := ewc.addWatcher(clusterName, secret); err != nil {
- ewc.log.Errorf("Error adding watcher for cluster %s: %v", clusterName, err)
- }
-
- },
- DeleteFunc: func(obj interface{}) {
- secret, ok := obj.(*v1.Secret)
- if !ok {
- // If the Secret was deleted when the watch was disconnected
- // (for whatever reason) and the event was missed, the object is
- // added with 'DeletedFinalStateUnknown'. Its state may be
- // stale, so it should be cleaned-up.
- tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
- if !ok {
- ewc.log.Debugf("unable to get object from DeletedFinalStateUnknown %#v", obj)
- return
- }
- // If the zombie object is a `Secret` we can delete it.
- secret, ok = tombstone.Obj.(*v1.Secret)
- if !ok {
- ewc.log.Debugf("DeletedFinalStateUnknown contained object that is not a Secret %#v", obj)
- return
- }
- }
-
- clusterName, found := secret.GetLabels()[clusterNameLabel]
- if !found {
- ewc.log.Tracef("Skipping Delete event for 'Secret' object %s/%s: missing \"%s\" label", secret.Namespace, secret.Name, clusterNameLabel)
- return
- }
-
- ewc.removeWatcher(clusterName)
-
- },
- })
-
- if err != nil {
- return nil, err
- }
-
- return ewc, nil
-}
-
-// Get safely retrieves a watcher from the cache given a cluster name. It
-// returns the watcher, cluster configuration, if the entry exists in the cache.
-func (ewc *EndpointsWatcherCache) Get(clusterName string) (*EndpointsWatcher, string, string, bool) {
- ewc.RLock()
- defer ewc.RUnlock()
- s, found := ewc.store[clusterName]
- return s.watcher, s.trustDomain, s.clusterDomain, found
-}
-
-// GetWatcher is a convenience method that given a cluster name only returns the
-// associated EndpointsWatcher if it exists in the cache.
-func (ewc *EndpointsWatcherCache) GetWatcher(clusterName string) (*EndpointsWatcher, bool) {
- ewc.RLock()
- defer ewc.RUnlock()
- s, found := ewc.store[clusterName]
- return s.watcher, found
-}
-
-// GetLocalWatcher is a convenience method that retrieves the watcher associated
-// with the local cluster. Its existence is assumed.
-func (ewc *EndpointsWatcherCache) GetLocalWatcher() *EndpointsWatcher {
- ewc.RLock()
- defer ewc.RUnlock()
- return ewc.store[LocalClusterKey].watcher
-}
-
-// GetClusterConfig is a convenience method that given a cluster name retrieves
-// its associated configuration strings if the entry exists in the cache.
-func (ewc *EndpointsWatcherCache) GetClusterConfig(clusterName string) (string, string, bool) {
- ewc.RLock()
- defer ewc.RUnlock()
- s, found := ewc.store[clusterName]
- return s.trustDomain, s.clusterDomain, found
-}
-
-// AddLocalWatcher adds a watcher to the cache using the local cluster key.
-func (ewc *EndpointsWatcherCache) AddLocalWatcher(stopCh chan<- struct{}, watcher *EndpointsWatcher, trustDomain, clusterDomain string) {
- ewc.Lock()
- defer ewc.Unlock()
- ewc.store[LocalClusterKey] = watchStore{
- watcher,
- trustDomain,
- clusterDomain,
- stopCh,
- }
-}
-
-// Len returns the number of entries in the cache
-func (ewc *EndpointsWatcherCache) Len() int {
- ewc.RLock()
- defer ewc.RUnlock()
- return len(ewc.store)
-}
-
-// removeWatcher is triggered by the cache's Secret informer when a secret is
-// removed. Given a cluster name, it removes the entry from the cache after
-// stopping the associated watcher.
-func (ewc *EndpointsWatcherCache) removeWatcher(clusterName string) {
- ewc.Lock()
- defer ewc.Unlock()
- s, found := ewc.store[clusterName]
- if !found {
- return
- }
- // For good measure, close the channel after stopping to ensure
- // informers are shut down.
- defer close(s.stopCh)
- s.watcher.Stop(s.stopCh)
- delete(ewc.store, clusterName)
- ewc.log.Tracef("Removed cluster %s from EndpointsWatcherCache", clusterName)
-}
-
-// addWatcher is triggered by the cache's Secret informer when a secret is
-// discovered for the first time. Given a cluster name and a Secret
-// object, it creates an EndpointsWatcher for a remote cluster and syncs its
-// informers before returning.
-func (ewc *EndpointsWatcherCache) addWatcher(clusterName string, secret *v1.Secret) error {
- data, found := secret.Data[consts.ConfigKeyName]
- if !found {
- return errors.New("missing kubeconfig file")
- }
-
- clusterDomain, found := secret.GetAnnotations()[clusterDomainAnnotation]
- if !found {
- return fmt.Errorf("missing \"%s\" annotation", clusterDomainAnnotation)
- }
-
- trustDomain, found := secret.GetAnnotations()[trustDomainAnnotation]
- if !found {
- return fmt.Errorf("missing \"%s\" annotation", trustDomainAnnotation)
- }
-
- remoteAPI, metadataAPI, err := ewc.decodeFn(data, ewc.enableEndpointSlices)
- if err != nil {
- return err
- }
-
- stopCh := make(chan struct{}, 1)
-
- watcher, err := NewEndpointsWatcher(
- remoteAPI,
- metadataAPI,
- logging.WithFields(logging.Fields{
- "remote-cluster": clusterName,
- }),
- ewc.enableEndpointSlices,
- )
- if err != nil {
- return err
- }
-
- ewc.Lock()
- defer ewc.Unlock()
- ewc.store[clusterName] = watchStore{
- watcher,
- trustDomain,
- clusterDomain,
- stopCh,
- }
-
- remoteAPI.Sync(stopCh)
- metadataAPI.Sync(stopCh)
-
- ewc.log.Tracef("Added cluster %s to EndpointsWatcherCache", clusterName)
-
- return nil
-}
-
-// decodeK8sConfigFromSecret implements the decoder function type. Given a byte
-// buffer, it attempts to parse it as a kubeconfig file. If successful, returns
-// a pair of API Server clients.
-func decodeK8sConfigFromSecret(data []byte, enableEndpointSlices bool) (*k8s.API, *k8s.MetadataAPI, error) {
- cfg, err := clientcmd.RESTConfigFromKubeConfig(data)
- if err != nil {
- return nil, nil, err
- }
-
- ctx := context.Background()
- var remoteAPI *k8s.API
- if enableEndpointSlices {
- remoteAPI, err = k8s.InitializeAPIForConfig(
- ctx,
- cfg,
- true,
- k8s.ES, k8s.Pod, k8s.Svc, k8s.SP, k8s.Job, k8s.Srv,
- )
- } else {
- remoteAPI, err = k8s.InitializeAPIForConfig(
- ctx,
- cfg,
- true,
- k8s.Endpoint, k8s.Pod, k8s.Svc, k8s.SP, k8s.Job, k8s.Srv,
- )
- }
- if err != nil {
- return nil, nil, err
- }
-
- metadataAPI, err := k8s.InitializeMetadataAPIForConfig(cfg, k8s.Node, k8s.RS)
- if err != nil {
- return nil, nil, err
- }
-
- return remoteAPI, metadataAPI, nil
-}
diff --git a/controller/api/destination/watcher/endpoints_watcher_cache_test.go b/controller/api/destination/watcher/endpoints_watcher_cache_test.go
deleted file mode 100644
index 20f7d066922b5..0000000000000
--- a/controller/api/destination/watcher/endpoints_watcher_cache_test.go
+++ /dev/null
@@ -1,248 +0,0 @@
-package watcher
-
-import (
- "testing"
- "time"
-
- "github.com/linkerd/linkerd2/controller/k8s"
- logging "github.com/sirupsen/logrus"
-)
-
-func CreateMockDecoder() configDecoder {
- // Create a mock decoder with some random objs to satisfy client creation
- return func(data []byte, enableEndpointSlices bool) (*k8s.API, *k8s.MetadataAPI, error) {
- remoteAPI, err := k8s.NewFakeAPI([]string{}...)
- if err != nil {
- return nil, nil, err
- }
-
- metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
- if err != nil {
- return nil, nil, err
- }
-
- return remoteAPI, metadataAPI, nil
- }
-
-}
-
-func TestEndpointsWatcherCacheAddHandler(t *testing.T) {
- for _, tt := range []struct {
- name string
- k8sConfigs []string
- enableEndpointSlices bool
- expectedClusters map[string]struct{}
- deleteClusters map[string]struct{}
- }{
- {
- name: "add and remove remote watcher when Secret is valid",
- k8sConfigs: []string{
- validRemoteSecret,
- },
- enableEndpointSlices: true,
- expectedClusters: map[string]struct{}{
- "remote": {},
- LocalClusterKey: {},
- },
- deleteClusters: map[string]struct{}{
- "remote": {},
- },
- },
- {
- name: "add and remove more than one watcher when Secrets are valid",
- k8sConfigs: []string{
- validRemoteSecret,
- validTargetSecret,
- },
- enableEndpointSlices: false,
- expectedClusters: map[string]struct{}{
- "remote": {},
- LocalClusterKey: {},
- "target": {},
- },
- deleteClusters: map[string]struct{}{
- "remote": {},
- },
- },
- {
- name: "malformed secrets shouldn't result in created watchers",
- k8sConfigs: []string{
- validRemoteSecret,
- noClusterSecret,
- noDomainSecret,
- noIdentitySecret,
- invalidTypeSecret,
- },
- enableEndpointSlices: true,
- expectedClusters: map[string]struct{}{
- "remote": {},
- LocalClusterKey: {},
- },
- deleteClusters: map[string]struct{}{
- "remote": {},
- },
- },
- } {
- tt := tt // Pin
- t.Run(tt.name, func(t *testing.T) {
- // TODO (matei): use namespace scoped API here
- k8sAPI, err := k8s.NewFakeAPI(tt.k8sConfigs...)
- if err != nil {
- t.Fatalf("NewFakeAPI returned an error: %s", err)
- }
-
- metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
- if err != nil {
- t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
- }
-
- ewc, err := newWatcherCacheWithDecoder(k8sAPI, tt.enableEndpointSlices, CreateMockDecoder())
- if err != nil {
- t.Fatalf("Unexpected error when starting watcher cache: %s", err)
- }
-
- k8sAPI.Sync(nil)
- metadataAPI.Sync(nil)
-
- // Wait for the update to be processed because there is no blocking call currently in k8s that we can wait on
- time.Sleep(50 * time.Millisecond)
-
- watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), tt.enableEndpointSlices)
- if err != nil {
- t.Fatalf("Unexpected error when creating local watcher: %s", err)
- }
-
- ewc.AddLocalWatcher(nil, watcher, "cluster.local", "cluster.local")
- actualLen := ewc.Len()
-
- if actualLen != len(tt.expectedClusters) {
- t.Fatalf("Unexpected error: expected to see %d cache entries, got: %d", len(tt.expectedClusters), actualLen)
- }
- for k := range tt.expectedClusters {
- if _, found := ewc.GetWatcher(k); !found {
- t.Fatalf("Unexpected error: cluster %s is missing from the cache", k)
- }
- }
-
- // Handle delete events
- if len(tt.deleteClusters) != 0 {
- for k := range tt.deleteClusters {
- watcher, found := ewc.GetWatcher(k)
- if !found {
- t.Fatalf("Unexpected error: watcher %s should exist in the cache", k)
- }
- // Unfortunately, mock k8s client does not propagate
- // deletes, so we have to call remove directly.
- ewc.removeWatcher(k)
- // Leave it to do its thing and gracefully shutdown
- time.Sleep(50 * time.Millisecond)
- var hasStopped bool
- if tt.enableEndpointSlices {
- hasStopped = watcher.k8sAPI.ES().Informer().IsStopped()
- } else {
- hasStopped = watcher.k8sAPI.Endpoint().Informer().IsStopped()
- }
- if !hasStopped {
- t.Fatalf("Unexpected error: informers for watcher %s should be stopped", k)
- }
-
- if _, found := ewc.GetWatcher(k); found {
- t.Fatalf("Unexpected error: watcher %s should have been removed from the cache", k)
- }
-
- }
- }
- })
- }
-}
-
-var validRemoteSecret = `
-apiVersion: v1
-kind: Secret
-type: mirror.linkerd.io/remote-kubeconfig
-metadata:
- namespace: linkerd
- name: remote-cluster-credentials
- labels:
- multicluster.linkerd.io/cluster-name: remote
- annotations:
- multicluster.linkerd.io/trust-domain: cluster.local
- multicluster.linkerd.io/cluster-domain: cluster.local
-data:
- kubeconfig: dmVyeSB0b3Agc2VjcmV0IGluZm9ybWF0aW9uIGhlcmUK
-`
-
-var validTargetSecret = `
-apiversion: v1
-kind: Secret
-type: mirror.linkerd.io/remote-kubeconfig
-metadata:
- namespace: linkerd
- name: target-cluster-credentials
- labels:
- multicluster.linkerd.io/cluster-name: target
- annotations:
- multicluster.linkerd.io/trust-domain: cluster.local
- multicluster.linkerd.io/cluster-domain: cluster.local
-data:
- kubeconfig: dmvyesb0b3agc2vjcmv0igluzm9ybwf0aw9uighlcmuk
-`
-
-var noDomainSecret = `
-apiVersion: v1
-kind: Secret
-type: mirror.linkerd.io/remote-kubeconfig
-metadata:
- namespace: linkerd
- name: target-1-cluster-credentials
- labels:
- multicluster.linkerd.io/cluster-name: target-1
- annotations:
- multicluster.linkerd.io/trust-domain: cluster.local
-data:
- kubeconfig: dmVyeSB0b3Agc2VjcmV0IGluZm9ybWF0aW9uIGhlcmUK
-`
-
-var noClusterSecret = `
-apiVersion: v1
-kind: Secret
-type: mirror.linkerd.io/remote-kubeconfig
-metadata:
- namespace: linkerd
- name: target-2-cluster-credentials
- annotations:
- multicluster.linkerd.io/trust-domain: cluster.local
- multicluster.linkerd.io/cluster-domain: cluster.local
-data:
- kubeconfig: dmVyeSB0b3Agc2VjcmV0IGluZm9ybWF0aW9uIGhlcmUK
-`
-
-var noIdentitySecret = `
-apiversion: v1
-kind: Secret
-type: mirror.linkerd.io/remote-kubeconfig
-metadata:
- namespace: linkerd
- name: target-3-cluster-credentials
- labels:
- multicluster.linkerd.io/cluster-name: target-3
- annotations:
- multicluster.linkerd.io/cluster-domain: cluster.local
-data:
- kubeconfig: dmvyesb0b3agc2vjcmv0igluzm9ybwf0aw9uighlcmuk
-`
-var invalidTypeSecret = `
-apiversion: v1
-kind: Secret
-type: kubernetes.io/tls
-metadata:
- namespace: linkerd
- name: target-cluster-credentials
- labels:
- multicluster.linkerd.io/cluster-name: target
- annotations:
- multicluster.linkerd.io/trust-domain: cluster.local
- multicluster.linkerd.io/cluster-domain: cluster.local
-data:
- kubeconfig: dmvyesb0b3agc2vjcmv0igluzm9ybwf0aw9uighlcmuk
-`
From 61725271b53fbb090b464eb249ca232e46932b7e Mon Sep 17 00:00:00 2001
From: Matei David
Date: Fri, 4 Aug 2023 15:30:28 +0000
Subject: [PATCH 12/15] Review feedback
Signed-off-by: Matei David
---
 .../{cluster_watcher.go => cluster_store.go} | 131 ++++++------------
 ..._watcher_test.go => cluster_store_test.go} | 65 +++++----
 .../destination/watcher/endpoints_watcher.go | 11 +-
 3 files changed, 84 insertions(+), 123 deletions(-)
 rename controller/api/destination/watcher/{cluster_watcher.go => cluster_store.go} (62%)
 rename controller/api/destination/watcher/{cluster_watcher_test.go => cluster_store_test.go} (80%)
diff --git a/controller/api/destination/watcher/cluster_watcher.go b/controller/api/destination/watcher/cluster_store.go
similarity index 62%
rename from controller/api/destination/watcher/cluster_watcher.go
rename to controller/api/destination/watcher/cluster_store.go
index 88290e3558125..d71b5745cbf7e 100644
--- a/controller/api/destination/watcher/cluster_watcher.go
+++ b/controller/api/destination/watcher/cluster_store.go
@@ -16,18 +16,17 @@ import (
 )
 
 type (
- // ClusterStore holds all EndpointsWatchers used by the destination
- // service to perform service discovery. Each cluster (including the one the
- // controller is running in) that may be looked-up for service discovery has
- // an associated EndpointsWatcher in the cache, along with a set of
- // immutable cluster configuration primitives (i.e. identity and cluster
- // domains).
+ // ClusterStore indexes clusters in which remote service discovery may be
+ // performed. For each store item, an EndpointsWatcher is created to read
+ // state directly from the respective cluster's API Server. In addition,
+ // each store item has some associated and immutable configuration that is
+ // required for service discovery.
 ClusterStore struct {
 // Protects against illegal accesses
 sync.RWMutex
 
 k8sAPI *k8s.API
- store map[string]cluster
+ store map[string]remoteCluster
 enableEndpointSlices bool
 log *logging.Entry
 
@@ -36,16 +35,21 @@ type (
 decodeFn configDecoder
 }
 
- // cluster is a helper struct that represents a cache item
- cluster struct {
- watcher *EndpointsWatcher
- trustDomain string
- clusterDomain string
+ // remoteCluster is a helper struct that represents a store item
+ remoteCluster struct {
+ watcher *EndpointsWatcher
+ config clusterConfig
 
 // Used to signal shutdown to the associated watcher's informers
 stopCh chan<- struct{}
 }
 
+ // clusterConfig holds immutable configuration for a given cluster
+ clusterConfig struct {
+ TrustDomain string
+ ClusterDomain string
+ }
+
 // configDecoder is the type of a function that given a byte buffer, returns
 // a pair of API Server clients. The cache uses this function to dynamically
 // create clients after discovering a Secret.
@@ -53,9 +57,6 @@ type (
 )
 
 const (
- // LocalClusterKey represents the look-up key that returns an
- // EndpointsWatcher associated with the "local" cluster.
- LocalClusterKey = "local"
 clusterNameLabel = "multicluster.linkerd.io/cluster-name"
 trustDomainAnnotation = "multicluster.linkerd.io/trust-domain"
 clusterDomainAnnotation = "multicluster.linkerd.io/cluster-domain"
@@ -64,26 +65,20 @@ const (
 // NewClusterStore creates a new (empty) ClusterStore. It
 // requires a Kubernetes API Server client instantiated for the local cluster.
 //
-// Upon creation, a pair of event handlers will be registered against the API
-// Server client's Secret informer. The event handlers will add, or remove
-// watcher entries from the cache by watching Secrets in the namespace the
-// controller is running in.
-//
-// A new watcher is created for a remote cluster when its Secret is valid (contains
-// necessary configuration, including a kubeconfig file). When a Secret is
-// deleted from the cluster, if there is a corresponding watcher in the cache,
-// it will be gracefully shutdown to allow for the memory to be freed.
+// When created, a pair of event handlers are registered for the local cluster's
+// Secret informer. The event handlers are responsible for driving the discovery
+// of remote clusters and their configuration.
 func NewClusterStore(k8sAPI *k8s.API, enableEndpointSlices bool) (*ClusterStore, error) {
 return newClusterStoreWithDecoder(k8sAPI, enableEndpointSlices, decodeK8sConfigFromSecret)
 }
 
-// newWatcherCacheWithDecoder is a helper function that allows the creation of a
-// cache with an arbitrary `configDecoder` function.
+// newClusterStoreWithDecoder is a helper function that allows the creation of a
+// store with an arbitrary `configDecoder` function.
 func newClusterStoreWithDecoder(k8sAPI *k8s.API, enableEndpointSlices bool, decodeFn configDecoder) (*ClusterStore, error) {
 cs := &ClusterStore{
- store: make(map[string]cluster),
+ store: make(map[string]remoteCluster),
 log: logging.WithFields(logging.Fields{
- "component": "endpoints-watcher-cache",
+ "component": "cluster-store",
 }),
 enableEndpointSlices: enableEndpointSlices,
 k8sAPI: k8sAPI,
@@ -110,8 +105,8 @@ func newClusterStoreWithDecoder(k8sAPI *k8s.API, enableEndpointSlices bool, deco
 return
 }
 
- if err := cs.addWatcher(clusterName, secret); err != nil {
- cs.log.Errorf("Error adding watcher for cluster %s: %v", clusterName, err)
+ if err := cs.addCluster(clusterName, secret); err != nil {
+ cs.log.Errorf("Error adding cluster %s to store: %v", clusterName, err)
 }
 
 },
@@ -124,7 +119,7 @@ func newClusterStoreWithDecoder(k8sAPI *k8s.API, enableEndpointSlices bool, deco
 // stale, so it should be cleaned-up.
 tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
 if !ok {
- cs.log.Debugf("unable to get object from DeletedFinalStateUnknown %#v", obj)
+ cs.log.Debugf("Unable to get object from DeletedFinalStateUnknown %#v", obj)
 return
 }
 // If the zombie object is a `Secret` we can delete it.
@@ -141,7 +136,7 @@ func newClusterStoreWithDecoder(k8sAPI *k8s.API, enableEndpointSlices bool, deco
 return
 }
 
- cs.removeWatcher(clusterName)
+ cs.removeCluster(clusterName)
 
 },
 })
@@ -153,51 +148,12 @@ func newClusterStoreWithDecoder(k8sAPI *k8s.API, enableEndpointSlices bool, deco
 return cs, nil
 }
 
-// Get safely retrieves a watcher from the cache given a cluster name. It
-// returns the watcher, cluster configuration, if the entry exists in the cache.
-func (cs *ClusterStore) Get(clusterName string) (*EndpointsWatcher, string, string, bool) {
- cs.RLock()
- defer cs.RUnlock()
- s, found := cs.store[clusterName]
- return s.watcher, s.trustDomain, s.clusterDomain, found
-}
-
-// GetWatcher is a convenience method that given a cluster name only returns the
-// associated EndpointsWatcher if it exists in the cache.
-func (cs *ClusterStore) GetWatcher(clusterName string) (*EndpointsWatcher, bool) {
- cs.RLock()
- defer cs.RUnlock()
- s, found := cs.store[clusterName]
- return s.watcher, found
-}
-
-// GetLocalWatcher is a convenience method that retrieves the watcher associated
-// with the local cluster. Its existence is assumed.
-func (cs *ClusterStore) GetLocalWatcher() *EndpointsWatcher {
- cs.RLock()
- defer cs.RUnlock()
- return cs.store[LocalClusterKey].watcher
-}
-
-// GetClusterConfig is a convenience method that given a cluster name retrieves
-// its associated configuration strings if the entry exists in the cache.
-func (cs *ClusterStore) GetClusterConfig(clusterName string) (string, string, bool) {
+// Get safely retrieves a store item given a cluster name.
+func (cs *ClusterStore) Get(clusterName string) (*EndpointsWatcher, clusterConfig, bool) {
 cs.RLock()
 defer cs.RUnlock()
- s, found := cs.store[clusterName]
- return s.trustDomain, s.clusterDomain, found
-}
-
-// AddLocalWatcher adds a watcher to the cache using the local cluster key.
-func (cs *ClusterStore) AddLocalWatcher(stopCh chan<- struct{}, watcher *EndpointsWatcher, trustDomain, clusterDomain string) {
- cs.Lock()
- defer cs.Unlock()
- cs.store[LocalClusterKey] = cluster{
- watcher,
- trustDomain,
- clusterDomain,
- stopCh,
- }
+ cw, found := cs.store[clusterName]
+ return cw.watcher, cw.config, found
 }
 
 // Len returns the number of entries in the cache
@@ -207,29 +163,27 @@ func (cs *ClusterStore) Len() int {
 return len(cs.store)
 }
 
-// removeWatcher is triggered by the cache's Secret informer when a secret is
+// removeCluster is triggered by the cache's Secret informer when a secret is
 // removed. Given a cluster name, it removes the entry from the cache after
 // stopping the associated watcher.
-func (cs *ClusterStore) removeWatcher(clusterName string) {
+func (cs *ClusterStore) removeCluster(clusterName string) {
 cs.Lock()
 defer cs.Unlock()
- s, found := cs.store[clusterName]
+ r, found := cs.store[clusterName]
 if !found {
 return
 }
- // For good measure, close the channel after stopping to ensure
- // informers are shut down.
- defer close(s.stopCh)
- s.watcher.Stop(s.stopCh)
+ r.watcher.removeHandlers()
+ close(r.stopCh)
 delete(cs.store, clusterName)
 cs.log.Tracef("Removed cluster %s from ClusterStore", clusterName)
 }
 
-// addWatcher is triggered by the cache's Secret informer when a secret is
+// addCluster is triggered by the cache's Secret informer when a secret is
 // discovered for the first time. Given a cluster name and a Secret
 // object, it creates an EndpointsWatcher for a remote cluster and syncs its
 // informers before returning.
-func (cs *ClusterStore) addWatcher(clusterName string, secret *v1.Secret) error {
+func (cs *ClusterStore) addCluster(clusterName string, secret *v1.Secret) error {
 data, found := secret.Data[consts.ConfigKeyName]
 if !found {
 return errors.New("missing kubeconfig file")
@@ -251,7 +205,6 @@ func (cs *ClusterStore) addWatcher(clusterName string, secret *v1.Secret) error
 }
 
 stopCh := make(chan struct{}, 1)
-
 watcher, err := NewEndpointsWatcher(
 remoteAPI,
 metadataAPI,
@@ -266,10 +219,12 @@ func (cs *ClusterStore) addWatcher(clusterName string, secret *v1.Secret) error
 
 cs.Lock()
 defer cs.Unlock()
- cs.store[clusterName] = cluster{
+ cs.store[clusterName] = remoteCluster{
 watcher,
- trustDomain,
- clusterDomain,
+ clusterConfig{
+ trustDomain,
+ clusterDomain,
+ },
 stopCh,
 }
 
diff --git a/controller/api/destination/watcher/cluster_watcher_test.go b/controller/api/destination/watcher/cluster_store_test.go
similarity index 80%
rename from controller/api/destination/watcher/cluster_watcher_test.go
rename to controller/api/destination/watcher/cluster_store_test.go
index 5f6244e8d89c9..bb756965fa73e 100644
--- a/controller/api/destination/watcher/cluster_watcher_test.go
+++ b/controller/api/destination/watcher/cluster_store_test.go
@@ -5,7 +5,6 @@ import (
 "time"
 
 "github.com/linkerd/linkerd2/controller/k8s"
- logging "github.com/sirupsen/logrus"
 )
 
 func CreateMockDecoder() configDecoder {
@@ -31,7 +30,7 @@ func TestClusterStoreHandlers(t *testing.T) {
 name string
 k8sConfigs []string
 enableEndpointSlices bool
- expectedClusters map[string]struct{}
+ expectedClusters map[string]clusterConfig
 deleteClusters map[string]struct{}
 }{
 {
@@ -40,9 +39,11 @@ func TestClusterStoreHandlers(t *testing.T) {
 validRemoteSecret,
 },
 enableEndpointSlices: true,
- expectedClusters: map[string]struct{}{
- "remote": {},
- LocalClusterKey: {},
+ expectedClusters: map[string]clusterConfig{
+ "remote": {
+ TrustDomain: "identity.org",
+ ClusterDomain: "cluster.local",
+ },
 },
 deleteClusters: map[string]struct{}{
 "remote": {},
@@ -55,10 +56,15 @@ func TestClusterStoreHandlers(t *testing.T) {
 validTargetSecret,
 },
 enableEndpointSlices: false,
- expectedClusters: map[string]struct{}{
- "remote": {},
- LocalClusterKey: {},
- "target": {},
+ expectedClusters: map[string]clusterConfig{
+ "remote": {
+ TrustDomain: "identity.org",
+ ClusterDomain: "cluster.local",
+ },
+ "target": {
+ TrustDomain: "cluster.target.local",
+ ClusterDomain: "cluster.target.local",
+ },
 },
 deleteClusters: map[string]struct{}{
 "remote": {},
@@ -74,9 +80,11 @@ func TestClusterStoreHandlers(t *testing.T) {
 invalidTypeSecret,
 },
 enableEndpointSlices: true,
- expectedClusters: map[string]struct{}{
- "remote": {},
- LocalClusterKey: {},
+ expectedClusters: map[string]clusterConfig{
+ "remote": {
+ TrustDomain: "identity.org",
+ ClusterDomain: "cluster.local",
+ },
 },
 deleteClusters: map[string]struct{}{
 "remote": {},
@@ -106,34 +114,37 @@ func TestClusterStoreHandlers(t *testing.T) {
 
 // Wait for the update to be processed because there is no blocking call currently in k8s that we can wait on
 time.Sleep(50 * time.Millisecond)
-
- watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), tt.enableEndpointSlices)
- if err != nil {
- t.Fatalf("Unexpected error when creating local watcher: %s", err)
- }
-
- cs.AddLocalWatcher(nil, watcher, "cluster.local", "cluster.local")
 actualLen := cs.Len()
 
 if actualLen != len(tt.expectedClusters) {
 t.Fatalf("Unexpected error: expected to see %d cache entries, got: %d", len(tt.expectedClusters), actualLen)
 }
- for k := range tt.expectedClusters {
- if _, found := cs.GetWatcher(k); !found {
+
+ for k, expected := range tt.expectedClusters {
+ _, cfg, found := cs.Get(k)
+ if !found {
 t.Fatalf("Unexpected error: cluster %s is missing from the cache", k)
 }
+
+ if cfg.ClusterDomain != expected.ClusterDomain {
+ t.Fatalf("Unexpected error: expected cluster domain %s for cluster '%s', got: %s", expected.ClusterDomain, k, cfg.ClusterDomain)
+ }
+
+ if cfg.TrustDomain != expected.TrustDomain {
+ t.Fatalf("Unexpected error: expected trust domain %s for cluster '%s', got: %s", expected.TrustDomain, k, cfg.TrustDomain)
+ }
 }
 
 // Handle delete events
 if len(tt.deleteClusters) != 0 {
 for k := range tt.deleteClusters {
- watcher, found := cs.GetWatcher(k)
+ watcher, _, found := cs.Get(k)
 if !found {
 t.Fatalf("Unexpected error: watcher %s should exist in the cache", k)
 }
 // Unfortunately, mock k8s client does not propagate
 // deletes, so we have to call remove directly.
- cs.removeWatcher(k)
+ cs.removeCluster(k)
 // Leave it to do its thing and gracefully shutdown
 time.Sleep(50 * time.Millisecond)
 var hasStopped bool
@@ -146,7 +157,7 @@ func TestClusterStoreHandlers(t *testing.T) {
 t.Fatalf("Unexpected error: informers for watcher %s should be stopped", k)
 }
 
- if _, found := cs.GetWatcher(k); found {
+ if _, _, found := cs.Get(k); found {
 t.Fatalf("Unexpected error: watcher %s should have been removed from the cache", k)
 }
 
@@ -166,7 +177,7 @@ metadata:
 labels:
 multicluster.linkerd.io/cluster-name: remote
 annotations:
- multicluster.linkerd.io/trust-domain: cluster.local
+ multicluster.linkerd.io/trust-domain: identity.org
 multicluster.linkerd.io/cluster-domain: cluster.local
 data:
 kubeconfig: dmVyeSB0b3Agc2VjcmV0IGluZm9ybWF0aW9uIGhlcmUK
@@ -182,8 +193,8 @@ metadata:
 labels:
 multicluster.linkerd.io/cluster-name: target
 annotations:
- multicluster.linkerd.io/trust-domain: cluster.local
- multicluster.linkerd.io/cluster-domain: cluster.local
+ multicluster.linkerd.io/trust-domain: cluster.target.local
+ multicluster.linkerd.io/cluster-domain: cluster.target.local
 data:
 kubeconfig: dmvyesb0b3agc2vjcmv0igluzm9ybwf0aw9uighlcmuk
 `
diff --git a/controller/api/destination/watcher/endpoints_watcher.go b/controller/api/destination/watcher/endpoints_watcher.go
index 82bec32e19ccf..05bca4c4e0932 100644
--- a/controller/api/destination/watcher/endpoints_watcher.go
+++ b/controller/api/destination/watcher/endpoints_watcher.go
@@ -246,11 +246,9 @@ func (ew *EndpointsWatcher) Unsubscribe(id ServiceID, port Port, hostname string
 sp.unsubscribe(port, hostname, listener)
 }
 
-// Stop will terminate an EndpointsWatcher by shutting down its informers. It
-// uses the write half of the channel used to sync the informers to signal
-// shutdown. It additionally de-registers any event handlers used by its
-// informers.
-func (ew *EndpointsWatcher) Stop(stopCh chan<- struct{}) {
+// removeHandlers will de-register any event handlers used by the
+// EndpointsWatcher's informers.
+func (ew *EndpointsWatcher) removeHandlers() {
 ew.Lock()
 defer ew.Unlock()
 if ew.svcHandle != nil {
@@ -277,9 +275,6 @@ func (ew *EndpointsWatcher) Stop(stopCh chan<- struct{}) {
 }
 }
 }
-
- // Signal informers to stop
- stopCh <- struct{}{}
 }
 
 func (ew *EndpointsWatcher) addService(obj interface{}) {
From 90ffd35178310e68c9f0e74d34dd95f3b3459711 Mon Sep 17 00:00:00 2001
From: Matei David
Date: Fri, 4 Aug 2023 16:41:07 +0000
Subject: [PATCH 13/15] Remove Len() from cluster store
Signed-off-by: Matei David
---
 controller/api/destination/watcher/cluster_store.go | 7 -------
 controller/api/destination/watcher/cluster_store_test.go | 5 ++++-
 2 files changed, 4 insertions(+), 8 deletions(-)
diff --git a/controller/api/destination/watcher/cluster_store.go b/controller/api/destination/watcher/cluster_store.go
index d71b5745cbf7e..2bd270b71b55d 100644
--- a/controller/api/destination/watcher/cluster_store.go
+++ b/controller/api/destination/watcher/cluster_store.go
@@ -156,13 +156,6 @@ func (cs *ClusterStore) Get(clusterName string) (*EndpointsWatcher, clusterConfi
 return cw.watcher, cw.config, found
 }
 
-// Len returns the number of entries in the cache
-func (cs *ClusterStore) Len() int {
- cs.RLock()
- defer cs.RUnlock()
- return len(cs.store)
-}
-
 // removeCluster is triggered by the cache's Secret informer when a secret is
 // removed. Given a cluster name, it removes the entry from the cache after
 // stopping the associated watcher.
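(Reviewer's note: with Len() removed, Get is the store's only exported read path. A minimal consumer-side sketch follows; the call site itself is hypothetical and not part of this series — only Get's signature and the clusterConfig fields are taken from the diffs above. In Go:

    // Hypothetical caller; cs is a *ClusterStore built with NewClusterStore,
    // and clusterName would come from e.g. a service's cluster label.
    watcher, cfg, found := cs.Get(clusterName)
    if !found {
        return fmt.Errorf("no remote cluster %q in store", clusterName)
    }
    // cfg.TrustDomain and cfg.ClusterDomain are the immutable values
    // captured from the Secret's annotations at discovery time.
    log.Debugf("cluster %s: trust domain %s, cluster domain %s",
        clusterName, cfg.TrustDomain, cfg.ClusterDomain)
    _ = watcher // the remote EndpointsWatcher, ready for subscriptions

Note that callers outside the watcher package receive the unexported clusterConfig value through type inference; only its exported fields are usable.)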
diff --git a/controller/api/destination/watcher/cluster_store_test.go b/controller/api/destination/watcher/cluster_store_test.go
index bb756965fa73e..061580197d170 100644
--- a/controller/api/destination/watcher/cluster_store_test.go
+++ b/controller/api/destination/watcher/cluster_store_test.go
@@ -114,7 +114,10 @@ func TestClusterStoreHandlers(t *testing.T) {
 
 // Wait for the update to be processed because there is no blocking call currently in k8s that we can wait on
 time.Sleep(50 * time.Millisecond)
- actualLen := cs.Len()
+
+ cs.RLock()
+ actualLen := len(cs.store)
+ defer cs.RUnlock()
 
 if actualLen != len(tt.expectedClusters) {
 t.Fatalf("Unexpected error: expected to see %d cache entries, got: %d", len(tt.expectedClusters), actualLen)
From 501e57a2778c9b9a78824a31f340e16ece6bfb7f Mon Sep 17 00:00:00 2001
From: Matei David
Date: Fri, 4 Aug 2023 17:20:06 +0000
Subject: [PATCH 14/15] Release lock in test
Signed-off-by: Matei David
---
 controller/api/destination/watcher/cluster_store_test.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/controller/api/destination/watcher/cluster_store_test.go b/controller/api/destination/watcher/cluster_store_test.go
index 061580197d170..1f11fc7831a64 100644
--- a/controller/api/destination/watcher/cluster_store_test.go
+++ b/controller/api/destination/watcher/cluster_store_test.go
@@ -117,7 +117,7 @@ func TestClusterStoreHandlers(t *testing.T) {
 
 cs.RLock()
 actualLen := len(cs.store)
- defer cs.RUnlock()
+ cs.RUnlock()
 
 if actualLen != len(tt.expectedClusters) {
From 50db8fde4c95909cd72fb685154f7e32abbdca73 Mon Sep 17 00:00:00 2001
From: Matei David
Date: Fri, 4 Aug 2023 18:58:22 +0000
Subject: [PATCH 15/15] Sync API clients async
Signed-off-by: Matei David
---
 controller/api/destination/watcher/cluster_store.go | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)
diff --git a/controller/api/destination/watcher/cluster_store.go b/controller/api/destination/watcher/cluster_store.go
index 2bd270b71b55d..945ece39371ce 100644
--- a/controller/api/destination/watcher/cluster_store.go
+++ b/controller/api/destination/watcher/cluster_store.go
@@ -221,8 +221,10 @@ func (cs *ClusterStore) addCluster(clusterName string, secret *v1.Secret) error
 stopCh,
 }
 
- remoteAPI.Sync(stopCh)
- metadataAPI.Sync(stopCh)
+ go func() {
+ remoteAPI.Sync(stopCh)
+ metadataAPI.Sync(stopCh)
+ }()
 
 cs.log.Tracef("Added cluster %s to ClusterStore", clusterName)
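(Reviewer's note on the last patch: addCluster runs inside the Secret informer's Add callback, so calling Sync inline would hold up event delivery until the remote cluster's caches settle; moving the Sync calls into a goroutine keeps the handler non-blocking. That rationale is an inference — the commit message does not state one. Paired with removeCluster, the lifecycle of a store entry then looks roughly like this condensed Go sketch, with ordering taken from the diffs and comments editorial:

    // addCluster: register the entry, then warm caches in the background.
    stopCh := make(chan struct{}, 1)
    go func() {
        // Informers started by Sync are bound to stopCh, so a later
        // close(stopCh) in removeCluster shuts them down.
        remoteAPI.Sync(stopCh)
        metadataAPI.Sync(stopCh)
    }()

    // removeCluster: de-register event handlers first so no callbacks
    // fire mid-teardown, then stop the informers, then drop the entry.
    r.watcher.removeHandlers()
    close(r.stopCh)
    delete(cs.store, clusterName)
)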