From d1d3c8436e996dee885afef957b886c29d2db824 Mon Sep 17 00:00:00 2001 From: Joachim Bartosik Date: Fri, 25 Sep 2020 18:26:06 +0200 Subject: [PATCH] Add controllerCacheStorage Type that caches results of attempts to get parent of controllers. --- .../controller_cache_storage.go | 163 +++++++++++++ .../controller_cache_storage_test.go | 217 ++++++++++++++++++ .../controller_fetcher/controller_fetcher.go | 30 +++ 3 files changed, 410 insertions(+) create mode 100644 vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher/controller_cache_storage.go create mode 100644 vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher/controller_cache_storage_test.go diff --git a/vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher/controller_cache_storage.go b/vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher/controller_cache_storage.go new file mode 100644 index 000000000000..c59525d17a21 --- /dev/null +++ b/vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher/controller_cache_storage.go @@ -0,0 +1,163 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllerfetcher + +import ( + "math/rand" + "sync" + "time" + + autoscalingapi "k8s.io/api/autoscaling/v1" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +// Allows tests to inject their time. +var now = time.Now + +type scaleCacheKey struct { + namespace string + groupResource schema.GroupResource + name string +} +type scaleCacheEntry struct { + refreshAfter time.Time + deleteAfter time.Time + resource *autoscalingapi.Scale + err error +} + +// Cache for responses to get queries on controllers. Thread safe. +// Usage: +// - `Get` cached response. If there is one use it, otherwise make query and +// - `Insert` the response you got into the cache. +// When you create a `controllerCacheStorage` you should start two go routines: +// - One for refereshing cache entries, which calls `GetKeysToRefresh` then for +// each key makes query to the API server and calls `Refresh` to update +// content of the cache. +// - Second from removing stale entries which periodically calls `RemoveExpired` +// Each entry is refreshed after duration (`validityTime` + `jitter`) passes +// and is removed if there are no reads for it for more than `lifeTime`. +// +// Sometimes refreshing might take longer than refreshAfter (for example when +// VPA is starting in a big cluster and tries to fetch all controllers). To +// handle such situation lifeTime should be longer than refreshAfter so the main +// VPA loop can do its work quickly, using the cached information (instead of +// getting stuck on refreshing the cache). +// TODO(jbartosik): Add a way to detect when we don't refresh cache frequently +// enough. +type controllerCacheStorage struct { + cache map[scaleCacheKey]scaleCacheEntry + mux sync.Mutex + validityTime time.Duration + refreshJitter time.Duration + lifeTime time.Duration +} + +// Returns bool indicating whether the entry was present in the cache and the cached response. +// Updates deleteAfter for the element. +func (cc *controllerCacheStorage) Get(namespace string, groupResource schema.GroupResource, name string) (ok bool, controller *autoscalingapi.Scale, err error) { + key := scaleCacheKey{namespace: namespace, groupResource: groupResource, name: name} + cc.mux.Lock() + defer cc.mux.Unlock() + r, ok := cc.cache[key] + if ok { + r.deleteAfter = now().Add(cc.lifeTime) + cc.cache[key] = r + } + return ok, r.resource, r.err +} + +func generateJitter(maxJitter time.Duration) time.Duration { + return time.Duration(rand.Float64()*float64(maxJitter.Nanoseconds())) * time.Nanosecond +} + +// If key is in the cache it updates the cached value, error and refresh time (but not time to remove). +// If the key is missing from the cache does nothing (relevant when we asynchronously updating cache and +// removing stale entries from it, to avoid adding back an entry which we just removed). +func (cc *controllerCacheStorage) Refresh(namespace string, groupResource schema.GroupResource, name string, controller *autoscalingapi.Scale, err error) { + key := scaleCacheKey{namespace: namespace, groupResource: groupResource, name: name} + cc.mux.Lock() + defer cc.mux.Unlock() + old, ok := cc.cache[key] + if !ok { + return + } + now := now() + // We refresh entries that are waiting to be removed. So when we refresh an + // entry we mustn't change entries deleteAfter time (otherwise we risk never + // removing entries that none is reading) + cc.cache[key] = scaleCacheEntry{ + refreshAfter: now.Add(cc.validityTime).Add(generateJitter(cc.refreshJitter)), + deleteAfter: old.deleteAfter, + resource: controller, + err: err, + } +} + +// If the key is missing from the cache updates the cached value, error and refresh time (but not time to remove). +// If key is in the cache it does nothing (to make sure updating element doesn't change its deleteAfter time). +func (cc *controllerCacheStorage) Insert(namespace string, groupResource schema.GroupResource, name string, controller *autoscalingapi.Scale, err error) { + key := scaleCacheKey{namespace: namespace, groupResource: groupResource, name: name} + cc.mux.Lock() + defer cc.mux.Unlock() + if _, ok := cc.cache[key]; ok { + return + } + jitter := time.Duration(rand.Float64()*float64(cc.refreshJitter.Nanoseconds())) * time.Nanosecond + now := now() + cc.cache[key] = scaleCacheEntry{ + refreshAfter: now.Add(cc.validityTime).Add(jitter), + deleteAfter: now.Add(cc.lifeTime), + resource: controller, + err: err, + } +} + +// Removes entries from the cache which we didn't read in a while. +func (cc *controllerCacheStorage) RemoveExpired() { + cc.mux.Lock() + defer cc.mux.Unlock() + now := now() + for k, v := range cc.cache { + if now.After(v.deleteAfter) { + delete(cc.cache, k) + } + } +} + +// Returns a list of keys for which values need to be refreshed +func (cc *controllerCacheStorage) GetKeysToRefresh() []scaleCacheKey { + result := make([]scaleCacheKey, 0) + cc.mux.Lock() + defer cc.mux.Unlock() + now := now() + for k, v := range cc.cache { + if now.After(v.refreshAfter) { + result = append(result, k) + } + } + return result +} + +func newControllerCacheStorage(validityTime, refreshJitter, lifeTime time.Duration) controllerCacheStorage { + return controllerCacheStorage{ + cache: make(map[scaleCacheKey]scaleCacheEntry), + validityTime: validityTime, + refreshJitter: refreshJitter, + lifeTime: lifeTime, + } +} diff --git a/vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher/controller_cache_storage_test.go b/vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher/controller_cache_storage_test.go new file mode 100644 index 000000000000..e791ea58994d --- /dev/null +++ b/vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher/controller_cache_storage_test.go @@ -0,0 +1,217 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllerfetcher + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + autoscalingapi "k8s.io/api/autoscaling/v1" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +func getKey(key string) scaleCacheKey { + return scaleCacheKey{ + namespace: "ns", + groupResource: schema.GroupResource{ + Group: "group", + Resource: "resource", + }, + name: key, + } +} + +func getScale() *autoscalingapi.Scale { + return &autoscalingapi.Scale{} +} + +func TestControllerCache_InitiallyNotPresent(t *testing.T) { + c := newControllerCacheStorage(time.Second, time.Second, 10*time.Second) + key := getKey("foo") + present, _, _ := c.Get(key.namespace, key.groupResource, key.name) + assert.False(t, present) +} + +func TestControllerCache_Refresh_NotExisting(t *testing.T) { + key := getKey("foo") + c := newControllerCacheStorage(time.Second, time.Second, 10*time.Second) + present, _, _ := c.Get(key.namespace, key.groupResource, key.name) + assert.False(t, present) + + // Refreshing key that isn't in the cache doesn't insert it + c.Refresh(key.namespace, key.groupResource, key.name, getScale(), nil) + present, _, _ = c.Get(key.namespace, key.groupResource, key.name) + assert.False(t, present) +} + +func TestControllerCache_Insert(t *testing.T) { + key := getKey("foo") + c := newControllerCacheStorage(time.Second, time.Second, 10*time.Second) + present, _, _ := c.Get(key.namespace, key.groupResource, key.name) + assert.False(t, present) + + c.Insert(key.namespace, key.groupResource, key.name, getScale(), nil) + present, val, err := c.Get(key.namespace, key.groupResource, key.name) + assert.True(t, present) + assert.Equal(t, getScale(), val) + assert.Nil(t, err) +} + +func TestControllerCache_InsertAndRefresh(t *testing.T) { + key := getKey("foo") + c := newControllerCacheStorage(time.Second, time.Second, 10*time.Second) + present, _, _ := c.Get(key.namespace, key.groupResource, key.name) + assert.False(t, present) + + c.Insert(key.namespace, key.groupResource, key.name, getScale(), nil) + present, val, err := c.Get(key.namespace, key.groupResource, key.name) + assert.True(t, present) + assert.Equal(t, getScale(), val) + assert.Nil(t, err) + + c.Refresh(key.namespace, key.groupResource, key.name, nil, fmt.Errorf("err")) + present, val, err = c.Get(key.namespace, key.groupResource, key.name) + assert.True(t, present) + assert.Nil(t, val) + assert.Errorf(t, err, "err") +} + +func TestControllerCache_InsertNoOverwrite(t *testing.T) { + key := getKey("foo") + c := newControllerCacheStorage(time.Second, time.Second, 10*time.Second) + present, _, _ := c.Get(key.namespace, key.groupResource, key.name) + assert.False(t, present) + + c.Insert(key.namespace, key.groupResource, key.name, getScale(), nil) + present, val, err := c.Get(key.namespace, key.groupResource, key.name) + assert.True(t, present) + assert.Equal(t, getScale(), val) + assert.Nil(t, err) + + // Doesn't overwrite previous values + c.Insert(key.namespace, key.groupResource, key.name, nil, fmt.Errorf("err")) + present, val, err = c.Get(key.namespace, key.groupResource, key.name) + assert.True(t, present) + assert.Equal(t, getScale(), val) + assert.Nil(t, err) +} + +func TestControllerCache_GetRefreshesDeleteAfter(t *testing.T) { + oldNow := now + defer func() { now = oldNow }() + startTime := oldNow() + timeNow := startTime + now = func() time.Time { + return timeNow + } + + key := getKey("foo") + c := newControllerCacheStorage(time.Second, time.Second, 10*time.Second) + c.Insert(key.namespace, key.groupResource, key.name, nil, nil) + assert.Equal(t, startTime.Add(10*time.Second), c.cache[key].deleteAfter) + + timeNow = startTime.Add(5 * time.Second) + c.Get(key.namespace, key.groupResource, key.name) + assert.Equal(t, startTime.Add(15*time.Second), c.cache[key].deleteAfter) +} + +func assertTimeBetween(t *testing.T, got, expectAfter, expectBefore time.Time) { + assert.True(t, got.After(expectAfter), "expected %v to be after %v", got, expectAfter) + assert.False(t, got.After(expectBefore), "expected %v to not be after %v", got, expectBefore) +} + +func TestControllerCache_GetChangesLifeTimeNotFreshness(t *testing.T) { + oldNow := now + defer func() { now = oldNow }() + startTime := oldNow() + timeNow := startTime + now = func() time.Time { + return timeNow + } + + key := getKey("foo") + c := newControllerCacheStorage(time.Second, time.Second, 10*time.Second) + c.Insert(key.namespace, key.groupResource, key.name, nil, nil) + cacheEntry := c.cache[key] + // scheduled to delete 10s after insert + assert.Equal(t, startTime.Add(10*time.Second), cacheEntry.deleteAfter) + // scheduled to refresh (1-2)s after insert (with jitter) + firstRefreshAfter := cacheEntry.refreshAfter + assertTimeBetween(t, firstRefreshAfter, startTime.Add(time.Second), startTime.Add(2*time.Second)) + + timeNow = startTime.Add(5 * time.Second) + c.Get(key.namespace, key.groupResource, key.name) + cacheEntry = c.cache[key] + // scheduled to delete 10s after get (15s after insert) + assert.Equal(t, startTime.Add(15*time.Second), cacheEntry.deleteAfter) + // refresh the same as before calling Get + assert.Equal(t, firstRefreshAfter, cacheEntry.refreshAfter) +} + +func TestControllerCache_GetKeysToRefresh(t *testing.T) { + oldNow := now + defer func() { now = oldNow }() + startTime := oldNow() + timeNow := startTime + now = func() time.Time { + return timeNow + } + + key1 := getKey("foo") + c := newControllerCacheStorage(time.Second, time.Second, 10*time.Second) + c.Insert(key1.namespace, key1.groupResource, key1.name, nil, nil) + cacheEntry := c.cache[key1] + // scheduled to refresh (1-2)s after insert (with jitter) + refreshAfter := cacheEntry.refreshAfter + assertTimeBetween(t, refreshAfter, startTime.Add(time.Second), startTime.Add(2*time.Second)) + + timeNow = startTime.Add(5 * time.Second) + key2 := getKey("bar") + c.Insert(key2.namespace, key2.groupResource, key2.name, nil, nil) + cacheEntry = c.cache[key2] + // scheduled to refresh (1-2)s after insert (with jitter) + refreshAfter = cacheEntry.refreshAfter + assertTimeBetween(t, refreshAfter, startTime.Add(6*time.Second), startTime.Add(7*time.Second)) + + assert.ElementsMatch(t, []scaleCacheKey{key1}, c.GetKeysToRefresh()) +} + +func TestControllerCache_Clear(t *testing.T) { + oldNow := now + defer func() { now = oldNow }() + startTime := oldNow() + timeNow := startTime + now = func() time.Time { + return timeNow + } + + key1 := getKey("foo") + c := newControllerCacheStorage(time.Second, time.Second, 10*time.Second) + c.Insert(key1.namespace, key1.groupResource, key1.name, nil, nil) + assert.Equal(t, startTime.Add(10*time.Second), c.cache[key1].deleteAfter) + + timeNow = startTime.Add(15 * time.Second) + key2 := getKey("bar") + c.Insert(key2.namespace, key2.groupResource, key2.name, nil, nil) + assert.Equal(t, startTime.Add(25*time.Second), c.cache[key2].deleteAfter) + + c.RemoveExpired() + assert.Equal(t, 1, len(c.cache)) + assert.Contains(t, c.cache, key2) +} diff --git a/vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher/controller_fetcher.go b/vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher/controller_fetcher.go index 23bed3e09d12..ac23080d26f7 100644 --- a/vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher/controller_fetcher.go +++ b/vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher/controller_fetcher.go @@ -84,6 +84,36 @@ type controllerFetcher struct { informersMap map[wellKnownController]cache.SharedIndexInformer } +func (f *controllerFetcher) periodicallyRemoveExpired(ctx context.Context, period time.Duration) { + for { + select { + case <-ctx.Done(): + return + case <-time.After(period): + f.controllerCache.RemoveExpired() + } + } +} + +func (f *controllerFetcher) periodicallyRefresh(ctx context.Context, period time.Duration) { + for { + select { + case <-ctx.Done(): + return + case <-time.After(period): + for _, item := range f.controllerCache.GetKeysToRefresh() { + scale, err := f.scaleNamespacer.Scales(item.namespace).Get(context.TODO(), item.groupResource, item.name, metav1.GetOptions{}) + f.controllerCache.Refresh(item.namespace, item.groupResource, item.name, scale, err) + } + } + } +} + +func (f *controllerFetcher) Start(ctx context.Context, removePeriod, refreshPeriod time.Duration) { + go f.periodicallyRefresh(ctx, refreshPeriod) + go f.periodicallyRemoveExpired(ctx, removePeriod) +} + // NewControllerFetcher returns a new instance of controllerFetcher func NewControllerFetcher(config *rest.Config, kubeClient kube_client.Interface, factory informers.SharedInformerFactory) ControllerFetcher { discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)