diff --git a/api/v1beta1/machine_types.go b/api/v1beta1/machine_types.go
index 609d6f19c7d3..0628d9f08c33 100644
--- a/api/v1beta1/machine_types.go
+++ b/api/v1beta1/machine_types.go
@@ -273,12 +273,6 @@ const (
 	MachineDeletingV1Beta2Condition = DeletingV1Beta2Condition
 )
 
-// Machine's Paused condition and corresponding reasons that will be used in v1Beta2 API version.
-const (
-	// MachinePausedV1Beta2Condition is true if the Machine or the Cluster it belongs to are paused.
-	MachinePausedV1Beta2Condition = PausedV1Beta2Condition
-)
-
 // ANCHOR: MachineSpec
 
 // MachineSpec defines the desired state of Machine.
diff --git a/bootstrap/kubeadm/internal/builder/builders.go b/bootstrap/kubeadm/internal/builder/builders.go
index bdae5a487569..09c16b700318 100644
--- a/bootstrap/kubeadm/internal/builder/builders.go
+++ b/bootstrap/kubeadm/internal/builder/builders.go
@@ -21,6 +21,7 @@ import (
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/runtime"
 
+	clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
 	bootstrapv1 "sigs.k8s.io/cluster-api/bootstrap/kubeadm/api/v1beta1"
 )
 
@@ -80,6 +81,15 @@ func (k *KubeadmConfigBuilder) Build() *bootstrapv1.KubeadmConfig {
 			Namespace: k.namespace,
 			Name:      k.name,
 		},
+		Status: bootstrapv1.KubeadmConfigStatus{
+			V1Beta2: &bootstrapv1.KubeadmConfigV1Beta2Status{
+				Conditions: []metav1.Condition{{
+					Type:   clusterv1.PausedV1Beta2Condition,
+					Status: metav1.ConditionFalse,
+					Reason: clusterv1.NotPausedV1Beta2Reason,
+				}},
+			},
+		},
 	}
 	if k.initConfig != nil {
 		config.Spec.InitConfiguration = k.initConfig
diff --git a/bootstrap/kubeadm/internal/controllers/kubeadmconfig_controller.go b/bootstrap/kubeadm/internal/controllers/kubeadmconfig_controller.go
index bb05268f331a..61cf9233d989 100644
--- a/bootstrap/kubeadm/internal/controllers/kubeadmconfig_controller.go
+++ b/bootstrap/kubeadm/internal/controllers/kubeadmconfig_controller.go
@@ -55,10 +55,10 @@ import (
 	"sigs.k8s.io/cluster-api/feature"
 	"sigs.k8s.io/cluster-api/internal/util/taints"
 	"sigs.k8s.io/cluster-api/util"
-	"sigs.k8s.io/cluster-api/util/annotations"
 	"sigs.k8s.io/cluster-api/util/conditions"
 	clog "sigs.k8s.io/cluster-api/util/log"
 	"sigs.k8s.io/cluster-api/util/patch"
+	"sigs.k8s.io/cluster-api/util/paused"
 	"sigs.k8s.io/cluster-api/util/predicates"
 	"sigs.k8s.io/cluster-api/util/secret"
 )
@@ -117,7 +117,7 @@ func (r *KubeadmConfigReconciler) SetupWithManager(ctx context.Context, mgr ctrl
 		Watches(
 			&clusterv1.Machine{},
 			handler.EnqueueRequestsFromMapFunc(r.MachineToBootstrapMapFunc),
-		).WithEventFilter(predicates.ResourceNotPausedAndHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue))
+		).WithEventFilter(predicates.ResourceHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue))
 
 	if feature.Gates.Enabled(feature.MachinePool) {
 		b = b.Watches(
@@ -131,7 +131,7 @@ func (r *KubeadmConfigReconciler) SetupWithManager(ctx context.Context, mgr ctrl
 			handler.EnqueueRequestsFromMapFunc(r.ClusterToKubeadmConfigs),
 			builder.WithPredicates(
 				predicates.All(mgr.GetScheme(), predicateLog,
-					predicates.ClusterUnpausedAndInfrastructureReady(mgr.GetScheme(), predicateLog),
+					predicates.ClusterPausedTransitionsOrInfrastructureReady(mgr.GetScheme(), predicateLog),
 					predicates.ResourceHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue),
 				),
 			),
@@ -199,9 +199,8 @@ func (r *KubeadmConfigReconciler) Reconcile(ctx context.Context, req ctrl.Reques
 		return ctrl.Result{}, err
 	}
 
-	if annotations.IsPaused(cluster, config) {
-		log.Info("Reconciliation is paused for this object")
-		return ctrl.Result{}, nil
+	if isPaused, conditionChanged, err := paused.EnsurePausedCondition(ctx, r.Client, cluster, config); err != nil || isPaused || conditionChanged {
+		return ctrl.Result{}, err
 	}
 
 	scope := &Scope{
diff --git a/bootstrap/kubeadm/internal/controllers/kubeadmconfig_controller_test.go b/bootstrap/kubeadm/internal/controllers/kubeadmconfig_controller_test.go
index 0a0480c1d5c9..c40c759aa005 100644
--- a/bootstrap/kubeadm/internal/controllers/kubeadmconfig_controller_test.go
+++ b/bootstrap/kubeadm/internal/controllers/kubeadmconfig_controller_test.go
@@ -107,7 +107,9 @@ func TestKubeadmConfigReconciler_Reconcile_ReturnEarlyIfKubeadmConfigIsReady(t *
 		machine,
 		config,
 	}
-	myclient := fake.NewClientBuilder().WithObjects(objects...).Build()
+	myclient := fake.NewClientBuilder().
+		WithStatusSubresource(&bootstrapv1.KubeadmConfig{}).
+		WithObjects(objects...).Build()
 
 	k := &KubeadmConfigReconciler{
 		Client: myclient,
diff --git a/cmd/clusterctl/client/cluster/topology.go b/cmd/clusterctl/client/cluster/topology.go
index 4a89e823232b..2c1c329fce51 100644
--- a/cmd/clusterctl/client/cluster/topology.go
+++ b/cmd/clusterctl/client/cluster/topology.go
@@ -520,6 +520,11 @@ func reconcileClusterClass(ctx context.Context, apiReader client.Reader, class c
 		Client: reconcilerClient,
 	}
 
+	// The first call only reconciles the Paused condition.
+	if _, err := clusterClassReconciler.Reconcile(ctx, reconcile.Request{NamespacedName: targetClusterClass}); err != nil {
+		return nil, errors.Wrap(err, "failed to dry run the ClusterClass controller to reconcile the paused condition")
+	}
+
 	if _, err := clusterClassReconciler.Reconcile(ctx, reconcile.Request{NamespacedName: targetClusterClass}); err != nil {
 		return nil, errors.Wrap(err, "failed to dry run the ClusterClass controller")
 	}
diff --git a/controlplane/kubeadm/internal/controllers/controller.go b/controlplane/kubeadm/internal/controllers/controller.go
index bd900097e012..b8bb9af3b9e4 100644
--- a/controlplane/kubeadm/internal/controllers/controller.go
+++ b/controlplane/kubeadm/internal/controllers/controller.go
@@ -48,11 +48,11 @@ import (
 	"sigs.k8s.io/cluster-api/internal/contract"
 	"sigs.k8s.io/cluster-api/internal/util/ssa"
 	"sigs.k8s.io/cluster-api/util"
-	"sigs.k8s.io/cluster-api/util/annotations"
 	"sigs.k8s.io/cluster-api/util/collections"
 	"sigs.k8s.io/cluster-api/util/conditions"
 	"sigs.k8s.io/cluster-api/util/finalizers"
 	"sigs.k8s.io/cluster-api/util/patch"
+	"sigs.k8s.io/cluster-api/util/paused"
 	"sigs.k8s.io/cluster-api/util/predicates"
 	"sigs.k8s.io/cluster-api/util/secret"
 	"sigs.k8s.io/cluster-api/util/version"
 )
@@ -99,14 +99,14 @@ func (r *KubeadmControlPlaneReconciler) SetupWithManager(ctx context.Context, mg
 		For(&controlplanev1.KubeadmControlPlane{}).
 		Owns(&clusterv1.Machine{}).
 		WithOptions(options).
-		WithEventFilter(predicates.ResourceNotPausedAndHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue)).
+		WithEventFilter(predicates.ResourceHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue)).
 		Watches(
 			&clusterv1.Cluster{},
 			handler.EnqueueRequestsFromMapFunc(r.ClusterToKubeadmControlPlane),
 			builder.WithPredicates(
 				predicates.All(mgr.GetScheme(), predicateLog,
 					predicates.ResourceHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue),
-					predicates.ClusterUnpausedAndInfrastructureReady(mgr.GetScheme(), predicateLog),
+					predicates.ClusterPausedTransitionsOrInfrastructureReady(mgr.GetScheme(), predicateLog),
 				),
 			),
 		).
@@ -172,9 +172,8 @@ func (r *KubeadmControlPlaneReconciler) Reconcile(ctx context.Context, req ctrl.
 	log = log.WithValues("Cluster", klog.KObj(cluster))
 	ctx = ctrl.LoggerInto(ctx, log)
 
-	if annotations.IsPaused(cluster, kcp) {
-		log.Info("Reconciliation is paused for this object")
-		return ctrl.Result{}, nil
+	if isPaused, conditionChanged, err := paused.EnsurePausedCondition(ctx, r.Client, cluster, kcp); err != nil || isPaused || conditionChanged {
+		return ctrl.Result{}, err
 	}
 
 	// Initialize the patch helper.
diff --git a/controlplane/kubeadm/internal/controllers/controller_test.go b/controlplane/kubeadm/internal/controllers/controller_test.go
index edf1993da4af..ff79e501ac45 100644
--- a/controlplane/kubeadm/internal/controllers/controller_test.go
+++ b/controlplane/kubeadm/internal/controllers/controller_test.go
@@ -440,6 +440,13 @@ func TestReconcileClusterNoEndpoints(t *testing.T) {
 				},
 			},
 		},
+		Status: controlplanev1.KubeadmControlPlaneStatus{
+			V1Beta2: &controlplanev1.KubeadmControlPlaneV1Beta2Status{Conditions: []metav1.Condition{{
+				Type:   clusterv1.PausedV1Beta2Condition,
+				Status: metav1.ConditionFalse,
+				Reason: clusterv1.NotPausedV1Beta2Reason,
+			}}},
+		},
 	}
 	webhook := &controlplanev1webhooks.KubeadmControlPlane{}
 	g.Expect(webhook.Default(ctx, kcp)).To(Succeed())
diff --git a/exp/addons/internal/controllers/clusterresourceset_controller.go b/exp/addons/internal/controllers/clusterresourceset_controller.go
index 6c4cc1d45c41..6981f9bf648a 100644
--- a/exp/addons/internal/controllers/clusterresourceset_controller.go
+++ b/exp/addons/internal/controllers/clusterresourceset_controller.go
@@ -48,6 +48,7 @@ import (
 	"sigs.k8s.io/cluster-api/util/conditions"
 	"sigs.k8s.io/cluster-api/util/finalizers"
 	"sigs.k8s.io/cluster-api/util/patch"
+	"sigs.k8s.io/cluster-api/util/paused"
 	"sigs.k8s.io/cluster-api/util/predicates"
 )
 
@@ -82,9 +83,7 @@ func (r *ClusterResourceSetReconciler) SetupWithManager(ctx context.Context, mgr
 			handler.EnqueueRequestsFromMapFunc(
 				resourceToClusterResourceSetFunc[client.Object](r.Client),
 			),
-			builder.WithPredicates(
-				resourcepredicates.TypedResourceCreateOrUpdate[client.Object](predicateLog),
-			),
+			builder.WithPredicates(resourcepredicates.TypedResourceCreateOrUpdate[client.Object](predicateLog)),
 		).
 		WatchesRawSource(source.Kind(
 			partialSecretCache,
@@ -100,7 +99,7 @@ func (r *ClusterResourceSetReconciler) SetupWithManager(ctx context.Context, mgr
 				resourcepredicates.TypedResourceCreateOrUpdate[*metav1.PartialObjectMetadata](predicateLog),
 			)).
 		WithOptions(options).
-		WithEventFilter(predicates.ResourceNotPausedAndHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue)).
+		WithEventFilter(predicates.ResourceHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue)).
 		Complete(r)
 	if err != nil {
 		return errors.Wrap(err, "failed setting up with a controller manager")
@@ -129,6 +128,10 @@ func (r *ClusterResourceSetReconciler) Reconcile(ctx context.Context, req ctrl.R
 		return ctrl.Result{}, err
 	}
 
+	if isPaused, conditionChanged, err := paused.EnsurePausedCondition(ctx, r.Client, nil, clusterResourceSet); err != nil || isPaused || conditionChanged {
+		return ctrl.Result{}, err
+	}
+
 	// Initialize the patch helper.
 	patchHelper, err := patch.NewHelper(clusterResourceSet, r.Client)
 	if err != nil {
diff --git a/exp/internal/controllers/machinepool_controller.go b/exp/internal/controllers/machinepool_controller.go
index 27d9a6e8a976..89e44c946c8b 100644
--- a/exp/internal/controllers/machinepool_controller.go
+++ b/exp/internal/controllers/machinepool_controller.go
@@ -44,10 +44,10 @@ import (
 	expv1 "sigs.k8s.io/cluster-api/exp/api/v1beta1"
 	"sigs.k8s.io/cluster-api/internal/util/ssa"
 	"sigs.k8s.io/cluster-api/util"
-	"sigs.k8s.io/cluster-api/util/annotations"
 	"sigs.k8s.io/cluster-api/util/conditions"
 	"sigs.k8s.io/cluster-api/util/finalizers"
 	"sigs.k8s.io/cluster-api/util/patch"
+	"sigs.k8s.io/cluster-api/util/paused"
 	"sigs.k8s.io/cluster-api/util/predicates"
 )
 
@@ -109,14 +109,14 @@ func (r *MachinePoolReconciler) SetupWithManager(ctx context.Context, mgr ctrl.M
 	c, err := ctrl.NewControllerManagedBy(mgr).
 		For(&expv1.MachinePool{}).
 		WithOptions(options).
-		WithEventFilter(predicates.ResourceNotPausedAndHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue)).
+		WithEventFilter(predicates.ResourceHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue)).
 		Watches(
 			&clusterv1.Cluster{},
 			handler.EnqueueRequestsFromMapFunc(clusterToMachinePools),
 			// TODO: should this wait for Cluster.Status.InfrastructureReady similar to Infra Machine resources?
 			builder.WithPredicates(
 				predicates.All(mgr.GetScheme(), predicateLog,
-					predicates.ClusterUnpaused(mgr.GetScheme(), predicateLog),
+					predicates.ClusterPausedTransitions(mgr.GetScheme(), predicateLog),
 					predicates.ResourceHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue),
 				),
 			),
@@ -168,10 +168,8 @@ func (r *MachinePoolReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 			mp.Spec.ClusterName, mp.Name, mp.Namespace)
 	}
 
-	// Return early if the object or Cluster is paused.
-	if annotations.IsPaused(cluster, mp) {
-		log.Info("Reconciliation is paused for this object")
-		return ctrl.Result{}, nil
+	if isPaused, conditionChanged, err := paused.EnsurePausedCondition(ctx, r.Client, cluster, mp); err != nil || isPaused || conditionChanged {
+		return ctrl.Result{}, err
 	}
 
 	// Initialize the patch helper.
diff --git a/exp/internal/controllers/machinepool_controller_test.go b/exp/internal/controllers/machinepool_controller_test.go
index 0d5e1e40acb5..f8ea56818661 100644
--- a/exp/internal/controllers/machinepool_controller_test.go
+++ b/exp/internal/controllers/machinepool_controller_test.go
@@ -158,6 +158,13 @@ func TestMachinePoolOwnerReference(t *testing.T) {
 			Replicas:    ptr.To[int32](1),
 			ClusterName: "invalid",
 		},
+		Status: expv1.MachinePoolStatus{
+			V1Beta2: &expv1.MachinePoolV1Beta2Status{Conditions: []metav1.Condition{{
+				Type:   clusterv1.PausedV1Beta2Condition,
+				Status: metav1.ConditionFalse,
+				Reason: clusterv1.NotPausedV1Beta2Reason,
+			}}},
+		},
 	}
 
 	machinePoolValidCluster := &expv1.MachinePool{
@@ -176,6 +183,13 @@ func TestMachinePoolOwnerReference(t *testing.T) {
 			},
 			ClusterName: "test-cluster",
 		},
+		Status: expv1.MachinePoolStatus{
+			V1Beta2: &expv1.MachinePoolV1Beta2Status{Conditions: []metav1.Condition{{
+				Type:   clusterv1.PausedV1Beta2Condition,
+				Status: metav1.ConditionFalse,
+				Reason: clusterv1.NotPausedV1Beta2Reason,
+			}}},
+		},
 	}
 
 	machinePoolValidMachinePool := &expv1.MachinePool{
@@ -197,6 +211,13 @@ func TestMachinePoolOwnerReference(t *testing.T) {
 			},
 			ClusterName: "test-cluster",
 		},
+		Status: expv1.MachinePoolStatus{
+			V1Beta2: &expv1.MachinePoolV1Beta2Status{Conditions: []metav1.Condition{{
+				Type:   clusterv1.PausedV1Beta2Condition,
+				Status: metav1.ConditionFalse,
+				Reason: clusterv1.NotPausedV1Beta2Reason,
+			}}},
+		},
 	}
 
 	testCases := []struct {
@@ -345,6 +366,11 @@ func TestReconcileMachinePoolRequest(t *testing.T) {
 					{Name: "test"},
 				},
 				ObservedGeneration: 1,
+				V1Beta2: &expv1.MachinePoolV1Beta2Status{Conditions: []metav1.Condition{{
+					Type:   clusterv1.PausedV1Beta2Condition,
+					Status: metav1.ConditionFalse,
+					Reason: clusterv1.NotPausedV1Beta2Reason,
+				}}},
 			},
 		},
 		expected: expected{
@@ -390,6 +416,11 @@ func TestReconcileMachinePoolRequest(t *testing.T) {
 						Name: "test-node",
 					},
 				},
+				V1Beta2: &expv1.MachinePoolV1Beta2Status{Conditions: []metav1.Condition{{
+					Type:   clusterv1.PausedV1Beta2Condition,
+					Status: metav1.ConditionFalse,
+					Reason: clusterv1.NotPausedV1Beta2Reason,
+				}}},
 			},
 		},
 		nodes: []corev1.Node{
@@ -447,6 +478,11 @@ func TestReconcileMachinePoolRequest(t *testing.T) {
 						Name: "test-node",
 					},
 				},
+				V1Beta2: &expv1.MachinePoolV1Beta2Status{Conditions: []metav1.Condition{{
+					Type:   clusterv1.PausedV1Beta2Condition,
+					Status: metav1.ConditionFalse,
+					Reason: clusterv1.NotPausedV1Beta2Reason,
+				}}},
 			},
 		},
 		nodes: []corev1.Node{
@@ -504,6 +540,11 @@ func TestReconcileMachinePoolRequest(t *testing.T) {
 						Name: "test-node",
 					},
 				},
+				V1Beta2: &expv1.MachinePoolV1Beta2Status{Conditions: []metav1.Condition{{
+					Type:   clusterv1.PausedV1Beta2Condition,
+					Status: metav1.ConditionFalse,
+					Reason: clusterv1.NotPausedV1Beta2Reason,
+				}}},
 			},
 		},
 		nodes: []corev1.Node{
@@ -820,6 +861,13 @@ func TestRemoveMachinePoolFinalizerAfterDeleteReconcile(t *testing.T) {
 				},
 			},
 		},
+		Status: expv1.MachinePoolStatus{
+			V1Beta2: &expv1.MachinePoolV1Beta2Status{Conditions: []metav1.Condition{{
+				Type:   clusterv1.PausedV1Beta2Condition,
+				Status: metav1.ConditionFalse,
+				Reason: clusterv1.NotPausedV1Beta2Reason,
+			}}},
+		},
 	}
 	key := client.ObjectKey{Namespace: m.Namespace, Name: m.Name}
 	clientFake := fake.NewClientBuilder().WithObjects(testCluster, m).WithStatusSubresource(&expv1.MachinePool{}).Build()
@@ -912,6 +960,13 @@ func TestMachinePoolConditions(t *testing.T) {
 				},
 			},
 		},
+		Status: expv1.MachinePoolStatus{
+			V1Beta2: &expv1.MachinePoolV1Beta2Status{Conditions: []metav1.Condition{{
+				Type:   clusterv1.PausedV1Beta2Condition,
+				Status: metav1.ConditionFalse,
+				Reason: clusterv1.NotPausedV1Beta2Reason,
+			}}},
+		},
 	}
 
 	nodeList := corev1.NodeList{
@@ -951,14 +1006,12 @@ func TestMachinePoolConditions(t *testing.T) {
 			infrastructureReady: true,
 			beforeFunc: func(_, _ *unstructured.Unstructured, mp *expv1.MachinePool, _ *corev1.NodeList) {
 				mp.Spec.ProviderIDList = []string{"azure://westus2/id-node-4", "aws://us-east-1/id-node-1"}
-				mp.Status = expv1.MachinePoolStatus{
-					NodeRefs: []corev1.ObjectReference{
-						{Name: "node-1"},
-						{Name: "azure-node-4"},
-					},
-					Replicas:      2,
-					ReadyReplicas: 2,
+				mp.Status.NodeRefs = []corev1.ObjectReference{
+					{Name: "node-1"},
+					{Name: "azure-node-4"},
 				}
+				mp.Status.Replicas = 2
+				mp.Status.ReadyReplicas = 2
 			},
 			conditionAssertFunc: func(t *testing.T, getter conditions.Getter) {
 				t.Helper()
diff --git a/internal/controllers/cluster/cluster_controller.go b/internal/controllers/cluster/cluster_controller.go
index 1d97e001ff04..17a646bcfc32 100644
--- a/internal/controllers/cluster/cluster_controller.go
+++ b/internal/controllers/cluster/cluster_controller.go
@@ -47,12 +47,12 @@ import (
 	"sigs.k8s.io/cluster-api/feature"
 	"sigs.k8s.io/cluster-api/internal/hooks"
 	"sigs.k8s.io/cluster-api/util"
-	"sigs.k8s.io/cluster-api/util/annotations"
 	"sigs.k8s.io/cluster-api/util/collections"
 	"sigs.k8s.io/cluster-api/util/conditions"
 	conditionsv1beta2 "sigs.k8s.io/cluster-api/util/conditions/v1beta2"
 	"sigs.k8s.io/cluster-api/util/finalizers"
 	"sigs.k8s.io/cluster-api/util/patch"
+	"sigs.k8s.io/cluster-api/util/paused"
 	"sigs.k8s.io/cluster-api/util/predicates"
 )
 
@@ -98,7 +98,7 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt
 			handler.EnqueueRequestsFromMapFunc(r.controlPlaneMachineToCluster),
 		).
 		WithOptions(options).
-		WithEventFilter(predicates.ResourceNotPausedAndHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue)).
+		WithEventFilter(predicates.ResourceHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue)).
 		Build(r)
 
 	if err != nil {
@@ -115,8 +115,6 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt
 }
 
 func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Result, reterr error) {
-	log := ctrl.LoggerFrom(ctx)
-
 	// Fetch the Cluster instance.
 	cluster := &clusterv1.Cluster{}
 	if err := r.Client.Get(ctx, req.NamespacedName, cluster); err != nil {
@@ -135,10 +133,8 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Re
 		return ctrl.Result{}, err
 	}
 
-	// Return early if the object or Cluster is paused.
-	if annotations.IsPaused(cluster, cluster) {
-		log.Info("Reconciliation is paused for this object")
-		return ctrl.Result{}, nil
+	if isPaused, conditionChanged, err := paused.EnsurePausedCondition(ctx, r.Client, cluster, cluster); err != nil || isPaused || conditionChanged {
+		return ctrl.Result{}, err
 	}
 
 	// Initialize the patch helper.
diff --git a/internal/controllers/clusterclass/clusterclass_controller.go b/internal/controllers/clusterclass/clusterclass_controller.go
index 8a3cf2994c6a..8e5d647cdfca 100644
--- a/internal/controllers/clusterclass/clusterclass_controller.go
+++ b/internal/controllers/clusterclass/clusterclass_controller.go
@@ -51,6 +51,7 @@ import (
 	"sigs.k8s.io/cluster-api/util/conditions"
 	"sigs.k8s.io/cluster-api/util/conversion"
 	"sigs.k8s.io/cluster-api/util/patch"
+	"sigs.k8s.io/cluster-api/util/paused"
 	"sigs.k8s.io/cluster-api/util/predicates"
 )
 
@@ -79,7 +80,7 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt
 			&runtimev1.ExtensionConfig{},
 			handler.EnqueueRequestsFromMapFunc(r.extensionConfigToClusterClass),
 		).
-		WithEventFilter(predicates.ResourceNotPausedAndHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue)).
+		WithEventFilter(predicates.ResourceHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue)).
 		Complete(r)
 
 	if err != nil {
@@ -89,8 +90,6 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt
 }
 
 func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Result, reterr error) {
-	log := ctrl.LoggerFrom(ctx)
-
 	clusterClass := &clusterv1.ClusterClass{}
 	if err := r.Client.Get(ctx, req.NamespacedName, clusterClass); err != nil {
 		if apierrors.IsNotFound(err) {
@@ -100,10 +99,8 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Re
 		return ctrl.Result{}, err
 	}
 
-	// Return early if the ClusterClass is paused.
-	if annotations.HasPaused(clusterClass) {
-		log.Info("Reconciliation is paused for this object")
-		return ctrl.Result{}, nil
+	if isPaused, conditionChanged, err := paused.EnsurePausedCondition(ctx, r.Client, nil, clusterClass); err != nil || isPaused || conditionChanged {
+		return ctrl.Result{}, err
 	}
 
 	if !clusterClass.ObjectMeta.DeletionTimestamp.IsZero() {
diff --git a/internal/controllers/machine/machine_controller.go b/internal/controllers/machine/machine_controller.go
index f4b2b7d9701f..fc30c8a7e65d 100644
--- a/internal/controllers/machine/machine_controller.go
+++ b/internal/controllers/machine/machine_controller.go
@@ -55,6 +55,7 @@ import (
 	"sigs.k8s.io/cluster-api/util/finalizers"
 	clog "sigs.k8s.io/cluster-api/util/log"
 	"sigs.k8s.io/cluster-api/util/patch"
+	"sigs.k8s.io/cluster-api/util/paused"
 	"sigs.k8s.io/cluster-api/util/predicates"
 )
 
@@ -121,17 +122,14 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt
 	c, err := ctrl.NewControllerManagedBy(mgr).
 		For(&clusterv1.Machine{}).
 		WithOptions(options).
-		WithEventFilter(predicates.ResourceNotPausedAndHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue)).
+		WithEventFilter(predicates.ResourceHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue)).
 		Watches(
 			&clusterv1.Cluster{},
 			handler.EnqueueRequestsFromMapFunc(clusterToMachines),
 			builder.WithPredicates(
 				// TODO: should this wait for Cluster.Status.InfrastructureReady similar to Infra Machine resources?
 				predicates.All(mgr.GetScheme(), predicateLog,
-					predicates.Any(mgr.GetScheme(), predicateLog,
-						predicates.ClusterUnpaused(mgr.GetScheme(), predicateLog),
-						predicates.ClusterControlPlaneInitialized(mgr.GetScheme(), predicateLog),
-					),
+					predicates.ClusterControlPlaneInitialized(mgr.GetScheme(), predicateLog),
 					predicates.ResourceHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue),
 				),
 			)).
@@ -196,17 +194,15 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Re
 			m.Spec.ClusterName, m.Name, m.Namespace)
 	}
 
+	if isPaused, conditionChanged, err := paused.EnsurePausedCondition(ctx, r.Client, cluster, m); err != nil || isPaused || conditionChanged {
+		return ctrl.Result{}, err
+	}
+
 	s := &scope{
 		cluster: cluster,
 		machine: m,
 	}
 
-	// Return early if the object or Cluster is paused.
-	if annotations.IsPaused(cluster, m) {
-		log.Info("Reconciliation is paused for this object")
-		return ctrl.Result{}, setPausedCondition(ctx, r.Client, s)
-	}
-
 	// Initialize the patch helper
 	patchHelper, err := patch.NewHelper(m, r.Client)
 	if err != nil {
@@ -303,7 +299,6 @@ func patchMachine(ctx context.Context, patchHelper *patch.Helper, machine *clust
 			clusterv1.MachineNodeReadyV1Beta2Condition,
 			clusterv1.MachineNodeHealthyV1Beta2Condition,
 			clusterv1.MachineDeletingV1Beta2Condition,
-			clusterv1.MachinePausedV1Beta2Condition,
 		}},
 	)
diff --git a/internal/controllers/machine/machine_controller_status.go b/internal/controllers/machine/machine_controller_status.go
index bdc291294428..5517e8662e6f 100644
--- a/internal/controllers/machine/machine_controller_status.go
+++ b/internal/controllers/machine/machine_controller_status.go
@@ -27,13 +27,10 @@ import (
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/utils/ptr"
 	ctrl "sigs.k8s.io/controller-runtime"
-	"sigs.k8s.io/controller-runtime/pkg/client"
 
 	clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
 	"sigs.k8s.io/cluster-api/internal/contract"
-	"sigs.k8s.io/cluster-api/util/annotations"
 	v1beta2conditions "sigs.k8s.io/cluster-api/util/conditions/v1beta2"
-	"sigs.k8s.io/cluster-api/util/patch"
 )
 
 // reconcileStatus reconciles Machine's status during the entire lifecycle of the machine.
@@ -72,8 +69,6 @@ func (r *Reconciler) reconcileStatus(ctx context.Context, s *scope) {
 
 	// TODO: Update the Deleting condition.
 
-	unsetPausedCondition(s)
-
 	setMachinePhaseAndLastUpdated(ctx, s.machine)
 }
 
@@ -539,40 +534,6 @@ func setAvailableCondition(_ context.Context, machine *clusterv1.Machine) {
 	})
 }
 
-func setPausedCondition(ctx context.Context, c client.Client, s *scope) error {
-	patchHelper, err := patch.NewHelper(s.machine, c)
-	if err != nil {
-		return err
-	}
-
-	var messages []string
-	if s.cluster.Spec.Paused {
-		messages = append(messages, "Cluster spec.paused is set to true")
-	}
-	if annotations.HasPaused(s.machine) {
-		messages = append(messages, "Machine has the cluster.x-k8s.io/paused annotation")
-	}
-	v1beta2conditions.Set(s.machine, metav1.Condition{
-		Type:    clusterv1.MachinePausedV1Beta2Condition,
-		Status:  metav1.ConditionTrue,
-		Reason:  clusterv1.PausedV1Beta2Reason,
-		Message: strings.Join(messages, ", "),
-	})
-
-	return patchHelper.Patch(ctx, s.machine, patch.WithOwnedV1Beta2Conditions{Conditions: []string{
-		clusterv1.MachinePausedV1Beta2Condition,
-	}})
-}
-
-func unsetPausedCondition(s *scope) {
-	// Note: If we hit this code, the controller is reconciling and this Paused condition must be set to false.
-	v1beta2conditions.Set(s.machine, metav1.Condition{
-		Type:   clusterv1.MachinePausedV1Beta2Condition,
-		Status: metav1.ConditionFalse,
-		Reason: clusterv1.NotPausedV1Beta2Reason,
-	})
-}
-
 func setMachinePhaseAndLastUpdated(_ context.Context, m *clusterv1.Machine) {
 	originalPhase := m.Status.Phase
diff --git a/internal/controllers/machine/machine_controller_test.go b/internal/controllers/machine/machine_controller_test.go
index c8d90e1fdf56..ec07d2d30efb 100644
--- a/internal/controllers/machine/machine_controller_test.go
+++ b/internal/controllers/machine/machine_controller_test.go
@@ -627,6 +627,13 @@ func TestMachineOwnerReference(t *testing.T) {
 			},
 			ClusterName: "test-cluster",
 		},
+		Status: clusterv1.MachineStatus{
+			V1Beta2: &clusterv1.MachineV1Beta2Status{Conditions: []metav1.Condition{{
+				Type:   clusterv1.PausedV1Beta2Condition,
+				Status: metav1.ConditionFalse,
+				Reason: clusterv1.NotPausedV1Beta2Reason,
+			}}},
+		},
 	}
 
 	machineValidMachine := &clusterv1.Machine{
@@ -651,6 +658,13 @@ func TestMachineOwnerReference(t *testing.T) {
 			},
 			ClusterName: "test-cluster",
 		},
+		Status: clusterv1.MachineStatus{
+			V1Beta2: &clusterv1.MachineV1Beta2Status{Conditions: []metav1.Condition{{
+				Type:   clusterv1.PausedV1Beta2Condition,
+				Status: metav1.ConditionFalse,
+				Reason: clusterv1.NotPausedV1Beta2Reason,
+			}}},
+		},
 	}
 
 	machineValidControlled := &clusterv1.Machine{
@@ -1016,6 +1030,11 @@ func TestMachineConditions(t *testing.T) {
 				Name: "test",
 			},
 			ObservedGeneration: 1,
+			V1Beta2: &clusterv1.MachineV1Beta2Status{Conditions: []metav1.Condition{{
+				Type:   clusterv1.PausedV1Beta2Condition,
+				Status: metav1.ConditionFalse,
+				Reason: clusterv1.NotPausedV1Beta2Reason,
+			}}},
 		},
 	}
 
@@ -1241,6 +1260,13 @@ func TestRemoveMachineFinalizerAfterDeleteReconcile(t *testing.T) {
 			},
 			Bootstrap: clusterv1.Bootstrap{DataSecretName: ptr.To("data")},
 		},
+		Status: clusterv1.MachineStatus{
+			V1Beta2: &clusterv1.MachineV1Beta2Status{Conditions: []metav1.Condition{{
+				Type:   clusterv1.PausedV1Beta2Condition,
+				Status: metav1.ConditionFalse,
+				Reason: clusterv1.NotPausedV1Beta2Reason,
+			}}},
+		},
 	}
 	key := client.ObjectKey{Namespace: m.Namespace, Name: m.Name}
 	c := fake.NewClientBuilder().WithObjects(testCluster, m, builder.GenericInfrastructureMachineCRD.DeepCopy()).WithStatusSubresource(&clusterv1.Machine{}).Build()
diff --git a/internal/controllers/machinedeployment/machinedeployment_controller.go b/internal/controllers/machinedeployment/machinedeployment_controller.go
index 73e73a9e2fe7..9850391303f8 100644
--- a/internal/controllers/machinedeployment/machinedeployment_controller.go
+++ b/internal/controllers/machinedeployment/machinedeployment_controller.go
@@ -40,12 +40,12 @@ import (
 	"sigs.k8s.io/cluster-api/controllers/external"
 	"sigs.k8s.io/cluster-api/internal/util/ssa"
 	"sigs.k8s.io/cluster-api/util"
-	"sigs.k8s.io/cluster-api/util/annotations"
 	"sigs.k8s.io/cluster-api/util/conditions"
 	utilconversion "sigs.k8s.io/cluster-api/util/conversion"
 	"sigs.k8s.io/cluster-api/util/finalizers"
 	clog "sigs.k8s.io/cluster-api/util/log"
 	"sigs.k8s.io/cluster-api/util/patch"
+	"sigs.k8s.io/cluster-api/util/paused"
 	"sigs.k8s.io/cluster-api/util/predicates"
 )
 
@@ -94,16 +94,14 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt
 			handler.EnqueueRequestsFromMapFunc(r.MachineSetToDeployments),
 		).
 		WithOptions(options).
-		WithEventFilter(predicates.ResourceNotPausedAndHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue)).
+		WithEventFilter(predicates.ResourceHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue)).
 		Watches(
 			&clusterv1.Cluster{},
 			handler.EnqueueRequestsFromMapFunc(clusterToMachineDeployments),
 			builder.WithPredicates(
-				// TODO: should this wait for Cluster.Status.InfrastructureReady similar to Infra Machine resources?
-				predicates.All(mgr.GetScheme(), predicateLog,
-					predicates.ClusterUnpaused(mgr.GetScheme(), predicateLog),
-				),
+				predicates.ClusterPausedTransitions(mgr.GetScheme(), predicateLog),
 			),
+			// TODO: should this wait for Cluster.Status.InfrastructureReady similar to Infra Machine resources?
 		).Complete(r)
 	if err != nil {
 		return errors.Wrap(err, "failed setting up with a controller manager")
@@ -142,10 +140,8 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Re
 		return ctrl.Result{}, err
 	}
 
-	// Return early if the object or Cluster is paused.
-	if annotations.IsPaused(cluster, deployment) {
-		log.Info("Reconciliation is paused for this object")
-		return ctrl.Result{}, nil
+	if isPaused, conditionChanged, err := paused.EnsurePausedCondition(ctx, r.Client, cluster, deployment); err != nil || isPaused || conditionChanged {
+		return ctrl.Result{}, err
 	}
 
 	// Initialize the patch helper
diff --git a/internal/controllers/machinedeployment/machinedeployment_sync.go b/internal/controllers/machinedeployment/machinedeployment_sync.go
index df1288eaac4d..d953112cbc42 100644
--- a/internal/controllers/machinedeployment/machinedeployment_sync.go
+++ b/internal/controllers/machinedeployment/machinedeployment_sync.go
@@ -524,6 +524,9 @@ func calculateStatus(allMSs []*clusterv1.MachineSet, newMS *clusterv1.MachineSet
 		AvailableReplicas:   availableReplicas,
 		UnavailableReplicas: unavailableReplicas,
 		Conditions:          deployment.Status.Conditions,
+
+		// preserve v1beta2 status
+		V1Beta2: deployment.Status.V1Beta2,
 	}
 
 	if *deployment.Spec.Replicas == status.ReadyReplicas {
diff --git a/internal/controllers/machinehealthcheck/machinehealthcheck_controller.go b/internal/controllers/machinehealthcheck/machinehealthcheck_controller.go
index 9eb380353cb6..a355407901f9 100644
--- a/internal/controllers/machinehealthcheck/machinehealthcheck_controller.go
+++ b/internal/controllers/machinehealthcheck/machinehealthcheck_controller.go
@@ -52,6 +52,7 @@ import (
 	"sigs.k8s.io/cluster-api/util/conditions"
 	v1beta2conditions "sigs.k8s.io/cluster-api/util/conditions/v1beta2"
 	"sigs.k8s.io/cluster-api/util/patch"
+	"sigs.k8s.io/cluster-api/util/paused"
 	"sigs.k8s.io/cluster-api/util/predicates"
 )
 
@@ -94,14 +95,14 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt
 			handler.EnqueueRequestsFromMapFunc(r.machineToMachineHealthCheck),
 		).
 		WithOptions(options).
-		WithEventFilter(predicates.ResourceNotPausedAndHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue)).
+		WithEventFilter(predicates.ResourceHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue)).
 		Watches(
 			&clusterv1.Cluster{},
 			handler.EnqueueRequestsFromMapFunc(r.clusterToMachineHealthCheck),
 			builder.WithPredicates(
 				// TODO: should this wait for Cluster.Status.InfrastructureReady similar to Infra Machine resources?
 				predicates.All(mgr.GetScheme(), predicateLog,
-					predicates.ClusterUnpaused(mgr.GetScheme(), predicateLog),
+					predicates.ClusterPausedTransitions(mgr.GetScheme(), predicateLog),
 					predicates.ResourceHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue),
 				),
 			),
@@ -142,10 +143,8 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Re
 		return ctrl.Result{}, err
 	}
 
-	// Return early if the object or Cluster is paused.
-	if annotations.IsPaused(cluster, m) {
-		log.Info("Reconciliation is paused for this object")
-		return ctrl.Result{}, nil
+	if isPaused, conditionChanged, err := paused.EnsurePausedCondition(ctx, r.Client, cluster, m); err != nil || isPaused || conditionChanged {
+		return ctrl.Result{}, err
 	}
 
 	// Initialize the patch helper
diff --git a/internal/controllers/machineset/machineset_controller.go b/internal/controllers/machineset/machineset_controller.go
index 905dcb66849e..ec81faa40593 100644
--- a/internal/controllers/machineset/machineset_controller.go
+++ b/internal/controllers/machineset/machineset_controller.go
@@ -52,7 +52,6 @@ import (
 	"sigs.k8s.io/cluster-api/internal/controllers/machine"
 	"sigs.k8s.io/cluster-api/internal/util/ssa"
 	"sigs.k8s.io/cluster-api/util"
-	"sigs.k8s.io/cluster-api/util/annotations"
 	"sigs.k8s.io/cluster-api/util/collections"
 	"sigs.k8s.io/cluster-api/util/conditions"
 	utilconversion "sigs.k8s.io/cluster-api/util/conversion"
@@ -60,6 +59,7 @@ import (
 	"sigs.k8s.io/cluster-api/util/labels/format"
 	clog "sigs.k8s.io/cluster-api/util/log"
 	"sigs.k8s.io/cluster-api/util/patch"
+	"sigs.k8s.io/cluster-api/util/paused"
 	"sigs.k8s.io/cluster-api/util/predicates"
 )
 
@@ -117,14 +117,14 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt
 			handler.EnqueueRequestsFromMapFunc(r.MachineToMachineSets),
 		).
 		WithOptions(options).
-		WithEventFilter(predicates.ResourceNotPausedAndHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue)).
+		WithEventFilter(predicates.ResourceHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue)).
 		Watches(
 			&clusterv1.Cluster{},
 			handler.EnqueueRequestsFromMapFunc(clusterToMachineSets),
 			builder.WithPredicates(
 				// TODO: should this wait for Cluster.Status.InfrastructureReady similar to Infra Machine resources?
 				predicates.All(mgr.GetScheme(), predicateLog,
-					predicates.ClusterUnpaused(mgr.GetScheme(), predicateLog),
+					predicates.ClusterPausedTransitions(mgr.GetScheme(), predicateLog),
 					predicates.ResourceHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue),
 				),
 			),
@@ -172,10 +172,8 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Re
 		return ctrl.Result{}, err
 	}
 
-	// Return early if the object or Cluster is paused.
-	if annotations.IsPaused(cluster, machineSet) {
-		log.Info("Reconciliation is paused for this object")
-		return ctrl.Result{}, nil
+	if isPaused, conditionChanged, err := paused.EnsurePausedCondition(ctx, r.Client, cluster, machineSet); err != nil || isPaused || conditionChanged {
+		return ctrl.Result{}, err
 	}
 
 	// Initialize the patch helper
diff --git a/internal/controllers/machineset/machineset_controller_test.go b/internal/controllers/machineset/machineset_controller_test.go
index 9c1f901ac7f8..a9b1791c6792 100644
--- a/internal/controllers/machineset/machineset_controller_test.go
+++ b/internal/controllers/machineset/machineset_controller_test.go
@@ -590,6 +590,13 @@ func TestMachineSetReconcile(t *testing.T) {
 		Spec: clusterv1.MachineSetSpec{
 			ClusterName: testClusterName,
 		},
+		Status: clusterv1.MachineSetStatus{
+			V1Beta2: &clusterv1.MachineSetV1Beta2Status{Conditions: []metav1.Condition{{
+				Type:   clusterv1.PausedV1Beta2Condition,
+				Status: metav1.ConditionFalse,
+				Reason: clusterv1.NotPausedV1Beta2Reason,
+			}}},
+		},
 	}
 	request := reconcile.Request{
 		NamespacedName: util.ObjectKey(ms),
 	}
@@ -916,6 +923,13 @@ func newMachineSet(name, cluster string, replicas int32) *clusterv1.MachineSet {
 				},
 			},
 		},
+		Status: clusterv1.MachineSetStatus{
+			V1Beta2: &clusterv1.MachineSetV1Beta2Status{Conditions: []metav1.Condition{{
+				Type:   clusterv1.PausedV1Beta2Condition,
+				Status: metav1.ConditionFalse,
+				Reason: clusterv1.NotPausedV1Beta2Reason,
+			}}},
+		},
 	}
 }
 
@@ -967,6 +981,13 @@ func TestMachineSetReconcile_MachinesCreatedConditionFalseOnBadInfraRef(t *testi
 			},
 		},
 	},
+		Status: clusterv1.MachineSetStatus{
+			V1Beta2: &clusterv1.MachineSetV1Beta2Status{Conditions: []metav1.Condition{{
+				Type:   clusterv1.PausedV1Beta2Condition,
+				Status: metav1.ConditionFalse,
+				Reason: clusterv1.NotPausedV1Beta2Reason,
+			}}},
+		},
 	}
 
 	key := util.ObjectKey(ms)
diff --git a/test/extension/handlers/topologymutation/handler_integration_test.go b/test/extension/handlers/topologymutation/handler_integration_test.go
index 183383b9d142..74068c0da042 100644
--- a/test/extension/handlers/topologymutation/handler_integration_test.go
+++ b/test/extension/handlers/topologymutation/handler_integration_test.go
@@ -54,6 +54,7 @@ import (
 	"sigs.k8s.io/cluster-api/exp/topology/desiredstate"
 	"sigs.k8s.io/cluster-api/exp/topology/scope"
 	"sigs.k8s.io/cluster-api/feature"
+	v1beta2conditions "sigs.k8s.io/cluster-api/util/conditions/v1beta2"
 	"sigs.k8s.io/cluster-api/util/contract"
 	"sigs.k8s.io/cluster-api/webhooks"
 )
@@ -247,6 +248,13 @@ func getScope(cluster *clusterv1.Cluster, clusterClassFile string) (*scope.Scope
 	s.Blueprint.ClusterClass = mustFind(findObject[*clusterv1.ClusterClass](parsedObjects, groupVersionKindName{
 		Kind: "ClusterClass",
 	}))
+	// Set paused condition for ClusterClass
+	v1beta2conditions.Set(s.Blueprint.ClusterClass, metav1.Condition{
+		Type:               clusterv1.PausedV1Beta2Condition,
+		Status:             metav1.ConditionFalse,
+		Reason:             clusterv1.NotPausedV1Beta2Reason,
+		ObservedGeneration: s.Blueprint.ClusterClass.GetGeneration(),
+	})
 
 	// InfrastructureClusterTemplate
 	s.Blueprint.InfrastructureClusterTemplate = mustFind(findObject[*unstructured.Unstructured](parsedObjects, refToGroupVersionKindName(s.Blueprint.ClusterClass.Spec.Infrastructure.Ref)))
diff --git a/test/infrastructure/docker/internal/controllers/dockercluster_controller.go b/test/infrastructure/docker/internal/controllers/dockercluster_controller.go
index ba0ad37c331b..a831cd9f127b 100644
--- a/test/infrastructure/docker/internal/controllers/dockercluster_controller.go
+++ b/test/infrastructure/docker/internal/controllers/dockercluster_controller.go
@@ -38,6 +38,7 @@ import (
 	"sigs.k8s.io/cluster-api/util/conditions"
 	"sigs.k8s.io/cluster-api/util/finalizers"
 	"sigs.k8s.io/cluster-api/util/patch"
+	"sigs.k8s.io/cluster-api/util/paused"
 	"sigs.k8s.io/cluster-api/util/predicates"
 )
 
@@ -83,6 +84,10 @@ func (r *DockerClusterReconciler) Reconcile(ctx context.Context, req ctrl.Reques
 		return ctrl.Result{}, nil
 	}
 
+	if isPaused, conditionChanged, err := paused.EnsurePausedCondition(ctx, r.Client, cluster, dockerCluster); err != nil || isPaused || conditionChanged {
+		return ctrl.Result{}, err
+	}
+
 	log = log.WithValues("Cluster", klog.KObj(cluster))
 	ctx = ctrl.LoggerInto(ctx, log)
 
@@ -201,12 +206,12 @@ func (r *DockerClusterReconciler) SetupWithManager(ctx context.Context, mgr ctrl
 	err := ctrl.NewControllerManagedBy(mgr).
 		For(&infrav1.DockerCluster{}).
 		WithOptions(options).
-		WithEventFilter(predicates.ResourceNotPausedAndHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue)).
+		WithEventFilter(predicates.ResourceHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue)).
 		Watches(
 			&clusterv1.Cluster{},
 			handler.EnqueueRequestsFromMapFunc(util.ClusterToInfrastructureMapFunc(ctx, infrav1.GroupVersion.WithKind("DockerCluster"), mgr.GetClient(), &infrav1.DockerCluster{})),
 			builder.WithPredicates(
-				predicates.ClusterUnpaused(mgr.GetScheme(), predicateLog),
+				predicates.ClusterPausedTransitions(mgr.GetScheme(), predicateLog),
 			),
 		).Complete(r)
 	if err != nil {
diff --git a/test/infrastructure/docker/internal/controllers/dockermachine_controller.go b/test/infrastructure/docker/internal/controllers/dockermachine_controller.go
index fd4a2ba25825..8726bb468ea1 100644
--- a/test/infrastructure/docker/internal/controllers/dockermachine_controller.go
+++ b/test/infrastructure/docker/internal/controllers/dockermachine_controller.go
@@ -44,12 +44,12 @@ import (
 	infrav1 "sigs.k8s.io/cluster-api/test/infrastructure/docker/api/v1beta1"
 	"sigs.k8s.io/cluster-api/test/infrastructure/docker/internal/docker"
 	"sigs.k8s.io/cluster-api/util"
-	"sigs.k8s.io/cluster-api/util/annotations"
 	"sigs.k8s.io/cluster-api/util/conditions"
 	"sigs.k8s.io/cluster-api/util/finalizers"
 	"sigs.k8s.io/cluster-api/util/labels"
 	clog "sigs.k8s.io/cluster-api/util/log"
 	"sigs.k8s.io/cluster-api/util/patch"
+	"sigs.k8s.io/cluster-api/util/paused"
 	"sigs.k8s.io/cluster-api/util/predicates"
 )
 
@@ -122,10 +122,8 @@ func (r *DockerMachineReconciler) Reconcile(ctx context.Context, req ctrl.Reques
 	log = log.WithValues("Cluster", klog.KObj(cluster))
 	ctx = ctrl.LoggerInto(ctx, log)
 
-	// Return early if the object or Cluster is paused.
-	if annotations.IsPaused(cluster, dockerMachine) {
-		log.Info("Reconciliation is paused for this object")
-		return ctrl.Result{}, nil
+	if isPaused, conditionChanged, err := paused.EnsurePausedCondition(ctx, r.Client, cluster, dockerMachine); err != nil || isPaused || conditionChanged {
+		return ctrl.Result{}, err
 	}
 
 	if cluster.Spec.InfrastructureRef == nil {
@@ -487,7 +485,7 @@ func (r *DockerMachineReconciler) SetupWithManager(ctx context.Context, mgr ctrl
 	err = ctrl.NewControllerManagedBy(mgr).
 		For(&infrav1.DockerMachine{}).
 		WithOptions(options).
-		WithEventFilter(predicates.ResourceNotPausedAndHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue)).
+		WithEventFilter(predicates.ResourceHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue)).
 		Watches(
 			&clusterv1.Machine{},
 			handler.EnqueueRequestsFromMapFunc(util.MachineToInfrastructureMapFunc(infrav1.GroupVersion.WithKind("DockerMachine"))),
@@ -500,7 +498,7 @@ func (r *DockerMachineReconciler) SetupWithManager(ctx context.Context, mgr ctrl
 			&clusterv1.Cluster{},
 			handler.EnqueueRequestsFromMapFunc(clusterToDockerMachines),
 			builder.WithPredicates(
-				predicates.ClusterUnpausedAndInfrastructureReady(mgr.GetScheme(), predicateLog),
+				predicates.ClusterPausedTransitionsOrInfrastructureReady(mgr.GetScheme(), predicateLog),
 			),
 		).
 		WatchesRawSource(r.ClusterCache.GetClusterSource("dockermachine", clusterToDockerMachines)).
diff --git a/test/infrastructure/inmemory/internal/controllers/inmemorycluster_controller.go b/test/infrastructure/inmemory/internal/controllers/inmemorycluster_controller.go
index 32ad7ed8a175..f59322771fa8 100644
--- a/test/infrastructure/inmemory/internal/controllers/inmemorycluster_controller.go
+++ b/test/infrastructure/inmemory/internal/controllers/inmemorycluster_controller.go
@@ -39,6 +39,7 @@ import (
 	"sigs.k8s.io/cluster-api/util"
 	"sigs.k8s.io/cluster-api/util/finalizers"
 	"sigs.k8s.io/cluster-api/util/patch"
+	"sigs.k8s.io/cluster-api/util/paused"
 	"sigs.k8s.io/cluster-api/util/predicates"
 )
 
@@ -91,6 +92,10 @@ func (r *InMemoryClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ
 	log = log.WithValues("Cluster", klog.KObj(cluster))
 	ctx = ctrl.LoggerInto(ctx, log)
 
+	if isPaused, conditionChanged, err := paused.EnsurePausedCondition(ctx, r.Client, cluster, inMemoryCluster); err != nil || isPaused || conditionChanged {
+		return ctrl.Result{}, err
+	}
+
 	// Initialize the patch helper
 	patchHelper, err := patch.NewHelper(inMemoryCluster, r.Client)
 	if err != nil {
@@ -211,12 +216,12 @@ func (r *InMemoryClusterReconciler) SetupWithManager(ctx context.Context, mgr ct
 	err := ctrl.NewControllerManagedBy(mgr).
 		For(&infrav1.InMemoryCluster{}).
 		WithOptions(options).
-		WithEventFilter(predicates.ResourceNotPausedAndHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue)).
+		WithEventFilter(predicates.ResourceHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue)).
 		Watches(
 			&clusterv1.Cluster{},
 			handler.EnqueueRequestsFromMapFunc(util.ClusterToInfrastructureMapFunc(ctx, infrav1.GroupVersion.WithKind("InMemoryCluster"), mgr.GetClient(), &infrav1.InMemoryCluster{})),
 			builder.WithPredicates(
-				predicates.ClusterUnpaused(mgr.GetScheme(), predicateLog),
+				predicates.ClusterPausedTransitions(mgr.GetScheme(), predicateLog),
 			),
 		).Complete(r)
 	if err != nil {
diff --git a/test/infrastructure/inmemory/internal/controllers/inmemorymachine_controller.go b/test/infrastructure/inmemory/internal/controllers/inmemorymachine_controller.go
index e45d4aee2fde..0f74968402c1 100644
--- a/test/infrastructure/inmemory/internal/controllers/inmemorymachine_controller.go
+++ b/test/infrastructure/inmemory/internal/controllers/inmemorymachine_controller.go
@@ -48,12 +48,12 @@ import (
 	inmemoryruntime "sigs.k8s.io/cluster-api/test/infrastructure/inmemory/pkg/runtime"
 	inmemoryserver "sigs.k8s.io/cluster-api/test/infrastructure/inmemory/pkg/server"
 	"sigs.k8s.io/cluster-api/util"
-	"sigs.k8s.io/cluster-api/util/annotations"
 	"sigs.k8s.io/cluster-api/util/certs"
 	"sigs.k8s.io/cluster-api/util/conditions"
 	"sigs.k8s.io/cluster-api/util/finalizers"
 	clog "sigs.k8s.io/cluster-api/util/log"
 	"sigs.k8s.io/cluster-api/util/patch"
+	"sigs.k8s.io/cluster-api/util/paused"
 	"sigs.k8s.io/cluster-api/util/predicates"
 	"sigs.k8s.io/cluster-api/util/secret"
 )
@@ -123,10 +123,8 @@ func (r *InMemoryMachineReconciler) Reconcile(ctx context.Context, req ctrl.Requ
 	log = log.WithValues("Cluster", klog.KObj(cluster))
 	ctx = ctrl.LoggerInto(ctx, log)
 
-	// Return early if the object or Cluster is paused.
-	if annotations.IsPaused(cluster, inMemoryMachine) {
-		log.Info("Reconciliation is paused for this object")
-		return ctrl.Result{}, nil
+	if isPaused, conditionChanged, err := paused.EnsurePausedCondition(ctx, r.Client, cluster, inMemoryMachine); err != nil || isPaused || conditionChanged {
+		return ctrl.Result{}, err
 	}
 
 	// Fetch the in-memory Cluster.
@@ -1146,7 +1144,7 @@ func (r *InMemoryMachineReconciler) SetupWithManager(ctx context.Context, mgr ct
 	err = ctrl.NewControllerManagedBy(mgr).
 		For(&infrav1.InMemoryMachine{}).
 		WithOptions(options).
-		WithEventFilter(predicates.ResourceNotPausedAndHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue)).
+		WithEventFilter(predicates.ResourceHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue)).
 		Watches(
 			&clusterv1.Machine{},
 			handler.EnqueueRequestsFromMapFunc(util.MachineToInfrastructureMapFunc(infrav1.GroupVersion.WithKind("InMemoryMachine"))),
@@ -1159,7 +1157,7 @@ func (r *InMemoryMachineReconciler) SetupWithManager(ctx context.Context, mgr ct
 			&clusterv1.Cluster{},
 			handler.EnqueueRequestsFromMapFunc(clusterToInMemoryMachines),
 			builder.WithPredicates(
-				predicates.ClusterUnpausedAndInfrastructureReady(mgr.GetScheme(), predicateLog),
+				predicates.ClusterPausedTransitionsOrInfrastructureReady(mgr.GetScheme(), predicateLog),
 			),
 		).Complete(r)
 	if err != nil {
diff --git a/util/conditions/v1beta2/patch.go b/util/conditions/v1beta2/patch.go
index f38ff291ec30..19e6b7e6416d 100644
--- a/util/conditions/v1beta2/patch.go
+++ b/util/conditions/v1beta2/patch.go
@@ -140,7 +140,7 @@ func (p Patch) Apply(latest Setter, options ...ApplyOption) error {
 			// If the condition is already on latest, check if latest and after agree on the change; if not, this is a conflict.
 			if latestCondition := meta.FindStatusCondition(latestConditions, conditionPatch.After.Type); latestCondition != nil {
 				// If latest and after disagree on the change, then it is a conflict
-				if !hasSameState(latestCondition, conditionPatch.After) {
+				if !HasSameState(latestCondition, conditionPatch.After) {
 					return errors.Errorf("error patching conditions: The condition %q was modified by a different process and this caused a merge/AddCondition conflict: %v", conditionPatch.After.Type, cmp.Diff(latestCondition, conditionPatch.After))
 				}
 				// otherwise, the latest is already as intended.
@@ -167,7 +167,7 @@ func (p Patch) Apply(latest Setter, options ...ApplyOption) error {
 			// If the condition on the latest is different from the base condition, check if
 			// the after state corresponds to the desired value. If not this is a conflict (unless we should ignore conflicts for this condition type).
 			if !reflect.DeepEqual(latestCondition, conditionPatch.Before) {
-				if !hasSameState(latestCondition, conditionPatch.After) {
+				if !HasSameState(latestCondition, conditionPatch.After) {
 					return errors.Errorf("error patching conditions: The condition %q was modified by a different process and this caused a merge/ChangeCondition conflict: %v", conditionPatch.After.Type, cmp.Diff(latestCondition, conditionPatch.After))
 				}
 				// Otherwise the latest is already as intended.
@@ -192,7 +192,7 @@ func (p Patch) Apply(latest Setter, options ...ApplyOption) error {
 			// If the condition is still on the latest, check if it is changed in the meantime;
 			// if so then this is a conflict.
 			if latestCondition := meta.FindStatusCondition(latestConditions, conditionPatch.Before.Type); latestCondition != nil {
-				if !hasSameState(latestCondition, conditionPatch.Before) {
+				if !HasSameState(latestCondition, conditionPatch.Before) {
 					return errors.Errorf("error patching conditions: The condition %q was modified by a different process and this caused a merge/RemoveCondition conflict: %v", conditionPatch.Before.Type, cmp.Diff(latestCondition, conditionPatch.Before))
 				}
 			}
@@ -213,9 +213,9 @@ func (p Patch) IsZero() bool {
 	return len(p) == 0
 }
 
-// hasSameState returns true if a condition has the same state of another; state is defined
+// HasSameState returns true if a condition has the same state as another; state is defined
 // by the union of following fields: Type, Status, Reason, ObservedGeneration and Message (it excludes LastTransitionTime).
-func hasSameState(i, j *metav1.Condition) bool {
+func HasSameState(i, j *metav1.Condition) bool {
 	return i.Type == j.Type &&
 		i.Status == j.Status &&
 		i.ObservedGeneration == j.ObservedGeneration &&
diff --git a/util/paused/paused.go b/util/paused/paused.go
new file mode 100644
index 000000000000..d9a5f3152769
--- /dev/null
+++ b/util/paused/paused.go
@@ -0,0 +1,112 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package paused implements paused helper functions.
+package paused
+
+import (
+	"context"
+	"fmt"
+	"strings"
+
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
+
+	clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
+	"sigs.k8s.io/cluster-api/util/annotations"
+	v1beta2conditions "sigs.k8s.io/cluster-api/util/conditions/v1beta2"
+	"sigs.k8s.io/cluster-api/util/patch"
+)
+
+// ConditionSetter combines the client.Object and Setter interfaces.
+type ConditionSetter interface {
+	v1beta2conditions.Setter
+	client.Object
+}
+
+// EnsurePausedCondition sets the Paused condition on the object and returns whether the object should be considered paused.
+func EnsurePausedCondition(ctx context.Context, c client.Client, cluster *clusterv1.Cluster, obj ConditionSetter) (isPaused bool, conditionChanged bool, err error) {
+	oldCondition := v1beta2conditions.Get(obj, clusterv1.PausedV1Beta2Condition)
+	newCondition := pausedCondition(c.Scheme(), cluster, obj, clusterv1.PausedV1Beta2Condition)
+
+	isPaused = newCondition.Status == metav1.ConditionTrue
+
+	log := ctrl.LoggerFrom(ctx)
+
+	// Return early if the paused condition did not change.
+	if oldCondition != nil && v1beta2conditions.HasSameState(oldCondition, &newCondition) {
+		if isPaused {
+			log.V(6).Info("Reconciliation is paused for this object", "reason", newCondition.Message)
+		}
+		return isPaused, false, nil
+	}
+
+	patchHelper, err := patch.NewHelper(obj, c)
+	if err != nil {
+		return isPaused, false, err
+	}
+
+	if isPaused {
+		log.V(4).Info("Pausing reconciliation for this object", "reason", newCondition.Message)
+	} else {
+		log.V(4).Info("Unpausing reconciliation for this object")
+	}
+
+	v1beta2conditions.Set(obj, newCondition)
+
+	if err := patchHelper.Patch(ctx, obj, patch.WithOwnedV1Beta2Conditions{Conditions: []string{
+		clusterv1.PausedV1Beta2Condition,
+	}}); err != nil {
+		return isPaused, false, err
+	}
+
+	return isPaused, true, nil
+}
+
+// pausedCondition computes the Paused condition for the object, taking into account both the Cluster's spec.paused
+// field and the object's cluster.x-k8s.io/paused annotation.
+func pausedCondition(scheme *runtime.Scheme, cluster *clusterv1.Cluster, obj ConditionSetter, targetConditionType string) metav1.Condition {
+	if (cluster != nil && cluster.Spec.Paused) || annotations.HasPaused(obj) {
+		var messages []string
+		if cluster != nil && cluster.Spec.Paused {
+			messages = append(messages, "Cluster spec.paused is set to true")
+		}
+		if annotations.HasPaused(obj) {
+			kind := "Object"
+			if gvk, err := apiutil.GVKForObject(obj, scheme); err == nil {
+				kind = gvk.Kind
+			}
+			messages = append(messages, fmt.Sprintf("%s has the cluster.x-k8s.io/paused annotation", kind))
+		}
+
+		return metav1.Condition{
+			Type:               targetConditionType,
+			Status:             metav1.ConditionTrue,
+			Reason:             clusterv1.PausedV1Beta2Reason,
+			Message:            strings.Join(messages, ", "),
+			ObservedGeneration: obj.GetGeneration(),
+		}
+	}
+
+	return metav1.Condition{
+		Type:               targetConditionType,
+		Status:             metav1.ConditionFalse,
+		Reason:             clusterv1.NotPausedV1Beta2Reason,
+		ObservedGeneration: obj.GetGeneration(),
+	}
+}
diff --git a/util/paused/paused_test.go b/util/paused/paused_test.go
new file mode 100644
index 000000000000..679a12105a13
--- /dev/null
+++ b/util/paused/paused_test.go
@@ -0,0 +1,117 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package paused implements paused helper functions.
+package paused
+
+import (
+	"context"
+	"testing"
+
+	. "github.com/onsi/gomega"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/client/fake"
+
+	clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
+	"sigs.k8s.io/cluster-api/internal/test/builder"
+)
+
+func TestEnsurePausedCondition(t *testing.T) {
+	g := NewWithT(t)
+
+	scheme := runtime.NewScheme()
+	g.Expect(builder.AddTransitionV1Beta2ToScheme(scheme)).To(Succeed())
+	g.Expect(clusterv1.AddToScheme(scheme)).To(Succeed())
+
+	// Cluster Case 1: unpaused
+	normalCluster := &clusterv1.Cluster{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "some-cluster",
+			Namespace: "default",
+		},
+	}
+
+	// Cluster Case 2: paused
+	pausedCluster := normalCluster.DeepCopy()
+	pausedCluster.Spec.Paused = true
+
+	// Object case 1: unpaused
+	obj := &builder.Phase1Obj{ObjectMeta: metav1.ObjectMeta{
+		Name:      "some-object",
+		Namespace: "default",
+	}}
+
+	// Object case 2: paused
+	pausedObj := obj.DeepCopy()
+	pausedObj.SetAnnotations(map[string]string{clusterv1.PausedAnnotation: ""})
+
+	tests := []struct {
+		name         string
+		cluster      *clusterv1.Cluster
+		object       ConditionSetter
+		wantIsPaused bool
+	}{
+		{
+			name:         "unpaused cluster and unpaused object",
+			cluster:      normalCluster.DeepCopy(),
+			object:       obj.DeepCopy(),
+			wantIsPaused: false,
+		},
+		{
+			name:         "paused cluster and unpaused object",
+			cluster:      pausedCluster.DeepCopy(),
+			object:       obj.DeepCopy(),
+			wantIsPaused: true,
+		},
+		{
+			name:         "unpaused cluster and paused object",
+			cluster:      normalCluster.DeepCopy(),
+			object:       pausedObj.DeepCopy(),
+			wantIsPaused: true,
+		},
+		{
+			name:         "paused cluster and paused object",
+			cluster:      pausedCluster.DeepCopy(),
+			object:       pausedObj.DeepCopy(),
+			wantIsPaused: true,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			g := NewWithT(t)
+			ctx := context.Background()
+
+			c := fake.NewClientBuilder().WithScheme(scheme).WithStatusSubresource(&clusterv1.Cluster{}, &builder.Phase1Obj{}).
+				WithObjects(tt.object, tt.cluster).Build()
+
+			g.Expect(c.Get(ctx, client.ObjectKeyFromObject(tt.object), tt.object)).To(Succeed())
+
+			// The first run should set the condition.
+			gotIsPaused, gotConditionChanged, err := EnsurePausedCondition(ctx, c, tt.cluster, tt.object)
+			g.Expect(err).ToNot(HaveOccurred())
+			g.Expect(gotConditionChanged).To(BeTrue(), "The first reconcile should set the Paused condition")
+			g.Expect(gotIsPaused).To(Equal(tt.wantIsPaused))
+
+			// The second reconcile should be a no-op.
+			gotIsPaused, gotConditionChanged, err = EnsurePausedCondition(ctx, c, tt.cluster, tt.object)
+			g.Expect(err).ToNot(HaveOccurred())
+			g.Expect(gotConditionChanged).To(BeFalse(), "The second reconcile should not change the Paused condition")
+			g.Expect(gotIsPaused).To(Equal(tt.wantIsPaused))
+		})
+	}
+}
diff --git a/util/predicates/cluster_predicates.go b/util/predicates/cluster_predicates.go
index b14c60690156..707b3c7ac0f4 100644
--- a/util/predicates/cluster_predicates.go
+++ b/util/predicates/cluster_predicates.go
@@ -150,7 +150,7 @@ func ClusterUpdateUnpaused(scheme *runtime.Scheme, logger logr.Logger) predicate
 			}
 
 			// This predicate always work in "or" with Paused predicates
-			// so the logs are adjusted to not provide false negatives/verbosity al V<=5.
+			// so the logs are adjusted to not provide false negatives/verbosity at V<=5.
 			log.V(6).Info("Cluster was not unpaused, blocking further processing")
 			return false
 		},
@@ -178,6 +178,44 @@ func ClusterUnpaused(scheme *runtime.Scheme, logger logr.Logger) predicate.Funcs
 	return Any(scheme, log, ClusterCreateNotPaused(scheme, log), ClusterUpdateUnpaused(scheme, log))
 }
 
+// ClusterPausedTransitions returns a predicate that returns true for an update event when a Cluster's Spec.Paused has changed.
+func ClusterPausedTransitions(scheme *runtime.Scheme, logger logr.Logger) predicate.Funcs {
+	return predicate.Funcs{
+		UpdateFunc: func(e event.UpdateEvent) bool {
+			log := logger.WithValues("predicate", "ClusterPausedTransitions", "eventType", "update")
+			if gvk, err := apiutil.GVKForObject(e.ObjectOld, scheme); err == nil {
+				log = log.WithValues(gvk.Kind, klog.KObj(e.ObjectOld))
+			}
+
+			oldCluster, ok := e.ObjectOld.(*clusterv1.Cluster)
+			if !ok {
+				log.V(4).Info("Expected Cluster", "type", fmt.Sprintf("%T", e.ObjectOld))
+				return false
+			}
+
+			newCluster := e.ObjectNew.(*clusterv1.Cluster)
+
+			if oldCluster.Spec.Paused && !newCluster.Spec.Paused {
+				log.V(6).Info("Cluster unpausing, allowing further processing")
+				return true
+			}
+
+			if !oldCluster.Spec.Paused && newCluster.Spec.Paused {
+				log.V(6).Info("Cluster pausing, allowing further processing")
+				return true
+			}
+
+			// This predicate always works in "or" with Paused predicates
+			// so the logs are adjusted to not provide false negatives/verbosity at V<=5.
+			log.V(6).Info("Cluster paused state was not changed, blocking further processing")
+			return false
+		},
+		CreateFunc:  func(event.CreateEvent) bool { return false },
+		DeleteFunc:  func(event.DeleteEvent) bool { return false },
+		GenericFunc: func(event.GenericEvent) bool { return false },
+	}
+}
+
 // ClusterControlPlaneInitialized returns a Predicate that returns true on Update events
 // when ControlPlaneInitializedCondition on a Cluster changes to true.
 // Example use:
@@ -218,6 +256,23 @@ func ClusterControlPlaneInitialized(scheme *runtime.Scheme, logger logr.Logger)
 	}
 }
 
+// ClusterPausedTransitionsOrInfrastructureReady returns a Predicate that returns true on Cluster Update events where
+// either Cluster.Spec.Paused transitions or Cluster.Status.InfrastructureReady transitions to true.
+// This implements a common requirement for some cluster-api and provider controllers (such as Machine Infrastructure
+// controllers) to resume reconciliation when the Cluster gets paused or unpaused and when the infrastructure becomes ready.
+// Example use:
+//
+//	err := controller.Watch(
+//	    source.Kind(cache, &clusterv1.Cluster{}),
+//	    handler.EnqueueRequestsFromMapFunc(clusterToMachines)
+//	    predicates.ClusterPausedTransitionsOrInfrastructureReady(mgr.GetScheme(), r.Log),
+//	)
+func ClusterPausedTransitionsOrInfrastructureReady(scheme *runtime.Scheme, logger logr.Logger) predicate.Funcs {
+	log := logger.WithValues("predicate", "ClusterPausedTransitionsOrInfrastructureReady")
+
+	return Any(scheme, log, ClusterPausedTransitions(scheme, log), ClusterUpdateInfraReady(scheme, log))
+}
+
 // ClusterUnpausedAndInfrastructureReady returns a Predicate that returns true on Cluster creation events where
 // both Cluster.Spec.Paused is false and Cluster.Status.InfrastructureReady is true and Update events when
 // either Cluster.Spec.Paused transitions to false or Cluster.Status.InfrastructureReady transitions to true.
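
Taken together, the per-controller adoption pattern in this diff reduces to the minimal sketch below. It is an illustration, not part of the patch: FooReconciler is a hypothetical controller, clusterv1.Machine merely stands in for any resource implementing paused.ConditionSetter, and util.GetClusterByName is one way (used by several of the controllers above) to resolve the owning Cluster.

package controllers

import (
	"context"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"

	clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
	"sigs.k8s.io/cluster-api/util"
	"sigs.k8s.io/cluster-api/util/paused"
)

// FooReconciler is a hypothetical controller adopting the new paused helper.
type FooReconciler struct {
	Client client.Client
}

func (r *FooReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	// Fetch the reconciled object (a Machine here, purely for illustration).
	m := &clusterv1.Machine{}
	if err := r.Client.Get(ctx, req.NamespacedName, m); err != nil {
		if apierrors.IsNotFound(err) {
			return ctrl.Result{}, nil
		}
		return ctrl.Result{}, err
	}

	// Resolve the owning Cluster so both pause sources are considered.
	cluster, err := util.GetClusterByName(ctx, r.Client, m.Namespace, m.Spec.ClusterName)
	if err != nil {
		return ctrl.Result{}, err
	}

	// EnsurePausedCondition replaces the old annotations.IsPaused early return:
	// it patches the Paused condition and reports whether the object or its
	// Cluster is paused. When conditionChanged is true, the status patch itself
	// produces a new watch event, so returning early does not stall the object.
	if isPaused, conditionChanged, err := paused.EnsurePausedCondition(ctx, r.Client, cluster, m); err != nil || isPaused || conditionChanged {
		return ctrl.Result{}, err
	}

	// ...regular reconciliation continues here...
	return ctrl.Result{}, nil
}

Note the interplay with the predicate changes above: because controllers no longer filter paused objects out via ResourceNotPausedAndHasFilterLabel, a paused object still reaches Reconcile, gets its Paused condition set to true, and is then picked up again on the next ClusterPausedTransitions event when it unpauses.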