From 12374e6c786f27c470b31daf19b0e06b6a961fcd Mon Sep 17 00:00:00 2001
From: Aleksandra Malinowska
Date: Wed, 15 May 2024 13:14:51 +0200
Subject: [PATCH] Review fixes

---
 .../core/scaleup/orchestrator/executor.go     | 37 +++++++-------
 .../core/scaleup/orchestrator/orchestrator.go | 48 ++++++++++++-------
 .../scaleup/orchestrator/orchestrator_test.go | 33 ++++++++++++-
 cluster-autoscaler/core/test/common.go        |  1 +
 .../orchestrator/orchestrator.go              |  2 +-
 5 files changed, 85 insertions(+), 36 deletions(-)

diff --git a/cluster-autoscaler/core/scaleup/orchestrator/executor.go b/cluster-autoscaler/core/scaleup/orchestrator/executor.go
index 5a685fa7781f..6d41849f67ee 100644
--- a/cluster-autoscaler/core/scaleup/orchestrator/executor.go
+++ b/cluster-autoscaler/core/scaleup/orchestrator/executor.go
@@ -61,20 +61,20 @@ func (e *scaleUpExecutor) ExecuteScaleUps(
 	scaleUpInfos []nodegroupset.ScaleUpInfo,
 	nodeInfos map[string]*schedulerframework.NodeInfo,
 	now time.Time,
-	allOrNothing bool,
+	atomic bool,
 ) (errors.AutoscalerError, []cloudprovider.NodeGroup) {
 	options := e.autoscalingContext.AutoscalingOptions
 	if options.ParallelScaleUp {
-		return e.executeScaleUpsParallel(scaleUpInfos, nodeInfos, now, allOrNothing)
+		return e.executeScaleUpsParallel(scaleUpInfos, nodeInfos, now, atomic)
 	}
-	return e.executeScaleUpsSync(scaleUpInfos, nodeInfos, now, allOrNothing)
+	return e.executeScaleUpsSync(scaleUpInfos, nodeInfos, now, atomic)
 }
 
 func (e *scaleUpExecutor) executeScaleUpsSync(
 	scaleUpInfos []nodegroupset.ScaleUpInfo,
 	nodeInfos map[string]*schedulerframework.NodeInfo,
 	now time.Time,
-	allOrNothing bool,
+	atomic bool,
 ) (errors.AutoscalerError, []cloudprovider.NodeGroup) {
 	availableGPUTypes := e.autoscalingContext.CloudProvider.GetAvailableGPUTypes()
 	for _, scaleUpInfo := range scaleUpInfos {
@@ -83,7 +83,7 @@ func (e *scaleUpExecutor) executeScaleUpsSync(
 			klog.Errorf("ExecuteScaleUp: failed to get node info for node group %s", scaleUpInfo.Group.Id())
 			continue
 		}
-		if aErr := e.executeScaleUp(scaleUpInfo, nodeInfo, availableGPUTypes, now, allOrNothing); aErr != nil {
+		if aErr := e.executeScaleUp(scaleUpInfo, nodeInfo, availableGPUTypes, now, atomic); aErr != nil {
 			return aErr, []cloudprovider.NodeGroup{scaleUpInfo.Group}
 		}
 	}
@@ -94,7 +94,7 @@ func (e *scaleUpExecutor) executeScaleUpsParallel(
 	scaleUpInfos []nodegroupset.ScaleUpInfo,
 	nodeInfos map[string]*schedulerframework.NodeInfo,
 	now time.Time,
-	allOrNothing bool,
+	atomic bool,
 ) (errors.AutoscalerError, []cloudprovider.NodeGroup) {
 	if err := checkUniqueNodeGroups(scaleUpInfos); err != nil {
 		return err, extractNodeGroups(scaleUpInfos)
 	}
@@ -116,7 +116,7 @@ func (e *scaleUpExecutor) executeScaleUpsParallel(
 				klog.Errorf("ExecuteScaleUp: failed to get node info for node group %s", info.Group.Id())
 				return
 			}
-			if aErr := e.executeScaleUp(info, nodeInfo, availableGPUTypes, now, allOrNothing); aErr != nil {
+			if aErr := e.executeScaleUp(info, nodeInfo, availableGPUTypes, now, atomic); aErr != nil {
 				errResults <- errResult{err: aErr, info: &info}
 			}
 		}(scaleUpInfo)
@@ -139,12 +139,23 @@ func (e *scaleUpExecutor) executeScaleUpsParallel(
 	return nil, nil
 }
 
+func (e *scaleUpExecutor) increaseSize(nodeGroup cloudprovider.NodeGroup, increase int, atomic bool) error {
+	if atomic {
+		if err := nodeGroup.AtomicIncreaseSize(increase); err != cloudprovider.ErrNotImplemented {
+			return err
+		}
+		// If error is cloudprovider.ErrNotImplemented, fall back to non-atomic
+		// increase - cloud provider doesn't support it.
+	}
+	return nodeGroup.IncreaseSize(increase)
+}
+
 func (e *scaleUpExecutor) executeScaleUp(
 	info nodegroupset.ScaleUpInfo,
 	nodeInfo *schedulerframework.NodeInfo,
 	availableGPUTypes map[string]struct{},
 	now time.Time,
-	allOrNothing bool,
+	atomic bool,
 ) errors.AutoscalerError {
 	gpuConfig := e.autoscalingContext.CloudProvider.GetNodeGpuConfig(nodeInfo.Node())
 	gpuResourceName, gpuType := gpu.GetGpuInfoForMetrics(gpuConfig, availableGPUTypes, nodeInfo.Node(), nil)
@@ -152,15 +163,7 @@ func (e *scaleUpExecutor) executeScaleUp(
 	e.autoscalingContext.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaledUpGroup",
 		"Scale-up: setting group %s size to %d instead of %d (max: %d)", info.Group.Id(), info.NewSize, info.CurrentSize, info.MaxSize)
 	increase := info.NewSize - info.CurrentSize
-	var err error
-	if allOrNothing {
-		if err = info.Group.AtomicIncreaseSize(increase); err == cloudprovider.ErrNotImplemented {
-			err = info.Group.IncreaseSize(increase)
-		}
-	} else {
-		err = info.Group.IncreaseSize(increase)
-	}
-	if err != nil {
+	if err := e.increaseSize(info.Group, increase, atomic); err != nil {
 		e.autoscalingContext.LogRecorder.Eventf(apiv1.EventTypeWarning, "FailedToScaleUpGroup", "Scale-up failed for group %s: %v", info.Group.Id(), err)
 		aerr := errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("failed to increase node group size: ")
 		e.scaleStateNotifier.RegisterFailedScaleUp(info.Group, string(aerr.Type()), aerr.Error(), gpuResourceName, gpuType, now)
diff --git a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go
index 497c10b15db4..7fb533570288 100644
--- a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go
+++ b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go
@@ -89,7 +89,7 @@ func (o *ScaleUpOrchestrator) ScaleUp(
 	nodes []*apiv1.Node,
 	daemonSets []*appsv1.DaemonSet,
 	nodeInfos map[string]*schedulerframework.NodeInfo,
-	allOrNothing bool,
+	allOrNothing bool, // Either request enough capacity for all unschedulablePods, or don't request it at all.
 ) (*status.ScaleUpStatus, errors.AutoscalerError) {
 	if !o.initialized {
 		return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, "ScaleUpOrchestrator is not initialized"))
@@ -217,7 +217,10 @@ func (o *ScaleUpOrchestrator) ScaleUp(
 	if newNodes < bestOption.NodeCount {
 		klog.V(1).Infof("Only %d nodes can be added to %s due to cluster-wide limits", newNodes, bestOption.NodeGroup.Id())
 		if allOrNothing {
-			return stopAllOrNothingScaleUp(podEquivalenceGroups, skippedNodeGroups, nodeGroups)
+			// Can't execute a scale-up that will accommodate all pods, so nothing is considered schedulable.
+			klog.V(1).Info("Not attempting scale-up due to all-or-nothing strategy: not all pods would be accommodated")
+			markedEquivalenceGroups := markAllGroupsAsUnschedulable(podEquivalenceGroups, AllOrNothingReason)
+			return buildNoOptionsAvailableStatus(markedEquivalenceGroups, skippedNodeGroups, nodeGroups), nil
 		}
 	}
 
@@ -226,7 +229,10 @@ func (o *ScaleUpOrchestrator) ScaleUp(
 	if !bestOption.NodeGroup.Exist() {
 		if allOrNothing && bestOption.NodeGroup.MaxSize() < newNodes {
 			klog.V(1).Infof("Can only create a new node group with max %d nodes, need %d nodes", bestOption.NodeGroup.MaxSize(), newNodes)
-			return stopAllOrNothingScaleUp(podEquivalenceGroups, skippedNodeGroups, nodeGroups)
+			// Can't execute a scale-up that will accommodate all pods, so nothing is considered schedulable.
+			klog.V(1).Info("Not attempting scale-up due to all-or-nothing strategy: not all pods would be accommodated")
+			markedEquivalenceGroups := markAllGroupsAsUnschedulable(podEquivalenceGroups, AllOrNothingReason)
+			return buildNoOptionsAvailableStatus(markedEquivalenceGroups, skippedNodeGroups, nodeGroups), nil
 		}
 		var scaleUpStatus *status.ScaleUpStatus
 		createNodeGroupResults, scaleUpStatus, aErr = o.CreateNodeGroup(bestOption, nodeInfos, schedulablePodGroups, podEquivalenceGroups, daemonSets)
@@ -278,7 +284,10 @@ func (o *ScaleUpOrchestrator) ScaleUp(
 	if totalCapacity < newNodes {
 		klog.V(1).Infof("Can only add %d nodes due to node group limits, need %d nodes", totalCapacity, newNodes)
 		if allOrNothing {
-			return stopAllOrNothingScaleUp(podEquivalenceGroups, skippedNodeGroups, nodeGroups)
+			// Can't execute a scale-up that will accommodate all pods, so nothing is considered schedulable.
+			klog.V(1).Info("Not attempting scale-up due to all-or-nothing strategy: not all pods would be accommodated")
+			markedEquivalenceGroups := markAllGroupsAsUnschedulable(podEquivalenceGroups, AllOrNothingReason)
+			return buildNoOptionsAvailableStatus(markedEquivalenceGroups, skippedNodeGroups, nodeGroups), nil
 		}
 	}
 
@@ -498,20 +507,22 @@ func (o *ScaleUpOrchestrator) ComputeExpansionOption(
 	if err != nil && err != cloudprovider.ErrNotImplemented {
 		klog.Errorf("Failed to get autoscaling options for node group %s: %v", nodeGroup.Id(), err)
 	}
+
+	// Special handling for groups that only scale from zero to max.
 	if autoscalingOptions != nil && autoscalingOptions.ZeroOrMaxNodeScaling {
+		// For zero-or-max scaling groups, the only valid value of node count is node group's max size.
 		if allOrNothing && option.NodeCount > nodeGroup.MaxSize() {
-			// The following check can quietly cap the number of nodes and so breaks the
-			// assumption that as long as we're able to provision option.NodeCount of
-			// nodes, all pods will be accommodated.
-			// This fix isn't applicable to non-atomic node groups as we'll operate
-			// on uncapped number of nodes in that case.
+			// We would have to cap the node count, which means not all pods will be
+			// accommodated. This violates the principle of all-or-nothing strategy.
 			option.Pods = nil
 			option.NodeCount = 0
 		}
-		if option.NodeCount > 0 && option.NodeCount != nodeGroup.MaxSize() {
+		if option.NodeCount > 0 {
+			// Cap or increase the number of nodes to the only valid value - node group's max size.
 			option.NodeCount = nodeGroup.MaxSize()
 		}
 	}
+
 	return option
 }
 
@@ -746,24 +757,27 @@ func matchingSchedulablePodGroups(podGroups []estimator.PodEquivalenceGroup, sim
 	return true
 }
 
-func stopAllOrNothingScaleUp(egs []*equivalence.PodGroup, skipped map[string]status.Reasons, ngs []cloudprovider.NodeGroup) (*status.ScaleUpStatus, errors.AutoscalerError) {
-	// Can't execute a scale-up that will accommodate all pods, so nothing is considered schedulable.
+func markAllGroupsAsUnschedulable(egs []*equivalence.PodGroup, reason status.Reasons) []*equivalence.PodGroup {
 	for _, eg := range egs {
 		if eg.Schedulable {
-			errs := map[string]status.Reasons{}
+			if eg.SchedulingErrors == nil {
+				eg.SchedulingErrors = map[string]status.Reasons{}
+			}
 			for _, sg := range eg.SchedulableGroups {
-				errs[sg] = AllOrNothingReason
+				eg.SchedulingErrors[sg] = reason
 			}
 			eg.Schedulable = false
 		}
 	}
-	klog.V(1).Info("Not attempting scale-up due to all-or-nothing strategy: not all pods would be accommodated")
+	return egs
+}
+
+func buildNoOptionsAvailableStatus(egs []*equivalence.PodGroup, skipped map[string]status.Reasons, ngs []cloudprovider.NodeGroup) *status.ScaleUpStatus {
 	return &status.ScaleUpStatus{
 		Result:                  status.ScaleUpNoOptionsAvailable,
 		PodsRemainUnschedulable: GetRemainingPods(egs, skipped),
 		ConsideredNodeGroups:    ngs,
-	}, nil
-
+	}
 }
 
 // GetRemainingPods returns information about pods which CA is unable to help
diff --git a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go
index 9b887a108313..40b779f545fe 100644
--- a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go
+++ b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go
@@ -861,6 +861,37 @@ func TestNoCreateNodeGroupMaxCoresLimitHit(t *testing.T) {
 	simpleNoScaleUpTest(t, config, results)
 }
 
+func TestAllOrNothing(t *testing.T) {
+	options := defaultOptions
+
+	extraPods := []PodConfig{}
+	extraPodNames := []string{}
+	for i := 0; i < 11; i++ {
+		podName := fmt.Sprintf("pod-%d", i)
+		extraPods = append(extraPods, PodConfig{Name: podName, Cpu: 1000, Memory: 100})
+		extraPodNames = append(extraPodNames, podName)
+	}
+
+	config := &ScaleUpTestConfig{
+		Nodes: []NodeConfig{
+			{Name: "n1", Cpu: 1000, Memory: 1000, Gpu: 0, Ready: true, Group: "ng"},
+		},
+		Pods:         []PodConfig{},
+		ExtraPods:    extraPods,
+		Options:      &options,
+		AllOrNothing: true,
+	}
+
+	result := &ScaleTestResults{
+		NoScaleUpReason: "all-or-nothing",
+		ScaleUpStatus: ScaleUpStatusInfo{
+			PodsRemainUnschedulable: extraPodNames,
+		},
+	}
+
+	simpleNoScaleUpTest(t, config, result)
+}
+
 func simpleScaleUpTest(t *testing.T, config *ScaleUpTestConfig, expectedResults *ScaleTestResults) {
 	results := runSimpleScaleUpTest(t, config)
 	assert.NotNil(t, results.GroupSizeChanges, "Expected scale up event")
@@ -1032,7 +1063,7 @@ func runSimpleScaleUpTest(t *testing.T, config *ScaleUpTestConfig) *ScaleUpTestR
 	context.ExpanderStrategy = expander
 
 	// scale up
-	scaleUpStatus, scaleUpErr := orchestrator.ScaleUp(extraPods, nodes, []*appsv1.DaemonSet{}, nodeInfos, false)
+	scaleUpStatus, scaleUpErr := orchestrator.ScaleUp(extraPods, nodes, []*appsv1.DaemonSet{}, nodeInfos, config.AllOrNothing)
 	processors.ScaleUpStatusProcessor.Process(&context, scaleUpStatus)
 
 	// aggregate group size changes
diff --git a/cluster-autoscaler/core/test/common.go b/cluster-autoscaler/core/test/common.go
index ff9ef91a545c..0bbb6bdc8815 100644
--- a/cluster-autoscaler/core/test/common.go
+++ b/cluster-autoscaler/core/test/common.go
@@ -127,6 +127,7 @@ type ScaleUpTestConfig struct {
 	Options                 *config.AutoscalingOptions
 	NodeTemplateConfigs     map[string]*NodeTemplateConfig
 	EnableAutoprovisioning  bool
+	AllOrNothing            bool
 }
 
 // ScaleUpTestResult represents a node groups scale up result
diff --git a/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator.go b/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator.go
index 70e75057ac78..6738ab5ea006 100644
--- a/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator.go
+++ b/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator.go
@@ -91,7 +91,7 @@ func (o *provReqOrchestrator) ScaleUp(
 	nodes []*apiv1.Node,
 	daemonSets []*appsv1.DaemonSet,
 	nodeInfos map[string]*schedulerframework.NodeInfo,
-	_ bool,
+	_ bool, // Provision() doesn't use this parameter.
 ) (*status.ScaleUpStatus, ca_errors.AutoscalerError) {
 	if !o.initialized {
 		return &status.ScaleUpStatus{}, ca_errors.ToAutoscalerError(ca_errors.InternalError, fmt.Errorf("provisioningrequest.Orchestrator is not initialized"))
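For reference, here is a minimal standalone Go sketch of the fallback behaviour introduced by the new increaseSize helper in executor.go: attempt an atomic resize first, and fall back to a plain IncreaseSize only when the provider reports the atomic call as not implemented. The names resizer, legacyGroup, and errNotImplemented are hypothetical stand-ins used only for illustration; they are not part of this patch or of the real cloudprovider package.

package main

import (
	"errors"
	"fmt"
)

// errNotImplemented stands in for cloudprovider.ErrNotImplemented.
var errNotImplemented = errors.New("not implemented")

// resizer is a minimal stand-in for the two NodeGroup methods used here.
type resizer interface {
	AtomicIncreaseSize(delta int) error
	IncreaseSize(delta int) error
}

// increaseSize mirrors the helper's logic: try the atomic path when requested,
// and fall back to a plain increase only on errNotImplemented.
func increaseSize(ng resizer, delta int, atomic bool) error {
	if atomic {
		if err := ng.AtomicIncreaseSize(delta); err != errNotImplemented {
			return err // nil on success, or a real provider error
		}
		// Provider doesn't support atomic resizes; fall back below.
	}
	return ng.IncreaseSize(delta)
}

// legacyGroup models a provider without atomic resize support.
type legacyGroup struct{ size int }

func (g *legacyGroup) AtomicIncreaseSize(delta int) error { return errNotImplemented }
func (g *legacyGroup) IncreaseSize(delta int) error       { g.size += delta; return nil }

func main() {
	g := &legacyGroup{size: 3}
	if err := increaseSize(g, 2, true); err != nil {
		fmt.Println("scale-up failed:", err)
		return
	}
	fmt.Println("new size:", g.size) // prints "new size: 5"
}

A group that does support atomic resizes would return nil (or a genuine error) from AtomicIncreaseSize, so the fallback path would never run for it.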