Skip to content

Commit

Permalink
Errors when received PVC shrink requests
Browse files Browse the repository at this point in the history
  • Loading branch information
ChunyiLyu committed Apr 12, 2021
1 parent 6dbc83d commit 5aa1019
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 5 deletions.
24 changes: 19 additions & 5 deletions controllers/reconcile_persistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

func (r *RabbitmqClusterReconciler) reconcilePVC(ctx context.Context, cluster *rabbitmqv1beta1.RabbitmqCluster, current, sts *appsv1.StatefulSet) error {
resize, err := r.needsPVCResize(current, sts)
func (r *RabbitmqClusterReconciler) reconcilePVC(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster, current, sts *appsv1.StatefulSet) error {
resize, err := r.needsPVCExpansion(ctx, rmq, current, sts)
if err != nil {
return err
}

if resize {
if err := r.expandPVC(ctx, cluster, current, sts); err != nil {
if err := r.expandPVC(ctx, rmq, current, sts); err != nil {
return err
}
}
Expand Down Expand Up @@ -84,7 +84,11 @@ func (r *RabbitmqClusterReconciler) updatePVC(ctx context.Context, rmq *rabbitmq
return nil
}

func (r *RabbitmqClusterReconciler) needsPVCResize(current, desired *appsv1.StatefulSet) (bool, error) {
// returns true if desired storage capacity is larger than the current storage; returns false when current and desired capacity is the same
// errors when desired capacity is less than current capacity because PVC shrink is not supported by k8s
func (r *RabbitmqClusterReconciler) needsPVCExpansion(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster, current, desired *appsv1.StatefulSet) (bool, error) {
logger := ctrl.LoggerFrom(ctx)

currentCapacity, err := persistenceStorageCapacity(current.Spec.VolumeClaimTemplates)
if err != nil {
return false, err
Expand All @@ -95,10 +99,20 @@ func (r *RabbitmqClusterReconciler) needsPVCResize(current, desired *appsv1.Stat
return false, err
}

if currentCapacity.Cmp(desiredCapacity) != 0 {
cmp := currentCapacity.Cmp(desiredCapacity)

// desired storage capacity is larger than the current capacity; PVC needs expansion
if cmp == -1 {
return true, nil
}

// desired storage capacity is less than the current capacity; logs and records a warning event
if cmp == 1 {
msg := "shrinking persistent volumes is not supported"
logger.Error(errors.New("unsupported operation"), msg)
r.Recorder.Event(rmq, corev1.EventTypeWarning, "FailedReconcilePersistence", msg)
return false, errors.New(msg)
}
return false, nil
}

Expand Down
91 changes: 91 additions & 0 deletions controllers/reconcile_persistence_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package controllers_test

import (
"context"
"fmt"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/api/v1beta1"
"github.com/rabbitmq/cluster-operator/internal/status"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
k8sresource "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/pointer"
runtimeClient "sigs.k8s.io/controller-runtime/pkg/client"
)

var _ = Describe("Persistence", func() {
var (
cluster *rabbitmqv1beta1.RabbitmqCluster
defaultNamespace = "default"
ctx = context.Background()
)

BeforeEach(func() {
cluster = &rabbitmqv1beta1.RabbitmqCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "rabbitmq-shrink",
Namespace: defaultNamespace,
},
Spec: rabbitmqv1beta1.RabbitmqClusterSpec{
Replicas: pointer.Int32Ptr(5),
},
}
Expect(client.Create(ctx, cluster)).To(Succeed())
waitForClusterCreation(ctx, cluster, client)
})

AfterEach(func() {
Expect(client.Delete(ctx, cluster)).To(Succeed())
Eventually(func() bool {
rmq := &rabbitmqv1beta1.RabbitmqCluster{}
err := client.Get(ctx, types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, rmq)
return apierrors.IsNotFound(err)
}, 5).Should(BeTrue())
})

It("does not allow PVC shrink", func() {
By("not updating statefulSet volume claim storage capacity", func() {
tenG := k8sresource.MustParse("10Gi")
Expect(updateWithRetry(cluster, func(r *rabbitmqv1beta1.RabbitmqCluster) {
storage := k8sresource.MustParse("1Gi")
cluster.Spec.Persistence.Storage = &storage
})).To(Succeed())
Consistently(func() k8sresource.Quantity {
sts, err := clientSet.AppsV1().StatefulSets(defaultNamespace).Get(ctx, cluster.ChildResourceName("server"), metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())
return sts.Spec.VolumeClaimTemplates[0].Spec.Resources.Requests[corev1.ResourceStorage]
}, 10, 1).Should(Equal(tenG))
})

By("setting 'Warning' events", func() {
Expect(aggregateEventMsgs(ctx, cluster, "FailedReconcilePersistence")).To(
ContainSubstring("shrinking persistent volumes is not supported"))
})

By("setting ReconcileSuccess to 'false' with failed reason and message", func() {
Eventually(func() string {
rabbit := &rabbitmqv1beta1.RabbitmqCluster{}
Expect(client.Get(ctx, runtimeClient.ObjectKey{
Name: cluster.Name,
Namespace: defaultNamespace,
}, rabbit)).To(Succeed())

for i := range rabbit.Status.Conditions {
if rabbit.Status.Conditions[i].Type == status.ReconcileSuccess {
return fmt.Sprintf(
"ReconcileSuccess status: %s, with reason: %s and message: %s",
rabbit.Status.Conditions[i].Status,
rabbit.Status.Conditions[i].Reason,
rabbit.Status.Conditions[i].Message)
}
}
return "ReconcileSuccess status: condition not present"
}, 5).Should(Equal("ReconcileSuccess status: False, " +
"with reason: FailedReconcilePVC " +
"and message: shrinking persistent volumes is not supported"))
})
})
})

0 comments on commit 5aa1019

Please sign in to comment.