Add expire time for nodeInfo cache items #4669

Merged: 1 commit, Feb 16, 2022
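This change adds a time-to-live (TTL) to the per-node-group nodeInfo cache used by the template NodeInfo provider, so cached entries eventually expire instead of being kept indefinitely. The TTL is configurable through the new --node-info-cache-expire-time flag and defaults to 10 years, which preserves the previous keep-forever behavior. A hypothetical invocation with a shorter TTL:

```
cluster-autoscaler --node-info-cache-expire-time=10m
```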
2 changes: 1 addition & 1 deletion cluster-autoscaler/core/scale_test_common.go
@@ -148,7 +148,7 @@ func NewTestProcessors() *processors.AutoscalingProcessors {
AutoscalingStatusProcessor: &status.NoOpAutoscalingStatusProcessor{},
NodeGroupManager: nodegroups.NewDefaultNodeGroupManager(),
NodeInfoProcessor: nodeinfos.NewDefaultNodeInfoProcessor(),
TemplateNodeInfoProvider: nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(),
TemplateNodeInfoProvider: nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil),
NodeGroupConfigProcessor: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(),
CustomResourcesProcessor: customresources.NewDefaultCustomResourcesProcessor(),
ActionableClusterProcessor: actionablecluster.NewDefaultActionableClusterProcessor(),
12 changes: 6 additions & 6 deletions cluster-autoscaler/core/scale_up_test.go
@@ -530,7 +530,7 @@ func runSimpleScaleUpTest(t *testing.T, config *scaleTestConfig) *scaleTestResul
}
context.ExpanderStrategy = expander

nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())

@@ -691,7 +691,7 @@ func TestScaleUpUnhealthy(t *testing.T) {
assert.NoError(t, err)

nodes := []*apiv1.Node{n1, n2}
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
p3 := BuildTestPod("p-new", 550, 0)
@@ -732,7 +732,7 @@ func TestScaleUpNoHelp(t *testing.T) {
assert.NoError(t, err)

nodes := []*apiv1.Node{n1}
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
p3 := BuildTestPod("p-new", 500, 0)
@@ -799,7 +799,7 @@ func TestScaleUpBalanceGroups(t *testing.T) {
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil)
assert.NoError(t, err)

nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())

@@ -867,7 +867,7 @@ func TestScaleUpAutoprovisionedNodeGroup(t *testing.T) {
processors.NodeGroupManager = &mockAutoprovisioningNodeGroupManager{t, 0}

nodes := []*apiv1.Node{}
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, time.Now())
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, time.Now())

scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p1}, nodes, []*appsv1.DaemonSet{}, nodeInfos, nil)
assert.NoError(t, err)
@@ -920,7 +920,7 @@ func TestScaleUpBalanceAutoprovisionedNodeGroups(t *testing.T) {
processors.NodeGroupManager = &mockAutoprovisioningNodeGroupManager{t, 2}

nodes := []*apiv1.Node{}
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, time.Now())
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, time.Now())

scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p1, p2, p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos, nil)
assert.NoError(t, err)
3 changes: 3 additions & 0 deletions cluster-autoscaler/main.go
@@ -46,6 +46,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/metrics"
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
@@ -185,6 +186,7 @@ var (

emitPerNodeGroupMetrics = flag.Bool("emit-per-nodegroup-metrics", false, "If true, emit per node group metrics.")
debuggingSnapshotEnabled = flag.Bool("debugging-snapshot-enabled", false, "Whether the debugging snapshot of cluster autoscaler feature is enabled")
nodeInfoCacheExpireTime = flag.Duration("node-info-cache-expire-time", 87600*time.Hour, "Node Info cache expire time for each item. Default value is 10 years.")
)

func createAutoscalingOptions() config.AutoscalingOptions {
@@ -322,6 +324,7 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
}

opts.Processors = ca_processors.DefaultProcessors()
opts.Processors.TemplateNodeInfoProvider = nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nodeInfoCacheExpireTime)
opts.Processors.PodListProcessor = core.NewFilterOutSchedulablePodListProcessor()

nodeInfoComparatorBuilder := nodegroupset.CreateGenericNodeInfoComparator
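Downstream of the flag, the TTL travels as a *time.Duration: nil selects the built-in maximum (effectively no expiry), while a non-nil value enables real expiration. A minimal sketch of both call patterns, using the import path shown above:

```go
package main

import (
	"time"

	"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
)

func main() {
	// Non-nil TTL: cached templates for a node group expire after 10 minutes.
	ttl := 10 * time.Minute
	_ = nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(&ttl)

	// nil falls back to maxCacheExpireTime (~10 years), i.e. entries effectively never expire.
	_ = nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil)
}
```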
@@ -35,20 +35,36 @@ import (
)

const stabilizationDelay = 1 * time.Minute
const maxCacheExpireTime = 87660 * time.Hour

type cacheItem struct {
*schedulerframework.NodeInfo
added time.Time
}

// MixedTemplateNodeInfoProvider build nodeInfos from the cluster's nodes and node groups.
type MixedTemplateNodeInfoProvider struct {
nodeInfoCache map[string]*schedulerframework.NodeInfo
nodeInfoCache map[string]cacheItem
ttl time.Duration
}

// NewMixedTemplateNodeInfoProvider returns a NodeInfoProvider processor building
// NodeInfos from real-world nodes when available, otherwise from node groups templates.
func NewMixedTemplateNodeInfoProvider() *MixedTemplateNodeInfoProvider {
func NewMixedTemplateNodeInfoProvider(t *time.Duration) *MixedTemplateNodeInfoProvider {
ttl := maxCacheExpireTime
if t != nil {
ttl = *t
}
return &MixedTemplateNodeInfoProvider{
nodeInfoCache: make(map[string]*schedulerframework.NodeInfo),
nodeInfoCache: make(map[string]cacheItem),
ttl: ttl,
}
}

func (p *MixedTemplateNodeInfoProvider) isCacheItemExpired(added time.Time) bool {
return time.Since(added) > p.ttl
}

// CleanUp cleans up processor's internal structures.
func (p *MixedTemplateNodeInfoProvider) CleanUp() {
}
@@ -102,7 +118,7 @@ func (p *MixedTemplateNodeInfoProvider) Process(ctx *context.AutoscalingContext,
}
if added && p.nodeInfoCache != nil {
if nodeInfoCopy, err := utils.DeepCopyNodeInfo(result[id]); err == nil {
p.nodeInfoCache[id] = nodeInfoCopy
p.nodeInfoCache[id] = cacheItem{NodeInfo: nodeInfoCopy, added: time.Now()}
}
}
}
@@ -115,8 +131,10 @@

// No good template, check cache of previously running nodes.
if p.nodeInfoCache != nil {
if nodeInfo, found := p.nodeInfoCache[id]; found {
if nodeInfoCopy, err := utils.DeepCopyNodeInfo(nodeInfo); err == nil {
if cacheItem, found := p.nodeInfoCache[id]; found {
if p.isCacheItemExpired(cacheItem.added) {
delete(p.nodeInfoCache, id)
} else if nodeInfoCopy, err := utils.DeepCopyNodeInfo(cacheItem.NodeInfo); err == nil {
result[id] = nodeInfoCopy
continue
}
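Two properties of the cache logic above are worth noting. Expiry is lazy: there is no background sweeper, so an expired entry is only detected and deleted the next time Process consults the cache for that node group. And the added timestamp is refreshed whenever a template is rebuilt from a real node, so only groups with no suitable running nodes age out. A minimal, self-contained sketch of the same lazy-TTL pattern (the names item, ttlCache, and get are illustrative, not autoscaler APIs):

```go
package main

import (
	"fmt"
	"time"
)

type item struct {
	value string
	added time.Time
}

type ttlCache struct {
	items map[string]item
	ttl   time.Duration
}

// get returns a cached value, lazily evicting it if its TTL has elapsed,
// mirroring how Process drops expired nodeInfo entries on lookup.
func (c *ttlCache) get(key string) (string, bool) {
	it, found := c.items[key]
	if !found {
		return "", false
	}
	if time.Since(it.added) > c.ttl {
		delete(c.items, key)
		return "", false
	}
	return it.value, true
}

func main() {
	c := &ttlCache{items: map[string]item{}, ttl: time.Second}
	c.items["ng1"] = item{value: "template", added: time.Now().Add(-2 * time.Second)}
	if _, ok := c.get("ng1"); !ok {
		fmt.Println("ng1 expired and was evicted")
	}
}
```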
@@ -32,6 +32,10 @@ import (
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

var (
cacheTtl = 1 * time.Second
)
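The 1-second cacheTtl keeps the expiry tests fast: instead of sleeping, they insert cache items whose added timestamp is already two seconds in the past, which is older than the TTL.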

func TestGetNodeInfosForGroups(t *testing.T) {
now := time.Now()
ready1 := BuildTestNode("n1", 1000, 1000)
@@ -81,7 +85,7 @@
ListerRegistry: registry,
},
}
res, err := NewMixedTemplateNodeInfoProvider().Process(&ctx, []*apiv1.Node{justReady5, unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil, now)
res, err := NewMixedTemplateNodeInfoProvider(&cacheTtl).Process(&ctx, []*apiv1.Node{justReady5, unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil, now)
assert.NoError(t, err)
assert.Equal(t, 5, len(res))
info, found := res["ng1"]
@@ -108,7 +112,7 @@
ListerRegistry: registry,
},
}
res, err = NewMixedTemplateNodeInfoProvider().Process(&ctx, []*apiv1.Node{}, []*appsv1.DaemonSet{}, nil, now)
res, err = NewMixedTemplateNodeInfoProvider(&cacheTtl).Process(&ctx, []*apiv1.Node{}, []*appsv1.DaemonSet{}, nil, now)
assert.NoError(t, err)
assert.Equal(t, 0, len(res))
}
@@ -167,7 +171,7 @@ func TestGetNodeInfosForGroupsCache(t *testing.T) {
ListerRegistry: registry,
},
}
niProcessor := NewMixedTemplateNodeInfoProvider()
niProcessor := NewMixedTemplateNodeInfoProvider(&cacheTtl)
res, err := niProcessor.Process(&ctx, []*apiv1.Node{unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil, now)
assert.NoError(t, err)
// Check results
@@ -223,7 +227,7 @@
// Fill cache manually
infoNg4Node6 := schedulerframework.NewNodeInfo()
infoNg4Node6.SetNode(ready6.DeepCopy())
niProcessor.nodeInfoCache = map[string]*schedulerframework.NodeInfo{"ng4": infoNg4Node6}
niProcessor.nodeInfoCache = map[string]cacheItem{"ng4": {NodeInfo: infoNg4Node6, added: now}}
res, err = niProcessor.Process(&ctx, []*apiv1.Node{unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil, now)
// Check if cache was used
assert.NoError(t, err)
@@ -236,6 +240,55 @@
assertEqualNodeCapacities(t, ready6, info.Node())
}

func TestGetNodeInfosCacheExpired(t *testing.T) {
now := time.Now()
ready1 := BuildTestNode("n1", 1000, 1000)
SetNodeReadyState(ready1, true, now.Add(-2*time.Minute))

// Cloud provider with TemplateNodeInfo not implemented.
provider := testprovider.NewTestAutoprovisioningCloudProvider(nil, nil, nil, nil, nil, nil)
podLister := kube_util.NewTestPodLister([]*apiv1.Pod{})
registry := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil)
predicateChecker, err := simulator.NewTestPredicateChecker()
assert.NoError(t, err)

ctx := context.AutoscalingContext{
CloudProvider: provider,
PredicateChecker: predicateChecker,
AutoscalingKubeClients: context.AutoscalingKubeClients{
ListerRegistry: registry,
},
}
tn := BuildTestNode("tn", 5000, 5000)
tni := schedulerframework.NewNodeInfo()
tni.SetNode(tn)
// Cache expire time is set.
niProcessor1 := NewMixedTemplateNodeInfoProvider(&cacheTtl)
niProcessor1.nodeInfoCache = map[string]cacheItem{
"ng1": {NodeInfo: tni, added: now.Add(-2 * time.Second)},
"ng2": {NodeInfo: tni, added: now.Add(-2 * time.Second)},
}
provider.AddNodeGroup("ng1", 1, 10, 1)
provider.AddNode("ng1", ready1)

assert.Equal(t, 2, len(niProcessor1.nodeInfoCache))
_, err = niProcessor1.Process(&ctx, []*apiv1.Node{ready1}, []*appsv1.DaemonSet{}, nil, now)
assert.NoError(t, err)
assert.Equal(t, 1, len(niProcessor1.nodeInfoCache))

// Cache expire time isn't set.
niProcessor2 := NewMixedTemplateNodeInfoProvider(nil)
niProcessor2.nodeInfoCache = map[string]cacheItem{
"ng1": {NodeInfo: tni, added: now.Add(-2 * time.Second)},
"ng2": {NodeInfo: tni, added: now.Add(-2 * time.Second)},
}
assert.Equal(t, 2, len(niProcessor2.nodeInfoCache))
_, err = niProcessor2.Process(&ctx, []*apiv1.Node{ready1}, []*appsv1.DaemonSet{}, nil, now)
assert.NoError(t, err)
assert.Equal(t, 2, len(niProcessor2.nodeInfoCache))
}

func assertEqualNodeCapacities(t *testing.T, expected, actual *apiv1.Node) {
t.Helper()
assert.NotEqual(t, actual.Status, nil, "")
@@ -37,6 +37,6 @@ type TemplateNodeInfoProvider interface {
}

// NewDefaultTemplateNodeInfoProvider returns a default TemplateNodeInfoProvider.
func NewDefaultTemplateNodeInfoProvider() TemplateNodeInfoProvider {
return NewMixedTemplateNodeInfoProvider()
func NewDefaultTemplateNodeInfoProvider(ttl *time.Duration) TemplateNodeInfoProvider {
return NewMixedTemplateNodeInfoProvider(ttl)
}
2 changes: 1 addition & 1 deletion cluster-autoscaler/processors/processors.go
@@ -77,8 +77,8 @@ func DefaultProcessors() *AutoscalingProcessors {
NodeInfoProcessor: nodeinfos.NewDefaultNodeInfoProcessor(),
NodeGroupConfigProcessor: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(),
CustomResourcesProcessor: customresources.NewDefaultCustomResourcesProcessor(),
TemplateNodeInfoProvider: nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(),
ActionableClusterProcessor: actionablecluster.NewDefaultActionableClusterProcessor(),
TemplateNodeInfoProvider: nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil),
}
}
