Skip to content

Commit

Permalink
add operator builder and centralize creation (#1619)
Browse files Browse the repository at this point in the history
  • Loading branch information
helderjs authored Jun 4, 2024
1 parent b8a2adf commit 41d1934
Show file tree
Hide file tree
Showing 23 changed files with 820 additions and 642 deletions.
2 changes: 1 addition & 1 deletion .licenses-gomod.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
100644 071ab373715c6372fbcf4212bc32597eef52773e go.mod
100644 56ef1577038b01bc456601ea415f8f3045c37372 go.mod
250 changes: 27 additions & 223 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,42 +23,22 @@ import (
"log"
"os"
"strings"
"time"

"go.uber.org/zap/zapcore"
ctrzap "sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/webhook"

"github.com/go-logr/zapr"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"go.uber.org/zap/zapcore"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/healthz"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
"sigs.k8s.io/controller-runtime/pkg/predicate"

"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/collection"
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/featureflags"
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/kube"
akov2 "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1"
"github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller"
"github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlas"
"github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlasdatabaseuser"
"github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlasdatafederation"
"github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlasdeployment"
"github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlasfederatedauth"
"github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlasproject"
"github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlassearchindexconfig"
"github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlasstream"
"github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/connectionsecret"
"github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/watch"
"github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/indexer"
"github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/operator"
"github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/version"
)

Expand All @@ -72,219 +52,43 @@ const (
subobjectDeletionProtectionMessage = "Note: sub-object deletion protection is IGNORED because it does not work deterministically."
)

var (
scheme = runtime.NewScheme()
setupLog = ctrl.Log.WithName("setup")
)

func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(akov2.AddToScheme(scheme))
}

func main() {
// controller-runtime/pkg/log/zap is a wrapper over zap that implements logr
// logr looks quite limited in functionality so we better use Zap directly.
// Though we still need the controller-runtime library and go-logr/zapr as they are used in controller-runtime
// logging
ctrzap.NewRaw(ctrzap.UseDevMode(true), ctrzap.StacktraceLevel(zap.ErrorLevel))
akoScheme := runtime.NewScheme()
utilruntime.Must(scheme.AddToScheme(akoScheme))
utilruntime.Must(akov2.AddToScheme(akoScheme))

ctx := ctrl.SetupSignalHandler()
config := parseConfiguration()

logger, err := initCustomZapLogger(config.LogLevel, config.LogEncoder)
if err != nil {
fmt.Printf("error instantiating logger: %v\r\n", err)
os.Exit(1)
}

logger.Info("starting with configuration", zap.Any("config", config), zap.Any("version", version.Version))

ctrl.SetLogger(zapr.NewLogger(logger))

syncPeriod := time.Hour * 3

var cacheFunc cache.NewCacheFunc
if len(config.WatchedNamespaces) > 0 {
var namespaces []string
for ns := range config.WatchedNamespaces {
namespaces = append(namespaces, ns)
}
cacheFunc = controller.MultiNamespacedCacheBuilder(namespaces)
} else {
cacheFunc = controller.CustomLabelSelectorCacheBuilder(
&corev1.Secret{},
labels.SelectorFromSet(labels.Set{
connectionsecret.TypeLabelKey: connectionsecret.CredLabelVal,
}),
)
}

mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
Metrics: metricsserver.Options{BindAddress: config.MetricsAddr},
WebhookServer: webhook.NewServer(webhook.Options{
Port: 9443,
}),
Cache: cache.Options{
SyncPeriod: &syncPeriod,
},
HealthProbeBindAddress: config.ProbeAddr,
LeaderElection: config.EnableLeaderElection,
LeaderElectionID: "06d035fb.mongodb.com",
NewCache: cacheFunc,
})
setupLog := logger.Named("setup").Sugar()
setupLog.Info("starting with configuration", zap.Any("config", config), zap.Any("version", version.Version))

mgr, err := operator.NewBuilder(operator.ManagerProviderFunc(ctrl.NewManager), akoScheme).
WithConfig(ctrl.GetConfigOrDie()).
WithNamespaces(collection.Keys(config.WatchedNamespaces)...).
WithLogger(logger).
WithMetricAddress(config.MetricsAddr).
WithProbeAddress(config.ProbeAddr).
WithLeaderElection(config.EnableLeaderElection).
WithAtlasDomain(config.AtlasDomain).
WithAPISecret(config.GlobalAPISecret).
WithDeletionProtection(config.ObjectDeletionProtection).
Build(ctx)
if err != nil {
setupLog.Error(err, "unable to start manager")
os.Exit(1)
}

ctx := ctrl.SetupSignalHandler()

// globalPredicates should be used for general controller Predicates
// that should be applied to all controllers in order to limit the
// resources they receive events for.
predicateNamespaces := controller.NamespacesOrAllPredicate(config.WatchedNamespaces)
globalPredicates := []predicate.Predicate{
watch.CommonPredicates(), // ignore spurious changes. status changes etc.
watch.SelectNamespacesPredicate(predicateNamespaces), // select only desired namespaces
}

atlasProvider := atlas.NewProductionProvider(config.AtlasDomain, config.GlobalAPISecret, mgr.GetClient())

if err := indexer.RegisterAll(ctx, mgr, logger); err != nil {
setupLog.Error(err, "unable to create indexers")
os.Exit(1)
}

if err = (&atlasdeployment.AtlasDeploymentReconciler{
Client: mgr.GetClient(),
Log: logger.Named("controllers").Named("AtlasDeployment").Sugar(),
Scheme: mgr.GetScheme(),
GlobalPredicates: globalPredicates,
EventRecorder: mgr.GetEventRecorderFor("AtlasDeployment"),
AtlasProvider: atlasProvider,
ObjectDeletionProtection: config.ObjectDeletionProtection,
SubObjectDeletionProtection: false,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "AtlasDeployment")
os.Exit(1)
}

if err = (&atlasproject.AtlasProjectReconciler{
Client: mgr.GetClient(),
Log: logger.Named("controllers").Named("AtlasProject").Sugar(),
Scheme: mgr.GetScheme(),
DeprecatedResourceWatcher: watch.NewDeprecatedResourceWatcher(),
GlobalPredicates: globalPredicates,
EventRecorder: mgr.GetEventRecorderFor("AtlasProject"),
AtlasProvider: atlasProvider,
ObjectDeletionProtection: config.ObjectDeletionProtection,
SubObjectDeletionProtection: false,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "AtlasProject")
os.Exit(1)
}

if err = (&atlasdatabaseuser.AtlasDatabaseUserReconciler{
DeprecatedResourceWatcher: watch.NewDeprecatedResourceWatcher(),
Client: mgr.GetClient(),
Log: logger.Named("controllers").Named("AtlasDatabaseUser").Sugar(),
Scheme: mgr.GetScheme(),
EventRecorder: mgr.GetEventRecorderFor("AtlasDatabaseUser"),
AtlasProvider: atlasProvider,
GlobalPredicates: globalPredicates,
ObjectDeletionProtection: config.ObjectDeletionProtection,
SubObjectDeletionProtection: false,
FeaturePreviewOIDCAuthEnabled: config.FeatureFlags.IsFeaturePresent(featureflags.FeatureOIDC),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "AtlasDatabaseUser")
os.Exit(1)
}

if err = (&atlasdatafederation.AtlasDataFederationReconciler{
Client: mgr.GetClient(),
Log: logger.Named("controllers").Named("AtlasDataFederation").Sugar(),
Scheme: mgr.GetScheme(),
DeprecatedResourceWatcher: watch.NewDeprecatedResourceWatcher(),
GlobalPredicates: globalPredicates,
EventRecorder: mgr.GetEventRecorderFor("AtlasDataFederation"),
AtlasProvider: atlasProvider,
ObjectDeletionProtection: config.ObjectDeletionProtection,
SubObjectDeletionProtection: false,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "AtlasDataFederation")
os.Exit(1)
}

if err = (&atlasfederatedauth.AtlasFederatedAuthReconciler{
Client: mgr.GetClient(),
Log: logger.Named("controllers").Named("AtlasFederatedAuth").Sugar(),
Scheme: mgr.GetScheme(),
DeprecatedResourceWatcher: watch.NewDeprecatedResourceWatcher(),
GlobalPredicates: globalPredicates,
EventRecorder: mgr.GetEventRecorderFor("AtlasFederatedAuth"),
AtlasProvider: atlasProvider,
ObjectDeletionProtection: config.ObjectDeletionProtection,
SubObjectDeletionProtection: false,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "AtlasFederatedAuth")
os.Exit(1)
}

if err = (&atlasstream.AtlasStreamsInstanceReconciler{
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
EventRecorder: mgr.GetEventRecorderFor("AtlasStreamsInstance"),
GlobalPredicates: globalPredicates,
Log: logger.Named("controllers").Named("AtlasStreamsInstance").Sugar(),
AtlasProvider: atlasProvider,
ObjectDeletionProtection: config.ObjectDeletionProtection,
SubObjectDeletionProtection: false,
}).SetupWithManager(ctx, mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "AtlasStreamsInstance")
os.Exit(1)
}

if err = (&atlasstream.AtlasStreamsConnectionReconciler{
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
EventRecorder: mgr.GetEventRecorderFor("AtlasStreamsConnection"),
GlobalPredicates: globalPredicates,
Log: logger.Named("controllers").Named("AtlasStreamsConnection").Sugar(),
AtlasProvider: atlasProvider,
ObjectDeletionProtection: config.ObjectDeletionProtection,
SubObjectDeletionProtection: false,
}).SetupWithManager(ctx, mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "AtlasStreamsConnection")
os.Exit(1)
}

if err = (&atlassearchindexconfig.AtlasSearchIndexConfigReconciler{
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
EventRecorder: mgr.GetEventRecorderFor("AtlasSearchIndexConfig"),
GlobalPredicates: globalPredicates,
Log: logger.Named("controllers").Named("AtlasSearchIndexConfig").Sugar(),
AtlasProvider: atlasProvider,
ObjectDeletionProtection: config.ObjectDeletionProtection,
SubObjectDeletionProtection: false,
}).SetupWithManager(ctx, mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "AtlasSearchIndexConfig")
os.Exit(1)
}

// +kubebuilder:scaffold:builder

if err := mgr.AddHealthzCheck("health", healthz.Ping); err != nil {
setupLog.Error(err, "unable to set up health check")
os.Exit(1)
}
if err := mgr.AddReadyzCheck("check", healthz.Ping); err != nil {
setupLog.Error(err, "unable to set up ready check")
setupLog.Error(err, "unable to start operator")
os.Exit(1)
}

setupLog.Info(subobjectDeletionProtectionMessage)
setupLog.Info("starting manager")
if err := mgr.Start(ctx); err != nil {
if err = mgr.Start(ctx); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
}
Expand All @@ -308,7 +112,7 @@ type Config struct {
func parseConfiguration() Config {
var globalAPISecretName string
config := Config{}
flag.StringVar(&config.AtlasDomain, "atlas-domain", "https://cloud.mongodb.com/", "the Atlas URL domain name (with slash in the end).")
flag.StringVar(&config.AtlasDomain, "atlas-domain", operator.DefaultAtlasDomain, "the Atlas URL domain name (with slash in the end).")
flag.StringVar(&config.MetricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
flag.StringVar(&config.ProbeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
flag.StringVar(&globalAPISecretName, "global-api-secret-name", "", "The name of the Secret that contains Atlas API keys. "+
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ require (
github.com/evanphx/json-patch v5.6.0+incompatible // indirect
github.com/evanphx/json-patch/v5 v5.9.0 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/logr v1.4.1
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
Expand Down
10 changes: 10 additions & 0 deletions internal/collection/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,13 @@ func CopyWithSkip[T comparable](list []T, skip T) []T {

return newList
}

func Keys[K comparable, V any](m map[K]V) []K {
s := make([]K, 0, len(m))

for k := range m {
s = append(s, k)
}

return s
}
24 changes: 23 additions & 1 deletion pkg/controller/atlasdatabaseuser/atlasdatabaseuser_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"

"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/featureflags"
"github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api"
akov2 "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1"

"github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlas"
"github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/connectionsecret"
"github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/customresource"
Expand Down Expand Up @@ -260,3 +261,24 @@ func (r *AtlasDatabaseUserReconciler) SetupWithManager(mgr ctrl.Manager) error {
Watches(&corev1.Secret{}, watch.NewSecretHandler(&r.DeprecatedResourceWatcher)).
Complete(r)
}

func NewAtlasDatabaseUserReconciler(
mgr manager.Manager,
predicates []predicate.Predicate,
atlasProvider atlas.Provider,
deletionProtection bool,
featureFlags *featureflags.FeatureFlags,
logger *zap.Logger,
) *AtlasDatabaseUserReconciler {
return &AtlasDatabaseUserReconciler{
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
EventRecorder: mgr.GetEventRecorderFor("AtlasDatabaseUser"),
DeprecatedResourceWatcher: watch.NewDeprecatedResourceWatcher(),
GlobalPredicates: predicates,
Log: logger.Named("controllers").Named("AtlasDatabaseUser").Sugar(),
AtlasProvider: atlasProvider,
ObjectDeletionProtection: deletionProtection,
FeaturePreviewOIDCAuthEnabled: featureFlags.IsFeaturePresent(featureflags.FeatureOIDC),
}
}
21 changes: 21 additions & 0 deletions pkg/controller/atlasdatafederation/datafederation_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"errors"
"fmt"

"sigs.k8s.io/controller-runtime/pkg/manager"

k8serrors "k8s.io/apimachinery/pkg/api/errors"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/event"
Expand Down Expand Up @@ -203,6 +205,25 @@ func (r *AtlasDataFederationReconciler) SetupWithManager(mgr ctrl.Manager) error
Complete(r)
}

func NewAtlasDataFederationReconciler(
mgr manager.Manager,
predicates []predicate.Predicate,
atlasProvider atlas.Provider,
deletionProtection bool,
logger *zap.Logger,
) *AtlasDataFederationReconciler {
return &AtlasDataFederationReconciler{
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
EventRecorder: mgr.GetEventRecorderFor("AtlasDataFederation"),
DeprecatedResourceWatcher: watch.NewDeprecatedResourceWatcher(),
GlobalPredicates: predicates,
Log: logger.Named("controllers").Named("AtlasDataFederation").Sugar(),
AtlasProvider: atlasProvider,
ObjectDeletionProtection: deletionProtection,
}
}

// Delete implements a handler for the Delete event
func (r *AtlasDataFederationReconciler) Delete(ctx context.Context, e event.DeleteEvent) error {
dataFederation, ok := e.Object.(*akov2.AtlasDataFederation)
Expand Down
Loading

0 comments on commit 41d1934

Please sign in to comment.