Review fixes
aleksandra-malinowska authored and k8s-infra-cherrypick-robot committed Jul 9, 2024
1 parent c74f2df commit 12374e6
Showing 5 changed files with 85 additions and 36 deletions.
37 changes: 20 additions & 17 deletions cluster-autoscaler/core/scaleup/orchestrator/executor.go
@@ -61,20 +61,20 @@ func (e *scaleUpExecutor) ExecuteScaleUps(
scaleUpInfos []nodegroupset.ScaleUpInfo,
nodeInfos map[string]*schedulerframework.NodeInfo,
now time.Time,
- allOrNothing bool,
+ atomic bool,
) (errors.AutoscalerError, []cloudprovider.NodeGroup) {
options := e.autoscalingContext.AutoscalingOptions
if options.ParallelScaleUp {
- return e.executeScaleUpsParallel(scaleUpInfos, nodeInfos, now, allOrNothing)
+ return e.executeScaleUpsParallel(scaleUpInfos, nodeInfos, now, atomic)
}
- return e.executeScaleUpsSync(scaleUpInfos, nodeInfos, now, allOrNothing)
+ return e.executeScaleUpsSync(scaleUpInfos, nodeInfos, now, atomic)
}

func (e *scaleUpExecutor) executeScaleUpsSync(
scaleUpInfos []nodegroupset.ScaleUpInfo,
nodeInfos map[string]*schedulerframework.NodeInfo,
now time.Time,
- allOrNothing bool,
+ atomic bool,
) (errors.AutoscalerError, []cloudprovider.NodeGroup) {
availableGPUTypes := e.autoscalingContext.CloudProvider.GetAvailableGPUTypes()
for _, scaleUpInfo := range scaleUpInfos {
@@ -83,7 +83,7 @@ func (e *scaleUpExecutor) executeScaleUpsSync(
klog.Errorf("ExecuteScaleUp: failed to get node info for node group %s", scaleUpInfo.Group.Id())
continue
}
- if aErr := e.executeScaleUp(scaleUpInfo, nodeInfo, availableGPUTypes, now, allOrNothing); aErr != nil {
+ if aErr := e.executeScaleUp(scaleUpInfo, nodeInfo, availableGPUTypes, now, atomic); aErr != nil {
return aErr, []cloudprovider.NodeGroup{scaleUpInfo.Group}
}
}
@@ -94,7 +94,7 @@ func (e *scaleUpExecutor) executeScaleUpsParallel(
scaleUpInfos []nodegroupset.ScaleUpInfo,
nodeInfos map[string]*schedulerframework.NodeInfo,
now time.Time,
- allOrNothing bool,
+ atomic bool,
) (errors.AutoscalerError, []cloudprovider.NodeGroup) {
if err := checkUniqueNodeGroups(scaleUpInfos); err != nil {
return err, extractNodeGroups(scaleUpInfos)
@@ -116,7 +116,7 @@ func (e *scaleUpExecutor) executeScaleUpsParallel(
klog.Errorf("ExecuteScaleUp: failed to get node info for node group %s", info.Group.Id())
return
}
- if aErr := e.executeScaleUp(info, nodeInfo, availableGPUTypes, now, allOrNothing); aErr != nil {
+ if aErr := e.executeScaleUp(info, nodeInfo, availableGPUTypes, now, atomic); aErr != nil {
errResults <- errResult{err: aErr, info: &info}
}
}(scaleUpInfo)
@@ -139,28 +139,31 @@ func (e *scaleUpExecutor) executeScaleUpsParallel(
return nil, nil
}

+ func (e *scaleUpExecutor) increaseSize(nodeGroup cloudprovider.NodeGroup, increase int, atomic bool) error {
+ if atomic {
+ if err := nodeGroup.AtomicIncreaseSize(increase); err != cloudprovider.ErrNotImplemented {
+ return err
+ }
+ // If error is cloudprovider.ErrNotImplemented, fall back to non-atomic
+ // increase - cloud provider doesn't support it.
+ }
+ return nodeGroup.IncreaseSize(increase)
+ }
+
func (e *scaleUpExecutor) executeScaleUp(
info nodegroupset.ScaleUpInfo,
nodeInfo *schedulerframework.NodeInfo,
availableGPUTypes map[string]struct{},
now time.Time,
- allOrNothing bool,
+ atomic bool,
) errors.AutoscalerError {
gpuConfig := e.autoscalingContext.CloudProvider.GetNodeGpuConfig(nodeInfo.Node())
gpuResourceName, gpuType := gpu.GetGpuInfoForMetrics(gpuConfig, availableGPUTypes, nodeInfo.Node(), nil)
klog.V(0).Infof("Scale-up: setting group %s size to %d", info.Group.Id(), info.NewSize)
e.autoscalingContext.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaledUpGroup",
"Scale-up: setting group %s size to %d instead of %d (max: %d)", info.Group.Id(), info.NewSize, info.CurrentSize, info.MaxSize)
increase := info.NewSize - info.CurrentSize
- var err error
- if allOrNothing {
- if err = info.Group.AtomicIncreaseSize(increase); err == cloudprovider.ErrNotImplemented {
- err = info.Group.IncreaseSize(increase)
- }
- } else {
- err = info.Group.IncreaseSize(increase)
- }
- if err != nil {
+ if err := e.increaseSize(info.Group, increase, atomic); err != nil {
e.autoscalingContext.LogRecorder.Eventf(apiv1.EventTypeWarning, "FailedToScaleUpGroup", "Scale-up failed for group %s: %v", info.Group.Id(), err)
aerr := errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("failed to increase node group size: ")
e.scaleStateNotifier.RegisterFailedScaleUp(info.Group, string(aerr.Type()), aerr.Error(), gpuResourceName, gpuType, now)
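
For context on the new increaseSize helper above: it prefers the provider's atomic resize and falls back to the regular resize only when the provider reports cloudprovider.ErrNotImplemented. Below is a minimal, self-contained sketch of that fallback pattern; the nodeGroup interface, errNotImplemented sentinel, and legacyGroup type are stand-ins of my own, not the real cloudprovider.NodeGroup API.

package main

import (
    "errors"
    "fmt"
)

// errNotImplemented stands in for cloudprovider.ErrNotImplemented.
var errNotImplemented = errors.New("not implemented")

// nodeGroup is a stand-in for the subset of cloudprovider.NodeGroup used here.
type nodeGroup interface {
    AtomicIncreaseSize(delta int) error
    IncreaseSize(delta int) error
}

// increaseSize mirrors the helper's fallback logic: try the atomic resize first
// when requested, and fall back to the plain resize only if the provider does
// not implement atomic resizes.
func increaseSize(ng nodeGroup, delta int, atomic bool) error {
    if atomic {
        if err := ng.AtomicIncreaseSize(delta); !errors.Is(err, errNotImplemented) {
            return err // success (nil) or a real provider error is returned as-is
        }
        // Not implemented by this provider: fall through to the non-atomic path.
    }
    return ng.IncreaseSize(delta)
}

// legacyGroup implements only the non-atomic resize.
type legacyGroup struct{ size int }

func (g *legacyGroup) AtomicIncreaseSize(delta int) error { return errNotImplemented }
func (g *legacyGroup) IncreaseSize(delta int) error       { g.size += delta; return nil }

func main() {
    g := &legacyGroup{size: 1}
    if err := increaseSize(g, 3, true); err != nil {
        fmt.Println("scale-up failed:", err)
        return
    }
    fmt.Println("new size:", g.size) // prints: new size: 4
}

Because only the not-implemented sentinel triggers the fallback, providers without atomic resizes keep working, while a nil result or a genuine provider error from the atomic call is surfaced immediately.
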
48 changes: 31 additions & 17 deletions cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go
@@ -89,7 +89,7 @@ func (o *ScaleUpOrchestrator) ScaleUp(
nodes []*apiv1.Node,
daemonSets []*appsv1.DaemonSet,
nodeInfos map[string]*schedulerframework.NodeInfo,
- allOrNothing bool,
+ allOrNothing bool, // Either request enough capacity for all unschedulablePods, or don't request it at all.
) (*status.ScaleUpStatus, errors.AutoscalerError) {
if !o.initialized {
return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, "ScaleUpOrchestrator is not initialized"))
@@ -217,7 +217,10 @@ func (o *ScaleUpOrchestrator) ScaleUp(
if newNodes < bestOption.NodeCount {
klog.V(1).Infof("Only %d nodes can be added to %s due to cluster-wide limits", newNodes, bestOption.NodeGroup.Id())
if allOrNothing {
- return stopAllOrNothingScaleUp(podEquivalenceGroups, skippedNodeGroups, nodeGroups)
+ // Can't execute a scale-up that will accommodate all pods, so nothing is considered schedulable.
+ klog.V(1).Info("Not attempting scale-up due to all-or-nothing strategy: not all pods would be accommodated")
+ markedEquivalenceGroups := markAllGroupsAsUnschedulable(podEquivalenceGroups, AllOrNothingReason)
+ return buildNoOptionsAvailableStatus(markedEquivalenceGroups, skippedNodeGroups, nodeGroups), nil
}
}

@@ -226,7 +229,10 @@ func (o *ScaleUpOrchestrator) ScaleUp(
if !bestOption.NodeGroup.Exist() {
if allOrNothing && bestOption.NodeGroup.MaxSize() < newNodes {
klog.V(1).Infof("Can only create a new node group with max %d nodes, need %d nodes", bestOption.NodeGroup.MaxSize(), newNodes)
- return stopAllOrNothingScaleUp(podEquivalenceGroups, skippedNodeGroups, nodeGroups)
+ // Can't execute a scale-up that will accommodate all pods, so nothing is considered schedulable.
+ klog.V(1).Info("Not attempting scale-up due to all-or-nothing strategy: not all pods would be accommodated")
+ markedEquivalenceGroups := markAllGroupsAsUnschedulable(podEquivalenceGroups, AllOrNothingReason)
+ return buildNoOptionsAvailableStatus(markedEquivalenceGroups, skippedNodeGroups, nodeGroups), nil
}
var scaleUpStatus *status.ScaleUpStatus
createNodeGroupResults, scaleUpStatus, aErr = o.CreateNodeGroup(bestOption, nodeInfos, schedulablePodGroups, podEquivalenceGroups, daemonSets)
@@ -278,7 +284,10 @@ func (o *ScaleUpOrchestrator) ScaleUp(
if totalCapacity < newNodes {
klog.V(1).Infof("Can only add %d nodes due to node group limits, need %d nodes", totalCapacity, newNodes)
if allOrNothing {
- return stopAllOrNothingScaleUp(podEquivalenceGroups, skippedNodeGroups, nodeGroups)
+ // Can't execute a scale-up that will accommodate all pods, so nothing is considered schedulable.
+ klog.V(1).Info("Not attempting scale-up due to all-or-nothing strategy: not all pods would be accommodated")
+ markedEquivalenceGroups := markAllGroupsAsUnschedulable(podEquivalenceGroups, AllOrNothingReason)
+ return buildNoOptionsAvailableStatus(markedEquivalenceGroups, skippedNodeGroups, nodeGroups), nil
}
}

@@ -498,20 +507,22 @@ func (o *ScaleUpOrchestrator) ComputeExpansionOption(
if err != nil && err != cloudprovider.ErrNotImplemented {
klog.Errorf("Failed to get autoscaling options for node group %s: %v", nodeGroup.Id(), err)
}

+ // Special handling for groups that only scale from zero to max.
if autoscalingOptions != nil && autoscalingOptions.ZeroOrMaxNodeScaling {
+ // For zero-or-max scaling groups, the only valid value of node count is node group's max size.
if allOrNothing && option.NodeCount > nodeGroup.MaxSize() {
- // The following check can quietly cap the number of nodes and so breaks the
- // assumption that as long as we're able to provision option.NodeCount of
- // nodes, all pods will be accommodated.
- // This fix isn't applicable to non-atomic node groups as we'll operate
- // on uncapped number of nodes in that case.
+ // We would have to cap the node count, which means not all pods will be
+ // accommodated. This violates the principle of all-or-nothing strategy.
option.Pods = nil
option.NodeCount = 0
}
- if option.NodeCount > 0 && option.NodeCount != nodeGroup.MaxSize() {
+ if option.NodeCount > 0 {
+ // Cap or increase the number of nodes to the only valid value - node group's max size.
option.NodeCount = nodeGroup.MaxSize()
}
}

return option
}

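The ZeroOrMaxNodeScaling branch in the hunk above encodes a small sizing rule: for a group that can only jump from zero straight to its maximum, an all-or-nothing request needing more nodes than the max is dropped entirely (option.Pods cleared, count zeroed), and any remaining positive node count is snapped to the group's max. A hedged, stand-alone sketch of that rule follows; the function name capForZeroOrMax and its plain-int signature are my own, not part of the autoscaler's expander.Option API.

package main

import "fmt"

// capForZeroOrMax mirrors the sizing rule for zero-or-max node groups:
// it returns the node count to request, where 0 means "drop the option".
func capForZeroOrMax(nodeCount, maxSize int, allOrNothing bool) int {
    if allOrNothing && nodeCount > maxSize {
        // Capping would leave some pods unschedulable, which violates
        // the all-or-nothing contract, so the option is discarded.
        return 0
    }
    if nodeCount > 0 {
        // The only size such a group can actually reach is its max.
        return maxSize
    }
    return nodeCount
}

func main() {
    fmt.Println(capForZeroOrMax(12, 10, true))  // 0: would exceed max under all-or-nothing
    fmt.Println(capForZeroOrMax(7, 10, false))  // 10: snapped up to the group's max
    fmt.Println(capForZeroOrMax(12, 10, false)) // 10: capped down to the group's max
    fmt.Println(capForZeroOrMax(0, 10, true))   // 0: nothing to request
}
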
@@ -746,24 +757,27 @@ func matchingSchedulablePodGroups(podGroups []estimator.PodEquivalenceGroup, sim
return true
}

- func stopAllOrNothingScaleUp(egs []*equivalence.PodGroup, skipped map[string]status.Reasons, ngs []cloudprovider.NodeGroup) (*status.ScaleUpStatus, errors.AutoscalerError) {
- // Can't execute a scale-up that will accommodate all pods, so nothing is considered schedulable.
+ func markAllGroupsAsUnschedulable(egs []*equivalence.PodGroup, reason status.Reasons) []*equivalence.PodGroup {
for _, eg := range egs {
if eg.Schedulable {
- errs := map[string]status.Reasons{}
+ if eg.SchedulingErrors == nil {
+ eg.SchedulingErrors = map[string]status.Reasons{}
+ }
for _, sg := range eg.SchedulableGroups {
- errs[sg] = AllOrNothingReason
+ eg.SchedulingErrors[sg] = reason
}
eg.Schedulable = false
}
}
- klog.V(1).Info("Not attempting scale-up due to all-or-nothing strategy: not all pods would be accommodated")
+ return egs
+ }
+
+ func buildNoOptionsAvailableStatus(egs []*equivalence.PodGroup, skipped map[string]status.Reasons, ngs []cloudprovider.NodeGroup) *status.ScaleUpStatus {
return &status.ScaleUpStatus{
Result: status.ScaleUpNoOptionsAvailable,
PodsRemainUnschedulable: GetRemainingPods(egs, skipped),
ConsideredNodeGroups: ngs,
- }, nil
-
+ }
}

// GetRemainingPods returns information about pods which CA is unable to help
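
The old stopAllOrNothingScaleUp helper combined marking, logging, and status building; the new code splits it so each call site logs its own message before building the ScaleUpNoOptionsAvailable status, and the marking step now records the reason in each group's SchedulingErrors map rather than in a local errs map that, at least in the lines shown here, was never attached to the group. A simplified sketch of the marking step follows; the podGroup and reasons types are stand-ins of my own, not the real equivalence.PodGroup and status.Reasons.

package main

import "fmt"

// Stand-ins for equivalence.PodGroup and status.Reasons.
type reasons string

type podGroup struct {
    Schedulable       bool
    SchedulableGroups []string
    SchedulingErrors  map[string]reasons
}

// markAllGroupsAsUnschedulable mirrors the new helper: every group that was
// considered schedulable is flipped to unschedulable, and the given reason is
// recorded per node group without clobbering an existing SchedulingErrors map.
func markAllGroupsAsUnschedulable(egs []*podGroup, reason reasons) []*podGroup {
    for _, eg := range egs {
        if !eg.Schedulable {
            continue
        }
        if eg.SchedulingErrors == nil {
            eg.SchedulingErrors = map[string]reasons{}
        }
        for _, sg := range eg.SchedulableGroups {
            eg.SchedulingErrors[sg] = reason
        }
        eg.Schedulable = false
    }
    return egs
}

func main() {
    egs := []*podGroup{
        {Schedulable: true, SchedulableGroups: []string{"ng-1", "ng-2"}},
        {Schedulable: false}, // already unschedulable: left untouched
    }
    markAllGroupsAsUnschedulable(egs, "all-or-nothing")
    fmt.Println(egs[0].Schedulable, egs[0].SchedulingErrors)
    // Output: false map[ng-1:all-or-nothing ng-2:all-or-nothing]
}
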
@@ -861,6 +861,37 @@ func TestNoCreateNodeGroupMaxCoresLimitHit(t *testing.T) {
simpleNoScaleUpTest(t, config, results)
}

+ func TestAllOrNothing(t *testing.T) {
+ options := defaultOptions
+
+ extraPods := []PodConfig{}
+ extraPodNames := []string{}
+ for i := 0; i < 11; i++ {
+ podName := fmt.Sprintf("pod-%d", i)
+ extraPods = append(extraPods, PodConfig{Name: podName, Cpu: 1000, Memory: 100})
+ extraPodNames = append(extraPodNames, podName)
+ }
+
+ config := &ScaleUpTestConfig{
+ Nodes: []NodeConfig{
+ {Name: "n1", Cpu: 1000, Memory: 1000, Gpu: 0, Ready: true, Group: "ng"},
+ },
+ Pods: []PodConfig{},
+ ExtraPods: extraPods,
+ Options: &options,
+ AllOrNothing: true,
+ }
+
+ result := &ScaleTestResults{
+ NoScaleUpReason: "all-or-nothing",
+ ScaleUpStatus: ScaleUpStatusInfo{
+ PodsRemainUnschedulable: extraPodNames,
+ },
+ }
+
+ simpleNoScaleUpTest(t, config, result)
+ }
+
func simpleScaleUpTest(t *testing.T, config *ScaleUpTestConfig, expectedResults *ScaleTestResults) {
results := runSimpleScaleUpTest(t, config)
assert.NotNil(t, results.GroupSizeChanges, "Expected scale up event")
@@ -1032,7 +1063,7 @@ func runSimpleScaleUpTest(t *testing.T, config *ScaleUpTestConfig) *ScaleUpTestR
context.ExpanderStrategy = expander

// scale up
- scaleUpStatus, scaleUpErr := orchestrator.ScaleUp(extraPods, nodes, []*appsv1.DaemonSet{}, nodeInfos, false)
+ scaleUpStatus, scaleUpErr := orchestrator.ScaleUp(extraPods, nodes, []*appsv1.DaemonSet{}, nodeInfos, config.AllOrNothing)
processors.ScaleUpStatusProcessor.Process(&context, scaleUpStatus)

// aggregate group size changes
1 change: 1 addition & 0 deletions cluster-autoscaler/core/test/common.go
@@ -127,6 +127,7 @@ type ScaleUpTestConfig struct {
Options *config.AutoscalingOptions
NodeTemplateConfigs map[string]*NodeTemplateConfig
EnableAutoprovisioning bool
+ AllOrNothing bool
}

// ScaleUpTestResult represents a node groups scale up result
@@ -91,7 +91,7 @@ func (o *provReqOrchestrator) ScaleUp(
nodes []*apiv1.Node,
daemonSets []*appsv1.DaemonSet,
nodeInfos map[string]*schedulerframework.NodeInfo,
- _ bool,
+ _ bool, // Provision() doesn't use this parameter.
) (*status.ScaleUpStatus, ca_errors.AutoscalerError) {
if !o.initialized {
return &status.ScaleUpStatus{}, ca_errors.ToAutoscalerError(ca_errors.InternalError, fmt.Errorf("provisioningrequest.Orchestrator is not initialized"))
