Don't cache NodeInfo for recently Ready nodes #4641

Merged 1 commit on Jan 27, 2022
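In short: `TemplateNodeInfoProvider.Process` now receives the loop's current time, and a node seeds (and caches) its node group's template NodeInfo only if it has been Ready for at least `stabilizationDelay` (1 minute) and is schedulable; a node that has only just turned Ready falls through to the unready "last resort" path and is not cached. A hypothetical in-package test sketch, reusing the `BuildTestNode`/`SetNodeReadyState` test helpers and the `isNodeGoodTemplateCandidate` predicate added in this PR (illustration only, not part of the change):

```go
func TestRecentlyReadyNodeIsNotATemplateCandidate(t *testing.T) {
	now := time.Now()

	fresh := BuildTestNode("fresh", 1000, 1000)
	SetNodeReadyState(fresh, true, now) // turned Ready just now

	stable := BuildTestNode("stable", 1000, 1000)
	SetNodeReadyState(stable, true, now.Add(-2*time.Minute)) // Ready for two minutes

	// With stabilizationDelay = 1 minute, only the node that has been Ready
	// long enough qualifies as a template (and cache) candidate.
	assert.False(t, isNodeGoodTemplateCandidate(fresh, now))
	assert.True(t, isNodeGoodTemplateCandidate(stable, now))
}
```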
28 changes: 17 additions & 11 deletions cluster-autoscaler/core/scale_up_test.go
@@ -474,6 +474,7 @@ func expanderOptionToGroupSizeChange(option expander.Option) groupSizeChange {

func runSimpleScaleUpTest(t *testing.T, config *scaleTestConfig) *scaleTestResults {
expandedGroups := make(chan groupSizeChange, 10)
now := time.Now()

groups := make(map[string][]*apiv1.Node)
nodes := make([]*apiv1.Node, len(config.nodes))
@@ -482,7 +483,7 @@ func runSimpleScaleUpTest(t *testing.T, config *scaleTestConfig) *scaleTestResul
if n.gpu > 0 {
AddGpusToNode(node, n.gpu)
}
SetNodeReadyState(node, n.ready, time.Now())
SetNodeReadyState(node, n.ready, now.Add(-2*time.Minute))
nodes[i] = node
if n.group != "" {
groups[n.group] = append(groups[n.group], node)
@@ -529,7 +530,7 @@ func runSimpleScaleUpTest(t *testing.T, config *scaleTestConfig) *scaleTestResul
}
context.ExpanderStrategy = expander

nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())

@@ -657,10 +658,12 @@ func buildTestPod(p podConfig) *apiv1.Pod {
}

func TestScaleUpUnhealthy(t *testing.T) {
now := time.Now()
someTimeAgo := now.Add(-2 * time.Minute)
n1 := BuildTestNode("n1", 100, 1000)
SetNodeReadyState(n1, true, time.Now())
SetNodeReadyState(n1, true, someTimeAgo)
n2 := BuildTestNode("n2", 1000, 1000)
SetNodeReadyState(n2, true, time.Now())
SetNodeReadyState(n2, true, someTimeAgo)

p1 := BuildTestPod("p1", 80, 0)
p2 := BuildTestPod("p2", 800, 0)
@@ -688,7 +691,7 @@ func TestScaleUpUnhealthy(t *testing.T) {
assert.NoError(t, err)

nodes := []*apiv1.Node{n1, n2}
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
p3 := BuildTestPod("p-new", 550, 0)
@@ -703,7 +706,8 @@ func TestScaleUpUnhealthy(t *testing.T) {

func TestScaleUpNoHelp(t *testing.T) {
n1 := BuildTestNode("n1", 100, 1000)
SetNodeReadyState(n1, true, time.Now())
now := time.Now()
SetNodeReadyState(n1, true, now.Add(-2*time.Minute))

p1 := BuildTestPod("p1", 80, 0)
p1.Spec.NodeName = "n1"
@@ -728,7 +732,7 @@ func TestScaleUpNoHelp(t *testing.T) {
assert.NoError(t, err)

nodes := []*apiv1.Node{n1}
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
p3 := BuildTestPod("p-new", 500, 0)
@@ -765,12 +769,14 @@ func TestScaleUpBalanceGroups(t *testing.T) {
podList := make([]*apiv1.Pod, 0, len(testCfg))
nodes := make([]*apiv1.Node, 0)

now := time.Now()

for gid, gconf := range testCfg {
provider.AddNodeGroup(gid, gconf.min, gconf.max, gconf.size)
for i := 0; i < gconf.size; i++ {
nodeName := fmt.Sprintf("%v-node-%v", gid, i)
node := BuildTestNode(nodeName, 100, 1000)
SetNodeReadyState(node, true, time.Now())
SetNodeReadyState(node, true, now.Add(-2*time.Minute))
nodes = append(nodes, node)

pod := BuildTestPod(fmt.Sprintf("%v-pod-%v", gid, i), 80, 0)
@@ -793,7 +799,7 @@ func TestScaleUpBalanceGroups(t *testing.T) {
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil)
assert.NoError(t, err)

nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())

@@ -861,7 +867,7 @@ func TestScaleUpAutoprovisionedNodeGroup(t *testing.T) {
processors.NodeGroupManager = &mockAutoprovisioningNodeGroupManager{t, 0}

nodes := []*apiv1.Node{}
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, time.Now())

scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p1}, nodes, []*appsv1.DaemonSet{}, nodeInfos, nil)
assert.NoError(t, err)
@@ -914,7 +920,7 @@ func TestScaleUpBalanceAutoprovisionedNodeGroups(t *testing.T) {
processors.NodeGroupManager = &mockAutoprovisioningNodeGroupManager{t, 2}

nodes := []*apiv1.Node{}
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, time.Now())

scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p1, p2, p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos, nil)
assert.NoError(t, err)
2 changes: 1 addition & 1 deletion cluster-autoscaler/core/static_autoscaler.go
@@ -285,7 +285,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
return typedErr.AddPrefix("Initialize ClusterSnapshot")
}

nodeInfosForGroups, autoscalerError := a.processors.TemplateNodeInfoProvider.Process(autoscalingContext, readyNodes, daemonsets, a.ignoredTaints)
nodeInfosForGroups, autoscalerError := a.processors.TemplateNodeInfoProvider.Process(autoscalingContext, readyNodes, daemonsets, a.ignoredTaints, currentTime)
if autoscalerError != nil {
klog.Errorf("Failed to get node infos for groups: %v", autoscalerError)
return autoscalerError.AddPrefix("failed to build node infos for node groups: ")
@@ -18,6 +18,7 @@ package nodeinfosprovider

import (
"reflect"
"time"

appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"
@@ -33,6 +34,8 @@ import (
klog "k8s.io/klog/v2"
)

const stabilizationDelay = 1 * time.Minute

// MixedTemplateNodeInfoProvider build nodeInfos from the cluster's nodes and node groups.
type MixedTemplateNodeInfoProvider struct {
nodeInfoCache map[string]*schedulerframework.NodeInfo
@@ -51,7 +54,7 @@ func (p *MixedTemplateNodeInfoProvider) CleanUp() {
}

// Process returns the nodeInfos set for this cluster
func (p *MixedTemplateNodeInfoProvider) Process(ctx *context.AutoscalingContext, nodes []*apiv1.Node, daemonsets []*appsv1.DaemonSet, ignoredTaints taints.TaintKeySet) (map[string]*schedulerframework.NodeInfo, errors.AutoscalerError) {
func (p *MixedTemplateNodeInfoProvider) Process(ctx *context.AutoscalingContext, nodes []*apiv1.Node, daemonsets []*appsv1.DaemonSet, ignoredTaints taints.TaintKeySet, now time.Time) (map[string]*schedulerframework.NodeInfo, errors.AutoscalerError) {
// TODO(mwielgus): This returns map keyed by url, while most code (including scheduler) uses node.Name for a key.
// TODO(mwielgus): Review error policy - sometimes we may continue with partial errors.
result := make(map[string]*schedulerframework.NodeInfo)
@@ -90,7 +93,7 @@ func (p *MixedTemplateNodeInfoProvider) Process(ctx *context.AutoscalingContext,

for _, node := range nodes {
// Broken nodes might have some stuff missing. Skipping.
if !kube_util.IsNodeReadyAndSchedulable(node) {
if !isNodeGoodTemplateCandidate(node, now) {
continue
}
added, id, typedErr := processNode(node)
@@ -144,19 +147,20 @@ func (p *MixedTemplateNodeInfoProvider) Process(ctx *context.AutoscalingContext,
// Last resort - unready/unschedulable nodes.
for _, node := range nodes {
// Allowing broken nodes
if !kube_util.IsNodeReadyAndSchedulable(node) {
added, _, typedErr := processNode(node)
if typedErr != nil {
return map[string]*schedulerframework.NodeInfo{}, typedErr
}
nodeGroup, err := ctx.CloudProvider.NodeGroupForNode(node)
if err != nil {
return map[string]*schedulerframework.NodeInfo{}, errors.ToAutoscalerError(
errors.CloudProviderError, err)
}
if added {
klog.Warningf("Built template for %s based on unready/unschedulable node %s", nodeGroup.Id(), node.Name)
}
if isNodeGoodTemplateCandidate(node, now) {
continue
}
added, _, typedErr := processNode(node)
if typedErr != nil {
return map[string]*schedulerframework.NodeInfo{}, typedErr
}
nodeGroup, err := ctx.CloudProvider.NodeGroupForNode(node)
if err != nil {
return map[string]*schedulerframework.NodeInfo{}, errors.ToAutoscalerError(
errors.CloudProviderError, err)
}
if added {
klog.Warningf("Built template for %s based on unready/unschedulable node %s", nodeGroup.Id(), node.Name)
}
}

@@ -174,3 +178,10 @@ func getPodsForNodes(listers kube_util.ListerRegistry) (map[string][]*apiv1.Pod,
}
return podsForNodes, nil
}

func isNodeGoodTemplateCandidate(node *apiv1.Node, now time.Time) bool {
ready, lastTransitionTime, _ := kube_util.GetReadinessState(node)
stable := lastTransitionTime.Add(stabilizationDelay).Before(now)
schedulable := !node.Spec.Unschedulable
return ready && stable && schedulable
}
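Paraphrasing the reshuffled hunk above — an approximate sketch of the merged control flow with bodies elided; the caching detail is inferred from the PR title and the first pass, which this diff does not show in full:

```go
// First pass: nodes Ready for at least stabilizationDelay and schedulable
// build the per-node-group templates; only these results end up in
// p.nodeInfoCache.
for _, node := range nodes {
	if !isNodeGoodTemplateCandidate(node, now) {
		continue
	}
	// ... processNode(node), then store the resulting NodeInfo in the cache ...
}

// Node groups still missing a template fall back to the cache or to the
// node group's TemplateNodeInfo().

// Last resort: nodes skipped above (unready, unschedulable, or Ready for less
// than stabilizationDelay) may still produce a template, with a warning, but
// never populate the cache.
for _, node := range nodes {
	if isNodeGoodTemplateCandidate(node, now) {
		continue
	}
	// ... processNode(node); on success, log "Built template for <group> based
	// on unready/unschedulable node <node>" ...
}
```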
@@ -33,14 +33,17 @@ import (
)

func TestGetNodeInfosForGroups(t *testing.T) {
now := time.Now()
ready1 := BuildTestNode("n1", 1000, 1000)
SetNodeReadyState(ready1, true, time.Now())
SetNodeReadyState(ready1, true, now.Add(-2*time.Minute))
ready2 := BuildTestNode("n2", 2000, 2000)
SetNodeReadyState(ready2, true, time.Now())
SetNodeReadyState(ready2, true, now.Add(-2*time.Minute))
unready3 := BuildTestNode("n3", 3000, 3000)
SetNodeReadyState(unready3, false, time.Now())
SetNodeReadyState(unready3, false, now)
unready4 := BuildTestNode("n4", 4000, 4000)
SetNodeReadyState(unready4, false, time.Now())
SetNodeReadyState(unready4, false, now)
justReady5 := BuildTestNode("n5", 5000, 5000)
SetNodeReadyState(justReady5, true, now)

tn := BuildTestNode("tn", 5000, 5000)
tni := schedulerframework.NewNodeInfo()
@@ -49,7 +52,7 @@ func TestGetNodeInfosForGroups(t *testing.T) {
// Cloud provider with TemplateNodeInfo implemented.
provider1 := testprovider.NewTestAutoprovisioningCloudProvider(
nil, nil, nil, nil, nil,
map[string]*schedulerframework.NodeInfo{"ng3": tni, "ng4": tni})
map[string]*schedulerframework.NodeInfo{"ng3": tni, "ng4": tni, "ng5": tni})
provider1.AddNodeGroup("ng1", 1, 10, 1) // Nodegroup with ready node.
provider1.AddNode("ng1", ready1)
provider1.AddNodeGroup("ng2", 1, 10, 1) // Nodegroup with ready and unready node.
@@ -58,10 +61,12 @@ func TestGetNodeInfosForGroups(t *testing.T) {
provider1.AddNodeGroup("ng3", 1, 10, 1) // Nodegroup with unready node.
provider1.AddNode("ng3", unready4)
provider1.AddNodeGroup("ng4", 0, 1000, 0) // Nodegroup without nodes.
provider1.AddNodeGroup("ng5", 1, 10, 1) // Nodegroup with node that recently became ready.
provider1.AddNode("ng5", justReady5)

// Cloud provider with TemplateNodeInfo not implemented.
provider2 := testprovider.NewTestAutoprovisioningCloudProvider(nil, nil, nil, nil, nil, nil)
provider2.AddNodeGroup("ng5", 1, 10, 1) // Nodegroup without nodes.
provider2.AddNodeGroup("ng6", 1, 10, 1) // Nodegroup without nodes.

podLister := kube_util.NewTestPodLister([]*apiv1.Pod{})
registry := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil)
@@ -76,9 +81,9 @@ func TestGetNodeInfosForGroups(t *testing.T) {
ListerRegistry: registry,
},
}
res, err := NewMixedTemplateNodeInfoProvider().Process(&ctx, []*apiv1.Node{unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil)
res, err := NewMixedTemplateNodeInfoProvider().Process(&ctx, []*apiv1.Node{justReady5, unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil, now)
assert.NoError(t, err)
assert.Equal(t, 4, len(res))
assert.Equal(t, 5, len(res))
info, found := res["ng1"]
assert.True(t, found)
assertEqualNodeCapacities(t, ready1, info.Node())
@@ -91,6 +96,9 @@ func TestGetNodeInfosForGroups(t *testing.T) {
info, found = res["ng4"]
assert.True(t, found)
assertEqualNodeCapacities(t, tn, info.Node())
info, found = res["ng5"]
assert.True(t, found)
assertEqualNodeCapacities(t, tn, info.Node())

// Test for a nodegroup without nodes and TemplateNodeInfo not implemented by cloud proivder
ctx = context.AutoscalingContext{
Expand All @@ -100,24 +108,25 @@ func TestGetNodeInfosForGroups(t *testing.T) {
ListerRegistry: registry,
},
}
res, err = NewMixedTemplateNodeInfoProvider().Process(&ctx, []*apiv1.Node{}, []*appsv1.DaemonSet{}, nil)
res, err = NewMixedTemplateNodeInfoProvider().Process(&ctx, []*apiv1.Node{}, []*appsv1.DaemonSet{}, nil, now)
assert.NoError(t, err)
assert.Equal(t, 0, len(res))
}

func TestGetNodeInfosForGroupsCache(t *testing.T) {
now := time.Now()
ready1 := BuildTestNode("n1", 1000, 1000)
SetNodeReadyState(ready1, true, time.Now())
SetNodeReadyState(ready1, true, now.Add(-2*time.Minute))
ready2 := BuildTestNode("n2", 2000, 2000)
SetNodeReadyState(ready2, true, time.Now())
SetNodeReadyState(ready2, true, now.Add(-2*time.Minute))
unready3 := BuildTestNode("n3", 3000, 3000)
SetNodeReadyState(unready3, false, time.Now())
SetNodeReadyState(unready3, false, now)
unready4 := BuildTestNode("n4", 4000, 4000)
SetNodeReadyState(unready4, false, time.Now())
SetNodeReadyState(unready4, false, now.Add(-2*time.Minute))
ready5 := BuildTestNode("n5", 5000, 5000)
SetNodeReadyState(ready5, true, time.Now())
SetNodeReadyState(ready5, true, now.Add(-2*time.Minute))
ready6 := BuildTestNode("n6", 6000, 6000)
SetNodeReadyState(ready6, true, time.Now())
SetNodeReadyState(ready6, true, now.Add(-2*time.Minute))

tn := BuildTestNode("tn", 10000, 10000)
tni := schedulerframework.NewNodeInfo()
@@ -159,7 +168,7 @@ func TestGetNodeInfosForGroupsCache(t *testing.T) {
},
}
niProcessor := NewMixedTemplateNodeInfoProvider()
res, err := niProcessor.Process(&ctx, []*apiv1.Node{unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil)
res, err := niProcessor.Process(&ctx, []*apiv1.Node{unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil, now)
assert.NoError(t, err)
// Check results
assert.Equal(t, 4, len(res))
@@ -193,7 +202,7 @@ func TestGetNodeInfosForGroupsCache(t *testing.T) {
assert.Equal(t, "ng3", lastDeletedGroup)

// Check cache with all nodes removed
res, err = niProcessor.Process(&ctx, []*apiv1.Node{unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil)
res, err = niProcessor.Process(&ctx, []*apiv1.Node{unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil, now)
assert.NoError(t, err)
// Check results
assert.Equal(t, 2, len(res))
@@ -215,7 +224,7 @@ func TestGetNodeInfosForGroupsCache(t *testing.T) {
infoNg4Node6 := schedulerframework.NewNodeInfo()
infoNg4Node6.SetNode(ready6.DeepCopy())
niProcessor.nodeInfoCache = map[string]*schedulerframework.NodeInfo{"ng4": infoNg4Node6}
res, err = niProcessor.Process(&ctx, []*apiv1.Node{unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil)
res, err = niProcessor.Process(&ctx, []*apiv1.Node{unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil, now)
// Check if cache was used
assert.NoError(t, err)
assert.Equal(t, 2, len(res))
@@ -229,6 +238,7 @@ func TestGetNodeInfosForGroupsCache(t *testing.T) {

func assertEqualNodeCapacities(t *testing.T, expected, actual *apiv1.Node) {
t.Helper()
assert.NotEqual(t, actual.Status, nil, "")
assert.Equal(t, getNodeResource(expected, apiv1.ResourceCPU), getNodeResource(actual, apiv1.ResourceCPU), "CPU should be the same")
assert.Equal(t, getNodeResource(expected, apiv1.ResourceMemory), getNodeResource(actual, apiv1.ResourceMemory), "Memory should be the same")
}
@@ -17,6 +17,8 @@ limitations under the License.
package nodeinfosprovider

import (
"time"

appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"

@@ -29,7 +31,7 @@ import (
// TemplateNodeInfoProvider is provides the initial nodeInfos set.
type TemplateNodeInfoProvider interface {
// Process returns a map of nodeInfos for node groups.
Process(ctx *context.AutoscalingContext, nodes []*apiv1.Node, daemonsets []*appsv1.DaemonSet, ignoredTaints taints.TaintKeySet) (map[string]*schedulerframework.NodeInfo, errors.AutoscalerError)
Process(ctx *context.AutoscalingContext, nodes []*apiv1.Node, daemonsets []*appsv1.DaemonSet, ignoredTaints taints.TaintKeySet, currentTime time.Time) (map[string]*schedulerframework.NodeInfo, errors.AutoscalerError)
// CleanUp cleans up processor's internal structures.
CleanUp()
}