
Commit

Merge pull request #3824 from yaroslava-serdiuk/DS_eviction_empty_nodes
add DaemonSet eviction option for empty nodes
k8s-ci-robot authored Jan 21, 2021
2 parents 0680f66 + 7068bc4 commit 2988218
Showing 4 changed files with 170 additions and 3 deletions.
2 changes: 2 additions & 0 deletions cluster-autoscaler/config/autoscaling_options.go
@@ -149,4 +149,6 @@ type AutoscalingOptions struct {
ClusterAPICloudConfigAuthoritative bool
// Enable or disable cordon nodes functionality before terminating the node during downscale process
CordonNodeBeforeTerminate bool
// DaemonSetEvictionForEmptyNodes controls whether CA gracefully terminates DaemonSet pods on empty nodes.
DaemonSetEvictionForEmptyNodes bool
}
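
For orientation (not part of this diff), the new field is a plain boolean knob on AutoscalingOptions; a minimal, hypothetical sketch of setting it programmatically, assuming the in-tree config import path:

package main

import (
	"fmt"

	"k8s.io/autoscaler/cluster-autoscaler/config" // assumed in-tree import path
)

func main() {
	// Illustrative only: enable graceful DaemonSet eviction on empty nodes
	// alongside the existing cordon-before-terminate behaviour.
	opts := config.AutoscalingOptions{
		CordonNodeBeforeTerminate:      true,
		DaemonSetEvictionForEmptyNodes: true, // field added by this PR
	}
	fmt.Printf("%+v\n", opts)
}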
54 changes: 51 additions & 3 deletions cluster-autoscaler/core/scale_down.go
@@ -71,6 +71,10 @@ const (
// PodEvictionHeadroom is the extra time we wait to catch situations when the pod is ignoring SIGTERM and
// is killed with SIGKILL after MaxGracefulTerminationTime
PodEvictionHeadroom = 30 * time.Second
// DaemonSetEvictionEmptyNodeTimeout is the time allowed to evict all DaemonSet pods on an empty node
DaemonSetEvictionEmptyNodeTimeout = 10 * time.Second
// DeamonSetTimeBetweenEvictionRetries is the time between retries when creating evictions of DaemonSet pods on empty nodes
DeamonSetTimeBetweenEvictionRetries = 3 * time.Second
)
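
evictPod itself is not part of this diff; as a rough illustration of how a retry interval and an overall deadline like the two constants above usually interact, here is a self-contained sketch (retryUntil and its signature are hypothetical, not cluster-autoscaler code):

package main

import (
	"fmt"
	"time"
)

// retryUntil retries op every retryAfter until it succeeds or the deadline passes.
func retryUntil(deadline time.Time, retryAfter time.Duration, op func() error) error {
	var lastErr error
	for time.Now().Before(deadline) {
		if lastErr = op(); lastErr == nil {
			return nil
		}
		time.Sleep(retryAfter)
	}
	return fmt.Errorf("deadline exceeded, last error: %v", lastErr)
}

func main() {
	// Deadline and retry interval mirror the constants above:
	// DaemonSetEvictionEmptyNodeTimeout and DeamonSetTimeBetweenEvictionRetries.
	deadline := time.Now().Add(10 * time.Second)
	err := retryUntil(deadline, 3*time.Second, func() error {
		return nil // placeholder for a single eviction API call
	})
	fmt.Println(err)
}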

// NodeDeletionTracker keeps track of node deletions.
@@ -1065,7 +1069,7 @@ func (sd *ScaleDown) scheduleDeleteEmptyNodes(emptyNodes []*apiv1.Node, client k
return deletedNodes, errors.ToAutoscalerError(errors.ApiCallError, taintErr)
}
deletedNodes = append(deletedNodes, node)
go func(nodeToDelete *apiv1.Node, nodeGroupForDeletedNode cloudprovider.NodeGroup) {
go func(nodeToDelete *apiv1.Node, nodeGroupForDeletedNode cloudprovider.NodeGroup, evictDaemonSetPodsBool bool) {
sd.nodeDeletionTracker.StartDeletion(nodeGroupForDeletedNode.Id())
defer sd.nodeDeletionTracker.EndDeletion(nodeGroupForDeletedNode.Id())
var result status.NodeDeleteResult
@@ -1081,7 +1085,11 @@ func (sd *ScaleDown) scheduleDeleteEmptyNodes(emptyNodes []*apiv1.Node, client k
sd.context.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaleDownEmpty", "Scale-down: empty node %s removed", nodeToDelete.Name)
}
}()

if evictDaemonSetPodsBool {
if err := evictDaemonSetPods(sd.context.ClusterSnapshot, nodeToDelete, client, sd.context.MaxGracefulTerminationSec, time.Now(), DaemonSetEvictionEmptyNodeTimeout, DeamonSetTimeBetweenEvictionRetries, recorder); err != nil {
klog.Warningf(err.Error())
}
}
deleteErr = waitForDelayDeletion(nodeToDelete, sd.context.ListerRegistry.AllNodeLister(), sd.context.AutoscalingOptions.NodeDeletionDelayTimeout)
if deleteErr != nil {
klog.Errorf("Problem with empty node deletion: %v", deleteErr)
@@ -1101,11 +1109,51 @@ func (sd *ScaleDown) scheduleDeleteEmptyNodes(emptyNodes []*apiv1.Node, client k
metrics.RegisterScaleDown(1, gpu.GetGpuTypeForMetrics(sd.context.CloudProvider.GPULabel(), sd.context.CloudProvider.GetAvailableGPUTypes(), nodeToDelete, nodeGroupForDeletedNode), metrics.Unready)
}
result = status.NodeDeleteResult{ResultType: status.NodeDeleteOk}
}(node, nodeGroup)
}(node, nodeGroup, sd.context.DaemonSetEvictionForEmptyNodes)
}
return deletedNodes, nil
}

// evictDaemonSetPods creates eviction objects for all DaemonSet pods on the given node
func evictDaemonSetPods(clusterSnapshot simulator.ClusterSnapshot, nodeToDelete *apiv1.Node, client kube_client.Interface, maxGracefulTerminationSec int, timeNow time.Time, dsEvictionTimeout time.Duration, waitBetweenRetries time.Duration,
recorder kube_record.EventRecorder) error {
nodeInfo, err := clusterSnapshot.NodeInfos().Get(nodeToDelete.Name)
if err != nil {
return fmt.Errorf("failed to get node info for %s", nodeToDelete.Name)
}
_, daemonSetPods, _, err := simulator.FastGetPodsToMove(nodeInfo, true, true, []*policyv1.PodDisruptionBudget{}, time.Now())
if err != nil {
return fmt.Errorf("failed to get DaemonSet pods for %s (error: %v)", nodeToDelete.Name, err)
}

dsEviction := make(chan status.PodEvictionResult, len(daemonSetPods))

// Perform eviction of DaemonSet pods
for _, daemonSetPod := range daemonSetPods {
go func(podToEvict *apiv1.Pod) {
dsEviction <- evictPod(podToEvict, true, client, recorder, maxGracefulTerminationSec, timeNow.Add(dsEvictionTimeout), waitBetweenRetries)
}(daemonSetPod)
}
// Wait for the DaemonSet pod evictions to be created
var failedPodErrors []string
for range daemonSetPods {
select {
case status := <-dsEviction:
if status.Err != nil {
failedPodErrors = append(failedPodErrors, status.Err.Error())
}
// adding waitBetweenRetries in order to have a bigger time interval than evictPod()
case <-time.After(dsEvictionTimeout):
return fmt.Errorf("failed to create DaemonSet eviction for %v seconds on the %s", dsEvictionTimeout, nodeToDelete.Name)
}
}
if len(failedPodErrors) > 0 {
return fmt.Errorf("following DaemonSet pod failed to evict on the %s:\n%s", nodeToDelete.Name, strings.Join(failedPodErrors, "\n"))
}
return nil
}
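
The helper above follows a standard Go fan-out/fan-in shape: one goroutine per DaemonSet pod writes its result into a buffered channel, and the collector loop waits for each result with a timeout. A stripped-down, self-contained sketch of just that pattern, with illustrative names and durations:

package main

import (
	"errors"
	"fmt"
	"time"
)

func main() {
	pods := []string{"d1", "d2", "d3"}
	results := make(chan error, len(pods)) // buffered so workers never block on send

	// Fan out: one goroutine per pod, standing in for evictPod.
	for _, pod := range pods {
		go func(name string) {
			time.Sleep(10 * time.Millisecond) // pretend the eviction call takes a moment
			if name == "d3" {
				results <- errors.New(name + ": eviction refused")
				return
			}
			results <- nil
		}(pod)
	}

	// Fan in: collect one result per pod, giving up if a single wait exceeds the timeout.
	var failed []string
	for range pods {
		select {
		case err := <-results:
			if err != nil {
				failed = append(failed, err.Error())
			}
		case <-time.After(100 * time.Millisecond):
			fmt.Println("timed out waiting for evictions")
			return
		}
	}
	fmt.Println("failed evictions:", failed)
}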

func (sd *ScaleDown) deleteNode(node *apiv1.Node, pods []*apiv1.Pod, daemonSetPods []*apiv1.Pod,
nodeGroup cloudprovider.NodeGroup) status.NodeDeleteResult {
deleteSuccessful := false
115 changes: 115 additions & 0 deletions cluster-autoscaler/core/scale_down_test.go
@@ -1080,6 +1080,121 @@ var defaultScaleDownOptions = config.AutoscalingOptions{
MaxMemoryTotal: config.DefaultMaxClusterMemory * units.GiB,
}

func TestDaemonSetEvictionForEmptyNodes(t *testing.T) {
timeNow := time.Now()
testScenarios := []struct {
name string
dsPods []string
nodeInfoSuccess bool
evictionTimeoutExceed bool
evictionSuccess bool
err error
}{
{
name: "Successful attempt to evict DaemonSet pods",
dsPods: []string{"d1", "d2"},
nodeInfoSuccess: true,
evictionTimeoutExceed: false,
evictionSuccess: true,
err: nil,
},
{
name: "Failed to get node info",
dsPods: []string{"d1", "d2"},
nodeInfoSuccess: false,
evictionTimeoutExceed: false,
evictionSuccess: true,
err: fmt.Errorf("failed to get node info"),
},
{
name: "Failed to create DaemonSet eviction",
dsPods: []string{"d1", "d2"},
nodeInfoSuccess: true,
evictionTimeoutExceed: false,
evictionSuccess: false,
err: fmt.Errorf("following DaemonSet pod failed to evict on the"),
},
{
name: "Eviction timeout exceeded",
dsPods: []string{"d1", "d2", "d3"},
nodeInfoSuccess: true,
evictionTimeoutExceed: true,
evictionSuccess: true,
err: fmt.Errorf("failed to create DaemonSet eviction for"),
},
}

for _, scenario := range testScenarios {
t.Run(scenario.name, func(t *testing.T) {
options := config.AutoscalingOptions{
ScaleDownUtilizationThreshold: 0.5,
ScaleDownUnneededTime: time.Minute,
MaxGracefulTerminationSec: 1,
DaemonSetEvictionForEmptyNodes: true,
}
deletedPods := make(chan string, len(scenario.dsPods)+2)
dsEvictionTimeout := 100 * time.Millisecond
waitBetweenRetries := 10 * time.Millisecond

fakeClient := &fake.Clientset{}
n1 := BuildTestNode("n1", 1000, 1000)
SetNodeReadyState(n1, true, time.Time{})
dsPods := make([]*apiv1.Pod, len(scenario.dsPods))
for i, dsName := range scenario.dsPods {
ds := BuildTestPod(dsName, 100, 0)
ds.Spec.NodeName = "n1"
ds.OwnerReferences = GenerateOwnerReferences("", "DaemonSet", "", "")
dsPods[i] = ds
}

fakeClient.Fake.AddReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) {
createAction := action.(core.CreateAction)
if createAction == nil {
return false, nil, nil
}
eviction := createAction.GetObject().(*policyv1.Eviction)
if eviction == nil {
return false, nil, nil
}
if scenario.evictionTimeoutExceed {
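// Block this reactor forever so the eviction result is never delivered and
// the dsEvictionTimeout path in evictDaemonSetPods is exercised.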
for {
}
}
if !scenario.evictionSuccess {
return true, nil, fmt.Errorf("fail to evict the pod")
}
deletedPods <- eviction.Name
return true, nil, nil
})
provider := testprovider.NewTestCloudProvider(nil, nil)
provider.AddNodeGroup("ng1", 1, 10, 1)
provider.AddNode("ng1", n1)
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)

context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil)
assert.NoError(t, err)

if scenario.nodeInfoSuccess {
simulator.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, []*apiv1.Node{n1}, dsPods)
} else {
simulator.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, []*apiv1.Node{}, []*apiv1.Pod{})
}

err = evictDaemonSetPods(context.ClusterSnapshot, n1, fakeClient, options.MaxGracefulTerminationSec, timeNow, dsEvictionTimeout, waitBetweenRetries, kube_util.CreateEventRecorder(fakeClient))
if scenario.err != nil {
assert.Contains(t, err.Error(), scenario.err.Error())
return
}
assert.Nil(t, err)
deleted := make([]string, len(scenario.dsPods))
for i := 0; i < len(scenario.dsPods); i++ {
deleted[i] = utils.GetStringFromChan(deletedPods)
}
assert.ElementsMatch(t, deleted, scenario.dsPods)
})
}
}

func TestScaleDownEmptyMultipleNodeGroups(t *testing.T) {
config := &scaleTestConfig{
nodes: []nodeConfig{
2 changes: 2 additions & 0 deletions cluster-autoscaler/main.go
@@ -176,6 +176,7 @@ var (
enableProfiling = flag.Bool("profiling", false, "Is debug/pprof endpoint enabled")
clusterAPICloudConfigAuthoritative = flag.Bool("clusterapi-cloud-config-authoritative", false, "Treat the cloud-config flag authoritatively (do not fallback to using kubeconfig flag). ClusterAPI only")
cordonNodeBeforeTerminate = flag.Bool("cordon-node-before-terminating", false, "Should CA cordon nodes before terminating during downscale process")
daemonSetEvictionForEmptyNodes = flag.Bool("daemonset-eviction-for-empty-nodes", false, "DaemonSet pods will be gracefully terminated from empty nodes")
)
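
For completeness, a hypothetical command-line invocation showing the new flag next to the related cordon flag (binary name and any other flags are assumed, not taken from this diff):

cluster-autoscaler \
  --cordon-node-before-terminating=true \
  --daemonset-eviction-for-empty-nodes=true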

func createAutoscalingOptions() config.AutoscalingOptions {
@@ -247,6 +248,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
ConcurrentGceRefreshes: *concurrentGceRefreshes,
ClusterAPICloudConfigAuthoritative: *clusterAPICloudConfigAuthoritative,
CordonNodeBeforeTerminate: *cordonNodeBeforeTerminate,
DaemonSetEvictionForEmptyNodes: *daemonSetEvictionForEmptyNodes,
}
}


