Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NodeInfo processor to refine template-based NodeInfos #3761

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cluster-autoscaler/config/autoscaling_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ type AutoscalingOptions struct {
StatusConfigMapName string
// BalanceSimilarNodeGroups enables logic that identifies node groups with similar machines and tries to balance node count between them.
BalanceSimilarNodeGroups bool
// EnableRefineUsingSimilarNodeGroups tells whether cluster-autoscaler should try to refine synthetic nodeinfos (generated
// from the cloud provider's TemplateNodeInfo()) by using nodeinfos obtained from real-world nodes from similar nodegroups.
EnableRefineUsingSimilarNodeGroups bool
// ConfigNamespace is the namespace cluster-autoscaler is running in and all related configmaps live in
ConfigNamespace string
// ClusterName if available
Expand Down
27 changes: 24 additions & 3 deletions cluster-autoscaler/core/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ import (
klog "k8s.io/klog/v2"
)

// templatedNodeAnnotation is the annotation key set on a node to flag its NodeInfo
// as built from a template rather than from a real-world node.
const templatedNodeAnnotation = "synthetic-autoscaler-node-from-template"

// GetNodeInfosForGroups finds NodeInfos for all node groups used to manage the given nodes. It also returns a node group to sample node mapping.
func GetNodeInfosForGroups(nodes []*apiv1.Node, nodeInfoCache map[string]*schedulerframework.NodeInfo, cloudProvider cloudprovider.CloudProvider, listers kube_util.ListerRegistry,
// TODO(mwielgus): This returns map keyed by url, while most code (including scheduler) uses node.Name for a key.
Expand Down Expand Up @@ -67,7 +69,7 @@ func GetNodeInfosForGroups(nodes []*apiv1.Node, nodeInfoCache map[string]*schedu
if err != nil {
return false, "", err
}
sanitizedNodeInfo, err := sanitizeNodeInfo(nodeInfo, id, ignoredTaints)
sanitizedNodeInfo, err := SanitizeNodeInfo(nodeInfo, id, ignoredTaints)
if err != nil {
return false, "", err
}
Expand Down Expand Up @@ -181,13 +183,31 @@ func GetNodeInfoFromTemplate(nodeGroup cloudprovider.NodeGroup, daemonsets []*ap
}
fullNodeInfo := schedulerframework.NewNodeInfo(pods...)
fullNodeInfo.SetNode(baseNodeInfo.Node())
sanitizedNodeInfo, typedErr := sanitizeNodeInfo(fullNodeInfo, id, ignoredTaints)
SetNodeInfoBuiltFromTemplate(fullNodeInfo)
sanitizedNodeInfo, typedErr := SanitizeNodeInfo(fullNodeInfo, id, ignoredTaints)
if typedErr != nil {
return nil, typedErr
}
return sanitizedNodeInfo, nil
}

// SetNodeInfoBuiltFromTemplate marks a NodeInfo as generated from a cloud provider's TemplateInfo().
//
// The mark is the templatedNodeAnnotation annotation. It is written on a private copy
// of the node, which then replaces the NodeInfo's node, so that a node object shared
// with other holders (for instance the template the NodeInfo was assembled from) is
// not modified as a side effect. A nil NodeInfo, or a NodeInfo without a node, is
// left untouched.
func SetNodeInfoBuiltFromTemplate(nodeInfo *schedulerframework.NodeInfo) {
	if nodeInfo == nil || nodeInfo.Node() == nil {
		return
	}

	// Work on a copy: the node pointer may be shared with the caller's source object.
	node := nodeInfo.Node().DeepCopy()
	if node.Annotations == nil {
		node.Annotations = make(map[string]string)
	}
	node.Annotations[templatedNodeAnnotation] = "true"
	nodeInfo.SetNode(node)
}

// IsNodeInfoBuiltFromTemplate returns true if a NodeInfo was generated from a cloud
// provider's TemplateInfo(), false if generated from a real-world node.
//
// The decision is based solely on the presence of the templatedNodeAnnotation
// annotation on the NodeInfo's node. A nil NodeInfo, or a NodeInfo without a node,
// carries no such annotation and yields false instead of dereferencing a nil node.
func IsNodeInfoBuiltFromTemplate(nodeInfo *schedulerframework.NodeInfo) bool {
	if nodeInfo == nil || nodeInfo.Node() == nil {
		return false
	}
	// Looking up a key in a nil Annotations map is safe and reports "absent".
	_, ok := nodeInfo.Node().Annotations[templatedNodeAnnotation]
	return ok
}

// isVirtualNode determines if the node is created by virtual kubelet
func isVirtualNode(node *apiv1.Node) bool {
return node.ObjectMeta.Labels["type"] == "virtual-kubelet"
Expand Down Expand Up @@ -228,7 +248,8 @@ func deepCopyNodeInfo(nodeInfo *schedulerframework.NodeInfo) (*schedulerframewor
return newNodeInfo, nil
}

func sanitizeNodeInfo(nodeInfo *schedulerframework.NodeInfo, nodeGroupName string, ignoredTaints taints.TaintKeySet) (*schedulerframework.NodeInfo, errors.AutoscalerError) {
// SanitizeNodeInfo updates a provided, generic NodeInfo to match the nodegroup's name
func SanitizeNodeInfo(nodeInfo *schedulerframework.NodeInfo, nodeGroupName string, ignoredTaints taints.TaintKeySet) (*schedulerframework.NodeInfo, errors.AutoscalerError) {
// Sanitize node name.
sanitizedNode, err := sanitizeTemplateNode(nodeInfo.Node(), nodeGroupName, ignoredTaints)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cluster-autoscaler/core/utils/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func TestSanitizeNodeInfo(t *testing.T) {
nodeInfo := schedulerframework.NewNodeInfo(pod)
nodeInfo.SetNode(node)

res, err := sanitizeNodeInfo(nodeInfo, "test-group", nil)
res, err := SanitizeNodeInfo(nodeInfo, "test-group", nil)
assert.NoError(t, err)
assert.Equal(t, 1, len(res.Pods))
}
Expand Down
10 changes: 10 additions & 0 deletions cluster-autoscaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/metrics"
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfos"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
Expand Down Expand Up @@ -170,6 +171,7 @@ var (
regional = flag.Bool("regional", false, "Cluster is regional.")
newPodScaleUpDelay = flag.Duration("new-pod-scale-up-delay", 0*time.Second, "Pods less than this old will not be considered for scale-up.")

enableRefineUsingSimilarNodeGroups = flag.Bool("refine-using-similar-nodegroups", false, "Mitigate unbalance among similar node groups when scale-up-from-zero is enabled")
ignoreTaintsFlag = multiStringFlag("ignore-taint", "Specifies a taint to ignore in node templates when considering to scale a node group")
balancingIgnoreLabelsFlag = multiStringFlag("balancing-ignore-label", "Specifies a label to ignore in addition to the basic and cloud-provider set of labels when comparing if two node groups are similar")
awsUseStaticInstanceList = flag.Bool("aws-use-static-instance-list", false, "Should CA fetch instance types in runtime or use a static list. AWS only")
Expand Down Expand Up @@ -237,6 +239,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
WriteStatusConfigMap: *writeStatusConfigMapFlag,
StatusConfigMapName: *statusConfigMapName,
BalanceSimilarNodeGroups: *balanceSimilarNodeGroupsFlag,
EnableRefineUsingSimilarNodeGroups: *enableRefineUsingSimilarNodeGroups,
ConfigNamespace: *namespace,
ClusterName: *clusterName,
NodeAutoprovisioningEnabled: *nodeAutoprovisioningEnabled,
Expand Down Expand Up @@ -329,6 +332,13 @@ func buildAutoscaler() (core.Autoscaler, error) {
Comparator: nodeInfoComparatorBuilder(autoscalingOptions.BalancingExtraIgnoredLabels),
}

if autoscalingOptions.EnableRefineUsingSimilarNodeGroups {
opts.Processors.NodeInfoProcessor = &nodeinfos.RefineNodeInfosProcessor{
NodeGroupSetProcessor: opts.Processors.NodeGroupSetProcessor,
RefineUsingSimilarNodeGroups: autoscalingOptions.EnableRefineUsingSimilarNodeGroups,
}
}

// These metrics should be published only once.
metrics.UpdateNapEnabled(autoscalingOptions.NodeAutoprovisioningEnabled)
metrics.UpdateMaxNodesCount(autoscalingOptions.MaxNodesTotal)
Expand Down
184 changes: 184 additions & 0 deletions cluster-autoscaler/processors/nodeinfos/refine_node_infos_processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
/*
Copyright 2020 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package nodeinfos

import (
"sync"
"time"

"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/utils"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
klog "k8s.io/klog/v2"

schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

// templateNodeInfoCacheTTL is the age after which nodeInfoCache.update regenerates
// the cached template NodeInfos.
const templateNodeInfoCacheTTL = 10 * time.Minute

// RefineNodeInfosProcessor improves NodeInfos accuracy where possible.
// For now it adjusts synthetic NodeInfos generated by TemplateInfo() with more accurate
// NodeInfos obtained from real-world nodes for similar nodegroups (when available).
type RefineNodeInfosProcessor struct {
	// NodeGroupSetProcessor is used by Process to find the node groups similar to
	// the one being refined.
	NodeGroupSetProcessor nodegroupset.NodeGroupSetProcessor
	// RefineUsingSimilarNodeGroups enables the refinement; when false, Process
	// returns its input untouched.
	RefineUsingSimilarNodeGroups bool
	// nodeInfoCache holds one template NodeInfo per node group; Process updates it
	// before comparing node groups.
	nodeInfoCache nodeInfoCache
}

// nodeInfoCache stores the NodeInfos returned by the node groups' TemplateNodeInfo().
// The embedded RWMutex is write-locked by update for the duration of a cache update.
type nodeInfoCache struct {
	sync.RWMutex
	// nodeInfos maps a node group ID to the NodeInfo generated from its template.
	nodeInfos map[string]*schedulerframework.NodeInfo
	// lastUpdated is compared with templateNodeInfoCacheTTL to decide when the
	// cached entries have to be regenerated.
	lastUpdated time.Time
}

// Process refines nodeInfos obtained by TemplateInfos when possible.
//
// For each node group whose NodeInfo is flagged as built from a template, it looks
// for a similar node group whose NodeInfo is not flagged that way and, when one is
// found, replaces the flagged NodeInfo with one derived from it (see
// refineNodeInfoUsingSimilarGroup). Similarity between node groups is evaluated on
// the cached template NodeInfos only, so that every group is compared on the same
// kind of data.
//
// nodeInfosForNodeGroups is keyed by node group ID; it is updated in place and
// returned. When RefineUsingSimilarNodeGroups is false it is returned untouched.
// A non-nil error is returned when the template cache cannot be updated or when a
// refined NodeInfo cannot be built. A failure to look up the similar node groups of
// one group is only logged: that group keeps its NodeInfo and the others are still
// processed.
func (p *RefineNodeInfosProcessor) Process(ctx *context.AutoscalingContext, nodeInfosForNodeGroups map[string]*schedulerframework.NodeInfo) (map[string]*schedulerframework.NodeInfo, error) {
	// for now this processor only supports refining by leveraging similar nodegroups
	if !p.RefineUsingSimilarNodeGroups {
		return nodeInfosForNodeGroups, nil
	}

	nodeGroups := make(map[string]cloudprovider.NodeGroup)
	for _, nodeGroup := range ctx.CloudProvider.NodeGroups() {
		nodeGroups[nodeGroup.Id()] = nodeGroup
	}

	if err := p.nodeInfoCache.update(nodeGroups); err != nil {
		return nodeInfosForNodeGroups, err
	}

	ignoredTaints := make(taints.TaintKeySet)
	for _, taintKey := range ctx.IgnoredTaints {
		ignoredTaints[taintKey] = true
	}

	inaccurateNodeInfos := make(map[string]*schedulerframework.NodeInfo)
	templatesNodeInfo := make(map[string]*schedulerframework.NodeInfo)

	// build comparable (all from templates) NodeInfos; the cache is read under its
	// lock, consistently with update() which writes it under the same lock.
	p.nodeInfoCache.RLock()
	for groupID, nodeInfo := range nodeInfosForNodeGroups {
		templatedNodeInfo, ok := p.nodeInfoCache.nodeInfos[groupID]
		if !ok || templatedNodeInfo == nil {
			continue
		}

		templatesNodeInfo[groupID] = templatedNodeInfo

		// an entry without a node has nothing to refine, and the template check
		// below needs a node to look at
		if nodeInfo == nil || nodeInfo.Node() == nil {
			continue
		}

		if utils.IsNodeInfoBuiltFromTemplate(nodeInfo) {
			// use the provided nodeInfo rather than the one we just generated:
			// we want to keep the original daemonsets pods in refined template.
			inaccurateNodeInfos[groupID] = nodeInfo
		}
	}
	p.nodeInfoCache.RUnlock()

	// refine inaccurate nodeinfos when we can find similar nodeInfos built from real nodes
	for groupID, nodeInfo := range inaccurateNodeInfos {
		nodeGroup, ok := nodeGroups[groupID]
		if !ok {
			continue
		}

		similars, err := p.NodeGroupSetProcessor.FindSimilarNodeGroups(ctx, nodeGroup, templatesNodeInfo)
		if err != nil {
			// Map iteration order is random: bailing out here would leave an
			// arbitrary subset of the groups refined. Skip this group only.
			klog.Warningf("Failed to lookup for matching node groups for %s: %v", groupID, err)
			continue
		}

		for _, similarGroup := range similars {
			similarInfo, found := nodeInfosForNodeGroups[similarGroup.Id()]
			if !found || similarInfo == nil || similarInfo.Node() == nil || utils.IsNodeInfoBuiltFromTemplate(similarInfo) {
				continue
			}
			refinedNodeInfo, err := refineNodeInfoUsingSimilarGroup(nodeInfo, similarInfo, groupID, ignoredTaints)
			if err != nil {
				return nodeInfosForNodeGroups, err
			}
			nodeInfosForNodeGroups[groupID] = refinedNodeInfo
			// the first similar group backed by a real node is enough
			break
		}
	}

	return nodeInfosForNodeGroups, nil
}

// CleanUp cleans up processor's internal structures.
// It currently does nothing.
func (p *RefineNodeInfosProcessor) CleanUp() {
}

// refineNodeInfoUsingSimilarGroup derives a NodeInfo for nodeGroupName from a similar
// node group's NodeInfo.
//
// It starts from a clone of nodeInfo and swaps in a copy of the node held by
// similarInfo. On that copy, the name, the UID and the labels listed in
// nodegroupset.BasicIgnoredLabels are taken from nodeInfo's own node. The result is
// flagged as built from a template and returned through utils.SanitizeNodeInfo.
func refineNodeInfoUsingSimilarGroup(nodeInfo, similarInfo *schedulerframework.NodeInfo, nodeGroupName string, ignoredTaints taints.TaintKeySet) (*schedulerframework.NodeInfo, errors.AutoscalerError) {
	refined := nodeInfo.Clone()

	donor := similarInfo.Node().DeepCopy()
	if donor.Labels == nil {
		donor.Labels = map[string]string{}
	}

	// original node identity and locality (region, zone, ...) must be retained
	source := nodeInfo.Node()
	for key, value := range source.GetLabels() {
		if _, keep := nodegroupset.BasicIgnoredLabels[key]; !keep {
			continue
		}
		donor.Labels[key] = value
	}
	donor.Name, donor.UID = source.Name, source.UID

	refined.SetNode(donor)
	utils.SetNodeInfoBuiltFromTemplate(refined)

	return utils.SanitizeNodeInfo(refined, nodeGroupName, ignoredTaints)
}

func (n *nodeInfoCache) update(nodeGroups map[string]cloudprovider.NodeGroup) error {
n.Lock()
defer n.Unlock()

if len(n.nodeInfos) == 0 {
n.nodeInfos = make(map[string]*schedulerframework.NodeInfo)
n.lastUpdated = time.Now()
}
needsRefresh := n.lastUpdated.Add(templateNodeInfoCacheTTL).Before(time.Now())
bpineau marked this conversation as resolved.
Show resolved Hide resolved

for groupID := range n.nodeInfos {
if _, ok := nodeGroups[groupID]; !ok {
delete(n.nodeInfos, groupID)
}
}

for groupID, nodeGroup := range nodeGroups {
cachedNodeInfo, ok := n.nodeInfos[groupID]
if ok && !needsRefresh && !utils.IsNodeInfoBuiltFromTemplate(cachedNodeInfo) {
continue
}

nodeInfo, err := nodeGroup.TemplateNodeInfo()
if err != nil {
if err == cloudprovider.ErrNotImplemented {
continue
} else {
return errors.ToAutoscalerError(errors.CloudProviderError, err)
}
}
n.nodeInfos[groupID] = nodeInfo
}

if needsRefresh {
n.lastUpdated = time.Now()
}

return nil
}
Loading