Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for all-or-nothing scale-up strategy #6821

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions cluster-autoscaler/core/scaleup/equivalence/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ import (

// PodGroup contains a group of pods that are equivalent in terms of schedulability.
type PodGroup struct {
Pods []*apiv1.Pod
SchedulingErrors map[string]status.Reasons
Schedulable bool
Pods []*apiv1.Pod
SchedulingErrors map[string]status.Reasons
SchedulableGroups []string
Schedulable bool
}

// BuildPodGroups prepares pod groups with equivalent scheduling properties.
Expand Down
22 changes: 17 additions & 5 deletions cluster-autoscaler/core/scaleup/orchestrator/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,18 +61,20 @@ func (e *scaleUpExecutor) ExecuteScaleUps(
scaleUpInfos []nodegroupset.ScaleUpInfo,
nodeInfos map[string]*schedulerframework.NodeInfo,
now time.Time,
allOrNothing bool,
towca marked this conversation as resolved.
Show resolved Hide resolved
) (errors.AutoscalerError, []cloudprovider.NodeGroup) {
options := e.autoscalingContext.AutoscalingOptions
if options.ParallelScaleUp {
return e.executeScaleUpsParallel(scaleUpInfos, nodeInfos, now)
return e.executeScaleUpsParallel(scaleUpInfos, nodeInfos, now, allOrNothing)
}
return e.executeScaleUpsSync(scaleUpInfos, nodeInfos, now)
return e.executeScaleUpsSync(scaleUpInfos, nodeInfos, now, allOrNothing)
}

func (e *scaleUpExecutor) executeScaleUpsSync(
scaleUpInfos []nodegroupset.ScaleUpInfo,
nodeInfos map[string]*schedulerframework.NodeInfo,
now time.Time,
allOrNothing bool,
) (errors.AutoscalerError, []cloudprovider.NodeGroup) {
availableGPUTypes := e.autoscalingContext.CloudProvider.GetAvailableGPUTypes()
for _, scaleUpInfo := range scaleUpInfos {
Expand All @@ -81,7 +83,7 @@ func (e *scaleUpExecutor) executeScaleUpsSync(
klog.Errorf("ExecuteScaleUp: failed to get node info for node group %s", scaleUpInfo.Group.Id())
continue
}
if aErr := e.executeScaleUp(scaleUpInfo, nodeInfo, availableGPUTypes, now); aErr != nil {
if aErr := e.executeScaleUp(scaleUpInfo, nodeInfo, availableGPUTypes, now, allOrNothing); aErr != nil {
return aErr, []cloudprovider.NodeGroup{scaleUpInfo.Group}
}
}
Expand All @@ -92,6 +94,7 @@ func (e *scaleUpExecutor) executeScaleUpsParallel(
scaleUpInfos []nodegroupset.ScaleUpInfo,
nodeInfos map[string]*schedulerframework.NodeInfo,
now time.Time,
allOrNothing bool,
) (errors.AutoscalerError, []cloudprovider.NodeGroup) {
if err := checkUniqueNodeGroups(scaleUpInfos); err != nil {
return err, extractNodeGroups(scaleUpInfos)
Expand All @@ -113,7 +116,7 @@ func (e *scaleUpExecutor) executeScaleUpsParallel(
klog.Errorf("ExecuteScaleUp: failed to get node info for node group %s", info.Group.Id())
return
}
if aErr := e.executeScaleUp(info, nodeInfo, availableGPUTypes, now); aErr != nil {
if aErr := e.executeScaleUp(info, nodeInfo, availableGPUTypes, now, allOrNothing); aErr != nil {
errResults <- errResult{err: aErr, info: &info}
}
}(scaleUpInfo)
Expand Down Expand Up @@ -141,14 +144,23 @@ func (e *scaleUpExecutor) executeScaleUp(
nodeInfo *schedulerframework.NodeInfo,
availableGPUTypes map[string]struct{},
now time.Time,
allOrNothing bool,
) errors.AutoscalerError {
gpuConfig := e.autoscalingContext.CloudProvider.GetNodeGpuConfig(nodeInfo.Node())
gpuResourceName, gpuType := gpu.GetGpuInfoForMetrics(gpuConfig, availableGPUTypes, nodeInfo.Node(), nil)
klog.V(0).Infof("Scale-up: setting group %s size to %d", info.Group.Id(), info.NewSize)
e.autoscalingContext.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaledUpGroup",
"Scale-up: setting group %s size to %d instead of %d (max: %d)", info.Group.Id(), info.NewSize, info.CurrentSize, info.MaxSize)
increase := info.NewSize - info.CurrentSize
if err := info.Group.IncreaseSize(increase); err != nil {
var err error
towca marked this conversation as resolved.
Show resolved Hide resolved
if allOrNothing {
if err = info.Group.AtomicIncreaseSize(increase); err == cloudprovider.ErrNotImplemented {
err = info.Group.IncreaseSize(increase)
}
} else {
err = info.Group.IncreaseSize(increase)
}
if err != nil {
e.autoscalingContext.LogRecorder.Eventf(apiv1.EventTypeWarning, "FailedToScaleUpGroup", "Scale-up failed for group %s: %v", info.Group.Id(), err)
aerr := errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("failed to increase node group size: ")
e.scaleStateNotifier.RegisterFailedScaleUp(info.Group, string(aerr.Type()), aerr.Error(), gpuResourceName, gpuType, now)
Expand Down
63 changes: 60 additions & 3 deletions cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func (o *ScaleUpOrchestrator) ScaleUp(
nodes []*apiv1.Node,
daemonSets []*appsv1.DaemonSet,
nodeInfos map[string]*schedulerframework.NodeInfo,
allOrNothing bool,
towca marked this conversation as resolved.
Show resolved Hide resolved
) (*status.ScaleUpStatus, errors.AutoscalerError) {
if !o.initialized {
return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, "ScaleUpOrchestrator is not initialized"))
Expand Down Expand Up @@ -146,11 +147,13 @@ func (o *ScaleUpOrchestrator) ScaleUp(
}

for _, nodeGroup := range validNodeGroups {
option := o.ComputeExpansionOption(nodeGroup, schedulablePodGroups, nodeInfos, len(nodes)+len(upcomingNodes), now)
option := o.ComputeExpansionOption(nodeGroup, schedulablePodGroups, nodeInfos, len(nodes)+len(upcomingNodes), now, allOrNothing)
o.processors.BinpackingLimiter.MarkProcessed(o.autoscalingContext, nodeGroup.Id())

if len(option.Pods) == 0 || option.NodeCount == 0 {
klog.V(4).Infof("No pod can fit to %s", nodeGroup.Id())
} else if allOrNothing && len(option.Pods) < len(unschedulablePods) {
klog.V(4).Infof("Some pods can't fit to %s, giving up due to all-or-nothing scale-up strategy", nodeGroup.Id())
} else {
options = append(options, option)
}
Expand Down Expand Up @@ -211,9 +214,20 @@ func (o *ScaleUpOrchestrator) ScaleUp(
aErr)
}

if newNodes < bestOption.NodeCount {
klog.V(1).Infof("Only %d nodes can be added to %s due to cluster-wide limits", newNodes, bestOption.NodeGroup.Id())
if allOrNothing {
return stopAllOrNothingScaleUp(podEquivalenceGroups, skippedNodeGroups, nodeGroups)
}
}

// If necessary, create the node group. This is no longer simulation, an empty node group will be created by cloud provider if supported.
createNodeGroupResults := make([]nodegroups.CreateNodeGroupResult, 0)
if !bestOption.NodeGroup.Exist() {
if allOrNothing && bestOption.NodeGroup.MaxSize() < newNodes {
klog.V(1).Infof("Can only create a new node group with max %d nodes, need %d nodes", bestOption.NodeGroup.MaxSize(), newNodes)
return stopAllOrNothingScaleUp(podEquivalenceGroups, skippedNodeGroups, nodeGroups)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this needed? Will it not be checked in 155-156 and ComputeExpansionOption?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. In 155-156 we only exclude cases when there's at least one pod that can't be scheduled on nodes belonging to this group at all. If all pods are compatible with that node group, we'll just get a higher node count (possibly higher than group max size - but we can still balance across similar groups later).

  2. In computeExpansionOption we only cap the node count for special case (groups that scale from 0 to max).

We don't check group max size for all cases in either of those places because of balancing across similar groups, which we only do just before executing scale-up.

So without this check, for a scale-up not affected by the above two cases we could create a node group, then decide not to scale it up.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think MaxSize() is checked before balancing node groups below, because the amount over max can be distributed to the similar node groups.

But why check newNodes against the max size of just one group, if we're balancing between groups below (and verifying that the total capacity is big enough) anyway?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But why check newNodes against the max size of just one group, if we're balancing between groups below (and verifying that the total capacity is big enough) anyway?

This is checked only for autoprovisioned group just before it's created. I think we currently don't have information how many similar groups there'll be. I suspect estimating this may require changes on cloud provider side (for example, if creating a group in one zone implicitly creates groups in another zone with identical taint/labels/resources).

We agreed with @kisieland offline that it's acceptable to limit it for autoprovisioning to max size of one group initially and leave an open issue for handling this better for cases where an autoprovisioned groups will have unobvious similar groups.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do I understand correctly that the final total capacity check would still catch this (because the balancing processor also caps all the requests to MaxSize()), but this lets us avoid an unnecessary node group creation in that case?

In any case, this seems okay. As a nit, maybe leave a TODO for the issue when you have it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do I understand correctly that the final total capacity check would still catch this (because the balancing processor also caps all the requests to MaxSize()), but this lets us avoid an unnecessary node group creation in that case?

Yes, that's exactly the case.

As a nit, maybe leave a TODO for the issue when you have it.

Will do.

var scaleUpStatus *status.ScaleUpStatus
createNodeGroupResults, scaleUpStatus, aErr = o.CreateNodeGroup(bestOption, nodeInfos, schedulablePodGroups, podEquivalenceGroups, daemonSets)
if aErr != nil {
Expand Down Expand Up @@ -256,9 +270,21 @@ func (o *ScaleUpOrchestrator) ScaleUp(
aErr)
}

// Last check before scale-up. Node group capacity (both due to max size limits & current size) is only checked when balancing.
totalCapacity := 0
for _, sui := range scaleUpInfos {
totalCapacity += sui.NewSize - sui.CurrentSize
}
if totalCapacity < newNodes {
klog.V(1).Infof("Can only add %d nodes due to node group limits, need %d nodes", totalCapacity, newNodes)
if allOrNothing {
return stopAllOrNothingScaleUp(podEquivalenceGroups, skippedNodeGroups, nodeGroups)
}
}

// Execute scale up.
klog.V(1).Infof("Final scale-up plan: %v", scaleUpInfos)
aErr, failedNodeGroups := o.scaleUpExecutor.ExecuteScaleUps(scaleUpInfos, nodeInfos, now)
aErr, failedNodeGroups := o.scaleUpExecutor.ExecuteScaleUps(scaleUpInfos, nodeInfos, now, allOrNothing)
if aErr != nil {
return status.UpdateScaleUpError(
&status.ScaleUpStatus{
Expand Down Expand Up @@ -364,7 +390,7 @@ func (o *ScaleUpOrchestrator) ScaleUpToNodeGroupMinSize(
}

klog.V(1).Infof("ScaleUpToNodeGroupMinSize: final scale-up plan: %v", scaleUpInfos)
aErr, failedNodeGroups := o.scaleUpExecutor.ExecuteScaleUps(scaleUpInfos, nodeInfos, now)
aErr, failedNodeGroups := o.scaleUpExecutor.ExecuteScaleUps(scaleUpInfos, nodeInfos, now, false /* allOrNothing disabled */)
if aErr != nil {
return status.UpdateScaleUpError(
&status.ScaleUpStatus{
Expand Down Expand Up @@ -447,6 +473,7 @@ func (o *ScaleUpOrchestrator) ComputeExpansionOption(
nodeInfos map[string]*schedulerframework.NodeInfo,
currentNodeCount int,
now time.Time,
allOrNothing bool,
) expander.Option {
option := expander.Option{NodeGroup: nodeGroup}
podGroups := schedulablePodGroups[nodeGroup.Id()]
Expand All @@ -472,6 +499,15 @@ func (o *ScaleUpOrchestrator) ComputeExpansionOption(
klog.Errorf("Failed to get autoscaling options for node group %s: %v", nodeGroup.Id(), err)
}
if autoscalingOptions != nil && autoscalingOptions.ZeroOrMaxNodeScaling {
if allOrNothing && option.NodeCount > nodeGroup.MaxSize() {
towca marked this conversation as resolved.
Show resolved Hide resolved
// The following check can quietly cap the number of nodes and so breaks the
// assumption that as long as we're able to provision option.NodeCount of
// nodes, all pods will be accommodated.
// This fix isn't applicable to non-atomic node groups as we'll operate
// on uncapped number of nodes in that case.
option.Pods = nil
option.NodeCount = 0
}
if option.NodeCount > 0 && option.NodeCount != nodeGroup.MaxSize() {
option.NodeCount = nodeGroup.MaxSize()
}
Expand Down Expand Up @@ -564,6 +600,7 @@ func (o *ScaleUpOrchestrator) SchedulablePodGroups(
})
// Mark pod group as (theoretically) schedulable.
eg.Schedulable = true
eg.SchedulableGroups = append(eg.SchedulableGroups, nodeGroup.Id())
} else {
klog.V(2).Infof("Pod %s/%s can't be scheduled on %s, predicate checking error: %v", samplePod.Namespace, samplePod.Name, nodeGroup.Id(), err.VerboseMessage())
if podCount := len(eg.Pods); podCount > 1 {
Expand Down Expand Up @@ -709,6 +746,26 @@ func matchingSchedulablePodGroups(podGroups []estimator.PodEquivalenceGroup, sim
return true
}

func stopAllOrNothingScaleUp(egs []*equivalence.PodGroup, skipped map[string]status.Reasons, ngs []cloudprovider.NodeGroup) (*status.ScaleUpStatus, errors.AutoscalerError) {
// Can't execute a scale-up that will accommodate all pods, so nothing is considered schedulable.
for _, eg := range egs {
if eg.Schedulable {
errs := map[string]status.Reasons{}
towca marked this conversation as resolved.
Show resolved Hide resolved
for _, sg := range eg.SchedulableGroups {
errs[sg] = AllOrNothingReason
}
eg.Schedulable = false
}
}
klog.V(1).Info("Not attempting scale-up due to all-or-nothing strategy: not all pods would be accommodated")
return &status.ScaleUpStatus{
towca marked this conversation as resolved.
Show resolved Hide resolved
Result: status.ScaleUpNoOptionsAvailable,
PodsRemainUnschedulable: GetRemainingPods(egs, skipped),
ConsideredNodeGroups: ngs,
}, nil

}

// GetRemainingPods returns information about pods which CA is unable to help
// at this moment.
func GetRemainingPods(egs []*equivalence.PodGroup, skipped map[string]status.Reasons) []status.NoScaleUpInfo {
Expand Down
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add some test cases

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All of my new test cases are in the other PR: #6824

I can add some here as well to have more granularity.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added test case which fails if allOrNothing is false. More extensive test cases remain in #6824

Original file line number Diff line number Diff line change
Expand Up @@ -1032,7 +1032,7 @@ func runSimpleScaleUpTest(t *testing.T, config *ScaleUpTestConfig) *ScaleUpTestR
context.ExpanderStrategy = expander

// scale up
scaleUpStatus, scaleUpErr := orchestrator.ScaleUp(extraPods, nodes, []*appsv1.DaemonSet{}, nodeInfos)
scaleUpStatus, scaleUpErr := orchestrator.ScaleUp(extraPods, nodes, []*appsv1.DaemonSet{}, nodeInfos, false)
processors.ScaleUpStatusProcessor.Process(&context, scaleUpStatus)

// aggregate group size changes
Expand Down Expand Up @@ -1131,7 +1131,7 @@ func TestScaleUpUnhealthy(t *testing.T) {
processors := NewTestProcessors(&context)
suOrchestrator := New()
suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos)
scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos, false)

assert.NoError(t, err)
// Node group is unhealthy.
Expand Down Expand Up @@ -1185,7 +1185,7 @@ func TestBinpackingLimiter(t *testing.T) {
expander := NewMockRepotingStrategy(t, nil)
context.ExpanderStrategy = expander

scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{extraPod}, nodes, []*appsv1.DaemonSet{}, nodeInfos)
scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{extraPod}, nodes, []*appsv1.DaemonSet{}, nodeInfos, false)
processors.ScaleUpStatusProcessor.Process(&context, scaleUpStatus)
assert.NoError(t, err)
assert.True(t, scaleUpStatus.WasSuccessful())
Expand Down Expand Up @@ -1231,7 +1231,7 @@ func TestScaleUpNoHelp(t *testing.T) {
processors := NewTestProcessors(&context)
suOrchestrator := New()
suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos)
scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos, false)
processors.ScaleUpStatusProcessor.Process(&context, scaleUpStatus)

assert.NoError(t, err)
Expand Down Expand Up @@ -1453,7 +1453,7 @@ func TestScaleUpBalanceGroups(t *testing.T) {
processors := NewTestProcessors(&context)
suOrchestrator := New()
suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
scaleUpStatus, typedErr := suOrchestrator.ScaleUp(pods, nodes, []*appsv1.DaemonSet{}, nodeInfos)
scaleUpStatus, typedErr := suOrchestrator.ScaleUp(pods, nodes, []*appsv1.DaemonSet{}, nodeInfos, false)

assert.NoError(t, typedErr)
assert.True(t, scaleUpStatus.WasSuccessful())
Expand Down Expand Up @@ -1515,7 +1515,7 @@ func TestScaleUpAutoprovisionedNodeGroup(t *testing.T) {

suOrchestrator := New()
suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p1}, nodes, []*appsv1.DaemonSet{}, nodeInfos)
scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p1}, nodes, []*appsv1.DaemonSet{}, nodeInfos, false)
assert.NoError(t, err)
assert.True(t, scaleUpStatus.WasSuccessful())
assert.Equal(t, "autoprovisioned-T1", utils.GetStringFromChan(createdGroups))
Expand Down Expand Up @@ -1570,7 +1570,7 @@ func TestScaleUpBalanceAutoprovisionedNodeGroups(t *testing.T) {

suOrchestrator := New()
suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p1, p2, p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos)
scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p1, p2, p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos, false)
assert.NoError(t, err)
assert.True(t, scaleUpStatus.WasSuccessful())
assert.Equal(t, "autoprovisioned-T1", utils.GetStringFromChan(createdGroups))
Expand Down
37 changes: 37 additions & 0 deletions cluster-autoscaler/core/scaleup/orchestrator/rejectedreasons.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package orchestrator

// RejectedReasons contains information about why a given node group was
// rejected as a scale-up option.
type RejectedReasons struct {
	// messages holds the human-readable rejection reasons.
	messages []string
}

// NewRejectedReasons creates a new RejectedReasons object holding a single message.
func NewRejectedReasons(m string) *RejectedReasons {
	return &RejectedReasons{messages: []string{m}}
}

// Reasons returns a slice of reasons why the node group was not considered for scale up.
func (r *RejectedReasons) Reasons() []string {
	return r.messages
}

var (
	// AllOrNothingReason means the node group was rejected because not all pods would fit it when using all-or-nothing strategy.
	AllOrNothingReason = NewRejectedReasons("not all pods would fit and scale-up is using all-or-nothing strategy")
)
1 change: 1 addition & 0 deletions cluster-autoscaler/core/scaleup/scaleup.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type Orchestrator interface {
nodes []*apiv1.Node,
daemonSets []*appsv1.DaemonSet,
nodeInfos map[string]*schedulerframework.NodeInfo,
allOrNothing bool,
) (*status.ScaleUpStatus, errors.AutoscalerError)
// ScaleUpToNodeGroupMinSize tries to scale up node groups that have less nodes
// than the configured min size. The source of truth for the current node group
Expand Down
2 changes: 1 addition & 1 deletion cluster-autoscaler/core/static_autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
klog.V(1).Info("Unschedulable pods are very new, waiting one iteration for more")
} else {
scaleUpStart := preScaleUp()
scaleUpStatus, typedErr = a.scaleUpOrchestrator.ScaleUp(unschedulablePodsToHelp, readyNodes, daemonsets, nodeInfosForGroups)
scaleUpStatus, typedErr = a.scaleUpOrchestrator.ScaleUp(unschedulablePodsToHelp, readyNodes, daemonsets, nodeInfosForGroups, false)
if exit, err := postScaleUp(scaleUpStart); exit {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func (o *provReqOrchestrator) ScaleUp(
nodes []*apiv1.Node,
daemonSets []*appsv1.DaemonSet,
nodeInfos map[string]*schedulerframework.NodeInfo,
_ bool,
towca marked this conversation as resolved.
Show resolved Hide resolved
) (*status.ScaleUpStatus, ca_errors.AutoscalerError) {
if !o.initialized {
return &status.ScaleUpStatus{}, ca_errors.ToAutoscalerError(ca_errors.InternalError, fmt.Errorf("provisioningrequest.Orchestrator is not initialized"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func TestScaleUp(t *testing.T) {
provisioningClasses: []provisioningClass{checkcapacity.New(client)},
}
orchestrator.Initialize(&autoscalingContext, nil, nil, nil, taints.TaintConfig{})
st, err := orchestrator.ScaleUp(prPods, []*apiv1.Node{}, []*v1.DaemonSet{}, map[string]*framework.NodeInfo{})
st, err := orchestrator.ScaleUp(prPods, []*apiv1.Node{}, []*v1.DaemonSet{}, map[string]*framework.NodeInfo{}, false)
if !tc.err {
assert.NoError(t, err)
assert.Equal(t, tc.scaleUpResult, st.Result)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func (o *WrapperOrchestrator) ScaleUp(
nodes []*apiv1.Node,
daemonSets []*appsv1.DaemonSet,
nodeInfos map[string]*schedulerframework.NodeInfo,
allOrNothing bool,
) (*status.ScaleUpStatus, errors.AutoscalerError) {
defer func() { o.scaleUpRegularPods = !o.scaleUpRegularPods }()

Expand All @@ -79,9 +80,9 @@ func (o *WrapperOrchestrator) ScaleUp(
}

if o.scaleUpRegularPods {
return o.podsOrchestrator.ScaleUp(regularPods, nodes, daemonSets, nodeInfos)
return o.podsOrchestrator.ScaleUp(regularPods, nodes, daemonSets, nodeInfos, allOrNothing)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think for podsOrchestrator allOrNothing=false

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is false and set in static_autoscaler.go, I'm just passing the variable here.

I wanted this to be set to whatever is the default for the autoscaler, and the custom orchestrators to override it only as needed (like atomic scale-up for ProvisioningRequests will).

}
return o.provReqOrchestrator.ScaleUp(provReqPods, nodes, daemonSets, nodeInfos)
return o.provReqOrchestrator.ScaleUp(provReqPods, nodes, daemonSets, nodeInfos, allOrNothing)
}

func splitOut(unschedulablePods []*apiv1.Pod) (provReqPods, regularPods []*apiv1.Pod) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ func TestWrapperScaleUp(t *testing.T) {
pod.Annotations[provreq.ProvisioningRequestPodAnnotationKey] = "true"
}
unschedulablePods := append(regularPods, provReqPods...)
_, err := o.ScaleUp(unschedulablePods, nil, nil, nil)
_, err := o.ScaleUp(unschedulablePods, nil, nil, nil, false)
assert.Equal(t, err.Error(), provisioningRequestErrorMsg)
_, err = o.ScaleUp(unschedulablePods, nil, nil, nil)
_, err = o.ScaleUp(unschedulablePods, nil, nil, nil, false)
assert.Equal(t, err.Error(), regularPodsErrorMsg)
}

Expand All @@ -71,6 +71,7 @@ func (f *fakeScaleUp) ScaleUp(
nodes []*apiv1.Node,
daemonSets []*appsv1.DaemonSet,
nodeInfos map[string]*schedulerframework.NodeInfo,
allOrNothing bool,
) (*status.ScaleUpStatus, errors.AutoscalerError) {
return nil, errors.NewAutoscalerError(errors.InternalError, f.errorMsg)
}
Expand Down
Loading