diff --git a/cluster-autoscaler/cloudprovider/gce/gce_manager.go b/cluster-autoscaler/cloudprovider/gce/gce_manager.go
index da5d85122225..d0b6c6c007b6 100644
--- a/cluster-autoscaler/cloudprovider/gce/gce_manager.go
+++ b/cluster-autoscaler/cloudprovider/gce/gce_manager.go
@@ -575,6 +575,9 @@ func (m *gceManagerImpl) GetMigOptions(mig Mig, defaults config.NodeGroupAutosca
 	if opt, ok := getDurationOption(options, migRef.Name, config.DefaultScaleDownUnreadyTimeKey); ok {
 		defaults.ScaleDownUnreadyTime = opt
 	}
+	if opt, ok := getDurationOption(options, migRef.Name, config.DefaultMaxNodeProvisionTimeKey); ok {
+		defaults.MaxNodeProvisionTime = opt
+	}
 
 	return &defaults
 }
diff --git a/cluster-autoscaler/cloudprovider/gce/gce_manager_test.go b/cluster-autoscaler/cloudprovider/gce/gce_manager_test.go
index c031e5cb8342..33106bebcf25 100644
--- a/cluster-autoscaler/cloudprovider/gce/gce_manager_test.go
+++ b/cluster-autoscaler/cloudprovider/gce/gce_manager_test.go
@@ -1520,6 +1520,7 @@ func TestGetMigOptions(t *testing.T) {
 		ScaleDownGpuUtilizationThreshold: 0.2,
 		ScaleDownUnneededTime: time.Second,
 		ScaleDownUnreadyTime: time.Minute,
+		MaxNodeProvisionTime: 15 * time.Minute,
 	}
 
 	cases := []struct {
@@ -1539,12 +1540,14 @@ func TestGetMigOptions(t *testing.T) {
 			config.DefaultScaleDownUtilizationThresholdKey: "0.7",
 			config.DefaultScaleDownUnneededTimeKey: "1h",
 			config.DefaultScaleDownUnreadyTimeKey: "30m",
+			config.DefaultMaxNodeProvisionTimeKey: "60m",
 		},
 		expected: &config.NodeGroupAutoscalingOptions{
 			ScaleDownGpuUtilizationThreshold: 0.6,
 			ScaleDownUtilizationThreshold: 0.7,
 			ScaleDownUnneededTime: time.Hour,
 			ScaleDownUnreadyTime: 30 * time.Minute,
+			MaxNodeProvisionTime: 60 * time.Minute,
 		},
 	},
 	{
@@ -1558,6 +1561,7 @@ func TestGetMigOptions(t *testing.T) {
 			ScaleDownUtilizationThreshold: defaultOptions.ScaleDownUtilizationThreshold,
 			ScaleDownUnneededTime: time.Minute,
 			ScaleDownUnreadyTime: defaultOptions.ScaleDownUnreadyTime,
+			MaxNodeProvisionTime: 15 * time.Minute,
 		},
 	},
 	{
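The GCE hunks above wire the new `maxnodeprovisiontime` key into `GetMigOptions` through the same `getDurationOption` helper the scale-down keys already use: look the key up in the MIG's key/value options, parse it as a Go duration, and override the default on success. A rough standalone sketch of that lookup-and-parse contract — the map and helper below are illustrative stand-ins, not the actual GCE manager code:

```go
package main

import (
	"fmt"
	"time"
)

// durationOption parses a per-group override such as "60m" out of a key/value
// option map, reporting whether the key was present and valid. This mirrors
// the contract getDurationOption fulfils for GetMigOptions, under the
// assumption that options arrive as plain strings.
func durationOption(options map[string]string, key string) (time.Duration, bool) {
	raw, found := options[key]
	if !found {
		return 0, false
	}
	d, err := time.ParseDuration(raw)
	if err != nil {
		return 0, false
	}
	return d, true
}

func main() {
	migOptions := map[string]string{"maxnodeprovisiontime": "60m"}
	// A MIG carrying the key overrides the 15m default from the flag.
	if opt, ok := durationOption(migOptions, "maxnodeprovisiontime"); ok {
		fmt.Println(opt) // 1h0m0s
	}
}
```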
diff --git a/cluster-autoscaler/clusterstate/clusterstate.go b/cluster-autoscaler/clusterstate/clusterstate.go
index 86721cddedce..63af2c95e940 100644
--- a/cluster-autoscaler/clusterstate/clusterstate.go
+++ b/cluster-autoscaler/clusterstate/clusterstate.go
@@ -27,6 +27,7 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
 	"k8s.io/autoscaler/cluster-autoscaler/clusterstate/api"
 	"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
+	"k8s.io/autoscaler/cluster-autoscaler/context"
 	"k8s.io/autoscaler/cluster-autoscaler/metrics"
 	"k8s.io/autoscaler/cluster-autoscaler/utils/backoff"
 	kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
@@ -45,6 +46,11 @@ const (
 	MaxNodeStartupTime = 15 * time.Minute
 )
 
+type nodeRegistrationTimeLimitProvider interface {
+	// GetMaxNodeProvisionTime returns MaxNodeProvisionTime value that should be used for the given NodeGroup.
+	GetMaxNodeProvisionTime(context *context.AutoscalingContext, nodeGroup cloudprovider.NodeGroup) (time.Duration, error)
+}
+
 // ScaleUpRequest contains information about the requested node group scale up.
 type ScaleUpRequest struct {
 	// NodeGroup is the node group to be scaled up.
@@ -76,8 +82,6 @@ type ClusterStateRegistryConfig struct {
 	// Minimum number of nodes that must be unready for MaxTotalUnreadyPercentage to apply.
 	// This is to ensure that in very small clusters (e.g. 2 nodes) a single node's failure doesn't disable autoscaling.
 	OkTotalUnreadyCount int
-	// Maximum time CA waits for node to be provisioned
-	MaxNodeProvisionTime time.Duration
 }
 
 // IncorrectNodeGroupSize contains information about how much the current size of the node group
@@ -111,6 +115,7 @@ type ScaleUpFailure struct {
 // ClusterStateRegistry is a structure to keep track the current state of the cluster.
 type ClusterStateRegistry struct {
 	sync.Mutex
+	context *context.AutoscalingContext
 	config ClusterStateRegistryConfig
 	scaleUpRequests map[string]*ScaleUpRequest // nodeGroupName -> ScaleUpRequest
 	scaleDownRequests []*ScaleDownRequest
@@ -132,6 +137,7 @@ type ClusterStateRegistry struct {
 	previousCloudProviderNodeInstances map[string][]cloudprovider.Instance
 	cloudProviderNodeInstancesCache *utils.CloudProviderNodeInstancesCache
 	interrupt chan struct{}
+	nodeRegistrationTimeLimitProvider nodeRegistrationTimeLimitProvider
 
 	// scaleUpFailures contains information about scale-up failures for each node group. It should be
 	// cleared periodically to avoid unnecessary accumulation.
@@ -139,30 +145,32 @@ type ClusterStateRegistry struct {
 }
 
 // NewClusterStateRegistry creates new ClusterStateRegistry.
-func NewClusterStateRegistry(cloudProvider cloudprovider.CloudProvider, config ClusterStateRegistryConfig, logRecorder *utils.LogEventRecorder, backoff backoff.Backoff) *ClusterStateRegistry {
+func NewClusterStateRegistry(context *context.AutoscalingContext, config ClusterStateRegistryConfig, backoff backoff.Backoff, provider nodeRegistrationTimeLimitProvider) *ClusterStateRegistry {
 	emptyStatus := &api.ClusterAutoscalerStatus{
 		ClusterwideConditions: make([]api.ClusterAutoscalerCondition, 0),
 		NodeGroupStatuses: make([]api.NodeGroupStatus, 0),
 	}
 
 	return &ClusterStateRegistry{
-		scaleUpRequests: make(map[string]*ScaleUpRequest),
-		scaleDownRequests: make([]*ScaleDownRequest, 0),
-		nodes: make([]*apiv1.Node, 0),
-		cloudProvider: cloudProvider,
-		config: config,
-		perNodeGroupReadiness: make(map[string]Readiness),
-		acceptableRanges: make(map[string]AcceptableRange),
-		incorrectNodeGroupSizes: make(map[string]IncorrectNodeGroupSize),
-		unregisteredNodes: make(map[string]UnregisteredNode),
-		deletedNodes: make(map[string]struct{}),
-		candidatesForScaleDown: make(map[string][]string),
-		backoff: backoff,
-		lastStatus: emptyStatus,
-		logRecorder: logRecorder,
-		cloudProviderNodeInstancesCache: utils.NewCloudProviderNodeInstancesCache(cloudProvider),
-		interrupt: make(chan struct{}),
-		scaleUpFailures: make(map[string][]ScaleUpFailure),
+		context: context,
+		scaleUpRequests: make(map[string]*ScaleUpRequest),
+		scaleDownRequests: make([]*ScaleDownRequest, 0),
+		nodes: make([]*apiv1.Node, 0),
+		cloudProvider: context.CloudProvider,
+		config: config,
+		perNodeGroupReadiness: make(map[string]Readiness),
+		acceptableRanges: make(map[string]AcceptableRange),
+		incorrectNodeGroupSizes: make(map[string]IncorrectNodeGroupSize),
+		unregisteredNodes: make(map[string]UnregisteredNode),
+		deletedNodes: make(map[string]struct{}),
+		candidatesForScaleDown: make(map[string][]string),
+		backoff: backoff,
+		lastStatus: emptyStatus,
+		logRecorder: context.LogRecorder,
+		cloudProviderNodeInstancesCache: utils.NewCloudProviderNodeInstancesCache(context.CloudProvider),
+		interrupt: make(chan struct{}),
+		scaleUpFailures: make(map[string][]ScaleUpFailure),
+		nodeRegistrationTimeLimitProvider: provider,
 	}
 }
 
@@ -188,14 +196,24 @@ func (csr *ClusterStateRegistry) RegisterOrUpdateScaleUp(nodeGroup cloudprovider
 	csr.registerOrUpdateScaleUpNoLock(nodeGroup, delta, currentTime)
 }
 
+func (csr *ClusterStateRegistry) MaxNodeProvisionTime(context *context.AutoscalingContext, nodeGroup cloudprovider.NodeGroup) (time.Duration, error) {
+	return csr.nodeRegistrationTimeLimitProvider.GetMaxNodeProvisionTime(context, nodeGroup)
+}
+
 func (csr *ClusterStateRegistry) registerOrUpdateScaleUpNoLock(nodeGroup cloudprovider.NodeGroup, delta int, currentTime time.Time) {
+	maxNodeProvisionTime, err := csr.nodeRegistrationTimeLimitProvider.GetMaxNodeProvisionTime(csr.context, nodeGroup)
+	if err != nil {
+		klog.Warningf("Couldn't update scale up request: failed to get maxNodeProvisionTime for node group %s: %v", nodeGroup.Id(), err)
+		return
+	}
+
 	scaleUpRequest, found := csr.scaleUpRequests[nodeGroup.Id()]
 	if !found && delta > 0 {
 		scaleUpRequest = &ScaleUpRequest{
 			NodeGroup: nodeGroup,
 			Increase: delta,
 			Time: currentTime,
-			ExpectedAddTime: currentTime.Add(csr.config.MaxNodeProvisionTime),
+			ExpectedAddTime: currentTime.Add(maxNodeProvisionTime),
 		}
 		csr.scaleUpRequests[nodeGroup.Id()] = scaleUpRequest
 		return
@@ -217,7 +235,7 @@ func (csr *ClusterStateRegistry) registerOrUpdateScaleUpNoLock(nodeGroup cloudpr
 	if delta > 0 {
 		// if we are actually adding new nodes shift Time and ExpectedAddTime
 		scaleUpRequest.Time = currentTime
-		scaleUpRequest.ExpectedAddTime = currentTime.Add(csr.config.MaxNodeProvisionTime)
+		scaleUpRequest.ExpectedAddTime = currentTime.Add(maxNodeProvisionTime)
 	}
 }
 
@@ -589,7 +607,12 @@ func (csr *ClusterStateRegistry) updateReadinessStats(currentTime time.Time) {
 			continue
 		}
 		perNgCopy := perNodeGroup[nodeGroup.Id()]
-		if unregistered.UnregisteredSince.Add(csr.config.MaxNodeProvisionTime).Before(currentTime) {
+		maxNodeProvisionTime, err := csr.nodeRegistrationTimeLimitProvider.GetMaxNodeProvisionTime(csr.context, nodeGroup)
+		if err != nil {
+			klog.Warningf("Failed to get maxNodeProvisionTime for node %s in node group %s: %v", unregistered.Node.Name, nodeGroup.Id(), err)
+			continue
+		}
+		if unregistered.UnregisteredSince.Add(maxNodeProvisionTime).Before(currentTime) {
 			perNgCopy.LongUnregistered = append(perNgCopy.LongUnregistered, unregistered.Node.Name)
 			total.LongUnregistered = append(total.LongUnregistered, unregistered.Node.Name)
 		} else {
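With these hunks, every place in the registry that used to read `csr.config.MaxNodeProvisionTime` now asks the injected `nodeRegistrationTimeLimitProvider` per node group. In production that provider is the `NodeGroupConfigProcessor`, but anything with the right method satisfies the seam. A minimal hypothetical implementation — not part of this PR — that pins all groups to one fixed limit, shown only to illustrate the interface:

```go
package clusterstate

import (
	"time"

	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
	"k8s.io/autoscaler/cluster-autoscaler/context"
)

// staticProvisionTimeProvider is a hypothetical nodeRegistrationTimeLimitProvider
// that ignores per-group options and always returns the same deadline.
type staticProvisionTimeProvider struct {
	limit time.Duration
}

// GetMaxNodeProvisionTime returns the fixed limit for any node group.
func (p *staticProvisionTimeProvider) GetMaxNodeProvisionTime(_ *context.AutoscalingContext, _ cloudprovider.NodeGroup) (time.Duration, error) {
	return p.limit, nil
}
```

Any such value can be passed as the last argument of `NewClusterStateRegistry`; the tests below instead use `nodegroupconfig.NewDefaultNodeGroupConfigProcessor()` so the limit flows from `NodeGroupDefaults`.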
diff --git a/cluster-autoscaler/clusterstate/clusterstate_test.go b/cluster-autoscaler/clusterstate/clusterstate_test.go
index 020b17640e40..38beb7223af8 100644
--- a/cluster-autoscaler/clusterstate/clusterstate_test.go
+++ b/cluster-autoscaler/clusterstate/clusterstate_test.go
@@ -21,7 +21,10 @@ import (
 	"testing"
 	"time"
 
+	"k8s.io/autoscaler/cluster-autoscaler/config"
+	"k8s.io/autoscaler/cluster-autoscaler/context"
 	"k8s.io/autoscaler/cluster-autoscaler/metrics"
+	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
 
 	apiv1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -69,11 +72,7 @@ func TestOKWithScaleUp(t *testing.T) {
 	fakeClient := &fake.Clientset{}
 	fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false, "my-cool-configmap")
-	clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{
-		MaxTotalUnreadyPercentage: 10,
-		OkTotalUnreadyCount: 1,
-		MaxNodeProvisionTime: time.Minute,
-	}, fakeLogRecorder, newBackoff())
+	clusterstate := newTestClusterStateRegistry(provider, fakeLogRecorder, time.Minute)
 	clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 4, time.Now())
 	err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now)
 	assert.NoError(t, err)
@@ -111,11 +110,7 @@ func TestEmptyOK(t *testing.T) {
 	fakeClient := &fake.Clientset{}
 	fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false, "my-cool-configmap")
-	clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{
-		MaxTotalUnreadyPercentage: 10,
-		OkTotalUnreadyCount: 1,
-		MaxNodeProvisionTime: time.Minute,
-	}, fakeLogRecorder, newBackoff())
+	clusterstate := newTestClusterStateRegistry(provider, fakeLogRecorder, time.Minute)
 	err := clusterstate.UpdateNodes([]*apiv1.Node{}, nil, now.Add(-5*time.Second))
 	assert.NoError(t, err)
 	assert.True(t, clusterstate.IsClusterHealthy())
@@ -152,10 +147,7 @@ func TestOKOneUnreadyNode(t *testing.T) {
 	fakeClient := &fake.Clientset{}
 	fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false, "my-cool-configmap")
-	clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{
-		MaxTotalUnreadyPercentage: 10,
-		OkTotalUnreadyCount: 1,
-	}, fakeLogRecorder, newBackoff())
+	clusterstate := newTestClusterStateRegistry(provider, fakeLogRecorder, 15*time.Minute)
 	err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now)
 	assert.NoError(t, err)
 	assert.True(t, clusterstate.IsClusterHealthy())
@@ -190,10 +182,7 @@ func TestNodeWithoutNodeGroupDontCrash(t *testing.T) {
 	fakeClient := &fake.Clientset{}
 	fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false, "my-cool-configmap")
-	clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{
-		MaxTotalUnreadyPercentage: 10,
-		OkTotalUnreadyCount: 1,
-	}, fakeLogRecorder, newBackoff())
+	clusterstate := newTestClusterStateRegistry(provider, fakeLogRecorder, 15*time.Minute)
 	err := clusterstate.UpdateNodes([]*apiv1.Node{noNgNode}, nil, now)
 	assert.NoError(t, err)
 	assert.Empty(t, clusterstate.GetScaleUpFailures())
@@ -217,10 +206,7 @@ func TestOKOneUnreadyNodeWithScaleDownCandidate(t *testing.T) {
 	fakeClient := &fake.Clientset{}
 	fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false, "my-cool-configmap")
-	clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{
-		MaxTotalUnreadyPercentage: 10,
-		OkTotalUnreadyCount: 1,
-	}, fakeLogRecorder, newBackoff())
+	clusterstate := newTestClusterStateRegistry(provider, fakeLogRecorder, 15*time.Minute)
 	err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now)
 	clusterstate.UpdateScaleDownCandidates([]*apiv1.Node{ng1_1}, now)
@@ -282,10 +268,7 @@ func TestMissingNodes(t *testing.T) {
 	fakeClient := &fake.Clientset{}
 	fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false, "my-cool-configmap")
-	clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{
-		MaxTotalUnreadyPercentage: 10,
-		OkTotalUnreadyCount: 1,
-	}, fakeLogRecorder, newBackoff())
+	clusterstate := newTestClusterStateRegistry(provider, fakeLogRecorder, 15*time.Minute)
 	err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now)
 	assert.NoError(t, err)
 	assert.True(t, clusterstate.IsClusterHealthy())
@@ -324,10 +307,7 @@ func TestTooManyUnready(t *testing.T) {
 	assert.NotNil(t, provider)
 	fakeClient := &fake.Clientset{}
 	fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false, "my-cool-configmap")
-	clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{
-		MaxTotalUnreadyPercentage: 10,
-		OkTotalUnreadyCount: 1,
-	}, fakeLogRecorder, newBackoff())
+	clusterstate := newTestClusterStateRegistry(provider, fakeLogRecorder, 15*time.Minute)
 	err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now)
 	assert.NoError(t, err)
 	assert.False(t, clusterstate.IsClusterHealthy())
@@ -353,10 +333,7 @@ func TestUnreadyLongAfterCreation(t *testing.T) {
 	assert.NotNil(t, provider)
 	fakeClient := &fake.Clientset{}
 	fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false, "some-map")
-	clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{
-		MaxTotalUnreadyPercentage: 10,
-		OkTotalUnreadyCount: 1,
-	}, fakeLogRecorder, newBackoff())
+	clusterstate := newTestClusterStateRegistry(provider, fakeLogRecorder, 15*time.Minute)
 	err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now)
 	assert.NoError(t, err)
 	assert.Equal(t, 1, len(clusterstate.GetClusterReadiness().Unready))
@@ -385,10 +362,7 @@ func TestNotStarted(t *testing.T) {
 	assert.NotNil(t, provider)
 	fakeClient := &fake.Clientset{}
 	fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false, "some-map")
-	clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{
-		MaxTotalUnreadyPercentage: 10,
-		OkTotalUnreadyCount: 1,
-	}, fakeLogRecorder, newBackoff())
+	clusterstate := newTestClusterStateRegistry(provider, fakeLogRecorder, 15*time.Minute)
 	err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now)
 	assert.NoError(t, err)
 	assert.Equal(t, 1, len(clusterstate.GetClusterReadiness().NotStarted))
@@ -422,11 +396,7 @@ func TestExpiredScaleUp(t *testing.T) {
 	fakeClient := &fake.Clientset{}
 	fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false, "my-cool-configmap")
-	clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{
-		MaxTotalUnreadyPercentage: 10,
-		OkTotalUnreadyCount: 1,
-		MaxNodeProvisionTime: 2 * time.Minute,
-	}, fakeLogRecorder, newBackoff())
+	clusterstate := newTestClusterStateRegistry(provider, fakeLogRecorder, 2*time.Minute)
 	clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 4, now.Add(-3*time.Minute))
 	err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, nil, now)
 	assert.NoError(t, err)
@@ -448,10 +418,7 @@ func TestRegisterScaleDown(t *testing.T) {
 	fakeClient := &fake.Clientset{}
 	fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false, "my-cool-configmap")
-	clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{
-		MaxTotalUnreadyPercentage: 10,
-		OkTotalUnreadyCount: 1,
-	}, fakeLogRecorder, newBackoff())
+	clusterstate := newTestClusterStateRegistry(provider, fakeLogRecorder, 15*time.Minute)
 
 	now := time.Now()
 
@@ -517,10 +484,7 @@ func TestUpcomingNodes(t *testing.T) {
 	assert.NotNil(t, provider)
 	fakeClient := &fake.Clientset{}
 	fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false, "my-cool-configmap")
-	clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{
-		MaxTotalUnreadyPercentage: 10,
-		OkTotalUnreadyCount: 1,
-	}, fakeLogRecorder, newBackoff())
+	clusterstate := newTestClusterStateRegistry(provider, fakeLogRecorder, 15*time.Minute)
 	err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1, ng3_1, ng4_1, ng5_1, ng5_2}, nil, now)
 	assert.NoError(t, err)
 	assert.Empty(t, clusterstate.GetScaleUpFailures())
@@ -564,10 +528,7 @@ func TestTaintBasedNodeDeletion(t *testing.T) {
 	assert.NotNil(t, provider)
 	fakeClient := &fake.Clientset{}
 	fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false, "my-cool-configmap")
-	clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{
-		MaxTotalUnreadyPercentage: 10,
-		OkTotalUnreadyCount: 1,
-	}, fakeLogRecorder, newBackoff())
+	clusterstate := newTestClusterStateRegistry(provider, fakeLogRecorder, 15*time.Minute)
 	err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_2}, nil, now)
 	assert.NoError(t, err)
 	assert.Empty(t, clusterstate.GetScaleUpFailures())
@@ -585,10 +546,7 @@ func TestIncorrectSize(t *testing.T) {
 	assert.NotNil(t, provider)
 	fakeClient := &fake.Clientset{}
 	fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false, "my-cool-configmap")
-	clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{
-		MaxTotalUnreadyPercentage: 10,
-		OkTotalUnreadyCount: 1,
-	}, fakeLogRecorder, newBackoff())
+	clusterstate := newTestClusterStateRegistry(provider, fakeLogRecorder, 15*time.Minute)
 	now := time.Now()
 	clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, nil, now.Add(-5*time.Minute))
 	incorrect := clusterstate.incorrectNodeGroupSizes["ng1"]
@@ -621,11 +579,7 @@ func TestUnregisteredNodes(t *testing.T) {
 	fakeClient := &fake.Clientset{}
 	fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false, "my-cool-configmap")
-	clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{
-		MaxTotalUnreadyPercentage: 10,
-		OkTotalUnreadyCount: 1,
-		MaxNodeProvisionTime: 10 * time.Second,
-	}, fakeLogRecorder, newBackoff())
+	clusterstate := newTestClusterStateRegistry(provider, fakeLogRecorder, 10*time.Second)
 	err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, nil, time.Now().Add(-time.Minute))
 
 	assert.NoError(t, err)
@@ -671,11 +625,7 @@ func TestCloudProviderDeletedNodes(t *testing.T) {
 	fakeClient := &fake.Clientset{}
 	fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false, "my-cool-configmap")
-	clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{
-		MaxTotalUnreadyPercentage: 10,
-		OkTotalUnreadyCount: 1,
-		MaxNodeProvisionTime: 10 * time.Second,
-	}, fakeLogRecorder, newBackoff())
+	clusterstate := newTestClusterStateRegistry(provider, fakeLogRecorder, 10*time.Second)
 	now.Add(time.Minute)
 	err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_2, noNgNode}, nil, now)
@@ -873,11 +823,7 @@ func TestScaleUpBackoff(t *testing.T) {
 	fakeClient := &fake.Clientset{}
 	fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false, "my-cool-configmap")
-	clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{
-		MaxTotalUnreadyPercentage: 10,
-		OkTotalUnreadyCount: 1,
-		MaxNodeProvisionTime: 120 * time.Second,
-	}, fakeLogRecorder, newBackoff())
+	clusterstate := newTestClusterStateRegistry(provider, fakeLogRecorder, 120*time.Second)
 
 	// After failed scale-up, node group should be still healthy, but should backoff from scale-ups
 	clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 1, now.Add(-180*time.Second))
@@ -941,10 +887,7 @@ func TestGetClusterSize(t *testing.T) {
 	fakeClient := &fake.Clientset{}
 	fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false, "my-cool-configmap")
-	clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{
-		MaxTotalUnreadyPercentage: 10,
-		OkTotalUnreadyCount: 1,
-	}, fakeLogRecorder, newBackoff())
+	clusterstate := newTestClusterStateRegistry(provider, fakeLogRecorder, 15*time.Minute)
 
 	// There are 2 actual nodes in 2 node groups with target sizes of 5 and 1.
 	clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1, notAutoscaledNode}, nil, now)
@@ -983,15 +926,7 @@ func TestUpdateScaleUp(t *testing.T) {
 	provider.AddNodeGroup("ng1", 1, 10, 5)
 	fakeClient := &fake.Clientset{}
 	fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false, "my-cool-configmap")
-	clusterstate := NewClusterStateRegistry(
-		provider,
-		ClusterStateRegistryConfig{
-			MaxTotalUnreadyPercentage: 10,
-			OkTotalUnreadyCount: 1,
-			MaxNodeProvisionTime: 10 * time.Second,
-		},
-		fakeLogRecorder,
-		newBackoff())
+	clusterstate := newTestClusterStateRegistry(provider, fakeLogRecorder, 10*time.Second)
 
 	clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 100, now)
 	assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Increase, 100)
@@ -1029,7 +964,7 @@ func TestScaleUpFailures(t *testing.T) {
 	fakeClient := &fake.Clientset{}
 	fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false, "my-cool-configmap")
-	clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{}, fakeLogRecorder, newBackoff())
+	clusterstate := newTestClusterStateRegistry(provider, fakeLogRecorder, 15*time.Minute)
 
 	clusterstate.RegisterFailedScaleUp(provider.GetNodeGroup("ng1"), metrics.Timeout, now)
 	clusterstate.RegisterFailedScaleUp(provider.GetNodeGroup("ng2"), metrics.Timeout, now)
@@ -1054,3 +989,22 @@ func newBackoff() backoff.Backoff {
 	return backoff.NewIdBasedExponentialBackoff(5*time.Minute /*InitialNodeGroupBackoffDuration*/, 30*time.Minute /*MaxNodeGroupBackoffDuration*/, 3*time.Hour /*NodeGroupBackoffResetTimeout*/)
 }
+
+func newTestClusterStateRegistry(provider cloudprovider.CloudProvider, logRecorder *utils.LogEventRecorder, maxTime time.Duration) *ClusterStateRegistry {
+	context := &context.AutoscalingContext{
+		AutoscalingOptions: config.AutoscalingOptions{
+			NodeGroupDefaults: config.NodeGroupAutoscalingOptions{
+				MaxNodeProvisionTime: maxTime,
+			},
+		},
+		AutoscalingKubeClients: context.AutoscalingKubeClients{
+			LogRecorder: logRecorder,
+		},
+		CloudProvider: provider,
+	}
+
+	return NewClusterStateRegistry(context, ClusterStateRegistryConfig{
+		MaxTotalUnreadyPercentage: 10,
+		OkTotalUnreadyCount: 1,
+	}, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor())
+}
diff --git a/cluster-autoscaler/config/autoscaling_options.go b/cluster-autoscaler/config/autoscaling_options.go
index 595cf1d37a5a..aa747f698f17 100644
--- a/cluster-autoscaler/config/autoscaling_options.go
+++ b/cluster-autoscaler/config/autoscaling_options.go
@@ -44,6 +44,8 @@ type NodeGroupAutoscalingOptions struct {
 	ScaleDownUnneededTime time.Duration
 	// ScaleDownUnreadyTime represents how long an unready node should be unneeded before it is eligible for scale down
 	ScaleDownUnreadyTime time.Duration
+	// Maximum time CA waits for node to be provisioned
+	MaxNodeProvisionTime time.Duration
 }
 
 const (
@@ -110,8 +112,6 @@ type AutoscalingOptions struct {
 	// MaxGracefulTerminationSec is maximum number of seconds scale down waits for pods to terminate before
 	// removing the node from cloud provider.
 	MaxGracefulTerminationSec int
-	// Maximum time CA waits for node to be provisioned
-	MaxNodeProvisionTime time.Duration
 	// MaxTotalUnreadyPercentage is the maximum percentage of unready nodes after which CA halts operations
 	MaxTotalUnreadyPercentage float64
 	// OkTotalUnreadyCount is the number of allowed unready nodes, irrespective of max-total-unready-percentage
diff --git a/cluster-autoscaler/config/const.go b/cluster-autoscaler/config/const.go
index 5e4873d7e078..532b41a37d69 100644
--- a/cluster-autoscaler/config/const.go
+++ b/cluster-autoscaler/config/const.go
@@ -30,4 +30,6 @@ const (
 	DefaultScaleDownUnneededTimeKey = "scaledownunneededtime"
 	// DefaultScaleDownUnreadyTimeKey identifies ScaleDownUnreadyTime autoscaling option
 	DefaultScaleDownUnreadyTimeKey = "scaledownunreadytime"
+	// DefaultMaxNodeProvisionTimeKey identifies MaxNodeProvisionTime autoscaling option
+	DefaultMaxNodeProvisionTimeKey = "maxnodeprovisiontime"
 )
diff --git a/cluster-autoscaler/core/scaledown/actuation/actuator_test.go b/cluster-autoscaler/core/scaledown/actuation/actuator_test.go
index df9a4af53c26..c51eb42d1bbc 100644
--- a/cluster-autoscaler/core/scaledown/actuation/actuator_test.go
+++ b/cluster-autoscaler/core/scaledown/actuation/actuator_test.go
@@ -42,6 +42,7 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
 	. "k8s.io/autoscaler/cluster-autoscaler/core/test"
+	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/utilization"
 	kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
@@ -828,7 +829,7 @@ func TestStartDeletion(t *testing.T) {
 			if err != nil {
 				t.Fatalf("Couldn't set up autoscaling context: %v", err)
 			}
-			csr := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, ctx.LogRecorder, NewBackoff())
+			csr := clusterstate.NewClusterStateRegistry(&ctx, clusterstate.ClusterStateRegistryConfig{}, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor())
 			for _, node := range tc.emptyNodes {
 				err := ctx.ClusterSnapshot.AddNodeWithPods(node, tc.pods[node.Name])
 				if err != nil {
@@ -1077,7 +1078,7 @@ func TestStartDeletionInBatchBasic(t *testing.T) {
 			if err != nil {
 				t.Fatalf("Couldn't set up autoscaling context: %v", err)
 			}
-			csr := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, ctx.LogRecorder, NewBackoff())
+			csr := clusterstate.NewClusterStateRegistry(&ctx, clusterstate.ClusterStateRegistryConfig{}, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor())
 			ndt := deletiontracker.NewNodeDeletionTracker(0)
 			actuator := Actuator{
 				ctx: &ctx, clusterState: csr, nodeDeletionTracker: ndt,
diff --git a/cluster-autoscaler/core/scaledown/actuation/delete_in_batch_test.go b/cluster-autoscaler/core/scaledown/actuation/delete_in_batch_test.go
index d4bda2f0ef30..c302d1e6be34 100644
--- a/cluster-autoscaler/core/scaledown/actuation/delete_in_batch_test.go
+++ b/cluster-autoscaler/core/scaledown/actuation/delete_in_batch_test.go
@@ -29,6 +29,7 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/config"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
 	. "k8s.io/autoscaler/cluster-autoscaler/core/test"
+	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
 	"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
 	"k8s.io/client-go/kubernetes/fake"
 	core "k8s.io/client-go/testing"
@@ -161,7 +162,8 @@ func TestRemove(t *testing.T) {
 			})
 
 			ctx, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{}, fakeClient, nil, provider, nil, nil)
-			clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, fakeLogRecorder, NewBackoff())
+			ctx.LogRecorder = fakeLogRecorder
+			clusterStateRegistry := clusterstate.NewClusterStateRegistry(&ctx, clusterstate.ClusterStateRegistryConfig{}, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor())
 			if err != nil {
 				t.Fatalf("Couldn't set up autoscaling context: %v", err)
 			}
diff --git a/cluster-autoscaler/core/scaledown/legacy/legacy_test.go b/cluster-autoscaler/core/scaledown/legacy/legacy_test.go
index 94ae598fccce..38559fb3c1ad 100644
--- a/cluster-autoscaler/core/scaledown/legacy/legacy_test.go
+++ b/cluster-autoscaler/core/scaledown/legacy/legacy_test.go
@@ -22,6 +22,7 @@ import (
 	"testing"
 	"time"
 
+	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
 	autoscaler_errors "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
@@ -146,7 +147,7 @@ func TestFindUnneededNodes(t *testing.T) {
 	context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, registry, provider, nil, nil)
 	assert.NoError(t, err)
 
-	clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff())
+	clusterStateRegistry := clusterstate.NewClusterStateRegistry(&context, clusterstate.ClusterStateRegistryConfig{}, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor())
 	wrapper := newWrapperForTesting(&context, clusterStateRegistry, nil)
 	sd := wrapper.sd
 	allNodes := []*apiv1.Node{n1, n2, n3, n4, n5, n7, n8, n9}
@@ -277,7 +278,7 @@ func TestFindUnneededGPUNodes(t *testing.T) {
 	context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, registry, provider, nil, nil)
 	assert.NoError(t, err)
 
-	clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff())
+	clusterStateRegistry := clusterstate.NewClusterStateRegistry(&context, clusterstate.ClusterStateRegistryConfig{}, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor())
 	wrapper := newWrapperForTesting(&context, clusterStateRegistry, nil)
 	sd := wrapper.sd
 	allNodes := []*apiv1.Node{n1, n2, n3}
@@ -392,7 +393,7 @@ func TestFindUnneededWithPerNodeGroupThresholds(t *testing.T) {
 	context, err := NewScaleTestAutoscalingContext(globalOptions, &fake.Clientset{}, registry, provider, nil, nil)
 	assert.NoError(t, err)
 
-	clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff())
+	clusterStateRegistry := clusterstate.NewClusterStateRegistry(&context, clusterstate.ClusterStateRegistryConfig{}, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor())
 	wrapper := newWrapperForTesting(&context, clusterStateRegistry, nil)
 	sd := wrapper.sd
 	clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, allNodes, allPods)
@@ -475,7 +476,7 @@ func TestPodsWithPreemptionsFindUnneededNodes(t *testing.T) {
 	context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, registry, provider, nil, nil)
 	assert.NoError(t, err)
 
-	clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff())
+	clusterStateRegistry := clusterstate.NewClusterStateRegistry(&context, clusterstate.ClusterStateRegistryConfig{}, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor())
 	wrapper := newWrapperForTesting(&context, clusterStateRegistry, nil)
 	sd := wrapper.sd
@@ -539,7 +540,7 @@ func TestFindUnneededMaxCandidates(t *testing.T) {
 	context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, registry, provider, nil, nil)
 	assert.NoError(t, err)
 
-	clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff())
+	clusterStateRegistry := clusterstate.NewClusterStateRegistry(&context, clusterstate.ClusterStateRegistryConfig{}, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor())
 	wrapper := newWrapperForTesting(&context, clusterStateRegistry, nil)
 	sd := wrapper.sd
@@ -623,7 +624,7 @@ func TestFindUnneededEmptyNodes(t *testing.T) {
 	context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, registry, provider, nil, nil)
 	assert.NoError(t, err)
 
-	clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff())
+	clusterStateRegistry := clusterstate.NewClusterStateRegistry(&context, clusterstate.ClusterStateRegistryConfig{}, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor())
 	wrapper := newWrapperForTesting(&context, clusterStateRegistry, nil)
 	sd := wrapper.sd
@@ -680,7 +681,7 @@ func TestFindUnneededNodePool(t *testing.T) {
 	context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, registry, provider, nil, nil)
 	assert.NoError(t, err)
 
-	clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff())
+	clusterStateRegistry := clusterstate.NewClusterStateRegistry(&context, clusterstate.ClusterStateRegistryConfig{}, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor())
 	wrapper := newWrapperForTesting(&context, clusterStateRegistry, nil)
 	sd := wrapper.sd
 	clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, nodes, pods)
@@ -771,7 +772,7 @@ func TestScaleDown(t *testing.T) {
 	assert.NoError(t, err)
 	nodes := []*apiv1.Node{n1, n2}
 
-	clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff())
+	clusterStateRegistry := clusterstate.NewClusterStateRegistry(&context, clusterstate.ClusterStateRegistryConfig{}, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor())
 	wrapper := newWrapperForTesting(&context, clusterStateRegistry, nil)
 	clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, nodes, []*apiv1.Pod{p1, p2})
 	autoscalererr = wrapper.UpdateClusterState(nodes, nodes, nil, time.Now().Add(-5*time.Minute))
@@ -1028,7 +1029,7 @@ func simpleScaleDownEmpty(t *testing.T, config *ScaleTestConfig) {
 	context, err := NewScaleTestAutoscalingContext(config.Options, fakeClient, registry, provider, nil, nil)
 	assert.NoError(t, err)
 
-	clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff())
+	clusterStateRegistry := clusterstate.NewClusterStateRegistry(&context, clusterstate.ClusterStateRegistryConfig{}, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor())
 	wrapper := newWrapperForTesting(&context, clusterStateRegistry, config.NodeDeletionTracker)
 	clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, nodes, []*apiv1.Pod{})
 	autoscalererr = wrapper.UpdateClusterState(nodes, nodes, nil, time.Now().Add(-5*time.Minute))
@@ -1123,7 +1124,7 @@ func TestNoScaleDownUnready(t *testing.T) {
 	nodes := []*apiv1.Node{n1, n2}
 
 	// N1 is unready so it requires a bigger unneeded time.
-	clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff())
+	clusterStateRegistry := clusterstate.NewClusterStateRegistry(&context, clusterstate.ClusterStateRegistryConfig{}, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor())
 	wrapper := newWrapperForTesting(&context, clusterStateRegistry, nil)
 	clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, nodes, []*apiv1.Pod{p2})
 	autoscalererr = wrapper.UpdateClusterState(nodes, nodes, nil, time.Now().Add(-5*time.Minute))
@@ -1237,7 +1238,7 @@ func TestScaleDownNoMove(t *testing.T) {
 	nodes := []*apiv1.Node{n1, n2}
 
-	clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff())
+	clusterStateRegistry := clusterstate.NewClusterStateRegistry(&context, clusterstate.ClusterStateRegistryConfig{}, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor())
 	wrapper := newWrapperForTesting(&context, clusterStateRegistry, nil)
 	clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, nodes, []*apiv1.Pod{p1, p2})
 	autoscalererr = wrapper.UpdateClusterState(nodes, nodes, nil, time.Now().Add(-5*time.Minute))
diff --git a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go
index 4dc246ac25d1..be1ef359a40f 100644
--- a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go
+++ b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go
@@ -534,7 +534,8 @@ func runSimpleScaleUpTest(t *testing.T, config *ScaleTestConfig) *ScaleTestResul
 	context.ExpanderStrategy = expander
 
 	nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
-	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff())
+	processors := NewTestProcessors(&context)
+	clusterState := clusterstate.NewClusterStateRegistry(&context, clusterstate.ClusterStateRegistryConfig{}, NewBackoff(), processors.NodeGroupConfigProcessor)
 	clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
 
 	extraPods := make([]*apiv1.Pod, 0, len(config.ExtraPods))
@@ -543,7 +544,6 @@ func runSimpleScaleUpTest(t *testing.T, config *ScaleTestConfig) *ScaleTestResul
 		extraPods = append(extraPods, pod)
 	}
 
-	processors := NewTestProcessors(&context)
 	suOrchestrator := New()
 	suOrchestrator.Initialize(&context, processors, clusterState, nil)
 	scaleUpStatus, err := suOrchestrator.ScaleUp(extraPods, nodes, []*appsv1.DaemonSet{}, nodeInfos)
@@ -696,11 +696,11 @@ func TestScaleUpUnhealthy(t *testing.T) {
 	nodes := []*apiv1.Node{n1, n2}
 
 	nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
-	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff())
+	processors := NewTestProcessors(&context)
+	clusterState := clusterstate.NewClusterStateRegistry(&context, clusterstate.ClusterStateRegistryConfig{}, NewBackoff(), processors.NodeGroupConfigProcessor)
 	clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
 	p3 := BuildTestPod("p-new", 550, 0)
 
-	processors := NewTestProcessors(&context)
 	suOrchestrator := New()
 	suOrchestrator.Initialize(&context, processors, clusterState, nil)
 	scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos)
@@ -739,11 +739,11 @@ func TestScaleUpNoHelp(t *testing.T) {
 	nodes := []*apiv1.Node{n1}
 
 	nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
-	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff())
+	processors := NewTestProcessors(&context)
+	clusterState := clusterstate.NewClusterStateRegistry(&context, clusterstate.ClusterStateRegistryConfig{}, NewBackoff(), processors.NodeGroupConfigProcessor)
 	clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
 	p3 := BuildTestPod("p-new", 500, 0)
 
-	processors := NewTestProcessors(&context)
 	suOrchestrator := New()
 	suOrchestrator.Initialize(&context, processors, clusterState, nil)
 	scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos)
@@ -808,7 +808,8 @@ func TestScaleUpBalanceGroups(t *testing.T) {
 	assert.NoError(t, err)
 
 	nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
-	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff())
+	processors := NewTestProcessors(&context)
+	clusterState := clusterstate.NewClusterStateRegistry(&context, clusterstate.ClusterStateRegistryConfig{}, NewBackoff(), processors.NodeGroupConfigProcessor)
 	clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
 
 	pods := make([]*apiv1.Pod, 0)
@@ -816,7 +817,6 @@ func TestScaleUpBalanceGroups(t *testing.T) {
 		pods = append(pods, BuildTestPod(fmt.Sprintf("test-pod-%v", i), 80, 0))
 	}
 
-	processors := NewTestProcessors(&context)
 	suOrchestrator := New()
 	suOrchestrator.Initialize(&context, processors, clusterState, nil)
 	scaleUpStatus, typedErr := suOrchestrator.ScaleUp(pods, nodes, []*appsv1.DaemonSet{}, nodeInfos)
@@ -870,9 +870,9 @@ func TestScaleUpAutoprovisionedNodeGroup(t *testing.T) {
 	context, err := NewScaleTestAutoscalingContext(options, fakeClient, listers, provider, nil, nil)
 	assert.NoError(t, err)
 
-	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff())
-
 	processors := NewTestProcessors(&context)
+	clusterState := clusterstate.NewClusterStateRegistry(&context, clusterstate.ClusterStateRegistryConfig{}, NewBackoff(), processors.NodeGroupConfigProcessor)
+
 	processors.NodeGroupListProcessor = &MockAutoprovisioningNodeGroupListProcessor{T: t}
 	processors.NodeGroupManager = &MockAutoprovisioningNodeGroupManager{T: t, ExtraGroups: 0}
 
@@ -925,9 +925,8 @@ func TestScaleUpBalanceAutoprovisionedNodeGroups(t *testing.T) {
 	context, err := NewScaleTestAutoscalingContext(options, fakeClient, listers, provider, nil, nil)
 	assert.NoError(t, err)
 
-	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff())
-
 	processors := NewTestProcessors(&context)
+	clusterState := clusterstate.NewClusterStateRegistry(&context, clusterstate.ClusterStateRegistryConfig{}, NewBackoff(), processors.NodeGroupConfigProcessor)
 	processors.NodeGroupListProcessor = &MockAutoprovisioningNodeGroupListProcessor{T: t}
 	processors.NodeGroupManager = &MockAutoprovisioningNodeGroupManager{T: t, ExtraGroups: 2}
 
@@ -986,7 +985,7 @@ func TestScaleUpToMeetNodeGroupMinSize(t *testing.T) {
 	nodes := []*apiv1.Node{n1, n2}
 	nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, time.Now())
 	processors := NewTestProcessors(&context)
-	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff())
+	clusterState := clusterstate.NewClusterStateRegistry(&context, clusterstate.ClusterStateRegistryConfig{}, NewBackoff(), processors.NodeGroupConfigProcessor)
 	clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
 
 	suOrchestrator := New()
@@ -1059,7 +1058,7 @@ func TestAuthError(t *testing.T) {
 	nodeGroup.On("IncreaseSize", 0).Return(errors.NewAutoscalerError(errors.AutoscalerErrorType("abcd"), ""))
 
 	processors := NewTestProcessors(&context)
-	clusterStateRegistry := clusterstate.NewClusterStateRegistry(nil, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff())
+	clusterStateRegistry := clusterstate.NewClusterStateRegistry(&context, clusterstate.ClusterStateRegistryConfig{}, NewBackoff(), processors.NodeGroupConfigProcessor)
 	suOrchestrator := New()
 	suOrchestrator.Initialize(&context, processors, clusterStateRegistry, nil)
 	scaleUpOrchestrator := suOrchestrator.(*ScaleUpOrchestrator)
diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go
index b8bae788ce72..bd34073383a3 100644
--- a/cluster-autoscaler/core/static_autoscaler.go
+++ b/cluster-autoscaler/core/static_autoscaler.go
@@ -158,7 +158,6 @@ func NewStaticAutoscaler(
 	clusterStateConfig := clusterstate.ClusterStateRegistryConfig{
 		MaxTotalUnreadyPercentage: opts.MaxTotalUnreadyPercentage,
 		OkTotalUnreadyCount: opts.OkTotalUnreadyCount,
-		MaxNodeProvisionTime: opts.MaxNodeProvisionTime,
 	}
 
 	ignoredTaints := make(taints.TaintKeySet)
@@ -167,7 +166,7 @@ func NewStaticAutoscaler(
 		ignoredTaints[taintKey] = true
 	}
 
-	clusterStateRegistry := clusterstate.NewClusterStateRegistry(autoscalingContext.CloudProvider, clusterStateConfig, autoscalingContext.LogRecorder, backoff)
+	clusterStateRegistry := clusterstate.NewClusterStateRegistry(autoscalingContext, clusterStateConfig, backoff, processors.NodeGroupConfigProcessor)
 	processors.ScaleDownCandidatesNotifier.Register(clusterStateRegistry)
 
 	deleteOptions := simulator.NodeDeleteOptions{
@@ -712,7 +711,11 @@ func fixNodeGroupSize(context *context.AutoscalingContext, clusterStateRegistry
 		if incorrectSize == nil {
 			continue
 		}
-		if incorrectSize.FirstObserved.Add(context.MaxNodeProvisionTime).Before(currentTime) {
+		maxNodeProvisionTime, err := clusterStateRegistry.MaxNodeProvisionTime(context, nodeGroup)
+		if err != nil {
+			return false, fmt.Errorf("failed to retrieve maxNodeProvisionTime for nodeGroup %s: %w", nodeGroup.Id(), err)
+		}
+		if incorrectSize.FirstObserved.Add(maxNodeProvisionTime).Before(currentTime) {
 			delta := incorrectSize.CurrentSize - incorrectSize.ExpectedSize
 			if delta < 0 {
 				klog.V(0).Infof("Decreasing size of %s, expected=%d current=%d delta=%d", nodeGroup.Id(),
@@ -734,17 +737,23 @@ func removeOldUnregisteredNodes(unregisteredNodes []clusterstate.UnregisteredNod
 	csr *clusterstate.ClusterStateRegistry, currentTime time.Time, logRecorder *utils.LogEventRecorder) (bool, error) {
 	removedAny := false
 	for _, unregisteredNode := range unregisteredNodes {
-		if unregisteredNode.UnregisteredSince.Add(context.MaxNodeProvisionTime).Before(currentTime) {
+		nodeGroup, err := context.CloudProvider.NodeGroupForNode(unregisteredNode.Node)
+		if err != nil {
+			klog.Warningf("Failed to get node group for %s: %v", unregisteredNode.Node.Name, err)
+			return removedAny, err
+		}
+		if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
+			klog.Warningf("No node group for node %s, skipping", unregisteredNode.Node.Name)
+			continue
+		}
+
+		maxNodeProvisionTime, err := csr.MaxNodeProvisionTime(context, nodeGroup)
+		if err != nil {
+			return removedAny, fmt.Errorf("failed to retrieve maxNodeProvisionTime for node %s in nodeGroup %s: %w", unregisteredNode.Node.Name, nodeGroup.Id(), err)
+		}
+
+		if unregisteredNode.UnregisteredSince.Add(maxNodeProvisionTime).Before(currentTime) {
 			klog.V(0).Infof("Removing unregistered node %v", unregisteredNode.Node.Name)
-			nodeGroup, err := context.CloudProvider.NodeGroupForNode(unregisteredNode.Node)
-			if err != nil {
-				klog.Warningf("Failed to get node group for %s: %v", unregisteredNode.Node.Name, err)
-				return removedAny, err
-			}
-			if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
-				klog.Warningf("No node group for node %s, skipping", unregisteredNode.Node.Name)
-				continue
-			}
 			size, err := nodeGroup.TargetSize()
 			if err != nil {
 				klog.Warningf("Failed to get node group size; unregisteredNode=%v; nodeGroup=%v; err=%v", unregisteredNode.Node.Name, nodeGroup.Id(), err)
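With the rewritten `removeOldUnregisteredNodes`, the node's group is resolved first and that group's own deadline is applied, so two node groups in the same cluster can now age out stuck instances at different rates. The core predicate is small enough to state on its own; a runnable sketch (function and variable names here are illustrative, not taken from this PR):

```go
package main

import (
	"fmt"
	"time"
)

// shouldRemoveUnregistered reports whether a node that never registered with
// the API server has exceeded its node group's MaxNodeProvisionTime and is
// therefore eligible for removal.
func shouldRemoveUnregistered(unregisteredSince time.Time, maxNodeProvisionTime time.Duration, now time.Time) bool {
	return unregisteredSince.Add(maxNodeProvisionTime).Before(now)
}

func main() {
	now := time.Now()
	// A group keeping the 15m default tolerates a 10m-old unregistered node...
	fmt.Println(shouldRemoveUnregistered(now.Add(-10*time.Minute), 15*time.Minute, now)) // false
	// ...while a group overriding the limit down to 5m would drop it.
	fmt.Println(shouldRemoveUnregistered(now.Add(-10*time.Minute), 5*time.Minute, now)) // true
}
```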
suOrchestrator := orchestrator.New() suOrchestrator.Initialize(&context, processors, clusterState, nil) @@ -418,6 +418,7 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) { ScaleDownUnneededTime: time.Minute, ScaleDownUnreadyTime: time.Minute, ScaleDownUtilizationThreshold: 0.5, + MaxNodeProvisionTime: 10 * time.Second, }, EstimatorName: estimator.BinpackingEstimatorName, ScaleDownEnabled: true, @@ -444,10 +445,9 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) { context.ListerRegistry = listerRegistry clusterStateConfig := clusterstate.ClusterStateRegistryConfig{ - OkTotalUnreadyCount: 0, - MaxNodeProvisionTime: 10 * time.Second, + OkTotalUnreadyCount: 0, } - clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff()) + clusterState := clusterstate.NewClusterStateRegistry(&context, clusterStateConfig, NewBackoff(), processors.NodeGroupConfigProcessor) sdPlanner, sdActuator := newScaleDownPlannerAndActuator(t, &context, processors, clusterState) suOrchestrator := orchestrator.New() @@ -568,13 +568,13 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) { ScaleDownUnneededTime: time.Minute, ScaleDownUnreadyTime: time.Minute, ScaleDownUtilizationThreshold: 0.5, + MaxNodeProvisionTime: 10 * time.Second, }, - EstimatorName: estimator.BinpackingEstimatorName, - ScaleDownEnabled: true, - MaxNodesTotal: 10, - MaxCoresTotal: 10, - MaxMemoryTotal: 100000, - MaxNodeProvisionTime: 10 * time.Second, + EstimatorName: estimator.BinpackingEstimatorName, + ScaleDownEnabled: true, + MaxNodesTotal: 10, + MaxCoresTotal: 10, + MaxMemoryTotal: 100000, } processorCallbacks := newStaticAutoscalerProcessorCallbacks() @@ -589,10 +589,10 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) { context.ListerRegistry = listerRegistry clusterStateConfig := clusterstate.ClusterStateRegistryConfig{ - OkTotalUnreadyCount: 1, - MaxNodeProvisionTime: 10 * time.Second, + OkTotalUnreadyCount: 1, } - clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff()) + processors := NewTestProcessors(&context) + clusterState := clusterstate.NewClusterStateRegistry(&context, clusterStateConfig, NewBackoff(), processors.NodeGroupConfigProcessor) // broken node detected as unregistered nodes := []*apiv1.Node{n1} @@ -602,8 +602,6 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) { // broken node failed to register in time clusterState.UpdateNodes(nodes, nil, later) - processors := NewTestProcessors(&context) - sdPlanner, sdActuator := newScaleDownPlannerAndActuator(t, &context, processors, clusterState) suOrchestrator := orchestrator.New() suOrchestrator.Initialize(&context, processors, clusterState, nil) @@ -727,6 +725,7 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) { ScaleDownUnneededTime: time.Minute, ScaleDownUtilizationThreshold: 0.5, ScaleDownUnreadyTime: time.Minute, + MaxNodeProvisionTime: 10 * time.Second, }, EstimatorName: estimator.BinpackingEstimatorName, ScaleDownEnabled: true, @@ -749,12 +748,11 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) { context.ListerRegistry = listerRegistry clusterStateConfig := clusterstate.ClusterStateRegistryConfig{ - OkTotalUnreadyCount: 1, - MaxNodeProvisionTime: 10 * time.Second, + OkTotalUnreadyCount: 1, } processors := NewTestProcessors(&context) - clusterState := clusterstate.NewClusterStateRegistry(provider, 
clusterStateConfig, context.LogRecorder, NewBackoff()) + clusterState := clusterstate.NewClusterStateRegistry(&context, clusterStateConfig, NewBackoff(), processors.NodeGroupConfigProcessor) sdPlanner, sdActuator := newScaleDownPlannerAndActuator(t, &context, processors, clusterState) suOrchestrator := orchestrator.New() suOrchestrator.Initialize(&context, processors, clusterState, nil) @@ -863,6 +861,7 @@ func TestStaticAutoscalerRunOnceWithFilteringOnBinPackingEstimator(t *testing.T) options := config.AutoscalingOptions{ NodeGroupDefaults: config.NodeGroupAutoscalingOptions{ ScaleDownUtilizationThreshold: 0.5, + MaxNodeProvisionTime: 10 * time.Second, }, EstimatorName: estimator.BinpackingEstimatorName, ScaleDownEnabled: false, @@ -884,12 +883,11 @@ func TestStaticAutoscalerRunOnceWithFilteringOnBinPackingEstimator(t *testing.T) context.ListerRegistry = listerRegistry clusterStateConfig := clusterstate.ClusterStateRegistryConfig{ - OkTotalUnreadyCount: 1, - MaxNodeProvisionTime: 10 * time.Second, + OkTotalUnreadyCount: 1, } processors := NewTestProcessors(&context) - clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff()) + clusterState := clusterstate.NewClusterStateRegistry(&context, clusterStateConfig, NewBackoff(), processors.NodeGroupConfigProcessor) sdPlanner, sdActuator := newScaleDownPlannerAndActuator(t, &context, processors, clusterState) autoscaler := &StaticAutoscaler{ @@ -963,6 +961,7 @@ func TestStaticAutoscalerRunOnceWithFilteringOnUpcomingNodesEnabledNoScaleUp(t * options := config.AutoscalingOptions{ NodeGroupDefaults: config.NodeGroupAutoscalingOptions{ ScaleDownUtilizationThreshold: 0.5, + MaxNodeProvisionTime: 10 * time.Second, }, EstimatorName: estimator.BinpackingEstimatorName, ScaleDownEnabled: false, @@ -984,12 +983,11 @@ func TestStaticAutoscalerRunOnceWithFilteringOnUpcomingNodesEnabledNoScaleUp(t * context.ListerRegistry = listerRegistry clusterStateConfig := clusterstate.ClusterStateRegistryConfig{ - OkTotalUnreadyCount: 1, - MaxNodeProvisionTime: 10 * time.Second, + OkTotalUnreadyCount: 1, } processors := NewTestProcessors(&context) - clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff()) + clusterState := clusterstate.NewClusterStateRegistry(&context, clusterStateConfig, NewBackoff(), processors.NodeGroupConfigProcessor) sdPlanner, sdActuator := newScaleDownPlannerAndActuator(t, &context, processors, clusterState) autoscaler := &StaticAutoscaler{ @@ -1028,6 +1026,7 @@ func TestStaticAutoscalerInstanceCreationErrors(t *testing.T) { ScaleDownUnneededTime: time.Minute, ScaleDownUnreadyTime: time.Minute, ScaleDownUtilizationThreshold: 0.5, + MaxNodeProvisionTime: 10 * time.Second, }, EstimatorName: estimator.BinpackingEstimatorName, ScaleDownEnabled: true, @@ -1042,11 +1041,10 @@ func TestStaticAutoscalerInstanceCreationErrors(t *testing.T) { assert.NoError(t, err) clusterStateConfig := clusterstate.ClusterStateRegistryConfig{ - OkTotalUnreadyCount: 1, - MaxNodeProvisionTime: 10 * time.Second, + OkTotalUnreadyCount: 1, } - clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff()) + clusterState := clusterstate.NewClusterStateRegistry(&context, clusterStateConfig, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor()) autoscaler := &StaticAutoscaler{ AutoscalingContext: &context, clusterStateRegistry: clusterState, @@ -1064,6 +1062,7 @@ func 
TestStaticAutoscalerInstanceCreationErrors(t *testing.T) { nodeGroupA.On("TargetSize").Return(5, nil) nodeGroupA.On("Id").Return("A") nodeGroupA.On("DeleteNodes", mock.Anything).Return(nil) + nodeGroupA.On("GetOptions", options.NodeGroupDefaults).Return(&options.NodeGroupDefaults, nil) nodeGroupA.On("Nodes").Return([]cloudprovider.Instance{ { Id: "A1", @@ -1124,6 +1123,7 @@ func TestStaticAutoscalerInstanceCreationErrors(t *testing.T) { nodeGroupB.On("TargetSize").Return(5, nil) nodeGroupB.On("Id").Return("B") nodeGroupB.On("DeleteNodes", mock.Anything).Return(nil) + nodeGroupB.On("GetOptions", options.NodeGroupDefaults).Return(&options.NodeGroupDefaults, nil) nodeGroupB.On("Nodes").Return([]cloudprovider.Instance{ { Id: "B1", @@ -1261,6 +1261,7 @@ func TestStaticAutoscalerInstanceCreationErrors(t *testing.T) { nodeGroupC.On("TargetSize").Return(1, nil) nodeGroupC.On("Id").Return("C") nodeGroupC.On("DeleteNodes", mock.Anything).Return(nil) + nodeGroupC.On("GetOptions", options.NodeGroupDefaults).Return(&options.NodeGroupDefaults, nil) nodeGroupC.On("Nodes").Return([]cloudprovider.Instance{ { Id: "C1", @@ -1280,8 +1281,8 @@ func TestStaticAutoscalerInstanceCreationErrors(t *testing.T) { func(node *apiv1.Node) bool { return false }, nil) - - clusterState = clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff()) + context.CloudProvider = provider + clusterState = clusterstate.NewClusterStateRegistry(&context, clusterStateConfig, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor()) clusterState.RefreshCloudProviderNodeInstancesCache() autoscaler.clusterStateRegistry = clusterState @@ -1409,9 +1410,10 @@ func TestStaticAutoscalerUpcomingScaleDownCandidates(t *testing.T) { ctx, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listerRegistry, provider, processorCallbacks, nil) assert.NoError(t, err) + processors := NewTestProcessors(&ctx) // Create CSR with unhealthy cluster protection effectively disabled, to guarantee we reach the tested logic. csrConfig := clusterstate.ClusterStateRegistryConfig{OkTotalUnreadyCount: nodeGroupCount * unreadyNodesCount} - csr := clusterstate.NewClusterStateRegistry(provider, csrConfig, ctx.LogRecorder, NewBackoff()) + csr := clusterstate.NewClusterStateRegistry(&ctx, csrConfig, NewBackoff(), processors.NodeGroupConfigProcessor) // Setting the Actuator is necessary for testing any scale-down logic, it shouldn't have anything to do in this test. 
@@ -1409,9 +1410,10 @@ func TestStaticAutoscalerUpcomingScaleDownCandidates(t *testing.T) {
 	ctx, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listerRegistry, provider, processorCallbacks, nil)
 	assert.NoError(t, err)

+	processors := NewTestProcessors(&ctx)
 	// Create CSR with unhealthy cluster protection effectively disabled, to guarantee we reach the tested logic.
 	csrConfig := clusterstate.ClusterStateRegistryConfig{OkTotalUnreadyCount: nodeGroupCount * unreadyNodesCount}
-	csr := clusterstate.NewClusterStateRegistry(provider, csrConfig, ctx.LogRecorder, NewBackoff())
+	csr := clusterstate.NewClusterStateRegistry(&ctx, csrConfig, NewBackoff(), processors.NodeGroupConfigProcessor)

 	// Setting the Actuator is necessary for testing any scale-down logic, it shouldn't have anything to do in this test.
 	actuator := actuation.NewActuator(&ctx, csr, deletiontracker.NewNodeDeletionTracker(0*time.Second), simulator.NodeDeleteOptions{})
@@ -1425,7 +1427,7 @@ func TestStaticAutoscalerUpcomingScaleDownCandidates(t *testing.T) {
 		clusterStateRegistry: csr,
 		scaleDownActuator:    actuator,
 		scaleDownPlanner:     planner,
-		processors:           NewTestProcessors(&ctx),
+		processors:           processors,
 		processorCallbacks:   processorCallbacks,
 	}
@@ -1495,20 +1497,26 @@ func TestRemoveFixNodeTargetSize(t *testing.T) {
 	fakeClient := &fake.Clientset{}
 	fakeLogRecorder, _ := clusterstate_utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false, "my-cool-configmap")
-	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{
-		MaxTotalUnreadyPercentage: 10,
-		OkTotalUnreadyCount:       1,
-	}, fakeLogRecorder, NewBackoff())
-	err := clusterState.UpdateNodes([]*apiv1.Node{ng1_1}, nil, now.Add(-time.Hour))
-	assert.NoError(t, err)
 	context := &context.AutoscalingContext{
 		AutoscalingOptions: config.AutoscalingOptions{
-			MaxNodeProvisionTime: 45 * time.Minute,
+			NodeGroupDefaults: config.NodeGroupAutoscalingOptions{
+				MaxNodeProvisionTime: 45 * time.Minute,
+			},
+		},
+		AutoscalingKubeClients: context.AutoscalingKubeClients{
+			LogRecorder: fakeLogRecorder,
 		},
 		CloudProvider: provider,
 	}

+	clusterState := clusterstate.NewClusterStateRegistry(context, clusterstate.ClusterStateRegistryConfig{
+		MaxTotalUnreadyPercentage: 10,
+		OkTotalUnreadyCount:       1,
+	}, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor())
+	err := clusterState.UpdateNodes([]*apiv1.Node{ng1_1}, nil, now.Add(-time.Hour))
+	assert.NoError(t, err)
+
 	// Nothing should be fixed. The incorrect size state is not old enough.
 	removed, err := fixNodeGroupSize(context, clusterState, now.Add(-50*time.Minute))
 	assert.NoError(t, err)
@@ -1541,19 +1549,26 @@ func TestRemoveOldUnregisteredNodes(t *testing.T) {
 	fakeClient := &fake.Clientset{}
 	fakeLogRecorder, _ := clusterstate_utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false, "my-cool-configmap")
-	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{
-		MaxTotalUnreadyPercentage: 10,
-		OkTotalUnreadyCount:       1,
-	}, fakeLogRecorder, NewBackoff())
-	err := clusterState.UpdateNodes([]*apiv1.Node{ng1_1}, nil, now.Add(-time.Hour))
-	assert.NoError(t, err)
 	context := &context.AutoscalingContext{
 		AutoscalingOptions: config.AutoscalingOptions{
-			MaxNodeProvisionTime: 45 * time.Minute,
+			NodeGroupDefaults: config.NodeGroupAutoscalingOptions{
+				MaxNodeProvisionTime: 45 * time.Minute,
+			},
 		},
 		CloudProvider: provider,
+		AutoscalingKubeClients: context.AutoscalingKubeClients{
+			LogRecorder: fakeLogRecorder,
+		},
 	}
+
+	clusterState := clusterstate.NewClusterStateRegistry(context, clusterstate.ClusterStateRegistryConfig{
+		MaxTotalUnreadyPercentage: 10,
+		OkTotalUnreadyCount:       1,
+	}, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor())
+	err := clusterState.UpdateNodes([]*apiv1.Node{ng1_1}, nil, now.Add(-time.Hour))
+	assert.NoError(t, err)
+
 	unregisteredNodes := clusterState.GetUnregisteredNodes()
 	assert.Equal(t, 1, len(unregisteredNodes))
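On the consumer side, code in clusterstate that previously read a fixed config field now asks the provider interface per node group; roughly (a sketch using the csr.context and csr.nodeRegistrationTimeLimitProvider fields this patch adds to ClusterStateRegistry; startTime and the deadline check are illustrative, not lifted from the patch):

    // Sketch: resolving the effective provision timeout for one node group.
    maxProvisionTime, err := csr.nodeRegistrationTimeLimitProvider.GetMaxNodeProvisionTime(csr.context, nodeGroup)
    if err != nil {
        return fmt.Errorf("failed to get MaxNodeProvisionTime for node group %s: %v", nodeGroup.Id(), err)
    }
    if currentTime.After(startTime.Add(maxProvisionTime)) {
        // startTime: when CA first saw the upcoming node. Past the deadline,
        // the node counts as long-unregistered.
    }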
diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go
index b46b927333d0..160cb16da2ae 100644
--- a/cluster-autoscaler/main.go
+++ b/cluster-autoscaler/main.go
@@ -148,7 +148,7 @@ var (
 	maxTotalUnreadyPercentage = flag.Float64("max-total-unready-percentage", 45, "Maximum percentage of unready nodes in the cluster. After this is exceeded, CA halts operations")
 	okTotalUnreadyCount       = flag.Int("ok-total-unready-count", 3, "Number of allowed unready nodes, irrespective of max-total-unready-percentage")
 	scaleUpFromZero           = flag.Bool("scale-up-from-zero", true, "Should CA scale up when there are 0 ready nodes.")
-	maxNodeProvisionTime      = flag.Duration("max-node-provision-time", 15*time.Minute, "Maximum time CA waits for node to be provisioned")
+	maxNodeProvisionTime      = flag.Duration("max-node-provision-time", 15*time.Minute, "The default maximum time CA waits for node to be provisioned")
 	maxPodEvictionTime        = flag.Duration("max-pod-eviction-time", 2*time.Minute, "Maximum time CA tries to evict a pod before giving up")
 	nodeGroupsFlag            = multiStringFlag(
 		"nodes",
@@ -253,6 +253,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
 			ScaleDownGpuUtilizationThreshold: *scaleDownGpuUtilizationThreshold,
 			ScaleDownUnneededTime:            *scaleDownUnneededTime,
 			ScaleDownUnreadyTime:             *scaleDownUnreadyTime,
+			MaxNodeProvisionTime:             *maxNodeProvisionTime,
 		},
 		CloudConfig:       *cloudConfig,
 		CloudProviderName: *cloudProviderFlag,
@@ -270,7 +271,6 @@ func createAutoscalingOptions() config.AutoscalingOptions {
 		MaxBulkSoftTaintTime:      *maxBulkSoftTaintTime,
 		MaxEmptyBulkDelete:        *maxEmptyBulkDeleteFlag,
 		MaxGracefulTerminationSec: *maxGracefulTerminationFlag,
-		MaxNodeProvisionTime:      *maxNodeProvisionTime,
 		MaxPodEvictionTime:        *maxPodEvictionTime,
 		MaxNodesTotal:             *maxNodesTotal,
 		MaxCoresTotal:             maxCoresTotal,
diff --git a/cluster-autoscaler/processors/nodegroupconfig/node_group_config_processor.go b/cluster-autoscaler/processors/nodegroupconfig/node_group_config_processor.go
index 74ce0efbce68..5d5c80aed3ac 100644
--- a/cluster-autoscaler/processors/nodegroupconfig/node_group_config_processor.go
+++ b/cluster-autoscaler/processors/nodegroupconfig/node_group_config_processor.go
@@ -33,6 +33,8 @@ type NodeGroupConfigProcessor interface {
 	GetScaleDownUtilizationThreshold(context *context.AutoscalingContext, nodeGroup cloudprovider.NodeGroup) (float64, error)
 	// GetScaleDownGpuUtilizationThreshold returns ScaleDownGpuUtilizationThreshold value that should be used for a given NodeGroup.
 	GetScaleDownGpuUtilizationThreshold(context *context.AutoscalingContext, nodeGroup cloudprovider.NodeGroup) (float64, error)
+	// GetMaxNodeProvisionTime returns MaxNodeProvisionTime value that should be used for a given NodeGroup.
+	GetMaxNodeProvisionTime(context *context.AutoscalingContext, nodeGroup cloudprovider.NodeGroup) (time.Duration, error)
 	// CleanUp cleans up processor's internal structures.
 	CleanUp()
 }
@@ -91,6 +93,18 @@ func (p *DelegatingNodeGroupConfigProcessor) GetScaleDownGpuUtilizationThreshold
 	return ngConfig.ScaleDownGpuUtilizationThreshold, nil
 }

+// GetMaxNodeProvisionTime returns MaxNodeProvisionTime value that should be used for a given NodeGroup.
+func (p *DelegatingNodeGroupConfigProcessor) GetMaxNodeProvisionTime(context *context.AutoscalingContext, nodeGroup cloudprovider.NodeGroup) (time.Duration, error) {
+	ngConfig, err := nodeGroup.GetOptions(context.NodeGroupDefaults)
+	if err != nil && err != cloudprovider.ErrNotImplemented {
+		return time.Duration(0), err
+	}
+	if ngConfig == nil || err == cloudprovider.ErrNotImplemented {
+		return context.NodeGroupDefaults.MaxNodeProvisionTime, nil
+	}
+	return ngConfig.MaxNodeProvisionTime, nil
+}
+
 // CleanUp cleans up processor's internal structures.
 func (p *DelegatingNodeGroupConfigProcessor) CleanUp() {
 }
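A cloud provider opts a node group into a non-default timeout by returning an override from GetOptions; a hedged sketch (myNodeGroup is illustrative and not part of this patch, but the GetOptions signature is the real cloudprovider.NodeGroup one):

    // Sketch: a provider-side per-group override. Returning
    // (nil, cloudprovider.ErrNotImplemented) instead would make the delegating
    // processor above fall back to context.NodeGroupDefaults.MaxNodeProvisionTime.
    func (ng *myNodeGroup) GetOptions(defaults config.NodeGroupAutoscalingOptions) (*config.NodeGroupAutoscalingOptions, error) {
        opts := defaults                              // start from cluster-wide defaults
        opts.MaxNodeProvisionTime = 60 * time.Minute  // e.g. slow-booting instances
        return &opts, nil
    }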
diff --git a/cluster-autoscaler/processors/nodegroupconfig/node_group_config_processor_test.go b/cluster-autoscaler/processors/nodegroupconfig/node_group_config_processor_test.go
index aabb1838ceed..4046424afd58 100644
--- a/cluster-autoscaler/processors/nodegroupconfig/node_group_config_processor_test.go
+++ b/cluster-autoscaler/processors/nodegroupconfig/node_group_config_processor_test.go
@@ -48,12 +48,14 @@ func TestDelegatingNodeGroupConfigProcessor(t *testing.T) {
 		ScaleDownUnreadyTime:             4 * time.Minute,
 		ScaleDownGpuUtilizationThreshold: 0.6,
 		ScaleDownUtilizationThreshold:    0.5,
+		MaxNodeProvisionTime:             15 * time.Minute,
 	}
 	ngOpts := &config.NodeGroupAutoscalingOptions{
 		ScaleDownUnneededTime:            10 * time.Minute,
 		ScaleDownUnreadyTime:             11 * time.Minute,
 		ScaleDownGpuUtilizationThreshold: 0.85,
 		ScaleDownUtilizationThreshold:    0.75,
+		MaxNodeProvisionTime:             60 * time.Minute,
 	}

 	testUnneededTime := func(t *testing.T, p DelegatingNodeGroupConfigProcessor, c *context.AutoscalingContext, ng cloudprovider.NodeGroup, w Want, we error) {
@@ -96,17 +98,29 @@ func TestDelegatingNodeGroupConfigProcessor(t *testing.T) {
 		}
 		assert.Equal(t, res, results[w])
 	}
+	testMaxNodeProvisionTime := func(t *testing.T, p DelegatingNodeGroupConfigProcessor, c *context.AutoscalingContext, ng cloudprovider.NodeGroup, w Want, we error) {
+		res, err := p.GetMaxNodeProvisionTime(c, ng)
+		assert.Equal(t, err, we)
+		results := map[Want]time.Duration{
+			NIL:    time.Duration(0),
+			GLOBAL: 15 * time.Minute,
+			NG:     60 * time.Minute,
+		}
+		assert.Equal(t, res, results[w])
+	}

 	funcs := map[string]func(*testing.T, DelegatingNodeGroupConfigProcessor, *context.AutoscalingContext, cloudprovider.NodeGroup, Want, error){
 		"ScaleDownUnneededTime":            testUnneededTime,
 		"ScaleDownUnreadyTime":             testUnreadyTime,
 		"ScaleDownUtilizationThreshold":    testUtilizationThreshold,
 		"ScaleDownGpuUtilizationThreshold": testGpuThreshold,
+		"MaxNodeProvisionTime":             testMaxNodeProvisionTime,
 		"MultipleOptions": func(t *testing.T, p DelegatingNodeGroupConfigProcessor, c *context.AutoscalingContext, ng cloudprovider.NodeGroup, w Want, we error) {
 			testUnneededTime(t, p, c, ng, w, we)
 			testUnreadyTime(t, p, c, ng, w, we)
 			testUtilizationThreshold(t, p, c, ng, w, we)
 			testGpuThreshold(t, p, c, ng, w, we)
+			testMaxNodeProvisionTime(t, p, c, ng, w, we)
 		},
 		"RepeatingTheSameCallGivesConsistentResults": func(t *testing.T, p DelegatingNodeGroupConfigProcessor, c *context.AutoscalingContext, ng cloudprovider.NodeGroup, w Want, we error) {
 			testUnneededTime(t, p, c, ng, w, we)
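Taken together, the resolution order this test exercises is: the per-group GetOptions value when present, otherwise the NodeGroupDefaults seeded from --max-node-provision-time. A compact usage sketch (the two node-group arguments are assumed implementations: the first returns cloudprovider.ErrNotImplemented from GetOptions, the second returns an explicit 60m option; assumes the fmt, context, cloudprovider, and nodegroupconfig imports used throughout this patch):

    // Sketch: observing the fallback rules of the delegating processor.
    func resolveTimeouts(ctx *context.AutoscalingContext, ngDefaultOnly, ngWithOverride cloudprovider.NodeGroup) {
        p := nodegroupconfig.NewDefaultNodeGroupConfigProcessor()
        global, _ := p.GetMaxNodeProvisionTime(ctx, ngDefaultOnly)    // falls back to NodeGroupDefaults, e.g. 15m
        perGroup, _ := p.GetMaxNodeProvisionTime(ctx, ngWithOverride) // honors the override, e.g. 60m
        fmt.Println(global, perGroup)
    }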