From 53289b8a2a982c294d26ff4b1b6b1c9055954cca Mon Sep 17 00:00:00 2001 From: Alvaro Aleman Date: Mon, 25 May 2020 21:39:14 -0400 Subject: [PATCH] :sparkles: Implement graceful shutdown Co-authored-by: david.benque --- pkg/manager/internal.go | 303 +++++++++++++++++----------- pkg/manager/manager.go | 61 +++--- pkg/manager/manager_test.go | 391 +++++++++++++++++++++++++++++++++--- 3 files changed, 585 insertions(+), 170 deletions(-) diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index 42fad76141..9b9badae2b 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -18,6 +18,7 @@ package manager import ( "context" + "errors" "fmt" "net" "net/http" @@ -27,6 +28,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" + utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/client-go/rest" "k8s.io/client-go/tools/leaderelection" "k8s.io/client-go/tools/leaderelection/resourcelock" @@ -44,9 +46,10 @@ import ( const ( // Values taken from: https://github.com/kubernetes/apiserver/blob/master/pkg/apis/config/v1alpha1/defaults.go - defaultLeaseDuration = 15 * time.Second - defaultRenewDeadline = 10 * time.Second - defaultRetryPeriod = 2 * time.Second + defaultLeaseDuration = 15 * time.Second + defaultRenewDeadline = 10 * time.Second + defaultRetryPeriod = 2 * time.Second + defaultGracefulShutdownPeriod = 30 * time.Second defaultReadinessEndpoint = "/readyz" defaultLivenessEndpoint = "/healthz" @@ -118,11 +121,7 @@ type controllerManager struct { started bool startedLeader bool healthzStarted bool - - // NB(directxman12): we don't just use an error channel here to avoid the situation where the - // error channel is too small and we end up blocking some goroutines waiting to report their errors. - // errSignal lets us track when we should stop because an error occurred - errSignal *errSignaler + errChan chan error // internalStop is the stop channel *actually* used by everything involved // with the manager as a stop channel, so that we can pass a stop channel @@ -134,6 +133,14 @@ type controllerManager struct { // It and `internalStop` should point to the same channel. internalStopper chan<- struct{} + // leaderElectionCancel is used to cancel the leader election. It is distinct from internalStopper, + // because for safety reasons we need to os.Exit() when we lose the leader election, meaning that + // it must be deferred until after gracefulShutdown is done. + leaderElectionCancel context.CancelFunc + + // stop procedure engaged. In other words, we should not add anything else to the manager + stopProcedureEngaged bool + // elected is closed when this manager becomes the leader of a group of // managers, either because it won a leader election or because no leader // election was configured. @@ -161,57 +168,32 @@ type controllerManager struct { // retryPeriod is the duration the LeaderElector clients should wait // between tries of actions. retryPeriod time.Duration -} - -type errSignaler struct { - // errSignal indicates that an error occurred, when closed. It shouldn't - // be written to. 
- errSignal chan struct{} - - // err is the received error - err error - - mu sync.Mutex -} - -func (r *errSignaler) SignalError(err error) { - r.mu.Lock() - defer r.mu.Unlock() - - if err == nil { - // non-error, ignore - log.Error(nil, "SignalError called without an (with a nil) error, which should never happen, ignoring") - return - } - if r.err != nil { - // we already have an error, don't try again - return - } + // waitForRunnable is holding the number of runnables currently running so that + // we can wait for them to exit before quitting the manager + waitForRunnable sync.WaitGroup - // save the error and report it - r.err = err - close(r.errSignal) -} + // gracefulShutdownTimeout is the duration given to runnable to stop + // before the manager actually returns on stop. + gracefulShutdownTimeout time.Duration -func (r *errSignaler) Error() error { - r.mu.Lock() - defer r.mu.Unlock() + // onStoppedLeading is callled when the leader election lease is lost. + // It can be overridden for tests. + onStoppedLeading func() - return r.err -} - -func (r *errSignaler) GotError() chan struct{} { - r.mu.Lock() - defer r.mu.Unlock() - - return r.errSignal + // shutdownCtx is the context that can be used during shutdown. It will be cancelled + // after the gracefulShutdownTimeout ended. It must not be accessed before internalStop + // is closed because it will be nil. + shutdownCtx context.Context } // Add sets dependencies on i, and adds it to the list of Runnables to start. func (cm *controllerManager) Add(r Runnable) error { cm.mu.Lock() defer cm.mu.Unlock() + if cm.stopProcedureEngaged { + return errors.New("can't accept new runnable as stop procedure is already engaged") + } // Set dependencies on the object if err := cm.SetFields(r); err != nil { @@ -231,11 +213,7 @@ func (cm *controllerManager) Add(r Runnable) error { if shouldStart { // If already started, start the controller - go func() { - if err := r.Start(cm.internalStop); err != nil { - cm.errSignal.SignalError(err) - } - }() + cm.startRunnable(r) } return nil @@ -293,6 +271,10 @@ func (cm *controllerManager) AddHealthzCheck(name string, check healthz.Checker) cm.mu.Lock() defer cm.mu.Unlock() + if cm.stopProcedureEngaged { + return errors.New("can't accept new healthCheck as stop procedure is already engaged") + } + if cm.healthzStarted { return fmt.Errorf("unable to add new checker because healthz endpoint has already been created") } @@ -310,6 +292,10 @@ func (cm *controllerManager) AddReadyzCheck(name string, check healthz.Checker) cm.mu.Lock() defer cm.mu.Unlock() + if cm.stopProcedureEngaged { + return errors.New("can't accept new ready check as stop procedure is already engaged") + } + if cm.healthzStarted { return fmt.Errorf("unable to add new checker because readyz endpoint has already been created") } @@ -389,17 +375,18 @@ func (cm *controllerManager) serveMetrics(stop <-chan struct{}) { Handler: mux, } // Run the server - go func() { + cm.startRunnable(RunnableFunc(func(stop <-chan struct{}) error { log.Info("starting metrics server", "path", defaultMetricsEndpoint) if err := server.Serve(cm.metricsListener); err != nil && err != http.ErrServerClosed { - cm.errSignal.SignalError(err) + return err } - }() + return nil + })) // Shutdown the server when stop is closed <-stop - if err := server.Shutdown(context.Background()); err != nil { - cm.errSignal.SignalError(err) + if err := server.Shutdown(cm.shutdownCtx); err != nil { + cm.errChan <- err } } @@ -420,27 +407,48 @@ func (cm *controllerManager) serveHealthProbes(stop 
<-chan struct{}) { Handler: mux, } // Run server - go func() { + cm.startRunnable(RunnableFunc(func(stop <-chan struct{}) error { if err := server.Serve(cm.healthProbeListener); err != nil && err != http.ErrServerClosed { - cm.errSignal.SignalError(err) + return err } - }() + return nil + })) cm.healthzStarted = true cm.mu.Unlock() // Shutdown the server when stop is closed <-stop - if err := server.Shutdown(context.Background()); err != nil { - cm.errSignal.SignalError(err) + if err := server.Shutdown(cm.shutdownCtx); err != nil { + cm.errChan <- err } } -func (cm *controllerManager) Start(stop <-chan struct{}) error { - // join the passed-in stop channel as an upstream feeding into cm.internalStopper - defer close(cm.internalStopper) +func (cm *controllerManager) Start(stop <-chan struct{}) (err error) { + // This chan indicates that stop is complete, in other words all runnables have returned or timeout on stop request + stopComplete := make(chan struct{}) + defer close(stopComplete) + // This must be deferred after closing stopComplete, otherwise we deadlock + defer func() { + // https://hips.hearstapps.com/hmg-prod.s3.amazonaws.com/images/gettyimages-459889618-1533579787.jpg + stopErr := cm.engageStopProcedure(stopComplete) + if stopErr != nil { + if err != nil { + // Utilerrors.Aggregate allows to use errors.Is for all contained errors + // whereas fmt.Errorf allows wrapping at most one error which means the + // other one can not be found anymore. + err = utilerrors.NewAggregate([]error{err, stopErr}) + } else { + err = stopErr + } + } + }() // initialize this here so that we reset the signal channel state on every start - cm.errSignal = &errSignaler{errSignal: make(chan struct{})} + // Everything that might write into this channel must be started in a new goroutine, + // because otherwise we might block this routine trying to write into the full channel + // and will not be able to enter the deferred cm.engageStopProcedure() which drains + // it. + cm.errChan = make(chan error) // Metrics should be served whether the controller is leader or not. // (If we don't serve metrics for non-leaders, prometheus will still scrape @@ -456,27 +464,88 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error { go cm.startNonLeaderElectionRunnables() - if cm.resourceLock != nil { - err := cm.startLeaderElection() - if err != nil { - return err + go func() { + if cm.resourceLock != nil { + err := cm.startLeaderElection() + if err != nil { + cm.errChan <- err + } + } else { + // Treat not having leader election enabled the same as being elected. + close(cm.elected) + go cm.startLeaderElectionRunnables() } - } else { - // Treat not having leader election enabled the same as being elected. - close(cm.elected) - go cm.startLeaderElectionRunnables() - } + }() select { case <-stop: // We are done return nil - case <-cm.errSignal.GotError(): - // Error starting a controller - return cm.errSignal.Error() + case err := <-cm.errChan: + // Error starting or running a runnable + return err } } +// engageStopProcedure signals all runnables to stop, reads potential errors +// from the errChan and waits for them to end. It must not be called more than once. 
+func (cm *controllerManager) engageStopProcedure(stopComplete chan struct{}) error { + var cancel context.CancelFunc + if cm.gracefulShutdownTimeout > 0 { + cm.shutdownCtx, cancel = context.WithTimeout(context.Background(), cm.gracefulShutdownTimeout) + } else { + cm.shutdownCtx, cancel = context.WithCancel(context.Background()) + } + defer cancel() + close(cm.internalStopper) + // Start draining the errors before acquiring the lock to make sure we don't deadlock + // if something that has the lock is blocked on trying to write into the unbuffered + // channel after something else already wrote into it. + go func() { + for { + select { + case err, ok := <-cm.errChan: + if ok { + log.Error(err, "error received after stop sequence was engaged") + } + case <-stopComplete: + return + } + } + }() + if cm.gracefulShutdownTimeout == 0 { + return nil + } + cm.mu.Lock() + defer cm.mu.Unlock() + cm.stopProcedureEngaged = true + return cm.waitForRunnableToEnd(cm.shutdownCtx, cancel) +} + +// waitForRunnableToEnd blocks until all runnables ended or the +// tearDownTimeout was reached. In the latter case, an error is returned. +func (cm *controllerManager) waitForRunnableToEnd(ctx context.Context, cancel context.CancelFunc) error { + defer cancel() + + // Cancel leader election only after we waited. It will os.Exit() the app for safety. + defer func() { + if cm.leaderElectionCancel != nil { + cm.leaderElectionCancel() + } + }() + + go func() { + cm.waitForRunnable.Wait() + cancel() + }() + + <-ctx.Done() + if err := ctx.Err(); err != nil && err != context.Canceled { + return fmt.Errorf("failed waiting for all runnables to end within grace period of %s: %w", cm.gracefulShutdownTimeout, err) + } + return nil +} + func (cm *controllerManager) startNonLeaderElectionRunnables() { cm.mu.Lock() defer cm.mu.Unlock() @@ -487,15 +556,7 @@ func (cm *controllerManager) startNonLeaderElectionRunnables() { for _, c := range cm.nonLeaderElectionRunnables { // Controllers block, but we want to return an error if any have an error starting. // Write any Start errors to a channel so we can return them - ctrl := c - go func() { - if err := ctrl.Start(cm.internalStop); err != nil { - cm.errSignal.SignalError(err) - } - // we use %T here because we don't have a good stand-in for "name", - // and the full runnable might not serialize (mutexes, etc) - log.V(1).Info("non-leader-election runnable finished", "runnable type", fmt.Sprintf("%T", ctrl)) - }() + cm.startRunnable(c) } } @@ -509,15 +570,7 @@ func (cm *controllerManager) startLeaderElectionRunnables() { for _, c := range cm.leaderElectionRunnables { // Controllers block, but we want to return an error if any have an error starting. // Write any Start errors to a channel so we can return them - ctrl := c - go func() { - if err := ctrl.Start(cm.internalStop); err != nil { - cm.errSignal.SignalError(err) - } - // we use %T here because we don't have a good stand-in for "name", - // and the full runnable might not serialize (mutexes, etc) - log.V(1).Info("leader-election runnable finished", "runnable type", fmt.Sprintf("%T", ctrl)) - }() + cm.startRunnable(c) } cm.startedLeader = true @@ -532,19 +585,37 @@ func (cm *controllerManager) waitForCache() { if cm.startCache == nil { cm.startCache = cm.cache.Start } - go func() { - if err := cm.startCache(cm.internalStop); err != nil { - cm.errSignal.SignalError(err) - } - }() + cm.startRunnable(RunnableFunc(func(stop <-chan struct{}) error { + return cm.startCache(stop) + })) // Wait for the caches to sync. 
// TODO(community): Check the return value and write a test cm.cache.WaitForCacheSync(cm.internalStop) + // TODO: This should be the return value of cm.cache.WaitForCacheSync but we abuse + // cm.started as a check if we already started the cache so it must always become true. + // Making sure that the cache doesn't get started twice is needed to not get a "close + // of closed channel" panic cm.started = true } func (cm *controllerManager) startLeaderElection() (err error) { + ctx, cancel := context.WithCancel(context.Background()) + cm.mu.Lock() + cm.leaderElectionCancel = cancel + cm.mu.Unlock() + + if cm.onStoppedLeading == nil { + cm.onStoppedLeading = func() { + // Make sure graceful shutdown is skipped if we lost the leader lock without + // intending to. + cm.gracefulShutdownTimeout = time.Duration(0) + // Most implementations of leader election log.Fatal() here. + // Since Start is wrapped in log.Fatal when called, we can just return + // an error here which will cause the program to exit. + cm.errChan <- errors.New("leader election lost") + } + } l, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{ Lock: cm.resourceLock, LeaseDuration: cm.leaseDuration, @@ -555,27 +626,13 @@ func (cm *controllerManager) startLeaderElection() (err error) { close(cm.elected) cm.startLeaderElectionRunnables() }, - OnStoppedLeading: func() { - // Most implementations of leader election log.Fatal() here. - // Since Start is wrapped in log.Fatal when called, we can just return - // an error here which will cause the program to exit. - cm.errSignal.SignalError(fmt.Errorf("leader election lost")) - }, + OnStoppedLeading: cm.onStoppedLeading, }, }) if err != nil { return err } - ctx, cancel := context.WithCancel(context.Background()) - go func() { - select { - case <-cm.internalStop: - cancel() - case <-ctx.Done(): - } - }() - // Start the leader elector process go l.Run(ctx) return nil @@ -584,3 +641,13 @@ func (cm *controllerManager) startLeaderElection() (err error) { func (cm *controllerManager) Elected() <-chan struct{} { return cm.elected } + +func (cm *controllerManager) startRunnable(r Runnable) { + cm.waitForRunnable.Add(1) + go func() { + defer cm.waitForRunnable.Done() + if err := r.Start(cm.internalStop); err != nil { + cm.errChan <- err + } + }() +} diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 73e4b94ad6..9c4ba91f89 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -75,6 +75,9 @@ type Manager interface { // Start starts all registered Controllers and blocks until the Stop channel is closed. // Returns an error if there is an error starting any controller. + // If LeaderElection is used, the binary must be exited immediately after this returns, + // otherwise components that need leader election might continue to run after the leader + // lock was lost. Start(<-chan struct{}) error // GetConfig returns an initialized Config @@ -205,6 +208,12 @@ type Options struct { // Use this to customize the event correlator and spam filter EventBroadcaster record.EventBroadcaster + // GracefulShutdownTimeout is the duration given to runnables to stop before the manager actually returns on stop. + // To disable graceful shutdown, set to time.Duration(0). + // To use graceful shutdown without timeout, set to a negative duration, e.g. time.Duration(-1). + // The graceful shutdown is skipped for safety reasons in case the leader election lease is lost.
+ GracefulShutdownTimeout *time.Duration + // Dependency injection for testing newRecorderProvider func(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger, broadcaster record.EventBroadcaster) (recorder.Provider, error) newResourceLock func(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error) @@ -317,29 +326,30 @@ func New(config *rest.Config, options Options) (Manager, error) { stop := make(chan struct{}) return &controllerManager{ - config: config, - scheme: options.Scheme, - cache: cache, - fieldIndexes: cache, - client: writeObj, - apiReader: apiReader, - recorderProvider: recorderProvider, - resourceLock: resourceLock, - mapper: mapper, - metricsListener: metricsListener, - metricsExtraHandlers: metricsExtraHandlers, - internalStop: stop, - internalStopper: stop, - elected: make(chan struct{}), - port: options.Port, - host: options.Host, - certDir: options.CertDir, - leaseDuration: *options.LeaseDuration, - renewDeadline: *options.RenewDeadline, - retryPeriod: *options.RetryPeriod, - healthProbeListener: healthProbeListener, - readinessEndpointName: options.ReadinessEndpointName, - livenessEndpointName: options.LivenessEndpointName, + config: config, + scheme: options.Scheme, + cache: cache, + fieldIndexes: cache, + client: writeObj, + apiReader: apiReader, + recorderProvider: recorderProvider, + resourceLock: resourceLock, + mapper: mapper, + metricsListener: metricsListener, + metricsExtraHandlers: metricsExtraHandlers, + internalStop: stop, + internalStopper: stop, + elected: make(chan struct{}), + port: options.Port, + host: options.Host, + certDir: options.CertDir, + leaseDuration: *options.LeaseDuration, + renewDeadline: *options.RenewDeadline, + retryPeriod: *options.RetryPeriod, + healthProbeListener: healthProbeListener, + readinessEndpointName: options.ReadinessEndpointName, + livenessEndpointName: options.LivenessEndpointName, + gracefulShutdownTimeout: *options.GracefulShutdownTimeout, }, nil } @@ -439,5 +449,10 @@ func setOptionsDefaults(options Options) Options { options.newHealthProbeListener = defaultHealthProbeListener } + if options.GracefulShutdownTimeout == nil { + gracefulShutdownTimeout := defaultGracefulShutdownPeriod + options.GracefulShutdownTimeout = &gracefulShutdownTimeout + } + return options } diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index e81a1dd135..f09d206745 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -17,6 +17,8 @@ limitations under the License. package manager import ( + "context" + "errors" "fmt" "io/ioutil" "net" @@ -24,6 +26,8 @@ import ( "os" rt "runtime" "runtime/pprof" + "sync" + "time" "github.com/go-logr/logr" . 
"github.com/onsi/ginkgo" @@ -143,6 +147,77 @@ var _ = Describe("manger.Manager", func() { }) Context("with leader election enabled", func() { + It("should only cancel the leader election after all runnables are done", func() { + m, err := New(cfg, Options{ + LeaderElection: true, + LeaderElectionNamespace: "default", + LeaderElectionID: "test-leader-election-id-2", + HealthProbeBindAddress: "0", + MetricsBindAddress: "0", + }) + Expect(err).To(BeNil()) + + runnableDone := make(chan struct{}) + slowRunnable := RunnableFunc(func(s <-chan struct{}) error { + <-s + time.Sleep(100 * time.Millisecond) + close(runnableDone) + return nil + }) + Expect(m.Add(slowRunnable)).To(BeNil()) + + cm := m.(*controllerManager) + cm.gracefulShutdownTimeout = time.Second + leaderElectionDone := make(chan struct{}) + cm.onStoppedLeading = func() { + close(leaderElectionDone) + } + + mgrStopChan := make(chan struct{}) + mgrDone := make(chan struct{}) + go func() { + defer GinkgoRecover() + Expect(m.Start(mgrStopChan)).To(BeNil()) + close(mgrDone) + }() + <-cm.elected + close(mgrStopChan) + select { + case <-leaderElectionDone: + Expect(errors.New("leader election was cancelled before runnables were done")).ToNot(HaveOccurred()) + case <-runnableDone: + // Success + } + // Don't leak routines + <-mgrDone + + }) + It("should disable gracefulShutdown when stopping to lead", func() { + m, err := New(cfg, Options{ + LeaderElection: true, + LeaderElectionNamespace: "default", + LeaderElectionID: "test-leader-election-id-3", + HealthProbeBindAddress: "0", + MetricsBindAddress: "0", + }) + Expect(err).To(BeNil()) + + mgrDone := make(chan struct{}) + go func() { + defer GinkgoRecover() + err := m.Start(make(chan struct{})) + Expect(err).ToNot(BeNil()) + Expect(err.Error()).To(Equal("leader election lost")) + close(mgrDone) + }() + cm := m.(*controllerManager) + <-cm.elected + + cm.leaderElectionCancel() + <-mgrDone + + Expect(cm.gracefulShutdownTimeout.Nanoseconds()).To(Equal(int64(0))) + }) It("should default ID to controller-runtime if ID is not set", func() { var rl resourcelock.Interface m1, err := New(cfg, Options{ @@ -161,6 +236,10 @@ var _ = Describe("manger.Manager", func() { Expect(m1).ToNot(BeNil()) Expect(rl.Describe()).To(Equal("default/test-leader-election-id")) + m1cm, ok := m1.(*controllerManager) + Expect(ok).To(BeTrue()) + m1cm.onStoppedLeading = func() {} + m2, err := New(cfg, Options{ LeaderElection: true, LeaderElectionNamespace: "default", @@ -173,11 +252,14 @@ var _ = Describe("manger.Manager", func() { HealthProbeBindAddress: "0", MetricsBindAddress: "0", }) - Expect(err).ToNot(HaveOccurred()) Expect(m2).ToNot(BeNil()) Expect(rl.Describe()).To(Equal("default/test-leader-election-id")) + m2cm, ok := m2.(*controllerManager) + Expect(ok).To(BeTrue()) + m2cm.onStoppedLeading = func() {} + c1 := make(chan struct{}) Expect(m1.Add(RunnableFunc(func(s <-chan struct{}) error { defer GinkgoRecover() @@ -296,21 +378,24 @@ var _ = Describe("manger.Manager", func() { }) Describe("Start", func() { - var startSuite = func(options Options) { + var startSuite = func(options Options, callbacks ...func(Manager)) { It("should Start each Component", func(done Done) { m, err := New(cfg, options) Expect(err).NotTo(HaveOccurred()) - c1 := make(chan struct{}) + for _, cb := range callbacks { + cb(m) + } + var wgRunnableStarted sync.WaitGroup + wgRunnableStarted.Add(2) Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error { defer GinkgoRecover() - close(c1) + wgRunnableStarted.Done() return nil }))).To(Succeed()) 
- c2 := make(chan struct{}) Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error { defer GinkgoRecover() - close(c2) + wgRunnableStarted.Done() return nil }))).To(Succeed()) @@ -320,15 +405,17 @@ var _ = Describe("manger.Manager", func() { Expect(m.Start(stop)).NotTo(HaveOccurred()) Expect(m.Elected()).Should(BeClosed()) }() - <-c1 - <-c2 + wgRunnableStarted.Wait() close(done) }) It("should stop when stop is called", func(done Done) { m, err := New(cfg, options) Expect(err).NotTo(HaveOccurred()) + for _, cb := range callbacks { + cb(m) + } s := make(chan struct{}) close(s) Expect(m.Start(s)).NotTo(HaveOccurred()) @@ -339,6 +426,9 @@ var _ = Describe("manger.Manager", func() { It("should return an error if it can't start the cache", func(done Done) { m, err := New(cfg, options) Expect(err).NotTo(HaveOccurred()) + for _, cb := range callbacks { + cb(m) + } mgr, ok := m.(*controllerManager) Expect(ok).To(BeTrue()) mgr.startCache = func(stop <-chan struct{}) error { @@ -352,42 +442,271 @@ var _ = Describe("manger.Manager", func() { It("should return an error if any Components fail to Start", func(done Done) { m, err := New(cfg, options) Expect(err).NotTo(HaveOccurred()) - c1 := make(chan struct{}) + for _, cb := range callbacks { + cb(m) + } + Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error { defer GinkgoRecover() - close(c1) + <-s return nil }))).To(Succeed()) - c2 := make(chan struct{}) Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error { defer GinkgoRecover() - close(c2) return fmt.Errorf("expected error") }))).To(Succeed()) - c3 := make(chan struct{}) Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error { defer GinkgoRecover() - close(c3) return nil }))).To(Succeed()) + defer GinkgoRecover() + err = m.Start(stop) + Expect(err).ToNot(BeNil()) + Expect(err.Error()).To(Equal("expected error")) + + close(done) + }) + + It("should wait for runnables to stop", func(done Done) { + m, err := New(cfg, options) + Expect(err).NotTo(HaveOccurred()) + for _, cb := range callbacks { + cb(m) + } + + var lock sync.Mutex + runnableDoneCount := 0 + runnableDoneFunc := func() { + lock.Lock() + defer lock.Unlock() + runnableDoneCount++ + } + var wgRunnableRunning sync.WaitGroup + wgRunnableRunning.Add(2) + Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error { + wgRunnableRunning.Done() + defer GinkgoRecover() + defer runnableDoneFunc() + <-s + return nil + }))).To(Succeed()) + + Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error { + wgRunnableRunning.Done() + defer GinkgoRecover() + defer runnableDoneFunc() + <-s + time.Sleep(300 * time.Millisecond) //slow closure simulation + return nil + }))).To(Succeed()) + + defer GinkgoRecover() + s := make(chan struct{}) + + var wgManagerRunning sync.WaitGroup + wgManagerRunning.Add(1) go func() { + defer wgManagerRunning.Done() + Expect(m.Start(s)).NotTo(HaveOccurred()) + Expect(runnableDoneCount).To(Equal(2)) + }() + wgRunnableRunning.Wait() + close(s) + + wgManagerRunning.Wait() + close(done) + }) + + It("should return an error if any Components fail to Start and wait for runnables to stop", func(done Done) { + m, err := New(cfg, options) + Expect(err).NotTo(HaveOccurred()) + for _, cb := range callbacks { + cb(m) + } + defer GinkgoRecover() + var lock sync.Mutex + runnableDoneCount := 0 + runnableDoneFunc := func() { + lock.Lock() + defer lock.Unlock() + runnableDoneCount++ + } + + Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error { defer GinkgoRecover() - // NB(directxman12): this should definitely return an error. 
If it doesn't happen, - // it means someone was signaling "stop: error" with a nil "error". - Expect(m.Start(stop)).NotTo(Succeed()) - close(done) + defer runnableDoneFunc() + return fmt.Errorf("expected error") + }))).To(Succeed()) + + Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error { + defer GinkgoRecover() + defer runnableDoneFunc() + <-s + return nil + }))).To(Succeed()) + + Expect(m.Start(stop)).To(HaveOccurred()) + Expect(runnableDoneCount).To(Equal(2)) + + close(done) + }) + + It("should refuse to add runnable if stop procedure is already engaged", func(done Done) { + m, err := New(cfg, options) + Expect(err).NotTo(HaveOccurred()) + for _, cb := range callbacks { + cb(m) + } + defer GinkgoRecover() + + var wgRunnableRunning sync.WaitGroup + wgRunnableRunning.Add(1) + Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error { + wgRunnableRunning.Done() + defer GinkgoRecover() + <-s + return nil + }))).To(Succeed()) + + s := make(chan struct{}) + go func() { + Expect(m.Start(s)).NotTo(HaveOccurred()) }() - <-c1 - <-c2 - <-c3 + wgRunnableRunning.Wait() + close(s) + time.Sleep(100 * time.Millisecond) // give some time for the stop chan closure to be caught by the manager + Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error { + defer GinkgoRecover() + return nil + }))).NotTo(Succeed()) + + close(done) }) - It("should return an error if any non-leaderelection Components fail to Start", func() { - // TODO(mengqiy): implement this after resolving https://github.com/kubernetes-sigs/controller-runtime/issues/429 + It("should return both runnables and stop errors when both error", func(done Done) { + m, err := New(cfg, options) + Expect(err).NotTo(HaveOccurred()) + for _, cb := range callbacks { + cb(m) + } + m.(*controllerManager).gracefulShutdownTimeout = 1 * time.Nanosecond + Expect(m.Add(RunnableFunc(func(_ <-chan struct{}) error { + return runnableError{} + }))) + testDone := make(chan struct{}) + defer close(testDone) + Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error { + <-s + timer := time.NewTimer(30 * time.Second) + defer timer.Stop() + select { + case <-testDone: + return nil + case <-timer.C: + return nil + } + }))) + err = m.Start(make(chan struct{})) + Expect(err).ToNot(BeNil()) + eMsg := "[not feeling like that, failed waiting for all runnables to end within grace period of 1ns: context deadline exceeded]" + Expect(err.Error()).To(Equal(eMsg)) + Expect(errors.Is(err, context.DeadlineExceeded)).To(BeTrue()) + Expect(errors.Is(err, runnableError{})).To(BeTrue()) + + close(done) }) + + It("should return only stop errors if runnables dont error", func(done Done) { + m, err := New(cfg, options) + Expect(err).NotTo(HaveOccurred()) + for _, cb := range callbacks { + cb(m) + } + m.(*controllerManager).gracefulShutdownTimeout = 1 * time.Nanosecond + Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error { + <-s + return nil + }))) + testDone := make(chan struct{}) + defer close(testDone) + Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error { + <-s + timer := time.NewTimer(30 * time.Second) + defer timer.Stop() + select { + case <-testDone: + return nil + case <-timer.C: + return nil + } + }))).NotTo(HaveOccurred()) + stop := make(chan struct{}) + managerStopDone := make(chan struct{}) + go func() { err = m.Start(stop); close(managerStopDone) }() + // Use the 'elected' channel to find out if startup was done, otherwise we stop + // before we started the Runnable and see flakes, mostly in low-CPU envs like CI + <-m.(*controllerManager).elected + close(stop) 
+ <-managerStopDone + Expect(err).ToNot(BeNil()) + Expect(err.Error()).To(Equal("failed waiting for all runnables to end within grace period of 1ns: context deadline exceeded")) + Expect(errors.Is(err, context.DeadlineExceeded)).To(BeTrue()) + Expect(errors.Is(err, runnableError{})).ToNot(BeTrue()) + + close(done) + }) + + It("should return only runnables error if stop doesn't error", func(done Done) { + m, err := New(cfg, options) + Expect(err).NotTo(HaveOccurred()) + for _, cb := range callbacks { + cb(m) + } + Expect(m.Add(RunnableFunc(func(_ <-chan struct{}) error { + return runnableError{} + }))) + err = m.Start(make(chan struct{})) + Expect(err).ToNot(BeNil()) + Expect(err.Error()).To(Equal("not feeling like that")) + Expect(errors.Is(err, context.DeadlineExceeded)).ToNot(BeTrue()) + Expect(errors.Is(err, runnableError{})).To(BeTrue()) + + close(done) + }) + + It("should not wait for runnables if gracefulShutdownTimeout is 0", func(done Done) { + m, err := New(cfg, options) + Expect(err).NotTo(HaveOccurred()) + for _, cb := range callbacks { + cb(m) + } + m.(*controllerManager).gracefulShutdownTimeout = time.Duration(0) + + runnableStopped := make(chan struct{}) + Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error { + <-s + time.Sleep(100 * time.Millisecond) + close(runnableStopped) + return nil + }))).ToNot(HaveOccurred()) + + managerStop := make(chan struct{}) + managerStopDone := make(chan struct{}) + go func() { + Expect(m.Start(managerStop)).NotTo(HaveOccurred()) + close(managerStopDone) + }() + <-m.(*controllerManager).elected + close(managerStop) + + <-managerStopDone + <-runnableStopped + close(done) + }) + } Context("with defaults", func() { @@ -395,12 +714,19 @@ var _ = Describe("manger.Manager", func() { }) Context("with leaderelection enabled", func() { - startSuite(Options{ - LeaderElection: true, - LeaderElectionID: "controller-runtime", - LeaderElectionNamespace: "default", - newResourceLock: fakeleaderelection.NewResourceLock, - }) + startSuite( + Options{ + LeaderElection: true, + LeaderElectionID: "controller-runtime", + LeaderElectionNamespace: "default", + newResourceLock: fakeleaderelection.NewResourceLock, + }, + func(m Manager) { + cm, ok := m.(*controllerManager) + Expect(ok).To(BeTrue()) + cm.onStoppedLeading = func() {} + }, + ) }) Context("should start serving metrics", func() { @@ -983,3 +1309,10 @@ func (i *injectable) InjectStopChannel(stop <-chan struct{}) error { func (i *injectable) Start(<-chan struct{}) error { return nil } + +type runnableError struct { +} + +func (runnableError) Error() string { + return "not feeling like that" +}
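The shutdown sequence this patch introduces is spread across several hunks, so here is a minimal, self-contained Go sketch of the same pattern. It is not the patch's code and the names (runnable, run, internalStop, errChan, grace) are made up for illustration: runnables are tracked with a sync.WaitGroup, their errors all go to an unbuffered channel that keeps being drained once the stop procedure is engaged (so no goroutine can block on it), and the wait for them is bounded by a context.WithTimeout grace period.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// runnable mirrors the manager's stop-channel contract: block until stop
// closes, then clean up and return.
type runnable func(stop <-chan struct{}) error

// run starts every runnable, waits for a stop request or a runnable error,
// then waits up to `grace` for all runnables to return.
func run(runnables []runnable, stop <-chan struct{}, grace time.Duration) error {
	errChan := make(chan error)         // unbuffered, like cm.errChan
	internalStop := make(chan struct{}) // fanned out to runnables, like cm.internalStop
	var wg sync.WaitGroup               // like cm.waitForRunnable

	for _, r := range runnables {
		wg.Add(1)
		go func(r runnable) {
			defer wg.Done()
			if err := r(internalStop); err != nil {
				errChan <- err
			}
		}(r)
	}

	var runErr error
	select {
	case <-stop: // normal shutdown request
	case runErr = <-errChan: // a runnable failed to start or run
	}

	// Engage the stop procedure: signal all runnables, keep draining errChan so
	// no runnable blocks on the unbuffered channel, and bound the wait.
	close(internalStop)
	shutdownCtx, cancel := context.WithTimeout(context.Background(), grace)
	defer cancel()

	stopComplete := make(chan struct{})
	go func() {
		for {
			select {
			case err := <-errChan:
				fmt.Println("error received after stop sequence was engaged:", err)
			case <-stopComplete:
				return
			}
		}
	}()

	go func() {
		wg.Wait()
		cancel() // all runnables returned before the deadline
	}()

	<-shutdownCtx.Done()
	close(stopComplete)
	if err := shutdownCtx.Err(); !errors.Is(err, context.Canceled) {
		// The real patch combines this with runErr via utilerrors.NewAggregate.
		return fmt.Errorf("failed waiting for all runnables to end within grace period of %s: %w", grace, err)
	}
	return runErr
}

func main() {
	stop := make(chan struct{})
	close(stop) // request shutdown immediately, just to exercise the flow

	err := run([]runnable{
		func(s <-chan struct{}) error { <-s; return nil },
		func(s <-chan struct{}) error { <-s; time.Sleep(10 * time.Millisecond); return nil },
	}, stop, 100*time.Millisecond)
	fmt.Println("manager returned:", err)
}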
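On the error-combining detail in Start's deferred block: utilerrors.NewAggregate is used instead of fmt.Errorf because the aggregate keeps every contained error reachable through errors.Is, which the new tests rely on. A tiny illustration of that behaviour, assuming the k8s.io/apimachinery version this module already depends on (whose Aggregate implements the Is interface):

package main

import (
	"context"
	"errors"
	"fmt"

	utilerrors "k8s.io/apimachinery/pkg/util/errors"
)

func main() {
	runnableErr := errors.New("leader election lost")
	stopErr := fmt.Errorf("failed waiting for all runnables to end within grace period of 1ns: %w", context.DeadlineExceeded)

	// fmt.Errorf can wrap at most one error, so one of the two would become
	// invisible to errors.Is; the aggregate keeps both matchable.
	err := utilerrors.NewAggregate([]error{runnableErr, stopErr})

	fmt.Println(errors.Is(err, runnableErr))              // true
	fmt.Println(errors.Is(err, context.DeadlineExceeded)) // true
}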
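For consumers of the new Options field, the sketch below shows how a caller might opt in. It assumes the stop-channel era controller-runtime API that this patch targets (manager.New, manager.RunnableFunc taking a stop channel, ctrl.SetupSignalHandler returning a channel); the five-second value is purely illustrative, and without it the default added by this patch is 30 seconds.

package main

import (
	"time"

	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/manager"
)

func main() {
	// Give runnables five seconds to wind down once the stop channel closes.
	gracePeriod := 5 * time.Second

	mgr, err := manager.New(ctrl.GetConfigOrDie(), manager.Options{
		GracefulShutdownTimeout: &gracePeriod,
	})
	if err != nil {
		panic(err)
	}

	// A runnable blocks until stop closes, then cleans up. It has to return
	// within the grace period, or Start returns a "failed waiting for all
	// runnables to end within grace period" error.
	if err := mgr.Add(manager.RunnableFunc(func(stop <-chan struct{}) error {
		<-stop
		// flush queues, close connections, etc.
		return nil
	})); err != nil {
		panic(err)
	}

	// SetupSignalHandler closes the returned channel on SIGTERM/SIGINT, which
	// triggers the graceful shutdown path implemented by this patch.
	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
		panic(err)
	}
}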