From 04b05702289ac99f72964f170f77e935ef0b7f3c Mon Sep 17 00:00:00 2001 From: Artem Chernyshev Date: Fri, 15 Apr 2022 22:43:08 +0300 Subject: [PATCH] feat: support `TalosControlPlane` rolling upgrade Track Kubernetes version changes in `TalosControlPlane` resource. When changed recreate nodes one by one until all nodes have the desired Kubernetes version. The same happens when `infrastructureTemplate` is updated. Added new parameters to the `TalosControlPlane` resource which specify controlplanes rollout strategy and strategy configuration. Enable `Defaulter` webhook to populate all created/updated `TalosControlPlane` resources rollout strategy. Signed-off-by: Artem Chernyshev --- Dockerfile | 3 +- api/v1alpha3/conditions.go | 10 + api/v1alpha3/taloscontrolplane_types.go | 51 +++- api/v1alpha3/taloscontrolplane_webhook.go | 50 ++++ api/v1alpha3/zz_generated.deepcopy.go | 46 ++++ ...e.cluster.x-k8s.io_taloscontrolplanes.yaml | 28 +++ config/default/kustomization.yaml | 2 +- config/default/webhookcainjection_patch.yaml | 9 +- config/webhook/manifests.yaml | 29 +++ config/webhook/service.yaml | 2 +- controllers/controlplane.go | 125 +++++++++ controllers/etcd.go | 8 +- controllers/scale.go | 211 ++++++++++++++++ controllers/taloscontrolplane_controller.go | 237 +++--------------- controllers/upgrade.go | 46 ++++ go.mod | 2 + hack/release.toml | 10 +- hack/test/e2e-aws.sh | 4 +- internal/integration/integration_test.go | 72 +++++- 19 files changed, 723 insertions(+), 222 deletions(-) create mode 100644 controllers/controlplane.go create mode 100644 controllers/scale.go create mode 100644 controllers/upgrade.go diff --git a/Dockerfile b/Dockerfile index 99a4bb7..768e540 100644 --- a/Dockerfile +++ b/Dockerfile @@ -40,8 +40,7 @@ ARG NAME RUN --mount=type=cache,target=/.cache controller-gen crd:crdVersions=v1 paths="./api/..." output:crd:dir=config/crd/bases output:webhook:dir=config/webhook webhook RUN --mount=type=cache,target=/.cache controller-gen rbac:roleName=manager-role paths="./controllers/..." output:rbac:dir=config/rbac FROM scratch AS manifests -COPY --from=manifests-build /src/config/crd /config/crd -COPY --from=manifests-build /src/config/rbac /config/rbac +COPY --from=manifests-build /src/config /config FROM build AS generate-build RUN --mount=type=cache,target=/.cache controller-gen object:headerFile=./hack/boilerplate.go.txt paths="./..." diff --git a/api/v1alpha3/conditions.go b/api/v1alpha3/conditions.go index bdedd2b..780ee01 100644 --- a/api/v1alpha3/conditions.go +++ b/api/v1alpha3/conditions.go @@ -38,6 +38,16 @@ const ( InvalidControlPlaneConfigReason = "InvalidControlPlaneConfig" ) +const ( + // MachinesSpecUpToDateCondition documents that the spec of the machines controlled by the TalosControlPlane + // is up to date. When this condition is false, the TalosControlPlane is executing a rolling upgrade. + MachinesSpecUpToDateCondition clusterv1.ConditionType = "MachinesSpecUpToDate" + + // RollingUpdateInProgressReason (Severity=Warning) documents a TalosControlPlane object executing a + // rolling upgrade for aligning the machines spec to the desired state. + RollingUpdateInProgressReason = "RollingUpdateInProgress" +) + const ( // ResizedCondition documents a TalosControlPlane that is resizing the set of controlled machines. 
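A minimal sketch of how this feature is exercised from the consumer side, assuming the API types added further down in this patch (controlplanev1.TalosControlPlane, RolloutStrategy, RollingUpdate) and a generic controller-runtime client; the function name and target version are illustrative. Bumping spec.version (or pointing spec.infrastructureTemplate at a new template) is the change the controller reacts to by replacing machines one by one:

package example

import (
	"context"

	"k8s.io/apimachinery/pkg/util/intstr"
	"sigs.k8s.io/controller-runtime/pkg/client"

	controlplanev1 "github.com/talos-systems/cluster-api-control-plane-provider-talos/api/v1alpha3"
)

// triggerRollingUpgrade bumps the Kubernetes version on an existing
// TalosControlPlane; the controller picks up the spec change and replaces
// control plane machines one by one, surging by at most maxSurge machines.
func triggerRollingUpgrade(ctx context.Context, c client.Client, key client.ObjectKey, version string) error {
	var tcp controlplanev1.TalosControlPlane

	if err := c.Get(ctx, key, &tcp); err != nil {
		return err
	}

	maxSurge := intstr.FromInt(1)

	tcp.Spec.Version = version // e.g. "v1.23.5"
	tcp.Spec.RolloutStrategy = &controlplanev1.RolloutStrategy{
		Type:          controlplanev1.RollingUpdateStrategyType,
		RollingUpdate: &controlplanev1.RollingUpdate{MaxSurge: &maxSurge},
	}

	return c.Update(ctx, &tcp)
}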
ResizedCondition clusterv1.ConditionType = "Resized" diff --git a/api/v1alpha3/taloscontrolplane_types.go b/api/v1alpha3/taloscontrolplane_types.go index fcb991a..d2ec7d8 100644 --- a/api/v1alpha3/taloscontrolplane_types.go +++ b/api/v1alpha3/taloscontrolplane_types.go @@ -8,6 +8,7 @@ import ( cabptv1 "github.com/talos-systems/cluster-api-bootstrap-provider-talos/api/v1alpha3" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" ) @@ -21,6 +22,15 @@ type ControlPlaneConfig struct { ControlPlaneConfig cabptv1.TalosConfigSpec `json:"controlplane"` } +// RolloutStrategyType defines the rollout strategies for a KubeadmControlPlane. +type RolloutStrategyType string + +const ( + // RollingUpdateStrategyType replaces the old control planes by new one using rolling update + // i.e. gradually scale up or down the old control planes and scale up or down the new one. + RollingUpdateStrategyType RolloutStrategyType = "RollingUpdate" +) + // TalosControlPlaneSpec defines the desired state of TalosControlPlane type TalosControlPlaneSpec struct { // Number of desired machines. Defaults to 1. When stacked etcd is used only @@ -41,6 +51,39 @@ type TalosControlPlaneSpec struct { // ControlPlaneConfig is a two TalosConfigSpecs // to use for initializing and joining machines to the control plane. ControlPlaneConfig ControlPlaneConfig `json:"controlPlaneConfig"` + + // The RolloutStrategy to use to replace control plane machines with + // new ones. + // +optional + // +kubebuilder:default={type: "RollingUpdate", rollingUpdate: {maxSurge: 1}} + RolloutStrategy *RolloutStrategy `json:"rolloutStrategy,omitempty"` +} + +// RolloutStrategy describes how to replace existing machines +// with new ones. +type RolloutStrategy struct { + // Rolling update config params. Present only if + // RolloutStrategyType = RollingUpdate. + // +optional + RollingUpdate *RollingUpdate `json:"rollingUpdate,omitempty"` + + // Type of rollout. Currently the only supported strategy is + // "RollingUpdate". + // Default is RollingUpdate. + // +optional + Type RolloutStrategyType `json:"type,omitempty"` +} + +// RollingUpdate is used to control the desired behavior of rolling update. +type RollingUpdate struct { + // The maximum number of control planes that can be scheduled above or under the + // desired number of control planes. + // Value can be an absolute number 1 or 0. + // Defaults to 1. + // Example: when this is set to 1, the control plane can be scaled + // up immediately when the rolling update starts. + // +optional + MaxSurge *intstr.IntOrString `json:"maxSurge,omitempty"` } // TalosControlPlaneStatus defines the observed state of TalosControlPlane @@ -126,13 +169,13 @@ type TalosControlPlane struct { } // GetConditions returns the set of conditions for this object. -func (in *TalosControlPlane) GetConditions() clusterv1.Conditions { - return in.Status.Conditions +func (r *TalosControlPlane) GetConditions() clusterv1.Conditions { + return r.Status.Conditions } // SetConditions sets the conditions on this object. 
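To make the maxSurge semantics concrete, here is a small self-contained sketch that restates the decision the new rollout code makes (plain ints stand in for *intstr.IntOrString and for the machine collection): the ceiling during a rollout is replicas plus maxSurge, so maxSurge: 1 surges a fresh machine before deleting an outdated one, while maxSurge: 0 deletes first and never exceeds the desired replica count.

package main

import "fmt"

// rolloutStep mirrors the rolling-update decision: scale up while the machine
// count is below replicas+maxSurge, otherwise delete one outdated machine.
func rolloutStep(currentMachines, replicas, maxSurge int) string {
	maxNodes := replicas + maxSurge

	if currentMachines < maxNodes {
		return "scale up: create one machine with the new spec"
	}

	return "scale down: delete one outdated machine"
}

func main() {
	// replicas=3, maxSurge=1: surge to 4 machines, then remove outdated ones.
	fmt.Println(rolloutStep(3, 3, 1)) // scale up
	fmt.Println(rolloutStep(4, 3, 1)) // scale down

	// replicas=3, maxSurge=0: stay at 3, so an outdated machine goes first.
	fmt.Println(rolloutStep(3, 3, 0)) // scale down
}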
-func (in *TalosControlPlane) SetConditions(conditions clusterv1.Conditions) { - in.Status.Conditions = conditions +func (r *TalosControlPlane) SetConditions(conditions clusterv1.Conditions) { + r.Status.Conditions = conditions } // +kubebuilder:object:root=true diff --git a/api/v1alpha3/taloscontrolplane_webhook.go b/api/v1alpha3/taloscontrolplane_webhook.go index d901798..3e9862c 100644 --- a/api/v1alpha3/taloscontrolplane_webhook.go +++ b/api/v1alpha3/taloscontrolplane_webhook.go @@ -1,11 +1,61 @@ package v1alpha3 import ( + "strings" + + "k8s.io/apimachinery/pkg/util/intstr" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/webhook" ) +// SetupWebhookWithManager implements webhook methods. func (r *TalosControlPlane) SetupWebhookWithManager(mgr ctrl.Manager) error { return ctrl.NewWebhookManagedBy(mgr). For(r). Complete() } + +// +kubebuilder:webhook:verbs=create;update,path=/mutate-controlplane-cluster-x-k8s-io-v1beta1-kubeadmcontrolplane,mutating=true,failurePolicy=fail,matchPolicy=Equivalent,groups=controlplane.cluster.x-k8s.io,resources=kubeadmcontrolplanes,versions=v1beta1,name=default.kubeadmcontrolplane.controlplane.cluster.x-k8s.io,sideEffects=None,admissionReviewVersions=v1;v1beta1 + +var _ webhook.Defaulter = &TalosControlPlane{} + +// Default implements webhook.Defaulter so a webhook will be registered for the type. +func (r *TalosControlPlane) Default() { + defaultTalosControlPlaneSpec(&r.Spec, r.Namespace) +} + +func defaultTalosControlPlaneSpec(s *TalosControlPlaneSpec, namespace string) { + if s.Replicas == nil { + replicas := int32(1) + s.Replicas = &replicas + } + + if !strings.HasPrefix(s.Version, "v") { + s.Version = "v" + s.Version + } + + s.RolloutStrategy = defaultRolloutStrategy(s.RolloutStrategy) +} + +func defaultRolloutStrategy(rolloutStrategy *RolloutStrategy) *RolloutStrategy { + ios1 := intstr.FromInt(1) + + if rolloutStrategy == nil { + rolloutStrategy = &RolloutStrategy{} + } + + // Enforce RollingUpdate strategy and default MaxSurge if not set. + if rolloutStrategy != nil { + if len(rolloutStrategy.Type) == 0 { + rolloutStrategy.Type = RollingUpdateStrategyType + } + if rolloutStrategy.Type == RollingUpdateStrategyType { + if rolloutStrategy.RollingUpdate == nil { + rolloutStrategy.RollingUpdate = &RollingUpdate{} + } + rolloutStrategy.RollingUpdate.MaxSurge = intstr.ValueOrDefault(rolloutStrategy.RollingUpdate.MaxSurge, ios1) + } + } + + return rolloutStrategy +} diff --git a/api/v1alpha3/zz_generated.deepcopy.go b/api/v1alpha3/zz_generated.deepcopy.go index ece64d2..19357ea 100644 --- a/api/v1alpha3/zz_generated.deepcopy.go +++ b/api/v1alpha3/zz_generated.deepcopy.go @@ -11,6 +11,7 @@ package v1alpha3 import ( runtime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/intstr" "sigs.k8s.io/cluster-api/api/v1beta1" ) @@ -31,6 +32,46 @@ func (in *ControlPlaneConfig) DeepCopy() *ControlPlaneConfig { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RollingUpdate) DeepCopyInto(out *RollingUpdate) { + *out = *in + if in.MaxSurge != nil { + in, out := &in.MaxSurge, &out.MaxSurge + *out = new(intstr.IntOrString) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RollingUpdate. 
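A quick illustration of what the new Defaulter produces, assuming only the types and the Default() method added in this patch: an object created with a bare, unprefixed version ends up with one replica, a v-prefixed version, and a RollingUpdate strategy with maxSurge set to 1.

package main

import (
	"fmt"

	controlplanev1 "github.com/talos-systems/cluster-api-control-plane-provider-talos/api/v1alpha3"
)

func main() {
	tcp := &controlplanev1.TalosControlPlane{}
	tcp.Spec.Version = "1.23.5" // no "v" prefix on purpose

	// The mutating webhook calls Default() on every create and update.
	tcp.Default()

	fmt.Println(*tcp.Spec.Replicas)                                         // 1
	fmt.Println(tcp.Spec.Version)                                           // v1.23.5
	fmt.Println(tcp.Spec.RolloutStrategy.Type)                              // RollingUpdate
	fmt.Println(tcp.Spec.RolloutStrategy.RollingUpdate.MaxSurge.IntValue()) // 1
}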
+func (in *RollingUpdate) DeepCopy() *RollingUpdate { + if in == nil { + return nil + } + out := new(RollingUpdate) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RolloutStrategy) DeepCopyInto(out *RolloutStrategy) { + *out = *in + if in.RollingUpdate != nil { + in, out := &in.RollingUpdate, &out.RollingUpdate + *out = new(RollingUpdate) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RolloutStrategy. +func (in *RolloutStrategy) DeepCopy() *RolloutStrategy { + if in == nil { + return nil + } + out := new(RolloutStrategy) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *TalosControlPlane) DeepCopyInto(out *TalosControlPlane) { *out = *in @@ -100,6 +141,11 @@ func (in *TalosControlPlaneSpec) DeepCopyInto(out *TalosControlPlaneSpec) { } out.InfrastructureTemplate = in.InfrastructureTemplate in.ControlPlaneConfig.DeepCopyInto(&out.ControlPlaneConfig) + if in.RolloutStrategy != nil { + in, out := &in.RolloutStrategy, &out.RolloutStrategy + *out = new(RolloutStrategy) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TalosControlPlaneSpec. diff --git a/config/crd/bases/controlplane.cluster.x-k8s.io_taloscontrolplanes.yaml b/config/crd/bases/controlplane.cluster.x-k8s.io_taloscontrolplanes.yaml index 249bec7..f9f40b2 100644 --- a/config/crd/bases/controlplane.cluster.x-k8s.io_taloscontrolplanes.yaml +++ b/config/crd/bases/controlplane.cluster.x-k8s.io_taloscontrolplanes.yaml @@ -183,6 +183,34 @@ spec: This is a pointer to distinguish between explicit zero and not specified. format: int32 type: integer + rolloutStrategy: + default: + rollingUpdate: + maxSurge: 1 + type: RollingUpdate + description: The RolloutStrategy to use to replace control plane machines + with new ones. + properties: + rollingUpdate: + description: Rolling update config params. Present only if RolloutStrategyType + = RollingUpdate. + properties: + maxSurge: + anyOf: + - type: integer + - type: string + description: 'The maximum number of control planes that can + be scheduled above or under the desired number of control + planes. Value can be an absolute number 1 or 0. Defaults + to 1. Example: when this is set to 1, the control plane + can be scaled up immediately when the rolling update starts.' + x-kubernetes-int-or-string: true + type: object + type: + description: Type of rollout. Currently the only supported strategy + is "RollingUpdate". Default is RollingUpdate. + type: string + type: object version: description: Version defines the desired Kubernetes version. 
minLength: 2 diff --git a/config/default/kustomization.yaml b/config/default/kustomization.yaml index b488b5d..6db1a47 100644 --- a/config/default/kustomization.yaml +++ b/config/default/kustomization.yaml @@ -18,7 +18,7 @@ bases: patchesStrategicMerge: - manager_webhook_patch.yaml -# - webhookcainjection_patch.yaml + - webhookcainjection_patch.yaml vars: - name: CERTIFICATE_NAMESPACE # namespace of the certificate CR diff --git a/config/default/webhookcainjection_patch.yaml b/config/default/webhookcainjection_patch.yaml index 7e79bf9..28b1826 100644 --- a/config/default/webhookcainjection_patch.yaml +++ b/config/default/webhookcainjection_patch.yaml @@ -1,15 +1,8 @@ # This patch add annotation to admission webhook config and # the variables $(CERTIFICATE_NAMESPACE) and $(CERTIFICATE_NAME) will be substituted by kustomize. -apiVersion: admissionregistration.k8s.io/v1beta1 +apiVersion: admissionregistration.k8s.io/v1 kind: MutatingWebhookConfiguration metadata: name: mutating-webhook-configuration annotations: cert-manager.io/inject-ca-from: $(CERTIFICATE_NAMESPACE)/$(CERTIFICATE_NAME) ---- -apiVersion: admissionregistration.k8s.io/v1beta1 -kind: ValidatingWebhookConfiguration -metadata: - name: validating-webhook-configuration - annotations: - cert-manager.io/inject-ca-from: $(CERTIFICATE_NAMESPACE)/$(CERTIFICATE_NAME) diff --git a/config/webhook/manifests.yaml b/config/webhook/manifests.yaml index e69de29..a5d9194 100644 --- a/config/webhook/manifests.yaml +++ b/config/webhook/manifests.yaml @@ -0,0 +1,29 @@ +--- +apiVersion: admissionregistration.k8s.io/v1 +kind: MutatingWebhookConfiguration +metadata: + creationTimestamp: null + name: mutating-webhook-configuration +webhooks: +- admissionReviewVersions: + - v1 + - v1beta1 + clientConfig: + service: + name: webhook-service + namespace: system + path: /mutate-controlplane-cluster-x-k8s-io-v1beta1-kubeadmcontrolplane + failurePolicy: Fail + matchPolicy: Equivalent + name: default.kubeadmcontrolplane.controlplane.cluster.x-k8s.io + rules: + - apiGroups: + - controlplane.cluster.x-k8s.io + apiVersions: + - v1beta1 + operations: + - CREATE + - UPDATE + resources: + - kubeadmcontrolplanes + sideEffects: None diff --git a/config/webhook/service.yaml b/config/webhook/service.yaml index 31e0f82..b8fcf1a 100644 --- a/config/webhook/service.yaml +++ b/config/webhook/service.yaml @@ -1,4 +1,4 @@ - +--- apiVersion: v1 kind: Service metadata: diff --git a/controllers/controlplane.go b/controllers/controlplane.go new file mode 100644 index 0000000..3375632 --- /dev/null +++ b/controllers/controlplane.go @@ -0,0 +1,125 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package controllers + +import ( + "context" + + "github.com/go-logr/logr" + "github.com/pkg/errors" + controlplanev1 "github.com/talos-systems/cluster-api-control-plane-provider-talos/api/v1alpha3" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/klog/v2/klogr" + clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" + "sigs.k8s.io/cluster-api/controllers/external" + "sigs.k8s.io/cluster-api/util/collections" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// Log is the global logger for the internal package. +var Log = klogr.New() + +// ControlPlane holds business logic around control planes. 
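The webhook manifests and Service above only take effect if the manager actually serves the webhook; that wiring lives in main.go, which is not part of this diff, so the following is only a hedged sketch of what the registration typically looks like with controller-runtime (scheme setup and error logging omitted; 9443 is the controller-runtime default webhook port the Service is expected to target).

package main

import (
	"os"

	ctrl "sigs.k8s.io/controller-runtime"

	controlplanev1 "github.com/talos-systems/cluster-api-control-plane-provider-talos/api/v1alpha3"
)

func main() {
	// Scheme registration for the TalosControlPlane types is omitted here.
	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
		Port: 9443, // webhook server port; the webhook Service routes traffic here
	})
	if err != nil {
		os.Exit(1)
	}

	// Registers the Defaulter so the defaults above are applied by the API
	// server before objects ever reach the reconciler.
	if err := (&controlplanev1.TalosControlPlane{}).SetupWebhookWithManager(mgr); err != nil {
		os.Exit(1)
	}

	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
		os.Exit(1)
	}
}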
+// It should never need to connect to a service, that responsibility lies outside of this struct. +type ControlPlane struct { + TCP *controlplanev1.TalosControlPlane + Cluster *clusterv1.Cluster + Machines collections.Machines + + infraObjects map[string]*unstructured.Unstructured +} + +// newControlPlane returns an instantiated ControlPlane. +func newControlPlane(ctx context.Context, client client.Client, cluster *clusterv1.Cluster, tcp *controlplanev1.TalosControlPlane, machines collections.Machines) (*ControlPlane, error) { + infraObjects, err := getInfraResources(ctx, client, machines) + if err != nil { + return nil, err + } + + return &ControlPlane{ + TCP: tcp, + Cluster: cluster, + Machines: machines, + infraObjects: infraObjects, + }, nil +} + +// Logger returns a logger with useful context. +func (c *ControlPlane) Logger() logr.Logger { + return Log.WithValues("namespace", c.TCP.Namespace, "name", c.TCP.Name, "cluster-name", c.Cluster.Name) +} + +// MachineWithDeleteAnnotation returns a machine that has been annotated with DeleteMachineAnnotation key. +func (c *ControlPlane) MachineWithDeleteAnnotation(machines collections.Machines) collections.Machines { + // See if there are any machines with DeleteMachineAnnotation key. + annotatedMachines := machines.Filter(collections.HasAnnotationKey(clusterv1.DeleteMachineAnnotation)) + // If there are, return list of annotated machines. + return annotatedMachines +} + +// MachinesNeedingRollout return a list of machines that need to be rolled out. +func (c *ControlPlane) MachinesNeedingRollout() collections.Machines { + // Ignore machines to be deleted. + machines := c.Machines.Filter(collections.Not(collections.HasDeletionTimestamp)) + + // Return machines if they are scheduled for rollout or if with an outdated configuration. + return machines.AnyFilter( + // Machines that do not match with TCP config. + collections.Not( + collections.And( + collections.MatchesKubernetesVersion(c.TCP.Spec.Version), + MatchesTemplateClonedFrom(c.infraObjects, c.TCP), + ), + ), + ) +} + +// getInfraResources fetches the external infrastructure resource for each machine in the collection and returns a map of machine.Name -> infraResource. +func getInfraResources(ctx context.Context, cl client.Client, machines collections.Machines) (map[string]*unstructured.Unstructured, error) { + result := map[string]*unstructured.Unstructured{} + for _, m := range machines { + infraObj, err := external.Get(ctx, cl, &m.Spec.InfrastructureRef, m.Namespace) + if err != nil { + if apierrors.IsNotFound(errors.Cause(err)) { + continue + } + return nil, errors.Wrapf(err, "failed to retrieve infra obj for machine %q", m.Name) + } + result[m.Name] = infraObj + } + return result, nil +} + +// MatchesTemplateClonedFrom returns a filter to find all machines that match a given TCP infra template. +func MatchesTemplateClonedFrom(infraConfigs map[string]*unstructured.Unstructured, tcp *controlplanev1.TalosControlPlane) collections.Func { + return func(machine *clusterv1.Machine) bool { + if machine == nil { + return false + } + infraObj, found := infraConfigs[machine.Name] + if !found { + // Return true here because failing to get infrastructure machine should not be considered as unmatching. 
+ return true + } + + clonedFromName, ok1 := infraObj.GetAnnotations()[clusterv1.TemplateClonedFromNameAnnotation] + clonedFromGroupKind, ok2 := infraObj.GetAnnotations()[clusterv1.TemplateClonedFromGroupKindAnnotation] + if !ok1 || !ok2 { + // All tcp cloned infra machines should have this annotation. + // Missing the annotation may be due to older version machines or adopted machines. + // Should not be considered as mismatch. + return true + } + + // Check if the machine's infrastructure reference has been created from the current TCP infrastructure template. + if clonedFromName != tcp.Spec.InfrastructureTemplate.Name || + clonedFromGroupKind != tcp.Spec.InfrastructureTemplate.GroupVersionKind().GroupKind().String() { + return false + } + + return true + } +} diff --git a/controllers/etcd.go b/controllers/etcd.go index efc5283..2603112 100644 --- a/controllers/etcd.go +++ b/controllers/etcd.go @@ -147,11 +147,11 @@ func (r *TalosControlPlaneReconciler) auditEtcd(ctx context.Context, tcp *contro return err } - if len(machines) == 0 { + if len(machines.Items) == 0 { return nil } - for _, machine := range machines { + for _, machine := range machines.Items { // nb: we'll assume any machine that doesn't have a noderef is new and we can audit later because // otherwise a new etcd member can get removed before even getting the noderef set by the CAPI controllers. if machine.Status.NodeRef == nil { @@ -161,7 +161,7 @@ func (r *TalosControlPlaneReconciler) auditEtcd(ctx context.Context, tcp *contro // Select the first CP machine that's not being deleted and has a noderef var designatedCPMachine clusterv1.Machine - for _, machine := range machines { + for _, machine := range machines.Items { if !machine.ObjectMeta.DeletionTimestamp.IsZero() || machine.Status.NodeRef == nil { continue } @@ -197,7 +197,7 @@ func (r *TalosControlPlaneReconciler) auditEtcd(ctx context.Context, tcp *contro } present := false - for _, machine := range machines { + for _, machine := range machines.Items { // break apart the noderef name in case it's an fqdn (like in AWS) machineNodeNameExploded := strings.Split(machine.Status.NodeRef.Name, ".") diff --git a/controllers/scale.go b/controllers/scale.go new file mode 100644 index 0000000..c04ff95 --- /dev/null +++ b/controllers/scale.go @@ -0,0 +1,211 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. 
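The rollout trigger in MachinesNeedingRollout boils down to "Kubernetes version or source template out of date". The sketch below restates that predicate in plain Go; machineInfo is a trimmed-down stand-in for a Machine plus its infra object, the template and annotation values are illustrative, and the real code additionally treats a missing infra object or missing cloned-from annotations as up to date rather than as a mismatch.

package main

import "fmt"

// machineInfo is a simplified stand-in for what the controller reads per
// machine: its Kubernetes version and the cloned-from annotations that
// cluster-api stamps on infra objects cloned from a template.
type machineInfo struct {
	version             string
	clonedFromName      string
	clonedFromGroupKind string
}

// needsRollout restates the MachinesNeedingRollout predicate: replace the
// machine when either its version or its source infra template no longer
// matches the TalosControlPlane spec.
func needsRollout(m machineInfo, specVersion, templateName, templateGroupKind string) bool {
	upToDate := m.version == specVersion &&
		m.clonedFromName == templateName &&
		m.clonedFromGroupKind == templateGroupKind

	return !upToDate
}

func main() {
	m := machineInfo{
		version:             "v1.23.4",
		clonedFromName:      "talos-cp-template-1",
		clonedFromGroupKind: "AWSMachineTemplate.infrastructure.cluster.x-k8s.io",
	}

	// Same spec: nothing to roll out.
	fmt.Println(needsRollout(m, "v1.23.4", "talos-cp-template-1", "AWSMachineTemplate.infrastructure.cluster.x-k8s.io")) // false

	// Bumping spec.version (or swapping the template) schedules the machine for replacement.
	fmt.Println(needsRollout(m, "v1.23.5", "talos-cp-template-1", "AWSMachineTemplate.infrastructure.cluster.x-k8s.io")) // true
}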
+ +package controllers + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/coreos/go-semver/semver" + controlplanev1 "github.com/talos-systems/cluster-api-control-plane-provider-talos/api/v1alpha3" + v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" + "sigs.k8s.io/cluster-api/util" + "sigs.k8s.io/cluster-api/util/collections" + "sigs.k8s.io/cluster-api/util/conditions" + ctrl "sigs.k8s.io/controller-runtime" +) + +func (r *TalosControlPlaneReconciler) scaleUpControlPlane(ctx context.Context, cluster *clusterv1.Cluster, tcp *controlplanev1.TalosControlPlane, controlPlane *ControlPlane) (ctrl.Result, error) { + numMachines := len(controlPlane.Machines) + desiredReplicas := tcp.Spec.Replicas + + conditions.MarkFalse(tcp, controlplanev1.ResizedCondition, controlplanev1.ScalingUpReason, clusterv1.ConditionSeverityWarning, + "Scaling up control plane to %d replicas (actual %d)", + desiredReplicas, numMachines) + + // Create a new Machine w/ join + r.Log.Info("scaling up control plane", "Desired", desiredReplicas, "Existing", numMachines) + + return r.bootControlPlane(ctx, cluster, tcp, controlPlane, false) +} + +func (r *TalosControlPlaneReconciler) scaleDownControlPlane( + ctx context.Context, + cluster *clusterv1.Cluster, + tcp *controlplanev1.TalosControlPlane, + controlPlane *ControlPlane, + machinesRequireUpgrade collections.Machines) (ctrl.Result, error) { + + numMachines := len(controlPlane.Machines) + desiredReplicas := tcp.Spec.Replicas + + conditions.MarkFalse(tcp, controlplanev1.ResizedCondition, controlplanev1.ScalingDownReason, clusterv1.ConditionSeverityWarning, + "Scaling down control plane to %d replicas (actual %d)", + desiredReplicas, numMachines) + + if numMachines == 1 { + conditions.MarkFalse(tcp, controlplanev1.ResizedCondition, controlplanev1.ScalingDownReason, clusterv1.ConditionSeverityError, + "Cannot scale down control plane nodes to 0", + desiredReplicas, numMachines) + + return ctrl.Result{}, nil + } + + if numMachines == 0 { + return ctrl.Result{}, fmt.Errorf("no machines found") + } + + if err := r.ensureNodesBooted(ctx, controlPlane.TCP, cluster, collections.ToMachineList(controlPlane.Machines).Items); err != nil { + r.Log.Info("waiting for all nodes to finish boot sequence", "error", err) + + return ctrl.Result{RequeueAfter: 10 * time.Second}, nil + } + + if !conditions.IsTrue(tcp, controlplanev1.EtcdClusterHealthyCondition) { + r.Log.Info("waiting for etcd to become healthy before scaling down") + + return ctrl.Result{RequeueAfter: 10 * time.Second}, nil + } + + r.Log.Info("scaling down control plane", "Desired", desiredReplicas, "Existing", numMachines) + + r.Log.Info("Found control plane machines", "machines", numMachines) + + client, err := r.Tracker.GetClient(ctx, util.ObjectKey(cluster)) + if err != nil { + return ctrl.Result{RequeueAfter: 20 * time.Second}, err + } + + deleteMachine, err := selectMachineForScaleDown(controlPlane, machinesRequireUpgrade) + if err != nil { + return ctrl.Result{}, err + } + + for _, machine := range controlPlane.Machines { + if !machine.ObjectMeta.DeletionTimestamp.IsZero() { + r.Log.Info("machine is in process of deletion", "machine", machine.Name) + + var node v1.Node + + name := types.NamespacedName{Name: machine.Status.NodeRef.Name, Namespace: machine.Status.NodeRef.Namespace} + + err := client.Get(ctx, name, &node) + if err != nil { + // It's possible for the node to already be 
deleted in the workload cluster, so we just + // requeue if that's that case instead of throwing a scary error. + if apierrors.IsNotFound(err) { + return ctrl.Result{RequeueAfter: 20 * time.Second}, nil + } + return ctrl.Result{RequeueAfter: 20 * time.Second}, err + } + + r.Log.Info("Deleting node", "machine", machine.Name, "node", node.Name) + + err = client.Delete(ctx, &node) + if err != nil { + return ctrl.Result{RequeueAfter: 20 * time.Second}, err + } + + return ctrl.Result{RequeueAfter: 20 * time.Second}, nil + } + + // do not allow scaling down until all nodes have nodeRefs + if machine.Status.NodeRef == nil { + r.Log.Info("one of machines does not have NodeRef", "machine", machine.Name) + + return ctrl.Result{RequeueAfter: 10 * time.Second}, nil + } + + if machine.CreationTimestamp.Before(&deleteMachine.CreationTimestamp) { + deleteMachine = machine + } + } + + if deleteMachine.Status.NodeRef == nil { + return ctrl.Result{RequeueAfter: 20 * time.Second}, fmt.Errorf("%q machine does not have a nodeRef", deleteMachine.Name) + } + + node := deleteMachine.Status.NodeRef + + c, err := r.talosconfigForMachines(ctx, tcp, *deleteMachine) + if err != nil { + return ctrl.Result{RequeueAfter: 20 * time.Second}, err + } + + defer c.Close() //nolint:errcheck + + err = r.gracefulEtcdLeave(ctx, c, util.ObjectKey(cluster), *deleteMachine) + if err != nil { + return ctrl.Result{}, err + } + + r.Log.Info("deleting machine", "machine", deleteMachine.Name, "node", node.Name) + + err = r.Client.Delete(ctx, deleteMachine) + if err != nil { + return ctrl.Result{}, err + } + + // TODO: drop version check and shutdown when Talos < 0.12.2 reaches end of life + version, err := c.Version(ctx) + if err != nil { + return ctrl.Result{}, err + } + + fromVersion, _ := semver.NewVersion("0.12.2") //nolint:errcheck + + nodeVersion, err := semver.NewVersion( + strings.TrimLeft(version.Messages[0].Version.Tag, "v"), + ) + + if err != nil { + return ctrl.Result{}, err + } + + if nodeVersion.LessThan(*fromVersion) { + // NB: We shutdown the node here so that a loadbalancer will drop the backend. + // The Kubernetes API server is configured to talk to etcd on localhost, but + // at this point etcd has been stopped. + r.Log.Info("shutting down node", "machine", deleteMachine.Name, "node", node.Name) + + err = c.Shutdown(ctx) + if err != nil { + return ctrl.Result{RequeueAfter: 20 * time.Second}, err + } + } + + r.Log.Info("deleting node", "machine", deleteMachine.Name, "node", node.Name) + + n := &v1.Node{} + n.Name = node.Name + n.Namespace = node.Namespace + + err = client.Delete(ctx, n) + if err != nil { + return ctrl.Result{RequeueAfter: 20 * time.Second}, err + } + + // Requeue so that we handle any additional scaling. 
+ return ctrl.Result{Requeue: true}, nil +} + +func selectMachineForScaleDown(controlPlane *ControlPlane, outdatedMachines collections.Machines) (*clusterv1.Machine, error) { + machines := controlPlane.Machines + switch { + case controlPlane.MachineWithDeleteAnnotation(outdatedMachines).Len() > 0: + machines = controlPlane.MachineWithDeleteAnnotation(outdatedMachines) + case controlPlane.MachineWithDeleteAnnotation(machines).Len() > 0: + machines = controlPlane.MachineWithDeleteAnnotation(machines) + case outdatedMachines.Len() > 0: + machines = outdatedMachines + } + + return machines.Oldest(), nil +} diff --git a/controllers/taloscontrolplane_controller.go b/controllers/taloscontrolplane_controller.go index 89a5677..e945298 100644 --- a/controllers/taloscontrolplane_controller.go +++ b/controllers/taloscontrolplane_controller.go @@ -11,10 +11,8 @@ import ( "math/rand" "reflect" "sort" - "strings" "time" - "github.com/coreos/go-semver/semver" "github.com/go-logr/logr" "github.com/pkg/errors" cabptv1 "github.com/talos-systems/cluster-api-bootstrap-provider-talos/api/v1alpha3" @@ -30,7 +28,6 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/selection" - "k8s.io/apimachinery/pkg/types" kerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apiserver/pkg/storage/names" "k8s.io/utils/pointer" @@ -39,6 +36,7 @@ import ( "sigs.k8s.io/cluster-api/controllers/remote" "sigs.k8s.io/cluster-api/util" "sigs.k8s.io/cluster-api/util/annotations" + "sigs.k8s.io/cluster-api/util/collections" "sigs.k8s.io/cluster-api/util/conditions" "sigs.k8s.io/cluster-api/util/kubeconfig" "sigs.k8s.io/cluster-api/util/patch" @@ -55,14 +53,6 @@ import ( const requeueDuration = 30 * time.Second -// ControlPlane holds business logic around control planes. -// It should never need to connect to a service, that responsibility lies outside of this struct. -type ControlPlane struct { - TCP *controlplanev1.TalosControlPlane - Cluster *clusterv1.Cluster - Machines []clusterv1.Machine -} - // TalosControlPlaneReconciler reconciles a TalosControlPlane object type TalosControlPlaneReconciler struct { client.Client @@ -215,9 +205,9 @@ func (r *TalosControlPlaneReconciler) reconcile(ctx context.Context, cluster *cl return ctrl.Result{}, err } - conditionGetters := make([]conditions.Getter, len(ownedMachines)) + conditionGetters := make([]conditions.Getter, len(ownedMachines.Items)) - for i, v := range ownedMachines { + for i, v := range ownedMachines.Items { conditionGetters[i] = &v } @@ -230,14 +220,14 @@ func (r *TalosControlPlaneReconciler) reconcile(ctx context.Context, cluster *cl ) // run all similar reconcile steps in the loop and pick the lowest RetryAfter, aggregate errors and check the requeue flags. 
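selectMachineForScaleDown picks its victim by precedence: delete-annotated machines among the outdated ones first, then any delete-annotated machine, then outdated machines, and only then the whole set, with the oldest machine of the chosen pool being deleted. A compact stand-alone restatement, using plain string slices instead of collections.Machines and illustrative machine names:

package main

import "fmt"

// scaleDownPool mirrors the selection order in selectMachineForScaleDown; the
// caller then deletes the oldest machine of the returned pool.
func scaleDownPool(deleteAnnotatedOutdated, deleteAnnotated, outdated, all []string) []string {
	switch {
	case len(deleteAnnotatedOutdated) > 0:
		return deleteAnnotatedOutdated
	case len(deleteAnnotated) > 0:
		return deleteAnnotated
	case len(outdated) > 0:
		return outdated
	default:
		return all
	}
}

func main() {
	all := []string{"cp-1", "cp-2", "cp-3"}
	outdated := []string{"cp-2"}

	// During a rolling upgrade the outdated machine goes first, even if it is
	// not the oldest one overall.
	fmt.Println(scaleDownPool(nil, nil, outdated, all)) // [cp-2]

	// With no hints, plain scale-down falls back to the oldest of all machines.
	fmt.Println(scaleDownPool(nil, nil, nil, all)) // [cp-1 cp-2 cp-3], oldest wins
}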
- for _, phase := range []func(context.Context, *clusterv1.Cluster, *controlplanev1.TalosControlPlane, []clusterv1.Machine) (ctrl.Result, error){ + for _, phase := range []func(context.Context, *clusterv1.Cluster, *controlplanev1.TalosControlPlane, *clusterv1.MachineList) (ctrl.Result, error){ r.reconcileEtcdMembers, r.reconcileNodeHealth, r.reconcileConditions, r.reconcileKubeconfig, r.reconcileMachines, } { - phaseResult, err = phase(ctx, cluster, tcp, ownedMachines) + phaseResult, err = phase(ctx, cluster, tcp, &ownedMachines) if err != nil { errs = kerrors.NewAggregate([]error{errs, err}) } @@ -275,12 +265,12 @@ func (r *TalosControlPlaneReconciler) reconcileDelete(ctx context.Context, clust } // If no control plane machines remain, remove the finalizer - if len(ownedMachines) == 0 { + if len(ownedMachines.Items) == 0 { controllerutil.RemoveFinalizer(tcp, controlplanev1.TalosControlPlaneFinalizer) return ctrl.Result{}, r.Client.Update(ctx, tcp) } - for _, ownedMachine := range ownedMachines { + for _, ownedMachine := range ownedMachines.Items { // Already deleting this machine if !ownedMachine.ObjectMeta.DeletionTimestamp.IsZero() { continue @@ -297,137 +287,7 @@ func (r *TalosControlPlaneReconciler) reconcileDelete(ctx context.Context, clust return ctrl.Result{RequeueAfter: requeueDuration}, nil } -// newControlPlane returns an instantiated ControlPlane. -func newControlPlane(cluster *clusterv1.Cluster, tcp *controlplanev1.TalosControlPlane, machines []clusterv1.Machine) *ControlPlane { - return &ControlPlane{ - TCP: tcp, - Cluster: cluster, - Machines: machines, - } -} - -func (r *TalosControlPlaneReconciler) scaleDownControlPlane(ctx context.Context, tcp *controlplanev1.TalosControlPlane, cluster client.ObjectKey, cpName string, machines []clusterv1.Machine) (ctrl.Result, error) { - if len(machines) == 0 { - return ctrl.Result{}, fmt.Errorf("no machines found") - } - - r.Log.Info("Found control plane machines", "machines", len(machines)) - - client, err := r.Tracker.GetClient(ctx, cluster) - if err != nil { - return ctrl.Result{RequeueAfter: 20 * time.Second}, err - } - - deleteMachine := machines[0] - for _, machine := range machines { - if !machine.ObjectMeta.DeletionTimestamp.IsZero() { - r.Log.Info("machine is in process of deletion", "machine", machine.Name) - - var node v1.Node - - name := types.NamespacedName{Name: machine.Status.NodeRef.Name, Namespace: machine.Status.NodeRef.Namespace} - - err := client.Get(ctx, name, &node) - if err != nil { - // It's possible for the node to already be deleted in the workload cluster, so we just - // requeue if that's that case instead of throwing a scary error. 
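The reconcile loop above runs every phase with the same signature and then combines the outcomes: errors are aggregated instead of short-circuiting, and, per the comment, the lowest RequeueAfter wins. Since the combining helper itself is outside this excerpt, the following is only a self-contained sketch of that pattern, with a trimmed-down result type standing in for ctrl.Result:

package main

import (
	"errors"
	"fmt"
	"time"
)

// phaseResult is a simplified stand-in for ctrl.Result.
type phaseResult struct {
	requeueAfter time.Duration
}

// lowestNonZero keeps the smallest non-zero requeue interval so the controller
// wakes up as soon as the most impatient phase asked it to.
func lowestNonZero(a, b phaseResult) phaseResult {
	switch {
	case a.requeueAfter == 0:
		return b
	case b.requeueAfter == 0:
		return a
	case a.requeueAfter < b.requeueAfter:
		return a
	default:
		return b
	}
}

func main() {
	phases := []func() (phaseResult, error){
		func() (phaseResult, error) { return phaseResult{requeueAfter: 20 * time.Second}, nil },
		func() (phaseResult, error) { return phaseResult{}, errors.New("etcd not healthy yet") },
		func() (phaseResult, error) { return phaseResult{requeueAfter: 10 * time.Second}, nil },
	}

	var (
		res  phaseResult
		errs []error
	)

	// Every phase runs even if an earlier one failed; errors are collected.
	for _, phase := range phases {
		r, err := phase()
		if err != nil {
			errs = append(errs, err)
		}

		res = lowestNonZero(res, r)
	}

	fmt.Println(res.requeueAfter, errs) // 10s [etcd not healthy yet]
}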
- if apierrors.IsNotFound(err) { - return ctrl.Result{RequeueAfter: 20 * time.Second}, nil - } - return ctrl.Result{RequeueAfter: 20 * time.Second}, err - } - - r.Log.Info("Deleting node", "machine", machine.Name, "node", node.Name) - - err = client.Delete(ctx, &node) - if err != nil { - return ctrl.Result{RequeueAfter: 20 * time.Second}, err - } - - return ctrl.Result{RequeueAfter: 20 * time.Second}, nil - } - - // do not allow scaling down until all nodes have nodeRefs - if machine.Status.NodeRef == nil { - r.Log.Info("one of machines does not have NodeRef", "machine", machine.Name) - - return ctrl.Result{RequeueAfter: 10 * time.Second}, nil - } - - if machine.CreationTimestamp.Before(&deleteMachine.CreationTimestamp) { - deleteMachine = machine - } - } - - if deleteMachine.Status.NodeRef == nil { - return ctrl.Result{RequeueAfter: 20 * time.Second}, fmt.Errorf("%q machine does not have a nodeRef", deleteMachine.Name) - } - - node := deleteMachine.Status.NodeRef - - c, err := r.talosconfigForMachines(ctx, tcp, deleteMachine) - if err != nil { - return ctrl.Result{RequeueAfter: 20 * time.Second}, err - } - - defer c.Close() //nolint:errcheck - - err = r.gracefulEtcdLeave(ctx, c, cluster, deleteMachine) - if err != nil { - return ctrl.Result{}, err - } - - r.Log.Info("deleting machine", "machine", deleteMachine.Name, "node", node.Name) - - err = r.Client.Delete(ctx, &deleteMachine) - if err != nil { - return ctrl.Result{}, err - } - - // TODO: drop version check and shutdown when Talos < 0.12.2 reaches end of life - version, err := c.Version(ctx) - if err != nil { - return ctrl.Result{}, err - } - - fromVersion, _ := semver.NewVersion("0.12.2") //nolint:errcheck - - nodeVersion, err := semver.NewVersion( - strings.TrimLeft(version.Messages[0].Version.Tag, "v"), - ) - - if err != nil { - return ctrl.Result{}, err - } - - if nodeVersion.LessThan(*fromVersion) { - // NB: We shutdown the node here so that a loadbalancer will drop the backend. - // The Kubernetes API server is configured to talk to etcd on localhost, but - // at this point etcd has been stopped. - r.Log.Info("shutting down node", "machine", deleteMachine.Name, "node", node.Name) - - err = c.Shutdown(ctx) - if err != nil { - return ctrl.Result{RequeueAfter: 20 * time.Second}, err - } - } - - r.Log.Info("deleting node", "machine", deleteMachine.Name, "node", node.Name) - - n := &v1.Node{} - n.Name = node.Name - n.Namespace = node.Namespace - - err = client.Delete(ctx, n) - if err != nil { - return ctrl.Result{RequeueAfter: 20 * time.Second}, err - } - - // Requeue so that we handle any additional scaling. - return ctrl.Result{Requeue: true}, nil -} - -func (r *TalosControlPlaneReconciler) getControlPlaneMachinesForCluster(ctx context.Context, cluster client.ObjectKey, cpName string) ([]clusterv1.Machine, error) { +func (r *TalosControlPlaneReconciler) getControlPlaneMachinesForCluster(ctx context.Context, cluster client.ObjectKey, cpName string) (clusterv1.MachineList, error) { selector := map[string]string{ clusterv1.ClusterLabelName: cluster.Name, clusterv1.MachineControlPlaneLabelName: "", @@ -440,10 +300,10 @@ func (r *TalosControlPlaneReconciler) getControlPlaneMachinesForCluster(ctx cont client.InNamespace(cluster.Namespace), client.MatchingLabels(selector), ); err != nil { - return nil, err + return machineList, err } - return machineList.Items, nil + return machineList, nil } // getFailureDomain will return a slice of failure domains from the cluster status. 
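For reference, the selector built by getControlPlaneMachinesForCluster resolves to the two standard cluster-api machine labels. A hedged sketch of the equivalent typed List call follows; the label constants are the upstream ones used above, and the comments show the strings they are expected to expand to.

package example

import (
	"context"

	clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// listControlPlaneMachines lists the Machines that belong to a cluster's
// control plane: the cluster-name label plus the (empty-valued) control-plane label.
func listControlPlaneMachines(ctx context.Context, c client.Client, namespace, clusterName string) (clusterv1.MachineList, error) {
	var machines clusterv1.MachineList

	err := c.List(ctx, &machines,
		client.InNamespace(namespace),
		client.MatchingLabels{
			clusterv1.ClusterLabelName:             clusterName, // "cluster.x-k8s.io/cluster-name"
			clusterv1.MachineControlPlaneLabelName: "",          // "cluster.x-k8s.io/control-plane"
		},
	)

	return machines, err
}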
@@ -654,7 +514,7 @@ func (r *TalosControlPlaneReconciler) updateStatus(ctx context.Context, tcp *con return err } - replicas := int32(len(ownedMachines)) + replicas := int32(len(ownedMachines.Items)) // set basic data that does not require interacting with the workload cluster tcp.Status.Ready = false @@ -735,7 +595,7 @@ func (r *TalosControlPlaneReconciler) reconcileExternalReference(ctx context.Con return objPatchHelper.Patch(ctx, obj) } -func (r *TalosControlPlaneReconciler) reconcileKubeconfig(ctx context.Context, cluster *clusterv1.Cluster, tcp *controlplanev1.TalosControlPlane, machines []clusterv1.Machine) (ctrl.Result, error) { +func (r *TalosControlPlaneReconciler) reconcileKubeconfig(ctx context.Context, cluster *clusterv1.Cluster, tcp *controlplanev1.TalosControlPlane, machines *clusterv1.MachineList) (ctrl.Result, error) { endpoint := cluster.Spec.ControlPlaneEndpoint if endpoint.IsZero() { return ctrl.Result{}, nil @@ -768,14 +628,14 @@ func (r *TalosControlPlaneReconciler) reconcileKubeconfig(ctx context.Context, c return ctrl.Result{}, nil } -func (r *TalosControlPlaneReconciler) reconcileEtcdMembers(ctx context.Context, cluster *clusterv1.Cluster, tcp *controlplanev1.TalosControlPlane, machines []clusterv1.Machine) (result ctrl.Result, err error) { +func (r *TalosControlPlaneReconciler) reconcileEtcdMembers(ctx context.Context, cluster *clusterv1.Cluster, tcp *controlplanev1.TalosControlPlane, machines *clusterv1.MachineList) (result ctrl.Result, err error) { var errs error // Audit the etcd member list to remove any nodes that no longer exist if err := r.auditEtcd(ctx, tcp, util.ObjectKey(cluster), tcp.Name); err != nil { errs = kerrors.NewAggregate([]error{errs, err}) } - if err := r.etcdHealthcheck(ctx, tcp, cluster, machines); err != nil { + if err := r.etcdHealthcheck(ctx, tcp, cluster, machines.Items); err != nil { conditions.MarkFalse(tcp, controlplanev1.EtcdClusterHealthyCondition, controlplanev1.EtcdClusterUnhealthyReason, clusterv1.ConditionSeverityWarning, err.Error()) errs = kerrors.NewAggregate([]error{errs, err}) @@ -790,8 +650,8 @@ func (r *TalosControlPlaneReconciler) reconcileEtcdMembers(ctx context.Context, return ctrl.Result{}, nil } -func (r *TalosControlPlaneReconciler) reconcileNodeHealth(ctx context.Context, cluster *clusterv1.Cluster, tcp *controlplanev1.TalosControlPlane, machines []clusterv1.Machine) (result ctrl.Result, err error) { - if err := r.nodesHealthcheck(ctx, tcp, cluster, machines); err != nil { +func (r *TalosControlPlaneReconciler) reconcileNodeHealth(ctx context.Context, cluster *clusterv1.Cluster, tcp *controlplanev1.TalosControlPlane, machines *clusterv1.MachineList) (result ctrl.Result, err error) { + if err := r.nodesHealthcheck(ctx, tcp, cluster, machines.Items); err != nil { reason := controlplanev1.ControlPlaneComponentsInspectionFailedReason if errors.Is(err, &errServiceUnhealthy{}) { @@ -809,7 +669,7 @@ func (r *TalosControlPlaneReconciler) reconcileNodeHealth(ctx context.Context, c return ctrl.Result{}, nil } -func (r *TalosControlPlaneReconciler) reconcileConditions(ctx context.Context, cluster *clusterv1.Cluster, tcp *controlplanev1.TalosControlPlane, machines []clusterv1.Machine) (result ctrl.Result, err error) { +func (r *TalosControlPlaneReconciler) reconcileConditions(ctx context.Context, cluster *clusterv1.Cluster, tcp *controlplanev1.TalosControlPlane, machines *clusterv1.MachineList) (result ctrl.Result, err error) { if !conditions.Has(tcp, controlplanev1.AvailableCondition) { conditions.MarkFalse(tcp, 
controlplanev1.AvailableCondition, controlplanev1.WaitingForTalosBootReason, clusterv1.ConditionSeverityInfo, "") } @@ -821,14 +681,32 @@ func (r *TalosControlPlaneReconciler) reconcileConditions(ctx context.Context, c return ctrl.Result{}, nil } -func (r *TalosControlPlaneReconciler) reconcileMachines(ctx context.Context, cluster *clusterv1.Cluster, tcp *controlplanev1.TalosControlPlane, machines []clusterv1.Machine) (res ctrl.Result, err error) { +func (r *TalosControlPlaneReconciler) reconcileMachines(ctx context.Context, cluster *clusterv1.Cluster, tcp *controlplanev1.TalosControlPlane, machines *clusterv1.MachineList) (res ctrl.Result, err error) { logger := r.Log.WithValues("namespace", tcp.Namespace, "talosControlPlane", tcp.Name) // If we've made it this far, we can assume that all ownedMachines are up to date - numMachines := len(machines) + numMachines := len(machines.Items) desiredReplicas := int(*tcp.Spec.Replicas) - controlPlane := newControlPlane(cluster, tcp, machines) + controlPlane, err := newControlPlane(ctx, r.Client, cluster, tcp, collections.FromMachineList(machines)) + if err != nil { + return ctrl.Result{}, err + } + + needRollout := controlPlane.MachinesNeedingRollout() + if len(needRollout) > 0 { + logger.Info("rolling out control plane machines", "needRollout", needRollout.Names()) + conditions.MarkFalse(controlPlane.TCP, + controlplanev1.MachinesSpecUpToDateCondition, + controlplanev1.RollingUpdateInProgressReason, + clusterv1.ConditionSeverityWarning, "Rolling %d replicas with outdated spec (%d replicas up to date)", len(needRollout), len(controlPlane.Machines)-len(needRollout)) + + return r.upgradeControlPlane(ctx, cluster, tcp, controlPlane, needRollout) + } else { + if conditions.Has(controlPlane.TCP, controlplanev1.MachinesSpecUpToDateCondition) { + conditions.MarkTrue(controlPlane.TCP, controlplanev1.MachinesSpecUpToDateCondition) + } + } switch { // We are creating the first replica @@ -839,43 +717,10 @@ func (r *TalosControlPlaneReconciler) reconcileMachines(ctx context.Context, clu return r.bootControlPlane(ctx, cluster, tcp, controlPlane, true) // We are scaling up case numMachines < desiredReplicas && numMachines > 0: - conditions.MarkFalse(tcp, controlplanev1.ResizedCondition, controlplanev1.ScalingUpReason, clusterv1.ConditionSeverityWarning, - "Scaling up control plane to %d replicas (actual %d)", - desiredReplicas, numMachines) - - // Create a new Machine w/ join - logger.Info("scaling up control plane", "Desired", desiredReplicas, "Existing", numMachines) - - return r.bootControlPlane(ctx, cluster, tcp, controlPlane, false) + return r.scaleUpControlPlane(ctx, cluster, tcp, controlPlane) // We are scaling down case numMachines > desiredReplicas: - conditions.MarkFalse(tcp, controlplanev1.ResizedCondition, controlplanev1.ScalingDownReason, clusterv1.ConditionSeverityWarning, - "Scaling down control plane to %d replicas (actual %d)", - desiredReplicas, numMachines) - - if numMachines == 1 { - conditions.MarkFalse(tcp, controlplanev1.ResizedCondition, controlplanev1.ScalingDownReason, clusterv1.ConditionSeverityError, - "Cannot scale down control plane nodes to 0", - desiredReplicas, numMachines) - - return res, nil - } - - if err := r.ensureNodesBooted(ctx, controlPlane.TCP, cluster, machines); err != nil { - logger.Info("waiting for all nodes to finish boot sequence", "error", err) - - return ctrl.Result{RequeueAfter: 10 * time.Second}, nil - } - - if !conditions.IsTrue(tcp, controlplanev1.EtcdClusterHealthyCondition) { - logger.Info("waiting for 
etcd to become healthy before scaling down") - - return ctrl.Result{RequeueAfter: 10 * time.Second}, nil - } - - logger.Info("scaling down control plane", "Desired", desiredReplicas, "Existing", numMachines) - - res, err = r.scaleDownControlPlane(ctx, tcp, util.ObjectKey(cluster), controlPlane.TCP.Name, machines) + res, err = r.scaleDownControlPlane(ctx, cluster, tcp, controlPlane, collections.Machines{}) if err != nil { if res.Requeue || res.RequeueAfter > 0 { logger.Info("failed to scale down control plane", "error", err) @@ -892,7 +737,7 @@ func (r *TalosControlPlaneReconciler) reconcileMachines(ctx context.Context, clu } if !tcp.Status.Bootstrapped { - if err := r.bootstrapCluster(ctx, tcp, cluster, machines); err != nil { + if err := r.bootstrapCluster(ctx, tcp, cluster, machines.Items); err != nil { conditions.MarkFalse(tcp, controlplanev1.MachinesBootstrapped, controlplanev1.WaitingForTalosBootReason, clusterv1.ConditionSeverityInfo, err.Error()) logger.Info("bootstrap failed, retrying in 20 seconds", "error", err) diff --git a/controllers/upgrade.go b/controllers/upgrade.go new file mode 100644 index 0000000..2ba5e2d --- /dev/null +++ b/controllers/upgrade.go @@ -0,0 +1,46 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package controllers + +import ( + "context" + + "github.com/pkg/errors" + ctrl "sigs.k8s.io/controller-runtime" + + clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" + "sigs.k8s.io/cluster-api/util/collections" + + controlplanev1 "github.com/talos-systems/cluster-api-control-plane-provider-talos/api/v1alpha3" +) + +func (r *TalosControlPlaneReconciler) upgradeControlPlane( + ctx context.Context, + cluster *clusterv1.Cluster, + tcp *controlplanev1.TalosControlPlane, + controlPlane *ControlPlane, + machinesRequireUpgrade collections.Machines, +) (ctrl.Result, error) { + logger := controlPlane.Logger() + + if tcp.Spec.RolloutStrategy == nil || tcp.Spec.RolloutStrategy.RollingUpdate == nil { + return ctrl.Result{}, errors.New("rolloutStrategy is not set") + } + + switch tcp.Spec.RolloutStrategy.Type { + case controlplanev1.RollingUpdateStrategyType: + // RolloutStrategy is currently defaulted and validated to be RollingUpdate + maxNodes := *tcp.Spec.Replicas + int32(tcp.Spec.RolloutStrategy.RollingUpdate.MaxSurge.IntValue()) + if int32(controlPlane.Machines.Len()) < maxNodes { + // scaleUp ensures that we don't continue scaling up while waiting for Machines to have NodeRefs + return r.scaleUpControlPlane(ctx, cluster, tcp, controlPlane) + } + + return r.scaleDownControlPlane(ctx, cluster, tcp, controlPlane, machinesRequireUpgrade) + default: + logger.Info("RolloutStrategy type is not set to RollingUpdateStrategyType, unable to determine the strategy for rolling out machines") + return ctrl.Result{}, nil + } +} diff --git a/go.mod b/go.mod index 9d5e631..5e67b78 100644 --- a/go.mod +++ b/go.mod @@ -42,6 +42,8 @@ require ( github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/containerd/go-cni v1.1.3 // indirect github.com/containernetworking/cni v1.0.1 // indirect + github.com/coredns/caddy v1.1.0 // indirect + github.com/coredns/corefile-migration v1.0.14 // indirect github.com/cosi-project/runtime v0.0.0-20211216175730-264f8fcd1a4f // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/docker/distribution 
v2.7.1+incompatible // indirect diff --git a/hack/release.toml b/hack/release.toml index 1b90ae1..ee2ec93 100644 --- a/hack/release.toml +++ b/hack/release.toml @@ -6,7 +6,7 @@ github_repo = "talos-systems/cluster-api-control-plane-provider-talos" match_deps = "^github.com/(talos-systems/[a-zA-Z0-9-]+)$" # previous release -previous = "v0.3.0" +previous = "v0.4.0" pre_release = true @@ -19,4 +19,12 @@ preface = """\ title = "CAPI v1beta1" description = """\ This release of CACPPT brings compatibility with CAPI v1beta1. +""" + + [notes.rolling-update] + title = "Support Control Plane Rolling Updates" + description = """\ +The controller now reacts to `TalosControlPlane` spec changes +and can do rolling updates of the control plane machines. +This can be used to do a graceful upgrades of the workload clusters. """ diff --git a/hack/test/e2e-aws.sh b/hack/test/e2e-aws.sh index d3fd4d4..53d9498 100644 --- a/hack/test/e2e-aws.sh +++ b/hack/test/e2e-aws.sh @@ -23,7 +23,9 @@ REGION="us-east-1" BUCKET="talos-ci-e2e" PLATFORM=$(uname -s | tr "[:upper:]" "[:lower:]") TALOS_VERSION="${TALOS_DEFAULT:-v1.0.0}" -K8S_VERSION="${K8S_VERSION:-v1.23.5}" +K8S_VERSION="${K8S_VERSION:-v1.23.4}" +export WORKLOAD_KUBERNETES_VERSION="${WORKLOAD_KUBERNETES_VERSION:-${K8S_VERSION}}" +export UPGRADE_K8S_VERSION="${UPGRADE_K8S_VERSION:-v1.23.5}" KUBECONFIG= AMI=${AWS_AMI:-$(curl -sL https://github.com/talos-systems/talos/releases/download/${TALOS_VERSION}/cloud-images.json | \ jq -r --arg REGION "${REGION}" '.[] | select(.region == $REGION) | select (.arch == "amd64") | .id')} diff --git a/internal/integration/integration_test.go b/internal/integration/integration_test.go index d08e5bc..68125d3 100644 --- a/internal/integration/integration_test.go +++ b/internal/integration/integration_test.go @@ -29,6 +29,7 @@ import ( clusterv1 "sigs.k8s.io/cluster-api/cmd/clusterctl/api/v1alpha3" logf "sigs.k8s.io/cluster-api/cmd/clusterctl/log" "sigs.k8s.io/cluster-api/util/conditions" + "sigs.k8s.io/cluster-api/util/patch" runtimeclient "sigs.k8s.io/controller-runtime/pkg/client" controlplanev1 "github.com/talos-systems/cluster-api-control-plane-provider-talos/api/v1alpha3" @@ -49,10 +50,11 @@ type providerConfig struct { type IntegrationSuite struct { suite.Suite - manager *capi.Manager - cluster *capi.Cluster - ctx context.Context - cancel context.CancelFunc + manager *capi.Manager + cluster *capi.Cluster + ctx context.Context + cancel context.CancelFunc + finalK8sVersion string } func (suite *IntegrationSuite) SetupSuite() { @@ -70,6 +72,7 @@ func (suite *IntegrationSuite) SetupSuite() { } providerType := env("PROVIDER", "aws:v1.1.0") + suite.finalK8sVersion = os.Getenv("UPGRADE_K8S_VERSION") provider, err := infrastructure.NewProvider(providerType) suite.Require().NoError(err) @@ -260,6 +263,67 @@ func (suite *IntegrationSuite) Test01ReconcileMachine() { ) } +func (suite *IntegrationSuite) Test02UpgradeK8s() { + if suite.finalK8sVersion == "" { + suite.T().Skip("K8s upgrade version is not set") + } + + client, err := suite.manager.GetClient(suite.ctx) + suite.Require().NoError(err) + + controlplane, err := suite.cluster.ControlPlanes(suite.ctx) + suite.Require().NoError(err) + + patchHelper, err := patch.NewHelper(controlplane, client) + suite.Require().NoError(err) + + suite.Require().NoError(unstructured.SetNestedField(controlplane.Object, suite.finalK8sVersion, "spec", "version")) + + suite.Require().NoError(patchHelper.Patch(suite.ctx, controlplane)) + + suite.Require().NoError(err) + + selector, 
err := labels.Parse("cluster.x-k8s.io/control-plane") + suite.Require().NoError(err) + + err = retry.Constant(time.Minute*20, retry.WithUnits(time.Second)).Retry(func() error { + machines := unstructured.UnstructuredList{} + machines.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "cluster.x-k8s.io", + Kind: "Machine", + Version: suite.manager.Version(), + }) + + if err = client.List(suite.ctx, &machines, &runtimeclient.MatchingLabelsSelector{Selector: selector}); err != nil { + return err + } + + for _, machine := range machines.Items { + var ( + machineVersion string + found bool + ) + + machineVersion, found, err = unstructured.NestedString(machine.Object, "spec", "version") + if err != nil { + return err + } + + if !found { + return retry.ExpectedErrorf("version wasn't found for the machine") + } + + if machineVersion != suite.finalK8sVersion { + return retry.ExpectedErrorf("one of the machines is still using an old Kubernetes version") + } + } + + return nil + }) + + suite.Require().NoError(err) +} + // Test02ScaleDown scale control planes down. func (suite *IntegrationSuite) Test03ScaleDown() { suite.Require().NoError(suite.cluster.Sync(suite.ctx))
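Test02UpgradeK8s above deliberately works with unstructured objects so it stays decoupled from the contract version served by the management cluster. For readers who want the typed equivalent of its wait loop, here is a hedged sketch (assuming direct use of the clusterv1 types, a plain polling loop instead of the retry package, and an illustrative function name):

package example

import (
	"context"
	"fmt"
	"time"

	clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// waitForControlPlaneVersion polls until every control plane Machine reports
// the target Kubernetes version, i.e. until the rolling upgrade has finished.
func waitForControlPlaneVersion(ctx context.Context, c client.Client, namespace, version string, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)

	for time.Now().Before(deadline) {
		var machines clusterv1.MachineList

		err := c.List(ctx, &machines,
			client.InNamespace(namespace),
			client.HasLabels{clusterv1.MachineControlPlaneLabelName},
		)
		if err != nil {
			return err
		}

		upToDate := len(machines.Items) > 0

		for _, m := range machines.Items {
			if m.Spec.Version == nil || *m.Spec.Version != version {
				upToDate = false

				break
			}
		}

		if upToDate {
			return nil
		}

		time.Sleep(10 * time.Second)
	}

	return fmt.Errorf("control plane machines did not reach version %s within %s", version, timeout)
}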