Skip to content

Commit

Permalink
Move MaxNodeProvisionTime to NodeGroupAutoscalingOptions
Browse files Browse the repository at this point in the history
  • Loading branch information
morshielt committed Apr 3, 2023
1 parent e608038 commit 38464b7
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 44 deletions.
3 changes: 3 additions & 0 deletions cluster-autoscaler/cloudprovider/gce/gce_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,9 @@ func (m *gceManagerImpl) GetMigOptions(mig Mig, defaults config.NodeGroupAutosca
if opt, ok := getDurationOption(options, migRef.Name, config.DefaultScaleDownUnreadyTimeKey); ok {
defaults.ScaleDownUnreadyTime = opt
}
if opt, ok := getDurationOption(options, migRef.Name, config.DefaultMaxNodeProvisionTimeKey); ok {
defaults.MaxNodeProvisionTime = opt
}

return &defaults
}
Expand Down
65 changes: 42 additions & 23 deletions cluster-autoscaler/clusterstate/clusterstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/api"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/utils/backoff"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
Expand All @@ -45,6 +46,11 @@ const (
MaxNodeStartupTime = 15 * time.Minute
)

// nodeRegistrationTimeLimitProvider abstracts the source of the per-NodeGroup
// MaxNodeProvisionTime limit, i.e. how long the ClusterStateRegistry waits for
// a requested node to be provisioned and registered before treating it as
// long-unregistered.
type nodeRegistrationTimeLimitProvider interface {
	// GetMaxNodeProvisionTime returns MaxNodeProvisionTime value that should be used for the given NodeGroup.
	GetMaxNodeProvisionTime(context *context.AutoscalingContext, nodeGroup cloudprovider.NodeGroup) (time.Duration, error)
}

// ScaleUpRequest contains information about the requested node group scale up.
type ScaleUpRequest struct {
// NodeGroup is the node group to be scaled up.
Expand Down Expand Up @@ -76,8 +82,6 @@ type ClusterStateRegistryConfig struct {
// Minimum number of nodes that must be unready for MaxTotalUnreadyPercentage to apply.
// This is to ensure that in very small clusters (e.g. 2 nodes) a single node's failure doesn't disable autoscaling.
OkTotalUnreadyCount int
// Maximum time CA waits for node to be provisioned
MaxNodeProvisionTime time.Duration
}

// IncorrectNodeGroupSize contains information about how much the current size of the node group
Expand Down Expand Up @@ -111,6 +115,7 @@ type ScaleUpFailure struct {
// ClusterStateRegistry is a structure to keep track the current state of the cluster.
type ClusterStateRegistry struct {
sync.Mutex
context *context.AutoscalingContext
config ClusterStateRegistryConfig
scaleUpRequests map[string]*ScaleUpRequest // nodeGroupName -> ScaleUpRequest
scaleDownRequests []*ScaleDownRequest
Expand All @@ -132,37 +137,40 @@ type ClusterStateRegistry struct {
previousCloudProviderNodeInstances map[string][]cloudprovider.Instance
cloudProviderNodeInstancesCache *utils.CloudProviderNodeInstancesCache
interrupt chan struct{}
nodeRegistrationTimeLimitProvider nodeRegistrationTimeLimitProvider

// scaleUpFailures contains information about scale-up failures for each node group. It should be
// cleared periodically to avoid unnecessary accumulation.
scaleUpFailures map[string][]ScaleUpFailure
}

// NewClusterStateRegistry creates new ClusterStateRegistry.
func NewClusterStateRegistry(cloudProvider cloudprovider.CloudProvider, config ClusterStateRegistryConfig, logRecorder *utils.LogEventRecorder, backoff backoff.Backoff) *ClusterStateRegistry {
func NewClusterStateRegistry(context *context.AutoscalingContext, config ClusterStateRegistryConfig, backoff backoff.Backoff, provider nodeRegistrationTimeLimitProvider) *ClusterStateRegistry {
emptyStatus := &api.ClusterAutoscalerStatus{
ClusterwideConditions: make([]api.ClusterAutoscalerCondition, 0),
NodeGroupStatuses: make([]api.NodeGroupStatus, 0),
}

return &ClusterStateRegistry{
scaleUpRequests: make(map[string]*ScaleUpRequest),
scaleDownRequests: make([]*ScaleDownRequest, 0),
nodes: make([]*apiv1.Node, 0),
cloudProvider: cloudProvider,
config: config,
perNodeGroupReadiness: make(map[string]Readiness),
acceptableRanges: make(map[string]AcceptableRange),
incorrectNodeGroupSizes: make(map[string]IncorrectNodeGroupSize),
unregisteredNodes: make(map[string]UnregisteredNode),
deletedNodes: make(map[string]struct{}),
candidatesForScaleDown: make(map[string][]string),
backoff: backoff,
lastStatus: emptyStatus,
logRecorder: logRecorder,
cloudProviderNodeInstancesCache: utils.NewCloudProviderNodeInstancesCache(cloudProvider),
interrupt: make(chan struct{}),
scaleUpFailures: make(map[string][]ScaleUpFailure),
context: context,
scaleUpRequests: make(map[string]*ScaleUpRequest),
scaleDownRequests: make([]*ScaleDownRequest, 0),
nodes: make([]*apiv1.Node, 0),
cloudProvider: context.CloudProvider,
config: config,
perNodeGroupReadiness: make(map[string]Readiness),
acceptableRanges: make(map[string]AcceptableRange),
incorrectNodeGroupSizes: make(map[string]IncorrectNodeGroupSize),
unregisteredNodes: make(map[string]UnregisteredNode),
deletedNodes: make(map[string]struct{}),
candidatesForScaleDown: make(map[string][]string),
backoff: backoff,
lastStatus: emptyStatus,
logRecorder: context.LogRecorder,
cloudProviderNodeInstancesCache: utils.NewCloudProviderNodeInstancesCache(context.CloudProvider),
interrupt: make(chan struct{}),
scaleUpFailures: make(map[string][]ScaleUpFailure),
nodeRegistrationTimeLimitProvider: provider,
}
}

Expand All @@ -189,13 +197,19 @@ func (csr *ClusterStateRegistry) RegisterOrUpdateScaleUp(nodeGroup cloudprovider
}

func (csr *ClusterStateRegistry) registerOrUpdateScaleUpNoLock(nodeGroup cloudprovider.NodeGroup, delta int, currentTime time.Time) {
maxNodeProvisionTime, err := csr.nodeRegistrationTimeLimitProvider.GetMaxNodeProvisionTime(csr.context, nodeGroup)
if err != nil {
klog.Warningf("Couldn't update scale up request: failed to get maxNodeProvisionTime for node group %s: %w", nodeGroup.Id(), err)
return
}

scaleUpRequest, found := csr.scaleUpRequests[nodeGroup.Id()]
if !found && delta > 0 {
scaleUpRequest = &ScaleUpRequest{
NodeGroup: nodeGroup,
Increase: delta,
Time: currentTime,
ExpectedAddTime: currentTime.Add(csr.config.MaxNodeProvisionTime),
ExpectedAddTime: currentTime.Add(maxNodeProvisionTime),
}
csr.scaleUpRequests[nodeGroup.Id()] = scaleUpRequest
return
Expand All @@ -217,7 +231,7 @@ func (csr *ClusterStateRegistry) registerOrUpdateScaleUpNoLock(nodeGroup cloudpr
if delta > 0 {
// if we are actually adding new nodes shift Time and ExpectedAddTime
scaleUpRequest.Time = currentTime
scaleUpRequest.ExpectedAddTime = currentTime.Add(csr.config.MaxNodeProvisionTime)
scaleUpRequest.ExpectedAddTime = currentTime.Add(maxNodeProvisionTime)
}
}

Expand Down Expand Up @@ -589,7 +603,12 @@ func (csr *ClusterStateRegistry) updateReadinessStats(currentTime time.Time) {
continue
}
perNgCopy := perNodeGroup[nodeGroup.Id()]
if unregistered.UnregisteredSince.Add(csr.config.MaxNodeProvisionTime).Before(currentTime) {
maxNodeProvisionTime, err := csr.nodeRegistrationTimeLimitProvider.GetMaxNodeProvisionTime(csr.context, nodeGroup)
if err != nil {
klog.Warningf("Failed to get maxNodeProvisionTime for node %s in node group %s: %w", unregistered.Node.Name, nodeGroup.Id(), err)
continue
}
if unregistered.UnregisteredSince.Add(maxNodeProvisionTime).Before(currentTime) {
perNgCopy.LongUnregistered = append(perNgCopy.LongUnregistered, unregistered.Node.Name)
total.LongUnregistered = append(total.LongUnregistered, unregistered.Node.Name)
} else {
Expand Down
4 changes: 2 additions & 2 deletions cluster-autoscaler/config/autoscaling_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ type NodeGroupAutoscalingOptions struct {
ScaleDownUnneededTime time.Duration
// ScaleDownUnreadyTime represents how long an unready node should be unneeded before it is eligible for scale down
ScaleDownUnreadyTime time.Duration
// Maximum time CA waits for node to be provisioned
MaxNodeProvisionTime time.Duration
}

const (
Expand Down Expand Up @@ -110,8 +112,6 @@ type AutoscalingOptions struct {
// MaxGracefulTerminationSec is maximum number of seconds scale down waits for pods to terminate before
// removing the node from cloud provider.
MaxGracefulTerminationSec int
// Maximum time CA waits for node to be provisioned
MaxNodeProvisionTime time.Duration
// MaxTotalUnreadyPercentage is the maximum percentage of unready nodes after which CA halts operations
MaxTotalUnreadyPercentage float64
// OkTotalUnreadyCount is the number of allowed unready nodes, irrespective of max-total-unready-percentage
Expand Down
2 changes: 2 additions & 0 deletions cluster-autoscaler/config/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,6 @@ const (
DefaultScaleDownUnneededTimeKey = "scaledownunneededtime"
// DefaultScaleDownUnreadyTimeKey identifies ScaleDownUnreadyTime autoscaling option
DefaultScaleDownUnreadyTimeKey = "scaledownunreadytime"
// DefaultMaxNodeProvisionTimeKey identifies MaxNodeProvisionTime autoscaling option
DefaultMaxNodeProvisionTimeKey = "maxnodeprovisiontime"
)
43 changes: 26 additions & 17 deletions cluster-autoscaler/core/static_autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ func NewStaticAutoscaler(
clusterStateConfig := clusterstate.ClusterStateRegistryConfig{
MaxTotalUnreadyPercentage: opts.MaxTotalUnreadyPercentage,
OkTotalUnreadyCount: opts.OkTotalUnreadyCount,
MaxNodeProvisionTime: opts.MaxNodeProvisionTime,
}

ignoredTaints := make(taints.TaintKeySet)
Expand All @@ -167,7 +166,7 @@ func NewStaticAutoscaler(
ignoredTaints[taintKey] = true
}

clusterStateRegistry := clusterstate.NewClusterStateRegistry(autoscalingContext.CloudProvider, clusterStateConfig, autoscalingContext.LogRecorder, backoff)
clusterStateRegistry := clusterstate.NewClusterStateRegistry(autoscalingContext, clusterStateConfig, backoff, processors.NodeGroupConfigProcessor)
processors.ScaleDownCandidatesNotifier.Register(clusterStateRegistry)

deleteOptions := simulator.NodeDeleteOptions{
Expand Down Expand Up @@ -414,7 +413,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
unregisteredNodes := a.clusterStateRegistry.GetUnregisteredNodes()
if len(unregisteredNodes) > 0 {
klog.V(1).Infof("%d unregistered nodes present", len(unregisteredNodes))
removedAny, err := removeOldUnregisteredNodes(unregisteredNodes, autoscalingContext,
removedAny, err := a.removeOldUnregisteredNodes(unregisteredNodes, autoscalingContext,
a.clusterStateRegistry, currentTime, autoscalingContext.LogRecorder)
// There was a problem with removing unregistered nodes. Retry in the next loop.
if err != nil {
Expand Down Expand Up @@ -445,7 +444,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
// Check if there has been a constant difference between the number of nodes in k8s and
// the number of nodes on the cloud provider side.
// TODO: andrewskim - add protection for ready AWS nodes.
fixedSomething, err := fixNodeGroupSize(autoscalingContext, a.clusterStateRegistry, currentTime)
fixedSomething, err := a.fixNodeGroupSize(autoscalingContext, a.clusterStateRegistry, currentTime)
if err != nil {
klog.Errorf("Failed to fix node group sizes: %v", err)
return caerrors.ToAutoscalerError(caerrors.CloudProviderError, err)
Expand Down Expand Up @@ -705,14 +704,18 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
// Sets the target size of node groups to the current number of nodes in them
// if the difference was constant for a prolonged time. Returns true if managed
// to fix something.
func fixNodeGroupSize(context *context.AutoscalingContext, clusterStateRegistry *clusterstate.ClusterStateRegistry, currentTime time.Time) (bool, error) {
func (a *StaticAutoscaler) fixNodeGroupSize(context *context.AutoscalingContext, clusterStateRegistry *clusterstate.ClusterStateRegistry, currentTime time.Time) (bool, error) {
fixed := false
for _, nodeGroup := range context.CloudProvider.NodeGroups() {
incorrectSize := clusterStateRegistry.GetIncorrectNodeGroupSize(nodeGroup.Id())
if incorrectSize == nil {
continue
}
if incorrectSize.FirstObserved.Add(context.MaxNodeProvisionTime).Before(currentTime) {
maxNodeProvisionTime, err := a.processors.NodeGroupConfigProcessor.GetMaxNodeProvisionTime(context, nodeGroup)
if err != nil {
return false, fmt.Errorf("failed to retrieve maxNodeProvisionTime for nodeGroup %s", nodeGroup.Id())
}
if incorrectSize.FirstObserved.Add(maxNodeProvisionTime).Before(currentTime) {
delta := incorrectSize.CurrentSize - incorrectSize.ExpectedSize
if delta < 0 {
klog.V(0).Infof("Decreasing size of %s, expected=%d current=%d delta=%d", nodeGroup.Id(),
Expand All @@ -730,21 +733,27 @@ func fixNodeGroupSize(context *context.AutoscalingContext, clusterStateRegistry
}

// Removes unregistered nodes if needed. Returns true if anything was removed and error if such occurred.
func removeOldUnregisteredNodes(unregisteredNodes []clusterstate.UnregisteredNode, context *context.AutoscalingContext,
func (a *StaticAutoscaler) removeOldUnregisteredNodes(unregisteredNodes []clusterstate.UnregisteredNode, context *context.AutoscalingContext,
csr *clusterstate.ClusterStateRegistry, currentTime time.Time, logRecorder *utils.LogEventRecorder) (bool, error) {
removedAny := false
for _, unregisteredNode := range unregisteredNodes {
if unregisteredNode.UnregisteredSince.Add(context.MaxNodeProvisionTime).Before(currentTime) {
nodeGroup, err := context.CloudProvider.NodeGroupForNode(unregisteredNode.Node)
if err != nil {
klog.Warningf("Failed to get node group for %s: %v", unregisteredNode.Node.Name, err)
return removedAny, err
}
if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
klog.Warningf("No node group for node %s, skipping", unregisteredNode.Node.Name)
continue
}

maxNodeProvisionTime, err := a.processors.NodeGroupConfigProcessor.GetMaxNodeProvisionTime(context, nodeGroup)
if err != nil {
return false, fmt.Errorf("failed to retrieve maxNodeProvisionTime for node %s in nodeGroup %s", unregisteredNode.Node.Name, nodeGroup.Id())
}

if unregisteredNode.UnregisteredSince.Add(maxNodeProvisionTime).Before(currentTime) {
klog.V(0).Infof("Removing unregistered node %v", unregisteredNode.Node.Name)
nodeGroup, err := context.CloudProvider.NodeGroupForNode(unregisteredNode.Node)
if err != nil {
klog.Warningf("Failed to get node group for %s: %v", unregisteredNode.Node.Name, err)
return removedAny, err
}
if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
klog.Warningf("No node group for node %s, skipping", unregisteredNode.Node.Name)
continue
}
size, err := nodeGroup.TargetSize()
if err != nil {
klog.Warningf("Failed to get node group size; unregisteredNode=%v; nodeGroup=%v; err=%v", unregisteredNode.Node.Name, nodeGroup.Id(), err)
Expand Down
4 changes: 2 additions & 2 deletions cluster-autoscaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ var (
maxTotalUnreadyPercentage = flag.Float64("max-total-unready-percentage", 45, "Maximum percentage of unready nodes in the cluster. After this is exceeded, CA halts operations")
okTotalUnreadyCount = flag.Int("ok-total-unready-count", 3, "Number of allowed unready nodes, irrespective of max-total-unready-percentage")
scaleUpFromZero = flag.Bool("scale-up-from-zero", true, "Should CA scale up when there 0 ready nodes.")
maxNodeProvisionTime = flag.Duration("max-node-provision-time", 15*time.Minute, "Maximum time CA waits for node to be provisioned")
maxNodeProvisionTime = flag.Duration("max-node-provision-time", 15*time.Minute, "The default maximum time CA waits for node to be provisioned")
maxPodEvictionTime = flag.Duration("max-pod-eviction-time", 2*time.Minute, "Maximum time CA tries to evict a pod before giving up")
nodeGroupsFlag = multiStringFlag(
"nodes",
Expand Down Expand Up @@ -253,6 +253,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
ScaleDownGpuUtilizationThreshold: *scaleDownGpuUtilizationThreshold,
ScaleDownUnneededTime: *scaleDownUnneededTime,
ScaleDownUnreadyTime: *scaleDownUnreadyTime,
MaxNodeProvisionTime: *maxNodeProvisionTime,
},
CloudConfig: *cloudConfig,
CloudProviderName: *cloudProviderFlag,
Expand All @@ -270,7 +271,6 @@ func createAutoscalingOptions() config.AutoscalingOptions {
MaxBulkSoftTaintTime: *maxBulkSoftTaintTime,
MaxEmptyBulkDelete: *maxEmptyBulkDeleteFlag,
MaxGracefulTerminationSec: *maxGracefulTerminationFlag,
MaxNodeProvisionTime: *maxNodeProvisionTime,
MaxPodEvictionTime: *maxPodEvictionTime,
MaxNodesTotal: *maxNodesTotal,
MaxCoresTotal: maxCoresTotal,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type NodeGroupConfigProcessor interface {
GetScaleDownUtilizationThreshold(context *context.AutoscalingContext, nodeGroup cloudprovider.NodeGroup) (float64, error)
// GetScaleDownGpuUtilizationThreshold returns ScaleDownGpuUtilizationThreshold value that should be used for a given NodeGroup.
GetScaleDownGpuUtilizationThreshold(context *context.AutoscalingContext, nodeGroup cloudprovider.NodeGroup) (float64, error)
// GetMaxNodeProvisionTime returns MaxNodeProvisionTime value that should be used for a given NodeGroup.
GetMaxNodeProvisionTime(context *context.AutoscalingContext, nodeGroup cloudprovider.NodeGroup) (time.Duration, error)
// CleanUp cleans up processor's internal structures.
CleanUp()
}
Expand Down Expand Up @@ -91,6 +93,18 @@ func (p *DelegatingNodeGroupConfigProcessor) GetScaleDownGpuUtilizationThreshold
return ngConfig.ScaleDownGpuUtilizationThreshold, nil
}

// GetMaxNodeProvisionTime returns the MaxNodeProvisionTime value that should be
// used for a given NodeGroup: the node group's own option when the cloud
// provider exposes one, otherwise the cluster-wide default.
func (p *DelegatingNodeGroupConfigProcessor) GetMaxNodeProvisionTime(context *context.AutoscalingContext, nodeGroup cloudprovider.NodeGroup) (time.Duration, error) {
	options, err := nodeGroup.GetOptions(context.NodeGroupDefaults)
	switch {
	case err != nil && err != cloudprovider.ErrNotImplemented:
		// A real lookup failure; ErrNotImplemented is handled below as "use defaults".
		return 0, err
	case options == nil || err == cloudprovider.ErrNotImplemented:
		// The provider has no per-group configuration — fall back to the default.
		return context.NodeGroupDefaults.MaxNodeProvisionTime, nil
	default:
		return options.MaxNodeProvisionTime, nil
	}
}

// CleanUp cleans up processor's internal structures.
// No-op: this implementation performs no cleanup.
func (p *DelegatingNodeGroupConfigProcessor) CleanUp() {
}
Expand Down

0 comments on commit 38464b7

Please sign in to comment.