From 7d57a890f0451cfda2daee7f0041a1872a94812b Mon Sep 17 00:00:00 2001
From: shaofan-hs
Date: Tue, 20 Feb 2024 15:02:21 +0800
Subject: [PATCH] refactor: cluster management

---
 multicluster/clusterinfo/context.go      |   5 +-
 multicluster/controller/controller.go    | 163 ++++++++++--------
 .../controller/test_cluster_provider.go  |  56 ++++++
 multicluster/manager.go                  | 113 +++++++-----
 multicluster/manager_test.go             |  85 ++++++++-
 multicluster/metrics/metrics.go          |  10 +-
 multicluster/multi_cluster_cache.go      |  16 +-
 multicluster/suite_test.go               |  26 ++-
 8 files changed, 322 insertions(+), 152 deletions(-)
 create mode 100644 multicluster/controller/test_cluster_provider.go

diff --git a/multicluster/clusterinfo/context.go b/multicluster/clusterinfo/context.go
index 1aee220..06b7769 100644
--- a/multicluster/clusterinfo/context.go
+++ b/multicluster/clusterinfo/context.go
@@ -26,9 +26,10 @@ type contextKey string
 const (
     clusterInfo contextKey = "clusterInfo"
 
-    ClusterLabelKey = "kusionstack.io/cluster"
+    ClusterLabelKey = "kusionstack.io/cluster" // Label key for the cluster name; attached when the Client or Cache is used to read
 
-    EnvClusterAllowList = "CLUSTER_ALLOW_LIST"
+    EnvClusterAllowList = "CLUSTER_ALLOW_LIST" // Comma-separated list of cluster names that are allowed to be accessed
+    EnvClusterBlockList = "CLUSTER_BLOCK_LIST" // Comma-separated list of cluster names that are blocked from being accessed
 )
 
 const (
diff --git a/multicluster/controller/controller.go b/multicluster/controller/controller.go
index 2c3fb8f..53a27e0 100644
--- a/multicluster/controller/controller.go
+++ b/multicluster/controller/controller.go
@@ -25,6 +25,7 @@ import (
     "github.com/go-logr/logr"
     "k8s.io/apimachinery/pkg/api/errors"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
     "k8s.io/apimachinery/pkg/runtime/schema"
     "k8s.io/apimachinery/pkg/util/runtime"
     "k8s.io/apimachinery/pkg/util/wait"
@@ -37,94 +38,80 @@ import (
     "kusionstack.io/kube-utils/multicluster/metrics"
 )
 
-type ClusterManagementType string
-
-const (
-    OpenClusterManagement ClusterManagementType = "OpenClusterManagement"
-    TestCluterManagement  ClusterManagementType = "TestCluterManagement"
-)
+type ClusterProvider interface {
+    Init(config *rest.Config)                                      // Initialize the cluster provider; config is the kubeconfig for the fed cluster
+    GetClusterManagementGVR() schema.GroupVersionResource          // The GVR used to watch the cluster management resources
+    GetClusterName(obj *unstructured.Unstructured) string          // Get the cluster name, which identifies the cluster, from a cluster management resource
+    GetClusterConfig(obj *unstructured.Unstructured) *rest.Config  // Get the kubeconfig from a cluster management resource
+}
 
 type Controller struct {
-    config                *rest.Config
-    clusterManagementType ClusterManagementType
-    clusterManagementGVR  schema.GroupVersionResource
+    config          *rest.Config
+    clusterProvider ClusterProvider
 
-    client dynamic.Interface // client to get cluster info
+    client          dynamic.Interface // Client to get cluster info
     informerFactory dynamicinformer.DynamicSharedInformerFactory
     informer        cache.SharedIndexInformer
     workqueue       workqueue.RateLimitingInterface
 
-    mutex     sync.Mutex
-    syncedNum int // number of synced cluster
-    syncedCh  chan struct{}
+    mutex     sync.RWMutex
+    syncedNum int           // Number of synced clusters
+    syncedCh  chan struct{} // Channel to signal that all synced clusters have been processed
 
-    addUpdateHandler func(string) error // when cluster is added or updated, this handler will be called
-    deleteHandler    func(string)       // when cluster is deleted, this handler will be called
-    log              logr.Logger
+    addUpdateHandler func(string) error // When cluster is added or updated, this handler will be invoked
+    deleteHandler    func(string)       // When cluster is deleted, this handler will be invoked
 
-    // for test
-    restConfigForCluster func(cluster string) *rest.Config
+    clusterNameToNamespacedKey map[string]string
+    namespacedKeyToObj         map[string]*unstructured.Unstructured
+    log                        logr.Logger
 }
 
 type ControllerConfig struct {
-    Config                *rest.Config // config for cluster management
-    ClusterManagementType ClusterManagementType
-    ClusterManagementGVR  *schema.GroupVersionResource
-    ResyncPeriod          time.Duration // resync period for cluster management
-    Log                   logr.Logger
-
-    // for test
-    RestConfigForCluster func(cluster string) *rest.Config
+    Config          *rest.Config // Kubeconfig for the fed cluster
+    ClusterProvider ClusterProvider
+    ResyncPeriod    time.Duration // Resync period for cluster management
+    Log             logr.Logger
 }
 
 // NewController creates a new Controller which will process events about cluster.
 func NewController(cfg *ControllerConfig) (*Controller, error) {
-    var clusterManagementGVR schema.GroupVersionResource
-    switch cfg.ClusterManagementType {
-    case OpenClusterManagement:
-        if cfg.ClusterManagementGVR == nil {
-            return nil, fmt.Errorf("ClusterManagementGVR must be set when use %s", cfg.ClusterManagementType)
-        }
-        clusterManagementGVR = *cfg.ClusterManagementGVR
-    case TestCluterManagement:
-        if cfg.ClusterManagementGVR == nil || cfg.RestConfigForCluster == nil {
-            return nil, fmt.Errorf("ClusterManagementGVR and RestConfigForCluster must be set when use %s", cfg.ClusterManagementType)
-        }
-        clusterManagementGVR = *cfg.ClusterManagementGVR
-    default:
-        return nil, fmt.Errorf("not support cluster management type: %v", cfg.ClusterManagementType)
-    }
-
     client, err := dynamic.NewForConfig(cfg.Config)
     if err != nil {
        return nil, err
     }
 
+    if cfg.ClusterProvider == nil {
+        return nil, fmt.Errorf("ClusterProvider is required")
+    }
+
     informerFactory := dynamicinformer.NewDynamicSharedInformerFactory(client, cfg.ResyncPeriod)
-    informer := informerFactory.ForResource(clusterManagementGVR).Informer()
+    informer := informerFactory.ForResource(cfg.ClusterProvider.GetClusterManagementGVR()).Informer()
 
     return &Controller{
-        config:                cfg.Config,
-        client:                client,
-        informerFactory:       informerFactory,
-        clusterManagementType: cfg.ClusterManagementType,
-        clusterManagementGVR:  clusterManagementGVR,
-        informer:              informer,
-        workqueue:             workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), clusterManagementGVR.Resource),
-        syncedCh:              make(chan struct{}),
-        log:                   cfg.Log,
-
-        restConfigForCluster: cfg.RestConfigForCluster,
+        config:          cfg.Config,
+        clusterProvider: cfg.ClusterProvider,
+
+        client:          client,
+        informerFactory: informerFactory,
+        informer:        informer,
+        workqueue:       workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), cfg.ClusterProvider.GetClusterManagementGVR().Resource),
+        syncedCh:        make(chan struct{}),
+
+        clusterNameToNamespacedKey: make(map[string]string),                     // Look up the namespaced key by cluster name
+        namespacedKeyToObj:         make(map[string]*unstructured.Unstructured), // Look up the cluster management resource by namespaced key
+        log:                        cfg.Log,
     }, nil
 }
 
-// AddEventHandler adds handler for events about cluster.
+// AddEventHandler registers handlers for cluster events.
+// When cluster is added or updated, addUpdateHandler will be invoked.
+// When cluster is deleted, deleteHandler will be invoked.
 func (c *Controller) AddEventHandler(addUpdateHandler func(string) error, deleteHandler func(string)) {
     c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
-        AddFunc: c.enqueueClusterExtension,
+        AddFunc: c.enqueueClusterEvent,
         UpdateFunc: func(old, new interface{}) {
-            c.enqueueClusterExtension(new)
+            c.enqueueClusterEvent(new)
         },
-        DeleteFunc: c.enqueueClusterExtension,
+        DeleteFunc: c.enqueueClusterEvent,
     })
 
     c.addUpdateHandler = addUpdateHandler
@@ -135,6 +122,8 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
     defer runtime.HandleCrash()
     defer c.workqueue.ShutDown()
 
+    c.clusterProvider.Init(c.config)
+
     c.informerFactory.Start(stopCh)
 
     // Wait for the caches to be synced before starting workers
@@ -164,10 +153,9 @@ func (c *Controller) WaitForSynced(ctx context.Context) bool {
     }
 }
 
-func (c *Controller) enqueueClusterExtension(obj interface{}) {
-    var key string
-    var err error
-    if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
+func (c *Controller) enqueueClusterEvent(obj interface{}) {
+    key, err := cache.MetaNamespaceKeyFunc(obj)
+    if err != nil {
         c.log.Error(err, "failed to get enqueue key")
         return
     }
@@ -205,11 +193,9 @@ func (c *Controller) processNextWorkItem() bool {
 
         if err := c.eventHandler(key); err != nil {
             c.workqueue.AddRateLimited(key)
-            metrics.NewControllerEventCountMetrics(key, "failed").Inc()
             return err
         }
 
-        metrics.NewControllerEventCountMetrics(key, "ok").Inc()
         c.workqueue.Forget(obj)
         return nil
     }(obj)
@@ -228,35 +214,60 @@ func (c *Controller) eventHandler(key string) error {
         return nil
     }
 
-    _, err = c.client.Resource(c.clusterManagementGVR).Namespace(namespace).Get(context.Background(), name, metav1.GetOptions{})
+    obj, err := c.client.Resource(c.clusterProvider.GetClusterManagementGVR()).Namespace(namespace).Get(context.Background(), name, metav1.GetOptions{})
     if err != nil {
         if errors.IsNotFound(err) {
-            c.deleteHandler(name)
+            c.mutex.Lock()
+            defer c.mutex.Unlock()
+
+            oldObj, ok := c.namespacedKeyToObj[key]
+            if !ok {
+                return nil
+            }
+            delete(c.namespacedKeyToObj, key)
+
+            clusterName := c.clusterProvider.GetClusterName(oldObj)
+            delete(c.clusterNameToNamespacedKey, clusterName)
+
+            metrics.NewClusterEventCountMetrics(key, "delete", "true").Inc()
+            c.deleteHandler(clusterName)
             return nil
         }
 
+        metrics.NewClusterEventCountMetrics(key, "delete", "false").Inc()
         c.log.Error(err, "failed to get resource", "key", key)
         return err
     }
 
-    err = c.addUpdateHandler(name)
+    c.mutex.Lock()
+    c.namespacedKeyToObj[key] = obj
+    clusterName := c.clusterProvider.GetClusterName(obj)
+    c.clusterNameToNamespacedKey[clusterName] = key
+    c.mutex.Unlock()
+
+    err = c.addUpdateHandler(clusterName)
     if err != nil {
+        metrics.NewClusterEventCountMetrics(key, "add-update", "false").Inc()
         c.log.Error(err, "failed to add or update cluster", "key", key)
         return err
     }
 
+    metrics.NewClusterEventCountMetrics(key, "add-update", "true").Inc()
     return nil
 }
 
 // RestConfigForCluster returns the rest config for the mangered cluster.
-func (c *Controller) RestConfigForCluster(cluster string) *rest.Config {
-    switch c.clusterManagementType {
-    case OpenClusterManagement:
-        clusterConfig := *c.config
-        clusterConfig.Host = fmt.Sprintf("%s/apis/%s/%s/%s/%s/proxy", clusterConfig.Host, c.clusterManagementGVR.Group, c.clusterManagementGVR.Version, c.clusterManagementGVR.Resource, cluster)
-        return &clusterConfig
-    case TestCluterManagement:
-        return c.restConfigForCluster(cluster)
-    default:
+func (c *Controller) RestConfigForCluster(clusterName string) *rest.Config {
+    c.mutex.RLock()
+    defer c.mutex.RUnlock()
+
+    namespacedKey, ok := c.clusterNameToNamespacedKey[clusterName]
+    if !ok {
+        return nil
+    }
+
+    obj, ok := c.namespacedKeyToObj[namespacedKey]
+    if !ok {
         return nil
     }
+    return c.clusterProvider.GetClusterConfig(obj)
 }
diff --git a/multicluster/controller/test_cluster_provider.go b/multicluster/controller/test_cluster_provider.go
new file mode 100644
index 0000000..9275d73
--- /dev/null
+++ b/multicluster/controller/test_cluster_provider.go
@@ -0,0 +1,56 @@
+/**
+ * Copyright 2024 KusionStack Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package controller
+
+import (
+    "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+    "k8s.io/apimachinery/pkg/runtime/schema"
+    "k8s.io/client-go/rest"
+)
+
+var _ ClusterProvider = &TestClusterProvider{}
+
+type TestClusterProvider struct {
+    schema.GroupVersionResource
+    ClusterNameToConfig map[string]*rest.Config // Map from cluster name to kubeconfig
+}
+
+func (p *TestClusterProvider) Init(config *rest.Config) {
+    // Do nothing
+}
+
+func (p *TestClusterProvider) GetClusterManagementGVR() schema.GroupVersionResource {
+    return p.GroupVersionResource
+}
+
+func (p *TestClusterProvider) GetClusterName(obj *unstructured.Unstructured) string {
+    if obj == nil {
+        return ""
+    }
+    return obj.GetName() // Use the resource name as the cluster name
+}
+
+func (p *TestClusterProvider) GetClusterConfig(obj *unstructured.Unstructured) *rest.Config {
+    if obj == nil || p.ClusterNameToConfig == nil {
+        return nil
+    }
+    config, ok := p.ClusterNameToConfig[obj.GetName()]
+    if !ok {
+        return nil
+    }
+    return config
+}
diff --git a/multicluster/manager.go b/multicluster/manager.go
index 920ed36..176f003 100644
--- a/multicluster/manager.go
+++ b/multicluster/manager.go
@@ -18,6 +18,8 @@ package multicluster
 
 import (
     "context"
+    "errors"
+    "fmt"
     "os"
     "strings"
     "sync"
@@ -25,7 +27,7 @@ import (
 
     "github.com/go-logr/logr"
     "k8s.io/apimachinery/pkg/runtime"
-    "k8s.io/apimachinery/pkg/runtime/schema"
+    "k8s.io/apimachinery/pkg/util/sets"
     "k8s.io/client-go/discovery/cached/memory"
     "k8s.io/client-go/kubernetes"
     "k8s.io/client-go/kubernetes/scheme"
@@ -45,8 +47,7 @@ const (
 )
 
 type Options struct {
-    // NewCache is the function that will create the cache to be used
-    // by the manager. If not set this will use the default new cache function.
+    // NewCache is the function to create the cache to be used by the manager. Defaults to cache.New.
     NewCache cache.NewCacheFunc
 }
 
@@ -59,29 +60,25 @@ func setOptionsDefaults(opts Options) Options {
 }
 
 type ManagerConfig struct {
-    FedConfig             *rest.Config
-    ClusterScheme         *runtime.Scheme
-    ClusterManagementGVR  *schema.GroupVersionResource
-    ClusterManagementType controller.ClusterManagementType
+    FedConfig     *rest.Config
+    ClusterScheme *runtime.Scheme
+    controller.ClusterProvider
 
     ResyncPeriod  time.Duration
     ClusterFilter func(string) bool // select cluster
     Log           logr.Logger
-
-    // for test
-    RestConfigForCluster func(cluster string) *rest.Config
 }
 
 type Manager struct {
-    newCache      cache.NewCacheFunc // function to create cache for cluster
-    clusterScheme *runtime.Scheme    // scheme which is used to create cache for cluster
+    newCache      cache.NewCacheFunc // Function to create cache for cluster
+    clusterScheme *runtime.Scheme    // Scheme which is used to create cache for cluster
 
     clusterCacheManager  ClusterCacheManager
     clusterClientManager ClusterClientManager
-    controller           *controller.Controller // controller for cluster management
+    controller           *controller.Controller // Controller for cluster management
     resyncPeriod         time.Duration
 
-    hasCluster                map[string]struct{} // whether cluster has been added
+    hasCluster                map[string]struct{} // Whether cluster has been added
     clusterFilter             func(string) bool
     onlyWatchClusterNamespace string
     mutex                     sync.RWMutex
@@ -100,38 +97,11 @@ func NewManager(cfg *ManagerConfig, opts Options) (manager *Manager, newCacheFun
         cfg.ClusterScheme = scheme.Scheme
     }
 
-    clusterFilter := cfg.ClusterFilter
-    allowList := strings.TrimSpace(os.Getenv(clusterinfo.EnvClusterAllowList))
-    if clusterFilter == nil && allowList != "" {
-        clusters := strings.Split(allowList, ",")
-        log.Info("allow list", "clusters", clusters)
-
-        hasCluster := map[string]struct{}{}
-        for _, cluster := range clusters {
-            if _, ok := hasCluster[cluster]; ok {
-                continue
-            }
-            hasCluster[cluster] = struct{}{}
-        }
-        clusterFilter = func(cluster string) bool {
-            _, ok := hasCluster[cluster]
-            return ok
-        }
-    }
-
-    clusterManagementType := controller.OpenClusterManagement
-    if cfg.ClusterManagementType != "" {
-        clusterManagementType = cfg.ClusterManagementType
-    }
-
     controller, err := controller.NewController(&controller.ControllerConfig{
-        Config:                cfg.FedConfig,
-        ResyncPeriod:          cfg.ResyncPeriod,
-        ClusterManagementType: clusterManagementType,
-        ClusterManagementGVR:  cfg.ClusterManagementGVR,
-        Log:                   log,
-
-        RestConfigForCluster: cfg.RestConfigForCluster,
+        Config:          cfg.FedConfig,
+        ClusterProvider: cfg.ClusterProvider,
+        ResyncPeriod:    cfg.ResyncPeriod,
+        Log:             log,
     })
     if err != nil {
         return nil, nil, nil, err
@@ -141,6 +111,11 @@
     newCacheFunc, clusterCacheManager := MultiClusterCacheBuilder(log, &opts)
     newClientFunc, clusterClientManager := MultiClusterClientBuilder(log)
 
+    clusterFilter, err := getClusterFilter(cfg)
+    if err != nil {
+        return nil, nil, nil, err
+    }
+
     manager = &Manager{
         clusterScheme: cfg.ClusterScheme,
         newCache:      opts.NewCache,
@@ -189,6 +164,9 @@ func (m *Manager) addUpdateHandler(cluster string) (err error) {
 
     // Get rest.Config for the cluster
     cfg := m.controller.RestConfigForCluster(cluster)
+    if cfg == nil {
+        return fmt.Errorf("failed to get rest.Config for cluster %s", cluster)
+    }
 
     // Get MemCacheClient for the cluster
     clientset, err := kubernetes.NewForConfig(cfg)
@@ -265,3 +243,48 @@ func (m *Manager) SyncedClusters() []string {
     }
     return clusters
 }
+
+// getClusterFilter returns a function to filter clusters
+func getClusterFilter(cfg *ManagerConfig) (func(string) bool, error) {
+    var (
+        log                = klogr.New()
+        allowSet, blockSet sets.String
+    )
+    allowList := strings.TrimSpace(os.Getenv(clusterinfo.EnvClusterAllowList))
+    if allowList != "" {
+        allowSet = sets.NewString(strings.Split(allowList, ",")...)
+    }
+
+    blockList := strings.TrimSpace(os.Getenv(clusterinfo.EnvClusterBlockList))
+    if blockList != "" {
+        blockSet = sets.NewString(strings.Split(blockList, ",")...)
+    }
+
+    if allowSet != nil && blockSet != nil {
+        return nil, errors.New("cluster allow and block lists cannot both be set")
+    }
+    if cfg.ClusterFilter != nil {
+        if allowSet != nil || blockSet != nil {
+            return nil, errors.New("ClusterFilter cannot be combined with a cluster allow or block list")
+        }
+        return cfg.ClusterFilter, nil
+    }
+
+    clusterFilter := func(cluster string) bool {
+        if allowSet != nil && allowSet.Len() > 0 {
+            has := allowSet.Has(cluster)
+            log.V(4).Info("check cluster allow list", "cluster", cluster, "has", has)
+            return has
+        }
+        if blockSet != nil && blockSet.Len() > 0 {
+            has := blockSet.Has(cluster)
+            log.V(4).Info("check cluster block list", "cluster", cluster, "has", has)
+            return !has
+        }
+        return true // Default is to allow all clusters
+    }
+
+    return clusterFilter, nil
+}
diff --git a/multicluster/manager_test.go b/multicluster/manager_test.go
index 9ed2a2b..70c8def 100644
--- a/multicluster/manager_test.go
+++ b/multicluster/manager_test.go
@@ -19,6 +19,7 @@ package multicluster
 import (
     "context"
     "fmt"
+    "os"
     "time"
 
     . "github.com/onsi/ginkgo"
@@ -37,7 +38,7 @@ import (
     "kusionstack.io/kube-utils/multicluster/clusterinfo"
 )
 
-var _ = Describe("multicluster with 1 fed and 4 clusters", func() {
+var _ = Describe("multicluster", func() {
     It("fed adds 4 clusters", func() {
         for i := 1; i < 5; i++ {
             name := fmt.Sprintf("cluster%d", i)
@@ -453,6 +454,88 @@ var _ = Describe("multicluster with 1 fed and 4 clusters", func() {
     })
 })
 
+var _ = Describe("test cluster filter", func() {
+    AfterEach(func() {
+        os.Unsetenv(clusterinfo.EnvClusterAllowList)
+        os.Unsetenv(clusterinfo.EnvClusterBlockList)
+    })
+
+    It("both allow and block lists are used", func() {
+        err := os.Setenv(clusterinfo.EnvClusterAllowList, "cluster1,cluster2")
+        Expect(err).NotTo(HaveOccurred())
+
+        err = os.Setenv(clusterinfo.EnvClusterBlockList, "cluster3,cluster4")
+        Expect(err).NotTo(HaveOccurred())
+
+        clusterFilter, err := getClusterFilter(&ManagerConfig{})
+        Expect(err).To(HaveOccurred())
+        Expect(clusterFilter).To(BeNil())
+    })
+
+    It("ClusterFilter and allow list are used", func() {
+        err := os.Setenv(clusterinfo.EnvClusterAllowList, "cluster1,cluster2")
+        Expect(err).NotTo(HaveOccurred())
+
+        clusterFilter, err := getClusterFilter(&ManagerConfig{
+            ClusterFilter: func(cluster string) bool {
+                return cluster == "cluster1"
+            },
+        })
+        Expect(err).To(HaveOccurred())
+        Expect(clusterFilter).To(BeNil())
+    })
+
+    It("ClusterFilter and block list are used", func() {
+        err := os.Setenv(clusterinfo.EnvClusterBlockList, "cluster1,cluster2")
+        Expect(err).NotTo(HaveOccurred())
+
+        clusterFilter, err := getClusterFilter(&ManagerConfig{
+            ClusterFilter: func(cluster string) bool {
+                return cluster == "cluster3"
+            },
+        })
+        Expect(err).To(HaveOccurred())
+        Expect(clusterFilter).To(BeNil())
+    })
+
+    It("use allow list", func() {
+        err := os.Setenv(clusterinfo.EnvClusterAllowList, "cluster1,cluster2")
+        Expect(err).NotTo(HaveOccurred())
+
+        clusterFilter, err := getClusterFilter(&ManagerConfig{})
+        Expect(err).NotTo(HaveOccurred())
+        Expect(clusterFilter("cluster1")).To(Equal(true))
+        Expect(clusterFilter("cluster2")).To(Equal(true))
+    })
+
+    It("use block list", func() {
+        err := os.Setenv(clusterinfo.EnvClusterBlockList, "cluster1,cluster2")
+        Expect(err).NotTo(HaveOccurred())
+
+        clusterFilter, err := getClusterFilter(&ManagerConfig{})
+        Expect(err).NotTo(HaveOccurred())
+        Expect(clusterFilter("cluster1")).To(Equal(false))
+        Expect(clusterFilter("cluster2")).To(Equal(false))
+    })
+
+    It("use ClusterFilter", func() {
+        clusterFilter, err := getClusterFilter(&ManagerConfig{
+            ClusterFilter: func(cluster string) bool {
+                if cluster == "cluster1" {
+                    return true
+                }
+                if cluster == "cluster2" {
+                    return true
+                }
+                return false
+            },
+        })
+        Expect(err).NotTo(HaveOccurred())
+        Expect(clusterFilter("cluster1")).To(Equal(true))
+        Expect(clusterFilter("cluster3")).To(Equal(false))
+    })
+})
+
 func newMockClient() *MockClient {
     clientBuilder := fake.NewClientBuilder()
     return &MockClient{
diff --git a/multicluster/metrics/metrics.go b/multicluster/metrics/metrics.go
index 6eeb8c7..239bc98 100644
--- a/multicluster/metrics/metrics.go
+++ b/multicluster/metrics/metrics.go
@@ -44,17 +44,17 @@ var (
             Help:      "count the number of client call",
         }, []string{"cluster", "method", "code"})
 
-    controllerEventCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
+    clusterEventCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
         Subsystem: MultiClusterSubSystem,
         Name:      ClusterEventCount,
         Help:      "count the number of cluster event",
-    }, []string{"cluster", "status"})
+    }, []string{"cluster", "event", "success"})
 )
 
 func init() {
     metrics.Registry.MustRegister(cacheCounter)
     metrics.Registry.MustRegister(clientCounter)
-    metrics.Registry.MustRegister(controllerEventCounter)
+    metrics.Registry.MustRegister(clusterEventCounter)
 }
 
 func NewCacheCountMetrics(cluster, method string, err error) prometheus.Counter {
@@ -65,8 +65,8 @@ func NewClientCountMetrics(cluster, method string, err error) prometheus.Counter
     return clientCounter.WithLabelValues(cluster, method, CodeForError(err))
 }
 
-func NewControllerEventCountMetrics(cluster, status string) prometheus.Counter {
-    return controllerEventCounter.WithLabelValues(cluster, status)
+func NewClusterEventCountMetrics(cluster, event, success string) prometheus.Counter {
+    return clusterEventCounter.WithLabelValues(cluster, event, success)
 }
 
 func CodeForError(err error) string {
diff --git a/multicluster/multi_cluster_cache.go b/multicluster/multi_cluster_cache.go
index 77234bf..6dd84fe 100644
--- a/multicluster/multi_cluster_cache.go
+++ b/multicluster/multi_cluster_cache.go
@@ -64,14 +64,14 @@ func MultiClusterCacheBuilder(log logr.Logger, managerOption *Options) (cache.Ne
 }
 
 type multiClusterCache struct {
-    fedCache       cache.Cache            // cache for fed cluster
-    clusterToCache map[string]cache.Cache // cluster to cache
+    fedCache       cache.Cache            // Cache for fed cluster
+    clusterToCache map[string]cache.Cache // Cluster to cache
 
-    clusterCtx       context.Context                                    // context for all cluster caches
-    started          bool                                               // whether all known cluster caches already started
-    clusterToCancel  map[string]context.CancelFunc                      // cancel function for each cluster cache
-    objectToInformer map[client.Object]*multiClusterInformer            // object to informer
-    gvkToInformer    map[schema.GroupVersionKind]*multiClusterInformer  // gvk to informer
+    clusterCtx       context.Context                                    // Context for all cluster caches
+    started          bool                                               // Whether all known cluster caches already started
+    clusterToCancel  map[string]context.CancelFunc                      // Cancel function for each cluster cache
+    objectToInformer map[client.Object]*multiClusterInformer            // Object to informer
+    gvkToInformer    map[schema.GroupVersionKind]*multiClusterInformer  // GVK to informer
 
     mutex sync.RWMutex
     log   logr.Logger
@@ -493,7 +493,7 @@ func (mcc *multiClusterCache) convertClusters(clusters []string) ([]string, bool
 }
 
 type multiClusterInformer struct {
-    enableMultiple bool // whether this informer is for managed clusters, or only for fed cluster
+    enableMultiple bool // Whether this informer is for managed clusters, or only for fed cluster
    kind           string
 
     clusterToInformer map[string]cache.Informer
diff --git a/multicluster/suite_test.go b/multicluster/suite_test.go
index 0b8fd9c..01f2fd3 100644
--- a/multicluster/suite_test.go
+++ b/multicluster/suite_test.go
@@ -130,22 +130,18 @@ var _ = BeforeSuite(func() {
         ClusterScheme: clusterScheme,
         ResyncPeriod:  10 * time.Minute,
 
-        RestConfigForCluster: func(clusterName string) *rest.Config {
-            switch clusterName {
-            case "cluster1":
-                return clusterConfig1
-            case "cluster2":
-                return clusterConfig2
-            default:
-                return fedConfig
-            }
+        ClusterProvider: &controller.TestClusterProvider{
+            GroupVersionResource: schema.GroupVersionResource{ // Use deployment as cluster management resource
+                Group:    "apps",
+                Version:  "v1",
+                Resource: "deployments",
+            },
+            ClusterNameToConfig: map[string]*rest.Config{
+                "cluster1":      clusterConfig1,
+                "cluster2":      clusterConfig2,
+                clusterinfo.Fed: fedConfig,
+            },
         },
-        ClusterManagementGVR: &schema.GroupVersionResource{
-            Group:    "apps",
-            Version:  "v1",
-            Resource: "deployments",
-        },
-        ClusterManagementType: controller.TestCluterManagement,
     }, Options{})
     Expect(err).NotTo(HaveOccurred())
     Expect(manager).NotTo(BeNil())
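
--
Reviewer note, not part of the patch: with ClusterManagementType gone, the proxy behavior of the
removed OpenClusterManagement branch must now be supplied by the caller as a ClusterProvider. The
sketch below shows one way such a provider could look. It reuses the proxy URL scheme from the
deleted RestConfigForCluster code; the ProxyClusterProvider name, its package, and its GVR field
are assumptions for illustration only, not identifiers introduced by this change.

package provider // Hypothetical package; illustration only

import (
    "fmt"

    "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/client-go/rest"

    "kusionstack.io/kube-utils/multicluster/controller"
)

var _ controller.ClusterProvider = &ProxyClusterProvider{}

// ProxyClusterProvider mirrors the removed OpenClusterManagement behavior:
// requests to a managed cluster are proxied through the fed apiserver.
type ProxyClusterProvider struct {
    GVR       schema.GroupVersionResource // Cluster management resource to watch
    fedConfig *rest.Config                // Saved by Init; base for per-cluster configs
}

// Init stores the fed kubeconfig so per-cluster configs can be derived from it.
// The Controller calls Init with its own config at the start of Run.
func (p *ProxyClusterProvider) Init(config *rest.Config) {
    p.fedConfig = config
}

func (p *ProxyClusterProvider) GetClusterManagementGVR() schema.GroupVersionResource {
    return p.GVR
}

// GetClusterName uses the resource name as the cluster name, as TestClusterProvider does.
func (p *ProxyClusterProvider) GetClusterName(obj *unstructured.Unstructured) string {
    if obj == nil {
        return ""
    }
    return obj.GetName()
}

// GetClusterConfig builds a rest.Config whose Host points at the fed apiserver's
// proxy subresource for the cluster, exactly as the deleted code path did.
func (p *ProxyClusterProvider) GetClusterConfig(obj *unstructured.Unstructured) *rest.Config {
    if obj == nil || p.fedConfig == nil {
        return nil
    }
    clusterConfig := *p.fedConfig
    clusterConfig.Host = fmt.Sprintf("%s/apis/%s/%s/%s/%s/proxy",
        clusterConfig.Host, p.GVR.Group, p.GVR.Version, p.GVR.Resource, obj.GetName())
    return &clusterConfig
}

Wiring is then the same as in suite_test.go: set ManagerConfig.ClusterProvider to the provider and
pass it to NewManager.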