Skip to content

Commit

Permalink
Refactor StartDeletion usage patterns and enforce periodic scaledown …
Browse files Browse the repository at this point in the history
…status processor calls.
  • Loading branch information
mtrqq committed Mar 12, 2024
1 parent b8506af commit ec681ef
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,8 @@ type mockActuator struct {
status *mockActuationStatus
}

func (m *mockActuator) StartDeletion(_, _ []*apiv1.Node) (*status.ScaleDownStatus, errors.AutoscalerError) {
return nil, nil
func (m *mockActuator) StartDeletion(_, _ []*apiv1.Node) (status.ScaleDownResult, []*status.ScaleDownNode, errors.AutoscalerError) {
return status.ScaleDownError, []*status.ScaleDownNode{}, nil
}

func (m *mockActuator) CheckStatus() scaledown.ActuationStatus {
Expand All @@ -281,6 +281,10 @@ func (m *mockActuator) ClearResultsNotNewerThan(time.Time) {

}

func (m *mockActuator) DeletionResults() (map[string]status.NodeDeleteResult, time.Time) {
return map[string]status.NodeDeleteResult{}, time.Now()
}

type mockActuationStatus struct {
drainedNodes []string
}
Expand Down
28 changes: 14 additions & 14 deletions cluster-autoscaler/core/scaledown/actuation/actuator.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,47 +96,47 @@ func (a *Actuator) ClearResultsNotNewerThan(t time.Time) {
a.nodeDeletionTracker.ClearResultsNotNewerThan(t)
}

// DeletionResults returns deletion results since the last ClearResultsNotNewerThan call
// in a map form, along with the timestamp of last result.
func (a *Actuator) DeletionResults() (map[string]status.NodeDeleteResult, time.Time) {
return a.nodeDeletionTracker.DeletionResults()
}

// StartDeletion triggers a new deletion process.
func (a *Actuator) StartDeletion(empty, drain []*apiv1.Node) (*status.ScaleDownStatus, errors.AutoscalerError) {
func (a *Actuator) StartDeletion(empty, drain []*apiv1.Node) (status.ScaleDownResult, []*status.ScaleDownNode, errors.AutoscalerError) {
a.nodeDeletionScheduler.ResetAndReportMetrics()
deletionStartTime := time.Now()
defer func() { metrics.UpdateDuration(metrics.ScaleDownNodeDeletion, time.Since(deletionStartTime)) }()

results, ts := a.nodeDeletionTracker.DeletionResults()
scaleDownStatus := &status.ScaleDownStatus{NodeDeleteResults: results, NodeDeleteResultsAsOf: ts}

scaledDownNodes := make([]*status.ScaleDownNode, 0)
emptyToDelete, drainToDelete := a.budgetProcessor.CropNodes(a.nodeDeletionTracker, empty, drain)
if len(emptyToDelete) == 0 && len(drainToDelete) == 0 {
scaleDownStatus.Result = status.ScaleDownNoNodeDeleted
return scaleDownStatus, nil
return status.ScaleDownNoNodeDeleted, nil, nil
}

if len(emptyToDelete) > 0 {
// Taint all empty nodes synchronously
if err := a.taintNodesSync(emptyToDelete); err != nil {
scaleDownStatus.Result = status.ScaleDownError
return scaleDownStatus, err
return status.ScaleDownError, scaledDownNodes, err
}

emptyScaledDown := a.deleteAsyncEmpty(emptyToDelete)
scaleDownStatus.ScaledDownNodes = append(scaleDownStatus.ScaledDownNodes, emptyScaledDown...)
scaledDownNodes = append(scaledDownNodes, emptyScaledDown...)
}

if len(drainToDelete) > 0 {
// Taint all nodes that need drain synchronously, but don't start any drain/deletion yet. Otherwise, pods evicted from one to-be-deleted node
// could get recreated on another.
if err := a.taintNodesSync(drainToDelete); err != nil {
scaleDownStatus.Result = status.ScaleDownError
return scaleDownStatus, err
return status.ScaleDownError, scaledDownNodes, err
}

// All nodes involved in the scale-down should be tainted now - start draining and deleting nodes asynchronously.
drainScaledDown := a.deleteAsyncDrain(drainToDelete)
scaleDownStatus.ScaledDownNodes = append(scaleDownStatus.ScaledDownNodes, drainScaledDown...)
scaledDownNodes = append(scaledDownNodes, drainScaledDown...)
}

scaleDownStatus.Result = status.ScaleDownNodeDeleteStarted
return scaleDownStatus, nil
return status.ScaleDownNodeDeleteStarted, scaledDownNodes, nil
}

// deleteAsyncEmpty immediately starts deletions asynchronously.
Expand Down
32 changes: 15 additions & 17 deletions cluster-autoscaler/core/scaledown/actuation/actuator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1174,17 +1174,15 @@ func TestStartDeletion(t *testing.T) {
}
}

wantScaleDownStatus := &status.ScaleDownStatus{
Result: tc.wantStatus.result,
}
wantScaleDownNodes := []*status.ScaleDownNode{}
for _, scaleDownNodeInfo := range tc.wantStatus.scaledDownNodes {
statusScaledDownNode := &status.ScaleDownNode{
Node: generateNode(scaleDownNodeInfo.name),
NodeGroup: tc.nodeGroups[scaleDownNodeInfo.nodeGroup],
EvictedPods: scaleDownNodeInfo.evictedPods,
UtilInfo: scaleDownNodeInfo.utilInfo,
}
wantScaleDownStatus.ScaledDownNodes = append(wantScaleDownStatus.ScaledDownNodes, statusScaledDownNode)
wantScaleDownNodes = append(wantScaleDownNodes, statusScaledDownNode)
}

scaleStateNotifier := nodegroupchange.NewNodeGroupChangeObserversList()
Expand All @@ -1201,18 +1199,22 @@ func TestStartDeletion(t *testing.T) {
budgetProcessor: budgets.NewScaleDownBudgetProcessor(&ctx),
configGetter: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(ctx.NodeGroupDefaults),
}
gotStatus, gotErr := actuator.StartDeletion(allEmptyNodes, allDrainNodes)
gotResult, gotScaleDownNodes, gotErr := actuator.StartDeletion(allEmptyNodes, allDrainNodes)
if diff := cmp.Diff(tc.wantErr, gotErr, cmpopts.EquateErrors()); diff != "" {
t.Errorf("StartDeletion error diff (-want +got):\n%s", diff)
}

// Verify ScaleDownStatus looks as expected.
// Verify ScaleDownResult looks as expected.
if diff := cmp.Diff(tc.wantStatus.result, gotResult); diff != "" {
t.Errorf("StartDeletion result diff (-want +got):\n%s", diff)
}

// Verify ScaleDownNodes looks as expected.
ignoreSdNodeOrder := cmpopts.SortSlices(func(a, b *status.ScaleDownNode) bool { return a.Node.Name < b.Node.Name })
ignoreTimestamps := cmpopts.IgnoreFields(status.ScaleDownStatus{}, "NodeDeleteResultsAsOf")
cmpNg := cmp.Comparer(func(a, b *testprovider.TestNodeGroup) bool { return a.Id() == b.Id() })
statusCmpOpts := cmp.Options{ignoreSdNodeOrder, ignoreTimestamps, cmpNg, cmpopts.EquateEmpty()}
if diff := cmp.Diff(wantScaleDownStatus, gotStatus, statusCmpOpts); diff != "" {
t.Errorf("StartDeletion status diff (-want +got):\n%s", diff)
statusCmpOpts := cmp.Options{ignoreSdNodeOrder, cmpNg, cmpopts.EquateEmpty()}
if diff := cmp.Diff(wantScaleDownNodes, gotScaleDownNodes, statusCmpOpts); diff != "" {
t.Errorf("StartDeletion scaled down nodes diff (-want +got):\n%s", diff)
}

// Verify that all expected nodes were deleted using the cloud provider hook.
Expand Down Expand Up @@ -1278,13 +1280,9 @@ func TestStartDeletion(t *testing.T) {
t.Errorf("Timeout while waiting for node deletion results")
}

// Run StartDeletion again to gather node deletion results for deletions started in the previous call, and verify
// that they look as expected.
gotNextStatus, gotNextErr := actuator.StartDeletion(nil, nil)
if gotNextErr != nil {
t.Errorf("StartDeletion unexpected error: %v", gotNextErr)
}
if diff := cmp.Diff(tc.wantNodeDeleteResults, gotNextStatus.NodeDeleteResults, cmpopts.EquateEmpty(), cmpopts.EquateErrors()); diff != "" {
// Gather node deletion results for deletions started in the previous call, and verify that they look as expected.
nodeDeleteResults, _ := actuator.DeletionResults()
if diff := cmp.Diff(tc.wantNodeDeleteResults, nodeDeleteResults, cmpopts.EquateEmpty(), cmpopts.EquateErrors()); diff != "" {
t.Errorf("NodeDeleteResults diff (-want +got):\n%s", diff)
}
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ func (n *NodeDeletionTracker) DeletionsCount(nodeGroupId string) int {
return n.deletionsPerNodeGroup[nodeGroupId]
}

// DeletionResults returns deletion results in a map form, along with the timestamp of last result.
// DeletionResults returns deletion results since the last ClearResultsNotNewerThan call
// in a map form, along with the timestamp of last result.
func (n *NodeDeletionTracker) DeletionResults() (map[string]status.NodeDeleteResult, time.Time) {
n.Lock()
defer n.Unlock()
Expand Down
20 changes: 10 additions & 10 deletions cluster-autoscaler/core/scaledown/legacy/legacy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -779,10 +779,10 @@ func TestScaleDown(t *testing.T) {
autoscalererr = wrapper.UpdateClusterState(nodes, nodes, nil, time.Now().Add(-5*time.Minute))
assert.NoError(t, autoscalererr)
empty, drain := wrapper.NodesToDelete(time.Now())
scaleDownStatus, err := wrapper.StartDeletion(empty, drain)
scaleDownResult, _, err := wrapper.StartDeletion(empty, drain)
waitForDeleteToFinish(t, wrapper)
assert.NoError(t, err)
assert.Equal(t, status.ScaleDownNodeDeleteStarted, scaleDownStatus.Result)
assert.Equal(t, status.ScaleDownNodeDeleteStarted, scaleDownResult)
assert.Equal(t, n1.Name, utils.GetStringFromChan(deletedNodes))
assert.Equal(t, n1.Name, utils.GetStringFromChan(updatedNodes))
}
Expand Down Expand Up @@ -1036,7 +1036,7 @@ func simpleScaleDownEmpty(t *testing.T, config *ScaleTestConfig) {
autoscalererr = wrapper.UpdateClusterState(nodes, nodes, nil, time.Now().Add(-5*time.Minute))
assert.NoError(t, autoscalererr)
empty, drain := wrapper.NodesToDelete(time.Now())
scaleDownStatus, err := wrapper.StartDeletion(empty, drain)
scaleDownResult, _, err := wrapper.StartDeletion(empty, drain)

assert.NoError(t, err)
var expectedScaleDownResult status.ScaleDownResult
Expand All @@ -1045,7 +1045,7 @@ func simpleScaleDownEmpty(t *testing.T, config *ScaleTestConfig) {
} else {
expectedScaleDownResult = status.ScaleDownNoUnneeded
}
assert.Equal(t, expectedScaleDownResult, scaleDownStatus.Result)
assert.Equal(t, expectedScaleDownResult, scaleDownResult)

expectedScaleDownCount := config.ExpectedScaleDownCount
if config.ExpectedScaleDownCount == 0 {
Expand Down Expand Up @@ -1131,11 +1131,11 @@ func TestNoScaleDownUnready(t *testing.T) {
autoscalererr = wrapper.UpdateClusterState(nodes, nodes, nil, time.Now().Add(-5*time.Minute))
assert.NoError(t, autoscalererr)
empty, drain := wrapper.NodesToDelete(time.Now())
scaleDownStatus, err := wrapper.StartDeletion(empty, drain)
scaleDownResult, _, err := wrapper.StartDeletion(empty, drain)
waitForDeleteToFinish(t, wrapper)

assert.NoError(t, err)
assert.Equal(t, status.ScaleDownNoUnneeded, scaleDownStatus.Result)
assert.Equal(t, status.ScaleDownNoUnneeded, scaleDownResult)

deletedNodes := make(chan string, 10)

Expand All @@ -1155,11 +1155,11 @@ func TestNoScaleDownUnready(t *testing.T) {
autoscalererr = wrapper.UpdateClusterState(nodes, nodes, nil, time.Now().Add(-2*time.Hour))
assert.NoError(t, autoscalererr)
empty, drain = wrapper.NodesToDelete(time.Now())
scaleDownStatus, err = wrapper.StartDeletion(empty, drain)
scaleDownResult, _, err = wrapper.StartDeletion(empty, drain)
waitForDeleteToFinish(t, wrapper)

assert.NoError(t, err)
assert.Equal(t, status.ScaleDownNodeDeleteStarted, scaleDownStatus.Result)
assert.Equal(t, status.ScaleDownNodeDeleteStarted, scaleDownResult)
assert.Equal(t, n1.Name, utils.GetStringFromChan(deletedNodes))
}

Expand Down Expand Up @@ -1245,11 +1245,11 @@ func TestScaleDownNoMove(t *testing.T) {
autoscalererr = wrapper.UpdateClusterState(nodes, nodes, nil, time.Now().Add(-5*time.Minute))
assert.NoError(t, autoscalererr)
empty, drain := wrapper.NodesToDelete(time.Now())
scaleDownStatus, err := wrapper.StartDeletion(empty, drain)
scaleDownResult, _, err := wrapper.StartDeletion(empty, drain)
waitForDeleteToFinish(t, wrapper)

assert.NoError(t, err)
assert.Equal(t, status.ScaleDownNoUnneeded, scaleDownStatus.Result)
assert.Equal(t, status.ScaleDownNoUnneeded, scaleDownResult)
}

func getCountOfChan(c chan string) int {
Expand Down
20 changes: 9 additions & 11 deletions cluster-autoscaler/core/scaledown/legacy/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,20 +89,12 @@ func (p *ScaleDownWrapper) NodesToDelete(currentTime time.Time) (empty, needDrai
}

// StartDeletion triggers an actual scale down logic.
func (p *ScaleDownWrapper) StartDeletion(empty, needDrain []*apiv1.Node) (*status.ScaleDownStatus, errors.AutoscalerError) {
func (p *ScaleDownWrapper) StartDeletion(empty, needDrain []*apiv1.Node) (status.ScaleDownResult, []*status.ScaleDownNode, errors.AutoscalerError) {
// Done to preserve legacy behavior, see comment on NodesToDelete.
if p.lastNodesToDeleteErr != nil || p.lastNodesToDeleteResult != status.ScaleDownNodeDeleteStarted {
// When there is no need for scale-down, p.lastNodesToDeleteResult is set to ScaleDownNoUnneeded. We have to still report node delete
// results in this case, otherwise they wouldn't get reported until the next call to actuator.StartDeletion (i.e. until the next scale-down
// attempt).
// Run actuator.StartDeletion with no nodes just to grab the delete results.
origStatus, _ := p.actuator.StartDeletion(nil, nil)
return &status.ScaleDownStatus{
Result: p.lastNodesToDeleteResult,
NodeDeleteResults: origStatus.NodeDeleteResults,
NodeDeleteResultsAsOf: origStatus.NodeDeleteResultsAsOf,
}, p.lastNodesToDeleteErr
return p.lastNodesToDeleteResult, []*status.ScaleDownNode{}, p.lastNodesToDeleteErr
}

return p.actuator.StartDeletion(empty, needDrain)
}

Expand All @@ -116,3 +108,9 @@ func (p *ScaleDownWrapper) CheckStatus() scaledown.ActuationStatus {
func (p *ScaleDownWrapper) ClearResultsNotNewerThan(t time.Time) {
p.actuator.ClearResultsNotNewerThan(t)
}

// DeletionResults returns deletion results since the last ClearResultsNotNewerThan call
// in a map form, along with the timestamp of last result.
func (p *ScaleDownWrapper) DeletionResults() (map[string]status.NodeDeleteResult, time.Time) {
return p.actuator.DeletionResults()
}
5 changes: 4 additions & 1 deletion cluster-autoscaler/core/scaledown/scaledown.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,15 @@ type Actuator interface {
// function are not guaranteed to be deleted, it is possible for the
// Actuator to ignore some of them e.g. if max configured level of
// parallelism is reached.
StartDeletion(empty, needDrain []*apiv1.Node) (*status.ScaleDownStatus, errors.AutoscalerError)
StartDeletion(empty, needDrain []*apiv1.Node) (status.ScaleDownResult, []*status.ScaleDownNode, errors.AutoscalerError)
// CheckStatus returns an immutable snapshot of ongoing deletions.
CheckStatus() ActuationStatus
// ClearResultsNotNewerThan removes information about deletions finished
// before or exactly at the provided timestamp.
ClearResultsNotNewerThan(time.Time)
// DeletionResults returns deletion results since the last ClearResultsNotNewerThan call
// in a map form, along with the timestamp of last result.
DeletionResults() (map[string]status.NodeDeleteResult, time.Time)
}

// ActuationStatus is used for feeding Actuator status back into Planner
Expand Down
30 changes: 14 additions & 16 deletions cluster-autoscaler/core/static_autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,6 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
scaleUpStatus := &status.ScaleUpStatus{Result: status.ScaleUpNotTried}
scaleUpStatusProcessorAlreadyCalled := false
scaleDownStatus := &scaledownstatus.ScaleDownStatus{Result: scaledownstatus.ScaleDownNotTried}
scaleDownStatusProcessorAlreadyCalled := false

defer func() {
// Update status information when the loop is done (regardless of reason)
Expand All @@ -403,14 +402,21 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
if !scaleUpStatusProcessorAlreadyCalled && a.processors != nil && a.processors.ScaleUpStatusProcessor != nil {
a.processors.ScaleUpStatusProcessor.Process(a.AutoscalingContext, scaleUpStatus)
}
if !scaleDownStatusProcessorAlreadyCalled && a.processors != nil && a.processors.ScaleDownStatusProcessor != nil {
if a.processors != nil && a.processors.ScaleDownStatusProcessor != nil {
scaleDownStatus.SetUnremovableNodesInfo(a.scaleDownPlanner.UnremovableNodes(), a.scaleDownPlanner.NodeUtilizationMap(), a.CloudProvider)
a.processors.ScaleDownStatusProcessor.Process(a.AutoscalingContext, scaleDownStatus)
}

err := a.processors.AutoscalingStatusProcessor.Process(a.AutoscalingContext, a.clusterStateRegistry, currentTime)
if err != nil {
klog.Errorf("AutoscalingStatusProcessor error: %v.", err)
nodeDeletionResults, nodeDeletionResultsAsOf := a.scaleDownActuator.DeletionResults()
scaleDownStatus.NodeDeleteResults = nodeDeletionResults
scaleDownStatus.NodeDeleteResultsAsOf = nodeDeletionResultsAsOf
a.scaleDownActuator.ClearResultsNotNewerThan(scaleDownStatus.NodeDeleteResultsAsOf)

if a.processors != nil && a.processors.AutoscalingStatusProcessor != nil {
err := a.processors.AutoscalingStatusProcessor.Process(a.AutoscalingContext, a.clusterStateRegistry, currentTime)
if err != nil {
klog.Errorf("AutoscalingStatusProcessor error: %v.", err)
}
}
}()

Expand Down Expand Up @@ -637,17 +643,15 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr

if scaleDownInCooldown {
scaleDownStatus.Result = scaledownstatus.ScaleDownInCooldown
if len(removedNodeGroups) > 0 {
a.processors.ScaleDownStatusProcessor.Process(autoscalingContext, scaleDownStatus)
}
} else {
klog.V(4).Infof("Starting scale down")

scaleDownStart := time.Now()
metrics.UpdateLastTime(metrics.ScaleDown, scaleDownStart)
empty, needDrain := a.scaleDownPlanner.NodesToDelete(currentTime)
scaleDownStatus, typedErr := a.scaleDownActuator.StartDeletion(empty, needDrain)
a.scaleDownActuator.ClearResultsNotNewerThan(scaleDownStatus.NodeDeleteResultsAsOf)
scaleDownResult, scaledDownNodes, typedErr := a.scaleDownActuator.StartDeletion(empty, needDrain)
scaleDownStatus.Result = scaleDownResult
scaleDownStatus.ScaledDownNodes = scaledDownNodes
metrics.UpdateDurationFromStart(metrics.ScaleDown, scaleDownStart)
metrics.UpdateUnremovableNodesCount(countsByReason(a.scaleDownPlanner.UnremovableNodes()))

Expand All @@ -673,12 +677,6 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
actuation.UpdateSoftDeletionTaints(a.AutoscalingContext, taintableNodes, untaintableNodes)
}

if a.processors != nil && a.processors.ScaleDownStatusProcessor != nil {
scaleDownStatus.SetUnremovableNodesInfo(a.scaleDownPlanner.UnremovableNodes(), a.scaleDownPlanner.NodeUtilizationMap(), a.CloudProvider)
a.processors.ScaleDownStatusProcessor.Process(autoscalingContext, scaleDownStatus)
scaleDownStatusProcessorAlreadyCalled = true
}

if typedErr != nil {
klog.Errorf("Failed to scale down: %v", typedErr)
a.lastScaleDownFailTime = currentTime
Expand Down

0 comments on commit ec681ef

Please sign in to comment.