internal/dag: add logger to KubernetesCache
Updates projectcontour#513

Add a logger to the KubernetesCache. This is useful at the moment for
spotting failures to propagate projectcontour.HTTPLoadbalancer objects
through Contour.

This will also be useful to log cert validation failures, see projectcontour#513.

Signed-off-by: Dave Cheney <[email protected]>
davecheney committed Aug 30, 2019
1 parent 45fbb08 commit 9ce7f4a
Showing 5 changed files with 66 additions and 39 deletions.
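
For context, a minimal sketch of the behaviour this commit adds: a KubernetesCache built with a logger reports any object type its Insert switch does not recognise, instead of dropping it silently. This is illustrative only — the ConfigMap is a hypothetical "uninteresting" object, and the import paths assume the heptio/contour module layout as of this commit:

package main

import (
	"github.com/heptio/contour/internal/dag"
	"github.com/sirupsen/logrus"
	v1 "k8s.io/api/core/v1"
)

func main() {
	cache := dag.KubernetesCache{
		// Same wiring as cmd/contour/serve.go below.
		FieldLogger: logrus.New().WithField("context", "KubernetesCache"),
	}
	// Contour does not watch ConfigMaps, so Insert now logs
	// "insert unknown object" rather than failing silently.
	cache.Insert(&v1.ConfigMap{})
}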
70 changes: 34 additions & 36 deletions cmd/contour/serve.go
@@ -135,41 +135,39 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
 		namespacedInformers = append(namespacedInformers, inf)
 	}
 
-	// step 3. establish our (poorly named) gRPC cache handler.
-	ch := &contour.CacheHandler{
-		ListenerVisitorConfig: contour.ListenerVisitorConfig{
-			UseProxyProto:          ctx.useProxyProto,
-			HTTPAddress:            ctx.httpAddr,
-			HTTPPort:               ctx.httpPort,
-			HTTPAccessLog:          ctx.httpAccessLog,
-			HTTPSAddress:           ctx.httpsAddr,
-			HTTPSPort:              ctx.httpsPort,
-			HTTPSAccessLog:         ctx.httpsAccessLog,
-			MinimumProtocolVersion: dag.MinProtoVersion(ctx.TLSConfig.MinimumProtocolVersion),
-		},
-		ListenerCache: contour.NewListenerCache(ctx.statsAddr, ctx.statsPort),
-		FieldLogger:   log.WithField("context", "CacheHandler"),
-		IngressRouteStatus: &k8s.IngressRouteStatus{
-			Client: contourClient,
-		},
-	}
-
-	// step 4. wrap the cache handler in a k8s event handler.
+	// step 3. build our mammoth Kubernetes event handler.
 	eh := &contour.EventHandler{
-		CacheHandler: ch,
+		CacheHandler: &contour.CacheHandler{
+			ListenerVisitorConfig: contour.ListenerVisitorConfig{
+				UseProxyProto:          ctx.useProxyProto,
+				HTTPAddress:            ctx.httpAddr,
+				HTTPPort:               ctx.httpPort,
+				HTTPAccessLog:          ctx.httpAccessLog,
+				HTTPSAddress:           ctx.httpsAddr,
+				HTTPSPort:              ctx.httpsPort,
+				HTTPSAccessLog:         ctx.httpsAccessLog,
+				MinimumProtocolVersion: dag.MinProtoVersion(ctx.TLSConfig.MinimumProtocolVersion),
+			},
+			ListenerCache: contour.NewListenerCache(ctx.statsAddr, ctx.statsPort),
+			FieldLogger:   log.WithField("context", "CacheHandler"),
+			IngressRouteStatus: &k8s.IngressRouteStatus{
+				Client: contourClient,
+			},
+		},
 		HoldoffDelay:    100 * time.Millisecond,
 		HoldoffMaxDelay: 500 * time.Millisecond,
 		Builder: dag.Builder{
 			Source: dag.KubernetesCache{
 				IngressRouteRootNamespaces: ctx.ingressRouteRootNamespaces(),
 				IngressClass:               ctx.ingressClass,
+				FieldLogger:                log.WithField("context", "KubernetesCache"),
 			},
 			DisablePermitInsecure: ctx.DisablePermitInsecure,
 		},
 		FieldLogger: log.WithField("context", "contourEventHandler"),
 	}
 
-	// step 5. register our resource event handler with the k8s informers.
+	// step 4. register our resource event handler with the k8s informers.
 	coreInformers.Core().V1().Services().Informer().AddEventHandler(eh)
 	coreInformers.Extensions().V1beta1().Ingresses().Informer().AddEventHandler(eh)
 	contourInformers.Contour().V1beta1().IngressRoutes().Informer().AddEventHandler(eh)
@@ -183,30 +181,30 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
 		coreInformers.Core().V1().Secrets().Informer().AddEventHandler(eh)
 	}
 
-	// step 6. endpoints updates are handled directly by the EndpointsTranslator
+	// step 5. endpoints updates are handled directly by the EndpointsTranslator
 	// due to their high update rate and their orthogonal nature.
 	et := &contour.EndpointsTranslator{
 		FieldLogger: log.WithField("context", "endpointstranslator"),
 	}
 	coreInformers.Core().V1().Endpoints().Informer().AddEventHandler(et)
 
-	// step 7. setup workgroup runner and register informers.
+	// step 6. setup workgroup runner and register informers.
 	var g workgroup.Group
 	g.Add(startInformer(coreInformers, log.WithField("context", "coreinformers")))
 	g.Add(startInformer(contourInformers, log.WithField("context", "contourinformers")))
 	for _, inf := range namespacedInformers {
 		g.Add(startInformer(inf, log.WithField("context", "corenamespacedinformers")))
 	}
 
-	// step 8. register our event handler with the workgroup
+	// step 7. register our event handler with the workgroup
 	g.Add(eh.Start())
 
-	// step 9. setup prometheus registry and register base metrics.
+	// step 8. setup prometheus registry and register base metrics.
 	registry := prometheus.NewRegistry()
 	registry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}))
 	registry.MustRegister(prometheus.NewGoCollector())
 
-	// step 10. create metrics service and register with workgroup.
+	// step 9. create metrics service and register with workgroup.
 	metricsvc := metrics.Service{
 		Service: httpsvc.Service{
 			Addr: ctx.metricsAddr,
@@ -218,7 +216,7 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
 	}
 	g.Add(metricsvc.Start)
 
-	// step 11. create debug service and register with workgroup.
+	// step 10. create debug service and register with workgroup.
 	debugsvc := debug.Service{
 		Service: httpsvc.Service{
 			Addr: ctx.debugAddr,
@@ -229,7 +227,7 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
 	}
 	g.Add(debugsvc.Start)
 
-	// step 12. Setup leader election
+	// step 11. Setup leader election
 
 	// leaderOK will block gRPC startup until it's closed.
 	leaderOK := make(chan struct{})
@@ -333,22 +331,22 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
 		}
 		return nil
 	})
-	// step 13. register our custom metrics and plumb into cache handler
+	// step 12. register our custom metrics and plumb into cache handler
 	// and resource event handler.
 	metrics := metrics.NewMetrics(registry)
-	ch.Metrics = metrics
 	eh.Metrics = metrics
+	eh.CacheHandler.Metrics = metrics
 
 	// step 14. create grpc handler and register with workgroup.
 	// This will block until the program becomes the leader.
 	g.Add(func(stop <-chan struct{}) error {
 		log := log.WithField("context", "grpc")
 		resources := map[string]cgrpc.Resource{
-			ch.ClusterCache.TypeURL():  &ch.ClusterCache,
-			ch.RouteCache.TypeURL():    &ch.RouteCache,
-			ch.ListenerCache.TypeURL(): &ch.ListenerCache,
-			et.TypeURL():               et,
-			ch.SecretCache.TypeURL():   &ch.SecretCache,
+			eh.CacheHandler.ClusterCache.TypeURL():  &eh.CacheHandler.ClusterCache,
+			eh.CacheHandler.RouteCache.TypeURL():    &eh.CacheHandler.RouteCache,
+			eh.CacheHandler.ListenerCache.TypeURL(): &eh.CacheHandler.ListenerCache,
+			eh.CacheHandler.SecretCache.TypeURL():   &eh.CacheHandler.SecretCache,
+			et.TypeURL():                            et,
 		}
 		opts := ctx.grpcOptions()
 		s := cgrpc.NewAPI(log, resources, opts...)
1 change: 1 addition & 0 deletions internal/contour/metrics_test.go
@@ -567,6 +567,7 @@ func TestIngressRouteMetrics(t *testing.T) {
 			builder := dag.Builder{
 				Source: dag.KubernetesCache{
 					IngressRouteRootNamespaces: tc.rootNamespaces,
+					FieldLogger:                testLogger(t),
 				},
 			}
 			for _, o := range tc.objs {
5 changes: 4 additions & 1 deletion internal/dag/builder_test.go
@@ -3099,7 +3099,8 @@ func TestBuilderLookupHTTPService(t *testing.T) {
 		t.Run(name, func(t *testing.T) {
 			b := Builder{
 				Source: KubernetesCache{
-					services: services,
+					services:    services,
+					FieldLogger: testLogger(t),
 				},
 			}
 			b.reset()
@@ -3229,6 +3230,7 @@ func TestDAGRootNamespaces(t *testing.T) {
 			builder := Builder{
 				Source: KubernetesCache{
 					IngressRouteRootNamespaces: tc.rootNamespaces,
+					FieldLogger:                testLogger(t),
 				},
 			}
 
@@ -3797,6 +3799,7 @@ func TestDAGIngressRouteStatus(t *testing.T) {
 			builder := Builder{
 				Source: KubernetesCache{
 					IngressRouteRootNamespaces: []string{"roots"},
+					FieldLogger:                testLogger(t),
 				},
 			}
 			for _, o := range tc.objs {
5 changes: 5 additions & 0 deletions internal/dag/cache.go
@@ -19,6 +19,7 @@ import (
 	"k8s.io/client-go/tools/cache"
 
 	ingressroutev1 "github.com/heptio/contour/apis/contour/v1beta1"
+	"github.com/sirupsen/logrus"
 )
 
 const DEFAULT_INGRESS_CLASS = "contour"
@@ -40,6 +41,8 @@ type KubernetesCache struct {
 	secrets     map[Meta]*v1.Secret
 	delegations map[Meta]*ingressroutev1.TLSCertificateDelegation
 	services    map[Meta]*v1.Service
+
+	logrus.FieldLogger
 }
 
 // Meta holds the name and namespace of a Kubernetes object.
@@ -107,6 +110,7 @@ func (kc *KubernetesCache) Insert(obj interface{}) bool {
 		return true
 	default:
 		// not an interesting object
+		kc.WithField("object", obj).Error("insert unknown object")
 		return false
 	}
 }
@@ -157,6 +161,7 @@ func (kc *KubernetesCache) remove(obj interface{}) bool {
 		return ok
 	default:
 		// not interesting
+		kc.WithField("object", obj).Error("remove unknown object")
 		return false
 	}
 }
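
A note on the mechanics here: KubernetesCache embeds the logrus.FieldLogger interface, so its methods are promoted and kc.WithField(...).Error(...) compiles without a named logger field. The flip side is that a nil FieldLogger panics on first use, which is why every construction site in this commit — serve.go and the tests alike — supplies one. A minimal sketch of the pattern (the cache type and field value are illustrative, not part of this commit):

package main

import "github.com/sirupsen/logrus"

// cache embeds the logger interface; WithField and Error are promoted,
// exactly as on KubernetesCache above.
type cache struct {
	logrus.FieldLogger
}

func main() {
	c := cache{FieldLogger: logrus.New()}
	c.WithField("object", "*v1.Pod").Error("insert unknown object")

	var zero cache
	_ = zero // calling zero.Error(...) would panic: the embedded interface is nil
}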
24 changes: 22 additions & 2 deletions internal/dag/cache_test.go
@@ -17,6 +17,7 @@ import (
 	"testing"
 
 	ingressroutev1 "github.com/heptio/contour/apis/contour/v1beta1"
+	"github.com/sirupsen/logrus"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/api/extensions/v1beta1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -548,7 +549,9 @@ func TestKubernetesCacheInsert(t *testing.T) {
 
 	for name, tc := range tests {
 		t.Run(name, func(t *testing.T) {
-			var cache KubernetesCache
+			cache := KubernetesCache{
+				FieldLogger: testLogger(t),
+			}
 			for _, p := range tc.pre {
 				cache.Insert(p)
 			}
@@ -562,7 +565,9 @@
 
 func TestKubernetesCacheRemove(t *testing.T) {
 	cache := func(objs ...interface{}) *KubernetesCache {
-		var cache KubernetesCache
+		cache := KubernetesCache{
+			FieldLogger: testLogger(t),
+		}
 		for _, o := range objs {
 			cache.Insert(o)
 		}
@@ -694,3 +699,18 @@
 		})
 	}
 }
+
+func testLogger(t *testing.T) logrus.FieldLogger {
+	log := logrus.New()
+	log.Out = &testWriter{t}
+	return log
+}
+
+type testWriter struct {
+	*testing.T
+}
+
+func (t *testWriter) Write(buf []byte) (int, error) {
+	t.Logf("%s", buf)
+	return len(buf), nil
+}
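
The testWriter adapter closing out this file is a handy pattern: embedding *testing.T promotes Logf, which makes testWriter an io.Writer that routes logrus output into the per-test log buffer — printed only on failure or under go test -v, so the cache's new Error lines don't pollute passing runs. A hypothetical use, assuming the helpers above:

func TestLoggerGoesToTestOutput(t *testing.T) {
	log := testLogger(t)
	// Lands in t's log buffer via testWriter.Write; visible only
	// when the test fails or is run with -v.
	log.WithField("object", "example").Error("insert unknown object")
}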
