Skip to content

Commit

Permalink
add interface ClusterController
Browse files Browse the repository at this point in the history
  • Loading branch information
shaofan-hs committed Mar 12, 2024
1 parent add622a commit f52e02d
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 51 deletions.
29 changes: 6 additions & 23 deletions multicluster/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ type Controller struct {
syncedNum int // Number of synced cluster
syncedCh chan struct{} // Channel to notify all synced clusters have been processed

addUpdateHandler func(string) error // When cluster is added or updated, this handler will be invoked
deleteHandler func(string) // When cluster is deleted, this handler will be invoked
addUpdateHandler func(string, *rest.Config) error // When cluster is added or updated, this handler will be invoked
deleteHandler func(string) // When cluster is deleted, this handler will be invoked

clusterNameToNamespacedKey map[string]string
namespacedKeyToObj map[string]*unstructured.Unstructured
Expand Down Expand Up @@ -105,7 +105,7 @@ func NewController(cfg *ControllerConfig) (*Controller, error) {
// AddEventHandler adds handlers which will be invoked.
// When cluster is added or updated, addUpdateHandler will be invoked.
// When cluster is deleted, deleteHandler will be invoked.
func (c *Controller) AddEventHandler(addUpdateHandler func(string) error, deleteHandler func(string)) {
func (c *Controller) AddEventHandler(addUpdateHandler func(string, *rest.Config) error, deleteHandler func(string)) {
c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.enqueueClusterEvent,
UpdateFunc: func(old, new interface{}) {
Expand All @@ -118,7 +118,7 @@ func (c *Controller) AddEventHandler(addUpdateHandler func(string) error, delete
c.deleteHandler = deleteHandler
}

func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
func (c *Controller) Run(stopCh <-chan struct{}) error {
defer runtime.HandleCrash()
defer c.workqueue.ShutDown()

Expand All @@ -136,7 +136,7 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
c.mutex.Unlock()

// Start workers to process cluster events
for i := 0; i < threadiness; i++ {
for i := 0; i < 2; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}

Expand Down Expand Up @@ -244,7 +244,7 @@ func (c *Controller) eventHandler(key string) error {
c.clusterNameToNamespacedKey[clusterName] = key
c.mutex.Unlock()

err = c.addUpdateHandler(clusterName)
err = c.addUpdateHandler(clusterName, c.clusterProvider.GetClusterConfig(obj))
if err != nil {
metrics.NewClusterEventCountMetrics(key, "add-update", "false").Inc()
c.log.Error(err, "failed to add or update cluster", "key", key)
Expand All @@ -254,20 +254,3 @@ func (c *Controller) eventHandler(key string) error {
metrics.NewClusterEventCountMetrics(key, "add-update", "true").Inc()
return nil
}

// RestConfigForCluster returns the rest config for the mangered cluster.
func (c *Controller) RestConfigForCluster(clusterName string) *rest.Config {
c.mutex.RLock()
defer c.mutex.RUnlock()

namespacedKey, ok := c.clusterNameToNamespacedKey[clusterName]
if !ok {
return nil
}

obj, ok := c.namespacedKeyToObj[namespacedKey]
if !ok {
return nil
}
return c.clusterProvider.GetClusterConfig(obj)
}
40 changes: 17 additions & 23 deletions multicluster/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/cluster"

"kusionstack.io/kube-utils/multicluster/clusterinfo"
"kusionstack.io/kube-utils/multicluster/controller"
)

const (
Expand All @@ -59,10 +58,16 @@ func setOptionsDefaults(opts Options) Options {
return opts
}

type ClusterController interface {
Run(stopCh <-chan struct{}) error // Run the controller
AddEventHandler(addUpdateHandler func(string, *rest.Config) error, deleteHandler func(string)) // Add event handler for cluster events
WaitForSynced(ctx context.Context) bool // Wait for all clusters to be synced
}

type ManagerConfig struct {
FedConfig *rest.Config
ClusterScheme *runtime.Scheme
controller.ClusterProvider
ClusterController

ResyncPeriod time.Duration
ClusterFilter func(string) bool // select cluster
Expand All @@ -73,9 +78,10 @@ type Manager struct {
newCache cache.NewCacheFunc // Function to create cache for cluster
clusterScheme *runtime.Scheme // Scheme which is used to create cache for cluster

controller ClusterController // Controller for cluster management

clusterCacheManager ClusterCacheManager
clusterClientManager ClusterClientManager
controller *controller.Controller // Controller for cluster management

resyncPeriod time.Duration
hasCluster map[string]struct{} // Whether cluster has been added
Expand All @@ -86,6 +92,10 @@ type Manager struct {
}

func NewManager(cfg *ManagerConfig, opts Options) (manager *Manager, newCacheFunc cache.NewCacheFunc, newClientFunc cluster.NewClientFunc, err error) {
if cfg.ClusterController == nil {
return nil, nil, nil, errors.New("ClusterController is required")
}

var log logr.Logger
if cfg.Log != nil {
log = cfg.Log
Expand All @@ -97,16 +107,6 @@ func NewManager(cfg *ManagerConfig, opts Options) (manager *Manager, newCacheFun
cfg.ClusterScheme = scheme.Scheme
}

controller, err := controller.NewController(&controller.ControllerConfig{
Config: cfg.FedConfig,
ClusterProvider: cfg.ClusterProvider,
ResyncPeriod: cfg.ResyncPeriod,
Log: log,
})
if err != nil {
return nil, nil, nil, err
}

opts = setOptionsDefaults(opts)
newCacheFunc, clusterCacheManager := MultiClusterCacheBuilder(log, &opts)
newClientFunc, clusterClientManager := MultiClusterClientBuilder(log)
Expand All @@ -122,7 +122,7 @@ func NewManager(cfg *ManagerConfig, opts Options) (manager *Manager, newCacheFun

clusterCacheManager: clusterCacheManager,
clusterClientManager: clusterClientManager,
controller: controller,
controller: cfg.ClusterController,

resyncPeriod: cfg.ResyncPeriod,
hasCluster: make(map[string]struct{}),
Expand All @@ -133,23 +133,23 @@ func NewManager(cfg *ManagerConfig, opts Options) (manager *Manager, newCacheFun
return manager, newCacheFunc, newClientFunc, nil
}

func (m *Manager) Run(threadiness int, ctx context.Context) error {
func (m *Manager) Run(ctx context.Context) error {
stopCh := make(chan struct{})
go func() {
<-ctx.Done()
close(stopCh)
}()

m.controller.AddEventHandler(m.addUpdateHandler, m.deleteHandler)
return m.controller.Run(threadiness, stopCh)
return m.controller.Run(stopCh)
}

func (m *Manager) WaitForSynced(ctx context.Context) bool {
m.log.Info("wait for controller synced")
return m.controller.WaitForSynced(ctx)
}

func (m *Manager) addUpdateHandler(cluster string) (err error) {
func (m *Manager) addUpdateHandler(cluster string, cfg *rest.Config) (err error) {
if m.clusterFilter != nil && !m.clusterFilter(cluster) {
return nil
}
Expand All @@ -162,12 +162,6 @@ func (m *Manager) addUpdateHandler(cluster string) (err error) {
}
m.mutex.RUnlock()

// Get rest.Config for the cluster
cfg := m.controller.RestConfigForCluster(cluster)
if cfg == nil {
return fmt.Errorf("failed to get rest.Config for cluster %s", cluster)
}

// Get MemCacheClient for the cluster
clientset, err := kubernetes.NewForConfig(cfg)
if err != nil {
Expand Down
19 changes: 14 additions & 5 deletions multicluster/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/rest"
"k8s.io/klog/v2/klogr"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
Expand Down Expand Up @@ -125,10 +126,9 @@ var _ = BeforeSuite(func() {
newClientFunc cluster.NewClientFunc
)
os.Setenv(clusterinfo.EnvClusterAllowList, "cluster1,cluster2")
manager, newCacheFunc, newClientFunc, err = NewManager(&ManagerConfig{
FedConfig: fedConfig,
ClusterScheme: clusterScheme,
ResyncPeriod: 10 * time.Minute,

clusterController, err := controller.NewController(&controller.ControllerConfig{
Config: fedConfig,

ClusterProvider: &controller.TestClusterProvider{
GroupVersionResource: schema.GroupVersionResource{ // Use deployment as cluster management resource
Expand All @@ -142,6 +142,15 @@ var _ = BeforeSuite(func() {
clusterinfo.Fed: fedConfig,
},
},
Log: klogr.New(),
})
Expect(err).NotTo(HaveOccurred())

manager, newCacheFunc, newClientFunc, err = NewManager(&ManagerConfig{
FedConfig: fedConfig,
ClusterScheme: clusterScheme,
ResyncPeriod: 10 * time.Minute,
ClusterController: clusterController,
}, Options{})
Expect(err).NotTo(HaveOccurred())
Expect(manager).NotTo(BeNil())
Expand All @@ -167,7 +176,7 @@ var _ = BeforeSuite(func() {
Expect(err).NotTo(HaveOccurred())
Expect(clusterClient).NotTo(BeNil())

go manager.Run(2, ctx)
go manager.Run(ctx)
})

var _ = AfterSuite(func() {
Expand Down

0 comments on commit f52e02d

Please sign in to comment.