Commit ddef424

Merge pull request #165 from pliurh/drain-node
Optimize multi-node draining logic
openshift-merge-robot authored Apr 10, 2020
2 parents 27676cf + 6b1caaf commit ddef424
Showing 271 changed files with 19,827 additions and 11 deletions.
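
The substantive change is in pkg/daemon/daemon.go: each config daemon now marks its own Node with a sriovnetwork.openshift.io/state annotation (Idle or Draining), and a node only begins draining once no other node is annotated Draining, so at most one node's workloads are evicted at a time. For illustration only — this is not part of the commit, which uses a shared informer and a stop channel rather than polling, and waitForDrainLock is a hypothetical name — the decision rule reduces to a sketch like this, assuming the pre-1.18 client-go vendored in this repository:

package main

import (
	"math/rand"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
)

// waitForDrainLock blocks until no node other than "self" carries the
// Draining annotation, then returns so the caller may start draining.
func waitForDrainLock(c kubernetes.Interface, self string) error {
	return wait.PollImmediateInfinite(3*time.Second, func() (bool, error) {
		// Sleep a random interval so the daemons do not all inspect the
		// annotations at the same instant (the same idea as the
		// rand.Intn(6000) jitter in the diff below).
		time.Sleep(time.Duration(rand.Intn(6000)) * time.Millisecond)
		nodes, err := c.CoreV1().Nodes().List(metav1.ListOptions{})
		if err != nil {
			return false, nil // transient API error: retry
		}
		for _, n := range nodes.Items {
			if n.Name != self && n.Annotations["sriovnetwork.openshift.io/state"] == "Draining" {
				return false, nil // another node holds the "lock"; keep waiting
			}
		}
		return true, nil
	})
}
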
3 changes: 0 additions & 3 deletions cmd/sriov-network-config-daemon/start.go
@@ -3,7 +3,6 @@ package main
 import (
 	"flag"
 	"os"
-	"time"
 
 	"github.com/golang/glog"
 	sriovnetworkv1 "github.com/openshift/sriov-network-operator/pkg/apis/sriovnetwork/v1"
@@ -79,8 +78,6 @@ func runStartCmd(cmd *cobra.Command, args []string) {
 		// creates the in-cluster config
 		config, err = rest.InClusterConfig()
 	}
-	// set client timeout to prevent REST call wait forever when not able to reach the API server.
-	config.Timeout = 15 * time.Second
 
 	if err != nil {
 		panic(err.Error())
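
Dropping config.Timeout here is part of the same fix, not unrelated cleanup: rest.Config.Timeout becomes the underlying HTTP client timeout, so it also cuts off long-running WATCH streams such as the one the daemon's informer keeps open. The daemon.go change below bounds the informer's list/watch server-side through ListOptions instead. A minimal sketch of that pattern (illustrative only; tweakListOptions is a hypothetical name, but the field values mirror the hunk below):

package main

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// tweakListOptions bounds each LIST/WATCH request server-side; when the
// server closes the watch after the timeout, the informer's reflector
// simply re-establishes it, instead of every request (watches included)
// being killed client-side by a global config.Timeout.
func tweakListOptions(nodeName string) func(*metav1.ListOptions) {
	var timeout int64 = 5 // seconds
	return func(lo *metav1.ListOptions) {
		lo.FieldSelector = "metadata.name=" + nodeName
		lo.TimeoutSeconds = &timeout
	}
}
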
1 change: 1 addition & 0 deletions hack/build-plugins.sh
@@ -36,3 +36,4 @@ CGO_ENABLED=1
 
 echo "Building ${REPO}/pkg/plugins (${VERSION_OVERRIDE})"
 CGO_ENABLED=${CGO_ENABLED} GOOS=${GOOS} GOARCH=${GOARCH} go build -buildmode=plugin -ldflags "${GLDFLAGS} -s -w" ${GOFLAGS} -o ${BIN_PATH}/plugins/$1_plugin.so ${REPO}/pkg/plugins/$1
+#CGO_ENABLED=${CGO_ENABLED} GOOS=${GOOS} GOARCH=${GOARCH} go build -buildmode=plugin -trimpath -gcflags='all=-N -l' -ldflags "${GLDFLAGS} -s -w" ${GOFLAGS} -o ${BIN_PATH}/plugins/$1_plugin.so ${REPO}/pkg/plugins/$1
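
The added line is a commented-out variant of the plugin build, presumably kept for reference when debugging locally: -gcflags='all=-N -l' disables compiler optimizations (-N) and inlining (-l) so a debugger can step through the plugin, and -trimpath removes local file system paths from the compiled binary.
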
109 changes: 101 additions & 8 deletions pkg/daemon/daemon.go
@@ -2,9 +2,11 @@ package daemon
 
 import (
 	"bytes"
+	"encoding/json"
 	"flag"
 	"fmt"
 	"io/ioutil"
+	"math/rand"
 	"os"
 	"os/exec"
 	"path/filepath"
@@ -16,13 +18,17 @@ import (
 
 	"github.com/golang/glog"
 	drain "github.com/openshift/kubernetes-drain"
+	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/strategicpatch"
 	"k8s.io/apimachinery/pkg/util/wait"
-	// "k8s.io/client-go/informers"
-	corev1 "k8s.io/api/core/v1"
+	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes"
+	listerv1 "k8s.io/client-go/listers/core/v1"
 	"k8s.io/client-go/tools/cache"
 	// "k8s.io/client-go/kubernetes/scheme"
 	sriovnetworkv1 "github.com/openshift/sriov-network-operator/pkg/apis/sriovnetwork/v1"
@@ -62,7 +68,12 @@ type Daemon struct {
 	mu *sync.Mutex
 }
 
-const scriptsPath = "/bindata/scripts/enable-rdma.sh"
+const (
+	scriptsPath  = "/bindata/scripts/enable-rdma.sh"
+	annoKey      = "sriovnetwork.openshift.io/state"
+	annoIdle     = "Idle"
+	annoDraining = "Draining"
+)
 
 var namespace = os.Getenv("NAMESPACE")
 var pluginsPath = os.Getenv("PLUGINSPATH")
@@ -93,13 +104,14 @@ func (dn *Daemon) Run(stopCh <-chan struct{}, exitCh <-chan error) error {
 	if err := tryCreateUdevRule(); err != nil {
 		return err
 	}
-
+	var timeout int64 = 5
 	dn.mu = &sync.Mutex{}
 	informerFactory := sninformer.NewFilteredSharedInformerFactory(dn.client,
-		time.Second*30,
+		time.Second*15,
 		namespace,
 		func(lo *v1.ListOptions) {
 			lo.FieldSelector = "metadata.name=" + dn.name
+			lo.TimeoutSeconds = &timeout
 		},
 	)
 
@@ -436,11 +448,87 @@ func (a GlogLogger) Logf(format string, v ...interface{}) {
 	glog.Infof(format, v...)
 }
 
+func (dn *Daemon) annotateNode(node, value string, lister listerv1.NodeLister) error {
+	glog.Infof("annotateNode(): Annotate node %s with: %s", node, value)
+	oldNode, err := lister.Get(dn.name)
+	if err != nil {
+		return err
+	}
+	oldData, err := json.Marshal(oldNode)
+	if err != nil {
+		return err
+	}
+
+	newNode := oldNode.DeepCopy()
+	if newNode.Annotations == nil {
+		newNode.Annotations = map[string]string{}
+	}
+	if newNode.Annotations[annoKey] != value {
+		newNode.Annotations[annoKey] = value
+		newData, err := json.Marshal(newNode)
+		if err != nil {
+			return err
+		}
+		patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, corev1.Node{})
+		if err != nil {
+			return err
+		}
+		_, err = dn.kubeClient.CoreV1().Nodes().Patch(dn.name, types.StrategicMergePatchType, patchBytes)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
 func (dn *Daemon) drainNode(name string) {
-	glog.Info("drainNode(): Update prepared; beginning drain")
+	glog.Info("drainNode(): Update prepared")
 	node, err := dn.kubeClient.CoreV1().Nodes().Get(name, metav1.GetOptions{})
 	if err != nil {
-		glog.Errorf("nodeStateChangeHandler(): failed to get node: %v", err)
+		glog.Errorf("drainNode(): failed to get node: %v", err)
 	}
+	rand.Seed(time.Now().UnixNano())
+	informerFactory := informers.NewSharedInformerFactory(dn.kubeClient,
+		time.Second*15,
+	)
+	stop := make(chan struct{})
+	lister := informerFactory.Core().V1().Nodes().Lister()
+	informer := informerFactory.Core().V1().Nodes().Informer()
+	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+		UpdateFunc: func(oldObj, newObj interface{}) {
+			nn := newObj.(*corev1.Node)
+			if nn.Annotations[annoKey] == annoDraining || nn.GetName() == dn.name {
+				return
+			}
+			ok := true
+			// wait a random time to avoid all the nodes checking the nodes anno at the same time
+			time.Sleep(time.Duration(rand.Intn(6000)) * time.Millisecond)
+			nodes, err := lister.List(labels.Everything())
+			if err != nil {
+				return
+			}
+			glog.Infof("drainNode(): Check if any other node is draining")
+			for _, node := range nodes {
+				if node.GetName() != dn.name && node.Annotations[annoKey] == annoDraining {
+					glog.Infof("drainNode(): node %s is draining", node.Name)
+					ok = false
+				}
+			}
+			if ok {
+				glog.Info("drainNode(): No other node is draining, stop watching")
+				select {
+				case <-stop:
+				default:
+					close(stop)
+				}
+			}
+		},
+	})
+	informer.Run(stop)
+
+	err = dn.annotateNode(dn.name, annoDraining, lister)
+	if err != nil {
+		glog.Errorf("drainNode(): Failed to annotate node: %v", err)
+	}
 
 	backoff := wait.Backoff{
@@ -452,11 +540,12 @@ func (dn *Daemon) drainNode(name string) {
 
 	logger := GlogLogger{}
 
+	glog.Info("drainNode(): Start draining")
 	if err := wait.ExponentialBackoff(backoff, func() (bool, error) {
 		err := drain.Drain(dn.kubeClient, []*corev1.Node{node}, &drain.DrainOptions{
 			DeleteLocalData:    true,
 			Force:              true,
-			GracePeriodSeconds: 600,
+			GracePeriodSeconds: -1,
 			IgnoreDaemonsets:   true,
 			Logger:             logger,
 		})
@@ -473,6 +562,10 @@
 		glog.Errorf("drainNode(): failed to drain node: %v", err)
 	}
 	glog.Info("drainNode(): drain complete")
+	err = dn.annotateNode(dn.name, annoIdle, lister)
+	if err != nil {
+		glog.Errorf("drainNode(): failed to annotate node: %v", err)
+	}
 }
 
 func needRestartDevicePlugin(oldState, newState *sriovnetworkv1.SriovNetworkNodeState) bool {
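
Two details in the daemon.go diff are easy to miss. First, GracePeriodSeconds changes from 600 to -1; as with kubectl drain's --grace-period=-1, a negative value defers to each pod's own termination grace period instead of forcing a single value. Second, annotateNode does not send the whole Node back to the API server: it marshals the cached object before and after the change and lets strategicpatch compute a merge patch containing only the annotation delta, which it then applies as a strategic-merge PATCH. A self-contained illustration of that call — not from the commit; it assumes the k8s.io/api and k8s.io/apimachinery versions vendored in this repo, and "worker-0" is a hypothetical node name:

package main

import (
	"encoding/json"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
)

func main() {
	oldNode := &corev1.Node{ObjectMeta: metav1.ObjectMeta{
		Name:        "worker-0", // hypothetical node name
		Annotations: map[string]string{"sriovnetwork.openshift.io/state": "Idle"},
	}}
	oldData, _ := json.Marshal(oldNode)

	newNode := oldNode.DeepCopy()
	newNode.Annotations["sriovnetwork.openshift.io/state"] = "Draining"
	newData, _ := json.Marshal(newNode)

	patch, _ := strategicpatch.CreateTwoWayMergePatch(oldData, newData, corev1.Node{})
	// Prints only the delta:
	// {"metadata":{"annotations":{"sriovnetwork.openshift.io/state":"Draining"}}}
	fmt.Println(string(patch))
}
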

Some generated files are not rendered by default.
