Skip to content

Commit

Permalink
Wait for Runnables to end before returning in manager Start
Browse files Browse the repository at this point in the history
  • Loading branch information
dbenque committed Oct 29, 2019
1 parent ad57a97 commit c013ca0
Show file tree
Hide file tree
Showing 3 changed files with 230 additions and 52 deletions.
92 changes: 82 additions & 10 deletions pkg/manager/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ const (
defaultRenewDeadline = 10 * time.Second
defaultRetryPeriod = 2 * time.Second

defaultRunnableTearDownTimeout = 10 * time.Second

defaultReadinessEndpoint = "/readyz"
defaultLivenessEndpoint = "/healthz"
)
Expand Down Expand Up @@ -126,6 +128,9 @@ type controllerManager struct {
// It and `internalStop` should point to the same channel.
internalStopper chan<- struct{}

// stopProcedureEngaged is set once the stop procedure has been engaged. From then on, nothing else should be added to the manager.
stopProcedureEngaged bool

startCache func(stop <-chan struct{}) error

// port is the port that the webhook server serves at.
Expand All @@ -148,12 +153,23 @@ type controllerManager struct {
// retryPeriod is the duration the LeaderElector clients should wait
// between tries of actions.
retryPeriod time.Duration

// waitForRunnable tracks the runnables currently running so that
// we can wait for them to exit before quitting the manager.
waitForRunnable sync.WaitGroup

// runnableTearDownTimeout is the duration given to runnable to stop
// before the manager actually returns on stop.
runnableTearDownTimeout time.Duration
}

// Add sets dependencies on r, and adds it to the list of Runnables to start.
func (cm *controllerManager) Add(r Runnable) error {
cm.mu.Lock()
defer cm.mu.Unlock()
if cm.stopProcedureEngaged {
return fmt.Errorf("can't accept new runnable as stop procedure is already engaged")
}

// Set dependencies on the object
if err := cm.SetFields(r); err != nil {
Expand All @@ -173,9 +189,7 @@ func (cm *controllerManager) Add(r Runnable) error {

if shouldStart {
// If already started, start the controller
go func() {
cm.errChan <- r.Start(cm.internalStop)
}()
cm.startRunnable(r)
}

return nil
Expand Down Expand Up @@ -213,6 +227,9 @@ func (cm *controllerManager) SetFields(i interface{}) error {
func (cm *controllerManager) AddHealthzCheck(name string, check healthz.Checker) error {
cm.mu.Lock()
defer cm.mu.Unlock()
if cm.stopProcedureEngaged {
return fmt.Errorf("can't accept new healthCheck as stop procedure is already engaged")
}

if cm.healthzStarted {
return fmt.Errorf("unable to add new checker because healthz endpoint has already been created")
Expand All @@ -230,6 +247,9 @@ func (cm *controllerManager) AddHealthzCheck(name string, check healthz.Checker)
func (cm *controllerManager) AddReadyzCheck(name string, check healthz.Checker) error {
cm.mu.Lock()
defer cm.mu.Unlock()
if cm.stopProcedureEngaged {
return fmt.Errorf("can't accept new ready check as stop procedure is already engaged")
}

if cm.healthzStarted {
return fmt.Errorf("unable to add new checker because readyz endpoint has already been created")
Expand Down Expand Up @@ -350,8 +370,9 @@ func (cm *controllerManager) serveHealthProbes(stop <-chan struct{}) {
}

func (cm *controllerManager) Start(stop <-chan struct{}) error {
// join the passed-in stop channel as an upstream feeding into cm.internalStopper
defer close(cm.internalStopper)
// stopComplete indicates that the stop is complete, in other words that all runnables have returned or the stop request has timed out.
stopComplete := make(chan struct{})
defer close(stopComplete)

// Metrics should be served whether the controller is leader or not.
// (If we don't serve metrics for non-leaders, prometheus will still scrape
Expand All @@ -370,6 +391,9 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
if cm.resourceLock != nil {
err := cm.startLeaderElection()
if err != nil {
if errStop := cm.engageStopProcedure(stopComplete); errStop != nil {
log.Error(errStop, "some runnables could not be stopped after error occurred in startLeaderElection.")
}
return err
}
} else {
Expand All @@ -379,12 +403,52 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
select {
case <-stop:
// We are done
return nil
return cm.engageStopProcedure(stopComplete)
case err := <-cm.errChan:
// Error starting a controller
// Error starting or running a runnable
if errStop := cm.engageStopProcedure(stopComplete); errStop != nil {
log.Error(errStop, "some runnables could not be stopped after error occurred starting/running the manager.")
}
return err
}
}
// engageStopProcedure switches the manager into its stopping state and waits
// for the running runnables to return.
//
// While holding cm.mu it sets stopProcedureEngaged, closes cm.internalStopper,
// and launches a goroutine that logs every error still arriving on cm.errChan
// until stopComplete is closed. It then returns whatever waitForRunnableToEnd
// returns. The mutex is released only when this function returns, so anything
// contending for cm.mu blocks for the duration of the wait.
func (cm *controllerManager) engageStopProcedure(stopComplete chan struct{}) error {
	cm.mu.Lock()
	defer cm.mu.Unlock()

	cm.stopProcedureEngaged = true
	close(cm.internalStopper)

	// Keep receiving from errChan during tear-down; errors are only logged
	// from here on.
	drainErrors := func() {
		for {
			select {
			case <-stopComplete:
				return
			case lateErr, open := <-cm.errChan:
				if !open {
					continue
				}
				log.Error(lateErr, "error received after stop sequence was engaged")
			}
		}
	}
	go drainErrors()

	return cm.waitForRunnableToEnd()
}

// waitForRunnableToEnd blocks until every runnable counted by
// cm.waitForRunnable has returned, or until cm.runnableTearDownTimeout has
// elapsed, whichever happens first.
//
// It returns nil when all runnables stopped, and an error mentioning the
// timeout otherwise.
func (cm *controllerManager) waitForRunnableToEnd() error {
	runnableTearDownTimer := time.NewTimer(cm.runnableTearDownTimeout)
	defer runnableTearDownTimer.Stop()

	// WaitGroup.Wait cannot be used in a select, so bridge it to a channel.
	// If the timeout wins, this goroutine keeps waiting until the remaining
	// runnables eventually return.
	allStopped := make(chan struct{})
	go func() {
		cm.waitForRunnable.Wait()
		close(allStopped)
	}()

	select {
	case <-allStopped:
		return nil
	case <-runnableTearDownTimer.C:
	}

	// select chooses at random among ready cases, so the timer may have been
	// picked even though the runnables had already finished. Check once more
	// before reporting a timeout, so a completed shutdown is never reported
	// as a failure.
	select {
	case <-allStopped:
		return nil
	default:
		return fmt.Errorf("not all runnables have stopped within the proposed delay of %s", cm.runnableTearDownTimeout.String())
	}
}

func (cm *controllerManager) startNonLeaderElectionRunnables() {
cm.mu.Lock()
Expand Down Expand Up @@ -414,9 +478,7 @@ func (cm *controllerManager) startLeaderElectionRunnables() {
// Controllers block, but we want to return an error if any have an error starting.
// Write any Start errors to a channel so we can return them
ctrl := c
go func() {
cm.errChan <- ctrl.Start(cm.internalStop)
}()
cm.startRunnable(ctrl)
}

cm.startedLeader = true
Expand Down Expand Up @@ -478,3 +540,13 @@ func (cm *controllerManager) startLeaderElection() (err error) {
go l.Run(ctx)
return nil
}

// startRunnable launches r in its own goroutine and registers it on
// cm.waitForRunnable so that the stop procedure can wait for it to return.
// r is started with cm.internalStop as its stop channel; a non-nil error
// returned by r.Start is sent on cm.errChan.
func (cm *controllerManager) startRunnable(r Runnable) {
	cm.waitForRunnable.Add(1)
	go func(runnable Runnable) {
		// Done runs only after the error, if any, has been handed over on errChan.
		defer cm.waitForRunnable.Done()

		err := runnable.Start(cm.internalStop)
		if err == nil {
			return
		}
		cm.errChan <- err
	}(r)
}
54 changes: 32 additions & 22 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,10 @@ type Options struct {
// Use this to customize the event correlator and spam filter
EventBroadcaster record.EventBroadcaster

// RunnableTearDownTimeout is the duration given to runnable to stop
// before the manager actually returns on stop.
RunnableTearDownTimeout *time.Duration

// Dependency injection for testing
newRecorderProvider func(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger, broadcaster record.EventBroadcaster) (recorder.Provider, error)
newResourceLock func(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error)
Expand Down Expand Up @@ -289,28 +293,29 @@ func New(config *rest.Config, options Options) (Manager, error) {
stop := make(chan struct{})

return &controllerManager{
config: config,
scheme: options.Scheme,
errChan: make(chan error),
cache: cache,
fieldIndexes: cache,
client: writeObj,
apiReader: apiReader,
recorderProvider: recorderProvider,
resourceLock: resourceLock,
mapper: mapper,
metricsListener: metricsListener,
internalStop: stop,
internalStopper: stop,
port: options.Port,
host: options.Host,
certDir: options.CertDir,
leaseDuration: *options.LeaseDuration,
renewDeadline: *options.RenewDeadline,
retryPeriod: *options.RetryPeriod,
healthProbeListener: healthProbeListener,
readinessEndpointName: options.ReadinessEndpointName,
livenessEndpointName: options.LivenessEndpointName,
config: config,
scheme: options.Scheme,
errChan: make(chan error),
cache: cache,
fieldIndexes: cache,
client: writeObj,
apiReader: apiReader,
recorderProvider: recorderProvider,
resourceLock: resourceLock,
mapper: mapper,
metricsListener: metricsListener,
internalStop: stop,
internalStopper: stop,
port: options.Port,
host: options.Host,
certDir: options.CertDir,
leaseDuration: *options.LeaseDuration,
renewDeadline: *options.RenewDeadline,
retryPeriod: *options.RetryPeriod,
healthProbeListener: healthProbeListener,
readinessEndpointName: options.ReadinessEndpointName,
livenessEndpointName: options.LivenessEndpointName,
runnableTearDownTimeout: *options.RunnableTearDownTimeout,
}, nil
}

Expand Down Expand Up @@ -410,5 +415,10 @@ func setOptionsDefaults(options Options) Options {
options.newHealthProbeListener = defaultHealthProbeListener
}

if options.RunnableTearDownTimeout == nil {
runnableTearDownTimeout := defaultRunnableTearDownTimeout
options.RunnableTearDownTimeout = &runnableTearDownTimeout
}

return options
}
Loading

0 comments on commit c013ca0

Please sign in to comment.