From f8952d620cd64309d8f88c95041985039350a148 Mon Sep 17 00:00:00 2001 From: aattuluri Date: Thu, 12 Sep 2024 15:25:49 -0700 Subject: [PATCH] Enable client discovery based on the flags --- admiral/cmd/admiral/cmd/root.go | 6 ++-- admiral/pkg/clusters/registry.go | 45 +++++++++++++++++-------- admiral/pkg/controller/common/config.go | 18 ++++++++++ 3 files changed, 51 insertions(+), 18 deletions(-) diff --git a/admiral/cmd/admiral/cmd/root.go b/admiral/cmd/admiral/cmd/root.go index 1ba8d4d9..f355a618 100644 --- a/admiral/cmd/admiral/cmd/root.go +++ b/admiral/cmd/admiral/cmd/root.go @@ -258,10 +258,8 @@ func GetRootCmd(args []string) *cobra.Command { rootCmd.PersistentFlags().StringArrayVar(¶ms.IngressVSExportToNamespaces, "ingress_vs_export_to_namespaces", []string{"istio-system"}, "List of namespaces where the ingress VS should be exported") rootCmd.PersistentFlags().BoolVar(¶ms.EnableClientDiscovery, "enable_client_discovery", true, "Enable/Disable Client (mesh egress) Discovery") - - rootCmd.PersistentFlags().StringArrayVar(¶ms.ClientDiscoveryClustersForJobs, "client_discovery_clusters_for_jobs", []string{"all"}, "List of clusters for client discovery for k8s jobs") - - rootCmd.PersistentFlags().StringArrayVar(¶ms.DiscoveryClustersForNumaflow, "client_discovery_clusters_for_numaflow", []string{"all"}, "List of clusters for client discovery for numaflow types") + rootCmd.PersistentFlags().StringArrayVar(¶ms.ClientDiscoveryClustersForJobs, "client_discovery_clusters_for_jobs", []string{}, "List of clusters for client discovery for k8s jobs") + rootCmd.PersistentFlags().StringArrayVar(¶ms.DiscoveryClustersForNumaflow, "client_discovery_clusters_for_numaflow", []string{}, "List of clusters for client discovery for numaflow types") return rootCmd } diff --git a/admiral/pkg/clusters/registry.go b/admiral/pkg/clusters/registry.go index f9fc65ac..f0d2dc2b 100644 --- a/admiral/pkg/clusters/registry.go +++ b/admiral/pkg/clusters/registry.go @@ -215,22 +215,34 @@ func (r *RemoteRegistry) createCacheController(clientConfig *rest.Config, cluste return fmt.Errorf("error with RolloutController initialization, err: %v", err) } } - logrus.Infof("starting JobController clusterID: %v", clusterID) - rc.JobController, err = admiral.NewJobController(stop, &ClientDiscoveryHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, 0, r.ClientLoader) - if err != nil { - return fmt.Errorf("error with JobController initialization, err: %v", err) - } - logrus.Infof("starting VertexController clusterID: %v", clusterID) - rc.VertexController, err = admiral.NewVertexController(stop, &ClientDiscoveryHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, 0, r.ClientLoader) - if err != nil { - return fmt.Errorf("error with VertexController initialization, err: %v", err) - } + if common.IsClientDiscoveryEnabled() { - logrus.Infof("starting MonoVertexController clusterID: %v", clusterID) - rc.MonoVertexController, err = admiral.NewMonoVertexController(stop, &ClientDiscoveryHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, 0, r.ClientLoader) - if err != nil { - return fmt.Errorf("error with MonoVertexController initialization, err: %v", err) + clustersForJobs := common.GetClientDiscoveryClustersForJobs() + + if len(clustersForJobs) == 0 || common.IsPresent(clustersForJobs, clusterID) { + logrus.Infof("starting JobController clusterID: %v", clusterID) + rc.JobController, err = admiral.NewJobController(stop, &ClientDiscoveryHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, 0, r.ClientLoader) + if err != nil { + return fmt.Errorf("error with JobController initialization, err: %v", err) + } + } + + clustersForNumaflow := common.GetClientDiscoveryClustersForNumaflow() + + if len(clustersForNumaflow) == 0 || common.IsPresent(clustersForNumaflow, clusterID) { + logrus.Infof("starting VertexController clusterID: %v", clusterID) + rc.VertexController, err = admiral.NewVertexController(stop, &ClientDiscoveryHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, 0, r.ClientLoader) + if err != nil { + return fmt.Errorf("error with VertexController initialization, err: %v", err) + } + + logrus.Infof("starting MonoVertexController clusterID: %v", clusterID) + rc.MonoVertexController, err = admiral.NewMonoVertexController(stop, &ClientDiscoveryHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, 0, r.ClientLoader) + if err != nil { + return fmt.Errorf("error with MonoVertexController initialization, err: %v", err) + } + } } } @@ -262,6 +274,11 @@ func (r *RemoteRegistry) createCacheController(clientConfig *rest.Config, cluste return nil } +func isInClusterList(enabledClusters []string, clusterName string) bool { + + return false +} + func (r *RemoteRegistry) updateCacheController(clientConfig *rest.Config, clusterID string, resyncPeriod util.ResyncIntervals) error { //We want to refresh the cache controllers. But the current approach is parking the goroutines used in the previous set of controllers, leading to a rather large memory leak. //This is a temporary fix to only do the controller refresh if the API Server of the remote cluster has changed diff --git a/admiral/pkg/controller/common/config.go b/admiral/pkg/controller/common/config.go index ebe74fd4..d0b5f57a 100644 --- a/admiral/pkg/controller/common/config.go +++ b/admiral/pkg/controller/common/config.go @@ -465,6 +465,24 @@ func GetExportToMaxNamespaces() int { return wrapper.params.ExportToMaxNamespaces } +func IsClientDiscoveryEnabled() bool { + wrapper.RLock() + defer wrapper.RUnlock() + return wrapper.params.EnableClientDiscovery +} + +func GetClientDiscoveryClustersForJobs() []string { + wrapper.RLock() + defer wrapper.RUnlock() + return wrapper.params.ClientDiscoveryClustersForJobs +} + +func GetClientDiscoveryClustersForNumaflow() []string { + wrapper.RLock() + defer wrapper.RUnlock() + return wrapper.params.DiscoveryClustersForNumaflow +} + func IsAdmiralStateSyncerMode() bool { wrapper.RLock() defer wrapper.RUnlock()