Skip to content

Commit

Permalink
Improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
triceras committed Jan 17, 2025
1 parent c8db5ef commit 5321939
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 154 deletions.
185 changes: 114 additions & 71 deletions controllers/humiocluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2419,94 +2419,67 @@ func (r *HumioClusterReconciler) cleanupOrphanedNodePoolPDBs(ctx context.Context
return nil
}

// reconcilePodDisruptionBudgets handles the creation and updates of PDBs for all node pools
// reconcilePodDisruptionBudgets handles PDB reconciliation for all node pools
func (r *HumioClusterReconciler) reconcilePodDisruptionBudgets(ctx context.Context, humioNodePools HumioNodePoolList) (ctrl.Result, error) {
// Handle all node pools
for _, hnp := range humioNodePools.Items {
if shouldCreatePDBForNodePool(hnp) {
if err := r.reconcileSinglePDB(ctx, hnp); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to reconcile PDB for node pool %s: %w", hnp.GetNodePoolName(), err)
}
r.Log.Info("reconciling pod disruption budgets")

// Handle PDBs for all node pools (including the default pool from NodeCount)
for _, pool := range humioNodePools.Items {
if err := r.reconcileSinglePDB(ctx, pool); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to reconcile PDB for node pool %s: %w", pool.GetNodePoolName(), err)
}
}

// Clean up orphaned PDBs
if len(humioNodePools.Items) > 0 {
clusterName := humioNodePools.Items[0].GetClusterName()
namespace := humioNodePools.Items[0].GetNamespace()
hc := &humiov1alpha1.HumioCluster{
ObjectMeta: metav1.ObjectMeta{
Name: clusterName,
Namespace: namespace,
},
}
if err := r.cleanupOrphanedNodePoolPDBs(ctx, hc); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to cleanup orphaned PDBs: %w", err)
}
// Clean up any orphaned PDBs
if err := r.cleanupOrphanedPDBs(ctx, humioNodePools); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to cleanup orphaned PDBs: %w", err)
}

return ctrl.Result{}, nil
}

// reconcileSinglePDB handles creation/update of a PDB for a single node pool
func (r *HumioClusterReconciler) reconcileSinglePDB(ctx context.Context, hnp *HumioNodePool) error {
func (r *HumioClusterReconciler) reconcileSinglePDB(ctx context.Context, pool *HumioNodePool) error {
if !shouldCreatePDBForNodePool(pool) {
r.Log.Info("skipping PDB creation - node pool does not need PDB",
"nodePool", pool.GetNodePoolName(),
"nodeCount", pool.GetNodeCount())
return nil
}

// Check if pods exist for this node pool
pods, err := kubernetes.ListPods(ctx, r, hnp.GetNamespace(), hnp.GetNodePoolLabels())
pods, err := kubernetes.ListPods(ctx, r, pool.GetNamespace(), pool.GetNodePoolLabels())
if err != nil {
return fmt.Errorf("failed to list pods for node pool %s: %w", hnp.GetNodePoolName(), err)
return fmt.Errorf("failed to list pods for node pool %s: %w", pool.GetNodePoolName(), err)
}

if len(pods) == 0 {
r.Log.Info("No pods found for node pool, skipping PDB creation", "nodePool", hnp.GetNodePoolName())
r.Log.Info("no pods found for node pool, skipping PDB creation",
"nodePool", pool.GetNodePoolName())
return nil
}

// Build the desired PDB
desiredPDB, err := r.buildNodePoolPDB(ctx, hnp)
desiredPDB, err := r.constructPDB(pool)
if err != nil {
return fmt.Errorf("failed to build PDB for node pool %s: %w", hnp.GetNodePoolName(), err)
return fmt.Errorf("failed to construct PDB for node pool %s: %w", pool.GetNodePoolName(), err)
}

// Get existing PDB if it exists
existingPDB := &policyv1.PodDisruptionBudget{}
err = r.Get(ctx, types.NamespacedName{Name: desiredPDB.Name, Namespace: hnp.GetNamespace()}, existingPDB)
if err != nil {
if k8serrors.IsNotFound(err) {
// Create new PDB
r.Log.Info("Creating PDB", "pdbName", desiredPDB.Name, "nodePool", hnp.GetNodePoolName())
if err := r.Create(ctx, desiredPDB); err != nil {
return fmt.Errorf("failed to create PDB %s: %w", desiredPDB.Name, err)
}
return nil
}
return fmt.Errorf("failed to get PDB %s: %w", desiredPDB.Name, err)
}

// Update existing PDB if needed
if !arePDBsEqual(existingPDB, desiredPDB) {
r.Log.Info("Updating PDB", "pdbName", desiredPDB.Name, "nodePool", hnp.GetNodePoolName())
existingPDB.Spec = desiredPDB.Spec
if err := r.Update(ctx, existingPDB); err != nil {
return fmt.Errorf("failed to update PDB %s: %w", desiredPDB.Name, err)
}
}

return nil
return r.createOrUpdatePDB(ctx, pool, desiredPDB)
}

// buildNodePoolPDB constructs a PodDisruptionBudget for the specified node pool
func (r *HumioClusterReconciler) buildNodePoolPDB(ctx context.Context, hnp *HumioNodePool) (*policyv1.PodDisruptionBudget, error) {
pdbName := fmt.Sprintf("%s%s", hnp.GetNodePoolName(), pdbNameSuffix)
// constructPDB creates a PDB object for the given node pool
func (r *HumioClusterReconciler) constructPDB(pool *HumioNodePool) (*policyv1.PodDisruptionBudget, error) {
pdbName := fmt.Sprintf("%s%s", pool.GetNodePoolName(), pdbNameSuffix)
if len(pdbName) > maxPDBNameLength {
return nil, fmt.Errorf("PDB name %s exceeds maximum length of %d characters", pdbName, maxPDBNameLength)
}

labels := hnp.GetNodePoolLabels()

labels := pool.GetNodePoolLabels()
pdb := &policyv1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Name: pdbName,
Namespace: hnp.GetNamespace(),
Namespace: pool.GetNamespace(),
Labels: labels,
},
Spec: policyv1.PodDisruptionBudgetSpec{
Expand All @@ -2516,33 +2489,33 @@ func (r *HumioClusterReconciler) buildNodePoolPDB(ctx context.Context, hnp *Humi
},
}

// Get PDB configuration from the node pool
pdbConfig := hnp.GetPodDisruptionBudget()
// Apply PDB configuration from the node pool
pdbConfig := pool.GetPodDisruptionBudget()
if pdbConfig != nil {
if pdbConfig.MinAvailable != nil {
pdb.Spec.MinAvailable = pdbConfig.MinAvailable
} else if pdbConfig.MaxUnavailable != nil {
pdb.Spec.MaxUnavailable = pdbConfig.MaxUnavailable
} else {
// Set default MinAvailable if neither is specified
defaultMinAvailable := intstr.FromString(defaultMinAvailable)
pdb.Spec.MinAvailable = &defaultMinAvailable
defaultMin := intstr.FromString(defaultMinAvailable)
pdb.Spec.MinAvailable = &defaultMin
}

if pdbConfig.UnhealthyPodEvictionPolicy != nil {
pdb.Spec.UnhealthyPodEvictionPolicy = (*policyv1.UnhealthyPodEvictionPolicyType)(pdbConfig.UnhealthyPodEvictionPolicy)
}
} else {
// Set default MinAvailable if no PDB config provided
defaultMinAvailable := intstr.FromString(defaultMinAvailable)
pdb.Spec.MinAvailable = &defaultMinAvailable
defaultMin := intstr.FromString(defaultMinAvailable)
pdb.Spec.MinAvailable = &defaultMin
}

// Set controller reference
clusterRef := &humiov1alpha1.HumioCluster{
ObjectMeta: metav1.ObjectMeta{
Name: hnp.GetClusterName(),
Namespace: hnp.GetNamespace(),
Name: pool.GetClusterName(),
Namespace: pool.GetNamespace(),
},
}
if err := controllerutil.SetControllerReference(clusterRef, pdb, r.Scheme()); err != nil {
Expand All @@ -2552,21 +2525,91 @@ func (r *HumioClusterReconciler) buildNodePoolPDB(ctx context.Context, hnp *Humi
return pdb, nil
}

// arePDBsEqual checks if two PDBs have the same spec
func arePDBsEqual(existing, desired *policyv1.PodDisruptionBudget) bool {
// createOrUpdatePDB handles the creation or update of a PDB
func (r *HumioClusterReconciler) createOrUpdatePDB(ctx context.Context, pool *HumioNodePool, desiredPDB *policyv1.PodDisruptionBudget) error {
existingPDB := &policyv1.PodDisruptionBudget{}
err := r.Get(ctx, types.NamespacedName{
Name: desiredPDB.Name,
Namespace: desiredPDB.Namespace,
}, existingPDB)

if err != nil {
if k8serrors.IsNotFound(err) {
r.Log.Info("creating new PDB",
"pdbName", desiredPDB.Name,
"nodePool", pool.GetNodePoolName())
return r.Create(ctx, desiredPDB)
}
return fmt.Errorf("failed to get PDB %s: %w", desiredPDB.Name, err)
}

if !pdbSpecsEqual(existingPDB, desiredPDB) {
r.Log.Info("updating existing PDB",
"pdbName", desiredPDB.Name,
"nodePool", pool.GetNodePoolName())
existingPDB.Spec = desiredPDB.Spec
return r.Update(ctx, existingPDB)
}

return nil
}

// pdbSpecsEqual compares two PDB specs for equality
func pdbSpecsEqual(existing, desired *policyv1.PodDisruptionBudget) bool {
return reflect.DeepEqual(existing.Spec.MinAvailable, desired.Spec.MinAvailable) &&
reflect.DeepEqual(existing.Spec.MaxUnavailable, desired.Spec.MaxUnavailable) &&
reflect.DeepEqual(existing.Spec.Selector, desired.Spec.Selector) &&
reflect.DeepEqual(existing.Spec.UnhealthyPodEvictionPolicy, desired.Spec.UnhealthyPodEvictionPolicy)
}

// ensurePodDisruptionBudgets wraps reconcilePodDisruptionBudgets to match the ctxHumioClusterFunc signature
// cleanupOrphanedPDBs removes PDBs that don't correspond to any active node pools
func (r *HumioClusterReconciler) cleanupOrphanedPDBs(ctx context.Context, humioNodePools HumioNodePoolList) error {
if len(humioNodePools.Items) == 0 {
return nil
}

// Get cluster details from the first node pool
clusterName := humioNodePools.Items[0].GetClusterName()
namespace := humioNodePools.Items[0].GetNamespace()

// List all existing PDBs for this cluster
existingPDBs := &policyv1.PodDisruptionBudgetList{}
if err := r.List(ctx, existingPDBs,
client.InNamespace(namespace),
client.MatchingLabels(kubernetes.LabelsForHumio(clusterName))); err != nil {
return fmt.Errorf("failed to list PDBs: %w", err)
}

// Create a map of valid PDB names based on active node pools
validPDBs := make(map[string]bool)
for _, pool := range humioNodePools.Items {
if shouldCreatePDBForNodePool(pool) {
validPDBs[fmt.Sprintf("%s%s", pool.GetNodePoolName(), pdbNameSuffix)] = true
}
}

// Delete orphaned PDBs
for _, pdb := range existingPDBs.Items {
if !validPDBs[pdb.Name] {
r.Log.Info("deleting orphaned PDB", "pdbName", pdb.Name)
if err := r.Delete(ctx, &pdb); err != nil && !k8serrors.IsNotFound(err) {
return fmt.Errorf("failed to delete orphaned PDB %s: %w", pdb.Name, err)
}
}
}

return nil
}

// ensurePodDisruptionBudgets is the main entry point for PDB reconciliation
func (r *HumioClusterReconciler) ensurePodDisruptionBudgets(ctx context.Context, hc *humiov1alpha1.HumioCluster) error {
humioNodePools := getHumioNodePoolManagers(hc)
if result, err := r.reconcilePodDisruptionBudgets(ctx, humioNodePools); err != nil {
result, err := r.reconcilePodDisruptionBudgets(ctx, humioNodePools)
if err != nil {
return fmt.Errorf("failed to reconcile PDBs: %w", err)
} else if result.Requeue || result.RequeueAfter > 0 {
return fmt.Errorf("requeue requested by PDB reconciliation")
}
if result.Requeue || result.RequeueAfter > 0 {
return fmt.Errorf("requeue requested during PDB reconciliation")
}
return nil
}
Loading

0 comments on commit 5321939

Please sign in to comment.