-
Notifications
You must be signed in to change notification settings - Fork 4k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Type that caches results of attempts to get parent of controllers.
- Loading branch information
Showing
3 changed files
with
410 additions
and
0 deletions.
There are no files selected for viewing
163 changes: 163 additions & 0 deletions
163
vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher/controller_cache_storage.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,163 @@ | ||
/* | ||
Copyright 2020 The Kubernetes Authors. | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package controllerfetcher | ||
|
||
import ( | ||
"math/rand" | ||
"sync" | ||
"time" | ||
|
||
autoscalingapi "k8s.io/api/autoscaling/v1" | ||
"k8s.io/apimachinery/pkg/runtime/schema" | ||
) | ||
|
||
// Allows tests to inject their time. | ||
var now = time.Now | ||
|
||
type scaleCacheKey struct { | ||
namespace string | ||
groupResource schema.GroupResource | ||
name string | ||
} | ||
type scaleCacheEntry struct { | ||
refreshAfter time.Time | ||
deleteAfter time.Time | ||
resource *autoscalingapi.Scale | ||
err error | ||
} | ||
|
||
// Cache for responses to get queries on controllers. Thread safe. | ||
// Usage: | ||
// - `Get` cached response. If there is one use it, otherwise make query and | ||
// - `Insert` the response you got into the cache. | ||
// When you create a `controllerCacheStorage` you should start two go routines: | ||
// - One for refereshing cache entries, which calls `GetKeysToRefresh` then for | ||
// each key makes query to the API server and calls `Refresh` to update | ||
// content of the cache. | ||
// - Second from removing stale entries which periodically calls `RemoveExpired` | ||
// Each entry is refreshed after duration (`validityTime` + `jitter`) passes | ||
// and is removed if there are no reads for it for more than `lifeTime`. | ||
// | ||
// Sometimes refreshing might take longer than refreshAfter (for example when | ||
// VPA is starting in a big cluster and tries to fetch all controllers). To | ||
// handle such situation lifeTime should be longer than refreshAfter so the main | ||
// VPA loop can do its work quickly, using the cached information (instead of | ||
// getting stuck on refreshing the cache). | ||
// TODO(jbartosik): Add a way to detect when we don't refresh cache frequently | ||
// enough. | ||
type controllerCacheStorage struct { | ||
cache map[scaleCacheKey]scaleCacheEntry | ||
mux sync.Mutex | ||
validityTime time.Duration | ||
refreshJitter time.Duration | ||
lifeTime time.Duration | ||
} | ||
|
||
// Returns bool indicating whether the entry was present in the cache and the cached response. | ||
// Updates deleteAfter for the element. | ||
func (cc *controllerCacheStorage) Get(namespace string, groupResource schema.GroupResource, name string) (ok bool, controller *autoscalingapi.Scale, err error) { | ||
key := scaleCacheKey{namespace: namespace, groupResource: groupResource, name: name} | ||
cc.mux.Lock() | ||
defer cc.mux.Unlock() | ||
r, ok := cc.cache[key] | ||
if ok { | ||
r.deleteAfter = now().Add(cc.lifeTime) | ||
cc.cache[key] = r | ||
} | ||
return ok, r.resource, r.err | ||
} | ||
|
||
func generateJitter(maxJitter time.Duration) time.Duration { | ||
return time.Duration(rand.Float64()*float64(maxJitter.Nanoseconds())) * time.Nanosecond | ||
} | ||
|
||
// If key is in the cache it updates the cached value, error and refresh time (but not time to remove). | ||
// If the key is missing from the cache does nothing (relevant when we asynchronously updating cache and | ||
// removing stale entries from it, to avoid adding back an entry which we just removed). | ||
func (cc *controllerCacheStorage) Refresh(namespace string, groupResource schema.GroupResource, name string, controller *autoscalingapi.Scale, err error) { | ||
key := scaleCacheKey{namespace: namespace, groupResource: groupResource, name: name} | ||
cc.mux.Lock() | ||
defer cc.mux.Unlock() | ||
old, ok := cc.cache[key] | ||
if !ok { | ||
return | ||
} | ||
now := now() | ||
// We refresh entries that are waiting to be removed. So when we refresh an | ||
// entry we mustn't change entries deleteAfter time (otherwise we risk never | ||
// removing entries that none is reading) | ||
cc.cache[key] = scaleCacheEntry{ | ||
refreshAfter: now.Add(cc.validityTime).Add(generateJitter(cc.refreshJitter)), | ||
deleteAfter: old.deleteAfter, | ||
resource: controller, | ||
err: err, | ||
} | ||
} | ||
|
||
// If the key is missing from the cache updates the cached value, error and refresh time (but not time to remove). | ||
// If key is in the cache it does nothing (to make sure updating element doesn't change its deleteAfter time). | ||
func (cc *controllerCacheStorage) Insert(namespace string, groupResource schema.GroupResource, name string, controller *autoscalingapi.Scale, err error) { | ||
key := scaleCacheKey{namespace: namespace, groupResource: groupResource, name: name} | ||
cc.mux.Lock() | ||
defer cc.mux.Unlock() | ||
if _, ok := cc.cache[key]; ok { | ||
return | ||
} | ||
jitter := time.Duration(rand.Float64()*float64(cc.refreshJitter.Nanoseconds())) * time.Nanosecond | ||
now := now() | ||
cc.cache[key] = scaleCacheEntry{ | ||
refreshAfter: now.Add(cc.validityTime).Add(jitter), | ||
deleteAfter: now.Add(cc.lifeTime), | ||
resource: controller, | ||
err: err, | ||
} | ||
} | ||
|
||
// Removes entries from the cache which we didn't read in a while. | ||
func (cc *controllerCacheStorage) RemoveExpired() { | ||
cc.mux.Lock() | ||
defer cc.mux.Unlock() | ||
now := now() | ||
for k, v := range cc.cache { | ||
if now.After(v.deleteAfter) { | ||
delete(cc.cache, k) | ||
} | ||
} | ||
} | ||
|
||
// Returns a list of keys for which values need to be refreshed | ||
func (cc *controllerCacheStorage) GetKeysToRefresh() []scaleCacheKey { | ||
result := make([]scaleCacheKey, 0) | ||
cc.mux.Lock() | ||
defer cc.mux.Unlock() | ||
now := now() | ||
for k, v := range cc.cache { | ||
if now.After(v.refreshAfter) { | ||
result = append(result, k) | ||
} | ||
} | ||
return result | ||
} | ||
|
||
func newControllerCacheStorage(validityTime, refreshJitter, lifeTime time.Duration) controllerCacheStorage { | ||
return controllerCacheStorage{ | ||
cache: make(map[scaleCacheKey]scaleCacheEntry), | ||
validityTime: validityTime, | ||
refreshJitter: refreshJitter, | ||
lifeTime: lifeTime, | ||
} | ||
} |
217 changes: 217 additions & 0 deletions
217
...-pod-autoscaler/pkg/recommender/input/controller_fetcher/controller_cache_storage_test.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,217 @@ | ||
/* | ||
Copyright 2020 The Kubernetes Authors. | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package controllerfetcher | ||
|
||
import ( | ||
"fmt" | ||
"testing" | ||
"time" | ||
|
||
"github.com/stretchr/testify/assert" | ||
autoscalingapi "k8s.io/api/autoscaling/v1" | ||
"k8s.io/apimachinery/pkg/runtime/schema" | ||
) | ||
|
||
func getKey(key string) scaleCacheKey { | ||
return scaleCacheKey{ | ||
namespace: "ns", | ||
groupResource: schema.GroupResource{ | ||
Group: "group", | ||
Resource: "resource", | ||
}, | ||
name: key, | ||
} | ||
} | ||
|
||
func getScale() *autoscalingapi.Scale { | ||
return &autoscalingapi.Scale{} | ||
} | ||
|
||
func TestControllerCache_InitiallyNotPresent(t *testing.T) { | ||
c := newControllerCacheStorage(time.Second, time.Second, 10*time.Second) | ||
key := getKey("foo") | ||
present, _, _ := c.Get(key.namespace, key.groupResource, key.name) | ||
assert.False(t, present) | ||
} | ||
|
||
func TestControllerCache_Refresh_NotExisting(t *testing.T) { | ||
key := getKey("foo") | ||
c := newControllerCacheStorage(time.Second, time.Second, 10*time.Second) | ||
present, _, _ := c.Get(key.namespace, key.groupResource, key.name) | ||
assert.False(t, present) | ||
|
||
// Refreshing key that isn't in the cache doesn't insert it | ||
c.Refresh(key.namespace, key.groupResource, key.name, getScale(), nil) | ||
present, _, _ = c.Get(key.namespace, key.groupResource, key.name) | ||
assert.False(t, present) | ||
} | ||
|
||
func TestControllerCache_Insert(t *testing.T) { | ||
key := getKey("foo") | ||
c := newControllerCacheStorage(time.Second, time.Second, 10*time.Second) | ||
present, _, _ := c.Get(key.namespace, key.groupResource, key.name) | ||
assert.False(t, present) | ||
|
||
c.Insert(key.namespace, key.groupResource, key.name, getScale(), nil) | ||
present, val, err := c.Get(key.namespace, key.groupResource, key.name) | ||
assert.True(t, present) | ||
assert.Equal(t, getScale(), val) | ||
assert.Nil(t, err) | ||
} | ||
|
||
func TestControllerCache_InsertAndRefresh(t *testing.T) { | ||
key := getKey("foo") | ||
c := newControllerCacheStorage(time.Second, time.Second, 10*time.Second) | ||
present, _, _ := c.Get(key.namespace, key.groupResource, key.name) | ||
assert.False(t, present) | ||
|
||
c.Insert(key.namespace, key.groupResource, key.name, getScale(), nil) | ||
present, val, err := c.Get(key.namespace, key.groupResource, key.name) | ||
assert.True(t, present) | ||
assert.Equal(t, getScale(), val) | ||
assert.Nil(t, err) | ||
|
||
c.Refresh(key.namespace, key.groupResource, key.name, nil, fmt.Errorf("err")) | ||
present, val, err = c.Get(key.namespace, key.groupResource, key.name) | ||
assert.True(t, present) | ||
assert.Nil(t, val) | ||
assert.Errorf(t, err, "err") | ||
} | ||
|
||
func TestControllerCache_InsertNoOverwrite(t *testing.T) { | ||
key := getKey("foo") | ||
c := newControllerCacheStorage(time.Second, time.Second, 10*time.Second) | ||
present, _, _ := c.Get(key.namespace, key.groupResource, key.name) | ||
assert.False(t, present) | ||
|
||
c.Insert(key.namespace, key.groupResource, key.name, getScale(), nil) | ||
present, val, err := c.Get(key.namespace, key.groupResource, key.name) | ||
assert.True(t, present) | ||
assert.Equal(t, getScale(), val) | ||
assert.Nil(t, err) | ||
|
||
// Doesn't overwrite previous values | ||
c.Insert(key.namespace, key.groupResource, key.name, nil, fmt.Errorf("err")) | ||
present, val, err = c.Get(key.namespace, key.groupResource, key.name) | ||
assert.True(t, present) | ||
assert.Equal(t, getScale(), val) | ||
assert.Nil(t, err) | ||
} | ||
|
||
func TestControllerCache_GetRefreshesDeleteAfter(t *testing.T) { | ||
oldNow := now | ||
defer func() { now = oldNow }() | ||
startTime := oldNow() | ||
timeNow := startTime | ||
now = func() time.Time { | ||
return timeNow | ||
} | ||
|
||
key := getKey("foo") | ||
c := newControllerCacheStorage(time.Second, time.Second, 10*time.Second) | ||
c.Insert(key.namespace, key.groupResource, key.name, nil, nil) | ||
assert.Equal(t, startTime.Add(10*time.Second), c.cache[key].deleteAfter) | ||
|
||
timeNow = startTime.Add(5 * time.Second) | ||
c.Get(key.namespace, key.groupResource, key.name) | ||
assert.Equal(t, startTime.Add(15*time.Second), c.cache[key].deleteAfter) | ||
} | ||
|
||
func assertTimeBetween(t *testing.T, got, expectAfter, expectBefore time.Time) { | ||
assert.True(t, got.After(expectAfter), "expected %v to be after %v", got, expectAfter) | ||
assert.False(t, got.After(expectBefore), "expected %v to not be after %v", got, expectBefore) | ||
} | ||
|
||
func TestControllerCache_GetChangesLifeTimeNotFreshness(t *testing.T) { | ||
oldNow := now | ||
defer func() { now = oldNow }() | ||
startTime := oldNow() | ||
timeNow := startTime | ||
now = func() time.Time { | ||
return timeNow | ||
} | ||
|
||
key := getKey("foo") | ||
c := newControllerCacheStorage(time.Second, time.Second, 10*time.Second) | ||
c.Insert(key.namespace, key.groupResource, key.name, nil, nil) | ||
cacheEntry := c.cache[key] | ||
// scheduled to delete 10s after insert | ||
assert.Equal(t, startTime.Add(10*time.Second), cacheEntry.deleteAfter) | ||
// scheduled to refresh (1-2)s after insert (with jitter) | ||
firstRefreshAfter := cacheEntry.refreshAfter | ||
assertTimeBetween(t, firstRefreshAfter, startTime.Add(time.Second), startTime.Add(2*time.Second)) | ||
|
||
timeNow = startTime.Add(5 * time.Second) | ||
c.Get(key.namespace, key.groupResource, key.name) | ||
cacheEntry = c.cache[key] | ||
// scheduled to delete 10s after get (15s after insert) | ||
assert.Equal(t, startTime.Add(15*time.Second), cacheEntry.deleteAfter) | ||
// refresh the same as before calling Get | ||
assert.Equal(t, firstRefreshAfter, cacheEntry.refreshAfter) | ||
} | ||
|
||
func TestControllerCache_GetKeysToRefresh(t *testing.T) { | ||
oldNow := now | ||
defer func() { now = oldNow }() | ||
startTime := oldNow() | ||
timeNow := startTime | ||
now = func() time.Time { | ||
return timeNow | ||
} | ||
|
||
key1 := getKey("foo") | ||
c := newControllerCacheStorage(time.Second, time.Second, 10*time.Second) | ||
c.Insert(key1.namespace, key1.groupResource, key1.name, nil, nil) | ||
cacheEntry := c.cache[key1] | ||
// scheduled to refresh (1-2)s after insert (with jitter) | ||
refreshAfter := cacheEntry.refreshAfter | ||
assertTimeBetween(t, refreshAfter, startTime.Add(time.Second), startTime.Add(2*time.Second)) | ||
|
||
timeNow = startTime.Add(5 * time.Second) | ||
key2 := getKey("bar") | ||
c.Insert(key2.namespace, key2.groupResource, key2.name, nil, nil) | ||
cacheEntry = c.cache[key2] | ||
// scheduled to refresh (1-2)s after insert (with jitter) | ||
refreshAfter = cacheEntry.refreshAfter | ||
assertTimeBetween(t, refreshAfter, startTime.Add(6*time.Second), startTime.Add(7*time.Second)) | ||
|
||
assert.ElementsMatch(t, []scaleCacheKey{key1}, c.GetKeysToRefresh()) | ||
} | ||
|
||
func TestControllerCache_Clear(t *testing.T) { | ||
oldNow := now | ||
defer func() { now = oldNow }() | ||
startTime := oldNow() | ||
timeNow := startTime | ||
now = func() time.Time { | ||
return timeNow | ||
} | ||
|
||
key1 := getKey("foo") | ||
c := newControllerCacheStorage(time.Second, time.Second, 10*time.Second) | ||
c.Insert(key1.namespace, key1.groupResource, key1.name, nil, nil) | ||
assert.Equal(t, startTime.Add(10*time.Second), c.cache[key1].deleteAfter) | ||
|
||
timeNow = startTime.Add(15 * time.Second) | ||
key2 := getKey("bar") | ||
c.Insert(key2.namespace, key2.groupResource, key2.name, nil, nil) | ||
assert.Equal(t, startTime.Add(25*time.Second), c.cache[key2].deleteAfter) | ||
|
||
c.RemoveExpired() | ||
assert.Equal(t, 1, len(c.cache)) | ||
assert.Contains(t, c.cache, key2) | ||
} |
Oops, something went wrong.