From 85a83b62bdd19826b20bf9dbb90bffb3005f8346 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Osipiuk?= Date: Wed, 2 Jan 2019 16:16:45 +0100 Subject: [PATCH] Pass nodeGroup->NodeInfo map to ClusterStateRegistry Change-Id: Ie2a51694b5731b39c8a4135355a3b4c832c26801 --- .../clusterstate/clusterstate.go | 5 +- .../clusterstate/clusterstate_test.go | 46 ++++++++--------- cluster-autoscaler/core/scale_up.go | 7 +-- cluster-autoscaler/core/scale_up_test.go | 39 ++++++++++----- cluster-autoscaler/core/static_autoscaler.go | 28 +++++++---- .../core/static_autoscaler_test.go | 50 +++++++++++-------- cluster-autoscaler/core/utils_test.go | 4 +- 7 files changed, 102 insertions(+), 77 deletions(-) diff --git a/cluster-autoscaler/clusterstate/clusterstate.go b/cluster-autoscaler/clusterstate/clusterstate.go index dc3017092549..0ebfdee02657 100644 --- a/cluster-autoscaler/clusterstate/clusterstate.go +++ b/cluster-autoscaler/clusterstate/clusterstate.go @@ -34,6 +34,7 @@ import ( apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" + schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" "k8s.io/klog" ) @@ -119,6 +120,7 @@ type ClusterStateRegistry struct { scaleUpRequests map[string]*ScaleUpRequest // nodeGroupName -> ScaleUpRequest scaleDownRequests []*ScaleDownRequest nodes []*apiv1.Node + nodeInfosForGroups map[string]*schedulercache.NodeInfo cloudProvider cloudprovider.CloudProvider perNodeGroupReadiness map[string]Readiness totalReadiness Readiness @@ -267,7 +269,7 @@ func (csr *ClusterStateRegistry) registerFailedScaleUpNoLock(nodeGroup cloudprov } // UpdateNodes updates the state of the nodes in the ClusterStateRegistry and recalculates the stats -func (csr *ClusterStateRegistry) UpdateNodes(nodes []*apiv1.Node, currentTime time.Time) error { +func (csr *ClusterStateRegistry) UpdateNodes(nodes []*apiv1.Node, nodeInfosForGroups map[string]*schedulercache.NodeInfo, currentTime time.Time) error { csr.updateNodeGroupMetrics() targetSizes, err := getTargetSizes(csr.cloudProvider) if err != nil { @@ -284,6 +286,7 @@ func (csr *ClusterStateRegistry) UpdateNodes(nodes []*apiv1.Node, currentTime ti defer csr.Unlock() csr.nodes = nodes + csr.nodeInfosForGroups = nodeInfosForGroups csr.previousCloudProviderNodeInstances = csr.cloudProviderNodeInstances csr.cloudProviderNodeInstances = cloudProviderNodeInstances diff --git a/cluster-autoscaler/clusterstate/clusterstate_test.go b/cluster-autoscaler/clusterstate/clusterstate_test.go index abcbdb537632..7d47c3388cff 100644 --- a/cluster-autoscaler/clusterstate/clusterstate_test.go +++ b/cluster-autoscaler/clusterstate/clusterstate_test.go @@ -57,7 +57,7 @@ func TestOKWithScaleUp(t *testing.T) { MaxNodeProvisionTime: time.Minute, }, fakeLogRecorder, newBackoff()) clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 4, time.Now()) - err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, now) + err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now) assert.NoError(t, err) assert.True(t, clusterstate.IsClusterHealthy()) @@ -97,7 +97,7 @@ func TestEmptyOK(t *testing.T) { OkTotalUnreadyCount: 1, MaxNodeProvisionTime: time.Minute, }, fakeLogRecorder, newBackoff()) - err := clusterstate.UpdateNodes([]*apiv1.Node{}, now.Add(-5*time.Second)) + err := clusterstate.UpdateNodes([]*apiv1.Node{}, nil, now.Add(-5*time.Second)) assert.NoError(t, err) assert.True(t, clusterstate.IsClusterHealthy()) assert.True(t, clusterstate.IsNodeGroupHealthy("ng1")) @@ -107,7 +107,7 @@ func 
TestEmptyOK(t *testing.T) { clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 3, now.Add(-3*time.Second)) // clusterstate.scaleUpRequests["ng1"].Time = now.Add(-3 * time.Second) // clusterstate.scaleUpRequests["ng1"].ExpectedAddTime = now.Add(1 * time.Minute) - err = clusterstate.UpdateNodes([]*apiv1.Node{}, now) + err = clusterstate.UpdateNodes([]*apiv1.Node{}, nil, now) assert.NoError(t, err) assert.True(t, clusterstate.IsClusterHealthy()) @@ -136,7 +136,7 @@ func TestOKOneUnreadyNode(t *testing.T) { MaxTotalUnreadyPercentage: 10, OkTotalUnreadyCount: 1, }, fakeLogRecorder, newBackoff()) - err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, now) + err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now) assert.NoError(t, err) assert.True(t, clusterstate.IsClusterHealthy()) assert.True(t, clusterstate.IsNodeGroupHealthy("ng1")) @@ -173,7 +173,7 @@ func TestNodeWithoutNodeGroupDontCrash(t *testing.T) { MaxTotalUnreadyPercentage: 10, OkTotalUnreadyCount: 1, }, fakeLogRecorder, newBackoff()) - err := clusterstate.UpdateNodes([]*apiv1.Node{noNgNode}, now) + err := clusterstate.UpdateNodes([]*apiv1.Node{noNgNode}, nil, now) assert.NoError(t, err) clusterstate.UpdateScaleDownCandidates([]*apiv1.Node{noNgNode}, now) } @@ -199,7 +199,7 @@ func TestOKOneUnreadyNodeWithScaleDownCandidate(t *testing.T) { MaxTotalUnreadyPercentage: 10, OkTotalUnreadyCount: 1, }, fakeLogRecorder, newBackoff()) - err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, now) + err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now) clusterstate.UpdateScaleDownCandidates([]*apiv1.Node{ng1_1}, now) assert.NoError(t, err) @@ -263,7 +263,7 @@ func TestMissingNodes(t *testing.T) { MaxTotalUnreadyPercentage: 10, OkTotalUnreadyCount: 1, }, fakeLogRecorder, newBackoff()) - err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, now) + err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now) assert.NoError(t, err) assert.True(t, clusterstate.IsClusterHealthy()) assert.False(t, clusterstate.IsNodeGroupHealthy("ng1")) @@ -304,7 +304,7 @@ func TestTooManyUnready(t *testing.T) { MaxTotalUnreadyPercentage: 10, OkTotalUnreadyCount: 1, }, fakeLogRecorder, newBackoff()) - err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, now) + err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now) assert.NoError(t, err) assert.False(t, clusterstate.IsClusterHealthy()) assert.True(t, clusterstate.IsNodeGroupHealthy("ng1")) @@ -329,7 +329,7 @@ func TestExpiredScaleUp(t *testing.T) { MaxNodeProvisionTime: 2 * time.Minute, }, fakeLogRecorder, newBackoff()) clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 4, now.Add(-3*time.Minute)) - err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, now) + err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, nil, now) assert.NoError(t, err) assert.True(t, clusterstate.IsClusterHealthy()) assert.False(t, clusterstate.IsNodeGroupHealthy("ng1")) @@ -400,7 +400,7 @@ func TestUpcomingNodes(t *testing.T) { MaxTotalUnreadyPercentage: 10, OkTotalUnreadyCount: 1, }, fakeLogRecorder, newBackoff()) - err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1, ng3_1, ng4_1}, now) + err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1, ng3_1, ng4_1}, nil, now) assert.NoError(t, err) upcomingNodes := clusterstate.GetUpcomingNodes() @@ -423,19 +423,19 @@ func TestIncorrectSize(t *testing.T) { OkTotalUnreadyCount: 1, }, fakeLogRecorder, newBackoff()) now := time.Now() - 
clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, now.Add(-5*time.Minute)) + clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, nil, now.Add(-5*time.Minute)) incorrect := clusterstate.incorrectNodeGroupSizes["ng1"] assert.Equal(t, 5, incorrect.ExpectedSize) assert.Equal(t, 1, incorrect.CurrentSize) assert.Equal(t, now.Add(-5*time.Minute), incorrect.FirstObserved) - clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, now.Add(-4*time.Minute)) + clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, nil, now.Add(-4*time.Minute)) incorrect = clusterstate.incorrectNodeGroupSizes["ng1"] assert.Equal(t, 5, incorrect.ExpectedSize) assert.Equal(t, 1, incorrect.CurrentSize) assert.Equal(t, now.Add(-5*time.Minute), incorrect.FirstObserved) - clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_1}, now.Add(-3*time.Minute)) + clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_1}, nil, now.Add(-3*time.Minute)) incorrect = clusterstate.incorrectNodeGroupSizes["ng1"] assert.Equal(t, 5, incorrect.ExpectedSize) assert.Equal(t, 2, incorrect.CurrentSize) @@ -459,7 +459,7 @@ func TestUnregisteredNodes(t *testing.T) { OkTotalUnreadyCount: 1, MaxNodeProvisionTime: 10 * time.Second, }, fakeLogRecorder, newBackoff()) - err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, time.Now().Add(-time.Minute)) + err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, nil, time.Now().Add(-time.Minute)) assert.NoError(t, err) assert.Equal(t, 1, len(clusterstate.GetUnregisteredNodes())) @@ -469,14 +469,14 @@ func TestUnregisteredNodes(t *testing.T) { // The node didn't come up in MaxNodeProvisionTime, it should no longer be // counted as upcoming (but it is still an unregistered node) - err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, time.Now().Add(time.Minute)) + err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, nil, time.Now().Add(time.Minute)) assert.NoError(t, err) assert.Equal(t, 1, len(clusterstate.GetUnregisteredNodes())) assert.Equal(t, "ng1-2", clusterstate.GetUnregisteredNodes()[0].Node.Name) upcomingNodes = clusterstate.GetUpcomingNodes() assert.Equal(t, 0, len(upcomingNodes)) - err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_2}, time.Now().Add(time.Minute)) + err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_2}, nil, time.Now().Add(time.Minute)) assert.NoError(t, err) assert.Equal(t, 0, len(clusterstate.GetUnregisteredNodes())) } @@ -614,7 +614,7 @@ func TestScaleUpBackoff(t *testing.T) { // After failed scale-up, node group should be still healthy, but should backoff from scale-ups clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 1, now.Add(-180*time.Second)) - err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_2, ng1_3}, now) + err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_2, ng1_3}, nil, now) assert.NoError(t, err) assert.True(t, clusterstate.IsClusterHealthy()) assert.True(t, clusterstate.IsNodeGroupHealthy("ng1")) @@ -629,7 +629,7 @@ func TestScaleUpBackoff(t *testing.T) { // Another failed scale up should cause longer backoff clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 1, now.Add(-121*time.Second)) - err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_2, ng1_3}, now) + err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_2, ng1_3}, nil, now) assert.NoError(t, err) assert.True(t, clusterstate.IsClusterHealthy()) assert.True(t, clusterstate.IsNodeGroupHealthy("ng1")) @@ -643,7 +643,7 @@ func TestScaleUpBackoff(t *testing.T) { ng1_4 := BuildTestNode("ng1-4", 1000, 1000) SetNodeReadyState(ng1_4, true, now.Add(-1*time.Minute)) provider.AddNode("ng1", 
ng1_4) - err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_2, ng1_3, ng1_4}, now) + err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_2, ng1_3, ng1_4}, nil, now) assert.NoError(t, err) assert.True(t, clusterstate.IsClusterHealthy()) assert.True(t, clusterstate.IsNodeGroupHealthy("ng1")) @@ -674,20 +674,20 @@ func TestGetClusterSize(t *testing.T) { }, fakeLogRecorder, newBackoff()) // There are 2 actual nodes in 2 node groups with target sizes of 5 and 1. - clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, now) + clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now) currentSize, targetSize := clusterstate.GetClusterSize() assert.Equal(t, 2, currentSize) assert.Equal(t, 6, targetSize) // Current size should increase after a new node is added. - clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_1, ng2_1}, now.Add(time.Minute)) + clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_1, ng2_1}, nil, now.Add(time.Minute)) currentSize, targetSize = clusterstate.GetClusterSize() assert.Equal(t, 3, currentSize) assert.Equal(t, 6, targetSize) // Target size should increase after a new node group is added. provider.AddNodeGroup("ng3", 1, 10, 1) - clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_1, ng2_1}, now.Add(2*time.Minute)) + clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_1, ng2_1}, nil, now.Add(2*time.Minute)) currentSize, targetSize = clusterstate.GetClusterSize() assert.Equal(t, 3, currentSize) assert.Equal(t, 7, targetSize) @@ -696,7 +696,7 @@ func TestGetClusterSize(t *testing.T) { for _, ng := range provider.NodeGroups() { ng.(*testprovider.TestNodeGroup).SetTargetSize(10) } - clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_1, ng2_1}, now.Add(3*time.Minute)) + clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_1, ng2_1}, nil, now.Add(3*time.Minute)) currentSize, targetSize = clusterstate.GetClusterSize() assert.Equal(t, 3, currentSize) assert.Equal(t, 30, targetSize) diff --git a/cluster-autoscaler/core/scale_up.go b/cluster-autoscaler/core/scale_up.go index f46a059fb503..707285cb281b 100644 --- a/cluster-autoscaler/core/scale_up.go +++ b/cluster-autoscaler/core/scale_up.go @@ -244,7 +244,7 @@ var ( // false if it didn't and error if an error occurred. Assumes that all nodes in the cluster are // ready and in sync with instance groups. func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.AutoscalingProcessors, clusterStateRegistry *clusterstate.ClusterStateRegistry, unschedulablePods []*apiv1.Pod, - nodes []*apiv1.Node, daemonSets []*appsv1.DaemonSet) (*status.ScaleUpStatus, errors.AutoscalerError) { + nodes []*apiv1.Node, daemonSets []*appsv1.DaemonSet, nodeInfos map[string]*schedulercache.NodeInfo) (*status.ScaleUpStatus, errors.AutoscalerError) { // From now on we only care about unschedulable pods that were marked after the newest // node became available for the scheduler. 
if len(unschedulablePods) == 0 { @@ -263,11 +263,6 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto podsRemainUnschedulable[pod] = make(map[string]status.Reasons) } glogx.V(1).Over(loggingQuota).Infof("%v other pods are also unschedulable", -loggingQuota.Left()) - nodeInfos, err := GetNodeInfosForGroups(nodes, context.CloudProvider, context.ListerRegistry, - daemonSets, context.PredicateChecker) - if err != nil { - return &status.ScaleUpStatus{Result: status.ScaleUpError}, err.AddPrefix("failed to build node infos for node groups: ") - } nodesFromNotAutoscaledGroups, err := FilterOutNodesFromNotAutoscaledGroups(nodes, context.CloudProvider) if err != nil { diff --git a/cluster-autoscaler/core/scale_up_test.go b/cluster-autoscaler/core/scale_up_test.go index 760cff10cd44..11888d84ea84 100644 --- a/cluster-autoscaler/core/scale_up_test.go +++ b/cluster-autoscaler/core/scale_up_test.go @@ -414,8 +414,9 @@ func simpleScaleUpTest(t *testing.T, config *scaleTestConfig) { } context.ExpanderStrategy = expander + nodeInfos, _ := GetNodeInfosForGroups(nodes, provider, listers, []*appsv1.DaemonSet{}, context.PredicateChecker) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff()) - clusterState.UpdateNodes(nodes, time.Now()) + clusterState.UpdateNodes(nodes, nodeInfos, time.Now()) extraPods := make([]*apiv1.Pod, len(config.extraPods)) for i, p := range config.extraPods { @@ -425,7 +426,7 @@ func simpleScaleUpTest(t *testing.T, config *scaleTestConfig) { processors := ca_processors.TestProcessors() - scaleUpStatus, err := ScaleUp(&context, processors, clusterState, extraPods, nodes, []*appsv1.DaemonSet{}) + scaleUpStatus, err := ScaleUp(&context, processors, clusterState, extraPods, nodes, []*appsv1.DaemonSet{}, nodeInfos) processors.ScaleUpStatusProcessor.Process(&context, scaleUpStatus) assert.NoError(t, err) assert.True(t, scaleUpStatus.WasSuccessful()) @@ -499,19 +500,21 @@ func TestScaleUpNodeComingNoScale(t *testing.T) { } context := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider) + nodes := []*apiv1.Node{n1, n2} + nodeInfos, _ := GetNodeInfosForGroups(nodes, provider, listers, []*appsv1.DaemonSet{}, context.PredicateChecker) clusterState := clusterstate.NewClusterStateRegistry( provider, clusterstate.ClusterStateRegistryConfig{MaxNodeProvisionTime: 5 * time.Minute}, context.LogRecorder, newBackoff()) clusterState.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng2"), 1, time.Now()) - clusterState.UpdateNodes([]*apiv1.Node{n1, n2}, time.Now()) + clusterState.UpdateNodes(nodes, nodeInfos, time.Now()) p3 := BuildTestPod("p-new", 550, 0) processors := ca_processors.TestProcessors() - scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p3}, []*apiv1.Node{n1, n2}, []*appsv1.DaemonSet{}) + scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos) assert.NoError(t, err) // A node is already coming - no need for scale up. 
assert.False(t, scaleUpStatus.WasSuccessful()) @@ -543,6 +546,8 @@ func TestScaleUpNodeComingHasScale(t *testing.T) { context := NewScaleTestAutoscalingContext(defaultOptions, &fake.Clientset{}, listers, provider) + nodes := []*apiv1.Node{n1, n2} + nodeInfos, _ := GetNodeInfosForGroups(nodes, provider, listers, []*appsv1.DaemonSet{}, context.PredicateChecker) clusterState := clusterstate.NewClusterStateRegistry( provider, clusterstate.ClusterStateRegistryConfig{ @@ -551,13 +556,13 @@ func TestScaleUpNodeComingHasScale(t *testing.T) { context.LogRecorder, newBackoff()) clusterState.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng2"), 1, time.Now()) - clusterState.UpdateNodes([]*apiv1.Node{n1, n2}, time.Now()) + clusterState.UpdateNodes(nodes, nodeInfos, time.Now()) p3 := BuildTestPod("p-new", 550, 0) p4 := BuildTestPod("p-new", 550, 0) processors := ca_processors.TestProcessors() - scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p3, p4}, []*apiv1.Node{n1, n2}, []*appsv1.DaemonSet{}) + scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p3, p4}, nodes, []*appsv1.DaemonSet{}, nodeInfos) assert.NoError(t, err) // Two nodes needed but one node is already coming, so it should increase by one. @@ -595,12 +600,14 @@ func TestScaleUpUnhealthy(t *testing.T) { } context := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider) + nodes := []*apiv1.Node{n1, n2} + nodeInfos, _ := GetNodeInfosForGroups(nodes, provider, listers, []*appsv1.DaemonSet{}, context.PredicateChecker) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff()) - clusterState.UpdateNodes([]*apiv1.Node{n1, n2}, time.Now()) + clusterState.UpdateNodes(nodes, nodeInfos, time.Now()) p3 := BuildTestPod("p-new", 550, 0) processors := ca_processors.TestProcessors() - scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p3}, []*apiv1.Node{n1, n2}, []*appsv1.DaemonSet{}) + scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos) assert.NoError(t, err) // Node group is unhealthy. 
@@ -632,12 +639,14 @@ func TestScaleUpNoHelp(t *testing.T) { } context := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider) + nodes := []*apiv1.Node{n1} + nodeInfos, _ := GetNodeInfosForGroups(nodes, provider, listers, []*appsv1.DaemonSet{}, context.PredicateChecker) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff()) - clusterState.UpdateNodes([]*apiv1.Node{n1}, time.Now()) + clusterState.UpdateNodes(nodes, nodeInfos, time.Now()) p3 := BuildTestPod("p-new", 500, 0) processors := ca_processors.TestProcessors() - scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p3}, []*apiv1.Node{n1}, []*appsv1.DaemonSet{}) + scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos) processors.ScaleUpStatusProcessor.Process(&context, scaleUpStatus) assert.NoError(t, err) @@ -695,8 +704,9 @@ func TestScaleUpBalanceGroups(t *testing.T) { } context := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider) + nodeInfos, _ := GetNodeInfosForGroups(nodes, provider, listers, []*appsv1.DaemonSet{}, context.PredicateChecker) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff()) - clusterState.UpdateNodes(nodes, time.Now()) + clusterState.UpdateNodes(nodes, nodeInfos, time.Now()) pods := make([]*apiv1.Pod, 0) for i := 0; i < 2; i++ { @@ -704,7 +714,7 @@ func TestScaleUpBalanceGroups(t *testing.T) { } processors := ca_processors.TestProcessors() - scaleUpStatus, typedErr := ScaleUp(&context, processors, clusterState, pods, nodes, []*appsv1.DaemonSet{}) + scaleUpStatus, typedErr := ScaleUp(&context, processors, clusterState, pods, nodes, []*appsv1.DaemonSet{}, nodeInfos) assert.NoError(t, typedErr) assert.True(t, scaleUpStatus.WasSuccessful()) @@ -758,7 +768,10 @@ func TestScaleUpAutoprovisionedNodeGroup(t *testing.T) { processors.NodeGroupListProcessor = &mockAutoprovisioningNodeGroupListProcessor{t} processors.NodeGroupManager = &mockAutoprovisioningNodeGroupManager{t} - scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p1}, []*apiv1.Node{}, []*appsv1.DaemonSet{}) + nodes := []*apiv1.Node{} + nodeInfos, _ := GetNodeInfosForGroups(nodes, provider, context.ListerRegistry, []*appsv1.DaemonSet{}, context.PredicateChecker) + + scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p1}, nodes, []*appsv1.DaemonSet{}, nodeInfos) assert.NoError(t, err) assert.True(t, scaleUpStatus.WasSuccessful()) assert.Equal(t, "autoprovisioned-T1", getStringFromChan(createdGroups)) diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go index f6e84d346f5b..73a32390ca5e 100644 --- a/cluster-autoscaler/core/static_autoscaler.go +++ b/cluster-autoscaler/core/static_autoscaler.go @@ -39,6 +39,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/utils/tpu" "k8s.io/klog" + schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" ) const ( @@ -137,7 +138,19 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError return nil } - typedErr = a.updateClusterState(allNodes, currentTime) + daemonsets, err := a.ListerRegistry.DaemonSetLister().List(labels.Everything()) + if err != nil { + klog.Errorf("Failed to get daemonset list") + return errors.ToAutoscalerError(errors.ApiCallError, err) + } + + 
nodeInfosForGroups, autoscalerError := GetNodeInfosForGroups(readyNodes, autoscalingContext.CloudProvider, autoscalingContext.ListerRegistry,
+		daemonsets, autoscalingContext.PredicateChecker)
+	if autoscalerError != nil {
+		return autoscalerError.AddPrefix("failed to build node infos for node groups: ")
+	}
+
+	typedErr = a.updateClusterState(allNodes, nodeInfosForGroups, currentTime)
 	if typedErr != nil {
 		return typedErr
 	}
@@ -293,17 +306,10 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
 			scaleUpStatus.Result = status.ScaleUpInCooldown
 			klog.V(1).Info("Unschedulable pods are very new, waiting one iteration for more")
 		} else {
-			daemonsets, err := a.ListerRegistry.DaemonSetLister().List(labels.Everything())
-			if err != nil {
-				klog.Errorf("Failed to get daemonset list")
-				scaleUpStatus.Result = status.ScaleUpError
-				return errors.ToAutoscalerError(errors.ApiCallError, err)
-			}
-
 			scaleUpStart := time.Now()
 			metrics.UpdateLastTime(metrics.ScaleUp, scaleUpStart)
 
-			scaleUpStatus, typedErr = ScaleUp(autoscalingContext, a.processors, a.clusterStateRegistry, unschedulablePodsToHelp, readyNodes, daemonsets)
+			scaleUpStatus, typedErr = ScaleUp(autoscalingContext, a.processors, a.clusterStateRegistry, unschedulablePodsToHelp, readyNodes, daemonsets, nodeInfosForGroups)
 
 			metrics.UpdateDurationFromStart(metrics.ScaleUp, scaleUpStart)
 
@@ -511,14 +517,14 @@ func (a *StaticAutoscaler) actOnEmptyCluster(allNodes, readyNodes []*apiv1.Node,
 	return false
 }
 
-func (a *StaticAutoscaler) updateClusterState(allNodes []*apiv1.Node, currentTime time.Time) errors.AutoscalerError {
+func (a *StaticAutoscaler) updateClusterState(allNodes []*apiv1.Node, nodeInfosForGroups map[string]*schedulercache.NodeInfo, currentTime time.Time) errors.AutoscalerError {
 	err := a.AutoscalingContext.CloudProvider.Refresh()
 	if err != nil {
 		klog.Errorf("Failed to refresh cloud provider config: %v", err)
 		return errors.ToAutoscalerError(errors.CloudProviderError, err)
 	}
 
-	err = a.clusterStateRegistry.UpdateNodes(allNodes, currentTime)
+	err = a.clusterStateRegistry.UpdateNodes(allNodes, nodeInfosForGroups, currentTime)
 	if err != nil {
 		klog.Errorf("Failed to update node registry: %v", err)
 		a.scaleDown.CleanUpUnneededNodes()
diff --git a/cluster-autoscaler/core/static_autoscaler_test.go b/cluster-autoscaler/core/static_autoscaler_test.go
index a985004a0956..7fdee66a8ae7 100644
--- a/cluster-autoscaler/core/static_autoscaler_test.go
+++ b/cluster-autoscaler/core/static_autoscaler_test.go
@@ -198,8 +198,6 @@ func TestStaticAutoscalerRunOnce(t *testing.T) {
 	}
 
 	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, newBackoff())
-	clusterState.UpdateNodes([]*apiv1.Node{n1, n2}, time.Now())
-
 	sd := NewScaleDown(&context, clusterState)
 
 	autoscaler := &StaticAutoscaler{
@@ -215,8 +213,9 @@ func TestStaticAutoscalerRunOnce(t *testing.T) {
 	// MaxNodesTotal reached.
readyNodeListerMock.On("List").Return([]*apiv1.Node{n1}, nil).Once() allNodeListerMock.On("List").Return([]*apiv1.Node{n1}, nil).Once() - scheduledPodMock.On("List").Return([]*apiv1.Pod{p1}, nil).Once() + scheduledPodMock.On("List").Return([]*apiv1.Pod{p1}, nil).Twice() unschedulablePodMock.On("List").Return([]*apiv1.Pod{p2}, nil).Once() + daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() err := autoscaler.RunOnce(time.Now()) @@ -241,8 +240,9 @@ func TestStaticAutoscalerRunOnce(t *testing.T) { // Mark unneeded nodes. readyNodeListerMock.On("List").Return([]*apiv1.Node{n1, n2}, nil).Once() allNodeListerMock.On("List").Return([]*apiv1.Node{n1, n2}, nil).Once() - scheduledPodMock.On("List").Return([]*apiv1.Pod{p1}, nil).Once() + scheduledPodMock.On("List").Return([]*apiv1.Pod{p1}, nil).Twice() unschedulablePodMock.On("List").Return([]*apiv1.Pod{}, nil).Once() + daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() provider.AddNode("ng1", n2) @@ -256,8 +256,9 @@ func TestStaticAutoscalerRunOnce(t *testing.T) { // Scale down. readyNodeListerMock.On("List").Return([]*apiv1.Node{n1, n2}, nil).Once() allNodeListerMock.On("List").Return([]*apiv1.Node{n1, n2}, nil).Once() - scheduledPodMock.On("List").Return([]*apiv1.Pod{p1}, nil).Once() + scheduledPodMock.On("List").Return([]*apiv1.Pod{p1}, nil).Twice() unschedulablePodMock.On("List").Return([]*apiv1.Pod{}, nil).Once() + daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() onScaleDownMock.On("ScaleDown", "ng1", "n2").Return(nil).Once() @@ -270,8 +271,9 @@ func TestStaticAutoscalerRunOnce(t *testing.T) { // Mark unregistered nodes. readyNodeListerMock.On("List").Return([]*apiv1.Node{n1, n2}, nil).Once() allNodeListerMock.On("List").Return([]*apiv1.Node{n1, n2}, nil).Once() - scheduledPodMock.On("List").Return([]*apiv1.Pod{p1}, nil).Once() + scheduledPodMock.On("List").Return([]*apiv1.Pod{p1}, nil).Twice() unschedulablePodMock.On("List").Return([]*apiv1.Pod{p2}, nil).Once() + daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() provider.AddNode("ng1", n3) @@ -285,6 +287,8 @@ func TestStaticAutoscalerRunOnce(t *testing.T) { // Remove unregistered nodes. 
readyNodeListerMock.On("List").Return([]*apiv1.Node{n1, n2}, nil).Once() allNodeListerMock.On("List").Return([]*apiv1.Node{n1, n2}, nil).Once() + scheduledPodMock.On("List").Return([]*apiv1.Pod{p1}, nil).Once() + daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() onScaleDownMock.On("ScaleDown", "ng1", "n3").Return(nil).Once() err = autoscaler.RunOnce(time.Now().Add(5 * time.Hour)) @@ -292,7 +296,6 @@ func TestStaticAutoscalerRunOnce(t *testing.T) { assert.NoError(t, err) mock.AssertExpectationsForObjects(t, readyNodeListerMock, allNodeListerMock, scheduledPodMock, unschedulablePodMock, podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock) - } func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) { @@ -377,7 +380,6 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) { MaxNodeProvisionTime: 10 * time.Second, } clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, newBackoff()) - clusterState.UpdateNodes([]*apiv1.Node{n1}, time.Now()) sd := NewScaleDown(&context, clusterState) @@ -411,9 +413,10 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) { // Remove autoprovisioned node group and mark unneeded nodes. readyNodeListerMock.On("List").Return([]*apiv1.Node{n1, n2}, nil).Once() allNodeListerMock.On("List").Return([]*apiv1.Node{n1, n2}, nil).Once() - scheduledPodMock.On("List").Return([]*apiv1.Pod{p1}, nil).Once() + scheduledPodMock.On("List").Return([]*apiv1.Pod{p1}, nil).Twice() unschedulablePodMock.On("List").Return([]*apiv1.Pod{}, nil).Once() podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() + daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() onNodeGroupDeleteMock.On("Delete", "autoprovisioned-TN1").Return(nil).Once() provider.AddAutoprovisionedNodeGroup("autoprovisioned-TN2", 0, 10, 1, "TN1") @@ -427,9 +430,10 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) { // Scale down. 
readyNodeListerMock.On("List").Return([]*apiv1.Node{n1, n2}, nil).Once() allNodeListerMock.On("List").Return([]*apiv1.Node{n1, n2}, nil).Once() - scheduledPodMock.On("List").Return([]*apiv1.Pod{p1}, nil).Once() + scheduledPodMock.On("List").Return([]*apiv1.Pod{p1}, nil).Twice() unschedulablePodMock.On("List").Return([]*apiv1.Pod{}, nil).Once() podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() + daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() onNodeGroupDeleteMock.On("Delete", "autoprovisioned-"+ "TN1").Return(nil).Once() onScaleDownMock.On("ScaleDown", "autoprovisioned-TN2", "n2").Return(nil).Once() @@ -505,10 +509,13 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) { } clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, newBackoff()) // broken node detected as unregistered - clusterState.UpdateNodes([]*apiv1.Node{n1}, now) + + nodes := []*apiv1.Node{n1} + //nodeInfos, _ := GetNodeInfosForGroups(nodes, provider, listerRegistry, []*appsv1.DaemonSet{}, context.PredicateChecker) + clusterState.UpdateNodes(nodes, nil, now) // broken node failed to register in time - clusterState.UpdateNodes([]*apiv1.Node{n1}, later) + clusterState.UpdateNodes(nodes, nil, later) sd := NewScaleDown(&context, clusterState) @@ -524,7 +531,7 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) { // Scale up. readyNodeListerMock.On("List").Return([]*apiv1.Node{n1}, nil).Times(2) // due to initialized=false allNodeListerMock.On("List").Return([]*apiv1.Node{n1}, nil).Once() - scheduledPodMock.On("List").Return([]*apiv1.Pod{p1}, nil).Times(2) // 1 to get pods + 1 per nodegroup when building nodeInfo map + scheduledPodMock.On("List").Return([]*apiv1.Pod{p1}, nil).Twice() // 1 to get pods + 1 per nodegroup when building nodeInfo map unschedulablePodMock.On("List").Return([]*apiv1.Pod{p2}, nil).Once() daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() onScaleUpMock.On("ScaleUp", "ng1", 1).Return(nil).Once() @@ -540,7 +547,9 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) { readyNodeListerMock.On("List").Return([]*apiv1.Node{n1, n2}, nil).Once() allNodeListerMock.On("List").Return([]*apiv1.Node{n1, n2}, nil).Once() + scheduledPodMock.On("List").Return([]*apiv1.Pod{p1}, nil).Once() onScaleDownMock.On("ScaleDown", "ng1", "broken").Return(nil).Once() + daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() err = autoscaler.RunOnce(later.Add(2 * time.Hour)) waitForDeleteToFinish(t, autoscaler.scaleDown) @@ -638,8 +647,6 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) { } clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, newBackoff()) - clusterState.UpdateNodes([]*apiv1.Node{n1, n2}, time.Now()) - sd := NewScaleDown(&context, clusterState) autoscaler := &StaticAutoscaler{ @@ -667,8 +674,9 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) { // Mark unneeded nodes. 
readyNodeListerMock.On("List").Return([]*apiv1.Node{n1, n2, n3}, nil).Once() allNodeListerMock.On("List").Return([]*apiv1.Node{n1, n2, n3}, nil).Once() - scheduledPodMock.On("List").Return([]*apiv1.Pod{p1, p2, p3}, nil).Once() + scheduledPodMock.On("List").Return([]*apiv1.Pod{p1, p2, p3}, nil).Times(3) unschedulablePodMock.On("List").Return([]*apiv1.Pod{p4, p5}, nil).Once() + daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() ng2.SetTargetSize(2) @@ -681,8 +689,9 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) { // Scale down. readyNodeListerMock.On("List").Return([]*apiv1.Node{n1, n2, n3}, nil).Once() allNodeListerMock.On("List").Return([]*apiv1.Node{n1, n2, n3}, nil).Once() - scheduledPodMock.On("List").Return([]*apiv1.Pod{p1, p2, p3, p4}, nil).Once() + scheduledPodMock.On("List").Return([]*apiv1.Pod{p1, p2, p3, p4}, nil).Times(3) unschedulablePodMock.On("List").Return([]*apiv1.Pod{p5}, nil).Once() + daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() onScaleDownMock.On("ScaleDown", "ng1", "n1").Return(nil).Once() @@ -693,7 +702,6 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) { assert.NoError(t, err) mock.AssertExpectationsForObjects(t, readyNodeListerMock, allNodeListerMock, scheduledPodMock, unschedulablePodMock, podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock) - } func TestStaticAutoscalerOutOfResources(t *testing.T) { @@ -811,7 +819,7 @@ func TestStaticAutoscalerOutOfResources(t *testing.T) { now := time.Now() // propagate nodes info in cluster state - clusterState.UpdateNodes([]*apiv1.Node{}, now) + clusterState.UpdateNodes([]*apiv1.Node{}, nil, now) // delete nodes with create errors autoscaler.deleteCreatedNodesWithErrors() @@ -833,7 +841,7 @@ func TestStaticAutoscalerOutOfResources(t *testing.T) { // propagate nodes info in cluster state again // no changes in what provider returns - clusterState.UpdateNodes([]*apiv1.Node{}, now) + clusterState.UpdateNodes([]*apiv1.Node{}, nil, now) // delete nodes with create errors autoscaler.deleteCreatedNodesWithErrors() @@ -888,7 +896,7 @@ func TestStaticAutoscalerOutOfResources(t *testing.T) { }, nil) // update cluster state - clusterState.UpdateNodes([]*apiv1.Node{}, now) + clusterState.UpdateNodes([]*apiv1.Node{}, nil, now) // delete nodes with create errors autoscaler.deleteCreatedNodesWithErrors() diff --git a/cluster-autoscaler/core/utils_test.go b/cluster-autoscaler/core/utils_test.go index c3e983f7d80e..2d66c787f19e 100644 --- a/cluster-autoscaler/core/utils_test.go +++ b/cluster-autoscaler/core/utils_test.go @@ -384,7 +384,7 @@ func TestRemoveOldUnregisteredNodes(t *testing.T) { MaxTotalUnreadyPercentage: 10, OkTotalUnreadyCount: 1, }, fakeLogRecorder, newBackoff()) - err := clusterState.UpdateNodes([]*apiv1.Node{ng1_1}, now.Add(-time.Hour)) + err := clusterState.UpdateNodes([]*apiv1.Node{ng1_1}, nil, now.Add(-time.Hour)) assert.NoError(t, err) context := &context.AutoscalingContext{ @@ -481,7 +481,7 @@ func TestRemoveFixNodeTargetSize(t *testing.T) { MaxTotalUnreadyPercentage: 10, OkTotalUnreadyCount: 1, }, fakeLogRecorder, newBackoff()) - err := clusterState.UpdateNodes([]*apiv1.Node{ng1_1}, now.Add(-time.Hour)) + err := clusterState.UpdateNodes([]*apiv1.Node{ng1_1}, nil, 
now.Add(-time.Hour)) assert.NoError(t, err) context := &context.AutoscalingContext{