Skip to content

Commit

Permalink
Merge pull request #1634 from aleksandra-malinowska/cherrypick-delete…
Browse files Browse the repository at this point in the history
…taint-retry-on-conflicts-1.3

Cherrypick of #1293: deletetaint retry on conflicts 1.3
  • Loading branch information
k8s-ci-robot authored Jan 31, 2019
2 parents 0b7a146 + bb47132 commit 1dcf846
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 67 deletions.
91 changes: 56 additions & 35 deletions cluster-autoscaler/utils/deletetaint/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kube_client "k8s.io/client-go/kubernetes"

Expand All @@ -31,27 +32,38 @@ import (
const (
// ToBeDeletedTaint is a taint used to make the node unschedulable.
ToBeDeletedTaint = "ToBeDeletedByClusterAutoscaler"

maxRetryDeadline = 5 * time.Second
conflictRetryInterval = 750 * time.Millisecond
)

// MarkToBeDeleted sets a taint that makes the node unschedulable.
func MarkToBeDeleted(node *apiv1.Node, client kube_client.Interface) error {
// Get the newest version of the node.
freshNode, err := client.CoreV1().Nodes().Get(node.Name, metav1.GetOptions{})
if err != nil || freshNode == nil {
return fmt.Errorf("failed to get node %v: %v", node.Name, err)
}
retryDeadline := time.Now().Add(maxRetryDeadline)
for {
// Get the newest version of the node.
freshNode, err := client.CoreV1().Nodes().Get(node.Name, metav1.GetOptions{})
if err != nil || freshNode == nil {
return fmt.Errorf("failed to get node %v: %v", node.Name, err)
}

added, err := addToBeDeletedTaint(freshNode)
if added == false {
return err
}
_, err = client.CoreV1().Nodes().Update(freshNode)
if err != nil {
glog.Warningf("Error while adding taints on node %v: %v", node.Name, err)
return err
added, err := addToBeDeletedTaint(freshNode)
if added == false {
return err
}
_, err = client.CoreV1().Nodes().Update(freshNode)
if err != nil && errors.IsConflict(err) && time.Now().Before(retryDeadline) {
time.Sleep(conflictRetryInterval)
continue
}

if err != nil {
glog.Warningf("Error while adding taints on node %v: %v", node.Name, err)
return err
}
glog.V(1).Infof("Successfully added toBeDeletedTaint on node %v", node.Name)
return nil
}
glog.V(1).Infof("Successfully added toBeDeletedTaint on node %v", node.Name)
return nil
}

func addToBeDeletedTaint(node *apiv1.Node) (bool, error) {
Expand Down Expand Up @@ -96,28 +108,37 @@ func GetToBeDeletedTime(node *apiv1.Node) (*time.Time, error) {

// CleanToBeDeleted cleans ToBeDeleted taint.
func CleanToBeDeleted(node *apiv1.Node, client kube_client.Interface) (bool, error) {
freshNode, err := client.CoreV1().Nodes().Get(node.Name, metav1.GetOptions{})
if err != nil || freshNode == nil {
return false, fmt.Errorf("failed to get node %v: %v", node.Name, err)
}
newTaints := make([]apiv1.Taint, 0)
for _, taint := range freshNode.Spec.Taints {
if taint.Key == ToBeDeletedTaint {
glog.V(1).Infof("Releasing taint %+v on node %v", taint, node.Name)
} else {
newTaints = append(newTaints, taint)
retryDeadline := time.Now().Add(maxRetryDeadline)
for {
freshNode, err := client.CoreV1().Nodes().Get(node.Name, metav1.GetOptions{})
if err != nil || freshNode == nil {
return false, fmt.Errorf("failed to get node %v: %v", node.Name, err)
}
newTaints := make([]apiv1.Taint, 0)
for _, taint := range freshNode.Spec.Taints {
if taint.Key == ToBeDeletedTaint {
glog.V(1).Infof("Releasing taint %+v on node %v", taint, node.Name)
} else {
newTaints = append(newTaints, taint)
}
}
}

if len(newTaints) != len(freshNode.Spec.Taints) {
freshNode.Spec.Taints = newTaints
_, err := client.CoreV1().Nodes().Update(freshNode)
if err != nil {
glog.Warningf("Error while releasing taints on node %v: %v", node.Name, err)
return false, err
if len(newTaints) != len(freshNode.Spec.Taints) {
freshNode.Spec.Taints = newTaints
_, err := client.CoreV1().Nodes().Update(freshNode)

if err != nil && errors.IsConflict(err) && time.Now().Before(retryDeadline) {
time.Sleep(conflictRetryInterval)
continue
}

if err != nil {
glog.Warningf("Error while releasing taints on node %v: %v", node.Name, err)
return false, err
}
glog.V(1).Infof("Successfully released toBeDeletedTaint on node %v", node.Name)
return true, nil
}
glog.V(1).Infof("Successfully released toBeDeletedTaint on node %v", node.Name)
return true, nil
return false, nil
}
return false, nil
}
71 changes: 39 additions & 32 deletions cluster-autoscaler/utils/deletetaint/delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@ limitations under the License.
package deletetaint

import (
"fmt"
"sync/atomic"
"testing"
"time"

. "k8s.io/autoscaler/cluster-autoscaler/utils/test"

apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
Expand All @@ -33,62 +36,66 @@ import (

func TestMarkNodes(t *testing.T) {
node := BuildTestNode("node", 1000, 1000)
fakeClient, updatedNodes := buildFakeClientAndUpdateChannel(node)
fakeClient := buildFakeClient(t, node)
err := MarkToBeDeleted(node, fakeClient)
assert.NoError(t, err)
assert.Equal(t, node.Name, getStringFromChan(updatedNodes))
assert.True(t, HasToBeDeletedTaint(node))

updatedNode, err := fakeClient.Core().Nodes().Get("node", metav1.GetOptions{})
assert.NoError(t, err)
assert.True(t, HasToBeDeletedTaint(updatedNode))
}

func TestCheckNodes(t *testing.T) {
node := BuildTestNode("node", 1000, 1000)
fakeClient, updatedNodes := buildFakeClientAndUpdateChannel(node)
fakeClient := buildFakeClient(t, node)
err := MarkToBeDeleted(node, fakeClient)
assert.NoError(t, err)
assert.Equal(t, node.Name, getStringFromChan(updatedNodes))
assert.True(t, HasToBeDeletedTaint(node))

val, err := GetToBeDeletedTime(node)
updatedNode, err := fakeClient.Core().Nodes().Get("node", metav1.GetOptions{})
assert.NoError(t, err)
assert.True(t, HasToBeDeletedTaint(updatedNode))

val, err := GetToBeDeletedTime(updatedNode)
assert.NoError(t, err)
assert.True(t, time.Now().Sub(*val) < 10*time.Second)
}

func TestCleanNodes(t *testing.T) {
node := BuildTestNode("node", 1000, 1000)
addToBeDeletedTaint(node)
fakeClient, updatedNodes := buildFakeClientAndUpdateChannel(node)
fakeClient := buildFakeClient(t, node)

cleaned, err := CleanToBeDeleted(node, fakeClient)
assert.True(t, cleaned)
assert.NoError(t, err)
assert.Equal(t, node.Name, getStringFromChan(updatedNodes))
assert.False(t, HasToBeDeletedTaint(node))

updatedNode, err := fakeClient.Core().Nodes().Get("node", metav1.GetOptions{})
assert.NoError(t, err)
assert.False(t, HasToBeDeletedTaint(updatedNode))
}

func buildFakeClientAndUpdateChannel(node *apiv1.Node) (*fake.Clientset, chan string) {
fakeClient := &fake.Clientset{}
updatedNodes := make(chan string, 10)
fakeClient.Fake.AddReactor("get", "nodes", func(action core.Action) (bool, runtime.Object, error) {
get := action.(core.GetAction)
if get.GetName() == node.Name {
return true, node, nil
}
return true, nil, errors.NewNotFound(apiv1.Resource("node"), get.GetName())
})
fakeClient.Fake.AddReactor("update", "nodes", func(action core.Action) (bool, runtime.Object, error) {
func buildFakeClient(t *testing.T, node *apiv1.Node) *fake.Clientset {
fakeClient := fake.NewSimpleClientset()

_, err := fakeClient.CoreV1().Nodes().Create(node)
assert.NoError(t, err)

// return a 'Conflict' error on the first upadte, then pass it through, then return a Conflict again
var returnedConflict int32
fakeClient.Fake.PrependReactor("update", "nodes", func(action core.Action) (bool, runtime.Object, error) {
update := action.(core.UpdateAction)
obj := update.GetObject().(*apiv1.Node)
updatedNodes <- obj.Name
return true, obj, nil

if atomic.LoadInt32(&returnedConflict) == 0 {
// allow the next update
atomic.StoreInt32(&returnedConflict, 1)
return true, nil, errors.NewConflict(apiv1.Resource("node"), obj.GetName(), fmt.Errorf("concurrent update on %s", obj.GetName()))
}

// return a conflict on next update
atomic.StoreInt32(&returnedConflict, 0)
return false, nil, nil
})
return fakeClient, updatedNodes
}

func getStringFromChan(c chan string) string {
select {
case val := <-c:
return val
case <-time.After(time.Second * 10):
return "Nothing returned"
}
return fakeClient
}

0 comments on commit 1dcf846

Please sign in to comment.