From fd7c94b313b426338f8b6d9de8b3ba3648c50894 Mon Sep 17 00:00:00 2001 From: Yaroslava Serdiuk Date: Mon, 7 Feb 2022 18:43:38 +0000 Subject: [PATCH] Add expire time for nodeInfo cache items --- cluster-autoscaler/core/scale_test_common.go | 2 +- cluster-autoscaler/core/scale_up_test.go | 12 ++-- cluster-autoscaler/main.go | 3 + .../mixed_nodeinfos_processor.go | 28 ++++++-- .../mixed_nodeinfos_processor_test.go | 65 +++++++++++++++++-- .../node_info_provider_processor.go | 4 +- cluster-autoscaler/processors/processors.go | 1 - 7 files changed, 93 insertions(+), 22 deletions(-) diff --git a/cluster-autoscaler/core/scale_test_common.go b/cluster-autoscaler/core/scale_test_common.go index c689b7f9e42e..fa1c5292e53b 100644 --- a/cluster-autoscaler/core/scale_test_common.go +++ b/cluster-autoscaler/core/scale_test_common.go @@ -148,7 +148,7 @@ func NewTestProcessors() *processors.AutoscalingProcessors { AutoscalingStatusProcessor: &status.NoOpAutoscalingStatusProcessor{}, NodeGroupManager: nodegroups.NewDefaultNodeGroupManager(), NodeInfoProcessor: nodeinfos.NewDefaultNodeInfoProcessor(), - TemplateNodeInfoProvider: nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(), + TemplateNodeInfoProvider: nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil), NodeGroupConfigProcessor: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(), CustomResourcesProcessor: customresources.NewDefaultCustomResourcesProcessor(), ActionableClusterProcessor: actionablecluster.NewDefaultActionableClusterProcessor(), diff --git a/cluster-autoscaler/core/scale_up_test.go b/cluster-autoscaler/core/scale_up_test.go index 854eef9e24e3..e052d2e3739c 100644 --- a/cluster-autoscaler/core/scale_up_test.go +++ b/cluster-autoscaler/core/scale_up_test.go @@ -530,7 +530,7 @@ func runSimpleScaleUpTest(t *testing.T, config *scaleTestConfig) *scaleTestResul } context.ExpanderStrategy = expander - nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now) + nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff()) clusterState.UpdateNodes(nodes, nodeInfos, time.Now()) @@ -691,7 +691,7 @@ func TestScaleUpUnhealthy(t *testing.T) { assert.NoError(t, err) nodes := []*apiv1.Node{n1, n2} - nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now) + nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff()) clusterState.UpdateNodes(nodes, nodeInfos, time.Now()) p3 := BuildTestPod("p-new", 550, 0) @@ -732,7 +732,7 @@ func TestScaleUpNoHelp(t *testing.T) { assert.NoError(t, err) nodes := []*apiv1.Node{n1} - nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now) + nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff()) clusterState.UpdateNodes(nodes, nodeInfos, time.Now()) p3 := BuildTestPod("p-new", 500, 0) @@ -799,7 +799,7 @@ func TestScaleUpBalanceGroups(t *testing.T) { context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil) assert.NoError(t, err) - nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now) + nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff()) clusterState.UpdateNodes(nodes, nodeInfos, time.Now()) @@ -867,7 +867,7 @@ func TestScaleUpAutoprovisionedNodeGroup(t *testing.T) { processors.NodeGroupManager = &mockAutoprovisioningNodeGroupManager{t, 0} nodes := []*apiv1.Node{} - nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, time.Now()) + nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, time.Now()) scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p1}, nodes, []*appsv1.DaemonSet{}, nodeInfos, nil) assert.NoError(t, err) @@ -920,7 +920,7 @@ func TestScaleUpBalanceAutoprovisionedNodeGroups(t *testing.T) { processors.NodeGroupManager = &mockAutoprovisioningNodeGroupManager{t, 2} nodes := []*apiv1.Node{} - nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, time.Now()) + nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, time.Now()) scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p1, p2, p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos, nil) assert.NoError(t, err) diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go index 96443b952f4d..b7dc2793f429 100644 --- a/cluster-autoscaler/main.go +++ b/cluster-autoscaler/main.go @@ -46,6 +46,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/metrics" ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors" "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset" + "k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider" "k8s.io/autoscaler/cluster-autoscaler/simulator" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" @@ -185,6 +186,7 @@ var ( emitPerNodeGroupMetrics = flag.Bool("emit-per-nodegroup-metrics", false, "If true, emit per node group metrics.") debuggingSnapshotEnabled = flag.Bool("debugging-snapshot-enabled", false, "Whether the debugging snapshot of cluster autoscaler feature is enabled") + nodeInfoCacheExpireTime = flag.Duration("node-info-cache-expire-time", 87600*time.Hour, "Node Info cache expire time for each item. Default value is 10 years.") ) func createAutoscalingOptions() config.AutoscalingOptions { @@ -322,6 +324,7 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter } opts.Processors = ca_processors.DefaultProcessors() + opts.Processors.TemplateNodeInfoProvider = nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nodeInfoCacheExpireTime) opts.Processors.PodListProcessor = core.NewFilterOutSchedulablePodListProcessor() nodeInfoComparatorBuilder := nodegroupset.CreateGenericNodeInfoComparator diff --git a/cluster-autoscaler/processors/nodeinfosprovider/mixed_nodeinfos_processor.go b/cluster-autoscaler/processors/nodeinfosprovider/mixed_nodeinfos_processor.go index 37ac80fd0b8b..18e1e6d82cc6 100644 --- a/cluster-autoscaler/processors/nodeinfosprovider/mixed_nodeinfos_processor.go +++ b/cluster-autoscaler/processors/nodeinfosprovider/mixed_nodeinfos_processor.go @@ -36,17 +36,31 @@ import ( const stabilizationDelay = 1 * time.Minute +type cacheItem struct { + *schedulerframework.NodeInfo + added time.Time +} + // MixedTemplateNodeInfoProvider build nodeInfos from the cluster's nodes and node groups. type MixedTemplateNodeInfoProvider struct { - nodeInfoCache map[string]*schedulerframework.NodeInfo + nodeInfoCache map[string]cacheItem + ttl *time.Duration } // NewMixedTemplateNodeInfoProvider returns a NodeInfoProvider processor building // NodeInfos from real-world nodes when available, otherwise from node groups templates. -func NewMixedTemplateNodeInfoProvider() *MixedTemplateNodeInfoProvider { +func NewMixedTemplateNodeInfoProvider(ttl *time.Duration) *MixedTemplateNodeInfoProvider { return &MixedTemplateNodeInfoProvider{ - nodeInfoCache: make(map[string]*schedulerframework.NodeInfo), + nodeInfoCache: make(map[string]cacheItem), + ttl: ttl, + } +} + +func (p *MixedTemplateNodeInfoProvider) isCacheItemExpired(added time.Time) bool { + if p.ttl == nil { + return false } + return time.Now().Sub(added) > *p.ttl } // CleanUp cleans up processor's internal structures. @@ -102,7 +116,7 @@ func (p *MixedTemplateNodeInfoProvider) Process(ctx *context.AutoscalingContext, } if added && p.nodeInfoCache != nil { if nodeInfoCopy, err := utils.DeepCopyNodeInfo(result[id]); err == nil { - p.nodeInfoCache[id] = nodeInfoCopy + p.nodeInfoCache[id] = cacheItem{NodeInfo: nodeInfoCopy, added: time.Now()} } } } @@ -115,8 +129,10 @@ func (p *MixedTemplateNodeInfoProvider) Process(ctx *context.AutoscalingContext, // No good template, check cache of previously running nodes. if p.nodeInfoCache != nil { - if nodeInfo, found := p.nodeInfoCache[id]; found { - if nodeInfoCopy, err := utils.DeepCopyNodeInfo(nodeInfo); err == nil { + if cacheItem, found := p.nodeInfoCache[id]; found { + if p.isCacheItemExpired(cacheItem.added) { + delete(p.nodeInfoCache, id) + } else if nodeInfoCopy, err := utils.DeepCopyNodeInfo(cacheItem.NodeInfo); err == nil { result[id] = nodeInfoCopy continue } diff --git a/cluster-autoscaler/processors/nodeinfosprovider/mixed_nodeinfos_processor_test.go b/cluster-autoscaler/processors/nodeinfosprovider/mixed_nodeinfos_processor_test.go index 77e8f58fa45a..763916ce33bc 100644 --- a/cluster-autoscaler/processors/nodeinfosprovider/mixed_nodeinfos_processor_test.go +++ b/cluster-autoscaler/processors/nodeinfosprovider/mixed_nodeinfos_processor_test.go @@ -32,6 +32,10 @@ import ( schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" ) +var ( + cacheTtl = 1 * time.Second +) + func TestGetNodeInfosForGroups(t *testing.T) { now := time.Now() ready1 := BuildTestNode("n1", 1000, 1000) @@ -81,7 +85,7 @@ func TestGetNodeInfosForGroups(t *testing.T) { ListerRegistry: registry, }, } - res, err := NewMixedTemplateNodeInfoProvider().Process(&ctx, []*apiv1.Node{justReady5, unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil, now) + res, err := NewMixedTemplateNodeInfoProvider(&cacheTtl).Process(&ctx, []*apiv1.Node{justReady5, unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil, now) assert.NoError(t, err) assert.Equal(t, 5, len(res)) info, found := res["ng1"] @@ -108,7 +112,7 @@ func TestGetNodeInfosForGroups(t *testing.T) { ListerRegistry: registry, }, } - res, err = NewMixedTemplateNodeInfoProvider().Process(&ctx, []*apiv1.Node{}, []*appsv1.DaemonSet{}, nil, now) + res, err = NewMixedTemplateNodeInfoProvider(&cacheTtl).Process(&ctx, []*apiv1.Node{}, []*appsv1.DaemonSet{}, nil, now) assert.NoError(t, err) assert.Equal(t, 0, len(res)) } @@ -167,7 +171,7 @@ func TestGetNodeInfosForGroupsCache(t *testing.T) { ListerRegistry: registry, }, } - niProcessor := NewMixedTemplateNodeInfoProvider() + niProcessor := NewMixedTemplateNodeInfoProvider(&cacheTtl) res, err := niProcessor.Process(&ctx, []*apiv1.Node{unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil, now) assert.NoError(t, err) // Check results @@ -187,10 +191,10 @@ func TestGetNodeInfosForGroupsCache(t *testing.T) { // Check cache cachedInfo, found := niProcessor.nodeInfoCache["ng1"] assert.True(t, found) - assertEqualNodeCapacities(t, ready1, cachedInfo.Node()) + assertEqualNodeCapacities(t, ready1, cachedInfo.NodeInfo.Node()) cachedInfo, found = niProcessor.nodeInfoCache["ng2"] assert.True(t, found) - assertEqualNodeCapacities(t, ready2, cachedInfo.Node()) + assertEqualNodeCapacities(t, ready2, cachedInfo.NodeInfo.Node()) cachedInfo, found = niProcessor.nodeInfoCache["ng3"] assert.False(t, found) cachedInfo, found = niProcessor.nodeInfoCache["ng4"] @@ -223,7 +227,7 @@ func TestGetNodeInfosForGroupsCache(t *testing.T) { // Fill cache manually infoNg4Node6 := schedulerframework.NewNodeInfo() infoNg4Node6.SetNode(ready6.DeepCopy()) - niProcessor.nodeInfoCache = map[string]*schedulerframework.NodeInfo{"ng4": infoNg4Node6} + niProcessor.nodeInfoCache = map[string]cacheItem{"ng4": {NodeInfo: infoNg4Node6, added: now}} res, err = niProcessor.Process(&ctx, []*apiv1.Node{unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil, now) // Check if cache was used assert.NoError(t, err) @@ -236,6 +240,55 @@ func TestGetNodeInfosForGroupsCache(t *testing.T) { assertEqualNodeCapacities(t, ready6, info.Node()) } +func TestGetNodeInfosCacheExpired(t *testing.T) { + now := time.Now() + ready1 := BuildTestNode("n1", 1000, 1000) + SetNodeReadyState(ready1, true, now.Add(-2*time.Minute)) + + // Cloud provider with TemplateNodeInfo not implemented. + provider := testprovider.NewTestAutoprovisioningCloudProvider(nil, nil, nil, nil, nil, nil) + podLister := kube_util.NewTestPodLister([]*apiv1.Pod{}) + registry := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil) + predicateChecker, err := simulator.NewTestPredicateChecker() + assert.NoError(t, err) + + ctx := context.AutoscalingContext{ + CloudProvider: provider, + PredicateChecker: predicateChecker, + AutoscalingKubeClients: context.AutoscalingKubeClients{ + ListerRegistry: registry, + }, + } + tn := BuildTestNode("tn", 5000, 5000) + tni := schedulerframework.NewNodeInfo() + tni.SetNode(tn) + // Cache expire time is set. + niProcessor1 := NewMixedTemplateNodeInfoProvider(&cacheTtl) + niProcessor1.nodeInfoCache = map[string]cacheItem{ + "ng1": {NodeInfo: tni, added: now.Add(-2 * time.Second)}, + "ng2": {NodeInfo: tni, added: now.Add(-2 * time.Second)}, + } + provider.AddNodeGroup("ng1", 1, 10, 1) + provider.AddNode("ng1", ready1) + + assert.Equal(t, 2, len(niProcessor1.nodeInfoCache)) + _, err = niProcessor1.Process(&ctx, []*apiv1.Node{ready1}, []*appsv1.DaemonSet{}, nil, now) + assert.NoError(t, err) + assert.Equal(t, 1, len(niProcessor1.nodeInfoCache)) + + // Cache expire time isn't set. + niProcessor2 := NewMixedTemplateNodeInfoProvider(nil) + niProcessor2.nodeInfoCache = map[string]cacheItem{ + "ng1": {NodeInfo: tni, added: now.Add(-2 * time.Second)}, + "ng2": {NodeInfo: tni, added: now.Add(-2 * time.Second)}, + } + assert.Equal(t, 2, len(niProcessor2.nodeInfoCache)) + _, err = niProcessor1.Process(&ctx, []*apiv1.Node{ready1}, []*appsv1.DaemonSet{}, nil, now) + assert.NoError(t, err) + assert.Equal(t, 2, len(niProcessor2.nodeInfoCache)) + +} + func assertEqualNodeCapacities(t *testing.T, expected, actual *apiv1.Node) { t.Helper() assert.NotEqual(t, actual.Status, nil, "") diff --git a/cluster-autoscaler/processors/nodeinfosprovider/node_info_provider_processor.go b/cluster-autoscaler/processors/nodeinfosprovider/node_info_provider_processor.go index 74f31815758e..6136cfd50c94 100644 --- a/cluster-autoscaler/processors/nodeinfosprovider/node_info_provider_processor.go +++ b/cluster-autoscaler/processors/nodeinfosprovider/node_info_provider_processor.go @@ -37,6 +37,6 @@ type TemplateNodeInfoProvider interface { } // NewDefaultTemplateNodeInfoProvider returns a default TemplateNodeInfoProvider. -func NewDefaultTemplateNodeInfoProvider() TemplateNodeInfoProvider { - return NewMixedTemplateNodeInfoProvider() +func NewDefaultTemplateNodeInfoProvider(time *time.Duration) TemplateNodeInfoProvider { + return NewMixedTemplateNodeInfoProvider(time) } diff --git a/cluster-autoscaler/processors/processors.go b/cluster-autoscaler/processors/processors.go index 36e204f5101f..ca9d9cb6f401 100644 --- a/cluster-autoscaler/processors/processors.go +++ b/cluster-autoscaler/processors/processors.go @@ -77,7 +77,6 @@ func DefaultProcessors() *AutoscalingProcessors { NodeInfoProcessor: nodeinfos.NewDefaultNodeInfoProcessor(), NodeGroupConfigProcessor: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(), CustomResourcesProcessor: customresources.NewDefaultCustomResourcesProcessor(), - TemplateNodeInfoProvider: nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(), ActionableClusterProcessor: actionablecluster.NewDefaultActionableClusterProcessor(), } }