Skip to content

Commit

Permalink
Merge pull request #663 from djzager/cache-timeout
Browse files Browse the repository at this point in the history
✨ Take context when getting informer.
  • Loading branch information
k8s-ci-robot authored Feb 26, 2020
2 parents 0374b8c + a7df8ba commit 741745a
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 11 deletions.
14 changes: 14 additions & 0 deletions pkg/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,20 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
err := informerCache.Get(context.Background(), svcKey, svc)
Expect(err).To(HaveOccurred())
})

It("should return an error when context is cancelled", func() {
By("creating a context and cancelling it")
ctx, cancel := context.WithCancel(context.Background())
cancel()

By("listing pods in test-namespace-1 with a cancelled context")
listObj := &kcorev1.PodList{}
err := informerCache.List(ctx, listObj, client.InNamespace(testNamespaceOne))

By("verifying that an error is returned")
Expect(err).To(HaveOccurred())
Expect(errors.IsTimeout(err)).To(BeTrue())
})
})
Context("with unstructured objects", func() {
It("should be able to list objects that haven't been watched previously", func() {
Expand Down
15 changes: 10 additions & 5 deletions pkg/cache/informer_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (ip *informerCache) Get(ctx context.Context, key client.ObjectKey, out runt
return err
}

started, cache, err := ip.InformersMap.Get(gvk, out)
started, cache, err := ip.InformersMap.Get(ctx, gvk, out)
if err != nil {
return err
}
Expand All @@ -76,7 +76,7 @@ func (ip *informerCache) List(ctx context.Context, out runtime.Object, opts ...c
return err
}

started, cache, err := ip.InformersMap.Get(*gvk, cacheTypeObj)
started, cache, err := ip.InformersMap.Get(ctx, *gvk, cacheTypeObj)
if err != nil {
return err
}
Expand Down Expand Up @@ -128,7 +128,6 @@ func (ip *informerCache) objectTypeForListObject(list runtime.Object) (*schema.G
}

return &gvk, cacheTypeObj, nil

}

// GetInformerForKind returns the informer for the GroupVersionKind
Expand All @@ -138,7 +137,10 @@ func (ip *informerCache) GetInformerForKind(gvk schema.GroupVersionKind) (Inform
if err != nil {
return nil, err
}
_, i, err := ip.InformersMap.Get(gvk, obj)

// TODO(djzager): before a context can be passed down, the Informers interface
// must be updated to accept a context when getting an informer
_, i, err := ip.InformersMap.Get(context.TODO(), gvk, obj)
if err != nil {
return nil, err
}
Expand All @@ -151,7 +153,10 @@ func (ip *informerCache) GetInformer(obj runtime.Object) (Informer, error) {
if err != nil {
return nil, err
}
_, i, err := ip.InformersMap.Get(gvk, obj)

// TODO(djzager): before a context can be passed down, the Informers interface
// must be updated to accept a context when getting an informer
_, i, err := ip.InformersMap.Get(context.TODO(), gvk, obj)
if err != nil {
return nil, err
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/cache/internal/deleg_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package internal

import (
"context"
"time"

"k8s.io/apimachinery/pkg/api/meta"
Expand Down Expand Up @@ -79,16 +80,16 @@ func (m *InformersMap) WaitForCacheSync(stop <-chan struct{}) bool {

// Get will create a new Informer and add it to the map of InformersMap if none exists. Returns
// the Informer from the map.
func (m *InformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) {
func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) {
_, isUnstructured := obj.(*unstructured.Unstructured)
_, isUnstructuredList := obj.(*unstructured.UnstructuredList)
isUnstructured = isUnstructured || isUnstructuredList

if isUnstructured {
return m.unstructured.Get(gvk, obj)
return m.unstructured.Get(ctx, gvk, obj)
}

return m.structured.Get(gvk, obj)
return m.structured.Get(ctx, gvk, obj)
}

// newStructuredInformersMap creates a new InformersMap for structured objects.
Expand Down
8 changes: 5 additions & 3 deletions pkg/cache/internal/informers_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ limitations under the License.
package internal

import (
"context"
"fmt"
"math/rand"
"sync"
"time"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -163,7 +165,7 @@ func (ip *specificInformersMap) HasSyncedFuncs() []cache.InformerSynced {

// Get will create a new Informer and add it to the map of specificInformersMap if none exists. Returns
// the Informer from the map.
func (ip *specificInformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) {
func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) {
// Return the informer if it is found
i, started, ok := func() (*MapEntry, bool, bool) {
ip.mu.RLock()
Expand All @@ -181,8 +183,8 @@ func (ip *specificInformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Obj

if started && !i.Informer.HasSynced() {
// Wait for it to sync before returning the Informer so that folks don't read from a stale cache.
if !cache.WaitForCacheSync(ip.stop, i.Informer.HasSynced) {
return started, nil, fmt.Errorf("failed waiting for %T Informer to sync", obj)
if !cache.WaitForCacheSync(ctx.Done(), i.Informer.HasSynced) {
return started, nil, apierrors.NewTimeoutError(fmt.Sprintf("failed waiting for %T Informer to sync", obj), 0)
}
}

Expand Down

0 comments on commit 741745a

Please sign in to comment.