Merge pull request #5750 from BigDarkClown/similar
Fix scale-up similar node group computation
k8s-ci-robot authored May 11, 2023
2 parents c04fceb + 645fe63 commit ee59c74
Showing 3 changed files with 279 additions and 106 deletions.
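For orientation: the reworked ScaleUp() in orchestrator.go now validates node groups up front (filterValidScaleUpNodeGroups), computes the schedulable pods once per valid group, attaches similar node groups to each expansion option (ComputeSimilarNodeGroups), and only then asks the expander for the best option. The following is a minimal, self-contained sketch of that ordering, using hypothetical stand-in types rather than the real cloudprovider/expander APIs:

package main

import "fmt"

// Hypothetical stand-ins; the real code uses cloudprovider.NodeGroup,
// *apiv1.Pod and expander.Option from cluster-autoscaler.
type group struct{ id string }
type pod struct{ name string }

type option struct {
	group   *group
	pods    []*pod
	similar []*group // counterpart of expander.Option.SimilarNodeGroups
}

// scaleUp mirrors the ordering introduced by this change: filter the groups
// once, compute schedulable pods once per valid group, build one option per
// group with its similar groups already attached, then pick among the options.
func scaleUp(groups []*group, schedulable map[string][]*pod, similar map[string][]*group) *option {
	valid := groups // stand-in for filterValidScaleUpNodeGroups
	var options []*option
	for _, g := range valid {
		pods := schedulable[g.id]
		if len(pods) == 0 {
			continue // "No pod can fit to" this group
		}
		options = append(options, &option{
			group:   g,
			pods:    pods,
			similar: similar[g.id], // stand-in for ComputeSimilarNodeGroups
		})
	}
	if len(options) == 0 {
		return nil // "No expansion options"
	}
	return options[0] // stand-in for ExpanderStrategy.BestOption
}

func main() {
	p := &pod{name: "pending"}
	a, b := &group{id: "a"}, &group{id: "b"}
	best := scaleUp(
		[]*group{a, b},
		map[string][]*pod{"a": {p}, "b": {p}},
		map[string][]*group{"a": {b}, "b": {a}},
	)
	fmt.Printf("scale up %s together with %d similar group(s)\n", best.group.id, len(best.similar))
}

Compared with the previous flow, similar node groups no longer need a separate filterNodeGroupsByPods pass after the expander picks a winner; each option already carries only the groups that can schedule the same pods, and ScaleUp simply copies bestOption.SimilarNodeGroups into targetNodeGroups.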
224 changes: 122 additions & 102 deletions cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go
@@ -125,48 +125,29 @@ func (o *ScaleUpOrchestrator) ScaleUp(
}

now := time.Now()
expansionOptions := make(map[string]expander.Option, 0)
skippedNodeGroups := map[string]status.Reasons{}

for _, nodeGroup := range nodeGroups {
if skipReason := o.IsNodeGroupReadyToScaleUp(nodeGroup, now); skipReason != nil {
skippedNodeGroups[nodeGroup.Id()] = skipReason
continue
}
// Filter out invalid node groups
validNodeGroups, skippedNodeGroups := o.filterValidScaleUpNodeGroups(nodeGroups, nodeInfos, resourcesLeft, now)

currentTargetSize, err := nodeGroup.TargetSize()
if err != nil {
klog.Errorf("Failed to get node group size: %v", err)
skippedNodeGroups[nodeGroup.Id()] = NotReadyReason
continue
}
if currentTargetSize >= nodeGroup.MaxSize() {
klog.V(4).Infof("Skipping node group %s - max size reached", nodeGroup.Id())
skippedNodeGroups[nodeGroup.Id()] = MaxLimitReachedReason
continue
}
// Calculate expansion options
schedulablePods := map[string][]*apiv1.Pod{}
var options []expander.Option

nodeInfo, found := nodeInfos[nodeGroup.Id()]
if !found {
klog.Errorf("No node info for: %s", nodeGroup.Id())
skippedNodeGroups[nodeGroup.Id()] = NotReadyReason
continue
}
for _, nodeGroup := range validNodeGroups {
schedulablePods[nodeGroup.Id()] = o.SchedulablePods(podEquivalenceGroups, nodeGroup, nodeInfos[nodeGroup.Id()])
}

if skipReason := o.IsNodeGroupResourceExceeded(resourcesLeft, nodeGroup, nodeInfo); skipReason != nil {
skippedNodeGroups[nodeGroup.Id()] = skipReason
for _, nodeGroup := range validNodeGroups {
option := o.ComputeExpansionOption(nodeGroup, schedulablePods, nodeInfos, upcomingNodes, now)
if len(option.Pods) == 0 || option.NodeCount == 0 {
klog.V(4).Infof("No pod can fit to %s", nodeGroup.Id())
continue
}

option := o.ComputeExpansionOption(podEquivalenceGroups, nodeGroup, nodeInfo, upcomingNodes)
if len(option.Pods) > 0 && option.NodeCount > 0 {
expansionOptions[nodeGroup.Id()] = option
} else {
klog.V(4).Infof("No pod can fit to %s", nodeGroup.Id())
}
options = append(options, option)
}

if len(expansionOptions) == 0 {
if len(options) == 0 {
klog.V(1).Info("No expansion options")
return &status.ScaleUpStatus{
Result: status.ScaleUpNoOptionsAvailable,
@@ -176,10 +157,6 @@ func (o *ScaleUpOrchestrator) ScaleUp(
}

// Pick some expansion option.
options := make([]expander.Option, 0, len(expansionOptions))
for _, o := range expansionOptions {
options = append(options, o)
}
bestOption := o.autoscalingContext.ExpanderStrategy.BestOption(options, nodeInfos)
if bestOption == nil || bestOption.NodeCount <= 0 {
return &status.ScaleUpStatus{
@@ -233,17 +210,16 @@ func (o *ScaleUpOrchestrator) ScaleUp(
continue
}
nodeInfos[nodeGroup.Id()] = nodeInfo

option := o.ComputeExpansionOption(podEquivalenceGroups, nodeGroup, nodeInfo, upcomingNodes)
if len(option.Pods) > 0 && option.NodeCount > 0 {
expansionOptions[nodeGroup.Id()] = option
}
schedulablePods[nodeGroup.Id()] = o.SchedulablePods(podEquivalenceGroups, nodeGroup, nodeInfo)
}

// Update ClusterStateRegistry so similar nodegroups rebalancing works.
// TODO(lukaszos) when pursuing scalability update this call with one which takes list of changed node groups so we do not
// do extra API calls. (the call at the bottom of ScaleUp() could be also changed then)
o.clusterStateRegistry.Recalculate()

// Recompute similar node groups
bestOption.SimilarNodeGroups = o.ComputeSimilarNodeGroups(bestOption.NodeGroup, nodeInfos, schedulablePods, now)
}

nodeInfo, found := nodeInfos[bestOption.NodeGroup.Id()]
@@ -266,32 +242,16 @@ func (o *ScaleUpOrchestrator) ScaleUp(
}

targetNodeGroups := []cloudprovider.NodeGroup{bestOption.NodeGroup}
if o.autoscalingContext.BalanceSimilarNodeGroups {
similarNodeGroups, aErr := o.processors.NodeGroupSetProcessor.FindSimilarNodeGroups(o.autoscalingContext, bestOption.NodeGroup, nodeInfos)
if aErr != nil {
return scaleUpError(
&status.ScaleUpStatus{CreateNodeGroupResults: createNodeGroupResults, PodsTriggeredScaleUp: bestOption.Pods},
aErr.AddPrefix("failed to find matching node groups: "))
}

similarNodeGroups = filterNodeGroupsByPods(similarNodeGroups, bestOption.Pods, expansionOptions)
for _, ng := range similarNodeGroups {
if o.clusterStateRegistry.IsNodeGroupSafeToScaleUp(ng, now) {
targetNodeGroups = append(targetNodeGroups, ng)
} else {
// This should never happen, as we will filter out the node group earlier on because of missing
// entry in podsPassingPredicates, but double checking doesn't really cost us anything.
klog.V(2).Infof("Ignoring node group %s when balancing: group is not ready for scaleup", ng.Id())
}
}
for _, ng := range bestOption.SimilarNodeGroups {
targetNodeGroups = append(targetNodeGroups, ng)
}

if len(targetNodeGroups) > 1 {
names := []string{}
for _, ng := range targetNodeGroups {
names = append(names, ng.Id())
}
klog.V(1).Infof("Splitting scale-up between %v similar node groups: {%v}", len(targetNodeGroups), strings.Join(names, ", "))
if len(targetNodeGroups) > 1 {
var names []string
for _, ng := range targetNodeGroups {
names = append(names, ng.Id())
}
klog.V(1).Infof("Splitting scale-up between %v similar node groups: {%v}", len(targetNodeGroups), strings.Join(names, ", "))
}

scaleUpInfos, aErr := o.processors.NodeGroupSetProcessor.BalanceScaleUpBetweenGroups(o.autoscalingContext, targetNodeGroups, newNodes)
@@ -426,24 +386,69 @@ func (o *ScaleUpOrchestrator) ScaleUpToNodeGroupMinSize(
}, nil
}

// filterValidScaleUpNodeGroups filters the node groups that are valid for scale-up
func (o *ScaleUpOrchestrator) filterValidScaleUpNodeGroups(
nodeGroups []cloudprovider.NodeGroup,
nodeInfos map[string]*schedulerframework.NodeInfo,
resourcesLeft resource.Limits,
now time.Time,
) ([]cloudprovider.NodeGroup, map[string]status.Reasons) {
var validNodeGroups []cloudprovider.NodeGroup
skippedNodeGroups := map[string]status.Reasons{}

for _, nodeGroup := range nodeGroups {
if skipReason := o.IsNodeGroupReadyToScaleUp(nodeGroup, now); skipReason != nil {
skippedNodeGroups[nodeGroup.Id()] = skipReason
continue
}

currentTargetSize, err := nodeGroup.TargetSize()
if err != nil {
klog.Errorf("Failed to get node group size: %v", err)
skippedNodeGroups[nodeGroup.Id()] = NotReadyReason
continue
}
if currentTargetSize >= nodeGroup.MaxSize() {
klog.V(4).Infof("Skipping node group %s - max size reached", nodeGroup.Id())
skippedNodeGroups[nodeGroup.Id()] = MaxLimitReachedReason
continue
}

nodeInfo, found := nodeInfos[nodeGroup.Id()]
if !found {
klog.Errorf("No node info for: %s", nodeGroup.Id())
skippedNodeGroups[nodeGroup.Id()] = NotReadyReason
continue
}
if skipReason := o.IsNodeGroupResourceExceeded(resourcesLeft, nodeGroup, nodeInfo); skipReason != nil {
skippedNodeGroups[nodeGroup.Id()] = skipReason
continue
}

validNodeGroups = append(validNodeGroups, nodeGroup)
}
return validNodeGroups, skippedNodeGroups
}

// ComputeExpansionOption computes expansion option based on pending pods and cluster state.
func (o *ScaleUpOrchestrator) ComputeExpansionOption(
podEquivalenceGroups []*equivalence.PodGroup,
nodeGroup cloudprovider.NodeGroup,
nodeInfo *schedulerframework.NodeInfo,
schedulablePods map[string][]*apiv1.Pod,
nodeInfos map[string]*schedulerframework.NodeInfo,
upcomingNodes []*schedulerframework.NodeInfo,
now time.Time,
) expander.Option {
option := expander.Option{
NodeGroup: nodeGroup,
Pods: make([]*apiv1.Pod, 0),
}
option := expander.Option{NodeGroup: nodeGroup}
pods := schedulablePods[nodeGroup.Id()]
nodeInfo := nodeInfos[nodeGroup.Id()]

option.Pods = o.SchedulablePods(podEquivalenceGroups, nodeGroup, nodeInfo)
if len(option.Pods) > 0 {
estimator := o.autoscalingContext.EstimatorBuilder(o.autoscalingContext.PredicateChecker, o.autoscalingContext.ClusterSnapshot)
option.NodeCount, option.Pods = estimator.Estimate(option.Pods, nodeInfo, option.NodeGroup)
if len(pods) == 0 {
return option
}

estimator := o.autoscalingContext.EstimatorBuilder(o.autoscalingContext.PredicateChecker, o.autoscalingContext.ClusterSnapshot)
option.NodeCount, option.Pods = estimator.Estimate(pods, nodeInfo, nodeGroup)
option.SimilarNodeGroups = o.ComputeSimilarNodeGroups(nodeGroup, nodeInfos, schedulablePods, now)
return option
}

@@ -557,37 +562,52 @@ func (o *ScaleUpOrchestrator) GetCappedNewNodeCount(newNodeCount, currentNodeCou
return newNodeCount, nil
}

func filterNodeGroupsByPods(
groups []cloudprovider.NodeGroup,
podsRequiredToFit []*apiv1.Pod,
expansionOptions map[string]expander.Option,
// ComputeSimilarNodeGroups finds similar node groups which can schedule the same
// set of pods as the main node group.
func (o *ScaleUpOrchestrator) ComputeSimilarNodeGroups(
nodeGroup cloudprovider.NodeGroup,
nodeInfos map[string]*schedulerframework.NodeInfo,
schedulablePods map[string][]*apiv1.Pod,
now time.Time,
) []cloudprovider.NodeGroup {
result := make([]cloudprovider.NodeGroup, 0)
if !o.autoscalingContext.BalanceSimilarNodeGroups {
return nil
}

for _, group := range groups {
option, found := expansionOptions[group.Id()]
if !found {
klog.V(1).Infof("No info about pods passing predicates found for group %v, skipping it from scale-up consideration", group.Id())
continue
}
fittingPods := make(map[*apiv1.Pod]bool, len(option.Pods))
for _, pod := range option.Pods {
fittingPods[pod] = true
}
allFit := true
for _, pod := range podsRequiredToFit {
if _, found := fittingPods[pod]; !found {
klog.V(1).Infof("Group %v, can't fit pod %v/%v, removing from scale-up consideration", group.Id(), pod.Namespace, pod.Name)
allFit = false
break
}
}
if allFit {
result = append(result, group)
groupSchedulablePods, found := schedulablePods[nodeGroup.Id()]
if !found || len(groupSchedulablePods) == 0 {
return nil
}

similarNodeGroups, err := o.processors.NodeGroupSetProcessor.FindSimilarNodeGroups(o.autoscalingContext, nodeGroup, nodeInfos)
if err != nil {
klog.Errorf("Failed to find similar node groups: %v", err)
return nil
}

var validSimilarNodeGroups []cloudprovider.NodeGroup
for _, ng := range similarNodeGroups {
if !o.clusterStateRegistry.IsNodeGroupSafeToScaleUp(ng, now) {
klog.V(2).Infof("Ignoring node group %s when balancing: group is not ready for scaleup", ng.Id())
} else if similarSchedulablePods, found := schedulablePods[ng.Id()]; found && matchingSchedulablePods(groupSchedulablePods, similarSchedulablePods) {
validSimilarNodeGroups = append(validSimilarNodeGroups, ng)
}
}

return result
return validSimilarNodeGroups
}

func matchingSchedulablePods(groupSchedulablePods []*apiv1.Pod, similarSchedulablePods []*apiv1.Pod) bool {
schedulablePods := make(map[*apiv1.Pod]bool)
for _, pod := range similarSchedulablePods {
schedulablePods[pod] = true
}
for _, pod := range groupSchedulablePods {
if _, found := schedulablePods[pod]; !found {
return false
}
}
return true
}

// GetRemainingPods returns information about pods which CA is unable to help
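One detail of the new matchingSchedulablePods helper above: it keys its lookup map by *apiv1.Pod pointer, so pods are matched by identity, not by name. That appears safe here because every per-group list is produced by the same SchedulablePods call over the shared pod equivalence groups, so the same pending pod is the same pointer in every list. A tiny standalone illustration of the containment semantics, with a hypothetical pod type rather than the real API:

package main

import "fmt"

type pod struct{ name string }

// containsAll reports whether every pod in want is also in have, comparing
// by pointer identity, as matchingSchedulablePods does.
func containsAll(have, want []*pod) bool {
	set := make(map[*pod]bool, len(have))
	for _, p := range have {
		set[p] = true
	}
	for _, p := range want {
		if !set[p] {
			return false
		}
	}
	return true
}

func main() {
	shared := &pod{name: "web-0"}
	lookalike := &pod{name: "web-0"} // equal contents, different object

	fmt.Println(containsAll([]*pod{shared}, []*pod{shared}))    // true: same pointer
	fmt.Println(containsAll([]*pod{lookalike}, []*pod{shared})) // false: different pointer
}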