Skip to content

Commit

Permalink
Refactor StartDeletion usage patterns and enforce periodic scaledown …
Browse files Browse the repository at this point in the history
…status processor calls.
  • Loading branch information
mtrqq committed Feb 27, 2024
1 parent 5286b3f commit 67bd5e2
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,8 @@ type mockActuator struct {
status *mockActuationStatus
}

func (m *mockActuator) StartDeletion(_, _ []*apiv1.Node) (*status.ScaleDownStatus, errors.AutoscalerError) {
return nil, nil
func (m *mockActuator) StartDeletion(_, _ []*apiv1.Node) (status.ScaleDownResult, []*status.ScaleDownNode, errors.AutoscalerError) {
return status.ScaleDownError, []*status.ScaleDownNode{}, nil
}

func (m *mockActuator) CheckStatus() scaledown.ActuationStatus {
Expand All @@ -281,6 +281,10 @@ func (m *mockActuator) ClearResultsNotNewerThan(time.Time) {

}

func (m *mockActuator) ScaleDownStatus() *status.ScaleDownStatus {
return &status.ScaleDownStatus{}
}

type mockActuationStatus struct {
drainedNodes []string
}
Expand Down
37 changes: 21 additions & 16 deletions cluster-autoscaler/core/scaledown/actuation/actuator.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,47 +96,52 @@ func (a *Actuator) ClearResultsNotNewerThan(t time.Time) {
a.nodeDeletionTracker.ClearResultsNotNewerThan(t)
}

// ScaleDownStatus returns scale down status.
func (a *Actuator) ScaleDownStatus() *status.ScaleDownStatus {
results, ts := a.nodeDeletionTracker.DeletionResults()
scaleDownStatus := &status.ScaleDownStatus{NodeDeleteResults: results, NodeDeleteResultsAsOf: ts}

return scaleDownStatus
}

// StartDeletion triggers a new deletion process.
func (a *Actuator) StartDeletion(empty, drain []*apiv1.Node) (*status.ScaleDownStatus, errors.AutoscalerError) {
func (a *Actuator) StartDeletion(empty, drain []*apiv1.Node) (result status.ScaleDownResult, nodes []*status.ScaleDownNode, err errors.AutoscalerError) {
a.nodeDeletionScheduler.ReportMetrics()
deletionStartTime := time.Now()
defer func() { metrics.UpdateDuration(metrics.ScaleDownNodeDeletion, time.Since(deletionStartTime)) }()

results, ts := a.nodeDeletionTracker.DeletionResults()
scaleDownStatus := &status.ScaleDownStatus{NodeDeleteResults: results, NodeDeleteResultsAsOf: ts}

emptyToDelete, drainToDelete := a.budgetProcessor.CropNodes(a.nodeDeletionTracker, empty, drain)
if len(emptyToDelete) == 0 && len(drainToDelete) == 0 {
scaleDownStatus.Result = status.ScaleDownNoNodeDeleted
return scaleDownStatus, nil
result = status.ScaleDownNoNodeDeleted
return
}

if len(emptyToDelete) > 0 {
// Taint all empty nodes synchronously
if err := a.taintNodesSync(emptyToDelete); err != nil {
scaleDownStatus.Result = status.ScaleDownError
return scaleDownStatus, err
if err = a.taintNodesSync(emptyToDelete); err != nil {
result = status.ScaleDownError
return
}

emptyScaledDown := a.deleteAsyncEmpty(emptyToDelete)
scaleDownStatus.ScaledDownNodes = append(scaleDownStatus.ScaledDownNodes, emptyScaledDown...)
nodes = append(nodes, emptyScaledDown...)
}

if len(drainToDelete) > 0 {
// Taint all nodes that need drain synchronously, but don't start any drain/deletion yet. Otherwise, pods evicted from one to-be-deleted node
// could get recreated on another.
if err := a.taintNodesSync(drainToDelete); err != nil {
scaleDownStatus.Result = status.ScaleDownError
return scaleDownStatus, err
if err = a.taintNodesSync(drainToDelete); err != nil {
result = status.ScaleDownError
return
}

// All nodes involved in the scale-down should be tainted now - start draining and deleting nodes asynchronously.
drainScaledDown := a.deleteAsyncDrain(drainToDelete)
scaleDownStatus.ScaledDownNodes = append(scaleDownStatus.ScaledDownNodes, drainScaledDown...)
nodes = append(nodes, drainScaledDown...)
}

scaleDownStatus.Result = status.ScaleDownNodeDeleteStarted
return scaleDownStatus, nil
result = status.ScaleDownNodeDeleteStarted
return
}

// deleteAsyncEmpty immediately starts deletions asynchronously.
Expand Down
30 changes: 14 additions & 16 deletions cluster-autoscaler/core/scaledown/actuation/actuator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1174,17 +1174,15 @@ func TestStartDeletion(t *testing.T) {
}
}

wantScaleDownStatus := &status.ScaleDownStatus{
Result: tc.wantStatus.result,
}
wantScaleDownNodes := []*status.ScaleDownNode{}
for _, scaleDownNodeInfo := range tc.wantStatus.scaledDownNodes {
statusScaledDownNode := &status.ScaleDownNode{
Node: generateNode(scaleDownNodeInfo.name),
NodeGroup: tc.nodeGroups[scaleDownNodeInfo.nodeGroup],
EvictedPods: scaleDownNodeInfo.evictedPods,
UtilInfo: scaleDownNodeInfo.utilInfo,
}
wantScaleDownStatus.ScaledDownNodes = append(wantScaleDownStatus.ScaledDownNodes, statusScaledDownNode)
wantScaleDownNodes = append(wantScaleDownNodes, statusScaledDownNode)
}

scaleStateNotifier := nodegroupchange.NewNodeGroupChangeObserversList()
Expand All @@ -1201,18 +1199,22 @@ func TestStartDeletion(t *testing.T) {
budgetProcessor: budgets.NewScaleDownBudgetProcessor(&ctx),
configGetter: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(ctx.NodeGroupDefaults),
}
gotStatus, gotErr := actuator.StartDeletion(allEmptyNodes, allDrainNodes)
gotResult, gotScaleDownNodes, gotErr := actuator.StartDeletion(allEmptyNodes, allDrainNodes)
if diff := cmp.Diff(tc.wantErr, gotErr, cmpopts.EquateErrors()); diff != "" {
t.Errorf("StartDeletion error diff (-want +got):\n%s", diff)
}

// Verify ScaleDownStatus looks as expected.
// Verify ScaleDownResult looks as expected.
if diff := cmp.Diff(tc.wantStatus.result, gotResult); diff != "" {
t.Errorf("StartDeletion result diff (-want +got):\n%s", diff)
}

// Verify ScaleDownNodes looks as expected.
ignoreSdNodeOrder := cmpopts.SortSlices(func(a, b *status.ScaleDownNode) bool { return a.Node.Name < b.Node.Name })
ignoreTimestamps := cmpopts.IgnoreFields(status.ScaleDownStatus{}, "NodeDeleteResultsAsOf")
cmpNg := cmp.Comparer(func(a, b *testprovider.TestNodeGroup) bool { return a.Id() == b.Id() })
statusCmpOpts := cmp.Options{ignoreSdNodeOrder, ignoreTimestamps, cmpNg, cmpopts.EquateEmpty()}
if diff := cmp.Diff(wantScaleDownStatus, gotStatus, statusCmpOpts); diff != "" {
t.Errorf("StartDeletion status diff (-want +got):\n%s", diff)
statusCmpOpts := cmp.Options{ignoreSdNodeOrder, cmpNg, cmpopts.EquateEmpty()}
if diff := cmp.Diff(wantScaleDownNodes, gotScaleDownNodes, statusCmpOpts); diff != "" {
t.Errorf("StartDeletion scaled down nodes diff (-want +got):\n%s", diff)
}

// Verify that all expected nodes were deleted using the cloud provider hook.
Expand Down Expand Up @@ -1278,12 +1280,8 @@ func TestStartDeletion(t *testing.T) {
t.Errorf("Timeout while waiting for node deletion results")
}

// Run StartDeletion again to gather node deletion results for deletions started in the previous call, and verify
// that they look as expected.
gotNextStatus, gotNextErr := actuator.StartDeletion(nil, nil)
if gotNextErr != nil {
t.Errorf("StartDeletion unexpected error: %v", gotNextErr)
}
// Gather node deletion results for deletions started in the previous call, and verify that they look as expected.
gotNextStatus := actuator.ScaleDownStatus()
if diff := cmp.Diff(tc.wantNodeDeleteResults, gotNextStatus.NodeDeleteResults, cmpopts.EquateEmpty(), cmpopts.EquateErrors()); diff != "" {
t.Errorf("NodeDeleteResults diff (-want +got):\n%s", diff)
}
Expand Down
20 changes: 10 additions & 10 deletions cluster-autoscaler/core/scaledown/legacy/legacy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -779,10 +779,10 @@ func TestScaleDown(t *testing.T) {
autoscalererr = wrapper.UpdateClusterState(nodes, nodes, nil, time.Now().Add(-5*time.Minute))
assert.NoError(t, autoscalererr)
empty, drain := wrapper.NodesToDelete(time.Now())
scaleDownStatus, err := wrapper.StartDeletion(empty, drain)
scaleDownResult, _, err := wrapper.StartDeletion(empty, drain)
waitForDeleteToFinish(t, wrapper)
assert.NoError(t, err)
assert.Equal(t, status.ScaleDownNodeDeleteStarted, scaleDownStatus.Result)
assert.Equal(t, status.ScaleDownNodeDeleteStarted, scaleDownResult)
assert.Equal(t, n1.Name, utils.GetStringFromChan(deletedNodes))
assert.Equal(t, n1.Name, utils.GetStringFromChan(updatedNodes))
}
Expand Down Expand Up @@ -1036,7 +1036,7 @@ func simpleScaleDownEmpty(t *testing.T, config *ScaleTestConfig) {
autoscalererr = wrapper.UpdateClusterState(nodes, nodes, nil, time.Now().Add(-5*time.Minute))
assert.NoError(t, autoscalererr)
empty, drain := wrapper.NodesToDelete(time.Now())
scaleDownStatus, err := wrapper.StartDeletion(empty, drain)
scaleDownResult, _, err := wrapper.StartDeletion(empty, drain)

assert.NoError(t, err)
var expectedScaleDownResult status.ScaleDownResult
Expand All @@ -1045,7 +1045,7 @@ func simpleScaleDownEmpty(t *testing.T, config *ScaleTestConfig) {
} else {
expectedScaleDownResult = status.ScaleDownNoUnneeded
}
assert.Equal(t, expectedScaleDownResult, scaleDownStatus.Result)
assert.Equal(t, expectedScaleDownResult, scaleDownResult)

expectedScaleDownCount := config.ExpectedScaleDownCount
if config.ExpectedScaleDownCount == 0 {
Expand Down Expand Up @@ -1131,11 +1131,11 @@ func TestNoScaleDownUnready(t *testing.T) {
autoscalererr = wrapper.UpdateClusterState(nodes, nodes, nil, time.Now().Add(-5*time.Minute))
assert.NoError(t, autoscalererr)
empty, drain := wrapper.NodesToDelete(time.Now())
scaleDownStatus, err := wrapper.StartDeletion(empty, drain)
scaleDownResult, _, err := wrapper.StartDeletion(empty, drain)
waitForDeleteToFinish(t, wrapper)

assert.NoError(t, err)
assert.Equal(t, status.ScaleDownNoUnneeded, scaleDownStatus.Result)
assert.Equal(t, status.ScaleDownNoUnneeded, scaleDownResult)

deletedNodes := make(chan string, 10)

Expand All @@ -1155,11 +1155,11 @@ func TestNoScaleDownUnready(t *testing.T) {
autoscalererr = wrapper.UpdateClusterState(nodes, nodes, nil, time.Now().Add(-2*time.Hour))
assert.NoError(t, autoscalererr)
empty, drain = wrapper.NodesToDelete(time.Now())
scaleDownStatus, err = wrapper.StartDeletion(empty, drain)
scaleDownResult, _, err = wrapper.StartDeletion(empty, drain)
waitForDeleteToFinish(t, wrapper)

assert.NoError(t, err)
assert.Equal(t, status.ScaleDownNodeDeleteStarted, scaleDownStatus.Result)
assert.Equal(t, status.ScaleDownNodeDeleteStarted, scaleDownResult)
assert.Equal(t, n1.Name, utils.GetStringFromChan(deletedNodes))
}

Expand Down Expand Up @@ -1245,11 +1245,11 @@ func TestScaleDownNoMove(t *testing.T) {
autoscalererr = wrapper.UpdateClusterState(nodes, nodes, nil, time.Now().Add(-5*time.Minute))
assert.NoError(t, autoscalererr)
empty, drain := wrapper.NodesToDelete(time.Now())
scaleDownStatus, err := wrapper.StartDeletion(empty, drain)
scaleDownResult, _, err := wrapper.StartDeletion(empty, drain)
waitForDeleteToFinish(t, wrapper)

assert.NoError(t, err)
assert.Equal(t, status.ScaleDownNoUnneeded, scaleDownStatus.Result)
assert.Equal(t, status.ScaleDownNoUnneeded, scaleDownResult)
}

func getCountOfChan(c chan string) int {
Expand Down
18 changes: 7 additions & 11 deletions cluster-autoscaler/core/scaledown/legacy/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,20 +89,12 @@ func (p *ScaleDownWrapper) NodesToDelete(currentTime time.Time) (empty, needDrai
}

// StartDeletion triggers an actual scale down logic.
func (p *ScaleDownWrapper) StartDeletion(empty, needDrain []*apiv1.Node) (*status.ScaleDownStatus, errors.AutoscalerError) {
func (p *ScaleDownWrapper) StartDeletion(empty, needDrain []*apiv1.Node) (status.ScaleDownResult, []*status.ScaleDownNode, errors.AutoscalerError) {
// Done to preserve legacy behavior, see comment on NodesToDelete.
if p.lastNodesToDeleteErr != nil || p.lastNodesToDeleteResult != status.ScaleDownNodeDeleteStarted {
// When there is no need for scale-down, p.lastNodesToDeleteResult is set to ScaleDownNoUnneeded. We have to still report node delete
// results in this case, otherwise they wouldn't get reported until the next call to actuator.StartDeletion (i.e. until the next scale-down
// attempt).
// Run actuator.StartDeletion with no nodes just to grab the delete results.
origStatus, _ := p.actuator.StartDeletion(nil, nil)
return &status.ScaleDownStatus{
Result: p.lastNodesToDeleteResult,
NodeDeleteResults: origStatus.NodeDeleteResults,
NodeDeleteResultsAsOf: origStatus.NodeDeleteResultsAsOf,
}, p.lastNodesToDeleteErr
return p.lastNodesToDeleteResult, []*status.ScaleDownNode{}, p.lastNodesToDeleteErr
}

return p.actuator.StartDeletion(empty, needDrain)
}

Expand All @@ -116,3 +108,7 @@ func (p *ScaleDownWrapper) CheckStatus() scaledown.ActuationStatus {
func (p *ScaleDownWrapper) ClearResultsNotNewerThan(t time.Time) {
p.actuator.ClearResultsNotNewerThan(t)
}

func (p *ScaleDownWrapper) ScaleDownStatus() *status.ScaleDownStatus {
return p.actuator.ScaleDownStatus()
}
4 changes: 3 additions & 1 deletion cluster-autoscaler/core/scaledown/scaledown.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,14 @@ type Actuator interface {
// function are not guaranteed to be deleted, it is possible for the
// Actuator to ignore some of them e.g. if max configured level of
// parallelism is reached.
StartDeletion(empty, needDrain []*apiv1.Node) (*status.ScaleDownStatus, errors.AutoscalerError)
StartDeletion(empty, needDrain []*apiv1.Node) (status.ScaleDownResult, []*status.ScaleDownNode, errors.AutoscalerError)
// CheckStatus returns an immutable snapshot of ongoing deletions.
CheckStatus() ActuationStatus
// ClearResultsNotNewerThan removes information about deletions finished
// before or exactly at the provided timestamp.
ClearResultsNotNewerThan(time.Time)
// ScaleDownStatus returns scale down status.
ScaleDownStatus() *status.ScaleDownStatus
}

// ActuationStatus is used for feeding Actuator status back into Planner
Expand Down
25 changes: 10 additions & 15 deletions cluster-autoscaler/core/static_autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,6 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
scaleUpStatus := &status.ScaleUpStatus{Result: status.ScaleUpNotTried}
scaleUpStatusProcessorAlreadyCalled := false
scaleDownStatus := &scaledownstatus.ScaleDownStatus{Result: scaledownstatus.ScaleDownNotTried}
scaleDownStatusProcessorAlreadyCalled := false

defer func() {
// Update status information when the loop is done (regardless of reason)
Expand All @@ -403,14 +402,16 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
if !scaleUpStatusProcessorAlreadyCalled && a.processors != nil && a.processors.ScaleUpStatusProcessor != nil {
a.processors.ScaleUpStatusProcessor.Process(a.AutoscalingContext, scaleUpStatus)
}
if !scaleDownStatusProcessorAlreadyCalled && a.processors != nil && a.processors.ScaleDownStatusProcessor != nil {
if a.processors != nil && a.processors.ScaleDownStatusProcessor != nil {
scaleDownStatus.SetUnremovableNodesInfo(a.scaleDownPlanner.UnremovableNodes(), a.scaleDownPlanner.NodeUtilizationMap(), a.CloudProvider)
a.processors.ScaleDownStatusProcessor.Process(a.AutoscalingContext, scaleDownStatus)
}

err := a.processors.AutoscalingStatusProcessor.Process(a.AutoscalingContext, a.clusterStateRegistry, currentTime)
if err != nil {
klog.Errorf("AutoscalingStatusProcessor error: %v.", err)
if a.processors != nil {
err := a.processors.AutoscalingStatusProcessor.Process(a.AutoscalingContext, a.clusterStateRegistry, currentTime)
if err != nil {
klog.Errorf("AutoscalingStatusProcessor error: %v.", err)
}
}
}()

Expand Down Expand Up @@ -635,18 +636,18 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
scaleDownStatus.RemovedNodeGroups = removedNodeGroups
}

scaleDownStatus = a.scaleDownActuator.ScaleDownStatus()
if scaleDownInCooldown {
scaleDownStatus.Result = scaledownstatus.ScaleDownInCooldown
if len(removedNodeGroups) > 0 {
a.processors.ScaleDownStatusProcessor.Process(autoscalingContext, scaleDownStatus)
}
} else {
klog.V(4).Infof("Starting scale down")

scaleDownStart := time.Now()
metrics.UpdateLastTime(metrics.ScaleDown, scaleDownStart)
empty, needDrain := a.scaleDownPlanner.NodesToDelete(currentTime)
scaleDownStatus, typedErr := a.scaleDownActuator.StartDeletion(empty, needDrain)
scaleDownResult, scaledDownNodes, typedErr := a.scaleDownActuator.StartDeletion(empty, needDrain)
scaleDownStatus.Result = scaleDownResult
scaleDownStatus.ScaledDownNodes = scaledDownNodes
a.scaleDownActuator.ClearResultsNotNewerThan(scaleDownStatus.NodeDeleteResultsAsOf)
metrics.UpdateDurationFromStart(metrics.ScaleDown, scaleDownStart)
metrics.UpdateUnremovableNodesCount(countsByReason(a.scaleDownPlanner.UnremovableNodes()))
Expand All @@ -673,12 +674,6 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
actuation.UpdateSoftDeletionTaints(a.AutoscalingContext, taintableNodes, untaintableNodes)
}

if a.processors != nil && a.processors.ScaleDownStatusProcessor != nil {
scaleDownStatus.SetUnremovableNodesInfo(a.scaleDownPlanner.UnremovableNodes(), a.scaleDownPlanner.NodeUtilizationMap(), a.CloudProvider)
a.processors.ScaleDownStatusProcessor.Process(autoscalingContext, scaleDownStatus)
scaleDownStatusProcessorAlreadyCalled = true
}

if typedErr != nil {
klog.Errorf("Failed to scale down: %v", typedErr)
a.lastScaleDownFailTime = currentTime
Expand Down

0 comments on commit 67bd5e2

Please sign in to comment.