Skip to content

Commit

Permalink
Move node drain call to Drain controller
Browse files Browse the repository at this point in the history
  • Loading branch information
e0ne committed Aug 22, 2023
1 parent 1fff5ec commit 631ebb9
Show file tree
Hide file tree
Showing 5 changed files with 209 additions and 100 deletions.
44 changes: 44 additions & 0 deletions controllers/drain_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@ package controllers
import (
"context"
"fmt"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubectl/pkg/drain"
"sort"
"strings"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -31,6 +35,8 @@ const (
type DrainReconciler struct {
client.Client
Scheme *runtime.Scheme

Drainer *drain.Helper
}

//+kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch;update;patch
Expand Down Expand Up @@ -93,6 +99,7 @@ func (dr *DrainReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
reqLogger.Error(err, "Failed to patch node annotations")
return reconcile.Result{}, err
}

drainingNodes++
} else {
reqLogger.Info("Too many nodes to be draining at the moment. Skipping node %s", "node", node.Name)
Expand All @@ -103,6 +110,43 @@ func (dr *DrainReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
return reconcile.Result{}, nil
}

func (dr *DrainReconciler) drainNode(node *corev1.Node) error {
glog.Info("drainNode(): Update prepared")
var err error

backoff := wait.Backoff{
Steps: 5,
Duration: 10 * time.Second,
Factor: 2,
}
var lastErr error

glog.Info("drainNode(): Start draining")
if err = wait.ExponentialBackoff(backoff, func() (bool, error) {
err := drain.RunCordonOrUncordon(dr.Drainer, node, true)
if err != nil {
lastErr = err
glog.Infof("Cordon failed with: %v, retrying", err)
return false, nil
}
err = drain.RunNodeDrain(dr.Drainer, node.Name)
if err == nil {
return true, nil
}
lastErr = err
glog.Infof("Draining failed with: %v, retrying", err)
return false, nil
}); err != nil {
if err == wait.ErrWaitTimeout {
glog.Errorf("drainNode(): failed to drain node (%d tries): %v :%v", backoff.Steps, err, lastErr)
}
glog.Errorf("drainNode(): failed to drain node: %v", err)
return err
}
glog.Info("drainNode(): drain complete")
return nil
}

// SetupWithManager sets up the controller with the Manager.
func (dr *DrainReconciler) SetupWithManager(mgr ctrl.Manager) error {
// we always add object with a same(static) key to the queue to reduce
Expand Down
12 changes: 5 additions & 7 deletions controllers/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ type DrainAnnotationPredicate struct {
func (DrainAnnotationPredicate) Create(e event.CreateEvent) bool {
logger := log.FromContext(context.TODO())
if e.Object == nil {
logger.Error(nil, "Create event has no object for create", "event", e)
logger.Info("Create event: node has no drain annotation", "node", e.Object.GetName())
return false
}

if _, hasAnno := e.Object.GetAnnotations()[constants.NodeDrainAnnotation]; hasAnno {
logger.Error(nil, "Create event: node has no drain annotation", "event", e)
logger.Info("Create event: node has no drain annotation", "node", e.Object.GetName())
return true
}
return false
Expand All @@ -66,21 +66,19 @@ func (DrainAnnotationPredicate) Create(e event.CreateEvent) bool {
func (DrainAnnotationPredicate) Update(e event.UpdateEvent) bool {
logger := log.FromContext(context.TODO())
if e.ObjectOld == nil {
logger.Error(nil, "Update event has no old object to update", "event", e)
logger.Info("Update event has no old object to update", "event", e)
return false
}
if e.ObjectNew == nil {
logger.Error(nil, "Update event has no new object for update", "event", e)
logger.Info("Update event has no old object to update", "event", e)
return false
}

oldAnno, hasOldAnno := e.ObjectOld.GetAnnotations()[constants.NodeDrainAnnotation]
newAnno, hasNewAnno := e.ObjectNew.GetAnnotations()[constants.NodeDrainAnnotation]

if !hasOldAnno || !hasNewAnno {
logger.Error(nil, "Update event: can not compare annotations", "old", hasOldAnno)
logger.Error(nil, "Update event: can not compare annotations", "new", hasNewAnno)
logger.Error(nil, "Update event: can not compare annotations", "event", e)
logger.Info("Update event: can not compare annotations", "event", e)
return false
}

Expand Down
28 changes: 25 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@ import (
"context"
"flag"
"fmt"
"github.com/golang/glog"
corev1 "k8s.io/api/core/v1"
"k8s.io/kubectl/pkg/drain"
"os"
"time"

netattdefv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/apis/k8s.cni.cncf.io/v1"
openshiftconfigv1 "github.com/openshift/api/config/v1"
Expand Down Expand Up @@ -97,6 +101,7 @@ func main() {
setupLog.Error(err, "couldn't create openshift context")
os.Exit(1)
}
kubeclient := kubernetes.NewForConfigOrDie(ctrl.GetConfigOrDie())

le := leaderelection.GetLeaderElectionConfig(kubeClient, enableLeaderElection)

Expand Down Expand Up @@ -126,7 +131,7 @@ func main() {
os.Exit(1)
}

if err := initNicIDMap(); err != nil {
if err := initNicIDMap(kubeclient); err != nil {
setupLog.Error(err, "unable to init NicIdMap")
os.Exit(1)
}
Expand Down Expand Up @@ -171,6 +176,24 @@ func main() {
if err = (&controllers.DrainReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Drainer: &drain.Helper{
Client: kubeclient,
Force: true,
IgnoreAllDaemonSets: true,
DeleteEmptyDirData: true,
GracePeriodSeconds: -1,
Timeout: 90 * time.Second,
OnPodDeletedOrEvicted: func(pod *corev1.Pod, usingEviction bool) {
verbStr := "Deleted"
if usingEviction {
verbStr = "Evicted"
}
glog.Info(fmt.Sprintf("%s pod from Node %s/%s", verbStr, pod.Namespace, pod.Name))
},
//Out: writer{glog.Info},
//ErrOut: writer{glog.Error},
Ctx: context.Background(),
},
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "DrainReconciler")
os.Exit(1)
Expand Down Expand Up @@ -218,9 +241,8 @@ func main() {
}
}

func initNicIDMap() error {
func initNicIDMap(kubeclient kubernetes.Interface) error {
namespace := os.Getenv("NAMESPACE")
kubeclient := kubernetes.NewForConfigOrDie(ctrl.GetConfigOrDie())
if err := sriovnetworkv1.InitNicIDMapFromConfigMap(kubeclient, namespace); err != nil {
return err
}
Expand Down
Loading

0 comments on commit 631ebb9

Please sign in to comment.