Skip to content

Commit

Permalink
Enable client discovery based on the flags
Browse files Browse the repository at this point in the history
  • Loading branch information
aattuluri committed Sep 12, 2024
1 parent e8546c9 commit f8952d6
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 18 deletions.
6 changes: 2 additions & 4 deletions admiral/cmd/admiral/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,10 +258,8 @@ func GetRootCmd(args []string) *cobra.Command {
rootCmd.PersistentFlags().StringArrayVar(&params.IngressVSExportToNamespaces, "ingress_vs_export_to_namespaces", []string{"istio-system"}, "List of namespaces where the ingress VS should be exported")

rootCmd.PersistentFlags().BoolVar(&params.EnableClientDiscovery, "enable_client_discovery", true, "Enable/Disable Client (mesh egress) Discovery")

rootCmd.PersistentFlags().StringArrayVar(&params.ClientDiscoveryClustersForJobs, "client_discovery_clusters_for_jobs", []string{"all"}, "List of clusters for client discovery for k8s jobs")

rootCmd.PersistentFlags().StringArrayVar(&params.DiscoveryClustersForNumaflow, "client_discovery_clusters_for_numaflow", []string{"all"}, "List of clusters for client discovery for numaflow types")
rootCmd.PersistentFlags().StringArrayVar(&params.ClientDiscoveryClustersForJobs, "client_discovery_clusters_for_jobs", []string{}, "List of clusters for client discovery for k8s jobs")
rootCmd.PersistentFlags().StringArrayVar(&params.DiscoveryClustersForNumaflow, "client_discovery_clusters_for_numaflow", []string{}, "List of clusters for client discovery for numaflow types")

return rootCmd
}
Expand Down
45 changes: 31 additions & 14 deletions admiral/pkg/clusters/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,22 +215,34 @@ func (r *RemoteRegistry) createCacheController(clientConfig *rest.Config, cluste
return fmt.Errorf("error with RolloutController initialization, err: %v", err)
}
}
logrus.Infof("starting JobController clusterID: %v", clusterID)
rc.JobController, err = admiral.NewJobController(stop, &ClientDiscoveryHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, 0, r.ClientLoader)
if err != nil {
return fmt.Errorf("error with JobController initialization, err: %v", err)
}

logrus.Infof("starting VertexController clusterID: %v", clusterID)
rc.VertexController, err = admiral.NewVertexController(stop, &ClientDiscoveryHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, 0, r.ClientLoader)
if err != nil {
return fmt.Errorf("error with VertexController initialization, err: %v", err)
}
if common.IsClientDiscoveryEnabled() {

logrus.Infof("starting MonoVertexController clusterID: %v", clusterID)
rc.MonoVertexController, err = admiral.NewMonoVertexController(stop, &ClientDiscoveryHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, 0, r.ClientLoader)
if err != nil {
return fmt.Errorf("error with MonoVertexController initialization, err: %v", err)
clustersForJobs := common.GetClientDiscoveryClustersForJobs()

if len(clustersForJobs) == 0 || common.IsPresent(clustersForJobs, clusterID) {
logrus.Infof("starting JobController clusterID: %v", clusterID)
rc.JobController, err = admiral.NewJobController(stop, &ClientDiscoveryHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, 0, r.ClientLoader)
if err != nil {
return fmt.Errorf("error with JobController initialization, err: %v", err)
}
}

clustersForNumaflow := common.GetClientDiscoveryClustersForNumaflow()

if len(clustersForNumaflow) == 0 || common.IsPresent(clustersForNumaflow, clusterID) {
logrus.Infof("starting VertexController clusterID: %v", clusterID)
rc.VertexController, err = admiral.NewVertexController(stop, &ClientDiscoveryHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, 0, r.ClientLoader)
if err != nil {
return fmt.Errorf("error with VertexController initialization, err: %v", err)
}

logrus.Infof("starting MonoVertexController clusterID: %v", clusterID)
rc.MonoVertexController, err = admiral.NewMonoVertexController(stop, &ClientDiscoveryHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, 0, r.ClientLoader)
if err != nil {
return fmt.Errorf("error with MonoVertexController initialization, err: %v", err)
}
}
}

}
Expand Down Expand Up @@ -262,6 +274,11 @@ func (r *RemoteRegistry) createCacheController(clientConfig *rest.Config, cluste
return nil
}

func isInClusterList(enabledClusters []string, clusterName string) bool {

return false
}

func (r *RemoteRegistry) updateCacheController(clientConfig *rest.Config, clusterID string, resyncPeriod util.ResyncIntervals) error {
//We want to refresh the cache controllers. But the current approach is parking the goroutines used in the previous set of controllers, leading to a rather large memory leak.
//This is a temporary fix to only do the controller refresh if the API Server of the remote cluster has changed
Expand Down
18 changes: 18 additions & 0 deletions admiral/pkg/controller/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,24 @@ func GetExportToMaxNamespaces() int {
return wrapper.params.ExportToMaxNamespaces
}

func IsClientDiscoveryEnabled() bool {
wrapper.RLock()
defer wrapper.RUnlock()
return wrapper.params.EnableClientDiscovery
}

func GetClientDiscoveryClustersForJobs() []string {
wrapper.RLock()
defer wrapper.RUnlock()
return wrapper.params.ClientDiscoveryClustersForJobs
}

func GetClientDiscoveryClustersForNumaflow() []string {
wrapper.RLock()
defer wrapper.RUnlock()
return wrapper.params.DiscoveryClustersForNumaflow
}

func IsAdmiralStateSyncerMode() bool {
wrapper.RLock()
defer wrapper.RUnlock()
Expand Down

0 comments on commit f8952d6

Please sign in to comment.