Skip to content

Commit

Permalink
refactor, cluster provider
Browse files Browse the repository at this point in the history
  • Loading branch information
shaofan-hs committed Mar 21, 2024
1 parent 05d94c2 commit 0b6fd4e
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package controller
package clusterprovider

import (
"context"
Expand All @@ -38,18 +38,18 @@ import (
"kusionstack.io/kube-utils/multicluster/metrics"
)

// ClusterProvider is used to provide cluster management resource and cluster kubeconfig
type ClusterProvider interface {
// ClusterManager is used to provide cluster management resource and cluster kubeconfig
type ClusterManager interface {
Init(config *rest.Config) // Init is used to initialize the cluster provider, config is the kubeconfig for the fed cluster
GetClusterMangementGVR() schema.GroupVersionResource // The GVR will be used to watch cluster management resource
GetClusterName(obj *unstructured.Unstructured) string // Get cluster name from cluster management resource, cluster name is used to identify the cluster
GetClusterConfig(obj *unstructured.Unstructured) *rest.Config // Get kubeconfig from cluster management resource
}

// Controller is used to manage clusters
type Controller struct {
config *rest.Config
clusterProvider ClusterProvider
// DynamicClusterProvider is used to manage clusters
type DynamicClusterProvider struct {
config *rest.Config
ClusterManager ClusterManager

client dynamic.Interface // Client to get cluster info
informerFactory dynamicinformer.DynamicSharedInformerFactory
Expand All @@ -68,34 +68,34 @@ type Controller struct {
log logr.Logger
}

type ControllerConfig struct {
Config *rest.Config // Kubeconfig for the fed cluster
ClusterProvider ClusterProvider
ResyncPeriod time.Duration // Resync period for cluster management
Log logr.Logger
type DynamicClusterProviderConfig struct {
Config *rest.Config // Kubeconfig for the fed cluster
ClusterManager ClusterManager
ResyncPeriod time.Duration // Resync period for cluster management
Log logr.Logger
}

// NewController creates a new Controller which will process events about cluster.
func NewController(cfg *ControllerConfig) (*Controller, error) {
// NewDynamicClusterProvider creates a new DynamicClusterProvider which will process events about cluster.
func NewDynamicClusterProvider(cfg *DynamicClusterProviderConfig) (*DynamicClusterProvider, error) {
client, err := dynamic.NewForConfig(cfg.Config)
if err != nil {
return nil, err
}
if cfg.ClusterProvider == nil {
return nil, fmt.Errorf("ClusterProvider is required")
if cfg.ClusterManager == nil {
return nil, fmt.Errorf("ClusterManager is required")
}

informerFactory := dynamicinformer.NewDynamicSharedInformerFactory(client, cfg.ResyncPeriod)
informer := informerFactory.ForResource(cfg.ClusterProvider.GetClusterMangementGVR()).Informer()
informer := informerFactory.ForResource(cfg.ClusterManager.GetClusterMangementGVR()).Informer()

return &Controller{
config: cfg.Config,
clusterProvider: cfg.ClusterProvider,
return &DynamicClusterProvider{
config: cfg.Config,
ClusterManager: cfg.ClusterManager,

client: client,
informerFactory: informerFactory,
informer: informer,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), cfg.ClusterProvider.GetClusterMangementGVR().Resource),
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), cfg.ClusterManager.GetClusterMangementGVR().Resource),
syncedCh: make(chan struct{}),

clusterNameToNamespacedKey: make(map[string]string), // Get namespaced key by cluster name
Expand All @@ -107,7 +107,7 @@ func NewController(cfg *ControllerConfig) (*Controller, error) {
// AddEventHandler adds handlers which will be invoked.
// When cluster is added or updated, addUpdateHandler will be invoked.
// When cluster is deleted, deleteHandler will be invoked.
func (c *Controller) AddEventHandler(addUpdateHandler func(string, *rest.Config) error, deleteHandler func(string)) {
func (c *DynamicClusterProvider) AddEventHandler(addUpdateHandler func(string, *rest.Config) error, deleteHandler func(string)) {
c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.enqueueClusterEvent,
UpdateFunc: func(old, new interface{}) {
Expand All @@ -120,11 +120,11 @@ func (c *Controller) AddEventHandler(addUpdateHandler func(string, *rest.Config)
c.deleteHandler = deleteHandler
}

func (c *Controller) Run(stopCh <-chan struct{}) error {
func (c *DynamicClusterProvider) Run(stopCh <-chan struct{}) error {
defer runtime.HandleCrash()
defer c.workqueue.ShutDown()

c.clusterProvider.Init(c.config)
c.ClusterManager.Init(c.config)

c.informerFactory.Start(stopCh)

Expand All @@ -146,7 +146,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) error {
return nil
}

func (c *Controller) WaitForSynced(ctx context.Context) bool {
func (c *DynamicClusterProvider) WaitForSynced(ctx context.Context) bool {
select {
case <-c.syncedCh: // Wait for all cluster has been processed
return true
Expand All @@ -155,7 +155,7 @@ func (c *Controller) WaitForSynced(ctx context.Context) bool {
}
}

func (c *Controller) enqueueClusterEvent(obj interface{}) {
func (c *DynamicClusterProvider) enqueueClusterEvent(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
c.log.Error(err, "failed to get enqueue key")
Expand All @@ -164,7 +164,7 @@ func (c *Controller) enqueueClusterEvent(obj interface{}) {
c.workqueue.Add(key)
}

func (c *Controller) runWorker() {
func (c *DynamicClusterProvider) runWorker() {
for c.processNextWorkItem() {
c.mutex.Lock()
if c.syncedNum > 0 {
Expand All @@ -177,7 +177,7 @@ func (c *Controller) runWorker() {
}
}

func (c *Controller) processNextWorkItem() bool {
func (c *DynamicClusterProvider) processNextWorkItem() bool {
obj, shutdown := c.workqueue.Get()
if shutdown {
return false
Expand Down Expand Up @@ -209,14 +209,14 @@ func (c *Controller) processNextWorkItem() bool {
}

// eventHandler is called when an event about cluster is received.
func (c *Controller) eventHandler(key string) error {
func (c *DynamicClusterProvider) eventHandler(key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
c.log.Error(err, "failed to split namespaced key", "key", key)
return nil
}

obj, err := c.client.Resource(c.clusterProvider.GetClusterMangementGVR()).Namespace(namespace).Get(context.Background(), name, metav1.GetOptions{})
obj, err := c.client.Resource(c.ClusterManager.GetClusterMangementGVR()).Namespace(namespace).Get(context.Background(), name, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
c.mutex.Lock()
Expand All @@ -228,7 +228,7 @@ func (c *Controller) eventHandler(key string) error {
}
delete(c.namespacedKeyToObj, key)

clusterName := c.clusterProvider.GetClusterName(oldObj)
clusterName := c.ClusterManager.GetClusterName(oldObj)
delete(c.clusterNameToNamespacedKey, clusterName)

metrics.NewClusterEventCountMetrics(key, "delete", "true").Inc()
Expand All @@ -242,11 +242,11 @@ func (c *Controller) eventHandler(key string) error {

c.mutex.Lock()
c.namespacedKeyToObj[key] = obj
clusterName := c.clusterProvider.GetClusterName(obj)
clusterName := c.ClusterManager.GetClusterName(obj)
c.clusterNameToNamespacedKey[clusterName] = key
c.mutex.Unlock()

err = c.addUpdateHandler(clusterName, c.clusterProvider.GetClusterConfig(obj))
err = c.addUpdateHandler(clusterName, c.ClusterManager.GetClusterConfig(obj))
if err != nil {
metrics.NewClusterEventCountMetrics(key, "add-update", "false").Inc()
c.log.Error(err, "failed to add or update cluster", "key", key)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package controller
package clusterprovider

import (
"fmt"
Expand All @@ -26,28 +26,28 @@ import (
clusterv1beta1 "kusionstack.io/kube-api/cluster/v1beta1"
)

var _ ClusterProvider = &KarbourClusterProvider{}
var _ ClusterManager = &KarbourClusterManager{}

type KarbourClusterProvider struct {
type KarbourClusterManager struct {
config *rest.Config
}

func (p *KarbourClusterProvider) Init(config *rest.Config) {
func (p *KarbourClusterManager) Init(config *rest.Config) {
p.config = config
}

func (p *KarbourClusterProvider) GetClusterMangementGVR() schema.GroupVersionResource {
func (p *KarbourClusterManager) GetClusterMangementGVR() schema.GroupVersionResource {
return clusterv1beta1.SchemeGroupVersion.WithResource("clusters")
}

func (p *KarbourClusterProvider) GetClusterName(obj *unstructured.Unstructured) string {
func (p *KarbourClusterManager) GetClusterName(obj *unstructured.Unstructured) string {
if obj == nil {
return ""
}
return obj.GetName()
}

func (p *KarbourClusterProvider) GetClusterConfig(obj *unstructured.Unstructured) *rest.Config {
func (p *KarbourClusterManager) GetClusterConfig(obj *unstructured.Unstructured) *rest.Config {
clusterName := p.GetClusterName(obj)
if clusterName == "" || p.config == nil {
return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,27 @@
* limitations under the License.
*/

package controller
package clusterprovider

import (
"context"

"k8s.io/client-go/rest"
)

// StaticController is a controller that manages a static set of clusters.
type StaticController struct {
// StaticClusterProvider is a controller that manages a static set of clusters.
type StaticClusterProvider struct {
clusterToConfig map[string]*rest.Config
addUpdateHandler func(string, *rest.Config) error
}

func NewStaticController(clusterToConfig map[string]*rest.Config) *StaticController {
return &StaticController{
func NewStaticClusterProvider(clusterToConfig map[string]*rest.Config) *StaticClusterProvider {
return &StaticClusterProvider{
clusterToConfig: clusterToConfig,
}
}

func (c *StaticController) Run(stopCh <-chan struct{}) error {
func (c *StaticClusterProvider) Run(stopCh <-chan struct{}) error {
if c.addUpdateHandler == nil {
return nil
}
Expand All @@ -48,10 +48,10 @@ func (c *StaticController) Run(stopCh <-chan struct{}) error {
return nil
}

func (c *StaticController) AddEventHandler(addUpdateHandler func(string, *rest.Config) error, deleteHandler func(string)) {
func (c *StaticClusterProvider) AddEventHandler(addUpdateHandler func(string, *rest.Config) error, deleteHandler func(string)) {
c.addUpdateHandler = addUpdateHandler
}

func (c *StaticController) WaitForSynced(ctx context.Context) bool {
func (c *StaticClusterProvider) WaitForSynced(ctx context.Context) bool {
return true
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,38 +14,38 @@
* limitations under the License.
*/

package controller
package clusterprovider

import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/rest"
)

var _ ClusterProvider = &TestClusterProvider{}
var _ ClusterManager = &TestClusterManager{}

// TestClusterProvider is a test implementation of ClusterProvider
type TestClusterProvider struct {
// TestClusterManager is a test implementation of ClusterProviderInfo
type TestClusterManager struct {
schema.GroupVersionResource
ClusterNameToConfig map[string]*rest.Config // Map from cluster name to kubeconfig
}

func (p *TestClusterProvider) Init(config *rest.Config) {
func (p *TestClusterManager) Init(config *rest.Config) {
// Do nothing
}

func (p *TestClusterProvider) GetClusterMangementGVR() schema.GroupVersionResource {
func (p *TestClusterManager) GetClusterMangementGVR() schema.GroupVersionResource {
return p.GroupVersionResource
}

func (p *TestClusterProvider) GetClusterName(obj *unstructured.Unstructured) string {
func (p *TestClusterManager) GetClusterName(obj *unstructured.Unstructured) string {
if obj == nil {
return ""
}
return obj.GetName() // Use resource name as cluster name
}

func (p *TestClusterProvider) GetClusterConfig(obj *unstructured.Unstructured) *rest.Config {
func (p *TestClusterManager) GetClusterConfig(obj *unstructured.Unstructured) *rest.Config {
if obj == nil || p.ClusterNameToConfig == nil {
return nil
}
Expand Down
22 changes: 11 additions & 11 deletions multicluster/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,16 @@ func setOptionsDefaults(opts Options) Options {
return opts
}

type ClusterController interface {
Run(stopCh <-chan struct{}) error // Run the controller
type ClusterProvider interface {
Run(stopCh <-chan struct{}) error // Run the ClusterProvider
AddEventHandler(addUpdateHandler func(string, *rest.Config) error, deleteHandler func(string)) // Add event handler for cluster events
WaitForSynced(ctx context.Context) bool // Wait for all clusters to be synced
}

type ManagerConfig struct {
FedConfig *rest.Config
ClusterScheme *runtime.Scheme
ClusterController
ClusterProvider

ResyncPeriod time.Duration
ClusterFilter func(string) bool // select cluster
Expand All @@ -78,7 +78,7 @@ type Manager struct {
newCache cache.NewCacheFunc // Function to create cache for cluster
clusterScheme *runtime.Scheme // Scheme which is used to create cache for cluster

controller ClusterController // Controller for cluster management
clusterProvider ClusterProvider // Controller for cluster management

clusterCacheManager ClusterCacheManager
clusterClientManager ClusterClientManager
Expand All @@ -92,8 +92,8 @@ type Manager struct {
}

func NewManager(cfg *ManagerConfig, opts Options) (manager *Manager, newCacheFunc cache.NewCacheFunc, newClientFunc cluster.NewClientFunc, err error) {
if cfg.ClusterController == nil {
return nil, nil, nil, errors.New("ClusterController is required")
if cfg.ClusterProvider == nil {
return nil, nil, nil, errors.New("ClusterProvider is required")
}

var log logr.Logger
Expand Down Expand Up @@ -122,7 +122,7 @@ func NewManager(cfg *ManagerConfig, opts Options) (manager *Manager, newCacheFun

clusterCacheManager: clusterCacheManager,
clusterClientManager: clusterClientManager,
controller: cfg.ClusterController,
clusterProvider: cfg.ClusterProvider,

resyncPeriod: cfg.ResyncPeriod,
hasCluster: make(map[string]struct{}),
Expand All @@ -140,13 +140,13 @@ func (m *Manager) Run(ctx context.Context) error {
close(stopCh)
}()

m.controller.AddEventHandler(m.addUpdateHandler, m.deleteHandler)
return m.controller.Run(stopCh)
m.clusterProvider.AddEventHandler(m.addUpdateHandler, m.deleteHandler)
return m.clusterProvider.Run(stopCh)
}

func (m *Manager) WaitForSynced(ctx context.Context) bool {
m.log.Info("wait for controller synced")
return m.controller.WaitForSynced(ctx)
m.log.Info("wait for ClusterProvider synced")
return m.clusterProvider.WaitForSynced(ctx)
}

func (m *Manager) addUpdateHandler(cluster string, cfg *rest.Config) (err error) {
Expand Down
Loading

0 comments on commit 0b6fd4e

Please sign in to comment.