internal/k8s: add check to verify resources exist in cluster
Add a discovery client to look up server groups and resources, which allows a
check that a resource type exists before starting any informers against it.

Fixes projectcontour#2219
stevesloka committed Jul 8, 2020
1 parent bd79b57 commit 499b74e
Showing 4 changed files with 164 additions and 13 deletions.
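
The pattern this commit introduces is: query the API server's discovery endpoint once, then consult the result before wiring up an informer for each resource type. The following is a minimal, standalone sketch of that idea, not Contour's own code; the helper name `resourceExists` and the kubeconfig path are illustrative only, while the `discovery` and `clientcmd` calls are standard client-go APIs.

```go
package main

import (
	"fmt"
	"strings"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/discovery"
	"k8s.io/client-go/tools/clientcmd"
)

// resourceExists reports whether a resource name is served under the given
// groupVersion, based on a previously fetched discovery listing.
func resourceExists(lists []*metav1.APIResourceList, groupVersion, resource string) bool {
	for _, list := range lists {
		if !strings.EqualFold(list.GroupVersion, groupVersion) {
			continue
		}
		for _, r := range list.APIResources {
			if strings.EqualFold(r.Name, resource) {
				return true
			}
		}
	}
	return false
}

func main() {
	// Placeholder kubeconfig path; in-cluster config works the same way.
	config, err := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig")
	if err != nil {
		panic(err)
	}

	dc, err := discovery.NewDiscoveryClientForConfig(config)
	if err != nil {
		panic(err)
	}

	// One round trip to the API server; the result is reused for every check.
	_, resources, err := dc.ServerGroupsAndResources()
	if err != nil {
		panic(err)
	}

	if resourceExists(resources, "projectcontour.io/v1", "httpproxies") {
		fmt.Println("HTTPProxy CRD is installed; safe to start its informer")
	} else {
		fmt.Println("HTTPProxy CRD missing; skipping informer")
	}
}
```

Note that `ServerGroupsAndResources` can return a partially populated listing alongside an error (for example when one aggregated API group is unreachable), which is why the serve.go change below logs the error and continues rather than aborting startup.
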
39 changes: 29 additions & 10 deletions cmd/contour/serve.go
@@ -234,24 +234,38 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {

// step 4. register our resource event handler with the k8s informers,
// using the SyncList to keep track of what to sync later.
var informerSyncList k8s.InformerSyncList
informerSyncList := k8s.InformerSyncList{
Clients: clients,
}

apiResourceList, err := clients.ServerGroupsAndResources()
if err != nil {
log.WithField("context", "ServerGroupsAndResources").Errorf("could not get list of API Resources: %v", err)
}

informerSyncList.InformOnResources(clusterInformerFactory, dynamicHandler, k8s.DefaultResources()...)
if errors := informerSyncList.InformOnResources(clusterInformerFactory, dynamicHandler, apiResourceList, k8s.DefaultResources()...); len(errors) > 0 {
log.WithField("InformOnResources", "DefaultResources").Warn(errors)
}

if ctx.UseExperimentalServiceAPITypes {
informerSyncList.InformOnResources(clusterInformerFactory,
dynamicHandler, k8s.ServiceAPIResources()...)
if errors := informerSyncList.InformOnResources(clusterInformerFactory, dynamicHandler, apiResourceList, k8s.ServiceAPIResources()...); len(errors) > 0 {
log.WithField("InformOnResources", "ExperimentalServiceAPITypes").Warn(errors)
}
}

// TODO(youngnick): Move this logic out to internal/k8s/informers.go somehow.
// Add informers for each root namespace
for _, factory := range namespacedInformerFactories {
informerSyncList.InformOnResources(factory, dynamicHandler, k8s.SecretsResources()...)
for ns, factory := range namespacedInformerFactories {
if errors := informerSyncList.InformOnResources(factory, dynamicHandler, apiResourceList, k8s.SecretsResources()...); len(errors) > 0 {
log.WithField("InformOnResources.Namespaced", ns).Warn(errors)
}
}

// If root namespaces are not defined, then add the informer for all namespaces
if len(namespacedInformerFactories) == 0 {
informerSyncList.InformOnResources(clusterInformerFactory, dynamicHandler, k8s.SecretsResources()...)
if errors := informerSyncList.InformOnResources(clusterInformerFactory, dynamicHandler, apiResourceList, k8s.SecretsResources()...); len(errors) > 0 {
log.WithField("InformOnResources", "secrets").Warn(errors)
}
}

// step 5. endpoints updates are handled directly by the EndpointsTranslator
@@ -260,15 +274,17 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
FieldLogger: log.WithField("context", "endpointstranslator"),
}

informerSyncList.InformOnResources(clusterInformerFactory,
if errors := informerSyncList.InformOnResources(clusterInformerFactory,
&k8s.DynamicClientHandler{
Next: &contour.EventRecorder{
Next: et,
Counter: eventHandler.Metrics.EventHandlerOperations,
},
Converter: converter,
Logger: log.WithField("context", "endpointstranslator"),
}, k8s.EndpointsResources()...)
}, apiResourceList, k8s.EndpointsResources()...); len(errors) > 0 {
log.WithField("InformOnResources.Namespaced", "endpointstranslator").Warn(errors)
}

// step 6. setup workgroup runner and register informers.
var g workgroup.Group
@@ -366,7 +382,10 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
Logger: log.WithField("context", "serviceStatusLoadBalancerWatcher"),
}
factory := clients.NewInformerFactoryForNamespace(ctx.EnvoyServiceNamespace)
informerSyncList.InformOnResources(factory, dynamicServiceHandler, k8s.ServicesResources()...)
if errors := informerSyncList.InformOnResources(factory, dynamicServiceHandler, apiResourceList, k8s.ServicesResources()...); len(errors) > 0 {
log.WithField("InformOnResources", "ServicesResources").Warn(errors)
}

g.Add(startInformer(factory, log.WithField("context", "serviceStatusLoadBalancerWatcher")))
log.WithField("envoy-service-name", ctx.EnvoyServiceName).
WithField("envoy-service-namespace", ctx.EnvoyServiceNamespace).
38 changes: 36 additions & 2 deletions internal/k8s/clients.go
@@ -14,8 +14,13 @@
package k8s

import (
"strings"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"k8s.io/client-go/discovery"

"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/kubernetes"
@@ -25,8 +30,9 @@ import (

// Clients holds the various API clients required by Contour.
type Clients struct {
core *kubernetes.Clientset
dynamic dynamic.Interface
core *kubernetes.Clientset
dynamic dynamic.Interface
discovery *discovery.DiscoveryClient
}

// NewClients returns a new set of the various API clients required
@@ -49,6 +55,11 @@ func NewClients(kubeconfig string, inCluster bool) (*Clients, error) {
return nil, err
}

clients.discovery, err = discovery.NewDiscoveryClientForConfig(config)
if err != nil {
return nil, err
}

return &clients, nil
}

@@ -84,3 +95,26 @@ func (c *Clients) ClientSet() *kubernetes.Clientset {
func (c *Clients) DynamicClient() dynamic.Interface {
return c.dynamic
}

// ServerGroupsAndResources returns the set of API resources that exist in the
// Kubernetes cluster.
func (c *Clients) ServerGroupsAndResources() ([]*metav1.APIResourceList, error) {
_, resources, err := c.discovery.ServerGroupsAndResources()
return resources, err
}

// ResourceKindExists returns true if a resource with the given name is served
// under the given groupVersion.
// ex: groupVersion=projectcontour.io/v1 kind=httpproxies
func (c *Clients) ResourceKindExists(groupVersion, kind string, apiResourceList []*metav1.APIResourceList) (bool, error) {

for _, r := range apiResourceList {
if strings.EqualFold(r.GroupVersion, groupVersion) {
for _, list := range r.APIResources {
if strings.EqualFold(list.Name, kind) {
return true, nil
}
}
}
}
return false, nil
}
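
The two new methods are meant to be used together, as the serve.go hunk above does: fetch the discovery listing once, then consult it per resource type. A condensed sketch, written as if it lived in the same package; `resourceIsServed` is a hypothetical helper name, not part of this commit.

```go
package k8s

// resourceIsServed is a hypothetical helper (not part of this commit) showing
// how ServerGroupsAndResources and ResourceKindExists compose.
func resourceIsServed(clients *Clients, groupVersion, resource string) (bool, error) {
	apiResourceList, err := clients.ServerGroupsAndResources()
	if err != nil {
		// serve.go logs this error and carries on, since the listing can be
		// partially populated even when discovery of some groups fails.
		return false, err
	}
	return clients.ResourceKindExists(groupVersion, resource, apiResourceList)
}
```
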
86 changes: 86 additions & 0 deletions internal/k8s/clients_test.go
@@ -0,0 +1,86 @@
// Copyright Project Contour Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package k8s

import (
"testing"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/projectcontour/contour/internal/assert"
)

func TestResourceKindExists(t *testing.T) {
type testcase struct {
groupVersion, kind string
apiResourceList []*metav1.APIResourceList
want bool
wantError error
}

run := func(t *testing.T, name string, tc testcase) {
t.Helper()

t.Run(name, func(t *testing.T) {
t.Helper()

clients := &Clients{}
got, err := clients.ResourceKindExists(tc.groupVersion, tc.kind, tc.apiResourceList)

// Note we don't match error string values
// because the actual values come from Kubernetes
// internals and may not be stable.
if tc.wantError == nil && err != nil {
t.Errorf("wanted no error, got error %q", err)
}

if tc.wantError != nil && err == nil {
t.Errorf("wanted error %q, got no error", tc.wantError)
}

assert.Equal(t, tc.want, got)
})
}

run(t, "ingress exist", testcase{
groupVersion: "networking.k8s.io/v1beta1",
kind: "ingress",
want: true,
wantError: nil,
apiResourceList: []*metav1.APIResourceList{{
TypeMeta: metav1.TypeMeta{},
GroupVersion: "networking.k8s.io/v1beta1",
APIResources: []metav1.APIResource{{
Name: "ingress",
Namespaced: false,
}},
}},
})

run(t, "ingressclass does not exist", testcase{
groupVersion: "networking.k8s.io/v1beta1",
kind: "ingressclasses",
want: false,
wantError: nil,
apiResourceList: []*metav1.APIResourceList{{
TypeMeta: metav1.TypeMeta{},
GroupVersion: "v1",
APIResources: []metav1.APIResource{{
Name: "services",
Namespaced: false,
}},
}},
})

}
14 changes: 13 additions & 1 deletion internal/k8s/syncer.go
@@ -16,24 +16,36 @@ package k8s
import (
"fmt"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/tools/cache"
)

// InformerSyncList holds the functions to call to check that an informer is synced.
type InformerSyncList struct {
syncers []cache.InformerSynced
Clients *Clients
}

// InformOnResources creates informers for each of the given resources and registers their sync callbacks.
func (sl *InformerSyncList) InformOnResources(f InformerFactory, handler *DynamicClientHandler, resources ...schema.GroupVersionResource) {
func (sl *InformerSyncList) InformOnResources(f InformerFactory, handler *DynamicClientHandler, apiResourceList []*metav1.APIResourceList, resources ...schema.GroupVersionResource) []error {
var errors []error

for _, r := range resources {
exists, _ := sl.Clients.ResourceKindExists(r.GroupVersion().String(), r.Resource, apiResourceList)

if !exists {
errors = append(errors, fmt.Errorf("GroupVersion: %s Kind: %s not found in ServerGroupsAndResources", r.GroupVersion().String(), r.Resource))
continue
}
informer := f.ForResource(r).Informer()
informer.AddEventHandler(handler)

sl.syncers = append(sl.syncers, informer.HasSynced)
}

return errors
}

// WaitForSync ensures that all the informers in the InformerSyncList are synced before returning.