Skip to content

Commit

Permalink
Add flag to force remove long unregistered nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
BigDarkClown committed Nov 13, 2024
1 parent f6b5ab5 commit c5e0acb
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 128 deletions.
2 changes: 2 additions & 0 deletions cluster-autoscaler/config/autoscaling_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,8 @@ type AutoscalingOptions struct {
CheckCapacityProvisioningRequestMaxBatchSize int
// CheckCapacityProvisioningRequestBatchTimebox is the maximum time to spend processing a batch of provisioning requests
CheckCapacityProvisioningRequestBatchTimebox time.Duration
// ForceDeleteLongUnregisteredNodes is used to enable/disable ignoring min size constraints during removal of long unregistered nodes
ForceDeleteLongUnregisteredNodes bool
}

// KubeClientOptions specify options for kube client
Expand Down
38 changes: 22 additions & 16 deletions cluster-autoscaler/core/static_autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -785,29 +785,35 @@ func (a *StaticAutoscaler) removeOldUnregisteredNodes(allUnregisteredNodes []clu
nodeGroup := nodeGroups[nodeGroupId]

klog.V(0).Infof("Removing %v unregistered nodes for node group %v", len(unregisteredNodesToDelete), nodeGroupId)
size, err := nodeGroup.TargetSize()
if err != nil {
klog.Warningf("Failed to get node group size; nodeGroup=%v; err=%v", nodeGroup.Id(), err)
continue
}
possibleToDelete := size - nodeGroup.MinSize()
if possibleToDelete <= 0 {
klog.Warningf("Node group %s min size reached, skipping removal of %v unregistered nodes", nodeGroupId, len(unregisteredNodesToDelete))
continue
}
if len(unregisteredNodesToDelete) > possibleToDelete {
klog.Warningf("Capping node group %s unregistered node removal to %d nodes, removing all %d would exceed min size constaint", nodeGroupId, possibleToDelete, len(unregisteredNodesToDelete))
unregisteredNodesToDelete = unregisteredNodesToDelete[:possibleToDelete]
if !a.ForceDeleteLongUnregisteredNodes {
size, err := nodeGroup.TargetSize()
if err != nil {
klog.Warningf("Failed to get node group size; nodeGroup=%v; err=%v", nodeGroup.Id(), err)
continue
}
possibleToDelete := size - nodeGroup.MinSize()
if possibleToDelete <= 0 {
klog.Warningf("Node group %s min size reached, skipping removal of %v unregistered nodes", nodeGroupId, len(unregisteredNodesToDelete))
continue
}
if len(unregisteredNodesToDelete) > possibleToDelete {
klog.Warningf("Capping node group %s unregistered node removal to %d nodes, removing all %d would exceed min size constaint", nodeGroupId, possibleToDelete, len(unregisteredNodesToDelete))
unregisteredNodesToDelete = unregisteredNodesToDelete[:possibleToDelete]
}
}
nodesToDelete := toNodes(unregisteredNodesToDelete)

nodesToDelete, err = overrideNodesToDeleteForZeroOrMax(a.NodeGroupDefaults, nodeGroup, nodesToDelete)
nodesToDelete := toNodes(unregisteredNodesToDelete)
nodesToDelete, err := overrideNodesToDeleteForZeroOrMax(a.NodeGroupDefaults, nodeGroup, nodesToDelete)
if err != nil {
klog.Warningf("Failed to remove unregistered nodes from node group %s: %v", nodeGroupId, err)
continue
}

err = nodeGroup.DeleteNodes(nodesToDelete)
if a.ForceDeleteLongUnregisteredNodes {
err = nodeGroup.ForceDeleteNodes(nodesToDelete)
} else {
err = nodeGroup.DeleteNodes(nodesToDelete)
}
csr.InvalidateNodeInstancesCacheEntry(nodeGroup)
if err != nil {
klog.Warningf("Failed to remove %v unregistered nodes from node group %s: %v", len(nodesToDelete), nodeGroupId, err)
Expand Down
234 changes: 122 additions & 112 deletions cluster-autoscaler/core/static_autoscaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -876,134 +876,144 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) {
}

func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) {
readyNodeLister := kubernetes.NewTestNodeLister(nil)
allNodeLister := kubernetes.NewTestNodeLister(nil)
allPodListerMock := &podListerMock{}
podDisruptionBudgetListerMock := &podDisruptionBudgetListerMock{}
daemonSetListerMock := &daemonSetListerMock{}
onScaleUpMock := &onScaleUpMock{}
onScaleDownMock := &onScaleDownMock{}
deleteFinished := make(chan bool, 1)

now := time.Now()
later := now.Add(1 * time.Minute)
for _, forceDeleteLongUnregisteredNodes := range []bool{false, true} {
t.Run(fmt.Sprintf("forceDeleteLongUnregisteredNodes=%v", forceDeleteLongUnregisteredNodes), func(t *testing.T) {
readyNodeLister := kubernetes.NewTestNodeLister(nil)
allNodeLister := kubernetes.NewTestNodeLister(nil)
allPodListerMock := &podListerMock{}
podDisruptionBudgetListerMock := &podDisruptionBudgetListerMock{}
daemonSetListerMock := &daemonSetListerMock{}
onScaleUpMock := &onScaleUpMock{}
onScaleDownMock := &onScaleDownMock{}
deleteFinished := make(chan bool, 1)

now := time.Now()
later := now.Add(1 * time.Minute)

n1 := BuildTestNode("n1", 1000, 1000)
SetNodeReadyState(n1, true, time.Now())
n2 := BuildTestNode("n2", 1000, 1000)
SetNodeReadyState(n2, true, time.Now())

p1 := BuildTestPod("p1", 600, 100)
p1.Spec.NodeName = "n1"
p2 := BuildTestPod("p2", 600, 100, MarkUnschedulable())

provider := testprovider.NewTestCloudProvider(
func(id string, delta int) error {
return onScaleUpMock.ScaleUp(id, delta)
}, func(id string, name string) error {
ret := onScaleDownMock.ScaleDown(id, name)
deleteFinished <- true
return ret
})
provider.AddNodeGroup("ng1", 2, 10, 2)
provider.AddNode("ng1", n1)

n1 := BuildTestNode("n1", 1000, 1000)
SetNodeReadyState(n1, true, time.Now())
n2 := BuildTestNode("n2", 1000, 1000)
SetNodeReadyState(n2, true, time.Now())
// broken node, that will be just hanging out there during
// the test (it can't be removed since that would validate group min size)
brokenNode := BuildTestNode("broken", 1000, 1000)
provider.AddNode("ng1", brokenNode)

p1 := BuildTestPod("p1", 600, 100)
p1.Spec.NodeName = "n1"
p2 := BuildTestPod("p2", 600, 100, MarkUnschedulable())
ng1 := reflect.ValueOf(provider.GetNodeGroup("ng1")).Interface().(*testprovider.TestNodeGroup)
assert.NotNil(t, ng1)
assert.NotNil(t, provider)

provider := testprovider.NewTestCloudProvider(
func(id string, delta int) error {
return onScaleUpMock.ScaleUp(id, delta)
}, func(id string, name string) error {
ret := onScaleDownMock.ScaleDown(id, name)
deleteFinished <- true
return ret
})
provider.AddNodeGroup("ng1", 2, 10, 2)
provider.AddNode("ng1", n1)
// Create context with mocked lister registry.
options := config.AutoscalingOptions{
NodeGroupDefaults: config.NodeGroupAutoscalingOptions{
ScaleDownUnneededTime: time.Minute,
ScaleDownUnreadyTime: time.Minute,
ScaleDownUtilizationThreshold: 0.5,
MaxNodeProvisionTime: 10 * time.Second,
},
EstimatorName: estimator.BinpackingEstimatorName,
ScaleDownEnabled: true,
MaxNodesTotal: 10,
MaxCoresTotal: 10,
MaxMemoryTotal: 100000,
ForceDeleteLongUnregisteredNodes: forceDeleteLongUnregisteredNodes,
}
processorCallbacks := newStaticAutoscalerProcessorCallbacks()

// broken node, that will be just hanging out there during
// the test (it can't be removed since that would validate group min size)
brokenNode := BuildTestNode("broken", 1000, 1000)
provider.AddNode("ng1", brokenNode)
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks, nil)
assert.NoError(t, err)

ng1 := reflect.ValueOf(provider.GetNodeGroup("ng1")).Interface().(*testprovider.TestNodeGroup)
assert.NotNil(t, ng1)
assert.NotNil(t, provider)
setUpScaleDownActuator(&context, options)

// Create context with mocked lister registry.
options := config.AutoscalingOptions{
NodeGroupDefaults: config.NodeGroupAutoscalingOptions{
ScaleDownUnneededTime: time.Minute,
ScaleDownUnreadyTime: time.Minute,
ScaleDownUtilizationThreshold: 0.5,
MaxNodeProvisionTime: 10 * time.Second,
},
EstimatorName: estimator.BinpackingEstimatorName,
ScaleDownEnabled: true,
MaxNodesTotal: 10,
MaxCoresTotal: 10,
MaxMemoryTotal: 100000,
}
processorCallbacks := newStaticAutoscalerProcessorCallbacks()
listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, allPodListerMock,
podDisruptionBudgetListerMock, daemonSetListerMock,
nil, nil, nil, nil)
context.ListerRegistry = listerRegistry

context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks, nil)
assert.NoError(t, err)
clusterStateConfig := clusterstate.ClusterStateRegistryConfig{
OkTotalUnreadyCount: 1,
}
processors := processorstest.NewTestProcessors(&context)

setUpScaleDownActuator(&context, options)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker)
// broken node detected as unregistered

listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, allPodListerMock,
podDisruptionBudgetListerMock, daemonSetListerMock,
nil, nil, nil, nil)
context.ListerRegistry = listerRegistry
nodes := []*apiv1.Node{n1}
// nodeInfos, _ := getNodeInfosForGroups(nodes, provider, listerRegistry, []*appsv1.DaemonSet{}, context.PredicateChecker)
clusterState.UpdateNodes(nodes, nil, now)

clusterStateConfig := clusterstate.ClusterStateRegistryConfig{
OkTotalUnreadyCount: 1,
}
processors := processorstest.NewTestProcessors(&context)
// broken node failed to register in time
clusterState.UpdateNodes(nodes, nil, later)

clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker)
// broken node detected as unregistered
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil)
suOrchestrator := orchestrator.New()
suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})

nodes := []*apiv1.Node{n1}
// nodeInfos, _ := getNodeInfosForGroups(nodes, provider, listerRegistry, []*appsv1.DaemonSet{}, context.PredicateChecker)
clusterState.UpdateNodes(nodes, nil, now)
autoscaler := &StaticAutoscaler{
AutoscalingContext: &context,
clusterStateRegistry: clusterState,
lastScaleUpTime: time.Now(),
lastScaleDownFailTime: time.Now(),
scaleDownPlanner: sdPlanner,
scaleDownActuator: sdActuator,
scaleUpOrchestrator: suOrchestrator,
processors: processors,
loopStartNotifier: loopstart.NewObserversList(nil),
processorCallbacks: processorCallbacks,
}

// broken node failed to register in time
clusterState.UpdateNodes(nodes, nil, later)
// If deletion of unregistered nodes is not forced, we need to simulate
// additional scale-up to respect min size constraints.
if !forceDeleteUnregisteredNodes {
// Scale up.
readyNodeLister.SetNodes(nodes)
allNodeLister.SetNodes(nodes)
allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2}, nil).Twice()
daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once()
podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once()
onScaleUpMock.On("ScaleUp", "ng1", 1).Return(nil).Once()

err = autoscaler.RunOnce(later.Add(time.Hour))
assert.NoError(t, err)
mock.AssertExpectationsForObjects(t, allPodListerMock,
podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock)

nodes = append(nodes, n2)
provider.AddNode("ng1", n2)
ng1.SetTargetSize(3)
}

sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil)
suOrchestrator := orchestrator.New()
suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
// Remove broken node
readyNodeLister.SetNodes(nodes)
allNodeLister.SetNodes(nodes)
allPodListerMock.On("List").Return([]*apiv1.Pod{}, nil).Twice()
onScaleDownMock.On("ScaleDown", "ng1", "broken").Return(nil).Once()
daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once()
podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once()

autoscaler := &StaticAutoscaler{
AutoscalingContext: &context,
clusterStateRegistry: clusterState,
lastScaleUpTime: time.Now(),
lastScaleDownFailTime: time.Now(),
scaleDownPlanner: sdPlanner,
scaleDownActuator: sdActuator,
scaleUpOrchestrator: suOrchestrator,
processors: processors,
loopStartNotifier: loopstart.NewObserversList(nil),
processorCallbacks: processorCallbacks,
err = autoscaler.RunOnce(later.Add(2 * time.Hour))
waitForDeleteToFinish(t, deleteFinished)
assert.NoError(t, err)
mock.AssertExpectationsForObjects(t, allPodListerMock,
podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock)
})
}

// Scale up.
readyNodeLister.SetNodes([]*apiv1.Node{n1})
allNodeLister.SetNodes([]*apiv1.Node{n1})
allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2}, nil).Twice()
daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once()
podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once()
onScaleUpMock.On("ScaleUp", "ng1", 1).Return(nil).Once()

err = autoscaler.RunOnce(later.Add(time.Hour))
assert.NoError(t, err)
mock.AssertExpectationsForObjects(t, allPodListerMock,
podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock)

// Remove broken node after going over min size
provider.AddNode("ng1", n2)
ng1.SetTargetSize(3)

readyNodeLister.SetNodes([]*apiv1.Node{n1, n2})
allNodeLister.SetNodes([]*apiv1.Node{n1, n2})
allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2}, nil).Twice()
onScaleDownMock.On("ScaleDown", "ng1", "broken").Return(nil).Once()
daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once()
podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once()

err = autoscaler.RunOnce(later.Add(2 * time.Hour))
waitForDeleteToFinish(t, deleteFinished)
assert.NoError(t, err)
mock.AssertExpectationsForObjects(t, allPodListerMock,
podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock)
}

func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions cluster-autoscaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ var (
checkCapacityBatchProcessing = flag.Bool("check-capacity-batch-processing", false, "Whether to enable batch processing for check capacity requests.")
checkCapacityProvisioningRequestMaxBatchSize = flag.Int("check-capacity-provisioning-request-max-batch-size", 10, "Maximum number of provisioning requests to process in a single batch.")
checkCapacityProvisioningRequestBatchTimebox = flag.Duration("check-capacity-provisioning-request-batch-timebox", 10*time.Second, "Maximum time to process a batch of provisioning requests.")
forceDeleteLongUnregisteredNodes = flag.Bool("force-delete-unregistered-nodes", false, "Wheter to enable force deletion of long unregistered nodes, regardless of the min size of the node group the belong to.")
)

func isFlagPassed(name string) bool {
Expand Down Expand Up @@ -457,6 +458,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
CheckCapacityBatchProcessing: *checkCapacityBatchProcessing,
CheckCapacityProvisioningRequestMaxBatchSize: *checkCapacityProvisioningRequestMaxBatchSize,
CheckCapacityProvisioningRequestBatchTimebox: *checkCapacityProvisioningRequestBatchTimebox,
ForceDeleteLongUnregisteredNodes: *forceDeleteLongUnregisteredNodes,
}
}

Expand Down

0 comments on commit c5e0acb

Please sign in to comment.