Skip to content

Commit

Permalink
Add expire time for nodeInfo cache items
Browse files Browse the repository at this point in the history
  • Loading branch information
yaroslava-serdiuk committed Feb 7, 2022
1 parent 994fbac commit e80f889
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 23 deletions.
2 changes: 1 addition & 1 deletion cluster-autoscaler/core/scale_test_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func NewTestProcessors() *processors.AutoscalingProcessors {
AutoscalingStatusProcessor: &status.NoOpAutoscalingStatusProcessor{},
NodeGroupManager: nodegroups.NewDefaultNodeGroupManager(),
NodeInfoProcessor: nodeinfos.NewDefaultNodeInfoProcessor(),
TemplateNodeInfoProvider: nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(),
TemplateNodeInfoProvider: nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil),
NodeGroupConfigProcessor: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(),
CustomResourcesProcessor: customresources.NewDefaultCustomResourcesProcessor(),
ActionableClusterProcessor: actionablecluster.NewDefaultActionableClusterProcessor(),
Expand Down
12 changes: 6 additions & 6 deletions cluster-autoscaler/core/scale_up_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ func runSimpleScaleUpTest(t *testing.T, config *scaleTestConfig) *scaleTestResul
}
context.ExpanderStrategy = expander

nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())

Expand Down Expand Up @@ -691,7 +691,7 @@ func TestScaleUpUnhealthy(t *testing.T) {
assert.NoError(t, err)

nodes := []*apiv1.Node{n1, n2}
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
p3 := BuildTestPod("p-new", 550, 0)
Expand Down Expand Up @@ -732,7 +732,7 @@ func TestScaleUpNoHelp(t *testing.T) {
assert.NoError(t, err)

nodes := []*apiv1.Node{n1}
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
p3 := BuildTestPod("p-new", 500, 0)
Expand Down Expand Up @@ -799,7 +799,7 @@ func TestScaleUpBalanceGroups(t *testing.T) {
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil)
assert.NoError(t, err)

nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())

Expand Down Expand Up @@ -867,7 +867,7 @@ func TestScaleUpAutoprovisionedNodeGroup(t *testing.T) {
processors.NodeGroupManager = &mockAutoprovisioningNodeGroupManager{t, 0}

nodes := []*apiv1.Node{}
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, time.Now())
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, time.Now())

scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p1}, nodes, []*appsv1.DaemonSet{}, nodeInfos, nil)
assert.NoError(t, err)
Expand Down Expand Up @@ -920,7 +920,7 @@ func TestScaleUpBalanceAutoprovisionedNodeGroups(t *testing.T) {
processors.NodeGroupManager = &mockAutoprovisioningNodeGroupManager{t, 2}

nodes := []*apiv1.Node{}
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, time.Now())
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, time.Now())

scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p1, p2, p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos, nil)
assert.NoError(t, err)
Expand Down
3 changes: 3 additions & 0 deletions cluster-autoscaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/metrics"
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
Expand Down Expand Up @@ -185,6 +186,7 @@ var (

emitPerNodeGroupMetrics = flag.Bool("emit-per-nodegroup-metrics", false, "If true, emit per node group metrics.")
debuggingSnapshotEnabled = flag.Bool("debugging-snapshot-enabled", false, "Whether the debugging snapshot of cluster autoscaler feature is enabled")
nodeInfoCacheExpireTime = flag.Duration("node-info-cache-expire-time", -1*time.Minute, "Node Info cache expire time for each item. If the value isn't set or negative, the cache items won't be expired")
)

func createAutoscalingOptions() config.AutoscalingOptions {
Expand Down Expand Up @@ -322,6 +324,7 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
}

opts.Processors = ca_processors.DefaultProcessors()
opts.Processors.TemplateNodeInfoProvider = nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nodeInfoCacheExpireTime)
opts.Processors.PodListProcessor = core.NewFilterOutSchedulablePodListProcessor()

nodeInfoComparatorBuilder := nodegroupset.CreateGenericNodeInfoComparator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,31 @@ import (

const stabilizationDelay = 1 * time.Minute

// cacheItem is a single entry of the nodeInfo cache: a previously built
// template NodeInfo together with the time it was inserted, so the provider
// can decide whether the entry has outlived its TTL.
type cacheItem struct {
	nodeInfo *schedulerframework.NodeInfo
	// added records when this item was put into the cache; compared against
	// the provider's ttl in isCacheItemExpired.
	added time.Time
}

// MixedTemplateNodeInfoProvider build nodeInfos from the cluster's nodes and node groups.
type MixedTemplateNodeInfoProvider struct {
nodeInfoCache map[string]*schedulerframework.NodeInfo
nodeInfoCache map[string]cacheItem
ttl *time.Duration
}

// NewMixedTemplateNodeInfoProvider returns a NodeInfoProvider processor building
// NodeInfos from real-world nodes when available, otherwise from node groups templates.
func NewMixedTemplateNodeInfoProvider() *MixedTemplateNodeInfoProvider {
func NewMixedTemplateNodeInfoProvider(ttl *time.Duration) *MixedTemplateNodeInfoProvider {
return &MixedTemplateNodeInfoProvider{
nodeInfoCache: make(map[string]*schedulerframework.NodeInfo),
nodeInfoCache: make(map[string]cacheItem),
ttl: ttl,
}
}

// isCacheItemExpired reports whether a cache entry added at the given time has
// outlived the configured TTL. A nil or negative TTL disables expiration, so
// such entries are never considered expired.
func (p *MixedTemplateNodeInfoProvider) isCacheItemExpired(added time.Time) bool {
	if p.ttl == nil || *p.ttl < 0 {
		return false
	}
	// time.Since is the idiomatic form of time.Now().Sub (staticcheck S1012).
	return time.Since(added) > *p.ttl
}

// CleanUp cleans up processor's internal structures.
Expand Down Expand Up @@ -102,7 +116,7 @@ func (p *MixedTemplateNodeInfoProvider) Process(ctx *context.AutoscalingContext,
}
if added && p.nodeInfoCache != nil {
if nodeInfoCopy, err := utils.DeepCopyNodeInfo(result[id]); err == nil {
p.nodeInfoCache[id] = nodeInfoCopy
p.nodeInfoCache[id] = cacheItem{nodeInfo: nodeInfoCopy, added: time.Now()}
}
}
}
Expand All @@ -115,8 +129,10 @@ func (p *MixedTemplateNodeInfoProvider) Process(ctx *context.AutoscalingContext,

// No good template, check cache of previously running nodes.
if p.nodeInfoCache != nil {
if nodeInfo, found := p.nodeInfoCache[id]; found {
if nodeInfoCopy, err := utils.DeepCopyNodeInfo(nodeInfo); err == nil {
if cacheItem, found := p.nodeInfoCache[id]; found {
if p.isCacheItemExpired(cacheItem.added) {
delete(p.nodeInfoCache, id)
} else if nodeInfoCopy, err := utils.DeepCopyNodeInfo(cacheItem.nodeInfo); err == nil {
result[id] = nodeInfoCopy
continue
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ import (
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

var (
	// cacheTtl is the nodeInfo cache TTL passed to the providers under test;
	// 1s so that entries backdated by 2s (see TestGetNodeInfosCacheExpired)
	// count as expired.
	cacheTtl = 1 * time.Second
)

func TestGetNodeInfosForGroups(t *testing.T) {
now := time.Now()
ready1 := BuildTestNode("n1", 1000, 1000)
Expand Down Expand Up @@ -81,7 +85,7 @@ func TestGetNodeInfosForGroups(t *testing.T) {
ListerRegistry: registry,
},
}
res, err := NewMixedTemplateNodeInfoProvider().Process(&ctx, []*apiv1.Node{justReady5, unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil, now)
res, err := NewMixedTemplateNodeInfoProvider(&cacheTtl).Process(&ctx, []*apiv1.Node{justReady5, unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil, now)
assert.NoError(t, err)
assert.Equal(t, 5, len(res))
info, found := res["ng1"]
Expand All @@ -108,7 +112,7 @@ func TestGetNodeInfosForGroups(t *testing.T) {
ListerRegistry: registry,
},
}
res, err = NewMixedTemplateNodeInfoProvider().Process(&ctx, []*apiv1.Node{}, []*appsv1.DaemonSet{}, nil, now)
res, err = NewMixedTemplateNodeInfoProvider(&cacheTtl).Process(&ctx, []*apiv1.Node{}, []*appsv1.DaemonSet{}, nil, now)
assert.NoError(t, err)
assert.Equal(t, 0, len(res))
}
Expand Down Expand Up @@ -167,7 +171,7 @@ func TestGetNodeInfosForGroupsCache(t *testing.T) {
ListerRegistry: registry,
},
}
niProcessor := NewMixedTemplateNodeInfoProvider()
niProcessor := NewMixedTemplateNodeInfoProvider(&cacheTtl)
res, err := niProcessor.Process(&ctx, []*apiv1.Node{unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil, now)
assert.NoError(t, err)
// Check results
Expand All @@ -187,10 +191,10 @@ func TestGetNodeInfosForGroupsCache(t *testing.T) {
// Check cache
cachedInfo, found := niProcessor.nodeInfoCache["ng1"]
assert.True(t, found)
assertEqualNodeCapacities(t, ready1, cachedInfo.Node())
assertEqualNodeCapacities(t, ready1, cachedInfo.nodeInfo.Node())
cachedInfo, found = niProcessor.nodeInfoCache["ng2"]
assert.True(t, found)
assertEqualNodeCapacities(t, ready2, cachedInfo.Node())
assertEqualNodeCapacities(t, ready2, cachedInfo.nodeInfo.Node())
cachedInfo, found = niProcessor.nodeInfoCache["ng3"]
assert.False(t, found)
cachedInfo, found = niProcessor.nodeInfoCache["ng4"]
Expand All @@ -216,14 +220,14 @@ func TestGetNodeInfosForGroupsCache(t *testing.T) {
// Check cache
cachedInfo, found = niProcessor.nodeInfoCache["ng2"]
assert.True(t, found)
assertEqualNodeCapacities(t, ready2, cachedInfo.Node())
assertEqualNodeCapacities(t, ready2, cachedInfo.nodeInfo.Node())
cachedInfo, found = niProcessor.nodeInfoCache["ng4"]
assert.False(t, found)

// Fill cache manually
infoNg4Node6 := schedulerframework.NewNodeInfo()
infoNg4Node6.SetNode(ready6.DeepCopy())
niProcessor.nodeInfoCache = map[string]*schedulerframework.NodeInfo{"ng4": infoNg4Node6}
niProcessor.nodeInfoCache = map[string]cacheItem{"ng4": {nodeInfo: infoNg4Node6, added: now}}
res, err = niProcessor.Process(&ctx, []*apiv1.Node{unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil, now)
// Check if cache was used
assert.NoError(t, err)
Expand All @@ -236,6 +240,42 @@ func TestGetNodeInfosForGroupsCache(t *testing.T) {
assertEqualNodeCapacities(t, ready6, info.Node())
}

// TestGetNodeInfosCacheExpired seeds the provider's nodeInfoCache with two
// entries backdated beyond cacheTtl and verifies that after Process only one
// entry remains in the cache.
func TestGetNodeInfosCacheExpired(t *testing.T) {
	now := time.Now()
	ready1 := BuildTestNode("n1", 1000, 1000)
	SetNodeReadyState(ready1, true, now.Add(-2*time.Minute))

	// Cloud provider with TemplateNodeInfo not implemented.
	provider := testprovider.NewTestAutoprovisioningCloudProvider(nil, nil, nil, nil, nil, nil)
	podLister := kube_util.NewTestPodLister([]*apiv1.Pod{})
	registry := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil)
	predicateChecker, err := simulator.NewTestPredicateChecker()
	assert.NoError(t, err)

	ctx := context.AutoscalingContext{
		CloudProvider:    provider,
		PredicateChecker: predicateChecker,
		AutoscalingKubeClients: context.AutoscalingKubeClients{
			ListerRegistry: registry,
		},
	}
	// Template node/info used only to populate the cache by hand.
	tn := BuildTestNode("tn", 5000, 5000)
	tni := schedulerframework.NewNodeInfo()
	tni.SetNode(tn)
	niProcessor := NewMixedTemplateNodeInfoProvider(&cacheTtl)
	// Both entries are 2s old — older than the 1s cacheTtl, so both are stale.
	niProcessor.nodeInfoCache = map[string]cacheItem{
		"ng1": {nodeInfo: tni, added: now.Add(-2 * time.Second)},
		"ng2": {nodeInfo: tni, added: now.Add(-2 * time.Second)},
	}
	provider.AddNodeGroup("ng1", 1, 10, 1) // Nodegroup with ready node.
	provider.AddNode("ng1", ready1)

	assert.Equal(t, 2, len(niProcessor.nodeInfoCache))
	_, err = niProcessor.Process(&ctx, []*apiv1.Node{ready1}, []*appsv1.DaemonSet{}, nil, now)
	assert.NoError(t, err)
	// One entry survives — presumably ng1's, refreshed from the live ready
	// node, while ng2's stale entry is dropped; the dropping code path is not
	// visible in this hunk, so confirm against Process.
	assert.Equal(t, 1, len(niProcessor.nodeInfoCache))
}

func assertEqualNodeCapacities(t *testing.T, expected, actual *apiv1.Node) {
t.Helper()
assert.NotEqual(t, actual.Status, nil, "")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,6 @@ type TemplateNodeInfoProvider interface {
}

// NewDefaultTemplateNodeInfoProvider returns a default TemplateNodeInfoProvider.
func NewDefaultTemplateNodeInfoProvider() TemplateNodeInfoProvider {
return NewMixedTemplateNodeInfoProvider()
func NewDefaultTemplateNodeInfoProvider(time *time.Duration) TemplateNodeInfoProvider {
return NewMixedTemplateNodeInfoProvider(time)
}
1 change: 0 additions & 1 deletion cluster-autoscaler/processors/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ func DefaultProcessors() *AutoscalingProcessors {
NodeInfoProcessor: nodeinfos.NewDefaultNodeInfoProcessor(),
NodeGroupConfigProcessor: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(),
CustomResourcesProcessor: customresources.NewDefaultCustomResourcesProcessor(),
TemplateNodeInfoProvider: nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(),
ActionableClusterProcessor: actionablecluster.NewDefaultActionableClusterProcessor(),
}
}
Expand Down

0 comments on commit e80f889

Please sign in to comment.