Skip to content

Commit

Permalink
refactor, cluster management
Browse files Browse the repository at this point in the history
  • Loading branch information
shaofan-hs committed Feb 22, 2024
1 parent f1269df commit 7d57a89
Show file tree
Hide file tree
Showing 8 changed files with 322 additions and 152 deletions.
5 changes: 3 additions & 2 deletions multicluster/clusterinfo/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ type contextKey string
const (
clusterInfo contextKey = "clusterInfo"

ClusterLabelKey = "kusionstack.io/cluster"
ClusterLabelKey = "kusionstack.io/cluster" // Label key for cluster name that will be attached when use Client or Cache to read

EnvClusterAllowList = "CLUSTER_ALLOW_LIST"
EnvClusterAllowList = "CLUSTER_ALLOW_LIST" // Comma separated list of cluster names that are allowed to be accessed
EnvClusterBlockList = "CLUSTER_BLOCK_LIST" // Comma separated list of cluster names that are blocked to be accessed
)

const (
Expand Down
163 changes: 87 additions & 76 deletions multicluster/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
Expand All @@ -37,94 +38,80 @@ import (
"kusionstack.io/kube-utils/multicluster/metrics"
)

type ClusterManagementType string

const (
OpenClusterManagement ClusterManagementType = "OpenClusterManagement"
TestCluterManagement ClusterManagementType = "TestCluterManagement"
)
// ClusterProvider abstracts the cluster management backend used by Controller.
// The controller watches the resource identified by GetClusterMangementGVR and
// calls GetClusterName and GetClusterConfig on the watched objects to obtain
// each cluster's name and rest.Config. Init is called once from Controller.Run
// before the informer factory is started.
type ClusterProvider interface {
Init(config *rest.Config) // Init is used to initialize the cluster provider, config is the kubeconfig for the fed cluster
GetClusterMangementGVR() schema.GroupVersionResource // The GVR will be used to watch cluster management resource
GetClusterName(obj *unstructured.Unstructured) string // Get cluster name from cluster management resource, cluster name is used to identify the cluster
GetClusterConfig(obj *unstructured.Unstructured) *rest.Config // Get kubeconfig from cluster management resource
}

type Controller struct {
config *rest.Config
clusterManagementType ClusterManagementType
clusterManagementGVR schema.GroupVersionResource
config *rest.Config
clusterProvider ClusterProvider

client dynamic.Interface // client to get cluster info
client dynamic.Interface // Client to get cluster info
informerFactory dynamicinformer.DynamicSharedInformerFactory
informer cache.SharedIndexInformer
workqueue workqueue.RateLimitingInterface

mutex sync.Mutex
syncedNum int // number of synced cluster
syncedCh chan struct{}
mutex sync.RWMutex
syncedNum int // Number of synced cluster
syncedCh chan struct{} // Channel to notify all synced clusters have been processed

addUpdateHandler func(string) error // when cluster is added or updated, this handler will be called
deleteHandler func(string) // when cluster is deleted, this handler will be called
log logr.Logger
addUpdateHandler func(string) error // When cluster is added or updated, this handler will be invoked
deleteHandler func(string) // When cluster is deleted, this handler will be invoked

// for test
restConfigForCluster func(cluster string) *rest.Config
clusterNameToNamespacedKey map[string]string
namespacedKeyToObj map[string]*unstructured.Unstructured
log logr.Logger
}

type ControllerConfig struct {
Config *rest.Config // config for cluster management
ClusterManagementType ClusterManagementType
ClusterManagementGVR *schema.GroupVersionResource
ResyncPeriod time.Duration // resync period for cluster management
Log logr.Logger

// for test
RestConfigForCluster func(cluster string) *rest.Config
Config *rest.Config // Kubeconfig for the fed cluster
ClusterProvider ClusterProvider
ResyncPeriod time.Duration // Resync period for cluster management
Log logr.Logger
}

// NewController creates a new Controller which will process events about cluster.
func NewController(cfg *ControllerConfig) (*Controller, error) {
var clusterManagementGVR schema.GroupVersionResource
switch cfg.ClusterManagementType {
case OpenClusterManagement:
if cfg.ClusterManagementGVR == nil {
return nil, fmt.Errorf("ClusterManagementGVR must be set when use %s", cfg.ClusterManagementType)
}
clusterManagementGVR = *cfg.ClusterManagementGVR
case TestCluterManagement:
if cfg.ClusterManagementGVR == nil || cfg.RestConfigForCluster == nil {
return nil, fmt.Errorf("ClusterManagementGVR and RestConfigForCluster must be set when use %s", cfg.ClusterManagementType)
}
clusterManagementGVR = *cfg.ClusterManagementGVR
default:
return nil, fmt.Errorf("not support cluster management type: %v", cfg.ClusterManagementType)
}

client, err := dynamic.NewForConfig(cfg.Config)
if err != nil {
return nil, err
}
if cfg.ClusterProvider == nil {
return nil, fmt.Errorf("ClusterProvider is required")
}

informerFactory := dynamicinformer.NewDynamicSharedInformerFactory(client, cfg.ResyncPeriod)
informer := informerFactory.ForResource(clusterManagementGVR).Informer()
informer := informerFactory.ForResource(cfg.ClusterProvider.GetClusterMangementGVR()).Informer()

return &Controller{
config: cfg.Config,
client: client,
informerFactory: informerFactory,
clusterManagementType: cfg.ClusterManagementType,
clusterManagementGVR: clusterManagementGVR,
informer: informer,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), clusterManagementGVR.Resource),
syncedCh: make(chan struct{}),
log: cfg.Log,

restConfigForCluster: cfg.RestConfigForCluster,
config: cfg.Config,
clusterProvider: cfg.ClusterProvider,

client: client,
informerFactory: informerFactory,
informer: informer,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), cfg.ClusterProvider.GetClusterMangementGVR().Resource),
syncedCh: make(chan struct{}),

clusterNameToNamespacedKey: make(map[string]string), // Get namespaced key by cluster name
namespacedKeyToObj: make(map[string]*unstructured.Unstructured), // Get cluster management resource by namespaced key
log: cfg.Log,
}, nil
}

// AddEventHandler adds handler for events about cluster.
// AddEventHandler adds handlers which will be invoked.
// When cluster is added or updated, addUpdateHandler will be invoked.
// When cluster is deleted, deleteHandler will be invoked.
func (c *Controller) AddEventHandler(addUpdateHandler func(string) error, deleteHandler func(string)) {
c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.enqueueClusterExtension,
AddFunc: c.enqueueClusterEvent,
UpdateFunc: func(old, new interface{}) {
c.enqueueClusterExtension(new)
c.enqueueClusterEvent(new)
},
DeleteFunc: c.enqueueClusterExtension,
DeleteFunc: c.enqueueClusterEvent,
})

c.addUpdateHandler = addUpdateHandler
Expand All @@ -135,6 +122,8 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
defer runtime.HandleCrash()
defer c.workqueue.ShutDown()

c.clusterProvider.Init(c.config)

c.informerFactory.Start(stopCh)

// Wait for the caches to be synced before starting workers
Expand Down Expand Up @@ -164,10 +153,9 @@ func (c *Controller) WaitForSynced(ctx context.Context) bool {
}
}

func (c *Controller) enqueueClusterExtension(obj interface{}) {
var key string
var err error
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
func (c *Controller) enqueueClusterEvent(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
c.log.Error(err, "failed to get enqueue key")
return
}
Expand Down Expand Up @@ -205,11 +193,9 @@ func (c *Controller) processNextWorkItem() bool {

if err := c.eventHandler(key); err != nil {
c.workqueue.AddRateLimited(key)
metrics.NewControllerEventCountMetrics(key, "failed").Inc()
return err
}

metrics.NewControllerEventCountMetrics(key, "ok").Inc()
c.workqueue.Forget(obj)
return nil
}(obj)
Expand All @@ -228,35 +214,60 @@ func (c *Controller) eventHandler(key string) error {
return nil
}

_, err = c.client.Resource(c.clusterManagementGVR).Namespace(namespace).Get(context.Background(), name, metav1.GetOptions{})
obj, err := c.client.Resource(c.clusterProvider.GetClusterMangementGVR()).Namespace(namespace).Get(context.Background(), name, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
c.deleteHandler(name)
c.mutex.Lock()
defer c.mutex.Unlock()

oldObj, ok := c.namespacedKeyToObj[key]
if !ok {
return nil
}
delete(c.namespacedKeyToObj, key)

clusterName := c.clusterProvider.GetClusterName(oldObj)
delete(c.clusterNameToNamespacedKey, clusterName)

metrics.NewClusterEventCountMetrics(key, "delete", "true").Inc()
c.deleteHandler(clusterName)
return nil
}
metrics.NewClusterEventCountMetrics(key, "delete", "false").Inc()
c.log.Error(err, "failed to get resource", "key", key)
return err
}

err = c.addUpdateHandler(name)
c.mutex.Lock()
c.namespacedKeyToObj[key] = obj
clusterName := c.clusterProvider.GetClusterName(obj)
c.clusterNameToNamespacedKey[clusterName] = key
c.mutex.Unlock()

err = c.addUpdateHandler(clusterName)
if err != nil {
metrics.NewClusterEventCountMetrics(key, "add-update", "false").Inc()
c.log.Error(err, "failed to add or update cluster", "key", key)
return err
}

metrics.NewClusterEventCountMetrics(key, "add-update", "true").Inc()
return nil
}

// RestConfigForCluster returns the rest config for the managed cluster.
func (c *Controller) RestConfigForCluster(cluster string) *rest.Config {
switch c.clusterManagementType {
case OpenClusterManagement:
clusterConfig := *c.config
clusterConfig.Host = fmt.Sprintf("%s/apis/%s/%s/%s/%s/proxy", clusterConfig.Host, c.clusterManagementGVR.Group, c.clusterManagementGVR.Version, c.clusterManagementGVR.Resource, cluster)
return &clusterConfig
case TestCluterManagement:
return c.restConfigForCluster(cluster)
default:
func (c *Controller) RestConfigForCluster(clusterName string) *rest.Config {
c.mutex.RLock()
defer c.mutex.RUnlock()

namespacedKey, ok := c.clusterNameToNamespacedKey[clusterName]
if !ok {
return nil
}

obj, ok := c.namespacedKeyToObj[namespacedKey]
if !ok {
return nil
}
return c.clusterProvider.GetClusterConfig(obj)
}
56 changes: 56 additions & 0 deletions multicluster/controller/test_cluster_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/**
* Copyright 2024 KusionStack Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package controller

import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/rest"
)

// Compile-time assertion that *TestClusterProvider satisfies ClusterProvider.
var _ ClusterProvider = &TestClusterProvider{}

// TestClusterProvider is a ClusterProvider backed by a fixed GVR and a static
// map from cluster name to rest.Config (presumably intended for tests, per its
// name). The resource name of a watched object is used as the cluster name.
type TestClusterProvider struct {
// Embedded GVR of the cluster management resource; returned unchanged by
// GetClusterMangementGVR.
schema.GroupVersionResource
ClusterNameToConfig map[string]*rest.Config // Map from cluster name to kubeconfig
}

// Init satisfies ClusterProvider. This provider requires no setup, so the
// supplied config is ignored.
func (p *TestClusterProvider) Init(config *rest.Config) {}

// GetClusterMangementGVR returns the GroupVersionResource embedded in the
// provider.
func (p *TestClusterProvider) GetClusterMangementGVR() schema.GroupVersionResource {
	gvr := p.GroupVersionResource
	return gvr
}

// GetClusterName returns the resource name of obj, which this provider uses
// as the cluster name. A nil obj yields the empty string.
func (p *TestClusterProvider) GetClusterName(obj *unstructured.Unstructured) string {
	if obj != nil {
		return obj.GetName()
	}
	return ""
}

// GetClusterConfig returns the rest.Config stored in ClusterNameToConfig under
// the resource name of obj. It returns nil when obj is nil, when the map is
// nil, or when no entry exists for that name.
func (p *TestClusterProvider) GetClusterConfig(obj *unstructured.Unstructured) *rest.Config {
	if obj == nil {
		return nil
	}
	// Indexing a nil map or a missing key yields the zero value, which is nil
	// for *rest.Config, so no separate nil-map or presence check is needed.
	return p.ClusterNameToConfig[obj.GetName()]
}
Loading

0 comments on commit 7d57a89

Please sign in to comment.