cluster: use STS PersistentVolumeClaimRetentionPolicy
If auto-delete-pvcs is set, use PersistentVolumeClaimRetentionPolicy to
auto-delete PVCs on scale-down and delete.

This allows us to scale down and back up: the PVCs are worthless after
scaling down because we decommission brokers before scaling down, and a
decommissioned broker cannot rejoin the cluster.
birdayz committed Aug 30, 2024
1 parent 8b473cc commit 96164cf
Showing 8 changed files with 98 additions and 28 deletions.
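
Before the per-file diffs, a minimal sketch (not operator code; it uses the same k8s.io/api/apps/v1 types as the diff below) of the retention policy the StatefulSet ends up with in each mode. Note that the API server only acts on this field on Kubernetes versions where the StatefulSetAutoDeletePVC feature is available; elsewhere it is ignored and PVCs are retained as before.

package main

import (
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
)

// buildPolicy mirrors the branch added to statefulset.go below:
// Delete/Delete when auto-delete is requested, Retain/Retain otherwise
// (Retain is also what Kubernetes does when the field is left unset).
func buildPolicy(autoDeletePVCs bool) appsv1.StatefulSetPersistentVolumeClaimRetentionPolicy {
	if autoDeletePVCs {
		return appsv1.StatefulSetPersistentVolumeClaimRetentionPolicy{
			WhenScaled:  appsv1.DeletePersistentVolumeClaimRetentionPolicyType,
			WhenDeleted: appsv1.DeletePersistentVolumeClaimRetentionPolicyType,
		}
	}
	return appsv1.StatefulSetPersistentVolumeClaimRetentionPolicy{
		WhenScaled:  appsv1.RetainPersistentVolumeClaimRetentionPolicyType,
		WhenDeleted: appsv1.RetainPersistentVolumeClaimRetentionPolicyType,
	}
}

func main() {
	fmt.Printf("auto-delete on:  %+v\n", buildPolicy(true))
	fmt.Printf("auto-delete off: %+v\n", buildPolicy(false))
}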
3 changes: 3 additions & 0 deletions src/go/k8s/cmd/main.go
@@ -151,6 +151,7 @@ func main() {
debug bool
ghostbuster bool
unbindPVCsAfter time.Duration
autoDeletePVCs bool
)

flag.StringVar(&eventsAddr, "events-addr", "", "The address of the events receiver.")
@@ -180,6 +181,7 @@ func main() {
flag.BoolVar(&operatorMode, "operator-mode", true, "enables to run as an operator, setting this to false will disable cluster (deprecated), redpanda resources reconciliation.")
flag.BoolVar(&enableHelmControllers, "enable-helm-controllers", true, "if a namespace is defined and operator mode is true, this enables the use of helm controllers to manage fluxcd helm resources.")
flag.DurationVar(&unbindPVCsAfter, "unbind-pvcs-after", 0, "if not zero, runs the PVCUnbinder controller which attempts to 'unbind' the PVCs' of Pods that are Pending for longer than the given duration")
flag.BoolVar(&autoDeletePVCs, "auto-delete-pvcs", false, "Use StatefulSet PersistentVolumeClaimRetentionPolicy to auto delete PVCs on scale down and Cluster resource delete.")

logOptions.BindFlags(flag.CommandLine)
clientOptions.BindFlags(flag.CommandLine)
@@ -265,6 +267,7 @@ func main() {
MetricsTimeout: metricsTimeout,
RestrictToRedpandaVersion: restrictToRedpandaVersion,
GhostDecommissioning: ghostbuster,
AutoDeletePVCs: autoDeletePVCs,
}).WithClusterDomain(clusterDomain).WithConfiguratorSettings(configurator).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "Unable to create controller", "controller", "Cluster")
os.Exit(1)
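The new behavior is strictly opt-in: the flag defaults to false, so existing deployments keep retaining PVCs unless the operator binary is started with -auto-delete-pvcs. A toy standalone sketch of that opt-in pattern (not the operator's actual main.go, which is shown above):

package main

import (
	"flag"
	"fmt"
)

func main() {
	// Mirrors the flag registered above: when absent from the command line
	// the value stays false and the StatefulSet keeps the Retain policy.
	autoDeletePVCs := flag.Bool("auto-delete-pvcs", false,
		"use StatefulSet PersistentVolumeClaimRetentionPolicy to auto delete PVCs")
	flag.Parse()
	fmt.Println("auto-delete-pvcs enabled:", *autoDeletePVCs)
}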
@@ -75,6 +75,7 @@ type ClusterReconciler struct {
MetricsTimeout time.Duration
RestrictToRedpandaVersion string
GhostDecommissioning bool
AutoDeletePVCs bool
}

//+kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete
@@ -15,12 +15,13 @@ import (
)

type attachedResources struct {
ctx context.Context
reconciler *ClusterReconciler
log logr.Logger
cluster *vectorizedv1alpha1.Cluster
items map[string]resources.Resource
order []string
ctx context.Context
reconciler *ClusterReconciler
log logr.Logger
cluster *vectorizedv1alpha1.Cluster
items map[string]resources.Resource
order []string
autoDeletePVCs bool
}

const (
@@ -43,11 +44,12 @@

func newAttachedResources(ctx context.Context, r *ClusterReconciler, log logr.Logger, cluster *vectorizedv1alpha1.Cluster) *attachedResources {
return &attachedResources{
ctx: ctx,
reconciler: r,
log: log,
cluster: cluster,
items: map[string]resources.Resource{},
ctx: ctx,
reconciler: r,
log: log,
cluster: cluster,
items: map[string]resources.Resource{},
autoDeletePVCs: r.AutoDeletePVCs,
}
}

@@ -393,7 +395,8 @@ func (a *attachedResources) statefulSet() error {
a.reconciler.AdminAPIClientFactory,
a.reconciler.DecommissionWaitInterval,
a.log,
a.reconciler.MetricsTimeout)
a.reconciler.MetricsTimeout,
a.autoDeletePVCs)
a.order = append(a.order, statefulSet)
return nil
}
4 changes: 3 additions & 1 deletion src/go/k8s/pkg/resources/resource_integration_test.go
@@ -101,7 +101,9 @@ func TestEnsure_StatefulSet(t *testing.T) {
adminutils.NewInternalAdminAPI,
time.Second,
ctrl.Log.WithName("test"),
0)
0,
true,
)

err = sts.Ensure(context.Background())
assert.NoError(t, err)
24 changes: 21 additions & 3 deletions src/go/k8s/pkg/resources/statefulset.go
@@ -66,6 +66,7 @@ const (
defaultDatadirCapacity = "100Gi"
trueString = "true"

// PodAnnotationNodeIDKey is identical to its label counterpart.
PodAnnotationNodeIDKey = "operator.redpanda.com/node-id"
)

@@ -115,6 +116,8 @@ type StatefulSetResource struct {
metricsTimeout time.Duration

LastObservedState *appsv1.StatefulSet

autoDeletePVCs bool
}

// NewStatefulSet creates StatefulSetResource
@@ -134,6 +137,7 @@ func NewStatefulSet(
decommissionWaitInterval time.Duration,
logger logr.Logger,
metricsTimeout time.Duration,
autoDeletePVCs bool,
) *StatefulSetResource {
ssr := &StatefulSetResource{
client,
@@ -153,6 +157,7 @@
logger.WithName("StatefulSetResource"),
defaultAdminAPITimeout,
nil,
autoDeletePVCs,
}
if metricsTimeout != 0 {
ssr.metricsTimeout = metricsTimeout
@@ -338,6 +343,18 @@ func (r *StatefulSetResource) obj(
fmt.Sprintf("--admin-api-tls-key %q", path.Join(resourcetypes.GetTLSMountPoints().AdminAPI.ClientCAMountDir, "tls.key")))
}

// In any case, configure a PersistentVolumeClaimRetentionPolicy.
// Default to the old behavior: retain PVCs.
// If the auto-delete-pvcs flag is set, activate the new behavior: switch to Delete for both WhenScaled and WhenDeleted.
var pvcReclaimRetentionPolicy appsv1.StatefulSetPersistentVolumeClaimRetentionPolicy
if r.autoDeletePVCs {
pvcReclaimRetentionPolicy.WhenScaled = appsv1.DeletePersistentVolumeClaimRetentionPolicyType
pvcReclaimRetentionPolicy.WhenDeleted = appsv1.DeletePersistentVolumeClaimRetentionPolicyType
} else {
pvcReclaimRetentionPolicy.WhenScaled = appsv1.RetainPersistentVolumeClaimRetentionPolicyType
pvcReclaimRetentionPolicy.WhenDeleted = appsv1.RetainPersistentVolumeClaimRetentionPolicyType
}

// We set statefulset replicas via status.currentReplicas in order to control it from the handleScaling function
replicas := r.pandaCluster.GetCurrentReplicas()
ss := &appsv1.StatefulSet{
@@ -351,9 +368,10 @@
APIVersion: "apps/v1",
},
Spec: appsv1.StatefulSetSpec{
Replicas: &replicas,
PodManagementPolicy: appsv1.ParallelPodManagement,
Selector: clusterLabels.AsAPISelector(),
PersistentVolumeClaimRetentionPolicy: &pvcReclaimRetentionPolicy,
Replicas: &replicas,
PodManagementPolicy: appsv1.ParallelPodManagement,
Selector: clusterLabels.AsAPISelector(),
UpdateStrategy: appsv1.StatefulSetUpdateStrategy{
Type: appsv1.OnDeleteStatefulSetStrategyType,
},
51 changes: 45 additions & 6 deletions src/go/k8s/pkg/resources/statefulset_scale.go
@@ -19,7 +19,9 @@ import (
"github.com/redpanda-data/common-go/rpadmin"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/retry"
k8sclient "sigs.k8s.io/controller-runtime/pkg/client"

vectorizedv1alpha1 "github.com/redpanda-data/redpanda-operator/src/go/k8s/api/vectorized/v1alpha1"
@@ -211,9 +213,26 @@ func (r *StatefulSetResource) handleDecommission(ctx context.Context, l logr.Log
}

if broker == nil {
log.Info("broker does not exist in the cluster")
r.pandaCluster.SetDecommissionBrokerID(nil)
return r.Status().Update(ctx, r.pandaCluster)
log.Info("Broker has finished decommissioning")

return retry.RetryOnConflict(retry.DefaultRetry, func() error {
cluster := &vectorizedv1alpha1.Cluster{}
err := r.Get(ctx, types.NamespacedName{
Name: r.pandaCluster.Name,
Namespace: r.pandaCluster.Namespace,
}, cluster)
if err != nil {
return err
}
cluster.Status.DecommissioningNode = nil
err = r.Status().Update(ctx, cluster)
if err == nil {
log.Info("Cleared decomm broker ID from status")
// sync original cluster variable to avoid conflicts on subsequent operations
r.pandaCluster.Status = cluster.Status
}
return err
})
}

if broker.MembershipStatus == rpadmin.MembershipStatusDraining {
@@ -428,10 +447,30 @@ func setCurrentReplicas(
}

log.Info("Scaling StatefulSet", "replicas", replicas)
pandaCluster.Status.CurrentReplicas = replicas
if err := c.Status().Update(ctx, pandaCluster); err != nil {
return fmt.Errorf("could not scale cluster %s to %d replicas: %w", pandaCluster.Name, replicas, err)

err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
cluster := &vectorizedv1alpha1.Cluster{}
err := c.Get(ctx, types.NamespacedName{
Name: pandaCluster.Name,
Namespace: pandaCluster.Namespace,
}, cluster)
if err != nil {
return err
}

cluster.Status.CurrentReplicas = replicas

err = c.Status().Update(ctx, cluster)
if err == nil {
// sync original cluster variable to avoid conflicts on subsequent operations
pandaCluster.Status = cluster.Status
}
return err
})
if err != nil {
return fmt.Errorf("could not scale cluster %s to %d replicas: %w", pandaCluster.Name, pandaCluster.GetCurrentReplicas(), err)
}

log.Info("StatefulSet scaled", "replicas", replicas)
return nil
}
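Both status writes changed in this file follow the same optimistic-concurrency pattern: re-fetch the Cluster inside the retry closure, mutate only the status, call Status().Update, and let client-go retry when the API server answers 409 Conflict because the cached resourceVersion is stale; on success the fresh status is copied back into the caller's cached object. A self-contained sketch of those retry semantics, with the API calls simulated (hypothetical names, not operator code):

package main

import (
	"fmt"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/util/retry"
)

func main() {
	attempts := 0
	err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
		attempts++
		// Real code would: (1) Get the latest Cluster, refreshing its
		// resourceVersion; (2) set Status.CurrentReplicas or clear
		// Status.DecommissioningNode; (3) call Status().Update, which
		// returns a Conflict error if another writer won the race.
		// Here the first two attempts simulate losing that race.
		if attempts < 3 {
			return apierrors.NewConflict(
				schema.GroupResource{Group: "redpanda.vectorized.io", Resource: "clusters"},
				"demo", fmt.Errorf("object was modified"))
		}
		return nil
	})
	fmt.Println("attempts:", attempts, "err:", err)
}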
14 changes: 9 additions & 5 deletions src/go/k8s/pkg/resources/statefulset_test.go
@@ -151,7 +151,8 @@ func TestEnsure(t *testing.T) {
},
time.Second,
ctrl.Log.WithName("test"),
0)
0,
true)

ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)

@@ -502,7 +503,9 @@ func TestCurrentVersion(t *testing.T) {
},
time.Second,
ctrl.Log.WithName("test"),
0)
0,
true,
)
sts.LastObservedState = &v1.StatefulSet{
Spec: v1.StatefulSetSpec{
Replicas: &tests[i].expectedReplicas,
@@ -754,7 +757,7 @@ func TestStatefulSetResource_IsManagedDecommission(t *testing.T) {
tt.fields.pandaCluster,
nil, "", "", types.NamespacedName{}, nil, nil, "", resources.ConfiguratorSettings{}, nil, nil, time.Hour,
tt.fields.logger,
time.Hour)
time.Hour, true)
got, err := r.IsManagedDecommission()
if (err != nil) != tt.wantErr {
t.Errorf("StatefulSetResource.IsManagedDecommission() error = %v, wantErr %v", err, tt.wantErr)
@@ -848,7 +851,7 @@ func TestStatefulSetPorts_AdditionalListeners(t *testing.T) {
tt.pandaCluster,
nil, "", "", types.NamespacedName{}, nil, nil, "", resources.ConfiguratorSettings{}, nil, nil, time.Hour,
logger,
time.Hour)
time.Hour, true)
containerPorts := r.GetPortsForListenersInAdditionalConfig()
assert.Equal(t, len(tt.expectedContainerPorts), len(containerPorts))
for _, cp := range containerPorts {
@@ -922,7 +925,8 @@ func TestStatefulSetEnv_AdditionalListeners(t *testing.T) {
tt.pandaCluster,
nil, "", "", types.NamespacedName{}, nil, nil, "", resources.ConfiguratorSettings{}, nil, nil, time.Hour,
logger,
time.Hour)
time.Hour,
true)
envs := r.AdditionalListenersEnvVars()

if tt.expectedEnvValue == "" {
2 changes: 1 addition & 1 deletion src/go/k8s/pkg/resources/statefulset_update.go
@@ -435,7 +435,7 @@ func (r *StatefulSetResource) podEviction(ctx context.Context, pod, artificialPo
}

if err = utils.DeletePodPVCs(ctx, r.Client, pod, log); err != nil {
return fmt.Errorf(`unable to remove VPCs for pod "%s/%s: %w"`, pod.GetNamespace(), pod.GetName(), err)
return fmt.Errorf(`unable to remove PVCs for pod "%s/%s: %w"`, pod.GetNamespace(), pod.GetName(), err)
}

log.Info("deleting pod")
