From 3cbcc00203202a2e81e0d73024b49846d52c07cc Mon Sep 17 00:00:00 2001 From: Ivan Kolodiazhnyi Date: Fri, 21 Jul 2023 16:54:21 +0300 Subject: [PATCH] Move node drain call to Drain controller --- controllers/drain_controller.go | 50 ++++++++- main.go | 28 ++++- pkg/consts/constants.go | 1 + pkg/daemon/daemon.go | 179 ++++++++++++++++---------------- pkg/utils/cluster.go | 46 ++++++++ 5 files changed, 210 insertions(+), 94 deletions(-) diff --git a/controllers/drain_controller.go b/controllers/drain_controller.go index f43d8ac237..255c4df40d 100644 --- a/controllers/drain_controller.go +++ b/controllers/drain_controller.go @@ -3,8 +3,12 @@ package controllers import ( "context" "fmt" + "github.com/golang/glog" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/kubectl/pkg/drain" "sort" "strings" + "time" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" @@ -24,9 +28,15 @@ import ( "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/utils" ) +const ( + maxParallelNodeConfiguration = 1 +) + type DrainReconciler struct { client.Client Scheme *runtime.Scheme + + Drainer *drain.Helper } //+kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch;update;patch @@ -72,7 +82,7 @@ func (dr *DrainReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr } reqLogger.Info("Count of draining", "drainingNodes", drainingNodes) - if config.Spec.MaxParallelNodeConfiguration != 0 && drainingNodes >= config.Spec.MaxParallelNodeConfiguration { + if maxParallelNodeConfiguration != 0 && drainingNodes >= maxParallelNodeConfiguration { reqLogger.Info("MaxParallelNodeConfiguration limit reached for draining nodes") return reconcile.Result{}, nil } @@ -89,6 +99,7 @@ func (dr *DrainReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr reqLogger.Error(err, "Failed to patch node annotations") return reconcile.Result{}, err } + drainingNodes++ } else { reqLogger.Info("Too many nodes to be draining at the moment. 
Skipping node %s", "node", node.Name) @@ -99,6 +110,43 @@ func (dr *DrainReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr return reconcile.Result{}, nil }
+// drainNode cordons node and then drains it through dr.Drainer, retrying both steps
+// with exponential backoff (5 attempts, starting at 10s). On timeout the returned
+// error wraps wait.ErrWaitTimeout and carries the last cordon/drain failure.
+func (dr *DrainReconciler) drainNode(node *corev1.Node) error {
+	glog.Info("drainNode(): Update prepared")
+	backoff := wait.Backoff{
+		Steps:    5,
+		Duration: 10 * time.Second,
+		Factor:   2,
+	}
+	var lastErr error
+
+	glog.Info("drainNode(): Start draining")
+	if err := wait.ExponentialBackoff(backoff, func() (bool, error) {
+		// Record failures and report "not done": a returned error would stop the retries.
+		if err := drain.RunCordonOrUncordon(dr.Drainer, node, true); err != nil {
+			lastErr = err
+			glog.Infof("Cordon failed with: %v, retrying", err)
+			return false, nil
+		}
+		if err := drain.RunNodeDrain(dr.Drainer, node.Name); err != nil {
+			lastErr = err
+			glog.Infof("Draining failed with: %v, retrying", err)
+			return false, nil
+		}
+		return true, nil
+	}); err != nil {
+		if err == wait.ErrWaitTimeout {
+			err = fmt.Errorf("%w after %d tries: %v", err, backoff.Steps, lastErr)
+		}
+		glog.Errorf("drainNode(): failed to drain node: %v", err)
+		return err
+	}
+	glog.Info("drainNode(): drain complete")
+	return nil
+}
+
 // SetupWithManager sets up the controller with the Manager. 
func (dr *DrainReconciler) SetupWithManager(mgr ctrl.Manager) error { // we always add object with a same(static) key to the queue to reduce diff --git a/main.go b/main.go index 43afc60ca0..e2f6272fe8 100644 --- a/main.go +++ b/main.go @@ -20,7 +20,11 @@ import ( "context" "flag" "fmt" + "github.com/golang/glog" + corev1 "k8s.io/api/core/v1" + "k8s.io/kubectl/pkg/drain" "os" + "time" netattdefv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/apis/k8s.cni.cncf.io/v1" openshiftconfigv1 "github.com/openshift/api/config/v1" @@ -95,6 +99,7 @@ func main() { setupLog.Error(err, "couldn't create openshift context") os.Exit(1) } + kubeclient := kubernetes.NewForConfigOrDie(ctrl.GetConfigOrDie()) le := leaderelection.GetLeaderElectionConfig(kubeClient, enableLeaderElection) @@ -124,7 +129,7 @@ func main() { os.Exit(1) } - if err := initNicIDMap(); err != nil { + if err := initNicIDMap(kubeclient); err != nil { setupLog.Error(err, "unable to init NicIdMap") os.Exit(1) } @@ -169,6 +174,24 @@ func main() { if err = (&controllers.DrainReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(),
+		Drainer: &drain.Helper{
+			Client:              kubeclient,
+			Force:               true,
+			IgnoreAllDaemonSets: true,
+			DeleteEmptyDirData:  true,
+			GracePeriodSeconds:  -1,
+			Timeout:             90 * time.Second,
+			OnPodDeletedOrEvicted: func(pod *corev1.Pod, usingEviction bool) {
+				verbStr := "Deleted"
+				if usingEviction {
+					verbStr = "Evicted"
+				}
+				glog.Infof("%s pod from Node %s/%s", verbStr, pod.Namespace, pod.Name)
+			},
+			Out:    os.Stdout, // must be non-nil: the drain helper prints progress and warnings to Out/ErrOut
+			ErrOut: os.Stderr,
+			Ctx:    context.Background(),
+		},
 }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "DrainReconciler") os.Exit(1) @@ -216,9 +239,8 @@ func main() { } } -func initNicIDMap() error { +func initNicIDMap(kubeclient kubernetes.Interface) error { namespace := os.Getenv("NAMESPACE") - kubeclient := kubernetes.NewForConfigOrDie(ctrl.GetConfigOrDie()) if err := 
sriovnetworkv1.InitNicIDMapFromConfigMap(kubeclient, namespace); err != nil { return err } diff --git a/pkg/consts/constants.go b/pkg/consts/constants.go index 46034205d3..3d1849df4a 100644 --- a/pkg/consts/constants.go +++ b/pkg/consts/constants.go @@ -28,6 +28,7 @@ const ( AnnoDrainRequired = "Drain_Required" AnnoMcpPaused = "Draining_MCP_Paused" AnnoDraining = "Draining" + AnnoDrainComplete = "DrainComplete" LinkTypeEthernet = "ether" LinkTypeInfiniband = "infiniband" diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index 941b309c30..023b06ea53 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -3,7 +3,6 @@ package daemon import ( "bytes" "context" - "encoding/json" "flag" "fmt" "io/ioutil" @@ -26,7 +25,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" @@ -587,14 +585,14 @@ func (dn *Daemon) nodeStateSyncHandler() error { } } - if dn.disableDrain { - glog.Info("nodeStateSyncHandler(): disable drain is true skipping drain") - } else { - glog.Info("nodeStateSyncHandler(): drain node") - if err := dn.drainNode(); err != nil { - return err - } - } + //if dn.disableDrain { + // glog.Info("nodeStateSyncHandler(): disable drain is true skipping drain") + //} else { + // glog.Info("nodeStateSyncHandler(): drain node") + // if err := dn.drainNode(); err != nil { + // return err + // } + //} } if !reqReboot && !dn.useSystemdService { @@ -638,9 +636,10 @@ func (dn *Daemon) nodeStateSyncHandler() error { glog.Errorf("nodeStateSyncHandler(): failed to complete draining: %v", err) return err } + return nil } else { if !utils.NodeHasAnnotation(*dn.node, consts.NodeDrainAnnotation, consts.AnnoIdle) { - if err := dn.annotateNode(dn.name, consts.AnnoIdle); err != nil { + if err := utils.AnnotateNode(dn.name, consts.AnnoIdle, 
dn.kubeClient); err != nil { glog.Errorf("nodeStateSyncHandler(): failed to annotate node: %v", err) return err } @@ -691,7 +690,7 @@ func (dn *Daemon) completeDrain() error { } } - if err := dn.annotateNode(dn.name, consts.AnnoIdle); err != nil { + if err := utils.AnnotateNode(dn.name, consts.AnnoIdle, dn.kubeClient); err != nil { glog.Errorf("completeDrain(): failed to annotate node: %v", err) return err } @@ -777,45 +776,45 @@ func rebootNode() { } } -func (dn *Daemon) annotateNode(node, value string) error { - glog.Infof("annotateNode(): Annotate node %s with: %s", node, value) - - oldNode, err := dn.kubeClient.CoreV1().Nodes().Get(context.Background(), dn.name, metav1.GetOptions{}) - if err != nil { - glog.Infof("annotateNode(): Failed to get node %s %v, retrying", node, err) - return err - } - oldData, err := json.Marshal(oldNode) - if err != nil { - return err - } - - newNode := oldNode.DeepCopy() - if newNode.Annotations == nil { - newNode.Annotations = map[string]string{} - } - if newNode.Annotations[consts.NodeDrainAnnotation] != value { - newNode.Annotations[consts.NodeDrainAnnotation] = value - newData, err := json.Marshal(newNode) - if err != nil { - return err - } - patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, corev1.Node{}) - if err != nil { - return err - } - _, err = dn.kubeClient.CoreV1().Nodes().Patch(context.Background(), - dn.name, - types.StrategicMergePatchType, - patchBytes, - metav1.PatchOptions{}) - if err != nil { - glog.Infof("annotateNode(): Failed to patch node %s: %v", node, err) - return err - } - } - return nil -} +//func (dn *Daemon) annotateNode(node, value string) error { +// glog.Infof("annotateNode(): Annotate node %s with: %s", node, value) +// +// oldNode, err := dn.kubeClient.CoreV1().Nodes().Get(context.Background(), dn.name, metav1.GetOptions{}) +// if err != nil { +// glog.Infof("annotateNode(): Failed to get node %s %v, retrying", node, err) +// return err +// } +// oldData, err := 
json.Marshal(oldNode) +// if err != nil { +// return err +// } +// +// newNode := oldNode.DeepCopy() +// if newNode.Annotations == nil { +// newNode.Annotations = map[string]string{} +// } +// if newNode.Annotations[consts.NodeDrainAnnotation] != value { +// newNode.Annotations[consts.NodeDrainAnnotation] = value +// newData, err := json.Marshal(newNode) +// if err != nil { +// return err +// } +// patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, corev1.Node{}) +// if err != nil { +// return err +// } +// _, err = dn.kubeClient.CoreV1().Nodes().Patch(context.Background(), +// dn.name, +// types.StrategicMergePatchType, +// patchBytes, +// metav1.PatchOptions{}) +// if err != nil { +// glog.Infof("annotateNode(): Failed to patch node %s: %v", node, err) +// return err +// } +// } +// return nil +//} func (dn *Daemon) getNodeMachinePool() error { desiredConfig, ok := dn.node.Annotations[daemonconsts.DesiredMachineConfigAnnotationKey] @@ -840,7 +839,7 @@ func (dn *Daemon) getNodeMachinePool() error { func (dn *Daemon) applyDrainRequired() error { glog.V(2).Info("applyDrainRequired(): no other node is draining") - err := dn.annotateNode(dn.name, consts.AnnoDrainRequired) + err := utils.AnnotateNode(dn.name, consts.AnnoDrainRequired, dn.kubeClient) if err != nil { glog.Errorf("applyDrainRequired(): Failed to annotate node: %v", err) return err @@ -892,7 +891,7 @@ func (dn *Daemon) pauseMCP() error { glog.V(2).Infof("pauseMCP(): Failed to pause MCP %s: %v", dn.mcpName, err) return } - err = dn.annotateNode(dn.name, consts.AnnoMcpPaused) + err = utils.AnnotateNode(dn.name, consts.AnnoMcpPaused, dn.kubeClient) if err != nil { glog.V(2).Infof("pauseMCP(): Failed to annotate node: %v", err) return @@ -908,7 +907,7 @@ func (dn *Daemon) pauseMCP() error { glog.V(2).Infof("pauseMCP(): fail to resume MCP %s: %v", dn.mcpName, err) return } - err = dn.annotateNode(dn.name, consts.AnnoDraining) + err = utils.AnnotateNode(dn.name, consts.AnnoDraining, 
dn.kubeClient) if err != nil { glog.V(2).Infof("pauseMCP(): Failed to annotate node: %v", err) return @@ -936,42 +935,42 @@ func (dn *Daemon) pauseMCP() error { return err } -func (dn *Daemon) drainNode() error { - glog.Info("drainNode(): Update prepared") - var err error - - backoff := wait.Backoff{ - Steps: 5, - Duration: 10 * time.Second, - Factor: 2, - } - var lastErr error - - glog.Info("drainNode(): Start draining") - if err = wait.ExponentialBackoff(backoff, func() (bool, error) { - err := drain.RunCordonOrUncordon(dn.drainer, dn.node, true) - if err != nil { - lastErr = err - glog.Infof("Cordon failed with: %v, retrying", err) - return false, nil - } - err = drain.RunNodeDrain(dn.drainer, dn.name) - if err == nil { - return true, nil - } - lastErr = err - glog.Infof("Draining failed with: %v, retrying", err) - return false, nil - }); err != nil { - if err == wait.ErrWaitTimeout { - glog.Errorf("drainNode(): failed to drain node (%d tries): %v :%v", backoff.Steps, err, lastErr) - } - glog.Errorf("drainNode(): failed to drain node: %v", err) - return err - } - glog.Info("drainNode(): drain complete") - return nil -} +//func (dn *Daemon) drainNode() error { +// glog.Info("drainNode(): Update prepared") +// var err error +// +// backoff := wait.Backoff{ +// Steps: 5, +// Duration: 10 * time.Second, +// Factor: 2, +// } +// var lastErr error +// +// glog.Info("drainNode(): Start draining") +// if err = wait.ExponentialBackoff(backoff, func() (bool, error) { +// err := drain.RunCordonOrUncordon(dn.drainer, dn.node, true) +// if err != nil { +// lastErr = err +// glog.Infof("Cordon failed with: %v, retrying", err) +// return false, nil +// } +// err = drain.RunNodeDrain(dn.drainer, dn.name) +// if err == nil { +// return true, nil +// } +// lastErr = err +// glog.Infof("Draining failed with: %v, retrying", err) +// return false, nil +// }); err != nil { +// if err == wait.ErrWaitTimeout { +// glog.Errorf("drainNode(): failed to drain node (%d tries): %v :%v", 
backoff.Steps, err, lastErr) +// } +// glog.Errorf("drainNode(): failed to drain node: %v", err) +// return err +// } +// glog.Info("drainNode(): drain complete") +// return nil +//} func tryCreateSwitchdevUdevRule(nodeState *sriovnetworkv1.SriovNetworkNodeState) error { glog.V(2).Infof("tryCreateSwitchdevUdevRule()") diff --git a/pkg/utils/cluster.go b/pkg/utils/cluster.go index 7e346de06e..b5f45720a5 100644 --- a/pkg/utils/cluster.go +++ b/pkg/utils/cluster.go @@ -2,9 +2,15 @@ package utils import ( "context" + "encoding/json" "fmt" "os" + "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/strategicpatch" + "k8s.io/client-go/kubernetes" + "github.com/golang/glog" configv1 "github.com/openshift/api/config/v1" @@ -119,3 +125,43 @@ func NodeHasAnnotation(node corev1.Node, annoKey string, value string) bool { } return false }
+
+// AnnotateNode sets the consts.NodeDrainAnnotation annotation of the node named node to value.
+// It fetches the node via kubeClient and sends a strategic merge patch only when the value differs.
+func AnnotateNode(node, value string, kubeClient kubernetes.Interface) error {
+	glog.Infof("AnnotateNode(): Annotate node %s with: %s", node, value)
+
+	oldNode, err := kubeClient.CoreV1().Nodes().Get(context.Background(), node, metav1.GetOptions{})
+	if err != nil {
+		glog.Infof("AnnotateNode(): Failed to get node %s %v, retrying", node, err)
+		return err
+	}
+	if oldNode.Annotations[consts.NodeDrainAnnotation] == value {
+		return nil
+	}
+
+	oldData, err := json.Marshal(oldNode)
+	if err != nil {
+		return err
+	}
+	newNode := oldNode.DeepCopy()
+	if newNode.Annotations == nil {
+		newNode.Annotations = map[string]string{}
+	}
+	newNode.Annotations[consts.NodeDrainAnnotation] = value
+	newData, err := json.Marshal(newNode)
+	if err != nil {
+		return err
+	}
+	patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, corev1.Node{})
+	if err != nil {
+		return err
+	}
+	_, err = kubeClient.CoreV1().Nodes().Patch(context.Background(), node,
+		types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
+	if err != nil {
+		glog.Infof("AnnotateNode(): Failed to patch node %s: %v", node, err)
+		return err
+	}
+	return nil
+}