remove DecommissionPod
birdayz committed Sep 4, 2024
1 parent 171da55 commit c9402d4
Showing 6 changed files with 54 additions and 31 deletions.
11 changes: 0 additions & 11 deletions src/go/k8s/api/vectorized/v1alpha1/cluster_types.go
@@ -443,9 +443,6 @@ type ClusterStatus struct {
// Indicates that a node is currently being decommissioned from the cluster and provides its ordinal number
// +optional
DecommissioningNode *int32 `json:"decommissioningNode,omitempty"`
// Indicates the pod that hosts the node we are currently decommissioning
// +optional
DecommissioningPod string `json:"decommissioningPod,omitempty"`
// Current version of the cluster.
// +optional
Version string `json:"version"`
@@ -1444,14 +1441,6 @@ func (r *Cluster) SetDecommissionBrokerID(id *int32) {
r.Status.DecommissioningNode = id
}

func (r *Cluster) GetDecomissionningPod() string {
return r.Status.DecommissioningPod
}

func (r *Cluster) SetDecommissioningPod(pod string) {
r.Status.DecommissioningPod = pod
}

func (r *Cluster) GetNodePools() []*NodePoolSpec {
out := make([]*NodePoolSpec, 0)
if r.Spec.Replicas != nil && *r.Spec.Replicas > 0 {
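Taken together, the changes in this file leave only the decommissioning broker ID in the Cluster status; the pod hosting that broker is now derived on demand from pod metadata instead of being persisted. A minimal standalone model of the resulting pattern, using toy types and made-up values rather than the operator's actual API:

package main

import "fmt"

// clusterStatus models only the fields relevant here; the real type is the
// ClusterStatus in cluster_types.go above.
type clusterStatus struct {
	DecommissioningNode *int32 // kept by this commit
	// DecommissioningPod string // removed by this commit
}

func main() {
	id := int32(3)
	st := clusterStatus{DecommissioningNode: &id}

	// The hosting pod is no longer read from status; it is looked up from pods
	// carrying the operator.redpanda.com/node-id key. A map stands in for that lookup.
	podByNodeID := map[int32]string{3: "redpanda-1"} // hypothetical sample data
	fmt.Printf("decommissioning broker %d on pod %s\n",
		*st.DecommissioningNode, podByNodeID[*st.DecommissioningNode])
}
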
@@ -1528,10 +1528,6 @@ spec:
from the cluster and provides its ordinal number
format: int32
type: integer
decommissioningPod:
description: Indicates the pod that hosts the node we are currently
decommissioning
type: string
nodePools:
additionalProperties:
properties:
@@ -1182,7 +1182,7 @@ func getQuiescentCondition(redpandaCluster *vectorizedv1alpha1.Cluster) vectoriz
if redpandaCluster.Status.DecommissioningNode != nil {
condition.Status = corev1.ConditionFalse
condition.Reason = "DecommissioningInProgress"
condition.Message = fmt.Sprintf("Decommissioning of node_id=%d in progress", redpandaCluster.Status.DecommissioningNode)
condition.Message = fmt.Sprintf("Decommissioning of node_id=%d in progress", *redpandaCluster.Status.DecommissioningNode)
return condition
}

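The one-character fix above (dereferencing DecommissioningNode before formatting) matters because passing a *int32 to %d formats the pointer's address as a decimal integer rather than the node ID, so the condition message would contain a meaningless number. A standalone illustration, not operator code:

package main

import "fmt"

func main() {
	id := int32(5)
	p := &id

	// Pointer passed to %d: the address is formatted as an integer.
	fmt.Println(fmt.Sprintf("Decommissioning of node_id=%d in progress", p))
	// Dereferenced value: the message carries the actual node ID (5).
	fmt.Println(fmt.Sprintf("Decommissioning of node_id=%d in progress", *p))
}
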
3 changes: 3 additions & 0 deletions src/go/k8s/pkg/labels/labels.go
@@ -33,6 +33,9 @@ const (
// NodePoolKey is used to document the node pool associated with the StatefulSet.
NodePoolKey = "cluster.redpanda.com/nodepool"

// PodLabelNodeIDKey is the pod label key that records the Redpanda node (broker) ID hosted by the pod.
PodLabelNodeIDKey = "operator.redpanda.com/node-id"

nameKeyRedpandaVal = "redpanda"
nameKeyConsoleVal = "redpanda-console"
managedByOperatorVal = "redpanda-operator"
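The new operator.redpanda.com/node-id key is what lets the operator map a broker ID back to the pod hosting it without persisting the pod name in status. As a hedged sketch, assuming the key is applied as a pod label, the same mapping could be expressed with a client-go label selector; the operator in this commit instead filters an already-listed pod set client-side (see statefulset_scale.go below), and the namespace and broker ID here are made-up examples:

package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

// findPodForBroker lists pods whose operator.redpanda.com/node-id label matches
// the given broker ID and returns the first match.
func findPodForBroker(ctx context.Context, cs kubernetes.Interface, namespace string, brokerID int32) (string, error) {
	pods, err := cs.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
		LabelSelector: fmt.Sprintf("operator.redpanda.com/node-id=%d", brokerID),
	})
	if err != nil {
		return "", err
	}
	if len(pods.Items) == 0 {
		return "", fmt.Errorf("no pod found hosting broker %d", brokerID)
	}
	return pods.Items[0].Name, nil
}

func main() {
	cfg, err := rest.InClusterConfig() // assumes in-cluster execution
	if err != nil {
		panic(err)
	}
	cs := kubernetes.NewForConfigOrDie(cfg)
	name, err := findPodForBroker(context.Background(), cs, "redpanda", 3)
	fmt.Println(name, err)
}
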
63 changes: 50 additions & 13 deletions src/go/k8s/pkg/resources/statefulset_scale.go
@@ -13,19 +13,22 @@ import (
"context"
"errors"
"fmt"
"slices"
"strconv"
"strings"

"github.com/fluxcd/pkg/runtime/logger"
"github.com/go-logr/logr"
"github.com/redpanda-data/common-go/rpadmin"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/retry"

vectorizedv1alpha1 "github.com/redpanda-data/redpanda-operator/src/go/k8s/api/vectorized/v1alpha1"
adminutils "github.com/redpanda-data/redpanda-operator/src/go/k8s/pkg/admin"
"github.com/redpanda-data/redpanda-operator/src/go/k8s/pkg/labels"

"github.com/redpanda-data/redpanda-operator/src/go/k8s/pkg/patch"
"github.com/redpanda-data/redpanda-operator/src/go/k8s/pkg/resources/featuregates"
@@ -169,12 +172,10 @@ func (r *StatefulSetResource) handleScaling(ctx context.Context) error {
return err
}
cluster.SetDecommissionBrokerID(targetBroker)
cluster.SetDecommissioningPod(podName)
err = r.Status().Update(ctx, cluster)
if err == nil {
// sync original cluster variable to avoid conflicts on subsequent operations
r.pandaCluster.SetDecommissionBrokerID(targetBroker)
r.pandaCluster.SetDecommissioningPod(podName)
}
return err
})
@@ -185,27 +186,54 @@
return nil
}

// getDecomPod finds the pod hosting the given broker ID within *THIS* StatefulSet (NodePool). If none is found, nil, nil is returned.
func (r *StatefulSetResource) getDecomPod(ctx context.Context, brokerID *int32) (*corev1.Pod, error) {
var brokerPod *corev1.Pod
// Get all pods of *THIS* nodePool
podList, err := r.getPodList(ctx)
if err != nil {
return nil, err
}
// Filter out those where NodeID != DecomNodeID.
podList.Items = slices.DeleteFunc(podList.Items, func(pod corev1.Pod) bool {
return pod.Annotations[labels.PodLabelNodeIDKey] != strconv.Itoa(int(*brokerID))
})
if len(podList.Items) > 0 {
brokerPod = &podList.Items[0]
}

return brokerPod, nil
}

func (r *StatefulSetResource) handleDecommissionInProgress(ctx context.Context, l logr.Logger) error {
log := l.WithName("handleDecommissionInProgress").WithValues("nodepool", r.nodePool.Name)
brokerID := r.pandaCluster.GetDecommissionBrokerID()
if brokerID == nil {
return nil
}

brokerPod := r.pandaCluster.GetDecomissionningPod()
if brokerPod == "" {
return fmt.Errorf("bug: status.decommissioningPod is empty, but decommissioningBroker ID is not")
brokerPod, err := r.getDecomPod(ctx, brokerID)
if err != nil {
return fmt.Errorf("failed to get decom pod: %w", err)
}

// FIXME use nodepool label on pod.
_, err := strconv.Atoi(strings.TrimPrefix(brokerPod, r.LastObservedState.Name+"-")) // jb FIXME: use some better mechanism to get the pods for this pool.
if !strings.HasPrefix(brokerPod, r.LastObservedState.Name) || err != nil { // Need to be really strict. If default nodepool is reconciled, the pod of a different nodepool would also have the prefix. Make sure, it's prefix + - + ordinal.
log.Info("broker is not part of this nodepool. Ignoring", "brokerID", brokerID, "pod name", brokerPod)

// Decom Pod is from another NodePool. Ignore.
if brokerPod == nil {
// log.Info("Decommission in progress, but could not find a NodePool pod hosting that Broker ID. Skipping for this NodePool.", "brokerID", brokerID, "pod name", brokerPod)
// We must return an error, or logic will continue to run, and we decom another broker, even if one is in progress already
return fmt.Errorf("decom is in progress, but it's part of this nodepool. erroring so this is retried. only one decom cluster-wide is allowed")
// return fmt.Errorf("decom is in progress, but it's part of this nodepool. erroring so this is retried. only one decom cluster-wide is allowed")
return nil
}

// // FIXME use nodepool label on pod.
// _, err := strconv.Atoi(strings.TrimPrefix(brokerPod, r.LastObservedState.Name+"-")) // jb FIXME: use some better mechanism to get the pods for this pool.
// if !strings.HasPrefix(brokerPod, r.LastObservedState.Name) || err != nil { // Need to be really strict. If default nodepool is reconciled, the pod of a different nodepool would also have the prefix. Make sure, it's prefix + - + ordinal.
// log.Info("broker is not part of this nodepool. Ignoring", "brokerID", brokerID, "pod name", brokerPod)
//
// // We must return an error, or logic will continue to run, and we decom another broker, even if one is in progress already
// return fmt.Errorf("decom is in progress, but it's part of this nodepool. erroring so this is retried. only one decom cluster-wide is allowed")
// }

if !r.nodePool.Removed && *r.nodePool.Replicas >= r.pandaCluster.Status.NodePools[r.LastObservedState.Name].CurrentReplicas { // NodePool name is NOT equal to STS name, or not supposed to be.
// Decommissioning can also be canceled and we need to recommission
err := r.handleRecommission(ctx)
Expand Down Expand Up @@ -259,9 +287,20 @@ func (r *StatefulSetResource) handleDecommission(ctx context.Context, l logr.Log
if brokerID == nil {
return nil
}

log := l.WithName("handleDecommission").WithValues("node_id", *brokerID)
log.Info("handling broker decommissioning")

decomPod, err := r.getDecomPod(ctx, brokerID)
if err != nil {
return fmt.Errorf("failed to get decom pod: %w", err)
}

// handleDecommission expects the decommissioning pod to be part of this NodePool; if it cannot be found, error out so the reconciliation is retried.
if decomPod == nil {
return fmt.Errorf("handleDecommission invoked but decomPod not found")
}

log.Info("Getting admin api client")
adminAPI, err := r.getAdminAPIClient(ctx)
if err != nil {
Expand All @@ -286,7 +325,6 @@ func (r *StatefulSetResource) handleDecommission(ctx context.Context, l logr.Log
return err
}
cluster.Status.DecommissioningNode = nil
cluster.Status.DecommissioningPod = ""
err = r.Status().Update(ctx, cluster)
if err == nil {
log.Info("Cleared decomm broker ID from status")
Expand Down Expand Up @@ -364,7 +402,6 @@ func (r *StatefulSetResource) handleRecommission(ctx context.Context) error {
if broker.MembershipStatus == rpadmin.MembershipStatusActive {
log.Info("Recommissioning process successfully completed")
r.pandaCluster.SetDecommissionBrokerID(nil)
r.pandaCluster.SetDecommissioningPod("")
return r.Status().Update(ctx, r.pandaCluster)
}

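The getDecomPod helper added above leans on slices.DeleteFunc (Go 1.21+), which deletes every element for which the predicate returns true and keeps the rest, so the predicate has to match the pods to discard, not the pod to keep. A small self-contained example of that filtering pattern, with invented pod names and IDs:

package main

import (
	"fmt"
	"slices"
	"strconv"
)

type pod struct {
	name   string
	nodeID string // value of the operator.redpanda.com/node-id key
}

func main() {
	pods := []pod{
		{name: "redpanda-0", nodeID: "0"},
		{name: "redpanda-1", nodeID: "1"},
		{name: "redpanda-2", nodeID: "2"},
	}
	decomBrokerID := int32(1)

	// Delete every pod whose node ID does NOT match the decommissioning broker,
	// leaving only the pod(s) hosting it.
	pods = slices.DeleteFunc(pods, func(p pod) bool {
		return p.nodeID != strconv.Itoa(int(decomBrokerID))
	})
	fmt.Println(pods) // [{redpanda-1 1}]
}
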
2 changes: 0 additions & 2 deletions src/go/k8s/pkg/resources/statefulset_update.go
@@ -219,7 +219,6 @@ func (r *StatefulSetResource) isClusterHealthy(ctx context.Context) error {
return nil
}

// FIXME: needs to take into account the node pool
func (r *StatefulSetResource) getPodList(ctx context.Context) (*corev1.PodList, error) {
var podList corev1.PodList
err := r.List(ctx, &podList, &k8sclient.ListOptions{
@@ -518,7 +517,6 @@ func (r *StatefulSetResource) podEviction(ctx context.Context, pod, artificialPo
return fmt.Errorf("cannot get broker id for pod: %w", err)
}
r.pandaCluster.SetDecommissionBrokerID(id)
r.pandaCluster.SetDecommissioningPod(pod.Name)

if err = r.handleDecommission(ctx, log); err != nil {
return err
