Skip to content

Commit

Permalink
feat: add interface ClusterProvider (#37)
Browse files Browse the repository at this point in the history
* add interface ClusterController

* add a static cluster controller

* refactor, cluster provider

* fix
  • Loading branch information
shaofan-hs authored Mar 25, 2024
1 parent 6ad0ec7 commit 5c7cfd0
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package controller
package config

import (
"fmt"
Expand All @@ -26,34 +26,33 @@ import (
clusterv1beta1 "kusionstack.io/kube-api/cluster/v1beta1"
)

var _ ClusterProvider = &KarbourClusterProvider{}

type KarbourClusterProvider struct {
// Karbour is a implementation of ClusterConfigProvider
type Karbour struct {
config *rest.Config
}

func (p *KarbourClusterProvider) Init(config *rest.Config) {
func (p *Karbour) Init(config *rest.Config) {
p.config = config
}

func (p *KarbourClusterProvider) GetClusterMangementGVR() schema.GroupVersionResource {
func (p *Karbour) GetGVR() schema.GroupVersionResource {
return clusterv1beta1.SchemeGroupVersion.WithResource("clusters")
}

func (p *KarbourClusterProvider) GetClusterName(obj *unstructured.Unstructured) string {
func (p *Karbour) GetClusterName(obj *unstructured.Unstructured) string {
if obj == nil {
return ""
}
return obj.GetName()
}

func (p *KarbourClusterProvider) GetClusterConfig(obj *unstructured.Unstructured) *rest.Config {
func (p *Karbour) GetClusterConfig(obj *unstructured.Unstructured) *rest.Config {
clusterName := p.GetClusterName(obj)
if clusterName == "" || p.config == nil {
return nil
}

gvr := p.GetClusterMangementGVR()
gvr := p.GetGVR()

clusterConfig := *p.config
clusterConfig.Host = fmt.Sprintf("%s/apis/%s/%s/%s/%s/proxy", clusterConfig.Host, gvr.Group, gvr.Version, gvr.Resource, clusterName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,37 +14,35 @@
* limitations under the License.
*/

package controller
package config

import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/rest"
)

var _ ClusterProvider = &TestClusterProvider{}

type TestClusterProvider struct {
schema.GroupVersionResource
// Simple is a implementation of ClusterConfigProvider
type Simple struct {
GVR schema.GroupVersionResource
ClusterNameToConfig map[string]*rest.Config // Map from cluster name to kubeconfig
}

func (p *TestClusterProvider) Init(config *rest.Config) {
// Do nothing
func (p *Simple) Init(config *rest.Config) {
}

func (p *TestClusterProvider) GetClusterMangementGVR() schema.GroupVersionResource {
return p.GroupVersionResource
func (p *Simple) GetGVR() schema.GroupVersionResource {
return p.GVR
}

func (p *TestClusterProvider) GetClusterName(obj *unstructured.Unstructured) string {
func (p *Simple) GetClusterName(obj *unstructured.Unstructured) string {
if obj == nil {
return ""
}
return obj.GetName() // Use resource name as cluster name
return obj.GetName()
}

func (p *TestClusterProvider) GetClusterConfig(obj *unstructured.Unstructured) *rest.Config {
func (p *Simple) GetClusterConfig(obj *unstructured.Unstructured) *rest.Config {
if obj == nil || p.ClusterNameToConfig == nil {
return nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package controller
package clusterprovider

import (
"context"
Expand All @@ -38,16 +38,18 @@ import (
"kusionstack.io/kube-utils/multicluster/metrics"
)

type ClusterProvider interface {
// ClusterConfigProvider is used to provide cluster management resource and cluster kubeconfig
type ClusterConfigProvider interface {
Init(config *rest.Config) // Init is used to initialize the cluster provider, config is the kubeconfig for the fed cluster
GetClusterMangementGVR() schema.GroupVersionResource // The GVR will be used to watch cluster management resource
GetGVR() schema.GroupVersionResource // The GVR will be used to watch cluster management resource
GetClusterName(obj *unstructured.Unstructured) string // Get cluster name from cluster management resource, cluster name is used to identify the cluster
GetClusterConfig(obj *unstructured.Unstructured) *rest.Config // Get kubeconfig from cluster management resource
}

// Controller is used to manage clusters
type Controller struct {
config *rest.Config
clusterProvider ClusterProvider
config *rest.Config
ClusterConfigProvider ClusterConfigProvider

client dynamic.Interface // Client to get cluster info
informerFactory dynamicinformer.DynamicSharedInformerFactory
Expand All @@ -58,19 +60,19 @@ type Controller struct {
syncedNum int // Number of synced cluster
syncedCh chan struct{} // Channel to notify all synced clusters have been processed

addUpdateHandler func(string) error // When cluster is added or updated, this handler will be invoked
deleteHandler func(string) // When cluster is deleted, this handler will be invoked
addUpdateHandler func(string, *rest.Config) error // When cluster is added or updated, this handler will be invoked
deleteHandler func(string) // When cluster is deleted, this handler will be invoked

clusterNameToNamespacedKey map[string]string
namespacedKeyToObj map[string]*unstructured.Unstructured
log logr.Logger
}

type ControllerConfig struct {
Config *rest.Config // Kubeconfig for the fed cluster
ClusterProvider ClusterProvider
ResyncPeriod time.Duration // Resync period for cluster management
Log logr.Logger
Config *rest.Config // Kubeconfig for the fed cluster
ClusterConfigProvider ClusterConfigProvider
ResyncPeriod time.Duration // Resync period for cluster management
Log logr.Logger
}

// NewController creates a new Controller which will process events about cluster.
Expand All @@ -79,21 +81,21 @@ func NewController(cfg *ControllerConfig) (*Controller, error) {
if err != nil {
return nil, err
}
if cfg.ClusterProvider == nil {
return nil, fmt.Errorf("ClusterProvider is required")
if cfg.ClusterConfigProvider == nil {
return nil, fmt.Errorf("ClusterConfigProvider is required")
}

informerFactory := dynamicinformer.NewDynamicSharedInformerFactory(client, cfg.ResyncPeriod)
informer := informerFactory.ForResource(cfg.ClusterProvider.GetClusterMangementGVR()).Informer()
informer := informerFactory.ForResource(cfg.ClusterConfigProvider.GetGVR()).Informer()

return &Controller{
config: cfg.Config,
clusterProvider: cfg.ClusterProvider,
config: cfg.Config,
ClusterConfigProvider: cfg.ClusterConfigProvider,

client: client,
informerFactory: informerFactory,
informer: informer,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), cfg.ClusterProvider.GetClusterMangementGVR().Resource),
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), cfg.ClusterConfigProvider.GetGVR().Resource),
syncedCh: make(chan struct{}),

clusterNameToNamespacedKey: make(map[string]string), // Get namespaced key by cluster name
Expand All @@ -105,7 +107,7 @@ func NewController(cfg *ControllerConfig) (*Controller, error) {
// AddEventHandler adds handlers which will be invoked.
// When cluster is added or updated, addUpdateHandler will be invoked.
// When cluster is deleted, deleteHandler will be invoked.
func (c *Controller) AddEventHandler(addUpdateHandler func(string) error, deleteHandler func(string)) {
func (c *Controller) AddEventHandler(addUpdateHandler func(string, *rest.Config) error, deleteHandler func(string)) {
c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.enqueueClusterEvent,
UpdateFunc: func(old, new interface{}) {
Expand All @@ -118,11 +120,11 @@ func (c *Controller) AddEventHandler(addUpdateHandler func(string) error, delete
c.deleteHandler = deleteHandler
}

func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
func (c *Controller) Run(stopCh <-chan struct{}) error {
defer runtime.HandleCrash()
defer c.workqueue.ShutDown()

c.clusterProvider.Init(c.config)
c.ClusterConfigProvider.Init(c.config)

c.informerFactory.Start(stopCh)

Expand All @@ -136,7 +138,7 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
c.mutex.Unlock()

// Start workers to process cluster events
for i := 0; i < threadiness; i++ {
for i := 0; i < 2; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}

Expand Down Expand Up @@ -214,7 +216,7 @@ func (c *Controller) eventHandler(key string) error {
return nil
}

obj, err := c.client.Resource(c.clusterProvider.GetClusterMangementGVR()).Namespace(namespace).Get(context.Background(), name, metav1.GetOptions{})
obj, err := c.client.Resource(c.ClusterConfigProvider.GetGVR()).Namespace(namespace).Get(context.Background(), name, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
c.mutex.Lock()
Expand All @@ -226,7 +228,7 @@ func (c *Controller) eventHandler(key string) error {
}
delete(c.namespacedKeyToObj, key)

clusterName := c.clusterProvider.GetClusterName(oldObj)
clusterName := c.ClusterConfigProvider.GetClusterName(oldObj)
delete(c.clusterNameToNamespacedKey, clusterName)

metrics.NewClusterEventCountMetrics(key, "delete", "true").Inc()
Expand All @@ -240,11 +242,11 @@ func (c *Controller) eventHandler(key string) error {

c.mutex.Lock()
c.namespacedKeyToObj[key] = obj
clusterName := c.clusterProvider.GetClusterName(obj)
clusterName := c.ClusterConfigProvider.GetClusterName(obj)
c.clusterNameToNamespacedKey[clusterName] = key
c.mutex.Unlock()

err = c.addUpdateHandler(clusterName)
err = c.addUpdateHandler(clusterName, c.ClusterConfigProvider.GetClusterConfig(obj))
if err != nil {
metrics.NewClusterEventCountMetrics(key, "add-update", "false").Inc()
c.log.Error(err, "failed to add or update cluster", "key", key)
Expand All @@ -254,20 +256,3 @@ func (c *Controller) eventHandler(key string) error {
metrics.NewClusterEventCountMetrics(key, "add-update", "true").Inc()
return nil
}

// RestConfigForCluster returns the rest config for the mangered cluster.
func (c *Controller) RestConfigForCluster(clusterName string) *rest.Config {
c.mutex.RLock()
defer c.mutex.RUnlock()

namespacedKey, ok := c.clusterNameToNamespacedKey[clusterName]
if !ok {
return nil
}

obj, ok := c.namespacedKeyToObj[namespacedKey]
if !ok {
return nil
}
return c.clusterProvider.GetClusterConfig(obj)
}
57 changes: 57 additions & 0 deletions multicluster/clusterprovider/simple.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/**
* Copyright 2024 KusionStack Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package clusterprovider

import (
"context"

"k8s.io/client-go/rest"
)

// SimpleClusterProvider is a controller that manages a static set of clusters.
type SimpleClusterProvider struct {
clusterToConfig map[string]*rest.Config
addUpdateHandler func(string, *rest.Config) error
}

func NewSimpleClusterProvider(clusterToConfig map[string]*rest.Config) *SimpleClusterProvider {
return &SimpleClusterProvider{
clusterToConfig: clusterToConfig,
}
}

func (c *SimpleClusterProvider) Run(stopCh <-chan struct{}) error {
if c.addUpdateHandler == nil {
return nil
}

for cluster, config := range c.clusterToConfig {
if err := c.addUpdateHandler(cluster, config); err != nil {
return err
}
}

return nil
}

func (c *SimpleClusterProvider) AddEventHandler(addUpdateHandler func(string, *rest.Config) error, deleteHandler func(string)) {
c.addUpdateHandler = addUpdateHandler
}

func (c *SimpleClusterProvider) WaitForSynced(ctx context.Context) bool {
return true
}
Loading

0 comments on commit 5c7cfd0

Please sign in to comment.