diff --git a/multicluster/controller/karbour_cluster_provider.go b/multicluster/clusterprovider/config/karbour.go
similarity index 72%
rename from multicluster/controller/karbour_cluster_provider.go
rename to multicluster/clusterprovider/config/karbour.go
index c8e1b22..75924ee 100644
--- a/multicluster/controller/karbour_cluster_provider.go
+++ b/multicluster/clusterprovider/config/karbour.go
@@ -14,7 +14,7 @@
  * limitations under the License.
  */
 
-package controller
+package config
 
 import (
 	"fmt"
@@ -26,34 +26,33 @@ import (
 	clusterv1beta1 "kusionstack.io/kube-api/cluster/v1beta1"
 )
 
-var _ ClusterProvider = &KarbourClusterProvider{}
-
-type KarbourClusterProvider struct {
+// Karbour is an implementation of ClusterConfigProvider
+type Karbour struct {
 	config *rest.Config
 }
 
-func (p *KarbourClusterProvider) Init(config *rest.Config) {
+func (p *Karbour) Init(config *rest.Config) {
 	p.config = config
 }
 
-func (p *KarbourClusterProvider) GetClusterMangementGVR() schema.GroupVersionResource {
+func (p *Karbour) GetGVR() schema.GroupVersionResource {
 	return clusterv1beta1.SchemeGroupVersion.WithResource("clusters")
 }
 
-func (p *KarbourClusterProvider) GetClusterName(obj *unstructured.Unstructured) string {
+func (p *Karbour) GetClusterName(obj *unstructured.Unstructured) string {
 	if obj == nil {
 		return ""
 	}
 	return obj.GetName()
 }
 
-func (p *KarbourClusterProvider) GetClusterConfig(obj *unstructured.Unstructured) *rest.Config {
+func (p *Karbour) GetClusterConfig(obj *unstructured.Unstructured) *rest.Config {
 	clusterName := p.GetClusterName(obj)
 	if clusterName == "" || p.config == nil {
 		return nil
 	}
 
-	gvr := p.GetClusterMangementGVR()
+	gvr := p.GetGVR()
 
 	clusterConfig := *p.config
 	clusterConfig.Host = fmt.Sprintf("%s/apis/%s/%s/%s/%s/proxy", clusterConfig.Host, gvr.Group, gvr.Version, gvr.Resource, clusterName)
diff --git a/multicluster/controller/test_cluster_provider.go b/multicluster/clusterprovider/config/simple.go
similarity index 64%
rename from multicluster/controller/test_cluster_provider.go
rename to multicluster/clusterprovider/config/simple.go
index 9275d73..fa00cf7 100644
--- a/multicluster/controller/test_cluster_provider.go
+++ b/multicluster/clusterprovider/config/simple.go
@@ -14,7 +14,7 @@
  * limitations under the License.
  */
 
-package controller
+package config
 
 import (
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -22,29 +22,27 @@ import (
 	"k8s.io/client-go/rest"
 )
 
-var _ ClusterProvider = &TestClusterProvider{}
-
-type TestClusterProvider struct {
-	schema.GroupVersionResource
+// Simple is an implementation of ClusterConfigProvider
+type Simple struct {
+	GVR                 schema.GroupVersionResource
 	ClusterNameToConfig map[string]*rest.Config // Map from cluster name to kubeconfig
 }
 
-func (p *TestClusterProvider) Init(config *rest.Config) {
-	// Do nothing
+func (p *Simple) Init(config *rest.Config) {
 }
 
-func (p *TestClusterProvider) GetClusterMangementGVR() schema.GroupVersionResource {
-	return p.GroupVersionResource
+func (p *Simple) GetGVR() schema.GroupVersionResource {
+	return p.GVR
 }
 
-func (p *TestClusterProvider) GetClusterName(obj *unstructured.Unstructured) string {
+func (p *Simple) GetClusterName(obj *unstructured.Unstructured) string {
 	if obj == nil {
 		return ""
 	}
-	return obj.GetName() // Use resource name as cluster name
+	return obj.GetName()
 }
 
-func (p *TestClusterProvider) GetClusterConfig(obj *unstructured.Unstructured) *rest.Config {
+func (p *Simple) GetClusterConfig(obj *unstructured.Unstructured) *rest.Config {
 	if obj == nil || p.ClusterNameToConfig == nil {
 		return nil
 	}
diff --git a/multicluster/controller/controller.go b/multicluster/clusterprovider/controller.go
similarity index 78%
rename from multicluster/controller/controller.go
rename to multicluster/clusterprovider/controller.go
index 53a27e0..b82a288 100644
--- a/multicluster/controller/controller.go
+++ b/multicluster/clusterprovider/controller.go
@@ -14,7 +14,7 @@
  * limitations under the License.
  */
 
-package controller
+package clusterprovider
 
 import (
 	"context"
@@ -38,16 +38,18 @@ import (
 	"kusionstack.io/kube-utils/multicluster/metrics"
 )
 
-type ClusterProvider interface {
+// ClusterConfigProvider is used to provide cluster management resource and cluster kubeconfig
+type ClusterConfigProvider interface {
 	Init(config *rest.Config)                                      // Init is used to initialize the cluster provider, config is the kubeconfig for the fed cluster
-	GetClusterMangementGVR() schema.GroupVersionResource           // The GVR will be used to watch cluster management resource
+	GetGVR() schema.GroupVersionResource                           // The GVR will be used to watch cluster management resource
 	GetClusterName(obj *unstructured.Unstructured) string          // Get cluster name from cluster management resource, cluster name is used to identify the cluster
 	GetClusterConfig(obj *unstructured.Unstructured) *rest.Config  // Get kubeconfig from cluster management resource
 }
 
+// Controller is used to manage clusters
 type Controller struct {
-	config          *rest.Config
-	clusterProvider ClusterProvider
+	config                *rest.Config
+	ClusterConfigProvider ClusterConfigProvider
 
 	client          dynamic.Interface // Client to get cluster info
 	informerFactory dynamicinformer.DynamicSharedInformerFactory
@@ -58,8 +60,8 @@ type Controller struct {
 	syncedNum int           // Number of synced cluster
 	syncedCh  chan struct{} // Channel to notify all synced clusters have been processed
 
-	addUpdateHandler func(string) error // When cluster is added or updated, this handler will be invoked
-	deleteHandler    func(string)       // When cluster is deleted, this handler will be invoked
+	addUpdateHandler func(string, *rest.Config) error // When cluster is added or updated, this handler will be invoked
+	deleteHandler    func(string)                     // When cluster is deleted, this handler will be invoked
 
 	clusterNameToNamespacedKey map[string]string
 	namespacedKeyToObj         map[string]*unstructured.Unstructured
@@ -67,10 +69,10 @@ type Controller struct {
 }
 
 type ControllerConfig struct {
-	Config          *rest.Config // Kubeconfig for the fed cluster
-	ClusterProvider ClusterProvider
-	ResyncPeriod    time.Duration // Resync period for cluster management
-	Log             logr.Logger
+	Config                *rest.Config // Kubeconfig for the fed cluster
+	ClusterConfigProvider ClusterConfigProvider
+	ResyncPeriod          time.Duration // Resync period for cluster management
+	Log                   logr.Logger
 }
 
 // NewController creates a new Controller which will process events about cluster.
@@ -79,21 +81,21 @@ func NewController(cfg *ControllerConfig) (*Controller, error) {
 	if err != nil {
 		return nil, err
 	}
-	if cfg.ClusterProvider == nil {
-		return nil, fmt.Errorf("ClusterProvider is required")
+	if cfg.ClusterConfigProvider == nil {
+		return nil, fmt.Errorf("ClusterConfigProvider is required")
 	}
 
 	informerFactory := dynamicinformer.NewDynamicSharedInformerFactory(client, cfg.ResyncPeriod)
-	informer := informerFactory.ForResource(cfg.ClusterProvider.GetClusterMangementGVR()).Informer()
+	informer := informerFactory.ForResource(cfg.ClusterConfigProvider.GetGVR()).Informer()
 
 	return &Controller{
-		config:          cfg.Config,
-		clusterProvider: cfg.ClusterProvider,
+		config:                cfg.Config,
+		ClusterConfigProvider: cfg.ClusterConfigProvider,
 
 		client:          client,
 		informerFactory: informerFactory,
 		informer:        informer,
-		workqueue:       workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), cfg.ClusterProvider.GetClusterMangementGVR().Resource),
+		workqueue:       workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), cfg.ClusterConfigProvider.GetGVR().Resource),
 		syncedCh:        make(chan struct{}),
 
 		clusterNameToNamespacedKey: make(map[string]string), // Get namespaced key by cluster name
@@ -105,7 +107,7 @@ func NewController(cfg *ControllerConfig) (*Controller, error) {
 // AddEventHandler adds handlers which will be invoked.
 // When cluster is added or updated, addUpdateHandler will be invoked.
 // When cluster is deleted, deleteHandler will be invoked.
-func (c *Controller) AddEventHandler(addUpdateHandler func(string) error, deleteHandler func(string)) {
+func (c *Controller) AddEventHandler(addUpdateHandler func(string, *rest.Config) error, deleteHandler func(string)) {
 	c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: c.enqueueClusterEvent,
 		UpdateFunc: func(old, new interface{}) {
@@ -118,11 +120,11 @@ func (c *Controller) AddEventHandler(addUpdateHandler func(string) error, delete
 	c.deleteHandler = deleteHandler
 }
 
-func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
+func (c *Controller) Run(stopCh <-chan struct{}) error {
 	defer runtime.HandleCrash()
 	defer c.workqueue.ShutDown()
 
-	c.clusterProvider.Init(c.config)
+	c.ClusterConfigProvider.Init(c.config)
 
 	c.informerFactory.Start(stopCh)
 
@@ -136,7 +138,7 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
 	c.mutex.Unlock()
 
 	// Start workers to process cluster events
-	for i := 0; i < threadiness; i++ {
+	for i := 0; i < 2; i++ {
 		go wait.Until(c.runWorker, time.Second, stopCh)
 	}
 
@@ -214,7 +216,7 @@ func (c *Controller) eventHandler(key string) error {
 		return nil
 	}
 
-	obj, err := c.client.Resource(c.clusterProvider.GetClusterMangementGVR()).Namespace(namespace).Get(context.Background(), name, metav1.GetOptions{})
+	obj, err := c.client.Resource(c.ClusterConfigProvider.GetGVR()).Namespace(namespace).Get(context.Background(), name, metav1.GetOptions{})
 	if err != nil {
 		if errors.IsNotFound(err) {
 			c.mutex.Lock()
@@ -226,7 +228,7 @@ func (c *Controller) eventHandler(key string) error {
 			}
 			delete(c.namespacedKeyToObj, key)
 
-			clusterName := c.clusterProvider.GetClusterName(oldObj)
+			clusterName := c.ClusterConfigProvider.GetClusterName(oldObj)
 			delete(c.clusterNameToNamespacedKey, clusterName)
 
 			metrics.NewClusterEventCountMetrics(key, "delete", "true").Inc()
@@ -240,11 +242,11 @@ func (c *Controller) eventHandler(key string) error {
 
 	c.mutex.Lock()
 	c.namespacedKeyToObj[key] = obj
-	clusterName := c.clusterProvider.GetClusterName(obj)
+	clusterName := c.ClusterConfigProvider.GetClusterName(obj)
 	c.clusterNameToNamespacedKey[clusterName] = key
 	c.mutex.Unlock()
 
-	err = c.addUpdateHandler(clusterName)
+	err = c.addUpdateHandler(clusterName, c.ClusterConfigProvider.GetClusterConfig(obj))
 	if err != nil {
 		metrics.NewClusterEventCountMetrics(key, "add-update", "false").Inc()
 		c.log.Error(err, "failed to add or update cluster", "key", key)
@@ -254,20 +256,3 @@ func (c *Controller) eventHandler(key string) error {
 	metrics.NewClusterEventCountMetrics(key, "add-update", "true").Inc()
 	return nil
 }
-
-// RestConfigForCluster returns the rest config for the mangered cluster.
-func (c *Controller) RestConfigForCluster(clusterName string) *rest.Config {
-	c.mutex.RLock()
-	defer c.mutex.RUnlock()
-
-	namespacedKey, ok := c.clusterNameToNamespacedKey[clusterName]
-	if !ok {
-		return nil
-	}
-
-	obj, ok := c.namespacedKeyToObj[namespacedKey]
-	if !ok {
-		return nil
-	}
-	return c.clusterProvider.GetClusterConfig(obj)
-}
diff --git a/multicluster/clusterprovider/simple.go b/multicluster/clusterprovider/simple.go
new file mode 100644
index 0000000..10c7847
--- /dev/null
+++ b/multicluster/clusterprovider/simple.go
@@ -0,0 +1,57 @@
+/**
+ * Copyright 2024 KusionStack Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package clusterprovider
+
+import (
+	"context"
+
+	"k8s.io/client-go/rest"
+)
+
+// SimpleClusterProvider is a ClusterProvider implementation that manages a static set of clusters.
+type SimpleClusterProvider struct {
+	clusterToConfig  map[string]*rest.Config
+	addUpdateHandler func(string, *rest.Config) error
+}
+
+func NewSimpleClusterProvider(clusterToConfig map[string]*rest.Config) *SimpleClusterProvider {
+	return &SimpleClusterProvider{
+		clusterToConfig: clusterToConfig,
+	}
+}
+
+func (c *SimpleClusterProvider) Run(stopCh <-chan struct{}) error {
+	if c.addUpdateHandler == nil {
+		return nil
+	}
+
+	for cluster, config := range c.clusterToConfig {
+		if err := c.addUpdateHandler(cluster, config); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (c *SimpleClusterProvider) AddEventHandler(addUpdateHandler func(string, *rest.Config) error, deleteHandler func(string)) {
+	c.addUpdateHandler = addUpdateHandler
+}
+
+func (c *SimpleClusterProvider) WaitForSynced(ctx context.Context) bool {
+	return true
+}
diff --git a/multicluster/manager.go b/multicluster/manager.go
index 176f003..6630e9d 100644
--- a/multicluster/manager.go
+++ b/multicluster/manager.go
@@ -39,7 +39,6 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/cluster"
 
 	"kusionstack.io/kube-utils/multicluster/clusterinfo"
-	"kusionstack.io/kube-utils/multicluster/controller"
 )
 
 const (
@@ -59,10 +58,16 @@ func setOptionsDefaults(opts Options) Options {
 	return opts
 }
 
+type ClusterProvider interface {
+	Run(stopCh <-chan struct{}) error                                                              // Run the ClusterProvider
+	AddEventHandler(addUpdateHandler func(string, *rest.Config) error, deleteHandler func(string)) // Add event handler for cluster events
+	WaitForSynced(ctx context.Context) bool                                                        // Wait for all clusters to be synced
+}
+
 type ManagerConfig struct {
-	FedConfig     *rest.Config
-	ClusterScheme *runtime.Scheme
-	controller.ClusterProvider
+	FedConfig       *rest.Config
+	ClusterScheme   *runtime.Scheme
+	ClusterProvider ClusterProvider
 
 	ResyncPeriod  time.Duration
 	ClusterFilter func(string) bool // select cluster
@@ -73,9 +78,10 @@ type Manager struct {
 	newCache      cache.NewCacheFunc // Function to create cache for cluster
 	clusterScheme *runtime.Scheme    // Scheme which is used to create cache for cluster
 
+	clusterProvider ClusterProvider
+
 	clusterCacheManager  ClusterCacheManager
 	clusterClientManager ClusterClientManager
-	controller           *controller.Controller // Controller for cluster management
 	resyncPeriod         time.Duration
 
 	hasCluster map[string]struct{} // Whether cluster has been added
@@ -86,6 +92,10 @@ type Manager struct {
 }
 
 func NewManager(cfg *ManagerConfig, opts Options) (manager *Manager, newCacheFunc cache.NewCacheFunc, newClientFunc cluster.NewClientFunc, err error) {
+	if cfg.ClusterProvider == nil {
+		return nil, nil, nil, errors.New("ClusterProvider is required")
+	}
+
 	var log logr.Logger
 	if cfg.Log != nil {
 		log = cfg.Log
@@ -97,16 +107,6 @@ func NewManager(cfg *ManagerConfig, opts Options) (manager *Manager, newCacheFun
 		cfg.ClusterScheme = scheme.Scheme
 	}
 
-	controller, err := controller.NewController(&controller.ControllerConfig{
-		Config:          cfg.FedConfig,
-		ClusterProvider: cfg.ClusterProvider,
-		ResyncPeriod:    cfg.ResyncPeriod,
-		Log:             log,
-	})
-	if err != nil {
-		return nil, nil, nil, err
-	}
-
 	opts = setOptionsDefaults(opts)
 	newCacheFunc, clusterCacheManager := MultiClusterCacheBuilder(log, &opts)
 	newClientFunc, clusterClientManager := MultiClusterClientBuilder(log)
@@ -122,7 +122,7 @@ func NewManager(cfg *ManagerConfig, opts Options) (manager *Manager, newCacheFun
 		clusterCacheManager:  clusterCacheManager,
 		clusterClientManager: clusterClientManager,
-		controller:           controller,
+		clusterProvider:      cfg.ClusterProvider,
 		resyncPeriod:         cfg.ResyncPeriod,
 
 		hasCluster: make(map[string]struct{}),
 
@@ -133,23 +133,23 @@ func NewManager(cfg *ManagerConfig, opts Options) (manager *Manager, newCacheFun
 	return manager, newCacheFunc, newClientFunc, nil
 }
 
-func (m *Manager) Run(threadiness int, ctx context.Context) error {
+func (m *Manager) Run(ctx context.Context) error {
 	stopCh := make(chan struct{})
 	go func() {
 		<-ctx.Done()
 		close(stopCh)
 	}()
 
-	m.controller.AddEventHandler(m.addUpdateHandler, m.deleteHandler)
-	return m.controller.Run(threadiness, stopCh)
+	m.clusterProvider.AddEventHandler(m.addUpdateHandler, m.deleteHandler)
+	return m.clusterProvider.Run(stopCh)
 }
 
 func (m *Manager) WaitForSynced(ctx context.Context) bool {
-	m.log.Info("wait for controller synced")
-	return m.controller.WaitForSynced(ctx)
+	m.log.Info("wait for ClusterProvider synced")
+	return m.clusterProvider.WaitForSynced(ctx)
}
 
-func (m *Manager) addUpdateHandler(cluster string) (err error) {
+func (m *Manager) addUpdateHandler(cluster string, cfg *rest.Config) (err error) {
 	if m.clusterFilter != nil && !m.clusterFilter(cluster) {
 		return nil
 	}
@@ -162,12 +162,6 @@ func (m *Manager) addUpdateHandler(cluster string) (err error) {
 	}
 	m.mutex.RUnlock()
 
-	// Get rest.Config for the cluster
-	cfg := m.controller.RestConfigForCluster(cluster)
-	if cfg == nil {
-		return fmt.Errorf("failed to get rest.Config for cluster %s", cluster)
-	}
-
 	// Get MemCacheClient for the cluster
 	clientset, err := kubernetes.NewForConfig(cfg)
 	if err != nil {
diff --git a/multicluster/suite_test.go b/multicluster/suite_test.go
index 01f2fd3..7f74554 100644
--- a/multicluster/suite_test.go
+++ b/multicluster/suite_test.go
@@ -30,6 +30,7 @@ import (
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/client-go/rest"
+	"k8s.io/klog/v2/klogr"
 	"sigs.k8s.io/controller-runtime/pkg/cache"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
@@ -39,7 +40,8 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/log/zap"
 
 	"kusionstack.io/kube-utils/multicluster/clusterinfo"
-	"kusionstack.io/kube-utils/multicluster/controller"
+	"kusionstack.io/kube-utils/multicluster/clusterprovider"
+	"kusionstack.io/kube-utils/multicluster/clusterprovider/config"
 )
 
 var (
@@ -125,13 +127,12 @@ var _ = BeforeSuite(func() {
 		newClientFunc cluster.NewClientFunc
 	)
 
 	os.Setenv(clusterinfo.EnvClusterAllowList, "cluster1,cluster2")
-	manager, newCacheFunc, newClientFunc, err = NewManager(&ManagerConfig{
-		FedConfig:     fedConfig,
-		ClusterScheme: clusterScheme,
-		ResyncPeriod:  10 * time.Minute,
-		ClusterProvider: &controller.TestClusterProvider{
-			GroupVersionResource: schema.GroupVersionResource{ // Use deployment as cluster management resource
+	clusterProvider, err := clusterprovider.NewController(&clusterprovider.ControllerConfig{
+		Config: fedConfig,
+
+		ClusterConfigProvider: &config.Simple{
+			GVR: schema.GroupVersionResource{ // Use deployment as cluster management resource
 				Group:    "apps",
 				Version:  "v1",
 				Resource: "deployments",
@@ -142,6 +143,15 @@ var _ = BeforeSuite(func() {
 				clusterinfo.Fed: fedConfig,
 			},
 		},
+		Log: klogr.New(),
+	})
+	Expect(err).NotTo(HaveOccurred())
+
+	manager, newCacheFunc, newClientFunc, err = NewManager(&ManagerConfig{
+		FedConfig:       fedConfig,
+		ClusterScheme:   clusterScheme,
+		ResyncPeriod:    10 * time.Minute,
+		ClusterProvider: clusterProvider,
 	}, Options{})
 	Expect(err).NotTo(HaveOccurred())
 	Expect(manager).NotTo(BeNil())
@@ -167,7 +177,7 @@ var _ = BeforeSuite(func() {
 	Expect(err).NotTo(HaveOccurred())
 	Expect(clusterClient).NotTo(BeNil())
 
-	go manager.Run(2, ctx)
+	go manager.Run(ctx)
 })
 
 var _ = AfterSuite(func() {