From a3f577fb7b77690145884f3c64e01dfd93806d96 Mon Sep 17 00:00:00 2001 From: shaofan-hs Date: Mon, 11 Mar 2024 12:08:12 +0800 Subject: [PATCH 1/4] add interface ClusterController --- multicluster/controller/controller.go | 29 ++++--------------- multicluster/manager.go | 40 ++++++++++++--------------- multicluster/suite_test.go | 19 +++++++++---- 3 files changed, 37 insertions(+), 51 deletions(-) diff --git a/multicluster/controller/controller.go b/multicluster/controller/controller.go index 53a27e0..5430c22 100644 --- a/multicluster/controller/controller.go +++ b/multicluster/controller/controller.go @@ -58,8 +58,8 @@ type Controller struct { syncedNum int // Number of synced cluster syncedCh chan struct{} // Channel to notify all synced clusters have been processed - addUpdateHandler func(string) error // When cluster is added or updated, this handler will be invoked - deleteHandler func(string) // When cluster is deleted, this handler will be invoked + addUpdateHandler func(string, *rest.Config) error // When cluster is added or updated, this handler will be invoked + deleteHandler func(string) // When cluster is deleted, this handler will be invoked clusterNameToNamespacedKey map[string]string namespacedKeyToObj map[string]*unstructured.Unstructured @@ -105,7 +105,7 @@ func NewController(cfg *ControllerConfig) (*Controller, error) { // AddEventHandler adds handlers which will be invoked. // When cluster is added or updated, addUpdateHandler will be invoked. // When cluster is deleted, deleteHandler will be invoked. -func (c *Controller) AddEventHandler(addUpdateHandler func(string) error, deleteHandler func(string)) { +func (c *Controller) AddEventHandler(addUpdateHandler func(string, *rest.Config) error, deleteHandler func(string)) { c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.enqueueClusterEvent, UpdateFunc: func(old, new interface{}) { @@ -118,7 +118,7 @@ func (c *Controller) AddEventHandler(addUpdateHandler func(string) error, delete c.deleteHandler = deleteHandler } -func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error { +func (c *Controller) Run(stopCh <-chan struct{}) error { defer runtime.HandleCrash() defer c.workqueue.ShutDown() @@ -136,7 +136,7 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error { c.mutex.Unlock() // Start workers to process cluster events - for i := 0; i < threadiness; i++ { + for i := 0; i < 2; i++ { go wait.Until(c.runWorker, time.Second, stopCh) } @@ -244,7 +244,7 @@ func (c *Controller) eventHandler(key string) error { c.clusterNameToNamespacedKey[clusterName] = key c.mutex.Unlock() - err = c.addUpdateHandler(clusterName) + err = c.addUpdateHandler(clusterName, c.clusterProvider.GetClusterConfig(obj)) if err != nil { metrics.NewClusterEventCountMetrics(key, "add-update", "false").Inc() c.log.Error(err, "failed to add or update cluster", "key", key) @@ -254,20 +254,3 @@ func (c *Controller) eventHandler(key string) error { metrics.NewClusterEventCountMetrics(key, "add-update", "true").Inc() return nil } - -// RestConfigForCluster returns the rest config for the mangered cluster. -func (c *Controller) RestConfigForCluster(clusterName string) *rest.Config { - c.mutex.RLock() - defer c.mutex.RUnlock() - - namespacedKey, ok := c.clusterNameToNamespacedKey[clusterName] - if !ok { - return nil - } - - obj, ok := c.namespacedKeyToObj[namespacedKey] - if !ok { - return nil - } - return c.clusterProvider.GetClusterConfig(obj) -} diff --git a/multicluster/manager.go b/multicluster/manager.go index 176f003..a86abbb 100644 --- a/multicluster/manager.go +++ b/multicluster/manager.go @@ -39,7 +39,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/cluster" "kusionstack.io/kube-utils/multicluster/clusterinfo" - "kusionstack.io/kube-utils/multicluster/controller" ) const ( @@ -59,10 +58,16 @@ func setOptionsDefaults(opts Options) Options { return opts } +type ClusterController interface { + Run(stopCh <-chan struct{}) error // Run the controller + AddEventHandler(addUpdateHandler func(string, *rest.Config) error, deleteHandler func(string)) // Add event handler for cluster events + WaitForSynced(ctx context.Context) bool // Wait for all clusters to be synced +} + type ManagerConfig struct { FedConfig *rest.Config ClusterScheme *runtime.Scheme - controller.ClusterProvider + ClusterController ResyncPeriod time.Duration ClusterFilter func(string) bool // select cluster @@ -73,9 +78,10 @@ type Manager struct { newCache cache.NewCacheFunc // Function to create cache for cluster clusterScheme *runtime.Scheme // Scheme which is used to create cache for cluster + controller ClusterController // Controller for cluster management + clusterCacheManager ClusterCacheManager clusterClientManager ClusterClientManager - controller *controller.Controller // Controller for cluster management resyncPeriod time.Duration hasCluster map[string]struct{} // Whether cluster has been added @@ -86,6 +92,10 @@ type Manager struct { } func NewManager(cfg *ManagerConfig, opts Options) (manager *Manager, newCacheFunc cache.NewCacheFunc, newClientFunc cluster.NewClientFunc, err error) { + if cfg.ClusterController == nil { + return nil, nil, nil, errors.New("ClusterController is required") + } + var log logr.Logger if cfg.Log != nil { log = cfg.Log @@ -97,16 +107,6 @@ func NewManager(cfg *ManagerConfig, opts Options) (manager *Manager, newCacheFun cfg.ClusterScheme = scheme.Scheme } - controller, err := controller.NewController(&controller.ControllerConfig{ - Config: cfg.FedConfig, - ClusterProvider: cfg.ClusterProvider, - ResyncPeriod: cfg.ResyncPeriod, - Log: log, - }) - if err != nil { - return nil, nil, nil, err - } - opts = setOptionsDefaults(opts) newCacheFunc, clusterCacheManager := MultiClusterCacheBuilder(log, &opts) newClientFunc, clusterClientManager := MultiClusterClientBuilder(log) @@ -122,7 +122,7 @@ func NewManager(cfg *ManagerConfig, opts Options) (manager *Manager, newCacheFun clusterCacheManager: clusterCacheManager, clusterClientManager: clusterClientManager, - controller: controller, + controller: cfg.ClusterController, resyncPeriod: cfg.ResyncPeriod, hasCluster: make(map[string]struct{}), @@ -133,7 +133,7 @@ func NewManager(cfg *ManagerConfig, opts Options) (manager *Manager, newCacheFun return manager, newCacheFunc, newClientFunc, nil } -func (m *Manager) Run(threadiness int, ctx context.Context) error { +func (m *Manager) Run(ctx context.Context) error { stopCh := make(chan struct{}) go func() { <-ctx.Done() @@ -141,7 +141,7 @@ func (m *Manager) Run(threadiness int, ctx context.Context) error { }() m.controller.AddEventHandler(m.addUpdateHandler, m.deleteHandler) - return m.controller.Run(threadiness, stopCh) + return m.controller.Run(stopCh) } func (m *Manager) WaitForSynced(ctx context.Context) bool { @@ -149,7 +149,7 @@ func (m *Manager) WaitForSynced(ctx context.Context) bool { return m.controller.WaitForSynced(ctx) } -func (m *Manager) addUpdateHandler(cluster string) (err error) { +func (m *Manager) addUpdateHandler(cluster string, cfg *rest.Config) (err error) { if m.clusterFilter != nil && !m.clusterFilter(cluster) { return nil } @@ -162,12 +162,6 @@ func (m *Manager) addUpdateHandler(cluster string) (err error) { } m.mutex.RUnlock() - // Get rest.Config for the cluster - cfg := m.controller.RestConfigForCluster(cluster) - if cfg == nil { - return fmt.Errorf("failed to get rest.Config for cluster %s", cluster) - } - // Get MemCacheClient for the cluster clientset, err := kubernetes.NewForConfig(cfg) if err != nil { diff --git a/multicluster/suite_test.go b/multicluster/suite_test.go index 01f2fd3..7643ff1 100644 --- a/multicluster/suite_test.go +++ b/multicluster/suite_test.go @@ -30,6 +30,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/rest" + "k8s.io/klog/v2/klogr" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/apiutil" @@ -125,10 +126,9 @@ var _ = BeforeSuite(func() { newClientFunc cluster.NewClientFunc ) os.Setenv(clusterinfo.EnvClusterAllowList, "cluster1,cluster2") - manager, newCacheFunc, newClientFunc, err = NewManager(&ManagerConfig{ - FedConfig: fedConfig, - ClusterScheme: clusterScheme, - ResyncPeriod: 10 * time.Minute, + + clusterController, err := controller.NewController(&controller.ControllerConfig{ + Config: fedConfig, ClusterProvider: &controller.TestClusterProvider{ GroupVersionResource: schema.GroupVersionResource{ // Use deployment as cluster management resource @@ -142,6 +142,15 @@ var _ = BeforeSuite(func() { clusterinfo.Fed: fedConfig, }, }, + Log: klogr.New(), + }) + Expect(err).NotTo(HaveOccurred()) + + manager, newCacheFunc, newClientFunc, err = NewManager(&ManagerConfig{ + FedConfig: fedConfig, + ClusterScheme: clusterScheme, + ResyncPeriod: 10 * time.Minute, + ClusterController: clusterController, }, Options{}) Expect(err).NotTo(HaveOccurred()) Expect(manager).NotTo(BeNil()) @@ -167,7 +176,7 @@ var _ = BeforeSuite(func() { Expect(err).NotTo(HaveOccurred()) Expect(clusterClient).NotTo(BeNil()) - go manager.Run(2, ctx) + go manager.Run(ctx) }) var _ = AfterSuite(func() { From 05d94c2db0afd1e228062e18bcab1c1f6416f31a Mon Sep 17 00:00:00 2001 From: shaofan-hs Date: Thu, 14 Mar 2024 17:39:50 +0800 Subject: [PATCH 2/4] add a static cluster controller --- multicluster/controller/controller.go | 2 + multicluster/controller/static_controller.go | 57 +++++++++++++++++++ .../controller/test_cluster_provider.go | 1 + 3 files changed, 60 insertions(+) create mode 100644 multicluster/controller/static_controller.go diff --git a/multicluster/controller/controller.go b/multicluster/controller/controller.go index 5430c22..51fc105 100644 --- a/multicluster/controller/controller.go +++ b/multicluster/controller/controller.go @@ -38,6 +38,7 @@ import ( "kusionstack.io/kube-utils/multicluster/metrics" ) +// ClusterProvider is used to provide cluster management resource and cluster kubeconfig type ClusterProvider interface { Init(config *rest.Config) // Init is used to initialize the cluster provider, config is the kubeconfig for the fed cluster GetClusterMangementGVR() schema.GroupVersionResource // The GVR will be used to watch cluster management resource @@ -45,6 +46,7 @@ type ClusterProvider interface { GetClusterConfig(obj *unstructured.Unstructured) *rest.Config // Get kubeconfig from cluster management resource } +// Controller is used to manage clusters type Controller struct { config *rest.Config clusterProvider ClusterProvider diff --git a/multicluster/controller/static_controller.go b/multicluster/controller/static_controller.go new file mode 100644 index 0000000..b8f70f8 --- /dev/null +++ b/multicluster/controller/static_controller.go @@ -0,0 +1,57 @@ +/** + * Copyright 2024 KusionStack Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package controller + +import ( + "context" + + "k8s.io/client-go/rest" +) + +// StaticController is a controller that manages a static set of clusters. +type StaticController struct { + clusterToConfig map[string]*rest.Config + addUpdateHandler func(string, *rest.Config) error +} + +func NewStaticController(clusterToConfig map[string]*rest.Config) *StaticController { + return &StaticController{ + clusterToConfig: clusterToConfig, + } +} + +func (c *StaticController) Run(stopCh <-chan struct{}) error { + if c.addUpdateHandler == nil { + return nil + } + + for cluster, config := range c.clusterToConfig { + if err := c.addUpdateHandler(cluster, config); err != nil { + return err + } + } + + return nil +} + +func (c *StaticController) AddEventHandler(addUpdateHandler func(string, *rest.Config) error, deleteHandler func(string)) { + c.addUpdateHandler = addUpdateHandler +} + +func (c *StaticController) WaitForSynced(ctx context.Context) bool { + return true +} diff --git a/multicluster/controller/test_cluster_provider.go b/multicluster/controller/test_cluster_provider.go index 9275d73..83ed29b 100644 --- a/multicluster/controller/test_cluster_provider.go +++ b/multicluster/controller/test_cluster_provider.go @@ -24,6 +24,7 @@ import ( var _ ClusterProvider = &TestClusterProvider{} +// TestClusterProvider is a test implementation of ClusterProvider type TestClusterProvider struct { schema.GroupVersionResource ClusterNameToConfig map[string]*rest.Config // Map from cluster name to kubeconfig From d246db4454a1a3376953426c0dba966378e8d8cb Mon Sep 17 00:00:00 2001 From: shaofan-hs Date: Thu, 21 Mar 2024 15:24:13 +0800 Subject: [PATCH 3/4] refactor, cluster provider --- .../dynamic_cluster_provider.go} | 66 +++++++++---------- .../karbour_cluster_manager.go} | 14 ++-- .../static_cluster_provider.go} | 16 ++--- .../test_cluster_manager.go} | 16 ++--- multicluster/manager.go | 22 +++---- multicluster/suite_test.go | 14 ++-- 6 files changed, 74 insertions(+), 74 deletions(-) rename multicluster/{controller/controller.go => clusterprovider/dynamic_cluster_provider.go} (75%) rename multicluster/{controller/karbour_cluster_provider.go => clusterprovider/karbour_cluster_manager.go} (74%) rename multicluster/{controller/static_controller.go => clusterprovider/static_cluster_provider.go} (66%) rename multicluster/{controller/test_cluster_provider.go => clusterprovider/test_cluster_manager.go} (70%) diff --git a/multicluster/controller/controller.go b/multicluster/clusterprovider/dynamic_cluster_provider.go similarity index 75% rename from multicluster/controller/controller.go rename to multicluster/clusterprovider/dynamic_cluster_provider.go index 51fc105..ebc3fa4 100644 --- a/multicluster/controller/controller.go +++ b/multicluster/clusterprovider/dynamic_cluster_provider.go @@ -14,7 +14,7 @@ * limitations under the License. */ -package controller +package clusterprovider import ( "context" @@ -38,18 +38,18 @@ import ( "kusionstack.io/kube-utils/multicluster/metrics" ) -// ClusterProvider is used to provide cluster management resource and cluster kubeconfig -type ClusterProvider interface { +// ClusterManager is used to provide cluster management resource and cluster kubeconfig +type ClusterManager interface { Init(config *rest.Config) // Init is used to initialize the cluster provider, config is the kubeconfig for the fed cluster GetClusterMangementGVR() schema.GroupVersionResource // The GVR will be used to watch cluster management resource GetClusterName(obj *unstructured.Unstructured) string // Get cluster name from cluster management resource, cluster name is used to identify the cluster GetClusterConfig(obj *unstructured.Unstructured) *rest.Config // Get kubeconfig from cluster management resource } -// Controller is used to manage clusters -type Controller struct { - config *rest.Config - clusterProvider ClusterProvider +// DynamicClusterProvider is used to manage clusters +type DynamicClusterProvider struct { + config *rest.Config + ClusterManager ClusterManager client dynamic.Interface // Client to get cluster info informerFactory dynamicinformer.DynamicSharedInformerFactory @@ -68,34 +68,34 @@ type Controller struct { log logr.Logger } -type ControllerConfig struct { - Config *rest.Config // Kubeconfig for the fed cluster - ClusterProvider ClusterProvider - ResyncPeriod time.Duration // Resync period for cluster management - Log logr.Logger +type DynamicClusterProviderConfig struct { + Config *rest.Config // Kubeconfig for the fed cluster + ClusterManager ClusterManager + ResyncPeriod time.Duration // Resync period for cluster management + Log logr.Logger } -// NewController creates a new Controller which will process events about cluster. -func NewController(cfg *ControllerConfig) (*Controller, error) { +// NewDynamicClusterProvider creates a new DynamicClusterProvider which will process events about cluster. +func NewDynamicClusterProvider(cfg *DynamicClusterProviderConfig) (*DynamicClusterProvider, error) { client, err := dynamic.NewForConfig(cfg.Config) if err != nil { return nil, err } - if cfg.ClusterProvider == nil { - return nil, fmt.Errorf("ClusterProvider is required") + if cfg.ClusterManager == nil { + return nil, fmt.Errorf("ClusterManager is required") } informerFactory := dynamicinformer.NewDynamicSharedInformerFactory(client, cfg.ResyncPeriod) - informer := informerFactory.ForResource(cfg.ClusterProvider.GetClusterMangementGVR()).Informer() + informer := informerFactory.ForResource(cfg.ClusterManager.GetClusterMangementGVR()).Informer() - return &Controller{ - config: cfg.Config, - clusterProvider: cfg.ClusterProvider, + return &DynamicClusterProvider{ + config: cfg.Config, + ClusterManager: cfg.ClusterManager, client: client, informerFactory: informerFactory, informer: informer, - workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), cfg.ClusterProvider.GetClusterMangementGVR().Resource), + workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), cfg.ClusterManager.GetClusterMangementGVR().Resource), syncedCh: make(chan struct{}), clusterNameToNamespacedKey: make(map[string]string), // Get namespaced key by cluster name @@ -107,7 +107,7 @@ func NewController(cfg *ControllerConfig) (*Controller, error) { // AddEventHandler adds handlers which will be invoked. // When cluster is added or updated, addUpdateHandler will be invoked. // When cluster is deleted, deleteHandler will be invoked. -func (c *Controller) AddEventHandler(addUpdateHandler func(string, *rest.Config) error, deleteHandler func(string)) { +func (c *DynamicClusterProvider) AddEventHandler(addUpdateHandler func(string, *rest.Config) error, deleteHandler func(string)) { c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.enqueueClusterEvent, UpdateFunc: func(old, new interface{}) { @@ -120,11 +120,11 @@ func (c *Controller) AddEventHandler(addUpdateHandler func(string, *rest.Config) c.deleteHandler = deleteHandler } -func (c *Controller) Run(stopCh <-chan struct{}) error { +func (c *DynamicClusterProvider) Run(stopCh <-chan struct{}) error { defer runtime.HandleCrash() defer c.workqueue.ShutDown() - c.clusterProvider.Init(c.config) + c.ClusterManager.Init(c.config) c.informerFactory.Start(stopCh) @@ -146,7 +146,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) error { return nil } -func (c *Controller) WaitForSynced(ctx context.Context) bool { +func (c *DynamicClusterProvider) WaitForSynced(ctx context.Context) bool { select { case <-c.syncedCh: // Wait for all cluster has been processed return true @@ -155,7 +155,7 @@ func (c *Controller) WaitForSynced(ctx context.Context) bool { } } -func (c *Controller) enqueueClusterEvent(obj interface{}) { +func (c *DynamicClusterProvider) enqueueClusterEvent(obj interface{}) { key, err := cache.MetaNamespaceKeyFunc(obj) if err != nil { c.log.Error(err, "failed to get enqueue key") @@ -164,7 +164,7 @@ func (c *Controller) enqueueClusterEvent(obj interface{}) { c.workqueue.Add(key) } -func (c *Controller) runWorker() { +func (c *DynamicClusterProvider) runWorker() { for c.processNextWorkItem() { c.mutex.Lock() if c.syncedNum > 0 { @@ -177,7 +177,7 @@ func (c *Controller) runWorker() { } } -func (c *Controller) processNextWorkItem() bool { +func (c *DynamicClusterProvider) processNextWorkItem() bool { obj, shutdown := c.workqueue.Get() if shutdown { return false @@ -209,14 +209,14 @@ func (c *Controller) processNextWorkItem() bool { } // eventHandler is called when an event about cluster is received. -func (c *Controller) eventHandler(key string) error { +func (c *DynamicClusterProvider) eventHandler(key string) error { namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { c.log.Error(err, "failed to split namespaced key", "key", key) return nil } - obj, err := c.client.Resource(c.clusterProvider.GetClusterMangementGVR()).Namespace(namespace).Get(context.Background(), name, metav1.GetOptions{}) + obj, err := c.client.Resource(c.ClusterManager.GetClusterMangementGVR()).Namespace(namespace).Get(context.Background(), name, metav1.GetOptions{}) if err != nil { if errors.IsNotFound(err) { c.mutex.Lock() @@ -228,7 +228,7 @@ func (c *Controller) eventHandler(key string) error { } delete(c.namespacedKeyToObj, key) - clusterName := c.clusterProvider.GetClusterName(oldObj) + clusterName := c.ClusterManager.GetClusterName(oldObj) delete(c.clusterNameToNamespacedKey, clusterName) metrics.NewClusterEventCountMetrics(key, "delete", "true").Inc() @@ -242,11 +242,11 @@ func (c *Controller) eventHandler(key string) error { c.mutex.Lock() c.namespacedKeyToObj[key] = obj - clusterName := c.clusterProvider.GetClusterName(obj) + clusterName := c.ClusterManager.GetClusterName(obj) c.clusterNameToNamespacedKey[clusterName] = key c.mutex.Unlock() - err = c.addUpdateHandler(clusterName, c.clusterProvider.GetClusterConfig(obj)) + err = c.addUpdateHandler(clusterName, c.ClusterManager.GetClusterConfig(obj)) if err != nil { metrics.NewClusterEventCountMetrics(key, "add-update", "false").Inc() c.log.Error(err, "failed to add or update cluster", "key", key) diff --git a/multicluster/controller/karbour_cluster_provider.go b/multicluster/clusterprovider/karbour_cluster_manager.go similarity index 74% rename from multicluster/controller/karbour_cluster_provider.go rename to multicluster/clusterprovider/karbour_cluster_manager.go index c8e1b22..0b00ecb 100644 --- a/multicluster/controller/karbour_cluster_provider.go +++ b/multicluster/clusterprovider/karbour_cluster_manager.go @@ -14,7 +14,7 @@ * limitations under the License. */ -package controller +package clusterprovider import ( "fmt" @@ -26,28 +26,28 @@ import ( clusterv1beta1 "kusionstack.io/kube-api/cluster/v1beta1" ) -var _ ClusterProvider = &KarbourClusterProvider{} +var _ ClusterManager = &KarbourClusterManager{} -type KarbourClusterProvider struct { +type KarbourClusterManager struct { config *rest.Config } -func (p *KarbourClusterProvider) Init(config *rest.Config) { +func (p *KarbourClusterManager) Init(config *rest.Config) { p.config = config } -func (p *KarbourClusterProvider) GetClusterMangementGVR() schema.GroupVersionResource { +func (p *KarbourClusterManager) GetClusterMangementGVR() schema.GroupVersionResource { return clusterv1beta1.SchemeGroupVersion.WithResource("clusters") } -func (p *KarbourClusterProvider) GetClusterName(obj *unstructured.Unstructured) string { +func (p *KarbourClusterManager) GetClusterName(obj *unstructured.Unstructured) string { if obj == nil { return "" } return obj.GetName() } -func (p *KarbourClusterProvider) GetClusterConfig(obj *unstructured.Unstructured) *rest.Config { +func (p *KarbourClusterManager) GetClusterConfig(obj *unstructured.Unstructured) *rest.Config { clusterName := p.GetClusterName(obj) if clusterName == "" || p.config == nil { return nil diff --git a/multicluster/controller/static_controller.go b/multicluster/clusterprovider/static_cluster_provider.go similarity index 66% rename from multicluster/controller/static_controller.go rename to multicluster/clusterprovider/static_cluster_provider.go index b8f70f8..327cd40 100644 --- a/multicluster/controller/static_controller.go +++ b/multicluster/clusterprovider/static_cluster_provider.go @@ -14,7 +14,7 @@ * limitations under the License. */ -package controller +package clusterprovider import ( "context" @@ -22,19 +22,19 @@ import ( "k8s.io/client-go/rest" ) -// StaticController is a controller that manages a static set of clusters. -type StaticController struct { +// StaticClusterProvider is a controller that manages a static set of clusters. +type StaticClusterProvider struct { clusterToConfig map[string]*rest.Config addUpdateHandler func(string, *rest.Config) error } -func NewStaticController(clusterToConfig map[string]*rest.Config) *StaticController { - return &StaticController{ +func NewStaticClusterProvider(clusterToConfig map[string]*rest.Config) *StaticClusterProvider { + return &StaticClusterProvider{ clusterToConfig: clusterToConfig, } } -func (c *StaticController) Run(stopCh <-chan struct{}) error { +func (c *StaticClusterProvider) Run(stopCh <-chan struct{}) error { if c.addUpdateHandler == nil { return nil } @@ -48,10 +48,10 @@ func (c *StaticController) Run(stopCh <-chan struct{}) error { return nil } -func (c *StaticController) AddEventHandler(addUpdateHandler func(string, *rest.Config) error, deleteHandler func(string)) { +func (c *StaticClusterProvider) AddEventHandler(addUpdateHandler func(string, *rest.Config) error, deleteHandler func(string)) { c.addUpdateHandler = addUpdateHandler } -func (c *StaticController) WaitForSynced(ctx context.Context) bool { +func (c *StaticClusterProvider) WaitForSynced(ctx context.Context) bool { return true } diff --git a/multicluster/controller/test_cluster_provider.go b/multicluster/clusterprovider/test_cluster_manager.go similarity index 70% rename from multicluster/controller/test_cluster_provider.go rename to multicluster/clusterprovider/test_cluster_manager.go index 83ed29b..2f3a93c 100644 --- a/multicluster/controller/test_cluster_provider.go +++ b/multicluster/clusterprovider/test_cluster_manager.go @@ -14,7 +14,7 @@ * limitations under the License. */ -package controller +package clusterprovider import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -22,30 +22,30 @@ import ( "k8s.io/client-go/rest" ) -var _ ClusterProvider = &TestClusterProvider{} +var _ ClusterManager = &TestClusterManager{} -// TestClusterProvider is a test implementation of ClusterProvider -type TestClusterProvider struct { +// TestClusterManager is a test implementation of ClusterProviderInfo +type TestClusterManager struct { schema.GroupVersionResource ClusterNameToConfig map[string]*rest.Config // Map from cluster name to kubeconfig } -func (p *TestClusterProvider) Init(config *rest.Config) { +func (p *TestClusterManager) Init(config *rest.Config) { // Do nothing } -func (p *TestClusterProvider) GetClusterMangementGVR() schema.GroupVersionResource { +func (p *TestClusterManager) GetClusterMangementGVR() schema.GroupVersionResource { return p.GroupVersionResource } -func (p *TestClusterProvider) GetClusterName(obj *unstructured.Unstructured) string { +func (p *TestClusterManager) GetClusterName(obj *unstructured.Unstructured) string { if obj == nil { return "" } return obj.GetName() // Use resource name as cluster name } -func (p *TestClusterProvider) GetClusterConfig(obj *unstructured.Unstructured) *rest.Config { +func (p *TestClusterManager) GetClusterConfig(obj *unstructured.Unstructured) *rest.Config { if obj == nil || p.ClusterNameToConfig == nil { return nil } diff --git a/multicluster/manager.go b/multicluster/manager.go index a86abbb..2a05e1c 100644 --- a/multicluster/manager.go +++ b/multicluster/manager.go @@ -58,8 +58,8 @@ func setOptionsDefaults(opts Options) Options { return opts } -type ClusterController interface { - Run(stopCh <-chan struct{}) error // Run the controller +type ClusterProvider interface { + Run(stopCh <-chan struct{}) error // Run the ClusterProvider AddEventHandler(addUpdateHandler func(string, *rest.Config) error, deleteHandler func(string)) // Add event handler for cluster events WaitForSynced(ctx context.Context) bool // Wait for all clusters to be synced } @@ -67,7 +67,7 @@ type ClusterController interface { type ManagerConfig struct { FedConfig *rest.Config ClusterScheme *runtime.Scheme - ClusterController + ClusterProvider ResyncPeriod time.Duration ClusterFilter func(string) bool // select cluster @@ -78,7 +78,7 @@ type Manager struct { newCache cache.NewCacheFunc // Function to create cache for cluster clusterScheme *runtime.Scheme // Scheme which is used to create cache for cluster - controller ClusterController // Controller for cluster management + clusterProvider ClusterProvider clusterCacheManager ClusterCacheManager clusterClientManager ClusterClientManager @@ -92,8 +92,8 @@ type Manager struct { } func NewManager(cfg *ManagerConfig, opts Options) (manager *Manager, newCacheFunc cache.NewCacheFunc, newClientFunc cluster.NewClientFunc, err error) { - if cfg.ClusterController == nil { - return nil, nil, nil, errors.New("ClusterController is required") + if cfg.ClusterProvider == nil { + return nil, nil, nil, errors.New("ClusterProvider is required") } var log logr.Logger @@ -122,7 +122,7 @@ func NewManager(cfg *ManagerConfig, opts Options) (manager *Manager, newCacheFun clusterCacheManager: clusterCacheManager, clusterClientManager: clusterClientManager, - controller: cfg.ClusterController, + clusterProvider: cfg.ClusterProvider, resyncPeriod: cfg.ResyncPeriod, hasCluster: make(map[string]struct{}), @@ -140,13 +140,13 @@ func (m *Manager) Run(ctx context.Context) error { close(stopCh) }() - m.controller.AddEventHandler(m.addUpdateHandler, m.deleteHandler) - return m.controller.Run(stopCh) + m.clusterProvider.AddEventHandler(m.addUpdateHandler, m.deleteHandler) + return m.clusterProvider.Run(stopCh) } func (m *Manager) WaitForSynced(ctx context.Context) bool { - m.log.Info("wait for controller synced") - return m.controller.WaitForSynced(ctx) + m.log.Info("wait for ClusterProvider synced") + return m.clusterProvider.WaitForSynced(ctx) } func (m *Manager) addUpdateHandler(cluster string, cfg *rest.Config) (err error) { diff --git a/multicluster/suite_test.go b/multicluster/suite_test.go index 7643ff1..8aa2625 100644 --- a/multicluster/suite_test.go +++ b/multicluster/suite_test.go @@ -40,7 +40,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log/zap" "kusionstack.io/kube-utils/multicluster/clusterinfo" - "kusionstack.io/kube-utils/multicluster/controller" + "kusionstack.io/kube-utils/multicluster/clusterprovider" ) var ( @@ -127,10 +127,10 @@ var _ = BeforeSuite(func() { ) os.Setenv(clusterinfo.EnvClusterAllowList, "cluster1,cluster2") - clusterController, err := controller.NewController(&controller.ControllerConfig{ + clusterProvider, err := clusterprovider.NewDynamicClusterProvider(&clusterprovider.DynamicClusterProviderConfig{ Config: fedConfig, - ClusterProvider: &controller.TestClusterProvider{ + ClusterManager: &clusterprovider.TestClusterManager{ GroupVersionResource: schema.GroupVersionResource{ // Use deployment as cluster management resource Group: "apps", Version: "v1", @@ -147,10 +147,10 @@ var _ = BeforeSuite(func() { Expect(err).NotTo(HaveOccurred()) manager, newCacheFunc, newClientFunc, err = NewManager(&ManagerConfig{ - FedConfig: fedConfig, - ClusterScheme: clusterScheme, - ResyncPeriod: 10 * time.Minute, - ClusterController: clusterController, + FedConfig: fedConfig, + ClusterScheme: clusterScheme, + ResyncPeriod: 10 * time.Minute, + ClusterProvider: clusterProvider, }, Options{}) Expect(err).NotTo(HaveOccurred()) Expect(manager).NotTo(BeNil()) From 00dcda60fe426574082c995c2c530d458569b61e Mon Sep 17 00:00:00 2001 From: shaofan-hs Date: Thu, 21 Mar 2024 18:24:07 +0800 Subject: [PATCH 4/4] fix --- .../karbour.go} | 17 +++-- .../simple.go} | 23 +++---- ...amic_cluster_provider.go => controller.go} | 66 +++++++++---------- .../{static_cluster_provider.go => simple.go} | 14 ++-- multicluster/manager.go | 6 +- multicluster/suite_test.go | 7 +- 6 files changed, 65 insertions(+), 68 deletions(-) rename multicluster/clusterprovider/{karbour_cluster_manager.go => config/karbour.go} (72%) rename multicluster/clusterprovider/{test_cluster_manager.go => config/simple.go} (62%) rename multicluster/clusterprovider/{dynamic_cluster_provider.go => controller.go} (75%) rename multicluster/clusterprovider/{static_cluster_provider.go => simple.go} (73%) diff --git a/multicluster/clusterprovider/karbour_cluster_manager.go b/multicluster/clusterprovider/config/karbour.go similarity index 72% rename from multicluster/clusterprovider/karbour_cluster_manager.go rename to multicluster/clusterprovider/config/karbour.go index 0b00ecb..75924ee 100644 --- a/multicluster/clusterprovider/karbour_cluster_manager.go +++ b/multicluster/clusterprovider/config/karbour.go @@ -14,7 +14,7 @@ * limitations under the License. */ -package clusterprovider +package config import ( "fmt" @@ -26,34 +26,33 @@ import ( clusterv1beta1 "kusionstack.io/kube-api/cluster/v1beta1" ) -var _ ClusterManager = &KarbourClusterManager{} - -type KarbourClusterManager struct { +// Karbour is a implementation of ClusterConfigProvider +type Karbour struct { config *rest.Config } -func (p *KarbourClusterManager) Init(config *rest.Config) { +func (p *Karbour) Init(config *rest.Config) { p.config = config } -func (p *KarbourClusterManager) GetClusterMangementGVR() schema.GroupVersionResource { +func (p *Karbour) GetGVR() schema.GroupVersionResource { return clusterv1beta1.SchemeGroupVersion.WithResource("clusters") } -func (p *KarbourClusterManager) GetClusterName(obj *unstructured.Unstructured) string { +func (p *Karbour) GetClusterName(obj *unstructured.Unstructured) string { if obj == nil { return "" } return obj.GetName() } -func (p *KarbourClusterManager) GetClusterConfig(obj *unstructured.Unstructured) *rest.Config { +func (p *Karbour) GetClusterConfig(obj *unstructured.Unstructured) *rest.Config { clusterName := p.GetClusterName(obj) if clusterName == "" || p.config == nil { return nil } - gvr := p.GetClusterMangementGVR() + gvr := p.GetGVR() clusterConfig := *p.config clusterConfig.Host = fmt.Sprintf("%s/apis/%s/%s/%s/%s/proxy", clusterConfig.Host, gvr.Group, gvr.Version, gvr.Resource, clusterName) diff --git a/multicluster/clusterprovider/test_cluster_manager.go b/multicluster/clusterprovider/config/simple.go similarity index 62% rename from multicluster/clusterprovider/test_cluster_manager.go rename to multicluster/clusterprovider/config/simple.go index 2f3a93c..fa00cf7 100644 --- a/multicluster/clusterprovider/test_cluster_manager.go +++ b/multicluster/clusterprovider/config/simple.go @@ -14,7 +14,7 @@ * limitations under the License. */ -package clusterprovider +package config import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -22,30 +22,27 @@ import ( "k8s.io/client-go/rest" ) -var _ ClusterManager = &TestClusterManager{} - -// TestClusterManager is a test implementation of ClusterProviderInfo -type TestClusterManager struct { - schema.GroupVersionResource +// Simple is a implementation of ClusterConfigProvider +type Simple struct { + GVR schema.GroupVersionResource ClusterNameToConfig map[string]*rest.Config // Map from cluster name to kubeconfig } -func (p *TestClusterManager) Init(config *rest.Config) { - // Do nothing +func (p *Simple) Init(config *rest.Config) { } -func (p *TestClusterManager) GetClusterMangementGVR() schema.GroupVersionResource { - return p.GroupVersionResource +func (p *Simple) GetGVR() schema.GroupVersionResource { + return p.GVR } -func (p *TestClusterManager) GetClusterName(obj *unstructured.Unstructured) string { +func (p *Simple) GetClusterName(obj *unstructured.Unstructured) string { if obj == nil { return "" } - return obj.GetName() // Use resource name as cluster name + return obj.GetName() } -func (p *TestClusterManager) GetClusterConfig(obj *unstructured.Unstructured) *rest.Config { +func (p *Simple) GetClusterConfig(obj *unstructured.Unstructured) *rest.Config { if obj == nil || p.ClusterNameToConfig == nil { return nil } diff --git a/multicluster/clusterprovider/dynamic_cluster_provider.go b/multicluster/clusterprovider/controller.go similarity index 75% rename from multicluster/clusterprovider/dynamic_cluster_provider.go rename to multicluster/clusterprovider/controller.go index ebc3fa4..b82a288 100644 --- a/multicluster/clusterprovider/dynamic_cluster_provider.go +++ b/multicluster/clusterprovider/controller.go @@ -38,18 +38,18 @@ import ( "kusionstack.io/kube-utils/multicluster/metrics" ) -// ClusterManager is used to provide cluster management resource and cluster kubeconfig -type ClusterManager interface { +// ClusterConfigProvider is used to provide cluster management resource and cluster kubeconfig +type ClusterConfigProvider interface { Init(config *rest.Config) // Init is used to initialize the cluster provider, config is the kubeconfig for the fed cluster - GetClusterMangementGVR() schema.GroupVersionResource // The GVR will be used to watch cluster management resource + GetGVR() schema.GroupVersionResource // The GVR will be used to watch cluster management resource GetClusterName(obj *unstructured.Unstructured) string // Get cluster name from cluster management resource, cluster name is used to identify the cluster GetClusterConfig(obj *unstructured.Unstructured) *rest.Config // Get kubeconfig from cluster management resource } -// DynamicClusterProvider is used to manage clusters -type DynamicClusterProvider struct { - config *rest.Config - ClusterManager ClusterManager +// Controller is used to manage clusters +type Controller struct { + config *rest.Config + ClusterConfigProvider ClusterConfigProvider client dynamic.Interface // Client to get cluster info informerFactory dynamicinformer.DynamicSharedInformerFactory @@ -68,34 +68,34 @@ type DynamicClusterProvider struct { log logr.Logger } -type DynamicClusterProviderConfig struct { - Config *rest.Config // Kubeconfig for the fed cluster - ClusterManager ClusterManager - ResyncPeriod time.Duration // Resync period for cluster management - Log logr.Logger +type ControllerConfig struct { + Config *rest.Config // Kubeconfig for the fed cluster + ClusterConfigProvider ClusterConfigProvider + ResyncPeriod time.Duration // Resync period for cluster management + Log logr.Logger } -// NewDynamicClusterProvider creates a new DynamicClusterProvider which will process events about cluster. -func NewDynamicClusterProvider(cfg *DynamicClusterProviderConfig) (*DynamicClusterProvider, error) { +// NewController creates a new Controller which will process events about cluster. +func NewController(cfg *ControllerConfig) (*Controller, error) { client, err := dynamic.NewForConfig(cfg.Config) if err != nil { return nil, err } - if cfg.ClusterManager == nil { - return nil, fmt.Errorf("ClusterManager is required") + if cfg.ClusterConfigProvider == nil { + return nil, fmt.Errorf("ClusterConfigProvider is required") } informerFactory := dynamicinformer.NewDynamicSharedInformerFactory(client, cfg.ResyncPeriod) - informer := informerFactory.ForResource(cfg.ClusterManager.GetClusterMangementGVR()).Informer() + informer := informerFactory.ForResource(cfg.ClusterConfigProvider.GetGVR()).Informer() - return &DynamicClusterProvider{ - config: cfg.Config, - ClusterManager: cfg.ClusterManager, + return &Controller{ + config: cfg.Config, + ClusterConfigProvider: cfg.ClusterConfigProvider, client: client, informerFactory: informerFactory, informer: informer, - workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), cfg.ClusterManager.GetClusterMangementGVR().Resource), + workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), cfg.ClusterConfigProvider.GetGVR().Resource), syncedCh: make(chan struct{}), clusterNameToNamespacedKey: make(map[string]string), // Get namespaced key by cluster name @@ -107,7 +107,7 @@ func NewDynamicClusterProvider(cfg *DynamicClusterProviderConfig) (*DynamicClust // AddEventHandler adds handlers which will be invoked. // When cluster is added or updated, addUpdateHandler will be invoked. // When cluster is deleted, deleteHandler will be invoked. -func (c *DynamicClusterProvider) AddEventHandler(addUpdateHandler func(string, *rest.Config) error, deleteHandler func(string)) { +func (c *Controller) AddEventHandler(addUpdateHandler func(string, *rest.Config) error, deleteHandler func(string)) { c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.enqueueClusterEvent, UpdateFunc: func(old, new interface{}) { @@ -120,11 +120,11 @@ func (c *DynamicClusterProvider) AddEventHandler(addUpdateHandler func(string, * c.deleteHandler = deleteHandler } -func (c *DynamicClusterProvider) Run(stopCh <-chan struct{}) error { +func (c *Controller) Run(stopCh <-chan struct{}) error { defer runtime.HandleCrash() defer c.workqueue.ShutDown() - c.ClusterManager.Init(c.config) + c.ClusterConfigProvider.Init(c.config) c.informerFactory.Start(stopCh) @@ -146,7 +146,7 @@ func (c *DynamicClusterProvider) Run(stopCh <-chan struct{}) error { return nil } -func (c *DynamicClusterProvider) WaitForSynced(ctx context.Context) bool { +func (c *Controller) WaitForSynced(ctx context.Context) bool { select { case <-c.syncedCh: // Wait for all cluster has been processed return true @@ -155,7 +155,7 @@ func (c *DynamicClusterProvider) WaitForSynced(ctx context.Context) bool { } } -func (c *DynamicClusterProvider) enqueueClusterEvent(obj interface{}) { +func (c *Controller) enqueueClusterEvent(obj interface{}) { key, err := cache.MetaNamespaceKeyFunc(obj) if err != nil { c.log.Error(err, "failed to get enqueue key") @@ -164,7 +164,7 @@ func (c *DynamicClusterProvider) enqueueClusterEvent(obj interface{}) { c.workqueue.Add(key) } -func (c *DynamicClusterProvider) runWorker() { +func (c *Controller) runWorker() { for c.processNextWorkItem() { c.mutex.Lock() if c.syncedNum > 0 { @@ -177,7 +177,7 @@ func (c *DynamicClusterProvider) runWorker() { } } -func (c *DynamicClusterProvider) processNextWorkItem() bool { +func (c *Controller) processNextWorkItem() bool { obj, shutdown := c.workqueue.Get() if shutdown { return false @@ -209,14 +209,14 @@ func (c *DynamicClusterProvider) processNextWorkItem() bool { } // eventHandler is called when an event about cluster is received. -func (c *DynamicClusterProvider) eventHandler(key string) error { +func (c *Controller) eventHandler(key string) error { namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { c.log.Error(err, "failed to split namespaced key", "key", key) return nil } - obj, err := c.client.Resource(c.ClusterManager.GetClusterMangementGVR()).Namespace(namespace).Get(context.Background(), name, metav1.GetOptions{}) + obj, err := c.client.Resource(c.ClusterConfigProvider.GetGVR()).Namespace(namespace).Get(context.Background(), name, metav1.GetOptions{}) if err != nil { if errors.IsNotFound(err) { c.mutex.Lock() @@ -228,7 +228,7 @@ func (c *DynamicClusterProvider) eventHandler(key string) error { } delete(c.namespacedKeyToObj, key) - clusterName := c.ClusterManager.GetClusterName(oldObj) + clusterName := c.ClusterConfigProvider.GetClusterName(oldObj) delete(c.clusterNameToNamespacedKey, clusterName) metrics.NewClusterEventCountMetrics(key, "delete", "true").Inc() @@ -242,11 +242,11 @@ func (c *DynamicClusterProvider) eventHandler(key string) error { c.mutex.Lock() c.namespacedKeyToObj[key] = obj - clusterName := c.ClusterManager.GetClusterName(obj) + clusterName := c.ClusterConfigProvider.GetClusterName(obj) c.clusterNameToNamespacedKey[clusterName] = key c.mutex.Unlock() - err = c.addUpdateHandler(clusterName, c.ClusterManager.GetClusterConfig(obj)) + err = c.addUpdateHandler(clusterName, c.ClusterConfigProvider.GetClusterConfig(obj)) if err != nil { metrics.NewClusterEventCountMetrics(key, "add-update", "false").Inc() c.log.Error(err, "failed to add or update cluster", "key", key) diff --git a/multicluster/clusterprovider/static_cluster_provider.go b/multicluster/clusterprovider/simple.go similarity index 73% rename from multicluster/clusterprovider/static_cluster_provider.go rename to multicluster/clusterprovider/simple.go index 327cd40..10c7847 100644 --- a/multicluster/clusterprovider/static_cluster_provider.go +++ b/multicluster/clusterprovider/simple.go @@ -22,19 +22,19 @@ import ( "k8s.io/client-go/rest" ) -// StaticClusterProvider is a controller that manages a static set of clusters. -type StaticClusterProvider struct { +// SimpleClusterProvider is a controller that manages a static set of clusters. +type SimpleClusterProvider struct { clusterToConfig map[string]*rest.Config addUpdateHandler func(string, *rest.Config) error } -func NewStaticClusterProvider(clusterToConfig map[string]*rest.Config) *StaticClusterProvider { - return &StaticClusterProvider{ +func NewSimpleClusterProvider(clusterToConfig map[string]*rest.Config) *SimpleClusterProvider { + return &SimpleClusterProvider{ clusterToConfig: clusterToConfig, } } -func (c *StaticClusterProvider) Run(stopCh <-chan struct{}) error { +func (c *SimpleClusterProvider) Run(stopCh <-chan struct{}) error { if c.addUpdateHandler == nil { return nil } @@ -48,10 +48,10 @@ func (c *StaticClusterProvider) Run(stopCh <-chan struct{}) error { return nil } -func (c *StaticClusterProvider) AddEventHandler(addUpdateHandler func(string, *rest.Config) error, deleteHandler func(string)) { +func (c *SimpleClusterProvider) AddEventHandler(addUpdateHandler func(string, *rest.Config) error, deleteHandler func(string)) { c.addUpdateHandler = addUpdateHandler } -func (c *StaticClusterProvider) WaitForSynced(ctx context.Context) bool { +func (c *SimpleClusterProvider) WaitForSynced(ctx context.Context) bool { return true } diff --git a/multicluster/manager.go b/multicluster/manager.go index 2a05e1c..6630e9d 100644 --- a/multicluster/manager.go +++ b/multicluster/manager.go @@ -65,9 +65,9 @@ type ClusterProvider interface { } type ManagerConfig struct { - FedConfig *rest.Config - ClusterScheme *runtime.Scheme - ClusterProvider + FedConfig *rest.Config + ClusterScheme *runtime.Scheme + ClusterProvider ClusterProvider ResyncPeriod time.Duration ClusterFilter func(string) bool // select cluster diff --git a/multicluster/suite_test.go b/multicluster/suite_test.go index 8aa2625..7f74554 100644 --- a/multicluster/suite_test.go +++ b/multicluster/suite_test.go @@ -41,6 +41,7 @@ import ( "kusionstack.io/kube-utils/multicluster/clusterinfo" "kusionstack.io/kube-utils/multicluster/clusterprovider" + "kusionstack.io/kube-utils/multicluster/clusterprovider/config" ) var ( @@ -127,11 +128,11 @@ var _ = BeforeSuite(func() { ) os.Setenv(clusterinfo.EnvClusterAllowList, "cluster1,cluster2") - clusterProvider, err := clusterprovider.NewDynamicClusterProvider(&clusterprovider.DynamicClusterProviderConfig{ + clusterProvider, err := clusterprovider.NewController(&clusterprovider.ControllerConfig{ Config: fedConfig, - ClusterManager: &clusterprovider.TestClusterManager{ - GroupVersionResource: schema.GroupVersionResource{ // Use deployment as cluster management resource + ClusterConfigProvider: &config.Simple{ + GVR: schema.GroupVersionResource{ // Use deployment as cluster management resource Group: "apps", Version: "v1", Resource: "deployments",