Skip to content

Commit

Permalink
Optimize multi-node draining logic
Browse files Browse the repository at this point in the history
Check whether another node is already being drained by the operator before starting a drain.
Make draining run serially when more than one node needs to be drained.
  • Loading branch information
pliurh committed Mar 27, 2020
1 parent d49784c commit 6b1caaf
Show file tree
Hide file tree
Showing 271 changed files with 19,827 additions and 11 deletions.
3 changes: 0 additions & 3 deletions cmd/sriov-network-config-daemon/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package main
import (
"flag"
"os"
"time"

"github.com/golang/glog"
sriovnetworkv1 "github.com/openshift/sriov-network-operator/pkg/apis/sriovnetwork/v1"
Expand Down Expand Up @@ -79,8 +78,6 @@ func runStartCmd(cmd *cobra.Command, args []string) {
// creates the in-cluster config
config, err = rest.InClusterConfig()
}
// set client timeout to prevent REST call wait forever when not able to reach the API server.
config.Timeout = 15 * time.Second

if err != nil {
panic(err.Error())
Expand Down
1 change: 1 addition & 0 deletions hack/build-plugins.sh
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,4 @@ CGO_ENABLED=1

echo "Building ${REPO}/pkg/plugins (${VERSION_OVERRIDE})"
CGO_ENABLED=${CGO_ENABLED} GOOS=${GOOS} GOARCH=${GOARCH} go build -buildmode=plugin -ldflags "${GLDFLAGS} -s -w" ${GOFLAGS} -o ${BIN_PATH}/plugins/$1_plugin.so ${REPO}/pkg/plugins/$1
#CGO_ENABLED=${CGO_ENABLED} GOOS=${GOOS} GOARCH=${GOARCH} go build -buildmode=plugin -trimpath -gcflags='all=-N -l' -ldflags "${GLDFLAGS} -s -w" ${GOFLAGS} -o ${BIN_PATH}/plugins/$1_plugin.so ${REPO}/pkg/plugins/$1
109 changes: 101 additions & 8 deletions pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package daemon

import (
"bytes"
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"math/rand"
"os"
"os/exec"
"path/filepath"
Expand All @@ -16,13 +18,17 @@ import (

"github.com/golang/glog"
drain "github.com/openshift/kubernetes-drain"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/wait"
// "k8s.io/client-go/informers"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
listerv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
// "k8s.io/client-go/kubernetes/scheme"
sriovnetworkv1 "github.com/openshift/sriov-network-operator/pkg/apis/sriovnetwork/v1"
Expand Down Expand Up @@ -62,7 +68,12 @@ type Daemon struct {
mu *sync.Mutex
}

const scriptsPath = "/bindata/scripts/enable-rdma.sh"
const (
// scriptsPath is the in-container path of the helper script used to enable RDMA.
scriptsPath = "/bindata/scripts/enable-rdma.sh"
// annoKey is the node annotation the daemons use to coordinate draining:
// a node advertises its state here so peers can serialize drain operations.
annoKey = "sriovnetwork.openshift.io/state"
// annoIdle marks a node that is not currently draining.
annoIdle = "Idle"
// annoDraining marks a node that is actively being drained.
annoDraining = "Draining"
)

var namespace = os.Getenv("NAMESPACE")
var pluginsPath = os.Getenv("PLUGINSPATH")
Expand Down Expand Up @@ -93,13 +104,14 @@ func (dn *Daemon) Run(stopCh <-chan struct{}, exitCh <-chan error) error {
if err := tryCreateUdevRule(); err != nil {
return err
}

var timeout int64 = 5
dn.mu = &sync.Mutex{}
informerFactory := sninformer.NewFilteredSharedInformerFactory(dn.client,
time.Second*30,
time.Second*15,
namespace,
func(lo *v1.ListOptions) {
lo.FieldSelector = "metadata.name=" + dn.name
lo.TimeoutSeconds = &timeout
},
)

Expand Down Expand Up @@ -436,11 +448,87 @@ func (a GlogLogger) Logf(format string, v ...interface{}) {
glog.Infof(format, v...)
}

// annotateNode sets the drain-coordination annotation (annoKey) on the named
// node to value ("Idle" or "Draining"). The current object is read through the
// informer lister to avoid an extra API round trip, and the update is applied
// as a strategic-merge patch so unrelated concurrent changes are not clobbered.
// It is a no-op when the annotation already holds the desired value.
//
// NOTE(review): the original body ignored the node parameter and always
// operated on dn.name; both visible callers pass dn.name, so honoring the
// parameter is behavior-compatible for them — confirm no other callers exist.
func (dn *Daemon) annotateNode(node, value string, lister listerv1.NodeLister) error {
	glog.Infof("annotateNode(): Annotate node %s with: %s", node, value)
	oldNode, err := lister.Get(node)
	if err != nil {
		return err
	}
	oldData, err := json.Marshal(oldNode)
	if err != nil {
		return err
	}

	newNode := oldNode.DeepCopy()
	if newNode.Annotations == nil {
		newNode.Annotations = map[string]string{}
	}
	if newNode.Annotations[annoKey] == value {
		// Already in the desired state; skip the patch entirely.
		return nil
	}
	newNode.Annotations[annoKey] = value
	newData, err := json.Marshal(newNode)
	if err != nil {
		return err
	}
	patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, corev1.Node{})
	if err != nil {
		return err
	}
	_, err = dn.kubeClient.CoreV1().Nodes().Patch(node, types.StrategicMergePatchType, patchBytes)
	return err
}

func (dn *Daemon) drainNode(name string) {
glog.Info("drainNode(): Update prepared; beginning drain")
glog.Info("drainNode(): Update prepared")
node, err := dn.kubeClient.CoreV1().Nodes().Get(name, metav1.GetOptions{})
if err != nil {
glog.Errorf("nodeStateChangeHandler(): failed to get node: %v", err)
glog.Errorf("drainNode(): failed to get node: %v", err)
}
rand.Seed(time.Now().UnixNano())
informerFactory := informers.NewSharedInformerFactory(dn.kubeClient,
time.Second*15,
)
stop := make(chan struct{})
lister := informerFactory.Core().V1().Nodes().Lister()
informer := informerFactory.Core().V1().Nodes().Informer()
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: func(oldObj, newObj interface{}) {
nn := newObj.(*corev1.Node)
if nn.Annotations[annoKey] == annoDraining || nn.GetName() == dn.name {
return
}
ok := true
// wait a random time to avoid all the nodes checking the nodes anno at the same time
time.Sleep(time.Duration(rand.Intn(6000)) * time.Millisecond)
nodes, err := lister.List(labels.Everything())
if err != nil {
return
}
glog.Infof("drainNode(): Check if any other node is draining")
for _, node := range nodes {
if node.GetName() != dn.name && node.Annotations[annoKey] == annoDraining {
glog.Infof("drainNode(): node %s is draining", node.Name)
ok = false
}
}
if ok {
glog.Info("drainNode(): No other node is draining, stop watching")
select {
case <-stop:
default:
close(stop)
}
}
},
})
informer.Run(stop)

err = dn.annotateNode(dn.name, annoDraining, lister)
if err != nil {
glog.Errorf("drainNode(): Failed to annotate node: %v", err)
}

backoff := wait.Backoff{
Expand All @@ -452,11 +540,12 @@ func (dn *Daemon) drainNode(name string) {

logger := GlogLogger{}

glog.Info("drainNode(): Start draining")
if err := wait.ExponentialBackoff(backoff, func() (bool, error) {
err := drain.Drain(dn.kubeClient, []*corev1.Node{node}, &drain.DrainOptions{
DeleteLocalData: true,
Force: true,
GracePeriodSeconds: 600,
GracePeriodSeconds: -1,
IgnoreDaemonsets: true,
Logger: logger,
})
Expand All @@ -473,6 +562,10 @@ func (dn *Daemon) drainNode(name string) {
glog.Errorf("drainNode(): failed to drain node: %v", err)
}
glog.Info("drainNode(): drain complete")
err = dn.annotateNode(dn.name, annoIdle, lister)
if err != nil {
glog.Errorf("drainNode(): failed to annotate node: %v", err)
}
}

func needRestartDevicePlugin(oldState, newState *sriovnetworkv1.SriovNetworkNodeState) bool {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 6b1caaf

Please sign in to comment.