From 06766d1499e1c3c4b5c1054ec1213a783d8029b1 Mon Sep 17 00:00:00 2001 From: Maksym Fuhol Date: Tue, 27 Feb 2024 12:51:37 +0000 Subject: [PATCH] Refactor StartDeletion usage patterns and enforce periodic scaledown status processor calls. --- .../currently_drained_nodes_test.go | 8 +++-- .../core/scaledown/actuation/actuator.go | 27 ++++++++-------- .../core/scaledown/actuation/actuator_test.go | 32 +++++++++---------- .../core/scaledown/legacy/legacy_test.go | 20 ++++++------ .../core/scaledown/legacy/wrapper.go | 19 +++++------ .../core/scaledown/scaledown.go | 4 ++- cluster-autoscaler/core/static_autoscaler.go | 29 ++++++++--------- 7 files changed, 68 insertions(+), 71 deletions(-) diff --git a/cluster-autoscaler/core/podlistprocessor/currently_drained_nodes_test.go b/cluster-autoscaler/core/podlistprocessor/currently_drained_nodes_test.go index 7b01fffd920b..678afbd05709 100644 --- a/cluster-autoscaler/core/podlistprocessor/currently_drained_nodes_test.go +++ b/cluster-autoscaler/core/podlistprocessor/currently_drained_nodes_test.go @@ -269,8 +269,8 @@ type mockActuator struct { status *mockActuationStatus } -func (m *mockActuator) StartDeletion(_, _ []*apiv1.Node) (*status.ScaleDownStatus, errors.AutoscalerError) { - return nil, nil +func (m *mockActuator) StartDeletion(_, _ []*apiv1.Node) (status.ScaleDownResult, []*status.ScaleDownNode, errors.AutoscalerError) { + return status.ScaleDownError, []*status.ScaleDownNode{}, nil } func (m *mockActuator) CheckStatus() scaledown.ActuationStatus { @@ -281,6 +281,10 @@ func (m *mockActuator) ClearResultsNotNewerThan(time.Time) { } +func (m *mockActuator) DeletionResults() (map[string]status.NodeDeleteResult, time.Time) { + return map[string]status.NodeDeleteResult{}, time.Now() +} + type mockActuationStatus struct { drainedNodes []string } diff --git a/cluster-autoscaler/core/scaledown/actuation/actuator.go b/cluster-autoscaler/core/scaledown/actuation/actuator.go index f5854385b0ef..af2fdbede203 100644 --- a/cluster-autoscaler/core/scaledown/actuation/actuator.go +++ b/cluster-autoscaler/core/scaledown/actuation/actuator.go @@ -96,47 +96,46 @@ func (a *Actuator) ClearResultsNotNewerThan(t time.Time) { a.nodeDeletionTracker.ClearResultsNotNewerThan(t) } +// DeletionResults returns deletion results in a map form, along with the timestamp of last result. +func (a *Actuator) DeletionResults() (map[string]status.NodeDeleteResult, time.Time) { + return a.nodeDeletionTracker.DeletionResults() +} + // StartDeletion triggers a new deletion process. -func (a *Actuator) StartDeletion(empty, drain []*apiv1.Node) (*status.ScaleDownStatus, errors.AutoscalerError) { +func (a *Actuator) StartDeletion(empty, drain []*apiv1.Node) (status.ScaleDownResult, []*status.ScaleDownNode, errors.AutoscalerError) { a.nodeDeletionScheduler.ResetAndReportMetrics() deletionStartTime := time.Now() defer func() { metrics.UpdateDuration(metrics.ScaleDownNodeDeletion, time.Since(deletionStartTime)) }() - results, ts := a.nodeDeletionTracker.DeletionResults() - scaleDownStatus := &status.ScaleDownStatus{NodeDeleteResults: results, NodeDeleteResultsAsOf: ts} - + scaledDownNodes := make([]*status.ScaleDownNode, 0) emptyToDelete, drainToDelete := a.budgetProcessor.CropNodes(a.nodeDeletionTracker, empty, drain) if len(emptyToDelete) == 0 && len(drainToDelete) == 0 { - scaleDownStatus.Result = status.ScaleDownNoNodeDeleted - return scaleDownStatus, nil + return status.ScaleDownNoNodeDeleted, scaledDownNodes, nil } if len(emptyToDelete) > 0 { // Taint all empty nodes synchronously if err := a.taintNodesSync(emptyToDelete); err != nil { - scaleDownStatus.Result = status.ScaleDownError - return scaleDownStatus, err + return status.ScaleDownError, scaledDownNodes, err } emptyScaledDown := a.deleteAsyncEmpty(emptyToDelete) - scaleDownStatus.ScaledDownNodes = append(scaleDownStatus.ScaledDownNodes, emptyScaledDown...) + scaledDownNodes = append(scaledDownNodes, emptyScaledDown...) } if len(drainToDelete) > 0 { // Taint all nodes that need drain synchronously, but don't start any drain/deletion yet. Otherwise, pods evicted from one to-be-deleted node // could get recreated on another. if err := a.taintNodesSync(drainToDelete); err != nil { - scaleDownStatus.Result = status.ScaleDownError - return scaleDownStatus, err + return status.ScaleDownError, scaledDownNodes, err } // All nodes involved in the scale-down should be tainted now - start draining and deleting nodes asynchronously. drainScaledDown := a.deleteAsyncDrain(drainToDelete) - scaleDownStatus.ScaledDownNodes = append(scaleDownStatus.ScaledDownNodes, drainScaledDown...) + scaledDownNodes = append(scaledDownNodes, drainScaledDown...) } - scaleDownStatus.Result = status.ScaleDownNodeDeleteStarted - return scaleDownStatus, nil + return status.ScaleDownNodeDeleteStarted, scaledDownNodes, nil } // deleteAsyncEmpty immediately starts deletions asynchronously. diff --git a/cluster-autoscaler/core/scaledown/actuation/actuator_test.go b/cluster-autoscaler/core/scaledown/actuation/actuator_test.go index 3f468ceb4a24..81a1abd2df1f 100644 --- a/cluster-autoscaler/core/scaledown/actuation/actuator_test.go +++ b/cluster-autoscaler/core/scaledown/actuation/actuator_test.go @@ -1174,9 +1174,7 @@ func TestStartDeletion(t *testing.T) { } } - wantScaleDownStatus := &status.ScaleDownStatus{ - Result: tc.wantStatus.result, - } + wantScaleDownNodes := []*status.ScaleDownNode{} for _, scaleDownNodeInfo := range tc.wantStatus.scaledDownNodes { statusScaledDownNode := &status.ScaleDownNode{ Node: generateNode(scaleDownNodeInfo.name), @@ -1184,7 +1182,7 @@ func TestStartDeletion(t *testing.T) { EvictedPods: scaleDownNodeInfo.evictedPods, UtilInfo: scaleDownNodeInfo.utilInfo, } - wantScaleDownStatus.ScaledDownNodes = append(wantScaleDownStatus.ScaledDownNodes, statusScaledDownNode) + wantScaleDownNodes = append(wantScaleDownNodes, statusScaledDownNode) } scaleStateNotifier := nodegroupchange.NewNodeGroupChangeObserversList() @@ -1201,18 +1199,22 @@ func TestStartDeletion(t *testing.T) { budgetProcessor: budgets.NewScaleDownBudgetProcessor(&ctx), configGetter: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(ctx.NodeGroupDefaults), } - gotStatus, gotErr := actuator.StartDeletion(allEmptyNodes, allDrainNodes) + gotResult, gotScaleDownNodes, gotErr := actuator.StartDeletion(allEmptyNodes, allDrainNodes) if diff := cmp.Diff(tc.wantErr, gotErr, cmpopts.EquateErrors()); diff != "" { t.Errorf("StartDeletion error diff (-want +got):\n%s", diff) } - // Verify ScaleDownStatus looks as expected. + // Verify ScaleDownResult looks as expected. + if diff := cmp.Diff(tc.wantStatus.result, gotResult); diff != "" { + t.Errorf("StartDeletion result diff (-want +got):\n%s", diff) + } + + // Verify ScaleDownNodes looks as expected. ignoreSdNodeOrder := cmpopts.SortSlices(func(a, b *status.ScaleDownNode) bool { return a.Node.Name < b.Node.Name }) - ignoreTimestamps := cmpopts.IgnoreFields(status.ScaleDownStatus{}, "NodeDeleteResultsAsOf") cmpNg := cmp.Comparer(func(a, b *testprovider.TestNodeGroup) bool { return a.Id() == b.Id() }) - statusCmpOpts := cmp.Options{ignoreSdNodeOrder, ignoreTimestamps, cmpNg, cmpopts.EquateEmpty()} - if diff := cmp.Diff(wantScaleDownStatus, gotStatus, statusCmpOpts); diff != "" { - t.Errorf("StartDeletion status diff (-want +got):\n%s", diff) + statusCmpOpts := cmp.Options{ignoreSdNodeOrder, cmpNg, cmpopts.EquateEmpty()} + if diff := cmp.Diff(wantScaleDownNodes, gotScaleDownNodes, statusCmpOpts); diff != "" { + t.Errorf("StartDeletion scaled down nodes diff (-want +got):\n%s", diff) } // Verify that all expected nodes were deleted using the cloud provider hook. @@ -1278,13 +1280,9 @@ func TestStartDeletion(t *testing.T) { t.Errorf("Timeout while waiting for node deletion results") } - // Run StartDeletion again to gather node deletion results for deletions started in the previous call, and verify - // that they look as expected. - gotNextStatus, gotNextErr := actuator.StartDeletion(nil, nil) - if gotNextErr != nil { - t.Errorf("StartDeletion unexpected error: %v", gotNextErr) - } - if diff := cmp.Diff(tc.wantNodeDeleteResults, gotNextStatus.NodeDeleteResults, cmpopts.EquateEmpty(), cmpopts.EquateErrors()); diff != "" { + // Gather node deletion results for deletions started in the previous call, and verify that they look as expected. + nodeDeleteResults, _ := actuator.DeletionResults() + if diff := cmp.Diff(tc.wantNodeDeleteResults, nodeDeleteResults, cmpopts.EquateEmpty(), cmpopts.EquateErrors()); diff != "" { t.Errorf("NodeDeleteResults diff (-want +got):\n%s", diff) } }) diff --git a/cluster-autoscaler/core/scaledown/legacy/legacy_test.go b/cluster-autoscaler/core/scaledown/legacy/legacy_test.go index 1ba61e250dfd..84eb653c234b 100644 --- a/cluster-autoscaler/core/scaledown/legacy/legacy_test.go +++ b/cluster-autoscaler/core/scaledown/legacy/legacy_test.go @@ -779,10 +779,10 @@ func TestScaleDown(t *testing.T) { autoscalererr = wrapper.UpdateClusterState(nodes, nodes, nil, time.Now().Add(-5*time.Minute)) assert.NoError(t, autoscalererr) empty, drain := wrapper.NodesToDelete(time.Now()) - scaleDownStatus, err := wrapper.StartDeletion(empty, drain) + scaleDownResult, _, err := wrapper.StartDeletion(empty, drain) waitForDeleteToFinish(t, wrapper) assert.NoError(t, err) - assert.Equal(t, status.ScaleDownNodeDeleteStarted, scaleDownStatus.Result) + assert.Equal(t, status.ScaleDownNodeDeleteStarted, scaleDownResult) assert.Equal(t, n1.Name, utils.GetStringFromChan(deletedNodes)) assert.Equal(t, n1.Name, utils.GetStringFromChan(updatedNodes)) } @@ -1036,7 +1036,7 @@ func simpleScaleDownEmpty(t *testing.T, config *ScaleTestConfig) { autoscalererr = wrapper.UpdateClusterState(nodes, nodes, nil, time.Now().Add(-5*time.Minute)) assert.NoError(t, autoscalererr) empty, drain := wrapper.NodesToDelete(time.Now()) - scaleDownStatus, err := wrapper.StartDeletion(empty, drain) + scaleDownResult, _, err := wrapper.StartDeletion(empty, drain) assert.NoError(t, err) var expectedScaleDownResult status.ScaleDownResult @@ -1045,7 +1045,7 @@ func simpleScaleDownEmpty(t *testing.T, config *ScaleTestConfig) { } else { expectedScaleDownResult = status.ScaleDownNoUnneeded } - assert.Equal(t, expectedScaleDownResult, scaleDownStatus.Result) + assert.Equal(t, expectedScaleDownResult, scaleDownResult) expectedScaleDownCount := config.ExpectedScaleDownCount if config.ExpectedScaleDownCount == 0 { @@ -1131,11 +1131,11 @@ func TestNoScaleDownUnready(t *testing.T) { autoscalererr = wrapper.UpdateClusterState(nodes, nodes, nil, time.Now().Add(-5*time.Minute)) assert.NoError(t, autoscalererr) empty, drain := wrapper.NodesToDelete(time.Now()) - scaleDownStatus, err := wrapper.StartDeletion(empty, drain) + scaleDownResult, _, err := wrapper.StartDeletion(empty, drain) waitForDeleteToFinish(t, wrapper) assert.NoError(t, err) - assert.Equal(t, status.ScaleDownNoUnneeded, scaleDownStatus.Result) + assert.Equal(t, status.ScaleDownNoUnneeded, scaleDownResult) deletedNodes := make(chan string, 10) @@ -1155,11 +1155,11 @@ func TestNoScaleDownUnready(t *testing.T) { autoscalererr = wrapper.UpdateClusterState(nodes, nodes, nil, time.Now().Add(-2*time.Hour)) assert.NoError(t, autoscalererr) empty, drain = wrapper.NodesToDelete(time.Now()) - scaleDownStatus, err = wrapper.StartDeletion(empty, drain) + scaleDownResult, _, err = wrapper.StartDeletion(empty, drain) waitForDeleteToFinish(t, wrapper) assert.NoError(t, err) - assert.Equal(t, status.ScaleDownNodeDeleteStarted, scaleDownStatus.Result) + assert.Equal(t, status.ScaleDownNodeDeleteStarted, scaleDownResult) assert.Equal(t, n1.Name, utils.GetStringFromChan(deletedNodes)) } @@ -1245,11 +1245,11 @@ func TestScaleDownNoMove(t *testing.T) { autoscalererr = wrapper.UpdateClusterState(nodes, nodes, nil, time.Now().Add(-5*time.Minute)) assert.NoError(t, autoscalererr) empty, drain := wrapper.NodesToDelete(time.Now()) - scaleDownStatus, err := wrapper.StartDeletion(empty, drain) + scaleDownResult, _, err := wrapper.StartDeletion(empty, drain) waitForDeleteToFinish(t, wrapper) assert.NoError(t, err) - assert.Equal(t, status.ScaleDownNoUnneeded, scaleDownStatus.Result) + assert.Equal(t, status.ScaleDownNoUnneeded, scaleDownResult) } func getCountOfChan(c chan string) int { diff --git a/cluster-autoscaler/core/scaledown/legacy/wrapper.go b/cluster-autoscaler/core/scaledown/legacy/wrapper.go index d9bbfd12dca8..83d75cba55bb 100644 --- a/cluster-autoscaler/core/scaledown/legacy/wrapper.go +++ b/cluster-autoscaler/core/scaledown/legacy/wrapper.go @@ -89,20 +89,12 @@ func (p *ScaleDownWrapper) NodesToDelete(currentTime time.Time) (empty, needDrai } // StartDeletion triggers an actual scale down logic. -func (p *ScaleDownWrapper) StartDeletion(empty, needDrain []*apiv1.Node) (*status.ScaleDownStatus, errors.AutoscalerError) { +func (p *ScaleDownWrapper) StartDeletion(empty, needDrain []*apiv1.Node) (status.ScaleDownResult, []*status.ScaleDownNode, errors.AutoscalerError) { // Done to preserve legacy behavior, see comment on NodesToDelete. if p.lastNodesToDeleteErr != nil || p.lastNodesToDeleteResult != status.ScaleDownNodeDeleteStarted { - // When there is no need for scale-down, p.lastNodesToDeleteResult is set to ScaleDownNoUnneeded. We have to still report node delete - // results in this case, otherwise they wouldn't get reported until the next call to actuator.StartDeletion (i.e. until the next scale-down - // attempt). - // Run actuator.StartDeletion with no nodes just to grab the delete results. - origStatus, _ := p.actuator.StartDeletion(nil, nil) - return &status.ScaleDownStatus{ - Result: p.lastNodesToDeleteResult, - NodeDeleteResults: origStatus.NodeDeleteResults, - NodeDeleteResultsAsOf: origStatus.NodeDeleteResultsAsOf, - }, p.lastNodesToDeleteErr + return p.lastNodesToDeleteResult, []*status.ScaleDownNode{}, p.lastNodesToDeleteErr } + return p.actuator.StartDeletion(empty, needDrain) } @@ -116,3 +108,8 @@ func (p *ScaleDownWrapper) CheckStatus() scaledown.ActuationStatus { func (p *ScaleDownWrapper) ClearResultsNotNewerThan(t time.Time) { p.actuator.ClearResultsNotNewerThan(t) } + +// DeletionResults returns deletion results in a map form, along with the timestamp of last result. +func (p *ScaleDownWrapper) DeletionResults() (map[string]status.NodeDeleteResult, time.Time) { + return p.actuator.DeletionResults() +} diff --git a/cluster-autoscaler/core/scaledown/scaledown.go b/cluster-autoscaler/core/scaledown/scaledown.go index 434ea9312993..125260a56afb 100644 --- a/cluster-autoscaler/core/scaledown/scaledown.go +++ b/cluster-autoscaler/core/scaledown/scaledown.go @@ -56,12 +56,14 @@ type Actuator interface { // function are not guaranteed to be deleted, it is possible for the // Actuator to ignore some of them e.g. if max configured level of // parallelism is reached. - StartDeletion(empty, needDrain []*apiv1.Node) (*status.ScaleDownStatus, errors.AutoscalerError) + StartDeletion(empty, needDrain []*apiv1.Node) (status.ScaleDownResult, []*status.ScaleDownNode, errors.AutoscalerError) // CheckStatus returns an immutable snapshot of ongoing deletions. CheckStatus() ActuationStatus // ClearResultsNotNewerThan removes information about deletions finished // before or exactly at the provided timestamp. ClearResultsNotNewerThan(time.Time) + // DeletionResults returns deletion results in a map form, along with the timestamp of last result. + DeletionResults() (map[string]status.NodeDeleteResult, time.Time) } // ActuationStatus is used for feeding Actuator status back into Planner diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go index 262723e1db64..d4568809810e 100644 --- a/cluster-autoscaler/core/static_autoscaler.go +++ b/cluster-autoscaler/core/static_autoscaler.go @@ -388,7 +388,6 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr scaleUpStatus := &status.ScaleUpStatus{Result: status.ScaleUpNotTried} scaleUpStatusProcessorAlreadyCalled := false scaleDownStatus := &scaledownstatus.ScaleDownStatus{Result: scaledownstatus.ScaleDownNotTried} - scaleDownStatusProcessorAlreadyCalled := false defer func() { // Update status information when the loop is done (regardless of reason) @@ -403,14 +402,16 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr if !scaleUpStatusProcessorAlreadyCalled && a.processors != nil && a.processors.ScaleUpStatusProcessor != nil { a.processors.ScaleUpStatusProcessor.Process(a.AutoscalingContext, scaleUpStatus) } - if !scaleDownStatusProcessorAlreadyCalled && a.processors != nil && a.processors.ScaleDownStatusProcessor != nil { + if a.processors != nil && a.processors.ScaleDownStatusProcessor != nil { scaleDownStatus.SetUnremovableNodesInfo(a.scaleDownPlanner.UnremovableNodes(), a.scaleDownPlanner.NodeUtilizationMap(), a.CloudProvider) a.processors.ScaleDownStatusProcessor.Process(a.AutoscalingContext, scaleDownStatus) } - err := a.processors.AutoscalingStatusProcessor.Process(a.AutoscalingContext, a.clusterStateRegistry, currentTime) - if err != nil { - klog.Errorf("AutoscalingStatusProcessor error: %v.", err) + if a.processors != nil && a.processors.AutoscalingStatusProcessor != nil { + err := a.processors.AutoscalingStatusProcessor.Process(a.AutoscalingContext, a.clusterStateRegistry, currentTime) + if err != nil { + klog.Errorf("AutoscalingStatusProcessor error: %v.", err) + } } }() @@ -635,19 +636,21 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr scaleDownStatus.RemovedNodeGroups = removedNodeGroups } + nodeDeletionResults, nodeDeletionResultsAsOf := a.scaleDownActuator.DeletionResults() + scaleDownStatus.NodeDeleteResults = nodeDeletionResults + scaleDownStatus.NodeDeleteResultsAsOf = nodeDeletionResultsAsOf + a.scaleDownActuator.ClearResultsNotNewerThan(scaleDownStatus.NodeDeleteResultsAsOf) if scaleDownInCooldown { scaleDownStatus.Result = scaledownstatus.ScaleDownInCooldown - if len(removedNodeGroups) > 0 { - a.processors.ScaleDownStatusProcessor.Process(autoscalingContext, scaleDownStatus) - } } else { klog.V(4).Infof("Starting scale down") scaleDownStart := time.Now() metrics.UpdateLastTime(metrics.ScaleDown, scaleDownStart) empty, needDrain := a.scaleDownPlanner.NodesToDelete(currentTime) - scaleDownStatus, typedErr := a.scaleDownActuator.StartDeletion(empty, needDrain) - a.scaleDownActuator.ClearResultsNotNewerThan(scaleDownStatus.NodeDeleteResultsAsOf) + scaleDownResult, scaledDownNodes, typedErr := a.scaleDownActuator.StartDeletion(empty, needDrain) + scaleDownStatus.Result = scaleDownResult + scaleDownStatus.ScaledDownNodes = scaledDownNodes metrics.UpdateDurationFromStart(metrics.ScaleDown, scaleDownStart) metrics.UpdateUnremovableNodesCount(countsByReason(a.scaleDownPlanner.UnremovableNodes())) @@ -673,12 +676,6 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr actuation.UpdateSoftDeletionTaints(a.AutoscalingContext, taintableNodes, untaintableNodes) } - if a.processors != nil && a.processors.ScaleDownStatusProcessor != nil { - scaleDownStatus.SetUnremovableNodesInfo(a.scaleDownPlanner.UnremovableNodes(), a.scaleDownPlanner.NodeUtilizationMap(), a.CloudProvider) - a.processors.ScaleDownStatusProcessor.Process(autoscalingContext, scaleDownStatus) - scaleDownStatusProcessorAlreadyCalled = true - } - if typedErr != nil { klog.Errorf("Failed to scale down: %v", typedErr) a.lastScaleDownFailTime = currentTime