Skip to content

Commit

Permalink
Use controllerCacheStorage
Browse files Browse the repository at this point in the history
  • Loading branch information
jbartosik committed Nov 23, 2020
1 parent bb9b23c commit 9d7898a
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 28 deletions.
11 changes: 9 additions & 2 deletions vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,13 @@ import (
resourceclient "k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1beta1"
)

const defaultResyncPeriod time.Duration = 10 * time.Minute
// Timing parameters for the controllerFetcher's scale-subresource cache and
// the recommender's informer factory.
const (
	// scaleCacheLoopPeriod is how often the cache maintenance loop
	// (refresh + expiry removal) wakes up.
	scaleCacheLoopPeriod time.Duration = 7 * time.Second
	// scaleCacheEntryLifetime is how long an entry may stay in the cache
	// before it is removed as expired.
	scaleCacheEntryLifetime time.Duration = time.Hour
	// scaleCacheEntryFreshnessTime is the interval between refreshes of a
	// cached entry (passed to the cache as the time between refreshes).
	scaleCacheEntryFreshnessTime time.Duration = 10 * time.Minute
	// scaleCacheEntryJitterFactor spreads out entry refresh times —
	// presumably to avoid refreshing all entries at once; confirm against
	// newControllerCacheStorage.
	scaleCacheEntryJitterFactor float64 = 1.
	// defaultResyncPeriod is the resync period of the shared informer factory.
	defaultResyncPeriod time.Duration = 10 * time.Minute
)

// ClusterStateFeeder can update state of ClusterState object.
type ClusterStateFeeder interface {
Expand Down Expand Up @@ -108,7 +114,8 @@ func NewClusterStateFeeder(config *rest.Config, clusterState *model.ClusterState
kubeClient := kube_client.NewForConfigOrDie(config)
podLister, oomObserver := NewPodListerAndOOMObserver(kubeClient, namespace)
factory := informers.NewSharedInformerFactoryWithOptions(kubeClient, defaultResyncPeriod, informers.WithNamespace(namespace))
controllerFetcher := controllerfetcher.NewControllerFetcher(config, kubeClient, factory)
controllerFetcher := controllerfetcher.NewControllerFetcher(config, kubeClient, factory, scaleCacheEntryFreshnessTime, scaleCacheEntryLifetime, scaleCacheEntryJitterFactor)
controllerFetcher.Start(context.TODO(), scaleCacheLoopPeriod)
return ClusterStateFeederFactory{
PodLister: podLister,
OOMObserver: oomObserver,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type scaleCacheKey struct {
groupResource schema.GroupResource
name string
}

type scaleCacheEntry struct {
refreshAfter time.Time
deleteAfter time.Time
Expand Down Expand Up @@ -133,8 +134,10 @@ func (cc *controllerCacheStorage) RemoveExpired() {
cc.mux.Lock()
defer cc.mux.Unlock()
now := now()
removed := 0
for k, v := range cc.cache {
if now.After(v.deleteAfter) {
removed += 1
delete(cc.cache, k)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

appsv1 "k8s.io/api/apps/v1"
autoscalingapi "k8s.io/api/autoscaling/v1"
batchv1 "k8s.io/api/batch/v1"
batchv1beta1 "k8s.io/api/batch/v1beta1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -79,43 +80,36 @@ type ControllerFetcher interface {
}

// controllerFetcher resolves pod owners via informers for well-known
// controller kinds and, for other kinds, via the scale subresource.
// Scale-subresource lookups are cached in scaleSubresourceCacheStorage.
type controllerFetcher struct {
	scaleNamespacer              scale.ScalesGetter
	mapper                       apimeta.RESTMapper
	informersMap                 map[wellKnownController]cache.SharedIndexInformer
	scaleSubresourceCacheStorage controllerCacheStorage
}

func (f *controllerFetcher) periodicallyRemoveExpired(ctx context.Context, period time.Duration) {
func (f *controllerFetcher) periodicallyRefreshCache(ctx context.Context, period time.Duration) {
for {
select {
case <-ctx.Done():
return
case <-time.After(period):
f.controllerCache.RemoveExpired()
}
}
}

func (f *controllerFetcher) periodicallyRefresh(ctx context.Context, period time.Duration) {
for {
select {
case <-ctx.Done():
return
case <-time.After(period):
for _, item := range f.controllerCache.GetKeysToRefresh() {
keysToRefresh := f.scaleSubresourceCacheStorage.GetKeysToRefresh()
klog.Info("Starting to refresh entries in controllerFetchers scaleSubresourceCacheStorage")
for _, item := range keysToRefresh {
scale, err := f.scaleNamespacer.Scales(item.namespace).Get(context.TODO(), item.groupResource, item.name, metav1.GetOptions{})
f.controllerCache.Refresh(item.namespace, item.groupResource, item.name, scale, err)
f.scaleSubresourceCacheStorage.Refresh(item.namespace, item.groupResource, item.name, scale, err)
}
klog.Infof("Finished refreshing %d entries in controllerFetchers scaleSubresourceCacheStorage", len(keysToRefresh))
f.scaleSubresourceCacheStorage.RemoveExpired()
}
}
}

func (f *controllerFetcher) Start(ctx context.Context, removePeriod, refreshPeriod time.Duration) {
go f.periodicallyRefresh(ctx, refreshPeriod)
go f.periodicallyRemoveExpired(ctx, removePeriod)
func (f *controllerFetcher) Start(ctx context.Context, loopPeriod time.Duration) {
go f.periodicallyRefreshCache(ctx, loopPeriod)
}

// NewControllerFetcher returns a new instance of controllerFetcher
func NewControllerFetcher(config *rest.Config, kubeClient kube_client.Interface, factory informers.SharedInformerFactory) ControllerFetcher {
func NewControllerFetcher(config *rest.Config, kubeClient kube_client.Interface, factory informers.SharedInformerFactory, betweenRefreshes, lifeTime time.Duration, jitterFactor float64) *controllerFetcher {
discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
if err != nil {
klog.Fatalf("Could not create discoveryClient: %v", err)
Expand Down Expand Up @@ -151,9 +145,10 @@ func NewControllerFetcher(config *rest.Config, kubeClient kube_client.Interface,

scaleNamespacer := scale.New(restClient, mapper, dynamic.LegacyAPIPathResolverFunc, resolver)
return &controllerFetcher{
scaleNamespacer: scaleNamespacer,
mapper: mapper,
informersMap: informersMap,
scaleNamespacer: scaleNamespacer,
mapper: mapper,
informersMap: informersMap,
scaleSubresourceCacheStorage: newControllerCacheStorage(betweenRefreshes, lifeTime, jitterFactor),
}
}

Expand Down Expand Up @@ -278,6 +273,15 @@ func (f *controllerFetcher) isWellKnown(key *ControllerKeyWithAPIVersion) bool {
return exists
}

// getScaleForResource returns the scale subresource for the given controller.
// It serves cached results (a cached entry holds either a scale or the error
// from the last fetch) and falls back to the API server on a cache miss,
// inserting whatever the API returns into the cache.
func (f *controllerFetcher) getScaleForResource(namespace string, groupResource schema.GroupResource, name string) (controller *autoscalingapi.Scale, err error) {
	cached, cachedScale, cachedErr := f.scaleSubresourceCacheStorage.Get(namespace, groupResource, name)
	if cached {
		return cachedScale, cachedErr
	}
	freshScale, fetchErr := f.scaleNamespacer.Scales(namespace).Get(context.TODO(), groupResource, name, metav1.GetOptions{})
	// Remember the outcome (including errors) so future lookups hit the cache.
	f.scaleSubresourceCacheStorage.Insert(namespace, groupResource, name, freshScale, fetchErr)
	return freshScale, fetchErr
}

func (f *controllerFetcher) isWellKnownOrScalable(key *ControllerKeyWithAPIVersion) bool {
if f.isWellKnown(key) {
return true
Expand All @@ -301,7 +305,7 @@ func (f *controllerFetcher) isWellKnownOrScalable(key *ControllerKeyWithAPIVersi

for _, mapping := range mappings {
groupResource := mapping.Resource.GroupResource()
scale, err := f.scaleNamespacer.Scales(key.Namespace).Get(context.TODO(), groupResource, key.Name, metav1.GetOptions{})
scale, err := f.getScaleForResource(key.Namespace, groupResource, key.Name)
if err == nil && scale != nil {
return true
}
Expand All @@ -323,7 +327,7 @@ func (f *controllerFetcher) getOwnerForScaleResource(groupKind schema.GroupKind,
var lastError error
for _, mapping := range mappings {
groupResource := mapping.Resource.GroupResource()
scale, err := f.scaleNamespacer.Scales(namespace).Get(context.TODO(), groupResource, name, metav1.GetOptions{})
scale, err := f.getScaleForResource(namespace, groupResource, name)
if err == nil {
return getOwnerController(scale.OwnerReferences, namespace), nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ var trueVar = true
func simpleControllerFetcher() *controllerFetcher {
f := controllerFetcher{}
f.informersMap = make(map[wellKnownController]cache.SharedIndexInformer)
f.scaleSubresourceCacheStorage = newControllerCacheStorage(time.Second, time.Minute, 0.1)
versioned := map[string][]metav1.APIResource{
"Foo": {{Kind: "Foo", Name: "bah", Group: "foo"}, {Kind: "Scale", Name: "iCanScale", Group: "foo"}},
}
Expand Down

0 comments on commit 9d7898a

Please sign in to comment.