cluster: delete PVCs after decom if flag is set
birdayz committed Aug 29, 2024
1 parent 5c31c49 commit 898cce6
Showing 9 changed files with 131 additions and 39 deletions.
19 changes: 11 additions & 8 deletions src/go/k8s/cmd/main.go
@@ -151,6 +151,7 @@ func main() {
debug bool
ghostbuster bool
unbindPVCsAfter time.Duration
deletePVCsAfterDecommission bool
)

flag.StringVar(&eventsAddr, "events-addr", "", "The address of the events receiver.")
@@ -180,6 +181,7 @@ func main() {
flag.BoolVar(&operatorMode, "operator-mode", true, "enables to run as an operator, setting this to false will disable cluster (deprecated), redpanda resources reconciliation.")
flag.BoolVar(&enableHelmControllers, "enable-helm-controllers", true, "if a namespace is defined and operator mode is true, this enables the use of helm controllers to manage fluxcd helm resources.")
flag.DurationVar(&unbindPVCsAfter, "unbind-pvcs-after", 0, "if not zero, runs the PVCUnbinder controller which attempts to 'unbind' the PVCs' of Pods that are Pending for longer than the given duration")
flag.BoolVar(&deletePVCsAfterDecommission, "delete-pvcs-after-decommission", false, "if true, deletes PVCs when a broker has finished decommissioning.")

logOptions.BindFlags(flag.CommandLine)
clientOptions.BindFlags(flag.CommandLine)
@@ -257,14 +259,15 @@ func main() {
ctrl.Log.Info("running in v1", "mode", OperatorV1Mode)

if err = (&redpandacontrollers.ClusterReconciler{
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("redpanda").WithName("Cluster"),
Scheme: mgr.GetScheme(),
AdminAPIClientFactory: adminutils.NewInternalAdminAPI,
DecommissionWaitInterval: decommissionWaitInterval,
MetricsTimeout: metricsTimeout,
RestrictToRedpandaVersion: restrictToRedpandaVersion,
GhostDecommissioning: ghostbuster,
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("redpanda").WithName("Cluster"),
Scheme: mgr.GetScheme(),
AdminAPIClientFactory: adminutils.NewInternalAdminAPI,
DecommissionWaitInterval: decommissionWaitInterval,
MetricsTimeout: metricsTimeout,
RestrictToRedpandaVersion: restrictToRedpandaVersion,
GhostDecommissioning: ghostbuster,
DeletePVCsAfterDecommission: deletePVCsAfterDecommission,
}).WithClusterDomain(clusterDomain).WithConfiguratorSettings(configurator).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "Unable to create controller", "controller", "Cluster")
os.Exit(1)
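The new --delete-pvcs-after-decommission flag follows the same pattern as the operator's existing flags: a local variable is bound with flag.BoolVar and then copied into the ClusterReconciler before SetupWithManager runs. A minimal, self-contained sketch of that flag-to-struct wiring pattern (the reconcilerOptions type below is illustrative, not the operator's actual configuration type):

package main

import (
	"flag"
	"fmt"
)

// reconcilerOptions stands in for the slice of ClusterReconciler
// configuration that is sourced from CLI flags.
type reconcilerOptions struct {
	DeletePVCsAfterDecommission bool
}

func main() {
	var deletePVCsAfterDecommission bool
	flag.BoolVar(&deletePVCsAfterDecommission, "delete-pvcs-after-decommission", false,
		"if true, deletes PVCs when a broker has finished decommissioning.")
	flag.Parse()

	// The parsed value is copied into the reconciler configuration once,
	// before the manager starts, and is treated as read-only afterwards.
	opts := reconcilerOptions{DeletePVCsAfterDecommission: deletePVCsAfterDecommission}
	fmt.Printf("delete-pvcs-after-decommission=%v\n", opts.DeletePVCsAfterDecommission)
}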
19 changes: 10 additions & 9 deletions src/go/k8s/internal/controller/redpanda/cluster_controller.go
@@ -66,15 +66,16 @@ var (
// ClusterReconciler reconciles a Cluster object
type ClusterReconciler struct {
client.Client
Log logr.Logger
configuratorSettings resources.ConfiguratorSettings
clusterDomain string
Scheme *runtime.Scheme
AdminAPIClientFactory adminutils.AdminAPIClientFactory
DecommissionWaitInterval time.Duration
MetricsTimeout time.Duration
RestrictToRedpandaVersion string
GhostDecommissioning bool
Log logr.Logger
configuratorSettings resources.ConfiguratorSettings
clusterDomain string
Scheme *runtime.Scheme
AdminAPIClientFactory adminutils.AdminAPIClientFactory
DecommissionWaitInterval time.Duration
MetricsTimeout time.Duration
RestrictToRedpandaVersion string
GhostDecommissioning bool
DeletePVCsAfterDecommission bool
}

//+kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete
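One operational note: deleting PersistentVolumeClaims requires the controller's service account to hold delete permission on persistentvolumeclaims in the core API group. The operator already removes PVCs during pod eviction (see statefulset_update.go below), so that permission is most likely declared elsewhere in the marker list; the line below only illustrates what such a kubebuilder RBAC marker looks like and is not taken from this commit.

//+kubebuilder:rbac:groups=core,resources=persistentvolumeclaims,verbs=get;list;watch;delete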
@@ -15,12 +15,13 @@ import (
)

type attachedResources struct {
ctx context.Context
reconciler *ClusterReconciler
log logr.Logger
cluster *vectorizedv1alpha1.Cluster
items map[string]resources.Resource
order []string
ctx context.Context
reconciler *ClusterReconciler
log logr.Logger
cluster *vectorizedv1alpha1.Cluster
items map[string]resources.Resource
order []string
deletePVCsAfterDecommission bool
}

const (
@@ -43,11 +44,12 @@

func newAttachedResources(ctx context.Context, r *ClusterReconciler, log logr.Logger, cluster *vectorizedv1alpha1.Cluster) *attachedResources {
return &attachedResources{
ctx: ctx,
reconciler: r,
log: log,
cluster: cluster,
items: map[string]resources.Resource{},
ctx: ctx,
reconciler: r,
log: log,
cluster: cluster,
items: map[string]resources.Resource{},
deletePVCsAfterDecommission: r.DeletePVCsAfterDecommission,
}
}

@@ -393,7 +395,8 @@ func (a *attachedResources) statefulSet() error {
a.reconciler.AdminAPIClientFactory,
a.reconciler.DecommissionWaitInterval,
a.log,
a.reconciler.MetricsTimeout)
a.reconciler.MetricsTimeout,
a.deletePVCsAfterDecommission)
a.order = append(a.order, statefulSet)
return nil
}
23 changes: 23 additions & 0 deletions src/go/k8s/pkg/labels/labels.go
@@ -31,6 +31,9 @@ const (
// The tool being used to manage the operation of an application
ManagedByKey = "app.kubernetes.io/managed-by"

// PodLabelNodeIDKey is used to label the pod with the redpanda node id.
PodLabelNodeIDKey = "operator.redpanda.com/node-id"

nameKeyRedpandaVal = "redpanda"
nameKeyConsoleVal = "redpanda-console"
managedByOperatorVal = "redpanda-operator"
@@ -48,6 +51,12 @@ func ForCluster(cluster *vectorizedv1alpha1.Cluster) CommonLabels {
return labels
}

func (cl CommonLabels) WithNodeID(nodeID string) CommonLabels {
return merge(cl, map[string]string{
PodLabelNodeIDKey: nodeID,
})
}

// ForConsole return a set of labels that is a union of console labels as well as recommended default labels
// recommended by the kubernetes documentation https://kubernetes.io/docs/concepts/overview/working-with-objects/common-labels/
func ForConsole(console *vectorizedv1alpha1.Console) CommonLabels {
@@ -63,6 +72,11 @@ func (cl CommonLabels) AsClientSelector() k8slabels.Selector {
return k8slabels.SelectorFromSet(cl.selectorLabels())
}

// AsClientSelectorForNodeID returns a label selector that can find an individual pod with a specific node_id.
func (cl CommonLabels) AsClientSelectorForNodeID() k8slabels.Selector {
return k8slabels.SelectorFromSet(cl.selectorLabelsForNodeId())
}

// AsAPISelector returns label selector made out of subset of common labels: name, instance, component
// return type is metav1.LabelSelector type which is used in resource definition
func (cl CommonLabels) AsAPISelector() *metav1.LabelSelector {
@@ -83,6 +97,15 @@ func (cl CommonLabels) selectorLabels() k8slabels.Set {
}
}

func (cl CommonLabels) selectorLabelsForNodeId() k8slabels.Set {
return k8slabels.Set{
NameKey: cl[NameKey],
InstanceKey: cl[InstanceKey],
ComponentKey: cl[ComponentKey],
PodLabelNodeIDKey: cl[PodLabelNodeIDKey],
}
}

// merge merges two sets of labels
// if label is set in mainLabels, it won't be overwritten by newLabels
func merge(
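The label helpers added here are thin wrappers over k8s.io/apimachinery label sets: WithNodeID merges the operator.redpanda.com/node-id key into the cluster's common labels (merge never overwrites keys already present), and AsClientSelectorForNodeID turns the name/instance/component/node-id subset into a selector that should match exactly one broker pod. A rough, self-contained illustration of that selector construction, with made-up label values:

package main

import (
	"fmt"

	k8slabels "k8s.io/apimachinery/pkg/labels"
)

const podLabelNodeIDKey = "operator.redpanda.com/node-id"

func main() {
	// Common labels every pod of the cluster carries (values are hypothetical).
	common := k8slabels.Set{
		"app.kubernetes.io/name":      "redpanda",
		"app.kubernetes.io/instance":  "my-cluster",
		"app.kubernetes.io/component": "redpanda",
	}

	// Rough equivalent of WithNodeID("3"): copy the common labels and add the
	// node-id key, leaving existing keys untouched.
	withNodeID := k8slabels.Set{}
	for k, v := range common {
		withNodeID[k] = v
	}
	withNodeID[podLabelNodeIDKey] = "3"

	// Rough equivalent of AsClientSelectorForNodeID: a selector over the keys
	// that identify a single broker pod.
	selector := k8slabels.SelectorFromSet(withNodeID)
	fmt.Println(selector.String())
}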
4 changes: 3 additions & 1 deletion src/go/k8s/pkg/resources/resource_integration_test.go
@@ -101,7 +101,9 @@ func TestEnsure_StatefulSet(t *testing.T) {
adminutils.NewInternalAdminAPI,
time.Second,
ctrl.Log.WithName("test"),
0)
0,
true,
)

err = sts.Ensure(context.Background())
assert.NoError(t, err)
4 changes: 4 additions & 0 deletions src/go/k8s/pkg/resources/statefulset.go
@@ -115,6 +115,8 @@ type StatefulSetResource struct {
metricsTimeout time.Duration

LastObservedState *appsv1.StatefulSet

deletePVCsAfterDecommission bool
}

// NewStatefulSet creates StatefulSetResource
@@ -134,6 +136,7 @@ func NewStatefulSet(
decommissionWaitInterval time.Duration,
logger logr.Logger,
metricsTimeout time.Duration,
deletePVCsAfterDecommission bool,
) *StatefulSetResource {
ssr := &StatefulSetResource{
client,
@@ -153,6 +156,7 @@
logger.WithName("StatefulSetResource"),
defaultAdminAPITimeout,
nil,
deletePVCsAfterDecommission,
}
if metricsTimeout != 0 {
ssr.metricsTimeout = metricsTimeout
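NewStatefulSet constructs StatefulSetResource with a positional composite literal, so adding deletePVCsAfterDecommission means adding both a trailing constructor parameter and a value in the matching position, and updating every caller, including the tests below. A small, generic illustration of why keyed struct literals are less brittle for this kind of change (the statefulSetConfig type is made up for the example):

package main

import (
	"fmt"
	"time"
)

type statefulSetConfig struct {
	decommissionWaitInterval    time.Duration
	metricsTimeout              time.Duration
	deletePVCsAfterDecommission bool
}

func main() {
	// Positional literal: adding a field forces every call site to change,
	// and the compiler cannot tell which value was meant for which field.
	positional := statefulSetConfig{10 * time.Second, 5 * time.Second, true}

	// Keyed literal: untouched call sites keep compiling (new fields take
	// their zero value) and each value is self-describing.
	keyed := statefulSetConfig{
		decommissionWaitInterval:    10 * time.Second,
		metricsTimeout:              5 * time.Second,
		deletePVCsAfterDecommission: true,
	}
	fmt.Println(positional, keyed)
}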
58 changes: 55 additions & 3 deletions src/go/k8s/pkg/resources/statefulset_scale.go
@@ -19,13 +19,17 @@ import (
"github.com/redpanda-data/common-go/rpadmin"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/client"
k8sclient "sigs.k8s.io/controller-runtime/pkg/client"

vectorizedv1alpha1 "github.com/redpanda-data/redpanda-operator/src/go/k8s/api/vectorized/v1alpha1"
adminutils "github.com/redpanda-data/redpanda-operator/src/go/k8s/pkg/admin"
"github.com/redpanda-data/redpanda-operator/src/go/k8s/pkg/labels"
"github.com/redpanda-data/redpanda-operator/src/go/k8s/pkg/resources/featuregates"
"github.com/redpanda-data/redpanda-operator/src/go/k8s/pkg/utils"
)

const (
@@ -193,6 +197,7 @@ func (r *StatefulSetResource) handleDecommissionInProgress(ctx context.Context,
// Before completing the process, it double-checks if the node is still not registered, for handling cases where the node was
// about to start when the decommissioning process started. If the broker is found, the process is restarted.
func (r *StatefulSetResource) handleDecommission(ctx context.Context, l logr.Logger) error {
// FIXME: make sure this pod belongs to this StatefulSet.
brokerID := r.pandaCluster.GetDecommissionBrokerID()
if brokerID == nil {
return nil
@@ -211,9 +216,56 @@
}

if broker == nil {
log.Info("broker does not exist in the cluster")
r.pandaCluster.SetDecommissionBrokerID(nil)
return r.Status().Update(ctx, r.pandaCluster)
log.Info("Broker has finished decomissioning")

// If deletePVCsAfterDecommission is set, delete the broker's PVCs if they are still present.
// Since the broker has been decommissioned, it is safe to delete that pod's PVCs.
if r.deletePVCsAfterDecommission {
brokerIDStr := fmt.Sprint(*brokerID)
podList := &corev1.PodList{}
if err := r.Client.List(ctx, podList, &client.ListOptions{
Namespace: r.pandaCluster.Namespace,
LabelSelector: labels.ForCluster(r.pandaCluster).WithNodeID(brokerIDStr).AsClientSelectorForNodeID(),
}); err != nil {
return fmt.Errorf("failed to get decom pod: %w", err)
}

if len(podList.Items) == 1 {
pod := podList.Items[0]
if v := pod.Labels[labels.PodLabelNodeIDKey]; v == brokerIDStr {
if err = utils.DeletePodPVCs(ctx, r.Client, &pod, log); err != nil {
return fmt.Errorf(`unable to remove PVCs for pod "%s/%s: %w"`, pod.GetNamespace(), pod.GetName(), err)
}
log.Info("Deleted Pod PVCs", "pod", pod.Namespace+"/"+pod.Name)
} else {
return fmt.Errorf("bug: pod has unexpected node_id label with value %s", v)
}
} else {
log.Info("Skipping delete Pod PVCs, because no pod with broker_id annotation was found")
}
} else {
log.Info("falg turned off")
}

return retry.RetryOnConflict(retry.DefaultRetry, func() error {
log.Info("clearing decomm broker ID")
cluster := &vectorizedv1alpha1.Cluster{}
err := r.Get(ctx, types.NamespacedName{
Name: r.pandaCluster.Name,
Namespace: r.pandaCluster.Namespace,
}, cluster)
if err != nil {
return err
}
cluster.Status.DecommissioningNode = nil
err = r.Status().Update(ctx, cluster)
if err == nil {
// sync original cluster variable to avoid conflicts on subsequent operations
r.pandaCluster = cluster
}
return err
})

}

if broker.MembershipStatus == rpadmin.MembershipStatusDraining {
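Clearing Status.DecommissioningNode through retry.RetryOnConflict is the standard client-go pattern for status writes that can race with other controllers: re-read the object, mutate it, attempt the update, and retry only when the API server reports a conflict. A condensed, hypothetical helper showing just that pattern (a sketch, not the operator's actual code path):

package resources

import (
	"context"

	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/util/retry"
	"sigs.k8s.io/controller-runtime/pkg/client"

	vectorizedv1alpha1 "github.com/redpanda-data/redpanda-operator/src/go/k8s/api/vectorized/v1alpha1"
)

// clearDecommissionBrokerID is a hypothetical helper: it re-fetches the
// Cluster on every attempt so the status update applies to the latest
// resourceVersion, and retries only on conflict errors.
func clearDecommissionBrokerID(ctx context.Context, c client.Client, key types.NamespacedName) error {
	return retry.RetryOnConflict(retry.DefaultRetry, func() error {
		cluster := &vectorizedv1alpha1.Cluster{}
		if err := c.Get(ctx, key, cluster); err != nil {
			return err
		}
		cluster.Status.DecommissioningNode = nil
		return c.Status().Update(ctx, cluster)
	})
}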
14 changes: 9 additions & 5 deletions src/go/k8s/pkg/resources/statefulset_test.go
@@ -151,7 +151,8 @@ func TestEnsure(t *testing.T) {
},
time.Second,
ctrl.Log.WithName("test"),
0)
0,
true)

ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)

@@ -502,7 +503,9 @@ func TestCurrentVersion(t *testing.T) {
},
time.Second,
ctrl.Log.WithName("test"),
0)
0,
true,
)
sts.LastObservedState = &v1.StatefulSet{
Spec: v1.StatefulSetSpec{
Replicas: &tests[i].expectedReplicas,
@@ -754,7 +757,7 @@ func TestStatefulSetResource_IsManagedDecommission(t *testing.T) {
tt.fields.pandaCluster,
nil, "", "", types.NamespacedName{}, nil, nil, "", resources.ConfiguratorSettings{}, nil, nil, time.Hour,
tt.fields.logger,
time.Hour)
time.Hour, true)
got, err := r.IsManagedDecommission()
if (err != nil) != tt.wantErr {
t.Errorf("StatefulSetResource.IsManagedDecommission() error = %v, wantErr %v", err, tt.wantErr)
@@ -848,7 +851,7 @@ func TestStatefulSetPorts_AdditionalListeners(t *testing.T) {
tt.pandaCluster,
nil, "", "", types.NamespacedName{}, nil, nil, "", resources.ConfiguratorSettings{}, nil, nil, time.Hour,
logger,
time.Hour)
time.Hour, true)
containerPorts := r.GetPortsForListenersInAdditionalConfig()
assert.Equal(t, len(tt.expectedContainerPorts), len(containerPorts))
for _, cp := range containerPorts {
@@ -922,7 +925,8 @@ func TestStatefulSetEnv_AdditionalListeners(t *testing.T) {
tt.pandaCluster,
nil, "", "", types.NamespacedName{}, nil, nil, "", resources.ConfiguratorSettings{}, nil, nil, time.Hour,
logger,
time.Hour)
time.Hour,
true)
envs := r.AdditionalListenersEnvVars()

if tt.expectedEnvValue == "" {
2 changes: 1 addition & 1 deletion src/go/k8s/pkg/resources/statefulset_update.go
@@ -435,7 +435,7 @@ func (r *StatefulSetResource) podEviction(ctx context.Context, pod, artificialPo
}

if err = utils.DeletePodPVCs(ctx, r.Client, pod, log); err != nil {
return fmt.Errorf(`unable to remove VPCs for pod "%s/%s: %w"`, pod.GetNamespace(), pod.GetName(), err)
return fmt.Errorf(`unable to remove PVCs for pod "%s/%s: %w"`, pod.GetNamespace(), pod.GetName(), err)
}

log.Info("deleting pod")
