diff --git a/controllers/taloscontrolplane_controller.go b/controllers/taloscontrolplane_controller.go
index 93fda14..d402b5e 100644
--- a/controllers/taloscontrolplane_controller.go
+++ b/controllers/taloscontrolplane_controller.go
@@ -20,12 +20,15 @@ import (
 	cabptv1 "github.com/talos-systems/cluster-api-bootstrap-provider-talos/api/v1alpha3"
 	machineapi "github.com/talos-systems/talos/pkg/machinery/api/machine"
 	talosclient "github.com/talos-systems/talos/pkg/machinery/client"
+	"github.com/talos-systems/talos/pkg/machinery/constants"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 	corev1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/selection"
 	kerrors "k8s.io/apimachinery/pkg/util/errors"
 	"k8s.io/apiserver/pkg/storage/names"
 	"k8s.io/utils/pointer"
@@ -150,13 +153,6 @@ func (r *TalosControlPlaneReconciler) Reconcile(ctx context.Context, req ctrl.Re
 		return ctrl.Result{}, nil
 	}

-	// nb: we moved the deletion reconcile before the defer to avoid additional, unneccessary patching.
-	// we handle the patches necessary directly in the reconcileDelete function and will eventually get rid of this defer altogether.
-	if !tcp.ObjectMeta.DeletionTimestamp.IsZero() {
-		// Handle deletion reconciliation loop.
-		return r.reconcileDelete(ctx, cluster, tcp)
-	}
-
 	defer func() {
 		r.Log.Info("attempting to set control plane status")
@@ -176,7 +172,7 @@ func (r *TalosControlPlaneReconciler) Reconcile(ctx context.Context, req ctrl.Re
 		// TODO: remove this as soon as we have a proper remote cluster cache in place.
 		// Make TCP to requeue in case status is not ready, so we can check for node status without waiting for a full resync (by default 10 minutes).
 		// Only requeue if we are not going in exponential backoff due to error, or if we are not already re-queueing, or if the object has a deletion timestamp.
-		if reterr == nil && !res.Requeue && !(res.RequeueAfter > 0) && tcp.ObjectMeta.DeletionTimestamp.IsZero() {
+		if reterr == nil && !res.Requeue && res.RequeueAfter <= 0 && tcp.ObjectMeta.DeletionTimestamp.IsZero() {
 			if !tcp.Status.Ready || tcp.Status.UnavailableReplicas > 0 {
 				res = ctrl.Result{RequeueAfter: 20 * time.Second}
 			}
@@ -185,13 +181,25 @@ func (r *TalosControlPlaneReconciler) Reconcile(ctx context.Context, req ctrl.Re
 		r.Log.Info("successfully updated control plane status")
 	}()

+	if !tcp.ObjectMeta.DeletionTimestamp.IsZero() {
+		// Handle deletion reconciliation loop.
+		return r.reconcileDelete(ctx, cluster, tcp)
+	}
+
+	return r.reconcile(ctx, cluster, tcp)
+}
+
+func (r *TalosControlPlaneReconciler) reconcile(ctx context.Context, cluster *clusterv1.Cluster, tcp *controlplanev1.TalosControlPlane) (res ctrl.Result, err error) {
+	logger := ctrl.LoggerFrom(ctx, "cluster", cluster.Name)
+	logger.Info("reconcile TalosControlPlane")
+
 	// Update ownerrefs on infra templates
-	if err := r.addClusterOwnerToObj(ctx, tcp.Spec.InfrastructureTemplate, cluster); err != nil {
+	if err := r.reconcileExternalReference(ctx, tcp.Spec.InfrastructureTemplate, cluster); err != nil {
 		return ctrl.Result{}, err
 	}

 	// If ControlPlaneEndpoint is not set, return early
-	if cluster.Spec.ControlPlaneEndpoint.IsZero() {
+	if !cluster.Spec.ControlPlaneEndpoint.IsValid() {
 		logger.Info("cluster does not yet have a ControlPlaneEndpoint defined")
 		return ctrl.Result{}, nil
 	}
@@ -203,134 +211,37 @@ func (r *TalosControlPlaneReconciler) Reconcile(ctx context.Context, req ctrl.Re
 		return ctrl.Result{}, err
 	}

-	controlPlane := newControlPlane(cluster, tcp, ownedMachines)
-
-	// If we've made it this far, we can assume that all ownedMachines are up to date
-	numMachines := len(ownedMachines)
-	desiredReplicas := int(*tcp.Spec.Replicas)
-
-	requeue := false
-
-	// Audit the etcd member list to remove any nodes that no longer exist
-	if err := r.auditEtcd(ctx, util.ObjectKey(cluster), controlPlane.TCP.Name); err != nil {
-		logger.Info("failed to check etcd membership list", "error", err)
-
-		// if audit failed, requeue the reconcile in any case
-		requeue = true
-	}
-
-	if err := r.etcdHealthcheck(ctx, cluster, ownedMachines); err != nil {
-		conditions.MarkFalse(tcp, controlplanev1.EtcdClusterHealthyCondition, controlplanev1.EtcdClusterUnhealthyReason,
-			clusterv1.ConditionSeverityWarning, err.Error())
-	} else {
-		conditions.MarkTrue(tcp, controlplanev1.EtcdClusterHealthyCondition)
-	}
-
-	if err := r.nodesHealthcheck(ctx, cluster, ownedMachines); err != nil {
-		reason := controlplanev1.ControlPlaneComponentsInspectionFailedReason
-
-		if errors.Is(err, &errServiceUnhealthy{}) {
-			reason = controlplanev1.ControlPlaneComponentsUnhealthyReason
-		}
-
-		conditions.MarkFalse(tcp, controlplanev1.ControlPlaneComponentsHealthyCondition, reason,
-			clusterv1.ConditionSeverityWarning, err.Error())
-	} else {
-		conditions.MarkTrue(tcp, controlplanev1.ControlPlaneComponentsHealthyCondition)
-	}
-
-	if !conditions.Has(tcp, controlplanev1.AvailableCondition) {
-		conditions.MarkFalse(tcp, controlplanev1.AvailableCondition, controlplanev1.WaitingForTalosBootReason, clusterv1.ConditionSeverityInfo, "")
-	}
+	conditionGetters := make([]conditions.Getter, len(ownedMachines))

-	if !conditions.Has(tcp, controlplanev1.MachinesBootstrapped) {
-		conditions.MarkFalse(tcp, controlplanev1.MachinesBootstrapped, controlplanev1.WaitingForMachinesReason, clusterv1.ConditionSeverityInfo, "")
+	for i := range ownedMachines {
+		conditionGetters[i] = &ownedMachines[i]
 	}

-	switch {
-	// We are creating the first replica
-	case numMachines < desiredReplicas && numMachines == 0:
-		// Create new Machine w/ init
-		logger.Info("initializing control plane", "Desired", desiredReplicas, "Existing", numMachines)
-
-		return r.bootControlPlane(ctx, cluster, tcp, controlPlane, true)
-	// We are scaling up
-	case numMachines < desiredReplicas && numMachines > 0:
-		conditions.MarkFalse(tcp, controlplanev1.ResizedCondition, controlplanev1.ScalingUpReason, clusterv1.ConditionSeverityWarning,
-			"Scaling up control plane to %d replicas (actual %d)",
-			desiredReplicas, numMachines)
-
-		// Create a new Machine w/ join
-		logger.Info("scaling up control plane", "Desired", desiredReplicas, "Existing", numMachines)
-
-		return r.bootControlPlane(ctx, cluster, tcp, controlPlane, false)
-	// We are scaling down
-	case numMachines > desiredReplicas:
-		conditions.MarkFalse(tcp, controlplanev1.ResizedCondition, controlplanev1.ScalingDownReason, clusterv1.ConditionSeverityWarning,
-			"Scaling down control plane to %d replicas (actual %d)",
-			desiredReplicas, numMachines)
-
-		if numMachines == 1 {
-			conditions.MarkFalse(tcp, controlplanev1.ResizedCondition, controlplanev1.ScalingDownReason, clusterv1.ConditionSeverityError,
-				"Cannot scale down control plane nodes to 0",
-				desiredReplicas, numMachines)
-
-			return res, nil
-		}
-
-		if err := r.ensureNodesBooted(ctx, cluster, ownedMachines); err != nil {
-			logger.Info("waiting for all nodes to finish boot sequence", "error", err)
-
-			return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
-		}
-
-		if !conditions.IsTrue(tcp, controlplanev1.EtcdClusterHealthyCondition) {
-			logger.Info("waiting for etcd to become healthy before scaling down")
-
-			return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
-		}
+	conditions.SetAggregate(tcp, controlplanev1.MachinesReadyCondition, conditionGetters, conditions.AddSourceRef(), conditions.WithStepCounterIf(false))

-		logger.Info("scaling down control plane", "Desired", desiredReplicas, "Existing", numMachines)
+	var (
+		errs        error
+		result      ctrl.Result
+		phaseResult ctrl.Result
+	)

-		res, err = r.scaleDownControlPlane(ctx, util.ObjectKey(cluster), controlPlane.TCP.Name, ownedMachines)
+	// Run each reconcile phase in turn, aggregating errors and requeueing with the lowest non-zero RequeueAfter.
+	for _, phase := range []func(context.Context, *clusterv1.Cluster, *controlplanev1.TalosControlPlane, []clusterv1.Machine) (ctrl.Result, error){
+		r.reconcileEtcdMembers,
+		r.reconcileNodeHealth,
+		r.reconcileConditions,
+		r.reconcileKubeconfig,
+		r.reconcileMachines,
+	} {
+		phaseResult, err = phase(ctx, cluster, tcp, ownedMachines)
 		if err != nil {
-			if res.Requeue || res.RequeueAfter > 0 {
-				logger.Info("failed to scale down control plane", "error", err)
-
-				return res, nil
-			}
-		}
-
-		return res, err
-	default:
-		if !tcp.Status.Bootstrapped && reflect.ValueOf(tcp.Spec.ControlPlaneConfig.InitConfig).IsZero() {
-			if err := r.bootstrapCluster(ctx, cluster, ownedMachines); err != nil {
-				conditions.MarkFalse(tcp, controlplanev1.MachinesBootstrapped, controlplanev1.WaitingForTalosBootReason, clusterv1.ConditionSeverityInfo, err.Error())
-
-				logger.Info("bootstrap failed, retrying in 20 seconds", "error", err)
-
-				return ctrl.Result{RequeueAfter: time.Second * 20}, nil
-			}
-
-			conditions.MarkTrue(tcp, controlplanev1.MachinesBootstrapped)
-
-			tcp.Status.Bootstrapped = true
+			errs = kerrors.NewAggregate([]error{errs, err})
 		}

-		if conditions.Has(tcp, controlplanev1.MachinesReadyCondition) {
-			conditions.MarkTrue(tcp, controlplanev1.ResizedCondition)
-		}
-
-		conditions.MarkTrue(tcp, controlplanev1.MachinesCreatedCondition)
+		result = util.LowestNonZeroResult(result, phaseResult)
 	}

-	// Generate Cluster Kubeconfig if needed
-	if result, err := r.reconcileKubeconfig(ctx, util.ObjectKey(cluster), cluster.Spec.ControlPlaneEndpoint, tcp); !result.IsZero() || err != nil {
-		logger.Error(err, "failed to reconcile Kubeconfig")
-		return result, err
-	}
-
-	return ctrl.Result{Requeue: requeue}, nil
+	return result, errs
 }

 // ClusterToTalosControlPlane is a handler.ToRequestsFunc 
to be used to enqueue requests for reconciliation @@ -408,7 +319,7 @@ func (r *TalosControlPlaneReconciler) scaleDownControlPlane(ctx context.Context, deleteMachine := machines[0] for _, machine := range machines { if !machine.ObjectMeta.DeletionTimestamp.IsZero() { - r.Log.Info("Machine is in process of deletion", "machine", machine.Name) + r.Log.Info("machine is in process of deletion", "machine", machine.Name) node, err := kubeclient.CoreV1().Nodes().Get(ctx, machine.Status.NodeRef.Name, metav1.GetOptions{}) if err != nil { @@ -432,7 +343,7 @@ func (r *TalosControlPlaneReconciler) scaleDownControlPlane(ctx context.Context, // do not allow scaling down until all nodes have nodeRefs if machine.Status.NodeRef == nil { - r.Log.Info("One of machines does not have NodeRef", "machine", machine.Name) + r.Log.Info("one of machines does not have NodeRef", "machine", machine.Name) return ctrl.Result{RequeueAfter: 10 * time.Second}, nil } @@ -460,7 +371,7 @@ func (r *TalosControlPlaneReconciler) scaleDownControlPlane(ctx context.Context, return ctrl.Result{}, err } - r.Log.Info("Deleting machine", "machine", deleteMachine.Name, "node", node.Name) + r.Log.Info("deleting machine", "machine", deleteMachine.Name, "node", node.Name) err = r.Client.Delete(ctx, &deleteMachine) if err != nil { @@ -487,7 +398,7 @@ func (r *TalosControlPlaneReconciler) scaleDownControlPlane(ctx context.Context, // NB: We shutdown the node here so that a loadbalancer will drop the backend. // The Kubernetes API server is configured to talk to etcd on localhost, but // at this point etcd has been stopped. - r.Log.Info("Shutting down node", "machine", deleteMachine.Name, "node", node.Name) + r.Log.Info("shutting down node", "machine", deleteMachine.Name, "node", node.Name) err = c.Shutdown(ctx) if err != nil { @@ -495,7 +406,7 @@ func (r *TalosControlPlaneReconciler) scaleDownControlPlane(ctx context.Context, } } - r.Log.Info("Deleting node", "machine", deleteMachine.Name, "node", node.Name) + r.Log.Info("deleting node", "machine", deleteMachine.Name, "node", node.Name) err = kubeclient.CoreV1().Nodes().Delete(ctx, node.Name, metav1.DeleteOptions{}) if err != nil { @@ -523,7 +434,6 @@ func (r *TalosControlPlaneReconciler) getControlPlaneMachinesForCluster(ctx cont } return machineList.Items, nil - } // getFailureDomain will return a slice of failure domains from the cluster status. 
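Note: the new reconcile() above merges per-phase results with util.LowestNonZeroResult from sigs.k8s.io/cluster-api/util, so the most urgent phase decides when the controller runs again, while kerrors.NewAggregate keeps every phase's error visible instead of letting one failure mask another. A minimal, self-contained sketch of the merge semantics; lowestNonZero below is an illustrative stand-in, not the cluster-api implementation:

package main

import (
	"fmt"
	"time"
)

// phaseResult mirrors the two ctrl.Result fields that matter when merging.
type phaseResult struct {
	Requeue      bool
	RequeueAfter time.Duration
}

// lowestNonZero approximates util.LowestNonZeroResult: an explicit Requeue
// wins outright, otherwise the smallest non-zero RequeueAfter does.
func lowestNonZero(i, j phaseResult) phaseResult {
	switch {
	case i.Requeue:
		return i
	case j.Requeue:
		return j
	case i.RequeueAfter > 0 && (j.RequeueAfter <= 0 || i.RequeueAfter < j.RequeueAfter):
		return i
	default:
		return j
	}
}

func main() {
	// Hypothetical per-phase outcomes: one healthy phase, one slow retry,
	// and one fast retry (e.g. an etcd audit that wants another look soon).
	results := []phaseResult{
		{},
		{RequeueAfter: 20 * time.Second},
		{RequeueAfter: 10 * time.Second},
	}

	var merged phaseResult
	for _, r := range results {
		merged = lowestNonZero(merged, r)
	}

	fmt.Println(merged.RequeueAfter) // 10s: the most urgent phase wins
}

The zero result is treated as "no opinion", which is why a healthy phase never stretches the deadline requested by a phase that asked to be requeued sooner.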
@@ -749,94 +659,79 @@ func (r *TalosControlPlaneReconciler) updateStatus(ctx context.Context, tcp *con kubeclient, err := r.kubeconfigForCluster(ctx, util.ObjectKey(cluster)) if err != nil { - return err + r.Log.Info("failed to get kubeconfig for the cluster", "error", err) + + return nil } defer kubeclient.Close() //nolint:errcheck - conditionGetters := make([]conditions.Getter, len(ownedMachines)) - - for i, v := range ownedMachines { - conditionGetters[i] = &v + nodeSelector := labels.NewSelector() + req, err := labels.NewRequirement(constants.LabelNodeRoleMaster, selection.Exists, []string{}) + if err != nil { + return err } - errChan := make(chan error) - - for _, ownedMachine := range ownedMachines { - ownedMachine := ownedMachine - - go func() { - e := func() error { - if clusterv1.MachinePhase(ownedMachine.Status.Phase) == clusterv1.MachinePhaseDeleting { - return fmt.Errorf("machine is deleting") - } - - if ownedMachine.Status.NodeRef == nil { - return fmt.Errorf("machine %q does not have a noderef", ownedMachine.Name) - } - - node, err := kubeclient.CoreV1().Nodes().Get(ctx, ownedMachine.Status.NodeRef.Name, metav1.GetOptions{}) - if err != nil { - return fmt.Errorf("failed to get node %q: %w", node.Name, err) - } - - for _, condition := range node.Status.Conditions { - if condition.Type == corev1.NodeReady && condition.Status == corev1.ConditionTrue { - return nil - } - } - - return fmt.Errorf("node ready condition not found") - }() + nodes, err := kubeclient.CoreV1().Nodes().List(ctx, metav1.ListOptions{ + LabelSelector: nodeSelector.Add(*req).String(), + }) - if e != nil { - e = fmt.Errorf("failed to get status for %q: %w", ownedMachine.Name, e) - } + if err != nil { + r.Log.Info("failed to list controlplane nodes", "error", err) - errChan <- e - }() + return nil } - for range ownedMachines { - err = <-errChan - if err == nil { + for _, node := range nodes.Items { + if util.IsNodeReady(&node) { tcp.Status.ReadyReplicas++ - } else { - r.Log.Info("Failed to get readiness of machine", "err", err) } } tcp.Status.UnavailableReplicas = replicas - tcp.Status.ReadyReplicas + if len(nodes.Items) > 0 { + tcp.Status.Initialized = true + conditions.MarkTrue(tcp, controlplanev1.AvailableCondition) + } + if tcp.Status.ReadyReplicas > 0 { - r.Log.Info("Ready replicas", "count", tcp.Status.ReadyReplicas) tcp.Status.Ready = true } - // We consider ourselves "initialized" if the workload cluster returns any number of nodes. - // We also do not return client list errors (just log them) as it's expected that it will fail - // for a while until the cluster is up. 
- nodeList, err := kubeclient.CoreV1().Nodes().List(ctx, metav1.ListOptions{}) - if err == nil { - if len(nodeList.Items) > 0 { - tcp.Status.Initialized = true + r.Log.Info("ready replicas", "count", tcp.Status.ReadyReplicas) - conditions.MarkTrue(tcp, controlplanev1.AvailableCondition) - } - } else { - r.Log.Error(err, "failed attempt to contact workload cluster") + return nil +} + +func (r *TalosControlPlaneReconciler) reconcileExternalReference(ctx context.Context, ref corev1.ObjectReference, cluster *clusterv1.Cluster) error { + obj, err := external.Get(ctx, r.Client, &ref, cluster.Namespace) + if err != nil { + return err } - conditions.SetAggregate(tcp, controlplanev1.MachinesReadyCondition, conditionGetters, conditions.AddSourceRef(), conditions.WithStepCounterIf(false)) + objPatchHelper, err := patch.NewHelper(obj, r.Client) + if err != nil { + return err + } - return nil + obj.SetOwnerReferences(util.EnsureOwnerRef(obj.GetOwnerReferences(), metav1.OwnerReference{ + APIVersion: clusterv1.GroupVersion.String(), + Kind: "Cluster", + Name: cluster.Name, + UID: cluster.UID, + })) + + return objPatchHelper.Patch(ctx, obj) } -func (r *TalosControlPlaneReconciler) reconcileKubeconfig(ctx context.Context, clusterName client.ObjectKey, endpoint clusterv1.APIEndpoint, kcp *controlplanev1.TalosControlPlane) (ctrl.Result, error) { +func (r *TalosControlPlaneReconciler) reconcileKubeconfig(ctx context.Context, cluster *clusterv1.Cluster, tcp *controlplanev1.TalosControlPlane, machines []clusterv1.Machine) (ctrl.Result, error) { + endpoint := cluster.Spec.ControlPlaneEndpoint if endpoint.IsZero() { return ctrl.Result{}, nil } + clusterName := util.ObjectKey(cluster) _, err := secret.GetFromNamespacedName(ctx, r.Client, clusterName, secret.Kubeconfig) switch { case apierrors.IsNotFound(err): @@ -845,11 +740,11 @@ func (r *TalosControlPlaneReconciler) reconcileKubeconfig(ctx context.Context, c r.Client, clusterName, endpoint.String(), - *metav1.NewControllerRef(kcp, controlplanev1.GroupVersion.WithKind("TalosControlPlane")), + *metav1.NewControllerRef(tcp, controlplanev1.GroupVersion.WithKind("TalosControlPlane")), ) if createErr != nil { if errors.Is(createErr, kubeconfig.ErrDependentCertificateNotFound) { - r.Log.Info("Could not find secret", "secret", secret.ClusterCA, "cluster", clusterName.Name, "namespace", clusterName.Namespace) + r.Log.Info("could not find secret", "secret", secret.ClusterCA, "cluster", clusterName.Name, "namespace", clusterName.Namespace) return ctrl.Result{RequeueAfter: 20 * time.Second}, nil } @@ -857,31 +752,152 @@ func (r *TalosControlPlaneReconciler) reconcileKubeconfig(ctx context.Context, c return ctrl.Result{}, createErr } case err != nil: - return ctrl.Result{}, errors.Wrapf(err, "failed to retrieve kubeconfig Secret for Cluster %q in namespace %q", clusterName.Name, clusterName.Namespace) + return ctrl.Result{RequeueAfter: 20 * time.Second}, errors.Wrapf(err, "failed to retrieve kubeconfig Secret for Cluster %q in namespace %q", clusterName.Name, clusterName.Namespace) } return ctrl.Result{}, nil } -func (r *TalosControlPlaneReconciler) addClusterOwnerToObj(ctx context.Context, ref corev1.ObjectReference, cluster *clusterv1.Cluster) error { - obj, err := external.Get(ctx, r.Client, &ref, cluster.Namespace) - if err != nil { - return err +func (r *TalosControlPlaneReconciler) reconcileEtcdMembers(ctx context.Context, cluster *clusterv1.Cluster, tcp *controlplanev1.TalosControlPlane, machines []clusterv1.Machine) (result ctrl.Result, err error) { + var errs error 
+ // Audit the etcd member list to remove any nodes that no longer exist + if err := r.auditEtcd(ctx, util.ObjectKey(cluster), tcp.Name); err != nil { + errs = kerrors.NewAggregate([]error{errs, err}) } - objPatchHelper, err := patch.NewHelper(obj, r.Client) - if err != nil { - return err + if err := r.etcdHealthcheck(ctx, cluster, machines); err != nil { + conditions.MarkFalse(tcp, controlplanev1.EtcdClusterHealthyCondition, controlplanev1.EtcdClusterUnhealthyReason, + clusterv1.ConditionSeverityWarning, err.Error()) + errs = kerrors.NewAggregate([]error{errs, err}) + } else { + conditions.MarkTrue(tcp, controlplanev1.EtcdClusterHealthyCondition) } - obj.SetOwnerReferences(util.EnsureOwnerRef(obj.GetOwnerReferences(), metav1.OwnerReference{ - APIVersion: clusterv1.GroupVersion.String(), - Kind: "Cluster", - Name: cluster.Name, - UID: cluster.UID, - })) + if errs != nil { + return ctrl.Result{RequeueAfter: 10 * time.Second}, errs + } - return objPatchHelper.Patch(ctx, obj) + return ctrl.Result{}, nil +} + +func (r *TalosControlPlaneReconciler) reconcileNodeHealth(ctx context.Context, cluster *clusterv1.Cluster, tcp *controlplanev1.TalosControlPlane, machines []clusterv1.Machine) (result ctrl.Result, err error) { + if err := r.nodesHealthcheck(ctx, cluster, machines); err != nil { + reason := controlplanev1.ControlPlaneComponentsInspectionFailedReason + + if errors.Is(err, &errServiceUnhealthy{}) { + reason = controlplanev1.ControlPlaneComponentsUnhealthyReason + } + + conditions.MarkFalse(tcp, controlplanev1.ControlPlaneComponentsHealthyCondition, reason, + clusterv1.ConditionSeverityWarning, err.Error()) + + return ctrl.Result{RequeueAfter: 10 * time.Second}, err + } else { + conditions.MarkTrue(tcp, controlplanev1.ControlPlaneComponentsHealthyCondition) + } + + return ctrl.Result{}, nil +} + +func (r *TalosControlPlaneReconciler) reconcileConditions(ctx context.Context, cluster *clusterv1.Cluster, tcp *controlplanev1.TalosControlPlane, machines []clusterv1.Machine) (result ctrl.Result, err error) { + if !conditions.Has(tcp, controlplanev1.AvailableCondition) { + conditions.MarkFalse(tcp, controlplanev1.AvailableCondition, controlplanev1.WaitingForTalosBootReason, clusterv1.ConditionSeverityInfo, "") + } + + if !conditions.Has(tcp, controlplanev1.MachinesBootstrapped) { + conditions.MarkFalse(tcp, controlplanev1.MachinesBootstrapped, controlplanev1.WaitingForMachinesReason, clusterv1.ConditionSeverityInfo, "") + } + + return ctrl.Result{}, nil +} + +func (r *TalosControlPlaneReconciler) reconcileMachines(ctx context.Context, cluster *clusterv1.Cluster, tcp *controlplanev1.TalosControlPlane, machines []clusterv1.Machine) (res ctrl.Result, err error) { + logger := r.Log.WithValues("namespace", tcp.Namespace, "talosControlPlane", tcp.Name) + + // If we've made it this far, we can assume that all ownedMachines are up to date + numMachines := len(machines) + desiredReplicas := int(*tcp.Spec.Replicas) + + controlPlane := newControlPlane(cluster, tcp, machines) + + switch { + // We are creating the first replica + case numMachines < desiredReplicas && numMachines == 0: + // Create new Machine w/ init + logger.Info("initializing control plane", "Desired", desiredReplicas, "Existing", numMachines) + + return r.bootControlPlane(ctx, cluster, tcp, controlPlane, true) + // We are scaling up + case numMachines < desiredReplicas && numMachines > 0: + conditions.MarkFalse(tcp, controlplanev1.ResizedCondition, controlplanev1.ScalingUpReason, clusterv1.ConditionSeverityWarning, + "Scaling up control plane 
to %d replicas (actual %d)",
+			desiredReplicas, numMachines)
+
+		// Create a new Machine w/ join
+		logger.Info("scaling up control plane", "Desired", desiredReplicas, "Existing", numMachines)
+
+		return r.bootControlPlane(ctx, cluster, tcp, controlPlane, false)
+	// We are scaling down
+	case numMachines > desiredReplicas:
+		conditions.MarkFalse(tcp, controlplanev1.ResizedCondition, controlplanev1.ScalingDownReason, clusterv1.ConditionSeverityWarning,
+			"Scaling down control plane to %d replicas (actual %d)",
+			desiredReplicas, numMachines)
+
+		if numMachines == 1 {
+			conditions.MarkFalse(tcp, controlplanev1.ResizedCondition, controlplanev1.ScalingDownReason, clusterv1.ConditionSeverityError,
+				"Cannot scale down control plane to %d replicas (actual %d)",
+				desiredReplicas, numMachines)
+
+			return res, nil
+		}
+
+		if err := r.ensureNodesBooted(ctx, cluster, machines); err != nil {
+			logger.Info("waiting for all nodes to finish boot sequence", "error", err)
+
+			return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
+		}
+
+		if !conditions.IsTrue(tcp, controlplanev1.EtcdClusterHealthyCondition) {
+			logger.Info("waiting for etcd to become healthy before scaling down")
+
+			return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
+		}
+
+		logger.Info("scaling down control plane", "Desired", desiredReplicas, "Existing", numMachines)
+
+		res, err = r.scaleDownControlPlane(ctx, util.ObjectKey(cluster), controlPlane.TCP.Name, machines)
+		if err != nil {
+			if res.Requeue || res.RequeueAfter > 0 {
+				logger.Info("failed to scale down control plane", "error", err)
+
+				return res, nil
+			}
+		}
+
+		return res, err
+	default:
+		if !tcp.Status.Bootstrapped && reflect.ValueOf(tcp.Spec.ControlPlaneConfig.InitConfig).IsZero() {
+			if err := r.bootstrapCluster(ctx, cluster, machines); err != nil {
+				conditions.MarkFalse(tcp, controlplanev1.MachinesBootstrapped, controlplanev1.WaitingForTalosBootReason, clusterv1.ConditionSeverityInfo, err.Error())
+
+				logger.Info("bootstrap failed, retrying in 20 seconds", "error", err)
+
+				return ctrl.Result{RequeueAfter: time.Second * 20}, nil
+			}
+
+			conditions.MarkTrue(tcp, controlplanev1.MachinesBootstrapped)
+
+			tcp.Status.Bootstrapped = true
+		}
+
+		if conditions.Has(tcp, controlplanev1.MachinesReadyCondition) {
+			conditions.MarkTrue(tcp, controlplanev1.ResizedCondition)
+		}
+
+		conditions.MarkTrue(tcp, controlplanev1.MachinesCreatedCondition)
+	}
+
+	return ctrl.Result{}, nil
 }

 func patchTalosControlPlane(ctx context.Context, patchHelper *patch.Helper, tcp *controlplanev1.TalosControlPlane, opts ...patch.Option) error {
diff --git a/internal/integration/integration_test.go b/internal/integration/integration_test.go
index 2055eb5..58c6625 100644
--- a/internal/integration/integration_test.go
+++ b/internal/integration/integration_test.go
@@ -21,6 +21,7 @@ import (
 	"github.com/talos-systems/go-retry/retry"
 	machineapi "github.com/talos-systems/talos/pkg/machinery/api/machine"
 	"gopkg.in/yaml.v3"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
@@ -278,7 +279,18 @@ func (suite *IntegrationSuite) Test04ScaleControlPlaneNoWait() {
 	suite.cluster.Scale(ctx, 3, capi.ControlPlaneNodes) //nolint:errcheck

-	err := suite.cluster.Scale(suite.ctx, 1, capi.ControlPlaneNodes)
+	err := retry.Constant(time.Second*10, retry.WithUnits(time.Second)).Retry(func() error {
+		if err := suite.cluster.Scale(suite.ctx, 1, capi.ControlPlaneNodes); err != nil {
+			if 
apierrors.IsConflict(err) { + return retry.ExpectedError(err) + } + + return err + } + + return nil + }) + suite.Require().NoError(err) }
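Note: the integration-test change wraps Scale in a retry loop because the controller may be patching the TalosControlPlane object concurrently, so 409 Conflict responses from the apiserver are expected rather than fatal. A sketch of the same pattern, assuming the talos-systems/go-retry API used in the test; scaleOnce is a hypothetical stand-in for suite.cluster.Scale:

package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/talos-systems/go-retry/retry"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

// scaleOnce simulates an operation that hits 409 Conflict twice before
// succeeding, the way an update to a controller-owned object often does.
func scaleOnce() func() error {
	attempts := 0
	return func() error {
		attempts++
		if attempts < 3 {
			return apierrors.NewConflict(
				schema.GroupResource{Group: "controlplane.cluster.x-k8s.io", Resource: "taloscontrolplanes"},
				"talos-cp",
				errors.New("the object has been modified; please apply your changes to the latest version"))
		}
		return nil
	}
}

func main() {
	scale := scaleOnce()

	err := retry.Constant(10*time.Second, retry.WithUnits(time.Second)).Retry(func() error {
		if err := scale(); err != nil {
			if apierrors.IsConflict(err) {
				// Conflicts are expected while the controller is also
				// patching the object; mark them retryable.
				return retry.ExpectedError(err)
			}

			return err // anything else aborts the retry loop immediately
		}

		return nil
	})

	fmt.Println("scaled:", err == nil)
}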
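Note: the updateStatus hunk above replaces the per-machine goroutine readiness checks with a single List of control-plane nodes selected by label, counted with util.IsNodeReady. A self-contained sketch of the selector plus the readiness count; it assumes constants.LabelNodeRoleMaster resolves to node-role.kubernetes.io/master, and isNodeReady is an illustrative stand-in for the cluster-api helper:

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/selection"
)

// isNodeReady reports whether the node's NodeReady condition is True.
func isNodeReady(node *corev1.Node) bool {
	for _, c := range node.Status.Conditions {
		if c.Type == corev1.NodeReady {
			return c.Status == corev1.ConditionTrue
		}
	}

	return false
}

func main() {
	// Build the same selector the controller uses: any node carrying the
	// control-plane role label, regardless of its value.
	req, err := labels.NewRequirement("node-role.kubernetes.io/master", selection.Exists, nil)
	if err != nil {
		panic(err)
	}

	selector := labels.NewSelector().Add(*req)
	fmt.Println(selector.String()) // node-role.kubernetes.io/master

	// Count readiness from the listed nodes instead of issuing one API
	// round-trip per machine.
	nodes := []corev1.Node{
		{Status: corev1.NodeStatus{Conditions: []corev1.NodeCondition{{Type: corev1.NodeReady, Status: corev1.ConditionTrue}}}},
		{Status: corev1.NodeStatus{Conditions: []corev1.NodeCondition{{Type: corev1.NodeReady, Status: corev1.ConditionFalse}}}},
	}

	ready := 0
	for i := range nodes {
		if isNodeReady(&nodes[i]) {
			ready++
		}
	}

	fmt.Println("ready replicas:", ready) // ready replicas: 1
}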