diff --git a/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go b/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go index 9b7a69a95498..cf9b11bc89a6 100644 --- a/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go +++ b/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go @@ -49,7 +49,13 @@ import ( resourceclient "k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1beta1" ) -const defaultResyncPeriod time.Duration = 10 * time.Minute +const ( + scaleCacheLoopPeriod time.Duration = time.Hour + scaleCacheCleanup time.Duration = time.Hour + scaleCacheRefresh time.Duration = 10 * time.Minute + scaleRefreshJitter time.Duration = time.Minute + defaultResyncPeriod time.Duration = 10 * time.Minute +) // ClusterStateFeeder can update state of ClusterState object. type ClusterStateFeeder interface { @@ -108,7 +114,8 @@ func NewClusterStateFeeder(config *rest.Config, clusterState *model.ClusterState kubeClient := kube_client.NewForConfigOrDie(config) podLister, oomObserver := NewPodListerAndOOMObserver(kubeClient, namespace) factory := informers.NewSharedInformerFactoryWithOptions(kubeClient, defaultResyncPeriod, informers.WithNamespace(namespace)) - controllerFetcher := controllerfetcher.NewControllerFetcher(config, kubeClient, factory) + controllerFetcher := controllerfetcher.NewControllerFetcher(config, kubeClient, factory, scaleCacheRefresh, scaleRefreshJitter, scaleCacheCleanup) + controllerFetcher.Start(context.TODO(), scaleCacheLoopPeriod, scaleCacheLoopPeriod) return ClusterStateFeederFactory{ PodLister: podLister, OOMObserver: oomObserver, diff --git a/vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher/controller_cache_storage.go b/vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher/controller_cache_storage.go index 36393db25350..2240b5656c74 100644 --- a/vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher/controller_cache_storage.go +++ b/vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher/controller_cache_storage.go @@ -23,6 +23,7 @@ import ( autoscalingapi "k8s.io/api/autoscaling/v1" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/klog" ) // Allows tests to inject their time. @@ -33,6 +34,7 @@ type scaleCacheKey struct { groupResource schema.GroupResource name string } + type scaleCacheEntry struct { refreshAfter time.Time deleteAfter time.Time @@ -121,14 +123,18 @@ func (cc *controllerCacheStorage) Insert(namespace string, groupResource schema. // Removes entries from the cache which we didn't read in a while. func (cc *controllerCacheStorage) RemoveExpired() { + klog.Infof("Removing entries from controllerCacheStorage") cc.mux.Lock() defer cc.mux.Unlock() now := now() + removed := 0 for k, v := range cc.cache { if now.After(v.deleteAfter) { + removed += 1 delete(cc.cache, k) } } + klog.Infof("Removed %d entries from controllerCacheStorage", removed) } // Returns a list of keys for which values need to be refreshed diff --git a/vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher/controller_fetcher.go b/vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher/controller_fetcher.go index ac23080d26f7..2d1121ac2a39 100644 --- a/vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher/controller_fetcher.go +++ b/vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher/controller_fetcher.go @@ -22,6 +22,7 @@ import ( "time" appsv1 "k8s.io/api/apps/v1" + autoscalingapi "k8s.io/api/autoscaling/v1" batchv1 "k8s.io/api/batch/v1" batchv1beta1 "k8s.io/api/batch/v1beta1" corev1 "k8s.io/api/core/v1" @@ -82,9 +83,10 @@ type controllerFetcher struct { scaleNamespacer scale.ScalesGetter mapper apimeta.RESTMapper informersMap map[wellKnownController]cache.SharedIndexInformer + controllerCache controllerCacheStorage } -func (f *controllerFetcher) periodicallyRemoveExpired(ctx context.Context, period time.Duration) { +func (f *controllerFetcher) periodicallyRemoveExpiredFromCache(ctx context.Context, period time.Duration) { for { select { case <-ctx.Done(): @@ -95,27 +97,30 @@ func (f *controllerFetcher) periodicallyRemoveExpired(ctx context.Context, perio } } -func (f *controllerFetcher) periodicallyRefresh(ctx context.Context, period time.Duration) { +func (f *controllerFetcher) periodicallyRefreshCache(ctx context.Context, period time.Duration) { for { select { case <-ctx.Done(): return case <-time.After(period): - for _, item := range f.controllerCache.GetKeysToRefresh() { + keysToReferesh := f.controllerCache.GetKeysToRefresh() + klog.Info("Starting to refresh entries in controllerFetchers controllerCache") + for _, item := range keysToReferesh { scale, err := f.scaleNamespacer.Scales(item.namespace).Get(context.TODO(), item.groupResource, item.name, metav1.GetOptions{}) f.controllerCache.Refresh(item.namespace, item.groupResource, item.name, scale, err) } + klog.Infof("Finished refreshing %d entries in controllerFetchers controllerCache", len(keysToReferesh)) } } } func (f *controllerFetcher) Start(ctx context.Context, removePeriod, refreshPeriod time.Duration) { - go f.periodicallyRefresh(ctx, refreshPeriod) - go f.periodicallyRemoveExpired(ctx, removePeriod) + go f.periodicallyRefreshCache(ctx, refreshPeriod) + go f.periodicallyRemoveExpiredFromCache(ctx, removePeriod) } // NewControllerFetcher returns a new instance of controllerFetcher -func NewControllerFetcher(config *rest.Config, kubeClient kube_client.Interface, factory informers.SharedInformerFactory) ControllerFetcher { +func NewControllerFetcher(config *rest.Config, kubeClient kube_client.Interface, factory informers.SharedInformerFactory, betweenRefreshes, refreshJitter, lifeTime time.Duration) *controllerFetcher { discoveryClient, err := discovery.NewDiscoveryClientForConfig(config) if err != nil { klog.Fatalf("Could not create discoveryClient: %v", err) @@ -154,6 +159,7 @@ func NewControllerFetcher(config *rest.Config, kubeClient kube_client.Interface, scaleNamespacer: scaleNamespacer, mapper: mapper, informersMap: informersMap, + controllerCache: newControllerCacheStorage(betweenRefreshes, refreshJitter, lifeTime), } } @@ -278,6 +284,15 @@ func (f *controllerFetcher) isWellKnown(key *ControllerKeyWithAPIVersion) bool { return exists } +func (f *controllerFetcher) getScaleForResource(namespace string, groupResource schema.GroupResource, name string) (controller *autoscalingapi.Scale, err error) { + if ok, scale, err := f.controllerCache.Get(namespace, groupResource, name); ok { + return scale, err + } + scale, err := f.scaleNamespacer.Scales(namespace).Get(context.TODO(), groupResource, name, metav1.GetOptions{}) + f.controllerCache.Insert(namespace, groupResource, name, scale, err) + return scale, err +} + func (f *controllerFetcher) isWellKnownOrScalable(key *ControllerKeyWithAPIVersion) bool { if f.isWellKnown(key) { return true @@ -301,7 +316,7 @@ func (f *controllerFetcher) isWellKnownOrScalable(key *ControllerKeyWithAPIVersi for _, mapping := range mappings { groupResource := mapping.Resource.GroupResource() - scale, err := f.scaleNamespacer.Scales(key.Namespace).Get(context.TODO(), groupResource, key.Name, metav1.GetOptions{}) + scale, err := f.getScaleForResource(key.Namespace, groupResource, key.Name) if err == nil && scale != nil { return true } @@ -323,7 +338,7 @@ func (f *controllerFetcher) getOwnerForScaleResource(groupKind schema.GroupKind, var lastError error for _, mapping := range mappings { groupResource := mapping.Resource.GroupResource() - scale, err := f.scaleNamespacer.Scales(namespace).Get(context.TODO(), groupResource, name, metav1.GetOptions{}) + scale, err := f.getScaleForResource(namespace, groupResource, name) if err == nil { return getOwnerController(scale.OwnerReferences, namespace), nil } diff --git a/vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher/controller_fetcher_test.go b/vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher/controller_fetcher_test.go index a6f827283687..8bcd298c59c2 100644 --- a/vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher/controller_fetcher_test.go +++ b/vertical-pod-autoscaler/pkg/recommender/input/controller_fetcher/controller_fetcher_test.go @@ -44,6 +44,7 @@ var trueVar = true func simpleControllerFetcher() *controllerFetcher { f := controllerFetcher{} f.informersMap = make(map[wellKnownController]cache.SharedIndexInformer) + f.controllerCache = newControllerCacheStorage(time.Second, time.Second, time.Minute) versioned := map[string][]metav1.APIResource{ "Foo": {{Kind: "Foo", Name: "bah", Group: "foo"}, {Kind: "Scale", Name: "iCanScale", Group: "foo"}}, }