Skip to content

Commit

Permalink
Split removeOldUnregisteredNodes method
Browse files Browse the repository at this point in the history
  • Loading branch information
BigDarkClown committed Nov 18, 2024
1 parent a0bf108 commit 1599485
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 36 deletions.
75 changes: 43 additions & 32 deletions cluster-autoscaler/core/static_autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
unregisteredNodes := a.clusterStateRegistry.GetUnregisteredNodes()
if len(unregisteredNodes) > 0 {
klog.V(1).Infof("%d unregistered nodes present", len(unregisteredNodes))
removedAny, err := a.removeOldUnregisteredNodes(unregisteredNodes, autoscalingContext,
removedAny, err := a.removeOldUnregisteredNodes(unregisteredNodes,
a.clusterStateRegistry, currentTime, autoscalingContext.LogRecorder)
// There was a problem with removing unregistered nodes. Retry in the next loop.
if err != nil {
Expand Down Expand Up @@ -752,39 +752,22 @@ func fixNodeGroupSize(context *context.AutoscalingContext, clusterStateRegistry
return fixed, nil
}

// Removes unregistered nodes if needed. Returns true if anything was removed and error if such occurred.
func (a *StaticAutoscaler) removeOldUnregisteredNodes(allUnregisteredNodes []clusterstate.UnregisteredNode, context *context.AutoscalingContext,
// removeOldUnregisteredNodes removes unregistered nodes if needed. Returns true
// if anything was removed and error if such occurred.
func (a *StaticAutoscaler) removeOldUnregisteredNodes(allUnregisteredNodes []clusterstate.UnregisteredNode,
csr *clusterstate.ClusterStateRegistry, currentTime time.Time, logRecorder *utils.LogEventRecorder) (bool, error) {

nodeGroups := a.nodeGroupsById()
nodesToDeleteByNodeGroupId := make(map[string][]clusterstate.UnregisteredNode)
for _, unregisteredNode := range allUnregisteredNodes {
nodeGroup, err := a.CloudProvider.NodeGroupForNode(unregisteredNode.Node)
if err != nil {
klog.Warningf("Failed to get node group for %s: %v", unregisteredNode.Node.Name, err)
continue
}
if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
klog.Warningf("No node group for node %s, skipping", unregisteredNode.Node.Name)
continue
}

maxNodeProvisionTime, err := csr.MaxNodeProvisionTime(nodeGroup)
if err != nil {
return false, fmt.Errorf("failed to retrieve maxNodeProvisionTime for node %s in nodeGroup %s", unregisteredNode.Node.Name, nodeGroup.Id())
}

if unregisteredNode.UnregisteredSince.Add(maxNodeProvisionTime).Before(currentTime) {
klog.V(0).Infof("Marking unregistered node %v for removal", unregisteredNode.Node.Name)
nodesToDeleteByNodeGroupId[nodeGroup.Id()] = append(nodesToDeleteByNodeGroupId[nodeGroup.Id()], unregisteredNode)
}
unregisteredNodesToRemove, err := a.oldUnregisteredNodesToRemove(allUnregisteredNodes, csr, currentTime)
if err != nil {
return false, err
}

nodeGroups := a.nodeGroupsById()
removedAny := false
for nodeGroupId, unregisteredNodesToDelete := range nodesToDeleteByNodeGroupId {
for nodeGroupId, nodesToRemove := range unregisteredNodesToRemove {
nodeGroup := nodeGroups[nodeGroupId]

klog.V(0).Infof("Removing %v unregistered nodes for node group %v", len(unregisteredNodesToDelete), nodeGroupId)
klog.V(0).Infof("Removing %v unregistered nodes for node group %v", len(nodesToRemove), nodeGroupId)
if !a.ForceDeleteLongUnregisteredNodes {
size, err := nodeGroup.TargetSize()
if err != nil {
Expand All @@ -793,16 +776,16 @@ func (a *StaticAutoscaler) removeOldUnregisteredNodes(allUnregisteredNodes []clu
}
possibleToDelete := size - nodeGroup.MinSize()
if possibleToDelete <= 0 {
klog.Warningf("Node group %s min size reached, skipping removal of %v unregistered nodes", nodeGroupId, len(unregisteredNodesToDelete))
klog.Warningf("Node group %s min size reached, skipping removal of %v unregistered nodes", nodeGroupId, len(nodesToRemove))
continue
}
if len(unregisteredNodesToDelete) > possibleToDelete {
klog.Warningf("Capping node group %s unregistered node removal to %d nodes, removing all %d would exceed min size constaint", nodeGroupId, possibleToDelete, len(unregisteredNodesToDelete))
unregisteredNodesToDelete = unregisteredNodesToDelete[:possibleToDelete]
if len(nodesToRemove) > possibleToDelete {
klog.Warningf("Capping node group %s unregistered node removal to %d nodes, removing all %d would exceed min size constaint", nodeGroupId, possibleToDelete, len(nodesToRemove))
nodesToRemove = nodesToRemove[:possibleToDelete]
}
}

nodesToDelete := toNodes(unregisteredNodesToDelete)
nodesToDelete := toNodes(nodesToRemove)
nodesToDelete, err := overrideNodesToDeleteForZeroOrMax(a.NodeGroupDefaults, nodeGroup, nodesToDelete)
if err != nil {
klog.Warningf("Failed to remove unregistered nodes from node group %s: %v", nodeGroupId, err)
Expand Down Expand Up @@ -836,6 +819,34 @@ func (a *StaticAutoscaler) removeOldUnregisteredNodes(allUnregisteredNodes []clu
return removedAny, nil
}

// oldUnregisteredNodes returns old unregistered nodes grouped by their node group id.
func (a *StaticAutoscaler) oldUnregisteredNodes(allUnregisteredNodes []clusterstate.UnregisteredNode, csr *clusterstate.ClusterStateRegistry, currentTime time.Time) (map[string][]clusterstate.UnregisteredNode, error) {
nodesByNodeGroupId := make(map[string][]clusterstate.UnregisteredNode)
for _, unregisteredNode := range allUnregisteredNodes {
nodeGroup, err := a.CloudProvider.NodeGroupForNode(unregisteredNode.Node)
if err != nil {
klog.Warningf("Failed to get node group for %s: %v", unregisteredNode.Node.Name, err)
continue
}
if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
klog.Warningf("No node group for node %s, skipping", unregisteredNode.Node.Name)
continue
}

maxNodeProvisionTime, err := csr.MaxNodeProvisionTime(nodeGroup)
if err != nil {
return nil, fmt.Errorf("failed to retrieve maxNodeProvisionTime for node %s in nodeGroup %s", unregisteredNode.Node.Name, nodeGroup.Id())
}

if unregisteredNode.UnregisteredSince.Add(maxNodeProvisionTime).Before(currentTime) {
klog.V(0).Infof("Marking unregistered node %v for removal", unregisteredNode.Node.Name)
nodesByNodeGroupId[nodeGroup.Id()] = append(nodesByNodeGroupId[nodeGroup.Id()], unregisteredNode)
}
}

return nodesByNodeGroupId, nil
}

func toNodes(unregisteredNodes []clusterstate.UnregisteredNode) []*apiv1.Node {
nodes := []*apiv1.Node{}
for _, n := range unregisteredNodes {
Expand Down
8 changes: 4 additions & 4 deletions cluster-autoscaler/core/static_autoscaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2257,12 +2257,12 @@ func TestRemoveOldUnregisteredNodes(t *testing.T) {
}

// Nothing should be removed. The unregistered node is not old enough.
removed, err := autoscaler.removeOldUnregisteredNodes(unregisteredNodes, context, clusterState, now.Add(-50*time.Minute), fakeLogRecorder)
removed, err := autoscaler.removeOldUnregisteredNodes(unregisteredNodes, clusterState, now.Add(-50*time.Minute), fakeLogRecorder)
assert.NoError(t, err)
assert.False(t, removed)

// ng1_2 should be removed.
removed, err = autoscaler.removeOldUnregisteredNodes(unregisteredNodes, context, clusterState, now, fakeLogRecorder)
removed, err = autoscaler.removeOldUnregisteredNodes(unregisteredNodes, clusterState, now, fakeLogRecorder)
assert.NoError(t, err)
assert.True(t, removed)
deletedNode := core_utils.GetStringFromChan(deletedNodes)
Expand Down Expand Up @@ -2317,12 +2317,12 @@ func TestRemoveOldUnregisteredNodesAtomic(t *testing.T) {
}

// Nothing should be removed. The unregistered node is not old enough.
removed, err := autoscaler.removeOldUnregisteredNodes(unregisteredNodes, context, clusterState, now.Add(-50*time.Minute), fakeLogRecorder)
removed, err := autoscaler.removeOldUnregisteredNodes(unregisteredNodes, clusterState, now.Add(-50*time.Minute), fakeLogRecorder)
assert.NoError(t, err)
assert.False(t, removed)

// unregNode is long unregistered, so all of the nodes should be removed due to ZeroOrMaxNodeScaling option
removed, err = autoscaler.removeOldUnregisteredNodes(unregisteredNodes, context, clusterState, now, fakeLogRecorder)
removed, err = autoscaler.removeOldUnregisteredNodes(unregisteredNodes, clusterState, now, fakeLogRecorder)
assert.NoError(t, err)
assert.True(t, removed)
wantNames, deletedNames := []string{}, []string{}
Expand Down

0 comments on commit 1599485

Please sign in to comment.