Skip to content

Commit

Permalink
re-organize drain controller package
Browse files Browse the repository at this point in the history
Signed-off-by: Sebastian Sch <[email protected]>
  • Loading branch information
SchSeba committed Nov 12, 2024
1 parent d10db66 commit 22f6efa
Show file tree
Hide file tree
Showing 5 changed files with 307 additions and 514 deletions.
189 changes: 16 additions & 173 deletions controllers/drain_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"
"fmt"
"sync"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -107,19 +106,23 @@ func (dr *DrainReconcile) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
}

// create the drain state annotation if it doesn't exist in the sriovNetworkNodeState object
nodeStateDrainAnnotationCurrent, err := dr.ensureAnnotationExists(ctx, nodeNetworkState, constants.NodeStateDrainAnnotationCurrent)
nodeStateDrainAnnotationCurrent, nodeStateExist, err := dr.ensureAnnotationExists(ctx, nodeNetworkState, constants.NodeStateDrainAnnotationCurrent)
if err != nil {
reqLogger.Error(err, "failed to ensure nodeStateDrainAnnotation")
return ctrl.Result{}, err
}

// create the drain state annotation if it doesn't exist in the node object
nodeDrainAnnotation, err := dr.ensureAnnotationExists(ctx, node, constants.NodeDrainAnnotation)
nodeDrainAnnotation, nodeExist, err := dr.ensureAnnotationExists(ctx, node, constants.NodeDrainAnnotation)
if err != nil {
reqLogger.Error(err, "failed to ensure nodeStateDrainAnnotation")
return ctrl.Result{}, err
}

// requeue the request if we needed to add any of the annotations
if !nodeExist || !nodeStateExist {
return ctrl.Result{Requeue: true}, nil
}
reqLogger.V(2).Info("Drain annotations", "nodeAnnotation", nodeDrainAnnotation, "nodeStateAnnotation", nodeStateDrainAnnotationCurrent)

// Check the node request
Expand All @@ -141,98 +144,14 @@ func (dr *DrainReconcile) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
// doesn't need to drain anymore, so we can stop the drain
if nodeStateDrainAnnotationCurrent == constants.DrainComplete ||
nodeStateDrainAnnotationCurrent == constants.Draining {
completed, err := dr.drainer.CompleteDrainNode(ctx, node)
if err != nil {
reqLogger.Error(err, "failed to complete drain on node")
dr.recorder.Event(nodeNetworkState,
corev1.EventTypeWarning,
"DrainController",
"failed to drain node")
return ctrl.Result{}, err
}

// if we didn't manage to complete the un drain of the node we retry
if !completed {
reqLogger.Info("complete drain was not completed re queueing the request")
dr.recorder.Event(nodeNetworkState,
corev1.EventTypeWarning,
"DrainController",
"node complete drain was not completed")
// TODO: make this time configurable
return reconcile.Result{RequeueAfter: 5 * time.Second}, nil
}

// move the node state back to idle
err = utils.AnnotateObject(ctx, nodeNetworkState, constants.NodeStateDrainAnnotationCurrent, constants.DrainIdle, dr.Client)
if err != nil {
reqLogger.Error(err, "failed to annotate node with annotation", "annotation", constants.DrainIdle)
return ctrl.Result{}, err
}

reqLogger.Info("completed the un drain for node")
dr.recorder.Event(nodeNetworkState,
corev1.EventTypeWarning,
"DrainController",
"node un drain completed")
return ctrl.Result{}, nil
}
} else if nodeDrainAnnotation == constants.DrainRequired || nodeDrainAnnotation == constants.RebootRequired {
// this cover the case a node request to drain or reboot

// nothing to do here we need to wait for the node to move back to idle
if nodeStateDrainAnnotationCurrent == constants.DrainComplete {
reqLogger.Info("node requested a drain and nodeState is on drain completed nothing todo")
return ctrl.Result{}, nil
}

// we need to start the drain, but first we need to check that we can drain the node
if nodeStateDrainAnnotationCurrent == constants.DrainIdle {
result, err := dr.tryDrainNode(ctx, node)
if err != nil {
reqLogger.Error(err, "failed to check if we can drain the node")
return ctrl.Result{}, err
}

// in case we need to wait because we just to the max number of draining nodes
if result != nil {
return *result, nil
}
}

// class the drain function that will also call drain to other platform providers like openshift
drained, err := dr.drainer.DrainNode(ctx, node, nodeDrainAnnotation == constants.RebootRequired)
if err != nil {
reqLogger.Error(err, "error trying to drain the node")
dr.recorder.Event(nodeNetworkState,
corev1.EventTypeWarning,
"DrainController",
"failed to drain node")
return reconcile.Result{}, err
}

// if we didn't manage to complete the drain of the node we retry
if !drained {
reqLogger.Info("the nodes was not drained re queueing the request")
dr.recorder.Event(nodeNetworkState,
corev1.EventTypeWarning,
"DrainController",
"node drain operation was not completed")
return reconcile.Result{RequeueAfter: 5 * time.Second}, nil
}

// if we manage to drain we label the node state with drain completed and finish
err = utils.AnnotateObject(ctx, nodeNetworkState, constants.NodeStateDrainAnnotationCurrent, constants.DrainComplete, dr.Client)
if err != nil {
reqLogger.Error(err, "failed to annotate node with annotation", "annotation", constants.DrainComplete)
return ctrl.Result{}, err
return dr.handleNodeIdleNodeStateDrainingOrCompleted(ctx, &reqLogger, node, nodeNetworkState)
}
}

reqLogger.Info("node drained successfully")
dr.recorder.Event(nodeNetworkState,
corev1.EventTypeWarning,
"DrainController",
"node drain completed")
return ctrl.Result{}, nil
// this cover the case a node request to drain or reboot
if nodeDrainAnnotation == constants.DrainRequired ||
nodeDrainAnnotation == constants.RebootRequired {
return dr.handleNodeDrainOrReboot(ctx, &reqLogger, node, nodeNetworkState, nodeDrainAnnotation, nodeStateDrainAnnotationCurrent)
}

reqLogger.Error(nil, "unexpected node drain annotation")
Expand All @@ -250,93 +169,17 @@ func (dr *DrainReconcile) getObject(ctx context.Context, req ctrl.Request, objec
return true, nil
}

func (dr *DrainReconcile) ensureAnnotationExists(ctx context.Context, object client.Object, key string) (string, error) {
func (dr *DrainReconcile) ensureAnnotationExists(ctx context.Context, object client.Object, key string) (string, bool, error) {
value, exist := object.GetAnnotations()[key]
if !exist {
err := utils.AnnotateObject(ctx, object, constants.NodeStateDrainAnnotationCurrent, constants.DrainIdle, dr.Client)
if err != nil {
return "", err
}
return constants.DrainIdle, nil
}

return value, nil
}

func (dr *DrainReconcile) tryDrainNode(ctx context.Context, node *corev1.Node) (*reconcile.Result, error) {
// configure logs
reqLogger := log.FromContext(ctx)
reqLogger.Info("checkForNodeDrain():")

//critical section we need to check if we can start the draining
dr.drainCheckMutex.Lock()
defer dr.drainCheckMutex.Unlock()

// find the relevant node pool
nodePool, nodeList, err := dr.findNodePoolConfig(ctx, node)
if err != nil {
reqLogger.Error(err, "failed to find the pool for the requested node")
return nil, err
}

// check how many nodes we can drain in parallel for the specific pool
maxUnv, err := nodePool.MaxUnavailable(len(nodeList))
if err != nil {
reqLogger.Error(err, "failed to calculate max unavailable")
return nil, err
}

current := 0
snns := &sriovnetworkv1.SriovNetworkNodeState{}

var currentSnns *sriovnetworkv1.SriovNetworkNodeState
for _, nodeObj := range nodeList {
err = dr.Get(ctx, client.ObjectKey{Name: nodeObj.GetName(), Namespace: vars.Namespace}, snns)
if err != nil {
if errors.IsNotFound(err) {
reqLogger.V(2).Info("node doesn't have a sriovNetworkNodePolicy")
continue
}
return nil, err
}

if snns.GetName() == node.GetName() {
currentSnns = snns.DeepCopy()
}

if utils.ObjectHasAnnotation(snns, constants.NodeStateDrainAnnotationCurrent, constants.Draining) ||
utils.ObjectHasAnnotation(snns, constants.NodeStateDrainAnnotationCurrent, constants.DrainComplete) {
current++
return "", false, err
}
return constants.DrainIdle, false, nil
}
reqLogger.Info("Max node allowed to be draining at the same time", "MaxParallelNodeConfiguration", maxUnv)
reqLogger.Info("Count of draining", "drainingNodes", current)

// if maxUnv is zero this means we drain all the nodes in parallel without a limit
if maxUnv == -1 {
reqLogger.Info("draining all the nodes in parallel")
} else if current >= maxUnv {
// the node requested to be drained, but we are at the limit so we re-enqueue the request
reqLogger.Info("MaxParallelNodeConfiguration limit reached for draining nodes re-enqueue the request")
// TODO: make this time configurable
return &reconcile.Result{RequeueAfter: 5 * time.Second}, nil
}

if currentSnns == nil {
return nil, fmt.Errorf("failed to find sriov network node state for requested node")
}

err = utils.AnnotateObject(ctx, currentSnns, constants.NodeStateDrainAnnotationCurrent, constants.Draining, dr.Client)
if err != nil {
reqLogger.Error(err, "failed to annotate node with annotation", "annotation", constants.Draining)
return nil, err
}

return nil, nil
}

func (dr *DrainReconcile) findNodePoolConfig(ctx context.Context, node *corev1.Node) (*sriovnetworkv1.SriovNetworkPoolConfig, []corev1.Node, error) {
return findNodePoolConfig(ctx, node, dr.Client)
return value, true, nil
}

// SetupWithManager sets up the controller with the Manager.
Expand Down
Loading

0 comments on commit 22f6efa

Please sign in to comment.