Remove node drain call from config daemon
e0ne committed Nov 6, 2023
1 parent 902596a commit 651df2e
Showing 3 changed files with 52 additions and 93 deletions.
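With this change the config daemon no longer cordons and drains the node itself: it records a drain request on the node object (the utils.AnnotateNode(dn.name, consts.DrainRequired, dn.kubeClient) calls in the diff below) and waits until the drain controller has done the work. The following is a minimal, self-contained sketch of that annotation-based request pattern. It is not code from this commit; the annotation key and value are placeholders for the operator's consts.NodeDrainAnnotation and consts.DrainRequired.

```go
// Sketch of the daemon-side pattern introduced by this commit: request a drain
// by annotating the node and let the drain controller perform the cordon/drain.
package main

import (
	"context"
	"fmt"
	"log"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

// requestDrain sets a drain annotation with a JSON merge patch. The operator's
// new utils.AnnotateNode helper does the same job with a strategic merge patch
// built from the full node object (see pkg/utils/cluster.go below).
func requestDrain(ctx context.Context, c kubernetes.Interface, nodeName, key, value string) error {
	patch := []byte(fmt.Sprintf(`{"metadata":{"annotations":{%q:%q}}}`, key, value))
	_, err := c.CoreV1().Nodes().Patch(ctx, nodeName, types.MergePatchType, patch, metav1.PatchOptions{})
	return err
}

func main() {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		log.Fatal(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)
	// Placeholder key/value; the operator uses consts.NodeDrainAnnotation and consts.DrainRequired.
	if err := requestDrain(context.Background(), client, "worker-0", "example.com/drain-state", "Drain_Required"); err != nil {
		log.Fatal(err)
	}
}
```

Centralizing the actual drain in a controller presumably lets a single component serialize drains across nodes; the daemon only publishes its intent and later checks the annotation to know when it may continue.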
2 changes: 1 addition & 1 deletion main.go
@@ -185,7 +185,7 @@ func main() {
DeleteEmptyDirData: true,
GracePeriodSeconds: -1,
Timeout: 90 * time.Second,
Ctx: context.Background(),
Ctx: context.Background(),
},
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "DrainReconciler")
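The main.go hunk only touches the Ctx line, but it shows where the drain machinery now lives: the DrainReconciler is wired up with a kubectl drain helper carrying DeleteEmptyDirData, GracePeriodSeconds: -1, a 90-second timeout and a background context. Below is a rough sketch of such a helper, assuming it is the Helper type from k8s.io/kubectl/pkg/drain; the Client, Force, IgnoreAllDaemonSets and writer fields are assumptions, not taken from this commit.

```go
// Sketch of a kubectl drain helper matching the fields visible in the hunk above.
package drainsketch

import (
	"context"
	"os"
	"time"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/kubectl/pkg/drain"
)

func newDrainHelper(client kubernetes.Interface) *drain.Helper {
	return &drain.Helper{
		Client:              client,
		Force:               true,                 // assumption: also evict pods without a controller
		IgnoreAllDaemonSets: true,                 // assumption: DaemonSet pods cannot be drained anyway
		DeleteEmptyDirData:  true,                 // as in the hunk above
		GracePeriodSeconds:  -1,                   // as in the hunk above: use each pod's own grace period
		Timeout:             90 * time.Second,     // as in the hunk above
		Ctx:                 context.Background(), // as in the hunk above
		Out:                 os.Stdout,
		ErrOut:              os.Stderr,
	}
}

// cordonAndDrain is roughly what the drain controller is expected to do with the
// helper, mirroring the drainNode() logic removed from the daemon further down.
func cordonAndDrain(h *drain.Helper, node *corev1.Node) error {
	if err := drain.RunCordonOrUncordon(h, node, true); err != nil {
		return err
	}
	return drain.RunNodeDrain(h, node.Name)
}
```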
101 changes: 9 additions & 92 deletions pkg/daemon/daemon.go
@@ -3,7 +3,6 @@ package daemon
import (
"bytes"
"context"
"encoding/json"
"fmt"
"math/rand"
"os"
@@ -14,17 +13,16 @@ import (
"sync"
"time"

"github.com/golang/glog"
mcfgv1 "github.com/openshift/machine-config-operator/pkg/apis/machineconfiguration.openshift.io/v1"
daemonconsts "github.com/openshift/machine-config-operator/pkg/daemon/constants"
mcfginformers "github.com/openshift/machine-config-operator/pkg/generated/informers/externalversions"
"go.uber.org/zap"
"golang.org/x/time/rate"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
@@ -593,7 +591,7 @@ func (dn *Daemon) nodeStateSyncHandler() error {
}

if utils.NodeHasAnnotation(*dn.node, consts.NodeDrainAnnotation, consts.DrainRequired) {
log.Log.Info("nodeStateSyncHandler(): waiting for drain"))
log.Log.Info("nodeStateSyncHandler(): waiting for drain")
return nil
}

@@ -619,7 +617,7 @@ func (dn *Daemon) nodeStateSyncHandler() error {
log.Log.Info("nodeStateSyncHandler(): disable drain is true skipping drain")
} else {
log.Log.Info("nodeStateSyncHandler(): drain node")
if err := dn.drainNode(); err != nil {
if err := utils.AnnotateNode(dn.name, consts.DrainRequired, dn.kubeClient); err != nil {
return err
}
}
Expand Down Expand Up @@ -669,7 +667,7 @@ func (dn *Daemon) nodeStateSyncHandler() error {
}
} else {
if !utils.NodeHasAnnotation(*dn.node, consts.NodeDrainAnnotation, consts.DrainIdle) {
if err := dn.annotateNode(dn.name, annoIdle); err != nil {
if err := utils.AnnotateNode(dn.name, consts.DrainIdle, dn.kubeClient); err != nil {
log.Log.Error(err, "nodeStateSyncHandler(): failed to annotate node")
return err
}
@@ -712,15 +710,15 @@ func (dn *Daemon) completeDrain() error {
}

if dn.openshiftContext.IsOpenshiftCluster() && !dn.openshiftContext.IsHypershift() {
log.Log.Info("completeDrain(): resume MCP", "mcp-name", dn.mcpName)
log.Log.V(2).Info("completeDrain(): resume MCP", "mcp-name", dn.mcpName)
pausePatch := []byte("{\"spec\":{\"paused\":false}}")
if _, err := dn.openshiftContext.McClient.MachineconfigurationV1().MachineConfigPools().Patch(context.Background(), dn.mcpName, types.MergePatchType, pausePatch, metav1.PatchOptions{}); err != nil {
log.Log.Error(err, "completeDrain(): failed to resume MCP", "mcp-name", dn.mcpName)
return err
}
}

if err := dn.annotateNode(dn.name, consts.DrainIdle); err != nil {
if err := utils.AnnotateNode(dn.name, consts.DrainIdle, dn.kubeClient); err != nil {
log.Log.Error(err, "completeDrain(): failed to annotate node")
return err
}
@@ -806,47 +804,6 @@ func rebootNode() {
}
}

func (dn *Daemon) annotateNode(node, value string) error {
log.Log.Info("annotateNode(): Annotate node", "name", node, "value", value)

oldNode, err := dn.kubeClient.CoreV1().Nodes().Get(context.Background(), dn.name, metav1.GetOptions{})
if err != nil {
log.Log.Error(err, "annotateNode(): Failed to get node, retrying", "name", node)
return err
}

oldData, err := json.Marshal(oldNode)
if err != nil {
return err
}

newNode := oldNode.DeepCopy()
if newNode.Annotations == nil {
newNode.Annotations = map[string]string{}
}
if newNode.Annotations[consts.NodeDrainAnnotation] != value {
newNode.Annotations[consts.NodeDrainAnnotation] = value
newData, err := json.Marshal(newNode)
if err != nil {
return err
}
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, corev1.Node{})
if err != nil {
return err
}
_, err = dn.kubeClient.CoreV1().Nodes().Patch(context.Background(),
dn.name,
types.StrategicMergePatchType,
patchBytes,
metav1.PatchOptions{})
if err != nil {
log.Log.Error(err, "annotateNode(): Failed to patch node", "name", node)
return err
}
}
return nil
}

func (dn *Daemon) getNodeMachinePool() error {
desiredConfig, ok := dn.node.Annotations[daemonconsts.DesiredMachineConfigAnnotationKey]
if !ok {
@@ -871,7 +828,7 @@ func (dn *Daemon) getNodeMachinePool() error {

func (dn *Daemon) applyDrainRequired() error {
log.Log.Info("applyDrainRequired(): no other node is draining")
err := dn.annotateNode(dn.name, consts.DrainRequired)
err := utils.AnnotateNode(dn.name, consts.DrainRequired, dn.kubeClient)
if err != nil {
log.Log.Error(err, "applyDrainRequired(): Failed to annotate node")
return err
@@ -923,7 +880,7 @@ func (dn *Daemon) pauseMCP() error {
log.Log.V(2).Error(err, "pauseMCP(): failed to pause MCP", "mcp-name", dn.mcpName)
return
}
err = dn.annotateNode(dn.name, consts.DrainMcpPaused)
err = utils.AnnotateNode(dn.name, consts.DrainMcpPaused, dn.kubeClient)
if err != nil {
log.Log.V(2).Error(err, "pauseMCP(): Failed to annotate node")
return
@@ -939,7 +896,7 @@ func (dn *Daemon) pauseMCP() error {
log.Log.V(2).Error(err, "pauseMCP(): fail to resume MCP", "mcp-name", dn.mcpName)
return
}
err = dn.annotateNode(dn.name, consts.Draining)
err = utils.AnnotateNode(dn.name, consts.Draining, dn.kubeClient)
if err != nil {
log.Log.V(2).Error(err, "pauseMCP(): Failed to annotate node")
return
@@ -968,46 +925,6 @@ func (dn *Daemon) pauseMCP() error {
return err
}

func (dn *Daemon) drainNode() error {
log.Log.Info("drainNode(): Update prepared")
var err error

backoff := wait.Backoff{
Steps: 5,
Duration: 10 * time.Second,
Factor: 2,
}
var lastErr error

log.Log.Info("drainNode(): Start draining")
dn.eventRecorder.SendEvent("DrainNode", "Drain node has been initiated")
if err = wait.ExponentialBackoff(backoff, func() (bool, error) {
err := drain.RunCordonOrUncordon(dn.drainer, dn.node, true)
if err != nil {
lastErr = err
log.Log.Error(err, "cordon failed, retrying")
return false, nil
}
err = drain.RunNodeDrain(dn.drainer, dn.name)
if err == nil {
return true, nil
}
lastErr = err
log.Log.Error(err, "Draining failed, retrying")
return false, nil
}); err != nil {
if err == wait.ErrWaitTimeout {
log.Log.Error(err, "drainNode(): failed to drain node", "tries", backoff.Steps, "last-error", lastErr)
}
dn.eventRecorder.SendEvent("DrainNode", "Drain node failed")
log.Log.Error(err, "drainNode(): failed to drain node")
return err
}
dn.eventRecorder.SendEvent("DrainNode", "Drain node completed")
log.Log.Info("drainNode(): drain complete")
return nil
}

func (dn *Daemon) tryCreateSwitchdevUdevRule() error {
log.Log.V(2).Info("tryCreateSwitchdevUdevRule()")
nodeState, nodeStateErr := dn.client.SriovnetworkV1().SriovNetworkNodeStates(namespace).Get(
42 changes: 42 additions & 0 deletions pkg/utils/cluster.go
@@ -2,6 +2,7 @@ package utils

import (
"context"
"encoding/json"
"fmt"
"os"

@@ -121,3 +122,44 @@ func NodeHasAnnotation(node corev1.Node, annoKey string, value string) bool {
}
return false
}

func AnnotateNode(node, value string, kubeClient kubernetes.Interface) error {
log.Log.V(2).Info("annotateNode(): Annotate node", "node", node, "annotation", value)
oldNode, err := kubeClient.CoreV1().Nodes().Get(context.Background(), node, metav1.GetOptions{})
if err != nil {
log.Log.Error(err, "annotateNode(): Failed to get node, retrying", "node", node)
return err
}

oldData, err := json.Marshal(oldNode)
if err != nil {
return err
}

newNode := oldNode.DeepCopy()
if newNode.Annotations == nil {
newNode.Annotations = map[string]string{}
}

if newNode.Annotations[consts.NodeDrainAnnotation] != value {
newNode.Annotations[consts.NodeDrainAnnotation] = value
newData, err := json.Marshal(newNode)
if err != nil {
return err
}
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, corev1.Node{})
if err != nil {
return err
}
_, err = kubeClient.CoreV1().Nodes().Patch(context.Background(),
node,
types.StrategicMergePatchType,
patchBytes,
metav1.PatchOptions{})
if err != nil {
log.Log.Error(err, "annotateNode(): Failed to patch node", "node", node)
return err
}
}
return nil
}
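A minimal usage sketch for the new AnnotateNode helper, written as a test against client-go's fake clientset. It is not part of this commit, and the module path and the pkg/utils / pkg/consts import paths are assumptions about this repository's layout.

```go
package utils_test

import (
	"context"
	"testing"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"

	// Assumed import paths for the operator's own packages.
	"github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts"
	"github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/utils"
)

// TestAnnotateNode exercises the strategic-merge-patch path of AnnotateNode
// against a fake clientset preloaded with a single node.
func TestAnnotateNode(t *testing.T) {
	client := fake.NewSimpleClientset(&corev1.Node{
		ObjectMeta: metav1.ObjectMeta{Name: "node-1"},
	})

	if err := utils.AnnotateNode("node-1", consts.DrainRequired, client); err != nil {
		t.Fatalf("AnnotateNode failed: %v", err)
	}

	node, err := client.CoreV1().Nodes().Get(context.Background(), "node-1", metav1.GetOptions{})
	if err != nil {
		t.Fatalf("failed to read node back: %v", err)
	}
	if got := node.Annotations[consts.NodeDrainAnnotation]; got != consts.DrainRequired {
		t.Fatalf("expected drain annotation %q, got %q", consts.DrainRequired, got)
	}
}
```

Calling AnnotateNode again with the same value is a no-op: the helper only issues a patch when the annotation actually changes.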
