Skip to content

Commit

Permalink
feat: allow running only analysis controller
Browse files Browse the repository at this point in the history
Signed-off-by: Soumya Ghosh Dastidar <[email protected]>
  • Loading branch information
gdsoumya committed Jan 29, 2024
1 parent 5e5314b commit cf26d73
Show file tree
Hide file tree
Showing 4 changed files with 205 additions and 78 deletions.
93 changes: 58 additions & 35 deletions cmd/rollouts-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"time"

"github.com/argoproj/argo-rollouts/utils/record"

"github.com/argoproj/pkg/kubeclientmetrics"
smiclientset "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/split/clientset/versioned"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -72,6 +71,7 @@ func newCommand() *cobra.Command {
namespaced bool
printVersion bool
selfServiceNotificationEnabled bool
onlyAnalysisMode bool
)
electOpts := controller.NewLeaderElectionOptions()
var command = cobra.Command{
Expand Down Expand Up @@ -185,41 +185,63 @@ func newCommand() *cobra.Command {
ingressWrapper, err := ingressutil.NewIngressWrapper(mode, kubeClient, kubeInformerFactory)
checkError(err)

cm := controller.NewManager(
namespace,
kubeClient,
argoprojClient,
dynamicClient,
smiClient,
discoveryClient,
kubeInformerFactory.Apps().V1().ReplicaSets(),
kubeInformerFactory.Core().V1().Services(),
ingressWrapper,
jobInformerFactory.Batch().V1().Jobs(),
tolerantinformer.NewTolerantRolloutInformer(dynamicInformerFactory),
tolerantinformer.NewTolerantExperimentInformer(dynamicInformerFactory),
tolerantinformer.NewTolerantAnalysisRunInformer(dynamicInformerFactory),
tolerantinformer.NewTolerantAnalysisTemplateInformer(dynamicInformerFactory),
tolerantinformer.NewTolerantClusterAnalysisTemplateInformer(clusterDynamicInformerFactory),
istioPrimaryDynamicClient,
istioDynamicInformerFactory.ForResource(istioutil.GetIstioVirtualServiceGVR()).Informer(),
istioDynamicInformerFactory.ForResource(istioutil.GetIstioDestinationRuleGVR()).Informer(),
notificationConfigMapInformerFactory,
notificationSecretInformerFactory,
resyncDuration,
instanceID,
metricsPort,
healthzPort,
k8sRequestProvider,
nginxIngressClasses,
albIngressClasses,
dynamicInformerFactory,
clusterDynamicInformerFactory,
istioDynamicInformerFactory,
namespaced,
kubeInformerFactory,
jobInformerFactory)
var cm *controller.Manager

if onlyAnalysisMode {
log.Info("Running only analysis controller")
cm = controller.NewAnalysisManager(
namespace,
kubeClient,
argoprojClient,
jobInformerFactory.Batch().V1().Jobs(),
tolerantinformer.NewTolerantAnalysisRunInformer(dynamicInformerFactory),
tolerantinformer.NewTolerantAnalysisTemplateInformer(dynamicInformerFactory),
tolerantinformer.NewTolerantClusterAnalysisTemplateInformer(clusterDynamicInformerFactory),
resyncDuration,
metricsPort,
healthzPort,
k8sRequestProvider,
dynamicInformerFactory,
clusterDynamicInformerFactory,
namespaced,
kubeInformerFactory,
jobInformerFactory)
} else {
cm = controller.NewManager(
namespace,
kubeClient,
argoprojClient,
dynamicClient,
smiClient,
discoveryClient,
kubeInformerFactory.Apps().V1().ReplicaSets(),
kubeInformerFactory.Core().V1().Services(),
ingressWrapper,
jobInformerFactory.Batch().V1().Jobs(),
tolerantinformer.NewTolerantRolloutInformer(dynamicInformerFactory),
tolerantinformer.NewTolerantExperimentInformer(dynamicInformerFactory),
tolerantinformer.NewTolerantAnalysisRunInformer(dynamicInformerFactory),
tolerantinformer.NewTolerantAnalysisTemplateInformer(dynamicInformerFactory),
tolerantinformer.NewTolerantClusterAnalysisTemplateInformer(clusterDynamicInformerFactory),
istioPrimaryDynamicClient,
istioDynamicInformerFactory.ForResource(istioutil.GetIstioVirtualServiceGVR()).Informer(),
istioDynamicInformerFactory.ForResource(istioutil.GetIstioDestinationRuleGVR()).Informer(),
notificationConfigMapInformerFactory,
notificationSecretInformerFactory,
resyncDuration,
instanceID,
metricsPort,
healthzPort,
k8sRequestProvider,
nginxIngressClasses,
albIngressClasses,
dynamicInformerFactory,
clusterDynamicInformerFactory,
istioDynamicInformerFactory,
namespaced,
kubeInformerFactory,
jobInformerFactory)
}
if err = cm.Run(ctx, rolloutThreads, serviceThreads, ingressThreads, experimentThreads, analysisThreads, electOpts); err != nil {
log.Fatalf("Error running controller: %s", err.Error())
}
Expand Down Expand Up @@ -262,6 +284,7 @@ func newCommand() *cobra.Command {
command.Flags().DurationVar(&electOpts.LeaderElectionRenewDeadline, "leader-election-renew-deadline", controller.DefaultLeaderElectionRenewDeadline, "The interval between attempts by the acting master to renew a leadership slot before it stops leading. This must be less than or equal to the lease duration. This is only applicable if leader election is enabled.")
command.Flags().DurationVar(&electOpts.LeaderElectionRetryPeriod, "leader-election-retry-period", controller.DefaultLeaderElectionRetryPeriod, "The duration the clients should wait between attempting acquisition and renewal of a leadership. This is only applicable if leader election is enabled.")
command.Flags().BoolVar(&selfServiceNotificationEnabled, "self-service-notification-enabled", false, "Allows rollouts controller to pull notification config from the namespace that the rollout resource is in. This is useful for self-service notification.")
command.Flags().BoolVar(&onlyAnalysisMode, "only-analysis-mode", false, "Only runs analysis controller")
return &command
}

Expand Down
160 changes: 129 additions & 31 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,87 @@ type Manager struct {
notificationSecretInformerFactory kubeinformers.SharedInformerFactory
jobInformerFactory kubeinformers.SharedInformerFactory
istioPrimaryDynamicClient dynamic.Interface

onlyAnalysisMode bool
}

func NewAnalysisManager(
namespace string,
kubeclientset kubernetes.Interface,
argoprojclientset clientset.Interface,
jobInformer batchinformers.JobInformer,
analysisRunInformer informers.AnalysisRunInformer,
analysisTemplateInformer informers.AnalysisTemplateInformer,
clusterAnalysisTemplateInformer informers.ClusterAnalysisTemplateInformer,
resyncPeriod time.Duration,
metricsPort int,
healthzPort int,
k8sRequestProvider *metrics.K8sRequestsCountProvider,
dynamicInformerFactory dynamicinformer.DynamicSharedInformerFactory,
clusterDynamicInformerFactory dynamicinformer.DynamicSharedInformerFactory,
namespaced bool,
kubeInformerFactory kubeinformers.SharedInformerFactory,
jobInformerFactory kubeinformers.SharedInformerFactory,
) *Manager {
runtime.Must(rolloutscheme.AddToScheme(scheme.Scheme))
log.Info("Creating event broadcaster")

metricsAddr := fmt.Sprintf(listenAddr, metricsPort)
metricsServer := metrics.NewMetricsServer(metrics.ServerConfig{
Addr: metricsAddr,
RolloutLister: nil,
AnalysisRunLister: analysisRunInformer.Lister(),
AnalysisTemplateLister: analysisTemplateInformer.Lister(),
ClusterAnalysisTemplateLister: clusterAnalysisTemplateInformer.Lister(),
ExperimentLister: nil,
K8SRequestProvider: k8sRequestProvider,
})

healthzServer := NewHealthzServer(fmt.Sprintf(listenAddr, healthzPort))
analysisRunWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "AnalysisRuns")
recorder := record.NewEventRecorder(kubeclientset, metrics.MetricRolloutEventsTotal, metrics.MetricNotificationFailedTotal, metrics.MetricNotificationSuccessTotal, metrics.MetricNotificationSend, nil)
analysisController := analysis.NewController(analysis.ControllerConfig{
KubeClientSet: kubeclientset,
ArgoProjClientset: argoprojclientset,
AnalysisRunInformer: analysisRunInformer,
JobInformer: jobInformer,
ResyncPeriod: resyncPeriod,
AnalysisRunWorkQueue: analysisRunWorkqueue,
MetricsServer: metricsServer,
Recorder: recorder,
})

cm := &Manager{
wg: &sync.WaitGroup{},
metricsServer: metricsServer,
healthzServer: healthzServer,
jobSynced: jobInformer.Informer().HasSynced,
analysisRunSynced: analysisRunInformer.Informer().HasSynced,
analysisTemplateSynced: analysisTemplateInformer.Informer().HasSynced,
clusterAnalysisTemplateSynced: clusterAnalysisTemplateInformer.Informer().HasSynced,
analysisRunWorkqueue: analysisRunWorkqueue,
analysisController: analysisController,
namespace: namespace,
kubeClientSet: kubeclientset,
dynamicInformerFactory: dynamicInformerFactory,
clusterDynamicInformerFactory: clusterDynamicInformerFactory,
namespaced: namespaced,
kubeInformerFactory: kubeInformerFactory,
jobInformerFactory: jobInformerFactory,
onlyAnalysisMode: true,
}

_, err := rolloutsConfig.InitializeConfig(kubeclientset, defaults.DefaultRolloutsConfigMapName)
if err != nil {
log.Fatalf("Failed to init config: %v", err)
}

Check warning on line 239 in controller/controller.go

View check run for this annotation

Codecov / codecov/patch

controller/controller.go#L187-L239

Added lines #L187 - L239 were not covered by tests

err = plugin.DownloadPlugins(plugin.FileDownloaderImpl{})
if err != nil {
log.Fatalf("Failed to download plugins: %v", err)
}

Check warning on line 244 in controller/controller.go

View check run for this annotation

Codecov / codecov/patch

controller/controller.go#L241-L244

Added lines #L241 - L244 were not covered by tests

return cm

Check warning on line 246 in controller/controller.go

View check run for this annotation

Codecov / codecov/patch

controller/controller.go#L246

Added line #L246 was not covered by tests
}

// NewManager returns a new manager to manage all the controllers
Expand Down Expand Up @@ -441,11 +522,13 @@ func (c *Manager) Run(ctx context.Context, rolloutThreadiness, serviceThreadines
log.Info("Shutting down workers")
goPlugin.CleanupClients()

c.serviceWorkqueue.ShutDownWithDrain()
c.ingressWorkqueue.ShutDownWithDrain()
c.rolloutWorkqueue.ShutDownWithDrain()
c.experimentWorkqueue.ShutDownWithDrain()
c.analysisRunWorkqueue.ShutDownWithDrain()
if !c.onlyAnalysisMode {
c.serviceWorkqueue.ShutDownWithDrain()
c.ingressWorkqueue.ShutDownWithDrain()
c.rolloutWorkqueue.ShutDownWithDrain()
c.experimentWorkqueue.ShutDownWithDrain()
}

c.analysisRunWorkqueue.ShutDownWithDrain()

ctxWithTimeout, cancel := context.WithTimeout(ctx, 5*time.Second) // give max of 10 seconds for http servers to shut down
Expand All @@ -463,12 +546,6 @@ func (c *Manager) startLeading(ctx context.Context, rolloutThreadiness, serviceT
// Start the informer factories to begin populating the informer caches
log.Info("Starting Controllers")

c.notificationConfigMapInformerFactory.Start(ctx.Done())
c.notificationSecretInformerFactory.Start(ctx.Done())
if ok := cache.WaitForCacheSync(ctx.Done(), c.configMapSynced, c.secretSynced); !ok {
log.Fatalf("failed to wait for configmap/secret caches to sync, exiting")
}

// notice that there is no need to run Start methods in a separate goroutine. (i.e. go kubeInformerFactory.Start(stopCh)
// Start method is non-blocking and runs all registered informers in a dedicated goroutine.
c.dynamicInformerFactory.Start(ctx.Done())
Expand All @@ -479,29 +556,50 @@ func (c *Manager) startLeading(ctx context.Context, rolloutThreadiness, serviceT

c.jobInformerFactory.Start(ctx.Done())

// Check if Istio installed on cluster before starting dynamicInformerFactory
if istioutil.DoesIstioExist(c.istioPrimaryDynamicClient, c.namespace) {
c.istioDynamicInformerFactory.Start(ctx.Done())
}
if c.onlyAnalysisMode {
log.Info("Waiting for controller's informer caches to sync")
if ok := cache.WaitForCacheSync(ctx.Done(), c.analysisRunSynced, c.analysisTemplateSynced, c.jobSynced); !ok {
log.Fatalf("failed to wait for caches to sync, exiting")
}

Check warning on line 563 in controller/controller.go

View check run for this annotation

Codecov / codecov/patch

controller/controller.go#L560-L563

Added lines #L560 - L563 were not covered by tests
// only wait for cluster scoped informers to sync if we are running in cluster-wide mode
if c.namespace == metav1.NamespaceAll {
if ok := cache.WaitForCacheSync(ctx.Done(), c.clusterAnalysisTemplateSynced); !ok {
log.Fatalf("failed to wait for cluster-scoped caches to sync, exiting")
}

Check warning on line 568 in controller/controller.go

View check run for this annotation

Codecov / codecov/patch

controller/controller.go#L565-L568

Added lines #L565 - L568 were not covered by tests
}
go wait.Until(func() { c.wg.Add(1); c.analysisController.Run(ctx, analysisThreadiness); c.wg.Done() }, time.Second, ctx.Done())

Check warning on line 570 in controller/controller.go

View check run for this annotation

Codecov / codecov/patch

controller/controller.go#L570

Added line #L570 was not covered by tests
} else {

// Wait for the caches to be synced before starting workers
log.Info("Waiting for controller's informer caches to sync")
if ok := cache.WaitForCacheSync(ctx.Done(), c.serviceSynced, c.ingressSynced, c.jobSynced, c.rolloutSynced, c.experimentSynced, c.analysisRunSynced, c.analysisTemplateSynced, c.replicasSetSynced, c.configMapSynced, c.secretSynced); !ok {
log.Fatalf("failed to wait for caches to sync, exiting")
}
// only wait for cluster scoped informers to sync if we are running in cluster-wide mode
if c.namespace == metav1.NamespaceAll {
if ok := cache.WaitForCacheSync(ctx.Done(), c.clusterAnalysisTemplateSynced); !ok {
log.Fatalf("failed to wait for cluster-scoped caches to sync, exiting")
c.notificationConfigMapInformerFactory.Start(ctx.Done())
c.notificationSecretInformerFactory.Start(ctx.Done())
if ok := cache.WaitForCacheSync(ctx.Done(), c.configMapSynced, c.secretSynced); !ok {
log.Fatalf("failed to wait for configmap/secret caches to sync, exiting")

Check warning on line 576 in controller/controller.go

View check run for this annotation

Codecov / codecov/patch

controller/controller.go#L576

Added line #L576 was not covered by tests
}
}

go wait.Until(func() { c.wg.Add(1); c.rolloutController.Run(ctx, rolloutThreadiness); c.wg.Done() }, time.Second, ctx.Done())
go wait.Until(func() { c.wg.Add(1); c.serviceController.Run(ctx, serviceThreadiness); c.wg.Done() }, time.Second, ctx.Done())
go wait.Until(func() { c.wg.Add(1); c.ingressController.Run(ctx, ingressThreadiness); c.wg.Done() }, time.Second, ctx.Done())
go wait.Until(func() { c.wg.Add(1); c.experimentController.Run(ctx, experimentThreadiness); c.wg.Done() }, time.Second, ctx.Done())
go wait.Until(func() { c.wg.Add(1); c.analysisController.Run(ctx, analysisThreadiness); c.wg.Done() }, time.Second, ctx.Done())
go wait.Until(func() { c.wg.Add(1); c.notificationsController.Run(rolloutThreadiness, ctx.Done()); c.wg.Done() }, time.Second, ctx.Done())
// Check if Istio installed on cluster before starting dynamicInformerFactory
if istioutil.DoesIstioExist(c.istioPrimaryDynamicClient, c.namespace) {
c.istioDynamicInformerFactory.Start(ctx.Done())
}

// Wait for the caches to be synced before starting workers
log.Info("Waiting for controller's informer caches to sync")
if ok := cache.WaitForCacheSync(ctx.Done(), c.serviceSynced, c.ingressSynced, c.jobSynced, c.rolloutSynced, c.experimentSynced, c.analysisRunSynced, c.analysisTemplateSynced, c.replicasSetSynced, c.configMapSynced, c.secretSynced); !ok {
log.Fatalf("failed to wait for caches to sync, exiting")
}

Check warning on line 588 in controller/controller.go

View check run for this annotation

Codecov / codecov/patch

controller/controller.go#L587-L588

Added lines #L587 - L588 were not covered by tests
// only wait for cluster scoped informers to sync if we are running in cluster-wide mode
if c.namespace == metav1.NamespaceAll {
if ok := cache.WaitForCacheSync(ctx.Done(), c.clusterAnalysisTemplateSynced); !ok {
log.Fatalf("failed to wait for cluster-scoped caches to sync, exiting")
}

Check warning on line 593 in controller/controller.go

View check run for this annotation

Codecov / codecov/patch

controller/controller.go#L592-L593

Added lines #L592 - L593 were not covered by tests
}

go wait.Until(func() { c.wg.Add(1); c.rolloutController.Run(ctx, rolloutThreadiness); c.wg.Done() }, time.Second, ctx.Done())
go wait.Until(func() { c.wg.Add(1); c.serviceController.Run(ctx, serviceThreadiness); c.wg.Done() }, time.Second, ctx.Done())
go wait.Until(func() { c.wg.Add(1); c.ingressController.Run(ctx, ingressThreadiness); c.wg.Done() }, time.Second, ctx.Done())
go wait.Until(func() { c.wg.Add(1); c.experimentController.Run(ctx, experimentThreadiness); c.wg.Done() }, time.Second, ctx.Done())
go wait.Until(func() { c.wg.Add(1); c.analysisController.Run(ctx, analysisThreadiness); c.wg.Done() }, time.Second, ctx.Done())
go wait.Until(func() { c.wg.Add(1); c.notificationsController.Run(rolloutThreadiness, ctx.Done()); c.wg.Done() }, time.Second, ctx.Done())

}
log.Info("Started controller")
}
8 changes: 6 additions & 2 deletions controller/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,13 @@ func NewMetricsServer(cfg ServerConfig) *MetricsServer {

reg := prometheus.NewRegistry()

reg.MustRegister(NewRolloutCollector(cfg.RolloutLister))
if cfg.RolloutLister != nil {
reg.MustRegister(NewRolloutCollector(cfg.RolloutLister))
}
if cfg.ExperimentLister != nil {
reg.MustRegister(NewExperimentCollector(cfg.ExperimentLister))
}
reg.MustRegister(NewAnalysisRunCollector(cfg.AnalysisRunLister, cfg.AnalysisTemplateLister, cfg.ClusterAnalysisTemplateLister))
reg.MustRegister(NewExperimentCollector(cfg.ExperimentLister))
cfg.K8SRequestProvider.MustRegister(reg)
reg.MustRegister(MetricRolloutReconcile)
reg.MustRegister(MetricRolloutReconcileError)
Expand Down
22 changes: 12 additions & 10 deletions utils/record/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,20 +208,22 @@ func (e *EventRecorderAdapter) defaultEventf(object runtime.Object, warn bool, o
e.RolloutEventCounter.WithLabelValues(namespace, name, opts.EventType, opts.EventReason).Inc()
}

apis, err := e.apiFactory.GetAPIsFromNamespace(namespace)
if err != nil {
logCtx.Errorf("notifications failed to get apis for eventReason %s with error: %s", opts.EventReason, err)
e.NotificationFailedCounter.WithLabelValues(namespace, name, opts.EventType, opts.EventReason).Inc()
}

for _, api := range apis {
err := e.sendNotifications(api, object, opts)
if e.apiFactory != nil {
apis, err := e.apiFactory.GetAPIsFromNamespace(namespace)
if err != nil {
logCtx.Errorf("Notifications failed to send for eventReason %s with error: %s", opts.EventReason, err)
logCtx.Errorf("notifications failed to get apis for eventReason %s with error: %s", opts.EventReason, err)
e.NotificationFailedCounter.WithLabelValues(namespace, name, opts.EventType, opts.EventReason).Inc()
}

Check warning on line 216 in utils/record/record.go

View check run for this annotation

Codecov / codecov/patch

utils/record/record.go#L214-L216

Added lines #L214 - L216 were not covered by tests

for _, api := range apis {
err := e.sendNotifications(api, object, opts)
if err != nil {
logCtx.Errorf("Notifications failed to send for eventReason %s with error: %s", opts.EventReason, err)
}

Check warning on line 222 in utils/record/record.go

View check run for this annotation

Codecov / codecov/patch

utils/record/record.go#L221-L222

Added lines #L221 - L222 were not covered by tests
}
}
}

}
logFn := logCtx.Infof
if warn {
logFn = logCtx.Warnf
Expand Down

0 comments on commit cf26d73

Please sign in to comment.