Skip to content

Commit

Permalink
NodeInfo processor to Refine synthetic NodeInfos
Browse files Browse the repository at this point in the history
Comparing synthetic NodeInfos obtained from nodegroup's TemplateInfo()
to NodeInfos obtained from real-world nodes is bound to fail, even with
kube reservations provided through nodegroups labels/annotations
(for instance: kernel mem reservation is hard to predict).

This makes `balance-similar-node-groups` likely to misbehave when
`scale-up-from-zero` is enabled (and a first nodegroup gets a real
node), for instance.

Following [Maciek Pytel suggestion](#3608 (comment))
(from discussions on a previous attempt at solving this), we can
implement a NodeInfo Processor that would improve template-generated
NodeInfos whenever a node was created off a similar nodegroup.

We're storing node's virtual origin through machineid, which works
fine but is a bit ugly (suggestions welcome).

Tested this solves balance-similar-node-groups + scale-up-from-zero,
with various instance types on AWS and GCP.

Previous attempts to solve that issue/discussions:
* #2892 (comment)
* #3608 (comment)
  • Loading branch information
bpineau committed Dec 14, 2020
1 parent d8b6319 commit 068bcf8
Show file tree
Hide file tree
Showing 5 changed files with 331 additions and 4 deletions.
26 changes: 23 additions & 3 deletions cluster-autoscaler/core/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ import (
klog "k8s.io/klog/v2"
)

const templatedMachineID = "autoscaler-node-from-template"

// GetNodeInfosForGroups finds NodeInfos for all node groups used to manage the given nodes. It also returns a node group to sample node mapping.
func GetNodeInfosForGroups(nodes []*apiv1.Node, nodeInfoCache map[string]*schedulerframework.NodeInfo, cloudProvider cloudprovider.CloudProvider, listers kube_util.ListerRegistry,
// TODO(mwielgus): This returns map keyed by url, while most code (including scheduler) uses node.Name for a key.
Expand Down Expand Up @@ -67,7 +69,7 @@ func GetNodeInfosForGroups(nodes []*apiv1.Node, nodeInfoCache map[string]*schedu
if err != nil {
return false, "", err
}
sanitizedNodeInfo, err := sanitizeNodeInfo(nodeInfo, id, ignoredTaints)
sanitizedNodeInfo, err := SanitizeNodeInfo(nodeInfo, id, ignoredTaints)
if err != nil {
return false, "", err
}
Expand Down Expand Up @@ -181,13 +183,30 @@ func GetNodeInfoFromTemplate(nodeGroup cloudprovider.NodeGroup, daemonsets []*ap
}
fullNodeInfo := schedulerframework.NewNodeInfo(pods...)
fullNodeInfo.SetNode(baseNodeInfo.Node())
sanitizedNodeInfo, typedErr := sanitizeNodeInfo(fullNodeInfo, id, ignoredTaints)
SetNodeInfoBuiltFromTemplate(fullNodeInfo)
sanitizedNodeInfo, typedErr := SanitizeNodeInfo(fullNodeInfo, id, ignoredTaints)
if typedErr != nil {
return nil, typedErr
}
return sanitizedNodeInfo, nil
}

// SetNodeInfoBuiltFromTemplate marks a NodeInfo as generated from a cloud provider's TemplateInfo()
func SetNodeInfoBuiltFromTemplate(nodeInfo *schedulerframework.NodeInfo) {
node := nodeInfo.Node()
node.Status.NodeInfo.MachineID = templatedMachineID
nodeInfo.SetNode(node)
}

// IsNodeInfoBuiltFromTemplate returns true if a NodeInfo was generated from a cloud
// provider's TemplateInfo(), false if generated from a real-world node.
func IsNodeInfoBuiltFromTemplate(nodeInfo *schedulerframework.NodeInfo) bool {
if nodeInfo.Node().Status.NodeInfo.MachineID == templatedMachineID {
return true
}
return false
}

// isVirtualNode determines if the node is created by virtual kubelet
func isVirtualNode(node *apiv1.Node) bool {
return node.ObjectMeta.Labels["type"] == "virtual-kubelet"
Expand Down Expand Up @@ -228,7 +247,8 @@ func deepCopyNodeInfo(nodeInfo *schedulerframework.NodeInfo) (*schedulerframewor
return newNodeInfo, nil
}

func sanitizeNodeInfo(nodeInfo *schedulerframework.NodeInfo, nodeGroupName string, ignoredTaints taints.TaintKeySet) (*schedulerframework.NodeInfo, errors.AutoscalerError) {
// SanitizeNodeInfo updates a provided, generic NodeInfo to match the nodegroup's name
func SanitizeNodeInfo(nodeInfo *schedulerframework.NodeInfo, nodeGroupName string, ignoredTaints taints.TaintKeySet) (*schedulerframework.NodeInfo, errors.AutoscalerError) {
// Sanitize node name.
sanitizedNode, err := sanitizeTemplateNode(nodeInfo.Node(), nodeGroupName, ignoredTaints)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cluster-autoscaler/core/utils/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func TestSanitizeNodeInfo(t *testing.T) {
nodeInfo := schedulerframework.NewNodeInfo(pod)
nodeInfo.SetNode(node)

res, err := sanitizeNodeInfo(nodeInfo, "test-group", nil)
res, err := SanitizeNodeInfo(nodeInfo, "test-group", nil)
assert.NoError(t, err)
assert.Equal(t, 1, len(res.Pods))
}
Expand Down
5 changes: 5 additions & 0 deletions cluster-autoscaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/metrics"
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfos"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
Expand Down Expand Up @@ -317,6 +318,10 @@ func buildAutoscaler() (core.Autoscaler, error) {
Comparator: nodeInfoComparatorBuilder(autoscalingOptions.BalancingExtraIgnoredLabels),
}

opts.Processors.NodeInfoProcessor = &nodeinfos.RefineNodeInfosProcessor{
NodeGroupSetProcessor: opts.Processors.NodeGroupSetProcessor,
}

// These metrics should be published only once.
metrics.UpdateNapEnabled(autoscalingOptions.NodeAutoprovisioningEnabled)
metrics.UpdateMaxNodesCount(autoscalingOptions.MaxNodesTotal)
Expand Down
174 changes: 174 additions & 0 deletions cluster-autoscaler/processors/nodeinfos/refine_node_infos_processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package nodeinfos

import (
"sync"
"time"

"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/utils"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
"k8s.io/klog"

schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

const templateNodeInfoCacheTTL = 10 * time.Minute

// RefineNodeInfosProcessor improves NodeInfos accuracy where possible.
// For now it adjusts synthetic NodeInfos generated by TemplateInfo() with more accurate
// NodeInfos obtained from real-world nodes for similar nodegroups (when available).
type RefineNodeInfosProcessor struct {
NodeGroupSetProcessor nodegroupset.NodeGroupSetProcessor
nodeInfoCache nodeInfoCache
}

type nodeInfoCache struct {
sync.RWMutex
nodeInfos map[string]*schedulerframework.NodeInfo
lastUpdated time.Time
}

// Process refines nodeInfos obtained by TemplateInfos when possible
func (p *RefineNodeInfosProcessor) Process(ctx *context.AutoscalingContext, nodeInfosForNodeGroups map[string]*schedulerframework.NodeInfo) (map[string]*schedulerframework.NodeInfo, error) {
innacurateNodeInfos := make(map[string]*schedulerframework.NodeInfo)
templatesNodeInfo := make(map[string]*schedulerframework.NodeInfo)

nodeGroups := make(map[string]cloudprovider.NodeGroup)
for _, nodeGroup := range ctx.CloudProvider.NodeGroups() {
nodeGroups[nodeGroup.Id()] = nodeGroup
}

if err := p.nodeInfoCache.update(nodeGroups); err != nil {
return nodeInfosForNodeGroups, err
}
p.nodeInfoCache.RLock()
defer p.nodeInfoCache.RUnlock()

ignoredTaints := make(taints.TaintKeySet)
for _, taintKey := range ctx.IgnoredTaints {
ignoredTaints[taintKey] = true
}

// build comparable (all from templates) NodeInfos
for groupID, nodeInfo := range nodeInfosForNodeGroups {
templatedNodeInfo, ok := p.nodeInfoCache.nodeInfos[groupID]
if !ok || templatedNodeInfo == nil {
continue
}

templatesNodeInfo[groupID] = templatedNodeInfo

if utils.IsNodeInfoBuiltFromTemplate(nodeInfo) {
innacurateNodeInfos[groupID] = templatedNodeInfo
}
}

// refine innacurate nodeinfos when we can find similar nodeInfos built from real nodes
for groupID, nodeInfo := range innacurateNodeInfos {
nodeGroup, ok := nodeGroups[groupID]
if !ok {
continue
}

similars, err := p.NodeGroupSetProcessor.FindSimilarNodeGroups(ctx, nodeGroup, templatesNodeInfo)
if err != nil {
klog.Warningf("Failed to lookup for matching node groups for %s: %v", groupID, err)
return nodeInfosForNodeGroups, nil
}

for _, nodeGroup := range similars {
similarInfo, found := nodeInfosForNodeGroups[nodeGroup.Id()]
if !found || utils.IsNodeInfoBuiltFromTemplate(similarInfo) {
continue
}
refinedNodeInfo, err := mirrorNodeInfoFromSimilarGroup(nodeInfo, similarInfo, groupID, ignoredTaints)
if err != nil {
return nodeInfosForNodeGroups, err
}
nodeInfosForNodeGroups[groupID] = refinedNodeInfo
break
}
}

return nodeInfosForNodeGroups, nil
}

// CleanUp cleans up processor's internal structures.
func (p *RefineNodeInfosProcessor) CleanUp() {
}

func mirrorNodeInfoFromSimilarGroup(oldInfo, newInfo *schedulerframework.NodeInfo, nodeGroupName string, ignoredTaints taints.TaintKeySet) (*schedulerframework.NodeInfo, errors.AutoscalerError) {
newInfoCopy := newInfo.Clone()

// keep locality labels (region, zone, failure domain, ...) from original template
newNode := newInfoCopy.Node().DeepCopy()
for label, val := range oldInfo.Node().GetLabels() {
if _, found := nodegroupset.BasicIgnoredLabels[label]; found {
newNode.Labels[label] = val
}
}
newInfoCopy.SetNode(newNode)
utils.SetNodeInfoBuiltFromTemplate(newInfoCopy)

// deep copy pods and set node name (in node, pods, labels...) to the nodegroup's
return utils.SanitizeNodeInfo(newInfoCopy, nodeGroupName, ignoredTaints)
}

func (n *nodeInfoCache) update(nodeGroups map[string]cloudprovider.NodeGroup) error {
n.Lock()
defer n.Unlock()

if len(n.nodeInfos) == 0 {
n.nodeInfos = make(map[string]*schedulerframework.NodeInfo)
n.lastUpdated = time.Now()
}
needsRefresh := n.lastUpdated.Add(templateNodeInfoCacheTTL).Before(time.Now())

for groupID := range n.nodeInfos {
if _, ok := nodeGroups[groupID]; !ok {
delete(n.nodeInfos, groupID)
}
}

for groupID, nodeGroup := range nodeGroups {
cachedNodeInfo, ok := n.nodeInfos[groupID]
if ok && !needsRefresh && !utils.IsNodeInfoBuiltFromTemplate(cachedNodeInfo) {
continue
}

nodeInfo, err := nodeGroup.TemplateNodeInfo()
if err != nil {
if err == cloudprovider.ErrNotImplemented {
continue
} else {
return errors.ToAutoscalerError(errors.CloudProviderError, err)
}
}
n.nodeInfos[groupID] = nodeInfo
}

if needsRefresh {
n.lastUpdated = time.Now()
}

return nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package nodeinfos

import (
"testing"

testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/utils"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"

"github.com/stretchr/testify/assert"
)

type testNodeGroup struct {
name string
cpu int64
mem int64
fromRealNode bool
}

type refinetest struct {
name string
nodegroups []testNodeGroup
templates []testNodeGroup
expected []testNodeGroup
}

var refineCases = []refinetest{
{
"Generated templates should leverage NodeInfo from similar nodegroup when available",
[]testNodeGroup{{"n1", 1000, 1000, true}, {"n2", 3000, 3000, false}},
[]testNodeGroup{{"n1", 1000, 1000, false}, {"n2", 1000, 1000, false}},
[]testNodeGroup{{"n1", 1000, 1000, true}, {"n2", 1000, 1000, true}},
},
{
"NodeInfos obtained from real-world nodes should remain untouched",
[]testNodeGroup{{"n1", 1000, 1000, true}, {"n2", 1000, 1000, true}},
[]testNodeGroup{{"n1", 3000, 3000, false}, {"n2", 3000, 3000, false}},
[]testNodeGroup{{"n1", 1000, 1000, true}, {"n2", 1000, 1000, true}},
},
{
"Should be no-op on cloudproviders not implementing TemplateInfos",
[]testNodeGroup{{"n1", 1000, 1000, true}, {"n2", 2000, 2000, false}},
nil, /* TemplateNodeInfo() returns cloudprovider.ErrNotImplemented */
[]testNodeGroup{{"n1", 1000, 1000, true}, {"n2", 2000, 2000, false}},
},
{
"Should not use NodeInfo templates that aren't built from real nodes",
[]testNodeGroup{{"n1", 1000, 1000, false}, {"n2", 7000, 7000, false}},
[]testNodeGroup{{"n1", 1000, 1000, false}, {"n2", 1000, 1000, false}},
[]testNodeGroup{{"n1", 1000, 1000, false}, {"n2", 7000, 7000, false}},
},
{
"Should not use NodeInfo templates from dissimilar nodegroups",
[]testNodeGroup{{"n1", 1000, 1000, true}, {"n2", 7000, 7000, false}},
[]testNodeGroup{{"n1", 1000, 1000, false}, {"n2", 7000, 7000, false}},
[]testNodeGroup{{"n1", 1000, 1000, true}, {"n2", 7000, 7000, false}},
},
}

func buildRefineNodeInfosProcessorTest(r refinetest) (*context.AutoscalingContext, map[string]*schedulerframework.NodeInfo) {
templates := toNodeInfoMap(r.templates)
nodeinfos := toNodeInfoMap(r.nodegroups)
provider := testprovider.NewTestAutoprovisioningCloudProvider(nil, nil, nil, nil, []string{}, templates)
if r.templates == nil {
provider = testprovider.NewTestCloudProvider(nil, nil)
}
for _, ng := range r.nodegroups {
provider.AddNodeGroup(ng.name, 1, 10, 1)
}

ctx := &context.AutoscalingContext{
CloudProvider: provider,
}

return ctx, nodeinfos
}

func toNodeInfoMap(nodeGroups []testNodeGroup) map[string]*schedulerframework.NodeInfo {
nodeinfos := make(map[string]*schedulerframework.NodeInfo)
for _, ng := range nodeGroups {
node := BuildTestNode(ng.name, ng.cpu, ng.mem)
ni := schedulerframework.NewNodeInfo()
ni.SetNode(node)
if !ng.fromRealNode {
utils.SetNodeInfoBuiltFromTemplate(ni)
}
nodeinfos[ng.name] = ni
}
return nodeinfos
}

func TestRefineNodeInfosProcessor(t *testing.T) {
for _, tt := range refineCases {
proc := RefineNodeInfosProcessor{
NodeGroupSetProcessor: nodegroupset.NewDefaultNodeGroupSetProcessor([]string{}),
}
ctx, nodeInfosForNodeGroups := buildRefineNodeInfosProcessorTest(tt)
out, err := proc.Process(ctx, nodeInfosForNodeGroups)

assert.NoError(t, err)
expected := toNodeInfoMap(tt.expected)
comparator := nodegroupset.CreateGenericNodeInfoComparator([]string{})

for nodeGroupName, nodeGroup := range expected {
assert.Contains(t, out, nodeGroupName, tt.name)
assert.True(t, comparator(out[nodeGroupName], nodeGroup), tt.name)
}
}
}

0 comments on commit 068bcf8

Please sign in to comment.