From 645fe6312986fb3a1a710636d83a390f1c5903d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Wr=C3=B3blewski?= Date: Wed, 10 May 2023 10:23:06 +0000 Subject: [PATCH] Fix scale-up similar node group computation --- .../core/scaleup/orchestrator/orchestrator.go | 224 ++++++++++-------- .../scaleup/orchestrator/orchestrator_test.go | 152 ++++++++++++ cluster-autoscaler/expander/expander.go | 9 +- 3 files changed, 279 insertions(+), 106 deletions(-) diff --git a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go index 759602fdbdeb..83b5f88f0ca6 100644 --- a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go +++ b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go @@ -125,48 +125,29 @@ func (o *ScaleUpOrchestrator) ScaleUp( } now := time.Now() - expansionOptions := make(map[string]expander.Option, 0) - skippedNodeGroups := map[string]status.Reasons{} - for _, nodeGroup := range nodeGroups { - if skipReason := o.IsNodeGroupReadyToScaleUp(nodeGroup, now); skipReason != nil { - skippedNodeGroups[nodeGroup.Id()] = skipReason - continue - } + // Filter out invalid node groups + validNodeGroups, skippedNodeGroups := o.filterValidScaleUpNodeGroups(nodeGroups, nodeInfos, resourcesLeft, now) - currentTargetSize, err := nodeGroup.TargetSize() - if err != nil { - klog.Errorf("Failed to get node group size: %v", err) - skippedNodeGroups[nodeGroup.Id()] = NotReadyReason - continue - } - if currentTargetSize >= nodeGroup.MaxSize() { - klog.V(4).Infof("Skipping node group %s - max size reached", nodeGroup.Id()) - skippedNodeGroups[nodeGroup.Id()] = MaxLimitReachedReason - continue - } + // Calculate expansion options + schedulablePods := map[string][]*apiv1.Pod{} + var options []expander.Option - nodeInfo, found := nodeInfos[nodeGroup.Id()] - if !found { - klog.Errorf("No node info for: %s", nodeGroup.Id()) - skippedNodeGroups[nodeGroup.Id()] = NotReadyReason - continue - } + for _, nodeGroup := range validNodeGroups { + schedulablePods[nodeGroup.Id()] = o.SchedulablePods(podEquivalenceGroups, nodeGroup, nodeInfos[nodeGroup.Id()]) + } - if skipReason := o.IsNodeGroupResourceExceeded(resourcesLeft, nodeGroup, nodeInfo); skipReason != nil { - skippedNodeGroups[nodeGroup.Id()] = skipReason + for _, nodeGroup := range validNodeGroups { + option := o.ComputeExpansionOption(nodeGroup, schedulablePods, nodeInfos, upcomingNodes, now) + if len(option.Pods) == 0 || option.NodeCount == 0 { + klog.V(4).Infof("No pod can fit to %s", nodeGroup.Id()) continue } - option := o.ComputeExpansionOption(podEquivalenceGroups, nodeGroup, nodeInfo, upcomingNodes) - if len(option.Pods) > 0 && option.NodeCount > 0 { - expansionOptions[nodeGroup.Id()] = option - } else { - klog.V(4).Infof("No pod can fit to %s", nodeGroup.Id()) - } + options = append(options, option) } - if len(expansionOptions) == 0 { + if len(options) == 0 { klog.V(1).Info("No expansion options") return &status.ScaleUpStatus{ Result: status.ScaleUpNoOptionsAvailable, @@ -176,10 +157,6 @@ func (o *ScaleUpOrchestrator) ScaleUp( } // Pick some expansion option. 
- options := make([]expander.Option, 0, len(expansionOptions)) - for _, o := range expansionOptions { - options = append(options, o) - } bestOption := o.autoscalingContext.ExpanderStrategy.BestOption(options, nodeInfos) if bestOption == nil || bestOption.NodeCount <= 0 { return &status.ScaleUpStatus{ @@ -233,17 +210,16 @@ func (o *ScaleUpOrchestrator) ScaleUp( continue } nodeInfos[nodeGroup.Id()] = nodeInfo - - option := o.ComputeExpansionOption(podEquivalenceGroups, nodeGroup, nodeInfo, upcomingNodes) - if len(option.Pods) > 0 && option.NodeCount > 0 { - expansionOptions[nodeGroup.Id()] = option - } + schedulablePods[nodeGroup.Id()] = o.SchedulablePods(podEquivalenceGroups, nodeGroup, nodeInfo) } // Update ClusterStateRegistry so similar nodegroups rebalancing works. // TODO(lukaszos) when pursuing scalability update this call with one which takes list of changed node groups so we do not // do extra API calls. (the call at the bottom of ScaleUp() could be also changed then) o.clusterStateRegistry.Recalculate() + + // Recompute similar node groups + bestOption.SimilarNodeGroups = o.ComputeSimilarNodeGroups(bestOption.NodeGroup, nodeInfos, schedulablePods, now) } nodeInfo, found := nodeInfos[bestOption.NodeGroup.Id()] @@ -266,32 +242,16 @@ func (o *ScaleUpOrchestrator) ScaleUp( } targetNodeGroups := []cloudprovider.NodeGroup{bestOption.NodeGroup} - if o.autoscalingContext.BalanceSimilarNodeGroups { - similarNodeGroups, aErr := o.processors.NodeGroupSetProcessor.FindSimilarNodeGroups(o.autoscalingContext, bestOption.NodeGroup, nodeInfos) - if aErr != nil { - return scaleUpError( - &status.ScaleUpStatus{CreateNodeGroupResults: createNodeGroupResults, PodsTriggeredScaleUp: bestOption.Pods}, - aErr.AddPrefix("failed to find matching node groups: ")) - } - - similarNodeGroups = filterNodeGroupsByPods(similarNodeGroups, bestOption.Pods, expansionOptions) - for _, ng := range similarNodeGroups { - if o.clusterStateRegistry.IsNodeGroupSafeToScaleUp(ng, now) { - targetNodeGroups = append(targetNodeGroups, ng) - } else { - // This should never happen, as we will filter out the node group earlier on because of missing - // entry in podsPassingPredicates, but double checking doesn't really cost us anything. 
- klog.V(2).Infof("Ignoring node group %s when balancing: group is not ready for scaleup", ng.Id()) - } - } + for _, ng := range bestOption.SimilarNodeGroups { + targetNodeGroups = append(targetNodeGroups, ng) + } - if len(targetNodeGroups) > 1 { - names := []string{} - for _, ng := range targetNodeGroups { - names = append(names, ng.Id()) - } - klog.V(1).Infof("Splitting scale-up between %v similar node groups: {%v}", len(targetNodeGroups), strings.Join(names, ", ")) + if len(targetNodeGroups) > 1 { + var names []string + for _, ng := range targetNodeGroups { + names = append(names, ng.Id()) } + klog.V(1).Infof("Splitting scale-up between %v similar node groups: {%v}", len(targetNodeGroups), strings.Join(names, ", ")) } scaleUpInfos, aErr := o.processors.NodeGroupSetProcessor.BalanceScaleUpBetweenGroups(o.autoscalingContext, targetNodeGroups, newNodes) @@ -426,24 +386,69 @@ func (o *ScaleUpOrchestrator) ScaleUpToNodeGroupMinSize( }, nil } +// filterValidScaleUpNodeGroups filters the node groups that are valid for scale-up +func (o *ScaleUpOrchestrator) filterValidScaleUpNodeGroups( + nodeGroups []cloudprovider.NodeGroup, + nodeInfos map[string]*schedulerframework.NodeInfo, + resourcesLeft resource.Limits, + now time.Time, +) ([]cloudprovider.NodeGroup, map[string]status.Reasons) { + var validNodeGroups []cloudprovider.NodeGroup + skippedNodeGroups := map[string]status.Reasons{} + + for _, nodeGroup := range nodeGroups { + if skipReason := o.IsNodeGroupReadyToScaleUp(nodeGroup, now); skipReason != nil { + skippedNodeGroups[nodeGroup.Id()] = skipReason + continue + } + + currentTargetSize, err := nodeGroup.TargetSize() + if err != nil { + klog.Errorf("Failed to get node group size: %v", err) + skippedNodeGroups[nodeGroup.Id()] = NotReadyReason + continue + } + if currentTargetSize >= nodeGroup.MaxSize() { + klog.V(4).Infof("Skipping node group %s - max size reached", nodeGroup.Id()) + skippedNodeGroups[nodeGroup.Id()] = MaxLimitReachedReason + continue + } + + nodeInfo, found := nodeInfos[nodeGroup.Id()] + if !found { + klog.Errorf("No node info for: %s", nodeGroup.Id()) + skippedNodeGroups[nodeGroup.Id()] = NotReadyReason + continue + } + if skipReason := o.IsNodeGroupResourceExceeded(resourcesLeft, nodeGroup, nodeInfo); skipReason != nil { + skippedNodeGroups[nodeGroup.Id()] = skipReason + continue + } + + validNodeGroups = append(validNodeGroups, nodeGroup) + } + return validNodeGroups, skippedNodeGroups +} + // ComputeExpansionOption computes expansion option based on pending pods and cluster state. 
func (o *ScaleUpOrchestrator) ComputeExpansionOption( - podEquivalenceGroups []*equivalence.PodGroup, nodeGroup cloudprovider.NodeGroup, - nodeInfo *schedulerframework.NodeInfo, + schedulablePods map[string][]*apiv1.Pod, + nodeInfos map[string]*schedulerframework.NodeInfo, upcomingNodes []*schedulerframework.NodeInfo, + now time.Time, ) expander.Option { - option := expander.Option{ - NodeGroup: nodeGroup, - Pods: make([]*apiv1.Pod, 0), - } + option := expander.Option{NodeGroup: nodeGroup} + pods := schedulablePods[nodeGroup.Id()] + nodeInfo := nodeInfos[nodeGroup.Id()] - option.Pods = o.SchedulablePods(podEquivalenceGroups, nodeGroup, nodeInfo) - if len(option.Pods) > 0 { - estimator := o.autoscalingContext.EstimatorBuilder(o.autoscalingContext.PredicateChecker, o.autoscalingContext.ClusterSnapshot) - option.NodeCount, option.Pods = estimator.Estimate(option.Pods, nodeInfo, option.NodeGroup) + if len(pods) == 0 { + return option } + estimator := o.autoscalingContext.EstimatorBuilder(o.autoscalingContext.PredicateChecker, o.autoscalingContext.ClusterSnapshot) + option.NodeCount, option.Pods = estimator.Estimate(pods, nodeInfo, nodeGroup) + option.SimilarNodeGroups = o.ComputeSimilarNodeGroups(nodeGroup, nodeInfos, schedulablePods, now) return option } @@ -557,37 +562,52 @@ func (o *ScaleUpOrchestrator) GetCappedNewNodeCount(newNodeCount, currentNodeCou return newNodeCount, nil } -func filterNodeGroupsByPods( - groups []cloudprovider.NodeGroup, - podsRequiredToFit []*apiv1.Pod, - expansionOptions map[string]expander.Option, +// ComputeSimilarNodeGroups finds similar node groups which can schedule the same +// set of pods as the main node group. +func (o *ScaleUpOrchestrator) ComputeSimilarNodeGroups( + nodeGroup cloudprovider.NodeGroup, + nodeInfos map[string]*schedulerframework.NodeInfo, + schedulablePods map[string][]*apiv1.Pod, + now time.Time, ) []cloudprovider.NodeGroup { - result := make([]cloudprovider.NodeGroup, 0) + if !o.autoscalingContext.BalanceSimilarNodeGroups { + return nil + } - for _, group := range groups { - option, found := expansionOptions[group.Id()] - if !found { - klog.V(1).Infof("No info about pods passing predicates found for group %v, skipping it from scale-up consideration", group.Id()) - continue - } - fittingPods := make(map[*apiv1.Pod]bool, len(option.Pods)) - for _, pod := range option.Pods { - fittingPods[pod] = true - } - allFit := true - for _, pod := range podsRequiredToFit { - if _, found := fittingPods[pod]; !found { - klog.V(1).Infof("Group %v, can't fit pod %v/%v, removing from scale-up consideration", group.Id(), pod.Namespace, pod.Name) - allFit = false - break - } - } - if allFit { - result = append(result, group) + groupSchedulablePods, found := schedulablePods[nodeGroup.Id()] + if !found || len(groupSchedulablePods) == 0 { + return nil + } + + similarNodeGroups, err := o.processors.NodeGroupSetProcessor.FindSimilarNodeGroups(o.autoscalingContext, nodeGroup, nodeInfos) + if err != nil { + klog.Errorf("Failed to find similar node groups: %v", err) + return nil + } + + var validSimilarNodeGroups []cloudprovider.NodeGroup + for _, ng := range similarNodeGroups { + if !o.clusterStateRegistry.IsNodeGroupSafeToScaleUp(ng, now) { + klog.V(2).Infof("Ignoring node group %s when balancing: group is not ready for scaleup", ng.Id()) + } else if similarSchedulablePods, found := schedulablePods[ng.Id()]; found && matchingSchedulablePods(groupSchedulablePods, similarSchedulablePods) { + validSimilarNodeGroups = append(validSimilarNodeGroups, ng) } } - return 
result + return validSimilarNodeGroups +} + +func matchingSchedulablePods(groupSchedulablePods []*apiv1.Pod, similarSchedulablePods []*apiv1.Pod) bool { + schedulablePods := make(map[*apiv1.Pod]bool) + for _, pod := range similarSchedulablePods { + schedulablePods[pod] = true + } + for _, pod := range groupSchedulablePods { + if _, found := schedulablePods[pod]; !found { + return false + } + } + return true } // GetRemainingPods returns information about pods which CA is unable to help diff --git a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go index bc6e6d9eea0e..90d7eeff20f7 100644 --- a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go +++ b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go @@ -33,11 +33,14 @@ import ( testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test" "k8s.io/autoscaler/cluster-autoscaler/clusterstate" "k8s.io/autoscaler/cluster-autoscaler/config" + "k8s.io/autoscaler/cluster-autoscaler/context" "k8s.io/autoscaler/cluster-autoscaler/core/scaleup/resource" . "k8s.io/autoscaler/cluster-autoscaler/core/test" "k8s.io/autoscaler/cluster-autoscaler/core/utils" "k8s.io/autoscaler/cluster-autoscaler/estimator" "k8s.io/autoscaler/cluster-autoscaler/metrics" + "k8s.io/autoscaler/cluster-autoscaler/processors" + "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset" "k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider" "k8s.io/autoscaler/cluster-autoscaler/processors/status" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" @@ -852,6 +855,155 @@ func TestScaleUpNoHelp(t *testing.T) { assert.Regexp(t, regexp.MustCompile("NotTriggerScaleUp"), event) } +type constNodeGroupSetProcessor struct { + similarNodeGroups []cloudprovider.NodeGroup +} + +func (p *constNodeGroupSetProcessor) FindSimilarNodeGroups(_ *context.AutoscalingContext, _ cloudprovider.NodeGroup, _ map[string]*schedulerframework.NodeInfo) ([]cloudprovider.NodeGroup, errors.AutoscalerError) { + return p.similarNodeGroups, nil +} + +func (p *constNodeGroupSetProcessor) BalanceScaleUpBetweenGroups(_ *context.AutoscalingContext, _ []cloudprovider.NodeGroup, _ int) ([]nodegroupset.ScaleUpInfo, errors.AutoscalerError) { + return nil, nil +} + +func (p *constNodeGroupSetProcessor) CleanUp() {} + +func TestComputeSimilarNodeGroups(t *testing.T) { + pod1 := BuildTestPod("p1", 100, 1000) + pod2 := BuildTestPod("p2", 100, 1000) + pod3 := BuildTestPod("p3", 100, 1000) + + testCases := []struct { + name string + nodeGroup string + similarNodeGroups []string + otherNodeGroups []string + balancingEnabled bool + schedulablePods map[string][]*apiv1.Pod + wantSimilarNodeGroups []string + }{ + { + name: "no similar node groups", + nodeGroup: "ng1", + otherNodeGroups: []string{"pg1", "pg2"}, + balancingEnabled: true, + wantSimilarNodeGroups: []string{}, + }, + { + name: "some similar node groups, but no schedulable pods", + nodeGroup: "ng1", + similarNodeGroups: []string{"ng2", "ng3"}, + otherNodeGroups: []string{"pg1", "pg2"}, + balancingEnabled: true, + wantSimilarNodeGroups: []string{}, + }, + { + name: "some similar node groups and same schedulable pods, but balancing disabled", + nodeGroup: "ng1", + similarNodeGroups: []string{"ng2", "ng3"}, + otherNodeGroups: []string{"pg1", "pg2"}, + balancingEnabled: false, + schedulablePods: map[string][]*apiv1.Pod{ + "ng1": {pod1}, + "ng2": {pod1}, + "ng3": {pod1}, + "pg1": {pod1}, + "pg2": {pod1}, + }, + wantSimilarNodeGroups: 
[]string{}, + }, + { + name: "some similar node groups and same schedulable pods", + nodeGroup: "ng1", + similarNodeGroups: []string{"ng2", "ng3"}, + otherNodeGroups: []string{"pg1", "pg2"}, + balancingEnabled: true, + schedulablePods: map[string][]*apiv1.Pod{ + "ng1": {pod1}, + "ng2": {pod1}, + "ng3": {pod1}, + "pg1": {pod1}, + "pg2": {pod1}, + }, + wantSimilarNodeGroups: []string{"ng2", "ng3"}, + }, + { + name: "similar node groups can schedule more pods", + nodeGroup: "ng1", + similarNodeGroups: []string{"ng2", "ng3"}, + otherNodeGroups: []string{"pg1", "pg2"}, + balancingEnabled: true, + schedulablePods: map[string][]*apiv1.Pod{ + "ng1": {pod1}, + "ng2": {pod1, pod2}, + "ng3": {pod1, pod2, pod3}, + "pg1": {pod1, pod2}, + "pg2": {pod1, pod2, pod3}, + }, + wantSimilarNodeGroups: []string{"ng2", "ng3"}, + }, + { + name: "similar node groups can schedule different/no pods", + nodeGroup: "ng1", + similarNodeGroups: []string{"ng2", "ng3"}, + otherNodeGroups: []string{"pg1", "pg2"}, + balancingEnabled: true, + schedulablePods: map[string][]*apiv1.Pod{ + "ng1": {pod1, pod2}, + "ng2": {pod1}, + "pg1": {pod1}, + }, + wantSimilarNodeGroups: []string{}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + provider := testprovider.NewTestCloudProvider(func(string, int) error { return nil }, nil) + nodeGroupSetProcessor := &constNodeGroupSetProcessor{} + now := time.Now() + + allNodeGroups := []string{tc.nodeGroup} + allNodeGroups = append(allNodeGroups, tc.similarNodeGroups...) + allNodeGroups = append(allNodeGroups, tc.otherNodeGroups...) + + var nodes []*apiv1.Node + for _, ng := range allNodeGroups { + nodeName := fmt.Sprintf("%s-node", ng) + node := BuildTestNode(nodeName, 100, 1000) + SetNodeReadyState(node, true, now.Add(-2*time.Minute)) + nodes = append(nodes, node) + + provider.AddNodeGroup(ng, 0, 10, 1) + provider.AddNode(ng, node) + } + + for _, ng := range tc.similarNodeGroups { + nodeGroupSetProcessor.similarNodeGroups = append(nodeGroupSetProcessor.similarNodeGroups, provider.GetNodeGroup(ng)) + } + + listers := kube_util.NewListerRegistry(nil, nil, kube_util.NewTestPodLister(nil), nil, nil, nil, nil, nil, nil, nil) + ctx, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{BalanceSimilarNodeGroups: tc.balancingEnabled}, &fake.Clientset{}, listers, provider, nil, nil) + assert.NoError(t, err) + + nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&ctx, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now) + clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, ctx.LogRecorder, NewBackoff(), clusterstate.NewStaticMaxNodeProvisionTimeProvider(15*time.Minute)) + assert.NoError(t, clusterState.UpdateNodes(nodes, nodeInfos, time.Now())) + + suOrchestrator := &ScaleUpOrchestrator{} + suOrchestrator.Initialize(&ctx, &processors.AutoscalingProcessors{NodeGroupSetProcessor: nodeGroupSetProcessor}, clusterState, taints.TaintConfig{}) + similarNodeGroups := suOrchestrator.ComputeSimilarNodeGroups(provider.GetNodeGroup(tc.nodeGroup), nodeInfos, tc.schedulablePods, now) + + var gotSimilarNodeGroups []string + for _, ng := range similarNodeGroups { + gotSimilarNodeGroups = append(gotSimilarNodeGroups, ng.Id()) + } + assert.ElementsMatch(t, gotSimilarNodeGroups, tc.wantSimilarNodeGroups) + }) + } +} + func TestScaleUpBalanceGroups(t *testing.T) { provider := testprovider.NewTestCloudProvider(func(string, int) error { return nil diff --git 
a/cluster-autoscaler/expander/expander.go b/cluster-autoscaler/expander/expander.go index 57a91cfa78e7..9aa2eccb16b1 100644 --- a/cluster-autoscaler/expander/expander.go +++ b/cluster-autoscaler/expander/expander.go @@ -42,10 +42,11 @@ var ( // Option describes an option to expand the cluster. type Option struct { - NodeGroup cloudprovider.NodeGroup - NodeCount int - Debug string - Pods []*apiv1.Pod + NodeGroup cloudprovider.NodeGroup + SimilarNodeGroups []cloudprovider.NodeGroup + NodeCount int + Debug string + Pods []*apiv1.Pod } // Strategy describes an interface for selecting the best option when scaling up
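
For reviewers, a minimal standalone sketch (not part of the patch) of the pod-matching semantics introduced above: ComputeSimilarNodeGroups only keeps a candidate node group when it is safe to scale up and matchingSchedulablePods reports that every pod schedulable on the main group is also schedulable on the candidate; the candidate may fit additional pods, as exercised by TestComputeSimilarNodeGroups. The snippet below mirrors that superset check with plain strings instead of *apiv1.Pod pointers, purely to illustrate the behaviour.

```go
package main

import "fmt"

// matchingSchedulable mirrors matchingSchedulablePods from the patch: it
// returns true only if every pod schedulable on the main node group is also
// schedulable on the candidate similar group (the candidate may fit more).
// The real helper compares *apiv1.Pod pointers taken from the shared
// schedulablePods map; plain string names stand in for pods here.
func matchingSchedulable(mainGroupPods, candidatePods []string) bool {
	fits := make(map[string]bool, len(candidatePods))
	for _, p := range candidatePods {
		fits[p] = true
	}
	for _, p := range mainGroupPods {
		if !fits[p] {
			return false
		}
	}
	return true
}

func main() {
	// Candidate fits a superset of the main group's pods -> kept for balancing.
	fmt.Println(matchingSchedulable([]string{"p1"}, []string{"p1", "p2"})) // true
	// Candidate cannot fit p2 -> excluded from balancing.
	fmt.Println(matchingSchedulable([]string{"p1", "p2"}, []string{"p1"})) // false
}
```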