From 7068bc48f6791aee20deb22d6d60085ef170d452 Mon Sep 17 00:00:00 2001
From: Yaroslava Serdiuk
Date: Mon, 21 Dec 2020 10:10:34 +0000
Subject: [PATCH] add DaemonSet eviction option for empty nodes

---
 .../config/autoscaling_options.go          |   2 +
 cluster-autoscaler/core/scale_down.go      |  54 +++++++-
 cluster-autoscaler/core/scale_down_test.go | 115 ++++++++++++++++++
 cluster-autoscaler/main.go                 |   2 +
 4 files changed, 170 insertions(+), 3 deletions(-)

diff --git a/cluster-autoscaler/config/autoscaling_options.go b/cluster-autoscaler/config/autoscaling_options.go
index de038ad6c7d5..f21a92c75ed2 100644
--- a/cluster-autoscaler/config/autoscaling_options.go
+++ b/cluster-autoscaler/config/autoscaling_options.go
@@ -149,4 +149,6 @@ type AutoscalingOptions struct {
 	ClusterAPICloudConfigAuthoritative bool
 	// Enable or disable cordon nodes functionality before terminating the node during downscale process
 	CordonNodeBeforeTerminate bool
+	// DaemonSetEvictionForEmptyNodes controls whether CA will gracefully terminate DaemonSet pods from empty nodes.
+	DaemonSetEvictionForEmptyNodes bool
 }
diff --git a/cluster-autoscaler/core/scale_down.go b/cluster-autoscaler/core/scale_down.go
index 304975831366..48c6e3a65ee2 100644
--- a/cluster-autoscaler/core/scale_down.go
+++ b/cluster-autoscaler/core/scale_down.go
@@ -71,6 +71,10 @@ const (
 	// PodEvictionHeadroom is the extra time we wait to catch situations when the pod is ignoring SIGTERM and
 	// is killed with SIGKILL after MaxGracefulTerminationTime
 	PodEvictionHeadroom = 30 * time.Second
+	// DaemonSetEvictionEmptyNodeTimeout is the time allowed to evict all DaemonSet pods on an empty node
+	DaemonSetEvictionEmptyNodeTimeout = 10 * time.Second
+	// DaemonSetTimeBetweenEvictionRetries is the time to wait between retries when creating DaemonSet pod evictions on empty nodes
+	DaemonSetTimeBetweenEvictionRetries = 3 * time.Second
 )
 
 // NodeDeletionTracker keeps track of node deletions.
@@ -1065,7 +1069,7 @@ func (sd *ScaleDown) scheduleDeleteEmptyNodes(emptyNodes []*apiv1.Node, client k
 			return deletedNodes, errors.ToAutoscalerError(errors.ApiCallError, taintErr)
 		}
 		deletedNodes = append(deletedNodes, node)
-		go func(nodeToDelete *apiv1.Node, nodeGroupForDeletedNode cloudprovider.NodeGroup) {
+		go func(nodeToDelete *apiv1.Node, nodeGroupForDeletedNode cloudprovider.NodeGroup, evictDaemonSetPodsBool bool) {
 			sd.nodeDeletionTracker.StartDeletion(nodeGroupForDeletedNode.Id())
 			defer sd.nodeDeletionTracker.EndDeletion(nodeGroupForDeletedNode.Id())
 			var result status.NodeDeleteResult
@@ -1081,7 +1085,11 @@
 					sd.context.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaleDownEmpty", "Scale-down: empty node %s removed", nodeToDelete.Name)
 				}
 			}()
-
+			if evictDaemonSetPodsBool {
+				if err := evictDaemonSetPods(sd.context.ClusterSnapshot, nodeToDelete, client, sd.context.MaxGracefulTerminationSec, time.Now(), DaemonSetEvictionEmptyNodeTimeout, DaemonSetTimeBetweenEvictionRetries, recorder); err != nil {
+					klog.Warningf("error while evicting DaemonSet pods: %v", err)
+				}
+			}
 			deleteErr = waitForDelayDeletion(nodeToDelete, sd.context.ListerRegistry.AllNodeLister(), sd.context.AutoscalingOptions.NodeDeletionDelayTimeout)
 			if deleteErr != nil {
 				klog.Errorf("Problem with empty node deletion: %v", deleteErr)
@@ -1101,11 +1109,51 @@ func (sd *ScaleDown) scheduleDeleteEmptyNodes(emptyNodes []*apiv1.Node, client k
 				metrics.RegisterScaleDown(1, gpu.GetGpuTypeForMetrics(sd.context.CloudProvider.GPULabel(), sd.context.CloudProvider.GetAvailableGPUTypes(), nodeToDelete, nodeGroupForDeletedNode), metrics.Unready)
 			}
 			result = status.NodeDeleteResult{ResultType: status.NodeDeleteOk}
-		}(node, nodeGroup)
+		}(node, nodeGroup, sd.context.DaemonSetEvictionForEmptyNodes)
 	}
 	return deletedNodes, nil
 }
 
+// evictDaemonSetPods creates eviction objects for all DaemonSet pods on the given node.
+func evictDaemonSetPods(clusterSnapshot simulator.ClusterSnapshot, nodeToDelete *apiv1.Node, client kube_client.Interface, maxGracefulTerminationSec int, timeNow time.Time, dsEvictionTimeout time.Duration, waitBetweenRetries time.Duration,
+	recorder kube_record.EventRecorder) error {
+	nodeInfo, err := clusterSnapshot.NodeInfos().Get(nodeToDelete.Name)
+	if err != nil {
+		return fmt.Errorf("failed to get node info for %s", nodeToDelete.Name)
+	}
+	_, daemonSetPods, _, err := simulator.FastGetPodsToMove(nodeInfo, true, true, []*policyv1.PodDisruptionBudget{}, time.Now())
+	if err != nil {
+		return fmt.Errorf("failed to get DaemonSet pods for %s (error: %v)", nodeToDelete.Name, err)
+	}
+
+	dsEviction := make(chan status.PodEvictionResult, len(daemonSetPods))
+
+	// Perform eviction of DaemonSet pods
+	for _, daemonSetPod := range daemonSetPods {
+		go func(podToEvict *apiv1.Pod) {
+			dsEviction <- evictPod(podToEvict, true, client, recorder, maxGracefulTerminationSec, timeNow.Add(dsEvictionTimeout), waitBetweenRetries)
+		}(daemonSetPod)
+	}
+	// Wait for an eviction to be created for every DaemonSet pod
+	var failedPodErrors []string
+	for range daemonSetPods {
+		select {
+		case status := <-dsEviction:
+			if status.Err != nil {
+				failedPodErrors = append(failedPodErrors, status.Err.Error())
+			}
+		// add waitBetweenRetries so this timeout is slightly longer than the one used inside evictPod()
+		case <-time.After(dsEvictionTimeout + waitBetweenRetries):
+			return fmt.Errorf("failed to create DaemonSet eviction for %v on node %s", dsEvictionTimeout, nodeToDelete.Name)
+		}
+	}
+	if len(failedPodErrors) > 0 {
+		return fmt.Errorf("following DaemonSet pods failed to evict on node %s:\n%s", nodeToDelete.Name, strings.Join(failedPodErrors, "\n"))
+	}
+	return nil
+}
+
 func (sd *ScaleDown) deleteNode(node *apiv1.Node, pods []*apiv1.Pod, daemonSetPods []*apiv1.Pod, nodeGroup cloudprovider.NodeGroup) status.NodeDeleteResult {
 	deleteSuccessful := false
diff --git a/cluster-autoscaler/core/scale_down_test.go b/cluster-autoscaler/core/scale_down_test.go
index 38d0c9122372..bf0d32238718 100644
--- a/cluster-autoscaler/core/scale_down_test.go
+++ b/cluster-autoscaler/core/scale_down_test.go
@@ -1080,6 +1080,121 @@ var defaultScaleDownOptions = config.AutoscalingOptions{
 	MaxMemoryTotal: config.DefaultMaxClusterMemory * units.GiB,
 }
 
+func TestDaemonSetEvictionForEmptyNodes(t *testing.T) {
+	timeNow := time.Now()
+	testScenarios := []struct {
+		name                  string
+		dsPods                []string
+		nodeInfoSuccess       bool
+		evictionTimeoutExceed bool
+		evictionSuccess       bool
+		err                   error
+	}{
+		{
+			name:                  "Successful attempt to evict DaemonSet pods",
+			dsPods:                []string{"d1", "d2"},
+			nodeInfoSuccess:       true,
+			evictionTimeoutExceed: false,
+			evictionSuccess:       true,
+			err:                   nil,
+		},
+		{
+			name:                  "Failed to get node info",
+			dsPods:                []string{"d1", "d2"},
+			nodeInfoSuccess:       false,
+			evictionTimeoutExceed: false,
+			evictionSuccess:       true,
+			err:                   fmt.Errorf("failed to get node info"),
+		},
+		{
+			name:                  "Failed to create DaemonSet eviction",
+			dsPods:                []string{"d1", "d2"},
+			nodeInfoSuccess:       true,
+			evictionTimeoutExceed: false,
+			evictionSuccess:       false,
+			err:                   fmt.Errorf("following DaemonSet pods failed to evict on node"),
+		},
+		{
+			name:                  "Eviction timeout exceeded",
+			dsPods:                []string{"d1", "d2", "d3"},
+			nodeInfoSuccess:       true,
+			evictionTimeoutExceed: true,
+			evictionSuccess:       true,
+			err:                   fmt.Errorf("failed to create DaemonSet eviction for"),
+		},
+	}
+
+	for _, scenario := range testScenarios {
+		t.Run(scenario.name, func(t *testing.T) {
+			options := config.AutoscalingOptions{
+				ScaleDownUtilizationThreshold:  0.5,
+				ScaleDownUnneededTime:          time.Minute,
+				MaxGracefulTerminationSec:      1,
+				DaemonSetEvictionForEmptyNodes: true,
+			}
+			deletedPods := make(chan string, len(scenario.dsPods)+2)
+			dsEvictionTimeout := 100 * time.Millisecond
+			waitBetweenRetries := 10 * time.Millisecond
+
+			fakeClient := &fake.Clientset{}
+			n1 := BuildTestNode("n1", 1000, 1000)
+			SetNodeReadyState(n1, true, time.Time{})
+			dsPods := make([]*apiv1.Pod, len(scenario.dsPods))
+			for i, dsName := range scenario.dsPods {
+				ds := BuildTestPod(dsName, 100, 0)
+				ds.Spec.NodeName = "n1"
+				ds.OwnerReferences = GenerateOwnerReferences("", "DaemonSet", "", "")
+				dsPods[i] = ds
+			}
+
+			fakeClient.Fake.AddReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) {
+				createAction := action.(core.CreateAction)
+				if createAction == nil {
+					return false, nil, nil
+				}
+				eviction := createAction.GetObject().(*policyv1.Eviction)
+				if eviction == nil {
+					return false, nil, nil
+				}
+				if scenario.evictionTimeoutExceed {
+					// Simulate a slow API server so that the eviction timeout is exceeded
+					time.Sleep(10 * dsEvictionTimeout)
+				}
+				if !scenario.evictionSuccess {
+					return true, nil, fmt.Errorf("fail to evict the pod")
+				}
+				deletedPods <- eviction.Name
+				return true, nil, nil
+			})
+			provider := testprovider.NewTestCloudProvider(nil, nil)
+			provider.AddNodeGroup("ng1", 1, 10, 1)
+			provider.AddNode("ng1", n1)
+			registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
+
+			context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil)
+			assert.NoError(t, err)
+
+			if scenario.nodeInfoSuccess {
+				simulator.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, []*apiv1.Node{n1}, dsPods)
+			} else {
+				simulator.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, []*apiv1.Node{}, []*apiv1.Pod{})
+			}
+
+			err = evictDaemonSetPods(context.ClusterSnapshot, n1, fakeClient, options.MaxGracefulTerminationSec, timeNow, dsEvictionTimeout, waitBetweenRetries, kube_util.CreateEventRecorder(fakeClient))
+			if scenario.err != nil {
+				assert.Contains(t, err.Error(), scenario.err.Error())
+				return
+			}
+			assert.Nil(t, err)
+			deleted := make([]string, len(scenario.dsPods))
+			for i := 0; i < len(scenario.dsPods); i++ {
+				deleted[i] = utils.GetStringFromChan(deletedPods)
+			}
+			assert.ElementsMatch(t, deleted, scenario.dsPods)
+		})
+	}
+}
+
 func TestScaleDownEmptyMultipleNodeGroups(t *testing.T) {
 	config := &scaleTestConfig{
 		nodes: []nodeConfig{
diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go
index 3e0e37bd5b1d..95eedb6d4600 100644
--- a/cluster-autoscaler/main.go
+++ b/cluster-autoscaler/main.go
@@ -176,6 +176,7 @@ var (
 	enableProfiling                    = flag.Bool("profiling", false, "Is debug/pprof endpoint enabled")
 	clusterAPICloudConfigAuthoritative = flag.Bool("clusterapi-cloud-config-authoritative", false, "Treat the cloud-config flag authoritatively (do not fallback to using kubeconfig flag). ClusterAPI only")
 	cordonNodeBeforeTerminate          = flag.Bool("cordon-node-before-terminating", false, "Should CA cordon nodes before terminating during downscale process")
+	daemonSetEvictionForEmptyNodes     = flag.Bool("daemonset-eviction-for-empty-nodes", false, "DaemonSet pods will be gracefully terminated from empty nodes")
 )
 
 func createAutoscalingOptions() config.AutoscalingOptions {
@@ -247,6 +248,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
 		ConcurrentGceRefreshes:             *concurrentGceRefreshes,
 		ClusterAPICloudConfigAuthoritative: *clusterAPICloudConfigAuthoritative,
 		CordonNodeBeforeTerminate:          *cordonNodeBeforeTerminate,
+		DaemonSetEvictionForEmptyNodes:     *daemonSetEvictionForEmptyNodes,
 	}
 }
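
Note (illustrative sketch, not part of the patch): the new evictDaemonSetPods helper fans out one goroutine per DaemonSet pod, collects the results on a buffered channel, and bounds the whole wait with a select on time.After; the feature is off by default and only applies when the new --daemonset-eviction-for-empty-nodes=true flag is set. The self-contained Go sketch below shows the same fan-out/collect-with-timeout pattern under simplified assumptions: evictOne, evictAll, and the pod names are hypothetical stand-ins for the real evictPod() call and cluster-autoscaler types, not code from this patch.

package main

import (
	"fmt"
	"strings"
	"time"
)

type evictionResult struct {
	pod string
	err error
}

// evictOne stands in for evictPod(): the real call retries the eviction API
// until it succeeds or its deadline passes. Here it just pretends to work.
func evictOne(pod string) evictionResult {
	time.Sleep(10 * time.Millisecond) // simulate a round trip to the API server
	return evictionResult{pod: pod}
}

// evictAll mirrors the structure of evictDaemonSetPods: one goroutine per pod,
// a buffered result channel so senders never block, and a per-result select
// that gives up once the timeout elapses.
func evictAll(pods []string, timeout time.Duration) error {
	results := make(chan evictionResult, len(pods))
	for _, p := range pods {
		go func(pod string) { results <- evictOne(pod) }(p)
	}

	var failed []string
	for range pods {
		select {
		case r := <-results:
			if r.err != nil {
				failed = append(failed, fmt.Sprintf("%s: %v", r.pod, r.err))
			}
		case <-time.After(timeout):
			return fmt.Errorf("timed out after %v waiting for DaemonSet evictions", timeout)
		}
	}
	if len(failed) > 0 {
		return fmt.Errorf("some DaemonSet pods failed to evict:\n%s", strings.Join(failed, "\n"))
	}
	return nil
}

func main() {
	fmt.Println(evictAll([]string{"d1", "d2", "d3"}, time.Second)) // prints <nil> on success
}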