diff --git a/controllers/rabbitmqcluster_controller.go b/controllers/rabbitmqcluster_controller.go
index 49c62be1f..281c21a46 100644
--- a/controllers/rabbitmqcluster_controller.go
+++ b/controllers/rabbitmqcluster_controller.go
@@ -170,21 +170,23 @@ func (r *RabbitmqClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ
 		return ctrl.Result{}, err
 	}
 
-	// only checks for PVC expansion and scale down if statefulSet is created
+	// only checks for scale down if statefulSet is created
 	// else continue to CreateOrUpdate()
 	if !k8serrors.IsNotFound(err) {
 		if err := builder.Update(sts); err != nil {
 			return ctrl.Result{}, err
 		}
-		if err = r.reconcilePVC(ctx, rabbitmqCluster, current, sts); err != nil {
-			r.setReconcileSuccess(ctx, rabbitmqCluster, corev1.ConditionFalse, "FailedReconcilePVC", err.Error())
-			return ctrl.Result{}, err
-		}
 		if r.scaleDown(ctx, rabbitmqCluster, current, sts) {
 			// return when cluster scale down detected; unsupported operation
 			return ctrl.Result{}, nil
 		}
 	}
+
+	// The PVCs for the StatefulSet may require expanding
+	if err = r.reconcilePVC(ctx, rabbitmqCluster, sts); err != nil {
+		r.setReconcileSuccess(ctx, rabbitmqCluster, corev1.ConditionFalse, "FailedReconcilePVC", err.Error())
+		return ctrl.Result{}, err
+	}
 	}
 
 	var operationResult controllerutil.OperationResult
diff --git a/controllers/reconcile_persistence.go b/controllers/reconcile_persistence.go
index 5226caaaf..d4ee6d200 100644
--- a/controllers/reconcile_persistence.go
+++ b/controllers/reconcile_persistence.go
@@ -2,114 +2,26 @@ package controllers
 
 import (
 	"context"
-	"errors"
 	"fmt"
-	"time"
 
-	"github.com/go-logr/logr"
 	rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/api/v1beta1"
+	"github.com/rabbitmq/cluster-operator/internal/scaling"
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
-	k8serrors "k8s.io/apimachinery/pkg/api/errors"
 	k8sresource "k8s.io/apimachinery/pkg/api/resource"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/types"
 	ctrl "sigs.k8s.io/controller-runtime"
-	"sigs.k8s.io/controller-runtime/pkg/client"
 )
 
-func (r *RabbitmqClusterReconciler) reconcilePVC(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster, current, sts *appsv1.StatefulSet) error {
-	resize, err := r.needsPVCExpand(ctx, rmq, current, sts)
-	if err != nil {
-		return err
-	}
-
-	if resize {
-		if err := r.expandPVC(ctx, rmq, current, sts); err != nil {
-			return err
-		}
-	}
-	return nil
-}
-
-func (r *RabbitmqClusterReconciler) expandPVC(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster, current, desired *appsv1.StatefulSet) error {
-	logger := ctrl.LoggerFrom(ctx)
-
-	currentCapacity := persistenceStorageCapacity(current.Spec.VolumeClaimTemplates)
-
-	desiredCapacity := persistenceStorageCapacity(desired.Spec.VolumeClaimTemplates)
-
-	// don't allow going from 0 (no PVC) to anything else
-	if (currentCapacity.Cmp(k8sresource.MustParse("0Gi")) == 0) && (desiredCapacity.Cmp(k8sresource.MustParse("0Gi")) != 0) {
-		msg := "changing from ephemeral to persistent storage is not supported"
-		logger.Error(errors.New("unsupported operation"), msg)
-		r.Recorder.Event(rmq, corev1.EventTypeWarning, "FailedReconcilePersistence", msg)
-		return errors.New(msg)
-	}
-
-	logger.Info(fmt.Sprintf("updating storage capacity from %s to %s", currentCapacity.String(), desiredCapacity.String()))
-
-	if err := r.deleteSts(ctx, rmq); err != nil {
-		return err
-	}
-
-	if err := r.updatePVC(ctx, rmq, *current.Spec.Replicas, desiredCapacity); err != nil {
-		return err
-	}
-
-	return nil
-}
-
-func (r *RabbitmqClusterReconciler) updatePVC(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster, replicas int32, desiredCapacity k8sresource.Quantity) error {
-	logger := ctrl.LoggerFrom(ctx)
-	logger.Info("expanding PersistentVolumeClaims")
-
-	for i := 0; i < int(replicas); i++ {
-		PVCName := rmq.PVCName(i)
-		PVC := corev1.PersistentVolumeClaim{}
-
-		if err := r.Client.Get(ctx, types.NamespacedName{Namespace: rmq.Namespace, Name: PVCName}, &PVC); err != nil {
-			msg := "failed to get PersistentVolumeClaim"
-			logger.Error(err, msg, "PersistentVolumeClaim", PVCName)
-			r.Recorder.Event(rmq, corev1.EventTypeWarning, "FailedReconcilePersistence", fmt.Sprintf("%s %s", msg, PVCName))
-			return fmt.Errorf("%s %s: %v", msg, PVCName, err)
-		}
-		PVC.Spec.Resources.Requests[corev1.ResourceStorage] = desiredCapacity
-		if err := r.Client.Update(ctx, &PVC, &client.UpdateOptions{}); err != nil {
-			msg := "failed to update PersistentVolumeClaim"
-			logger.Error(err, msg, "PersistentVolumeClaim", PVCName)
-			r.Recorder.Event(rmq, corev1.EventTypeWarning, "FailedReconcilePersistence", fmt.Sprintf("%s %s", msg, PVCName))
-			return fmt.Errorf("%s %s: %v", msg, PVCName, err)
-		}
-		logger.Info("successfully expanded", "PVC", PVCName)
-	}
-	return nil
-}
-
-// returns true if desired storage capacity is larger than the current storage; returns false when current and desired capacity is the same
-// errors when desired capacity is less than current capacity because PVC shrink is not supported by k8s
-func (r *RabbitmqClusterReconciler) needsPVCExpand(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster, current, desired *appsv1.StatefulSet) (bool, error) {
+func (r *RabbitmqClusterReconciler) reconcilePVC(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster, desiredSts *appsv1.StatefulSet) error {
 	logger := ctrl.LoggerFrom(ctx)
-
-	currentCapacity := persistenceStorageCapacity(current.Spec.VolumeClaimTemplates)
-
-	desiredCapacity := persistenceStorageCapacity(desired.Spec.VolumeClaimTemplates)
-
-	cmp := currentCapacity.Cmp(desiredCapacity)
-
-	// desired storage capacity is larger than the current capacity; PVC needs expansion
-	if cmp == -1 {
-		return true, nil
-	}
-
-	// desired storage capacity is less than the current capacity; logs and records a warning event
-	if cmp == 1 {
-		msg := "shrinking persistent volumes is not supported"
-		logger.Error(errors.New("unsupported operation"), msg)
+	desiredCapacity := persistenceStorageCapacity(desiredSts.Spec.VolumeClaimTemplates)
+	err := scaling.NewPersistenceScaler(r.Clientset).Scale(ctx, *rmq, desiredCapacity)
+	if err != nil {
+		msg := fmt.Sprintf("Failed to scale PVCs: %s", err.Error())
+		logger.Error(fmt.Errorf("Hit an error while scaling PVC capacity: %w", err), msg)
 		r.Recorder.Event(rmq, corev1.EventTypeWarning, "FailedReconcilePersistence", msg)
-		return false, errors.New(msg)
 	}
-	return false, nil
+	return err
 }
 
 func persistenceStorageCapacity(templates []corev1.PersistentVolumeClaim) k8sresource.Quantity {
@@ -120,45 +32,3 @@ func persistenceStorageCapacity(templates []corev1.PersistentVolumeClaim) k8sres
 	}
 	return k8sresource.MustParse("0")
 }
-
-// deleteSts deletes a sts without deleting pods and PVCs
-// using DeletePropagationPolicy set to 'Orphan'
-func (r *RabbitmqClusterReconciler) deleteSts(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster) error {
-	logger := ctrl.LoggerFrom(ctx)
-	logger.Info("deleting statefulSet (pods won't be deleted)", "statefulSet", rmq.ChildResourceName("server"))
-
-	deletePropagationPolicy := metav1.DeletePropagationOrphan
-	deleteOptions := &client.DeleteOptions{PropagationPolicy: &deletePropagationPolicy}
-	currentSts, err := r.statefulSet(ctx, rmq)
-	if err != nil {
-		return err
-	}
-	if err := r.Delete(ctx, currentSts, deleteOptions); err != nil {
-		msg := "failed to delete statefulSet"
-		logger.Error(err, msg, "statefulSet", currentSts.Name)
-		r.Recorder.Event(rmq, corev1.EventTypeWarning, "FailedReconcilePersistence", fmt.Sprintf("%s %s", msg, currentSts.Name))
-		return fmt.Errorf("%s %s: %v", msg, currentSts.Name, err)
-	}
-
-	if err := retryWithInterval(logger, "delete statefulSet", 10, 3*time.Second, func() bool {
-		_, getErr := r.statefulSet(ctx, rmq)
-		return k8serrors.IsNotFound(getErr)
-	}); err != nil {
-		msg := "statefulSet not deleting after 30 seconds"
-		logger.Error(err, msg, "statefulSet", currentSts.Name)
-		r.Recorder.Event(rmq, corev1.EventTypeWarning, "FailedReconcilePersistence", fmt.Sprintf("%s %s", msg, currentSts.Name))
-		return fmt.Errorf("%s %s: %v", msg, currentSts.Name, err)
-	}
-	logger.Info("statefulSet deleted", "statefulSet", currentSts.Name)
-	return nil
-}
-
-func retryWithInterval(logger logr.Logger, msg string, retry int, interval time.Duration, f func() bool) (err error) {
-	for i := 0; i < retry; i++ {
-		if ok := f(); ok {
-			return
-		}
-		time.Sleep(interval)
-		logger.Info("retrying again", "action", msg, "interval", interval, "attempt", i+1)
-	}
-	return fmt.Errorf("failed to %s after %d retries", msg, retry)
-}
diff --git a/controllers/reconcile_persistence_test.go b/controllers/reconcile_persistence_test.go
index 321851c29..cb43211db 100644
--- a/controllers/reconcile_persistence_test.go
+++ b/controllers/reconcile_persistence_test.go
@@ -3,6 +3,7 @@ package controllers_test
 import (
 	"context"
 	"fmt"
+
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
 
 	rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/api/v1beta1"
diff --git a/controllers/suite_test.go b/controllers/suite_test.go
index 01c205907..08034fefb 100644
--- a/controllers/suite_test.go
+++ b/controllers/suite_test.go
@@ -86,6 +86,7 @@ var _ = BeforeSuite(func() {
 		Scheme:      mgr.GetScheme(),
 		Recorder:    mgr.GetEventRecorderFor(controllerName),
 		Namespace:   "rabbitmq-system",
+		Clientset:   clientSet,
 		PodExecutor: fakeExecutor,
 	}).SetupWithManager(mgr)
 	Expect(err).ToNot(HaveOccurred())
diff --git a/internal/scaling/scaling.go b/internal/scaling/scaling.go
new file mode 100644
index 000000000..289d74360
--- /dev/null
+++ b/internal/scaling/scaling.go
@@ -0,0 +1,203 @@
+package scaling
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"time"
+
+	"github.com/go-logr/logr"
+	rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/api/v1beta1"
+	appsv1 "k8s.io/api/apps/v1"
+	corev1 "k8s.io/api/core/v1"
+	k8serrors "k8s.io/apimachinery/pkg/api/errors"
+	k8sresource "k8s.io/apimachinery/pkg/api/resource"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/utils/pointer"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+type PersistenceScaler struct {
+	Client kubernetes.Interface
+}
+
+func NewPersistenceScaler(client kubernetes.Interface) PersistenceScaler {
+	return PersistenceScaler{
+		Client: client,
+	}
+}
+
+func (p PersistenceScaler) Scale(ctx context.Context, rmq rabbitmqv1beta1.RabbitmqCluster, desiredCapacity k8sresource.Quantity) error {
+	logger := ctrl.LoggerFrom(ctx)
+
+	existingCapacity, err := p.existingCapacity(ctx, rmq)
+	if client.IgnoreNotFound(err) != nil {
+		logErr := fmt.Errorf("Failed to determine existing STS capacity: %w", err)
+		logger.Error(logErr, "Could not read sts")
+		return logErr
+	}
+
+	// don't allow going from 0 (no PVC) to anything else
+	if err == nil && (existingCapacity.Cmp(k8sresource.MustParse("0Gi")) == 0) && (desiredCapacity.Cmp(k8sresource.MustParse("0Gi")) != 0) {
+		msg := "changing from ephemeral to persistent storage is not supported"
+		logger.Error(errors.New("unsupported operation"), msg)
+		return errors.New(msg)
+	}
+
+	// desired storage capacity is smaller than the current capacity; we can't proceed lest we lose data
+	if err == nil && existingCapacity.Cmp(desiredCapacity) == 1 {
+		msg := "shrinking persistent volumes is not supported"
+		logger.Error(errors.New("unsupported operation"), msg)
+		return errors.New(msg)
+	}
+
+	existingPVCs, err := p.getClusterPVCs(ctx, rmq)
+	if err != nil {
+		logger.Error(err, "failed to retrieve the existing cluster PVCs")
+		return err
+	}
+	pvcsToBeScaled := p.pvcsNeedingScaling(existingPVCs, desiredCapacity)
+	if len(pvcsToBeScaled) == 0 {
+		return nil
+	}
+	logger.Info("Scaling up PVCs", "RabbitmqCluster", rmq.Name, "pvcsToBeScaled", pvcsToBeScaled)
+
+	if err := p.deleteSts(ctx, rmq); err != nil {
+		logErr := fmt.Errorf("Failed to delete Statefulset from Kubernetes API: %w", err)
+		logger.Error(logErr, "Could not delete existing sts")
+		return logErr
+	}
+
+	return p.scaleUpPVCs(ctx, rmq, pvcsToBeScaled, desiredCapacity)
+}
+
+func (p PersistenceScaler) getClusterPVCs(ctx context.Context, rmq rabbitmqv1beta1.RabbitmqCluster) ([]*corev1.PersistentVolumeClaim, error) {
+	logger := ctrl.LoggerFrom(ctx)
+
+	var pvcs []*corev1.PersistentVolumeClaim
+
+	var i int32
+	for i = 0; i < pointer.Int32Deref(rmq.Spec.Replicas, 1); i++ {
+		pvc, err := p.Client.CoreV1().PersistentVolumeClaims(rmq.Namespace).Get(ctx, rmq.PVCName(int(i)), metav1.GetOptions{})
+		if client.IgnoreNotFound(err) != nil {
+			logErr := fmt.Errorf("Failed to get PVC from Kubernetes API: %w", err)
+			logger.Error(logErr, "Could not read existing PVC")
+			return nil, logErr
+		}
+		// If the PVC exists, we may need to scale it.
+		if err == nil {
+			pvcs = append(pvcs, pvc)
+		}
+	}
+	if len(pvcs) > 0 {
+		logger.V(1).Info("Found existing PVCs", "pvcList", pvcs)
+	}
+	return pvcs, nil
+}
+
+func (p PersistenceScaler) pvcsNeedingScaling(existingPVCs []*corev1.PersistentVolumeClaim, desiredCapacity k8sresource.Quantity) []*corev1.PersistentVolumeClaim {
+	var pvcs []*corev1.PersistentVolumeClaim
+
+	for _, pvc := range existingPVCs {
+		existingCapacity := pvc.Spec.Resources.Requests[corev1.ResourceStorage]
+
+		// desired storage capacity is larger than the current capacity; PVC needs expansion
+		if existingCapacity.Cmp(desiredCapacity) == -1 {
+			pvcs = append(pvcs, pvc)
+		}
+	}
+	return pvcs
+}
+
+func (p PersistenceScaler) getSts(ctx context.Context, rmq rabbitmqv1beta1.RabbitmqCluster) (*appsv1.StatefulSet, error) {
+	return p.Client.AppsV1().StatefulSets(rmq.Namespace).Get(ctx, rmq.ChildResourceName("server"), metav1.GetOptions{})
+}
+
+func (p PersistenceScaler) existingCapacity(ctx context.Context, rmq rabbitmqv1beta1.RabbitmqCluster) (k8sresource.Quantity, error) {
+	sts, err := p.getSts(ctx, rmq)
+	if err != nil {
+		return k8sresource.MustParse("0"), err
+	}
+
+	for _, t := range sts.Spec.VolumeClaimTemplates {
+		if t.Name == "persistence" {
+			return t.Spec.Resources.Requests[corev1.ResourceStorage], nil
+		}
+	}
+	return k8sresource.MustParse("0"), nil
+}
+
+// deleteSts deletes a sts without deleting pods and PVCs
+// using DeletePropagationPolicy set to 'Orphan'
+func (p PersistenceScaler) deleteSts(ctx context.Context, rmq rabbitmqv1beta1.RabbitmqCluster) error {
+	logger := ctrl.LoggerFrom(ctx)
+	logger.Info("deleting statefulSet (pods won't be deleted)", "statefulSet", rmq.ChildResourceName("server"))
+
+	sts, err := p.getSts(ctx, rmq)
+	if client.IgnoreNotFound(err) != nil {
+		logErr := fmt.Errorf("Failed to get statefulset from Kubernetes API: %w", err)
+		logger.Error(logErr, "Could not read existing statefulset")
+		return logErr
+	}
+
+	// The StatefulSet may have already been deleted. If so, there is no need to delete it again.
+	if k8serrors.IsNotFound(err) {
+		logger.Info("statefulset has already been deleted", "StatefulSet", rmq.ChildResourceName("server"), "RabbitmqCluster", rmq.Name)
+		return nil
+	}
+
+	deletePropagationPolicy := metav1.DeletePropagationOrphan
+	if err = p.Client.AppsV1().StatefulSets(sts.Namespace).Delete(ctx, sts.Name, metav1.DeleteOptions{PropagationPolicy: &deletePropagationPolicy}); err != nil {
+		msg := "failed to delete statefulSet"
+		logger.Error(err, msg, "statefulSet", sts.Name)
+		return fmt.Errorf("%s %s: %w", msg, sts.Name, err)
+	}
+
+	if err := retryWithInterval(logger, "delete statefulSet", 10, 3*time.Second, func() bool {
+		_, getErr := p.Client.AppsV1().StatefulSets(rmq.Namespace).Get(ctx, rmq.ChildResourceName("server"), metav1.GetOptions{})
+		return k8serrors.IsNotFound(getErr)
+	}); err != nil {
+		msg := "statefulSet not deleting after 30 seconds"
+		logger.Error(err, msg, "statefulSet", sts.Name)
+		return fmt.Errorf("%s %s: %w", msg, sts.Name, err)
+	}
+	logger.Info("statefulSet deleted", "statefulSet", sts.Name)
+	return nil
+}
+
+func (p PersistenceScaler) scaleUpPVCs(ctx context.Context, rmq rabbitmqv1beta1.RabbitmqCluster, pvcs []*corev1.PersistentVolumeClaim, desiredCapacity k8sresource.Quantity) error {
+	logger := ctrl.LoggerFrom(ctx)
+
+	for _, pvc := range pvcs {
+		// To minimise any timing windows, retrieve the latest version of this PVC before updating
+		pvc, err := p.Client.CoreV1().PersistentVolumeClaims(pvc.Namespace).Get(ctx, pvc.Name, metav1.GetOptions{})
+		if err != nil {
+			logErr := fmt.Errorf("Failed to get PVC from Kubernetes API: %w", err)
+			logger.Error(logErr, "Could not read existing PVC")
+			return logErr
+		}
+
+		pvc.Spec.Resources.Requests[corev1.ResourceStorage] = desiredCapacity
+		_, err = p.Client.CoreV1().PersistentVolumeClaims(rmq.Namespace).Update(ctx, pvc, metav1.UpdateOptions{})
+		if err != nil {
+			msg := "failed to update PersistentVolumeClaim"
+			logger.Error(err, msg, "PersistentVolumeClaim", pvc.Name)
+			return fmt.Errorf("%s %s: %w", msg, pvc.Name, err)
+		}
+		logger.Info("Successfully scaled up PVC", "PersistentVolumeClaim", pvc.Name, "newCapacity", desiredCapacity)
+	}
+	return nil
+}
+
+func retryWithInterval(logger logr.Logger, msg string, retry int, interval time.Duration, f func() bool) (err error) {
+	for i := 0; i < retry; i++ {
+		if ok := f(); ok {
+			return
+		}
+		time.Sleep(interval)
+		logger.Info("retrying again", "action", msg, "interval", interval, "attempt", i+1)
+	}
+	return fmt.Errorf("failed to %s after %d retries", msg, retry)
+}
diff --git a/internal/scaling/scaling_suite_test.go b/internal/scaling/scaling_suite_test.go
new file mode 100644
index 000000000..23f1f1561
--- /dev/null
+++ b/internal/scaling/scaling_suite_test.go
@@ -0,0 +1,238 @@
+package scaling_test
+
+import (
+	"fmt"
+	"reflect"
+	"testing"
+
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+	. "github.com/onsi/gomega/gstruct"
+	"github.com/onsi/gomega/types"
+	rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/api/v1beta1"
+	"github.com/rabbitmq/cluster-operator/internal/scaling"
+	appsv1 "k8s.io/api/apps/v1"
+	corev1 "k8s.io/api/core/v1"
+	k8sresource "k8s.io/apimachinery/pkg/api/resource"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/client-go/kubernetes/fake"
+	k8stesting "k8s.io/client-go/testing"
+)
+
+func TestScaling(t *testing.T) {
+	RegisterFailHandler(Fail)
+	RunSpecs(t, "Scaling Suite")
+}
+
+const namespace = "exampleNamespace"
+
+var (
+	initialAPIObjects []runtime.Object
+	fakeClientset     *fake.Clientset
+	persistenceScaler scaling.PersistenceScaler
+	rmq               rabbitmqv1beta1.RabbitmqCluster
+	existingSts       appsv1.StatefulSet
+	existingPVC       corev1.PersistentVolumeClaim
+	three             = int32(3)
+	oneG              = k8sresource.MustParse("1Gi")
+	tenG              = k8sresource.MustParse("10Gi")
+	fifteenG          = k8sresource.MustParse("15Gi")
+	ephemeralStorage  = k8sresource.MustParse("0")
+)
+
+func generatePVCTemplate(rmq rabbitmqv1beta1.RabbitmqCluster, size k8sresource.Quantity) corev1.PersistentVolumeClaim {
+	return corev1.PersistentVolumeClaim{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "persistence",
+			Namespace: namespace,
+		},
+		Spec: corev1.PersistentVolumeClaimSpec{
+			Resources: corev1.ResourceRequirements{
+				Requests: corev1.ResourceList{
+					corev1.ResourceStorage: size,
+				},
+			},
+		},
+	}
+}
+
+func generatePVC(rmq rabbitmqv1beta1.RabbitmqCluster, index int, size k8sresource.Quantity) corev1.PersistentVolumeClaim {
+	name := rmq.PVCName(index)
+	return corev1.PersistentVolumeClaim{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      name,
+			Namespace: namespace,
+		},
+		Spec: corev1.PersistentVolumeClaimSpec{
+			Resources: corev1.ResourceRequirements{
+				Requests: corev1.ResourceList{
+					corev1.ResourceStorage: size,
+				},
+			},
+		},
+	}
+}
+
+type ActionMatcher struct {
+	expectedVerb         string
+	expectedResourceType string
+	expectedNamespace    string
+	actualAction         k8stesting.Action
+}
+
+func BeGetActionOnResource(expectedResourceType, expectedResourceName, expectedNamespace string) types.GomegaMatcher {
+	return &GetActionMatcher{
+		ActionMatcher{
+			expectedVerb:         "get",
+			expectedResourceType: expectedResourceType,
+			expectedNamespace:    expectedNamespace,
+		},
+		expectedResourceName,
+	}
+}
+
+type GetActionMatcher struct {
+	ActionMatcher
+	expectedResourceName string
+}
+
+func (matcher *GetActionMatcher) Match(actual interface{}) (bool, error) {
+	genericAction, ok := actual.(k8stesting.Action)
+	if !ok {
+		return false, fmt.Errorf("BeGetActionOnResource must be passed an Action from the fakeClientset")
+	}
+	matcher.actualAction = genericAction
+
+	action, ok := actual.(k8stesting.GetAction)
+	if !ok {
+		return false, nil
+	}
+	return action.Matches(matcher.expectedVerb, matcher.expectedResourceType) &&
+		action.GetNamespace() == matcher.expectedNamespace &&
+		action.GetName() == matcher.expectedResourceName, nil
+}
+
+func (matcher *GetActionMatcher) FailureMessage(actual interface{}) string {
+	return fmt.Sprintf("Expected '%s' on resource '%s' named '%s' in namespace '%s' to match the observed action:\n%+v\n",
+		matcher.expectedVerb, matcher.expectedResourceType, matcher.expectedResourceName, matcher.expectedNamespace, matcher.actualAction)
+}
+
+func (matcher *GetActionMatcher) NegatedFailureMessage(actual interface{}) string {
+	return fmt.Sprintf("Expected '%s' on resource '%s' named '%s' in namespace '%s' not to match the observed action:\n%+v\n",
+		matcher.expectedVerb, matcher.expectedResourceType, matcher.expectedResourceName, matcher.expectedNamespace, matcher.actualAction)
+}
+
+func BeDeleteActionOnResource(expectedResourceType, expectedResourceName, expectedNamespace string) types.GomegaMatcher {
+	return &DeleteActionMatcher{
+		ActionMatcher{
+			expectedVerb:         "delete",
+			expectedResourceType: expectedResourceType,
+			expectedNamespace:    expectedNamespace,
+		},
+		expectedResourceName,
+	}
+}
+
+type DeleteActionMatcher struct {
+	ActionMatcher
+	expectedResourceName string
+}
+
+func (matcher *DeleteActionMatcher) Match(actual interface{}) (bool, error) {
+	genericAction, ok := actual.(k8stesting.Action)
+	if !ok {
+		return false, fmt.Errorf("BeDeleteActionOnResource must be passed an Action from the fakeClientset")
+	}
+	matcher.actualAction = genericAction
+
+	action, ok := actual.(k8stesting.DeleteAction)
+	if !ok {
+		return false, nil
+	}
+	return action.Matches(matcher.expectedVerb, matcher.expectedResourceType) &&
+		action.GetNamespace() == matcher.expectedNamespace &&
+		action.GetName() == matcher.expectedResourceName, nil
+}
+
+func (matcher *DeleteActionMatcher) FailureMessage(actual interface{}) string {
+	return fmt.Sprintf("Expected '%s' on resource '%s' named '%s' in namespace '%s' to match the observed action:\n%+v\n",
+		matcher.expectedVerb, matcher.expectedResourceType, matcher.expectedResourceName, matcher.expectedNamespace, matcher.actualAction)
+}
+
+func (matcher *DeleteActionMatcher) NegatedFailureMessage(actual interface{}) string {
+	return fmt.Sprintf("Expected '%s' on resource '%s' named '%s' in namespace '%s' not to match the observed action:\n%+v\n",
+		matcher.expectedVerb, matcher.expectedResourceType, matcher.expectedResourceName, matcher.expectedNamespace, matcher.actualAction)
+}
+
+func BeUpdateActionOnResource(expectedResourceType, expectedResourceName, expectedNamespace string, updatedResourceMatcher types.GomegaMatcher) types.GomegaMatcher {
+	return &UpdateActionMatcher{
+		ActionMatcher{
+			expectedVerb:         "update",
+			expectedResourceType: expectedResourceType,
+			expectedNamespace:    expectedNamespace,
+		},
+		expectedResourceName,
+		PointTo(updatedResourceMatcher),
+		false,
+	}
+}
+
+type UpdateActionMatcher struct {
+	ActionMatcher
+	expectedResourceName         string
+	updatedResourceMatcher       types.GomegaMatcher
+	failedUpdatedResourceMatcher bool
+}
+
+func (matcher *UpdateActionMatcher) Match(actual interface{}) (bool, error) {
+	genericAction, ok := actual.(k8stesting.Action)
+	if !ok {
+		return false, fmt.Errorf("BeUpdateActionOnResource must be passed an Action from the fakeClientset")
+	}
+	matcher.actualAction = genericAction
+
+	action, ok := actual.(k8stesting.UpdateAction)
+	if !ok {
+		return false, nil
+	}
+
+	updatedObject := reflect.ValueOf(action.GetObject()).Elem()
+	objMeta, ok := updatedObject.FieldByName("ObjectMeta").Interface().(metav1.ObjectMeta)
+	if !ok {
+		return false, fmt.Errorf("Object of action was not an object with ObjectMeta")
+	}
+
+	// Check the object's Name, Namespace, resource type and the verb of the action first. If this fails, there's
+	// no point in running the extra matchers on the updated object.
+ if !(action.Matches(matcher.expectedVerb, matcher.expectedResourceType) && + action.GetNamespace() == matcher.expectedNamespace && + objMeta.GetName() == matcher.expectedResourceName) { + return false, nil + } + + passedUpdatedResourceMatcher, err := matcher.updatedResourceMatcher.Match(action.GetObject()) + if err != nil { + return false, fmt.Errorf("failed to run embedded matcher: %w", err) + } + matcher.failedUpdatedResourceMatcher = !passedUpdatedResourceMatcher + + return passedUpdatedResourceMatcher, nil +} + +func (matcher *UpdateActionMatcher) FailureMessage(actual interface{}) string { + if matcher.failedUpdatedResourceMatcher { + return matcher.updatedResourceMatcher.FailureMessage(actual) + } + return fmt.Sprintf("Expected '%s' on resource '%s' named '%s' in namespace '%s' to match the observed action:\n%+v\n", + matcher.expectedVerb, matcher.expectedResourceType, matcher.expectedResourceName, matcher.expectedNamespace, matcher.actualAction) +} + +func (matcher *UpdateActionMatcher) NegatedFailureMessage(actual interface{}) string { + if matcher.failedUpdatedResourceMatcher { + return matcher.updatedResourceMatcher.NegatedFailureMessage(actual) + } + return fmt.Sprintf("Expected '%s' on resource '%s' named '%s' in namespace '%s' not to match the observed action:\n%+v\n", + matcher.expectedVerb, matcher.expectedResourceType, matcher.expectedResourceName, matcher.expectedNamespace, matcher.actualAction) +} diff --git a/internal/scaling/scaling_test.go b/internal/scaling/scaling_test.go new file mode 100644 index 000000000..c30bebfc0 --- /dev/null +++ b/internal/scaling/scaling_test.go @@ -0,0 +1,263 @@ +package scaling_test + +import ( + "context" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + . "github.com/onsi/gomega/gstruct" + rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/api/v1beta1" + "github.com/rabbitmq/cluster-operator/internal/scaling" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes/fake" +) + +var _ = Describe("Scaling", func() { + BeforeEach(func() { + rmq = rabbitmqv1beta1.RabbitmqCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "rabbit", + Namespace: namespace, + }, + } + existingPVC = generatePVC(rmq, 0, tenG) + existingSts = appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "rabbit-server", + Namespace: namespace, + }, + Spec: appsv1.StatefulSetSpec{ + VolumeClaimTemplates: []corev1.PersistentVolumeClaim{generatePVCTemplate(rmq, tenG)}, + }, + } + }) + JustBeforeEach(func() { + fakeClientset = fake.NewSimpleClientset(initialAPIObjects...) 
+		persistenceScaler = scaling.NewPersistenceScaler(fakeClientset)
+	})
+
+	When("the PVC and StatefulSet already exist", func() {
+		BeforeEach(func() {
+			initialAPIObjects = []runtime.Object{&existingSts, &existingPVC}
+		})
+		It("scales the PVC", func() {
+			Expect(persistenceScaler.Scale(context.Background(), rmq, fifteenG)).To(Succeed())
+			Expect(fakeClientset.Actions()).To(MatchAllElementsWithIndex(IndexIdentity, Elements{
+				"0": BeGetActionOnResource("statefulsets", "rabbit-server", namespace),
+				"1": BeGetActionOnResource("persistentvolumeclaims", "persistence-rabbit-server-0", namespace),
+				"2": BeGetActionOnResource("statefulsets", "rabbit-server", namespace),
+				"3": BeDeleteActionOnResource("statefulsets", "rabbit-server", namespace),
+				"4": BeGetActionOnResource("statefulsets", "rabbit-server", namespace),
+				"5": BeGetActionOnResource("persistentvolumeclaims", "persistence-rabbit-server-0", namespace),
+				"6": BeUpdateActionOnResource("persistentvolumeclaims", "persistence-rabbit-server-0", namespace, MatchFields(IgnoreExtras, Fields{
+					"Spec": MatchFields(IgnoreExtras, Fields{
+						"Resources": MatchFields(IgnoreExtras, Fields{
+							"Requests": MatchAllKeys(Keys{
+								corev1.ResourceStorage: Equal(fifteenG),
+							}),
+						}),
+					}),
+				})),
+			}))
+		})
+	})
+
+	When("the PVC does not yet exist", func() {
+		BeforeEach(func() {
+			initialAPIObjects = []runtime.Object{&existingSts}
+		})
+		It("performs no actions other than checking for the PVC's existence", func() {
+			Expect(persistenceScaler.Scale(context.Background(), rmq, fifteenG)).To(Succeed())
+			Expect(fakeClientset.Actions()).To(MatchAllElementsWithIndex(IndexIdentity, Elements{
+				"0": BeGetActionOnResource("statefulsets", "rabbit-server", namespace),
+				"1": BeGetActionOnResource("persistentvolumeclaims", "persistence-rabbit-server-0", namespace),
+			}))
+		})
+	})
+
+	When("the PVC exists, but the StatefulSet does not exist", func() {
+		BeforeEach(func() {
+			initialAPIObjects = []runtime.Object{&existingPVC}
+		})
+		It("does not delete the StatefulSet, but still updates the PVC", func() {
+			Expect(persistenceScaler.Scale(context.Background(), rmq, fifteenG)).To(Succeed())
+			Expect(fakeClientset.Actions()).To(MatchAllElementsWithIndex(IndexIdentity, Elements{
+				"0": BeGetActionOnResource("statefulsets", "rabbit-server", namespace),
+				"1": BeGetActionOnResource("persistentvolumeclaims", "persistence-rabbit-server-0", namespace),
+				"2": BeGetActionOnResource("statefulsets", "rabbit-server", namespace),
+				"3": BeGetActionOnResource("persistentvolumeclaims", "persistence-rabbit-server-0", namespace),
+				"4": BeUpdateActionOnResource("persistentvolumeclaims", "persistence-rabbit-server-0", namespace, MatchFields(IgnoreExtras, Fields{
+					"Spec": MatchFields(IgnoreExtras, Fields{
+						"Resources": MatchFields(IgnoreExtras, Fields{
+							"Requests": MatchAllKeys(Keys{
+								corev1.ResourceStorage: Equal(fifteenG),
+							}),
+						}),
+					}),
+				})),
+			}))
+		})
+	})
+
+	When("the desired PVC capacity is lower than the existing PVC", func() {
+		BeforeEach(func() {
+			initialAPIObjects = []runtime.Object{&existingSts, &existingPVC}
+		})
+		It("raises an error", func() {
+			Expect(persistenceScaler.Scale(context.Background(), rmq, oneG)).To(MatchError("shrinking persistent volumes is not supported"))
+			Expect(fakeClientset.Actions()).To(MatchAllElementsWithIndex(IndexIdentity, Elements{
+				"0": BeGetActionOnResource("statefulsets", "rabbit-server", namespace),
+			}))
+		})
+	})
+
+	When("the existing cluster is using ephemeral storage", func() {
+		BeforeEach(func() {
+			existingSts.Spec.VolumeClaimTemplates = nil
+			initialAPIObjects = []runtime.Object{&existingSts}
+		})
+		It("raises an error if trying to move to persistent storage", func() {
+			Expect(persistenceScaler.Scale(context.Background(), rmq, tenG)).To(MatchError("changing from ephemeral to persistent storage is not supported"))
+			Expect(fakeClientset.Actions()).To(MatchAllElementsWithIndex(IndexIdentity, Elements{
+				"0": BeGetActionOnResource("statefulsets", "rabbit-server", namespace),
+			}))
+		})
+		It("does nothing if remaining as ephemeral storage", func() {
+			Expect(persistenceScaler.Scale(context.Background(), rmq, ephemeralStorage)).To(Succeed())
+			Expect(fakeClientset.Actions()).To(MatchAllElementsWithIndex(IndexIdentity, Elements{
+				"0": BeGetActionOnResource("statefulsets", "rabbit-server", namespace),
+				"1": BeGetActionOnResource("persistentvolumeclaims", "persistence-rabbit-server-0", namespace),
+			}))
+		})
+	})
+
+	When("the cluster has more than one replica", func() {
+		When("all the PVCs exist and are the same size", func() {
+			BeforeEach(func() {
+				rmq.Spec.Replicas = &three
+				existingPVC0 := generatePVC(rmq, 0, tenG)
+				existingPVC1 := generatePVC(rmq, 1, tenG)
+				existingPVC2 := generatePVC(rmq, 2, tenG)
+				initialAPIObjects = []runtime.Object{&existingSts, &existingPVC0, &existingPVC1, &existingPVC2}
+			})
+			It("deletes the statefulset and updates each individual PVC", func() {
+				Expect(persistenceScaler.Scale(context.Background(), rmq, fifteenG)).To(Succeed())
+				Expect(fakeClientset.Actions()).To(MatchAllElementsWithIndex(IndexIdentity, Elements{
+					"0": BeGetActionOnResource("statefulsets", "rabbit-server", namespace),
+					"1": BeGetActionOnResource("persistentvolumeclaims", "persistence-rabbit-server-0", namespace),
+					"2": BeGetActionOnResource("persistentvolumeclaims", "persistence-rabbit-server-1", namespace),
+					"3": BeGetActionOnResource("persistentvolumeclaims", "persistence-rabbit-server-2", namespace),
+					"4": BeGetActionOnResource("statefulsets", "rabbit-server", namespace),
+					"5": BeDeleteActionOnResource("statefulsets", "rabbit-server", namespace),
+					"6": BeGetActionOnResource("statefulsets", "rabbit-server", namespace),
+					"7": BeGetActionOnResource("persistentvolumeclaims", "persistence-rabbit-server-0", namespace),
+					"8": BeUpdateActionOnResource("persistentvolumeclaims", "persistence-rabbit-server-0", namespace, MatchFields(IgnoreExtras, Fields{
+						"Spec": MatchFields(IgnoreExtras, Fields{
+							"Resources": MatchFields(IgnoreExtras, Fields{
+								"Requests": MatchAllKeys(Keys{
+									corev1.ResourceStorage: Equal(fifteenG),
+								}),
+							}),
+						}),
+					})),
+					"9": BeGetActionOnResource("persistentvolumeclaims", "persistence-rabbit-server-1", namespace),
+					"10": BeUpdateActionOnResource("persistentvolumeclaims", "persistence-rabbit-server-1", namespace, MatchFields(IgnoreExtras, Fields{
+						"Spec": MatchFields(IgnoreExtras, Fields{
+							"Resources": MatchFields(IgnoreExtras, Fields{
+								"Requests": MatchAllKeys(Keys{
+									corev1.ResourceStorage: Equal(fifteenG),
+								}),
+							}),
+						}),
+					})),
+					"11": BeGetActionOnResource("persistentvolumeclaims", "persistence-rabbit-server-2", namespace),
+					"12": BeUpdateActionOnResource("persistentvolumeclaims", "persistence-rabbit-server-2", namespace, MatchFields(IgnoreExtras, Fields{
+						"Spec": MatchFields(IgnoreExtras, Fields{
+							"Resources": MatchFields(IgnoreExtras, Fields{
+								"Requests": MatchAllKeys(Keys{
+									corev1.ResourceStorage: Equal(fifteenG),
+								}),
+							}),
+						}),
+					})),
+				}))
+			})
+		})
+
+		When("some of the PVCs don't exist yet", func() {
+			BeforeEach(func() {
+				rmq.Spec.Replicas = &three
+				existingPVC0 := generatePVC(rmq, 0, tenG)
+				existingPVC2 := generatePVC(rmq, 2, tenG)
+				initialAPIObjects = []runtime.Object{&existingSts, &existingPVC0, &existingPVC2}
+			})
+			It("deletes the statefulset and updates the PVCs that exist", func() {
+				Expect(persistenceScaler.Scale(context.Background(), rmq, fifteenG)).To(Succeed())
+				Expect(fakeClientset.Actions()).To(MatchAllElementsWithIndex(IndexIdentity, Elements{
+					"0": BeGetActionOnResource("statefulsets", "rabbit-server", namespace),
+					"1": BeGetActionOnResource("persistentvolumeclaims", "persistence-rabbit-server-0", namespace),
+					"2": BeGetActionOnResource("persistentvolumeclaims", "persistence-rabbit-server-1", namespace),
+					"3": BeGetActionOnResource("persistentvolumeclaims", "persistence-rabbit-server-2", namespace),
+					"4": BeGetActionOnResource("statefulsets", "rabbit-server", namespace),
+					"5": BeDeleteActionOnResource("statefulsets", "rabbit-server", namespace),
+					"6": BeGetActionOnResource("statefulsets", "rabbit-server", namespace),
+					"7": BeGetActionOnResource("persistentvolumeclaims", "persistence-rabbit-server-0", namespace),
+					"8": BeUpdateActionOnResource("persistentvolumeclaims", "persistence-rabbit-server-0", namespace, MatchFields(IgnoreExtras, Fields{
+						"Spec": MatchFields(IgnoreExtras, Fields{
+							"Resources": MatchFields(IgnoreExtras, Fields{
+								"Requests": MatchAllKeys(Keys{
+									corev1.ResourceStorage: Equal(fifteenG),
+								}),
+							}),
+						}),
+					})),
+					"9": BeGetActionOnResource("persistentvolumeclaims", "persistence-rabbit-server-2", namespace),
+					"10": BeUpdateActionOnResource("persistentvolumeclaims", "persistence-rabbit-server-2", namespace, MatchFields(IgnoreExtras, Fields{
+						"Spec": MatchFields(IgnoreExtras, Fields{
+							"Resources": MatchFields(IgnoreExtras, Fields{
+								"Requests": MatchAllKeys(Keys{
+									corev1.ResourceStorage: Equal(fifteenG),
+								}),
+							}),
+						}),
+					})),
+				}))
+			})
+		})
+
+		When("some of the PVCs have already been resized", func() {
+			BeforeEach(func() {
+				rmq.Spec.Replicas = &three
+				existingPVC0 := generatePVC(rmq, 0, fifteenG)
+				existingPVC1 := generatePVC(rmq, 1, fifteenG)
+				existingPVC2 := generatePVC(rmq, 2, tenG)
+				initialAPIObjects = []runtime.Object{&existingSts, &existingPVC0, &existingPVC1, &existingPVC2}
+			})
+			It("deletes the statefulset and updates only the PVCs that still need expanding", func() {
+				Expect(persistenceScaler.Scale(context.Background(), rmq, fifteenG)).To(Succeed())
+				Expect(fakeClientset.Actions()).To(MatchAllElementsWithIndex(IndexIdentity, Elements{
+					"0": BeGetActionOnResource("statefulsets", "rabbit-server", namespace),
+					"1": BeGetActionOnResource("persistentvolumeclaims", "persistence-rabbit-server-0", namespace),
+					"2": BeGetActionOnResource("persistentvolumeclaims", "persistence-rabbit-server-1", namespace),
+					"3": BeGetActionOnResource("persistentvolumeclaims", "persistence-rabbit-server-2", namespace),
+					"4": BeGetActionOnResource("statefulsets", "rabbit-server", namespace),
+					"5": BeDeleteActionOnResource("statefulsets", "rabbit-server", namespace),
+					"6": BeGetActionOnResource("statefulsets", "rabbit-server", namespace),
+					"7": BeGetActionOnResource("persistentvolumeclaims", "persistence-rabbit-server-2", namespace),
+					"8": BeUpdateActionOnResource("persistentvolumeclaims", "persistence-rabbit-server-2", namespace, MatchFields(IgnoreExtras, Fields{
+						"Spec": MatchFields(IgnoreExtras, Fields{
+							"Resources": MatchFields(IgnoreExtras, Fields{
+								"Requests": MatchAllKeys(Keys{
+									corev1.ResourceStorage: Equal(fifteenG),
+								}),
+							}),
+						}),
+					})),
+				}))
+			})
+		})
+	})
+})
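
For reviewers, a minimal usage sketch of the new package. This is a hypothetical caller (expandTo is not part of this change); the operator's real entry point is reconcilePVC above, which derives the desired capacity from the StatefulSet's volume claim templates:

package example

import (
	"context"

	rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/api/v1beta1"
	"github.com/rabbitmq/cluster-operator/internal/scaling"
	k8sresource "k8s.io/apimachinery/pkg/api/resource"
	"k8s.io/client-go/kubernetes"
)

// expandTo is a hypothetical helper for illustration only. Scale is a no-op
// when every existing PVC already has the desired capacity; it errors on
// shrink requests and on ephemeral-to-persistent transitions, and it
// orphan-deletes the StatefulSet (pods and PVCs survive) before expanding
// the PVCs, so the next reconcile can recreate the StatefulSet.
func expandTo(ctx context.Context, clientset kubernetes.Interface, rmq rabbitmqv1beta1.RabbitmqCluster, capacity string) error {
	return scaling.NewPersistenceScaler(clientset).Scale(ctx, rmq, k8sresource.MustParse(capacity))
}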