diff --git a/multicluster/clusterprovider/karbour_cluster_manager.go b/multicluster/clusterprovider/config/karbour.go similarity index 72% rename from multicluster/clusterprovider/karbour_cluster_manager.go rename to multicluster/clusterprovider/config/karbour.go index 0b00ecb..75924ee 100644 --- a/multicluster/clusterprovider/karbour_cluster_manager.go +++ b/multicluster/clusterprovider/config/karbour.go @@ -14,7 +14,7 @@ * limitations under the License. */ -package clusterprovider +package config import ( "fmt" @@ -26,34 +26,33 @@ import ( clusterv1beta1 "kusionstack.io/kube-api/cluster/v1beta1" ) -var _ ClusterManager = &KarbourClusterManager{} - -type KarbourClusterManager struct { +// Karbour is a implementation of ClusterConfigProvider +type Karbour struct { config *rest.Config } -func (p *KarbourClusterManager) Init(config *rest.Config) { +func (p *Karbour) Init(config *rest.Config) { p.config = config } -func (p *KarbourClusterManager) GetClusterMangementGVR() schema.GroupVersionResource { +func (p *Karbour) GetGVR() schema.GroupVersionResource { return clusterv1beta1.SchemeGroupVersion.WithResource("clusters") } -func (p *KarbourClusterManager) GetClusterName(obj *unstructured.Unstructured) string { +func (p *Karbour) GetClusterName(obj *unstructured.Unstructured) string { if obj == nil { return "" } return obj.GetName() } -func (p *KarbourClusterManager) GetClusterConfig(obj *unstructured.Unstructured) *rest.Config { +func (p *Karbour) GetClusterConfig(obj *unstructured.Unstructured) *rest.Config { clusterName := p.GetClusterName(obj) if clusterName == "" || p.config == nil { return nil } - gvr := p.GetClusterMangementGVR() + gvr := p.GetGVR() clusterConfig := *p.config clusterConfig.Host = fmt.Sprintf("%s/apis/%s/%s/%s/%s/proxy", clusterConfig.Host, gvr.Group, gvr.Version, gvr.Resource, clusterName) diff --git a/multicluster/clusterprovider/test_cluster_manager.go b/multicluster/clusterprovider/config/simple.go similarity index 62% rename from multicluster/clusterprovider/test_cluster_manager.go rename to multicluster/clusterprovider/config/simple.go index 2f3a93c..fa00cf7 100644 --- a/multicluster/clusterprovider/test_cluster_manager.go +++ b/multicluster/clusterprovider/config/simple.go @@ -14,7 +14,7 @@ * limitations under the License. */ -package clusterprovider +package config import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -22,30 +22,27 @@ import ( "k8s.io/client-go/rest" ) -var _ ClusterManager = &TestClusterManager{} - -// TestClusterManager is a test implementation of ClusterProviderInfo -type TestClusterManager struct { - schema.GroupVersionResource +// Simple is a implementation of ClusterConfigProvider +type Simple struct { + GVR schema.GroupVersionResource ClusterNameToConfig map[string]*rest.Config // Map from cluster name to kubeconfig } -func (p *TestClusterManager) Init(config *rest.Config) { - // Do nothing +func (p *Simple) Init(config *rest.Config) { } -func (p *TestClusterManager) GetClusterMangementGVR() schema.GroupVersionResource { - return p.GroupVersionResource +func (p *Simple) GetGVR() schema.GroupVersionResource { + return p.GVR } -func (p *TestClusterManager) GetClusterName(obj *unstructured.Unstructured) string { +func (p *Simple) GetClusterName(obj *unstructured.Unstructured) string { if obj == nil { return "" } - return obj.GetName() // Use resource name as cluster name + return obj.GetName() } -func (p *TestClusterManager) GetClusterConfig(obj *unstructured.Unstructured) *rest.Config { +func (p *Simple) GetClusterConfig(obj *unstructured.Unstructured) *rest.Config { if obj == nil || p.ClusterNameToConfig == nil { return nil } diff --git a/multicluster/clusterprovider/dynamic_cluster_provider.go b/multicluster/clusterprovider/controller.go similarity index 75% rename from multicluster/clusterprovider/dynamic_cluster_provider.go rename to multicluster/clusterprovider/controller.go index ebc3fa4..b82a288 100644 --- a/multicluster/clusterprovider/dynamic_cluster_provider.go +++ b/multicluster/clusterprovider/controller.go @@ -38,18 +38,18 @@ import ( "kusionstack.io/kube-utils/multicluster/metrics" ) -// ClusterManager is used to provide cluster management resource and cluster kubeconfig -type ClusterManager interface { +// ClusterConfigProvider is used to provide cluster management resource and cluster kubeconfig +type ClusterConfigProvider interface { Init(config *rest.Config) // Init is used to initialize the cluster provider, config is the kubeconfig for the fed cluster - GetClusterMangementGVR() schema.GroupVersionResource // The GVR will be used to watch cluster management resource + GetGVR() schema.GroupVersionResource // The GVR will be used to watch cluster management resource GetClusterName(obj *unstructured.Unstructured) string // Get cluster name from cluster management resource, cluster name is used to identify the cluster GetClusterConfig(obj *unstructured.Unstructured) *rest.Config // Get kubeconfig from cluster management resource } -// DynamicClusterProvider is used to manage clusters -type DynamicClusterProvider struct { - config *rest.Config - ClusterManager ClusterManager +// Controller is used to manage clusters +type Controller struct { + config *rest.Config + ClusterConfigProvider ClusterConfigProvider client dynamic.Interface // Client to get cluster info informerFactory dynamicinformer.DynamicSharedInformerFactory @@ -68,34 +68,34 @@ type DynamicClusterProvider struct { log logr.Logger } -type DynamicClusterProviderConfig struct { - Config *rest.Config // Kubeconfig for the fed cluster - ClusterManager ClusterManager - ResyncPeriod time.Duration // Resync period for cluster management - Log logr.Logger +type ControllerConfig struct { + Config *rest.Config // Kubeconfig for the fed cluster + ClusterConfigProvider ClusterConfigProvider + ResyncPeriod time.Duration // Resync period for cluster management + Log logr.Logger } -// NewDynamicClusterProvider creates a new DynamicClusterProvider which will process events about cluster. -func NewDynamicClusterProvider(cfg *DynamicClusterProviderConfig) (*DynamicClusterProvider, error) { +// NewController creates a new Controller which will process events about cluster. +func NewController(cfg *ControllerConfig) (*Controller, error) { client, err := dynamic.NewForConfig(cfg.Config) if err != nil { return nil, err } - if cfg.ClusterManager == nil { - return nil, fmt.Errorf("ClusterManager is required") + if cfg.ClusterConfigProvider == nil { + return nil, fmt.Errorf("ClusterConfigProvider is required") } informerFactory := dynamicinformer.NewDynamicSharedInformerFactory(client, cfg.ResyncPeriod) - informer := informerFactory.ForResource(cfg.ClusterManager.GetClusterMangementGVR()).Informer() + informer := informerFactory.ForResource(cfg.ClusterConfigProvider.GetGVR()).Informer() - return &DynamicClusterProvider{ - config: cfg.Config, - ClusterManager: cfg.ClusterManager, + return &Controller{ + config: cfg.Config, + ClusterConfigProvider: cfg.ClusterConfigProvider, client: client, informerFactory: informerFactory, informer: informer, - workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), cfg.ClusterManager.GetClusterMangementGVR().Resource), + workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), cfg.ClusterConfigProvider.GetGVR().Resource), syncedCh: make(chan struct{}), clusterNameToNamespacedKey: make(map[string]string), // Get namespaced key by cluster name @@ -107,7 +107,7 @@ func NewDynamicClusterProvider(cfg *DynamicClusterProviderConfig) (*DynamicClust // AddEventHandler adds handlers which will be invoked. // When cluster is added or updated, addUpdateHandler will be invoked. // When cluster is deleted, deleteHandler will be invoked. -func (c *DynamicClusterProvider) AddEventHandler(addUpdateHandler func(string, *rest.Config) error, deleteHandler func(string)) { +func (c *Controller) AddEventHandler(addUpdateHandler func(string, *rest.Config) error, deleteHandler func(string)) { c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.enqueueClusterEvent, UpdateFunc: func(old, new interface{}) { @@ -120,11 +120,11 @@ func (c *DynamicClusterProvider) AddEventHandler(addUpdateHandler func(string, * c.deleteHandler = deleteHandler } -func (c *DynamicClusterProvider) Run(stopCh <-chan struct{}) error { +func (c *Controller) Run(stopCh <-chan struct{}) error { defer runtime.HandleCrash() defer c.workqueue.ShutDown() - c.ClusterManager.Init(c.config) + c.ClusterConfigProvider.Init(c.config) c.informerFactory.Start(stopCh) @@ -146,7 +146,7 @@ func (c *DynamicClusterProvider) Run(stopCh <-chan struct{}) error { return nil } -func (c *DynamicClusterProvider) WaitForSynced(ctx context.Context) bool { +func (c *Controller) WaitForSynced(ctx context.Context) bool { select { case <-c.syncedCh: // Wait for all cluster has been processed return true @@ -155,7 +155,7 @@ func (c *DynamicClusterProvider) WaitForSynced(ctx context.Context) bool { } } -func (c *DynamicClusterProvider) enqueueClusterEvent(obj interface{}) { +func (c *Controller) enqueueClusterEvent(obj interface{}) { key, err := cache.MetaNamespaceKeyFunc(obj) if err != nil { c.log.Error(err, "failed to get enqueue key") @@ -164,7 +164,7 @@ func (c *DynamicClusterProvider) enqueueClusterEvent(obj interface{}) { c.workqueue.Add(key) } -func (c *DynamicClusterProvider) runWorker() { +func (c *Controller) runWorker() { for c.processNextWorkItem() { c.mutex.Lock() if c.syncedNum > 0 { @@ -177,7 +177,7 @@ func (c *DynamicClusterProvider) runWorker() { } } -func (c *DynamicClusterProvider) processNextWorkItem() bool { +func (c *Controller) processNextWorkItem() bool { obj, shutdown := c.workqueue.Get() if shutdown { return false @@ -209,14 +209,14 @@ func (c *DynamicClusterProvider) processNextWorkItem() bool { } // eventHandler is called when an event about cluster is received. -func (c *DynamicClusterProvider) eventHandler(key string) error { +func (c *Controller) eventHandler(key string) error { namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { c.log.Error(err, "failed to split namespaced key", "key", key) return nil } - obj, err := c.client.Resource(c.ClusterManager.GetClusterMangementGVR()).Namespace(namespace).Get(context.Background(), name, metav1.GetOptions{}) + obj, err := c.client.Resource(c.ClusterConfigProvider.GetGVR()).Namespace(namespace).Get(context.Background(), name, metav1.GetOptions{}) if err != nil { if errors.IsNotFound(err) { c.mutex.Lock() @@ -228,7 +228,7 @@ func (c *DynamicClusterProvider) eventHandler(key string) error { } delete(c.namespacedKeyToObj, key) - clusterName := c.ClusterManager.GetClusterName(oldObj) + clusterName := c.ClusterConfigProvider.GetClusterName(oldObj) delete(c.clusterNameToNamespacedKey, clusterName) metrics.NewClusterEventCountMetrics(key, "delete", "true").Inc() @@ -242,11 +242,11 @@ func (c *DynamicClusterProvider) eventHandler(key string) error { c.mutex.Lock() c.namespacedKeyToObj[key] = obj - clusterName := c.ClusterManager.GetClusterName(obj) + clusterName := c.ClusterConfigProvider.GetClusterName(obj) c.clusterNameToNamespacedKey[clusterName] = key c.mutex.Unlock() - err = c.addUpdateHandler(clusterName, c.ClusterManager.GetClusterConfig(obj)) + err = c.addUpdateHandler(clusterName, c.ClusterConfigProvider.GetClusterConfig(obj)) if err != nil { metrics.NewClusterEventCountMetrics(key, "add-update", "false").Inc() c.log.Error(err, "failed to add or update cluster", "key", key) diff --git a/multicluster/clusterprovider/static_cluster_provider.go b/multicluster/clusterprovider/simple.go similarity index 73% rename from multicluster/clusterprovider/static_cluster_provider.go rename to multicluster/clusterprovider/simple.go index 327cd40..10c7847 100644 --- a/multicluster/clusterprovider/static_cluster_provider.go +++ b/multicluster/clusterprovider/simple.go @@ -22,19 +22,19 @@ import ( "k8s.io/client-go/rest" ) -// StaticClusterProvider is a controller that manages a static set of clusters. -type StaticClusterProvider struct { +// SimpleClusterProvider is a controller that manages a static set of clusters. +type SimpleClusterProvider struct { clusterToConfig map[string]*rest.Config addUpdateHandler func(string, *rest.Config) error } -func NewStaticClusterProvider(clusterToConfig map[string]*rest.Config) *StaticClusterProvider { - return &StaticClusterProvider{ +func NewSimpleClusterProvider(clusterToConfig map[string]*rest.Config) *SimpleClusterProvider { + return &SimpleClusterProvider{ clusterToConfig: clusterToConfig, } } -func (c *StaticClusterProvider) Run(stopCh <-chan struct{}) error { +func (c *SimpleClusterProvider) Run(stopCh <-chan struct{}) error { if c.addUpdateHandler == nil { return nil } @@ -48,10 +48,10 @@ func (c *StaticClusterProvider) Run(stopCh <-chan struct{}) error { return nil } -func (c *StaticClusterProvider) AddEventHandler(addUpdateHandler func(string, *rest.Config) error, deleteHandler func(string)) { +func (c *SimpleClusterProvider) AddEventHandler(addUpdateHandler func(string, *rest.Config) error, deleteHandler func(string)) { c.addUpdateHandler = addUpdateHandler } -func (c *StaticClusterProvider) WaitForSynced(ctx context.Context) bool { +func (c *SimpleClusterProvider) WaitForSynced(ctx context.Context) bool { return true } diff --git a/multicluster/manager.go b/multicluster/manager.go index 2a05e1c..6630e9d 100644 --- a/multicluster/manager.go +++ b/multicluster/manager.go @@ -65,9 +65,9 @@ type ClusterProvider interface { } type ManagerConfig struct { - FedConfig *rest.Config - ClusterScheme *runtime.Scheme - ClusterProvider + FedConfig *rest.Config + ClusterScheme *runtime.Scheme + ClusterProvider ClusterProvider ResyncPeriod time.Duration ClusterFilter func(string) bool // select cluster diff --git a/multicluster/suite_test.go b/multicluster/suite_test.go index 8aa2625..7f74554 100644 --- a/multicluster/suite_test.go +++ b/multicluster/suite_test.go @@ -41,6 +41,7 @@ import ( "kusionstack.io/kube-utils/multicluster/clusterinfo" "kusionstack.io/kube-utils/multicluster/clusterprovider" + "kusionstack.io/kube-utils/multicluster/clusterprovider/config" ) var ( @@ -127,11 +128,11 @@ var _ = BeforeSuite(func() { ) os.Setenv(clusterinfo.EnvClusterAllowList, "cluster1,cluster2") - clusterProvider, err := clusterprovider.NewDynamicClusterProvider(&clusterprovider.DynamicClusterProviderConfig{ + clusterProvider, err := clusterprovider.NewController(&clusterprovider.ControllerConfig{ Config: fedConfig, - ClusterManager: &clusterprovider.TestClusterManager{ - GroupVersionResource: schema.GroupVersionResource{ // Use deployment as cluster management resource + ClusterConfigProvider: &config.Simple{ + GVR: schema.GroupVersionResource{ // Use deployment as cluster management resource Group: "apps", Version: "v1", Resource: "deployments",