Skip to content

Commit

Permalink
Move handing unremovable nodes to dedicated object
Browse files Browse the repository at this point in the history
  • Loading branch information
x13n committed May 24, 2022
1 parent a13c59c commit b0cd570
Show file tree
Hide file tree
Showing 4 changed files with 264 additions and 99 deletions.
134 changes: 41 additions & 93 deletions cluster-autoscaler/core/scaledown/legacy/legacy.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unremovable"
core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/processors"
Expand Down Expand Up @@ -268,37 +269,35 @@ func (limits *scaleDownResourcesLimits) tryDecrementLimitsByDelta(delta scaleDow

// ScaleDown is responsible for maintaining the state needed to perform unneeded node removals.
type ScaleDown struct {
context *context.AutoscalingContext
processors *processors.AutoscalingProcessors
clusterStateRegistry *clusterstate.ClusterStateRegistry
unneededNodes map[string]time.Time
unneededNodesList []*apiv1.Node
unremovableNodes map[string]time.Time
podLocationHints map[string]string
nodeUtilizationMap map[string]utilization.Info
usageTracker *simulator.UsageTracker
nodeDeletionTracker *deletiontracker.NodeDeletionTracker
unremovableNodeReasons map[string]*simulator.UnremovableNode
removalSimulator *simulator.RemovalSimulator
context *context.AutoscalingContext
processors *processors.AutoscalingProcessors
clusterStateRegistry *clusterstate.ClusterStateRegistry
unneededNodes map[string]time.Time
unneededNodesList []*apiv1.Node
unremovableNodes *unremovable.Nodes
podLocationHints map[string]string
nodeUtilizationMap map[string]utilization.Info
usageTracker *simulator.UsageTracker
nodeDeletionTracker *deletiontracker.NodeDeletionTracker
removalSimulator *simulator.RemovalSimulator
}

// NewScaleDown builds new ScaleDown object.
func NewScaleDown(context *context.AutoscalingContext, processors *processors.AutoscalingProcessors, clusterStateRegistry *clusterstate.ClusterStateRegistry) *ScaleDown {
usageTracker := simulator.NewUsageTracker()
removalSimulator := simulator.NewRemovalSimulator(context.ListerRegistry, context.ClusterSnapshot, context.PredicateChecker, usageTracker)
return &ScaleDown{
context: context,
processors: processors,
clusterStateRegistry: clusterStateRegistry,
unneededNodes: make(map[string]time.Time),
unremovableNodes: make(map[string]time.Time),
podLocationHints: make(map[string]string),
nodeUtilizationMap: make(map[string]utilization.Info),
usageTracker: usageTracker,
unneededNodesList: make([]*apiv1.Node, 0),
nodeDeletionTracker: deletiontracker.NewNodeDeletionTracker(0 * time.Second),
unremovableNodeReasons: make(map[string]*simulator.UnremovableNode),
removalSimulator: removalSimulator,
context: context,
processors: processors,
clusterStateRegistry: clusterStateRegistry,
unneededNodes: make(map[string]time.Time),
unremovableNodes: unremovable.NewNodes(),
podLocationHints: make(map[string]string),
nodeUtilizationMap: make(map[string]utilization.Info),
usageTracker: usageTracker,
unneededNodesList: make([]*apiv1.Node, 0),
nodeDeletionTracker: deletiontracker.NewNodeDeletionTracker(0 * time.Second),
removalSimulator: removalSimulator,
}
}

Expand All @@ -307,7 +306,6 @@ func (sd *ScaleDown) CleanUp(timestamp time.Time) {
// Use default ScaleDownUnneededTime as in this context the value
// doesn't apply to any specific NodeGroup.
sd.usageTracker.CleanUp(timestamp.Add(-sd.context.NodeGroupDefaults.ScaleDownUnneededTime))
sd.clearUnremovableNodeReasons()
}

// CleanUpUnneededNodes clears the list of unneeded nodes.
Expand All @@ -323,7 +321,7 @@ func (sd *ScaleDown) UnneededNodes() []*apiv1.Node {

func (sd *ScaleDown) checkNodeUtilization(timestamp time.Time, node *apiv1.Node, nodeInfo *schedulerframework.NodeInfo) (simulator.UnremovableReason, *utilization.Info) {
// Skip nodes that were recently checked.
if _, found := sd.unremovableNodes[node.Name]; found {
if sd.unremovableNodes.IsRecent(node.Name) {
return simulator.RecentlyUnremovable, nil
}

Expand Down Expand Up @@ -394,7 +392,7 @@ func (sd *ScaleDown) UpdateUnneededNodes(
return errors.ToAutoscalerError(errors.InternalError, err)
}

sd.updateUnremovableNodes(timestamp)
sd.unremovableNodes.Update(sd.context.ClusterSnapshot.NodeInfos(), timestamp)

skipped := 0
utilizationMap := make(map[string]utilization.Info)
Expand All @@ -406,7 +404,7 @@ func (sd *ScaleDown) UpdateUnneededNodes(
nodeInfo, err := sd.context.ClusterSnapshot.NodeInfos().Get(node.Name)
if err != nil {
klog.Errorf("Can't retrieve scale-down candidate %s from snapshot, err: %v", node.Name, err)
sd.addUnremovableNodeReason(node, simulator.UnexpectedError)
sd.unremovableNodes.AddReason(node, simulator.UnexpectedError)
continue
}

Expand All @@ -420,7 +418,7 @@ func (sd *ScaleDown) UpdateUnneededNodes(
skipped++
}

sd.addUnremovableNodeReason(node, reason)
sd.unremovableNodes.AddReason(node, reason)
continue
}

Expand Down Expand Up @@ -520,18 +518,17 @@ func (sd *ScaleDown) UpdateUnneededNodes(
if len(unremovable) > 0 {
unremovableTimeout := timestamp.Add(sd.context.AutoscalingOptions.UnremovableNodeRecheckTimeout)
for _, unremovableNode := range unremovable {
sd.unremovableNodes[unremovableNode.Node.Name] = unremovableTimeout
sd.addUnremovableNode(unremovableNode)
sd.unremovableNodes.AddTimeout(unremovableNode, unremovableTimeout)
}
klog.V(1).Infof("%v nodes found to be unremovable in simulation, will re-check them at %v", len(unremovable), unremovableTimeout)
}

// This method won't always check all nodes, so let's give a generic reason for all nodes that weren't checked.
for _, node := range scaleDownCandidates {
_, unremovableReasonProvided := sd.unremovableNodeReasons[node.Name]
unremovableReasonProvided := sd.unremovableNodes.HasReason(node.Name)
_, unneeded := result[node.Name]
if !unneeded && !unremovableReasonProvided {
sd.addUnremovableNodeReason(node, simulator.NotUnneededOtherReason)
sd.unremovableNodes.AddReason(node, simulator.NotUnneededOtherReason)
}
}

Expand Down Expand Up @@ -576,59 +573,10 @@ func (sd *ScaleDown) isNodeBelowUtilizationThreshold(node *apiv1.Node, nodeGroup
return true, nil
}

// updateUnremovableNodes updates unremovableNodes map according to current
// state of the cluster. Removes from the map nodes that are no longer in the
// nodes list.
func (sd *ScaleDown) updateUnremovableNodes(timestamp time.Time) {
if len(sd.unremovableNodes) <= 0 {
return
}
newUnremovableNodes := make(map[string]time.Time, len(sd.unremovableNodes))
for oldUnremovable, ttl := range sd.unremovableNodes {
if _, err := sd.context.ClusterSnapshot.NodeInfos().Get(oldUnremovable); err != nil {
// Not logging on error level as most likely cause is that node is no longer in the cluster.
klog.Infof("Can't retrieve node %s from snapshot, removing from unremovable map, err: %v", oldUnremovable, err)
continue
}
if ttl.After(timestamp) {
// Keep nodes that are still in the cluster and haven't expired yet.
newUnremovableNodes[oldUnremovable] = ttl
}
}
sd.unremovableNodes = newUnremovableNodes
}

func (sd *ScaleDown) clearUnremovableNodeReasons() {
sd.unremovableNodeReasons = make(map[string]*simulator.UnremovableNode)
}

func (sd *ScaleDown) addUnremovableNodeReason(node *apiv1.Node, reason simulator.UnremovableReason) {
sd.unremovableNodeReasons[node.Name] = &simulator.UnremovableNode{Node: node, Reason: reason, BlockingPod: nil}
}

func (sd *ScaleDown) addUnremovableNode(unremovableNode *simulator.UnremovableNode) {
sd.unremovableNodeReasons[unremovableNode.Node.Name] = unremovableNode
}

// UnremovableNodesCount returns a map of unremovable node counts per reason.
func (sd *ScaleDown) UnremovableNodesCount() map[simulator.UnremovableReason]int {
reasons := make(map[simulator.UnremovableReason]int)

for _, node := range sd.unremovableNodeReasons {
reasons[node.Reason]++
}

return reasons
}

// UnremovableNodes returns a list of nodes that cannot be removed according to
// the scale down algorithm.
func (sd *ScaleDown) UnremovableNodes() []*simulator.UnremovableNode {
ns := make([]*simulator.UnremovableNode, 0, len(sd.unremovableNodeReasons))
for _, n := range sd.unremovableNodeReasons {
ns = append(ns, n)
}
return ns
return sd.unremovableNodes.AsList()
}

// markSimulationError indicates a simulation error by clearing relevant scale
Expand Down Expand Up @@ -735,7 +683,7 @@ func (sd *ScaleDown) TryToScaleDown(
// Check if node is marked with no scale down annotation.
if hasNoScaleDownAnnotation(node) {
klog.V(4).Infof("Skipping %s - scale down disabled annotation found", node.Name)
sd.addUnremovableNodeReason(node, simulator.ScaleDownDisabledAnnotation)
sd.unremovableNodes.AddReason(node, simulator.ScaleDownDisabledAnnotation)
continue
}

Expand All @@ -745,12 +693,12 @@ func (sd *ScaleDown) TryToScaleDown(
nodeGroup, err := sd.context.CloudProvider.NodeGroupForNode(node)
if err != nil {
klog.Errorf("Error while checking node group for %s: %v", node.Name, err)
sd.addUnremovableNodeReason(node, simulator.UnexpectedError)
sd.unremovableNodes.AddReason(node, simulator.UnexpectedError)
continue
}
if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
klog.V(4).Infof("Skipping %s - no node group config", node.Name)
sd.addUnremovableNodeReason(node, simulator.NotAutoscaled)
sd.unremovableNodes.AddReason(node, simulator.NotAutoscaled)
continue
}

Expand All @@ -762,7 +710,7 @@ func (sd *ScaleDown) TryToScaleDown(
continue
}
if !unneededSince.Add(unneededTime).Before(currentTime) {
sd.addUnremovableNodeReason(node, simulator.NotUnneededLongEnough)
sd.unremovableNodes.AddReason(node, simulator.NotUnneededLongEnough)
continue
}
} else {
Expand All @@ -773,36 +721,36 @@ func (sd *ScaleDown) TryToScaleDown(
continue
}
if !unneededSince.Add(unreadyTime).Before(currentTime) {
sd.addUnremovableNodeReason(node, simulator.NotUnreadyLongEnough)
sd.unremovableNodes.AddReason(node, simulator.NotUnreadyLongEnough)
continue
}
}

size, found := nodeGroupSize[nodeGroup.Id()]
if !found {
klog.Errorf("Error while checking node group size %s: group size not found in cache", nodeGroup.Id())
sd.addUnremovableNodeReason(node, simulator.UnexpectedError)
sd.unremovableNodes.AddReason(node, simulator.UnexpectedError)
continue
}

deletionsInProgress := sd.nodeDeletionTracker.DeletionsCount(nodeGroup.Id())
if size-deletionsInProgress <= nodeGroup.MinSize() {
klog.V(1).Infof("Skipping %s - node group min size reached", node.Name)
sd.addUnremovableNodeReason(node, simulator.NodeGroupMinSizeReached)
sd.unremovableNodes.AddReason(node, simulator.NodeGroupMinSizeReached)
continue
}

scaleDownResourcesDelta, err := sd.computeScaleDownResourcesDelta(sd.context.CloudProvider, node, nodeGroup, resourcesWithLimits)
if err != nil {
klog.Errorf("Error getting node resources: %v", err)
sd.addUnremovableNodeReason(node, simulator.UnexpectedError)
sd.unremovableNodes.AddReason(node, simulator.UnexpectedError)
continue
}

checkResult := scaleDownResourcesLeft.checkScaleDownDeltaWithinLimits(scaleDownResourcesDelta)
if checkResult.exceeded {
klog.V(4).Infof("Skipping %s - minimal limit exceeded for %v", node.Name, checkResult.exceededResources)
sd.addUnremovableNodeReason(node, simulator.MinimalResourceLimitExceeded)
sd.unremovableNodes.AddReason(node, simulator.MinimalResourceLimitExceeded)
continue
}

Expand Down Expand Up @@ -851,7 +799,7 @@ func (sd *ScaleDown) TryToScaleDown(
findNodesToRemoveDuration = time.Now().Sub(findNodesToRemoveStart)

for _, unremovableNode := range unremovable {
sd.addUnremovableNode(unremovableNode)
sd.unremovableNodes.Add(unremovableNode)
}

if err != nil {
Expand Down
11 changes: 5 additions & 6 deletions cluster-autoscaler/core/scaledown/legacy/legacy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unremovable"
. "k8s.io/autoscaler/cluster-autoscaler/core/test"
"k8s.io/autoscaler/cluster-autoscaler/core/utils"
"k8s.io/autoscaler/cluster-autoscaler/utils/daemonset"
Expand Down Expand Up @@ -169,15 +170,13 @@ func TestFindUnneededNodes(t *testing.T) {
assert.False(t, found, n)
}

sd.unremovableNodes = make(map[string]time.Time)
sd.unremovableNodes = unremovable.NewNodes()
sd.unneededNodes["n1"] = time.Now()
allNodes = []*apiv1.Node{n1, n2, n3, n4}
simulator.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, allNodes, []*apiv1.Pod{p1, p2, p3, p4})
autoscalererr = sd.UpdateUnneededNodes(allNodes, allNodes, time.Now(), nil)
assert.NoError(t, autoscalererr)

sd.unremovableNodes = make(map[string]time.Time)

assert.Equal(t, 1, len(sd.unneededNodes))
addTime2, found := sd.unneededNodes["n2"]
assert.True(t, found)
Expand All @@ -191,7 +190,7 @@ func TestFindUnneededNodes(t *testing.T) {
assert.False(t, found, n)
}

sd.unremovableNodes = make(map[string]time.Time)
sd.unremovableNodes = unremovable.NewNodes()
scaleDownCandidates := []*apiv1.Node{n1, n3, n4}
simulator.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, allNodes, []*apiv1.Pod{p1, p2, p3, p4})
autoscalererr = sd.UpdateUnneededNodes(allNodes, scaleDownCandidates, time.Now(), nil)
Expand All @@ -207,7 +206,7 @@ func TestFindUnneededNodes(t *testing.T) {

assert.Equal(t, 0, len(sd.unneededNodes))
// Verify that no other nodes are in unremovable map.
assert.Equal(t, 1, len(sd.unremovableNodes))
assert.Equal(t, 1, len(sd.unremovableNodes.AsList()))

// But it should be checked after timeout
simulator.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, allNodes, []*apiv1.Pod{})
Expand All @@ -216,7 +215,7 @@ func TestFindUnneededNodes(t *testing.T) {

assert.Equal(t, 1, len(sd.unneededNodes))
// Verify that nodes that are no longer unremovable are removed.
assert.Equal(t, 0, len(sd.unremovableNodes))
assert.Equal(t, 0, len(sd.unremovableNodes.AsList()))
}

func TestFindUnneededGPUNodes(t *testing.T) {
Expand Down
Loading

0 comments on commit b0cd570

Please sign in to comment.