Don't cache NodeInfo for recently Ready nodes
There's a race condition between DaemonSet pods getting scheduled to a new node and Cluster Autoscaler caching that node for the sake of predicting future nodes in a given node group. We can reduce the risk of missing some DaemonSets by providing a grace period before accepting nodes into the cache. One minute should be more than enough, except for some pathological edge cases.
x13n committed Jan 24, 2022
1 parent 5c741c8 · commit 93152c1
Showing 1 changed file with 25 additions and 14 deletions.
@@ -33,6 +33,8 @@ import (
 	klog "k8s.io/klog/v2"
 )
 
+const stabilizationDelay = 1 * time.Minute
+
 // MixedTemplateNodeInfoProvider build nodeInfos from the cluster's nodes and node groups.
 type MixedTemplateNodeInfoProvider struct {
 	nodeInfoCache map[string]*schedulerframework.NodeInfo
@@ -56,6 +58,7 @@ func (p *MixedTemplateNodeInfoProvider) Process(ctx *context.AutoscalingContext,
 	// TODO(mwielgus): Review error policy - sometimes we may continue with partial errors.
 	result := make(map[string]*schedulerframework.NodeInfo)
 	seenGroups := make(map[string]bool)
+	now := time.Now()
 
 	podsForNodes, err := getPodsForNodes(ctx.ListerRegistry)
 	if err != nil {
@@ -90,7 +93,7 @@ func (p *MixedTemplateNodeInfoProvider) Process(ctx *context.AutoscalingContext,
 
 	for _, node := range nodes {
 		// Broken nodes might have some stuff missing. Skipping.
-		if !kube_util.IsNodeReadyAndSchedulable(node) {
+		if !isNodeGoodForCaching(node, now) {
 			continue
 		}
 		added, id, typedErr := processNode(node)
@@ -144,19 +147,20 @@ func (p *MixedTemplateNodeInfoProvider) Process(ctx *context.AutoscalingContext,
 	// Last resort - unready/unschedulable nodes.
 	for _, node := range nodes {
 		// Allowing broken nodes
-		if !kube_util.IsNodeReadyAndSchedulable(node) {
-			added, _, typedErr := processNode(node)
-			if typedErr != nil {
-				return map[string]*schedulerframework.NodeInfo{}, typedErr
-			}
-			nodeGroup, err := ctx.CloudProvider.NodeGroupForNode(node)
-			if err != nil {
-				return map[string]*schedulerframework.NodeInfo{}, errors.ToAutoscalerError(
-					errors.CloudProviderError, err)
-			}
-			if added {
-				klog.Warningf("Built template for %s based on unready/unschedulable node %s", nodeGroup.Id(), node.Name)
-			}
+		if isNodeGoodForCaching(node, now) {
+			continue
+		}
+		added, _, typedErr := processNode(node)
+		if typedErr != nil {
+			return map[string]*schedulerframework.NodeInfo{}, typedErr
+		}
+		nodeGroup, err := ctx.CloudProvider.NodeGroupForNode(node)
+		if err != nil {
+			return map[string]*schedulerframework.NodeInfo{}, errors.ToAutoscalerError(
+				errors.CloudProviderError, err)
+		}
+		if added {
+			klog.Warningf("Built template for %s based on unready/unschedulable node %s", nodeGroup.Id(), node.Name)
+		}
 		}
 	}
 
@@ -174,3 +178,10 @@ func getPodsForNodes(listers kube_util.ListerRegistry) (map[string][]*apiv1.Pod,
 	}
 	return podsForNodes, nil
 }
+
+func isNodeGoodForCaching(node *apiv1.Node, now time.Time) bool {
+	ready, lastTransitionTime, _ := kube_util.GetReadinessState(node)
+	stable := lastTransitionTime.Add(stabilizationDelay).Before(now)
+	schedulable := !node.Spec.Unschedulable
+	return ready && stable && schedulable
+}
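
For illustration only (not part of the commit): a minimal, self-contained sketch of how the new grace period plays out for a node that has just turned Ready. It stands in for the real kube_util.GetReadinessState with a local readinessState helper, so that helper's name and return values are assumptions made for this example, not autoscaler code.

package main

import (
	"fmt"
	"time"

	apiv1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const stabilizationDelay = 1 * time.Minute

// readinessState approximates what kube_util.GetReadinessState is assumed to return:
// whether the node is Ready, and when its Ready condition last changed.
func readinessState(node *apiv1.Node) (bool, time.Time) {
	for _, cond := range node.Status.Conditions {
		if cond.Type == apiv1.NodeReady {
			return cond.Status == apiv1.ConditionTrue, cond.LastTransitionTime.Time
		}
	}
	return false, time.Time{}
}

// goodForCaching mirrors the commit's isNodeGoodForCaching: the node must be Ready,
// schedulable, and Ready for longer than stabilizationDelay.
func goodForCaching(node *apiv1.Node, now time.Time) bool {
	ready, lastTransition := readinessState(node)
	stable := lastTransition.Add(stabilizationDelay).Before(now)
	return ready && stable && !node.Spec.Unschedulable
}

func main() {
	now := time.Now()
	node := &apiv1.Node{
		Status: apiv1.NodeStatus{Conditions: []apiv1.NodeCondition{{
			Type:               apiv1.NodeReady,
			Status:             apiv1.ConditionTrue,
			LastTransitionTime: metav1.NewTime(now.Add(-30 * time.Second)),
		}}},
	}
	// Ready for only 30s: DaemonSet pods may not have been scheduled yet,
	// so the node is not yet used as a template for its node group.
	fmt.Println(goodForCaching(node, now)) // false

	// Once the node has been Ready for longer than the grace period, it is cached as usual.
	node.Status.Conditions[0].LastTransitionTime = metav1.NewTime(now.Add(-5 * time.Minute))
	fmt.Println(goodForCaching(node, now)) // true
}

The one-minute window gives DaemonSet pods time to land on the new node before it becomes the basis for predicting future nodes in the group.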
