diff --git a/cluster-autoscaler/utils/deletetaint/delete.go b/cluster-autoscaler/utils/deletetaint/delete.go index f17ccda0a7f6..c2b93541421c 100644 --- a/cluster-autoscaler/utils/deletetaint/delete.go +++ b/cluster-autoscaler/utils/deletetaint/delete.go @@ -22,6 +22,7 @@ import ( "time" apiv1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" kube_client "k8s.io/client-go/kubernetes" @@ -31,27 +32,38 @@ import ( const ( // ToBeDeletedTaint is a taint used to make the node unschedulable. ToBeDeletedTaint = "ToBeDeletedByClusterAutoscaler" + + maxRetryDeadline = 5 * time.Second + conflictRetryInterval = 750 * time.Millisecond ) // MarkToBeDeleted sets a taint that makes the node unschedulable. func MarkToBeDeleted(node *apiv1.Node, client kube_client.Interface) error { - // Get the newest version of the node. - freshNode, err := client.CoreV1().Nodes().Get(node.Name, metav1.GetOptions{}) - if err != nil || freshNode == nil { - return fmt.Errorf("failed to get node %v: %v", node.Name, err) - } + retryDeadline := time.Now().Add(maxRetryDeadline) + for { + // Get the newest version of the node. + freshNode, err := client.CoreV1().Nodes().Get(node.Name, metav1.GetOptions{}) + if err != nil || freshNode == nil { + return fmt.Errorf("failed to get node %v: %v", node.Name, err) + } - added, err := addToBeDeletedTaint(freshNode) - if added == false { - return err - } - _, err = client.CoreV1().Nodes().Update(freshNode) - if err != nil { - glog.Warningf("Error while adding taints on node %v: %v", node.Name, err) - return err + added, err := addToBeDeletedTaint(freshNode) + if added == false { + return err + } + _, err = client.CoreV1().Nodes().Update(freshNode) + if err != nil && errors.IsConflict(err) && time.Now().Before(retryDeadline) { + time.Sleep(conflictRetryInterval) + continue + } + + if err != nil { + glog.Warningf("Error while adding taints on node %v: %v", node.Name, err) + return err + } + glog.V(1).Infof("Successfully added toBeDeletedTaint on node %v", node.Name) + return nil } - glog.V(1).Infof("Successfully added toBeDeletedTaint on node %v", node.Name) - return nil } func addToBeDeletedTaint(node *apiv1.Node) (bool, error) { @@ -96,28 +108,37 @@ func GetToBeDeletedTime(node *apiv1.Node) (*time.Time, error) { // CleanToBeDeleted cleans ToBeDeleted taint. func CleanToBeDeleted(node *apiv1.Node, client kube_client.Interface) (bool, error) { - freshNode, err := client.CoreV1().Nodes().Get(node.Name, metav1.GetOptions{}) - if err != nil || freshNode == nil { - return false, fmt.Errorf("failed to get node %v: %v", node.Name, err) - } - newTaints := make([]apiv1.Taint, 0) - for _, taint := range freshNode.Spec.Taints { - if taint.Key == ToBeDeletedTaint { - glog.V(1).Infof("Releasing taint %+v on node %v", taint, node.Name) - } else { - newTaints = append(newTaints, taint) + retryDeadline := time.Now().Add(maxRetryDeadline) + for { + freshNode, err := client.CoreV1().Nodes().Get(node.Name, metav1.GetOptions{}) + if err != nil || freshNode == nil { + return false, fmt.Errorf("failed to get node %v: %v", node.Name, err) + } + newTaints := make([]apiv1.Taint, 0) + for _, taint := range freshNode.Spec.Taints { + if taint.Key == ToBeDeletedTaint { + glog.V(1).Infof("Releasing taint %+v on node %v", taint, node.Name) + } else { + newTaints = append(newTaints, taint) + } } - } - if len(newTaints) != len(freshNode.Spec.Taints) { - freshNode.Spec.Taints = newTaints - _, err := client.CoreV1().Nodes().Update(freshNode) - if err != nil { - glog.Warningf("Error while releasing taints on node %v: %v", node.Name, err) - return false, err + if len(newTaints) != len(freshNode.Spec.Taints) { + freshNode.Spec.Taints = newTaints + _, err := client.CoreV1().Nodes().Update(freshNode) + + if err != nil && errors.IsConflict(err) && time.Now().Before(retryDeadline) { + time.Sleep(conflictRetryInterval) + continue + } + + if err != nil { + glog.Warningf("Error while releasing taints on node %v: %v", node.Name, err) + return false, err + } + glog.V(1).Infof("Successfully released toBeDeletedTaint on node %v", node.Name) + return true, nil } - glog.V(1).Infof("Successfully released toBeDeletedTaint on node %v", node.Name) - return true, nil + return false, nil } - return false, nil } diff --git a/cluster-autoscaler/utils/deletetaint/delete_test.go b/cluster-autoscaler/utils/deletetaint/delete_test.go index 1ad25bb8f3ba..8763d71de9bc 100644 --- a/cluster-autoscaler/utils/deletetaint/delete_test.go +++ b/cluster-autoscaler/utils/deletetaint/delete_test.go @@ -17,6 +17,8 @@ limitations under the License. package deletetaint import ( + "fmt" + "sync/atomic" "testing" "time" @@ -24,6 +26,7 @@ import ( apiv1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/fake" core "k8s.io/client-go/testing" @@ -33,22 +36,26 @@ import ( func TestMarkNodes(t *testing.T) { node := BuildTestNode("node", 1000, 1000) - fakeClient, updatedNodes := buildFakeClientAndUpdateChannel(node) + fakeClient := buildFakeClient(t, node) err := MarkToBeDeleted(node, fakeClient) assert.NoError(t, err) - assert.Equal(t, node.Name, getStringFromChan(updatedNodes)) - assert.True(t, HasToBeDeletedTaint(node)) + + updatedNode, err := fakeClient.Core().Nodes().Get("node", metav1.GetOptions{}) + assert.NoError(t, err) + assert.True(t, HasToBeDeletedTaint(updatedNode)) } func TestCheckNodes(t *testing.T) { node := BuildTestNode("node", 1000, 1000) - fakeClient, updatedNodes := buildFakeClientAndUpdateChannel(node) + fakeClient := buildFakeClient(t, node) err := MarkToBeDeleted(node, fakeClient) assert.NoError(t, err) - assert.Equal(t, node.Name, getStringFromChan(updatedNodes)) - assert.True(t, HasToBeDeletedTaint(node)) - val, err := GetToBeDeletedTime(node) + updatedNode, err := fakeClient.Core().Nodes().Get("node", metav1.GetOptions{}) + assert.NoError(t, err) + assert.True(t, HasToBeDeletedTaint(updatedNode)) + + val, err := GetToBeDeletedTime(updatedNode) assert.NoError(t, err) assert.True(t, time.Now().Sub(*val) < 10*time.Second) } @@ -56,39 +63,39 @@ func TestCheckNodes(t *testing.T) { func TestCleanNodes(t *testing.T) { node := BuildTestNode("node", 1000, 1000) addToBeDeletedTaint(node) - fakeClient, updatedNodes := buildFakeClientAndUpdateChannel(node) + fakeClient := buildFakeClient(t, node) cleaned, err := CleanToBeDeleted(node, fakeClient) assert.True(t, cleaned) assert.NoError(t, err) - assert.Equal(t, node.Name, getStringFromChan(updatedNodes)) - assert.False(t, HasToBeDeletedTaint(node)) + + updatedNode, err := fakeClient.Core().Nodes().Get("node", metav1.GetOptions{}) + assert.NoError(t, err) + assert.False(t, HasToBeDeletedTaint(updatedNode)) } -func buildFakeClientAndUpdateChannel(node *apiv1.Node) (*fake.Clientset, chan string) { - fakeClient := &fake.Clientset{} - updatedNodes := make(chan string, 10) - fakeClient.Fake.AddReactor("get", "nodes", func(action core.Action) (bool, runtime.Object, error) { - get := action.(core.GetAction) - if get.GetName() == node.Name { - return true, node, nil - } - return true, nil, errors.NewNotFound(apiv1.Resource("node"), get.GetName()) - }) - fakeClient.Fake.AddReactor("update", "nodes", func(action core.Action) (bool, runtime.Object, error) { +func buildFakeClient(t *testing.T, node *apiv1.Node) *fake.Clientset { + fakeClient := fake.NewSimpleClientset() + + _, err := fakeClient.CoreV1().Nodes().Create(node) + assert.NoError(t, err) + + // return a 'Conflict' error on the first upadte, then pass it through, then return a Conflict again + var returnedConflict int32 + fakeClient.Fake.PrependReactor("update", "nodes", func(action core.Action) (bool, runtime.Object, error) { update := action.(core.UpdateAction) obj := update.GetObject().(*apiv1.Node) - updatedNodes <- obj.Name - return true, obj, nil + + if atomic.LoadInt32(&returnedConflict) == 0 { + // allow the next update + atomic.StoreInt32(&returnedConflict, 1) + return true, nil, errors.NewConflict(apiv1.Resource("node"), obj.GetName(), fmt.Errorf("concurrent update on %s", obj.GetName())) + } + + // return a conflict on next update + atomic.StoreInt32(&returnedConflict, 0) + return false, nil, nil }) - return fakeClient, updatedNodes -} -func getStringFromChan(c chan string) string { - select { - case val := <-c: - return val - case <-time.After(time.Second * 10): - return "Nothing returned" - } + return fakeClient }