Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
shaofan-hs committed Mar 22, 2024
1 parent d246db4 commit 6591737
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package clusterprovider
package config

import (
"fmt"
Expand All @@ -24,36 +24,38 @@ import (
"k8s.io/client-go/rest"

clusterv1beta1 "kusionstack.io/kube-api/cluster/v1beta1"

Check failure on line 26 in multicluster/clusterprovider/config/karbour.go

View workflow job for this annotation

GitHub Actions / Golang Lint

File is not `goimports`-ed with -local kusionstack.io/kube-utils (goimports)
clusterprovider "kusionstack.io/kube-utils/multicluster/clusterprovider"
)

var _ ClusterManager = &KarbourClusterManager{}
var _ clusterprovider.ClusterConfigProvider = &Karbour{}

type KarbourClusterManager struct {
// Karbour is a implementation of ClusterConfigProvider
type Karbour struct {
config *rest.Config
}

func (p *KarbourClusterManager) Init(config *rest.Config) {
func (p *Karbour) Init(config *rest.Config) {
p.config = config
}

func (p *KarbourClusterManager) GetClusterMangementGVR() schema.GroupVersionResource {
func (p *Karbour) GetGVR() schema.GroupVersionResource {
return clusterv1beta1.SchemeGroupVersion.WithResource("clusters")
}

func (p *KarbourClusterManager) GetClusterName(obj *unstructured.Unstructured) string {
func (p *Karbour) GetClusterName(obj *unstructured.Unstructured) string {
if obj == nil {
return ""
}
return obj.GetName()
}

func (p *KarbourClusterManager) GetClusterConfig(obj *unstructured.Unstructured) *rest.Config {
func (p *Karbour) GetClusterConfig(obj *unstructured.Unstructured) *rest.Config {
clusterName := p.GetClusterName(obj)
if clusterName == "" || p.config == nil {
return nil
}

gvr := p.GetClusterMangementGVR()
gvr := p.GetGVR()

clusterConfig := *p.config
clusterConfig.Host = fmt.Sprintf("%s/apis/%s/%s/%s/%s/proxy", clusterConfig.Host, gvr.Group, gvr.Version, gvr.Resource, clusterName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,38 +14,38 @@
* limitations under the License.
*/

package clusterprovider
package config

import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/rest"

Check failure on line 22 in multicluster/clusterprovider/config/simple.go

View workflow job for this annotation

GitHub Actions / Golang Lint

File is not `goimports`-ed with -local kusionstack.io/kube-utils (goimports)
clusterprovider "kusionstack.io/kube-utils/multicluster/clusterprovider"
)

var _ ClusterManager = &TestClusterManager{}
var _ clusterprovider.ClusterConfigProvider = &Simple{}

// TestClusterManager is a test implementation of ClusterProviderInfo
type TestClusterManager struct {
schema.GroupVersionResource
// Simple is a implementation of ClusterConfigProvider
type Simple struct {
GVR schema.GroupVersionResource
ClusterNameToConfig map[string]*rest.Config // Map from cluster name to kubeconfig
}

func (p *TestClusterManager) Init(config *rest.Config) {
// Do nothing
func (p *Simple) Init(config *rest.Config) {
}

func (p *TestClusterManager) GetClusterMangementGVR() schema.GroupVersionResource {
return p.GroupVersionResource
func (p *Simple) GetGVR() schema.GroupVersionResource {
return p.GVR
}

func (p *TestClusterManager) GetClusterName(obj *unstructured.Unstructured) string {
func (p *Simple) GetClusterName(obj *unstructured.Unstructured) string {
if obj == nil {
return ""
}
return obj.GetName() // Use resource name as cluster name
return obj.GetName()
}

func (p *TestClusterManager) GetClusterConfig(obj *unstructured.Unstructured) *rest.Config {
func (p *Simple) GetClusterConfig(obj *unstructured.Unstructured) *rest.Config {
if obj == nil || p.ClusterNameToConfig == nil {
return nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,18 @@ import (
"kusionstack.io/kube-utils/multicluster/metrics"
)

// ClusterManager is used to provide cluster management resource and cluster kubeconfig
type ClusterManager interface {
// ClusterConfigProvider is used to provide cluster management resource and cluster kubeconfig
type ClusterConfigProvider interface {
Init(config *rest.Config) // Init is used to initialize the cluster provider, config is the kubeconfig for the fed cluster
GetClusterMangementGVR() schema.GroupVersionResource // The GVR will be used to watch cluster management resource
GetGVR() schema.GroupVersionResource // The GVR will be used to watch cluster management resource
GetClusterName(obj *unstructured.Unstructured) string // Get cluster name from cluster management resource, cluster name is used to identify the cluster
GetClusterConfig(obj *unstructured.Unstructured) *rest.Config // Get kubeconfig from cluster management resource
}

// DynamicClusterProvider is used to manage clusters
type DynamicClusterProvider struct {
config *rest.Config
ClusterManager ClusterManager
// Controller is used to manage clusters
type Controller struct {
config *rest.Config
ClusterConfigProvider ClusterConfigProvider

client dynamic.Interface // Client to get cluster info
informerFactory dynamicinformer.DynamicSharedInformerFactory
Expand All @@ -68,34 +68,34 @@ type DynamicClusterProvider struct {
log logr.Logger
}

type DynamicClusterProviderConfig struct {
Config *rest.Config // Kubeconfig for the fed cluster
ClusterManager ClusterManager
ResyncPeriod time.Duration // Resync period for cluster management
Log logr.Logger
type ControllerConfig struct {
Config *rest.Config // Kubeconfig for the fed cluster
ClusterConfigProvider ClusterConfigProvider
ResyncPeriod time.Duration // Resync period for cluster management
Log logr.Logger
}

// NewDynamicClusterProvider creates a new DynamicClusterProvider which will process events about cluster.
func NewDynamicClusterProvider(cfg *DynamicClusterProviderConfig) (*DynamicClusterProvider, error) {
// NewController creates a new Controller which will process events about cluster.
func NewController(cfg *ControllerConfig) (*Controller, error) {
client, err := dynamic.NewForConfig(cfg.Config)
if err != nil {
return nil, err
}
if cfg.ClusterManager == nil {
return nil, fmt.Errorf("ClusterManager is required")
if cfg.ClusterConfigProvider == nil {
return nil, fmt.Errorf("ClusterConfigProvider is required")
}

informerFactory := dynamicinformer.NewDynamicSharedInformerFactory(client, cfg.ResyncPeriod)
informer := informerFactory.ForResource(cfg.ClusterManager.GetClusterMangementGVR()).Informer()
informer := informerFactory.ForResource(cfg.ClusterConfigProvider.GetGVR()).Informer()

return &DynamicClusterProvider{
config: cfg.Config,
ClusterManager: cfg.ClusterManager,
return &Controller{
config: cfg.Config,
ClusterConfigProvider: cfg.ClusterConfigProvider,

client: client,
informerFactory: informerFactory,
informer: informer,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), cfg.ClusterManager.GetClusterMangementGVR().Resource),
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), cfg.ClusterConfigProvider.GetGVR().Resource),
syncedCh: make(chan struct{}),

clusterNameToNamespacedKey: make(map[string]string), // Get namespaced key by cluster name
Expand All @@ -107,7 +107,7 @@ func NewDynamicClusterProvider(cfg *DynamicClusterProviderConfig) (*DynamicClust
// AddEventHandler adds handlers which will be invoked.
// When cluster is added or updated, addUpdateHandler will be invoked.
// When cluster is deleted, deleteHandler will be invoked.
func (c *DynamicClusterProvider) AddEventHandler(addUpdateHandler func(string, *rest.Config) error, deleteHandler func(string)) {
func (c *Controller) AddEventHandler(addUpdateHandler func(string, *rest.Config) error, deleteHandler func(string)) {
c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.enqueueClusterEvent,
UpdateFunc: func(old, new interface{}) {
Expand All @@ -120,11 +120,11 @@ func (c *DynamicClusterProvider) AddEventHandler(addUpdateHandler func(string, *
c.deleteHandler = deleteHandler
}

func (c *DynamicClusterProvider) Run(stopCh <-chan struct{}) error {
func (c *Controller) Run(stopCh <-chan struct{}) error {
defer runtime.HandleCrash()
defer c.workqueue.ShutDown()

c.ClusterManager.Init(c.config)
c.ClusterConfigProvider.Init(c.config)

c.informerFactory.Start(stopCh)

Expand All @@ -146,7 +146,7 @@ func (c *DynamicClusterProvider) Run(stopCh <-chan struct{}) error {
return nil
}

func (c *DynamicClusterProvider) WaitForSynced(ctx context.Context) bool {
func (c *Controller) WaitForSynced(ctx context.Context) bool {
select {
case <-c.syncedCh: // Wait for all cluster has been processed
return true
Expand All @@ -155,7 +155,7 @@ func (c *DynamicClusterProvider) WaitForSynced(ctx context.Context) bool {
}
}

func (c *DynamicClusterProvider) enqueueClusterEvent(obj interface{}) {
func (c *Controller) enqueueClusterEvent(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
c.log.Error(err, "failed to get enqueue key")
Expand All @@ -164,7 +164,7 @@ func (c *DynamicClusterProvider) enqueueClusterEvent(obj interface{}) {
c.workqueue.Add(key)
}

func (c *DynamicClusterProvider) runWorker() {
func (c *Controller) runWorker() {
for c.processNextWorkItem() {
c.mutex.Lock()
if c.syncedNum > 0 {
Expand All @@ -177,7 +177,7 @@ func (c *DynamicClusterProvider) runWorker() {
}
}

func (c *DynamicClusterProvider) processNextWorkItem() bool {
func (c *Controller) processNextWorkItem() bool {
obj, shutdown := c.workqueue.Get()
if shutdown {
return false
Expand Down Expand Up @@ -209,14 +209,14 @@ func (c *DynamicClusterProvider) processNextWorkItem() bool {
}

// eventHandler is called when an event about cluster is received.
func (c *DynamicClusterProvider) eventHandler(key string) error {
func (c *Controller) eventHandler(key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
c.log.Error(err, "failed to split namespaced key", "key", key)
return nil
}

obj, err := c.client.Resource(c.ClusterManager.GetClusterMangementGVR()).Namespace(namespace).Get(context.Background(), name, metav1.GetOptions{})
obj, err := c.client.Resource(c.ClusterConfigProvider.GetGVR()).Namespace(namespace).Get(context.Background(), name, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
c.mutex.Lock()
Expand All @@ -228,7 +228,7 @@ func (c *DynamicClusterProvider) eventHandler(key string) error {
}
delete(c.namespacedKeyToObj, key)

clusterName := c.ClusterManager.GetClusterName(oldObj)
clusterName := c.ClusterConfigProvider.GetClusterName(oldObj)
delete(c.clusterNameToNamespacedKey, clusterName)

metrics.NewClusterEventCountMetrics(key, "delete", "true").Inc()
Expand All @@ -242,11 +242,11 @@ func (c *DynamicClusterProvider) eventHandler(key string) error {

c.mutex.Lock()
c.namespacedKeyToObj[key] = obj
clusterName := c.ClusterManager.GetClusterName(obj)
clusterName := c.ClusterConfigProvider.GetClusterName(obj)
c.clusterNameToNamespacedKey[clusterName] = key
c.mutex.Unlock()

err = c.addUpdateHandler(clusterName, c.ClusterManager.GetClusterConfig(obj))
err = c.addUpdateHandler(clusterName, c.ClusterConfigProvider.GetClusterConfig(obj))
if err != nil {
metrics.NewClusterEventCountMetrics(key, "add-update", "false").Inc()
c.log.Error(err, "failed to add or update cluster", "key", key)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,19 @@ import (
"k8s.io/client-go/rest"
)

// StaticClusterProvider is a controller that manages a static set of clusters.
type StaticClusterProvider struct {
// SimpleClusterProvider is a controller that manages a static set of clusters.
type SimpleClusterProvider struct {
clusterToConfig map[string]*rest.Config
addUpdateHandler func(string, *rest.Config) error
}

func NewStaticClusterProvider(clusterToConfig map[string]*rest.Config) *StaticClusterProvider {
return &StaticClusterProvider{
func NewSimpleClusterProvider(clusterToConfig map[string]*rest.Config) *SimpleClusterProvider {
return &SimpleClusterProvider{
clusterToConfig: clusterToConfig,
}
}

func (c *StaticClusterProvider) Run(stopCh <-chan struct{}) error {
func (c *SimpleClusterProvider) Run(stopCh <-chan struct{}) error {
if c.addUpdateHandler == nil {
return nil
}
Expand All @@ -48,10 +48,10 @@ func (c *StaticClusterProvider) Run(stopCh <-chan struct{}) error {
return nil
}

func (c *StaticClusterProvider) AddEventHandler(addUpdateHandler func(string, *rest.Config) error, deleteHandler func(string)) {
func (c *SimpleClusterProvider) AddEventHandler(addUpdateHandler func(string, *rest.Config) error, deleteHandler func(string)) {
c.addUpdateHandler = addUpdateHandler
}

func (c *StaticClusterProvider) WaitForSynced(ctx context.Context) bool {
func (c *SimpleClusterProvider) WaitForSynced(ctx context.Context) bool {
return true
}
6 changes: 3 additions & 3 deletions multicluster/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ type ClusterProvider interface {
}

type ManagerConfig struct {
FedConfig *rest.Config
ClusterScheme *runtime.Scheme
ClusterProvider
FedConfig *rest.Config
ClusterScheme *runtime.Scheme
ClusterProvider ClusterProvider

ResyncPeriod time.Duration
ClusterFilter func(string) bool // select cluster
Expand Down
7 changes: 4 additions & 3 deletions multicluster/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (

"kusionstack.io/kube-utils/multicluster/clusterinfo"
"kusionstack.io/kube-utils/multicluster/clusterprovider"
"kusionstack.io/kube-utils/multicluster/clusterprovider/config"
)

var (
Expand Down Expand Up @@ -127,11 +128,11 @@ var _ = BeforeSuite(func() {
)
os.Setenv(clusterinfo.EnvClusterAllowList, "cluster1,cluster2")

clusterProvider, err := clusterprovider.NewDynamicClusterProvider(&clusterprovider.DynamicClusterProviderConfig{
clusterProvider, err := clusterprovider.NewController(&clusterprovider.ControllerConfig{
Config: fedConfig,

ClusterManager: &clusterprovider.TestClusterManager{
GroupVersionResource: schema.GroupVersionResource{ // Use deployment as cluster management resource
ClusterConfigProvider: &config.Simple{
GVR: schema.GroupVersionResource{ // Use deployment as cluster management resource
Group: "apps",
Version: "v1",
Resource: "deployments",
Expand Down

0 comments on commit 6591737

Please sign in to comment.