Skip to content

Commit

Permalink
Move MaxNodeProvisionTime to NodeGroupAutoscalingOptions
Browse files Browse the repository at this point in the history
  • Loading branch information
morshielt committed Apr 14, 2023
1 parent d8336cc commit e4358c2
Show file tree
Hide file tree
Showing 15 changed files with 247 additions and 203 deletions.
3 changes: 3 additions & 0 deletions cluster-autoscaler/cloudprovider/gce/gce_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,9 @@ func (m *gceManagerImpl) GetMigOptions(mig Mig, defaults config.NodeGroupAutosca
if opt, ok := getDurationOption(options, migRef.Name, config.DefaultScaleDownUnreadyTimeKey); ok {
defaults.ScaleDownUnreadyTime = opt
}
if opt, ok := getDurationOption(options, migRef.Name, config.DefaultMaxNodeProvisionTimeKey); ok {
defaults.MaxNodeProvisionTime = opt
}

return &defaults
}
Expand Down
4 changes: 4 additions & 0 deletions cluster-autoscaler/cloudprovider/gce/gce_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1520,6 +1520,7 @@ func TestGetMigOptions(t *testing.T) {
ScaleDownGpuUtilizationThreshold: 0.2,
ScaleDownUnneededTime: time.Second,
ScaleDownUnreadyTime: time.Minute,
MaxNodeProvisionTime: 15 * time.Minute,
}

cases := []struct {
Expand All @@ -1539,12 +1540,14 @@ func TestGetMigOptions(t *testing.T) {
config.DefaultScaleDownUtilizationThresholdKey: "0.7",
config.DefaultScaleDownUnneededTimeKey: "1h",
config.DefaultScaleDownUnreadyTimeKey: "30m",
config.DefaultMaxNodeProvisionTimeKey: "60m",
},
expected: &config.NodeGroupAutoscalingOptions{
ScaleDownGpuUtilizationThreshold: 0.6,
ScaleDownUtilizationThreshold: 0.7,
ScaleDownUnneededTime: time.Hour,
ScaleDownUnreadyTime: 30 * time.Minute,
MaxNodeProvisionTime: 60 * time.Minute,
},
},
{
Expand All @@ -1558,6 +1561,7 @@ func TestGetMigOptions(t *testing.T) {
ScaleDownUtilizationThreshold: defaultOptions.ScaleDownUtilizationThreshold,
ScaleDownUnneededTime: time.Minute,
ScaleDownUnreadyTime: defaultOptions.ScaleDownUnreadyTime,
MaxNodeProvisionTime: 15 * time.Minute,
},
},
{
Expand Down
70 changes: 47 additions & 23 deletions cluster-autoscaler/clusterstate/clusterstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/api"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/utils/backoff"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
Expand All @@ -45,6 +46,11 @@ const (
MaxNodeStartupTime = 15 * time.Minute
)

type nodeRegistrationTimeLimitProvider interface {
// GetMaxNodeProvisionTime returns MaxNodeProvisionTime value that should be used for the given NodeGroup.
GetMaxNodeProvisionTime(context *context.AutoscalingContext, nodeGroup cloudprovider.NodeGroup) (time.Duration, error)
}

// ScaleUpRequest contains information about the requested node group scale up.
type ScaleUpRequest struct {
// NodeGroup is the node group to be scaled up.
Expand Down Expand Up @@ -76,8 +82,6 @@ type ClusterStateRegistryConfig struct {
// Minimum number of nodes that must be unready for MaxTotalUnreadyPercentage to apply.
// This is to ensure that in very small clusters (e.g. 2 nodes) a single node's failure doesn't disable autoscaling.
OkTotalUnreadyCount int
// Maximum time CA waits for node to be provisioned
MaxNodeProvisionTime time.Duration
}

// IncorrectNodeGroupSize contains information about how much the current size of the node group
Expand Down Expand Up @@ -111,6 +115,7 @@ type ScaleUpFailure struct {
// ClusterStateRegistry is a structure to keep track the current state of the cluster.
type ClusterStateRegistry struct {
sync.Mutex
context *context.AutoscalingContext
config ClusterStateRegistryConfig
scaleUpRequests map[string]*ScaleUpRequest // nodeGroupName -> ScaleUpRequest
scaleDownRequests []*ScaleDownRequest
Expand All @@ -132,37 +137,40 @@ type ClusterStateRegistry struct {
previousCloudProviderNodeInstances map[string][]cloudprovider.Instance
cloudProviderNodeInstancesCache *utils.CloudProviderNodeInstancesCache
interrupt chan struct{}
nodeRegistrationTimeLimitProvider nodeRegistrationTimeLimitProvider

// scaleUpFailures contains information about scale-up failures for each node group. It should be
// cleared periodically to avoid unnecessary accumulation.
scaleUpFailures map[string][]ScaleUpFailure
}

// NewClusterStateRegistry creates new ClusterStateRegistry.
func NewClusterStateRegistry(cloudProvider cloudprovider.CloudProvider, config ClusterStateRegistryConfig, logRecorder *utils.LogEventRecorder, backoff backoff.Backoff) *ClusterStateRegistry {
func NewClusterStateRegistry(context *context.AutoscalingContext, config ClusterStateRegistryConfig, backoff backoff.Backoff, provider nodeRegistrationTimeLimitProvider) *ClusterStateRegistry {
emptyStatus := &api.ClusterAutoscalerStatus{
ClusterwideConditions: make([]api.ClusterAutoscalerCondition, 0),
NodeGroupStatuses: make([]api.NodeGroupStatus, 0),
}

return &ClusterStateRegistry{
scaleUpRequests: make(map[string]*ScaleUpRequest),
scaleDownRequests: make([]*ScaleDownRequest, 0),
nodes: make([]*apiv1.Node, 0),
cloudProvider: cloudProvider,
config: config,
perNodeGroupReadiness: make(map[string]Readiness),
acceptableRanges: make(map[string]AcceptableRange),
incorrectNodeGroupSizes: make(map[string]IncorrectNodeGroupSize),
unregisteredNodes: make(map[string]UnregisteredNode),
deletedNodes: make(map[string]struct{}),
candidatesForScaleDown: make(map[string][]string),
backoff: backoff,
lastStatus: emptyStatus,
logRecorder: logRecorder,
cloudProviderNodeInstancesCache: utils.NewCloudProviderNodeInstancesCache(cloudProvider),
interrupt: make(chan struct{}),
scaleUpFailures: make(map[string][]ScaleUpFailure),
context: context,
scaleUpRequests: make(map[string]*ScaleUpRequest),
scaleDownRequests: make([]*ScaleDownRequest, 0),
nodes: make([]*apiv1.Node, 0),
cloudProvider: context.CloudProvider,
config: config,
perNodeGroupReadiness: make(map[string]Readiness),
acceptableRanges: make(map[string]AcceptableRange),
incorrectNodeGroupSizes: make(map[string]IncorrectNodeGroupSize),
unregisteredNodes: make(map[string]UnregisteredNode),
deletedNodes: make(map[string]struct{}),
candidatesForScaleDown: make(map[string][]string),
backoff: backoff,
lastStatus: emptyStatus,
logRecorder: context.LogRecorder,
cloudProviderNodeInstancesCache: utils.NewCloudProviderNodeInstancesCache(context.CloudProvider),
interrupt: make(chan struct{}),
scaleUpFailures: make(map[string][]ScaleUpFailure),
nodeRegistrationTimeLimitProvider: provider,
}
}

Expand All @@ -188,14 +196,25 @@ func (csr *ClusterStateRegistry) RegisterOrUpdateScaleUp(nodeGroup cloudprovider
csr.registerOrUpdateScaleUpNoLock(nodeGroup, delta, currentTime)
}

// MaxNodeProvisionTime returns MaxNodeProvisionTime value that should be used for the given NodeGroup.
func (csr *ClusterStateRegistry) MaxNodeProvisionTime(context *context.AutoscalingContext, nodeGroup cloudprovider.NodeGroup) (time.Duration, error) {
return csr.nodeRegistrationTimeLimitProvider.GetMaxNodeProvisionTime(context, nodeGroup)
}

func (csr *ClusterStateRegistry) registerOrUpdateScaleUpNoLock(nodeGroup cloudprovider.NodeGroup, delta int, currentTime time.Time) {
maxNodeProvisionTime, err := csr.nodeRegistrationTimeLimitProvider.GetMaxNodeProvisionTime(csr.context, nodeGroup)
if err != nil {
klog.Warningf("Couldn't update scale up request: failed to get maxNodeProvisionTime for node group %s: %w", nodeGroup.Id(), err)
return
}

scaleUpRequest, found := csr.scaleUpRequests[nodeGroup.Id()]
if !found && delta > 0 {
scaleUpRequest = &ScaleUpRequest{
NodeGroup: nodeGroup,
Increase: delta,
Time: currentTime,
ExpectedAddTime: currentTime.Add(csr.config.MaxNodeProvisionTime),
ExpectedAddTime: currentTime.Add(maxNodeProvisionTime),
}
csr.scaleUpRequests[nodeGroup.Id()] = scaleUpRequest
return
Expand All @@ -217,7 +236,7 @@ func (csr *ClusterStateRegistry) registerOrUpdateScaleUpNoLock(nodeGroup cloudpr
if delta > 0 {
// if we are actually adding new nodes shift Time and ExpectedAddTime
scaleUpRequest.Time = currentTime
scaleUpRequest.ExpectedAddTime = currentTime.Add(csr.config.MaxNodeProvisionTime)
scaleUpRequest.ExpectedAddTime = currentTime.Add(maxNodeProvisionTime)
}
}

Expand Down Expand Up @@ -589,7 +608,12 @@ func (csr *ClusterStateRegistry) updateReadinessStats(currentTime time.Time) {
continue
}
perNgCopy := perNodeGroup[nodeGroup.Id()]
if unregistered.UnregisteredSince.Add(csr.config.MaxNodeProvisionTime).Before(currentTime) {
maxNodeProvisionTime, err := csr.nodeRegistrationTimeLimitProvider.GetMaxNodeProvisionTime(csr.context, nodeGroup)
if err != nil {
klog.Warningf("Failed to get maxNodeProvisionTime for node %s in node group %s: %w", unregistered.Node.Name, nodeGroup.Id(), err)
continue
}
if unregistered.UnregisteredSince.Add(maxNodeProvisionTime).Before(currentTime) {
perNgCopy.LongUnregistered = append(perNgCopy.LongUnregistered, unregistered.Node.Name)
total.LongUnregistered = append(total.LongUnregistered, unregistered.Node.Name)
} else {
Expand Down
Loading

0 comments on commit e4358c2

Please sign in to comment.