Skip to content

Commit

Permalink
fix, typo and discovery client (#29)
Browse files Browse the repository at this point in the history
  • Loading branch information
shaofan-hs authored Feb 20, 2024
1 parent b0ae0c6 commit f1269df
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 61 deletions.
66 changes: 33 additions & 33 deletions multicluster/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,17 @@ import (
"kusionstack.io/kube-utils/multicluster/metrics"
)

type ClusterManagermentType string
type ClusterManagementType string

const (
OpenClusterManagement ClusterManagermentType = "OpenClusterManagement"
TestCluterManagement ClusterManagermentType = "TestCluterManagement"
OpenClusterManagement ClusterManagementType = "OpenClusterManagement"
TestCluterManagement ClusterManagementType = "TestCluterManagement"
)

type Controller struct {
config *rest.Config
clusterManagermentType ClusterManagermentType
clusterManagermentGVR schema.GroupVersionResource
config *rest.Config
clusterManagementType ClusterManagementType
clusterManagementGVR schema.GroupVersionResource

client dynamic.Interface // client to get cluster info
informerFactory dynamicinformer.DynamicSharedInformerFactory
Expand All @@ -67,51 +67,51 @@ type Controller struct {
}

type ControllerConfig struct {
Config *rest.Config // config for cluster managerment
ClusterManagermentType ClusterManagermentType
ClusterManagermentGVR *schema.GroupVersionResource
ResyncPeriod time.Duration // resync period for cluster managerment
Log logr.Logger
Config *rest.Config // config for cluster management
ClusterManagementType ClusterManagementType
ClusterManagementGVR *schema.GroupVersionResource
ResyncPeriod time.Duration // resync period for cluster management
Log logr.Logger

// for test
RestConfigForCluster func(cluster string) *rest.Config
}

// NewController creates a new Controller which will process events about cluster.
func NewController(cfg *ControllerConfig) (*Controller, error) {
var clusterManagermentGVR schema.GroupVersionResource
switch cfg.ClusterManagermentType {
var clusterManagementGVR schema.GroupVersionResource
switch cfg.ClusterManagementType {
case OpenClusterManagement:
if cfg.ClusterManagermentGVR == nil {
return nil, fmt.Errorf("ClusterManagermentGVR must be set when use %s", cfg.ClusterManagermentType)
if cfg.ClusterManagementGVR == nil {
return nil, fmt.Errorf("ClusterManagementGVR must be set when use %s", cfg.ClusterManagementType)
}
clusterManagermentGVR = *cfg.ClusterManagermentGVR
clusterManagementGVR = *cfg.ClusterManagementGVR
case TestCluterManagement:
if cfg.ClusterManagermentGVR == nil || cfg.RestConfigForCluster == nil {
return nil, fmt.Errorf("ClusterManagermentGVR and RestConfigForCluster must be set when use %s", cfg.ClusterManagermentType)
if cfg.ClusterManagementGVR == nil || cfg.RestConfigForCluster == nil {
return nil, fmt.Errorf("ClusterManagementGVR and RestConfigForCluster must be set when use %s", cfg.ClusterManagementType)
}
clusterManagermentGVR = *cfg.ClusterManagermentGVR
clusterManagementGVR = *cfg.ClusterManagementGVR
default:
return nil, fmt.Errorf("not support cluster managerment type: %v", cfg.ClusterManagermentType)
return nil, fmt.Errorf("not support cluster management type: %v", cfg.ClusterManagementType)
}

client, err := dynamic.NewForConfig(cfg.Config)
if err != nil {
return nil, err
}
informerFactory := dynamicinformer.NewDynamicSharedInformerFactory(client, cfg.ResyncPeriod)
informer := informerFactory.ForResource(clusterManagermentGVR).Informer()
informer := informerFactory.ForResource(clusterManagementGVR).Informer()

return &Controller{
config: cfg.Config,
client: client,
informerFactory: informerFactory,
clusterManagermentType: cfg.ClusterManagermentType,
clusterManagermentGVR: clusterManagermentGVR,
informer: informer,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), clusterManagermentGVR.Resource),
syncedCh: make(chan struct{}),
log: cfg.Log,
config: cfg.Config,
client: client,
informerFactory: informerFactory,
clusterManagementType: cfg.ClusterManagementType,
clusterManagementGVR: clusterManagementGVR,
informer: informer,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), clusterManagementGVR.Resource),
syncedCh: make(chan struct{}),
log: cfg.Log,

restConfigForCluster: cfg.RestConfigForCluster,
}, nil
Expand Down Expand Up @@ -228,7 +228,7 @@ func (c *Controller) eventHandler(key string) error {
return nil
}

_, err = c.client.Resource(c.clusterManagermentGVR).Namespace(namespace).Get(context.Background(), name, metav1.GetOptions{})
_, err = c.client.Resource(c.clusterManagementGVR).Namespace(namespace).Get(context.Background(), name, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
c.deleteHandler(name)
Expand All @@ -249,10 +249,10 @@ func (c *Controller) eventHandler(key string) error {

// RestConfigForCluster returns the rest config for the mangered cluster.
func (c *Controller) RestConfigForCluster(cluster string) *rest.Config {
switch c.clusterManagermentType {
switch c.clusterManagementType {
case OpenClusterManagement:
clusterConfig := *c.config
clusterConfig.Host = fmt.Sprintf("%s/apis/%s/%s/%s/%s/proxy", clusterConfig.Host, c.clusterManagermentGVR.Group, c.clusterManagermentGVR.Version, c.clusterManagermentGVR.Resource, cluster)
clusterConfig.Host = fmt.Sprintf("%s/apis/%s/%s/%s/%s/proxy", clusterConfig.Host, c.clusterManagementGVR.Group, c.clusterManagementGVR.Version, c.clusterManagementGVR.Resource, cluster)
return &clusterConfig
case TestCluterManagement:
return c.restConfigForCluster(cluster)
Expand Down
26 changes: 13 additions & 13 deletions multicluster/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ func setOptionsDefaults(opts Options) Options {
}

type ManagerConfig struct {
FedConfig *rest.Config
ClusterScheme *runtime.Scheme
ClusterManagermentGVR *schema.GroupVersionResource
ClusterManagermentType controller.ClusterManagermentType
FedConfig *rest.Config
ClusterScheme *runtime.Scheme
ClusterManagementGVR *schema.GroupVersionResource
ClusterManagementType controller.ClusterManagementType

ResyncPeriod time.Duration
ClusterFilter func(string) bool // select cluster
Expand All @@ -78,7 +78,7 @@ type Manager struct {

clusterCacheManager ClusterCacheManager
clusterClientManager ClusterClientManager
controller *controller.Controller // controller for cluster managerment
controller *controller.Controller // controller for cluster management

resyncPeriod time.Duration
hasCluster map[string]struct{} // whether cluster has been added
Expand Down Expand Up @@ -119,17 +119,17 @@ func NewManager(cfg *ManagerConfig, opts Options) (manager *Manager, newCacheFun
}
}

clusterManagermentType := controller.OpenClusterManagement
if cfg.ClusterManagermentType != "" {
clusterManagermentType = cfg.ClusterManagermentType
clusterManagementType := controller.OpenClusterManagement
if cfg.ClusterManagementType != "" {
clusterManagementType = cfg.ClusterManagementType
}

controller, err := controller.NewController(&controller.ControllerConfig{
Config: cfg.FedConfig,
ResyncPeriod: cfg.ResyncPeriod,
ClusterManagermentType: clusterManagermentType,
ClusterManagermentGVR: cfg.ClusterManagermentGVR,
Log: log,
Config: cfg.FedConfig,
ResyncPeriod: cfg.ResyncPeriod,
ClusterManagementType: clusterManagementType,
ClusterManagementGVR: cfg.ClusterManagementGVR,
Log: log,

RestConfigForCluster: cfg.RestConfigForCluster,
})
Expand Down
8 changes: 4 additions & 4 deletions multicluster/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ import (

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"k8s.io/apimachinery/pkg/util/sets"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
Expand Down Expand Up @@ -199,11 +199,11 @@ var _ = Describe("multicluster with 1 fed and 4 clusters", func() {
apiResourceSets := sets.NewString()
for _, apiResourceList := range apiResourceLists {
for _, apiResource := range apiResourceList.APIResources {
groupVersionKind := fmt.Sprintf("%s/%s", apiResourceList.GroupVersion, apiResource.Kind)
apiResourceSets.Insert(groupVersionKind)
groupVersionName := fmt.Sprintf("%s/%s", apiResourceList.GroupVersion, apiResource.Name)
apiResourceSets.Insert(groupVersionName)
}
}
Expect(apiResourceSets.HasAll("apps/v1/Deployment", "v1/ConfigMap")).To(Equal(true))
Expect(apiResourceSets.HasAll("apps/v1/deployments", "apps/v1/deployments/status", "v1/configmaps")).To(Equal(true))
})

It("multiClusterClient update the deployment status of cluster1", func() {
Expand Down
28 changes: 19 additions & 9 deletions multicluster/multi_cluster_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,10 +497,11 @@ func (c *cachedMultiClusterDiscoveryClient) ServerGroupsAndResources() ([]*metav
// If there are multiple clusters, we need to get the intersection of groups and resources
var (
groupVersionCount = make(map[string]int)
groupVersionKindCount = make(map[string]int)
groupVersionNameCount = make(map[string]int)

apiGroupsRes []*metav1.APIGroup
apiResourceListsRes []*metav1.APIResourceList
apiGroupsRes []*metav1.APIGroup
apiResourceListsRes []*metav1.APIResourceList
groupVersionToResources = make(map[string][]metav1.APIResource)
)
for _, cachedClient := range allDiscoveryClient {
apiGroups, apiResourceLists, err := cachedClient.ServerGroupsAndResources()
Expand All @@ -524,21 +525,30 @@ func (c *cachedMultiClusterDiscoveryClient) ServerGroupsAndResources() ([]*metav

for _, apiResourceList := range apiResourceLists {
for _, apiResource := range apiResourceList.APIResources {
groupVersionKind := fmt.Sprintf("%s/%s", apiResourceList.GroupVersion, apiResource.Kind)
groupVersionName := fmt.Sprintf("%s/%s", apiResourceList.GroupVersion, apiResource.Name)

if _, ok := groupVersionKindCount[groupVersionKind]; !ok {
groupVersionKindCount[groupVersionKind] = 1
if _, ok := groupVersionNameCount[groupVersionName]; !ok {
groupVersionNameCount[groupVersionName] = 1
} else {
groupVersionKindCount[groupVersionKind]++
groupVersionNameCount[groupVersionName]++

if groupVersionKindCount[groupVersionKind] == len(allDiscoveryClient) { // all clusters have this GroupVersion and Kind
apiResourceListsRes = append(apiResourceListsRes, apiResourceList)
if groupVersionNameCount[groupVersionName] == len(allDiscoveryClient) { // all clusters have this GroupVersion and Name
groupVersionToResources[apiResourceList.GroupVersion] = append(groupVersionToResources[apiResourceList.GroupVersion], apiResource)
}
}
}
}
}

for groupVersion, resources := range groupVersionToResources {
apiResourceList := metav1.APIResourceList{
TypeMeta: metav1.TypeMeta{Kind: "APIResourceList", APIVersion: "v1"},
GroupVersion: groupVersion,
}
apiResourceList.APIResources = append(apiResourceList.APIResources, resources...)
apiResourceListsRes = append(apiResourceListsRes, &apiResourceList)
}

return apiGroupsRes, apiResourceListsRes, nil
}

Expand Down
4 changes: 2 additions & 2 deletions multicluster/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,12 @@ var _ = BeforeSuite(func() {
return fedConfig
}
},
ClusterManagermentGVR: &schema.GroupVersionResource{
ClusterManagementGVR: &schema.GroupVersionResource{
Group: "apps",
Version: "v1",
Resource: "deployments",
},
ClusterManagermentType: controller.TestCluterManagement,
ClusterManagementType: controller.TestCluterManagement,
}, Options{})
Expect(err).NotTo(HaveOccurred())
Expect(manager).NotTo(BeNil())
Expand Down

0 comments on commit f1269df

Please sign in to comment.