Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make PVC scaling more idempotent & resilient to crashes #838

Merged
merged 5 commits into from
Sep 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions controllers/rabbitmqcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,21 +170,23 @@ func (r *RabbitmqClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ
return ctrl.Result{}, err
}

// only checks for PVC expansion and scale down if statefulSet is created
// only checks for scale down if statefulSet is created
// else continue to CreateOrUpdate()
if !k8serrors.IsNotFound(err) {
if err := builder.Update(sts); err != nil {
return ctrl.Result{}, err
}
if err = r.reconcilePVC(ctx, rabbitmqCluster, current, sts); err != nil {
r.setReconcileSuccess(ctx, rabbitmqCluster, corev1.ConditionFalse, "FailedReconcilePVC", err.Error())
return ctrl.Result{}, err
}
if r.scaleDown(ctx, rabbitmqCluster, current, sts) {
// return when cluster scale down detected; unsupported operation
return ctrl.Result{}, nil
}
}

// The PVCs for the StatefulSet may require expanding
if err = r.reconcilePVC(ctx, rabbitmqCluster, sts); err != nil {
r.setReconcileSuccess(ctx, rabbitmqCluster, corev1.ConditionFalse, "FailedReconcilePVC", err.Error())
return ctrl.Result{}, err
}
}

var operationResult controllerutil.OperationResult
Expand Down
146 changes: 8 additions & 138 deletions controllers/reconcile_persistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,114 +2,26 @@ package controllers

import (
"context"
"errors"
"fmt"
"time"

"github.com/go-logr/logr"
rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/api/v1beta1"
"github.com/rabbitmq/cluster-operator/internal/scaling"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
k8sresource "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func (r *RabbitmqClusterReconciler) reconcilePVC(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster, current, sts *appsv1.StatefulSet) error {
resize, err := r.needsPVCExpand(ctx, rmq, current, sts)
if err != nil {
return err
}

if resize {
if err := r.expandPVC(ctx, rmq, current, sts); err != nil {
return err
}
}
return nil
}

func (r *RabbitmqClusterReconciler) expandPVC(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster, current, desired *appsv1.StatefulSet) error {
logger := ctrl.LoggerFrom(ctx)

currentCapacity := persistenceStorageCapacity(current.Spec.VolumeClaimTemplates)

desiredCapacity := persistenceStorageCapacity(desired.Spec.VolumeClaimTemplates)

// don't allow going from 0 (no PVC) to anything else
if (currentCapacity.Cmp(k8sresource.MustParse("0Gi")) == 0) && (desiredCapacity.Cmp(k8sresource.MustParse("0Gi")) != 0) {
msg := "changing from ephemeral to persistent storage is not supported"
logger.Error(errors.New("unsupported operation"), msg)
r.Recorder.Event(rmq, corev1.EventTypeWarning, "FailedReconcilePersistence", msg)
return errors.New(msg)
}

logger.Info(fmt.Sprintf("updating storage capacity from %s to %s", currentCapacity.String(), desiredCapacity.String()))

if err := r.deleteSts(ctx, rmq); err != nil {
return err
}

if err := r.updatePVC(ctx, rmq, *current.Spec.Replicas, desiredCapacity); err != nil {
return err
}

return nil
}

func (r *RabbitmqClusterReconciler) updatePVC(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster, replicas int32, desiredCapacity k8sresource.Quantity) error {
logger := ctrl.LoggerFrom(ctx)
logger.Info("expanding PersistentVolumeClaims")

for i := 0; i < int(replicas); i++ {
PVCName := rmq.PVCName(i)
PVC := corev1.PersistentVolumeClaim{}

if err := r.Client.Get(ctx, types.NamespacedName{Namespace: rmq.Namespace, Name: PVCName}, &PVC); err != nil {
msg := "failed to get PersistentVolumeClaim"
logger.Error(err, msg, "PersistentVolumeClaim", PVCName)
r.Recorder.Event(rmq, corev1.EventTypeWarning, "FailedReconcilePersistence", fmt.Sprintf("%s %s", msg, PVCName))
return fmt.Errorf("%s %s: %v", msg, PVCName, err)
}
PVC.Spec.Resources.Requests[corev1.ResourceStorage] = desiredCapacity
if err := r.Client.Update(ctx, &PVC, &client.UpdateOptions{}); err != nil {
msg := "failed to update PersistentVolumeClaim"
logger.Error(err, msg, "PersistentVolumeClaim", PVCName)
r.Recorder.Event(rmq, corev1.EventTypeWarning, "FailedReconcilePersistence", fmt.Sprintf("%s %s", msg, PVCName))
return fmt.Errorf("%s %s: %v", msg, PVCName, err)
}
logger.Info("successfully expanded", "PVC", PVCName)
}
return nil
}

// returns true if desired storage capacity is larger than the current storage; returns false when current and desired capacity is the same
// errors when desired capacity is less than current capacity because PVC shrink is not supported by k8s
func (r *RabbitmqClusterReconciler) needsPVCExpand(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster, current, desired *appsv1.StatefulSet) (bool, error) {
func (r *RabbitmqClusterReconciler) reconcilePVC(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster, desiredSts *appsv1.StatefulSet) error {
logger := ctrl.LoggerFrom(ctx)

currentCapacity := persistenceStorageCapacity(current.Spec.VolumeClaimTemplates)

desiredCapacity := persistenceStorageCapacity(desired.Spec.VolumeClaimTemplates)

cmp := currentCapacity.Cmp(desiredCapacity)

// desired storage capacity is larger than the current capacity; PVC needs expansion
if cmp == -1 {
return true, nil
}

// desired storage capacity is less than the current capacity; logs and records a warning event
if cmp == 1 {
msg := "shrinking persistent volumes is not supported"
logger.Error(errors.New("unsupported operation"), msg)
desiredCapacity := persistenceStorageCapacity(desiredSts.Spec.VolumeClaimTemplates)
err := scaling.NewPersistenceScaler(r.Clientset).Scale(ctx, *rmq, desiredCapacity)
if err != nil {
msg := fmt.Sprintf("Failed to scale PVCs: %s", err.Error())
logger.Error(fmt.Errorf("Hit an error while scaling PVC capacity: %w", err), msg)
r.Recorder.Event(rmq, corev1.EventTypeWarning, "FailedReconcilePersistence", msg)
return false, errors.New(msg)
}
return false, nil
return err
}

func persistenceStorageCapacity(templates []corev1.PersistentVolumeClaim) k8sresource.Quantity {
Expand All @@ -120,45 +32,3 @@ func persistenceStorageCapacity(templates []corev1.PersistentVolumeClaim) k8sres
}
return k8sresource.MustParse("0")
}

// deleteSts deletes a sts without deleting pods and PVCs
// using DeletePropagationPolicy set to 'Orphan'
func (r *RabbitmqClusterReconciler) deleteSts(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster) error {
logger := ctrl.LoggerFrom(ctx)
logger.Info("deleting statefulSet (pods won't be deleted)", "statefulSet", rmq.ChildResourceName("server"))
deletePropagationPolicy := metav1.DeletePropagationOrphan
deleteOptions := &client.DeleteOptions{PropagationPolicy: &deletePropagationPolicy}
currentSts, err := r.statefulSet(ctx, rmq)
if err != nil {
return err
}
if err := r.Delete(ctx, currentSts, deleteOptions); err != nil {
msg := "failed to delete statefulSet"
logger.Error(err, msg, "statefulSet", currentSts.Name)
r.Recorder.Event(rmq, corev1.EventTypeWarning, "FailedReconcilePersistence", fmt.Sprintf("%s %s", msg, currentSts.Name))
return fmt.Errorf("%s %s: %v", msg, currentSts.Name, err)
}

if err := retryWithInterval(logger, "delete statefulSet", 10, 3*time.Second, func() bool {
_, getErr := r.statefulSet(ctx, rmq)
return k8serrors.IsNotFound(getErr)
}); err != nil {
msg := "statefulSet not deleting after 30 seconds"
logger.Error(err, msg, "statefulSet", currentSts.Name)
r.Recorder.Event(rmq, corev1.EventTypeWarning, "FailedReconcilePersistence", fmt.Sprintf("%s %s", msg, currentSts.Name))
return fmt.Errorf("%s %s: %v", msg, currentSts.Name, err)
}
logger.Info("statefulSet deleted", "statefulSet", currentSts.Name)
return nil
}

func retryWithInterval(logger logr.Logger, msg string, retry int, interval time.Duration, f func() bool) (err error) {
for i := 0; i < retry; i++ {
if ok := f(); ok {
return
}
time.Sleep(interval)
logger.Info("retrying again", "action", msg, "interval", interval, "attempt", i+1)
}
return fmt.Errorf("failed to %s after %d retries", msg, retry)
}
1 change: 1 addition & 0 deletions controllers/reconcile_persistence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package controllers_test
import (
"context"
"fmt"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/api/v1beta1"
Expand Down
1 change: 1 addition & 0 deletions controllers/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ var _ = BeforeSuite(func() {
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor(controllerName),
Namespace: "rabbitmq-system",
Clientset: clientSet,
PodExecutor: fakeExecutor,
}).SetupWithManager(mgr)
Expect(err).ToNot(HaveOccurred())
Expand Down
Loading