From 95e9d8411432ded11b62fc997d1ae3048914d253 Mon Sep 17 00:00:00 2001 From: Sebastian Sch Date: Thu, 25 Jul 2024 11:20:08 +0300 Subject: [PATCH] This commit introduces a new redesign on how the operator resets the device plugin * use a general nodeSelector to avoid updating the daemonset yaml * remove the config-daemon removing pod (better security) * make the operator in charge of resetting the device plugin via annotations * mark the node as cordon BEFORE we remove the device plugin (without drain) to avoid scheduling new pods until the device plugin is backed up Signed-off-by: Sebastian Sch --- controllers/drain_controller.go | 272 +--------- controllers/drain_controller_helper.go | 467 ++++++++++++++++++ controllers/drain_controller_test.go | 105 ++++ controllers/helper.go | 119 +---- controllers/helper_test.go | 330 ------------- .../sriovnetworknodepolicy_controller.go | 4 - controllers/sriovoperatorconfig_controller.go | 2 +- .../sriovoperatorconfig_controller_test.go | 87 ++-- controllers/suite_test.go | 7 + deploy/clusterrole.yaml | 6 - .../templates/clusterrole.yaml | 6 - pkg/consts/constants.go | 5 + pkg/daemon/daemon.go | 87 +--- pkg/daemon/daemon_test.go | 36 +- pkg/drain/drainer.go | 19 +- pkg/platforms/openshift/openshift.go | 12 + pkg/utils/cluster.go | 46 +- test/conformance/tests/test_sriov_operator.go | 2 + 18 files changed, 759 insertions(+), 853 deletions(-) create mode 100644 controllers/drain_controller_helper.go delete mode 100644 controllers/helper_test.go diff --git a/controllers/drain_controller.go b/controllers/drain_controller.go index 86da909d80..cc4a528265 100644 --- a/controllers/drain_controller.go +++ b/controllers/drain_controller.go @@ -20,12 +20,10 @@ import ( "context" "fmt" "sync" - "time" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" @@ -136,11 +134,11 @@ func (dr *DrainReconcile) Reconcile(ctx context.Context, req ctrl.Request) (ctrl if nodeDrainAnnotation == constants.DrainIdle { // this cover the case the node is on idle - // node request to be on idle and the currect state is idle - // we don't do anything + // node request to be on idle and the current state is idle + // in this case we check if vf configuration exist in the nodeState + // if not we remove the device plugin node selector label from the node if nodeStateDrainAnnotationCurrent == constants.DrainIdle { - reqLogger.Info("node and nodeState are on idle nothing todo") - return reconcile.Result{}, nil + return dr.handleNodeIdleNodeStateIdle(ctx, &reqLogger, node, nodeNetworkState) } // we have two options here: @@ -151,98 +149,19 @@ func (dr *DrainReconcile) Reconcile(ctx context.Context, req ctrl.Request) (ctrl // doesn't need to drain anymore, so we can stop the drain if nodeStateDrainAnnotationCurrent == constants.DrainComplete || nodeStateDrainAnnotationCurrent == constants.Draining { - completed, err := dr.drainer.CompleteDrainNode(ctx, node) - if err != nil { - reqLogger.Error(err, "failed to complete drain on node") - dr.recorder.Event(nodeNetworkState, - corev1.EventTypeWarning, - "DrainController", - "failed to drain node") - return ctrl.Result{}, err - } - - // if we didn't manage to complete the un drain of the node we retry - if !completed { - reqLogger.Info("complete drain was not completed re queueing the request") - dr.recorder.Event(nodeNetworkState, - 
corev1.EventTypeWarning, - "DrainController", - "node complete drain was not completed") - // TODO: make this time configurable - return reconcile.Result{RequeueAfter: 5 * time.Second}, nil - } - - // move the node state back to idle - err = utils.AnnotateObject(ctx, nodeNetworkState, constants.NodeStateDrainAnnotationCurrent, constants.DrainIdle, dr.Client) - if err != nil { - reqLogger.Error(err, "failed to annotate node with annotation", "annotation", constants.DrainIdle) - return ctrl.Result{}, err - } - - reqLogger.Info("completed the un drain for node") - dr.recorder.Event(nodeNetworkState, - corev1.EventTypeWarning, - "DrainController", - "node un drain completed") - return ctrl.Result{}, nil - } - } else if nodeDrainAnnotation == constants.DrainRequired || nodeDrainAnnotation == constants.RebootRequired { - // this cover the case a node request to drain or reboot - - // nothing to do here we need to wait for the node to move back to idle - if nodeStateDrainAnnotationCurrent == constants.DrainComplete { - reqLogger.Info("node requested a drain and nodeState is on drain completed nothing todo") - return ctrl.Result{}, nil - } - - // we need to start the drain, but first we need to check that we can drain the node - if nodeStateDrainAnnotationCurrent == constants.DrainIdle { - result, err := dr.tryDrainNode(ctx, node) - if err != nil { - reqLogger.Error(err, "failed to check if we can drain the node") - return ctrl.Result{}, err - } - - // in case we need to wait because we just to the max number of draining nodes - if result != nil { - return *result, nil - } - } - - // class the drain function that will also call drain to other platform providers like openshift - drained, err := dr.drainer.DrainNode(ctx, node, nodeDrainAnnotation == constants.RebootRequired) - if err != nil { - reqLogger.Error(err, "error trying to drain the node") - dr.recorder.Event(nodeNetworkState, - corev1.EventTypeWarning, - "DrainController", - "failed to drain node") - return reconcile.Result{}, err - } - - // if we didn't manage to complete the drain of the node we retry - if !drained { - reqLogger.Info("the nodes was not drained re queueing the request") - dr.recorder.Event(nodeNetworkState, - corev1.EventTypeWarning, - "DrainController", - "node drain operation was not completed") - return reconcile.Result{RequeueAfter: 5 * time.Second}, nil + return dr.handleNodeIdleNodeStateDrainingOrCompleted(ctx, &reqLogger, node, nodeNetworkState) } + } - // if we manage to drain we label the node state with drain completed and finish - err = utils.AnnotateObject(ctx, nodeNetworkState, constants.NodeStateDrainAnnotationCurrent, constants.DrainComplete, dr.Client) - if err != nil { - reqLogger.Error(err, "failed to annotate node with annotation", "annotation", constants.DrainComplete) - return ctrl.Result{}, err - } + // this cover the case a node request to drain or reboot + if nodeDrainAnnotation == constants.DrainRequired || + nodeDrainAnnotation == constants.RebootRequired { + return dr.handleNodeDrainOrReboot(ctx, &reqLogger, node, nodeNetworkState, nodeDrainAnnotation, nodeStateDrainAnnotationCurrent) + } - reqLogger.Info("node drained successfully") - dr.recorder.Event(nodeNetworkState, - corev1.EventTypeWarning, - "DrainController", - "node drain completed") - return ctrl.Result{}, nil + // this cover the case a node request to only reset the device plugin + if nodeDrainAnnotation == constants.DevicePluginResetRequired { + return dr.handleNodeDPReset(ctx, &reqLogger, node, nodeNetworkState, 
nodeStateDrainAnnotationCurrent) } reqLogger.Error(nil, "unexpected node drain annotation") @@ -273,169 +192,6 @@ func (dr *DrainReconcile) ensureAnnotationExists(ctx context.Context, object cli return value, nil } -func (dr *DrainReconcile) tryDrainNode(ctx context.Context, node *corev1.Node) (*reconcile.Result, error) { - // configure logs - reqLogger := log.FromContext(ctx) - reqLogger.Info("checkForNodeDrain():") - - //critical section we need to check if we can start the draining - dr.drainCheckMutex.Lock() - defer dr.drainCheckMutex.Unlock() - - // find the relevant node pool - nodePool, nodeList, err := dr.findNodePoolConfig(ctx, node) - if err != nil { - reqLogger.Error(err, "failed to find the pool for the requested node") - return nil, err - } - - // check how many nodes we can drain in parallel for the specific pool - maxUnv, err := nodePool.MaxUnavailable(len(nodeList)) - if err != nil { - reqLogger.Error(err, "failed to calculate max unavailable") - return nil, err - } - - current := 0 - snns := &sriovnetworkv1.SriovNetworkNodeState{} - - var currentSnns *sriovnetworkv1.SriovNetworkNodeState - for _, nodeObj := range nodeList { - err = dr.Get(ctx, client.ObjectKey{Name: nodeObj.GetName(), Namespace: vars.Namespace}, snns) - if err != nil { - if errors.IsNotFound(err) { - reqLogger.V(2).Info("node doesn't have a sriovNetworkNodePolicy") - continue - } - return nil, err - } - - if snns.GetName() == node.GetName() { - currentSnns = snns.DeepCopy() - } - - if utils.ObjectHasAnnotation(snns, constants.NodeStateDrainAnnotationCurrent, constants.Draining) || - utils.ObjectHasAnnotation(snns, constants.NodeStateDrainAnnotationCurrent, constants.DrainComplete) { - current++ - } - } - reqLogger.Info("Max node allowed to be draining at the same time", "MaxParallelNodeConfiguration", maxUnv) - reqLogger.Info("Count of draining", "drainingNodes", current) - - // if maxUnv is zero this means we drain all the nodes in parallel without a limit - if maxUnv == -1 { - reqLogger.Info("draining all the nodes in parallel") - } else if current >= maxUnv { - // the node requested to be drained, but we are at the limit so we re-enqueue the request - reqLogger.Info("MaxParallelNodeConfiguration limit reached for draining nodes re-enqueue the request") - // TODO: make this time configurable - return &reconcile.Result{RequeueAfter: 5 * time.Second}, nil - } - - if currentSnns == nil { - return nil, fmt.Errorf("failed to find sriov network node state for requested node") - } - - err = utils.AnnotateObject(ctx, currentSnns, constants.NodeStateDrainAnnotationCurrent, constants.Draining, dr.Client) - if err != nil { - reqLogger.Error(err, "failed to annotate node with annotation", "annotation", constants.Draining) - return nil, err - } - - return nil, nil -} - -func (dr *DrainReconcile) findNodePoolConfig(ctx context.Context, node *corev1.Node) (*sriovnetworkv1.SriovNetworkPoolConfig, []corev1.Node, error) { - logger := log.FromContext(ctx) - logger.Info("findNodePoolConfig():") - // get all the sriov network pool configs - npcl := &sriovnetworkv1.SriovNetworkPoolConfigList{} - err := dr.List(ctx, npcl) - if err != nil { - logger.Error(err, "failed to list sriovNetworkPoolConfig") - return nil, nil, err - } - - selectedNpcl := []*sriovnetworkv1.SriovNetworkPoolConfig{} - nodesInPools := map[string]interface{}{} - - for _, npc := range npcl.Items { - // we skip hw offload objects - if npc.Spec.OvsHardwareOffloadConfig.Name != "" { - continue - } - - if npc.Spec.NodeSelector == nil { - npc.Spec.NodeSelector = 
&metav1.LabelSelector{} - } - - selector, err := metav1.LabelSelectorAsSelector(npc.Spec.NodeSelector) - if err != nil { - logger.Error(err, "failed to create label selector from nodeSelector", "nodeSelector", npc.Spec.NodeSelector) - return nil, nil, err - } - - if selector.Matches(labels.Set(node.Labels)) { - selectedNpcl = append(selectedNpcl, npc.DeepCopy()) - } - - nodeList := &corev1.NodeList{} - err = dr.List(ctx, nodeList, &client.ListOptions{LabelSelector: selector}) - if err != nil { - logger.Error(err, "failed to list all the nodes matching the pool with label selector from nodeSelector", - "machineConfigPoolName", npc, - "nodeSelector", npc.Spec.NodeSelector) - return nil, nil, err - } - - for _, nodeName := range nodeList.Items { - nodesInPools[nodeName.Name] = nil - } - } - - if len(selectedNpcl) > 1 { - // don't allow the node to be part of multiple pools - err = fmt.Errorf("node is part of more then one pool") - logger.Error(err, "multiple pools founded for a specific node", "numberOfPools", len(selectedNpcl), "pools", selectedNpcl) - return nil, nil, err - } else if len(selectedNpcl) == 1 { - // found one pool for our node - logger.V(2).Info("found sriovNetworkPool", "pool", *selectedNpcl[0]) - selector, err := metav1.LabelSelectorAsSelector(selectedNpcl[0].Spec.NodeSelector) - if err != nil { - logger.Error(err, "failed to create label selector from nodeSelector", "nodeSelector", selectedNpcl[0].Spec.NodeSelector) - return nil, nil, err - } - - // list all the nodes that are also part of this pool and return them - nodeList := &corev1.NodeList{} - err = dr.List(ctx, nodeList, &client.ListOptions{LabelSelector: selector}) - if err != nil { - logger.Error(err, "failed to list nodes using with label selector", "labelSelector", selector) - return nil, nil, err - } - - return selectedNpcl[0], nodeList.Items, nil - } else { - // in this case we get all the nodes and remove the ones that already part of any pool - logger.V(1).Info("node doesn't belong to any pool, using default drain configuration with MaxUnavailable of one", "pool", *defaultNpcl) - nodeList := &corev1.NodeList{} - err = dr.List(ctx, nodeList) - if err != nil { - logger.Error(err, "failed to list all the nodes") - return nil, nil, err - } - - defaultNodeLists := []corev1.Node{} - for _, nodeObj := range nodeList.Items { - if _, exist := nodesInPools[nodeObj.Name]; !exist { - defaultNodeLists = append(defaultNodeLists, nodeObj) - } - } - return defaultNpcl, defaultNodeLists, nil - } -} - // SetupWithManager sets up the controller with the Manager. 
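
The request side of the new reset path needs nothing beyond the desired-state annotation this controller already watches; the unit tests drive it the same way through simulateDaemonSetAnnotation. A minimal sketch, using the (ctx, object, key, value, client) shape of utils.AnnotateObject as it appears elsewhere in this patch; the function name is illustrative and not part of the change, and the imports (constants, utils, client, corev1) are the ones drain_controller.go already uses:

    // requestDevicePluginReset asks the drain controller to restart the SR-IOV
    // device plugin on a node without draining it, by setting the desired-state
    // annotation the controller watches to Device_Plugin_Reset_Required.
    func requestDevicePluginReset(ctx context.Context, c client.Client, node *corev1.Node) error {
    	return utils.AnnotateObject(ctx, node, constants.NodeDrainAnnotation, constants.DevicePluginResetRequired, c)
    }

The controller then cordons the node, flips the sriovnetwork.openshift.io/device-plugin label to Disabled, and reports DrainComplete in the current-state annotation, as implemented by handleNodeDPReset in drain_controller_helper.go below.
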
func (dr *DrainReconcile) SetupWithManager(mgr ctrl.Manager) error { createUpdateEnqueue := handler.Funcs{ diff --git a/controllers/drain_controller_helper.go b/controllers/drain_controller_helper.go new file mode 100644 index 0000000000..3ee72b21e1 --- /dev/null +++ b/controllers/drain_controller_helper.go @@ -0,0 +1,467 @@ +package controllers + +import ( + "context" + "fmt" + "time" + + "github.com/go-logr/logr" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + sriovnetworkv1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1" + constants "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts" + "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/utils" + "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/vars" +) + +func (dr *DrainReconcile) handleNodeIdleNodeStateIdle(ctx context.Context, + reqLogger *logr.Logger, + node *corev1.Node, + nodeNetworkState *sriovnetworkv1.SriovNetworkNodeState) (ctrl.Result, error) { + // in case we have policy there is nothing else to do + if len(nodeNetworkState.Spec.Interfaces) > 0 { + reqLogger.Info("node and nodeState are on idle nothing todo") + return reconcile.Result{}, nil + } + + // if we don't have any policy + // let's be sure the device plugin label doesn't exist on the node + reqLogger.Info("remove Device plugin from node nodeState spec is empty") + err := utils.LabelNode(ctx, node.Name, constants.SriovDevicePluginLabel, constants.SriovDevicePluginLabelDisabled, dr.Client) + if err != nil { + reqLogger.Error(err, "failed to label node for device plugin label", + "labelKey", + constants.SriovDevicePluginLabel, + "labelValue", + constants.SriovDevicePluginLabelDisabled) + return reconcile.Result{}, err + } + + return reconcile.Result{}, nil +} + +func (dr *DrainReconcile) handleNodeIdleNodeStateDrainingOrCompleted(ctx context.Context, + reqLogger *logr.Logger, + node *corev1.Node, + nodeNetworkState *sriovnetworkv1.SriovNetworkNodeState) (ctrl.Result, error) { + completed, err := dr.drainer.CompleteDrainNode(ctx, node) + if err != nil { + reqLogger.Error(err, "failed to complete drain on node") + dr.recorder.Event(nodeNetworkState, + corev1.EventTypeWarning, + "DrainController", + "failed to drain node") + return ctrl.Result{}, err + } + + // if we didn't manage to complete the un drain of the node we retry + if !completed { + reqLogger.Info("complete drain was not completed re queueing the request") + dr.recorder.Event(nodeNetworkState, + corev1.EventTypeWarning, + "DrainController", + "node complete drain was not completed") + // TODO: make this time configurable + return reconcile.Result{RequeueAfter: 5 * time.Second}, nil + } + + // check the device plugin exited and enable it again + // only of we have something in the node state spec + if len(nodeNetworkState.Spec.Interfaces) > 0 { + completed, err = dr.enableSriovDevicePlugin(ctx, node) + if err != nil { + reqLogger.Error(err, "failed to enable SriovDevicePlugin") + dr.recorder.Event(nodeNetworkState, + corev1.EventTypeWarning, + "DrainController", + "failed to enable SriovDevicePlugin") + return ctrl.Result{}, err + } + + if !completed { + reqLogger.Info("sriov device plugin enable was not completed") + 
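+			// Note: enableSriovDevicePlugin returns false both while the old device
+			// plugin pod is still terminating and until the new pod reports Running,
+			// so we emit an event and re-queue instead of blocking the reconcile here.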
dr.recorder.Event(nodeNetworkState, + corev1.EventTypeWarning, + "DrainController", + "sriov device plugin enable was not completed") + return reconcile.Result{RequeueAfter: 5 * time.Second}, nil + } + } + + // move the node state back to idle + err = utils.AnnotateObject(ctx, nodeNetworkState, constants.NodeStateDrainAnnotationCurrent, constants.DrainIdle, dr.Client) + if err != nil { + reqLogger.Error(err, "failed to annotate node with annotation", "annotation", constants.DrainIdle) + return ctrl.Result{}, err + } + + reqLogger.Info("completed the un drain for node") + dr.recorder.Event(nodeNetworkState, + corev1.EventTypeWarning, + "DrainController", + "node un drain completed") + return ctrl.Result{Requeue: true}, nil +} + +func (dr *DrainReconcile) handleNodeDrainOrReboot(ctx context.Context, + reqLogger *logr.Logger, + node *corev1.Node, + nodeNetworkState *sriovnetworkv1.SriovNetworkNodeState, + nodeDrainAnnotation, + nodeStateDrainAnnotationCurrent string) (ctrl.Result, error) { + // nothing to do here we need to wait for the node to move back to idle + if nodeStateDrainAnnotationCurrent == constants.DrainComplete { + reqLogger.Info("node requested a drain and nodeState is on drain completed nothing todo") + return ctrl.Result{}, nil + } + + // we need to start the drain, but first we need to check that we can drain the node + if nodeStateDrainAnnotationCurrent == constants.DrainIdle { + result, err := dr.tryDrainNode(ctx, node) + if err != nil { + reqLogger.Error(err, "failed to check if we can drain the node") + return ctrl.Result{}, err + } + + // in case we need to wait because we just to the max number of draining nodes + if result != nil { + return *result, nil + } + } + + // call the drain function that will also call drain to other platform providers like openshift + drained, err := dr.drainer.DrainNode(ctx, node, nodeDrainAnnotation == constants.RebootRequired) + if err != nil { + reqLogger.Error(err, "error trying to drain the node") + dr.recorder.Event(nodeNetworkState, + corev1.EventTypeWarning, + "DrainController", + "failed to drain node") + return reconcile.Result{}, err + } + + // if we didn't manage to complete the drain of the node we retry + if !drained { + reqLogger.Info("the nodes was not drained re queueing the request") + dr.recorder.Event(nodeNetworkState, + corev1.EventTypeWarning, + "DrainController", + "node drain operation was not completed") + return reconcile.Result{RequeueAfter: 5 * time.Second}, nil + } + + reqLogger.Info("remove Device plugin from node") + err = utils.LabelNode(ctx, node.Name, constants.SriovDevicePluginLabel, constants.SriovDevicePluginLabelDisabled, dr.Client) + if err != nil { + reqLogger.Error(err, "failed to label node for device plugin label", + "labelKey", + constants.SriovDevicePluginLabel, + "labelValue", + constants.SriovDevicePluginLabelDisabled) + return reconcile.Result{}, err + } + + // if we manage to drain we label the node state with drain completed and finish + err = utils.AnnotateObject(ctx, nodeNetworkState, constants.NodeStateDrainAnnotationCurrent, constants.DrainComplete, dr.Client) + if err != nil { + reqLogger.Error(err, "failed to annotate node with annotation", "annotation", constants.DrainComplete) + return ctrl.Result{}, err + } + + reqLogger.Info("node drained successfully") + dr.recorder.Event(nodeNetworkState, + corev1.EventTypeWarning, + "DrainController", + "node drain completed") + return ctrl.Result{}, nil +} + +func (dr *DrainReconcile) handleNodeDPReset(ctx context.Context, + reqLogger 
*logr.Logger, + node *corev1.Node, + nodeNetworkState *sriovnetworkv1.SriovNetworkNodeState, + nodeStateDrainAnnotationCurrent string) (ctrl.Result, error) { + // nothing to do here we need to wait for the node to move back to idle + if nodeStateDrainAnnotationCurrent == constants.DrainComplete { + reqLogger.Info("node requested a drain and nodeState is on drain completed nothing todo") + return ctrl.Result{}, nil + } + + // if we are on idle state we move it to drain + if nodeStateDrainAnnotationCurrent == constants.DrainIdle { + err := utils.AnnotateObject(ctx, nodeNetworkState, constants.NodeStateDrainAnnotationCurrent, constants.Draining, dr.Client) + if err != nil { + reqLogger.Error(err, "failed to annotate node with annotation", "annotation", constants.Draining) + return ctrl.Result{}, err + } + return ctrl.Result{Requeue: true}, nil + } + + // This cover a case where we only need to reset the device plugin + // for that we are going to cordon the node, so we don't get new pods allocated + // to the node in the time we remove the device plugin + err := dr.drainer.RunCordonOrUncordon(ctx, node, true) + if err != nil { + reqLogger.Error(err, "failed to cordon on node") + return reconcile.Result{}, err + } + + // we switch the sriov label to disable and mark the drain as completed + // no need to wait for the device plugin to exist here as we cordon the node, + // and we want to config-daemon to start the configuration in parallel of the kube-controller to remove the pod + // we check the device plugin was removed when the config-daemon moves is desire state to idle + reqLogger.Info("disable Device plugin from node") + err = utils.LabelNode(ctx, node.Name, constants.SriovDevicePluginLabel, constants.SriovDevicePluginLabelDisabled, dr.Client) + if err != nil { + reqLogger.Error(err, "failed to label node for device plugin label", + "labelKey", + constants.SriovDevicePluginLabel, + "labelValue", + constants.SriovDevicePluginLabelDisabled) + return reconcile.Result{}, err + } + + // if we manage to cordon we label the node state with drain completed and finish + err = utils.AnnotateObject(ctx, nodeNetworkState, constants.NodeStateDrainAnnotationCurrent, constants.DrainComplete, dr.Client) + if err != nil { + reqLogger.Error(err, "failed to annotate node with annotation", "annotation", constants.DrainComplete) + return ctrl.Result{}, err + } + + reqLogger.Info("node cordoned successfully and device plugin removed") + dr.recorder.Event(nodeNetworkState, + corev1.EventTypeWarning, + "DrainController", + "node cordoned and device plugin removed completed") + return ctrl.Result{}, nil +} + +func (dr *DrainReconcile) tryDrainNode(ctx context.Context, node *corev1.Node) (*reconcile.Result, error) { + // configure logs + reqLogger := log.FromContext(ctx) + reqLogger.Info("checkForNodeDrain():") + + //critical section we need to check if we can start the draining + dr.drainCheckMutex.Lock() + defer dr.drainCheckMutex.Unlock() + + // find the relevant node pool + nodePool, nodeList, err := dr.findNodePoolConfig(ctx, node) + if err != nil { + reqLogger.Error(err, "failed to find the pool for the requested node") + return nil, err + } + + // check how many nodes we can drain in parallel for the specific pool + maxUnv, err := nodePool.MaxUnavailable(len(nodeList)) + if err != nil { + reqLogger.Error(err, "failed to calculate max unavailable") + return nil, err + } + + current := 0 + snns := &sriovnetworkv1.SriovNetworkNodeState{} + + var currentSnns *sriovnetworkv1.SriovNetworkNodeState + for _, 
nodeObj := range nodeList { + err = dr.Get(ctx, client.ObjectKey{Name: nodeObj.GetName(), Namespace: vars.Namespace}, snns) + if err != nil { + if errors.IsNotFound(err) { + reqLogger.V(2).Info("node doesn't have a sriovNetworkNodePolicy") + continue + } + return nil, err + } + + if snns.GetName() == node.GetName() { + currentSnns = snns.DeepCopy() + } + + if utils.ObjectHasAnnotation(snns, constants.NodeStateDrainAnnotationCurrent, constants.Draining) || + utils.ObjectHasAnnotation(snns, constants.NodeStateDrainAnnotationCurrent, constants.DrainComplete) { + current++ + } + } + reqLogger.Info("Max node allowed to be draining at the same time", "MaxParallelNodeConfiguration", maxUnv) + reqLogger.Info("Count of draining", "drainingNodes", current) + + // if maxUnv is zero this means we drain all the nodes in parallel without a limit + if maxUnv == -1 { + reqLogger.Info("draining all the nodes in parallel") + } else if current >= maxUnv { + // the node requested to be drained, but we are at the limit so we re-enqueue the request + reqLogger.Info("MaxParallelNodeConfiguration limit reached for draining nodes re-enqueue the request") + // TODO: make this time configurable + return &reconcile.Result{RequeueAfter: 5 * time.Second}, nil + } + + if currentSnns == nil { + return nil, fmt.Errorf("failed to find sriov network node state for requested node") + } + + err = utils.AnnotateObject(ctx, currentSnns, constants.NodeStateDrainAnnotationCurrent, constants.Draining, dr.Client) + if err != nil { + reqLogger.Error(err, "failed to annotate node with annotation", "annotation", constants.Draining) + return nil, err + } + + return nil, nil +} + +func (dr *DrainReconcile) findNodePoolConfig(ctx context.Context, node *corev1.Node) (*sriovnetworkv1.SriovNetworkPoolConfig, []corev1.Node, error) { + logger := log.FromContext(ctx) + logger.Info("findNodePoolConfig():") + // get all the sriov network pool configs + npcl := &sriovnetworkv1.SriovNetworkPoolConfigList{} + err := dr.List(ctx, npcl) + if err != nil { + logger.Error(err, "failed to list sriovNetworkPoolConfig") + return nil, nil, err + } + + selectedNpcl := []*sriovnetworkv1.SriovNetworkPoolConfig{} + nodesInPools := map[string]interface{}{} + + for _, npc := range npcl.Items { + // we skip hw offload objects + if npc.Spec.OvsHardwareOffloadConfig.Name != "" { + continue + } + + if npc.Spec.NodeSelector == nil { + npc.Spec.NodeSelector = &metav1.LabelSelector{} + } + + selector, err := metav1.LabelSelectorAsSelector(npc.Spec.NodeSelector) + if err != nil { + logger.Error(err, "failed to create label selector from nodeSelector", "nodeSelector", npc.Spec.NodeSelector) + return nil, nil, err + } + + if selector.Matches(labels.Set(node.Labels)) { + selectedNpcl = append(selectedNpcl, npc.DeepCopy()) + } + + nodeList := &corev1.NodeList{} + err = dr.List(ctx, nodeList, &client.ListOptions{LabelSelector: selector}) + if err != nil { + logger.Error(err, "failed to list all the nodes matching the pool with label selector from nodeSelector", + "machineConfigPoolName", npc, + "nodeSelector", npc.Spec.NodeSelector) + return nil, nil, err + } + + for _, nodeName := range nodeList.Items { + nodesInPools[nodeName.Name] = nil + } + } + + if len(selectedNpcl) > 1 { + // don't allow the node to be part of multiple pools + err = fmt.Errorf("node is part of more then one pool") + logger.Error(err, "multiple pools founded for a specific node", "numberOfPools", len(selectedNpcl), "pools", selectedNpcl) + return nil, nil, err + } else if len(selectedNpcl) == 1 { + // 
found one pool for our node + logger.V(2).Info("found sriovNetworkPool", "pool", *selectedNpcl[0]) + selector, err := metav1.LabelSelectorAsSelector(selectedNpcl[0].Spec.NodeSelector) + if err != nil { + logger.Error(err, "failed to create label selector from nodeSelector", "nodeSelector", selectedNpcl[0].Spec.NodeSelector) + return nil, nil, err + } + + // list all the nodes that are also part of this pool and return them + nodeList := &corev1.NodeList{} + err = dr.List(ctx, nodeList, &client.ListOptions{LabelSelector: selector}) + if err != nil { + logger.Error(err, "failed to list nodes using with label selector", "labelSelector", selector) + return nil, nil, err + } + + return selectedNpcl[0], nodeList.Items, nil + } else { + // in this case we get all the nodes and remove the ones that already part of any pool + logger.V(1).Info("node doesn't belong to any pool, using default drain configuration with MaxUnavailable of one", "pool", *defaultNpcl) + nodeList := &corev1.NodeList{} + err = dr.List(ctx, nodeList) + if err != nil { + logger.Error(err, "failed to list all the nodes") + return nil, nil, err + } + + defaultNodeLists := []corev1.Node{} + for _, nodeObj := range nodeList.Items { + if _, exist := nodesInPools[nodeObj.Name]; !exist { + defaultNodeLists = append(defaultNodeLists, nodeObj) + } + } + return defaultNpcl, defaultNodeLists, nil + } +} + +// enableSriovDevicePlugin change the device plugin label on the requested node to enable +// if there is a pod still running we will return false +func (dr *DrainReconcile) enableSriovDevicePlugin(ctx context.Context, node *corev1.Node) (bool, error) { + logger := log.FromContext(ctx) + logger.Info("enableSriovDevicePlugin():") + + // check if the device plugin is terminating only if the node annotation for device plugin is disabled + if node.Annotations[constants.SriovDevicePluginLabel] == constants.SriovDevicePluginLabelDisabled { + pods, err := dr.getDevicePluginPodsOnNode(node.Name) + if err != nil { + logger.Error(err, "failed to list device plugin pods running on node") + return false, err + } + + if len(pods.Items) != 0 { + log.Log.V(2).Info("device plugin pod still terminating on node") + return false, nil + } + } + + logger.Info("enable Device plugin from node") + err := utils.LabelNode(ctx, node.Name, constants.SriovDevicePluginLabel, constants.SriovDevicePluginLabelEnabled, dr.Client) + if err != nil { + log.Log.Error(err, "failed to label node for device plugin label", + "labelKey", + constants.SriovDevicePluginLabel, + "labelValue", + constants.SriovDevicePluginLabelEnabled) + return false, err + } + + // check if the device plugin pod is running on the node + pods, err := dr.getDevicePluginPodsOnNode(node.Name) + if err != nil { + logger.Error(err, "failed to list device plugin pods running on node") + return false, err + } + + if len(pods.Items) == 1 && pods.Items[0].Status.Phase == corev1.PodRunning { + logger.Info("Device plugin pod running on node") + return true, nil + } + + logger.V(2).Info("Device plugin pod still not running on node") + return false, nil +} + +func (dr *DrainReconcile) getDevicePluginPodsOnNode(nodeName string) (*corev1.PodList, error) { + pods := &corev1.PodList{} + err := dr.List(context.Background(), pods, &client.ListOptions{ + Raw: &metav1.ListOptions{ + LabelSelector: "app=sriov-device-plugin", + FieldSelector: fmt.Sprintf("spec.nodeName=%s,metadata.namespace=%s", nodeName, vars.Namespace), + ResourceVersion: "0"}, + }) + + return pods, err +} diff --git 
a/controllers/drain_controller_test.go b/controllers/drain_controller_test.go index de3fe08843..566c752756 100644 --- a/controllers/drain_controller_test.go +++ b/controllers/drain_controller_test.go @@ -316,6 +316,93 @@ var _ = Describe("Drain Controller", Ordered, func() { expectNodeStateAnnotation(nodeState1, constants.Draining) }) }) + + Context("With requesting device plugin restart", func() { + It("should remove and the device plugin label on the node with nodeState spec is empty", func() { + node, nodeState := createNode(ctx, "node5") + simulateDaemonSetAnnotation(node, constants.DevicePluginResetRequired) + expectNodeStateAnnotation(nodeState, constants.DrainComplete) + expectNodeLabel(node, constants.SriovDevicePluginLabel, constants.SriovDevicePluginLabelDisabled) + simulateDaemonSetAnnotation(node, constants.DrainIdle) + expectNodeStateAnnotation(nodeState, constants.DrainIdle) + expectNodeLabel(node, constants.SriovDevicePluginLabel, constants.SriovDevicePluginLabelDisabled) + }) + + It("should remove and add the device plugin label on the node", func() { + node, nodeState := createNode(ctx, "node6") + originalNodeState := nodeState.DeepCopy() + nodeState.Spec = sriovnetworkv1.SriovNetworkNodeStateSpec{Interfaces: sriovnetworkv1.Interfaces{ + { + Name: "test", + NumVfs: 10, + VfGroups: []sriovnetworkv1.VfGroup{ + { + PolicyName: "test", + ResourceName: "test", + VfRange: "0-9", + }, + }, + }, + }} + Expect(k8sClient.Patch(ctx, nodeState, client.MergeFrom(originalNodeState))).ToNot(HaveOccurred()) + + simulateDaemonSetAnnotation(node, constants.DevicePluginResetRequired) + expectNodeStateAnnotation(nodeState, constants.DrainComplete) + expectNodeLabel(node, constants.SriovDevicePluginLabel, constants.SriovDevicePluginLabelDisabled) + simulateDaemonSetAnnotation(node, constants.DrainIdle) + createDevicePluginPodOnNode(ctx, node.Name) + expectNodeStateAnnotation(nodeState, constants.DrainIdle) + expectNodeLabel(node, constants.SriovDevicePluginLabel, constants.SriovDevicePluginLabelEnabled) + }) + + It("should restart the device plugin on multiple nodes if requested", func() { + simpleNodeStateSpec := sriovnetworkv1.SriovNetworkNodeStateSpec{Interfaces: sriovnetworkv1.Interfaces{ + { + Name: "test", + NumVfs: 10, + VfGroups: []sriovnetworkv1.VfGroup{ + { + PolicyName: "test", + ResourceName: "test", + VfRange: "0-9", + }, + }, + }, + }} + + node1, nodeState1 := createNode(ctx, "node7") + nodeState1.Spec = simpleNodeStateSpec + Eventually(func(g Gomega) { + err := k8sClient.Update(context.TODO(), nodeState1) + g.Expect(err).ToNot(HaveOccurred()) + }, "10s", "1s").Should(Succeed()) + + node2, nodeState2 := createNode(ctx, "node8") + nodeState2.Spec = simpleNodeStateSpec + Eventually(func(g Gomega) { + err := k8sClient.Update(context.TODO(), nodeState2) + g.Expect(err).ToNot(HaveOccurred()) + }, "10s", "1s").Should(Succeed()) + + simulateDaemonSetAnnotation(node1, constants.DevicePluginResetRequired) + simulateDaemonSetAnnotation(node2, constants.DevicePluginResetRequired) + + expectNodeStateAnnotation(nodeState1, constants.DrainComplete) + expectNodeStateAnnotation(nodeState2, constants.DrainComplete) + expectNodeLabel(node1, constants.SriovDevicePluginLabel, constants.SriovDevicePluginLabelDisabled) + expectNodeLabel(node2, constants.SriovDevicePluginLabel, constants.SriovDevicePluginLabelDisabled) + + simulateDaemonSetAnnotation(node1, constants.DrainIdle) + simulateDaemonSetAnnotation(node2, constants.DrainIdle) + createDevicePluginPodOnNode(ctx, node1.Name) + 
createDevicePluginPodOnNode(ctx, node2.Name) + + expectNodeStateAnnotation(nodeState1, constants.DrainIdle) + expectNodeStateAnnotation(nodeState2, constants.DrainIdle) + expectNodeLabel(node1, constants.SriovDevicePluginLabel, constants.SriovDevicePluginLabelEnabled) + expectNodeLabel(node2, constants.SriovDevicePluginLabel, constants.SriovDevicePluginLabelEnabled) + }) + }) }) func expectNodeStateAnnotation(nodeState *sriovnetworkv1.SriovNetworkNodeState, expectedAnnotationValue string) { @@ -374,6 +461,15 @@ func expectNodeIsSchedulable(node *corev1.Node) { }, "20s", "1s").Should(Succeed()) } +func expectNodeLabel(node *corev1.Node, key, value string) { + EventuallyWithOffset(1, func(g Gomega) { + g.Expect(k8sClient.Get(context.Background(), types.NamespacedName{Name: node.Name}, node)). + ToNot(HaveOccurred()) + + g.Expect(node.Labels[key]).To(Equal(value)) + }, "20s", "1s").Should(Succeed()) +} + func simulateDaemonSetAnnotation(node *corev1.Node, drainAnnotationValue string) { ExpectWithOffset(1, utils.AnnotateObject(context.Background(), node, constants.NodeDrainAnnotation, drainAnnotationValue, k8sClient)). @@ -446,3 +542,12 @@ func createPodOnNode(ctx context.Context, podName, nodeName string) { NodeName: nodeName, TerminationGracePeriodSeconds: pointer.Int64(60)}} Expect(k8sClient.Create(ctx, &pod)).ToNot(HaveOccurred()) } + +func createDevicePluginPodOnNode(ctx context.Context, nodeName string) { + pod := corev1.Pod{ObjectMeta: metav1.ObjectMeta{GenerateName: "device-plugin-", Namespace: vars.Namespace, Labels: map[string]string{"app": "sriov-device-plugin"}}, + Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "test", Image: "test", Command: []string{"test"}}}, + NodeName: nodeName, TerminationGracePeriodSeconds: pointer.Int64(60)}} + Expect(k8sClient.Create(ctx, &pod)).ToNot(HaveOccurred()) + pod.Status.Phase = corev1.PodRunning + Expect(k8sClient.Status().Update(ctx, &pod)).ToNot(HaveOccurred()) +} diff --git a/controllers/helper.go b/controllers/helper.go index 9ff735473f..d327f3d209 100644 --- a/controllers/helper.go +++ b/controllers/helper.go @@ -22,12 +22,10 @@ import ( "encoding/json" "fmt" "os" - "sort" "strings" errs "github.com/pkg/errors" appsv1 "k8s.io/api/apps/v1" - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" uns "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -91,11 +89,7 @@ func (DrainAnnotationPredicate) Update(e event.UpdateEvent) bool { return true } - if oldAnno != newAnno { - return true - } - - return false + return oldAnno != newAnno } type DrainStateAnnotationPredicate struct { @@ -128,10 +122,6 @@ func (DrainStateAnnotationPredicate) Update(e event.UpdateEvent) bool { return true } - if oldAnno != newAnno { - return true - } - return oldAnno != newAnno } @@ -152,29 +142,25 @@ func formatJSON(str string) (string, error) { return prettyJSON.String(), nil } +// GetDefaultNodeSelector return a nodeSelector with worker and linux os func GetDefaultNodeSelector() map[string]string { return map[string]string{"node-role.kubernetes.io/worker": "", "kubernetes.io/os": "linux"} } -// hasNoValidPolicy returns true if no SriovNetworkNodePolicy -// or only the (deprecated) "default" policy is present -func hasNoValidPolicy(pl []sriovnetworkv1.SriovNetworkNodePolicy) bool { - switch len(pl) { - case 0: - return true - case 1: - return pl[0].Name == constants.DefaultPolicyName - default: - return false - } +// GetDefaultNodeSelectorForDevicePlugin return a nodeSelector with worker linux 
os +// and the enabled sriov device plugin +func GetDefaultNodeSelectorForDevicePlugin() map[string]string { + return map[string]string{ + "node-role.kubernetes.io/worker": "", + "kubernetes.io/os": "linux", + constants.SriovDevicePluginLabel: constants.SriovDevicePluginLabelEnabled} } func syncPluginDaemonObjs(ctx context.Context, client k8sclient.Client, scheme *runtime.Scheme, - dc *sriovnetworkv1.SriovOperatorConfig, - pl *sriovnetworkv1.SriovNetworkNodePolicyList) error { + dc *sriovnetworkv1.SriovOperatorConfig) error { logger := log.Log.WithName("syncPluginDaemonObjs") logger.V(1).Info("Start to sync sriov daemons objects") @@ -185,7 +171,7 @@ func syncPluginDaemonObjs(ctx context.Context, data.Data["ReleaseVersion"] = os.Getenv("RELEASEVERSION") data.Data["ResourcePrefix"] = vars.ResourcePrefix data.Data["ImagePullSecrets"] = GetImagePullSecrets() - data.Data["NodeSelectorField"] = GetDefaultNodeSelector() + data.Data["NodeSelectorField"] = GetDefaultNodeSelectorForDevicePlugin() data.Data["UseCDI"] = dc.Spec.UseCDI objs, err := renderDsForCR(constants.PluginPath, &data) if err != nil { @@ -193,16 +179,6 @@ func syncPluginDaemonObjs(ctx context.Context, return err } - if hasNoValidPolicy(pl.Items) { - for _, obj := range objs { - err := deleteK8sResource(ctx, client, obj) - if err != nil { - return err - } - } - return nil - } - // Sync DaemonSets for _, obj := range objs { if obj.GetKind() == constants.DaemonSet && len(dc.Spec.ConfigDaemonNodeSelector) > 0 { @@ -214,13 +190,16 @@ func syncPluginDaemonObjs(ctx context.Context, return err } ds.Spec.Template.Spec.NodeSelector = dc.Spec.ConfigDaemonNodeSelector - err = scheme.Convert(ds, obj, nil) + // add the special node selector for the device plugin + dsCopy := ds.DeepCopy() + dsCopy.Spec.Template.Spec.NodeSelector[constants.SriovDevicePluginLabel] = constants.SriovDevicePluginLabelEnabled + err = scheme.Convert(dsCopy, obj, nil) if err != nil { logger.Error(err, "Fail to convert to Unstructured") return err } } - err = syncDsObject(ctx, client, scheme, dc, pl, obj) + err = syncDsObject(ctx, client, scheme, dc, obj) if err != nil { logger.Error(err, "Couldn't sync SR-IoV daemons objects") return err @@ -230,14 +209,7 @@ func syncPluginDaemonObjs(ctx context.Context, return nil } -func deleteK8sResource(ctx context.Context, client k8sclient.Client, in *uns.Unstructured) error { - if err := apply.DeleteObject(ctx, client, in); err != nil { - return fmt.Errorf("failed to delete object %v with err: %v", in, err) - } - return nil -} - -func syncDsObject(ctx context.Context, client k8sclient.Client, scheme *runtime.Scheme, dc *sriovnetworkv1.SriovOperatorConfig, pl *sriovnetworkv1.SriovNetworkNodePolicyList, obj *uns.Unstructured) error { +func syncDsObject(ctx context.Context, client k8sclient.Client, scheme *runtime.Scheme, dc *sriovnetworkv1.SriovOperatorConfig, obj *uns.Unstructured) error { logger := log.Log.WithName("syncDsObject") kind := obj.GetKind() logger.V(1).Info("Start to sync Objects", "Kind", kind) @@ -257,7 +229,7 @@ func syncDsObject(ctx context.Context, client k8sclient.Client, scheme *runtime. logger.Error(err, "Fail to convert to DaemonSet") return err } - err = syncDaemonSet(ctx, client, scheme, dc, pl, ds) + err = syncDaemonSet(ctx, client, scheme, dc, ds) if err != nil { logger.Error(err, "Fail to sync DaemonSet", "Namespace", ds.Namespace, "Name", ds.Name) return err @@ -266,54 +238,6 @@ func syncDsObject(ctx context.Context, client k8sclient.Client, scheme *runtime. 
return nil } -func setDsNodeAffinity(pl *sriovnetworkv1.SriovNetworkNodePolicyList, ds *appsv1.DaemonSet) error { - terms := nodeSelectorTermsForPolicyList(pl.Items) - if len(terms) > 0 { - ds.Spec.Template.Spec.Affinity = &corev1.Affinity{ - NodeAffinity: &corev1.NodeAffinity{ - RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ - NodeSelectorTerms: terms, - }, - }, - } - } - return nil -} - -func nodeSelectorTermsForPolicyList(policies []sriovnetworkv1.SriovNetworkNodePolicy) []corev1.NodeSelectorTerm { - terms := []corev1.NodeSelectorTerm{} - for _, p := range policies { - // Note(adrianc): default policy is deprecated and ignored. - if p.Name == constants.DefaultPolicyName { - continue - } - - if len(p.Spec.NodeSelector) == 0 { - continue - } - expressions := []corev1.NodeSelectorRequirement{} - for k, v := range p.Spec.NodeSelector { - exp := corev1.NodeSelectorRequirement{ - Operator: corev1.NodeSelectorOpIn, - Key: k, - Values: []string{v}, - } - expressions = append(expressions, exp) - } - // sorting is needed to keep the daemon spec stable. - // the items are popped in a random order from the map - sort.Slice(expressions, func(i, j int) bool { - return expressions[i].Key < expressions[j].Key - }) - nodeSelector := corev1.NodeSelectorTerm{ - MatchExpressions: expressions, - } - terms = append(terms, nodeSelector) - } - - return terms -} - // renderDsForCR returns a busybox pod with the same name/namespace as the cr func renderDsForCR(path string, data *render.RenderData) ([]*uns.Unstructured, error) { logger := log.Log.WithName("renderDsForCR") @@ -326,16 +250,11 @@ func renderDsForCR(path string, data *render.RenderData) ([]*uns.Unstructured, e return objs, nil } -func syncDaemonSet(ctx context.Context, client k8sclient.Client, scheme *runtime.Scheme, dc *sriovnetworkv1.SriovOperatorConfig, pl *sriovnetworkv1.SriovNetworkNodePolicyList, in *appsv1.DaemonSet) error { +func syncDaemonSet(ctx context.Context, client k8sclient.Client, scheme *runtime.Scheme, dc *sriovnetworkv1.SriovOperatorConfig, in *appsv1.DaemonSet) error { logger := log.Log.WithName("syncDaemonSet") logger.V(1).Info("Start to sync DaemonSet", "Namespace", in.Namespace, "Name", in.Name) var err error - if pl != nil { - if err = setDsNodeAffinity(pl, in); err != nil { - return err - } - } if err = controllerutil.SetControllerReference(dc, in, scheme); err != nil { return err } diff --git a/controllers/helper_test.go b/controllers/helper_test.go deleted file mode 100644 index d998cf0da3..0000000000 --- a/controllers/helper_test.go +++ /dev/null @@ -1,330 +0,0 @@ -/* - - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package controllers - -import ( - "context" - "sync" - "testing" - - . "github.com/onsi/ginkgo/v2" - . 
"github.com/onsi/gomega" - - "github.com/google/go-cmp/cmp" - appsv1 "k8s.io/api/apps/v1" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - controllerruntime "sigs.k8s.io/controller-runtime" - - sriovnetworkv1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1" - "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/vars" -) - -func TestNodeSelectorMerge(t *testing.T) { - table := []struct { - tname string - policies []sriovnetworkv1.SriovNetworkNodePolicy - expected []corev1.NodeSelectorTerm - }{ - { - tname: "testoneselector", - policies: []sriovnetworkv1.SriovNetworkNodePolicy{ - { - Spec: sriovnetworkv1.SriovNetworkNodePolicySpec{ - NodeSelector: map[string]string{ - "foo": "bar", - }, - }, - }, - { - Spec: sriovnetworkv1.SriovNetworkNodePolicySpec{ - NodeSelector: map[string]string{ - "bb": "cc", - }, - }, - }, - }, - expected: []corev1.NodeSelectorTerm{ - { - MatchExpressions: []corev1.NodeSelectorRequirement{ - { - Operator: corev1.NodeSelectorOpIn, - Key: "foo", - Values: []string{"bar"}, - }, - }, - }, - { - MatchExpressions: []corev1.NodeSelectorRequirement{ - { - Operator: corev1.NodeSelectorOpIn, - Key: "bb", - Values: []string{"cc"}, - }, - }, - }, - }, - }, - { - tname: "testtwoselectors", - policies: []sriovnetworkv1.SriovNetworkNodePolicy{ - { - Spec: sriovnetworkv1.SriovNetworkNodePolicySpec{ - NodeSelector: map[string]string{ - "foo": "bar", - "foo1": "bar1", - }, - }, - }, - { - Spec: sriovnetworkv1.SriovNetworkNodePolicySpec{ - NodeSelector: map[string]string{ - "bb": "cc", - "bb1": "cc1", - "bb2": "cc2", - }, - }, - }, - }, - expected: []corev1.NodeSelectorTerm{ - { - MatchExpressions: []corev1.NodeSelectorRequirement{ - { - Operator: corev1.NodeSelectorOpIn, - Key: "foo", - Values: []string{"bar"}, - }, - { - Operator: corev1.NodeSelectorOpIn, - Key: "foo1", - Values: []string{"bar1"}, - }, - }, - }, - { - MatchExpressions: []corev1.NodeSelectorRequirement{ - { - Operator: corev1.NodeSelectorOpIn, - Key: "bb", - Values: []string{"cc"}, - }, - { - Operator: corev1.NodeSelectorOpIn, - Key: "bb1", - Values: []string{"cc1"}, - }, - { - Operator: corev1.NodeSelectorOpIn, - Key: "bb2", - Values: []string{"cc2"}, - }, - }, - }, - }, - }, - { - tname: "testemptyselector", - policies: []sriovnetworkv1.SriovNetworkNodePolicy{ - { - Spec: sriovnetworkv1.SriovNetworkNodePolicySpec{ - NodeSelector: map[string]string{}, - }, - }, - }, - expected: []corev1.NodeSelectorTerm{}, - }, - } - - for _, tc := range table { - t.Run(tc.tname, func(t *testing.T) { - selectors := nodeSelectorTermsForPolicyList(tc.policies) - if !cmp.Equal(selectors, tc.expected) { - t.Error(tc.tname, "Selectors not as expected", cmp.Diff(selectors, tc.expected)) - } - }) - } -} - -var _ = Describe("Helper Validation", Ordered, func() { - - var cancel context.CancelFunc - var ctx context.Context - var dc *sriovnetworkv1.SriovOperatorConfig - var in *appsv1.DaemonSet - - BeforeAll(func() { - By("Setup controller manager") - k8sManager, err := setupK8sManagerForTest() - Expect(err).ToNot(HaveOccurred()) - - ctx, cancel = context.WithCancel(context.Background()) - - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - defer GinkgoRecover() - By("Start controller manager") - err := k8sManager.Start(ctx) - Expect(err).ToNot(HaveOccurred()) - }() - - DeferCleanup(func() { - By("Shutdown controller manager") - cancel() - wg.Wait() - }) - }) - - BeforeEach(func() { - dc = 
&sriovnetworkv1.SriovOperatorConfig{ - ObjectMeta: controllerruntime.ObjectMeta{ - Name: "default", - Namespace: vars.Namespace, - UID: "12312312"}} - in = &appsv1.DaemonSet{ - ObjectMeta: controllerruntime.ObjectMeta{ - Name: "sriov-device-plugin", - Namespace: vars.Namespace}, - Spec: appsv1.DaemonSetSpec{ - Selector: &metav1.LabelSelector{ - MatchLabels: map[string]string{"app": "sriov-device-plugin"}}, - Template: corev1.PodTemplateSpec{ - ObjectMeta: controllerruntime.ObjectMeta{ - Labels: map[string]string{"app": "sriov-device-plugin"}}, - Spec: corev1.PodSpec{ - Containers: []corev1.Container{ - { - Image: "test:latest", - Name: "test", - }, - }, - }, - }}} - - err := k8sClient.Delete(ctx, in) - if err != nil { - Expect(errors.IsNotFound(err)).To(BeTrue()) - } - }) - - Context("syncDaemonSet", func() { - It("should create a new daemon", func() { - pl := &sriovnetworkv1.SriovNetworkNodePolicyList{Items: []sriovnetworkv1.SriovNetworkNodePolicy{ - {ObjectMeta: controllerruntime.ObjectMeta{Name: "test", Namespace: vars.Namespace}}, - }} - err := syncDaemonSet(ctx, k8sClient, vars.Scheme, dc, pl, in) - Expect(err).ToNot(HaveOccurred()) - Expect(in.Spec.Template.Spec.Affinity).To(BeNil()) - }) - It("should update affinity", func() { - pl := &sriovnetworkv1.SriovNetworkNodePolicyList{Items: []sriovnetworkv1.SriovNetworkNodePolicy{ - { - ObjectMeta: controllerruntime.ObjectMeta{ - Name: "test", - Namespace: vars.Namespace, - }, - Spec: sriovnetworkv1.SriovNetworkNodePolicySpec{ - NodeSelector: map[string]string{"test": "test"}, - }, - }, - }} - - err := k8sClient.Create(ctx, in) - Expect(err).ToNot(HaveOccurred()) - - err = syncDaemonSet(ctx, k8sClient, vars.Scheme, dc, pl, in) - Expect(err).ToNot(HaveOccurred()) - Expect(in.Spec.Template.Spec.Affinity).ToNot(BeNil()) - Expect(in.Spec.Template.Spec.Affinity.NodeAffinity).ToNot(BeNil()) - Expect(in.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution).ToNot(BeNil()) - Expect(len(in.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms)).To(Equal(1)) - }) - It("should update affinity with multiple", func() { - pl := &sriovnetworkv1.SriovNetworkNodePolicyList{Items: []sriovnetworkv1.SriovNetworkNodePolicy{ - { - ObjectMeta: controllerruntime.ObjectMeta{ - Name: "test", - Namespace: vars.Namespace, - }, - Spec: sriovnetworkv1.SriovNetworkNodePolicySpec{ - NodeSelector: map[string]string{"test": "test"}, - }, - }, - { - ObjectMeta: controllerruntime.ObjectMeta{ - Name: "test1", - Namespace: vars.Namespace, - }, - Spec: sriovnetworkv1.SriovNetworkNodePolicySpec{ - NodeSelector: map[string]string{"test1": "test"}, - }, - }, - }} - - err := k8sClient.Create(ctx, in) - Expect(err).ToNot(HaveOccurred()) - - err = syncDaemonSet(ctx, k8sClient, vars.Scheme, dc, pl, in) - Expect(err).ToNot(HaveOccurred()) - Expect(in.Spec.Template.Spec.Affinity).ToNot(BeNil()) - Expect(in.Spec.Template.Spec.Affinity.NodeAffinity).ToNot(BeNil()) - Expect(in.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution).ToNot(BeNil()) - Expect(len(in.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms)).To(Equal(2)) - }) - It("should switch affinity", func() { - pl := &sriovnetworkv1.SriovNetworkNodePolicyList{Items: []sriovnetworkv1.SriovNetworkNodePolicy{ - { - ObjectMeta: controllerruntime.ObjectMeta{ - Name: "test1", - Namespace: vars.Namespace, - }, - Spec: sriovnetworkv1.SriovNetworkNodePolicySpec{ - 
NodeSelector: map[string]string{"test1": "test"}, - }, - }, - }} - - in.Spec.Template.Spec.Affinity = &corev1.Affinity{ - NodeAffinity: &corev1.NodeAffinity{ - RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ - NodeSelectorTerms: []corev1.NodeSelectorTerm{{ - MatchExpressions: []corev1.NodeSelectorRequirement{{ - Operator: corev1.NodeSelectorOpIn, - Key: "test", - Values: []string{"test"}, - }}, - }}, - }, - }, - } - - err := k8sClient.Create(ctx, in) - Expect(err).ToNot(HaveOccurred()) - - err = syncDaemonSet(ctx, k8sClient, vars.Scheme, dc, pl, in) - Expect(err).ToNot(HaveOccurred()) - Expect(in.Spec.Template.Spec.Affinity).ToNot(BeNil()) - Expect(in.Spec.Template.Spec.Affinity.NodeAffinity).ToNot(BeNil()) - Expect(in.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution).ToNot(BeNil()) - Expect(len(in.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms)).To(Equal(1)) - Expect(len(in.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions)).To(Equal(1)) - Expect(in.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions[0].Key).To(Equal("test1")) - }) - }) -}) diff --git a/controllers/sriovnetworknodepolicy_controller.go b/controllers/sriovnetworknodepolicy_controller.go index be46880b7c..161cfb86a9 100644 --- a/controllers/sriovnetworknodepolicy_controller.go +++ b/controllers/sriovnetworknodepolicy_controller.go @@ -133,10 +133,6 @@ func (r *SriovNetworkNodePolicyReconciler) Reconcile(ctx context.Context, req ct if err = r.syncDevicePluginConfigMap(ctx, defaultOpConf, policyList, nodeList); err != nil { return reconcile.Result{}, err } - // Render and sync Daemon objects - if err = syncPluginDaemonObjs(ctx, r.Client, r.Scheme, defaultOpConf, policyList); err != nil { - return reconcile.Result{}, err - } // All was successful. Request that this be re-triggered after ResyncPeriod, // so we can reconcile state again. diff --git a/controllers/sriovoperatorconfig_controller.go b/controllers/sriovoperatorconfig_controller.go index 377ebd2dee..601b7feeb6 100644 --- a/controllers/sriovoperatorconfig_controller.go +++ b/controllers/sriovoperatorconfig_controller.go @@ -128,7 +128,7 @@ func (r *SriovOperatorConfigReconciler) Reconcile(ctx context.Context, req ctrl. 
return reconcile.Result{}, err } - if err = syncPluginDaemonObjs(ctx, r.Client, r.Scheme, defaultConfig, policyList); err != nil { + if err = syncPluginDaemonObjs(ctx, r.Client, r.Scheme, defaultConfig); err != nil { return reconcile.Result{}, err } diff --git a/controllers/sriovoperatorconfig_controller_test.go b/controllers/sriovoperatorconfig_controller_test.go index 7f6db3522e..1fa01ff4cf 100644 --- a/controllers/sriovoperatorconfig_controller_test.go +++ b/controllers/sriovoperatorconfig_controller_test.go @@ -2,7 +2,6 @@ package controllers import ( "context" - "fmt" "os" "strings" "sync" @@ -30,7 +29,7 @@ import ( mock_platforms "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/platforms/mock" "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/platforms/openshift" "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/vars" - util "github.com/k8snetworkplumbingwg/sriov-network-operator/test/util" + "github.com/k8snetworkplumbingwg/sriov-network-operator/test/util" ) var _ = Describe("SriovOperatorConfig controller", Ordered, func() { @@ -347,6 +346,43 @@ var _ = Describe("SriovOperatorConfig controller", Ordered, func() { Expect(err).ToNot(HaveOccurred()) }) + It("should add the device-plugin node selector only for the device plugin", func() { + err := util.WaitForNamespacedObject(&appsv1.DaemonSet{}, k8sClient, testNamespace, "sriov-device-plugin", util.RetryInterval, util.APITimeout) + Expect(err).NotTo(HaveOccurred()) + + ds := &appsv1.DaemonSet{} + Expect(k8sClient.Get(ctx, types.NamespacedName{Namespace: testNamespace, Name: "sriov-device-plugin"}, ds)).NotTo(HaveOccurred()) + value, exist := ds.Spec.Template.Spec.NodeSelector[consts.SriovDevicePluginLabel] + Expect(exist).To(BeTrue()) + Expect(value).To(Equal(consts.SriovDevicePluginLabelEnabled)) + + ds = &appsv1.DaemonSet{} + Expect(k8sClient.Get(ctx, types.NamespacedName{Namespace: testNamespace, Name: "sriov-network-config-daemon"}, ds)).NotTo(HaveOccurred()) + _, exist = ds.Spec.Template.Spec.NodeSelector[consts.SriovDevicePluginLabel] + Expect(exist).To(BeFalse()) + + nodeSelector := map[string]string{ + "node-role.kubernetes.io/worker": "", + "bool-key": "true", + } + + restore := updateConfigDaemonNodeSelector(nodeSelector) + DeferCleanup(restore) + + Eventually(func(g Gomega) { + err := util.WaitForNamespacedObject(ds, k8sClient, testNamespace, "sriov-network-config-daemon", util.RetryInterval, util.APITimeout) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(ds.Spec.Template.Spec.NodeSelector).To((Equal(nodeSelector))) + }).Should(Succeed()) + + nodeSelector[consts.SriovDevicePluginLabel] = consts.SriovDevicePluginLabelEnabled + Eventually(func(g Gomega) { + err := util.WaitForNamespacedObject(ds, k8sClient, testNamespace, "sriov-device-plugin", util.RetryInterval, util.APITimeout) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(ds.Spec.Template.Spec.NodeSelector).To((Equal(nodeSelector))) + }).Should(Succeed()) + }) + Context("metricsExporter feature gate", func() { When("is disabled", func() { It("should not deploy the daemonset", func() { @@ -483,53 +519,6 @@ var _ = Describe("SriovOperatorConfig controller", Ordered, func() { g.Expect(injectorCfg.Webhooks[0].ClientConfig.CABundle).To(Equal([]byte("ca-bundle-2\n"))) }, "1s").Should(Succeed()) }) - - It("should reconcile to a converging state when multiple node policies are set", func() { - By("Creating a consistent number of node policies") - for i := 0; i < 30; i++ { - p := 
&sriovnetworkv1.SriovNetworkNodePolicy{ - ObjectMeta: metav1.ObjectMeta{Namespace: testNamespace, Name: fmt.Sprintf("p%d", i)}, - Spec: sriovnetworkv1.SriovNetworkNodePolicySpec{ - Priority: 99, - NodeSelector: map[string]string{"foo": fmt.Sprintf("v%d", i)}, - }, - } - err := k8sClient.Create(context.Background(), p) - Expect(err).NotTo(HaveOccurred()) - } - - By("Triggering a the reconcile loop") - config := &sriovnetworkv1.SriovOperatorConfig{} - err := k8sClient.Get(context.Background(), types.NamespacedName{Name: "default", Namespace: testNamespace}, config) - Expect(err).NotTo(HaveOccurred()) - if config.ObjectMeta.Labels == nil { - config.ObjectMeta.Labels = make(map[string]string) - } - config.ObjectMeta.Labels["trigger-test"] = "test-reconcile-daemonset" - err = k8sClient.Update(context.Background(), config) - Expect(err).NotTo(HaveOccurred()) - - By("Wait until device-plugin Daemonset's affinity has been calculated") - var expectedAffinity *corev1.Affinity - - Eventually(func(g Gomega) { - daemonSet := &appsv1.DaemonSet{} - err = k8sClient.Get(context.Background(), types.NamespacedName{Name: "sriov-device-plugin", Namespace: testNamespace}, daemonSet) - g.Expect(err).NotTo(HaveOccurred()) - // Wait until the last policy (with NodeSelector foo=v29) has been considered at least one time - g.Expect(daemonSet.Spec.Template.Spec.Affinity.String()).To(ContainSubstring("v29")) - expectedAffinity = daemonSet.Spec.Template.Spec.Affinity - }, "3s", "1s").Should(Succeed()) - - By("Verify device-plugin Daemonset's affinity doesn't change over time") - Consistently(func(g Gomega) { - daemonSet := &appsv1.DaemonSet{} - err = k8sClient.Get(context.Background(), types.NamespacedName{Name: "sriov-device-plugin", Namespace: testNamespace}, daemonSet) - g.Expect(err).NotTo(HaveOccurred()) - g.Expect(daemonSet.Spec.Template.Spec.Affinity). 
-					To(Equal(expectedAffinity))
-			}, "3s", "1s").Should(Succeed())
-		})
 	})
 })
diff --git a/controllers/suite_test.go b/controllers/suite_test.go
index bc2f13b8e2..9d5492e212 100644
--- a/controllers/suite_test.go
+++ b/controllers/suite_test.go
@@ -188,6 +188,13 @@ var _ = BeforeSuite(func() {
 	}
 	Expect(k8sClient.Create(context.Background(), ns)).Should(Succeed())
 
+	sa := &corev1.ServiceAccount{TypeMeta: metav1.TypeMeta{},
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "default",
+			Namespace: testNamespace,
+		}}
+	Expect(k8sClient.Create(context.Background(), sa)).Should(Succeed())
+
 	// Create openshift Infrastructure
 	infra := &openshiftconfigv1.Infrastructure{
 		ObjectMeta: metav1.ObjectMeta{
diff --git a/deploy/clusterrole.yaml b/deploy/clusterrole.yaml
index e7a5960616..e7a84394e1 100644
--- a/deploy/clusterrole.yaml
+++ b/deploy/clusterrole.yaml
@@ -45,12 +45,6 @@ rules:
 - apiGroups: [""]
   resources: ["nodes"]
   verbs: ["get", "list", "watch", "patch", "update"]
-- apiGroups: [""]
-  resources: ["pods"]
-  verbs: ["*"]
-- apiGroups: ["apps"]
-  resources: ["daemonsets"]
-  verbs: ["get"]
 - apiGroups: [ "config.openshift.io" ]
   resources: [ "infrastructures" ]
   verbs: [ "get", "list", "watch" ]
diff --git a/deployment/sriov-network-operator-chart/templates/clusterrole.yaml b/deployment/sriov-network-operator-chart/templates/clusterrole.yaml
index 7cd8fd014e..519d2c05ca 100644
--- a/deployment/sriov-network-operator-chart/templates/clusterrole.yaml
+++ b/deployment/sriov-network-operator-chart/templates/clusterrole.yaml
@@ -49,12 +49,6 @@ rules:
   - apiGroups: [""]
     resources: ["nodes"]
    verbs: ["get", "list", "watch", "patch", "update"]
-  - apiGroups: [""]
-    resources: ["pods"]
-    verbs: ["*"]
-  - apiGroups: ["apps"]
-    resources: ["daemonsets"]
-    verbs: ["get"]
   - apiGroups: [ "config.openshift.io" ]
     resources: [ "infrastructures" ]
     verbs: [ "get", "list", "watch" ]
diff --git a/pkg/consts/constants.go b/pkg/consts/constants.go
index f3c0761112..baaddd4bfc 100644
--- a/pkg/consts/constants.go
+++ b/pkg/consts/constants.go
@@ -67,12 +67,17 @@ const (
 	MachineConfigPoolPausedAnnotationIdle   = "Idle"
 	MachineConfigPoolPausedAnnotationPaused = "Paused"
 
+	SriovDevicePluginLabel         = "sriovnetwork.openshift.io/device-plugin"
+	SriovDevicePluginLabelEnabled  = "Enabled"
+	SriovDevicePluginLabelDisabled = "Disabled"
+
 	NodeDrainAnnotation             = "sriovnetwork.openshift.io/state"
 	NodeStateDrainAnnotation        = "sriovnetwork.openshift.io/desired-state"
 	NodeStateDrainAnnotationCurrent = "sriovnetwork.openshift.io/current-state"
 
 	DrainIdle                 = "Idle"
 	DrainRequired             = "Drain_Required"
 	RebootRequired            = "Reboot_Required"
+	DevicePluginResetRequired = "Device_Plugin_Reset_Required"
 	Draining                  = "Draining"
 	DrainComplete             = "DrainComplete"
diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go
index 5ed31ff85e..9b42d788de 100644
--- a/pkg/daemon/daemon.go
+++ b/pkg/daemon/daemon.go
@@ -10,7 +10,6 @@ import (
 	"time"
 
 	"golang.org/x/time/rate"
-	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
@@ -478,6 +477,27 @@ func (dn *Daemon) nodeStateSyncHandler() error {
 		}
 	}
 
+	// if we don't need to drain, and we are on idle we need to request the device plugin reset
+	if !reqDrain && utils.ObjectHasAnnotation(dn.desiredNodeState,
+		consts.NodeStateDrainAnnotationCurrent,
+		consts.DrainIdle) {
+		log.Log.Info("nodeStateSyncHandler(): apply 'Device_Plugin_Reset_Required' annotation for node")
+		err := utils.AnnotateNode(context.Background(), vars.NodeName, consts.NodeDrainAnnotation, consts.DevicePluginResetRequired, dn.client)
+		if err != nil {
+			log.Log.Error(err, "nodeStateSyncHandler(): Failed to annotate node")
+			return err
+		}
+
+		log.Log.Info("nodeStateSyncHandler(): apply 'Device_Plugin_Reset_Required' annotation for nodeState")
+		if err := utils.AnnotateObject(context.Background(), dn.desiredNodeState,
+			consts.NodeStateDrainAnnotation,
+			consts.DevicePluginResetRequired, dn.client); err != nil {
+			return err
+		}
+
+		return nil
+	}
+
 	// apply the vendor plugins after we are done with drain if needed
 	for k, p := range dn.loadedPlugins {
 		// Skip both the general and virtual plugin apply them last
@@ -523,13 +543,6 @@ func (dn *Daemon) nodeStateSyncHandler() error {
 		return nil
 	}
 
-	// restart device plugin pod
-	log.Log.Info("nodeStateSyncHandler(): restart device plugin pod")
-	if err := dn.restartDevicePluginPod(); err != nil {
-		log.Log.Error(err, "nodeStateSyncHandler(): fail to restart device plugin pod")
-		return err
-	}
-
 	log.Log.Info("nodeStateSyncHandler(): apply 'Idle' annotation for node")
 	err = utils.AnnotateNode(context.Background(), vars.NodeName, consts.NodeDrainAnnotation, consts.DrainIdle, dn.client)
 	if err != nil {
@@ -678,64 +691,6 @@ func (dn *Daemon) handleDrain(reqReboot bool) (bool, error) {
 	return true, nil
 }
 
-func (dn *Daemon) restartDevicePluginPod() error {
-	dn.mu.Lock()
-	defer dn.mu.Unlock()
-	log.Log.V(2).Info("restartDevicePluginPod(): try to restart device plugin pod")
-
-	var podToDelete string
-	pods, err := dn.kubeClient.CoreV1().Pods(vars.Namespace).List(context.Background(), metav1.ListOptions{
-		LabelSelector:   "app=sriov-device-plugin",
-		FieldSelector:   "spec.nodeName=" + vars.NodeName,
-		ResourceVersion: "0",
-	})
-	if err != nil {
-		if errors.IsNotFound(err) {
-			log.Log.Info("restartDevicePluginPod(): device plugin pod exited")
-			return nil
-		}
-		log.Log.Error(err, "restartDevicePluginPod(): Failed to list device plugin pod, retrying")
-		return err
-	}
-
-	if len(pods.Items) == 0 {
-		log.Log.Info("restartDevicePluginPod(): device plugin pod exited")
-		return nil
-	}
-	podToDelete = pods.Items[0].Name
-
-	log.Log.V(2).Info("restartDevicePluginPod(): Found device plugin pod, deleting it", "pod-name", podToDelete)
-	err = dn.kubeClient.CoreV1().Pods(vars.Namespace).Delete(context.Background(), podToDelete, metav1.DeleteOptions{})
-	if errors.IsNotFound(err) {
-		log.Log.Info("restartDevicePluginPod(): pod to delete not found")
-		return nil
-	}
-	if err != nil {
-		log.Log.Error(err, "restartDevicePluginPod(): Failed to delete device plugin pod, retrying")
-		return err
-	}
-
-	if err := wait.PollImmediateUntil(3*time.Second, func() (bool, error) {
-		_, err := dn.kubeClient.CoreV1().Pods(vars.Namespace).Get(context.Background(), podToDelete, metav1.GetOptions{})
-		if errors.IsNotFound(err) {
-			log.Log.Info("restartDevicePluginPod(): device plugin pod exited")
-			return true, nil
-		}
-
-		if err != nil {
-			log.Log.Error(err, "restartDevicePluginPod(): Failed to check for device plugin exit, retrying")
-		} else {
-			log.Log.Info("restartDevicePluginPod(): waiting for device plugin pod to exit", "pod-name", podToDelete)
-		}
-		return false, nil
-	}, dn.stopCh); err != nil {
-		log.Log.Error(err, "restartDevicePluginPod(): failed to wait for checking pod deletion")
-		return err
-	}
-
-	return nil
-}
-
 func (dn *Daemon) rebootNode() {
 	log.Log.Info("rebootNode(): trigger node reboot")
 	exit, err := dn.HostHelpers.Chroot(consts.Host)
diff --git a/pkg/daemon/daemon_test.go b/pkg/daemon/daemon_test.go
index f1111810a9..f1b364ed85 100644
--- a/pkg/daemon/daemon_test.go
+++ b/pkg/daemon/daemon_test.go
@@ -107,19 +107,6 @@ var _ = Describe("Config Daemon", func() {
 			},
 		}
 
-		SriovDevicePluginPod := corev1.Pod{
-			ObjectMeta: metav1.ObjectMeta{
-				Name:      "sriov-device-plugin-xxxx",
-				Namespace: vars.Namespace,
-				Labels: map[string]string{
-					"app": "sriov-device-plugin",
-				},
-			},
-			Spec: corev1.PodSpec{
-				NodeName: "test-node",
-			},
-		}
-
 		err = sriovnetworkv1.AddToScheme(scheme.Scheme)
 		Expect(err).ToNot(HaveOccurred())
 		kClient := kclient.NewClientBuilder().WithScheme(scheme.Scheme).WithRuntimeObjects(&corev1.Node{
@@ -130,7 +117,7 @@ var _ = Describe("Config Daemon", func() {
 				Namespace: vars.Namespace,
 			}}).Build()
 
-		kubeClient := fakek8s.NewSimpleClientset(&FakeSupportedNicIDs, &SriovDevicePluginPod)
+		kubeClient := fakek8s.NewSimpleClientset(&FakeSupportedNicIDs)
 		snclient := snclientset.NewSimpleClientset()
 		err = sriovnetworkv1.InitNicIDMapFromConfigMap(kubeClient, vars.Namespace)
 		Expect(err).ToNot(HaveOccurred())
@@ -228,22 +215,15 @@ var _ = Describe("Config Daemon", func() {
 			Eventually(refreshCh, "30s").Should(Receive(&msg))
 			Expect(msg.syncStatus).To(Equal("InProgress"))
 
+			nodeState.Annotations[consts.NodeStateDrainAnnotationCurrent] = consts.DrainComplete
+			err = updateSriovNetworkNodeState(sut.sriovClient, nodeState)
+			Expect(err).ToNot(HaveOccurred())
+
+			Eventually(refreshCh, "30s").Should(Receive(&msg))
+			Expect(msg.syncStatus).To(Equal("InProgress"))
 			Eventually(refreshCh, "30s").Should(Receive(&msg))
 			Expect(msg.syncStatus).To(Equal("Succeeded"))
 
-			Eventually(func() (int, error) {
-				podList, err := sut.kubeClient.CoreV1().Pods(vars.Namespace).List(context.Background(), metav1.ListOptions{
-					LabelSelector: "app=sriov-device-plugin",
-					FieldSelector: "spec.nodeName=test-node",
-				})
-
-				if err != nil {
-					return 0, err
-				}
-
-				return len(podList.Items), nil
-			}, "10s").Should(BeZero())
-
 		})
 
 		It("ignore non latest SriovNetworkNodeState generations", func() {
@@ -270,7 +250,7 @@ var _ = Describe("Config Daemon", func() {
 				ObjectMeta: metav1.ObjectMeta{
 					Name:       "test-node",
 					Generation: 777,
-					Annotations: map[string]string{consts.NodeStateDrainAnnotationCurrent: consts.DrainIdle},
+					Annotations: map[string]string{consts.NodeStateDrainAnnotationCurrent: consts.DrainComplete},
 				},
 			},
 		}
 		Expect(
diff --git a/pkg/drain/drainer.go b/pkg/drain/drainer.go
index a3500dc47f..4f35df5e41 100644
--- a/pkg/drain/drainer.go
+++ b/pkg/drain/drainer.go
@@ -31,6 +31,7 @@ func (w writer) Write(p []byte) (n int, err error) {
 type DrainInterface interface {
 	DrainNode(context.Context, *corev1.Node, bool) (bool, error)
 	CompleteDrainNode(context.Context, *corev1.Node) (bool, error)
+	RunCordonOrUncordon(ctx context.Context, node *corev1.Node, desired bool) error
 }
 
 type Drainer struct {
@@ -98,7 +99,7 @@ func (d *Drainer) DrainNode(ctx context.Context, node *corev1.Node, fullNodeDrai
 		reqLogger.Info("drainNode(): failed to drain node", "error", err)
 		return false, err
 	}
-	reqLogger.Info("drainNode(): drain complete")
+	reqLogger.Info("drainNode(): Drain completed")
 	return true, nil
 }
 
@@ -131,6 +132,22 @@ func (d *Drainer) CompleteDrainNode(ctx context.Context, node *corev1.Node) (boo
 	return completed, nil
 }
 
+// RunCordonOrUncordon runs cordon or uncordon on a specific node
+func (d *Drainer) RunCordonOrUncordon(ctx context.Context, node *corev1.Node, desired bool) error {
+	logger := log.FromContext(ctx)
+	logger.Info("RunCordonOrUncordon()")
+
+	// create a drain helper; we don't care about the drain function here, so we send false for the fullDrain parameter
+	drainHelper := createDrainHelper(d.kubeClient, ctx, false)
+
+	// perform the API call
+	err := drain.RunCordonOrUncordon(drainHelper, node, desired)
+	if err != nil {
+		logger.Error(err, "failed to cordon/uncordon node", "node", node.Name, "desired", desired)
+	}
+	return err
+}
+
 // createDrainHelper function to create a drain helper
 // if fullDrain is false we only remove pods that have the resourcePrefix
 // if not we remove all the pods in the node
diff --git a/pkg/platforms/openshift/openshift.go b/pkg/platforms/openshift/openshift.go
index 3f7d3421c6..b55b9c70da 100644
--- a/pkg/platforms/openshift/openshift.go
+++ b/pkg/platforms/openshift/openshift.go
@@ -228,6 +228,18 @@ func (c *openshiftContext) OpenshiftAfterCompleteDrainNode(ctx context.Context,
 		return false, err
 	}
 
+	value, exist := mcp.Annotations[consts.MachineConfigPoolPausedAnnotation]
+	// if the annotation doesn't exist we just return true here
+	// this can happen if the node was moved to another MCP after we started the drain
+	if !exist {
+		return true, nil
+	}
+	// check if the sriov annotation on the MCP is idle
+	// if the value is idle we just return here
+	if value == consts.MachineConfigPoolPausedAnnotationIdle {
+		return true, nil
+	}
+
 	// get all the nodes that belong to this machine config pool to validate this is the last node
 	// request to complete the drain
 	nodesInPool := &corev1.NodeList{}
diff --git a/pkg/utils/cluster.go b/pkg/utils/cluster.go
index 6f8d72e079..f53322f4ed 100644
--- a/pkg/utils/cluster.go
+++ b/pkg/utils/cluster.go
@@ -128,16 +128,17 @@ func ObjectHasAnnotation(obj metav1.Object, annoKey string, value string) bool {
 
 // AnnotateObject adds annotation to a kubernetes object
 func AnnotateObject(ctx context.Context, obj client.Object, key, value string, c client.Client) error {
-	log.Log.V(2).Info("AnnotateObject(): Annotate object",
-		"objectName", obj.GetName(),
-		"objectKind", obj.GetObjectKind(),
-		"annotation", value)
 	newObj := obj.DeepCopyObject().(client.Object)
 	if newObj.GetAnnotations() == nil {
 		newObj.SetAnnotations(map[string]string{})
 	}
 
 	if newObj.GetAnnotations()[key] != value {
+		log.Log.V(2).Info("AnnotateObject(): Annotate object",
+			"objectName", obj.GetName(),
+			"objectKind", obj.GetObjectKind(),
+			"annotationKey", key,
+			"annotationValue", value)
 		newObj.GetAnnotations()[key] = value
 		patch := client.MergeFrom(obj)
 		err := c.Patch(ctx,
@@ -161,3 +162,40 @@ func AnnotateNode(ctx context.Context, nodeName string, key, value string, c cli
 
 	return AnnotateObject(ctx, node, key, value, c)
 }
+
+// LabelObject adds a label to a kubernetes object
+func LabelObject(ctx context.Context, obj client.Object, key, value string, c client.Client) error {
+	newObj := obj.DeepCopyObject().(client.Object)
+	if newObj.GetLabels() == nil {
+		newObj.SetLabels(map[string]string{})
+	}
+
+	if newObj.GetLabels()[key] != value {
+		log.Log.V(2).Info("LabelObject(): Label object",
+			"objectName", obj.GetName(),
+			"objectKind", obj.GetObjectKind(),
+			"labelKey", key,
+			"labelValue", value)
+		newObj.GetLabels()[key] = value
+		patch := client.MergeFrom(obj)
+		err := c.Patch(ctx,
+			newObj, patch)
+		if err != nil {
+			log.Log.Error(err, "labelObject(): Failed to patch object")
+			return err
+		}
+	}
+
+	return nil
+}
+
+// LabelNode adds a label to a node
+func LabelNode(ctx context.Context, nodeName string, key, value string, c client.Client) error {
+	node := &corev1.Node{}
+	err := c.Get(context.TODO(), client.ObjectKey{Name: nodeName}, node)
+	if err != nil {
+		return err
+	}
+
+	return LabelObject(ctx, node, key, value, c)
+}
diff --git a/test/conformance/tests/test_sriov_operator.go b/test/conformance/tests/test_sriov_operator.go
index cac6b356c8..84841b0bce 100644
--- a/test/conformance/tests/test_sriov_operator.go
+++ b/test/conformance/tests/test_sriov_operator.go
@@ -1615,6 +1615,8 @@ var _ = Describe("[sriov] operator", func() {
 			Expect(err).ToNot(HaveOccurred())
 			waitForNetAttachDef("test-mtuvolnetwork", namespaces.Test)
 
+			By("starting a pod to make the drain slower")
+			createTestPod(node, []string{sriovNetwork.Name})
 			// update the interface
 			intf = getInterfaceFromNodeStateByPciAddress(node, intf.PciAddress)
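
The hunks above add two node-level markers that work together: the device-plugin DaemonSet now schedules only onto nodes carrying the sriovnetwork.openshift.io/device-plugin=Enabled label, and the config daemon requests a reset by setting the sriovnetwork.openshift.io/state annotation to Device_Plugin_Reset_Required, which the controllers then reconcile (the new Drainer.RunCordonOrUncordon suggests the node is cordoned while the device plugin is removed). The following is only a minimal sketch of the label/annotation mechanics using plain client-go; the helper name requestDevicePluginReset and the node name worker-0 are invented for the example, and the operator itself performs these steps through its reconcilers and the utils.LabelNode / utils.AnnotateNode helpers added in pkg/utils/cluster.go.

package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

// Constant values copied from pkg/consts/constants.go in this patch.
const (
	devicePluginLabel         = "sriovnetwork.openshift.io/device-plugin" // SriovDevicePluginLabel
	devicePluginLabelDisabled = "Disabled"                                // SriovDevicePluginLabelDisabled
	nodeDrainAnnotation       = "sriovnetwork.openshift.io/state"         // NodeDrainAnnotation
	devicePluginResetRequired = "Device_Plugin_Reset_Required"            // DevicePluginResetRequired
)

// requestDevicePluginReset is an illustrative helper, not operator code: it
// flips the label the device-plugin DaemonSet selects on (so the pod is
// removed without a full drain) and records the requested state in the
// drain annotation that the controllers reconcile on.
func requestDevicePluginReset(ctx context.Context, c kubernetes.Interface, nodeName string) error {
	patch := []byte(fmt.Sprintf(
		`{"metadata":{"labels":{%q:%q},"annotations":{%q:%q}}}`,
		devicePluginLabel, devicePluginLabelDisabled,
		nodeDrainAnnotation, devicePluginResetRequired,
	))
	_, err := c.CoreV1().Nodes().Patch(ctx, nodeName, types.StrategicMergePatchType, patch, metav1.PatchOptions{})
	return err
}

func main() {
	// kubeconfig-based client; in-cluster config would work the same way.
	cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	if err := requestDevicePluginReset(context.Background(), kubernetes.NewForConfigOrDie(cfg), "worker-0"); err != nil {
		panic(err)
	}
}

Setting the label back to sriovnetwork.openshift.io/device-plugin=Enabled afterwards lets the DaemonSet schedule the device-plugin pod onto the node again, which is what the new sriovoperatorconfig_controller_test.go test asserts through the DaemonSet node selector.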