From 97d3e5fdb3a32efb15091c4f213b75e57e73de51 Mon Sep 17 00:00:00 2001 From: "david.benque" Date: Fri, 25 Oct 2019 14:29:31 +0200 Subject: [PATCH] Fix #429 and #350 --- pkg/manager/internal.go | 92 +++++++++++++++++++++--- pkg/manager/manager.go | 54 ++++++++------ pkg/manager/manager_test.go | 136 ++++++++++++++++++++++++++++++------ 3 files changed, 230 insertions(+), 52 deletions(-) diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index ec00910b2c..6a632f7fe6 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -48,6 +48,8 @@ const ( defaultRenewDeadline = 10 * time.Second defaultRetryPeriod = 2 * time.Second + defaultRunnableTearDownTimeout = 10 * time.Second + defaultReadinessEndpoint = "/readyz" defaultLivenessEndpoint = "/healthz" ) @@ -126,6 +128,9 @@ type controllerManager struct { // It and `internalStop` should point to the same channel. internalStopper chan<- struct{} + // stop procedure engaged. In other words, we should not add anything else to the manager + stopProcedureEngaged bool + startCache func(stop <-chan struct{}) error // port is the port that the webhook server serves at. @@ -148,12 +153,23 @@ type controllerManager struct { // retryPeriod is the duration the LeaderElector clients should wait // between tries of actions. retryPeriod time.Duration + + // waitForRunnable is holding the number of runnables currently running so that + // we can wait for them to exit before quitting the manager + waitForRunnable sync.WaitGroup + + // runnableTearDownTimeout is the duration given to runnable to stop + // before the manager actually returns on stop. + runnableTearDownTimeout time.Duration } // Add sets dependencies on i, and adds it to the list of Runnables to start. func (cm *controllerManager) Add(r Runnable) error { cm.mu.Lock() defer cm.mu.Unlock() + if cm.stopProcedureEngaged { + return fmt.Errorf("can't accept new runnable as stop procedure is already engaged") + } // Set dependencies on the object if err := cm.SetFields(r); err != nil { @@ -173,9 +189,7 @@ func (cm *controllerManager) Add(r Runnable) error { if shouldStart { // If already started, start the controller - go func() { - cm.errChan <- r.Start(cm.internalStop) - }() + cm.startRunnable(r) } return nil @@ -213,6 +227,9 @@ func (cm *controllerManager) SetFields(i interface{}) error { func (cm *controllerManager) AddHealthzCheck(name string, check healthz.Checker) error { cm.mu.Lock() defer cm.mu.Unlock() + if cm.stopProcedureEngaged { + return fmt.Errorf("can't accept new healthCheck as stop procedure is already engaged") + } if cm.healthzStarted { return fmt.Errorf("unable to add new checker because healthz endpoint has already been created") @@ -230,6 +247,9 @@ func (cm *controllerManager) AddHealthzCheck(name string, check healthz.Checker) func (cm *controllerManager) AddReadyzCheck(name string, check healthz.Checker) error { cm.mu.Lock() defer cm.mu.Unlock() + if cm.stopProcedureEngaged { + return fmt.Errorf("can't accept new ready check as stop procedure is already engaged") + } if cm.healthzStarted { return fmt.Errorf("unable to add new checker because readyz endpoint has already been created") @@ -350,8 +370,9 @@ func (cm *controllerManager) serveHealthProbes(stop <-chan struct{}) { } func (cm *controllerManager) Start(stop <-chan struct{}) error { - // join the passed-in stop channel as an upstream feeding into cm.internalStopper - defer close(cm.internalStopper) + // This chan indicates that stop is complete, in other words all runnables have returned or timeout on 
stop request + stopComplete := make(chan struct{}) + defer close(stopComplete) // Metrics should be served whether the controller is leader or not. // (If we don't serve metrics for non-leaders, prometheus will still scrape @@ -370,6 +391,9 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error { if cm.resourceLock != nil { err := cm.startLeaderElection() if err != nil { + if errStop := cm.engageStopProcedure(stopComplete); errStop != nil { + log.Error(errStop, "some runnables could not be stopped after error occurred in startLeaderElection.") + } return err } } else { @@ -379,12 +403,52 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error { select { case <-stop: // We are done - return nil + return cm.engageStopProcedure(stopComplete) case err := <-cm.errChan: - // Error starting a controller + // Error starting or running a runnable + if errStop := cm.engageStopProcedure(stopComplete); errStop != nil { + log.Error(errStop, "some runnables could not be stopped after error occurred starting/running the manager.") + } return err } } +func (cm *controllerManager) engageStopProcedure(stopComplete chan struct{}) error { + cm.mu.Lock() + defer cm.mu.Unlock() + cm.stopProcedureEngaged = true + close(cm.internalStopper) + go func() { + for { + select { + case err, ok := <-cm.errChan: + if ok { + log.Error(err, "error received after stop sequence was engaged") + } + case <-stopComplete: + return + } + } + }() + return cm.waitForRunnableToEnd() +} + +func (cm *controllerManager) waitForRunnableToEnd() error { + runnableTearDownTimer := time.NewTimer(cm.runnableTearDownTimeout) + defer runnableTearDownTimer.Stop() + allStopped := make(chan struct{}) + + go func() { + cm.waitForRunnable.Wait() + close(allStopped) + }() + + select { + case <-allStopped: + return nil + case <-runnableTearDownTimer.C: + return fmt.Errorf("not all runnables have stopped within the proposed delay of %s", cm.runnableTearDownTimeout.String()) + } +} func (cm *controllerManager) startNonLeaderElectionRunnables() { cm.mu.Lock() @@ -414,9 +478,7 @@ func (cm *controllerManager) startLeaderElectionRunnables() { // Controllers block, but we want to return an error if any have an error starting. // Write any Start errors to a channel so we can return them ctrl := c - go func() { - cm.errChan <- ctrl.Start(cm.internalStop) - }() + cm.startRunnable(ctrl) } cm.startedLeader = true @@ -478,3 +540,13 @@ func (cm *controllerManager) startLeaderElection() (err error) { go l.Run(ctx) return nil } + +func (cm *controllerManager) startRunnable(r Runnable) { + cm.waitForRunnable.Add(1) + go func() { + defer cm.waitForRunnable.Done() + if err := r.Start(cm.internalStop); err != nil { + cm.errChan <- err + } + }() +} diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 55cf0063b4..d86b1a931c 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -185,6 +185,10 @@ type Options struct { // Use this to customize the event correlator and spam filter EventBroadcaster record.EventBroadcaster + // RunnableTearDownTimeout is the duration given to runnable to stop + // before the manager actually returns on stop. 
+ RunnableTearDownTimeout *time.Duration + // Dependency injection for testing newRecorderProvider func(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger, broadcaster record.EventBroadcaster) (recorder.Provider, error) newResourceLock func(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error) @@ -289,28 +293,29 @@ func New(config *rest.Config, options Options) (Manager, error) { stop := make(chan struct{}) return &controllerManager{ - config: config, - scheme: options.Scheme, - errChan: make(chan error), - cache: cache, - fieldIndexes: cache, - client: writeObj, - apiReader: apiReader, - recorderProvider: recorderProvider, - resourceLock: resourceLock, - mapper: mapper, - metricsListener: metricsListener, - internalStop: stop, - internalStopper: stop, - port: options.Port, - host: options.Host, - certDir: options.CertDir, - leaseDuration: *options.LeaseDuration, - renewDeadline: *options.RenewDeadline, - retryPeriod: *options.RetryPeriod, - healthProbeListener: healthProbeListener, - readinessEndpointName: options.ReadinessEndpointName, - livenessEndpointName: options.LivenessEndpointName, + config: config, + scheme: options.Scheme, + errChan: make(chan error), + cache: cache, + fieldIndexes: cache, + client: writeObj, + apiReader: apiReader, + recorderProvider: recorderProvider, + resourceLock: resourceLock, + mapper: mapper, + metricsListener: metricsListener, + internalStop: stop, + internalStopper: stop, + port: options.Port, + host: options.Host, + certDir: options.CertDir, + leaseDuration: *options.LeaseDuration, + renewDeadline: *options.RenewDeadline, + retryPeriod: *options.RetryPeriod, + healthProbeListener: healthProbeListener, + readinessEndpointName: options.ReadinessEndpointName, + livenessEndpointName: options.LivenessEndpointName, + runnableTearDownTimeout: *options.RunnableTearDownTimeout, }, nil } @@ -410,5 +415,10 @@ func setOptionsDefaults(options Options) Options { options.newHealthProbeListener = defaultHealthProbeListener } + if options.RunnableTearDownTimeout == nil { + runnableTearDownTimeout := defaultRunnableTearDownTimeout + options.RunnableTearDownTimeout = &runnableTearDownTimeout + } + return options } diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index e19ef77126..88ffee7fb7 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -21,6 +21,8 @@ import ( "io/ioutil" "net" "net/http" + "sync" + "time" "github.com/go-logr/logr" . 
"github.com/onsi/ginkgo" @@ -241,17 +243,17 @@ var _ = Describe("manger.Manager", func() { It("should Start each Component", func(done Done) { m, err := New(cfg, options) Expect(err).NotTo(HaveOccurred()) - c1 := make(chan struct{}) + var wgRunnableStarted sync.WaitGroup + wgRunnableStarted.Add(2) Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error { defer GinkgoRecover() - close(c1) + wgRunnableStarted.Done() return nil }))).To(Succeed()) - c2 := make(chan struct{}) Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error { defer GinkgoRecover() - close(c2) + wgRunnableStarted.Done() return nil }))).To(Succeed()) @@ -259,8 +261,8 @@ var _ = Describe("manger.Manager", func() { defer GinkgoRecover() Expect(m.Start(stop)).NotTo(HaveOccurred()) }() - <-c1 - <-c2 + + wgRunnableStarted.Wait() close(done) }) @@ -291,39 +293,133 @@ var _ = Describe("manger.Manager", func() { It("should return an error if any Components fail to Start", func(done Done) { m, err := New(cfg, options) Expect(err).NotTo(HaveOccurred()) - c1 := make(chan struct{}) Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error { defer GinkgoRecover() - close(c1) + <-s return nil }))).To(Succeed()) - c2 := make(chan struct{}) Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error { defer GinkgoRecover() - close(c2) return fmt.Errorf("expected error") }))).To(Succeed()) - c3 := make(chan struct{}) Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error { defer GinkgoRecover() - close(c3) return nil }))).To(Succeed()) - go func() { + defer GinkgoRecover() + Expect(m.Start(stop)).To(HaveOccurred()) + + close(done) + }) + + It("should wait for runnables to stop", func(done Done) { + m, err := New(cfg, options) + Expect(err).NotTo(HaveOccurred()) + + var lock sync.Mutex + runnableDoneCount := 0 + runnableDoneFunc := func() { + lock.Lock() + defer lock.Unlock() + runnableDoneCount++ + } + var wgRunnableRunning sync.WaitGroup + wgRunnableRunning.Add(2) + Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error { + wgRunnableRunning.Done() defer GinkgoRecover() - Expect(m.Start(stop)).NotTo(HaveOccurred()) - close(done) + defer runnableDoneFunc() + <-s + return nil + }))).To(Succeed()) + + Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error { + wgRunnableRunning.Done() + defer GinkgoRecover() + defer runnableDoneFunc() + <-s + time.Sleep(300 * time.Millisecond) //slow closure simulation + return nil + }))).To(Succeed()) + + defer GinkgoRecover() + s := make(chan struct{}) + + var wgManagerRunning sync.WaitGroup + wgManagerRunning.Add(1) + go func() { + defer wgManagerRunning.Done() + Expect(m.Start(s)).NotTo(HaveOccurred()) + Expect(runnableDoneCount).To(Equal(2)) }() - <-c1 - <-c2 - <-c3 + wgRunnableRunning.Wait() // ensure that runnable are running + close(s) + + wgManagerRunning.Wait() // wait for the manager clean exit before closing the test + close(done) + }) + + It("should return an error if any Components fail to Start and wait for runnables to stop", func(done Done) { + m, err := New(cfg, options) + Expect(err).NotTo(HaveOccurred()) + defer GinkgoRecover() + var lock sync.Mutex + runnableDoneCount := 0 + runnableDoneFunc := func() { + lock.Lock() + defer lock.Unlock() + runnableDoneCount++ + } + + Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error { + defer GinkgoRecover() + defer runnableDoneFunc() + return fmt.Errorf("expected error") + }))).To(Succeed()) + + Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error { + defer GinkgoRecover() + defer runnableDoneFunc() + <-s + return nil + }))).To(Succeed()) 
+ + Expect(m.Start(stop)).To(HaveOccurred()) + Expect(runnableDoneCount).To(Equal(2)) + + close(done) }) - It("should return an error if any non-leaderelection Components fail to Start", func() { - // TODO(mengqiy): implement this after resolving https://github.com/kubernetes-sigs/controller-runtime/issues/429 + It("should refuse to add runnable if stop procedure is already engaged", func(done Done) { + m, err := New(cfg, options) + Expect(err).NotTo(HaveOccurred()) + defer GinkgoRecover() + + var wgRunnableRunning sync.WaitGroup + wgRunnableRunning.Add(1) + Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error { + wgRunnableRunning.Done() + defer GinkgoRecover() + <-s + return nil + }))).To(Succeed()) + + s := make(chan struct{}) + go func() { + Expect(m.Start(s)).NotTo(HaveOccurred()) + }() + wgRunnableRunning.Wait() + close(s) + time.Sleep(100 * time.Millisecond) // give some time for the stop chan closure to be caught by the manager + Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error { + defer GinkgoRecover() + return nil + }))).NotTo(Succeed()) + + close(done) }) }
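
Usage note (not part of the diff): a minimal sketch of how a caller might opt into the tear-down timeout added above. The Options.RunnableTearDownTimeout field and the 10s default (defaultRunnableTearDownTimeout) come from this patch; the ctrl alias package, GetConfigOrDie, SetupSignalHandler, and the log-based error handling are assumptions based on the controller-runtime of this era, not something this diff introduces.

    package main

    import (
        "log"
        "time"

        ctrl "sigs.k8s.io/controller-runtime"
        "sigs.k8s.io/controller-runtime/pkg/manager"
    )

    func main() {
        // Give runnables up to 30s (instead of the 10s default) to return
        // once the stop procedure is engaged.
        tearDown := 30 * time.Second

        mgr, err := manager.New(ctrl.GetConfigOrDie(), manager.Options{
            RunnableTearDownTimeout: &tearDown,
        })
        if err != nil {
            log.Fatal(err)
        }

        // A runnable that blocks until the stop channel closes, then cleans up.
        // With this change, Start waits for it to return (bounded by
        // RunnableTearDownTimeout) instead of exiting while it is still shutting down.
        if err := mgr.Add(manager.RunnableFunc(func(stop <-chan struct{}) error {
            <-stop
            // flush buffers, close connections, etc.
            return nil
        })); err != nil {
            log.Fatal(err)
        }

        if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
            log.Fatal(err)
        }
    }

If any runnable is still running when the timer in waitForRunnableToEnd fires, the stop procedure returns the "not all runnables have stopped within the proposed delay" error; errors emitted by runnables after stop is engaged are only logged, not returned.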