Skip to content

Commit

Permalink
feat: combine controllers to one deployment [Release Note] (#1753)
Browse files Browse the repository at this point in the history
* feat: combine controllers to one deployment

Signed-off-by: Derek Wang <[email protected]>
  • Loading branch information
whynowy authored Mar 31, 2022
1 parent 6ac8d04 commit 657d33c
Show file tree
Hide file tree
Showing 19 changed files with 204 additions and 696 deletions.
10 changes: 5 additions & 5 deletions cmd/commands/sensorcontroller.go → cmd/commands/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,21 @@ package commands
import (
"github.com/spf13/cobra"

sensorcmd "github.com/argoproj/argo-events/controllers/sensor/cmd"
controllercmd "github.com/argoproj/argo-events/controllers/cmd"
envpkg "github.com/argoproj/pkg/env"
)

func NewSensorControllerCommand() *cobra.Command {
func NewControllerCommand() *cobra.Command {
var (
namespaced bool
managedNamespace string
)

command := &cobra.Command{
Use: "sensor-controller",
Short: "Start a Sensor controller",
Use: "controller",
Short: "Start the controller",
Run: func(cmd *cobra.Command, args []string) {
sensorcmd.Start(namespaced, managedNamespace)
controllercmd.Start(namespaced, managedNamespace)
},
}
command.Flags().BoolVar(&namespaced, "namespaced", false, "Whether to run in namespaced scope, defaults to false.")
Expand Down
26 changes: 0 additions & 26 deletions cmd/commands/eventbuscontroller.go

This file was deleted.

26 changes: 0 additions & 26 deletions cmd/commands/eventsourcecontroller.go

This file was deleted.

4 changes: 1 addition & 3 deletions cmd/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ func Execute() {
}

func init() {
rootCmd.AddCommand(NewEventBusControllerCommand())
rootCmd.AddCommand(NewEventSourceControllerCommand())
rootCmd.AddCommand(NewSensorControllerCommand())
rootCmd.AddCommand(NewControllerCommand())
rootCmd.AddCommand(NewEventSourceCommand())
rootCmd.AddCommand(NewSensorCommand())
rootCmd.AddCommand(NewWebhookCommand())
Expand Down
170 changes: 170 additions & 0 deletions controllers/cmd/start.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
package cmd

import (
"fmt"
"os"

"go.uber.org/zap"
appv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/manager/signals"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/source"

argoevents "github.com/argoproj/argo-events"
"github.com/argoproj/argo-events/common"
"github.com/argoproj/argo-events/common/logging"
"github.com/argoproj/argo-events/controllers"
"github.com/argoproj/argo-events/controllers/eventbus"
"github.com/argoproj/argo-events/controllers/eventsource"
"github.com/argoproj/argo-events/controllers/sensor"
eventbusv1alpha1 "github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1"
eventsourcev1alpha1 "github.com/argoproj/argo-events/pkg/apis/eventsource/v1alpha1"
sensorv1alpha1 "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1"
)

const (
imageEnvVar = "ARGO_EVENTS_IMAGE"
)

func Start(namespaced bool, managedNamespace string) {
logger := logging.NewArgoEventsLogger().Named(eventbus.ControllerName)
config, err := controllers.LoadConfig(func(err error) {
logger.Errorf("Failed to reload global configuration file", zap.Error(err))
})
if err != nil {
logger.Fatalw("Failed to load global configuration file", zap.Error(err))
}
imageName, defined := os.LookupEnv(imageEnvVar)
if !defined {
logger.Fatalf("required environment variable '%s' not defined", imageEnvVar)
}
opts := ctrl.Options{
MetricsBindAddress: fmt.Sprintf(":%d", common.ControllerMetricsPort),
HealthProbeBindAddress: ":8081",
}
if namespaced {
opts.Namespace = managedNamespace
}
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), opts)
if err != nil {
logger.Fatalw("unable to get a controller-runtime manager", zap.Error(err))
}

// Readyness probe
if err := mgr.AddReadyzCheck("readiness", healthz.Ping); err != nil {
logger.Fatalw("unable add a readiness check", zap.Error(err))
}

// Liveness probe
if err := mgr.AddHealthzCheck("liveness", healthz.Ping); err != nil {
logger.Fatalw("unable add a health check", zap.Error(err))
}

if err := eventbusv1alpha1.AddToScheme(mgr.GetScheme()); err != nil {
logger.Fatalw("unable to add scheme", zap.Error(err))
}

if err := eventsourcev1alpha1.AddToScheme(mgr.GetScheme()); err != nil {
logger.Fatalw("unable to add EventSource scheme", zap.Error(err))
}

if err := sensorv1alpha1.AddToScheme(mgr.GetScheme()); err != nil {
logger.Fatalw("unable to add Sensor scheme", zap.Error(err))
}

// EventBus controller
eventBusController, err := controller.New(eventbus.ControllerName, mgr, controller.Options{
Reconciler: eventbus.NewReconciler(mgr.GetClient(), mgr.GetScheme(), config, logger),
})
if err != nil {
logger.Fatalw("unable to set up EventBus controller", zap.Error(err))
}

// Watch EventBus and enqueue EventBus object key
if err := eventBusController.Watch(&source.Kind{Type: &eventbusv1alpha1.EventBus{}}, &handler.EnqueueRequestForObject{},
predicate.Or(
predicate.GenerationChangedPredicate{},
predicate.LabelChangedPredicate{},
)); err != nil {
logger.Fatalw("unable to watch EventBus", zap.Error(err))
}

// Watch ConfigMaps and enqueue owning EventBus key
if err := eventBusController.Watch(&source.Kind{Type: &corev1.ConfigMap{}}, &handler.EnqueueRequestForOwner{OwnerType: &eventbusv1alpha1.EventBus{}, IsController: true}, predicate.GenerationChangedPredicate{}); err != nil {
logger.Fatalw("unable to watch ConfigMaps", zap.Error(err))
}

// Watch Secrets and enqueue owning EventBus key
if err := eventBusController.Watch(&source.Kind{Type: &corev1.Secret{}}, &handler.EnqueueRequestForOwner{OwnerType: &eventbusv1alpha1.EventBus{}, IsController: true}, predicate.GenerationChangedPredicate{}); err != nil {
logger.Fatalw("unable to watch Secrets", zap.Error(err))
}

// Watch StatefulSets and enqueue owning EventBus key
if err := eventBusController.Watch(&source.Kind{Type: &appv1.StatefulSet{}}, &handler.EnqueueRequestForOwner{OwnerType: &eventbusv1alpha1.EventBus{}, IsController: true}, predicate.GenerationChangedPredicate{}); err != nil {
logger.Fatalw("unable to watch StatefulSets", zap.Error(err))
}

// Watch Services and enqueue owning EventBus key
if err := eventBusController.Watch(&source.Kind{Type: &corev1.Service{}}, &handler.EnqueueRequestForOwner{OwnerType: &eventbusv1alpha1.EventBus{}, IsController: true}, predicate.GenerationChangedPredicate{}); err != nil {
logger.Fatalw("unable to watch Services", zap.Error(err))
}

// EventSource controller
eventSourceController, err := controller.New(eventsource.ControllerName, mgr, controller.Options{
Reconciler: eventsource.NewReconciler(mgr.GetClient(), mgr.GetScheme(), imageName, logger),
})
if err != nil {
logger.Fatalw("unable to set up EventSource controller", zap.Error(err))
}

// Watch EventSource and enqueue EventSource object key
if err := eventSourceController.Watch(&source.Kind{Type: &eventsourcev1alpha1.EventSource{}}, &handler.EnqueueRequestForObject{},
predicate.Or(
predicate.GenerationChangedPredicate{},
predicate.LabelChangedPredicate{},
)); err != nil {
logger.Fatalw("unable to watch EventSources", zap.Error(err))
}

// Watch Deployments and enqueue owning EventSource key
if err := eventSourceController.Watch(&source.Kind{Type: &appv1.Deployment{}}, &handler.EnqueueRequestForOwner{OwnerType: &eventsourcev1alpha1.EventSource{}, IsController: true}, predicate.GenerationChangedPredicate{}); err != nil {
logger.Fatalw("unable to watch Deployments", zap.Error(err))
}

// Watch Services and enqueue owning EventSource key
if err := eventSourceController.Watch(&source.Kind{Type: &corev1.Service{}}, &handler.EnqueueRequestForOwner{OwnerType: &eventsourcev1alpha1.EventSource{}, IsController: true}, predicate.GenerationChangedPredicate{}); err != nil {
logger.Fatalw("unable to watch Services", zap.Error(err))
}

// Sensor controller
sensorController, err := controller.New(sensor.ControllerName, mgr, controller.Options{
Reconciler: sensor.NewReconciler(mgr.GetClient(), mgr.GetScheme(), imageName, logger),
})
if err != nil {
logger.Fatalw("unable to set up Sensor controller", zap.Error(err))
}

// Watch Sensor and enqueue Sensor object key
if err := sensorController.Watch(&source.Kind{Type: &sensorv1alpha1.Sensor{}}, &handler.EnqueueRequestForObject{},
predicate.Or(
predicate.GenerationChangedPredicate{},
predicate.LabelChangedPredicate{},
)); err != nil {
logger.Fatalw("unable to watch Sensors", zap.Error(err))
}

// Watch Deployments and enqueue owning Sensor key
if err := sensorController.Watch(&source.Kind{Type: &appv1.Deployment{}}, &handler.EnqueueRequestForOwner{OwnerType: &sensorv1alpha1.Sensor{}, IsController: true}, predicate.GenerationChangedPredicate{}); err != nil {
logger.Fatalw("unable to watch Deployments", zap.Error(err))
}

logger.Infow("starting controller manager", "version", argoevents.GetVersion())
if err := mgr.Start(signals.SetupSignalHandler()); err != nil {
logger.Fatalw("unable to run eventbus controller", zap.Error(err))
}
}
110 changes: 0 additions & 110 deletions controllers/eventbus/cmd/start.go

This file was deleted.

Loading

0 comments on commit 657d33c

Please sign in to comment.