Skip to content

Commit

Permalink
Add support for all-or-nothing scale-up strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
aleksandra-malinowska committed May 13, 2024
1 parent 5ecf37a commit 0fd691d
Show file tree
Hide file tree
Showing 10 changed files with 116 additions and 17 deletions.
7 changes: 4 additions & 3 deletions cluster-autoscaler/core/scaleup/equivalence/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ import (

// PodGroup contains a group of pods that are equivalent in terms of schedulability.
type PodGroup struct {
Pods []*apiv1.Pod
SchedulingErrors map[string]status.Reasons
Schedulable bool
Pods []*apiv1.Pod
SchedulingErrors map[string]status.Reasons
SchedulableGroups []string
Schedulable bool
}

// BuildPodGroups prepares pod groups with equivalent scheduling properties.
Expand Down
59 changes: 58 additions & 1 deletion cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func (o *ScaleUpOrchestrator) ScaleUp(
nodes []*apiv1.Node,
daemonSets []*appsv1.DaemonSet,
nodeInfos map[string]*schedulerframework.NodeInfo,
allOrNothing bool,
) (*status.ScaleUpStatus, errors.AutoscalerError) {
if !o.initialized {
return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, "ScaleUpOrchestrator is not initialized"))
Expand Down Expand Up @@ -146,11 +147,13 @@ func (o *ScaleUpOrchestrator) ScaleUp(
}

for _, nodeGroup := range validNodeGroups {
option := o.ComputeExpansionOption(nodeGroup, schedulablePodGroups, nodeInfos, len(nodes)+len(upcomingNodes), now)
option := o.ComputeExpansionOption(nodeGroup, schedulablePodGroups, nodeInfos, len(nodes)+len(upcomingNodes), now, allOrNothing)
o.processors.BinpackingLimiter.MarkProcessed(o.autoscalingContext, nodeGroup.Id())

if len(option.Pods) == 0 || option.NodeCount == 0 {
klog.V(4).Infof("No pod can fit to %s", nodeGroup.Id())
} else if allOrNothing && len(option.Pods) < len(unschedulablePods) {
klog.V(4).Infof("Some pods can't fit to %s, giving up due to all-or-nothing scale-up strategy", nodeGroup.Id())
} else {
options = append(options, option)
}
Expand Down Expand Up @@ -211,9 +214,20 @@ func (o *ScaleUpOrchestrator) ScaleUp(
aErr)
}

if newNodes < bestOption.NodeCount {
klog.V(1).Infof("Only %d nodes can be added to %s due to cluster-wide limits", newNodes, bestOption.NodeGroup.Id())
if allOrNothing {
return stopAllOrNothingScaleUp(podEquivalenceGroups, skippedNodeGroups, nodeGroups)
}
}

// If necessary, create the node group. This is no longer simulation, an empty node group will be created by cloud provider if supported.
createNodeGroupResults := make([]nodegroups.CreateNodeGroupResult, 0)
if !bestOption.NodeGroup.Exist() {
if allOrNothing && bestOption.NodeGroup.MaxSize() < newNodes {
klog.V(1).Infof("Can only create a new node group with max %d nodes, need %d nodes", bestOption.NodeGroup.MaxSize(), newNodes)
return stopAllOrNothingScaleUp(podEquivalenceGroups, skippedNodeGroups, nodeGroups)
}
var scaleUpStatus *status.ScaleUpStatus
createNodeGroupResults, scaleUpStatus, aErr = o.CreateNodeGroup(bestOption, nodeInfos, schedulablePodGroups, podEquivalenceGroups, daemonSets)
if aErr != nil {
Expand Down Expand Up @@ -256,6 +270,18 @@ func (o *ScaleUpOrchestrator) ScaleUp(
aErr)
}

// Last check before scale-up. Node group capacity (both due to max size limits & current size) is only checked when balancing.
totalCapacity := 0
for _, sui := range scaleUpInfos {
totalCapacity += sui.NewSize - sui.CurrentSize
}
if totalCapacity < newNodes {
klog.V(1).Infof("Can only add %d nodes due to node group limits, need %d nodes", totalCapacity, newNodes)
if allOrNothing {
return stopAllOrNothingScaleUp(podEquivalenceGroups, skippedNodeGroups, nodeGroups)
}
}

// Execute scale up.
klog.V(1).Infof("Final scale-up plan: %v", scaleUpInfos)
aErr, failedNodeGroups := o.scaleUpExecutor.ExecuteScaleUps(scaleUpInfos, nodeInfos, now)
Expand Down Expand Up @@ -447,6 +473,7 @@ func (o *ScaleUpOrchestrator) ComputeExpansionOption(
nodeInfos map[string]*schedulerframework.NodeInfo,
currentNodeCount int,
now time.Time,
allOrNothing bool,
) expander.Option {
option := expander.Option{NodeGroup: nodeGroup}
podGroups := schedulablePodGroups[nodeGroup.Id()]
Expand All @@ -472,6 +499,15 @@ func (o *ScaleUpOrchestrator) ComputeExpansionOption(
klog.Errorf("Failed to get autoscaling options for node group %s: %v", nodeGroup.Id(), err)
}
if autoscalingOptions != nil && autoscalingOptions.ZeroOrMaxNodeScaling {
if allOrNothing && option.NodeCount > nodeGroup.MaxSize() {
// The following check can quietly cap the number of nodes and so breaks the
// assumption that as long as we're able to provision option.NodeCount of
// nodes, all pods will be accommodated.
// This fix isn't applicable to non-atomic node groups as we'll operate
// on uncapped number of nodes in that case.
option.Pods = nil
option.NodeCount = 0
}
if option.NodeCount > 0 && option.NodeCount != nodeGroup.MaxSize() {
option.NodeCount = nodeGroup.MaxSize()
}
Expand Down Expand Up @@ -564,6 +600,7 @@ func (o *ScaleUpOrchestrator) SchedulablePodGroups(
})
// Mark pod group as (theoretically) schedulable.
eg.Schedulable = true
eg.SchedulableGroups = append(eg.SchedulableGroups, nodeGroup.Id())
} else {
klog.V(2).Infof("Pod %s/%s can't be scheduled on %s, predicate checking error: %v", samplePod.Namespace, samplePod.Name, nodeGroup.Id(), err.VerboseMessage())
if podCount := len(eg.Pods); podCount > 1 {
Expand Down Expand Up @@ -709,6 +746,26 @@ func matchingSchedulablePodGroups(podGroups []estimator.PodEquivalenceGroup, sim
return true
}

// stopAllOrNothingScaleUp abandons a scale-up attempted under the all-or-nothing
// strategy once it is known that not all pods would be accommodated.
//
// Every pod group in egs that is currently marked schedulable is flipped back
// to unschedulable, and AllOrNothingReason is recorded in its SchedulingErrors
// under each node group ID listed in SchedulableGroups. This happens before egs
// is handed to GetRemainingPods, so the rejection reason is not lost.
//
// Parameters:
//   - egs: pod equivalence groups considered in this scale-up; mutated in place.
//   - skipped: reasons for node groups that were skipped, passed through to
//     GetRemainingPods unchanged.
//   - ngs: node groups reported as ConsideredNodeGroups in the returned status.
//
// It returns a status with Result set to ScaleUpNoOptionsAvailable and a nil error.
func stopAllOrNothingScaleUp(egs []*equivalence.PodGroup, skipped map[string]status.Reasons, ngs []cloudprovider.NodeGroup) (*status.ScaleUpStatus, errors.AutoscalerError) {
	// Can't execute a scale-up that will accommodate all pods, so nothing is considered schedulable.
	for _, eg := range egs {
		if !eg.Schedulable {
			continue
		}
		// Writing to a nil map panics, so make sure the map exists before
		// recording the reasons. Existing entries are kept.
		if eg.SchedulingErrors == nil {
			eg.SchedulingErrors = make(map[string]status.Reasons, len(eg.SchedulableGroups))
		}
		for _, sg := range eg.SchedulableGroups {
			eg.SchedulingErrors[sg] = AllOrNothingReason
		}
		eg.Schedulable = false
	}
	klog.V(1).Info("Not attempting scale-up due to all-or-nothing strategy: not all pods would be accommodated")
	return &status.ScaleUpStatus{
		Result:                  status.ScaleUpNoOptionsAvailable,
		PodsRemainUnschedulable: GetRemainingPods(egs, skipped),
		ConsideredNodeGroups:    ngs,
	}, nil
}

// GetRemainingPods returns information about pods which CA is unable to help
// at this moment.
func GetRemainingPods(egs []*equivalence.PodGroup, skipped map[string]status.Reasons) []status.NoScaleUpInfo {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1032,7 +1032,7 @@ func runSimpleScaleUpTest(t *testing.T, config *ScaleUpTestConfig) *ScaleUpTestR
context.ExpanderStrategy = expander

// scale up
scaleUpStatus, scaleUpErr := orchestrator.ScaleUp(extraPods, nodes, []*appsv1.DaemonSet{}, nodeInfos)
scaleUpStatus, scaleUpErr := orchestrator.ScaleUp(extraPods, nodes, []*appsv1.DaemonSet{}, nodeInfos, false)
processors.ScaleUpStatusProcessor.Process(&context, scaleUpStatus)

// aggregate group size changes
Expand Down Expand Up @@ -1131,7 +1131,7 @@ func TestScaleUpUnhealthy(t *testing.T) {
processors := NewTestProcessors(&context)
suOrchestrator := New()
suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos)
scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos, false)

assert.NoError(t, err)
// Node group is unhealthy.
Expand Down Expand Up @@ -1185,7 +1185,7 @@ func TestBinpackingLimiter(t *testing.T) {
expander := NewMockRepotingStrategy(t, nil)
context.ExpanderStrategy = expander

scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{extraPod}, nodes, []*appsv1.DaemonSet{}, nodeInfos)
scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{extraPod}, nodes, []*appsv1.DaemonSet{}, nodeInfos, false)
processors.ScaleUpStatusProcessor.Process(&context, scaleUpStatus)
assert.NoError(t, err)
assert.True(t, scaleUpStatus.WasSuccessful())
Expand Down Expand Up @@ -1231,7 +1231,7 @@ func TestScaleUpNoHelp(t *testing.T) {
processors := NewTestProcessors(&context)
suOrchestrator := New()
suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos)
scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos, false)
processors.ScaleUpStatusProcessor.Process(&context, scaleUpStatus)

assert.NoError(t, err)
Expand Down Expand Up @@ -1453,7 +1453,7 @@ func TestScaleUpBalanceGroups(t *testing.T) {
processors := NewTestProcessors(&context)
suOrchestrator := New()
suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
scaleUpStatus, typedErr := suOrchestrator.ScaleUp(pods, nodes, []*appsv1.DaemonSet{}, nodeInfos)
scaleUpStatus, typedErr := suOrchestrator.ScaleUp(pods, nodes, []*appsv1.DaemonSet{}, nodeInfos, false)

assert.NoError(t, typedErr)
assert.True(t, scaleUpStatus.WasSuccessful())
Expand Down Expand Up @@ -1515,7 +1515,7 @@ func TestScaleUpAutoprovisionedNodeGroup(t *testing.T) {

suOrchestrator := New()
suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p1}, nodes, []*appsv1.DaemonSet{}, nodeInfos)
scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p1}, nodes, []*appsv1.DaemonSet{}, nodeInfos, false)
assert.NoError(t, err)
assert.True(t, scaleUpStatus.WasSuccessful())
assert.Equal(t, "autoprovisioned-T1", utils.GetStringFromChan(createdGroups))
Expand Down Expand Up @@ -1570,7 +1570,7 @@ func TestScaleUpBalanceAutoprovisionedNodeGroups(t *testing.T) {

suOrchestrator := New()
suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p1, p2, p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos)
scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p1, p2, p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos, false)
assert.NoError(t, err)
assert.True(t, scaleUpStatus.WasSuccessful())
assert.Equal(t, "autoprovisioned-T1", utils.GetStringFromChan(createdGroups))
Expand Down
37 changes: 37 additions & 0 deletions cluster-autoscaler/core/scaleup/orchestrator/rejectedreasons.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package orchestrator

// RejectedReasons holds the messages explaining why a given node group was
// rejected as a scale-up option.
type RejectedReasons struct {
	messages []string
}

// NewRejectedReasons creates a new RejectedReasons object holding the single
// message m.
func NewRejectedReasons(m string) *RejectedReasons {
	rejected := new(RejectedReasons)
	rejected.messages = []string{m}
	return rejected
}

// Reasons returns the messages describing why the node group was not
// considered for scale-up.
func (r *RejectedReasons) Reasons() []string {
	return r.messages
}

// AllOrNothingReason means the node group was rejected because not all pods
// would fit it when using the all-or-nothing strategy.
var AllOrNothingReason = NewRejectedReasons("not all pods would fit and scale-up is using all-or-nothing strategy")
1 change: 1 addition & 0 deletions cluster-autoscaler/core/scaleup/scaleup.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type Orchestrator interface {
nodes []*apiv1.Node,
daemonSets []*appsv1.DaemonSet,
nodeInfos map[string]*schedulerframework.NodeInfo,
allOrNothing bool,
) (*status.ScaleUpStatus, errors.AutoscalerError)
// ScaleUpToNodeGroupMinSize tries to scale up node groups that have less nodes
// than the configured min size. The source of truth for the current node group
Expand Down
2 changes: 1 addition & 1 deletion cluster-autoscaler/core/static_autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
klog.V(1).Info("Unschedulable pods are very new, waiting one iteration for more")
} else {
scaleUpStart := preScaleUp()
scaleUpStatus, typedErr = a.scaleUpOrchestrator.ScaleUp(unschedulablePodsToHelp, readyNodes, daemonsets, nodeInfosForGroups)
scaleUpStatus, typedErr = a.scaleUpOrchestrator.ScaleUp(unschedulablePodsToHelp, readyNodes, daemonsets, nodeInfosForGroups, false)
if exit, err := postScaleUp(scaleUpStart); exit {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func (o *provReqOrchestrator) ScaleUp(
nodes []*apiv1.Node,
daemonSets []*appsv1.DaemonSet,
nodeInfos map[string]*schedulerframework.NodeInfo,
_ bool,
) (*status.ScaleUpStatus, ca_errors.AutoscalerError) {
if !o.initialized {
return &status.ScaleUpStatus{}, ca_errors.ToAutoscalerError(ca_errors.InternalError, fmt.Errorf("provisioningrequest.Orchestrator is not initialized"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func TestScaleUp(t *testing.T) {
provisioningClasses: []provisioningClass{checkcapacity.New(client)},
}
orchestrator.Initialize(&autoscalingContext, nil, nil, nil, taints.TaintConfig{})
st, err := orchestrator.ScaleUp(prPods, []*apiv1.Node{}, []*v1.DaemonSet{}, map[string]*framework.NodeInfo{})
st, err := orchestrator.ScaleUp(prPods, []*apiv1.Node{}, []*v1.DaemonSet{}, map[string]*framework.NodeInfo{}, false)
if !tc.err {
assert.NoError(t, err)
assert.Equal(t, tc.scaleUpResult, st.Result)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func (o *WrapperOrchestrator) ScaleUp(
nodes []*apiv1.Node,
daemonSets []*appsv1.DaemonSet,
nodeInfos map[string]*schedulerframework.NodeInfo,
allOrNothing bool,
) (*status.ScaleUpStatus, errors.AutoscalerError) {
defer func() { o.scaleUpRegularPods = !o.scaleUpRegularPods }()

Expand All @@ -79,9 +80,9 @@ func (o *WrapperOrchestrator) ScaleUp(
}

if o.scaleUpRegularPods {
return o.podsOrchestrator.ScaleUp(regularPods, nodes, daemonSets, nodeInfos)
return o.podsOrchestrator.ScaleUp(regularPods, nodes, daemonSets, nodeInfos, allOrNothing)
}
return o.provReqOrchestrator.ScaleUp(provReqPods, nodes, daemonSets, nodeInfos)
return o.provReqOrchestrator.ScaleUp(provReqPods, nodes, daemonSets, nodeInfos, allOrNothing)
}

func splitOut(unschedulablePods []*apiv1.Pod) (provReqPods, regularPods []*apiv1.Pod) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ func TestWrapperScaleUp(t *testing.T) {
pod.Annotations[provreq.ProvisioningRequestPodAnnotationKey] = "true"
}
unschedulablePods := append(regularPods, provReqPods...)
_, err := o.ScaleUp(unschedulablePods, nil, nil, nil)
_, err := o.ScaleUp(unschedulablePods, nil, nil, nil, false)
assert.Equal(t, err.Error(), provisioningRequestErrorMsg)
_, err = o.ScaleUp(unschedulablePods, nil, nil, nil)
_, err = o.ScaleUp(unschedulablePods, nil, nil, nil, false)
assert.Equal(t, err.Error(), regularPodsErrorMsg)
}

Expand All @@ -71,6 +71,7 @@ func (f *fakeScaleUp) ScaleUp(
nodes []*apiv1.Node,
daemonSets []*appsv1.DaemonSet,
nodeInfos map[string]*schedulerframework.NodeInfo,
allOrNothing bool,
) (*status.ScaleUpStatus, errors.AutoscalerError) {
return nil, errors.NewAutoscalerError(errors.InternalError, f.errorMsg)
}
Expand Down

0 comments on commit 0fd691d

Please sign in to comment.