diff --git a/.licenses-gomod.sha256 b/.licenses-gomod.sha256 index 868ca3e74e..be8333abf0 100644 --- a/.licenses-gomod.sha256 +++ b/.licenses-gomod.sha256 @@ -1 +1 @@ -100644 071ab373715c6372fbcf4212bc32597eef52773e go.mod +100644 56ef1577038b01bc456601ea415f8f3045c37372 go.mod diff --git a/cmd/manager/main.go b/cmd/manager/main.go index 3dd7824645..a2c68fedda 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -23,42 +23,22 @@ import ( "log" "os" "strings" - "time" - - "go.uber.org/zap/zapcore" - ctrzap "sigs.k8s.io/controller-runtime/pkg/log/zap" - "sigs.k8s.io/controller-runtime/pkg/webhook" "github.com/go-logr/zapr" "go.uber.org/zap" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/labels" + "go.uber.org/zap/zapcore" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" - clientgoscheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/kubernetes/scheme" _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/healthz" - metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" - "sigs.k8s.io/controller-runtime/pkg/predicate" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/collection" "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/featureflags" "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/kube" akov2 "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1" - "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller" - "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlas" - "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlasdatabaseuser" - "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlasdatafederation" - "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlasdeployment" - "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlasfederatedauth" - "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlasproject" - "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlassearchindexconfig" - "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlasstream" - "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/connectionsecret" - "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/watch" - "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/indexer" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/operator" "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/version" ) @@ -72,219 +52,43 @@ const ( subobjectDeletionProtectionMessage = "Note: sub-object deletion protection is IGNORED because it does not work deterministically." ) -var ( - scheme = runtime.NewScheme() - setupLog = ctrl.Log.WithName("setup") -) - -func init() { - utilruntime.Must(clientgoscheme.AddToScheme(scheme)) - utilruntime.Must(akov2.AddToScheme(scheme)) -} - func main() { - // controller-runtime/pkg/log/zap is a wrapper over zap that implements logr - // logr looks quite limited in functionality so we better use Zap directly. - // Though we still need the controller-runtime library and go-logr/zapr as they are used in controller-runtime - // logging - ctrzap.NewRaw(ctrzap.UseDevMode(true), ctrzap.StacktraceLevel(zap.ErrorLevel)) + akoScheme := runtime.NewScheme() + utilruntime.Must(scheme.AddToScheme(akoScheme)) + utilruntime.Must(akov2.AddToScheme(akoScheme)) + + ctx := ctrl.SetupSignalHandler() config := parseConfiguration() + logger, err := initCustomZapLogger(config.LogLevel, config.LogEncoder) if err != nil { fmt.Printf("error instantiating logger: %v\r\n", err) os.Exit(1) } - logger.Info("starting with configuration", zap.Any("config", config), zap.Any("version", version.Version)) - ctrl.SetLogger(zapr.NewLogger(logger)) - - syncPeriod := time.Hour * 3 - - var cacheFunc cache.NewCacheFunc - if len(config.WatchedNamespaces) > 0 { - var namespaces []string - for ns := range config.WatchedNamespaces { - namespaces = append(namespaces, ns) - } - cacheFunc = controller.MultiNamespacedCacheBuilder(namespaces) - } else { - cacheFunc = controller.CustomLabelSelectorCacheBuilder( - &corev1.Secret{}, - labels.SelectorFromSet(labels.Set{ - connectionsecret.TypeLabelKey: connectionsecret.CredLabelVal, - }), - ) - } - - mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ - Scheme: scheme, - Metrics: metricsserver.Options{BindAddress: config.MetricsAddr}, - WebhookServer: webhook.NewServer(webhook.Options{ - Port: 9443, - }), - Cache: cache.Options{ - SyncPeriod: &syncPeriod, - }, - HealthProbeBindAddress: config.ProbeAddr, - LeaderElection: config.EnableLeaderElection, - LeaderElectionID: "06d035fb.mongodb.com", - NewCache: cacheFunc, - }) + setupLog := logger.Named("setup").Sugar() + setupLog.Info("starting with configuration", zap.Any("config", config), zap.Any("version", version.Version)) + + mgr, err := operator.NewBuilder(operator.ManagerProviderFunc(ctrl.NewManager), akoScheme). + WithConfig(ctrl.GetConfigOrDie()). + WithNamespaces(collection.Keys(config.WatchedNamespaces)...). + WithLogger(logger). + WithMetricAddress(config.MetricsAddr). + WithProbeAddress(config.ProbeAddr). + WithLeaderElection(config.EnableLeaderElection). + WithAtlasDomain(config.AtlasDomain). + WithAPISecret(config.GlobalAPISecret). + WithDeletionProtection(config.ObjectDeletionProtection). + Build(ctx) if err != nil { - setupLog.Error(err, "unable to start manager") - os.Exit(1) - } - - ctx := ctrl.SetupSignalHandler() - - // globalPredicates should be used for general controller Predicates - // that should be applied to all controllers in order to limit the - // resources they receive events for. - predicateNamespaces := controller.NamespacesOrAllPredicate(config.WatchedNamespaces) - globalPredicates := []predicate.Predicate{ - watch.CommonPredicates(), // ignore spurious changes. status changes etc. - watch.SelectNamespacesPredicate(predicateNamespaces), // select only desired namespaces - } - - atlasProvider := atlas.NewProductionProvider(config.AtlasDomain, config.GlobalAPISecret, mgr.GetClient()) - - if err := indexer.RegisterAll(ctx, mgr, logger); err != nil { - setupLog.Error(err, "unable to create indexers") - os.Exit(1) - } - - if err = (&atlasdeployment.AtlasDeploymentReconciler{ - Client: mgr.GetClient(), - Log: logger.Named("controllers").Named("AtlasDeployment").Sugar(), - Scheme: mgr.GetScheme(), - GlobalPredicates: globalPredicates, - EventRecorder: mgr.GetEventRecorderFor("AtlasDeployment"), - AtlasProvider: atlasProvider, - ObjectDeletionProtection: config.ObjectDeletionProtection, - SubObjectDeletionProtection: false, - }).SetupWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create controller", "controller", "AtlasDeployment") - os.Exit(1) - } - - if err = (&atlasproject.AtlasProjectReconciler{ - Client: mgr.GetClient(), - Log: logger.Named("controllers").Named("AtlasProject").Sugar(), - Scheme: mgr.GetScheme(), - DeprecatedResourceWatcher: watch.NewDeprecatedResourceWatcher(), - GlobalPredicates: globalPredicates, - EventRecorder: mgr.GetEventRecorderFor("AtlasProject"), - AtlasProvider: atlasProvider, - ObjectDeletionProtection: config.ObjectDeletionProtection, - SubObjectDeletionProtection: false, - }).SetupWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create controller", "controller", "AtlasProject") - os.Exit(1) - } - - if err = (&atlasdatabaseuser.AtlasDatabaseUserReconciler{ - DeprecatedResourceWatcher: watch.NewDeprecatedResourceWatcher(), - Client: mgr.GetClient(), - Log: logger.Named("controllers").Named("AtlasDatabaseUser").Sugar(), - Scheme: mgr.GetScheme(), - EventRecorder: mgr.GetEventRecorderFor("AtlasDatabaseUser"), - AtlasProvider: atlasProvider, - GlobalPredicates: globalPredicates, - ObjectDeletionProtection: config.ObjectDeletionProtection, - SubObjectDeletionProtection: false, - FeaturePreviewOIDCAuthEnabled: config.FeatureFlags.IsFeaturePresent(featureflags.FeatureOIDC), - }).SetupWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create controller", "controller", "AtlasDatabaseUser") - os.Exit(1) - } - - if err = (&atlasdatafederation.AtlasDataFederationReconciler{ - Client: mgr.GetClient(), - Log: logger.Named("controllers").Named("AtlasDataFederation").Sugar(), - Scheme: mgr.GetScheme(), - DeprecatedResourceWatcher: watch.NewDeprecatedResourceWatcher(), - GlobalPredicates: globalPredicates, - EventRecorder: mgr.GetEventRecorderFor("AtlasDataFederation"), - AtlasProvider: atlasProvider, - ObjectDeletionProtection: config.ObjectDeletionProtection, - SubObjectDeletionProtection: false, - }).SetupWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create controller", "controller", "AtlasDataFederation") - os.Exit(1) - } - - if err = (&atlasfederatedauth.AtlasFederatedAuthReconciler{ - Client: mgr.GetClient(), - Log: logger.Named("controllers").Named("AtlasFederatedAuth").Sugar(), - Scheme: mgr.GetScheme(), - DeprecatedResourceWatcher: watch.NewDeprecatedResourceWatcher(), - GlobalPredicates: globalPredicates, - EventRecorder: mgr.GetEventRecorderFor("AtlasFederatedAuth"), - AtlasProvider: atlasProvider, - ObjectDeletionProtection: config.ObjectDeletionProtection, - SubObjectDeletionProtection: false, - }).SetupWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create controller", "controller", "AtlasFederatedAuth") - os.Exit(1) - } - - if err = (&atlasstream.AtlasStreamsInstanceReconciler{ - Scheme: mgr.GetScheme(), - Client: mgr.GetClient(), - EventRecorder: mgr.GetEventRecorderFor("AtlasStreamsInstance"), - GlobalPredicates: globalPredicates, - Log: logger.Named("controllers").Named("AtlasStreamsInstance").Sugar(), - AtlasProvider: atlasProvider, - ObjectDeletionProtection: config.ObjectDeletionProtection, - SubObjectDeletionProtection: false, - }).SetupWithManager(ctx, mgr); err != nil { - setupLog.Error(err, "unable to create controller", "controller", "AtlasStreamsInstance") - os.Exit(1) - } - - if err = (&atlasstream.AtlasStreamsConnectionReconciler{ - Scheme: mgr.GetScheme(), - Client: mgr.GetClient(), - EventRecorder: mgr.GetEventRecorderFor("AtlasStreamsConnection"), - GlobalPredicates: globalPredicates, - Log: logger.Named("controllers").Named("AtlasStreamsConnection").Sugar(), - AtlasProvider: atlasProvider, - ObjectDeletionProtection: config.ObjectDeletionProtection, - SubObjectDeletionProtection: false, - }).SetupWithManager(ctx, mgr); err != nil { - setupLog.Error(err, "unable to create controller", "controller", "AtlasStreamsConnection") - os.Exit(1) - } - - if err = (&atlassearchindexconfig.AtlasSearchIndexConfigReconciler{ - Scheme: mgr.GetScheme(), - Client: mgr.GetClient(), - EventRecorder: mgr.GetEventRecorderFor("AtlasSearchIndexConfig"), - GlobalPredicates: globalPredicates, - Log: logger.Named("controllers").Named("AtlasSearchIndexConfig").Sugar(), - AtlasProvider: atlasProvider, - ObjectDeletionProtection: config.ObjectDeletionProtection, - SubObjectDeletionProtection: false, - }).SetupWithManager(ctx, mgr); err != nil { - setupLog.Error(err, "unable to create controller", "controller", "AtlasSearchIndexConfig") - os.Exit(1) - } - - // +kubebuilder:scaffold:builder - - if err := mgr.AddHealthzCheck("health", healthz.Ping); err != nil { - setupLog.Error(err, "unable to set up health check") - os.Exit(1) - } - if err := mgr.AddReadyzCheck("check", healthz.Ping); err != nil { - setupLog.Error(err, "unable to set up ready check") + setupLog.Error(err, "unable to start operator") os.Exit(1) } setupLog.Info(subobjectDeletionProtectionMessage) setupLog.Info("starting manager") - if err := mgr.Start(ctx); err != nil { + if err = mgr.Start(ctx); err != nil { setupLog.Error(err, "problem running manager") os.Exit(1) } @@ -308,7 +112,7 @@ type Config struct { func parseConfiguration() Config { var globalAPISecretName string config := Config{} - flag.StringVar(&config.AtlasDomain, "atlas-domain", "https://cloud.mongodb.com/", "the Atlas URL domain name (with slash in the end).") + flag.StringVar(&config.AtlasDomain, "atlas-domain", operator.DefaultAtlasDomain, "the Atlas URL domain name (with slash in the end).") flag.StringVar(&config.MetricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.") flag.StringVar(&config.ProbeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.") flag.StringVar(&globalAPISecretName, "global-api-secret-name", "", "The name of the Secret that contains Atlas API keys. "+ diff --git a/go.mod b/go.mod index 071ab37371..56ef157703 100644 --- a/go.mod +++ b/go.mod @@ -87,7 +87,7 @@ require ( github.com/evanphx/json-patch v5.6.0+incompatible // indirect github.com/evanphx/json-patch/v5 v5.9.0 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect - github.com/go-logr/logr v1.4.1 // indirect + github.com/go-logr/logr v1.4.1 github.com/go-openapi/jsonpointer v0.19.6 // indirect github.com/go-openapi/jsonreference v0.20.2 // indirect github.com/go-openapi/swag v0.22.3 // indirect diff --git a/internal/collection/collection.go b/internal/collection/collection.go index a11497370c..8f43235aef 100644 --- a/internal/collection/collection.go +++ b/internal/collection/collection.go @@ -11,3 +11,13 @@ func CopyWithSkip[T comparable](list []T, skip T) []T { return newList } + +func Keys[K comparable, V any](m map[K]V) []K { + s := make([]K, 0, len(m)) + + for k := range m { + s = append(s, k) + } + + return s +} diff --git a/pkg/controller/atlasdatabaseuser/atlasdatabaseuser_controller.go b/pkg/controller/atlasdatabaseuser/atlasdatabaseuser_controller.go index 6dfea63f40..3fa65aa745 100644 --- a/pkg/controller/atlasdatabaseuser/atlasdatabaseuser_controller.go +++ b/pkg/controller/atlasdatabaseuser/atlasdatabaseuser_controller.go @@ -29,11 +29,12 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/predicate" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/featureflags" "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api" akov2 "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1" - "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlas" "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/connectionsecret" "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/customresource" @@ -260,3 +261,24 @@ func (r *AtlasDatabaseUserReconciler) SetupWithManager(mgr ctrl.Manager) error { Watches(&corev1.Secret{}, watch.NewSecretHandler(&r.DeprecatedResourceWatcher)). Complete(r) } + +func NewAtlasDatabaseUserReconciler( + mgr manager.Manager, + predicates []predicate.Predicate, + atlasProvider atlas.Provider, + deletionProtection bool, + featureFlags *featureflags.FeatureFlags, + logger *zap.Logger, +) *AtlasDatabaseUserReconciler { + return &AtlasDatabaseUserReconciler{ + Scheme: mgr.GetScheme(), + Client: mgr.GetClient(), + EventRecorder: mgr.GetEventRecorderFor("AtlasDatabaseUser"), + DeprecatedResourceWatcher: watch.NewDeprecatedResourceWatcher(), + GlobalPredicates: predicates, + Log: logger.Named("controllers").Named("AtlasDatabaseUser").Sugar(), + AtlasProvider: atlasProvider, + ObjectDeletionProtection: deletionProtection, + FeaturePreviewOIDCAuthEnabled: featureFlags.IsFeaturePresent(featureflags.FeatureOIDC), + } +} diff --git a/pkg/controller/atlasdatafederation/datafederation_controller.go b/pkg/controller/atlasdatafederation/datafederation_controller.go index 3ba8920463..34d0b2cab0 100644 --- a/pkg/controller/atlasdatafederation/datafederation_controller.go +++ b/pkg/controller/atlasdatafederation/datafederation_controller.go @@ -5,6 +5,8 @@ import ( "errors" "fmt" + "sigs.k8s.io/controller-runtime/pkg/manager" + k8serrors "k8s.io/apimachinery/pkg/api/errors" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/event" @@ -203,6 +205,25 @@ func (r *AtlasDataFederationReconciler) SetupWithManager(mgr ctrl.Manager) error Complete(r) } +func NewAtlasDataFederationReconciler( + mgr manager.Manager, + predicates []predicate.Predicate, + atlasProvider atlas.Provider, + deletionProtection bool, + logger *zap.Logger, +) *AtlasDataFederationReconciler { + return &AtlasDataFederationReconciler{ + Scheme: mgr.GetScheme(), + Client: mgr.GetClient(), + EventRecorder: mgr.GetEventRecorderFor("AtlasDataFederation"), + DeprecatedResourceWatcher: watch.NewDeprecatedResourceWatcher(), + GlobalPredicates: predicates, + Log: logger.Named("controllers").Named("AtlasDataFederation").Sugar(), + AtlasProvider: atlasProvider, + ObjectDeletionProtection: deletionProtection, + } +} + // Delete implements a handler for the Delete event func (r *AtlasDataFederationReconciler) Delete(ctx context.Context, e event.DeleteEvent) error { dataFederation, ok := e.Object.(*akov2.AtlasDataFederation) diff --git a/pkg/controller/atlasdeployment/atlasdeployment_controller.go b/pkg/controller/atlasdeployment/atlasdeployment_controller.go index 19ba736c82..ab5bd115e8 100644 --- a/pkg/controller/atlasdeployment/atlasdeployment_controller.go +++ b/pkg/controller/atlasdeployment/atlasdeployment_controller.go @@ -22,6 +22,8 @@ import ( "fmt" "strings" + "sigs.k8s.io/controller-runtime/pkg/manager" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api" "k8s.io/apimachinery/pkg/fields" @@ -481,6 +483,24 @@ func (r *AtlasDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error { Complete(r) } +func NewAtlasDeploymentReconciler( + mgr manager.Manager, + predicates []predicate.Predicate, + atlasProvider atlas.Provider, + deletionProtection bool, + logger *zap.Logger, +) *AtlasDeploymentReconciler { + return &AtlasDeploymentReconciler{ + Scheme: mgr.GetScheme(), + Client: mgr.GetClient(), + EventRecorder: mgr.GetEventRecorderFor("AtlasDeployment"), + GlobalPredicates: predicates, + Log: logger.Named("controllers").Named("AtlasDeployment").Sugar(), + AtlasProvider: atlasProvider, + ObjectDeletionProtection: deletionProtection, + } +} + func (r *AtlasDeploymentReconciler) findDeploymentsForBackupPolicy(ctx context.Context, obj client.Object) []reconcile.Request { backupPolicy, ok := obj.(*akov2.AtlasBackupPolicy) if !ok { diff --git a/pkg/controller/atlasfederatedauth/atlasfederated_auth_controller.go b/pkg/controller/atlasfederatedauth/atlasfederated_auth_controller.go index 704ec3c028..4f53c8a76a 100644 --- a/pkg/controller/atlasfederatedauth/atlasfederated_auth_controller.go +++ b/pkg/controller/atlasfederatedauth/atlasfederated_auth_controller.go @@ -4,6 +4,8 @@ import ( "context" "fmt" + "sigs.k8s.io/controller-runtime/pkg/manager" + "go.uber.org/zap" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" @@ -105,6 +107,25 @@ func (r *AtlasFederatedAuthReconciler) SetupWithManager(mgr ctrl.Manager) error Complete(r) } +func NewAtlasFederatedAuthReconciler( + mgr manager.Manager, + predicates []predicate.Predicate, + atlasProvider atlas.Provider, + deletionProtection bool, + logger *zap.Logger, +) *AtlasFederatedAuthReconciler { + return &AtlasFederatedAuthReconciler{ + Scheme: mgr.GetScheme(), + Client: mgr.GetClient(), + EventRecorder: mgr.GetEventRecorderFor("AtlasFederatedAuth"), + DeprecatedResourceWatcher: watch.NewDeprecatedResourceWatcher(), + GlobalPredicates: predicates, + Log: logger.Named("controllers").Named("AtlasFederatedAuth").Sugar(), + AtlasProvider: atlasProvider, + ObjectDeletionProtection: deletionProtection, + } +} + func setCondition(ctx *workflow.Context, condition api.ConditionType, result workflow.Result) { ctx.SetConditionFromResult(condition, result) logIfWarning(ctx, result) diff --git a/pkg/controller/atlasproject/atlasproject_controller.go b/pkg/controller/atlasproject/atlasproject_controller.go index 8a72d57807..4e60d25b18 100644 --- a/pkg/controller/atlasproject/atlasproject_controller.go +++ b/pkg/controller/atlasproject/atlasproject_controller.go @@ -21,6 +21,8 @@ import ( "errors" "fmt" + "sigs.k8s.io/controller-runtime/pkg/manager" + "go.mongodb.org/atlas/mongodbatlas" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" @@ -380,6 +382,25 @@ func (r *AtlasProjectReconciler) SetupWithManager(mgr ctrl.Manager) error { Complete(r) } +func NewAtlasProjectReconciler( + mgr manager.Manager, + predicates []predicate.Predicate, + atlasProvider atlas.Provider, + deletionProtection bool, + logger *zap.Logger, +) *AtlasProjectReconciler { + return &AtlasProjectReconciler{ + Scheme: mgr.GetScheme(), + Client: mgr.GetClient(), + EventRecorder: mgr.GetEventRecorderFor("AtlasProject"), + DeprecatedResourceWatcher: watch.NewDeprecatedResourceWatcher(), + GlobalPredicates: predicates, + Log: logger.Named("controllers").Named("AtlasProject").Sugar(), + AtlasProvider: atlasProvider, + ObjectDeletionProtection: deletionProtection, + } +} + // setCondition sets the condition from the result and logs the warnings func setCondition(ctx *workflow.Context, condition api.ConditionType, result workflow.Result) { ctx.SetConditionFromResult(condition, result) diff --git a/pkg/controller/atlassearchindexconfig/atlassearchindexconfig_controller.go b/pkg/controller/atlassearchindexconfig/atlassearchindexconfig_controller.go index 19d97274f1..9132e6cc34 100644 --- a/pkg/controller/atlassearchindexconfig/atlassearchindexconfig_controller.go +++ b/pkg/controller/atlassearchindexconfig/atlassearchindexconfig_controller.go @@ -4,6 +4,8 @@ import ( "context" "fmt" + "sigs.k8s.io/controller-runtime/pkg/manager" + "go.uber.org/zap" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" @@ -92,7 +94,7 @@ func (r *AtlasSearchIndexConfigReconciler) Reconcile(ctx context.Context, req ct return r.release(workflowCtx, atlasSearchIndexConfig) } -func (r *AtlasSearchIndexConfigReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error { +func (r *AtlasSearchIndexConfigReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). Named("AtlasSearchIndexConfig"). For(&akov2.AtlasSearchIndexConfig{}, builder.WithPredicates(r.GlobalPredicates...)). @@ -104,6 +106,24 @@ func (r *AtlasSearchIndexConfigReconciler) SetupWithManager(ctx context.Context, Complete(r) } +func NewAtlasSearchIndexConfigReconciler( + mgr manager.Manager, + predicates []predicate.Predicate, + atlasProvider atlas.Provider, + deletionProtection bool, + logger *zap.Logger, +) *AtlasSearchIndexConfigReconciler { + return &AtlasSearchIndexConfigReconciler{ + Scheme: mgr.GetScheme(), + Client: mgr.GetClient(), + EventRecorder: mgr.GetEventRecorderFor("AtlasSearchIndexConfig"), + GlobalPredicates: predicates, + Log: logger.Named("controllers").Named("AtlasSearchIndexConfig").Sugar(), + AtlasProvider: atlasProvider, + ObjectDeletionProtection: deletionProtection, + } +} + func (r *AtlasSearchIndexConfigReconciler) findReferencesInAtlasDeployments(ctx context.Context, obj client.Object) []reconcile.Request { deployment, ok := obj.(*akov2.AtlasDeployment) if !ok { diff --git a/pkg/controller/atlasstream/atlasstream_connection_controller.go b/pkg/controller/atlasstream/atlasstream_connection_controller.go index f11657570d..ba58e77a65 100644 --- a/pkg/controller/atlasstream/atlasstream_connection_controller.go +++ b/pkg/controller/atlasstream/atlasstream_connection_controller.go @@ -4,6 +4,8 @@ import ( "context" "fmt" + "sigs.k8s.io/controller-runtime/pkg/manager" + "go.uber.org/zap" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" @@ -92,7 +94,7 @@ func (r *AtlasStreamsConnectionReconciler) ensureAtlasStreamConnection(ctx conte return r.release(workflowCtx, akoStreamConnection) } -func (r *AtlasStreamsConnectionReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error { +func (r *AtlasStreamsConnectionReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). Named("AtlasStreamConnection"). For(&akov2.AtlasStreamConnection{}, builder.WithPredicates(r.GlobalPredicates...)). @@ -104,6 +106,24 @@ func (r *AtlasStreamsConnectionReconciler) SetupWithManager(ctx context.Context, Complete(r) } +func NewAtlasStreamsConnectionReconciler( + mgr manager.Manager, + predicates []predicate.Predicate, + atlasProvider atlas.Provider, + deletionProtection bool, + logger *zap.Logger, +) *AtlasStreamsConnectionReconciler { + return &AtlasStreamsConnectionReconciler{ + Scheme: mgr.GetScheme(), + Client: mgr.GetClient(), + EventRecorder: mgr.GetEventRecorderFor("AtlasStreamsConnection"), + GlobalPredicates: predicates, + Log: logger.Named("controllers").Named("AtlasStreamsConnection").Sugar(), + AtlasProvider: atlasProvider, + ObjectDeletionProtection: deletionProtection, + } +} + func (r *AtlasStreamsConnectionReconciler) findStreamConnectionsForStreamInstances(_ context.Context, obj client.Object) []reconcile.Request { streamInstance, ok := obj.(*akov2.AtlasStreamInstance) if !ok { diff --git a/pkg/controller/atlasstream/atlasstream_instance_controller.go b/pkg/controller/atlasstream/atlasstream_instance_controller.go index 9d1afb50cc..e2fdd4212f 100644 --- a/pkg/controller/atlasstream/atlasstream_instance_controller.go +++ b/pkg/controller/atlasstream/atlasstream_instance_controller.go @@ -14,6 +14,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -133,7 +134,7 @@ func hasChanged(streamInstance *akov2.AtlasStreamInstance, atlasStreamInstance * return config.Provider != dataProcessRegion.GetCloudProvider() || config.Region != dataProcessRegion.GetRegion() } -func (r *AtlasStreamsInstanceReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error { +func (r *AtlasStreamsInstanceReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). Named("AtlasStreamInstance"). For(&akov2.AtlasStreamInstance{}, builder.WithPredicates(r.GlobalPredicates...)). @@ -150,6 +151,24 @@ func (r *AtlasStreamsInstanceReconciler) SetupWithManager(ctx context.Context, m Complete(r) } +func NewAtlasStreamsInstanceReconciler( + mgr manager.Manager, + predicates []predicate.Predicate, + atlasProvider atlas.Provider, + deletionProtection bool, + logger *zap.Logger, +) *AtlasStreamsInstanceReconciler { + return &AtlasStreamsInstanceReconciler{ + Scheme: mgr.GetScheme(), + Client: mgr.GetClient(), + EventRecorder: mgr.GetEventRecorderFor("AtlasStreamsInstance"), + GlobalPredicates: predicates, + Log: logger.Named("controllers").Named("AtlasStreamsInstance").Sugar(), + AtlasProvider: atlasProvider, + ObjectDeletionProtection: deletionProtection, + } +} + func (r *AtlasStreamsInstanceReconciler) findStreamInstancesForStreamConnection(ctx context.Context, obj client.Object) []reconcile.Request { streamConnection, ok := obj.(*akov2.AtlasStreamConnection) if !ok { diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go deleted file mode 100644 index 1d095de7a5..0000000000 --- a/pkg/controller/controller.go +++ /dev/null @@ -1,38 +0,0 @@ -package controller - -import ( - "k8s.io/apimachinery/pkg/labels" - "k8s.io/client-go/rest" - "sigs.k8s.io/controller-runtime/pkg/cache" - "sigs.k8s.io/controller-runtime/pkg/client" -) - -// MultiNamespacedCacheBuilder returns a manager cache builder for a list of namespaces -func MultiNamespacedCacheBuilder(namespaces []string) cache.NewCacheFunc { - return func(config *rest.Config, opts cache.Options) (cache.Cache, error) { - opts.DefaultNamespaces = map[string]cache.Config{} - for _, ns := range namespaces { - opts.DefaultNamespaces[ns] = cache.Config{} - } - return cache.New(config, opts) - } -} - -// CustomLabelSelectorCacheBuilder returns a manager cache builder for a custom label selector -func CustomLabelSelectorCacheBuilder(obj client.Object, labelsSelector labels.Selector) cache.NewCacheFunc { - return func(config *rest.Config, opts cache.Options) (cache.Cache, error) { - if opts.ByObject == nil { - opts.ByObject = map[client.Object]cache.ByObject{} - } - opts.ByObject[obj] = cache.ByObject{Label: labelsSelector} - return cache.New(config, opts) - } -} - -func NamespacesOrAllPredicate(namespaceMap map[string]bool) map[string]bool { - if len(namespaceMap) > 0 { - return namespaceMap - } - // if no namespaces where specified it must check all namespaces - return map[string]bool{cache.AllNamespaces: true} -} diff --git a/pkg/controller/watch/predicates.go b/pkg/controller/watch/predicates.go index 8d73bb4a72..4228debfe8 100644 --- a/pkg/controller/watch/predicates.go +++ b/pkg/controller/watch/predicates.go @@ -21,29 +21,16 @@ func CommonPredicates() predicate.Funcs { } } -// DeleteOnly returns a predicate that will filter out everything except the Delete event -func DeleteOnly() predicate.Funcs { - return predicate.Funcs{ - CreateFunc: func(ce event.CreateEvent) bool { - return false - }, - UpdateFunc: func(ce event.UpdateEvent) bool { - return false - }, - GenericFunc: func(ce event.GenericEvent) bool { - return false - }, - } -} - -func SelectNamespacesPredicate(namespaceMap map[string]bool) predicate.Funcs { +func SelectNamespacesPredicate(namespaces []string) predicate.Funcs { return predicate.NewPredicateFuncs(func(object client.Object) bool { - if _, ok := namespaceMap[""]; ok { + if len(namespaces) == 0 { return true } - if _, ok := namespaceMap[object.GetNamespace()]; ok { - return true + for _, ns := range namespaces { + if object.GetNamespace() == ns { + return true + } } return false diff --git a/pkg/controller/watch/predicates_test.go b/pkg/controller/watch/predicates_test.go new file mode 100644 index 0000000000..2e9e07b165 --- /dev/null +++ b/pkg/controller/watch/predicates_test.go @@ -0,0 +1,57 @@ +package watch + +import ( + "testing" + + "github.com/stretchr/testify/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/event" + + akov2 "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1" +) + +func TestSelectNamespacesPredicate(t *testing.T) { + tests := map[string]struct { + namespaces []string + createEvent event.CreateEvent + updateEvent event.UpdateEvent + deleteEvent event.DeleteEvent + genericEvent event.GenericEvent + expect bool + }{ + "should return true when there are no namespace to filter": { + namespaces: []string{}, + createEvent: event.CreateEvent{}, + updateEvent: event.UpdateEvent{}, + deleteEvent: event.DeleteEvent{}, + genericEvent: event.GenericEvent{}, + expect: true, + }, + "should return true when matching namespace": { + namespaces: []string{"test"}, + createEvent: event.CreateEvent{Object: &akov2.AtlasProject{ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "test"}}}, + updateEvent: event.UpdateEvent{ObjectNew: &akov2.AtlasProject{ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "test"}}}, + deleteEvent: event.DeleteEvent{Object: &akov2.AtlasProject{ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "test"}}}, + genericEvent: event.GenericEvent{Object: &akov2.AtlasProject{ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "test"}}}, + expect: true, + }, + "should return false when not matching namespace": { + namespaces: []string{"other"}, + createEvent: event.CreateEvent{Object: &akov2.AtlasProject{ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "test"}}}, + updateEvent: event.UpdateEvent{ObjectNew: &akov2.AtlasProject{ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "test"}}}, + deleteEvent: event.DeleteEvent{Object: &akov2.AtlasProject{ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "test"}}}, + genericEvent: event.GenericEvent{Object: &akov2.AtlasProject{ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "test"}}}, + expect: false, + }, + } + + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + f := SelectNamespacesPredicate(tt.namespaces) + assert.Equal(t, tt.expect, f.CreateFunc(tt.createEvent)) + assert.Equal(t, tt.expect, f.UpdateFunc(tt.updateEvent)) + assert.Equal(t, tt.expect, f.DeleteFunc(tt.deleteEvent)) + assert.Equal(t, tt.expect, f.GenericFunc(tt.genericEvent)) + }) + } +} diff --git a/pkg/operator/builder.go b/pkg/operator/builder.go new file mode 100644 index 0000000000..f3e94f935f --- /dev/null +++ b/pkg/operator/builder.go @@ -0,0 +1,337 @@ +package operator + +import ( + "context" + "fmt" + "os" + "time" + + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/rest" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/healthz" + ctrzap "sigs.k8s.io/controller-runtime/pkg/log/zap" + "sigs.k8s.io/controller-runtime/pkg/manager" + metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/webhook" + + "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/featureflags" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlas" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlasdatabaseuser" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlasdatafederation" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlasdeployment" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlasfederatedauth" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlasproject" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlassearchindexconfig" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlasstream" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/connectionsecret" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/watch" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/indexer" +) + +const ( + DefaultAtlasDomain = "https://cloud.mongodb.com/" + DefaultSyncPeriod = 3 * time.Hour + DefaultLeaderElectionID = "06d035fb.mongodb.com" +) + +type ManagerProvider interface { + New(config *rest.Config, options manager.Options) (manager.Manager, error) +} + +type ManagerProviderFunc func(config *rest.Config, options manager.Options) (manager.Manager, error) + +func (f ManagerProviderFunc) New(config *rest.Config, options manager.Options) (manager.Manager, error) { + return f(config, options) +} + +type Builder struct { + managerProvider ManagerProvider + scheme *runtime.Scheme + + config *rest.Config + namespaces []string + logger *zap.Logger + syncPeriod time.Duration + metricAddress string + probeAddress string + leaderElection bool + leaderElectionID string + + atlasDomain string + predicates []predicate.Predicate + apiSecret client.ObjectKey + atlasProvider atlas.Provider + featureFlags *featureflags.FeatureFlags + deletionProtection bool +} + +func (b *Builder) WithConfig(config *rest.Config) *Builder { + b.config = config + return b +} + +func (b *Builder) WithNamespaces(namespaces ...string) *Builder { + b.namespaces = namespaces + return b +} + +func (b *Builder) WithLogger(logger *zap.Logger) *Builder { + b.logger = logger + return b +} + +func (b *Builder) WithSyncPeriod(period time.Duration) *Builder { + b.syncPeriod = period + return b +} + +func (b *Builder) WithMetricAddress(address string) *Builder { + b.metricAddress = address + return b +} + +func (b *Builder) WithProbeAddress(address string) *Builder { + b.probeAddress = address + return b +} + +func (b *Builder) WithLeaderElection(enable bool) *Builder { + b.leaderElection = enable + return b +} + +func (b *Builder) WithAtlasDomain(domain string) *Builder { + b.atlasDomain = domain + return b +} + +func (b *Builder) WithPredicates(predicates []predicate.Predicate) *Builder { + b.predicates = predicates + return b +} + +func (b *Builder) WithAPISecret(apiSecret client.ObjectKey) *Builder { + b.apiSecret = apiSecret + return b +} + +func (b *Builder) WithAtlasProvider(provider atlas.Provider) *Builder { + b.atlasProvider = provider + return b +} + +func (b *Builder) WithFeatureFlags(featureFlags *featureflags.FeatureFlags) *Builder { + b.featureFlags = featureFlags + return b +} + +func (b *Builder) WithDeletionProtection(deletionProtection bool) *Builder { + b.deletionProtection = deletionProtection + return b +} + +// Build builds the controller manager and configure operator controllers +func (b *Builder) Build(ctx context.Context) (manager.Manager, error) { + mergeDefaults(b) + + cacheOpts := cache.Options{ + SyncPeriod: &b.syncPeriod, + } + + if len(b.namespaces) == 0 { + cacheOpts.ByObject = map[client.Object]cache.ByObject{ + &corev1.Secret{}: { + Label: labels.SelectorFromSet(labels.Set{ + connectionsecret.TypeLabelKey: connectionsecret.CredLabelVal, + }), + }, + } + } else { + cacheOpts.DefaultNamespaces = map[string]cache.Config{} + for _, namespace := range b.namespaces { + cacheOpts.DefaultNamespaces[namespace] = cache.Config{} + } + } + + mgr, err := b.managerProvider.New( + b.config, + ctrl.Options{ + Scheme: b.scheme, + Metrics: metricsserver.Options{BindAddress: b.metricAddress}, + WebhookServer: webhook.NewServer(webhook.Options{ + Port: 9443, + }), + Cache: cacheOpts, + HealthProbeBindAddress: b.probeAddress, + LeaderElection: b.leaderElection, + LeaderElectionID: b.leaderElectionID, + }, + ) + + if err != nil { + return nil, err + } + + if err = mgr.AddHealthzCheck("health", healthz.Ping); err != nil { + return nil, err + } + + if err = mgr.AddReadyzCheck("check", healthz.Ping); err != nil { + return nil, err + } + + if err = indexer.RegisterAll(ctx, mgr, b.logger); err != nil { + return nil, fmt.Errorf("unable to create indexers: %w", err) + } + + if b.atlasProvider == nil { + b.atlasProvider = atlas.NewProductionProvider(b.atlasDomain, b.apiSecret, mgr.GetClient()) + } + + projectReconciler := atlasproject.NewAtlasProjectReconciler( + mgr, + b.predicates, + b.atlasProvider, + b.deletionProtection, + b.logger, + ) + if err = projectReconciler.SetupWithManager(mgr); err != nil { + return nil, fmt.Errorf("unable to create controller AtlasProject: %w", err) + } + + deploymentReconciler := atlasdeployment.NewAtlasDeploymentReconciler( + mgr, + b.predicates, + b.atlasProvider, + b.deletionProtection, + b.logger, + ) + if err = deploymentReconciler.SetupWithManager(mgr); err != nil { + return nil, fmt.Errorf("unable to create controller AtlasDeployment: %w", err) + } + + dbUserReconciler := atlasdatabaseuser.NewAtlasDatabaseUserReconciler( + mgr, + b.predicates, + b.atlasProvider, + b.deletionProtection, + b.featureFlags, + b.logger, + ) + if err = dbUserReconciler.SetupWithManager(mgr); err != nil { + return nil, fmt.Errorf("unable to create controller AtlasDatabaseUser: %w", err) + } + + dataFedReconciler := atlasdatafederation.NewAtlasDataFederationReconciler( + mgr, + b.predicates, + b.atlasProvider, + b.deletionProtection, + b.logger, + ) + if err = dataFedReconciler.SetupWithManager(mgr); err != nil { + return nil, fmt.Errorf("unable to create controller AtlasDataFederation: %w", err) + } + + fedAuthReconciler := atlasfederatedauth.NewAtlasFederatedAuthReconciler( + mgr, + b.predicates, + b.atlasProvider, + b.deletionProtection, + b.logger, + ) + if err = fedAuthReconciler.SetupWithManager(mgr); err != nil { + return nil, fmt.Errorf("unable to create controller AtlasFederatedAuth: %w", err) + } + + streamsInstanceReconiler := atlasstream.NewAtlasStreamsInstanceReconciler( + mgr, + b.predicates, + b.atlasProvider, + b.deletionProtection, + b.logger, + ) + if err = streamsInstanceReconiler.SetupWithManager(mgr); err != nil { + return nil, fmt.Errorf("unable to create controller AtlasStreamsInstance: %w", err) + } + + streamsConnReconciler := atlasstream.NewAtlasStreamsConnectionReconciler( + mgr, + b.predicates, + b.atlasProvider, + b.deletionProtection, + b.logger, + ) + if err = streamsConnReconciler.SetupWithManager(mgr); err != nil { + return nil, fmt.Errorf("unable to create controller AtlasStreamsConnection: %w", err) + } + + searchINdexReconciler := atlassearchindexconfig.NewAtlasSearchIndexConfigReconciler( + mgr, + b.predicates, + b.atlasProvider, + b.deletionProtection, + b.logger, + ) + if err = searchINdexReconciler.SetupWithManager(mgr); err != nil { + return nil, fmt.Errorf("unable to create controller AtlasSearchIndexConfig: %w", err) + } + + return mgr, nil +} + +// NewBuilder return a new Builder to construct operator controllers +func NewBuilder(provider ManagerProvider, scheme *runtime.Scheme) *Builder { + return &Builder{ + managerProvider: provider, + scheme: scheme, + } +} + +func mergeDefaults(b *Builder) { + if b.config == nil { + b.config = &rest.Config{} + } + + if b.logger == nil { + b.logger = ctrzap.NewRaw(ctrzap.UseDevMode(true), ctrzap.StacktraceLevel(zap.ErrorLevel)) + } + + if b.syncPeriod == 0 { + b.syncPeriod = DefaultSyncPeriod + } + + if b.metricAddress == "" { + b.metricAddress = "0" + } + + if b.probeAddress == "" { + b.probeAddress = "0" + } + + if b.leaderElection { + b.leaderElectionID = DefaultLeaderElectionID + } + + if b.atlasDomain == "" { + b.atlasDomain = DefaultAtlasDomain + } + + if len(b.predicates) == 0 { + b.predicates = []predicate.Predicate{ + watch.CommonPredicates(), + watch.SelectNamespacesPredicate(b.namespaces), + } + } + + if b.featureFlags == nil { + b.featureFlags = featureflags.NewFeatureFlags(os.Environ) + } +} diff --git a/pkg/operator/builder_test.go b/pkg/operator/builder_test.go new file mode 100644 index 0000000000..df22726ad9 --- /dev/null +++ b/pkg/operator/builder_test.go @@ -0,0 +1,155 @@ +package operator + +import ( + "context" + "testing" + "time" + + "github.com/go-logr/logr" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/record" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/cache/informertest" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/config" + "sigs.k8s.io/controller-runtime/pkg/healthz" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/predicate" + + "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/featureflags" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/mocks/atlas" + akov2 "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1" +) + +type managerMock struct { + ManagerProvider + manager.Manager + client.FieldIndexer + + client client.Client + scheme *runtime.Scheme + + gotHealthzCheck string + gotReadyzCheck string + + opts ctrl.Options +} + +func (m *managerMock) GetCache() cache.Cache { + return &informertest.FakeInformers{} +} + +func (m *managerMock) Add(runnable manager.Runnable) error { + return nil +} + +func (m *managerMock) GetLogger() logr.Logger { + return logr.Logger{} +} + +func (m *managerMock) GetControllerOptions() config.Controller { + return config.Controller{} +} + +func (m *managerMock) GetScheme() *runtime.Scheme { + return m.scheme +} + +func (m *managerMock) GetEventRecorderFor(name string) record.EventRecorder { + return record.NewFakeRecorder(100) +} + +func (m *managerMock) GetClient() client.Client { + return m.client +} + +func (m *managerMock) GetFieldIndexer() client.FieldIndexer { + return &informertest.FakeInformers{} +} + +func (m *managerMock) New(config *rest.Config, options manager.Options) (manager.Manager, error) { + m.opts = options + m.scheme = options.Scheme + m.client = fake.NewClientBuilder(). + WithScheme(options.Scheme). + Build() + + return m, nil +} + +func (m *managerMock) AddHealthzCheck(name string, check healthz.Checker) error { + m.gotHealthzCheck = name + return nil +} + +func (m *managerMock) AddReadyzCheck(name string, check healthz.Checker) error { + m.gotReadyzCheck = name + return nil +} + +func TestBuildManager(t *testing.T) { + tests := map[string]struct { + configure func(b *Builder) + expectedSyncPeriod time.Duration + expectedClusterWideCache bool + expectedNamespacedCache bool + }{ + "should build the manager with default values": { + configure: func(b *Builder) {}, + expectedSyncPeriod: DefaultSyncPeriod, + expectedClusterWideCache: true, + expectedNamespacedCache: false, + }, + "should build the manager with namespace config": { + configure: func(b *Builder) { + b.WithNamespaces("ns1", "ns2") + }, + expectedSyncPeriod: DefaultSyncPeriod, + expectedClusterWideCache: false, + expectedNamespacedCache: true, + }, + "should build the manager with custom config": { + configure: func(b *Builder) { + b.WithConfig(&rest.Config{}). + WithNamespaces("ns1"). + WithLogger(zaptest.NewLogger(t)). + WithSyncPeriod(time.Hour). + WithMetricAddress(":9090"). + WithProbeAddress(":9091"). + WithLeaderElection(true). + WithAtlasDomain("https://cloud-qa.mongodb.com"). + WithPredicates([]predicate.Predicate{predicate.GenerationChangedPredicate{}}). + WithAPISecret(client.ObjectKey{Namespace: "ns1", Name: "creds"}). + WithAtlasProvider(&atlas.TestProvider{}). + WithFeatureFlags(featureflags.NewFeatureFlags(func() []string { return nil })). + WithDeletionProtection(true) + }, + expectedSyncPeriod: time.Hour, + expectedClusterWideCache: false, + expectedNamespacedCache: true, + }, + } + + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + akoScheme := runtime.NewScheme() + require.NoError(t, akov2.AddToScheme(akoScheme)) + + mgrMock := &managerMock{} + builder := NewBuilder(mgrMock, akoScheme) + tt.configure(builder) + _, err := builder.Build(context.Background()) + require.NoError(t, err) + + assert.Equal(t, tt.expectedSyncPeriod, *mgrMock.opts.Cache.SyncPeriod) + assert.Equal(t, tt.expectedClusterWideCache, len(mgrMock.opts.Cache.ByObject) > 0) + assert.Equal(t, tt.expectedNamespacedCache, len(mgrMock.opts.Cache.DefaultNamespaces) > 0) + }) + } +} diff --git a/test/e2e/db_users_oidc_test.go b/test/e2e/db_users_oidc_test.go index 9b57e4f407..a293413659 100644 --- a/test/e2e/db_users_oidc_test.go +++ b/test/e2e/db_users_oidc_test.go @@ -73,7 +73,8 @@ var _ = Describe("Operator to run db-user with the OIDC feature flags", Ordered, Namespace: config.DefaultOperatorNS, Name: config.DefaultOperatorGlobalKey, }, - FeatureFlags: featureflags.NewFeatureFlags(func() []string { return []string{} }), + WatchedNamespaces: map[string]bool{config.DefaultOperatorNS: true}, + FeatureFlags: featureflags.NewFeatureFlags(func() []string { return []string{} }), }) Expect(err).NotTo(HaveOccurred()) diff --git a/test/e2e/db_users_test.go b/test/e2e/db_users_test.go index 5bc2e7357b..1d64895943 100644 --- a/test/e2e/db_users_test.go +++ b/test/e2e/db_users_test.go @@ -85,6 +85,10 @@ var _ = Describe("Operator watch all namespace should create connection secrets Namespace: config.DefaultOperatorNS, Name: config.DefaultOperatorGlobalKey, }, + WatchedNamespaces: map[string]bool{ + config.DefaultOperatorNS: true, + secondNamespace: true, + }, FeatureFlags: featureflags.NewFeatureFlags(func() []string { return []string{} }), }) Expect(err).NotTo(HaveOccurred()) diff --git a/test/helper/e2e/k8s/operator.go b/test/helper/e2e/k8s/operator.go index e3b6536085..6283d14d89 100644 --- a/test/helper/e2e/k8s/operator.go +++ b/test/helper/e2e/k8s/operator.go @@ -4,40 +4,24 @@ import ( "context" "os" "sync" - "time" "github.com/go-logr/zapr" . "github.com/onsi/ginkgo/v2" "go.uber.org/zap" "go.uber.org/zap/zapcore" "go.uber.org/zap/zaptest" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" - clientgoscheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/kubernetes/scheme" _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/healthz" - ctrzap "sigs.k8s.io/controller-runtime/pkg/log/zap" "sigs.k8s.io/controller-runtime/pkg/manager" - metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" - "sigs.k8s.io/controller-runtime/pkg/predicate" - "sigs.k8s.io/controller-runtime/pkg/webhook" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/collection" "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/featureflags" akov2 "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1" - "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller" - "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlas" - "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlasdatabaseuser" - "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlasdatafederation" - "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlasdeployment" - "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlasproject" - "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/connectionsecret" - "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/watch" - "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/indexer" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/operator" ) var ( @@ -46,162 +30,43 @@ var ( ) func BuildManager(initCfg *Config) (manager.Manager, error) { - scheme := runtime.NewScheme() - setupLog := ctrl.Log.WithName("setup") + akoScheme := runtime.NewScheme() + utilruntime.Must(scheme.AddToScheme(akoScheme)) + utilruntime.Must(akov2.AddToScheme(akoScheme)) - utilruntime.Must(clientgoscheme.AddToScheme(scheme)) - utilruntime.Must(akov2.AddToScheme(scheme)) - - ctrzap.NewRaw(ctrzap.UseDevMode(true), ctrzap.StacktraceLevel(zap.ErrorLevel)) config := mergeConfiguration(initCfg) - logger := zaptest.NewLogger( GinkgoT(), zaptest.WrapOptions( zap.ErrorOutput(zapcore.Lock(zapcore.AddSync(GinkgoWriter))), ), ) - - logger.Info("starting with configuration", zap.Any("config", config)) - ctrl.SetLogger(zapr.NewLogger(logger)) - - syncPeriod := time.Hour * 3 - - logger.Info("starting manager", zap.Any("config", config)) - - var cacheFunc cache.NewCacheFunc - if len(config.WatchedNamespaces) > 0 { - var namespaces []string - for ns := range config.WatchedNamespaces { - namespaces = append(namespaces, ns) - } - cacheFunc = controller.MultiNamespacedCacheBuilder(namespaces) - } else { - cacheFunc = controller.CustomLabelSelectorCacheBuilder( - &corev1.Secret{}, - labels.SelectorFromSet(labels.Set{ - connectionsecret.TypeLabelKey: connectionsecret.CredLabelVal, - }), - ) - } - - mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ - Scheme: scheme, - Metrics: metricsserver.Options{BindAddress: config.MetricsAddr}, - WebhookServer: webhook.NewServer(webhook.Options{ - Port: 9443, - }), - Cache: cache.Options{ - SyncPeriod: &syncPeriod, - }, - HealthProbeBindAddress: config.ProbeAddr, - LeaderElection: config.EnableLeaderElection, - LeaderElectionID: "06d035fb.mongodb.com", - NewCache: cacheFunc, - }) - if err != nil { - setupLog.Error(err, "unable to start manager") - return nil, err - } - - // globalPredicates should be used for general controller Predicates - // that should be applied to all controllers in order to limit the - // resources they receive events for. - predicateNamespaces := controller.NamespacesOrAllPredicate(config.WatchedNamespaces) - globalPredicates := []predicate.Predicate{ - watch.CommonPredicates(), // ignore spurious changes. status changes etc. - watch.SelectNamespacesPredicate(predicateNamespaces), // select only desired namespaces - } - - atlasProvider := atlas.NewProductionProvider(config.AtlasDomain, config.GlobalAPISecret, mgr.GetClient()) + setupLog := logger.Named("setup").Sugar() + setupLog.Info("starting with configuration", zap.Any("config", *config)) // Ensure all concurrent managers configured per test share a single exit signal handler setupSignalHandlerOnce.Do(func() { signalCancelledCtx = ctrl.SetupSignalHandler() }) - if err := indexer.RegisterAll(signalCancelledCtx, mgr, logger); err != nil { - setupLog.Error(err, "unable to create indexers") - return nil, err - } - - if err = (&atlasdeployment.AtlasDeploymentReconciler{ - Client: mgr.GetClient(), - Log: logger.Named("controllers").Named("AtlasDeployment").Sugar(), - Scheme: mgr.GetScheme(), - GlobalPredicates: globalPredicates, - EventRecorder: mgr.GetEventRecorderFor("AtlasDeployment"), - AtlasProvider: atlasProvider, - ObjectDeletionProtection: config.ObjectDeletionProtection, - SubObjectDeletionProtection: config.SubObjectDeletionProtection, - }).SetupWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create controller", "controller", "AtlasDeployment") - return nil, err - } - - if err = (&atlasproject.AtlasProjectReconciler{ - Client: mgr.GetClient(), - Log: logger.Named("controllers").Named("AtlasProject").Sugar(), - Scheme: mgr.GetScheme(), - DeprecatedResourceWatcher: watch.NewDeprecatedResourceWatcher(), - GlobalPredicates: globalPredicates, - EventRecorder: mgr.GetEventRecorderFor("AtlasProject"), - AtlasProvider: atlasProvider, - ObjectDeletionProtection: config.ObjectDeletionProtection, - SubObjectDeletionProtection: config.SubObjectDeletionProtection, - }).SetupWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create controller", "controller", "AtlasProject") - return nil, err - } - - if err = (&atlasdatabaseuser.AtlasDatabaseUserReconciler{ - DeprecatedResourceWatcher: watch.NewDeprecatedResourceWatcher(), - Client: mgr.GetClient(), - Log: logger.Named("controllers").Named("AtlasDatabaseUser").Sugar(), - Scheme: mgr.GetScheme(), - EventRecorder: mgr.GetEventRecorderFor("AtlasDatabaseUser"), - AtlasProvider: atlasProvider, - GlobalPredicates: globalPredicates, - ObjectDeletionProtection: config.ObjectDeletionProtection, - SubObjectDeletionProtection: config.SubObjectDeletionProtection, - FeaturePreviewOIDCAuthEnabled: config.FeatureFlags.IsFeaturePresent(featureflags.FeatureOIDC), - }).SetupWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create controller", "controller", "AtlasDatabaseUser") - return nil, err - } - - if err = (&atlasdatafederation.AtlasDataFederationReconciler{ - DeprecatedResourceWatcher: watch.NewDeprecatedResourceWatcher(), - Client: mgr.GetClient(), - Log: logger.Named("controllers").Named("AtlasDataFederation").Sugar(), - Scheme: mgr.GetScheme(), - EventRecorder: mgr.GetEventRecorderFor("AtlasDataFederation"), - AtlasProvider: atlasProvider, - GlobalPredicates: globalPredicates, - ObjectDeletionProtection: config.ObjectDeletionProtection, - SubObjectDeletionProtection: config.SubObjectDeletionProtection, - }).SetupWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create controller", "controller", "AtlasDataFederation") - return nil, err - } - - if err = mgr.AddHealthzCheck("health", healthz.Ping); err != nil { - setupLog.Error(err, "unable to set up health check") - return nil, err - } - if err = mgr.AddReadyzCheck("check", healthz.Ping); err != nil { - setupLog.Error(err, "unable to set up ready check") - return nil, err - } - return mgr, nil + return operator.NewBuilder(operator.ManagerProviderFunc(ctrl.NewManager), akoScheme). + WithConfig(ctrl.GetConfigOrDie()). + WithNamespaces(collection.Keys(config.WatchedNamespaces)...). + WithLogger(logger). + WithMetricAddress(config.MetricsAddr). + WithProbeAddress(config.ProbeAddr). + WithLeaderElection(config.EnableLeaderElection). + WithAtlasDomain(config.AtlasDomain). + WithAPISecret(config.GlobalAPISecret). + WithDeletionProtection(config.ObjectDeletionProtection). + Build(signalCancelledCtx) } type Config struct { AtlasDomain string EnableLeaderElection bool MetricsAddr string - Namespace string WatchedNamespaces map[string]bool ProbeAddr string GlobalAPISecret client.ObjectKey diff --git a/test/int/clusterwide/dbuser_test.go b/test/int/clusterwide/dbuser_test.go index 5fb08ce2a3..8a97c44850 100644 --- a/test/int/clusterwide/dbuser_test.go +++ b/test/int/clusterwide/dbuser_test.go @@ -15,6 +15,7 @@ import ( "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api" akov2 "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1" "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1/project" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/connectionsecret" "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/workflow" "github.com/mongodb/mongodb-atlas-kubernetes/v2/test/helper/conditions" "github.com/mongodb/mongodb-atlas-kubernetes/v2/test/helper/resources" @@ -137,6 +138,9 @@ func buildConnectionSecret(name string) corev1.Secret { ObjectMeta: metav1.ObjectMeta{ Name: name, Namespace: namespace.Name, + Labels: map[string]string{ + connectionsecret.TypeLabelKey: connectionsecret.CredLabelVal, + }, }, StringData: map[string]string{"orgId": orgID, "publicApiKey": publicKey, "privateApiKey": privateKey}, } @@ -147,6 +151,9 @@ func buildPasswordSecret(namespace, name, password string) corev1.Secret { ObjectMeta: metav1.ObjectMeta{ Name: name, Namespace: namespace, + Labels: map[string]string{ + connectionsecret.TypeLabelKey: connectionsecret.CredLabelVal, + }, }, StringData: map[string]string{"password": password}, } diff --git a/test/int/clusterwide/integration_suite_test.go b/test/int/clusterwide/integration_suite_test.go index b9a74341b0..cf29217954 100644 --- a/test/int/clusterwide/integration_suite_test.go +++ b/test/int/clusterwide/integration_suite_test.go @@ -32,20 +32,13 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/client-go/kubernetes/scheme" ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/envtest" ctrzap "sigs.k8s.io/controller-runtime/pkg/log/zap" - "sigs.k8s.io/controller-runtime/pkg/predicate" - "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/kube" akov2 "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1" "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlas" - "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlasdatabaseuser" - "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlasdeployment" - "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlasproject" - "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/watch" - "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/indexer" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/operator" "github.com/mongodb/mongodb-atlas-kubernetes/v2/test/helper/control" ) @@ -121,62 +114,17 @@ var _ = BeforeSuite(func() { logger := ctrzap.NewRaw(ctrzap.UseDevMode(true), ctrzap.WriteTo(GinkgoWriter), ctrzap.StacktraceLevel(zap.ErrorLevel)) ctrl.SetLogger(zapr.NewLogger(logger)) - syncPeriod := time.Minute * 30 - // The manager watches ALL namespaces - k8sManager, err := ctrl.NewManager(testEnv.Config, ctrl.Options{ - Scheme: scheme.Scheme, - Cache: cache.Options{ - SyncPeriod: &syncPeriod, - }, - }) - Expect(err).ToNot(HaveOccurred()) - - // globalPredicates should be used for general controller Predicates - // that should be applied to all controllers in order to limit the - // resources they receive events for. - globalPredicates := []predicate.Predicate{ - watch.CommonPredicates(), // ignore spurious changes. status changes etc. - watch.SelectNamespacesPredicate(map[string]bool{ // select only desired namespaces - namespace.Name: true, - }), - } - - atlasProvider := atlas.NewProductionProvider(atlasDomain, kube.ObjectKey(namespace.Name, "atlas-operator-api-key"), k8sManager.GetClient()) - - err = indexer.RegisterAll(ctx, k8sManager, logger) - Expect(err).ToNot(HaveOccurred()) - - err = (&atlasproject.AtlasProjectReconciler{ - Client: k8sManager.GetClient(), - Log: logger.Named("controllers").Named("AtlasProject").Sugar(), - DeprecatedResourceWatcher: watch.NewDeprecatedResourceWatcher(), - EventRecorder: k8sManager.GetEventRecorderFor("AtlasProject"), - AtlasProvider: atlasProvider, - GlobalPredicates: globalPredicates, - }).SetupWithManager(k8sManager) - Expect(err).ToNot(HaveOccurred()) - - err = (&atlasdeployment.AtlasDeploymentReconciler{ - Client: k8sManager.GetClient(), - Log: logger.Named("controllers").Named("AtlasDeployment").Sugar(), - EventRecorder: k8sManager.GetEventRecorderFor("AtlasDeployment"), - AtlasProvider: atlasProvider, - GlobalPredicates: globalPredicates, - }).SetupWithManager(k8sManager) - Expect(err).ToNot(HaveOccurred()) - err = (&atlasdatabaseuser.AtlasDatabaseUserReconciler{ - Client: k8sManager.GetClient(), - Log: logger.Named("controllers").Named("AtlasDeployment").Sugar(), - DeprecatedResourceWatcher: watch.NewDeprecatedResourceWatcher(), - EventRecorder: k8sManager.GetEventRecorderFor("AtlasDeployment"), - AtlasProvider: atlasProvider, - GlobalPredicates: globalPredicates, - }).SetupWithManager(k8sManager) + mgr, err := operator.NewBuilder(operator.ManagerProviderFunc(ctrl.NewManager), testEnv.Scheme). + WithConfig(testEnv.Config). + WithLogger(logger). + WithAtlasDomain(atlasDomain). + WithSyncPeriod(30 * time.Minute). + Build(ctx) Expect(err).ToNot(HaveOccurred()) go func() { - err = k8sManager.Start(ctx) + err = mgr.Start(ctx) Expect(err).ToNot(HaveOccurred()) }() }) diff --git a/test/int/integration_suite_test.go b/test/int/integration_suite_test.go index 614c3f03ef..2e7a4b7026 100644 --- a/test/int/integration_suite_test.go +++ b/test/int/integration_suite_test.go @@ -33,28 +33,17 @@ import ( "go.uber.org/zap" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/envtest" ctrzap "sigs.k8s.io/controller-runtime/pkg/log/zap" - metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" - "sigs.k8s.io/controller-runtime/pkg/predicate" - "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/featureflags" - "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/kube" akov2 "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1" "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlas" - "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlasdatabaseuser" - "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlasdatafederation" - "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlasdeployment" - "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlasfederatedauth" - "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlasproject" - "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlasstream" - "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/watch" - "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/indexer" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/operator" "github.com/mongodb/mongodb-atlas-kubernetes/v2/test/helper/control" ) @@ -88,10 +77,9 @@ var ( func TestAPIs(t *testing.T) { control.SkipTestUnless(t, "AKO_INT_TEST") - err := akov2.AddToScheme(scheme.Scheme) - if err != nil { - t.Fatal(err) - } + + utilruntime.Must(scheme.AddToScheme(scheme.Scheme)) + utilruntime.Must(akov2.AddToScheme(scheme.Scheme)) RegisterFailHandler(Fail) RunSpecs(t, "Atlas Operator Namespaced Integration Test Suite") @@ -116,10 +104,8 @@ var _ = SynchronizedBeforeSuite(func() []byte { var b bytes.Buffer By("Bootstrapping test environment", func() { - useExistingCluster := os.Getenv("USE_EXISTING_CLUSTER") != "" testEnv = &envtest.Environment{ - CRDDirectoryPaths: []string{filepath.Join("..", "..", "config", "crd", "bases")}, - UseExistingCluster: &useExistingCluster, + CRDDirectoryPaths: []string{filepath.Join("..", "..", "config", "crd", "bases")}, } cfg, err := testEnv.Start() @@ -146,9 +132,6 @@ var _ = SynchronizedBeforeSuite(func() []byte { atlasDomain = domain } - err = akov2.AddToScheme(scheme.Scheme) - Expect(err).ToNot(HaveOccurred()) - // shallow copy global config ginkgoCfg := *cfg ginkgoCfg.UserAgent = "ginkgo" @@ -181,7 +164,8 @@ func defaultTimeouts() { // prepareControllers is a common function used by all the tests that creates the namespace and registers all the // reconcilers there. Each of them listens only this specific namespace only, otherwise it's not possible to run in parallel func prepareControllers(deletionProtection bool) (*corev1.Namespace, context.CancelFunc) { - ctx := context.Background() + var ctx context.Context + ctx, managerCancelFunc = context.WithCancel(context.Background()) namespace = corev1.Namespace{ ObjectMeta: metav1.ObjectMeta{ Namespace: "test", @@ -194,132 +178,25 @@ func prepareControllers(deletionProtection bool) (*corev1.Namespace, context.Can Expect(namespace.Name).ToNot(BeEmpty()) GinkgoWriter.Printf("Generated namespace %q\n", namespace.Name) - // +kubebuilder:scaffold:scheme logger := ctrzap.NewRaw(ctrzap.UseDevMode(true), ctrzap.WriteTo(GinkgoWriter), ctrzap.StacktraceLevel(zap.ErrorLevel)) - ctrl.SetLogger(zapr.NewLogger(logger)) - // Note on the syncPeriod - decreasing this to a smaller time allows to test its work for the long-running tests - // (deployments, database users). The prod value is much higher - syncPeriod := time.Minute * 30 - // shallow copy global config managerCfg := *cfg managerCfg.UserAgent = "AKO" - k8sManager, err := ctrl.NewManager(&managerCfg, ctrl.Options{ - Scheme: scheme.Scheme, - Metrics: metricsserver.Options{BindAddress: "0"}, - Cache: cache.Options{ - SyncPeriod: &syncPeriod, - }, - }) - Expect(err).ToNot(HaveOccurred()) - - // globalPredicates should be used for general controller Predicates - // that should be applied to all controllers in order to limit the - // resources they receive events for. - globalPredicates := []predicate.Predicate{ - watch.CommonPredicates(), // ignore spurious changes. status changes etc. - watch.SelectNamespacesPredicate(map[string]bool{ // select only desired namespaces - namespace.Name: true, - }), - } - - featureFlags := featureflags.NewFeatureFlags(os.Environ) - - atlasProvider := atlas.NewProductionProvider(atlasDomain, kube.ObjectKey(namespace.Name, "atlas-operator-api-key"), k8sManager.GetClient()) - - err = indexer.RegisterAll(ctx, k8sManager, logger) - Expect(err).ToNot(HaveOccurred()) - - err = (&atlasproject.AtlasProjectReconciler{ - Client: k8sManager.GetClient(), - Log: logger.Named("controllers").Named("AtlasProject").Sugar(), - DeprecatedResourceWatcher: watch.NewDeprecatedResourceWatcher(), - GlobalPredicates: globalPredicates, - EventRecorder: k8sManager.GetEventRecorderFor("AtlasProject"), - AtlasProvider: atlasProvider, - ObjectDeletionProtection: deletionProtection, - SubObjectDeletionProtection: false, - }).SetupWithManager(k8sManager) - Expect(err).ToNot(HaveOccurred()) - - err = (&atlasdeployment.AtlasDeploymentReconciler{ - Client: k8sManager.GetClient(), - Log: logger.Named("controllers").Named("AtlasDeployment").Sugar(), - GlobalPredicates: globalPredicates, - EventRecorder: k8sManager.GetEventRecorderFor("AtlasDeployment"), - AtlasProvider: atlasProvider, - ObjectDeletionProtection: deletionProtection, - SubObjectDeletionProtection: false, - }).SetupWithManager(k8sManager) - Expect(err).ToNot(HaveOccurred()) - - err = (&atlasdatabaseuser.AtlasDatabaseUserReconciler{ - Client: k8sManager.GetClient(), - Log: logger.Named("controllers").Named("AtlasDatabaseUser").Sugar(), - EventRecorder: k8sManager.GetEventRecorderFor("AtlasDatabaseUser"), - DeprecatedResourceWatcher: watch.NewDeprecatedResourceWatcher(), - AtlasProvider: atlasProvider, - GlobalPredicates: globalPredicates, - ObjectDeletionProtection: deletionProtection, - SubObjectDeletionProtection: false, - FeaturePreviewOIDCAuthEnabled: featureFlags.IsFeaturePresent(featureflags.FeatureOIDC), - }).SetupWithManager(k8sManager) - Expect(err).ToNot(HaveOccurred()) - - err = (&atlasdatafederation.AtlasDataFederationReconciler{ - Client: k8sManager.GetClient(), - Log: logger.Named("controllers").Named("AtlasDataFederation").Sugar(), - EventRecorder: k8sManager.GetEventRecorderFor("AtlasDatabaseUser"), - DeprecatedResourceWatcher: watch.NewDeprecatedResourceWatcher(), - AtlasProvider: atlasProvider, - GlobalPredicates: globalPredicates, - ObjectDeletionProtection: deletionProtection, - SubObjectDeletionProtection: false, - }).SetupWithManager(k8sManager) - Expect(err).ToNot(HaveOccurred()) - - err = (&atlasfederatedauth.AtlasFederatedAuthReconciler{ - Client: k8sManager.GetClient(), - Log: logger.Named("controllers").Named("AtlasFederatedAuth").Sugar(), - DeprecatedResourceWatcher: watch.NewDeprecatedResourceWatcher(), - GlobalPredicates: globalPredicates, - EventRecorder: k8sManager.GetEventRecorderFor("AtlasFederatedAuth"), - AtlasProvider: atlasProvider, - ObjectDeletionProtection: deletionProtection, - SubObjectDeletionProtection: false, - }).SetupWithManager(k8sManager) - Expect(err).ToNot(HaveOccurred()) - - err = (&atlasstream.AtlasStreamsInstanceReconciler{ - Client: k8sManager.GetClient(), - Log: logger.Named("controllers").Named("AtlasStreamInstance").Sugar(), - GlobalPredicates: globalPredicates, - EventRecorder: k8sManager.GetEventRecorderFor("AtlasStreamInstance"), - AtlasProvider: atlasProvider, - ObjectDeletionProtection: deletionProtection, - SubObjectDeletionProtection: false, - }).SetupWithManager(ctx, k8sManager) - Expect(err).ToNot(HaveOccurred()) - - err = (&atlasstream.AtlasStreamsConnectionReconciler{ - Client: k8sManager.GetClient(), - Log: logger.Named("controllers").Named("AtlasStreamConnection").Sugar(), - GlobalPredicates: globalPredicates, - EventRecorder: k8sManager.GetEventRecorderFor("AtlasStreamConnection"), - AtlasProvider: atlasProvider, - ObjectDeletionProtection: deletionProtection, - SubObjectDeletionProtection: false, - }).SetupWithManager(ctx, k8sManager) + mgr, err := operator.NewBuilder(operator.ManagerProviderFunc(ctrl.NewManager), scheme.Scheme). + WithConfig(&managerCfg). + WithNamespaces(namespace.Name). + WithLogger(logger). + WithAtlasDomain(atlasDomain). + WithSyncPeriod(30 * time.Minute). + WithAPISecret(client.ObjectKey{Name: "atlas-operator-api-key", Namespace: namespace.Name}). + WithDeletionProtection(deletionProtection). + Build(ctx) Expect(err).ToNot(HaveOccurred()) - By("Starting controllers") - - ctx, managerCancelFunc = context.WithCancel(context.Background()) - go func() { - err = k8sManager.Start(ctx) + err = mgr.Start(ctx) Expect(err).ToNot(HaveOccurred()) }()