Don't cache NodeInfo for recently Ready nodes
There's a race condition between DaemonSet pods getting scheduled to a
new node and Cluster Autoscaler caching that node's NodeInfo for the sake
of predicting future nodes in a given node group. We can reduce the risk
of missing some DaemonSet pods by providing a grace period before
accepting nodes into the cache. One minute should be more than enough,
except for some pathological edge cases.
x13n committed Jan 24, 2022
1 parent 5c741c8 commit e0b9a31
Showing 2 changed files with 45 additions and 25 deletions.
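
In essence, the change gates NodeInfo caching on how long a node has been Ready. The sketch below is a simplified, standalone illustration of that check; `nodeState` is a hypothetical stand-in for the few `*apiv1.Node` fields the real code reads, and the actual implementation is the `isNodeGoodForCaching` function in the diff.

```go
package main

import (
	"fmt"
	"time"
)

// stabilizationDelay mirrors the constant introduced in this commit: a node
// must have been Ready for at least this long before its NodeInfo is cached.
const stabilizationDelay = 1 * time.Minute

// nodeState is a simplified, hypothetical stand-in for the fields the real
// check reads from an *apiv1.Node: the Ready condition (and when it last
// changed) plus spec.unschedulable.
type nodeState struct {
	ready              bool
	lastTransitionTime time.Time
	unschedulable      bool
}

// goodForCaching reproduces the logic of isNodeGoodForCaching: the node is
// Ready, has been Ready for longer than stabilizationDelay (so DaemonSet pods
// have had a chance to schedule onto it), and is schedulable.
func goodForCaching(n nodeState, now time.Time) bool {
	stable := n.lastTransitionTime.Add(stabilizationDelay).Before(now)
	return n.ready && stable && !n.unschedulable
}

func main() {
	now := time.Now()
	justReady := nodeState{ready: true, lastTransitionTime: now.Add(-10 * time.Second)}
	longReady := nodeState{ready: true, lastTransitionTime: now.Add(-2 * time.Minute)}
	fmt.Println(goodForCaching(justReady, now)) // false: still within the grace period
	fmt.Println(goodForCaching(longReady, now)) // true: Ready for more than a minute
}
```

Nodes that fail this check are no longer used to populate the cache of template NodeInfos; as the diff shows, they are only considered as a last resort, alongside unready/unschedulable nodes.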
@@ -18,6 +18,7 @@ package nodeinfosprovider

import (
"reflect"
"time"

appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"
@@ -33,6 +34,8 @@ import (
klog "k8s.io/klog/v2"
)

const stabilizationDelay = 1 * time.Minute

// MixedTemplateNodeInfoProvider build nodeInfos from the cluster's nodes and node groups.
type MixedTemplateNodeInfoProvider struct {
nodeInfoCache map[string]*schedulerframework.NodeInfo
@@ -56,6 +59,7 @@ func (p *MixedTemplateNodeInfoProvider) Process(ctx *context.AutoscalingContext,
// TODO(mwielgus): Review error policy - sometimes we may continue with partial errors.
result := make(map[string]*schedulerframework.NodeInfo)
seenGroups := make(map[string]bool)
now := time.Now()

podsForNodes, err := getPodsForNodes(ctx.ListerRegistry)
if err != nil {
@@ -90,7 +94,7 @@ func (p *MixedTemplateNodeInfoProvider) Process(ctx *context.AutoscalingContext,

for _, node := range nodes {
// Broken nodes might have some stuff missing. Skipping.
if !kube_util.IsNodeReadyAndSchedulable(node) {
if !isNodeGoodForCaching(node, now) {
continue
}
added, id, typedErr := processNode(node)
@@ -144,19 +148,20 @@ func (p *MixedTemplateNodeInfoProvider) Process(ctx *context.AutoscalingContext,
// Last resort - unready/unschedulable nodes.
for _, node := range nodes {
// Allowing broken nodes
if !kube_util.IsNodeReadyAndSchedulable(node) {
added, _, typedErr := processNode(node)
if typedErr != nil {
return map[string]*schedulerframework.NodeInfo{}, typedErr
}
nodeGroup, err := ctx.CloudProvider.NodeGroupForNode(node)
if err != nil {
return map[string]*schedulerframework.NodeInfo{}, errors.ToAutoscalerError(
errors.CloudProviderError, err)
}
if added {
klog.Warningf("Built template for %s based on unready/unschedulable node %s", nodeGroup.Id(), node.Name)
}
if isNodeGoodForCaching(node, now) {
continue
}
added, _, typedErr := processNode(node)
if typedErr != nil {
return map[string]*schedulerframework.NodeInfo{}, typedErr
}
nodeGroup, err := ctx.CloudProvider.NodeGroupForNode(node)
if err != nil {
return map[string]*schedulerframework.NodeInfo{}, errors.ToAutoscalerError(
errors.CloudProviderError, err)
}
if added {
klog.Warningf("Built template for %s based on unready/unschedulable node %s", nodeGroup.Id(), node.Name)
}
}

@@ -174,3 +179,10 @@ func getPodsForNodes(listers kube_util.ListerRegistry) (map[string][]*apiv1.Pod,
}
return podsForNodes, nil
}

func isNodeGoodForCaching(node *apiv1.Node, now time.Time) bool {
ready, lastTransitionTime, _ := kube_util.GetReadinessState(node)
stable := lastTransitionTime.Add(stabilizationDelay).Before(now)
schedulable := !node.Spec.Unschedulable
return ready && stable && schedulable
}
@@ -34,13 +34,15 @@ import (

func TestGetNodeInfosForGroups(t *testing.T) {
ready1 := BuildTestNode("n1", 1000, 1000)
SetNodeReadyState(ready1, true, time.Now())
SetNodeReadyState(ready1, true, time.Now().Add(-2*time.Minute))
ready2 := BuildTestNode("n2", 2000, 2000)
SetNodeReadyState(ready2, true, time.Now())
SetNodeReadyState(ready2, true, time.Now().Add(-2*time.Minute))
unready3 := BuildTestNode("n3", 3000, 3000)
SetNodeReadyState(unready3, false, time.Now())
unready4 := BuildTestNode("n4", 4000, 4000)
SetNodeReadyState(unready4, false, time.Now())
justReady5 := BuildTestNode("n5", 5000, 5000)
SetNodeReadyState(justReady5, true, time.Now())

tn := BuildTestNode("tn", 5000, 5000)
tni := schedulerframework.NewNodeInfo()
@@ -49,7 +51,7 @@ func TestGetNodeInfosForGroups(t *testing.T) {
// Cloud provider with TemplateNodeInfo implemented.
provider1 := testprovider.NewTestAutoprovisioningCloudProvider(
nil, nil, nil, nil, nil,
map[string]*schedulerframework.NodeInfo{"ng3": tni, "ng4": tni})
map[string]*schedulerframework.NodeInfo{"ng3": tni, "ng4": tni, "ng5": tni})
provider1.AddNodeGroup("ng1", 1, 10, 1) // Nodegroup with ready node.
provider1.AddNode("ng1", ready1)
provider1.AddNodeGroup("ng2", 1, 10, 1) // Nodegroup with ready and unready node.
@@ -58,10 +60,12 @@ func TestGetNodeInfosForGroups(t *testing.T) {
provider1.AddNodeGroup("ng3", 1, 10, 1) // Nodegroup with unready node.
provider1.AddNode("ng3", unready4)
provider1.AddNodeGroup("ng4", 0, 1000, 0) // Nodegroup without nodes.
provider1.AddNodeGroup("ng5", 1, 10, 1) // Nodegroup with node that recently became ready.
provider1.AddNode("ng5", justReady5)

// Cloud provider with TemplateNodeInfo not implemented.
provider2 := testprovider.NewTestAutoprovisioningCloudProvider(nil, nil, nil, nil, nil, nil)
provider2.AddNodeGroup("ng5", 1, 10, 1) // Nodegroup without nodes.
provider2.AddNodeGroup("ng6", 1, 10, 1) // Nodegroup without nodes.

podLister := kube_util.NewTestPodLister([]*apiv1.Pod{})
registry := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil)
@@ -76,9 +80,9 @@ func TestGetNodeInfosForGroups(t *testing.T) {
ListerRegistry: registry,
},
}
res, err := NewMixedTemplateNodeInfoProvider().Process(&ctx, []*apiv1.Node{unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil)
res, err := NewMixedTemplateNodeInfoProvider().Process(&ctx, []*apiv1.Node{justReady5, unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil)
assert.NoError(t, err)
assert.Equal(t, 4, len(res))
assert.Equal(t, 5, len(res))
info, found := res["ng1"]
assert.True(t, found)
assertEqualNodeCapacities(t, ready1, info.Node())
@@ -91,6 +95,9 @@ func TestGetNodeInfosForGroups(t *testing.T) {
info, found = res["ng4"]
assert.True(t, found)
assertEqualNodeCapacities(t, tn, info.Node())
info, found = res["ng5"]
assert.True(t, found)
assertEqualNodeCapacities(t, tn, info.Node())

// Test for a nodegroup without nodes and TemplateNodeInfo not implemented by cloud provider
ctx = context.AutoscalingContext{
Expand All @@ -107,17 +114,17 @@ func TestGetNodeInfosForGroups(t *testing.T) {

func TestGetNodeInfosForGroupsCache(t *testing.T) {
ready1 := BuildTestNode("n1", 1000, 1000)
SetNodeReadyState(ready1, true, time.Now())
SetNodeReadyState(ready1, true, time.Now().Add(-2*time.Minute))
ready2 := BuildTestNode("n2", 2000, 2000)
SetNodeReadyState(ready2, true, time.Now())
SetNodeReadyState(ready2, true, time.Now().Add(-2*time.Minute))
unready3 := BuildTestNode("n3", 3000, 3000)
SetNodeReadyState(unready3, false, time.Now())
unready4 := BuildTestNode("n4", 4000, 4000)
SetNodeReadyState(unready4, false, time.Now())
SetNodeReadyState(unready4, false, time.Now().Add(-2*time.Minute))
ready5 := BuildTestNode("n5", 5000, 5000)
SetNodeReadyState(ready5, true, time.Now())
SetNodeReadyState(ready5, true, time.Now().Add(-2*time.Minute))
ready6 := BuildTestNode("n6", 6000, 6000)
SetNodeReadyState(ready6, true, time.Now())
SetNodeReadyState(ready6, true, time.Now().Add(-2*time.Minute))

tn := BuildTestNode("tn", 10000, 10000)
tni := schedulerframework.NewNodeInfo()
@@ -229,6 +236,7 @@ func TestGetNodeInfosForGroupsCache(t *testing.T) {

func assertEqualNodeCapacities(t *testing.T, expected, actual *apiv1.Node) {
t.Helper()
assert.NotEqual(t, actual.Status, nil, "")
assert.Equal(t, getNodeResource(expected, apiv1.ResourceCPU), getNodeResource(actual, apiv1.ResourceCPU), "CPU should be the same")
assert.Equal(t, getNodeResource(expected, apiv1.ResourceMemory), getNodeResource(actual, apiv1.ResourceMemory), "Memory should be the same")
}
