Skip to content

Commit

Permalink
NodeInfo processor to Refine synthetic NodeInfos
Browse files Browse the repository at this point in the history
Comparing synthetic NodeInfos obtained from nodegroup's TemplateInfo()
to NodeInfos obtained from real-world nodes is bound to fail, even with
kube reservations provided through nodegroups labels/annotations
(for instance: kernel mem reservation is hard to predict).

This makes `balance-similar-node-groups` likely to misbehave when
`scale-up-from-zero` is enabled (and a first nodegroup gets a real
node), for instance.

Following [Maciek Pytel's suggestion](#3608 (comment))
(from discussions on a previous attempt at solving this), we can
implement a NodeInfo processor that improves template-generated
NodeInfos whenever a real node was created from a similar nodegroup.

We record a NodeInfo's synthetic (template) origin in the node's machine ID,
which works fine but is a bit ugly (suggestions welcome).

Tested that this fixes balance-similar-node-groups + scale-up-from-zero,
with various instance types on AWS and GCP.

Previous attempts to solve that issue/discussions:
* #2892 (comment)
* #3608 (comment)
  • Loading branch information
bpineau committed Jan 18, 2021
1 parent a9cf777 commit 449879e
Show file tree
Hide file tree
Showing 6 changed files with 339 additions and 4 deletions.
3 changes: 3 additions & 0 deletions cluster-autoscaler/config/autoscaling_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,4 +149,7 @@ type AutoscalingOptions struct {
ClusterAPICloudConfigAuthoritative bool
// Enable or disable cordon nodes functionality before terminating the node during downscale process
CordonNodeBeforeTerminate bool
// EnableRefineUsingSimilarNodeGroups tells whether cluster-autoscaler should try to refine synthetic NodeInfos (generated
// from the cloud provider's TemplateNodeInfo()) by using NodeInfos obtained from real-world nodes of similar nodegroups.
EnableRefineUsingSimilarNodeGroups bool
}
23 changes: 20 additions & 3 deletions cluster-autoscaler/core/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ import (
klog "k8s.io/klog/v2"
)

// templatedMachineID is the MachineID value that SetNodeInfoBuiltFromTemplate
// writes on a node, and that IsNodeInfoBuiltFromTemplate looks for, to tell
// template-generated NodeInfos apart from those built from real nodes.
const templatedMachineID = "autoscaler-node-from-template"

// GetNodeInfosForGroups finds NodeInfos for all node groups used to manage the given nodes. It also returns a node group to sample node mapping.
func GetNodeInfosForGroups(nodes []*apiv1.Node, nodeInfoCache map[string]*schedulerframework.NodeInfo, cloudProvider cloudprovider.CloudProvider, listers kube_util.ListerRegistry,
// TODO(mwielgus): This returns map keyed by url, while most code (including scheduler) uses node.Name for a key.
Expand Down Expand Up @@ -67,7 +69,7 @@ func GetNodeInfosForGroups(nodes []*apiv1.Node, nodeInfoCache map[string]*schedu
if err != nil {
return false, "", err
}
sanitizedNodeInfo, err := sanitizeNodeInfo(nodeInfo, id, ignoredTaints)
sanitizedNodeInfo, err := SanitizeNodeInfo(nodeInfo, id, ignoredTaints)
if err != nil {
return false, "", err
}
Expand Down Expand Up @@ -181,13 +183,27 @@ func GetNodeInfoFromTemplate(nodeGroup cloudprovider.NodeGroup, daemonsets []*ap
}
fullNodeInfo := schedulerframework.NewNodeInfo(pods...)
fullNodeInfo.SetNode(baseNodeInfo.Node())
sanitizedNodeInfo, typedErr := sanitizeNodeInfo(fullNodeInfo, id, ignoredTaints)
SetNodeInfoBuiltFromTemplate(fullNodeInfo)
sanitizedNodeInfo, typedErr := SanitizeNodeInfo(fullNodeInfo, id, ignoredTaints)
if typedErr != nil {
return nil, typedErr
}
return sanitizedNodeInfo, nil
}

// SetNodeInfoBuiltFromTemplate marks a NodeInfo as generated from a cloud
// provider's TemplateInfo() by writing templatedMachineID into the MachineID
// of the NodeInfo's node.
//
// The node object held by nodeInfo is modified in place and then re-set on the
// NodeInfo. A nil NodeInfo, or a NodeInfo without a node, is left untouched
// since there is nowhere to record the marker.
func SetNodeInfoBuiltFromTemplate(nodeInfo *schedulerframework.NodeInfo) {
	if nodeInfo == nil {
		return
	}
	node := nodeInfo.Node()
	if node == nil {
		return
	}
	node.Status.NodeInfo.MachineID = templatedMachineID
	nodeInfo.SetNode(node)
}

// IsNodeInfoBuiltFromTemplate returns true if a NodeInfo was generated from a cloud
// provider's TemplateInfo(), false if generated from a real-world node.
//
// The decision is based on the node's MachineID being equal to
// templatedMachineID. A nil NodeInfo, or a NodeInfo without a node, carries no
// such marker and is reported as not built from a template.
func IsNodeInfoBuiltFromTemplate(nodeInfo *schedulerframework.NodeInfo) bool {
	if nodeInfo == nil {
		return false
	}
	node := nodeInfo.Node()
	if node == nil {
		return false
	}
	return node.Status.NodeInfo.MachineID == templatedMachineID
}

// isVirtualNode determines if the node is created by virtual kubelet
func isVirtualNode(node *apiv1.Node) bool {
return node.ObjectMeta.Labels["type"] == "virtual-kubelet"
Expand Down Expand Up @@ -228,7 +244,8 @@ func deepCopyNodeInfo(nodeInfo *schedulerframework.NodeInfo) (*schedulerframewor
return newNodeInfo, nil
}

func sanitizeNodeInfo(nodeInfo *schedulerframework.NodeInfo, nodeGroupName string, ignoredTaints taints.TaintKeySet) (*schedulerframework.NodeInfo, errors.AutoscalerError) {
// SanitizeNodeInfo updates a provided, generic NodeInfo to match the nodegroup's name
func SanitizeNodeInfo(nodeInfo *schedulerframework.NodeInfo, nodeGroupName string, ignoredTaints taints.TaintKeySet) (*schedulerframework.NodeInfo, errors.AutoscalerError) {
// Sanitize node name.
sanitizedNode, err := sanitizeTemplateNode(nodeInfo.Node(), nodeGroupName, ignoredTaints)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cluster-autoscaler/core/utils/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func TestSanitizeNodeInfo(t *testing.T) {
nodeInfo := schedulerframework.NewNodeInfo(pod)
nodeInfo.SetNode(node)

res, err := sanitizeNodeInfo(nodeInfo, "test-group", nil)
res, err := SanitizeNodeInfo(nodeInfo, "test-group", nil)
assert.NoError(t, err)
assert.Equal(t, 1, len(res.Pods))
}
Expand Down
8 changes: 8 additions & 0 deletions cluster-autoscaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/metrics"
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfos"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
Expand Down Expand Up @@ -176,6 +177,7 @@ var (
enableProfiling = flag.Bool("profiling", false, "Is debug/pprof endpoint enabled")
clusterAPICloudConfigAuthoritative = flag.Bool("clusterapi-cloud-config-authoritative", false, "Treat the cloud-config flag authoritatively (do not fallback to using kubeconfig flag). ClusterAPI only")
cordonNodeBeforeTerminate = flag.Bool("cordon-node-before-terminating", false, "Should CA cordon nodes before terminating during downscale process")
enableRefineUsingSimilarNodeGroups = flag.Bool("refine-using-similar-nodegroups", false, "Refine empty nodegoups capacities evaluations using real-world nodes from similar nodegroups when available.")
)

func createAutoscalingOptions() config.AutoscalingOptions {
Expand Down Expand Up @@ -247,6 +249,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
ConcurrentGceRefreshes: *concurrentGceRefreshes,
ClusterAPICloudConfigAuthoritative: *clusterAPICloudConfigAuthoritative,
CordonNodeBeforeTerminate: *cordonNodeBeforeTerminate,
EnableRefineUsingSimilarNodeGroups: *enableRefineUsingSimilarNodeGroups,
}
}

Expand Down Expand Up @@ -321,6 +324,11 @@ func buildAutoscaler() (core.Autoscaler, error) {
Comparator: nodeInfoComparatorBuilder(autoscalingOptions.BalancingExtraIgnoredLabels),
}

opts.Processors.NodeInfoProcessor = &nodeinfos.RefineNodeInfosProcessor{
NodeGroupSetProcessor: opts.Processors.NodeGroupSetProcessor,
RefineUsingSimilarNodeGroups: autoscalingOptions.EnableRefineUsingSimilarNodeGroups,
}

// These metrics should be published only once.
metrics.UpdateNapEnabled(autoscalingOptions.NodeAutoprovisioningEnabled)
metrics.UpdateMaxNodesCount(autoscalingOptions.MaxNodesTotal)
Expand Down
178 changes: 178 additions & 0 deletions cluster-autoscaler/processors/nodeinfos/refine_node_infos_processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package nodeinfos

import (
"sync"
"time"

"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/utils"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
klog "k8s.io/klog/v2"

schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

// templateNodeInfoCacheTTL is the age after which nodeInfoCache.update stops
// trusting cached entries and fetches every nodegroup's template again.
const templateNodeInfoCacheTTL = 10 * time.Minute

// RefineNodeInfosProcessor improves NodeInfos accuracy where possible.
// For now it adjusts synthetic NodeInfos generated by TemplateInfo() with more accurate
// NodeInfos obtained from real-world nodes for similar nodegroups (when available).
type RefineNodeInfosProcessor struct {
// NodeGroupSetProcessor is used by Process to find the nodegroups similar to
// a nodegroup whose NodeInfo was built from a template.
NodeGroupSetProcessor nodegroupset.NodeGroupSetProcessor
// RefineUsingSimilarNodeGroups enables the refinement; when false, Process
// returns its input unchanged.
RefineUsingSimilarNodeGroups bool
// nodeInfoCache holds the template NodeInfos fetched from the cloud provider,
// keyed by nodegroup ID. Process updates it on each call.
nodeInfoCache nodeInfoCache
}

// nodeInfoCache caches the NodeInfos returned by each nodegroup's
// TemplateNodeInfo(), so they are not fetched from the cloud provider on
// every call to update.
type nodeInfoCache struct {
// The embedded lock is taken for writing by update.
sync.RWMutex
// nodeInfos maps a nodegroup ID to its cached template NodeInfo.
nodeInfos map[string]*schedulerframework.NodeInfo
// lastUpdated is set when the cache is (re)initialized and after each
// TTL-triggered refresh; update compares it to templateNodeInfoCacheTTL.
lastUpdated time.Time
}

// Process refines nodeInfos obtained by TemplateInfos when possible.
//
// For every nodegroup whose entry in nodeInfosForNodeGroups was built from a
// template, Process looks for a similar nodegroup (compared on template
// NodeInfos only, so that like is compared with like) whose entry was built
// from a real node, and replaces the template-based entry with a copy of that
// real-node NodeInfo adapted to the nodegroup.
//
// nodeInfosForNodeGroups is keyed by nodegroup ID; it is updated in place and
// returned. When RefineUsingSimilarNodeGroups is false it is returned untouched.
//
// An error is returned when the template cache cannot be updated or when a
// NodeInfo cannot be mirrored. A failure to look up similar nodegroups is only
// logged, and stops any further refinement for this call.
func (p *RefineNodeInfosProcessor) Process(ctx *context.AutoscalingContext, nodeInfosForNodeGroups map[string]*schedulerframework.NodeInfo) (map[string]*schedulerframework.NodeInfo, error) {
	// for now this processor only supports refining by leveraging similar nodegroups
	if !p.RefineUsingSimilarNodeGroups {
		return nodeInfosForNodeGroups, nil
	}

	nodeGroups := make(map[string]cloudprovider.NodeGroup)
	for _, nodeGroup := range ctx.CloudProvider.NodeGroups() {
		nodeGroups[nodeGroup.Id()] = nodeGroup
	}

	if err := p.nodeInfoCache.update(nodeGroups); err != nil {
		return nodeInfosForNodeGroups, err
	}

	ignoredTaints := make(taints.TaintKeySet)
	for _, taintKey := range ctx.IgnoredTaints {
		ignoredTaints[taintKey] = true
	}

	inaccurateNodeInfos := make(map[string]*schedulerframework.NodeInfo)
	templatesNodeInfo := make(map[string]*schedulerframework.NodeInfo)

	// build comparable (all from templates) NodeInfos; the cache map is written
	// under its lock by update(), so read it under the same lock.
	p.nodeInfoCache.RLock()
	for groupID, nodeInfo := range nodeInfosForNodeGroups {
		templatedNodeInfo, ok := p.nodeInfoCache.nodeInfos[groupID]
		if !ok || templatedNodeInfo == nil {
			continue
		}

		templatesNodeInfo[groupID] = templatedNodeInfo

		// a nil or node-less entry cannot carry the "built from template" marker
		if nodeInfo == nil || nodeInfo.Node() == nil {
			continue
		}
		if utils.IsNodeInfoBuiltFromTemplate(nodeInfo) {
			inaccurateNodeInfos[groupID] = templatedNodeInfo
		}
	}
	p.nodeInfoCache.RUnlock()

	// refine inaccurate nodeinfos when we can find similar nodeInfos built from real nodes
	for groupID, templatedNodeInfo := range inaccurateNodeInfos {
		nodeGroup, ok := nodeGroups[groupID]
		if !ok {
			continue
		}

		similars, err := p.NodeGroupSetProcessor.FindSimilarNodeGroups(ctx, nodeGroup, templatesNodeInfo)
		if err != nil {
			klog.Warningf("Failed to lookup for matching node groups for %s: %v", groupID, err)
			return nodeInfosForNodeGroups, nil
		}

		for _, similarGroup := range similars {
			similarInfo, found := nodeInfosForNodeGroups[similarGroup.Id()]
			if !found || similarInfo == nil || similarInfo.Node() == nil {
				continue
			}
			// only NodeInfos coming from real nodes are worth mirroring
			if utils.IsNodeInfoBuiltFromTemplate(similarInfo) {
				continue
			}
			refinedNodeInfo, err := mirrorNodeInfoFromSimilarGroup(templatedNodeInfo, similarInfo, groupID, ignoredTaints)
			if err != nil {
				return nodeInfosForNodeGroups, err
			}
			nodeInfosForNodeGroups[groupID] = refinedNodeInfo
			// the first usable similar nodegroup is enough
			break
		}
	}

	return nodeInfosForNodeGroups, nil
}

// CleanUp cleans up processor's internal structures.
// It is currently a no-op: the template NodeInfo cache is left as is.
func (p *RefineNodeInfosProcessor) CleanUp() {
}

// mirrorNodeInfoFromSimilarGroup builds a NodeInfo for nodeGroupName out of
// newInfo (a NodeInfo taken from a similar nodegroup), leaving newInfo itself
// unmodified.
//
// Labels of oldInfo's node whose key is listed in
// nodegroupset.BasicIgnoredLabels are carried over to the result, the result is
// marked as built from a template, and it is finally passed through
// utils.SanitizeNodeInfo with nodeGroupName and ignoredTaints.
//
// An internal error is returned when newInfo is nil or has no node, since there
// is then nothing to mirror. A nil or node-less oldInfo simply contributes no
// labels.
func mirrorNodeInfoFromSimilarGroup(oldInfo, newInfo *schedulerframework.NodeInfo, nodeGroupName string, ignoredTaints taints.TaintKeySet) (*schedulerframework.NodeInfo, errors.AutoscalerError) {
	if newInfo == nil || newInfo.Node() == nil {
		return nil, errors.NewAutoscalerError(errors.InternalError, "cannot mirror a NodeInfo without node for nodegroup %s", nodeGroupName)
	}

	newInfoCopy := newInfo.Clone()

	// work on a private copy of the node so the similar nodegroup's node is not altered
	newNode := newInfoCopy.Node().DeepCopy()

	// keep locality labels (region, zone, failure domain, ...) from original template
	if oldInfo != nil && oldInfo.Node() != nil {
		for label, val := range oldInfo.Node().GetLabels() {
			if _, found := nodegroupset.BasicIgnoredLabels[label]; !found {
				continue
			}
			// the mirrored node may have no labels at all; writing to a nil map panics
			if newNode.Labels == nil {
				newNode.Labels = make(map[string]string)
			}
			newNode.Labels[label] = val
		}
	}
	newInfoCopy.SetNode(newNode)
	utils.SetNodeInfoBuiltFromTemplate(newInfoCopy)

	// deep copy pods and set node name (in node, pods, labels...) to the nodegroup's
	return utils.SanitizeNodeInfo(newInfoCopy, nodeGroupName, ignoredTaints)
}

// update synchronizes the cache with the given nodegroups (keyed by ID).
//
// Entries of nodegroups that are no longer listed are dropped. A nodegroup's
// template is (re)fetched through TemplateNodeInfo() when it has no usable
// cached entry, when the cached entry is marked as built from a template, or
// when the whole cache is older than templateNodeInfoCacheTTL.
//
// Nodegroups whose provider returns cloudprovider.ErrNotImplemented are
// skipped. Any other provider error aborts the update and is returned as a
// CloudProviderError; entries updated before the failure are kept.
func (n *nodeInfoCache) update(nodeGroups map[string]cloudprovider.NodeGroup) error {
	n.Lock()
	defer n.Unlock()

	if len(n.nodeInfos) == 0 {
		n.nodeInfos = make(map[string]*schedulerframework.NodeInfo)
		n.lastUpdated = time.Now()
	}
	needsRefresh := n.lastUpdated.Add(templateNodeInfoCacheTTL).Before(time.Now())

	// forget nodegroups that disappeared
	for groupID := range n.nodeInfos {
		if _, ok := nodeGroups[groupID]; !ok {
			delete(n.nodeInfos, groupID)
		}
	}

	for groupID, nodeGroup := range nodeGroups {
		cachedNodeInfo, ok := n.nodeInfos[groupID]
		// a nil or node-less cached entry is unusable and is fetched again
		usable := ok && cachedNodeInfo != nil && cachedNodeInfo.Node() != nil
		if usable && !needsRefresh && !utils.IsNodeInfoBuiltFromTemplate(cachedNodeInfo) {
			continue
		}

		nodeInfo, err := nodeGroup.TemplateNodeInfo()
		if err == cloudprovider.ErrNotImplemented {
			continue
		}
		if err != nil {
			return errors.ToAutoscalerError(errors.CloudProviderError, err)
		}
		if nodeInfo == nil || nodeInfo.Node() == nil {
			// nothing usable came back: do not keep a stale or empty entry around
			delete(n.nodeInfos, groupID)
			continue
		}
		n.nodeInfos[groupID] = nodeInfo
	}

	if needsRefresh {
		n.lastUpdated = time.Now()
	}

	return nil
}
Loading

0 comments on commit 449879e

Please sign in to comment.