Skip to content
This repository has been archived by the owner on Dec 4, 2023. It is now read-only.

Commit

Permalink
Add delegate cache to handle caching of global objects (#3524)
Browse files Browse the repository at this point in the history
This allows for using both namespaced and global watches based on object
type.

Use as a drop-in replacement of the default manager cache, without
putting any GVK mappings in.
  • Loading branch information
Ulf Lilleengen authored Dec 3, 2019
1 parent 9cb6226 commit 539d4f7
Show file tree
Hide file tree
Showing 7 changed files with 559 additions and 1 deletion.
6 changes: 5 additions & 1 deletion cmd/controller-manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
enmassescheme "github.com/enmasseproject/enmasse/pkg/client/clientset/versioned/scheme"
"k8s.io/client-go/kubernetes/scheme"

"github.com/enmasseproject/enmasse/pkg/cache"
"github.com/enmasseproject/enmasse/pkg/controller"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"sigs.k8s.io/controller-runtime/pkg/client/config"
Expand All @@ -42,7 +43,10 @@ func main() {
os.Exit(1)
}

mgr, err := manager.New(cfg, manager.Options{Namespace: namespace})
mgr, err := manager.New(cfg, manager.Options{
Namespace: namespace,
NewCache: cache.NewDelegateCacheBuilder(namespace),
})
if err != nil {
log.Error(err, "Failed to create manager")
os.Exit(1)
Expand Down
153 changes: 153 additions & 0 deletions pkg/cache/delegate_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* Copyright 2019, EnMasse authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/

package cache

import (
"context"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
rlog "sigs.k8s.io/controller-runtime/pkg/log"
)

var log = rlog.Log.WithName("delegate_cache")

// DelegateCacheBuilder - Builder function to create a new delegate cache. This cache allows
// mapping GVK to be served by a global cache while falling back to a default namespaced cache
// for other types.
func NewDelegateCacheBuilder(defaultNamespace string, globalGvks ...schema.GroupVersionKind) cache.NewCacheFunc {
return func(config *rest.Config, opts cache.Options) (cache.Cache, error) {
opts.Namespace = defaultNamespace
defaultCache, err := cache.New(config, opts)
if err != nil {
return nil, err
}

opts.Namespace = ""
globalCache, err := cache.New(config, opts)
if err != nil {
return nil, err
}

globalGvkMap := make(map[schema.GroupVersionKind]bool)
for _, gvk := range globalGvks {
globalGvkMap[gvk] = true
}
return &delegateCache{
defaultNamespace: defaultNamespace,
defaultCache: defaultCache,
globalCache: globalCache,
globalGvkMap: globalGvkMap,
Scheme: opts.Scheme}, nil
}
}

type delegateCache struct {
defaultNamespace string
defaultCache cache.Cache
globalCache cache.Cache
globalGvkMap map[schema.GroupVersionKind]bool
Scheme *runtime.Scheme
}

var _ cache.Cache = &delegateCache{}

// Methods for delegateCache to conform to the Informers interface
func (c *delegateCache) GetInformer(obj runtime.Object) (cache.Informer, error) {
gvk, err := apiutil.GVKForObject(obj, c.Scheme)
if err != nil {
return nil, err
}

if c.globalGvkMap[gvk] {
return c.globalCache.GetInformer(obj)
} else {
return c.defaultCache.GetInformer(obj)
}
}

func (c *delegateCache) GetInformerForKind(gvk schema.GroupVersionKind) (cache.Informer, error) {
if c.globalGvkMap[gvk] {
return c.globalCache.GetInformerForKind(gvk)
} else {
return c.defaultCache.GetInformerForKind(gvk)
}

}

func (c *delegateCache) Start(stopCh <-chan struct{}) error {
startFn := func(cacheType string, cache cache.Cache) {
err := cache.Start(stopCh)
if err != nil {
log.Error(err, "delegate cache failed to start cache", "cacheType", cacheType)
}
}

go startFn("namespaced", c.defaultCache)
go startFn("global", c.globalCache)

<-stopCh
return nil
}

func (c *delegateCache) WaitForCacheSync(stop <-chan struct{}) bool {
synced := true
if s := c.defaultCache.WaitForCacheSync(stop); !s {
synced = s
}

if s := c.globalCache.WaitForCacheSync(stop); !s {
synced = s
}
return synced
}

func (c *delegateCache) IndexField(obj runtime.Object, field string, extractValue client.IndexerFunc) error {
gvk, err := apiutil.GVKForObject(obj, c.Scheme)
if err != nil {
return err
}
if c.globalGvkMap[gvk] {
if err := c.globalCache.IndexField(obj, field, extractValue); err != nil {
return err
}
} else {
if err := c.defaultCache.IndexField(obj, field, extractValue); err != nil {
return err
}
}
return nil
}

func (c *delegateCache) Get(ctx context.Context, key client.ObjectKey, obj runtime.Object) error {
gvk, err := apiutil.GVKForObject(obj, c.Scheme)
if err != nil {
return err
}

if c.globalGvkMap[gvk] {
return c.globalCache.Get(ctx, key, obj)
} else {
return c.defaultCache.Get(ctx, key, obj)
}
}

func (c *delegateCache) List(ctx context.Context, list runtime.Object, opts ...client.ListOption) error {
gvk, err := apiutil.GVKForObject(list, c.Scheme)
if err != nil {
return err
}

if c.globalGvkMap[gvk] {
return c.globalCache.List(ctx, list, opts...)
} else {
return c.defaultCache.List(ctx, list, opts...)
}
}
70 changes: 70 additions & 0 deletions pkg/cache/delegate_cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 2019, EnMasse authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/

package cache

import (
"reflect"
"testing"
"time"

userv1beta1 "github.com/enmasseproject/enmasse/pkg/apis/user/v1beta1"
userv1beta1informers "github.com/enmasseproject/enmasse/pkg/client/informers/externalversions/user/v1beta1"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"

"k8s.io/client-go/kubernetes/scheme"
toolscache "k8s.io/client-go/tools/cache"

fake "sigs.k8s.io/controller-runtime/pkg/cache/informertest"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
)

func getGvk(t *testing.T, s *runtime.Scheme, obj runtime.Object) schema.GroupVersionKind {
gvk, err := apiutil.GVKForObject(obj, s)
if err != nil {
t.Fatal("error creating GVK", err)
}
return gvk
}

func TestDelegation(t *testing.T) {
s := scheme.Scheme
user := &userv1beta1.MessagingUser{}
s.AddKnownTypes(userv1beta1.SchemeGroupVersion, user)
userGvk := getGvk(t, s, user)

localCache := fake.FakeInformers{InformersByGVK: make(map[schema.GroupVersionKind]toolscache.SharedIndexInformer)}
globalCache := fake.FakeInformers{InformersByGVK: make(map[schema.GroupVersionKind]toolscache.SharedIndexInformer)}

// Set up mocks so that global cache returns something else
globalCache.InformersByGVK[userGvk] = userv1beta1informers.NewMessagingUserInformer(nil, "", time.Second*1, nil)

cache := delegateCache{
defaultNamespace: "test",
defaultCache: &localCache,
globalCache: &globalCache,
globalGvkMap: make(map[schema.GroupVersionKind]bool),
Scheme: s,
}

localInformer, err := cache.GetInformer(user)
if err != nil {
t.Fatal("Error getting informer", err)
}

if reflect.TypeOf(localInformer).String() != "*controllertest.FakeInformer" {
t.Error("Local informer is not the right type:", reflect.TypeOf(localInformer).String())
}

// Register MessagingUser in the global GVK map
cache.globalGvkMap[userGvk] = true

globalInformer, err := cache.GetInformer(user)
if reflect.TypeOf(globalInformer).String() != "*cache.sharedIndexInformer" {
t.Error("Global informer is not the right type:", reflect.TypeOf(globalInformer).String())
}
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 539d4f7

Please sign in to comment.