Skip to content

Commit

Permalink
Use controllerCacheStorage
Browse files Browse the repository at this point in the history
  • Loading branch information
jbartosik committed Oct 28, 2020
1 parent 1ffc3c5 commit 70799e8
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 10 deletions.
11 changes: 9 additions & 2 deletions vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,13 @@ import (
resourceclient "k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1beta1"
)

const defaultResyncPeriod time.Duration = 10 * time.Minute
const (
// scaleCacheLoopPeriod is how often the controller-cache maintenance loops
// run; it is passed to controllerFetcher.Start as both the remove period and
// the refresh period.
scaleCacheLoopPeriod time.Duration = time.Hour
// scaleCacheCleanup is how long an entry may go unread before it is removed
// from the scale-subresource cache (the fetcher's lifeTime parameter).
scaleCacheCleanup time.Duration = time.Hour
// scaleCacheRefresh is the time between refreshes of a cached scale
// subresource (the fetcher's betweenRefreshes parameter).
scaleCacheRefresh time.Duration = 10 * time.Minute
// scaleRefreshJitter is presumably random jitter applied when scheduling
// cache refreshes — confirm against newControllerCacheStorage.
scaleRefreshJitter time.Duration = time.Minute
// defaultResyncPeriod is the resync period for the shared informer factory.
defaultResyncPeriod time.Duration = 10 * time.Minute
)

// ClusterStateFeeder can update state of ClusterState object.
type ClusterStateFeeder interface {
Expand Down Expand Up @@ -108,7 +114,8 @@ func NewClusterStateFeeder(config *rest.Config, clusterState *model.ClusterState
kubeClient := kube_client.NewForConfigOrDie(config)
podLister, oomObserver := NewPodListerAndOOMObserver(kubeClient, namespace)
factory := informers.NewSharedInformerFactoryWithOptions(kubeClient, defaultResyncPeriod, informers.WithNamespace(namespace))
controllerFetcher := controllerfetcher.NewControllerFetcher(config, kubeClient, factory)
controllerFetcher := controllerfetcher.NewControllerFetcher(config, kubeClient, factory, scaleCacheRefresh, scaleRefreshJitter, scaleCacheCleanup)
controllerFetcher.Start(context.TODO(), scaleCacheLoopPeriod, scaleCacheLoopPeriod)
return ClusterStateFeederFactory{
PodLister: podLister,
OOMObserver: oomObserver,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

autoscalingapi "k8s.io/api/autoscaling/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/klog"
)

// Allows tests to inject their time.
Expand All @@ -33,6 +34,7 @@ type scaleCacheKey struct {
groupResource schema.GroupResource
name string
}

type scaleCacheEntry struct {
refreshAfter time.Time
deleteAfter time.Time
Expand Down Expand Up @@ -121,14 +123,18 @@ func (cc *controllerCacheStorage) Insert(namespace string, groupResource schema.

// RemoveExpired removes entries from the cache which we didn't read in a
// while (i.e. whose deleteAfter deadline has passed).
func (cc *controllerCacheStorage) RemoveExpired() {
	klog.Infof("Removing entries from controllerCacheStorage")
	cc.mux.Lock()
	defer cc.mux.Unlock()
	// Use a distinct local name so we don't shadow the package-level,
	// test-injectable now() clock.
	currentTime := now()
	removed := 0
	for k, v := range cc.cache {
		if currentTime.After(v.deleteAfter) {
			removed++
			delete(cc.cache, k)
		}
	}
	klog.Infof("Removed %d entries from controllerCacheStorage", removed)
}

// Returns a list of keys for which values need to be refreshed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

appsv1 "k8s.io/api/apps/v1"
autoscalingapi "k8s.io/api/autoscaling/v1"
batchv1 "k8s.io/api/batch/v1"
batchv1beta1 "k8s.io/api/batch/v1beta1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -82,9 +83,10 @@ type controllerFetcher struct {
scaleNamespacer scale.ScalesGetter
mapper apimeta.RESTMapper
informersMap map[wellKnownController]cache.SharedIndexInformer
controllerCache controllerCacheStorage
}

func (f *controllerFetcher) periodicallyRemoveExpired(ctx context.Context, period time.Duration) {
func (f *controllerFetcher) periodicallyRemoveExpiredFromCache(ctx context.Context, period time.Duration) {
for {
select {
case <-ctx.Done():
Expand All @@ -95,27 +97,30 @@ func (f *controllerFetcher) periodicallyRemoveExpired(ctx context.Context, perio
}
}

func (f *controllerFetcher) periodicallyRefresh(ctx context.Context, period time.Duration) {
func (f *controllerFetcher) periodicallyRefreshCache(ctx context.Context, period time.Duration) {
for {
select {
case <-ctx.Done():
return
case <-time.After(period):
for _, item := range f.controllerCache.GetKeysToRefresh() {
keysToReferesh := f.controllerCache.GetKeysToRefresh()
klog.Info("Starting to refresh entries in controllerFetchers controllerCache")
for _, item := range keysToReferesh {
scale, err := f.scaleNamespacer.Scales(item.namespace).Get(context.TODO(), item.groupResource, item.name, metav1.GetOptions{})
f.controllerCache.Refresh(item.namespace, item.groupResource, item.name, scale, err)
}
klog.Infof("Finished refreshing %d entries in controllerFetchers controllerCache", len(keysToReferesh))
}
}
}

// Start launches the two background goroutines that maintain the controller
// scale cache: one refreshing stale entries every refreshPeriod and one
// removing expired entries every removePeriod. Both goroutines stop when ctx
// is cancelled.
func (f *controllerFetcher) Start(ctx context.Context, removePeriod, refreshPeriod time.Duration) {
	go f.periodicallyRefreshCache(ctx, refreshPeriod)
	go f.periodicallyRemoveExpiredFromCache(ctx, removePeriod)
}

// NewControllerFetcher returns a new instance of controllerFetcher
func NewControllerFetcher(config *rest.Config, kubeClient kube_client.Interface, factory informers.SharedInformerFactory) ControllerFetcher {
func NewControllerFetcher(config *rest.Config, kubeClient kube_client.Interface, factory informers.SharedInformerFactory, betweenRefreshes, refreshJitter, lifeTime time.Duration) *controllerFetcher {
discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
if err != nil {
klog.Fatalf("Could not create discoveryClient: %v", err)
Expand Down Expand Up @@ -154,6 +159,7 @@ func NewControllerFetcher(config *rest.Config, kubeClient kube_client.Interface,
scaleNamespacer: scaleNamespacer,
mapper: mapper,
informersMap: informersMap,
controllerCache: newControllerCacheStorage(betweenRefreshes, refreshJitter, lifeTime),
}
}

Expand Down Expand Up @@ -278,6 +284,15 @@ func (f *controllerFetcher) isWellKnown(key *ControllerKeyWithAPIVersion) bool {
return exists
}

// getScaleForResource returns the scale subresource for the given controller,
// serving from the cache when it holds an entry and otherwise fetching from
// the API server and inserting the result (including any error) into the
// cache. The unused named result parameters were dropped; the signature is
// otherwise unchanged.
func (f *controllerFetcher) getScaleForResource(namespace string, groupResource schema.GroupResource, name string) (*autoscalingapi.Scale, error) {
	// Cache hit: return the cached scale (or the cached error).
	if ok, scale, err := f.controllerCache.Get(namespace, groupResource, name); ok {
		return scale, err
	}
	// Cache miss: fetch from the API server and remember the outcome.
	scale, err := f.scaleNamespacer.Scales(namespace).Get(context.TODO(), groupResource, name, metav1.GetOptions{})
	f.controllerCache.Insert(namespace, groupResource, name, scale, err)
	return scale, err
}

func (f *controllerFetcher) isWellKnownOrScalable(key *ControllerKeyWithAPIVersion) bool {
if f.isWellKnown(key) {
return true
Expand All @@ -301,7 +316,7 @@ func (f *controllerFetcher) isWellKnownOrScalable(key *ControllerKeyWithAPIVersi

for _, mapping := range mappings {
groupResource := mapping.Resource.GroupResource()
scale, err := f.scaleNamespacer.Scales(key.Namespace).Get(context.TODO(), groupResource, key.Name, metav1.GetOptions{})
scale, err := f.getScaleForResource(key.Namespace, groupResource, key.Name)
if err == nil && scale != nil {
return true
}
Expand All @@ -323,7 +338,7 @@ func (f *controllerFetcher) getOwnerForScaleResource(groupKind schema.GroupKind,
var lastError error
for _, mapping := range mappings {
groupResource := mapping.Resource.GroupResource()
scale, err := f.scaleNamespacer.Scales(namespace).Get(context.TODO(), groupResource, name, metav1.GetOptions{})
scale, err := f.getScaleForResource(namespace, groupResource, name)
if err == nil {
return getOwnerController(scale.OwnerReferences, namespace), nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ var trueVar = true
func simpleControllerFetcher() *controllerFetcher {
f := controllerFetcher{}
f.informersMap = make(map[wellKnownController]cache.SharedIndexInformer)
f.controllerCache = newControllerCacheStorage(time.Second, time.Second, time.Minute)
versioned := map[string][]metav1.APIResource{
"Foo": {{Kind: "Foo", Name: "bah", Group: "foo"}, {Kind: "Scale", Name: "iCanScale", Group: "foo"}},
}
Expand Down

0 comments on commit 70799e8

Please sign in to comment.