diff --git a/pkg/internal/httpserver/server.go b/pkg/internal/httpserver/server.go new file mode 100644 index 0000000000..b5f91f18e0 --- /dev/null +++ b/pkg/internal/httpserver/server.go @@ -0,0 +1,16 @@ +package httpserver + +import ( + "net/http" + "time" +) + +// New returns a new server with sane defaults. +func New(handler http.Handler) *http.Server { + return &http.Server{ + Handler: handler, + MaxHeaderBytes: 1 << 20, + IdleTimeout: 90 * time.Second, // matches http.DefaultTransport keep-alive timeout + ReadHeaderTimeout: 32 * time.Second, + } +} diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index cd01715b4e..d10109df5a 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -23,6 +23,7 @@ import ( "net" "net/http" "sync" + "sync/atomic" "time" "github.com/go-logr/logr" @@ -30,6 +31,7 @@ import ( "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" kerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/rest" "k8s.io/client-go/tools/leaderelection" "k8s.io/client-go/tools/leaderelection/resourcelock" @@ -40,6 +42,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/cluster" "sigs.k8s.io/controller-runtime/pkg/config/v1alpha1" "sigs.k8s.io/controller-runtime/pkg/healthz" + "sigs.k8s.io/controller-runtime/pkg/internal/httpserver" intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder" "sigs.k8s.io/controller-runtime/pkg/metrics" "sigs.k8s.io/controller-runtime/pkg/runtime/inject" @@ -61,16 +64,15 @@ const ( var _ Runnable = &controllerManager{} type controllerManager struct { - // cluster holds a variety of methods to interact with a cluster. Required. - cluster cluster.Cluster + sync.Mutex + started bool - // leaderElectionRunnables is the set of Controllers that the controllerManager injects deps into and Starts. - // These Runnables are managed by lead election. 
- leaderElectionRunnables []Runnable + stopProcedureEngaged *int64 + errChan chan error + runnables *runnables - // nonLeaderElectionRunnables is the set of webhook servers that the controllerManager injects deps into and Starts. - // These Runnables will not be blocked by lead election. - nonLeaderElectionRunnables []Runnable + // cluster holds a variety of methods to interact with a cluster. Required. + cluster cluster.Cluster // recorderProvider is used to generate event recorders that will be injected into Controllers // (and EventHandlers, Sources and Predicates). @@ -104,12 +106,6 @@ type controllerManager struct { // Healthz probe handler healthzHandler *healthz.Handler - mu sync.Mutex - started bool - startedLeader bool - healthzStarted bool - errChan chan error - // controllerOptions are the global controller options. controllerOptions v1alpha1.ControllerConfigurationSpec @@ -117,25 +113,20 @@ type controllerManager struct { // If none is set, it defaults to log.Log global logger. logger logr.Logger - // leaderElectionCancel is used to cancel the leader election. It is distinct from internalStopper, - // because for safety reasons we need to os.Exit() when we lose the leader election, meaning that - // it must be deferred until after gracefulShutdown is done. - leaderElectionCancel context.CancelFunc - // leaderElectionStopped is an internal channel used to signal the stopping procedure that the // LeaderElection.Run(...) function has returned and the shutdown can proceed. leaderElectionStopped chan struct{} - // stop procedure engaged. In other words, we should not add anything else to the manager - stopProcedureEngaged bool + // leaderElectionCancel is used to cancel the leader election. It is distinct from internalStopper, + // because for safety reasons we need to os.Exit() when we lose the leader election, meaning that + // it must be deferred until after gracefulShutdown is done. 
+ leaderElectionCancel context.CancelFunc // elected is closed when this manager becomes the leader of a group of // managers, either because it won a leader election or because no leader // election was configured. elected chan struct{} - caches []hasCache - // port is the port that the webhook server serves at. port int // host is the hostname that the webhook server binds to. @@ -160,10 +151,6 @@ type controllerManager struct { // between tries of actions. retryPeriod time.Duration - // waitForRunnable is holding the number of runnables currently running so that - // we can wait for them to exit before quitting the manager - waitForRunnable sync.WaitGroup - // gracefulShutdownTimeout is the duration given to runnable to stop // before the manager actually returns on stop. gracefulShutdownTimeout time.Duration @@ -192,42 +179,17 @@ type hasCache interface { // Add sets dependencies on i, and adds it to the list of Runnables to start. func (cm *controllerManager) Add(r Runnable) error { - cm.mu.Lock() - defer cm.mu.Unlock() - if cm.stopProcedureEngaged { - return errors.New("can't accept new runnable as stop procedure is already engaged") - } + cm.Lock() + defer cm.Unlock() + return cm.add(r) +} +func (cm *controllerManager) add(r Runnable) error { // Set dependencies on the object if err := cm.SetFields(r); err != nil { return err } - - var shouldStart bool - - // Add the runnable to the leader election or the non-leaderelection list - if leRunnable, ok := r.(LeaderElectionRunnable); ok && !leRunnable.NeedLeaderElection() { - shouldStart = cm.started - cm.nonLeaderElectionRunnables = append(cm.nonLeaderElectionRunnables, r) - } else if hasCache, ok := r.(hasCache); ok { - cm.caches = append(cm.caches, hasCache) - if cm.started { - cm.startRunnable(hasCache) - if !hasCache.GetCache().WaitForCacheSync(cm.internalCtx) { - return fmt.Errorf("could not sync cache") - } - } - } else { - shouldStart = cm.startedLeader - cm.leaderElectionRunnables = 
append(cm.leaderElectionRunnables, r) - } - - if shouldStart { - // If already started, start the controller - cm.startRunnable(r) - } - - return nil + return cm.runnables.Add(r, nil) } // Deprecated: use the equivalent Options field to set a field. This method will be removed in v0.10. @@ -250,13 +212,17 @@ func (cm *controllerManager) SetFields(i interface{}) error { // AddMetricsExtraHandler adds extra handler served on path to the http server that serves metrics. func (cm *controllerManager) AddMetricsExtraHandler(path string, handler http.Handler) error { + cm.Lock() + defer cm.Unlock() + + if cm.started { + return fmt.Errorf("unable to add new metrics handler because metrics endpoint has already been created") + } + if path == defaultMetricsEndpoint { return fmt.Errorf("overriding builtin %s endpoint is not allowed", defaultMetricsEndpoint) } - cm.mu.Lock() - defer cm.mu.Unlock() - if _, found := cm.metricsExtraHandlers[path]; found { return fmt.Errorf("can't register extra handler by duplicate path %q on metrics http server", path) } @@ -268,14 +234,10 @@ func (cm *controllerManager) AddMetricsExtraHandler(path string, handler http.Ha // AddHealthzCheck allows you to add Healthz checker. func (cm *controllerManager) AddHealthzCheck(name string, check healthz.Checker) error { - cm.mu.Lock() - defer cm.mu.Unlock() - - if cm.stopProcedureEngaged { - return errors.New("can't accept new healthCheck as stop procedure is already engaged") - } + cm.Lock() + defer cm.Unlock() - if cm.healthzStarted { + if cm.started { return fmt.Errorf("unable to add new checker because healthz endpoint has already been created") } @@ -289,15 +251,11 @@ func (cm *controllerManager) AddHealthzCheck(name string, check healthz.Checker) // AddReadyzCheck allows you to add Readyz checker. 
func (cm *controllerManager) AddReadyzCheck(name string, check healthz.Checker) error { - cm.mu.Lock() - defer cm.mu.Unlock() + cm.Lock() + defer cm.Unlock() - if cm.stopProcedureEngaged { - return errors.New("can't accept new ready check as stop procedure is already engaged") - } - - if cm.healthzStarted { - return fmt.Errorf("unable to add new checker because readyz endpoint has already been created") + if cm.started { + return fmt.Errorf("unable to add new checker because readyz endpoint has already been created") } if cm.readyzHandler == nil { @@ -350,7 +308,7 @@ func (cm *controllerManager) GetWebhookServer() *webhook.Server { } } if err := cm.Add(cm.webhookServer); err != nil { - panic("unable to add webhook server to the controller manager") + panic(fmt.Sprintf("unable to add webhook server to the controller manager: %s", err)) } }) return cm.webhookServer @@ -371,77 +329,89 @@ func (cm *controllerManager) serveMetrics() { // TODO(JoelSpeed): Use existing Kubernetes machinery for serving metrics mux := http.NewServeMux() mux.Handle(defaultMetricsEndpoint, handler) - - func() { - cm.mu.Lock() - defer cm.mu.Unlock() - - for path, extraHandler := range cm.metricsExtraHandlers { - mux.Handle(path, extraHandler) - } - }() - - server := http.Server{ - Handler: mux, + for path, extraHandler := range cm.metricsExtraHandlers { + mux.Handle(path, extraHandler) } - // Run the server - cm.startRunnable(RunnableFunc(func(_ context.Context) error { - cm.logger.Info("Starting metrics server", "path", defaultMetricsEndpoint) - if err := server.Serve(cm.metricsListener); err != nil && err != http.ErrServerClosed { - return err - } - return nil - })) - // Shutdown the server when stop is closed - <-cm.internalProceduresStop - if err := server.Shutdown(cm.shutdownCtx); err != nil { - cm.errChan <- err - } + server := httpserver.New(mux) + go cm.httpServe("metrics", cm.logger.WithValues("path", defaultMetricsEndpoint), server, cm.metricsListener) } func (cm *controllerManager) 
serveHealthProbes() { mux := http.NewServeMux() - server := http.Server{ - Handler: mux, + server := httpserver.New(mux) + + if cm.readyzHandler != nil { + mux.Handle(cm.readinessEndpointName, http.StripPrefix(cm.readinessEndpointName, cm.readyzHandler)) + // Append '/' suffix to handle subpaths + mux.Handle(cm.readinessEndpointName+"/", http.StripPrefix(cm.readinessEndpointName, cm.readyzHandler)) + } + if cm.healthzHandler != nil { + mux.Handle(cm.livenessEndpointName, http.StripPrefix(cm.livenessEndpointName, cm.healthzHandler)) + // Append '/' suffix to handle subpaths + mux.Handle(cm.livenessEndpointName+"/", http.StripPrefix(cm.livenessEndpointName, cm.healthzHandler)) } - func() { - cm.mu.Lock() - defer cm.mu.Unlock() + go cm.httpServe("health probe", cm.logger, server, cm.healthProbeListener) +} - if cm.readyzHandler != nil { - mux.Handle(cm.readinessEndpointName, http.StripPrefix(cm.readinessEndpointName, cm.readyzHandler)) - // Append '/' suffix to handle subpaths - mux.Handle(cm.readinessEndpointName+"/", http.StripPrefix(cm.readinessEndpointName, cm.readyzHandler)) - } - if cm.healthzHandler != nil { - mux.Handle(cm.livenessEndpointName, http.StripPrefix(cm.livenessEndpointName, cm.healthzHandler)) - // Append '/' suffix to handle subpaths - mux.Handle(cm.livenessEndpointName+"/", http.StripPrefix(cm.livenessEndpointName, cm.healthzHandler)) - } +func (cm *controllerManager) httpServe(kind string, log logr.Logger, server *http.Server, ln net.Listener) { + log = log.WithValues("kind", kind, "addr", ln.Addr()) - // Run server - cm.startRunnable(RunnableFunc(func(_ context.Context) error { - if err := server.Serve(cm.healthProbeListener); err != nil && err != http.ErrServerClosed { - return err + go func() { + log.Info("Starting server") + if err := server.Serve(ln); err != nil { + if errors.Is(err, http.ErrServerClosed) { + return } - return nil - })) - cm.healthzStarted = true + if atomic.LoadInt64(cm.stopProcedureEngaged) > 0 { + // There might be cases 
where connections are still open and we try to shutdown + // but not having enough time to close the connection causes an error in Serve + // + // In that case we want to avoid returning an error to the main error channel. + log.Error(err, "error on Serve after stop has been engaged") + return + } + cm.errChan <- err + } }() - // Shutdown the server when stop is closed + // Shutdown the server when stop is closed. <-cm.internalProceduresStop if err := server.Shutdown(cm.shutdownCtx); err != nil { + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + // Avoid logging context related errors. + return + } + if atomic.LoadInt64(cm.stopProcedureEngaged) > 0 { + cm.logger.Error(err, "error on Shutdown after stop has been engaged") + return + } cm.errChan <- err } } +// Start starts the manager and waits indefinitely. +// There are only two ways to have Start return: +// An error has occurred during one of the internal operations, +// such as leader election, cache start, webhooks, and so on. +// Or, the context is cancelled. func (cm *controllerManager) Start(ctx context.Context) (err error) { - if err := cm.Add(cm.cluster); err != nil { - return fmt.Errorf("failed to add cluster to runnables: %w", err) + cm.Lock() + if cm.started { + cm.Unlock() + return errors.New("manager already started") } + var ready bool + defer func() { + // Only unlock the manager if we haven't reached + // the internal readiness condition. + if !ready { + cm.Unlock() + } + }() + + // Initialize the internal context. 
cm.internalCtx, cm.internalCancel = context.WithCancel(ctx) // This chan indicates that stop is complete, in other words all runnables have returned or timeout on stop request @@ -463,40 +433,70 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) { } }() - // initialize this here so that we reset the signal channel state on every start - // Everything that might write into this channel must be started in a new goroutine, - // because otherwise we might block this routine trying to write into the full channel - // and will not be able to enter the deferred cm.engageStopProcedure() which drains - // it. - cm.errChan = make(chan error) + // Add the cluster runnable. + if err := cm.add(cm.cluster); err != nil { + return fmt.Errorf("failed to add cluster to runnables: %w", err) + } // Metrics should be served whether the controller is leader or not. // (If we don't serve metrics for non-leaders, prometheus will still scrape - // the pod but will get a connection refused) + // the pod but will get a connection refused). if cm.metricsListener != nil { - go cm.serveMetrics() + cm.serveMetrics() } - // Serve health probes + // Serve health probes. if cm.healthProbeListener != nil { - go cm.serveHealthProbes() + cm.serveHealthProbes() + } + + // First start any webhook servers, which includes conversion, validation, and defaulting + // webhooks that are registered. + // + // WARNING: Webhooks MUST start before any cache is populated, otherwise there is a race condition + // between conversion webhooks and the cache sync (usually initial list) which causes the webhooks + // to never start because no cache can be populated. + if err := cm.runnables.Webhooks.StartAndWaitReady(cm.internalCtx); err != nil { + if err != wait.ErrWaitTimeout { + return err + } } - go cm.startNonLeaderElectionRunnables() + // Start and wait for caches. 
+ if err := cm.runnables.Caches.StartAndWaitReady(cm.internalCtx); err != nil { + if err != wait.ErrWaitTimeout { + return err + } + } - go func() { - if cm.resourceLock != nil { - err := cm.startLeaderElection() - if err != nil { - cm.errChan <- err - } - } else { - // Treat not having leader election enabled the same as being elected. - cm.startLeaderElectionRunnables() - close(cm.elected) + // Start the non-leaderelection Runnables after the cache has synced. + if err := cm.runnables.Others.StartAndWaitReady(cm.internalCtx); err != nil { + if err != wait.ErrWaitTimeout { + return err } - }() + } + // Start the leader election and all required runnables. + { + ctx, cancel := context.WithCancel(context.Background()) + cm.leaderElectionCancel = cancel + go func() { + if cm.resourceLock != nil { + if err := cm.startLeaderElection(ctx); err != nil { + cm.errChan <- err + } + } else { + // Treat not having leader election enabled the same as being elected. + if err := cm.startLeaderElectionRunnables(); err != nil { + cm.errChan <- err + } + close(cm.elected) + } + }() + } + + ready = true + cm.Unlock() select { case <-ctx.Done(): // We are done @@ -510,24 +510,31 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) { // engageStopProcedure signals all runnables to stop, reads potential errors // from the errChan and waits for them to end. It must not be called more than once. func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) error { - // Populate the shutdown context. 
- var shutdownCancel context.CancelFunc - if cm.gracefulShutdownTimeout > 0 { - cm.shutdownCtx, shutdownCancel = context.WithTimeout(context.Background(), cm.gracefulShutdownTimeout) - } else { - cm.shutdownCtx, shutdownCancel = context.WithCancel(context.Background()) + if !atomic.CompareAndSwapInt64(cm.stopProcedureEngaged, 0, 1) { + return errors.New("stop procedure already engaged") } - defer shutdownCancel() - // Cancel the internal stop channel and wait for the procedures to stop and complete. - close(cm.internalProceduresStop) - cm.internalCancel() + // Populate the shutdown context, this operation MUST be done before + // closing the internalProceduresStop channel. + // + // The shutdown context immediately expires if the gracefulShutdownTimeout is not set. + var shutdownCancel context.CancelFunc + cm.shutdownCtx, shutdownCancel = context.WithTimeout(context.Background(), cm.gracefulShutdownTimeout) + defer shutdownCancel() // Start draining the errors before acquiring the lock to make sure we don't deadlock // if something that has the lock is blocked on trying to write into the unbuffered // channel after something else already wrote into it. + var closeOnce sync.Once go func() { for { + // Closing in the for loop is required to avoid race conditions between + // the closure of all internal procedures and making sure to have a reader off the error channel. + closeOnce.Do(func() { + // Cancel the internal stop channel and wait for the procedures to stop and complete. 
+ close(cm.internalProceduresStop) + cm.internalCancel() + }) select { case err, ok := <-cm.errChan: if ok { @@ -538,26 +545,14 @@ func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) e } } }() - if cm.gracefulShutdownTimeout == 0 { - return nil - } - cm.mu.Lock() - defer cm.mu.Unlock() - cm.stopProcedureEngaged = true - // we want to close this after the other runnables stop, because we don't + // We want to close this after the other runnables stop, because we don't // want things like leader election to try and emit events on a closed // channel defer cm.recorderProvider.Stop(cm.shutdownCtx) - return cm.waitForRunnableToEnd(shutdownCancel) -} - -// waitForRunnableToEnd blocks until all runnables ended or the -// tearDownTimeout was reached. In the latter case, an error is returned. -func (cm *controllerManager) waitForRunnableToEnd(shutdownCancel context.CancelFunc) (retErr error) { - // Cancel leader election only after we waited. It will os.Exit() the app for safety. defer func() { - if retErr == nil && cm.leaderElectionCancel != nil { + // Cancel leader election only after we waited. It will os.Exit() the app for safety. + if cm.resourceLock != nil { // After asking the context to be cancelled, make sure // we wait for the leader stopped channel to be closed, otherwise // we might encounter race conditions between this code @@ -568,102 +563,48 @@ func (cm *controllerManager) waitForRunnableToEnd(shutdownCancel context.CancelF }() go func() { - cm.waitForRunnable.Wait() + // First stop the non-leader election runnables. + cm.logger.Info("Stopping and waiting for non leader election runnables") + cm.runnables.Others.StopAndWait(cm.shutdownCtx) + + // Stop the caches before the leader election runnables, this is an important + // step to make sure that we don't race with the reconcilers by receiving more events + // from the API servers and enqueueing them. 
+ cm.logger.Info("Stopping and waiting for caches") + cm.runnables.Caches.StopAndWait(cm.shutdownCtx) + + // Stop all the leader election runnables, which includes reconcilers. + cm.logger.Info("Stopping and waiting for leader election runnables") + cm.runnables.LeaderElection.StopAndWait(cm.shutdownCtx) + + // Webhooks should come last, as they might be still serving some requests. + cm.logger.Info("Stopping and waiting for webhooks") + cm.runnables.Webhooks.StopAndWait(cm.shutdownCtx) + + // Proceed to close the manager and overall shutdown context. + cm.logger.Info("Wait completed, proceeding to shutdown the manager") shutdownCancel() }() <-cm.shutdownCtx.Done() if err := cm.shutdownCtx.Err(); err != nil && err != context.Canceled { - return fmt.Errorf("failed waiting for all runnables to end within grace period of %s: %w", cm.gracefulShutdownTimeout, err) - } - return nil -} - -func (cm *controllerManager) startNonLeaderElectionRunnables() { - cm.mu.Lock() - defer cm.mu.Unlock() - - // First start any webhook servers, which includes conversion, validation, and defaulting - // webhooks that are registered. - // - // WARNING: Webhooks MUST start before any cache is populated, otherwise there is a race condition - // between conversion webhooks and the cache sync (usually initial list) which causes the webhooks - // to never start because no cache can be populated. - for _, c := range cm.nonLeaderElectionRunnables { - if _, ok := c.(*webhook.Server); ok { - cm.startRunnable(c) - } - } - - // Start and wait for caches. 
- cm.waitForCache(cm.internalCtx) - - // Start the non-leaderelection Runnables after the cache has synced - for _, c := range cm.nonLeaderElectionRunnables { - if _, ok := c.(*webhook.Server); ok { - continue + if errors.Is(err, context.DeadlineExceeded) { + if cm.gracefulShutdownTimeout > 0 { + return fmt.Errorf("failed waiting for all runnables to end within grace period of %s: %w", cm.gracefulShutdownTimeout, err) + } + return nil } - - // Controllers block, but we want to return an error if any have an error starting. - // Write any Start errors to a channel so we can return them - cm.startRunnable(c) - } -} - -func (cm *controllerManager) startLeaderElectionRunnables() { - cm.mu.Lock() - defer cm.mu.Unlock() - - cm.waitForCache(cm.internalCtx) - - // Start the leader election Runnables after the cache has synced - for _, c := range cm.leaderElectionRunnables { - // Controllers block, but we want to return an error if any have an error starting. - // Write any Start errors to a channel so we can return them - cm.startRunnable(c) + // For any other error, return the error. + return err } - - cm.startedLeader = true + return nil } -func (cm *controllerManager) waitForCache(ctx context.Context) { - if cm.started { - return - } - - for _, cache := range cm.caches { - cm.startRunnable(cache) - } - - // Wait for the caches to sync. - // TODO(community): Check the return value and write a test - for _, cache := range cm.caches { - cache.GetCache().WaitForCacheSync(ctx) - } - // TODO: This should be the return value of cm.cache.WaitForCacheSync but we abuse - // cm.started as check if we already started the cache so it must always become true. 
- // Making sure that the cache doesn't get started twice is needed to not get a "close - // of closed channel" panic - cm.started = true +func (cm *controllerManager) startLeaderElectionRunnables() error { + return cm.runnables.LeaderElection.StartAndWaitReady(cm.internalCtx) } -func (cm *controllerManager) startLeaderElection() (err error) { - ctx, cancel := context.WithCancel(context.Background()) - cm.mu.Lock() - cm.leaderElectionCancel = cancel - cm.mu.Unlock() - - if cm.onStoppedLeading == nil { - cm.onStoppedLeading = func() { - // Make sure graceful shutdown is skipped if we lost the leader lock without - // intending to. - cm.gracefulShutdownTimeout = time.Duration(0) - // Most implementations of leader election log.Fatal() here. - // Since Start is wrapped in log.Fatal when called, we can just return - // an error here which will cause the program to exit. - cm.errChan <- errors.New("leader election lost") - } - } +func (cm *controllerManager) startLeaderElection(ctx context.Context) (err error) { l, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{ Lock: cm.resourceLock, LeaseDuration: cm.leaseDuration, @@ -671,10 +612,24 @@ func (cm *controllerManager) startLeaderElection() (err error) { RetryPeriod: cm.retryPeriod, Callbacks: leaderelection.LeaderCallbacks{ OnStartedLeading: func(_ context.Context) { - cm.startLeaderElectionRunnables() + if err := cm.startLeaderElectionRunnables(); err != nil { + cm.errChan <- err + return + } close(cm.elected) }, - OnStoppedLeading: cm.onStoppedLeading, + OnStoppedLeading: func() { + if cm.onStoppedLeading != nil { + cm.onStoppedLeading() + } + // Make sure graceful shutdown is skipped if we lost the leader lock without + // intending to. + cm.gracefulShutdownTimeout = time.Duration(0) + // Most implementations of leader election log.Fatal() here. + // Since Start is wrapped in log.Fatal when called, we can just return + // an error here which will cause the program to exit. 
+ cm.errChan <- errors.New("leader election lost") + }, }, ReleaseOnCancel: cm.leaderElectionReleaseOnCancel, }) @@ -694,13 +649,3 @@ func (cm *controllerManager) startLeaderElection() (err error) { func (cm *controllerManager) Elected() <-chan struct{} { return cm.elected } - -func (cm *controllerManager) startRunnable(r Runnable) { - cm.waitForRunnable.Add(1) - go func() { - defer cm.waitForRunnable.Done() - if err := r.Start(cm.internalCtx); err != nil { - cm.errChan <- err - } - }() -} diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 2d2733f0a6..e2248d8735 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -31,6 +31,7 @@ import ( "k8s.io/client-go/rest" "k8s.io/client-go/tools/leaderelection/resourcelock" "k8s.io/client-go/tools/record" + "k8s.io/utils/pointer" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/cluster" @@ -365,8 +366,14 @@ func New(config *rest.Config, options Options) (Manager, error) { return nil, err } + errChan := make(chan error) + runnables := newRunnables(errChan) + return &controllerManager{ + stopProcedureEngaged: pointer.Int64(0), cluster: cluster, + runnables: runnables, + errChan: errChan, recorderProvider: recorderProvider, resourceLock: resourceLock, metricsListener: metricsListener, diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index 2cb2c72560..3fff91cbc5 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -306,7 +306,7 @@ var _ = Describe("manger.Manager", func() { Expect(m.Start(ctx)).To(BeNil()) close(mgrDone) }() - <-cm.elected + <-cm.Elected() cancel() select { case <-leaderElectionDone: @@ -335,7 +335,7 @@ var _ = Describe("manger.Manager", func() { defer GinkgoRecover() err := m.Start(ctx) Expect(err).ToNot(BeNil()) - Expect(err.Error()).To(Equal("leader election lost")) + Expect(err.Error()).To(ContainSubstring("leader election lost")) close(mgrDone) }() cm := 
m.(*controllerManager) @@ -401,8 +401,8 @@ var _ = Describe("manger.Manager", func() { defer GinkgoRecover() Expect(m1.Elected()).ShouldNot(BeClosed()) Expect(m1.Start(ctx1)).NotTo(HaveOccurred()) - Expect(m1.Elected()).Should(BeClosed()) }() + <-m1.Elected() <-c1 c2 := make(chan struct{}) @@ -435,6 +435,7 @@ var _ = Describe("manger.Manager", func() { Expect(m).To(BeNil()) Expect(err).To(MatchError(ContainSubstring("expected error"))) }) + It("should return an error if namespace not set and not running in cluster", func() { m, err := New(cfg, Options{LeaderElection: true, LeaderElectionID: "controller-runtime"}) Expect(m).To(BeNil()) @@ -609,9 +610,9 @@ var _ = Describe("manger.Manager", func() { defer GinkgoRecover() Expect(m.Elected()).ShouldNot(BeClosed()) Expect(m.Start(ctx)).NotTo(HaveOccurred()) - Expect(m.Elected()).Should(BeClosed()) }() + <-m.Elected() wgRunnableStarted.Wait() }) @@ -653,7 +654,9 @@ var _ = Describe("manger.Manager", func() { } mgr, ok := m.(*controllerManager) Expect(ok).To(BeTrue()) - mgr.caches = []hasCache{&cacheProvider{cache: &informertest.FakeInformers{Error: fmt.Errorf("expected error")}}} + Expect(mgr.Add( + &cacheProvider{cache: &informertest.FakeInformers{Error: fmt.Errorf("expected error")}}, + )).To(Succeed()) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -672,14 +675,15 @@ var _ = Describe("manger.Manager", func() { } runnableWasStarted := make(chan struct{}) - Expect(m.Add(RunnableFunc(func(ctx context.Context) error { + runnable := RunnableFunc(func(ctx context.Context) error { defer GinkgoRecover() if !fakeCache.wasSynced { return errors.New("runnable got started before cache was synced") } close(runnableWasStarted) return nil - }))).To(Succeed()) + }) + Expect(m.Add(runnable)).To(Succeed()) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -801,8 +805,11 @@ var _ = Describe("manger.Manager", func() { Expect(err).NotTo(HaveOccurred()) 
Expect(m.Add(fakeCluster)).NotTo(HaveOccurred()) - Expect(fakeCluster.informer.wasStarted).To(BeTrue()) - Expect(fakeCluster.informer.wasSynced).To(BeTrue()) + Eventually(func() bool { + fakeCluster.informer.mu.Lock() + defer fakeCluster.informer.mu.Unlock() + return fakeCluster.informer.wasStarted && fakeCluster.informer.wasSynced + }).Should(BeTrue()) }) It("should wait for runnables to stop", func() { @@ -1029,10 +1036,11 @@ var _ = Describe("manger.Manager", func() { ctx, cancel := context.WithCancel(context.Background()) managerStopDone := make(chan struct{}) go func() { + defer GinkgoRecover() Expect(m.Start(ctx)).NotTo(HaveOccurred()) close(managerStopDone) }() - <-m.(*controllerManager).elected + <-m.Elected() cancel() <-managerStopDone @@ -1119,6 +1127,7 @@ var _ = Describe("manger.Manager", func() { defer GinkgoRecover() Expect(m.Start(ctx)).NotTo(HaveOccurred()) }() + <-m.Elected() metricsEndpoint := fmt.Sprintf("http://%s/metrics", listener.Addr().String()) resp, err := http.Get(metricsEndpoint) @@ -1137,10 +1146,12 @@ var _ = Describe("manger.Manager", func() { defer GinkgoRecover() Expect(m.Start(ctx)).NotTo(HaveOccurred()) }() + <-m.Elected() endpoint := fmt.Sprintf("http://%s/should-not-exist", listener.Addr().String()) resp, err := http.Get(endpoint) Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() Expect(resp.StatusCode).To(Equal(404)) }) @@ -1163,10 +1174,12 @@ var _ = Describe("manger.Manager", func() { defer GinkgoRecover() Expect(m.Start(ctx)).NotTo(HaveOccurred()) }() + <-m.Elected() metricsEndpoint := fmt.Sprintf("http://%s/metrics", listener.Addr().String()) resp, err := http.Get(metricsEndpoint) Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() Expect(resp.StatusCode).To(Equal(200)) data, err := ioutil.ReadAll(resp.Body) @@ -1204,10 +1217,12 @@ var _ = Describe("manger.Manager", func() { defer GinkgoRecover() Expect(m.Start(ctx)).NotTo(HaveOccurred()) }() + <-m.Elected() endpoint := fmt.Sprintf("http://%s/debug", 
listener.Addr().String()) resp, err := http.Get(endpoint) Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() Expect(resp.StatusCode).To(Equal(http.StatusOK)) body, err := ioutil.ReadAll(resp.Body) @@ -1248,6 +1263,7 @@ var _ = Describe("manger.Manager", func() { defer GinkgoRecover() Expect(m.Start(ctx)).NotTo(HaveOccurred()) }() + <-m.Elected() // Check the health probes started endpoint := fmt.Sprintf("http://%s", listener.Addr().String()) @@ -1261,7 +1277,7 @@ var _ = Describe("manger.Manager", func() { Eventually(func() error { _, err = http.Get(endpoint) return err - }).ShouldNot(Succeed()) + }, 10*time.Second).ShouldNot(Succeed()) }) It("should serve readiness endpoint", func() { @@ -1280,18 +1296,21 @@ var _ = Describe("manger.Manager", func() { defer GinkgoRecover() Expect(m.Start(ctx)).NotTo(HaveOccurred()) }() + <-m.Elected() readinessEndpoint := fmt.Sprint("http://", listener.Addr().String(), defaultReadinessEndpoint) // Controller is not ready resp, err := http.Get(readinessEndpoint) Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() Expect(resp.StatusCode).To(Equal(http.StatusInternalServerError)) // Controller is ready res = nil resp, err = http.Get(readinessEndpoint) Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() Expect(resp.StatusCode).To(Equal(http.StatusOK)) // Check readiness path without trailing slash without redirect @@ -1304,6 +1323,7 @@ var _ = Describe("manger.Manager", func() { } resp, err = httpClient.Get(readinessEndpoint) Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() Expect(resp.StatusCode).To(Equal(http.StatusOK)) // Check readiness path for individual check @@ -1311,6 +1331,7 @@ var _ = Describe("manger.Manager", func() { res = nil resp, err = http.Get(readinessEndpoint) Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() Expect(resp.StatusCode).To(Equal(http.StatusOK)) }) @@ -1330,18 +1351,21 @@ var _ = Describe("manger.Manager", func() { defer GinkgoRecover() 
Expect(m.Start(ctx)).NotTo(HaveOccurred()) }() + <-m.Elected() livenessEndpoint := fmt.Sprint("http://", listener.Addr().String(), defaultLivenessEndpoint) // Controller is not ready resp, err := http.Get(livenessEndpoint) Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() Expect(resp.StatusCode).To(Equal(http.StatusInternalServerError)) // Controller is ready res = nil resp, err = http.Get(livenessEndpoint) Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() Expect(resp.StatusCode).To(Equal(http.StatusOK)) // Check liveness path without trailing slash without redirect @@ -1354,6 +1378,7 @@ var _ = Describe("manger.Manager", func() { } resp, err = httpClient.Get(livenessEndpoint) Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() Expect(resp.StatusCode).To(Equal(http.StatusOK)) // Check readiness path for individual check @@ -1361,6 +1386,7 @@ var _ = Describe("manger.Manager", func() { res = nil resp, err = http.Get(livenessEndpoint) Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() Expect(resp.StatusCode).To(Equal(http.StatusOK)) }) }) @@ -1387,12 +1413,11 @@ var _ = Describe("manger.Manager", func() { defer GinkgoRecover() Expect(m.Start(ctx)).NotTo(HaveOccurred()) }() + <-m.Elected() // Wait for the Manager to start Eventually(func() bool { - mgr.mu.Lock() - defer mgr.mu.Unlock() - return mgr.started + return mgr.runnables.Caches.Started() }).Should(BeTrue()) // Add another component after starting @@ -1421,9 +1446,7 @@ var _ = Describe("manger.Manager", func() { // Wait for the Manager to start Eventually(func() bool { - mgr.mu.Lock() - defer mgr.mu.Unlock() - return mgr.started + return mgr.runnables.Caches.Started() }).Should(BeTrue()) c1 := make(chan struct{}) @@ -1577,6 +1600,8 @@ var _ = Describe("manger.Manager", func() { defer close(doneCh) Expect(m.Start(ctx)).To(Succeed()) }() + <-m.Elected() + Eventually(func() *corev1.Event { evts, err := clientset.CoreV1().Events("").Search(m.GetScheme(), &ns) 
Expect(err).NotTo(HaveOccurred()) @@ -1765,11 +1790,12 @@ func (c *cacheProvider) Start(ctx context.Context) error { } type startSignalingInformer struct { + mu sync.Mutex + // The manager calls Start and WaitForCacheSync in // parallel, so we have to protect wasStarted with a Mutex // and block in WaitForCacheSync until it is true. - wasStartedLock sync.Mutex - wasStarted bool + wasStarted bool // was synced will be true once Start was called and // WaitForCacheSync returned, just like a real cache. wasSynced bool @@ -1777,15 +1803,15 @@ type startSignalingInformer struct { } func (c *startSignalingInformer) started() bool { - c.wasStartedLock.Lock() - defer c.wasStartedLock.Unlock() + c.mu.Lock() + defer c.mu.Unlock() return c.wasStarted } func (c *startSignalingInformer) Start(ctx context.Context) error { - c.wasStartedLock.Lock() + c.mu.Lock() c.wasStarted = true - c.wasStartedLock.Unlock() + c.mu.Unlock() return c.Cache.Start(ctx) } @@ -1794,7 +1820,9 @@ func (c *startSignalingInformer) WaitForCacheSync(ctx context.Context) bool { for !c.started() { continue } + c.mu.Lock() c.wasSynced = true + c.mu.Unlock() }() return c.Cache.WaitForCacheSync(ctx) } diff --git a/pkg/manager/runnable_group.go b/pkg/manager/runnable_group.go new file mode 100644 index 0000000000..c474c2f2fa --- /dev/null +++ b/pkg/manager/runnable_group.go @@ -0,0 +1,325 @@ +package manager + +import ( + "context" + "errors" + "sync" + "time" + + "k8s.io/apimachinery/pkg/util/wait" + "sigs.k8s.io/controller-runtime/pkg/webhook" +) + +var ( + errRunnableGroupStopped = errors.New("can't accept new runnable as stop procedure is already engaged") +) + +// readyRunnable encapsulates a runnable with +// a ready check. +type readyRunnable struct { + Runnable + Check runnableCheck + Ready bool +} + +// runnableCheck can be passed to Add() to let the runnable group determine that a +// runnable is ready. 
A runnable check should block until a runnable is ready, +// if the returned result is false, the runnable is considered not ready and failed. +type runnableCheck func(ctx context.Context) bool + +// runnables handles all the runnables for a manager by grouping them accordingly to their +// type (webhooks, caches etc.). +type runnables struct { + Webhooks *runnableGroup + Caches *runnableGroup + LeaderElection *runnableGroup + Others *runnableGroup +} + +// newRunnables creates a new runnables object. +func newRunnables(errChan chan error) *runnables { + return &runnables{ + Webhooks: newRunnableGroup(errChan), + Caches: newRunnableGroup(errChan), + LeaderElection: newRunnableGroup(errChan), + Others: newRunnableGroup(errChan), + } +} + +// Add adds a runnable and its ready check to the closest +// group of runnable that they belong to. +// +// Add should be able to be called before and after Start, but not after StopAndWait. +// Add should return an error when called during StopAndWait. +// The runnables added before Start are started when Start is called. +// The runnables added after Start are started directly. +func (r *runnables) Add(fn Runnable, ready runnableCheck) error { + switch runnable := fn.(type) { + case hasCache: + return r.Caches.Add(fn, func(ctx context.Context) bool { + // Run the ready check for the cache a fixed number of times + // backing off a bit; this is to give time to the runnables + // to start up before their health check returns true. 
+ if err := wait.ExponentialBackoffWithContext(ctx, wait.Backoff{ + Duration: 10 * time.Millisecond, + Steps: 10, + Factor: 1.0, + }, func() (bool, error) { + for i := 0; i < 10; i++ { + <-time.After(time.Duration(i) * 10 * time.Millisecond) + if !runnable.GetCache().WaitForCacheSync(ctx) { + continue + } + return true, nil + } + return false, nil + }); err != nil { + return false + } + return true + }) + case *webhook.Server: + return r.Webhooks.Add(fn, ready) + case LeaderElectionRunnable: + if !runnable.NeedLeaderElection() { + return r.Others.Add(fn, ready) + } + return r.LeaderElection.Add(fn, ready) + default: + return r.LeaderElection.Add(fn, ready) + } +} + +// runnableGroup manages a group of runnables that are +// meant to be running together until StopAndWait is called. +// +// Runnables can be added to a group after the group has started +// but not after it's stopped or while shutting down. +type runnableGroup struct { + ctx context.Context + cancel context.CancelFunc + + start sync.Mutex + startOnce sync.Once + started bool + + stop sync.RWMutex + stopOnce sync.Once + stopped bool + + // errChan is the error channel passed by the caller + // when the group is created. + // All errors are forwarded to this channel once they occur. + errChan chan error + + // ch is the internal channel where the runnables are read off from. + ch chan *readyRunnable + + // wg is an internal sync.WaitGroup that allows us to properly stop + // and wait for all the runnables to finish before returning. + wg *sync.WaitGroup + + // group is a sync.Map that contains every runnable ever. + // The key of the map is the runnable itself (key'd by pointer), + // while the value is its ready state. + // + // The group of runnable is append-only, runnables scheduled + // through this group are going to be stored in this internal map + // until the application exits. The limit is the available memory. 
+ group *sync.Map +} + +func newRunnableGroup(errChan chan error) *runnableGroup { + r := &runnableGroup{ + errChan: errChan, + ch: make(chan *readyRunnable), + wg: new(sync.WaitGroup), + group: new(sync.Map), + } + r.ctx, r.cancel = context.WithCancel(context.Background()) + return r +} + +// Started returns true if the group has started. +func (r *runnableGroup) Started() bool { + r.start.Lock() + defer r.start.Unlock() + return r.started +} + +// StartAndWaitReady starts all the runnables previously +// added to the group and waits for all to report ready. +func (r *runnableGroup) StartAndWaitReady(ctx context.Context) error { + r.Start() + return r.WaitReady(ctx) +} + +// Start starts the group, it can only be called once. +func (r *runnableGroup) Start() { + r.startOnce.Do(func() { + go r.reconcile() + r.start.Lock() + r.started = true + r.group.Range(func(key, _ interface{}) bool { + r.ch <- key.(*readyRunnable) + return true + }) + r.start.Unlock() + }) +} + +// reconcile is our main entrypoint for every runnable added +// to this group. Its primary job is to read off the internal channel +// and schedule runnables while tracking their state. +func (r *runnableGroup) reconcile() { + for runnable := range r.ch { + // Handle stop. + // If the shutdown has been called we want to avoid + // adding new goroutines to the WaitGroup because Wait() + // panics if Add() is called after it. + { + r.stop.RLock() + if r.stopped { + // Drop any runnables if we're stopped. + r.errChan <- errRunnableGroupStopped + r.stop.RUnlock() + continue + } + + // Why is this here? + // When StopAndWait is called, if a runnable is in the process + // of being added, we could end up in a situation where + // the WaitGroup is incremented while StopAndWait has called Wait(), + // which would result in a panic. + r.wg.Add(1) + r.stop.RUnlock() + } + + // Start the runnable. 
+ go func(rn *readyRunnable) { + go func() { + if rn.Check(r.ctx) { + r.group.Store(rn, true) + } + }() + + // If we return, the runnable ended cleanly + // or returned an error to the channel. + // + // We should always decrement the WaitGroup and + // mark the runnable as ready. + // + // Think about the group as an append-only system. + // + // A runnable is marked as ready if: + // - The health check return true. + // - The runnable Start() method returned and + // it either finished cleanly (e.g. one shot operations) + // or it failed to run and it returned an error which + // gets propagated to the manager. + defer r.wg.Done() + defer r.group.Store(rn, true) + + // Start the runnable. + if err := rn.Start(r.ctx); err != nil { + r.errChan <- err + } + }(runnable) + } +} + +// WaitReady polls until the group is ready or until the context is cancelled. +func (r *runnableGroup) WaitReady(ctx context.Context) error { + return wait.PollImmediateInfiniteWithContext(ctx, + 100*time.Millisecond, + func(_ context.Context) (bool, error) { + if !r.Started() { + return false, nil + } + ready, total := 0, 0 + r.group.Range(func(_, value interface{}) bool { + total++ + if rd, ok := value.(bool); ok && rd { + ready++ + } + return true + }) + return ready == total, nil + }, + ) +} + +// Add should be able to be called before and after Start, but not after StopAndWait. +// Add should return an error when called during StopAndWait. +func (r *runnableGroup) Add(rn Runnable, ready runnableCheck) error { + r.stop.RLock() + if r.stopped { + r.stop.RUnlock() + return errRunnableGroupStopped + } + r.stop.RUnlock() + + // If we don't have a readiness check, always return true. + if ready == nil { + ready = func(_ context.Context) bool { return true } + } + + readyRunnable := &readyRunnable{ + Runnable: rn, + Check: ready, + } + + // Handle start. + // If the overall runnable group isn't started yet + // we want to buffer the runnables and let Start() + // queue them up again later. 
+ { + r.start.Lock() + + // Store the runnable in the internal buffer. + r.group.Store(readyRunnable, false) + + // Check if we're already started. + if !r.started { + r.start.Unlock() + return nil + } + r.start.Unlock() + } + + // Enqueue the runnable. + r.ch <- readyRunnable + return nil +} + +// StopAndWait waits for all the runnables to finish before returning. +func (r *runnableGroup) StopAndWait(ctx context.Context) { + r.stopOnce.Do(func() { + // Close the reconciler channel once we're done. + defer close(r.ch) + + r.Start() + r.stop.Lock() + // Store the stopped variable so we don't accept any new + // runnables for the time being. + r.stopped = true + r.stop.Unlock() + + // Cancel the internal channel. + r.cancel() + + done := make(chan struct{}) + go func() { + defer close(done) + // Wait for all the runnables to finish. + r.wg.Wait() + }() + + select { + case <-done: + // We're done, exit. + case <-ctx.Done(): + // Calling context has expired, exit. + } + }) +} diff --git a/pkg/manager/runnable_group_test.go b/pkg/manager/runnable_group_test.go new file mode 100644 index 0000000000..5c9ee81b64 --- /dev/null +++ b/pkg/manager/runnable_group_test.go @@ -0,0 +1,223 @@ +package manager + +import ( + "context" + "errors" + "fmt" + "sync/atomic" + "time" + + . "github.com/onsi/ginkgo" + . 
"github.com/onsi/gomega" + "k8s.io/utils/pointer" + "sigs.k8s.io/controller-runtime/pkg/cache/informertest" + "sigs.k8s.io/controller-runtime/pkg/webhook" +) + +var _ = Describe("runnables", func() { + errCh := make(chan error) + + It("should be able to create a new runnables object", func() { + Expect(newRunnables(errCh)).ToNot(BeNil()) + }) + + It("should add caches to the appropriate group", func() { + cache := &cacheProvider{cache: &informertest.FakeInformers{Error: fmt.Errorf("expected error")}} + r := newRunnables(errCh) + Expect(r.Add(cache, nil)).To(Succeed()) + var found *readyRunnable + r.Caches.group.Range(func(key, value interface{}) bool { + found = key.(*readyRunnable) + // Only iterate once. + return false + }) + Expect(found).ToNot(BeNil()) + Expect(found.Runnable).To(BeIdenticalTo(cache)) + }) + + It("should add webhooks to the appropriate group", func() { + webhook := &webhook.Server{} + r := newRunnables(errCh) + Expect(r.Add(webhook, nil)).To(Succeed()) + var found *readyRunnable + r.Webhooks.group.Range(func(key, value interface{}) bool { + found = key.(*readyRunnable) + // Only iterate once. + return false + }) + Expect(found).ToNot(BeNil()) + Expect(found.Runnable).To(BeIdenticalTo(webhook)) + }) + + It("should add any runnable to the leader election group", func() { + err := errors.New("runnable func") + runnable := RunnableFunc(func(c context.Context) error { + return err + }) + + r := newRunnables(errCh) + Expect(r.Add(runnable, nil)).To(Succeed()) + var found *readyRunnable + r.LeaderElection.group.Range(func(key, value interface{}) bool { + found = key.(*readyRunnable) + // Only iterate once. 
+ return false + }) + Expect(found).ToNot(BeNil()) + + // Functions are not comparable, we just make sure it's the same type and it returns what we expect + Expect(found.Runnable).To(BeAssignableToTypeOf(runnable)) + Expect(found.Runnable.Start(context.Background())).To(MatchError(err)) + }) +}) + +var _ = Describe("runnableGroup", func() { + errCh := make(chan error) + + It("should be able to add new runnables before it starts", func() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + rg := newRunnableGroup(errCh) + Expect(rg.Add(RunnableFunc(func(c context.Context) error { + <-ctx.Done() + return nil + }), nil)).To(Succeed()) + + Expect(rg.Started()).To(BeFalse()) + }) + + It("should be able to add new runnables before and after start", func() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + rg := newRunnableGroup(errCh) + Expect(rg.Add(RunnableFunc(func(c context.Context) error { + <-ctx.Done() + return nil + }), nil)).To(Succeed()) + rg.Start() + Expect(rg.Started()).To(BeTrue()) + Expect(rg.Add(RunnableFunc(func(c context.Context) error { + <-ctx.Done() + return nil + }), nil)).To(Succeed()) + Expect(rg.WaitReady(ctx)).To(Succeed()) + }) + + It("should be able to add new runnables before and after start concurrently", func() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + rg := newRunnableGroup(errCh) + + go func() { + <-time.After(50 * time.Millisecond) + rg.Start() + }() + + for i := 0; i < 20; i++ { + go func(i int) { + defer GinkgoRecover() + + <-time.After(time.Duration(i) * 10 * time.Millisecond) + Expect(rg.Add(RunnableFunc(func(c context.Context) error { + <-ctx.Done() + return nil + }), nil)).To(Succeed()) + }(i) + } + Expect(rg.WaitReady(ctx)).To(Succeed()) + Eventually(func() int { + i := 0 + rg.group.Range(func(key, value interface{}) bool { + i++ + return true + }) + return i + }).Should(BeNumerically("==", 20)) + }) + + It("should be able to close the group 
and wait for all runnables to finish", func() { + ctx, cancel := context.WithCancel(context.Background()) + + exited := pointer.Int64(0) + rg := newRunnableGroup(errCh) + for i := 0; i < 10; i++ { + Expect(rg.Add(RunnableFunc(func(c context.Context) error { + defer atomic.AddInt64(exited, 1) + <-ctx.Done() + <-time.After(time.Duration(i) * 10 * time.Millisecond) + return nil + }), nil)).To(Succeed()) + } + Expect(rg.StartAndWaitReady(ctx)).To(Succeed()) + + // Cancel the context, asking the runnables to exit. + cancel() + rg.StopAndWait(context.Background()) + + Expect(rg.Add(RunnableFunc(func(c context.Context) error { + return nil + }), nil)).ToNot(Succeed()) + + Expect(atomic.LoadInt64(exited)).To(BeNumerically("==", 10)) + }) + + It("should be able to wait for all runnables to be ready at different intervals", func() { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + rg := newRunnableGroup(errCh) + + go func() { + <-time.After(50 * time.Millisecond) + rg.Start() + }() + + for i := 0; i < 20; i++ { + go func(i int) { + defer GinkgoRecover() + + Expect(rg.Add(RunnableFunc(func(c context.Context) error { + <-ctx.Done() + return nil + }), func(_ context.Context) bool { + <-time.After(time.Duration(i) * 10 * time.Millisecond) + return true + })).To(Succeed()) + }(i) + } + Expect(rg.WaitReady(ctx)).To(Succeed()) + Eventually(func() int { + i := 0 + rg.group.Range(func(key, value interface{}) bool { + i++ + return true + }) + return i + }).Should(BeNumerically("==", 20)) + }) + + It("should not turn ready if some readiness check fail", func() { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + rg := newRunnableGroup(errCh) + + go func() { + <-time.After(50 * time.Millisecond) + rg.Start() + }() + + for i := 0; i < 20; i++ { + go func(i int) { + defer GinkgoRecover() + + Expect(rg.Add(RunnableFunc(func(c context.Context) error { + <-ctx.Done() + return nil + }), func(_ 
context.Context) bool { + <-time.After(time.Duration(i) * 10 * time.Millisecond) + return i%2 == 0 // Return false readiness all uneven indexes. + })).To(Succeed()) + }(i) + } + Expect(rg.WaitReady(ctx)).ToNot(Succeed()) + }) +}) diff --git a/pkg/webhook/server.go b/pkg/webhook/server.go index 1db38113f7..364d0f902e 100644 --- a/pkg/webhook/server.go +++ b/pkg/webhook/server.go @@ -34,6 +34,7 @@ import ( kscheme "k8s.io/client-go/kubernetes/scheme" "sigs.k8s.io/controller-runtime/pkg/certwatcher" "sigs.k8s.io/controller-runtime/pkg/healthz" + "sigs.k8s.io/controller-runtime/pkg/internal/httpserver" "sigs.k8s.io/controller-runtime/pkg/runtime/inject" "sigs.k8s.io/controller-runtime/pkg/webhook/internal/metrics" ) @@ -261,9 +262,7 @@ func (s *Server) Start(ctx context.Context) error { log.Info("Serving webhook server", "host", s.Host, "port", s.Port) - srv := &http.Server{ - Handler: s.WebhookMux, - } + srv := httpserver.New(s.WebhookMux) idleConnsClosed := make(chan struct{}) go func() { diff --git a/pkg/webhook/webhook_integration_test.go b/pkg/webhook/webhook_integration_test.go index 0b9754ef40..029a503b4b 100644 --- a/pkg/webhook/webhook_integration_test.go +++ b/pkg/webhook/webhook_integration_test.go @@ -36,6 +36,7 @@ import ( "k8s.io/client-go/kubernetes/scheme" "sigs.k8s.io/controller-runtime/pkg/certwatcher" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/internal/httpserver" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/webhook" "sigs.k8s.io/controller-runtime/pkg/webhook/admission" @@ -185,7 +186,7 @@ var _ = Describe("Webhook", func() { http.Handle("/failing", hook) By("running the http server") - srv := &http.Server{} + srv := httpserver.New(nil) go func() { idleConnsClosed := make(chan struct{}) go func() {