Don't cache NodeInfo for recently Ready nodes
There's a race condition between DaemonSet pods getting scheduled to a
new node and Cluster Autoscaler caching that node for the sake of
predicting future nodes in a given node group. We can reduce the risk of
missing some DaemonSet pods by providing a grace period before accepting
nodes into the cache. One minute should be more than enough, except for
some pathological edge cases.
x13n committed Jan 26, 2022
1 parent 5c741c8 commit 9944137
Showing 5 changed files with 75 additions and 46 deletions.
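
For orientation, the fix amounts to a readiness grace period when picking nodes as template sources. The snippet below is a minimal, standalone sketch of that check, assuming only the k8s.io/api/core/v1 types; it mirrors the stabilizationDelay constant and the isNodeGoodTemplateCandidate helper added in this commit, but it is not the autoscaler's actual code.

package sketch

import (
	"time"

	apiv1 "k8s.io/api/core/v1"
)

// stabilizationDelay matches the 1-minute grace period introduced by this commit.
const stabilizationDelay = 1 * time.Minute

// isStableTemplateCandidate reports whether a node has been Ready and schedulable
// for longer than stabilizationDelay as of `now`, i.e. long enough that its
// DaemonSet pods have likely been scheduled and it is safe to cache as a template.
func isStableTemplateCandidate(node *apiv1.Node, now time.Time) bool {
	for _, cond := range node.Status.Conditions {
		if cond.Type != apiv1.NodeReady {
			continue
		}
		ready := cond.Status == apiv1.ConditionTrue
		stable := cond.LastTransitionTime.Time.Add(stabilizationDelay).Before(now)
		return ready && stable && !node.Spec.Unschedulable
	}
	// No Ready condition reported yet: not a usable template source.
	return false
}

Note that callers pass the control loop's current time instead of calling time.Now() inside the helper, which is why the Process signatures in the diff below gain a time parameter.
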
28 changes: 17 additions & 11 deletions cluster-autoscaler/core/scale_up_test.go
@@ -474,6 +474,7 @@ func expanderOptionToGroupSizeChange(option expander.Option) groupSizeChange {

func runSimpleScaleUpTest(t *testing.T, config *scaleTestConfig) *scaleTestResults {
expandedGroups := make(chan groupSizeChange, 10)
now := time.Now()

groups := make(map[string][]*apiv1.Node)
nodes := make([]*apiv1.Node, len(config.nodes))
@@ -482,7 +483,7 @@ func runSimpleScaleUpTest(t *testing.T, config *scaleTestConfig) *scaleTestResul
if n.gpu > 0 {
AddGpusToNode(node, n.gpu)
}
SetNodeReadyState(node, n.ready, time.Now())
SetNodeReadyState(node, n.ready, now.Add(-2*time.Minute))
nodes[i] = node
if n.group != "" {
groups[n.group] = append(groups[n.group], node)
@@ -529,7 +530,7 @@ func runSimpleScaleUpTest(t *testing.T, config *scaleTestConfig) *scaleTestResul
}
context.ExpanderStrategy = expander

nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())

@@ -657,10 +658,12 @@ func buildTestPod(p podConfig) *apiv1.Pod {
}

func TestScaleUpUnhealthy(t *testing.T) {
now := time.Now()
someTimeAgo := now.Add(-2 * time.Minute)
n1 := BuildTestNode("n1", 100, 1000)
SetNodeReadyState(n1, true, time.Now())
SetNodeReadyState(n1, true, someTimeAgo)
n2 := BuildTestNode("n2", 1000, 1000)
SetNodeReadyState(n2, true, time.Now())
SetNodeReadyState(n2, true, someTimeAgo)

p1 := BuildTestPod("p1", 80, 0)
p2 := BuildTestPod("p2", 800, 0)
@@ -688,7 +691,7 @@ func TestScaleUpUnhealthy(t *testing.T) {
assert.NoError(t, err)

nodes := []*apiv1.Node{n1, n2}
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
p3 := BuildTestPod("p-new", 550, 0)
@@ -703,7 +706,8 @@ func TestScaleUpUnhealthy(t *testing.T) {

func TestScaleUpNoHelp(t *testing.T) {
n1 := BuildTestNode("n1", 100, 1000)
SetNodeReadyState(n1, true, time.Now())
now := time.Now()
SetNodeReadyState(n1, true, now.Add(-2*time.Minute))

p1 := BuildTestPod("p1", 80, 0)
p1.Spec.NodeName = "n1"
@@ -728,7 +732,7 @@ func TestScaleUpNoHelp(t *testing.T) {
assert.NoError(t, err)

nodes := []*apiv1.Node{n1}
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
p3 := BuildTestPod("p-new", 500, 0)
@@ -765,12 +769,14 @@ func TestScaleUpBalanceGroups(t *testing.T) {
podList := make([]*apiv1.Pod, 0, len(testCfg))
nodes := make([]*apiv1.Node, 0)

now := time.Now()

for gid, gconf := range testCfg {
provider.AddNodeGroup(gid, gconf.min, gconf.max, gconf.size)
for i := 0; i < gconf.size; i++ {
nodeName := fmt.Sprintf("%v-node-%v", gid, i)
node := BuildTestNode(nodeName, 100, 1000)
SetNodeReadyState(node, true, time.Now())
SetNodeReadyState(node, true, now.Add(-2*time.Minute))
nodes = append(nodes, node)

pod := BuildTestPod(fmt.Sprintf("%v-pod-%v", gid, i), 80, 0)
@@ -793,7 +799,7 @@ func TestScaleUpBalanceGroups(t *testing.T) {
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil)
assert.NoError(t, err)

nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())

@@ -861,7 +867,7 @@ func TestScaleUpAutoprovisionedNodeGroup(t *testing.T) {
processors.NodeGroupManager = &mockAutoprovisioningNodeGroupManager{t, 0}

nodes := []*apiv1.Node{}
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, time.Now())

scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p1}, nodes, []*appsv1.DaemonSet{}, nodeInfos, nil)
assert.NoError(t, err)
@@ -914,7 +920,7 @@ func TestScaleUpBalanceAutoprovisionedNodeGroups(t *testing.T) {
processors.NodeGroupManager = &mockAutoprovisioningNodeGroupManager{t, 2}

nodes := []*apiv1.Node{}
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, time.Now())

scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p1, p2, p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos, nil)
assert.NoError(t, err)
2 changes: 1 addition & 1 deletion cluster-autoscaler/core/static_autoscaler.go
@@ -285,7 +285,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
return typedErr.AddPrefix("Initialize ClusterSnapshot")
}

nodeInfosForGroups, autoscalerError := a.processors.TemplateNodeInfoProvider.Process(autoscalingContext, readyNodes, daemonsets, a.ignoredTaints)
nodeInfosForGroups, autoscalerError := a.processors.TemplateNodeInfoProvider.Process(autoscalingContext, readyNodes, daemonsets, a.ignoredTaints, currentTime)
if autoscalerError != nil {
klog.Errorf("Failed to get node infos for groups: %v", autoscalerError)
return autoscalerError.AddPrefix("failed to build node infos for node groups: ")
@@ -18,6 +18,7 @@ package nodeinfosprovider

import (
"reflect"
"time"

appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"
@@ -33,6 +34,8 @@ import (
klog "k8s.io/klog/v2"
)

const stabilizationDelay = 1 * time.Minute

// MixedTemplateNodeInfoProvider builds nodeInfos from the cluster's nodes and node groups.
type MixedTemplateNodeInfoProvider struct {
nodeInfoCache map[string]*schedulerframework.NodeInfo
@@ -51,7 +54,7 @@ func (p *MixedTemplateNodeInfoProvider) CleanUp() {
}

// Process returns the nodeInfos set for this cluster
func (p *MixedTemplateNodeInfoProvider) Process(ctx *context.AutoscalingContext, nodes []*apiv1.Node, daemonsets []*appsv1.DaemonSet, ignoredTaints taints.TaintKeySet) (map[string]*schedulerframework.NodeInfo, errors.AutoscalerError) {
func (p *MixedTemplateNodeInfoProvider) Process(ctx *context.AutoscalingContext, nodes []*apiv1.Node, daemonsets []*appsv1.DaemonSet, ignoredTaints taints.TaintKeySet, now time.Time) (map[string]*schedulerframework.NodeInfo, errors.AutoscalerError) {
// TODO(mwielgus): This returns map keyed by url, while most code (including scheduler) uses node.Name for a key.
// TODO(mwielgus): Review error policy - sometimes we may continue with partial errors.
result := make(map[string]*schedulerframework.NodeInfo)
@@ -90,7 +93,7 @@ func (p *MixedTemplateNodeInfoProvider) Process(ctx *context.AutoscalingContext,

for _, node := range nodes {
// Broken nodes might have some stuff missing. Skipping.
if !kube_util.IsNodeReadyAndSchedulable(node) {
if !isNodeGoodTemplateCandidate(node, now) {
continue
}
added, id, typedErr := processNode(node)
@@ -144,19 +147,20 @@ func (p *MixedTemplateNodeInfoProvider) Process(ctx *context.AutoscalingContext,
// Last resort - unready/unschedulable nodes.
for _, node := range nodes {
// Allowing broken nodes
if !kube_util.IsNodeReadyAndSchedulable(node) {
added, _, typedErr := processNode(node)
if typedErr != nil {
return map[string]*schedulerframework.NodeInfo{}, typedErr
}
nodeGroup, err := ctx.CloudProvider.NodeGroupForNode(node)
if err != nil {
return map[string]*schedulerframework.NodeInfo{}, errors.ToAutoscalerError(
errors.CloudProviderError, err)
}
if added {
klog.Warningf("Built template for %s based on unready/unschedulable node %s", nodeGroup.Id(), node.Name)
}
if isNodeGoodTemplateCandidate(node, now) {
continue
}
added, _, typedErr := processNode(node)
if typedErr != nil {
return map[string]*schedulerframework.NodeInfo{}, typedErr
}
nodeGroup, err := ctx.CloudProvider.NodeGroupForNode(node)
if err != nil {
return map[string]*schedulerframework.NodeInfo{}, errors.ToAutoscalerError(
errors.CloudProviderError, err)
}
if added {
klog.Warningf("Built template for %s based on unready/unschedulable node %s", nodeGroup.Id(), node.Name)
}
}

@@ -174,3 +178,10 @@ func getPodsForNodes(listers kube_util.ListerRegistry) (map[string][]*apiv1.Pod,
}
return podsForNodes, nil
}

func isNodeGoodTemplateCandidate(node *apiv1.Node, now time.Time) bool {
ready, lastTransitionTime, _ := kube_util.GetReadinessState(node)
stable := lastTransitionTime.Add(stabilizationDelay).Before(now)
schedulable := !node.Spec.Unschedulable
return ready && stable && schedulable
}
@@ -33,14 +33,17 @@ import (
)

func TestGetNodeInfosForGroups(t *testing.T) {
now := time.Now()
ready1 := BuildTestNode("n1", 1000, 1000)
SetNodeReadyState(ready1, true, time.Now())
SetNodeReadyState(ready1, true, now.Add(-2*time.Minute))
ready2 := BuildTestNode("n2", 2000, 2000)
SetNodeReadyState(ready2, true, time.Now())
SetNodeReadyState(ready2, true, now.Add(-2*time.Minute))
unready3 := BuildTestNode("n3", 3000, 3000)
SetNodeReadyState(unready3, false, time.Now())
SetNodeReadyState(unready3, false, now)
unready4 := BuildTestNode("n4", 4000, 4000)
SetNodeReadyState(unready4, false, time.Now())
SetNodeReadyState(unready4, false, now)
justReady5 := BuildTestNode("n5", 5000, 5000)
SetNodeReadyState(justReady5, true, now)

tn := BuildTestNode("tn", 5000, 5000)
tni := schedulerframework.NewNodeInfo()
@@ -49,7 +52,7 @@ func TestGetNodeInfosForGroups(t *testing.T) {
// Cloud provider with TemplateNodeInfo implemented.
provider1 := testprovider.NewTestAutoprovisioningCloudProvider(
nil, nil, nil, nil, nil,
map[string]*schedulerframework.NodeInfo{"ng3": tni, "ng4": tni})
map[string]*schedulerframework.NodeInfo{"ng3": tni, "ng4": tni, "ng5": tni})
provider1.AddNodeGroup("ng1", 1, 10, 1) // Nodegroup with ready node.
provider1.AddNode("ng1", ready1)
provider1.AddNodeGroup("ng2", 1, 10, 1) // Nodegroup with ready and unready node.
@@ -58,10 +61,12 @@ func TestGetNodeInfosForGroups(t *testing.T) {
provider1.AddNodeGroup("ng3", 1, 10, 1) // Nodegroup with unready node.
provider1.AddNode("ng3", unready4)
provider1.AddNodeGroup("ng4", 0, 1000, 0) // Nodegroup without nodes.
provider1.AddNodeGroup("ng5", 1, 10, 1) // Nodegroup with node that recently became ready.
provider1.AddNode("ng5", justReady5)

// Cloud provider with TemplateNodeInfo not implemented.
provider2 := testprovider.NewTestAutoprovisioningCloudProvider(nil, nil, nil, nil, nil, nil)
provider2.AddNodeGroup("ng5", 1, 10, 1) // Nodegroup without nodes.
provider2.AddNodeGroup("ng6", 1, 10, 1) // Nodegroup without nodes.

podLister := kube_util.NewTestPodLister([]*apiv1.Pod{})
registry := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil)
@@ -76,9 +81,9 @@ func TestGetNodeInfosForGroups(t *testing.T) {
ListerRegistry: registry,
},
}
res, err := NewMixedTemplateNodeInfoProvider().Process(&ctx, []*apiv1.Node{unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil)
res, err := NewMixedTemplateNodeInfoProvider().Process(&ctx, []*apiv1.Node{justReady5, unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil, now)
assert.NoError(t, err)
assert.Equal(t, 4, len(res))
assert.Equal(t, 5, len(res))
info, found := res["ng1"]
assert.True(t, found)
assertEqualNodeCapacities(t, ready1, info.Node())
@@ -91,6 +96,9 @@ func TestGetNodeInfosForGroups(t *testing.T) {
info, found = res["ng4"]
assert.True(t, found)
assertEqualNodeCapacities(t, tn, info.Node())
info, found = res["ng5"]
assert.True(t, found)
assertEqualNodeCapacities(t, tn, info.Node())

// Test for a nodegroup without nodes and TemplateNodeInfo not implemented by cloud provider
ctx = context.AutoscalingContext{
@@ -100,24 +108,25 @@ func TestGetNodeInfosForGroups(t *testing.T) {
ListerRegistry: registry,
},
}
res, err = NewMixedTemplateNodeInfoProvider().Process(&ctx, []*apiv1.Node{}, []*appsv1.DaemonSet{}, nil)
res, err = NewMixedTemplateNodeInfoProvider().Process(&ctx, []*apiv1.Node{}, []*appsv1.DaemonSet{}, nil, now)
assert.NoError(t, err)
assert.Equal(t, 0, len(res))
}

func TestGetNodeInfosForGroupsCache(t *testing.T) {
now := time.Now()
ready1 := BuildTestNode("n1", 1000, 1000)
SetNodeReadyState(ready1, true, time.Now())
SetNodeReadyState(ready1, true, now.Add(-2*time.Minute))
ready2 := BuildTestNode("n2", 2000, 2000)
SetNodeReadyState(ready2, true, time.Now())
SetNodeReadyState(ready2, true, now.Add(-2*time.Minute))
unready3 := BuildTestNode("n3", 3000, 3000)
SetNodeReadyState(unready3, false, time.Now())
SetNodeReadyState(unready3, false, now)
unready4 := BuildTestNode("n4", 4000, 4000)
SetNodeReadyState(unready4, false, time.Now())
SetNodeReadyState(unready4, false, now.Add(-2*time.Minute))
ready5 := BuildTestNode("n5", 5000, 5000)
SetNodeReadyState(ready5, true, time.Now())
SetNodeReadyState(ready5, true, now.Add(-2*time.Minute))
ready6 := BuildTestNode("n6", 6000, 6000)
SetNodeReadyState(ready6, true, time.Now())
SetNodeReadyState(ready6, true, now.Add(-2*time.Minute))

tn := BuildTestNode("tn", 10000, 10000)
tni := schedulerframework.NewNodeInfo()
@@ -159,7 +168,7 @@ func TestGetNodeInfosForGroupsCache(t *testing.T) {
},
}
niProcessor := NewMixedTemplateNodeInfoProvider()
res, err := niProcessor.Process(&ctx, []*apiv1.Node{unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil)
res, err := niProcessor.Process(&ctx, []*apiv1.Node{unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil, now)
assert.NoError(t, err)
// Check results
assert.Equal(t, 4, len(res))
@@ -193,7 +202,7 @@ func TestGetNodeInfosForGroupsCache(t *testing.T) {
assert.Equal(t, "ng3", lastDeletedGroup)

// Check cache with all nodes removed
res, err = niProcessor.Process(&ctx, []*apiv1.Node{unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil)
res, err = niProcessor.Process(&ctx, []*apiv1.Node{unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil, now)
assert.NoError(t, err)
// Check results
assert.Equal(t, 2, len(res))
@@ -215,7 +224,7 @@ func TestGetNodeInfosForGroupsCache(t *testing.T) {
infoNg4Node6 := schedulerframework.NewNodeInfo()
infoNg4Node6.SetNode(ready6.DeepCopy())
niProcessor.nodeInfoCache = map[string]*schedulerframework.NodeInfo{"ng4": infoNg4Node6}
res, err = niProcessor.Process(&ctx, []*apiv1.Node{unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil)
res, err = niProcessor.Process(&ctx, []*apiv1.Node{unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil, now)
// Check if cache was used
assert.NoError(t, err)
assert.Equal(t, 2, len(res))
@@ -229,6 +238,7 @@ func TestGetNodeInfosForGroupsCache(t *testing.T) {

func assertEqualNodeCapacities(t *testing.T, expected, actual *apiv1.Node) {
t.Helper()
assert.NotEqual(t, actual.Status, nil, "")
assert.Equal(t, getNodeResource(expected, apiv1.ResourceCPU), getNodeResource(actual, apiv1.ResourceCPU), "CPU should be the same")
assert.Equal(t, getNodeResource(expected, apiv1.ResourceMemory), getNodeResource(actual, apiv1.ResourceMemory), "Memory should be the same")
}
@@ -17,6 +17,8 @@ limitations under the License.
package nodeinfosprovider

import (
"time"

appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"

@@ -29,7 +31,7 @@
// TemplateNodeInfoProvider provides the initial nodeInfos set.
type TemplateNodeInfoProvider interface {
// Process returns a map of nodeInfos for node groups.
Process(ctx *context.AutoscalingContext, nodes []*apiv1.Node, daemonsets []*appsv1.DaemonSet, ignoredTaints taints.TaintKeySet) (map[string]*schedulerframework.NodeInfo, errors.AutoscalerError)
Process(ctx *context.AutoscalingContext, nodes []*apiv1.Node, daemonsets []*appsv1.DaemonSet, ignoredTaints taints.TaintKeySet, currentTime time.Time) (map[string]*schedulerframework.NodeInfo, errors.AutoscalerError)
// CleanUp cleans up processor's internal structures.
CleanUp()
}
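
To make the interface change concrete, here is a hypothetical no-op implementation of the updated TemplateNodeInfoProvider interface, included only to show where the new currentTime parameter lands. The type name is invented for illustration, and the import paths are inferred from the surrounding diff, so they may differ between autoscaler versions.

package nodeinfosprovider

import (
	"time"

	appsv1 "k8s.io/api/apps/v1"
	apiv1 "k8s.io/api/core/v1"

	"k8s.io/autoscaler/cluster-autoscaler/context"
	"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
	"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

// noopTemplateNodeInfoProvider builds no templates; it only demonstrates the
// new signature, in which the caller threads the loop's current time through
// so that implementations can apply time-based filtering, as
// MixedTemplateNodeInfoProvider now does with its stabilization delay.
type noopTemplateNodeInfoProvider struct{}

// Process ignores its inputs and returns an empty nodeInfo map.
func (noopTemplateNodeInfoProvider) Process(ctx *context.AutoscalingContext, nodes []*apiv1.Node, daemonsets []*appsv1.DaemonSet, ignoredTaints taints.TaintKeySet, currentTime time.Time) (map[string]*schedulerframework.NodeInfo, errors.AutoscalerError) {
	return map[string]*schedulerframework.NodeInfo{}, nil
}

// CleanUp has nothing to release.
func (noopTemplateNodeInfoProvider) CleanUp() {}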
