Put ScaleUp logic behind an interface #5597

Merged
merged 7 commits into from Mar 21, 2023
Changes from 6 commits
5 changes: 4 additions & 1 deletion cluster-autoscaler/core/autoscaler.go
@@ -25,6 +25,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb"
"k8s.io/autoscaler/cluster-autoscaler/core/scaleup"
"k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot"
"k8s.io/autoscaler/cluster-autoscaler/estimator"
"k8s.io/autoscaler/cluster-autoscaler/expander"
@@ -52,6 +53,7 @@ type AutoscalerOptions struct {
Backoff backoff.Backoff
DebuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
RemainingPdbTracker pdb.RemainingPdbTracker
ScaleUpManagerFactory scaleup.ManagerFactory
}

// Autoscaler is the main component of CA which scales up/down node groups according to its configuration
@@ -82,7 +84,8 @@ func NewAutoscaler(opts AutoscalerOptions) (Autoscaler, errors.AutoscalerError)
opts.EstimatorBuilder,
opts.Backoff,
opts.DebuggingSnapshotter,
opts.RemainingPdbTracker), nil
opts.RemainingPdbTracker,
opts.ScaleUpManagerFactory), nil
}

// Initialize default options if not provided.
633 changes: 0 additions & 633 deletions cluster-autoscaler/core/scale_up.go

This file was deleted.
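The 633 lines of scale-up orchestration that lived in core/scale_up.go move behind the new scaleup package imported above. The interface definitions themselves are not part of this diff, so the following is only a rough sketch of the shape such an abstraction typically takes; every method name and signature below is an assumption for illustration, not the PR's actual code.

package scaleup

import (
	apiv1 "k8s.io/api/core/v1"

	"k8s.io/autoscaler/cluster-autoscaler/context"
	"k8s.io/autoscaler/cluster-autoscaler/processors/status"
	"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
)

// Manager is a hypothetical sketch of an interface that owns the scale-up
// logic formerly implemented directly in core/scale_up.go.
type Manager interface {
	// ScaleUp would try to create room for the given unschedulable pods
	// (assumed signature, shown for illustration only).
	ScaleUp(unschedulablePods []*apiv1.Pod) (*status.ScaleUpStatus, errors.AutoscalerError)
}

// ManagerFactory is what AutoscalerOptions.ScaleUpManagerFactory plugs in:
// a constructor for a Manager bound to a concrete autoscaling context, so
// alternative scale-up implementations can be swapped in.
type ManagerFactory interface {
	// NewManager builds a Manager for the given context
	// (assumed signature, shown for illustration only).
	NewManager(ctx *context.AutoscalingContext) Manager
}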

@@ -14,33 +14,35 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package core
package equivalence

import (
"k8s.io/autoscaler/cluster-autoscaler/utils"
"reflect"

"k8s.io/autoscaler/cluster-autoscaler/utils"

apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
"k8s.io/autoscaler/cluster-autoscaler/utils/drain"
pod_utils "k8s.io/autoscaler/cluster-autoscaler/utils/pod"
)

type podEquivalenceGroup struct {
pods []*apiv1.Pod
schedulingErrors map[string]status.Reasons
schedulable bool
// PodGroup contains a group of pods that are equivalent in terms of schedulability.
type PodGroup struct {
Pods []*apiv1.Pod
SchedulingErrors map[string]status.Reasons
Schedulable bool
}

// buildPodEquivalenceGroups prepares pod groups with equivalent scheduling properties.
func buildPodEquivalenceGroups(pods []*apiv1.Pod) []*podEquivalenceGroup {
podEquivalenceGroups := []*podEquivalenceGroup{}
// BuildPodGroups prepares pod groups with equivalent scheduling properties.
func BuildPodGroups(pods []*apiv1.Pod) []*PodGroup {
podEquivalenceGroups := []*PodGroup{}
for _, pods := range groupPodsBySchedulingProperties(pods) {
podEquivalenceGroups = append(podEquivalenceGroups, &podEquivalenceGroup{
pods: pods,
schedulingErrors: map[string]status.Reasons{},
schedulable: false,
podEquivalenceGroups = append(podEquivalenceGroups, &PodGroup{
Pods: pods,
SchedulingErrors: map[string]status.Reasons{},
Schedulable: false,
})
}
return podEquivalenceGroups
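With the grouping helper exported from its own equivalence package, callers outside core can bucket pending pods before running scheduling simulations. A minimal usage sketch, assuming the import path k8s.io/autoscaler/cluster-autoscaler/core/scaleup/equivalence (only the package name appears in this diff):

package example

import (
	"fmt"

	apiv1 "k8s.io/api/core/v1"

	// Assumed import path; only the package name "equivalence" is visible in the diff.
	"k8s.io/autoscaler/cluster-autoscaler/core/scaleup/equivalence"
)

// summarizePendingPods groups unschedulable pods by scheduling properties so
// that later simulation work runs once per group rather than once per pod.
func summarizePendingPods(pending []*apiv1.Pod) {
	groups := equivalence.BuildPodGroups(pending)
	for _, g := range groups {
		// Each PodGroup starts with Schedulable=false and an empty
		// SchedulingErrors map; the scale-up loop fills these in later.
		fmt.Printf("group of %d pods, schedulable=%v, errors=%d\n",
			len(g.Pods), g.Schedulable, len(g.SchedulingErrors))
	}
}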
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package core
package equivalence

import (
"fmt"
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package scaleup
package resource

import (
"fmt"
@@ -34,8 +34,8 @@ import (
// LimitUnknown is used as a value in ResourcesLimits if actual limit could not be obtained due to errors talking to cloud provider.
const LimitUnknown = math.MaxInt64

// ResourceManager provides resource checks before scaling up the cluster.
type ResourceManager struct {
// Manager provides resource checks before scaling up the cluster.
type Manager struct {
crp customresources.CustomResourcesProcessor
}

@@ -45,22 +45,22 @@ type LimitsCheckResult struct {
ExceededResources []string
}

// ResourcesLimits is a map: the key is resource type and the value is resource limit.
type ResourcesLimits map[string]int64
// Limits is a map: the key is resource type and the value is resource limit.
type Limits map[string]int64

// ResourcesDelta is a map: the key is resource type and the value is resource delta.
type ResourcesDelta map[string]int64
// Delta is a map: the key is resource type and the value is resource delta.
type Delta map[string]int64

// NewResourceManager creates an instance of scale up resource manager with provided parameters.
func NewResourceManager(crp customresources.CustomResourcesProcessor) *ResourceManager {
return &ResourceManager{
// NewManager creates an instance of scale up resource manager with provided parameters.
func NewManager(crp customresources.CustomResourcesProcessor) *Manager {
return &Manager{
crp: crp,
}
}

// DeltaForNode calculates the amount of resources that will be used from the cluster when creating a node.
func (m *ResourceManager) DeltaForNode(ctx *context.AutoscalingContext, nodeInfo *schedulerframework.NodeInfo, nodeGroup cloudprovider.NodeGroup) (ResourcesDelta, errors.AutoscalerError) {
resultScaleUpDelta := make(ResourcesDelta)
func (m *Manager) DeltaForNode(ctx *context.AutoscalingContext, nodeInfo *schedulerframework.NodeInfo, nodeGroup cloudprovider.NodeGroup) (Delta, errors.AutoscalerError) {
resultScaleUpDelta := make(Delta)
nodeCPU, nodeMemory := utils.GetNodeCoresAndMemory(nodeInfo.Node())
resultScaleUpDelta[cloudprovider.ResourceNameCores] = nodeCPU
resultScaleUpDelta[cloudprovider.ResourceNameMemory] = nodeMemory
@@ -73,7 +73,7 @@ func (m *ResourceManager) DeltaForNode(ctx *context.AutoscalingContext, nodeInfo
if cloudprovider.ContainsCustomResources(resourceLimiter.GetResources()) {
resourceTargets, err := m.crp.GetNodeResourceTargets(ctx, nodeInfo.Node(), nodeGroup)
if err != nil {
return ResourcesDelta{}, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("failed to get target custom resources for node group %v: ", nodeGroup.Id())
return Delta{}, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("failed to get target custom resources for node group %v: ", nodeGroup.Id())
}

for _, resourceTarget := range resourceTargets {
@@ -85,7 +85,7 @@ func (m *ResourceManager) DeltaForNode(ctx *context.AutoscalingContext, nodeInfo
}

// ResourcesLeft calculates the amount of resources left in the cluster.
func (m *ResourceManager) ResourcesLeft(ctx *context.AutoscalingContext, nodeInfos map[string]*schedulerframework.NodeInfo, nodes []*corev1.Node) (ResourcesLimits, errors.AutoscalerError) {
func (m *Manager) ResourcesLeft(ctx *context.AutoscalingContext, nodeInfos map[string]*schedulerframework.NodeInfo, nodes []*corev1.Node) (Limits, errors.AutoscalerError) {
nodesFromNotAutoscaledGroups, err := utils.FilterOutNodesFromNotAutoscaledGroups(nodes, ctx.CloudProvider)
if err != nil {
return nil, err.AddPrefix("failed to filter out nodes which are from not autoscaled groups: ")
@@ -104,14 +104,14 @@ func (m *ResourceManager) ResourcesLeft(ctx *context.AutoscalingContext, nodeInf
totalResources, totalResourcesErr = m.customResourcesTotal(ctx, nodeInfos, nodesFromNotAutoscaledGroups)
}

resultScaleUpLimits := make(ResourcesLimits)
resultScaleUpLimits := make(Limits)
for _, resource := range resourceLimiter.GetResources() {
max := resourceLimiter.GetMax(resource)
// we put only actual limits into final map. No entry means no limit.
if max > 0 {
if (resource == cloudprovider.ResourceNameCores || resource == cloudprovider.ResourceNameMemory) && errCoresMem != nil {
// core resource info missing - no reason to proceed with scale up
return ResourcesLimits{}, errCoresMem
return Limits{}, errCoresMem
}

switch {
@@ -142,8 +142,8 @@ func (m *ResourceManager) ResourcesLeft(ctx *context.AutoscalingContext, nodeInf
return resultScaleUpLimits, nil
}

// ApplyResourcesLimits calculates the new node count by applying the left resource limits of the cluster.
func (m *ResourceManager) ApplyResourcesLimits(ctx *context.AutoscalingContext, newCount int, resourceLeft ResourcesLimits, nodeInfo *schedulerframework.NodeInfo, nodeGroup cloudprovider.NodeGroup) (int, errors.AutoscalerError) {
// ApplyLimits calculates the new node count by applying the left resource limits of the cluster.
func (m *Manager) ApplyLimits(ctx *context.AutoscalingContext, newCount int, resourceLeft Limits, nodeInfo *schedulerframework.NodeInfo, nodeGroup cloudprovider.NodeGroup) (int, errors.AutoscalerError) {
delta, err := m.DeltaForNode(ctx, nodeInfo, nodeGroup)
if err != nil {
return 0, err
@@ -181,7 +181,7 @@ func (m *ResourceManager) ApplyResourcesLimits(ctx *context.AutoscalingContext,
}

// CheckDeltaWithinLimits compares the resource limit and resource delta, and returns the limit check result.
func CheckDeltaWithinLimits(left ResourcesLimits, delta ResourcesDelta) LimitsCheckResult {
func CheckDeltaWithinLimits(left Limits, delta Delta) LimitsCheckResult {
exceededResources := sets.NewString()
for resource, resourceDelta := range delta {
resourceLeft, found := left[resource]
@@ -203,7 +203,7 @@ func LimitsNotExceeded() LimitsCheckResult {
return LimitsCheckResult{false, []string{}}
}

func (m *ResourceManager) coresMemoryTotal(ctx *context.AutoscalingContext, nodeInfos map[string]*schedulerframework.NodeInfo, nodesFromNotAutoscaledGroups []*corev1.Node) (int64, int64, errors.AutoscalerError) {
func (m *Manager) coresMemoryTotal(ctx *context.AutoscalingContext, nodeInfos map[string]*schedulerframework.NodeInfo, nodesFromNotAutoscaledGroups []*corev1.Node) (int64, int64, errors.AutoscalerError) {
var coresTotal int64
var memoryTotal int64
for _, nodeGroup := range ctx.CloudProvider.NodeGroups() {
@@ -233,7 +233,7 @@ func (m *ResourceManager) coresMemoryTotal(ctx *context.AutoscalingContext, node
return coresTotal, memoryTotal, nil
}

func (m *ResourceManager) customResourcesTotal(ctx *context.AutoscalingContext, nodeInfos map[string]*schedulerframework.NodeInfo, nodesFromNotAutoscaledGroups []*corev1.Node) (map[string]int64, errors.AutoscalerError) {
func (m *Manager) customResourcesTotal(ctx *context.AutoscalingContext, nodeInfos map[string]*schedulerframework.NodeInfo, nodesFromNotAutoscaledGroups []*corev1.Node) (map[string]int64, errors.AutoscalerError) {
result := make(map[string]int64)
for _, nodeGroup := range ctx.CloudProvider.NodeGroups() {
currentSize, err := nodeGroup.TargetSize()
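Taken together, the renamed API reads naturally at a call site: compute what is left, compute the per-node delta, bail out if even one node would exceed a limit, and otherwise trim the proposed count. The sketch below uses only signatures visible in this diff; the resource and scheduler framework import paths and the Exceeded field name are assumptions.

package example

import (
	corev1 "k8s.io/api/core/v1"
	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" // assumed path for NodeInfo

	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
	"k8s.io/autoscaler/cluster-autoscaler/context"
	"k8s.io/autoscaler/cluster-autoscaler/core/scaleup/resource" // assumed path for the renamed package
	"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
)

// boundedNodeCount caps a proposed scale-up size by the resources left in the
// cluster, combining the Manager methods shown above.
func boundedNodeCount(
	ctx *context.AutoscalingContext,
	rm *resource.Manager,
	nodeInfos map[string]*schedulerframework.NodeInfo,
	nodes []*corev1.Node,
	nodeInfo *schedulerframework.NodeInfo,
	nodeGroup cloudprovider.NodeGroup,
	proposed int,
) (int, errors.AutoscalerError) {
	left, err := rm.ResourcesLeft(ctx, nodeInfos, nodes)
	if err != nil {
		return 0, err
	}
	delta, err := rm.DeltaForNode(ctx, nodeInfo, nodeGroup)
	if err != nil {
		return 0, err
	}
	// If a single node of this group already exceeds what is left, there is
	// no point in scaling this group at all. (The field name "Exceeded" is
	// assumed; the diff only shows that the first field is a bool.)
	if check := resource.CheckDeltaWithinLimits(left, delta); check.Exceeded {
		return 0, nil
	}
	// Otherwise trim the proposed count so the new nodes fit within the
	// remaining cluster-wide limits.
	return rm.ApplyLimits(ctx, proposed, left, nodeInfo, nodeGroup)
}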