From 22f6efa253000ae7f2a6dd1e359f14e38f71988d Mon Sep 17 00:00:00 2001 From: Sebastian Sch Date: Mon, 11 Nov 2024 12:16:23 +0200 Subject: [PATCH] re-organize drain controller package Signed-off-by: Sebastian Sch --- controllers/drain_controller.go | 189 ++------------ controllers/drain_controller_helper.go | 288 +++++++++++++++++++++ controllers/helper.go | 12 +- controllers/helper_test.go | 330 ------------------------- pkg/drain/drainer.go | 2 +- 5 files changed, 307 insertions(+), 514 deletions(-) create mode 100644 controllers/drain_controller_helper.go delete mode 100644 controllers/helper_test.go diff --git a/controllers/drain_controller.go b/controllers/drain_controller.go index b96458fa7..b03825942 100644 --- a/controllers/drain_controller.go +++ b/controllers/drain_controller.go @@ -20,7 +20,6 @@ import ( "context" "fmt" "sync" - "time" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -107,19 +106,23 @@ func (dr *DrainReconcile) Reconcile(ctx context.Context, req ctrl.Request) (ctrl } // create the drain state annotation if it doesn't exist in the sriovNetworkNodeState object - nodeStateDrainAnnotationCurrent, err := dr.ensureAnnotationExists(ctx, nodeNetworkState, constants.NodeStateDrainAnnotationCurrent) + nodeStateDrainAnnotationCurrent, nodeStateExist, err := dr.ensureAnnotationExists(ctx, nodeNetworkState, constants.NodeStateDrainAnnotationCurrent) if err != nil { reqLogger.Error(err, "failed to ensure nodeStateDrainAnnotation") return ctrl.Result{}, err } // create the drain state annotation if it doesn't exist in the node object - nodeDrainAnnotation, err := dr.ensureAnnotationExists(ctx, node, constants.NodeDrainAnnotation) + nodeDrainAnnotation, nodeExist, err := dr.ensureAnnotationExists(ctx, node, constants.NodeDrainAnnotation) if err != nil { reqLogger.Error(err, "failed to ensure nodeStateDrainAnnotation") return ctrl.Result{}, err } + // requeue the request if we needed to add any of the annotations + if !nodeExist || !nodeStateExist { + return ctrl.Result{Requeue: true}, nil + } reqLogger.V(2).Info("Drain annotations", "nodeAnnotation", nodeDrainAnnotation, "nodeStateAnnotation", nodeStateDrainAnnotationCurrent) // Check the node request @@ -141,98 +144,14 @@ func (dr *DrainReconcile) Reconcile(ctx context.Context, req ctrl.Request) (ctrl // doesn't need to drain anymore, so we can stop the drain if nodeStateDrainAnnotationCurrent == constants.DrainComplete || nodeStateDrainAnnotationCurrent == constants.Draining { - completed, err := dr.drainer.CompleteDrainNode(ctx, node) - if err != nil { - reqLogger.Error(err, "failed to complete drain on node") - dr.recorder.Event(nodeNetworkState, - corev1.EventTypeWarning, - "DrainController", - "failed to drain node") - return ctrl.Result{}, err - } - - // if we didn't manage to complete the un drain of the node we retry - if !completed { - reqLogger.Info("complete drain was not completed re queueing the request") - dr.recorder.Event(nodeNetworkState, - corev1.EventTypeWarning, - "DrainController", - "node complete drain was not completed") - // TODO: make this time configurable - return reconcile.Result{RequeueAfter: 5 * time.Second}, nil - } - - // move the node state back to idle - err = utils.AnnotateObject(ctx, nodeNetworkState, constants.NodeStateDrainAnnotationCurrent, constants.DrainIdle, dr.Client) - if err != nil { - reqLogger.Error(err, "failed to annotate node with annotation", "annotation", constants.DrainIdle) - return ctrl.Result{}, err - } - - reqLogger.Info("completed the un drain for 
node") - dr.recorder.Event(nodeNetworkState, - corev1.EventTypeWarning, - "DrainController", - "node un drain completed") - return ctrl.Result{}, nil - } - } else if nodeDrainAnnotation == constants.DrainRequired || nodeDrainAnnotation == constants.RebootRequired { - // this cover the case a node request to drain or reboot - - // nothing to do here we need to wait for the node to move back to idle - if nodeStateDrainAnnotationCurrent == constants.DrainComplete { - reqLogger.Info("node requested a drain and nodeState is on drain completed nothing todo") - return ctrl.Result{}, nil - } - - // we need to start the drain, but first we need to check that we can drain the node - if nodeStateDrainAnnotationCurrent == constants.DrainIdle { - result, err := dr.tryDrainNode(ctx, node) - if err != nil { - reqLogger.Error(err, "failed to check if we can drain the node") - return ctrl.Result{}, err - } - - // in case we need to wait because we just to the max number of draining nodes - if result != nil { - return *result, nil - } - } - - // class the drain function that will also call drain to other platform providers like openshift - drained, err := dr.drainer.DrainNode(ctx, node, nodeDrainAnnotation == constants.RebootRequired) - if err != nil { - reqLogger.Error(err, "error trying to drain the node") - dr.recorder.Event(nodeNetworkState, - corev1.EventTypeWarning, - "DrainController", - "failed to drain node") - return reconcile.Result{}, err - } - - // if we didn't manage to complete the drain of the node we retry - if !drained { - reqLogger.Info("the nodes was not drained re queueing the request") - dr.recorder.Event(nodeNetworkState, - corev1.EventTypeWarning, - "DrainController", - "node drain operation was not completed") - return reconcile.Result{RequeueAfter: 5 * time.Second}, nil - } - - // if we manage to drain we label the node state with drain completed and finish - err = utils.AnnotateObject(ctx, nodeNetworkState, constants.NodeStateDrainAnnotationCurrent, constants.DrainComplete, dr.Client) - if err != nil { - reqLogger.Error(err, "failed to annotate node with annotation", "annotation", constants.DrainComplete) - return ctrl.Result{}, err + return dr.handleNodeIdleNodeStateDrainingOrCompleted(ctx, &reqLogger, node, nodeNetworkState) } + } - reqLogger.Info("node drained successfully") - dr.recorder.Event(nodeNetworkState, - corev1.EventTypeWarning, - "DrainController", - "node drain completed") - return ctrl.Result{}, nil + // this cover the case a node request to drain or reboot + if nodeDrainAnnotation == constants.DrainRequired || + nodeDrainAnnotation == constants.RebootRequired { + return dr.handleNodeDrainOrReboot(ctx, &reqLogger, node, nodeNetworkState, nodeDrainAnnotation, nodeStateDrainAnnotationCurrent) } reqLogger.Error(nil, "unexpected node drain annotation") @@ -250,93 +169,17 @@ func (dr *DrainReconcile) getObject(ctx context.Context, req ctrl.Request, objec return true, nil } -func (dr *DrainReconcile) ensureAnnotationExists(ctx context.Context, object client.Object, key string) (string, error) { +func (dr *DrainReconcile) ensureAnnotationExists(ctx context.Context, object client.Object, key string) (string, bool, error) { value, exist := object.GetAnnotations()[key] if !exist { err := utils.AnnotateObject(ctx, object, constants.NodeStateDrainAnnotationCurrent, constants.DrainIdle, dr.Client) if err != nil { - return "", err - } - return constants.DrainIdle, nil - } - - return value, nil -} - -func (dr *DrainReconcile) tryDrainNode(ctx context.Context, node *corev1.Node) 
(*reconcile.Result, error) { - // configure logs - reqLogger := log.FromContext(ctx) - reqLogger.Info("checkForNodeDrain():") - - //critical section we need to check if we can start the draining - dr.drainCheckMutex.Lock() - defer dr.drainCheckMutex.Unlock() - - // find the relevant node pool - nodePool, nodeList, err := dr.findNodePoolConfig(ctx, node) - if err != nil { - reqLogger.Error(err, "failed to find the pool for the requested node") - return nil, err - } - - // check how many nodes we can drain in parallel for the specific pool - maxUnv, err := nodePool.MaxUnavailable(len(nodeList)) - if err != nil { - reqLogger.Error(err, "failed to calculate max unavailable") - return nil, err - } - - current := 0 - snns := &sriovnetworkv1.SriovNetworkNodeState{} - - var currentSnns *sriovnetworkv1.SriovNetworkNodeState - for _, nodeObj := range nodeList { - err = dr.Get(ctx, client.ObjectKey{Name: nodeObj.GetName(), Namespace: vars.Namespace}, snns) - if err != nil { - if errors.IsNotFound(err) { - reqLogger.V(2).Info("node doesn't have a sriovNetworkNodePolicy") - continue - } - return nil, err - } - - if snns.GetName() == node.GetName() { - currentSnns = snns.DeepCopy() - } - - if utils.ObjectHasAnnotation(snns, constants.NodeStateDrainAnnotationCurrent, constants.Draining) || - utils.ObjectHasAnnotation(snns, constants.NodeStateDrainAnnotationCurrent, constants.DrainComplete) { - current++ + return "", false, err } + return constants.DrainIdle, false, nil } - reqLogger.Info("Max node allowed to be draining at the same time", "MaxParallelNodeConfiguration", maxUnv) - reqLogger.Info("Count of draining", "drainingNodes", current) - - // if maxUnv is zero this means we drain all the nodes in parallel without a limit - if maxUnv == -1 { - reqLogger.Info("draining all the nodes in parallel") - } else if current >= maxUnv { - // the node requested to be drained, but we are at the limit so we re-enqueue the request - reqLogger.Info("MaxParallelNodeConfiguration limit reached for draining nodes re-enqueue the request") - // TODO: make this time configurable - return &reconcile.Result{RequeueAfter: 5 * time.Second}, nil - } - - if currentSnns == nil { - return nil, fmt.Errorf("failed to find sriov network node state for requested node") - } - - err = utils.AnnotateObject(ctx, currentSnns, constants.NodeStateDrainAnnotationCurrent, constants.Draining, dr.Client) - if err != nil { - reqLogger.Error(err, "failed to annotate node with annotation", "annotation", constants.Draining) - return nil, err - } - - return nil, nil -} -func (dr *DrainReconcile) findNodePoolConfig(ctx context.Context, node *corev1.Node) (*sriovnetworkv1.SriovNetworkPoolConfig, []corev1.Node, error) { - return findNodePoolConfig(ctx, node, dr.Client) + return value, true, nil } // SetupWithManager sets up the controller with the Manager. 
diff --git a/controllers/drain_controller_helper.go b/controllers/drain_controller_helper.go
new file mode 100644
index 000000000..c9e6bf550
--- /dev/null
+++ b/controllers/drain_controller_helper.go
@@ -0,0 +1,288 @@
+package controllers
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/go-logr/logr"
+
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/log"
+	"sigs.k8s.io/controller-runtime/pkg/reconcile"
+
+	sriovnetworkv1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1"
+	constants "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts"
+	"github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/utils"
+	"github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/vars"
+)
+
+func (dr *DrainReconcile) handleNodeIdleNodeStateDrainingOrCompleted(ctx context.Context,
+	reqLogger *logr.Logger,
+	node *corev1.Node,
+	nodeNetworkState *sriovnetworkv1.SriovNetworkNodeState) (ctrl.Result, error) {
+	completed, err := dr.drainer.CompleteDrainNode(ctx, node)
+	if err != nil {
+		reqLogger.Error(err, "failed to complete drain on node")
+		dr.recorder.Event(nodeNetworkState,
+			corev1.EventTypeWarning,
+			"DrainController",
+			"failed to drain node")
+		return ctrl.Result{}, err
+	}
+
+	// if we didn't manage to complete the undrain of the node we retry
+	if !completed {
+		reqLogger.Info("complete drain was not completed, requeueing the request")
+		dr.recorder.Event(nodeNetworkState,
+			corev1.EventTypeWarning,
+			"DrainController",
+			"node complete drain was not completed")
+		// TODO: make this time configurable
+		return reconcile.Result{RequeueAfter: 5 * time.Second}, nil
+	}
+
+	// move the node state back to idle
+	err = utils.AnnotateObject(ctx, nodeNetworkState, constants.NodeStateDrainAnnotationCurrent, constants.DrainIdle, dr.Client)
+	if err != nil {
+		reqLogger.Error(err, "failed to annotate node with annotation", "annotation", constants.DrainIdle)
+		return ctrl.Result{}, err
+	}
+
+	reqLogger.Info("completed the undrain for the node")
+	dr.recorder.Event(nodeNetworkState,
+		corev1.EventTypeWarning,
+		"DrainController",
+		"node undrain completed")
+	return ctrl.Result{}, nil
+}
+
+func (dr *DrainReconcile) handleNodeDrainOrReboot(ctx context.Context,
+	reqLogger *logr.Logger,
+	node *corev1.Node,
+	nodeNetworkState *sriovnetworkv1.SriovNetworkNodeState,
+	nodeDrainAnnotation,
+	nodeStateDrainAnnotationCurrent string) (ctrl.Result, error) {
+	// nothing to do here, we need to wait for the node to move back to idle
+	if nodeStateDrainAnnotationCurrent == constants.DrainComplete {
+		reqLogger.Info("node requested a drain and nodeState is on drain completed, nothing to do")
+		return ctrl.Result{}, nil
+	}
+
+	// we need to start the drain, but first we need to check that we can drain the node
+	if nodeStateDrainAnnotationCurrent == constants.DrainIdle {
+		result, err := dr.tryDrainNode(ctx, node)
+		if err != nil {
+			reqLogger.Error(err, "failed to check if we can drain the node")
+			return ctrl.Result{}, err
+		}
+
+		// in case we need to wait because we just reached the max number of draining nodes
+		if result != nil {
+			return *result, nil
+		}
+	}
+
+	// call the drain function that will also call drain on other platform providers like openshift
+	drained, err := dr.drainer.DrainNode(ctx, node, nodeDrainAnnotation == constants.RebootRequired)
+	if err != nil {
+		reqLogger.Error(err, "error trying to drain the node")
+		dr.recorder.Event(nodeNetworkState,
+			corev1.EventTypeWarning,
+			"DrainController",
+			"failed to drain node")
+		return reconcile.Result{}, err
+	}
+
+	// if we didn't manage to complete the drain of the node we retry
+	if !drained {
+		reqLogger.Info("the node was not drained, requeueing the request")
+		dr.recorder.Event(nodeNetworkState,
+			corev1.EventTypeWarning,
+			"DrainController",
+			"node drain operation was not completed")
+		return reconcile.Result{RequeueAfter: 5 * time.Second}, nil
+	}
+
+	// if we managed to drain, we label the node state with drain completed and finish
+	err = utils.AnnotateObject(ctx, nodeNetworkState, constants.NodeStateDrainAnnotationCurrent, constants.DrainComplete, dr.Client)
+	if err != nil {
+		reqLogger.Error(err, "failed to annotate node with annotation", "annotation", constants.DrainComplete)
+		return ctrl.Result{}, err
+	}
+
+	reqLogger.Info("node drained successfully")
+	dr.recorder.Event(nodeNetworkState,
+		corev1.EventTypeWarning,
+		"DrainController",
+		"node drain completed")
+	return ctrl.Result{}, nil
+}
+
+func (dr *DrainReconcile) tryDrainNode(ctx context.Context, node *corev1.Node) (*reconcile.Result, error) {
+	// configure logs
+	reqLogger := log.FromContext(ctx)
+	reqLogger.Info("tryDrainNode():")
+
+	// critical section, we need to check if we can start the draining
+	dr.drainCheckMutex.Lock()
+	defer dr.drainCheckMutex.Unlock()
+
+	// find the relevant node pool
+	nodePool, nodeList, err := dr.findNodePoolConfig(ctx, node)
+	if err != nil {
+		reqLogger.Error(err, "failed to find the pool for the requested node")
+		return nil, err
+	}
+
+	// check how many nodes we can drain in parallel for the specific pool
+	maxUnv, err := nodePool.MaxUnavailable(len(nodeList))
+	if err != nil {
+		reqLogger.Error(err, "failed to calculate max unavailable")
+		return nil, err
+	}
+
+	current := 0
+	snns := &sriovnetworkv1.SriovNetworkNodeState{}
+
+	var currentSnns *sriovnetworkv1.SriovNetworkNodeState
+	for _, nodeObj := range nodeList {
+		err = dr.Get(ctx, client.ObjectKey{Name: nodeObj.GetName(), Namespace: vars.Namespace}, snns)
+		if err != nil {
+			if errors.IsNotFound(err) {
+				reqLogger.V(2).Info("node doesn't have a sriovNetworkNodePolicy")
+				continue
+			}
+			return nil, err
+		}
+
+		if snns.GetName() == node.GetName() {
+			currentSnns = snns.DeepCopy()
+		}
+
+		if utils.ObjectHasAnnotation(snns, constants.NodeStateDrainAnnotationCurrent, constants.Draining) ||
+			utils.ObjectHasAnnotation(snns, constants.NodeStateDrainAnnotationCurrent, constants.DrainComplete) {
+			current++
+		}
+	}
+	reqLogger.Info("Max nodes allowed to be draining at the same time", "MaxParallelNodeConfiguration", maxUnv)
+	reqLogger.Info("Count of draining nodes", "drainingNodes", current)
+
+	// if maxUnv is -1 this means we drain all the nodes in parallel without a limit
+	if maxUnv == -1 {
+		reqLogger.Info("draining all the nodes in parallel")
+	} else if current >= maxUnv {
+		// the node requested to be drained, but we are at the limit so we re-enqueue the request
+		reqLogger.Info("MaxParallelNodeConfiguration limit reached for draining nodes, re-enqueueing the request")
+		// TODO: make this time configurable
+		return &reconcile.Result{RequeueAfter: 5 * time.Second}, nil
+	}
+
+	if currentSnns == nil {
+		return nil, fmt.Errorf("failed to find sriov network node state for requested node")
+	}
+
+	err = utils.AnnotateObject(ctx, currentSnns, constants.NodeStateDrainAnnotationCurrent, constants.Draining, dr.Client)
+	if err != nil {
+		reqLogger.Error(err, "failed to annotate node with annotation", "annotation", constants.Draining)
+		return nil, err
+	}
+
+	return nil, nil
+}
+
+func (dr *DrainReconcile) findNodePoolConfig(ctx context.Context, node *corev1.Node) (*sriovnetworkv1.SriovNetworkPoolConfig, []corev1.Node, error) {
+	logger := log.FromContext(ctx)
+	logger.Info("findNodePoolConfig():")
+	// get all the sriov network pool configs
+	npcl := &sriovnetworkv1.SriovNetworkPoolConfigList{}
+	err := dr.List(ctx, npcl)
+	if err != nil {
+		logger.Error(err, "failed to list sriovNetworkPoolConfig")
+		return nil, nil, err
+	}
+
+	selectedNpcl := []*sriovnetworkv1.SriovNetworkPoolConfig{}
+	nodesInPools := map[string]interface{}{}
+
+	for _, npc := range npcl.Items {
+		// we skip hw offload objects
+		if npc.Spec.OvsHardwareOffloadConfig.Name != "" {
+			continue
+		}
+
+		if npc.Spec.NodeSelector == nil {
+			npc.Spec.NodeSelector = &metav1.LabelSelector{}
+		}
+
+		selector, err := metav1.LabelSelectorAsSelector(npc.Spec.NodeSelector)
+		if err != nil {
+			logger.Error(err, "failed to create label selector from nodeSelector", "nodeSelector", npc.Spec.NodeSelector)
+			return nil, nil, err
+		}
+
+		if selector.Matches(labels.Set(node.Labels)) {
+			selectedNpcl = append(selectedNpcl, npc.DeepCopy())
+		}
+
+		nodeList := &corev1.NodeList{}
+		err = dr.List(ctx, nodeList, &client.ListOptions{LabelSelector: selector})
+		if err != nil {
+			logger.Error(err, "failed to list all the nodes matching the pool with label selector from nodeSelector",
+				"machineConfigPoolName", npc,
+				"nodeSelector", npc.Spec.NodeSelector)
+			return nil, nil, err
+		}
+
+		for _, nodeName := range nodeList.Items {
+			nodesInPools[nodeName.Name] = nil
+		}
+	}
+
+	if len(selectedNpcl) > 1 {
+		// don't allow the node to be part of multiple pools
+		err = fmt.Errorf("node is part of more than one pool")
+		logger.Error(err, "multiple pools found for a specific node", "numberOfPools", len(selectedNpcl), "pools", selectedNpcl)
+		return nil, nil, err
+	} else if len(selectedNpcl) == 1 {
+		// found one pool for our node
+		logger.V(2).Info("found sriovNetworkPool", "pool", *selectedNpcl[0])
+		selector, err := metav1.LabelSelectorAsSelector(selectedNpcl[0].Spec.NodeSelector)
+		if err != nil {
+			logger.Error(err, "failed to create label selector from nodeSelector", "nodeSelector", selectedNpcl[0].Spec.NodeSelector)
+			return nil, nil, err
+		}
+
+		// list all the nodes that are also part of this pool and return them
+		nodeList := &corev1.NodeList{}
+		err = dr.List(ctx, nodeList, &client.ListOptions{LabelSelector: selector})
+		if err != nil {
+			logger.Error(err, "failed to list nodes with label selector", "labelSelector", selector)
+			return nil, nil, err
+		}
+
+		return selectedNpcl[0], nodeList.Items, nil
+	} else {
+		// in this case we get all the nodes and remove the ones that are already part of any pool
+		logger.V(1).Info("node doesn't belong to any pool, using default drain configuration with MaxUnavailable of one", "pool", *defaultPoolConfig)
+		nodeList := &corev1.NodeList{}
+		err = dr.List(ctx, nodeList)
+		if err != nil {
+			logger.Error(err, "failed to list all the nodes")
+			return nil, nil, err
+		}
+
+		defaultNodeLists := []corev1.Node{}
+		for _, nodeObj := range nodeList.Items {
+			if _, exist := nodesInPools[nodeObj.Name]; !exist {
+				defaultNodeLists = append(defaultNodeLists, nodeObj)
+			}
+		}
+		return defaultPoolConfig, defaultNodeLists, nil
+	}
+}
diff --git a/controllers/helper.go b/controllers/helper.go index
b90ad44f8..c301e92a5 100644 --- a/controllers/helper.go +++ b/controllers/helper.go @@ -51,7 +51,7 @@ import ( ) var ( - webhooks = map[string](string){ + webhooks = map[string]string{ constants.InjectorWebHookName: constants.InjectorWebHookPath, constants.OperatorWebHookName: constants.OperatorWebHookPath, } @@ -101,11 +101,7 @@ func (DrainAnnotationPredicate) Update(e event.UpdateEvent) bool { return true } - if oldAnno != newAnno { - return true - } - - return false + return oldAnno != newAnno } type DrainStateAnnotationPredicate struct { @@ -138,10 +134,6 @@ func (DrainStateAnnotationPredicate) Update(e event.UpdateEvent) bool { return true } - if oldAnno != newAnno { - return true - } - return oldAnno != newAnno } diff --git a/controllers/helper_test.go b/controllers/helper_test.go deleted file mode 100644 index d998cf0da..000000000 --- a/controllers/helper_test.go +++ /dev/null @@ -1,330 +0,0 @@ -/* - - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package controllers - -import ( - "context" - "sync" - "testing" - - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" - - "github.com/google/go-cmp/cmp" - appsv1 "k8s.io/api/apps/v1" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - controllerruntime "sigs.k8s.io/controller-runtime" - - sriovnetworkv1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1" - "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/vars" -) - -func TestNodeSelectorMerge(t *testing.T) { - table := []struct { - tname string - policies []sriovnetworkv1.SriovNetworkNodePolicy - expected []corev1.NodeSelectorTerm - }{ - { - tname: "testoneselector", - policies: []sriovnetworkv1.SriovNetworkNodePolicy{ - { - Spec: sriovnetworkv1.SriovNetworkNodePolicySpec{ - NodeSelector: map[string]string{ - "foo": "bar", - }, - }, - }, - { - Spec: sriovnetworkv1.SriovNetworkNodePolicySpec{ - NodeSelector: map[string]string{ - "bb": "cc", - }, - }, - }, - }, - expected: []corev1.NodeSelectorTerm{ - { - MatchExpressions: []corev1.NodeSelectorRequirement{ - { - Operator: corev1.NodeSelectorOpIn, - Key: "foo", - Values: []string{"bar"}, - }, - }, - }, - { - MatchExpressions: []corev1.NodeSelectorRequirement{ - { - Operator: corev1.NodeSelectorOpIn, - Key: "bb", - Values: []string{"cc"}, - }, - }, - }, - }, - }, - { - tname: "testtwoselectors", - policies: []sriovnetworkv1.SriovNetworkNodePolicy{ - { - Spec: sriovnetworkv1.SriovNetworkNodePolicySpec{ - NodeSelector: map[string]string{ - "foo": "bar", - "foo1": "bar1", - }, - }, - }, - { - Spec: sriovnetworkv1.SriovNetworkNodePolicySpec{ - NodeSelector: map[string]string{ - "bb": "cc", - "bb1": "cc1", - "bb2": "cc2", - }, - }, - }, - }, - expected: []corev1.NodeSelectorTerm{ - { - MatchExpressions: []corev1.NodeSelectorRequirement{ - { - Operator: corev1.NodeSelectorOpIn, - Key: "foo", - Values: []string{"bar"}, - }, - { - Operator: corev1.NodeSelectorOpIn, - Key: "foo1", - Values: []string{"bar1"}, - }, - }, - }, - { - 
MatchExpressions: []corev1.NodeSelectorRequirement{ - { - Operator: corev1.NodeSelectorOpIn, - Key: "bb", - Values: []string{"cc"}, - }, - { - Operator: corev1.NodeSelectorOpIn, - Key: "bb1", - Values: []string{"cc1"}, - }, - { - Operator: corev1.NodeSelectorOpIn, - Key: "bb2", - Values: []string{"cc2"}, - }, - }, - }, - }, - }, - { - tname: "testemptyselector", - policies: []sriovnetworkv1.SriovNetworkNodePolicy{ - { - Spec: sriovnetworkv1.SriovNetworkNodePolicySpec{ - NodeSelector: map[string]string{}, - }, - }, - }, - expected: []corev1.NodeSelectorTerm{}, - }, - } - - for _, tc := range table { - t.Run(tc.tname, func(t *testing.T) { - selectors := nodeSelectorTermsForPolicyList(tc.policies) - if !cmp.Equal(selectors, tc.expected) { - t.Error(tc.tname, "Selectors not as expected", cmp.Diff(selectors, tc.expected)) - } - }) - } -} - -var _ = Describe("Helper Validation", Ordered, func() { - - var cancel context.CancelFunc - var ctx context.Context - var dc *sriovnetworkv1.SriovOperatorConfig - var in *appsv1.DaemonSet - - BeforeAll(func() { - By("Setup controller manager") - k8sManager, err := setupK8sManagerForTest() - Expect(err).ToNot(HaveOccurred()) - - ctx, cancel = context.WithCancel(context.Background()) - - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - defer GinkgoRecover() - By("Start controller manager") - err := k8sManager.Start(ctx) - Expect(err).ToNot(HaveOccurred()) - }() - - DeferCleanup(func() { - By("Shutdown controller manager") - cancel() - wg.Wait() - }) - }) - - BeforeEach(func() { - dc = &sriovnetworkv1.SriovOperatorConfig{ - ObjectMeta: controllerruntime.ObjectMeta{ - Name: "default", - Namespace: vars.Namespace, - UID: "12312312"}} - in = &appsv1.DaemonSet{ - ObjectMeta: controllerruntime.ObjectMeta{ - Name: "sriov-device-plugin", - Namespace: vars.Namespace}, - Spec: appsv1.DaemonSetSpec{ - Selector: &metav1.LabelSelector{ - MatchLabels: map[string]string{"app": "sriov-device-plugin"}}, - Template: corev1.PodTemplateSpec{ - ObjectMeta: controllerruntime.ObjectMeta{ - Labels: map[string]string{"app": "sriov-device-plugin"}}, - Spec: corev1.PodSpec{ - Containers: []corev1.Container{ - { - Image: "test:latest", - Name: "test", - }, - }, - }, - }}} - - err := k8sClient.Delete(ctx, in) - if err != nil { - Expect(errors.IsNotFound(err)).To(BeTrue()) - } - }) - - Context("syncDaemonSet", func() { - It("should create a new daemon", func() { - pl := &sriovnetworkv1.SriovNetworkNodePolicyList{Items: []sriovnetworkv1.SriovNetworkNodePolicy{ - {ObjectMeta: controllerruntime.ObjectMeta{Name: "test", Namespace: vars.Namespace}}, - }} - err := syncDaemonSet(ctx, k8sClient, vars.Scheme, dc, pl, in) - Expect(err).ToNot(HaveOccurred()) - Expect(in.Spec.Template.Spec.Affinity).To(BeNil()) - }) - It("should update affinity", func() { - pl := &sriovnetworkv1.SriovNetworkNodePolicyList{Items: []sriovnetworkv1.SriovNetworkNodePolicy{ - { - ObjectMeta: controllerruntime.ObjectMeta{ - Name: "test", - Namespace: vars.Namespace, - }, - Spec: sriovnetworkv1.SriovNetworkNodePolicySpec{ - NodeSelector: map[string]string{"test": "test"}, - }, - }, - }} - - err := k8sClient.Create(ctx, in) - Expect(err).ToNot(HaveOccurred()) - - err = syncDaemonSet(ctx, k8sClient, vars.Scheme, dc, pl, in) - Expect(err).ToNot(HaveOccurred()) - Expect(in.Spec.Template.Spec.Affinity).ToNot(BeNil()) - Expect(in.Spec.Template.Spec.Affinity.NodeAffinity).ToNot(BeNil()) - Expect(in.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution).ToNot(BeNil()) - 
Expect(len(in.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms)).To(Equal(1)) - }) - It("should update affinity with multiple", func() { - pl := &sriovnetworkv1.SriovNetworkNodePolicyList{Items: []sriovnetworkv1.SriovNetworkNodePolicy{ - { - ObjectMeta: controllerruntime.ObjectMeta{ - Name: "test", - Namespace: vars.Namespace, - }, - Spec: sriovnetworkv1.SriovNetworkNodePolicySpec{ - NodeSelector: map[string]string{"test": "test"}, - }, - }, - { - ObjectMeta: controllerruntime.ObjectMeta{ - Name: "test1", - Namespace: vars.Namespace, - }, - Spec: sriovnetworkv1.SriovNetworkNodePolicySpec{ - NodeSelector: map[string]string{"test1": "test"}, - }, - }, - }} - - err := k8sClient.Create(ctx, in) - Expect(err).ToNot(HaveOccurred()) - - err = syncDaemonSet(ctx, k8sClient, vars.Scheme, dc, pl, in) - Expect(err).ToNot(HaveOccurred()) - Expect(in.Spec.Template.Spec.Affinity).ToNot(BeNil()) - Expect(in.Spec.Template.Spec.Affinity.NodeAffinity).ToNot(BeNil()) - Expect(in.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution).ToNot(BeNil()) - Expect(len(in.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms)).To(Equal(2)) - }) - It("should switch affinity", func() { - pl := &sriovnetworkv1.SriovNetworkNodePolicyList{Items: []sriovnetworkv1.SriovNetworkNodePolicy{ - { - ObjectMeta: controllerruntime.ObjectMeta{ - Name: "test1", - Namespace: vars.Namespace, - }, - Spec: sriovnetworkv1.SriovNetworkNodePolicySpec{ - NodeSelector: map[string]string{"test1": "test"}, - }, - }, - }} - - in.Spec.Template.Spec.Affinity = &corev1.Affinity{ - NodeAffinity: &corev1.NodeAffinity{ - RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ - NodeSelectorTerms: []corev1.NodeSelectorTerm{{ - MatchExpressions: []corev1.NodeSelectorRequirement{{ - Operator: corev1.NodeSelectorOpIn, - Key: "test", - Values: []string{"test"}, - }}, - }}, - }, - }, - } - - err := k8sClient.Create(ctx, in) - Expect(err).ToNot(HaveOccurred()) - - err = syncDaemonSet(ctx, k8sClient, vars.Scheme, dc, pl, in) - Expect(err).ToNot(HaveOccurred()) - Expect(in.Spec.Template.Spec.Affinity).ToNot(BeNil()) - Expect(in.Spec.Template.Spec.Affinity.NodeAffinity).ToNot(BeNil()) - Expect(in.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution).ToNot(BeNil()) - Expect(len(in.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms)).To(Equal(1)) - Expect(len(in.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions)).To(Equal(1)) - Expect(in.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions[0].Key).To(Equal("test1")) - }) - }) -}) diff --git a/pkg/drain/drainer.go b/pkg/drain/drainer.go index a3500dc47..22dbed3df 100644 --- a/pkg/drain/drainer.go +++ b/pkg/drain/drainer.go @@ -98,7 +98,7 @@ func (d *Drainer) DrainNode(ctx context.Context, node *corev1.Node, fullNodeDrai reqLogger.Info("drainNode(): failed to drain node", "error", err) return false, err } - reqLogger.Info("drainNode(): drain complete") + reqLogger.Info("drainNode(): Drain completed") return true, nil }