From 316b6ad5331ca81591b7b1f9e238fa3a38bd5f90 Mon Sep 17 00:00:00 2001 From: Ivan Kolodiazhnyi Date: Mon, 4 Sep 2023 18:20:47 +0300 Subject: [PATCH 1/5] Implement Drain controller --- controllers/drain_controller.go | 185 +++++++++++++++++++++++++++ controllers/drain_controller_test.go | 64 +++++++++ controllers/suite_test.go | 25 ++++ main.go | 20 +++ pkg/consts/constants.go | 7 + pkg/utils/cluster.go | 13 ++ 6 files changed, 314 insertions(+) create mode 100644 controllers/drain_controller.go create mode 100644 controllers/drain_controller_test.go diff --git a/controllers/drain_controller.go b/controllers/drain_controller.go new file mode 100644 index 000000000..95b7d69aa --- /dev/null +++ b/controllers/drain_controller.go @@ -0,0 +1,185 @@ +/* +Copyright 2021. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllers + +import ( + "context" + "fmt" + "sort" + "strings" + "time" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/util/workqueue" + "k8s.io/kubectl/pkg/drain" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + sriovnetworkv1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1" + constants "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts" + "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/utils" +) + +// TODO(e0ne): remove this constant once we'll support parallel multiple nodes configuration in a parallel +const ( + maxParallelNodeConfiguration = 1 +) + +type DrainReconciler struct { + client.Client + Scheme *runtime.Scheme + Drainer *drain.Helper +} + +//+kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch;update;patch +//+kubebuilder:rbac:groups=sriovnetwork.openshift.io,resources=sriovoperatorconfigs,verbs=get;list;watch + +// Reconcile is part of the main kubernetes reconciliation loop which aims to +// move the current state of the cluster closer to the desired state. 
+// For more details, check Reconcile and its Result here: +// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.8.3/pkg/reconcile +func (dr *DrainReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + req.Namespace = namespace + reqLogger := log.FromContext(ctx).WithValues("drain", req.NamespacedName) + reqLogger.Info("Reconciling Drain") + + nodeList := &corev1.NodeList{} + err := dr.List(ctx, nodeList) + if err != nil { + // Failed to get node list + reqLogger.Error(err, "Error occurred on LIST nodes request from API server") + return reconcile.Result{}, err + } + + // sort nodeList to iterate in the same order each reconcile loop + sort.Slice(nodeList.Items, func(i, j int) bool { + return strings.Compare(nodeList.Items[i].Name, nodeList.Items[j].Name) == -1 + }) + + reqLogger.Info("Max node allowed to be draining at the same time", "MaxParallelNodeConfiguration", maxParallelNodeConfiguration) + + drainingNodes := 0 + for _, node := range nodeList.Items { + if utils.NodeHasAnnotation(node, constants.NodeDrainAnnotation, constants.Draining) || utils.NodeHasAnnotation(node, constants.NodeDrainAnnotation, constants.DrainMcpPaused) { + dr.drainNode(ctx, &node) + drainingNodes++ + } + } + + reqLogger.Info("Count of draining", "drainingNodes", drainingNodes) + if drainingNodes >= maxParallelNodeConfiguration { + reqLogger.Info("MaxParallelNodeConfiguration limit reached for draining nodes") + return reconcile.Result{}, nil + } + + for _, node := range nodeList.Items { + if !utils.NodeHasAnnotation(node, constants.NodeDrainAnnotation, constants.DrainRequired) { + continue + } + if drainingNodes < maxParallelNodeConfiguration { + reqLogger.Info("Start draining node", "node", node.Name) + patch := []byte(fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}}}`, constants.NodeDrainAnnotation, constants.Draining)) + err = dr.Client.Patch(context.TODO(), &node, client.RawPatch(types.StrategicMergePatchType, patch)) + if err != nil { + reqLogger.Error(err, "Failed to patch node annotations") + return reconcile.Result{}, err + } + drainingNodes++ + } else { + reqLogger.Info("Too many nodes to be draining at the moment. Skipping node %s", node.Name) + return reconcile.Result{}, nil + } + } + + return reconcile.Result{}, nil +} + +// SetupWithManager sets up the controller with the Manager. +func (dr *DrainReconciler) SetupWithManager(mgr ctrl.Manager) error { + // we always add object with a same(static) key to the queue to reduce + // reconciliation count + qHandler := func(q workqueue.RateLimitingInterface) { + q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ + Namespace: namespace, + Name: "drain-upgrade-reconcile-name", + }}) + } + + createUpdateEnqueue := handler.Funcs{ + CreateFunc: func(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) { + qHandler(q) + }, + UpdateFunc: func(ctx context.Context, e event.UpdateEvent, q workqueue.RateLimitingInterface) { + qHandler(q) + }, + } + + // Watch for spec and annotation changes + nodePredicates := builder.WithPredicates(predicate.AnnotationChangedPredicate{}) + + return ctrl.NewControllerManagedBy(mgr). + For(&sriovnetworkv1.SriovOperatorConfig{}). + Watches(&corev1.Node{}, createUpdateEnqueue, nodePredicates). 
+ Complete(dr) +} + +func (dr *DrainReconciler) drainNode(ctx context.Context, node *corev1.Node) error { + reqLogger := log.FromContext(ctx).WithValues("drain node", node.Name) + reqLogger.Info("drainNode(): Node drain requested", "node", node.Name) + var err error + + backoff := wait.Backoff{ + Steps: 5, + Duration: 10 * time.Second, + Factor: 2, + } + var lastErr error + + reqLogger.Info("drainNode(): Start draining") + if err = wait.ExponentialBackoff(backoff, func() (bool, error) { + err := drain.RunCordonOrUncordon(dr.Drainer, node, true) + if err != nil { + lastErr = err + reqLogger.Info("drainNode(): Cordon failed, retrying", "error", err) + return false, nil + } + err = drain.RunNodeDrain(dr.Drainer, node.Name) + if err == nil { + return true, nil + } + lastErr = err + reqLogger.Info("drainNode(): Draining failed, retrying", "error", err) + return false, nil + }); err != nil { + if err == wait.ErrWaitTimeout { + reqLogger.Info("drainNode(): failed to drain node", "steps", backoff.Steps, "error", lastErr) + } + reqLogger.Info("drainNode(): failed to drain node", "error", err) + return err + } + reqLogger.Info("drainNode(): drain complete") + return nil +} diff --git a/controllers/drain_controller_test.go b/controllers/drain_controller_test.go new file mode 100644 index 000000000..9e72d5eee --- /dev/null +++ b/controllers/drain_controller_test.go @@ -0,0 +1,64 @@ +package controllers + +import ( + "time" + + goctx "context" + + v1 "k8s.io/api/core/v1" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts" + "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/utils" +) + +func createNodeObj(name, anno string) *v1.Node { + node := &v1.Node{} + node.Name = name + node.Annotations = map[string]string{} + node.Annotations[consts.NodeDrainAnnotation] = anno + + return node +} + +func createNode(node *v1.Node) { + Expect(k8sClient.Create(goctx.TODO(), node)).Should(Succeed()) +} + +var _ = Describe("Drain Controller", func() { + + BeforeEach(func() { + node1 := createNodeObj("node1", "Drain_Required") + node2 := createNodeObj("node2", "Drain_Required") + createNode(node1) + createNode(node2) + }) + AfterEach(func() { + node1 := createNodeObj("node1", "Drain_Required") + node2 := createNodeObj("node2", "Drain_Required") + err := k8sClient.Delete(goctx.TODO(), node1) + Expect(err).NotTo(HaveOccurred()) + err = k8sClient.Delete(goctx.TODO(), node2) + Expect(err).NotTo(HaveOccurred()) + }) + + Context("Parallel nodes draining", func() { + + It("Should drain one node", func() { + nodeList := &v1.NodeList{} + listErr := k8sClient.List(ctx, nodeList) + Expect(listErr).NotTo(HaveOccurred()) + time.Sleep(5 * time.Second) + + drainingNodes := 0 + for _, node := range nodeList.Items { + if utils.NodeHasAnnotation(node, "sriovnetwork.openshift.io/state", "Draining") { + drainingNodes++ + } + } + Expect(drainingNodes).To(Equal(1)) + }) + }) +}) diff --git a/controllers/suite_test.go b/controllers/suite_test.go index c3eda1a90..da381bebf 100644 --- a/controllers/suite_test.go +++ b/controllers/suite_test.go @@ -18,11 +18,16 @@ package controllers import ( "context" + "fmt" "os" "path/filepath" "testing" "time" + "github.com/golang/glog" + "k8s.io/client-go/kubernetes" + "k8s.io/kubectl/pkg/drain" + netattdefv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/apis/k8s.cni.cncf.io/v1" . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" @@ -143,6 +148,26 @@ var _ = BeforeSuite(func(done Done) { }).SetupWithManager(k8sManager) Expect(err).ToNot(HaveOccurred()) + kubeclient := kubernetes.NewForConfigOrDie(k8sManager.GetConfig()) + err = (&DrainReconciler{ + Client: k8sManager.GetClient(), + Scheme: k8sManager.GetScheme(), + Drainer: &drain.Helper{ + Client: kubeclient, + Force: true, + IgnoreAllDaemonSets: true, + DeleteEmptyDirData: true, + GracePeriodSeconds: -1, + Timeout: 90 * time.Second, + OnPodDeletedOrEvicted: func(pod *corev1.Pod, usingEviction bool) { + verbStr := "Deleted" + glog.Info(fmt.Sprintf("%s pod from Node %s/%s", verbStr, pod.Namespace, pod.Name)) + }, + Ctx: context.Background(), + }, + }).SetupWithManager(k8sManager) + Expect(err).ToNot(HaveOccurred()) + os.Setenv("RESOURCE_PREFIX", "openshift.io") os.Setenv("NAMESPACE", "openshift-sriov-network-operator") os.Setenv("ENABLE_ADMISSION_CONTROLLER", "true") diff --git a/main.go b/main.go index a2e26da01..5c0682f7b 100644 --- a/main.go +++ b/main.go @@ -21,11 +21,13 @@ import ( "flag" "fmt" "os" + "time" netattdefv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/apis/k8s.cni.cncf.io/v1" openshiftconfigv1 "github.com/openshift/api/config/v1" mcfgv1 "github.com/openshift/machine-config-operator/pkg/apis/machineconfiguration.openshift.io/v1" "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/kubectl/pkg/drain" // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) // to ensure that exec-entrypoint and run can make use of them. @@ -134,6 +136,8 @@ func main() { os.Exit(1) } + kubeclient := kubernetes.NewForConfigOrDie(ctrl.GetConfigOrDie()) + if err = (&controllers.SriovNetworkReconciler{ Client: mgrGlobal.GetClient(), Scheme: mgrGlobal.GetScheme(), @@ -171,6 +175,22 @@ func main() { setupLog.Error(err, "unable to create controller", "controller", "SriovNetworkPoolConfig") os.Exit(1) } + if err = (&controllers.DrainReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + Drainer: &drain.Helper{ + Client: kubeclient, + Force: true, + IgnoreAllDaemonSets: true, + DeleteEmptyDirData: true, + GracePeriodSeconds: -1, + Timeout: 90 * time.Second, + Ctx: context.Background(), + }, + }).SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "DrainReconciler") + os.Exit(1) + } // +kubebuilder:scaffold:builder // Create a default SriovNetworkNodePolicy diff --git a/pkg/consts/constants.go b/pkg/consts/constants.go index 1f37edfa4..fb611159e 100644 --- a/pkg/consts/constants.go +++ b/pkg/consts/constants.go @@ -23,6 +23,13 @@ const ( DPConfigFileName = "config.json" OVSHWOLMachineConfigNameSuffix = "ovs-hw-offload" + NodeDrainAnnotation = "sriovnetwork.openshift.io/state" + DrainIdle = "Idle" + DrainRequired = "Drain_Required" + DrainMcpPaused = "Draining_MCP_Paused" + Draining = "Draining" + DrainComplete = "DrainComplete" + LinkTypeEthernet = "ether" LinkTypeInfiniband = "infiniband" diff --git a/pkg/utils/cluster.go b/pkg/utils/cluster.go index 610bab415..0fd2b9d7a 100644 --- a/pkg/utils/cluster.go +++ b/pkg/utils/cluster.go @@ -9,8 +9,13 @@ import ( configv1 "github.com/openshift/api/config/v1" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/strategicpatch" + "k8s.io/client-go/kubernetes" "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts" ) 
const ( @@ -108,3 +113,11 @@ func openshiftControlPlaneTopologyStatus(c client.Client) (configv1.TopologyMode } return infra.Status.ControlPlaneTopology, nil } + +func NodeHasAnnotation(node corev1.Node, annoKey string, value string) bool { + // Check if node already contains annotation + if anno, ok := node.Annotations[annoKey]; ok && (anno == value) { + return true + } + return false +} From b3306834f52174ae86b6f4fa3f3f9d041c34fcbf Mon Sep 17 00:00:00 2001 From: Ivan Kolodiazhnyi Date: Tue, 21 Mar 2023 11:39:31 +0200 Subject: [PATCH 2/5] Use new logic for drain in config daemon --- pkg/daemon/daemon.go | 82 +++++++++++++------------------------------- 1 file changed, 23 insertions(+), 59 deletions(-) diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index 3ec571924..62ec17090 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -31,8 +31,6 @@ import ( "k8s.io/client-go/kubernetes" listerv1 "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/leaderelection" - "k8s.io/client-go/tools/leaderelection/resourcelock" "k8s.io/client-go/util/workqueue" "k8s.io/kubectl/pkg/drain" "sigs.k8s.io/controller-runtime/pkg/log" @@ -100,6 +98,7 @@ type Daemon struct { node *corev1.Node + // TODO(e0ne): remove it drainable bool disableDrain bool @@ -121,8 +120,9 @@ const ( udevScriptsPath = "/bindata/scripts/load-udev.sh" annoKey = "sriovnetwork.openshift.io/state" annoIdle = "Idle" - annoDraining = "Draining" + annoDrainRequired = "Drain_Required" annoMcpPaused = "Draining_MCP_Paused" + annoDraining = "Draining" syncStatusSucceeded = "Succeeded" syncStatusFailed = "Failed" syncStatusInProgress = "InProgress" @@ -626,16 +626,20 @@ func (dn *Daemon) nodeStateSyncHandler() error { return err } } + + if dn.nodeHasAnnotation(annoKey, annoDrainRequired) { + log.Log.Info("nodeStateSyncHandler(): waiting for drain") + return nil + } + if reqDrain { if !dn.isNodeDraining() { - if !dn.disableDrain { - ctx, cancel := context.WithCancel(context.TODO()) - defer cancel() - - log.Log.Info("nodeStateSyncHandler(): get drain lock for sriov daemon") - done := make(chan bool) - go dn.getDrainLock(ctx, done) - <-done + if !dn.disableDrain && !dn.openshiftContext.IsOpenshiftCluster() { + log.Log.Info("nodeStateSyncHandler(): apply 'Drain_Required' label for node") + if err := dn.applyDrainRequired(); err != nil { + return err + } + return nil } } @@ -725,6 +729,7 @@ func (dn *Daemon) nodeStateSyncHandler() error { } func (dn *Daemon) nodeHasAnnotation(annoKey string, value string) bool { + // TODO(e0ne): re-use cluster.NodeHasAnnotation function // Check if node already contains annotation if anno, ok := dn.node.Annotations[annoKey]; ok && (anno == value) { return true @@ -908,55 +913,14 @@ func (dn *Daemon) getNodeMachinePool() error { return fmt.Errorf("getNodeMachinePool(): Failed to find the MCP of the node") } -func (dn *Daemon) getDrainLock(ctx context.Context, done chan bool) { - var err error - - lock := &resourcelock.LeaseLock{ - LeaseMeta: metav1.ObjectMeta{ - Name: "config-daemon-draining-lock", - Namespace: namespace, - }, - Client: dn.kubeClient.CoordinationV1(), - LockConfig: resourcelock.ResourceLockConfig{ - Identity: dn.name, - }, +func (dn *Daemon) applyDrainRequired() error { + log.Log.Info("applyDrainRequired(): no other node is draining") + err := dn.annotateNode(dn.name, annoDrainRequired) + if err != nil { + log.Log.Error(err, "applyDrainRequired(): Failed to annotate node") + return err } - - // start the leader election - 
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{ - Lock: lock, - ReleaseOnCancel: true, - LeaseDuration: 5 * time.Second, - RenewDeadline: 3 * time.Second, - RetryPeriod: 1 * time.Second, - Callbacks: leaderelection.LeaderCallbacks{ - OnStartedLeading: func(ctx context.Context) { - log.Log.V(2).Info("getDrainLock(): started leading") - for { - time.Sleep(3 * time.Second) - if dn.node.Annotations[annoKey] == annoMcpPaused { - // The node in Draining_MCP_Paused state, no other node is draining. Skip drainable checking - done <- true - return - } - if dn.drainable { - log.Log.V(2).Info("getDrainLock(): no other node is draining") - err = dn.annotateNode(dn.name, annoDraining) - if err != nil { - log.Log.Error(err, "getDrainLock(): failed to annotate node") - continue - } - done <- true - return - } - log.Log.V(2).Info("getDrainLock(): other node is draining, wait...") - } - }, - OnStoppedLeading: func() { - log.Log.V(2).Info("getDrainLock(): stopped leading") - }, - }, - }) + return nil } func (dn *Daemon) pauseMCP() error { From f3dc9ed5ef2dc40d9639680c10f144ae7ac30e02 Mon Sep 17 00:00:00 2001 From: Ivan Kolodiazhnyi Date: Mon, 10 Apr 2023 17:31:40 +0300 Subject: [PATCH 3/5] Clean up duplicated and obsolete logic --- pkg/daemon/daemon.go | 72 +++++++++----------------------------------- 1 file changed, 14 insertions(+), 58 deletions(-) diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index 62ec17090..918b5bd59 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -14,15 +14,14 @@ import ( "sync" "time" + "github.com/golang/glog" mcfgv1 "github.com/openshift/machine-config-operator/pkg/apis/machineconfiguration.openshift.io/v1" daemonconsts "github.com/openshift/machine-config-operator/pkg/daemon/constants" mcfginformers "github.com/openshift/machine-config-operator/pkg/generated/informers/externalversions" - "go.uber.org/zap" "golang.org/x/time/rate" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/strategicpatch" @@ -38,6 +37,7 @@ import ( sriovnetworkv1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1" snclientset "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/client/clientset/versioned" sninformer "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/client/informers/externalversions" + consts "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts" "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/host" snolog "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/log" plugin "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/plugins" @@ -98,9 +98,6 @@ type Daemon struct { node *corev1.Node - // TODO(e0ne): remove it - drainable bool - disableDrain bool nodeLister listerv1.NodeLister @@ -118,11 +115,6 @@ type Daemon struct { const ( udevScriptsPath = "/bindata/scripts/load-udev.sh" - annoKey = "sriovnetwork.openshift.io/state" - annoIdle = "Idle" - annoDrainRequired = "Drain_Required" - annoMcpPaused = "Draining_MCP_Paused" - annoDraining = "Draining" syncStatusSucceeded = "Succeeded" syncStatusFailed = "Failed" syncStatusInProgress = "InProgress" @@ -391,33 +383,6 @@ func (dn *Daemon) nodeUpdateHandler(old, new interface{}) { return } dn.node = node.DeepCopy() - - nodes, err := 
dn.nodeLister.List(labels.Everything()) - if err != nil { - log.Log.Error(err, "nodeUpdateHandler(): failed to list nodes") - return - } - - // Checking if other nodes are draining - for _, otherNode := range nodes { - if otherNode.GetName() == dn.name { - continue - } - - drainingAnnotationValue := otherNode.Annotations[annoKey] - if drainingAnnotationValue == annoDraining || drainingAnnotationValue == annoMcpPaused { - log.Log.V(2).Info("nodeUpdateHandler(): node is not drainable, another node is draining", - "other-node", otherNode.Name, "annotation", annoKey+"="+drainingAnnotationValue) - dn.drainable = false - return - } - } - - if !dn.drainable { - log.Log.V(2).Info("nodeUpdateHandler(): node is now drainable") - } - - dn.drainable = true } func (dn *Daemon) operatorConfigAddHandler(obj interface{}) { @@ -627,8 +592,8 @@ func (dn *Daemon) nodeStateSyncHandler() error { } } - if dn.nodeHasAnnotation(annoKey, annoDrainRequired) { - log.Log.Info("nodeStateSyncHandler(): waiting for drain") + if utils.NodeHasAnnotation(*dn.node, consts.NodeDrainAnnotation, consts.DrainRequired) { + log.Log.Info("nodeStateSyncHandler(): waiting for drain")) return nil } @@ -703,7 +668,7 @@ func (dn *Daemon) nodeStateSyncHandler() error { return err } } else { - if !dn.nodeHasAnnotation(annoKey, annoIdle) { + if !utils.NodeHasAnnotation(*dn.node, consts.NodeDrainAnnotation, consts.DrainIdle) { if err := dn.annotateNode(dn.name, annoIdle); err != nil { log.Log.Error(err, "nodeStateSyncHandler(): failed to annotate node") return err @@ -728,24 +693,15 @@ func (dn *Daemon) nodeStateSyncHandler() error { return nil } -func (dn *Daemon) nodeHasAnnotation(annoKey string, value string) bool { - // TODO(e0ne): re-use cluster.NodeHasAnnotation function - // Check if node already contains annotation - if anno, ok := dn.node.Annotations[annoKey]; ok && (anno == value) { - return true - } - return false -} - // isNodeDraining: check if the node is draining // both Draining and MCP paused labels will return true func (dn *Daemon) isNodeDraining() bool { - anno, ok := dn.node.Annotations[annoKey] + anno, ok := dn.node.Annotations[consts.NodeDrainAnnotation] if !ok { return false } - return anno == annoDraining || anno == annoMcpPaused + return anno == consts.Draining || anno == consts.DrainMcpPaused } func (dn *Daemon) completeDrain() error { @@ -764,7 +720,7 @@ func (dn *Daemon) completeDrain() error { } } - if err := dn.annotateNode(dn.name, annoIdle); err != nil { + if err := dn.annotateNode(dn.name, consts.DrainIdle); err != nil { log.Log.Error(err, "completeDrain(): failed to annotate node") return err } @@ -868,8 +824,8 @@ func (dn *Daemon) annotateNode(node, value string) error { if newNode.Annotations == nil { newNode.Annotations = map[string]string{} } - if newNode.Annotations[annoKey] != value { - newNode.Annotations[annoKey] = value + if newNode.Annotations[consts.NodeDrainAnnotation] != value { + newNode.Annotations[consts.NodeDrainAnnotation] = value newData, err := json.Marshal(newNode) if err != nil { return err @@ -915,7 +871,7 @@ func (dn *Daemon) getNodeMachinePool() error { func (dn *Daemon) applyDrainRequired() error { log.Log.Info("applyDrainRequired(): no other node is draining") - err := dn.annotateNode(dn.name, annoDrainRequired) + err := dn.annotateNode(dn.name, consts.DrainRequired) if err != nil { log.Log.Error(err, "applyDrainRequired(): Failed to annotate node") return err @@ -934,7 +890,7 @@ func (dn *Daemon) pauseMCP() error { ctx, cancel := context.WithCancel(context.TODO()) defer 
cancel() - paused := dn.node.Annotations[annoKey] == annoMcpPaused + paused := dn.node.Annotations[consts.NodeDrainAnnotation] == consts.DrainMcpPaused mcpEventHandler := func(obj interface{}) { mcp := obj.(*mcfgv1.MachineConfigPool) @@ -967,7 +923,7 @@ func (dn *Daemon) pauseMCP() error { log.Log.V(2).Error(err, "pauseMCP(): failed to pause MCP", "mcp-name", dn.mcpName) return } - err = dn.annotateNode(dn.name, annoMcpPaused) + err = dn.annotateNode(dn.name, consts.DrainMcpPaused) if err != nil { log.Log.V(2).Error(err, "pauseMCP(): Failed to annotate node") return @@ -983,7 +939,7 @@ func (dn *Daemon) pauseMCP() error { log.Log.V(2).Error(err, "pauseMCP(): fail to resume MCP", "mcp-name", dn.mcpName) return } - err = dn.annotateNode(dn.name, annoDraining) + err = dn.annotateNode(dn.name, consts.Draining) if err != nil { log.Log.V(2).Error(err, "pauseMCP(): Failed to annotate node") return From 902596a7411f8633cbd582d847c7774243236f39 Mon Sep 17 00:00:00 2001 From: Ivan Kolodiazhnyi Date: Mon, 3 Jul 2023 18:49:57 +0300 Subject: [PATCH 4/5] Implement DrainAnnotationPredicate to watch only for a needed annotation --- controllers/drain_controller.go | 5 ++-- controllers/helper.go | 45 +++++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+), 3 deletions(-) diff --git a/controllers/drain_controller.go b/controllers/drain_controller.go index 95b7d69aa..13046dfd3 100644 --- a/controllers/drain_controller.go +++ b/controllers/drain_controller.go @@ -35,7 +35,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" - "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" sriovnetworkv1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1" @@ -109,7 +108,7 @@ func (dr *DrainReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr } drainingNodes++ } else { - reqLogger.Info("Too many nodes to be draining at the moment. Skipping node %s", node.Name) + reqLogger.Info("Too many nodes to be draining at the moment. Skipping node %s", "node", node.Name) return reconcile.Result{}, nil } } @@ -138,7 +137,7 @@ func (dr *DrainReconciler) SetupWithManager(mgr ctrl.Manager) error { } // Watch for spec and annotation changes - nodePredicates := builder.WithPredicates(predicate.AnnotationChangedPredicate{}) + nodePredicates := builder.WithPredicates(DrainAnnotationPredicate{}) return ctrl.NewControllerManagedBy(mgr). For(&sriovnetworkv1.SriovOperatorConfig{}). 
diff --git a/controllers/helper.go b/controllers/helper.go index 1da3e626f..970c7b2c8 100644 --- a/controllers/helper.go +++ b/controllers/helper.go @@ -18,10 +18,15 @@ package controllers import ( "bytes" + "context" "encoding/json" "os" "strings" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/predicate" + constants "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts" ) @@ -40,6 +45,46 @@ const ( var namespace = os.Getenv("NAMESPACE") +type DrainAnnotationPredicate struct { + predicate.Funcs +} + +func (DrainAnnotationPredicate) Create(e event.CreateEvent) bool { + logger := log.FromContext(context.TODO()) + if e.Object == nil { + logger.Info("Create event: node has no drain annotation", "node", e.Object.GetName()) + return false + } + + if _, hasAnno := e.Object.GetAnnotations()[constants.NodeDrainAnnotation]; hasAnno { + logger.Info("Create event: node has no drain annotation", "node", e.Object.GetName()) + return true + } + return false +} + +func (DrainAnnotationPredicate) Update(e event.UpdateEvent) bool { + logger := log.FromContext(context.TODO()) + if e.ObjectOld == nil { + logger.Info("Update event has no old object to update", "node", e.ObjectOld.GetName()) + return false + } + if e.ObjectNew == nil { + logger.Info("Update event has no new object for update", "node", e.ObjectNew.GetName()) + return false + } + + oldAnno, hasOldAnno := e.ObjectOld.GetAnnotations()[constants.NodeDrainAnnotation] + newAnno, hasNewAnno := e.ObjectNew.GetAnnotations()[constants.NodeDrainAnnotation] + + if !hasOldAnno || !hasNewAnno { + logger.Info("Update event: can not compare annotations", "node", e.ObjectNew.GetName()) + return false + } + + return oldAnno != newAnno +} + func GetImagePullSecrets() []string { imagePullSecrets := os.Getenv("IMAGE_PULL_SECRETS") if imagePullSecrets != "" { From 651df2e05b19e4293f8d361c0ef5046a2b38208e Mon Sep 17 00:00:00 2001 From: Ivan Kolodiazhnyi Date: Mon, 4 Sep 2023 18:39:56 +0300 Subject: [PATCH 5/5] Remove node drain call from config daemon --- main.go | 2 +- pkg/daemon/daemon.go | 101 ++++--------------------------------------- pkg/utils/cluster.go | 42 ++++++++++++++++++ 3 files changed, 52 insertions(+), 93 deletions(-) diff --git a/main.go b/main.go index 5c0682f7b..729d57aa4 100644 --- a/main.go +++ b/main.go @@ -185,7 +185,7 @@ func main() { DeleteEmptyDirData: true, GracePeriodSeconds: -1, Timeout: 90 * time.Second, - Ctx: context.Background(), + Ctx: context.Background(), }, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "DrainReconciler") diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index 918b5bd59..6ff2aaf90 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -3,7 +3,6 @@ package daemon import ( "bytes" "context" - "encoding/json" "fmt" "math/rand" "os" @@ -14,17 +13,16 @@ import ( "sync" "time" - "github.com/golang/glog" mcfgv1 "github.com/openshift/machine-config-operator/pkg/apis/machineconfiguration.openshift.io/v1" daemonconsts "github.com/openshift/machine-config-operator/pkg/daemon/constants" mcfginformers "github.com/openshift/machine-config-operator/pkg/generated/informers/externalversions" + "go.uber.org/zap" "golang.org/x/time/rate" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" - 
"k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" @@ -593,7 +591,7 @@ func (dn *Daemon) nodeStateSyncHandler() error { } if utils.NodeHasAnnotation(*dn.node, consts.NodeDrainAnnotation, consts.DrainRequired) { - log.Log.Info("nodeStateSyncHandler(): waiting for drain")) + log.Log.Info("nodeStateSyncHandler(): waiting for drain") return nil } @@ -619,7 +617,7 @@ func (dn *Daemon) nodeStateSyncHandler() error { log.Log.Info("nodeStateSyncHandler(): disable drain is true skipping drain") } else { log.Log.Info("nodeStateSyncHandler(): drain node") - if err := dn.drainNode(); err != nil { + if err := utils.AnnotateNode(dn.name, consts.DrainRequired, dn.kubeClient); err != nil { return err } } @@ -669,7 +667,7 @@ func (dn *Daemon) nodeStateSyncHandler() error { } } else { if !utils.NodeHasAnnotation(*dn.node, consts.NodeDrainAnnotation, consts.DrainIdle) { - if err := dn.annotateNode(dn.name, annoIdle); err != nil { + if err := utils.AnnotateNode(dn.name, consts.DrainIdle, dn.kubeClient); err != nil { log.Log.Error(err, "nodeStateSyncHandler(): failed to annotate node") return err } @@ -712,7 +710,7 @@ func (dn *Daemon) completeDrain() error { } if dn.openshiftContext.IsOpenshiftCluster() && !dn.openshiftContext.IsHypershift() { - log.Log.Info("completeDrain(): resume MCP", "mcp-name", dn.mcpName) + log.Log.V(2).Info("completeDrain(): resume MCP", "mcp-name", dn.mcpName) pausePatch := []byte("{\"spec\":{\"paused\":false}}") if _, err := dn.openshiftContext.McClient.MachineconfigurationV1().MachineConfigPools().Patch(context.Background(), dn.mcpName, types.MergePatchType, pausePatch, metav1.PatchOptions{}); err != nil { log.Log.Error(err, "completeDrain(): failed to resume MCP", "mcp-name", dn.mcpName) @@ -720,7 +718,7 @@ func (dn *Daemon) completeDrain() error { } } - if err := dn.annotateNode(dn.name, consts.DrainIdle); err != nil { + if err := utils.AnnotateNode(dn.name, consts.DrainIdle, dn.kubeClient); err != nil { log.Log.Error(err, "completeDrain(): failed to annotate node") return err } @@ -806,47 +804,6 @@ func rebootNode() { } } -func (dn *Daemon) annotateNode(node, value string) error { - log.Log.Info("annotateNode(): Annotate node", "name", node, "value", value) - - oldNode, err := dn.kubeClient.CoreV1().Nodes().Get(context.Background(), dn.name, metav1.GetOptions{}) - if err != nil { - log.Log.Error(err, "annotateNode(): Failed to get node, retrying", "name", node) - return err - } - - oldData, err := json.Marshal(oldNode) - if err != nil { - return err - } - - newNode := oldNode.DeepCopy() - if newNode.Annotations == nil { - newNode.Annotations = map[string]string{} - } - if newNode.Annotations[consts.NodeDrainAnnotation] != value { - newNode.Annotations[consts.NodeDrainAnnotation] = value - newData, err := json.Marshal(newNode) - if err != nil { - return err - } - patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, corev1.Node{}) - if err != nil { - return err - } - _, err = dn.kubeClient.CoreV1().Nodes().Patch(context.Background(), - dn.name, - types.StrategicMergePatchType, - patchBytes, - metav1.PatchOptions{}) - if err != nil { - log.Log.Error(err, "annotateNode(): Failed to patch node", "name", node) - return err - } - } - return nil -} - func (dn *Daemon) getNodeMachinePool() error { desiredConfig, ok := dn.node.Annotations[daemonconsts.DesiredMachineConfigAnnotationKey] if !ok { @@ -871,7 +828,7 @@ func (dn *Daemon) getNodeMachinePool() error { func (dn *Daemon) 
applyDrainRequired() error { log.Log.Info("applyDrainRequired(): no other node is draining") - err := dn.annotateNode(dn.name, consts.DrainRequired) + err := utils.AnnotateNode(dn.name, consts.DrainRequired, dn.kubeClient) if err != nil { log.Log.Error(err, "applyDrainRequired(): Failed to annotate node") return err @@ -923,7 +880,7 @@ func (dn *Daemon) pauseMCP() error { log.Log.V(2).Error(err, "pauseMCP(): failed to pause MCP", "mcp-name", dn.mcpName) return } - err = dn.annotateNode(dn.name, consts.DrainMcpPaused) + err = utils.AnnotateNode(dn.name, consts.DrainMcpPaused, dn.kubeClient) if err != nil { log.Log.V(2).Error(err, "pauseMCP(): Failed to annotate node") return @@ -939,7 +896,7 @@ func (dn *Daemon) pauseMCP() error { log.Log.V(2).Error(err, "pauseMCP(): fail to resume MCP", "mcp-name", dn.mcpName) return } - err = dn.annotateNode(dn.name, consts.Draining) + err = utils.AnnotateNode(dn.name, consts.Draining, dn.kubeClient) if err != nil { log.Log.V(2).Error(err, "pauseMCP(): Failed to annotate node") return @@ -968,46 +925,6 @@ func (dn *Daemon) pauseMCP() error { return err } -func (dn *Daemon) drainNode() error { - log.Log.Info("drainNode(): Update prepared") - var err error - - backoff := wait.Backoff{ - Steps: 5, - Duration: 10 * time.Second, - Factor: 2, - } - var lastErr error - - log.Log.Info("drainNode(): Start draining") - dn.eventRecorder.SendEvent("DrainNode", "Drain node has been initiated") - if err = wait.ExponentialBackoff(backoff, func() (bool, error) { - err := drain.RunCordonOrUncordon(dn.drainer, dn.node, true) - if err != nil { - lastErr = err - log.Log.Error(err, "cordon failed, retrying") - return false, nil - } - err = drain.RunNodeDrain(dn.drainer, dn.name) - if err == nil { - return true, nil - } - lastErr = err - log.Log.Error(err, "Draining failed, retrying") - return false, nil - }); err != nil { - if err == wait.ErrWaitTimeout { - log.Log.Error(err, "drainNode(): failed to drain node", "tries", backoff.Steps, "last-error", lastErr) - } - dn.eventRecorder.SendEvent("DrainNode", "Drain node failed") - log.Log.Error(err, "drainNode(): failed to drain node") - return err - } - dn.eventRecorder.SendEvent("DrainNode", "Drain node completed") - log.Log.Info("drainNode(): drain complete") - return nil -} - func (dn *Daemon) tryCreateSwitchdevUdevRule() error { log.Log.V(2).Info("tryCreateSwitchdevUdevRule()") nodeState, nodeStateErr := dn.client.SriovnetworkV1().SriovNetworkNodeStates(namespace).Get( diff --git a/pkg/utils/cluster.go b/pkg/utils/cluster.go index 0fd2b9d7a..a1e5a2119 100644 --- a/pkg/utils/cluster.go +++ b/pkg/utils/cluster.go @@ -2,6 +2,7 @@ package utils import ( "context" + "encoding/json" "fmt" "os" @@ -121,3 +122,44 @@ func NodeHasAnnotation(node corev1.Node, annoKey string, value string) bool { } return false } + +func AnnotateNode(node, value string, kubeClient kubernetes.Interface) error { + log.Log.V(2).Info("annotateNode(): Annotate node", "node", node, "annotation", value) + oldNode, err := kubeClient.CoreV1().Nodes().Get(context.Background(), node, metav1.GetOptions{}) + if err != nil { + log.Log.Error(err, "annotateNode(): Failed to get node, retrying", "node", node) + return err + } + + oldData, err := json.Marshal(oldNode) + if err != nil { + return err + } + + newNode := oldNode.DeepCopy() + if newNode.Annotations == nil { + newNode.Annotations = map[string]string{} + } + + if newNode.Annotations[consts.NodeDrainAnnotation] != value { + newNode.Annotations[consts.NodeDrainAnnotation] = value + newData, err := 
json.Marshal(newNode) + if err != nil { + return err + } + patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, corev1.Node{}) + if err != nil { + return err + } + _, err = kubeClient.CoreV1().Nodes().Patch(context.Background(), + node, + types.StrategicMergePatchType, + patchBytes, + metav1.PatchOptions{}) + if err != nil { + log.Log.Error(err, "annotateNode(): Failed to patch node", "node", node) + return err + } + } + return nil +}
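
For readers following the series, here is a standalone Go sketch (not part of the patches; all names and the in-memory map are invented for illustration) of the annotation handshake the five patches establish: the config daemon no longer drains its own node and instead only annotates it with `Drain_Required`; the new Drain controller promotes at most `maxParallelNodeConfiguration` nodes at a time to `Draining` and performs the cordon/drain; and the daemon returns the node to `Idle` via `completeDrain()` once configuration finishes. The constant values mirror `pkg/consts`; the map stands in for the Kubernetes API purely to make the control flow visible.

```go
// Illustration only: simplified model of the Drain_Required -> Draining -> Idle
// handshake between the config daemon and the Drain controller.
package main

import (
	"fmt"
	"sort"
)

const (
	drainIdle     = "Idle"           // consts.DrainIdle
	drainRequired = "Drain_Required" // consts.DrainRequired
	draining      = "Draining"       // consts.Draining

	maxParallelNodeConfiguration = 1 // same limit as in drain_controller.go
)

// cluster maps node name -> current value of the drain annotation
// ("sriovnetwork.openshift.io/state" in the real operator).
type cluster map[string]string

// daemonRequestsDrain models what the config daemon now does instead of
// draining itself: it only marks its own node as Drain_Required.
func daemonRequestsDrain(c cluster, node string) {
	if c[node] == drainIdle {
		c[node] = drainRequired
	}
}

// drainControllerReconcile mirrors the controller's loop: count nodes that are
// already Draining (the real controller also counts Draining_MCP_Paused), then
// promote Drain_Required nodes while staying under the parallel limit.
func drainControllerReconcile(c cluster) {
	names := make([]string, 0, len(c))
	for n := range c {
		names = append(names, n)
	}
	sort.Strings(names) // iterate in a stable order, as the controller does

	drainingNodes := 0
	for _, n := range names {
		if c[n] == draining {
			drainingNodes++
		}
	}
	for _, n := range names {
		if c[n] != drainRequired {
			continue
		}
		if drainingNodes >= maxParallelNodeConfiguration {
			return // too many nodes draining; wait for the next reconcile
		}
		c[n] = draining // the real controller does this with a strategic-merge patch
		drainingNodes++
	}
}

// daemonFinishesDrain stands in for completeDrain(): once the node is done it
// goes back to Idle, which frees a slot for the next Drain_Required node.
func daemonFinishesDrain(c cluster, node string) {
	if c[node] == draining {
		c[node] = drainIdle
	}
}

func main() {
	c := cluster{"node1": drainIdle, "node2": drainIdle}

	daemonRequestsDrain(c, "node1")
	daemonRequestsDrain(c, "node2")
	drainControllerReconcile(c)
	fmt.Println(c) // only one node is Draining; the other stays Drain_Required

	daemonFinishesDrain(c, "node1")
	drainControllerReconcile(c)
	fmt.Println(c) // node2 is promoted to Draining once a slot frees up
}
```

This also matches what the unit test in drain_controller_test.go asserts: with two nodes annotated `Drain_Required` and a limit of one, exactly one node ends up in the `Draining` state per reconcile pass.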