From 9944137faea4e34ee0e94ac9f0da74b6e83a01d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20K=C5=82obuszewski?= Date: Fri, 21 Jan 2022 14:25:57 +0100 Subject: [PATCH] Don't cache NodeInfo for recently Ready nodes There's a race condition between DaemonSet pods getting scheduled to a new node and Cluster Autoscaler caching that node for the sake of predicting future nodes in a given node group. We can reduce the risk of missing some DaemonSet by providing a grace period before accepting nodes in the cache. 1 minute should be more than enough, except for some pathological edge cases. --- cluster-autoscaler/core/scale_up_test.go | 28 ++++++----- cluster-autoscaler/core/static_autoscaler.go | 2 +- .../mixed_nodeinfos_processor.go | 41 +++++++++++------ .../mixed_nodeinfos_processor_test.go | 46 +++++++++++-------- .../node_info_provider_processor.go | 4 +- 5 files changed, 75 insertions(+), 46 deletions(-) diff --git a/cluster-autoscaler/core/scale_up_test.go b/cluster-autoscaler/core/scale_up_test.go index a05388bf0bcf..854eef9e24e3 100644 --- a/cluster-autoscaler/core/scale_up_test.go +++ b/cluster-autoscaler/core/scale_up_test.go @@ -474,6 +474,7 @@ func expanderOptionToGroupSizeChange(option expander.Option) groupSizeChange { func runSimpleScaleUpTest(t *testing.T, config *scaleTestConfig) *scaleTestResults { expandedGroups := make(chan groupSizeChange, 10) + now := time.Now() groups := make(map[string][]*apiv1.Node) nodes := make([]*apiv1.Node, len(config.nodes)) @@ -482,7 +483,7 @@ func runSimpleScaleUpTest(t *testing.T, config *scaleTestConfig) *scaleTestResul if n.gpu > 0 { AddGpusToNode(node, n.gpu) } - SetNodeReadyState(node, n.ready, time.Now()) + SetNodeReadyState(node, n.ready, now.Add(-2*time.Minute)) nodes[i] = node if n.group != "" { groups[n.group] = append(groups[n.group], node) @@ -529,7 +530,7 @@ func runSimpleScaleUpTest(t *testing.T, config *scaleTestConfig) *scaleTestResul } context.ExpanderStrategy = expander - nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil) + nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff()) clusterState.UpdateNodes(nodes, nodeInfos, time.Now()) @@ -657,10 +658,12 @@ func buildTestPod(p podConfig) *apiv1.Pod { } func TestScaleUpUnhealthy(t *testing.T) { + now := time.Now() + someTimeAgo := now.Add(-2 * time.Minute) n1 := BuildTestNode("n1", 100, 1000) - SetNodeReadyState(n1, true, time.Now()) + SetNodeReadyState(n1, true, someTimeAgo) n2 := BuildTestNode("n2", 1000, 1000) - SetNodeReadyState(n2, true, time.Now()) + SetNodeReadyState(n2, true, someTimeAgo) p1 := BuildTestPod("p1", 80, 0) p2 := BuildTestPod("p2", 800, 0) @@ -688,7 +691,7 @@ func TestScaleUpUnhealthy(t *testing.T) { assert.NoError(t, err) nodes := []*apiv1.Node{n1, n2} - nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil) + nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff()) clusterState.UpdateNodes(nodes, nodeInfos, time.Now()) p3 := BuildTestPod("p-new", 550, 0) @@ -703,7 +706,8 @@ func TestScaleUpUnhealthy(t *testing.T) { func TestScaleUpNoHelp(t *testing.T) { n1 := BuildTestNode("n1", 100, 1000) - SetNodeReadyState(n1, true, time.Now()) + now := time.Now() + SetNodeReadyState(n1, true, now.Add(-2*time.Minute)) p1 := BuildTestPod("p1", 80, 0) p1.Spec.NodeName = "n1" @@ -728,7 +732,7 @@ func TestScaleUpNoHelp(t *testing.T) { assert.NoError(t, err) nodes := []*apiv1.Node{n1} - nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil) + nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff()) clusterState.UpdateNodes(nodes, nodeInfos, time.Now()) p3 := BuildTestPod("p-new", 500, 0) @@ -765,12 +769,14 @@ func TestScaleUpBalanceGroups(t *testing.T) { podList := make([]*apiv1.Pod, 0, len(testCfg)) nodes := make([]*apiv1.Node, 0) + now := time.Now() + for gid, gconf := range testCfg { provider.AddNodeGroup(gid, gconf.min, gconf.max, gconf.size) for i := 0; i < gconf.size; i++ { nodeName := fmt.Sprintf("%v-node-%v", gid, i) node := BuildTestNode(nodeName, 100, 1000) - SetNodeReadyState(node, true, time.Now()) + SetNodeReadyState(node, true, now.Add(-2*time.Minute)) nodes = append(nodes, node) pod := BuildTestPod(fmt.Sprintf("%v-pod-%v", gid, i), 80, 0) @@ -793,7 +799,7 @@ func TestScaleUpBalanceGroups(t *testing.T) { context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil) assert.NoError(t, err) - nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil) + nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff()) clusterState.UpdateNodes(nodes, nodeInfos, time.Now()) @@ -861,7 +867,7 @@ func TestScaleUpAutoprovisionedNodeGroup(t *testing.T) { processors.NodeGroupManager = &mockAutoprovisioningNodeGroupManager{t, 0} nodes := []*apiv1.Node{} - nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil) + nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, time.Now()) scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p1}, nodes, []*appsv1.DaemonSet{}, nodeInfos, nil) assert.NoError(t, err) @@ -914,7 +920,7 @@ func TestScaleUpBalanceAutoprovisionedNodeGroups(t *testing.T) { processors.NodeGroupManager = &mockAutoprovisioningNodeGroupManager{t, 2} nodes := []*apiv1.Node{} - nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil) + nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, time.Now()) scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p1, p2, p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos, nil) assert.NoError(t, err) diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go index 0338062fe36a..a7b8a0d90c02 100644 --- a/cluster-autoscaler/core/static_autoscaler.go +++ b/cluster-autoscaler/core/static_autoscaler.go @@ -285,7 +285,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError return typedErr.AddPrefix("Initialize ClusterSnapshot") } - nodeInfosForGroups, autoscalerError := a.processors.TemplateNodeInfoProvider.Process(autoscalingContext, readyNodes, daemonsets, a.ignoredTaints) + nodeInfosForGroups, autoscalerError := a.processors.TemplateNodeInfoProvider.Process(autoscalingContext, readyNodes, daemonsets, a.ignoredTaints, currentTime) if autoscalerError != nil { klog.Errorf("Failed to get node infos for groups: %v", autoscalerError) return autoscalerError.AddPrefix("failed to build node infos for node groups: ") diff --git a/cluster-autoscaler/processors/nodeinfosprovider/mixed_nodeinfos_processor.go b/cluster-autoscaler/processors/nodeinfosprovider/mixed_nodeinfos_processor.go index dcea6fd7fb73..37ac80fd0b8b 100644 --- a/cluster-autoscaler/processors/nodeinfosprovider/mixed_nodeinfos_processor.go +++ b/cluster-autoscaler/processors/nodeinfosprovider/mixed_nodeinfos_processor.go @@ -18,6 +18,7 @@ package nodeinfosprovider import ( "reflect" + "time" appsv1 "k8s.io/api/apps/v1" apiv1 "k8s.io/api/core/v1" @@ -33,6 +34,8 @@ import ( klog "k8s.io/klog/v2" ) +const stabilizationDelay = 1 * time.Minute + // MixedTemplateNodeInfoProvider build nodeInfos from the cluster's nodes and node groups. type MixedTemplateNodeInfoProvider struct { nodeInfoCache map[string]*schedulerframework.NodeInfo @@ -51,7 +54,7 @@ func (p *MixedTemplateNodeInfoProvider) CleanUp() { } // Process returns the nodeInfos set for this cluster -func (p *MixedTemplateNodeInfoProvider) Process(ctx *context.AutoscalingContext, nodes []*apiv1.Node, daemonsets []*appsv1.DaemonSet, ignoredTaints taints.TaintKeySet) (map[string]*schedulerframework.NodeInfo, errors.AutoscalerError) { +func (p *MixedTemplateNodeInfoProvider) Process(ctx *context.AutoscalingContext, nodes []*apiv1.Node, daemonsets []*appsv1.DaemonSet, ignoredTaints taints.TaintKeySet, now time.Time) (map[string]*schedulerframework.NodeInfo, errors.AutoscalerError) { // TODO(mwielgus): This returns map keyed by url, while most code (including scheduler) uses node.Name for a key. // TODO(mwielgus): Review error policy - sometimes we may continue with partial errors. result := make(map[string]*schedulerframework.NodeInfo) @@ -90,7 +93,7 @@ func (p *MixedTemplateNodeInfoProvider) Process(ctx *context.AutoscalingContext, for _, node := range nodes { // Broken nodes might have some stuff missing. Skipping. - if !kube_util.IsNodeReadyAndSchedulable(node) { + if !isNodeGoodTemplateCandidate(node, now) { continue } added, id, typedErr := processNode(node) @@ -144,19 +147,20 @@ func (p *MixedTemplateNodeInfoProvider) Process(ctx *context.AutoscalingContext, // Last resort - unready/unschedulable nodes. for _, node := range nodes { // Allowing broken nodes - if !kube_util.IsNodeReadyAndSchedulable(node) { - added, _, typedErr := processNode(node) - if typedErr != nil { - return map[string]*schedulerframework.NodeInfo{}, typedErr - } - nodeGroup, err := ctx.CloudProvider.NodeGroupForNode(node) - if err != nil { - return map[string]*schedulerframework.NodeInfo{}, errors.ToAutoscalerError( - errors.CloudProviderError, err) - } - if added { - klog.Warningf("Built template for %s based on unready/unschedulable node %s", nodeGroup.Id(), node.Name) - } + if isNodeGoodTemplateCandidate(node, now) { + continue + } + added, _, typedErr := processNode(node) + if typedErr != nil { + return map[string]*schedulerframework.NodeInfo{}, typedErr + } + nodeGroup, err := ctx.CloudProvider.NodeGroupForNode(node) + if err != nil { + return map[string]*schedulerframework.NodeInfo{}, errors.ToAutoscalerError( + errors.CloudProviderError, err) + } + if added { + klog.Warningf("Built template for %s based on unready/unschedulable node %s", nodeGroup.Id(), node.Name) } } @@ -174,3 +178,10 @@ func getPodsForNodes(listers kube_util.ListerRegistry) (map[string][]*apiv1.Pod, } return podsForNodes, nil } + +func isNodeGoodTemplateCandidate(node *apiv1.Node, now time.Time) bool { + ready, lastTransitionTime, _ := kube_util.GetReadinessState(node) + stable := lastTransitionTime.Add(stabilizationDelay).Before(now) + schedulable := !node.Spec.Unschedulable + return ready && stable && schedulable +} diff --git a/cluster-autoscaler/processors/nodeinfosprovider/mixed_nodeinfos_processor_test.go b/cluster-autoscaler/processors/nodeinfosprovider/mixed_nodeinfos_processor_test.go index 8b9f2722ff2a..77e8f58fa45a 100644 --- a/cluster-autoscaler/processors/nodeinfosprovider/mixed_nodeinfos_processor_test.go +++ b/cluster-autoscaler/processors/nodeinfosprovider/mixed_nodeinfos_processor_test.go @@ -33,14 +33,17 @@ import ( ) func TestGetNodeInfosForGroups(t *testing.T) { + now := time.Now() ready1 := BuildTestNode("n1", 1000, 1000) - SetNodeReadyState(ready1, true, time.Now()) + SetNodeReadyState(ready1, true, now.Add(-2*time.Minute)) ready2 := BuildTestNode("n2", 2000, 2000) - SetNodeReadyState(ready2, true, time.Now()) + SetNodeReadyState(ready2, true, now.Add(-2*time.Minute)) unready3 := BuildTestNode("n3", 3000, 3000) - SetNodeReadyState(unready3, false, time.Now()) + SetNodeReadyState(unready3, false, now) unready4 := BuildTestNode("n4", 4000, 4000) - SetNodeReadyState(unready4, false, time.Now()) + SetNodeReadyState(unready4, false, now) + justReady5 := BuildTestNode("n5", 5000, 5000) + SetNodeReadyState(justReady5, true, now) tn := BuildTestNode("tn", 5000, 5000) tni := schedulerframework.NewNodeInfo() @@ -49,7 +52,7 @@ func TestGetNodeInfosForGroups(t *testing.T) { // Cloud provider with TemplateNodeInfo implemented. provider1 := testprovider.NewTestAutoprovisioningCloudProvider( nil, nil, nil, nil, nil, - map[string]*schedulerframework.NodeInfo{"ng3": tni, "ng4": tni}) + map[string]*schedulerframework.NodeInfo{"ng3": tni, "ng4": tni, "ng5": tni}) provider1.AddNodeGroup("ng1", 1, 10, 1) // Nodegroup with ready node. provider1.AddNode("ng1", ready1) provider1.AddNodeGroup("ng2", 1, 10, 1) // Nodegroup with ready and unready node. @@ -58,10 +61,12 @@ func TestGetNodeInfosForGroups(t *testing.T) { provider1.AddNodeGroup("ng3", 1, 10, 1) // Nodegroup with unready node. provider1.AddNode("ng3", unready4) provider1.AddNodeGroup("ng4", 0, 1000, 0) // Nodegroup without nodes. + provider1.AddNodeGroup("ng5", 1, 10, 1) // Nodegroup with node that recently became ready. + provider1.AddNode("ng5", justReady5) // Cloud provider with TemplateNodeInfo not implemented. provider2 := testprovider.NewTestAutoprovisioningCloudProvider(nil, nil, nil, nil, nil, nil) - provider2.AddNodeGroup("ng5", 1, 10, 1) // Nodegroup without nodes. + provider2.AddNodeGroup("ng6", 1, 10, 1) // Nodegroup without nodes. podLister := kube_util.NewTestPodLister([]*apiv1.Pod{}) registry := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil) @@ -76,9 +81,9 @@ func TestGetNodeInfosForGroups(t *testing.T) { ListerRegistry: registry, }, } - res, err := NewMixedTemplateNodeInfoProvider().Process(&ctx, []*apiv1.Node{unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil) + res, err := NewMixedTemplateNodeInfoProvider().Process(&ctx, []*apiv1.Node{justReady5, unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil, now) assert.NoError(t, err) - assert.Equal(t, 4, len(res)) + assert.Equal(t, 5, len(res)) info, found := res["ng1"] assert.True(t, found) assertEqualNodeCapacities(t, ready1, info.Node()) @@ -91,6 +96,9 @@ func TestGetNodeInfosForGroups(t *testing.T) { info, found = res["ng4"] assert.True(t, found) assertEqualNodeCapacities(t, tn, info.Node()) + info, found = res["ng5"] + assert.True(t, found) + assertEqualNodeCapacities(t, tn, info.Node()) // Test for a nodegroup without nodes and TemplateNodeInfo not implemented by cloud proivder ctx = context.AutoscalingContext{ @@ -100,24 +108,25 @@ func TestGetNodeInfosForGroups(t *testing.T) { ListerRegistry: registry, }, } - res, err = NewMixedTemplateNodeInfoProvider().Process(&ctx, []*apiv1.Node{}, []*appsv1.DaemonSet{}, nil) + res, err = NewMixedTemplateNodeInfoProvider().Process(&ctx, []*apiv1.Node{}, []*appsv1.DaemonSet{}, nil, now) assert.NoError(t, err) assert.Equal(t, 0, len(res)) } func TestGetNodeInfosForGroupsCache(t *testing.T) { + now := time.Now() ready1 := BuildTestNode("n1", 1000, 1000) - SetNodeReadyState(ready1, true, time.Now()) + SetNodeReadyState(ready1, true, now.Add(-2*time.Minute)) ready2 := BuildTestNode("n2", 2000, 2000) - SetNodeReadyState(ready2, true, time.Now()) + SetNodeReadyState(ready2, true, now.Add(-2*time.Minute)) unready3 := BuildTestNode("n3", 3000, 3000) - SetNodeReadyState(unready3, false, time.Now()) + SetNodeReadyState(unready3, false, now) unready4 := BuildTestNode("n4", 4000, 4000) - SetNodeReadyState(unready4, false, time.Now()) + SetNodeReadyState(unready4, false, now.Add(-2*time.Minute)) ready5 := BuildTestNode("n5", 5000, 5000) - SetNodeReadyState(ready5, true, time.Now()) + SetNodeReadyState(ready5, true, now.Add(-2*time.Minute)) ready6 := BuildTestNode("n6", 6000, 6000) - SetNodeReadyState(ready6, true, time.Now()) + SetNodeReadyState(ready6, true, now.Add(-2*time.Minute)) tn := BuildTestNode("tn", 10000, 10000) tni := schedulerframework.NewNodeInfo() @@ -159,7 +168,7 @@ func TestGetNodeInfosForGroupsCache(t *testing.T) { }, } niProcessor := NewMixedTemplateNodeInfoProvider() - res, err := niProcessor.Process(&ctx, []*apiv1.Node{unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil) + res, err := niProcessor.Process(&ctx, []*apiv1.Node{unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil, now) assert.NoError(t, err) // Check results assert.Equal(t, 4, len(res)) @@ -193,7 +202,7 @@ func TestGetNodeInfosForGroupsCache(t *testing.T) { assert.Equal(t, "ng3", lastDeletedGroup) // Check cache with all nodes removed - res, err = niProcessor.Process(&ctx, []*apiv1.Node{unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil) + res, err = niProcessor.Process(&ctx, []*apiv1.Node{unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil, now) assert.NoError(t, err) // Check results assert.Equal(t, 2, len(res)) @@ -215,7 +224,7 @@ func TestGetNodeInfosForGroupsCache(t *testing.T) { infoNg4Node6 := schedulerframework.NewNodeInfo() infoNg4Node6.SetNode(ready6.DeepCopy()) niProcessor.nodeInfoCache = map[string]*schedulerframework.NodeInfo{"ng4": infoNg4Node6} - res, err = niProcessor.Process(&ctx, []*apiv1.Node{unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil) + res, err = niProcessor.Process(&ctx, []*apiv1.Node{unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil, now) // Check if cache was used assert.NoError(t, err) assert.Equal(t, 2, len(res)) @@ -229,6 +238,7 @@ func TestGetNodeInfosForGroupsCache(t *testing.T) { func assertEqualNodeCapacities(t *testing.T, expected, actual *apiv1.Node) { t.Helper() + assert.NotEqual(t, actual.Status, nil, "") assert.Equal(t, getNodeResource(expected, apiv1.ResourceCPU), getNodeResource(actual, apiv1.ResourceCPU), "CPU should be the same") assert.Equal(t, getNodeResource(expected, apiv1.ResourceMemory), getNodeResource(actual, apiv1.ResourceMemory), "Memory should be the same") } diff --git a/cluster-autoscaler/processors/nodeinfosprovider/node_info_provider_processor.go b/cluster-autoscaler/processors/nodeinfosprovider/node_info_provider_processor.go index 8f6832d269dd..74f31815758e 100644 --- a/cluster-autoscaler/processors/nodeinfosprovider/node_info_provider_processor.go +++ b/cluster-autoscaler/processors/nodeinfosprovider/node_info_provider_processor.go @@ -17,6 +17,8 @@ limitations under the License. package nodeinfosprovider import ( + "time" + appsv1 "k8s.io/api/apps/v1" apiv1 "k8s.io/api/core/v1" @@ -29,7 +31,7 @@ import ( // TemplateNodeInfoProvider is provides the initial nodeInfos set. type TemplateNodeInfoProvider interface { // Process returns a map of nodeInfos for node groups. - Process(ctx *context.AutoscalingContext, nodes []*apiv1.Node, daemonsets []*appsv1.DaemonSet, ignoredTaints taints.TaintKeySet) (map[string]*schedulerframework.NodeInfo, errors.AutoscalerError) + Process(ctx *context.AutoscalingContext, nodes []*apiv1.Node, daemonsets []*appsv1.DaemonSet, ignoredTaints taints.TaintKeySet, currentTime time.Time) (map[string]*schedulerframework.NodeInfo, errors.AutoscalerError) // CleanUp cleans up processor's internal structures. CleanUp() }