ScaleUpManager refactoring
* Make the structures public, as well as some helper functions
* manager.go to scaleup.go file rename
* Minor code simplifications
* Minor comment fixes/style consolidations
kisieland committed Mar 15, 2023
1 parent 3d796de commit 4d6ac8a
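
Since the commit exports these structures, an out-of-tree caller can now build the scale-up manager through the factory and drive it directly. The sketch below is illustrative only and not part of the commit: the k8s.io/autoscaler/cluster-autoscaler/... import paths are assumed from the repository layout, and all inputs are assumed to be prepared the same way the static autoscaler prepares them today.

package example

import (
	apiv1 "k8s.io/api/core/v1"

	"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
	"k8s.io/autoscaler/cluster-autoscaler/context"
	"k8s.io/autoscaler/cluster-autoscaler/core/scaleup/wrapper"
	ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
	"k8s.io/autoscaler/cluster-autoscaler/processors/status"
	"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
	"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

// scaleUpToMinSize is a hypothetical helper: it assumes the caller has already
// built the autoscaling context, processors, cluster state registry and node
// infos, exactly as the static autoscaler does before delegating to the
// scale-up manager.
func scaleUpToMinSize(
	ctx *context.AutoscalingContext,
	processors *ca_processors.AutoscalingProcessors,
	csr *clusterstate.ClusterStateRegistry,
	ignoredTaints taints.TaintKeySet,
	nodes []*apiv1.Node,
	nodeInfos map[string]*schedulerframework.NodeInfo,
) (*status.ScaleUpStatus, errors.AutoscalerError) {
	// NewManagerFactory and the exported ScaleUpManager come from this commit;
	// the concrete type assertion mirrors the pattern used in manager_test.go.
	manager := wrapper.NewManagerFactory().NewManager(ctx, processors, csr, ignoredTaints).(*wrapper.ScaleUpManager)
	return manager.ScaleUpToNodeGroupMinSize(nodes, nodeInfos)
}
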
Showing 3 changed files with 84 additions and 74 deletions.
File renamed without changes.
156 changes: 83 additions & 73 deletions cluster-autoscaler/core/scaleup/wrapper/manager.go
@@ -27,23 +27,24 @@ import (
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

type scaleUpManagerFactory struct {
// ScaleUpManagerFactory implements scaleup.ManagerFactory interface.
type ScaleUpManagerFactory struct {
}

// NewManagerFactory returns new instance of scale up manager factory.
func NewManagerFactory() *scaleUpManagerFactory {
return &scaleUpManagerFactory{}
func NewManagerFactory() *ScaleUpManagerFactory {
return &ScaleUpManagerFactory{}
}

// NewManager returns new instance of scale up manager.
func (f *scaleUpManagerFactory) NewManager(
func (f *ScaleUpManagerFactory) NewManager(
autoscalingContext *context.AutoscalingContext,
processors *ca_processors.AutoscalingProcessors,
clusterStateRegistry *clusterstate.ClusterStateRegistry,
ignoredTaints taints.TaintKeySet,
) scaleup.Manager {
resourceManager := resource.NewManager(processors.CustomResourcesProcessor)
return &scaleUpManager{
return &ScaleUpManager{
autoscalingContext: autoscalingContext,
processors: processors,
resourceManager: resourceManager,
@@ -52,7 +53,8 @@ func (f *scaleUpManagerFactory) NewManager(
}
}

type scaleUpManager struct {
// ScaleUpManager implements scaleup.Manager interface.
type ScaleUpManager struct {
autoscalingContext *context.AutoscalingContext
processors *ca_processors.AutoscalingProcessors
resourceManager *resource.Manager
@@ -63,7 +65,7 @@ type scaleUpManager struct {
// ScaleUp tries to scale the cluster up. Returns appropriate status or error if
// an unexpected error occurred. Assumes that all nodes in the cluster are ready
// and in sync with instance groups.
func (w *scaleUpManager) ScaleUp(
func (w *ScaleUpManager) ScaleUp(
unschedulablePods []*apiv1.Pod,
nodes []*apiv1.Node,
daemonSets []*appsv1.DaemonSet,
@@ -118,7 +120,7 @@ func (w *scaleUpManager) ScaleUp(
skippedNodeGroups := map[string]status.Reasons{}

for _, nodeGroup := range nodeGroups {
if skipReason := w.isNodeGroupReadyToScaleUp(nodeGroup, now); skipReason != nil {
if skipReason := w.IsNodeGroupReadyToScaleUp(nodeGroup, now); skipReason != nil {
skippedNodeGroups[nodeGroup.Id()] = skipReason
continue
}
@@ -142,12 +144,12 @@
continue
}

if skipReason := w.isNodeGroupResourceExceeded(resourcesLeft, nodeGroup, nodeInfo); skipReason != nil {
if skipReason := w.IsNodeGroupResourceExceeded(resourcesLeft, nodeGroup, nodeInfo); skipReason != nil {
skippedNodeGroups[nodeGroup.Id()] = skipReason
continue
}

option, err := w.computeExpansionOption(podEquivalenceGroups, nodeGroup, nodeInfo, upcomingNodes)
option, err := w.ComputeExpansionOption(podEquivalenceGroups, nodeGroup, nodeInfo, upcomingNodes)
if err != nil {
return scaleUpError(&status.ScaleUpStatus{}, errors.ToAutoscalerError(errors.InternalError, err))
}
@@ -187,12 +189,10 @@
}
klog.V(1).Infof("Estimated %d nodes needed in %s", bestOption.NodeCount, bestOption.NodeGroup.Id())

newNodes := bestOption.NodeCount
newNodeCount, aErr := w.getCappedNewNodeCount(newNodes, len(nodes)+len(upcomingNodes))
newNodes, aErr := w.GetCappedNewNodeCount(bestOption.NodeCount, len(nodes)+len(upcomingNodes))
if aErr != nil {
return scaleUpError(&status.ScaleUpStatus{PodsTriggeredScaleUp: bestOption.Pods}, aErr)
}
newNodes = newNodeCount

createNodeGroupResults := make([]nodegroups.CreateNodeGroupResult, 0)
if !bestOption.NodeGroup.Exist() {
@@ -229,7 +229,7 @@
}
nodeInfos[nodeGroup.Id()] = nodeInfo

option, err := w.computeExpansionOption(podEquivalenceGroups, nodeGroup, nodeInfo, upcomingNodes)
option, err := w.ComputeExpansionOption(podEquivalenceGroups, nodeGroup, nodeInfo, upcomingNodes)
if err != nil {
return scaleUpError(&status.ScaleUpStatus{PodsTriggeredScaleUp: bestOption.Pods}, errors.ToAutoscalerError(errors.InternalError, err))
}
@@ -247,8 +247,7 @@

nodeInfo, found := nodeInfos[bestOption.NodeGroup.Id()]
if !found {
// This should never happen, as we already should have retrieved
// nodeInfo for any considered nodegroup.
// This should never happen, as we already should have retrieved nodeInfo for any considered nodegroup.
klog.Errorf("No node info for: %s", bestOption.NodeGroup.Id())
return scaleUpError(
&status.ScaleUpStatus{CreateNodeGroupResults: createNodeGroupResults, PodsTriggeredScaleUp: bestOption.Pods},
@@ -257,7 +256,7 @@
"No node info for best expansion option!"))
}

// apply upper limits for CPU and memory
// Apply upper limits for CPU and memory.
newNodes, aErr = w.resourceManager.ApplyLimits(w.autoscalingContext, newNodes, resourcesLeft, nodeInfo, bestOption.NodeGroup)
if aErr != nil {
return scaleUpError(
@@ -279,9 +278,8 @@
if w.clusterStateRegistry.IsNodeGroupSafeToScaleUp(ng, now) {
targetNodeGroups = append(targetNodeGroups, ng)
} else {
// This should never happen, as we will filter out the node group earlier on
// because of missing entry in podsPassingPredicates, but double checking doesn't
// really cost us anything
// This should never happen, as we will filter out the node group earlier on because of missing
// entry in podsPassingPredicates, but double checking doesn't really cost us anything.
klog.V(2).Infof("Ignoring node group %s when balancing: group is not ready for scaleup", ng.Id())
}
}
@@ -295,16 +293,15 @@
}
}

scaleUpInfos, aErr := w.processors.NodeGroupSetProcessor.BalanceScaleUpBetweenGroups(
w.autoscalingContext, targetNodeGroups, newNodes)
scaleUpInfos, aErr := w.processors.NodeGroupSetProcessor.BalanceScaleUpBetweenGroups(w.autoscalingContext, targetNodeGroups, newNodes)
if aErr != nil {
return scaleUpError(
&status.ScaleUpStatus{CreateNodeGroupResults: createNodeGroupResults, PodsTriggeredScaleUp: bestOption.Pods},
aErr)
}

klog.V(1).Infof("Final scale-up plan: %v", scaleUpInfos)
if aErr, failedInfo := w.ExecuteScaleUp(now, scaleUpInfos, nodeInfos); aErr != nil {
if aErr, failedInfo := w.ExecuteScaleUps(scaleUpInfos, nodeInfos, now); aErr != nil {
return scaleUpError(
&status.ScaleUpStatus{
CreateNodeGroupResults: createNodeGroupResults,
@@ -331,7 +328,7 @@
// than the configured min size. The source of truth for the current node group
// size is the TargetSize queried directly from cloud providers. Returns
// appropriate status or error if an unexpected error occurred.
func (w *scaleUpManager) ScaleUpToNodeGroupMinSize(
func (w *ScaleUpManager) ScaleUpToNodeGroupMinSize(
nodes []*apiv1.Node,
nodeInfos map[string]*schedulerframework.NodeInfo,
) (*status.ScaleUpStatus, errors.AutoscalerError) {
@@ -361,7 +358,7 @@ func (w *scaleUpManager) ScaleUpToNodeGroupMinSize(
continue
}

if skipReason := w.isNodeGroupReadyToScaleUp(ng, now); skipReason != nil {
if skipReason := w.IsNodeGroupReadyToScaleUp(ng, now); skipReason != nil {
klog.Warningf("ScaleUpToNodeGroupMinSize: node group is ready to scale up: %v", skipReason)
continue
}
@@ -372,7 +369,7 @@
continue
}

if skipReason := w.isNodeGroupResourceExceeded(resourcesLeft, ng, nodeInfo); skipReason != nil {
if skipReason := w.IsNodeGroupResourceExceeded(resourcesLeft, ng, nodeInfo); skipReason != nil {
klog.Warning("ScaleUpToNodeGroupMinSize: node group resource excceded: %v", skipReason)
continue
}
@@ -384,7 +381,7 @@
continue
}

newNodeCount, err = w.getCappedNewNodeCount(newNodeCount, targetSize)
newNodeCount, err = w.GetCappedNewNodeCount(newNodeCount, targetSize)
if err != nil {
klog.Warning("ScaleUpToNodeGroupMinSize: failed to get capped node count: %v", err)
continue
@@ -405,7 +402,7 @@
}

klog.V(1).Infof("ScaleUpToNodeGroupMinSize: final scale-up plan: %v", scaleUpInfos)
if aErr, failedInfo := w.ExecuteScaleUp(now, scaleUpInfos, nodeInfos); aErr != nil {
if aErr, failedInfo := w.ExecuteScaleUps(scaleUpInfos, nodeInfos, now); aErr != nil {
return scaleUpError(
&status.ScaleUpStatus{
FailedResizeNodeGroups: []cloudprovider.NodeGroup{failedInfo.Group},
@@ -422,32 +419,37 @@
}, nil
}

func (w *scaleUpManager) computeExpansionOption(podEquivalenceGroups []*equivalence.PodGroup, nodeGroup cloudprovider.NodeGroup, nodeInfo *schedulerframework.NodeInfo, upcomingNodes []*schedulerframework.NodeInfo) (expander.Option, error) {
// ComputeExpansionOption computes expansion option based on pending pods and cluster state.
func (w *ScaleUpManager) ComputeExpansionOption(
podEquivalenceGroups []*equivalence.PodGroup,
nodeGroup cloudprovider.NodeGroup,
nodeInfo *schedulerframework.NodeInfo,
upcomingNodes []*schedulerframework.NodeInfo,
) (expander.Option, error) {
option := expander.Option{
NodeGroup: nodeGroup,
Pods: make([]*apiv1.Pod, 0),
}

w.autoscalingContext.ClusterSnapshot.Fork()

// add test node to snapshot
// Add test node to snapshot.
var pods []*apiv1.Pod
for _, podInfo := range nodeInfo.Pods {
pods = append(pods, podInfo.Pod)
}
if err := w.autoscalingContext.ClusterSnapshot.AddNodeWithPods(nodeInfo.Node(), pods); err != nil {
klog.Errorf("Error while adding test Node; %v", err)
klog.Errorf("Error while adding test Node: %v", err)
w.autoscalingContext.ClusterSnapshot.Revert()
// TODO: Or should I just skip the node group?
return expander.Option{}, nil
}

for _, eg := range podEquivalenceGroups {
samplePod := eg.Pods[0]
if err := w.autoscalingContext.PredicateChecker.CheckPredicates(w.autoscalingContext.ClusterSnapshot, samplePod, nodeInfo.Node().Name); err == nil {
// add pods to option
// Add pods to option.
option.Pods = append(option.Pods, eg.Pods...)
// mark pod group as (theoretically) schedulable
// Mark pod group as (theoretically) schedulable.
eg.Schedulable = true
} else {
klog.V(2).Infof("Pod %s can't be scheduled on %s, predicate checking error: %v", samplePod.Name, nodeGroup.Id(), err.VerboseMessage())
@@ -468,7 +470,8 @@ func (w *scaleUpManager) computeExpansionOption(podEquivalenceGroups []*equivale
return option, nil
}

func (w *scaleUpManager) isNodeGroupReadyToScaleUp(nodeGroup cloudprovider.NodeGroup, now time.Time) *SkippedReasons {
// IsNodeGroupReadyToScaleUp returns nil if node group is ready to be scaled up, otherwise a reason is provided.
func (w *ScaleUpManager) IsNodeGroupReadyToScaleUp(nodeGroup cloudprovider.NodeGroup, now time.Time) *SkippedReasons {
// Autoprovisioned node groups without nodes are created later so skip check for them.
if nodeGroup.Exist() && !w.clusterStateRegistry.IsNodeGroupSafeToScaleUp(nodeGroup, now) {
// Hack that depends on internals of IsNodeGroupSafeToScaleUp.
@@ -482,7 +485,8 @@ func (w *scaleUpManager) isNodeGroupReadyToScaleUp(nodeGroup cloudprovider.NodeG
return nil
}

func (w *scaleUpManager) isNodeGroupResourceExceeded(resourcesLeft resource.Limits, nodeGroup cloudprovider.NodeGroup, nodeInfo *schedulerframework.NodeInfo) *SkippedReasons {
// IsNodeGroupResourceExceeded returns nil if node group resource limits are not exceeded, otherwise a reason is provided.
func (w *ScaleUpManager) IsNodeGroupResourceExceeded(resourcesLeft resource.Limits, nodeGroup cloudprovider.NodeGroup, nodeInfo *schedulerframework.NodeInfo) *SkippedReasons {
resourcesDelta, err := w.resourceManager.DeltaForNode(w.autoscalingContext, nodeInfo, nodeGroup)
if err != nil {
klog.Errorf("Skipping node group %s; error getting node group resources: %v", nodeGroup.Id(), err)
@@ -507,7 +511,8 @@ func (w *scaleUpManager) isNodeGroupResourceExceeded(resourcesLeft resource.Limi
return nil
}

func (w *scaleUpManager) getCappedNewNodeCount(newNodeCount, currentNodeCount int) (int, errors.AutoscalerError) {
// GetCappedNewNodeCount caps resize according to cluster wide node count limit.
func (w *ScaleUpManager) GetCappedNewNodeCount(newNodeCount, currentNodeCount int) (int, errors.AutoscalerError) {
if w.autoscalingContext.MaxNodesTotal > 0 && newNodeCount+currentNodeCount > w.autoscalingContext.MaxNodesTotal {
klog.V(1).Infof("Capping size to max cluster total size (%d)", w.autoscalingContext.MaxNodesTotal)
newNodeCount = w.autoscalingContext.MaxNodesTotal - currentNodeCount
@@ -519,12 +524,12 @@ func (w *scaleUpManager) getCappedNewNodeCount(newNodeCount, currentNodeCount in
return newNodeCount, nil
}

// ExecuteScaleUp executes the scale up, based on the provided scale up infos.
// ExecuteScaleUps executes the scale ups, based on the provided scale up infos.
// In case of issues returns an error and a scale up info which failed to execute.
func (w *scaleUpManager) ExecuteScaleUp(
now time.Time,
func (w *ScaleUpManager) ExecuteScaleUps(
scaleUpInfos []nodegroupset.ScaleUpInfo,
nodeInfos map[string]*schedulerframework.NodeInfo,
now time.Time,
) (errors.AutoscalerError, *nodegroupset.ScaleUpInfo) {
availableGPUTypes := w.autoscalingContext.CloudProvider.GetAvailableGPUTypes()
for _, info := range scaleUpInfos {
@@ -542,7 +547,11 @@ func (w *scaleUpManager) ExecuteScaleUp(
return nil, nil
}

func (w *scaleUpManager) executeScaleUp(info nodegroupset.ScaleUpInfo, gpuResourceName, gpuType string, now time.Time) errors.AutoscalerError {
func (w *ScaleUpManager) executeScaleUp(
info nodegroupset.ScaleUpInfo,
gpuResourceName, gpuType string,
now time.Time,
) errors.AutoscalerError {
klog.V(0).Infof("Scale-up: setting group %s size to %d", info.Group.Id(), info.NewSize)
w.autoscalingContext.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaledUpGroup",
"Scale-up: setting group %s size to %d instead of %d (max: %d)", info.Group.Id(), info.NewSize, info.CurrentSize, info.MaxSize)
@@ -563,41 +572,11 @@ func (w *scaleUpManager) executeScaleUp(info nodegroupset.ScaleUpInfo, gpuResour
return nil
}

func getRemainingPods(egs []*equivalence.PodGroup, skipped map[string]status.Reasons) []status.NoScaleUpInfo {
remaining := []status.NoScaleUpInfo{}
for _, eg := range egs {
if eg.Schedulable {
continue
}
for _, pod := range eg.Pods {
noScaleUpInfo := status.NoScaleUpInfo{
Pod: pod,
RejectedNodeGroups: eg.SchedulingErrors,
SkippedNodeGroups: skipped,
}
remaining = append(remaining, noScaleUpInfo)
}
}
return remaining
}

func getPodsAwaitingEvaluation(egs []*equivalence.PodGroup, bestOption string) []*apiv1.Pod {
awaitsEvaluation := []*apiv1.Pod{}
for _, eg := range egs {
if eg.Schedulable {
if _, found := eg.SchedulingErrors[bestOption]; found {
// Schedulable, but not yet.
awaitsEvaluation = append(awaitsEvaluation, eg.Pods...)
}
}
}
return awaitsEvaluation
}

func filterNodeGroupsByPods(
groups []cloudprovider.NodeGroup,
podsRequiredToFit []*apiv1.Pod,
expansionOptions map[string]expander.Option) []cloudprovider.NodeGroup {
expansionOptions map[string]expander.Option,
) []cloudprovider.NodeGroup {

result := make([]cloudprovider.NodeGroup, 0)

@@ -627,6 +606,37 @@ func filterNodeGroupsByPods(
return result
}

func getRemainingPods(egs []*equivalence.PodGroup, skipped map[string]status.Reasons) []status.NoScaleUpInfo {
remaining := []status.NoScaleUpInfo{}
for _, eg := range egs {
if eg.Schedulable {
continue
}
for _, pod := range eg.Pods {
noScaleUpInfo := status.NoScaleUpInfo{
Pod: pod,
RejectedNodeGroups: eg.SchedulingErrors,
SkippedNodeGroups: skipped,
}
remaining = append(remaining, noScaleUpInfo)
}
}
return remaining
}

func getPodsAwaitingEvaluation(egs []*equivalence.PodGroup, bestOption string) []*apiv1.Pod {
awaitsEvaluation := []*apiv1.Pod{}
for _, eg := range egs {
if eg.Schedulable {
if _, found := eg.SchedulingErrors[bestOption]; found {
// Schedulable, but not yet.
awaitsEvaluation = append(awaitsEvaluation, eg.Pods...)
}
}
}
return awaitsEvaluation
}

func scaleUpError(s *status.ScaleUpStatus, err errors.AutoscalerError) (*status.ScaleUpStatus, errors.AutoscalerError) {
s.ScaleUpError = &err
s.Result = status.ScaleUpError
2 changes: 1 addition & 1 deletion cluster-autoscaler/core/scaleup/wrapper/manager_test.go
@@ -1062,7 +1062,7 @@ func TestAuthError(t *testing.T) {
clusterStateRegistry := clusterstate.NewClusterStateRegistry(nil, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff())
suManagerFactory := NewManagerFactory()
suManager := suManagerFactory.NewManager(&context, processors, clusterStateRegistry, nil)
scaleUpWrapper := suManager.(*scaleUpManager)
scaleUpWrapper := suManager.(*ScaleUpManager)
aerr := scaleUpWrapper.executeScaleUp(info, "", "", time.Now())
assert.Error(t, aerr)


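With IsNodeGroupReadyToScaleUp, IsNodeGroupResourceExceeded and GetCappedNewNodeCount now exported, callers and tests can exercise the skip logic on its own rather than only through ScaleUp. A minimal sketch, assuming an already constructed *wrapper.ScaleUpManager and the same import paths assumed above; the helper below is hypothetical and not part of the commit.

package example

import (
	"time"

	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
	"k8s.io/autoscaler/cluster-autoscaler/core/scaleup/wrapper"
)

// readyNodeGroups filters node groups the same way ScaleUp does internally,
// recording a SkippedReasons entry for every group that is not ready yet.
func readyNodeGroups(
	m *wrapper.ScaleUpManager,
	groups []cloudprovider.NodeGroup,
	now time.Time,
) (ready []cloudprovider.NodeGroup, skipped map[string]*wrapper.SkippedReasons) {
	skipped = make(map[string]*wrapper.SkippedReasons)
	for _, ng := range groups {
		if reason := m.IsNodeGroupReadyToScaleUp(ng, now); reason != nil {
			skipped[ng.Id()] = reason
			continue
		}
		ready = append(ready, ng)
	}
	return ready, skipped
}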