From bb9b23c55ca313884c41949824f34862cefe8ef5 Mon Sep 17 00:00:00 2001
From: Joachim Bartosik
Date: Fri, 25 Sep 2020 18:26:06 +0200
Subject: [PATCH 1/2] Add controllerCacheStorage

Type that caches results of attempts to get parents of controllers.
---
 .../controller_cache_storage.go               | 165 ++++++++++++++
 .../controller_cache_storage_test.go          | 215 ++++++++++++++++++
 .../controller_fetcher/controller_fetcher.go  |  30 +++
 3 files changed, 410 insertions(+)
 create mode 100644 vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher/controller_cache_storage.go
 create mode 100644 vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher/controller_cache_storage_test.go

diff --git a/vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher/controller_cache_storage.go b/vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher/controller_cache_storage.go
new file mode 100644
index 000000000000..abddff3ea9b6
--- /dev/null
+++ b/vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher/controller_cache_storage.go
@@ -0,0 +1,165 @@
+/*
+Copyright 2020 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package controllerfetcher
+
+import (
+	"sync"
+	"time"
+
+	autoscalingapi "k8s.io/api/autoscaling/v1"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/klog"
+)
+
+// Allows tests to inject their time.
+var now = time.Now
+
+type scaleCacheKey struct {
+	namespace     string
+	groupResource schema.GroupResource
+	name          string
+}
+type scaleCacheEntry struct {
+	refreshAfter time.Time
+	deleteAfter  time.Time
+	resource     *autoscalingapi.Scale
+	err          error
+}
+
+// Cache for responses to get-scale queries on controllers. Thread-safe.
+// Usage:
+// - `Get` the cached response. If there is one, use it; otherwise make the
+//   query and
+// - `Insert` the response you got into the cache.
+// When you create a `controllerCacheStorage` you should start two goroutines:
+// - One for refreshing cache entries, which calls `GetKeysToRefresh` and then,
+//   for each key, makes a query to the API server and calls `Refresh` to
+//   update the content of the cache.
+// - A second one for removing stale entries, which periodically calls
+//   `RemoveExpired`.
+// Each entry is refreshed after a duration between
+// `validityTime` and `validityTime` * (1 + `jitterFactor`)
+// passes, and is removed if there are no reads for it for more than `lifeTime`.
+//
+// Sometimes refreshing might take longer than refreshAfter (for example when
+// VPA is starting in a big cluster and tries to fetch all controllers). To
+// handle such situations, lifeTime should be longer than the time between
+// refreshes, so the main VPA loop can do its work quickly, using the cached
+// information (instead of getting stuck on refreshing the cache).
+// TODO(jbartosik): Add a way to detect when we don't refresh the cache
+// frequently enough.
+// TODO(jbartosik): Add a way to learn how long we keep entries around so we can
+// decide if / how we want to optimize entry refreshes.
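+//
+// Illustrative example of the usage pattern above (not part of this patch;
+// `scalesGetter` stands in for whatever scale.ScalesGetter the caller holds):
+//
+//	if ok, scale, err := cc.Get(namespace, groupResource, name); ok {
+//		return scale, err
+//	}
+//	scale, err := scalesGetter.Scales(namespace).Get(ctx, groupResource, name, metav1.GetOptions{})
+//	cc.Insert(namespace, groupResource, name, scale, err)
+//	return scale, err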
+type controllerCacheStorage struct {
+	cache        map[scaleCacheKey]scaleCacheEntry
+	mux          sync.Mutex
+	validityTime time.Duration
+	jitterFactor float64
+	lifeTime     time.Duration
+}
+
+// Returns a bool indicating whether the entry was present in the cache, along
+// with the cached response and error. Reading an entry postpones its
+// deleteAfter (removal) time.
+func (cc *controllerCacheStorage) Get(namespace string, groupResource schema.GroupResource, name string) (ok bool, controller *autoscalingapi.Scale, err error) {
+	key := scaleCacheKey{namespace: namespace, groupResource: groupResource, name: name}
+	cc.mux.Lock()
+	defer cc.mux.Unlock()
+	r, ok := cc.cache[key]
+	if ok {
+		r.deleteAfter = now().Add(cc.lifeTime)
+		cc.cache[key] = r
+	}
+	return ok, r.resource, r.err
+}
+
+// If the key is in the cache, Refresh updates the cached value, error and
+// refresh time (but not the time to remove).
+// If the key is missing from the cache, it does nothing (relevant when we're
+// concurrently updating the cache and removing stale entries from it, to avoid
+// adding back an entry which we just removed).
+func (cc *controllerCacheStorage) Refresh(namespace string, groupResource schema.GroupResource, name string, controller *autoscalingapi.Scale, err error) {
+	key := scaleCacheKey{namespace: namespace, groupResource: groupResource, name: name}
+	cc.mux.Lock()
+	defer cc.mux.Unlock()
+	old, ok := cc.cache[key]
+	if !ok {
+		return
+	}
+	// We refresh entries that are waiting to be removed. So when we refresh an
+	// entry we mustn't change the entry's deleteAfter time (otherwise we risk
+	// never removing entries that are not being read).
+	cc.cache[key] = scaleCacheEntry{
+		refreshAfter: now().Add(wait.Jitter(cc.validityTime, cc.jitterFactor)),
+		deleteAfter:  old.deleteAfter,
+		resource:     controller,
+		err:          err,
+	}
+}
+
+// If the key is missing from the cache, Insert adds a new entry holding the
+// given value and error, with fresh refreshAfter and deleteAfter times.
+// If the key is already in the cache, it does nothing (so that re-inserting an
+// element doesn't postpone its deleteAfter time).
+func (cc *controllerCacheStorage) Insert(namespace string, groupResource schema.GroupResource, name string, controller *autoscalingapi.Scale, err error) {
+	key := scaleCacheKey{namespace: namespace, groupResource: groupResource, name: name}
+	cc.mux.Lock()
+	defer cc.mux.Unlock()
+	if _, ok := cc.cache[key]; ok {
+		return
+	}
+	now := now()
+	cc.cache[key] = scaleCacheEntry{
+		refreshAfter: now.Add(wait.Jitter(cc.validityTime, cc.jitterFactor)),
+		deleteAfter:  now.Add(cc.lifeTime),
+		resource:     controller,
+		err:          err,
+	}
+}
+
+// Removes entries that we haven't read in a while from the cache.
+func (cc *controllerCacheStorage) RemoveExpired() { + klog.V(1).Info("Removing entries from controllerCacheStorage") + cc.mux.Lock() + defer cc.mux.Unlock() + now := now() + for k, v := range cc.cache { + if now.After(v.deleteAfter) { + delete(cc.cache, k) + } + } + klog.V(1).Infof("Removed %d entries from controllerCacheStorage", removed) +} + +// Returns a list of keys for which values need to be refreshed +func (cc *controllerCacheStorage) GetKeysToRefresh() []scaleCacheKey { + result := make([]scaleCacheKey, 0) + cc.mux.Lock() + defer cc.mux.Unlock() + now := now() + for k, v := range cc.cache { + if now.After(v.refreshAfter) { + result = append(result, k) + } + } + return result +} + +func newControllerCacheStorage(validityTime, lifeTime time.Duration, jitterFactor float64) controllerCacheStorage { + return controllerCacheStorage{ + cache: make(map[scaleCacheKey]scaleCacheEntry), + validityTime: validityTime, + jitterFactor: jitterFactor, + lifeTime: lifeTime, + } +} diff --git a/vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher/controller_cache_storage_test.go b/vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher/controller_cache_storage_test.go new file mode 100644 index 000000000000..0ee7b8d38965 --- /dev/null +++ b/vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher/controller_cache_storage_test.go @@ -0,0 +1,215 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package controllerfetcher + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + autoscalingapi "k8s.io/api/autoscaling/v1" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +func getKey(key string) scaleCacheKey { + return scaleCacheKey{ + namespace: "ns", + groupResource: schema.GroupResource{ + Group: "group", + Resource: "resource", + }, + name: key, + } +} + +func getScale() *autoscalingapi.Scale { + return &autoscalingapi.Scale{} +} + +func TestControllerCache_InitiallyNotPresent(t *testing.T) { + c := newControllerCacheStorage(time.Second, 10*time.Second, 1) + key := getKey("foo") + present, _, _ := c.Get(key.namespace, key.groupResource, key.name) + assert.False(t, present) +} + +func TestControllerCache_Refresh_NotExisting(t *testing.T) { + key := getKey("foo") + c := newControllerCacheStorage(time.Second, 10*time.Second, 1) + present, _, _ := c.Get(key.namespace, key.groupResource, key.name) + assert.False(t, present) + + // Refreshing key that isn't in the cache doesn't insert it + c.Refresh(key.namespace, key.groupResource, key.name, getScale(), nil) + present, _, _ = c.Get(key.namespace, key.groupResource, key.name) + assert.False(t, present) +} + +func TestControllerCache_Insert(t *testing.T) { + key := getKey("foo") + c := newControllerCacheStorage(time.Second, 10*time.Second, 1) + present, _, _ := c.Get(key.namespace, key.groupResource, key.name) + assert.False(t, present) + + c.Insert(key.namespace, key.groupResource, key.name, getScale(), nil) + present, val, err := c.Get(key.namespace, key.groupResource, key.name) + assert.True(t, present) + assert.Equal(t, getScale(), val) + assert.Nil(t, err) +} + +func TestControllerCache_InsertAndRefresh(t *testing.T) { + key := getKey("foo") + c := newControllerCacheStorage(time.Second, 10*time.Second, 1) + present, _, _ := c.Get(key.namespace, key.groupResource, key.name) + assert.False(t, present) + + c.Insert(key.namespace, key.groupResource, key.name, getScale(), nil) + present, val, err := c.Get(key.namespace, key.groupResource, key.name) + assert.True(t, present) + assert.Equal(t, getScale(), val) + assert.Nil(t, err) + + c.Refresh(key.namespace, key.groupResource, key.name, nil, fmt.Errorf("err")) + present, val, err = c.Get(key.namespace, key.groupResource, key.name) + assert.True(t, present) + assert.Nil(t, val) + assert.Errorf(t, err, "err") +} + +func TestControllerCache_InsertExistingKey(t *testing.T) { + key := getKey("foo") + c := newControllerCacheStorage(time.Second, 10*time.Second, 1) + present, _, _ := c.Get(key.namespace, key.groupResource, key.name) + assert.False(t, present) + + c.Insert(key.namespace, key.groupResource, key.name, getScale(), nil) + present, val, err := c.Get(key.namespace, key.groupResource, key.name) + assert.True(t, present) + assert.Equal(t, getScale(), val) + assert.Nil(t, err) + + // We might overwrite old values or keep them, either way should be fine. 
+ c.Insert(key.namespace, key.groupResource, key.name, nil, fmt.Errorf("err")) + present, _, _ = c.Get(key.namespace, key.groupResource, key.name) + assert.True(t, present) +} + +func TestControllerCache_GetRefreshesDeleteAfter(t *testing.T) { + oldNow := now + defer func() { now = oldNow }() + startTime := oldNow() + timeNow := startTime + now = func() time.Time { + return timeNow + } + + key := getKey("foo") + c := newControllerCacheStorage(time.Second, 10*time.Second, 1) + c.Insert(key.namespace, key.groupResource, key.name, nil, nil) + assert.Equal(t, startTime.Add(10*time.Second), c.cache[key].deleteAfter) + + timeNow = startTime.Add(5 * time.Second) + c.Get(key.namespace, key.groupResource, key.name) + assert.Equal(t, startTime.Add(15*time.Second), c.cache[key].deleteAfter) +} + +func assertTimeBetween(t *testing.T, got, expectAfter, expectBefore time.Time) { + assert.True(t, got.After(expectAfter), "expected %v to be after %v", got, expectAfter) + assert.False(t, got.After(expectBefore), "expected %v to not be after %v", got, expectBefore) +} + +func TestControllerCache_GetChangesLifeTimeNotFreshness(t *testing.T) { + oldNow := now + defer func() { now = oldNow }() + startTime := oldNow() + timeNow := startTime + now = func() time.Time { + return timeNow + } + + key := getKey("foo") + c := newControllerCacheStorage(time.Second, 10*time.Second, 1) + c.Insert(key.namespace, key.groupResource, key.name, nil, nil) + cacheEntry := c.cache[key] + // scheduled to delete 10s after insert + assert.Equal(t, startTime.Add(10*time.Second), cacheEntry.deleteAfter) + // scheduled to refresh (1-2)s after insert (with jitter) + firstRefreshAfter := cacheEntry.refreshAfter + assertTimeBetween(t, firstRefreshAfter, startTime.Add(time.Second), startTime.Add(2*time.Second)) + + timeNow = startTime.Add(5 * time.Second) + c.Get(key.namespace, key.groupResource, key.name) + cacheEntry = c.cache[key] + // scheduled to delete 10s after get (15s after insert) + assert.Equal(t, startTime.Add(15*time.Second), cacheEntry.deleteAfter) + // refresh the same as before calling Get + assert.Equal(t, firstRefreshAfter, cacheEntry.refreshAfter) +} + +func TestControllerCache_GetKeysToRefresh(t *testing.T) { + oldNow := now + defer func() { now = oldNow }() + startTime := oldNow() + timeNow := startTime + now = func() time.Time { + return timeNow + } + + key1 := getKey("foo") + c := newControllerCacheStorage(time.Second, 10*time.Second, 1) + c.Insert(key1.namespace, key1.groupResource, key1.name, nil, nil) + cacheEntry := c.cache[key1] + // scheduled to refresh (1-2)s after insert (with jitter) + refreshAfter := cacheEntry.refreshAfter + assertTimeBetween(t, refreshAfter, startTime.Add(time.Second), startTime.Add(2*time.Second)) + + timeNow = startTime.Add(5 * time.Second) + key2 := getKey("bar") + c.Insert(key2.namespace, key2.groupResource, key2.name, nil, nil) + cacheEntry = c.cache[key2] + // scheduled to refresh (1-2)s after insert (with jitter) + refreshAfter = cacheEntry.refreshAfter + assertTimeBetween(t, refreshAfter, startTime.Add(6*time.Second), startTime.Add(7*time.Second)) + + assert.ElementsMatch(t, []scaleCacheKey{key1}, c.GetKeysToRefresh()) +} + +func TestControllerCache_Clear(t *testing.T) { + oldNow := now + defer func() { now = oldNow }() + startTime := oldNow() + timeNow := startTime + now = func() time.Time { + return timeNow + } + + key1 := getKey("foo") + c := newControllerCacheStorage(time.Second, 10*time.Second, 1) + c.Insert(key1.namespace, key1.groupResource, key1.name, nil, nil) + 
assert.Equal(t, startTime.Add(10*time.Second), c.cache[key1].deleteAfter) + + timeNow = startTime.Add(15 * time.Second) + key2 := getKey("bar") + c.Insert(key2.namespace, key2.groupResource, key2.name, nil, nil) + assert.Equal(t, startTime.Add(25*time.Second), c.cache[key2].deleteAfter) + + c.RemoveExpired() + assert.Equal(t, 1, len(c.cache)) + assert.Contains(t, c.cache, key2) +} diff --git a/vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher/controller_fetcher.go b/vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher/controller_fetcher.go index 23bed3e09d12..ac23080d26f7 100644 --- a/vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher/controller_fetcher.go +++ b/vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher/controller_fetcher.go @@ -84,6 +84,36 @@ type controllerFetcher struct { informersMap map[wellKnownController]cache.SharedIndexInformer } +func (f *controllerFetcher) periodicallyRemoveExpired(ctx context.Context, period time.Duration) { + for { + select { + case <-ctx.Done(): + return + case <-time.After(period): + f.controllerCache.RemoveExpired() + } + } +} + +func (f *controllerFetcher) periodicallyRefresh(ctx context.Context, period time.Duration) { + for { + select { + case <-ctx.Done(): + return + case <-time.After(period): + for _, item := range f.controllerCache.GetKeysToRefresh() { + scale, err := f.scaleNamespacer.Scales(item.namespace).Get(context.TODO(), item.groupResource, item.name, metav1.GetOptions{}) + f.controllerCache.Refresh(item.namespace, item.groupResource, item.name, scale, err) + } + } + } +} + +func (f *controllerFetcher) Start(ctx context.Context, removePeriod, refreshPeriod time.Duration) { + go f.periodicallyRefresh(ctx, refreshPeriod) + go f.periodicallyRemoveExpired(ctx, removePeriod) +} + // NewControllerFetcher returns a new instance of controllerFetcher func NewControllerFetcher(config *rest.Config, kubeClient kube_client.Interface, factory informers.SharedInformerFactory) ControllerFetcher { discoveryClient, err := discovery.NewDiscoveryClientForConfig(config) From 9d7898a5a5722fb9d308fcd44be7939c75e24c24 Mon Sep 17 00:00:00 2001 From: Joachim Bartosik Date: Mon, 26 Oct 2020 16:43:20 +0100 Subject: [PATCH 2/2] Use controllerCacheStorage --- .../pkg/recommender/input/cluster_feeder.go | 11 +++- .../controller_cache_storage.go | 3 + .../controller_fetcher/controller_fetcher.go | 56 ++++++++++--------- .../controller_fetcher_test.go | 1 + 4 files changed, 43 insertions(+), 28 deletions(-) diff --git a/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go b/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go index 9b7a69a95498..bc0aab0f3f60 100644 --- a/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go +++ b/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go @@ -49,7 +49,13 @@ import ( resourceclient "k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1beta1" ) -const defaultResyncPeriod time.Duration = 10 * time.Minute +const ( + scaleCacheLoopPeriod time.Duration = 7 * time.Second + scaleCacheEntryLifetime time.Duration = time.Hour + scaleCacheEntryFreshnessTime time.Duration = 10 * time.Minute + scaleCacheEntryJitterFactor float64 = 1. + defaultResyncPeriod time.Duration = 10 * time.Minute +) // ClusterStateFeeder can update state of ClusterState object. 
type ClusterStateFeeder interface { @@ -108,7 +114,8 @@ func NewClusterStateFeeder(config *rest.Config, clusterState *model.ClusterState kubeClient := kube_client.NewForConfigOrDie(config) podLister, oomObserver := NewPodListerAndOOMObserver(kubeClient, namespace) factory := informers.NewSharedInformerFactoryWithOptions(kubeClient, defaultResyncPeriod, informers.WithNamespace(namespace)) - controllerFetcher := controllerfetcher.NewControllerFetcher(config, kubeClient, factory) + controllerFetcher := controllerfetcher.NewControllerFetcher(config, kubeClient, factory, scaleCacheEntryFreshnessTime, scaleCacheEntryLifetime, scaleCacheEntryJitterFactor) + controllerFetcher.Start(context.TODO(), scaleCacheLoopPeriod) return ClusterStateFeederFactory{ PodLister: podLister, OOMObserver: oomObserver, diff --git a/vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher/controller_cache_storage.go b/vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher/controller_cache_storage.go index abddff3ea9b6..a6d6521d334a 100644 --- a/vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher/controller_cache_storage.go +++ b/vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher/controller_cache_storage.go @@ -34,6 +34,7 @@ type scaleCacheKey struct { groupResource schema.GroupResource name string } + type scaleCacheEntry struct { refreshAfter time.Time deleteAfter time.Time @@ -133,8 +134,10 @@ func (cc *controllerCacheStorage) RemoveExpired() { cc.mux.Lock() defer cc.mux.Unlock() now := now() + removed := 0 for k, v := range cc.cache { if now.After(v.deleteAfter) { + removed += 1 delete(cc.cache, k) } } diff --git a/vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher/controller_fetcher.go b/vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher/controller_fetcher.go index ac23080d26f7..bc4cfd3dfbdf 100644 --- a/vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher/controller_fetcher.go +++ b/vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher/controller_fetcher.go @@ -22,6 +22,7 @@ import ( "time" appsv1 "k8s.io/api/apps/v1" + autoscalingapi "k8s.io/api/autoscaling/v1" batchv1 "k8s.io/api/batch/v1" batchv1beta1 "k8s.io/api/batch/v1beta1" corev1 "k8s.io/api/core/v1" @@ -79,43 +80,36 @@ type ControllerFetcher interface { } type controllerFetcher struct { - scaleNamespacer scale.ScalesGetter - mapper apimeta.RESTMapper - informersMap map[wellKnownController]cache.SharedIndexInformer + scaleNamespacer scale.ScalesGetter + mapper apimeta.RESTMapper + informersMap map[wellKnownController]cache.SharedIndexInformer + scaleSubresourceCacheStorage controllerCacheStorage } -func (f *controllerFetcher) periodicallyRemoveExpired(ctx context.Context, period time.Duration) { +func (f *controllerFetcher) periodicallyRefreshCache(ctx context.Context, period time.Duration) { for { select { case <-ctx.Done(): return case <-time.After(period): - f.controllerCache.RemoveExpired() - } - } -} - -func (f *controllerFetcher) periodicallyRefresh(ctx context.Context, period time.Duration) { - for { - select { - case <-ctx.Done(): - return - case <-time.After(period): - for _, item := range f.controllerCache.GetKeysToRefresh() { + keysToRefresh := f.scaleSubresourceCacheStorage.GetKeysToRefresh() + klog.Info("Starting to refresh entries in controllerFetchers scaleSubresourceCacheStorage") + for _, item := range keysToRefresh { scale, err := f.scaleNamespacer.Scales(item.namespace).Get(context.TODO(), item.groupResource, item.name, metav1.GetOptions{}) - 
f.controllerCache.Refresh(item.namespace, item.groupResource, item.name, scale, err) + f.scaleSubresourceCacheStorage.Refresh(item.namespace, item.groupResource, item.name, scale, err) } + klog.Infof("Finished refreshing %d entries in controllerFetchers scaleSubresourceCacheStorage", len(keysToRefresh)) + f.scaleSubresourceCacheStorage.RemoveExpired() } } } -func (f *controllerFetcher) Start(ctx context.Context, removePeriod, refreshPeriod time.Duration) { - go f.periodicallyRefresh(ctx, refreshPeriod) - go f.periodicallyRemoveExpired(ctx, removePeriod) +func (f *controllerFetcher) Start(ctx context.Context, loopPeriod time.Duration) { + go f.periodicallyRefreshCache(ctx, loopPeriod) } // NewControllerFetcher returns a new instance of controllerFetcher -func NewControllerFetcher(config *rest.Config, kubeClient kube_client.Interface, factory informers.SharedInformerFactory) ControllerFetcher { +func NewControllerFetcher(config *rest.Config, kubeClient kube_client.Interface, factory informers.SharedInformerFactory, betweenRefreshes, lifeTime time.Duration, jitterFactor float64) *controllerFetcher { discoveryClient, err := discovery.NewDiscoveryClientForConfig(config) if err != nil { klog.Fatalf("Could not create discoveryClient: %v", err) @@ -151,9 +145,10 @@ func NewControllerFetcher(config *rest.Config, kubeClient kube_client.Interface, scaleNamespacer := scale.New(restClient, mapper, dynamic.LegacyAPIPathResolverFunc, resolver) return &controllerFetcher{ - scaleNamespacer: scaleNamespacer, - mapper: mapper, - informersMap: informersMap, + scaleNamespacer: scaleNamespacer, + mapper: mapper, + informersMap: informersMap, + scaleSubresourceCacheStorage: newControllerCacheStorage(betweenRefreshes, lifeTime, jitterFactor), } } @@ -278,6 +273,15 @@ func (f *controllerFetcher) isWellKnown(key *ControllerKeyWithAPIVersion) bool { return exists } +func (f *controllerFetcher) getScaleForResource(namespace string, groupResource schema.GroupResource, name string) (controller *autoscalingapi.Scale, err error) { + if ok, scale, err := f.scaleSubresourceCacheStorage.Get(namespace, groupResource, name); ok { + return scale, err + } + scale, err := f.scaleNamespacer.Scales(namespace).Get(context.TODO(), groupResource, name, metav1.GetOptions{}) + f.scaleSubresourceCacheStorage.Insert(namespace, groupResource, name, scale, err) + return scale, err +} + func (f *controllerFetcher) isWellKnownOrScalable(key *ControllerKeyWithAPIVersion) bool { if f.isWellKnown(key) { return true @@ -301,7 +305,7 @@ func (f *controllerFetcher) isWellKnownOrScalable(key *ControllerKeyWithAPIVersi for _, mapping := range mappings { groupResource := mapping.Resource.GroupResource() - scale, err := f.scaleNamespacer.Scales(key.Namespace).Get(context.TODO(), groupResource, key.Name, metav1.GetOptions{}) + scale, err := f.getScaleForResource(key.Namespace, groupResource, key.Name) if err == nil && scale != nil { return true } @@ -323,7 +327,7 @@ func (f *controllerFetcher) getOwnerForScaleResource(groupKind schema.GroupKind, var lastError error for _, mapping := range mappings { groupResource := mapping.Resource.GroupResource() - scale, err := f.scaleNamespacer.Scales(namespace).Get(context.TODO(), groupResource, name, metav1.GetOptions{}) + scale, err := f.getScaleForResource(namespace, groupResource, name) if err == nil { return getOwnerController(scale.OwnerReferences, namespace), nil } diff --git a/vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher/controller_fetcher_test.go 
b/vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher/controller_fetcher_test.go index a6f827283687..8c5e284b5ab5 100644 --- a/vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher/controller_fetcher_test.go +++ b/vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher/controller_fetcher_test.go @@ -44,6 +44,7 @@ var trueVar = true func simpleControllerFetcher() *controllerFetcher { f := controllerFetcher{} f.informersMap = make(map[wellKnownController]cache.SharedIndexInformer) + f.scaleSubresourceCacheStorage = newControllerCacheStorage(time.Second, time.Minute, 0.1) versioned := map[string][]metav1.APIResource{ "Foo": {{Kind: "Foo", Name: "bah", Group: "foo"}, {Kind: "Scale", Name: "iCanScale", Group: "foo"}}, }
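
A note on wiring, for readers of this series: `Start` takes a context so the cache-maintenance goroutine can be shut down, but NewClusterStateFeeder passes `context.TODO()`, which is never cancelled. The sketch below shows how a caller could wire the fetcher up with a cancellable context, reusing the constants the second commit adds to cluster_feeder.go. It is illustrative only: the `wiring` package, the `startFetcher` helper and the import path are assumptions, not code from this patch.

package wiring

import (
	"context"
	"time"

	"k8s.io/client-go/informers"
	kube_client "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"

	controllerfetcher "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher"
)

// startFetcher mirrors what NewClusterStateFeeder does in this patch, but with
// a caller-owned context so the periodic refresh/cleanup goroutine stops when
// the context is cancelled.
func startFetcher(ctx context.Context, config *rest.Config, kubeClient kube_client.Interface, factory informers.SharedInformerFactory) controllerfetcher.ControllerFetcher {
	fetcher := controllerfetcher.NewControllerFetcher(
		config, kubeClient, factory,
		10*time.Minute, // freshness: entries are refreshed 10-20 minutes after the previous refresh (jitter factor 1)
		time.Hour,      // lifetime: entries that are not read for an hour get removed
		1.0,            // jitter factor
	)
	fetcher.Start(ctx, 7*time.Second) // a single loop both refreshes due entries and drops expired ones
	return fetcher
}

With `scaleCacheEntryJitterFactor = 1`, `wait.Jitter` spreads each entry's next refresh uniformly between 10 and 20 minutes after the previous one, and entries that are not read for an hour are dropped by `RemoveExpired`; the 7-second loop period only controls how often the cache is checked, not how often the API server is queried.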