Skip to content

Commit

Permalink
Move node drain call to Drain controller
Browse files Browse the repository at this point in the history
  • Loading branch information
e0ne committed Aug 3, 2023
1 parent 7179a85 commit 3cbcc00
Show file tree
Hide file tree
Showing 5 changed files with 210 additions and 94 deletions.
50 changes: 49 additions & 1 deletion controllers/drain_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@ package controllers
import (
"context"
"fmt"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubectl/pkg/drain"
"sort"
"strings"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -24,9 +28,15 @@ import (
"github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/utils"
)

const (
maxParallelNodeConfiguration = 1
)

type DrainReconciler struct {
client.Client
Scheme *runtime.Scheme

Drainer *drain.Helper
}

//+kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch;update;patch
Expand Down Expand Up @@ -72,7 +82,7 @@ func (dr *DrainReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
}

reqLogger.Info("Count of draining", "drainingNodes", drainingNodes)
if config.Spec.MaxParallelNodeConfiguration != 0 && drainingNodes >= config.Spec.MaxParallelNodeConfiguration {
if maxParallelNodeConfiguration != 0 && drainingNodes >= maxParallelNodeConfiguration {
reqLogger.Info("MaxParallelNodeConfiguration limit reached for draining nodes")
return reconcile.Result{}, nil
}
Expand All @@ -89,6 +99,7 @@ func (dr *DrainReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
reqLogger.Error(err, "Failed to patch node annotations")
return reconcile.Result{}, err
}

drainingNodes++
} else {
reqLogger.Info("Too many nodes to be draining at the moment. Skipping node %s", "node", node.Name)
Expand All @@ -99,6 +110,43 @@ func (dr *DrainReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
return reconcile.Result{}, nil
}

func (dr *DrainReconciler) drainNode(node *corev1.Node) error {
glog.Info("drainNode(): Update prepared")
var err error

backoff := wait.Backoff{
Steps: 5,
Duration: 10 * time.Second,
Factor: 2,
}
var lastErr error

glog.Info("drainNode(): Start draining")
if err = wait.ExponentialBackoff(backoff, func() (bool, error) {
err := drain.RunCordonOrUncordon(dr.Drainer, node, true)
if err != nil {
lastErr = err
glog.Infof("Cordon failed with: %v, retrying", err)
return false, nil
}
err = drain.RunNodeDrain(dr.Drainer, node.Name)
if err == nil {
return true, nil
}
lastErr = err
glog.Infof("Draining failed with: %v, retrying", err)
return false, nil
}); err != nil {
if err == wait.ErrWaitTimeout {
glog.Errorf("drainNode(): failed to drain node (%d tries): %v :%v", backoff.Steps, err, lastErr)
}
glog.Errorf("drainNode(): failed to drain node: %v", err)
return err
}
glog.Info("drainNode(): drain complete")
return nil
}

// SetupWithManager sets up the controller with the Manager.
func (dr *DrainReconciler) SetupWithManager(mgr ctrl.Manager) error {
// we always add object with a same(static) key to the queue to reduce
Expand Down
28 changes: 25 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@ import (
"context"
"flag"
"fmt"
"github.com/golang/glog"
corev1 "k8s.io/api/core/v1"
"k8s.io/kubectl/pkg/drain"
"os"
"time"

netattdefv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/apis/k8s.cni.cncf.io/v1"
openshiftconfigv1 "github.com/openshift/api/config/v1"
Expand Down Expand Up @@ -95,6 +99,7 @@ func main() {
setupLog.Error(err, "couldn't create openshift context")
os.Exit(1)
}
kubeclient := kubernetes.NewForConfigOrDie(ctrl.GetConfigOrDie())

le := leaderelection.GetLeaderElectionConfig(kubeClient, enableLeaderElection)

Expand Down Expand Up @@ -124,7 +129,7 @@ func main() {
os.Exit(1)
}

if err := initNicIDMap(); err != nil {
if err := initNicIDMap(kubeclient); err != nil {
setupLog.Error(err, "unable to init NicIdMap")
os.Exit(1)
}
Expand Down Expand Up @@ -169,6 +174,24 @@ func main() {
if err = (&controllers.DrainReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Drainer: &drain.Helper{
Client: kubeclient,
Force: true,
IgnoreAllDaemonSets: true,
DeleteEmptyDirData: true,
GracePeriodSeconds: -1,
Timeout: 90 * time.Second,
OnPodDeletedOrEvicted: func(pod *corev1.Pod, usingEviction bool) {
verbStr := "Deleted"
if usingEviction {
verbStr = "Evicted"
}
glog.Info(fmt.Sprintf("%s pod from Node %s/%s", verbStr, pod.Namespace, pod.Name))
},
//Out: writer{glog.Info},
//ErrOut: writer{glog.Error},
Ctx: context.Background(),
},
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "DrainReconciler")
os.Exit(1)
Expand Down Expand Up @@ -216,9 +239,8 @@ func main() {
}
}

func initNicIDMap() error {
func initNicIDMap(kubeclient kubernetes.Interface) error {
namespace := os.Getenv("NAMESPACE")
kubeclient := kubernetes.NewForConfigOrDie(ctrl.GetConfigOrDie())
if err := sriovnetworkv1.InitNicIDMapFromConfigMap(kubeclient, namespace); err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/consts/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const (
AnnoDrainRequired = "Drain_Required"
AnnoMcpPaused = "Draining_MCP_Paused"
AnnoDraining = "Draining"
AnnoDrainComplete = "DrainComplete"

LinkTypeEthernet = "ether"
LinkTypeInfiniband = "infiniband"
Expand Down
Loading

0 comments on commit 3cbcc00

Please sign in to comment.