Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wait for cache sync and DAG build before starting xDS server #5672

Merged
merged 21 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelogs/unreleased/5672-therealak12-minor.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
## Contour now waits for the cache sync before starting the DAG rebuild and XDS server

Before this, we only waited for informer caches to sync but didn't wait for delivering the events to subscribed handlers.
Now contour waits for the initial list of Kubernetes objects to be cached and processed by handlers (using the returned `HasSynced` methods)
and then starts building its DAG and serving XDS.
109 changes: 67 additions & 42 deletions cmd/contour/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package main

import (
"context"
"errors"
"fmt"
"net"
"net/http"
Expand All @@ -25,6 +24,25 @@ import (

"github.com/alecthomas/kingpin/v2"
envoy_server_v3 "github.com/envoyproxy/go-control-plane/pkg/server/v3"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
networking_v1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
ctrl_cache "sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/manager/signals"
controller_runtime_metrics "sigs.k8s.io/controller-runtime/pkg/metrics"
gatewayapi_v1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"

controller_runtime_metrics_server "sigs.k8s.io/controller-runtime/pkg/metrics/server"

contour_api_v1 "github.com/projectcontour/contour/apis/projectcontour/v1"
contour_api_v1alpha1 "github.com/projectcontour/contour/apis/projectcontour/v1alpha1"
"github.com/projectcontour/contour/internal/annotation"
Expand All @@ -46,22 +64,10 @@ import (
"github.com/projectcontour/contour/internal/xdscache"
xdscache_v3 "github.com/projectcontour/contour/internal/xdscache/v3"
"github.com/projectcontour/contour/pkg/config"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
networking_v1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
ctrl_cache "sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/manager/signals"
controller_runtime_metrics "sigs.k8s.io/controller-runtime/pkg/metrics"
controller_runtime_metrics_server "sigs.k8s.io/controller-runtime/pkg/metrics/server"
gatewayapi_v1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"
)

const (
initialDagBuildPollPeriod = 100 * time.Millisecond
)

// registerServe registers the serve subcommand and flags
Expand Down Expand Up @@ -176,11 +182,12 @@ func registerServe(app *kingpin.Application) (*kingpin.CmdClause, *serveContext)
}

type Server struct {
log logrus.FieldLogger
ctx *serveContext
coreClient *kubernetes.Clientset
mgr manager.Manager
registry *prometheus.Registry
log logrus.FieldLogger
ctx *serveContext
coreClient *kubernetes.Clientset
mgr manager.Manager
registry *prometheus.Registry
handlerCacheSyncs []cache.InformerSynced
}

// NewServer returns a Server object which contains the initial configuration
Expand Down Expand Up @@ -537,14 +544,24 @@ func (s *Server) doServe() error {
contourMetrics,
dag.ComposeObservers(append(xdscache.ObserversOf(resources), snapshotHandler)...),
)

hasSynced := func() bool {
for _, syncFunc := range s.handlerCacheSyncs {
if !syncFunc() {
return false
}
}
return true
}

contourHandler := contour.NewEventHandler(contour.EventHandlerConfig{
Logger: s.log.WithField("context", "contourEventHandler"),
HoldoffDelay: 100 * time.Millisecond,
HoldoffMaxDelay: 500 * time.Millisecond,
Observer: observer,
StatusUpdater: sh.Writer(),
Builder: builder,
})
}, hasSynced)

// Wrap contourHandler in an EventRecorder which tracks API server events.
eventHandler := &contour.EventRecorder{
Expand All @@ -568,7 +585,7 @@ func (s *Server) doServe() error {

// Inform on the remaining resources.
for name, r := range informerResources {
if err := informOnResource(r, eventHandler, s.mgr.GetCache()); err != nil {
if err := s.informOnResource(r, eventHandler); err != nil {
s.log.WithError(err).WithField("resource", name).Fatal("failed to create informer")
}
}
Expand All @@ -584,15 +601,15 @@ func (s *Server) doServe() error {
handler = k8s.NewNamespaceFilter(sets.List(secretNamespaces), eventHandler)
}

if err := informOnResource(&corev1.Secret{}, handler, s.mgr.GetCache()); err != nil {
if err := s.informOnResource(&corev1.Secret{}, handler); err != nil {
s.log.WithError(err).WithField("resource", "secrets").Fatal("failed to create informer")
}

// Inform on endpoints.
if err := informOnResource(&corev1.Endpoints{}, &contour.EventRecorder{
if err := s.informOnResource(&corev1.Endpoints{}, &contour.EventRecorder{
Next: endpointHandler,
Counter: contourMetrics.EventHandlerOperations,
}, s.mgr.GetCache()); err != nil {
}); err != nil {
s.log.WithError(err).WithField("resource", "endpoints").Fatal("failed to create informer")
}

Expand Down Expand Up @@ -646,7 +663,7 @@ func (s *Server) doServe() error {
handler = k8s.NewNamespaceFilter([]string{contourConfiguration.Envoy.Service.Namespace}, handler)
}

if err := informOnResource(&corev1.Service{}, handler, s.mgr.GetCache()); err != nil {
if err := s.informOnResource(&corev1.Service{}, handler); err != nil {
s.log.WithError(err).WithField("resource", "services").Fatal("failed to create informer")
}

Expand All @@ -657,11 +674,11 @@ func (s *Server) doServe() error {

xdsServer := &xdsServer{
log: s.log,
mgr: s.mgr,
registry: s.registry,
config: *contourConfiguration.XDSServer,
snapshotHandler: snapshotHandler,
resources: resources,
initialDagBuilt: contourHandler.HasBuiltInitialDag,
}
if err := s.mgr.Add(xdsServer); err != nil {
return err
Expand Down Expand Up @@ -830,11 +847,11 @@ func (s *Server) setupDebugService(debugConfig contour_api_v1alpha1.DebugConfig,

type xdsServer struct {
log logrus.FieldLogger
mgr manager.Manager
registry *prometheus.Registry
config contour_api_v1alpha1.XDSServerConfig
snapshotHandler *xdscache.SnapshotHandler
resources []xdscache.ResourceCache
initialDagBuilt func() bool
}

func (x *xdsServer) NeedLeaderElection() bool {
Expand All @@ -844,11 +861,13 @@ func (x *xdsServer) NeedLeaderElection() bool {
func (x *xdsServer) Start(ctx context.Context) error {
log := x.log.WithField("context", "xds")

log.Printf("waiting for informer caches to sync")
if !x.mgr.GetCache().WaitForCacheSync(ctx) {
return errors.New("informer cache failed to sync")
log.Printf("waiting for the initial dag to be built")
if err := wait.PollUntilContextCancel(ctx, initialDagBuildPollPeriod, true, func(ctx context.Context) (done bool, err error) {
return x.initialDagBuilt(), nil
}); err != nil {
return fmt.Errorf("failed to wait for initial dag build, %w", err)
}
log.Printf("informer caches synced")
log.Printf("the initial dag is built")

grpcServer := xds.NewServer(x.registry, grpcOptions(log, x.config.TLS)...)

Expand Down Expand Up @@ -953,12 +972,12 @@ func (s *Server) setupGatewayAPI(contourConfiguration contour_api_v1alpha1.Conto
// to process, we just need informers to get events.
case contourConfiguration.Gateway.GatewayRef != nil:
// Inform on GatewayClasses.
if err := informOnResource(&gatewayapi_v1beta1.GatewayClass{}, eventHandler, mgr.GetCache()); err != nil {
if err := s.informOnResource(&gatewayapi_v1beta1.GatewayClass{}, eventHandler); err != nil {
s.log.WithError(err).WithField("resource", "gatewayclasses").Fatal("failed to create informer")
}

// Inform on Gateways.
if err := informOnResource(&gatewayapi_v1beta1.Gateway{}, eventHandler, mgr.GetCache()); err != nil {
if err := s.informOnResource(&gatewayapi_v1beta1.Gateway{}, eventHandler); err != nil {
s.log.WithError(err).WithField("resource", "gateways").Fatal("failed to create informer")
}
// Otherwise, run the GatewayClass and Gateway controllers to determine
Expand Down Expand Up @@ -1029,12 +1048,12 @@ func (s *Server) setupGatewayAPI(contourConfiguration contour_api_v1alpha1.Conto
}

// Inform on ReferenceGrants.
if err := informOnResource(&gatewayapi_v1beta1.ReferenceGrant{}, eventHandler, mgr.GetCache()); err != nil {
if err := s.informOnResource(&gatewayapi_v1beta1.ReferenceGrant{}, eventHandler); err != nil {
s.log.WithError(err).WithField("resource", "referencegrants").Fatal("failed to create informer")
}

// Inform on Namespaces.
if err := informOnResource(&corev1.Namespace{}, eventHandler, mgr.GetCache()); err != nil {
if err := s.informOnResource(&corev1.Namespace{}, eventHandler); err != nil {
s.log.WithError(err).WithField("resource", "namespaces").Fatal("failed to create informer")
}
}
Expand Down Expand Up @@ -1197,12 +1216,18 @@ func (s *Server) getDAGBuilder(dbc dagBuilderConfig) *dag.Builder {
return builder
}

func informOnResource(obj client.Object, handler cache.ResourceEventHandler, cache ctrl_cache.Cache) error {
inf, err := cache.GetInformer(context.Background(), obj)
func (s *Server) informOnResource(obj client.Object, handler cache.ResourceEventHandler) error {
inf, err := s.mgr.GetCache().GetInformer(context.Background(), obj)
if err != nil {
return err
}

_, err = inf.AddEventHandler(handler)
return err
registration, err := inf.AddEventHandler(handler)

if err != nil {
return err
}

s.handlerCacheSyncs = append(s.handlerCacheSyncs, registration.HasSynced)
return nil
}
70 changes: 53 additions & 17 deletions internal/contour/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@ package contour
import (
"context"
"reflect"
"sync/atomic"
"time"

"github.com/projectcontour/contour/internal/dag"
"github.com/projectcontour/contour/internal/k8s"
"github.com/sirupsen/logrus"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/cache/synctrack"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/projectcontour/contour/internal/dag"
"github.com/projectcontour/contour/internal/k8s"
)

type EventHandlerConfig struct {
Expand Down Expand Up @@ -55,9 +59,14 @@ type EventHandler struct {
// seq is the sequence counter of the number of times
// an event has been received.
seq int

// syncTracker is used to update/query the status of the cache sync.
syncTracker *synctrack.SingleFileTracker
sunjayBhatia marked this conversation as resolved.
Show resolved Hide resolved

initialDagBuilt atomic.Bool
}

func NewEventHandler(config EventHandlerConfig) *EventHandler {
func NewEventHandler(config EventHandlerConfig, upstreamHasSynced cache.InformerSynced) *EventHandler {
return &EventHandler{
FieldLogger: config.Logger,
builder: config.Builder,
Expand All @@ -67,11 +76,13 @@ func NewEventHandler(config EventHandlerConfig) *EventHandler {
statusUpdater: config.StatusUpdater,
update: make(chan any),
sequence: make(chan int, 1),
syncTracker: &synctrack.SingleFileTracker{UpstreamHasSynced: upstreamHasSynced},
}
}

type opAdd struct {
obj any
obj any
isInInitialList bool
}

type opUpdate struct {
Expand All @@ -83,7 +94,10 @@ type opDelete struct {
}

func (e *EventHandler) OnAdd(obj any, isInInitialList bool) {
e.update <- opAdd{obj: obj}
if isInInitialList {
e.syncTracker.Start()
}
e.update <- opAdd{obj: obj, isInInitialList: isInInitialList}
}

func (e *EventHandler) OnUpdate(oldObj, newObj any) {
Expand All @@ -94,10 +108,15 @@ func (e *EventHandler) OnDelete(obj any) {
e.update <- opDelete{obj: obj}
}

// NeedLeaderElection is included to implement manager.LeaderElectionRunnable
func (e *EventHandler) NeedLeaderElection() bool {
return false
}

func (e *EventHandler) HasBuiltInitialDag() bool {
return e.initialDagBuilt.Load()
}

// Implements leadership.NeedLeaderElectionNotification
func (e *EventHandler) OnElectedLeader() {
// Trigger an update when we are elected leader to ensure resource
Expand Down Expand Up @@ -164,9 +183,37 @@ func (e *EventHandler) Start(ctx context.Context) error {
// not to process it.
e.incSequence()
}

// We're done processing this event
if updateOpAdd, ok := op.(opAdd); ok {
if updateOpAdd.isInInitialList {
e.syncTracker.Finished()
}
}
case <-pending:
// Ensure informer caches are synced.
// Schedule a retry for dag rebuild if cache is not synced yet.
// Note that we can't block and wait for the cache sync as it depends on progress of this loop.
if !e.syncTracker.HasSynced() {
e.Info("skipping dag rebuild as cache is not synced")
timer.Reset(e.holdoffDelay)
break
}

tsaarni marked this conversation as resolved.
Show resolved Hide resolved
e.WithField("last_update", time.Since(lastDAGRebuild)).WithField("outstanding", reset()).Info("performing delayed update")
e.rebuildDAG()

// Build a new DAG and sends it to the Observer.
latestDAG := e.builder.Build()
e.observer.OnChange(latestDAG)

// Allow XDS server to start (if it hasn't already).
e.initialDagBuilt.Store(true)

// Update the status on objects.
for _, upd := range latestDAG.StatusCache.GetStatusUpdates() {
e.statusUpdater.Send(upd)
}

e.incSequence()
lastDAGRebuild = time.Now()
case <-ctx.Done():
Expand Down Expand Up @@ -237,14 +284,3 @@ func (e *EventHandler) incSequence() {
default:
}
}

// rebuildDAG builds a new DAG and sends it to the Observer,
// the updates the status on objects, and updates the metrics.
func (e *EventHandler) rebuildDAG() {
latestDAG := e.builder.Build()
e.observer.OnChange(latestDAG)

for _, upd := range latestDAG.StatusCache.GetStatusUpdates() {
e.statusUpdater.Send(upd)
}
}
Loading