Skip to content

Commit

Permalink
feat: rewrite multicluster discovery interface
Browse files Browse the repository at this point in the history
  • Loading branch information
zoumo authored and shaofan-hs committed Jan 17, 2024
1 parent 95bdfe4 commit 200d68c
Showing 1 changed file with 131 additions and 101 deletions.
232 changes: 131 additions & 101 deletions multicluster/multi_cluster_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,13 @@ import (
"kusionstack.io/kube-utils/multicluster/metrics"
)

// PartialCachedDiscoveryInterface is a subset of discovery.CachedDiscoveryInterface.
// MultiClusterDiscovery provides fed and member clusters discovery interface
type MultiClusterDiscovery interface {
GetFedDiscoveryInterface() discovery.DiscoveryInterface
GetMembersCachedDiscoveryInterface() PartialCachedDiscoveryInterface
}

// PartialCachedDiscoveryInterface is a subset of discovery.DiscoveryInterface.
type PartialCachedDiscoveryInterface interface {
ServerGroupsAndResources() ([]*metav1.APIGroup, []*metav1.APIResourceList, error)
Invalidate()
Expand All @@ -50,8 +56,8 @@ type ClusterClientManager interface {

func MultiClusterClientBuilder(log logr.Logger) (cluster.NewClientFunc, ClusterClientManager) {
mcc := &multiClusterClient{
clusterToClient: map[string]client.Client{},
clusterToCachedDiscoveryClient: map[string]discovery.CachedDiscoveryInterface{},
clusterToClient: map[string]client.Client{},
clusterToDiscoveryClient: map[string]discovery.CachedDiscoveryInterface{},

log: log,
}
Expand All @@ -72,6 +78,12 @@ func MultiClusterClientBuilder(log logr.Logger) (cluster.NewClientFunc, ClusterC
return nil, fmt.Errorf("failed to create fed client: %v", err)
}

discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
if err != nil {
return nil, fmt.Errorf("failed to create fed discovery client: %v", err)
}

mcc.fedDiscovery = discoveryClient
mcc.fedClient = delegatingFedClient
mcc.fedScheme = delegatingFedClient.Scheme()
mcc.fedMapper = delegatingFedClient.RESTMapper()
Expand All @@ -81,31 +93,33 @@ func MultiClusterClientBuilder(log logr.Logger) (cluster.NewClientFunc, ClusterC
return newClientFunc, mcc
}

var (
_ client.Client = &multiClusterClient{}

_ MultiClusterDiscovery = &multiClusterClient{}

_ ClusterClientManager = &multiClusterClient{}
)

type multiClusterClient struct {
fedClient client.Client
fedScheme *runtime.Scheme
fedMapper meta.RESTMapper
fedDiscovery discovery.DiscoveryInterface
fedClient client.Client
fedScheme *runtime.Scheme
fedMapper meta.RESTMapper

clusterToClient map[string]client.Client
clusterToCachedDiscoveryClient map[string]discovery.CachedDiscoveryInterface
clusterToClient map[string]client.Client
clusterToDiscoveryClient map[string]discovery.CachedDiscoveryInterface

mutex sync.RWMutex
log logr.Logger
}

var (
_ client.Client = &multiClusterClient{}
_ PartialCachedDiscoveryInterface = &multiClusterClient{}

_ ClusterClientManager = &multiClusterClient{}
)

func (mcc *multiClusterClient) AddClusterClient(cluster string, clusterClient client.Client, clusterCachedDiscoveryClient discovery.CachedDiscoveryInterface) {
func (mcc *multiClusterClient) AddClusterClient(cluster string, clusterClient client.Client, clusterDiscoveryClient discovery.CachedDiscoveryInterface) {
mcc.mutex.Lock()
defer mcc.mutex.Unlock()

mcc.clusterToClient[cluster] = clusterClient
mcc.clusterToCachedDiscoveryClient[cluster] = clusterCachedDiscoveryClient
mcc.clusterToDiscoveryClient[cluster] = clusterDiscoveryClient
mcc.log.V(5).Info("add cluster client", "cluster", cluster)
}

Expand All @@ -114,93 +128,10 @@ func (mcc *multiClusterClient) RemoveClusterClient(cluster string) {
defer mcc.mutex.Unlock()

delete(mcc.clusterToClient, cluster)
delete(mcc.clusterToCachedDiscoveryClient, cluster)
delete(mcc.clusterToDiscoveryClient, cluster)
mcc.log.V(5).Info("remove cluster client", "cluster", cluster)
}

// ServerGroupsAndResources returns the supported server groups and resources for all clusters.
func (mcc *multiClusterClient) ServerGroupsAndResources() ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
mcc.mutex.Lock()
defer mcc.mutex.Unlock()

// If there is only one cluster, we can use the cached discovery client to get the server groups and resources
if len(mcc.clusterToCachedDiscoveryClient) == 1 {
for _, clusterCachedDiscoveryClient := range mcc.clusterToCachedDiscoveryClient {
return clusterCachedDiscoveryClient.ServerGroupsAndResources()
}
}

// If there are multiple clusters, we need to get the intersection of groups and resources
var (
groupVersionCount = make(map[string]int)
groupVersionKindCount = make(map[string]int)

apiGroupsRes []*metav1.APIGroup
apiResourceListsRes []*metav1.APIResourceList
)
for _, clusterCachedDiscoveryClient := range mcc.clusterToCachedDiscoveryClient {
apiGroups, apiResourceLists, err := clusterCachedDiscoveryClient.ServerGroupsAndResources()
if err != nil {
return nil, nil, err
}

for _, apiGroup := range apiGroups {
groupVersion := apiGroup.PreferredVersion.GroupVersion

if _, ok := groupVersionCount[groupVersion]; !ok {
groupVersionCount[groupVersion] = 1
} else {
groupVersionCount[groupVersion]++

if groupVersionCount[groupVersion] == len(mcc.clusterToCachedDiscoveryClient) { // all clusters have this PreferredVersion
apiGroupsRes = append(apiGroupsRes, apiGroup)
}
}
}

for _, apiResourceList := range apiResourceLists {
for _, apiResource := range apiResourceList.APIResources {
groupVersionKind := fmt.Sprintf("%s/%s", apiResourceList.GroupVersion, apiResource.Kind)

if _, ok := groupVersionKindCount[groupVersionKind]; !ok {
groupVersionKindCount[groupVersionKind] = 1
} else {
groupVersionKindCount[groupVersionKind]++

if groupVersionKindCount[groupVersionKind] == len(mcc.clusterToCachedDiscoveryClient) { // all clusters have this GroupVersion and Kind
apiResourceListsRes = append(apiResourceListsRes, apiResourceList)
}
}
}
}
}

return apiGroupsRes, apiResourceListsRes, nil
}

// Invalidate invalidates the cached discovery clients for all clusters.
func (mcc *multiClusterClient) Invalidate() {
mcc.mutex.Lock()
defer mcc.mutex.Unlock()

for _, clusterCachedDiscoveryClient := range mcc.clusterToCachedDiscoveryClient {
clusterCachedDiscoveryClient.Invalidate()
}
}

// Fresh returns true if all cached discovery clients are fresh.
func (mcc *multiClusterClient) Fresh() bool {
mcc.mutex.Lock()
defer mcc.mutex.Unlock()

for _, clusterCachedDiscoveryClient := range mcc.clusterToCachedDiscoveryClient {
if !clusterCachedDiscoveryClient.Fresh() {
return false
}
}
return true
}

func (mcc *multiClusterClient) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) (err error) {
var cluster string
defer func() {
Expand Down Expand Up @@ -534,3 +465,102 @@ func (mcc *multiClusterClient) getClusterNames(ctx context.Context) (clusters []
}
return
}

func (mcc *multiClusterClient) GetFedDiscoveryInterface() discovery.DiscoveryInterface {
return mcc.fedDiscovery
}

func (mcc *multiClusterClient) GetMembersCachedDiscoveryInterface() PartialCachedDiscoveryInterface {
return &cachedMultiClusterDiscoveryClient{
delegate: mcc,
}
}

type cachedMultiClusterDiscoveryClient struct {
delegate *multiClusterClient
}

// ServerGroupsAndResources returns the supported server groups and resources for all clusters.
func (c *cachedMultiClusterDiscoveryClient) ServerGroupsAndResources() ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
c.delegate.mutex.Lock()
defer c.delegate.mutex.Unlock()

allDiscoveryClient := c.delegate.clusterToDiscoveryClient

// If there is only one cluster, we can use the cached discovery client to get the server groups and resources
if len(allDiscoveryClient) == 1 {
for _, cachedClient := range allDiscoveryClient {
return cachedClient.ServerGroupsAndResources()
}
}

// If there are multiple clusters, we need to get the intersection of groups and resources
var (
groupVersionCount = make(map[string]int)
groupVersionKindCount = make(map[string]int)

apiGroupsRes []*metav1.APIGroup
apiResourceListsRes []*metav1.APIResourceList
)
for _, cachedClient := range allDiscoveryClient {
apiGroups, apiResourceLists, err := cachedClient.ServerGroupsAndResources()
if err != nil {
return nil, nil, err
}

for _, apiGroup := range apiGroups {
groupVersion := apiGroup.PreferredVersion.GroupVersion

if _, ok := groupVersionCount[groupVersion]; !ok {
groupVersionCount[groupVersion] = 1
} else {
groupVersionCount[groupVersion]++

if groupVersionCount[groupVersion] == len(allDiscoveryClient) { // all clusters have this PreferredVersion
apiGroupsRes = append(apiGroupsRes, apiGroup)
}
}
}

for _, apiResourceList := range apiResourceLists {
for _, apiResource := range apiResourceList.APIResources {
groupVersionKind := fmt.Sprintf("%s/%s", apiResourceList.GroupVersion, apiResource.Kind)

if _, ok := groupVersionKindCount[groupVersionKind]; !ok {
groupVersionKindCount[groupVersionKind] = 1
} else {
groupVersionKindCount[groupVersionKind]++

if groupVersionKindCount[groupVersionKind] == len(allDiscoveryClient) { // all clusters have this GroupVersion and Kind
apiResourceListsRes = append(apiResourceListsRes, apiResourceList)
}
}
}
}
}

return apiGroupsRes, apiResourceListsRes, nil
}

// Invalidate invalidates the cached discovery clients for all clusters.
func (c *cachedMultiClusterDiscoveryClient) Invalidate() {
c.delegate.mutex.Lock()
defer c.delegate.mutex.Unlock()

for _, cachedClient := range c.delegate.clusterToDiscoveryClient {
cachedClient.Invalidate()
}
}

// Fresh returns true if all cached discovery clients are fresh.
func (c *cachedMultiClusterDiscoveryClient) Fresh() bool {
c.delegate.mutex.Lock()
defer c.delegate.mutex.Unlock()

for _, cachedClient := range c.delegate.clusterToDiscoveryClient {
if !cachedClient.Fresh() {
return false
}
}
return true
}

0 comments on commit 200d68c

Please sign in to comment.