From 0542ffeceae23105e99de4af0eaa8037ef7650bb Mon Sep 17 00:00:00 2001 From: shaofan-hs Date: Thu, 21 Mar 2024 15:24:13 +0800 Subject: [PATCH] refactor, cluster provider --- .../dynamic_cluster_provider.go} | 64 +++++++++---------- .../karbour_cluster_manager.go} | 14 ++-- .../static_cluster_provider.go} | 16 ++--- .../test_cluster_manager.go} | 16 ++--- multicluster/manager.go | 22 +++---- multicluster/suite_test.go | 14 ++-- 6 files changed, 73 insertions(+), 73 deletions(-) rename multicluster/{controller/controller.go => clusterprovider/dynamic_cluster_provider.go} (76%) rename multicluster/{controller/karbour_cluster_provider.go => clusterprovider/karbour_cluster_manager.go} (74%) rename multicluster/{controller/static_controller.go => clusterprovider/static_cluster_provider.go} (66%) rename multicluster/{controller/test_cluster_provider.go => clusterprovider/test_cluster_manager.go} (70%) diff --git a/multicluster/controller/controller.go b/multicluster/clusterprovider/dynamic_cluster_provider.go similarity index 76% rename from multicluster/controller/controller.go rename to multicluster/clusterprovider/dynamic_cluster_provider.go index 51fc105..696d704 100644 --- a/multicluster/controller/controller.go +++ b/multicluster/clusterprovider/dynamic_cluster_provider.go @@ -14,7 +14,7 @@ * limitations under the License. */ -package controller +package clusterprovider import ( "context" @@ -38,18 +38,18 @@ import ( "kusionstack.io/kube-utils/multicluster/metrics" ) -// ClusterProvider is used to provide cluster management resource and cluster kubeconfig -type ClusterProvider interface { +// ClusterManager is used to provide cluster management resource and cluster kubeconfig +type ClusterManager interface { Init(config *rest.Config) // Init is used to initialize the cluster provider, config is the kubeconfig for the fed cluster GetClusterMangementGVR() schema.GroupVersionResource // The GVR will be used to watch cluster management resource GetClusterName(obj *unstructured.Unstructured) string // Get cluster name from cluster management resource, cluster name is used to identify the cluster GetClusterConfig(obj *unstructured.Unstructured) *rest.Config // Get kubeconfig from cluster management resource } -// Controller is used to manage clusters -type Controller struct { - config *rest.Config - clusterProvider ClusterProvider +// DynamicClusterProvider is used to manage clusters +type DynamicClusterProvider struct { + config *rest.Config + ClusterManager ClusterManager client dynamic.Interface // Client to get cluster info informerFactory dynamicinformer.DynamicSharedInformerFactory @@ -69,33 +69,33 @@ type Controller struct { } type ControllerConfig struct { - Config *rest.Config // Kubeconfig for the fed cluster - ClusterProvider ClusterProvider - ResyncPeriod time.Duration // Resync period for cluster management - Log logr.Logger + Config *rest.Config // Kubeconfig for the fed cluster + ClusterManager ClusterManager + ResyncPeriod time.Duration // Resync period for cluster management + Log logr.Logger } -// NewController creates a new Controller which will process events about cluster. -func NewController(cfg *ControllerConfig) (*Controller, error) { +// NewDynamicClusterProvider creates a new DynamicClusterProvider which will process events about cluster. +func NewDynamicClusterProvider(cfg *ControllerConfig) (*DynamicClusterProvider, error) { client, err := dynamic.NewForConfig(cfg.Config) if err != nil { return nil, err } - if cfg.ClusterProvider == nil { - return nil, fmt.Errorf("ClusterProvider is required") + if cfg.ClusterManager == nil { + return nil, fmt.Errorf("ClusterManager is required") } informerFactory := dynamicinformer.NewDynamicSharedInformerFactory(client, cfg.ResyncPeriod) - informer := informerFactory.ForResource(cfg.ClusterProvider.GetClusterMangementGVR()).Informer() + informer := informerFactory.ForResource(cfg.ClusterManager.GetClusterMangementGVR()).Informer() - return &Controller{ - config: cfg.Config, - clusterProvider: cfg.ClusterProvider, + return &DynamicClusterProvider{ + config: cfg.Config, + ClusterManager: cfg.ClusterManager, client: client, informerFactory: informerFactory, informer: informer, - workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), cfg.ClusterProvider.GetClusterMangementGVR().Resource), + workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), cfg.ClusterManager.GetClusterMangementGVR().Resource), syncedCh: make(chan struct{}), clusterNameToNamespacedKey: make(map[string]string), // Get namespaced key by cluster name @@ -107,7 +107,7 @@ func NewController(cfg *ControllerConfig) (*Controller, error) { // AddEventHandler adds handlers which will be invoked. // When cluster is added or updated, addUpdateHandler will be invoked. // When cluster is deleted, deleteHandler will be invoked. -func (c *Controller) AddEventHandler(addUpdateHandler func(string, *rest.Config) error, deleteHandler func(string)) { +func (c *DynamicClusterProvider) AddEventHandler(addUpdateHandler func(string, *rest.Config) error, deleteHandler func(string)) { c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.enqueueClusterEvent, UpdateFunc: func(old, new interface{}) { @@ -120,11 +120,11 @@ func (c *Controller) AddEventHandler(addUpdateHandler func(string, *rest.Config) c.deleteHandler = deleteHandler } -func (c *Controller) Run(stopCh <-chan struct{}) error { +func (c *DynamicClusterProvider) Run(stopCh <-chan struct{}) error { defer runtime.HandleCrash() defer c.workqueue.ShutDown() - c.clusterProvider.Init(c.config) + c.ClusterManager.Init(c.config) c.informerFactory.Start(stopCh) @@ -146,7 +146,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) error { return nil } -func (c *Controller) WaitForSynced(ctx context.Context) bool { +func (c *DynamicClusterProvider) WaitForSynced(ctx context.Context) bool { select { case <-c.syncedCh: // Wait for all cluster has been processed return true @@ -155,7 +155,7 @@ func (c *Controller) WaitForSynced(ctx context.Context) bool { } } -func (c *Controller) enqueueClusterEvent(obj interface{}) { +func (c *DynamicClusterProvider) enqueueClusterEvent(obj interface{}) { key, err := cache.MetaNamespaceKeyFunc(obj) if err != nil { c.log.Error(err, "failed to get enqueue key") @@ -164,7 +164,7 @@ func (c *Controller) enqueueClusterEvent(obj interface{}) { c.workqueue.Add(key) } -func (c *Controller) runWorker() { +func (c *DynamicClusterProvider) runWorker() { for c.processNextWorkItem() { c.mutex.Lock() if c.syncedNum > 0 { @@ -177,7 +177,7 @@ func (c *Controller) runWorker() { } } -func (c *Controller) processNextWorkItem() bool { +func (c *DynamicClusterProvider) processNextWorkItem() bool { obj, shutdown := c.workqueue.Get() if shutdown { return false @@ -209,14 +209,14 @@ func (c *Controller) processNextWorkItem() bool { } // eventHandler is called when an event about cluster is received. -func (c *Controller) eventHandler(key string) error { +func (c *DynamicClusterProvider) eventHandler(key string) error { namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { c.log.Error(err, "failed to split namespaced key", "key", key) return nil } - obj, err := c.client.Resource(c.clusterProvider.GetClusterMangementGVR()).Namespace(namespace).Get(context.Background(), name, metav1.GetOptions{}) + obj, err := c.client.Resource(c.ClusterManager.GetClusterMangementGVR()).Namespace(namespace).Get(context.Background(), name, metav1.GetOptions{}) if err != nil { if errors.IsNotFound(err) { c.mutex.Lock() @@ -228,7 +228,7 @@ func (c *Controller) eventHandler(key string) error { } delete(c.namespacedKeyToObj, key) - clusterName := c.clusterProvider.GetClusterName(oldObj) + clusterName := c.ClusterManager.GetClusterName(oldObj) delete(c.clusterNameToNamespacedKey, clusterName) metrics.NewClusterEventCountMetrics(key, "delete", "true").Inc() @@ -242,11 +242,11 @@ func (c *Controller) eventHandler(key string) error { c.mutex.Lock() c.namespacedKeyToObj[key] = obj - clusterName := c.clusterProvider.GetClusterName(obj) + clusterName := c.ClusterManager.GetClusterName(obj) c.clusterNameToNamespacedKey[clusterName] = key c.mutex.Unlock() - err = c.addUpdateHandler(clusterName, c.clusterProvider.GetClusterConfig(obj)) + err = c.addUpdateHandler(clusterName, c.ClusterManager.GetClusterConfig(obj)) if err != nil { metrics.NewClusterEventCountMetrics(key, "add-update", "false").Inc() c.log.Error(err, "failed to add or update cluster", "key", key) diff --git a/multicluster/controller/karbour_cluster_provider.go b/multicluster/clusterprovider/karbour_cluster_manager.go similarity index 74% rename from multicluster/controller/karbour_cluster_provider.go rename to multicluster/clusterprovider/karbour_cluster_manager.go index c8e1b22..0b00ecb 100644 --- a/multicluster/controller/karbour_cluster_provider.go +++ b/multicluster/clusterprovider/karbour_cluster_manager.go @@ -14,7 +14,7 @@ * limitations under the License. */ -package controller +package clusterprovider import ( "fmt" @@ -26,28 +26,28 @@ import ( clusterv1beta1 "kusionstack.io/kube-api/cluster/v1beta1" ) -var _ ClusterProvider = &KarbourClusterProvider{} +var _ ClusterManager = &KarbourClusterManager{} -type KarbourClusterProvider struct { +type KarbourClusterManager struct { config *rest.Config } -func (p *KarbourClusterProvider) Init(config *rest.Config) { +func (p *KarbourClusterManager) Init(config *rest.Config) { p.config = config } -func (p *KarbourClusterProvider) GetClusterMangementGVR() schema.GroupVersionResource { +func (p *KarbourClusterManager) GetClusterMangementGVR() schema.GroupVersionResource { return clusterv1beta1.SchemeGroupVersion.WithResource("clusters") } -func (p *KarbourClusterProvider) GetClusterName(obj *unstructured.Unstructured) string { +func (p *KarbourClusterManager) GetClusterName(obj *unstructured.Unstructured) string { if obj == nil { return "" } return obj.GetName() } -func (p *KarbourClusterProvider) GetClusterConfig(obj *unstructured.Unstructured) *rest.Config { +func (p *KarbourClusterManager) GetClusterConfig(obj *unstructured.Unstructured) *rest.Config { clusterName := p.GetClusterName(obj) if clusterName == "" || p.config == nil { return nil diff --git a/multicluster/controller/static_controller.go b/multicluster/clusterprovider/static_cluster_provider.go similarity index 66% rename from multicluster/controller/static_controller.go rename to multicluster/clusterprovider/static_cluster_provider.go index b8f70f8..327cd40 100644 --- a/multicluster/controller/static_controller.go +++ b/multicluster/clusterprovider/static_cluster_provider.go @@ -14,7 +14,7 @@ * limitations under the License. */ -package controller +package clusterprovider import ( "context" @@ -22,19 +22,19 @@ import ( "k8s.io/client-go/rest" ) -// StaticController is a controller that manages a static set of clusters. -type StaticController struct { +// StaticClusterProvider is a controller that manages a static set of clusters. +type StaticClusterProvider struct { clusterToConfig map[string]*rest.Config addUpdateHandler func(string, *rest.Config) error } -func NewStaticController(clusterToConfig map[string]*rest.Config) *StaticController { - return &StaticController{ +func NewStaticClusterProvider(clusterToConfig map[string]*rest.Config) *StaticClusterProvider { + return &StaticClusterProvider{ clusterToConfig: clusterToConfig, } } -func (c *StaticController) Run(stopCh <-chan struct{}) error { +func (c *StaticClusterProvider) Run(stopCh <-chan struct{}) error { if c.addUpdateHandler == nil { return nil } @@ -48,10 +48,10 @@ func (c *StaticController) Run(stopCh <-chan struct{}) error { return nil } -func (c *StaticController) AddEventHandler(addUpdateHandler func(string, *rest.Config) error, deleteHandler func(string)) { +func (c *StaticClusterProvider) AddEventHandler(addUpdateHandler func(string, *rest.Config) error, deleteHandler func(string)) { c.addUpdateHandler = addUpdateHandler } -func (c *StaticController) WaitForSynced(ctx context.Context) bool { +func (c *StaticClusterProvider) WaitForSynced(ctx context.Context) bool { return true } diff --git a/multicluster/controller/test_cluster_provider.go b/multicluster/clusterprovider/test_cluster_manager.go similarity index 70% rename from multicluster/controller/test_cluster_provider.go rename to multicluster/clusterprovider/test_cluster_manager.go index 83ed29b..2f3a93c 100644 --- a/multicluster/controller/test_cluster_provider.go +++ b/multicluster/clusterprovider/test_cluster_manager.go @@ -14,7 +14,7 @@ * limitations under the License. */ -package controller +package clusterprovider import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -22,30 +22,30 @@ import ( "k8s.io/client-go/rest" ) -var _ ClusterProvider = &TestClusterProvider{} +var _ ClusterManager = &TestClusterManager{} -// TestClusterProvider is a test implementation of ClusterProvider -type TestClusterProvider struct { +// TestClusterManager is a test implementation of ClusterProviderInfo +type TestClusterManager struct { schema.GroupVersionResource ClusterNameToConfig map[string]*rest.Config // Map from cluster name to kubeconfig } -func (p *TestClusterProvider) Init(config *rest.Config) { +func (p *TestClusterManager) Init(config *rest.Config) { // Do nothing } -func (p *TestClusterProvider) GetClusterMangementGVR() schema.GroupVersionResource { +func (p *TestClusterManager) GetClusterMangementGVR() schema.GroupVersionResource { return p.GroupVersionResource } -func (p *TestClusterProvider) GetClusterName(obj *unstructured.Unstructured) string { +func (p *TestClusterManager) GetClusterName(obj *unstructured.Unstructured) string { if obj == nil { return "" } return obj.GetName() // Use resource name as cluster name } -func (p *TestClusterProvider) GetClusterConfig(obj *unstructured.Unstructured) *rest.Config { +func (p *TestClusterManager) GetClusterConfig(obj *unstructured.Unstructured) *rest.Config { if obj == nil || p.ClusterNameToConfig == nil { return nil } diff --git a/multicluster/manager.go b/multicluster/manager.go index a86abbb..00ea9ec 100644 --- a/multicluster/manager.go +++ b/multicluster/manager.go @@ -58,8 +58,8 @@ func setOptionsDefaults(opts Options) Options { return opts } -type ClusterController interface { - Run(stopCh <-chan struct{}) error // Run the controller +type ClusterProvider interface { + Run(stopCh <-chan struct{}) error // Run the ClusterProvider AddEventHandler(addUpdateHandler func(string, *rest.Config) error, deleteHandler func(string)) // Add event handler for cluster events WaitForSynced(ctx context.Context) bool // Wait for all clusters to be synced } @@ -67,7 +67,7 @@ type ClusterController interface { type ManagerConfig struct { FedConfig *rest.Config ClusterScheme *runtime.Scheme - ClusterController + ClusterProvider ResyncPeriod time.Duration ClusterFilter func(string) bool // select cluster @@ -78,7 +78,7 @@ type Manager struct { newCache cache.NewCacheFunc // Function to create cache for cluster clusterScheme *runtime.Scheme // Scheme which is used to create cache for cluster - controller ClusterController // Controller for cluster management + clusterProvider ClusterProvider // Controller for cluster management clusterCacheManager ClusterCacheManager clusterClientManager ClusterClientManager @@ -92,8 +92,8 @@ type Manager struct { } func NewManager(cfg *ManagerConfig, opts Options) (manager *Manager, newCacheFunc cache.NewCacheFunc, newClientFunc cluster.NewClientFunc, err error) { - if cfg.ClusterController == nil { - return nil, nil, nil, errors.New("ClusterController is required") + if cfg.ClusterProvider == nil { + return nil, nil, nil, errors.New("ClusterProvider is required") } var log logr.Logger @@ -122,7 +122,7 @@ func NewManager(cfg *ManagerConfig, opts Options) (manager *Manager, newCacheFun clusterCacheManager: clusterCacheManager, clusterClientManager: clusterClientManager, - controller: cfg.ClusterController, + clusterProvider: cfg.ClusterProvider, resyncPeriod: cfg.ResyncPeriod, hasCluster: make(map[string]struct{}), @@ -140,13 +140,13 @@ func (m *Manager) Run(ctx context.Context) error { close(stopCh) }() - m.controller.AddEventHandler(m.addUpdateHandler, m.deleteHandler) - return m.controller.Run(stopCh) + m.clusterProvider.AddEventHandler(m.addUpdateHandler, m.deleteHandler) + return m.clusterProvider.Run(stopCh) } func (m *Manager) WaitForSynced(ctx context.Context) bool { - m.log.Info("wait for controller synced") - return m.controller.WaitForSynced(ctx) + m.log.Info("wait for ClusterProvider synced") + return m.clusterProvider.WaitForSynced(ctx) } func (m *Manager) addUpdateHandler(cluster string, cfg *rest.Config) (err error) { diff --git a/multicluster/suite_test.go b/multicluster/suite_test.go index 7643ff1..85165eb 100644 --- a/multicluster/suite_test.go +++ b/multicluster/suite_test.go @@ -40,7 +40,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log/zap" "kusionstack.io/kube-utils/multicluster/clusterinfo" - "kusionstack.io/kube-utils/multicluster/controller" + "kusionstack.io/kube-utils/multicluster/clusterprovider" ) var ( @@ -127,10 +127,10 @@ var _ = BeforeSuite(func() { ) os.Setenv(clusterinfo.EnvClusterAllowList, "cluster1,cluster2") - clusterController, err := controller.NewController(&controller.ControllerConfig{ + clusterProvider, err := clusterprovider.NewDynamicClusterProvider(&clusterprovider.ControllerConfig{ Config: fedConfig, - ClusterProvider: &controller.TestClusterProvider{ + ClusterManager: &clusterprovider.TestClusterManager{ GroupVersionResource: schema.GroupVersionResource{ // Use deployment as cluster management resource Group: "apps", Version: "v1", @@ -147,10 +147,10 @@ var _ = BeforeSuite(func() { Expect(err).NotTo(HaveOccurred()) manager, newCacheFunc, newClientFunc, err = NewManager(&ManagerConfig{ - FedConfig: fedConfig, - ClusterScheme: clusterScheme, - ResyncPeriod: 10 * time.Minute, - ClusterController: clusterController, + FedConfig: fedConfig, + ClusterScheme: clusterScheme, + ResyncPeriod: 10 * time.Minute, + ClusterProvider: clusterProvider, }, Options{}) Expect(err).NotTo(HaveOccurred()) Expect(manager).NotTo(BeNil())