Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix MCP pause case and re-organize drain controller package #803

Merged
merged 2 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 17 additions & 174 deletions controllers/drain_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"
"fmt"
"sync"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -107,19 +106,23 @@ func (dr *DrainReconcile) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
}

// create the drain state annotation if it doesn't exist in the sriovNetworkNodeState object
nodeStateDrainAnnotationCurrent, err := dr.ensureAnnotationExists(ctx, nodeNetworkState, constants.NodeStateDrainAnnotationCurrent)
nodeStateDrainAnnotationCurrent, nodeStateExist, err := dr.ensureAnnotationExists(ctx, nodeNetworkState, constants.NodeStateDrainAnnotationCurrent)
if err != nil {
reqLogger.Error(err, "failed to ensure nodeStateDrainAnnotation")
return ctrl.Result{}, err
}

// create the drain state annotation if it doesn't exist in the node object
nodeDrainAnnotation, err := dr.ensureAnnotationExists(ctx, node, constants.NodeDrainAnnotation)
nodeDrainAnnotation, nodeExist, err := dr.ensureAnnotationExists(ctx, node, constants.NodeDrainAnnotation)
if err != nil {
reqLogger.Error(err, "failed to ensure nodeStateDrainAnnotation")
return ctrl.Result{}, err
}

// requeue the request if we needed to add any of the annotations
if !nodeExist || !nodeStateExist {
return ctrl.Result{Requeue: true}, nil
}
reqLogger.V(2).Info("Drain annotations", "nodeAnnotation", nodeDrainAnnotation, "nodeStateAnnotation", nodeStateDrainAnnotationCurrent)

// Check the node request
Expand All @@ -141,98 +144,14 @@ func (dr *DrainReconcile) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
// doesn't need to drain anymore, so we can stop the drain
if nodeStateDrainAnnotationCurrent == constants.DrainComplete ||
nodeStateDrainAnnotationCurrent == constants.Draining {
completed, err := dr.drainer.CompleteDrainNode(ctx, node)
if err != nil {
reqLogger.Error(err, "failed to complete drain on node")
dr.recorder.Event(nodeNetworkState,
corev1.EventTypeWarning,
"DrainController",
"failed to drain node")
return ctrl.Result{}, err
}

// if we didn't manage to complete the un drain of the node we retry
if !completed {
reqLogger.Info("complete drain was not completed re queueing the request")
dr.recorder.Event(nodeNetworkState,
corev1.EventTypeWarning,
"DrainController",
"node complete drain was not completed")
// TODO: make this time configurable
return reconcile.Result{RequeueAfter: 5 * time.Second}, nil
}

// move the node state back to idle
err = utils.AnnotateObject(ctx, nodeNetworkState, constants.NodeStateDrainAnnotationCurrent, constants.DrainIdle, dr.Client)
if err != nil {
reqLogger.Error(err, "failed to annotate node with annotation", "annotation", constants.DrainIdle)
return ctrl.Result{}, err
}

reqLogger.Info("completed the un drain for node")
dr.recorder.Event(nodeNetworkState,
corev1.EventTypeWarning,
"DrainController",
"node un drain completed")
return ctrl.Result{}, nil
}
} else if nodeDrainAnnotation == constants.DrainRequired || nodeDrainAnnotation == constants.RebootRequired {
// this covers the case where a node requests a drain or a reboot

// nothing to do here we need to wait for the node to move back to idle
if nodeStateDrainAnnotationCurrent == constants.DrainComplete {
reqLogger.Info("node requested a drain and nodeState is on drain completed nothing todo")
return ctrl.Result{}, nil
}

// we need to start the drain, but first we need to check that we can drain the node
if nodeStateDrainAnnotationCurrent == constants.DrainIdle {
result, err := dr.tryDrainNode(ctx, node)
if err != nil {
reqLogger.Error(err, "failed to check if we can drain the node")
return ctrl.Result{}, err
}

// in case we need to wait because we just hit the max number of draining nodes
if result != nil {
return *result, nil
}
}

// call the drain function, which will also call drain on other platform providers like openshift
drained, err := dr.drainer.DrainNode(ctx, node, nodeDrainAnnotation == constants.RebootRequired)
if err != nil {
reqLogger.Error(err, "error trying to drain the node")
dr.recorder.Event(nodeNetworkState,
corev1.EventTypeWarning,
"DrainController",
"failed to drain node")
return reconcile.Result{}, err
}

// if we didn't manage to complete the drain of the node we retry
if !drained {
reqLogger.Info("the nodes was not drained re queueing the request")
dr.recorder.Event(nodeNetworkState,
corev1.EventTypeWarning,
"DrainController",
"node drain operation was not completed")
return reconcile.Result{RequeueAfter: 5 * time.Second}, nil
}

// if we manage to drain we label the node state with drain completed and finish
err = utils.AnnotateObject(ctx, nodeNetworkState, constants.NodeStateDrainAnnotationCurrent, constants.DrainComplete, dr.Client)
if err != nil {
reqLogger.Error(err, "failed to annotate node with annotation", "annotation", constants.DrainComplete)
return ctrl.Result{}, err
return dr.handleNodeIdleNodeStateDrainingOrCompleted(ctx, &reqLogger, node, nodeNetworkState)
}
}

reqLogger.Info("node drained successfully")
dr.recorder.Event(nodeNetworkState,
corev1.EventTypeWarning,
"DrainController",
"node drain completed")
return ctrl.Result{}, nil
// this covers the case where a node requests a drain or a reboot
if nodeDrainAnnotation == constants.DrainRequired ||
nodeDrainAnnotation == constants.RebootRequired {
return dr.handleNodeDrainOrReboot(ctx, &reqLogger, node, nodeNetworkState, nodeDrainAnnotation, nodeStateDrainAnnotationCurrent)
}

reqLogger.Error(nil, "unexpected node drain annotation")
Expand All @@ -250,93 +169,17 @@ func (dr *DrainReconcile) getObject(ctx context.Context, req ctrl.Request, objec
return true, nil
}

func (dr *DrainReconcile) ensureAnnotationExists(ctx context.Context, object client.Object, key string) (string, error) {
func (dr *DrainReconcile) ensureAnnotationExists(ctx context.Context, object client.Object, key string) (string, bool, error) {
value, exist := object.GetAnnotations()[key]
if !exist {
err := utils.AnnotateObject(ctx, object, constants.NodeStateDrainAnnotationCurrent, constants.DrainIdle, dr.Client)
if err != nil {
return "", err
}
return constants.DrainIdle, nil
}

return value, nil
}

func (dr *DrainReconcile) tryDrainNode(ctx context.Context, node *corev1.Node) (*reconcile.Result, error) {
// configure logs
reqLogger := log.FromContext(ctx)
reqLogger.Info("checkForNodeDrain():")

//critical section we need to check if we can start the draining
dr.drainCheckMutex.Lock()
defer dr.drainCheckMutex.Unlock()

// find the relevant node pool
nodePool, nodeList, err := dr.findNodePoolConfig(ctx, node)
if err != nil {
reqLogger.Error(err, "failed to find the pool for the requested node")
return nil, err
}

// check how many nodes we can drain in parallel for the specific pool
maxUnv, err := nodePool.MaxUnavailable(len(nodeList))
if err != nil {
reqLogger.Error(err, "failed to calculate max unavailable")
return nil, err
}

current := 0
snns := &sriovnetworkv1.SriovNetworkNodeState{}

var currentSnns *sriovnetworkv1.SriovNetworkNodeState
for _, nodeObj := range nodeList {
err = dr.Get(ctx, client.ObjectKey{Name: nodeObj.GetName(), Namespace: vars.Namespace}, snns)
err := utils.AnnotateObject(ctx, object, key, constants.DrainIdle, dr.Client)
if err != nil {
if errors.IsNotFound(err) {
reqLogger.V(2).Info("node doesn't have a sriovNetworkNodePolicy")
continue
}
return nil, err
}

if snns.GetName() == node.GetName() {
currentSnns = snns.DeepCopy()
}

if utils.ObjectHasAnnotation(snns, constants.NodeStateDrainAnnotationCurrent, constants.Draining) ||
utils.ObjectHasAnnotation(snns, constants.NodeStateDrainAnnotationCurrent, constants.DrainComplete) {
current++
return "", false, err
}
return constants.DrainIdle, false, nil
}
reqLogger.Info("Max node allowed to be draining at the same time", "MaxParallelNodeConfiguration", maxUnv)
reqLogger.Info("Count of draining", "drainingNodes", current)

// if maxUnv is -1 this means we drain all the nodes in parallel without a limit
if maxUnv == -1 {
reqLogger.Info("draining all the nodes in parallel")
} else if current >= maxUnv {
// the node requested to be drained, but we are at the limit so we re-enqueue the request
reqLogger.Info("MaxParallelNodeConfiguration limit reached for draining nodes re-enqueue the request")
// TODO: make this time configurable
return &reconcile.Result{RequeueAfter: 5 * time.Second}, nil
}

if currentSnns == nil {
return nil, fmt.Errorf("failed to find sriov network node state for requested node")
}

err = utils.AnnotateObject(ctx, currentSnns, constants.NodeStateDrainAnnotationCurrent, constants.Draining, dr.Client)
if err != nil {
reqLogger.Error(err, "failed to annotate node with annotation", "annotation", constants.Draining)
return nil, err
}

return nil, nil
}

func (dr *DrainReconcile) findNodePoolConfig(ctx context.Context, node *corev1.Node) (*sriovnetworkv1.SriovNetworkPoolConfig, []corev1.Node, error) {
return findNodePoolConfig(ctx, node, dr.Client)
return value, true, nil
}

// SetupWithManager sets up the controller with the Manager.
Expand Down
Loading
Loading