Skip to content

Commit

Permalink
cluster: delete PVCs after decom if flag is set
Browse files Browse the repository at this point in the history
To scale down and up again, we need to delete PVCs after a broker/pod
has been removed.
Otherwise, when scaling up again, the old PVC either:
- brings up a new pod that has the disk contents of a decommissioned node
  and therefore fails to join the cluster, or
- blocks scheduling of the pod, because the machine no longer
  exists.

In any case, the PVC is worthless after decom, and we need to remove it.

Right after decom, before scaling down in the scale_handler, is the
right place to do this. Here, the pod still exists, and we delete the
PVC *AFTER* Redpanda has confirmed that the broker decom was successful.
  • Loading branch information
birdayz committed Aug 30, 2024
1 parent 5c31c49 commit f1114b7
Show file tree
Hide file tree
Showing 9 changed files with 126 additions and 30 deletions.
3 changes: 3 additions & 0 deletions src/go/k8s/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ func main() {
debug bool
ghostbuster bool
unbindPVCsAfter time.Duration
autoDeletePVCs bool
)

flag.StringVar(&eventsAddr, "events-addr", "", "The address of the events receiver.")
Expand Down Expand Up @@ -180,6 +181,7 @@ func main() {
flag.BoolVar(&operatorMode, "operator-mode", true, "enables to run as an operator, setting this to false will disable cluster (deprecated), redpanda resources reconciliation.")
flag.BoolVar(&enableHelmControllers, "enable-helm-controllers", true, "if a namespace is defined and operator mode is true, this enables the use of helm controllers to manage fluxcd helm resources.")
flag.DurationVar(&unbindPVCsAfter, "unbind-pvcs-after", 0, "if not zero, runs the PVCUnbinder controller which attempts to 'unbind' the PVCs' of Pods that are Pending for longer than the given duration")
flag.BoolVar(&autoDeletePVCs, "auto-delete-pvcs", false, "Use StatefulSet PersistentVolumeClaimRetentionPolicy to auto delete PVCs on scale down and Cluster resource delete.")

logOptions.BindFlags(flag.CommandLine)
clientOptions.BindFlags(flag.CommandLine)
Expand Down Expand Up @@ -265,6 +267,7 @@ func main() {
MetricsTimeout: metricsTimeout,
RestrictToRedpandaVersion: restrictToRedpandaVersion,
GhostDecommissioning: ghostbuster,
AutoDeletePVCs: autoDeletePVCs,
}).WithClusterDomain(clusterDomain).WithConfiguratorSettings(configurator).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "Unable to create controller", "controller", "Cluster")
os.Exit(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type ClusterReconciler struct {
MetricsTimeout time.Duration
RestrictToRedpandaVersion string
GhostDecommissioning bool
AutoDeletePVCs bool
}

//+kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ import (
)

type attachedResources struct {
ctx context.Context
reconciler *ClusterReconciler
log logr.Logger
cluster *vectorizedv1alpha1.Cluster
items map[string]resources.Resource
order []string
ctx context.Context
reconciler *ClusterReconciler
log logr.Logger
cluster *vectorizedv1alpha1.Cluster
items map[string]resources.Resource
order []string
autoDeletePVCs bool
}

const (
Expand All @@ -43,11 +44,12 @@ const (

func newAttachedResources(ctx context.Context, r *ClusterReconciler, log logr.Logger, cluster *vectorizedv1alpha1.Cluster) *attachedResources {
return &attachedResources{
ctx: ctx,
reconciler: r,
log: log,
cluster: cluster,
items: map[string]resources.Resource{},
ctx: ctx,
reconciler: r,
log: log,
cluster: cluster,
items: map[string]resources.Resource{},
autoDeletePVCs: r.AutoDeletePVCs,
}
}

Expand Down Expand Up @@ -393,7 +395,8 @@ func (a *attachedResources) statefulSet() error {
a.reconciler.AdminAPIClientFactory,
a.reconciler.DecommissionWaitInterval,
a.log,
a.reconciler.MetricsTimeout)
a.reconciler.MetricsTimeout,
a.autoDeletePVCs)
a.order = append(a.order, statefulSet)
return nil
}
Expand Down
23 changes: 23 additions & 0 deletions src/go/k8s/pkg/labels/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ const (
// The tool being used to manage the operation of an application
ManagedByKey = "app.kubernetes.io/managed-by"

// PodLabelNodeIDKey is used to label the pod with the redpanda node id.
PodLabelNodeIDKey = "operator.redpanda.com/node-id"

nameKeyRedpandaVal = "redpanda"
nameKeyConsoleVal = "redpanda-console"
managedByOperatorVal = "redpanda-operator"
Expand All @@ -48,6 +51,12 @@ func ForCluster(cluster *vectorizedv1alpha1.Cluster) CommonLabels {
return labels
}

// WithNodeID returns the result of merging cl with a single extra label that
// maps PodLabelNodeIDKey to the given nodeID.
//
// NOTE(review): merge is documented as not overwriting labels already set in
// the main label set; presumably cl is the main set here, so a node-id label
// already present on cl would take precedence — confirm against merge.
func (cl CommonLabels) WithNodeID(nodeID string) CommonLabels {
	nodeIDLabel := map[string]string{PodLabelNodeIDKey: nodeID}
	return merge(cl, nodeIDLabel)
}

// ForConsole return a set of labels that is a union of console labels as well as recommended default labels
// recommended by the kubernetes documentation https://kubernetes.io/docs/concepts/overview/working-with-objects/common-labels/
func ForConsole(console *vectorizedv1alpha1.Console) CommonLabels {
Expand All @@ -63,6 +72,11 @@ func (cl CommonLabels) AsClientSelector() k8slabels.Selector {
return k8slabels.SelectorFromSet(cl.selectorLabels())
}

// AsClientSelectorForNodeID builds a client-side label selector from the name,
// instance, component and node-id labels of cl, so that the individual pod
// carrying a specific node ID can be looked up.
func (cl CommonLabels) AsClientSelectorForNodeID() k8slabels.Selector {
	nodeIDLabels := cl.selectorLabelsForNodeID()
	return k8slabels.SelectorFromSet(nodeIDLabels)
}

// AsAPISelector returns label selector made out of subset of common labels: name, instance, component
// return type is metav1.LabelSelector type which is used in resource definition
func (cl CommonLabels) AsAPISelector() *metav1.LabelSelector {
Expand All @@ -83,6 +97,15 @@ func (cl CommonLabels) selectorLabels() k8slabels.Set {
}
}

// selectorLabelsForNodeID returns the subset of cl used to select a single
// pod: the name, instance and component labels plus the node-id label.
// Keys missing from cl are still present in the result, with whatever value
// indexing cl yields for them.
func (cl CommonLabels) selectorLabelsForNodeID() k8slabels.Set {
	set := make(k8slabels.Set, 4)
	set[NameKey] = cl[NameKey]
	set[InstanceKey] = cl[InstanceKey]
	set[ComponentKey] = cl[ComponentKey]
	set[PodLabelNodeIDKey] = cl[PodLabelNodeIDKey]
	return set
}

// merge merges two sets of labels
// if label is set in mainLabels, it won't be overwritten by newLabels
func merge(
Expand Down
4 changes: 3 additions & 1 deletion src/go/k8s/pkg/resources/resource_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ func TestEnsure_StatefulSet(t *testing.T) {
adminutils.NewInternalAdminAPI,
time.Second,
ctrl.Log.WithName("test"),
0)
0,
true,
)

err = sts.Ensure(context.Background())
assert.NoError(t, err)
Expand Down
26 changes: 22 additions & 4 deletions src/go/k8s/pkg/resources/statefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ const (
defaultDatadirCapacity = "100Gi"
trueString = "true"

PodAnnotationNodeIDKey = "operator.redpanda.com/node-id"
// PodAnnotationNodeIDKey is identical to its label counterpart.
PodAnnotationNodeIDKey = labels.PodLabelNodeIDKey
)

var (
Expand Down Expand Up @@ -115,6 +116,8 @@ type StatefulSetResource struct {
metricsTimeout time.Duration

LastObservedState *appsv1.StatefulSet

autoRemovePVCs bool
}

// NewStatefulSet creates StatefulSetResource
Expand All @@ -134,6 +137,7 @@ func NewStatefulSet(
decommissionWaitInterval time.Duration,
logger logr.Logger,
metricsTimeout time.Duration,
deletePVCsAfterDecommission bool,
) *StatefulSetResource {
ssr := &StatefulSetResource{
client,
Expand All @@ -153,6 +157,7 @@ func NewStatefulSet(
logger.WithName("StatefulSetResource"),
defaultAdminAPITimeout,
nil,
deletePVCsAfterDecommission,
}
if metricsTimeout != 0 {
ssr.metricsTimeout = metricsTimeout
Expand Down Expand Up @@ -338,6 +343,18 @@ func (r *StatefulSetResource) obj(
fmt.Sprintf("--admin-api-tls-key %q", path.Join(resourcetypes.GetTLSMountPoints().AdminAPI.ClientCAMountDir, "tls.key")))
}

// Always configure PersistentVolumeClaimRetentionPolicy explicitly.
// Default to the old behavior: retain PVCs.
// If the auto-delete-pvcs flag is set, activate the new behavior: switch to Delete for both WhenScaled and WhenDeleted.
var pvcReclaimRetentionPolicy appsv1.StatefulSetPersistentVolumeClaimRetentionPolicy
if r.autoRemovePVCs {
pvcReclaimRetentionPolicy.WhenScaled = appsv1.DeletePersistentVolumeClaimRetentionPolicyType
pvcReclaimRetentionPolicy.WhenDeleted = appsv1.DeletePersistentVolumeClaimRetentionPolicyType
} else {
pvcReclaimRetentionPolicy.WhenScaled = appsv1.RetainPersistentVolumeClaimRetentionPolicyType
pvcReclaimRetentionPolicy.WhenDeleted = appsv1.RetainPersistentVolumeClaimRetentionPolicyType
}

// We set statefulset replicas via status.currentReplicas in order to control it from the handleScaling function
replicas := r.pandaCluster.GetCurrentReplicas()
ss := &appsv1.StatefulSet{
Expand All @@ -351,9 +368,10 @@ func (r *StatefulSetResource) obj(
APIVersion: "apps/v1",
},
Spec: appsv1.StatefulSetSpec{
Replicas: &replicas,
PodManagementPolicy: appsv1.ParallelPodManagement,
Selector: clusterLabels.AsAPISelector(),
PersistentVolumeClaimRetentionPolicy: &pvcReclaimRetentionPolicy,
Replicas: &replicas,
PodManagementPolicy: appsv1.ParallelPodManagement,
Selector: clusterLabels.AsAPISelector(),
UpdateStrategy: appsv1.StatefulSetUpdateStrategy{
Type: appsv1.OnDeleteStatefulSetStrategyType,
},
Expand Down
56 changes: 49 additions & 7 deletions src/go/k8s/pkg/resources/statefulset_scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import (
"github.com/redpanda-data/common-go/rpadmin"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/retry"
k8sclient "sigs.k8s.io/controller-runtime/pkg/client"

vectorizedv1alpha1 "github.com/redpanda-data/redpanda-operator/src/go/k8s/api/vectorized/v1alpha1"
Expand Down Expand Up @@ -210,10 +212,30 @@ func (r *StatefulSetResource) handleDecommission(ctx context.Context, l logr.Log
return err
}

if broker == nil {
log.Info("broker does not exist in the cluster")
r.pandaCluster.SetDecommissionBrokerID(nil)
return r.Status().Update(ctx, r.pandaCluster)
if broker == nil { //nolint:nestif // it's quite a bit nested, but we want to go step-by-step; combining if clauses is not nicer, early returns are not playing very well here either.
log.Info("Broker has finished decommissioning")

return retry.RetryOnConflict(retry.DefaultRetry, func() error {
log.Info("clearing decomm broker ID")
cluster := &vectorizedv1alpha1.Cluster{}
err := r.Get(ctx, types.NamespacedName{
Name: r.pandaCluster.Name,
Namespace: r.pandaCluster.Namespace,
}, cluster)
if err != nil {
return err
}
cluster.Status.DecommissioningNode = nil
err = r.Status().Update(ctx, cluster)
if err == nil {
// sync original cluster variable to avoid conflicts on subsequent operations
log.Info("OK")
r.pandaCluster.Status = cluster.Status
} else {
log.Error(err, "NOK")
}
return err
})
}

if broker.MembershipStatus == rpadmin.MembershipStatusDraining {
Expand Down Expand Up @@ -428,10 +450,30 @@ func setCurrentReplicas(
}

log.Info("Scaling StatefulSet", "replicas", replicas)
pandaCluster.Status.CurrentReplicas = replicas
if err := c.Status().Update(ctx, pandaCluster); err != nil {
return fmt.Errorf("could not scale cluster %s to %d replicas: %w", pandaCluster.Name, replicas, err)

err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
cluster := &vectorizedv1alpha1.Cluster{}
err := c.Get(ctx, types.NamespacedName{
Name: pandaCluster.Name,
Namespace: pandaCluster.Namespace,
}, cluster)
if err != nil {
return err
}

cluster.Status.CurrentReplicas = replicas

err = c.Status().Update(ctx, cluster)
if err == nil {
// sync original cluster variable to avoid conflicts on subsequent operations
pandaCluster.Status = cluster.Status
}
return err
})
if err != nil {
return fmt.Errorf("could not scale cluster %s to %d replicas: %w", pandaCluster.Name, pandaCluster.GetCurrentReplicas(), err)
}

log.Info("StatefulSet scaled", "replicas", replicas)
return nil
}
Expand Down
14 changes: 9 additions & 5 deletions src/go/k8s/pkg/resources/statefulset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ func TestEnsure(t *testing.T) {
},
time.Second,
ctrl.Log.WithName("test"),
0)
0,
true)

ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)

Expand Down Expand Up @@ -502,7 +503,9 @@ func TestCurrentVersion(t *testing.T) {
},
time.Second,
ctrl.Log.WithName("test"),
0)
0,
true,
)
sts.LastObservedState = &v1.StatefulSet{
Spec: v1.StatefulSetSpec{
Replicas: &tests[i].expectedReplicas,
Expand Down Expand Up @@ -754,7 +757,7 @@ func TestStatefulSetResource_IsManagedDecommission(t *testing.T) {
tt.fields.pandaCluster,
nil, "", "", types.NamespacedName{}, nil, nil, "", resources.ConfiguratorSettings{}, nil, nil, time.Hour,
tt.fields.logger,
time.Hour)
time.Hour, true)
got, err := r.IsManagedDecommission()
if (err != nil) != tt.wantErr {
t.Errorf("StatefulSetResource.IsManagedDecommission() error = %v, wantErr %v", err, tt.wantErr)
Expand Down Expand Up @@ -848,7 +851,7 @@ func TestStatefulSetPorts_AdditionalListeners(t *testing.T) {
tt.pandaCluster,
nil, "", "", types.NamespacedName{}, nil, nil, "", resources.ConfiguratorSettings{}, nil, nil, time.Hour,
logger,
time.Hour)
time.Hour, true)
containerPorts := r.GetPortsForListenersInAdditionalConfig()
assert.Equal(t, len(tt.expectedContainerPorts), len(containerPorts))
for _, cp := range containerPorts {
Expand Down Expand Up @@ -922,7 +925,8 @@ func TestStatefulSetEnv_AdditionalListeners(t *testing.T) {
tt.pandaCluster,
nil, "", "", types.NamespacedName{}, nil, nil, "", resources.ConfiguratorSettings{}, nil, nil, time.Hour,
logger,
time.Hour)
time.Hour,
true)
envs := r.AdditionalListenersEnvVars()

if tt.expectedEnvValue == "" {
Expand Down
2 changes: 1 addition & 1 deletion src/go/k8s/pkg/resources/statefulset_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ func (r *StatefulSetResource) podEviction(ctx context.Context, pod, artificialPo
}

if err = utils.DeletePodPVCs(ctx, r.Client, pod, log); err != nil {
return fmt.Errorf(`unable to remove VPCs for pod "%s/%s: %w"`, pod.GetNamespace(), pod.GetName(), err)
return fmt.Errorf(`unable to remove PVCs for pod "%s/%s: %w"`, pod.GetNamespace(), pod.GetName(), err)
}

log.Info("deleting pod")
Expand Down

0 comments on commit f1114b7

Please sign in to comment.