From b8ba348835fc8a6d3b55bc13cb06952f36f79121 Mon Sep 17 00:00:00 2001 From: Alvaro Aleman Date: Sun, 21 Jun 2020 11:52:04 -0400 Subject: [PATCH] Debug graceful shutdown --- go.mod | 2 +- go.sum | 2 + .../machine/v1beta1/machine_webhook_test.go | 17 +- vendor/modules.txt | 2 +- .../controller-runtime/OWNERS_ALIASES | 4 +- .../pkg/manager/internal.go | 328 ++++++++++-------- .../controller-runtime/pkg/manager/manager.go | 61 ++-- 7 files changed, 243 insertions(+), 173 deletions(-) diff --git a/go.mod b/go.mod index ac63208049..762bf13567 100644 --- a/go.mod +++ b/go.mod @@ -40,4 +40,4 @@ replace sigs.k8s.io/cluster-api-provider-azure => github.com/openshift/cluster-a replace sigs.k8s.io/cluster-api-provider-gcp => github.com/openshift/cluster-api-provider-gcp v0.0.1-0.20200528175251-4f2fdeb49fe1 -replace sigs.k8s.io/controller-runtime => github.com/mgugino-upstream-stage/controller-runtime v0.6.1-0.20200618201807-9d82bf2a7266 +replace sigs.k8s.io/controller-runtime => github.com/alvaroaleman/controller-runtime v0.1.5-0.20200619152754-4a802fb9b747 diff --git a/go.sum b/go.sum index 4aed2dbd3a..510c3d6b92 100644 --- a/go.sum +++ b/go.sum @@ -36,6 +36,8 @@ github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuy github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= +github.com/alvaroaleman/controller-runtime v0.1.5-0.20200619152754-4a802fb9b747 h1:l5sFFGjYd9YsVYN8u1JRsa4pSURnxBEpM80B1W2y46E= +github.com/alvaroaleman/controller-runtime v0.1.5-0.20200619152754-4a802fb9b747/go.mod h1:qN/IYzFHXI7mP9qhUiGRN9uDH3fdAAqBTCqP1YkMEtQ= github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/asaskevich/govalidator v0.0.0-20180720115003-f9ffefc3facf/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY= diff --git a/pkg/apis/machine/v1beta1/machine_webhook_test.go b/pkg/apis/machine/v1beta1/machine_webhook_test.go index 1c0c43b37a..921da96617 100644 --- a/pkg/apis/machine/v1beta1/machine_webhook_test.go +++ b/pkg/apis/machine/v1beta1/machine_webhook_test.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "testing" + "time" . 
"github.com/onsi/gomega" osconfigv1 "github.com/openshift/api/config/v1" @@ -145,10 +146,12 @@ func TestMachineCreation(t *testing.T) { t.Run(tc.name, func(t *testing.T) { gs := NewWithT(t) + gracefulShutdownTimeout := 500 * time.Millisecond mgr, err := manager.New(cfg, manager.Options{ - MetricsBindAddress: "0", - Port: testEnv.WebhookInstallOptions.LocalServingPort, - CertDir: testEnv.WebhookInstallOptions.LocalServingCertDir, + GracefulShutdownTimeout: &gracefulShutdownTimeout, + MetricsBindAddress: "0", + Port: testEnv.WebhookInstallOptions.LocalServingPort, + CertDir: testEnv.WebhookInstallOptions.LocalServingCertDir, }) gs.Expect(err).ToNot(HaveOccurred()) @@ -543,10 +546,12 @@ func TestMachineUpdate(t *testing.T) { t.Run(tc.name, func(t *testing.T) { gs := NewWithT(t) + gracefulShutdownTimeout := 500 * time.Millisecond mgr, err := manager.New(cfg, manager.Options{ - MetricsBindAddress: "0", - Port: testEnv.WebhookInstallOptions.LocalServingPort, - CertDir: testEnv.WebhookInstallOptions.LocalServingCertDir, + GracefulShutdownTimeout: &gracefulShutdownTimeout, + MetricsBindAddress: "0", + Port: testEnv.WebhookInstallOptions.LocalServingPort, + CertDir: testEnv.WebhookInstallOptions.LocalServingCertDir, }) gs.Expect(err).ToNot(HaveOccurred()) diff --git a/vendor/modules.txt b/vendor/modules.txt index 6001ae1c69..de0962e508 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -793,7 +793,7 @@ sigs.k8s.io/cluster-api-provider-aws/pkg/apis/awsprovider/v1beta1 sigs.k8s.io/cluster-api-provider-azure/pkg/apis/azureprovider/v1beta1 # sigs.k8s.io/cluster-api-provider-gcp v0.0.0-00010101000000-000000000000 => github.com/openshift/cluster-api-provider-gcp v0.0.1-0.20200528175251-4f2fdeb49fe1 sigs.k8s.io/cluster-api-provider-gcp/pkg/apis/gcpprovider/v1beta1 -# sigs.k8s.io/controller-runtime v0.6.0 => github.com/mgugino-upstream-stage/controller-runtime v0.6.1-0.20200618201807-9d82bf2a7266 +# sigs.k8s.io/controller-runtime v0.6.0 => github.com/alvaroaleman/controller-runtime v0.1.5-0.20200619152754-4a802fb9b747 sigs.k8s.io/controller-runtime sigs.k8s.io/controller-runtime/pkg/builder sigs.k8s.io/controller-runtime/pkg/cache diff --git a/vendor/sigs.k8s.io/controller-runtime/OWNERS_ALIASES b/vendor/sigs.k8s.io/controller-runtime/OWNERS_ALIASES index 52b6673a2f..4756d9bb4f 100644 --- a/vendor/sigs.k8s.io/controller-runtime/OWNERS_ALIASES +++ b/vendor/sigs.k8s.io/controller-runtime/OWNERS_ALIASES @@ -7,7 +7,6 @@ aliases: - directxman12 - droot - mengqiy - - pwittrock # non-admin folks who have write-access and can approve any PRs in the repo controller-runtime-maintainers: @@ -37,4 +36,5 @@ aliases: # folks who may have context on ancient history, # but are no longer directly involved - # controller-runtime-emeritus-maintainers: + controller-runtime-emeritus-maintainers: + - pwittrock diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/manager/internal.go b/vendor/sigs.k8s.io/controller-runtime/pkg/manager/internal.go index 4554298350..b2d6c25c9c 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/manager/internal.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/manager/internal.go @@ -18,6 +18,7 @@ package manager import ( "context" + "errors" "fmt" "net" "net/http" @@ -27,6 +28,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" + utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/client-go/rest" "k8s.io/client-go/tools/leaderelection" 
"k8s.io/client-go/tools/leaderelection/resourcelock" @@ -44,9 +46,10 @@ import ( const ( // Values taken from: https://github.com/kubernetes/apiserver/blob/master/pkg/apis/config/v1alpha1/defaults.go - defaultLeaseDuration = 15 * time.Second - defaultRenewDeadline = 10 * time.Second - defaultRetryPeriod = 2 * time.Second + defaultLeaseDuration = 15 * time.Second + defaultRenewDeadline = 10 * time.Second + defaultRetryPeriod = 2 * time.Second + defaultGracefulShutdownPeriod = 30 * time.Second defaultReadinessEndpoint = "/readyz" defaultLivenessEndpoint = "/healthz" @@ -118,11 +121,7 @@ type controllerManager struct { started bool startedLeader bool healthzStarted bool - - // NB(directxman12): we don't just use an error channel here to avoid the situation where the - // error channel is too small and we end up blocking some goroutines waiting to report their errors. - // errSignal lets us track when we should stop because an error occurred - errSignal *errSignaler + errChan chan error // internalStop is the stop channel *actually* used by everything involved // with the manager as a stop channel, so that we can pass a stop channel @@ -134,6 +133,14 @@ type controllerManager struct { // It and `internalStop` should point to the same channel. internalStopper chan<- struct{} + // leaderElectionCancel is used to cancel the leader election. It is distinct from internalStopper, + // because for safety reasons we need to os.Exit() when we lose the leader election, meaning that + // it must be deferred until after gracefulShutdown is done. + leaderElectionCancel context.CancelFunc + + // stop procedure engaged. In other words, we should not add anything else to the manager + stopProcedureEngaged bool + // elected is closed when this manager becomes the leader of a group of // managers, either because it won a leader election or because no leader // election was configured. @@ -161,57 +168,32 @@ type controllerManager struct { // retryPeriod is the duration the LeaderElector clients should wait // between tries of actions. retryPeriod time.Duration -} - -type errSignaler struct { - // errSignal indicates that an error occurred, when closed. It shouldn't - // be written to. - errSignal chan struct{} - - // err is the received error - err error - mu sync.Mutex -} - -func (r *errSignaler) SignalError(err error) { - r.mu.Lock() - defer r.mu.Unlock() + // waitForRunnable is holding the number of runnables currently running so that + // we can wait for them to exit before quitting the manager + waitForRunnable sync.WaitGroup - if err == nil { - // non-error, ignore - log.Error(nil, "SignalError called without an (with a nil) error, which should never happen, ignoring") - return - } + // gracefulShutdownTimeout is the duration given to runnable to stop + // before the manager actually returns on stop. + gracefulShutdownTimeout time.Duration - if r.err != nil { - // we already have an error, don't try again - return - } + // onStoppedLeading is callled when the leader election lease is lost. + // It can be overridden for tests. + onStoppedLeading func() - // save the error and report it - r.err = err - close(r.errSignal) -} - -func (r *errSignaler) Error() error { - r.mu.Lock() - defer r.mu.Unlock() - - return r.err -} - -func (r *errSignaler) GotError() chan struct{} { - r.mu.Lock() - defer r.mu.Unlock() - - return r.errSignal + // shutdownCtx is the context that can be used during shutdown. It will be cancelled + // after the gracefulShutdownTimeout ended. 
It must not be accessed before internalStop + // is closed because it will be nil. + shutdownCtx context.Context } // Add sets dependencies on i, and adds it to the list of Runnables to start. func (cm *controllerManager) Add(r Runnable) error { cm.mu.Lock() defer cm.mu.Unlock() + if cm.stopProcedureEngaged { + return errors.New("can't accept new runnable as stop procedure is already engaged") + } // Set dependencies on the object if err := cm.SetFields(r); err != nil { @@ -231,11 +213,7 @@ func (cm *controllerManager) Add(r Runnable) error { if shouldStart { // If already started, start the controller - go func() { - if err := r.Start(cm.internalStop); err != nil { - cm.errSignal.SignalError(err) - } - }() + cm.startRunnable(r) } return nil @@ -293,6 +271,10 @@ func (cm *controllerManager) AddHealthzCheck(name string, check healthz.Checker) cm.mu.Lock() defer cm.mu.Unlock() + if cm.stopProcedureEngaged { + return errors.New("can't accept new healthCheck as stop procedure is already engaged") + } + if cm.healthzStarted { return fmt.Errorf("unable to add new checker because healthz endpoint has already been created") } @@ -310,6 +292,10 @@ func (cm *controllerManager) AddReadyzCheck(name string, check healthz.Checker) cm.mu.Lock() defer cm.mu.Unlock() + if cm.stopProcedureEngaged { + return errors.New("can't accept new ready check as stop procedure is already engaged") + } + if cm.healthzStarted { return fmt.Errorf("unable to add new checker because readyz endpoint has already been created") } @@ -389,17 +375,18 @@ func (cm *controllerManager) serveMetrics(stop <-chan struct{}) { Handler: mux, } // Run the server - go func() { + cm.startRunnable(RunnableFunc(func(stop <-chan struct{}) error { log.Info("starting metrics server", "path", defaultMetricsEndpoint) if err := server.Serve(cm.metricsListener); err != nil && err != http.ErrServerClosed { - cm.errSignal.SignalError(err) + return err } - }() + return nil + })) // Shutdown the server when stop is closed <-stop - if err := server.Shutdown(context.Background()); err != nil { - cm.errSignal.SignalError(err) + if err := server.Shutdown(cm.shutdownCtx); err != nil { + cm.errChan <- err } } @@ -420,26 +407,48 @@ func (cm *controllerManager) serveHealthProbes(stop <-chan struct{}) { Handler: mux, } // Run server - go func() { + cm.startRunnable(RunnableFunc(func(stop <-chan struct{}) error { if err := server.Serve(cm.healthProbeListener); err != nil && err != http.ErrServerClosed { - cm.errSignal.SignalError(err) + return err } - }() + return nil + })) cm.healthzStarted = true cm.mu.Unlock() // Shutdown the server when stop is closed <-stop - if err := server.Shutdown(context.Background()); err != nil { - cm.errSignal.SignalError(err) + if err := server.Shutdown(cm.shutdownCtx); err != nil { + cm.errChan <- err } } -func (cm *controllerManager) Start(stop <-chan struct{}) error { - // join the passed-in stop channel as an upstream feeding into cm.internalStopper +func (cm *controllerManager) Start(stop <-chan struct{}) (err error) { + // This chan indicates that stop is complete, in other words all runnables have returned or timeout on stop request + stopComplete := make(chan struct{}) + defer close(stopComplete) + // This must be deferred after closing stopComplete, otherwise we deadlock + defer func() { + // https://hips.hearstapps.com/hmg-prod.s3.amazonaws.com/images/gettyimages-459889618-1533579787.jpg + stopErr := cm.engageStopProcedure(stopComplete) + if stopErr != nil { + if err != nil { + // Utilerrors.Aggregate allows to use 
errors.Is for all contained errors + // whereas fmt.Errorf allows wrapping at most one error which means the + // other one can not be found anymore. + err = utilerrors.NewAggregate([]error{err, stopErr}) + } else { + err = stopErr + } + } + }() // initialize this here so that we reset the signal channel state on every start - cm.errSignal = &errSignaler{errSignal: make(chan struct{})} + // Everything that might write into this channel must be started in a new goroutine, + // because otherwise we might block this routine trying to write into the full channel + // and will not be able to enter the deferred cm.engageStopProcedure() which drains + // it. + cm.errChan = make(chan error) // Metrics should be served whether the controller is leader or not. // (If we don't serve metrics for non-leaders, prometheus will still scrape @@ -453,69 +462,102 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error { go cm.serveHealthProbes(cm.internalStop) } - doneCh := make(chan error, 1) - go cm.startNonLeaderElectionRunnables(doneCh) + go cm.startNonLeaderElectionRunnables() - if cm.resourceLock != nil { - err := cm.startLeaderElection() - if err != nil { - close(cm.internalStopper) - <-doneCh - return err + go func() { + if cm.resourceLock != nil { + err := cm.startLeaderElection() + if err != nil { + cm.errChan <- err + } + } else { + // Treat not having leader election enabled the same as being elected. + close(cm.elected) + go cm.startLeaderElectionRunnables() } - } else { - // Treat not having leader election enabled the same as being elected. - close(cm.elected) - go cm.startLeaderElectionRunnables() - } + }() select { case <-stop: - close(cm.internalStopper) - <-doneCh + // We are done return nil - case <-cm.errSignal.GotError(): - close(cm.internalStopper) - <-doneCh - return cm.errSignal.Error() + case err := <-cm.errChan: + // Error starting or running a runnable + return err } } -func (cm *controllerManager) startNonLeaderElectionRunnables(doneCh chan error) { +// engageStopProcedure signals all runnables to stop, reads potential errors +// from the errChan and waits for them to end. It must not be called more than once. +func (cm *controllerManager) engageStopProcedure(stopComplete chan struct{}) error { + var cancel context.CancelFunc + if cm.gracefulShutdownTimeout > 0 { + cm.shutdownCtx, cancel = context.WithTimeout(context.Background(), cm.gracefulShutdownTimeout) + } else { + cm.shutdownCtx, cancel = context.WithCancel(context.Background()) + } + defer cancel() + close(cm.internalStopper) + // Start draining the errors before acquiring the lock to make sure we don't deadlock + // if something that has the lock is blocked on trying to write into the unbuffered + // channel after something else already wrote into it. + go func() { + for { + select { + case err, ok := <-cm.errChan: + if ok { + log.Error(err, "error received after stop sequence was engaged") + } + case <-stopComplete: + return + } + } + }() + if cm.gracefulShutdownTimeout == 0 { + return nil + } + cm.mu.Lock() + defer cm.mu.Unlock() + cm.stopProcedureEngaged = true + return cm.waitForRunnableToEnd(cm.shutdownCtx, cancel) +} + +// waitForRunnableToEnd blocks until all runnables ended or the +// tearDownTimeout was reached. In the latter case, an error is returned. +func (cm *controllerManager) waitForRunnableToEnd(ctx context.Context, cancel context.CancelFunc) error { + defer cancel() + + // Cancel leader election only after we waited. It will os.Exit() the app for safety. 
+ defer func() { + if cm.leaderElectionCancel != nil { + cm.leaderElectionCancel() + } + }() + + go func() { + cm.waitForRunnable.Wait() + cancel() + }() + + <-ctx.Done() + if err := ctx.Err(); err != nil && err != context.Canceled { + return fmt.Errorf("failed waiting for all runnables to end within grace period of %s: %w", cm.gracefulShutdownTimeout, err) + } + return nil +} + +func (cm *controllerManager) startNonLeaderElectionRunnables() { cm.mu.Lock() defer cm.mu.Unlock() cm.waitForCache() - returnCh := make(chan error, 1) // Start the non-leaderelection Runnables after the cache has synced for _, c := range cm.nonLeaderElectionRunnables { // Controllers block, but we want to return an error if any have an error starting. // Write any Start errors to a channel so we can return them - ctrl := c - go func() { - if err := ctrl.Start(cm.internalStop); err != nil { - cm.errSignal.SignalError(err) - } - // we use %T here because we don't have a good stand-in for "name", - // and the full runnable might not serialize (mutexes, etc) - log.V(1).Info("non-leader-election runnable finished", "runnable type", fmt.Sprintf("%T", ctrl)) - returnCh <- nil - }() + cm.startRunnable(c) } - - doneCount := 0 - - numRunners := len(cm.nonLeaderElectionRunnables) - for doneCount < numRunners { - select { - case <-returnCh: - doneCount++ - default: - } - } - close(returnCh) - close(doneCh) } func (cm *controllerManager) startLeaderElectionRunnables() { @@ -528,15 +570,7 @@ func (cm *controllerManager) startLeaderElectionRunnables() { for _, c := range cm.leaderElectionRunnables { // Controllers block, but we want to return an error if any have an error starting. // Write any Start errors to a channel so we can return them - ctrl := c - go func() { - if err := ctrl.Start(cm.internalStop); err != nil { - cm.errSignal.SignalError(err) - } - // we use %T here because we don't have a good stand-in for "name", - // and the full runnable might not serialize (mutexes, etc) - log.V(1).Info("leader-election runnable finished", "runnable type", fmt.Sprintf("%T", ctrl)) - }() + cm.startRunnable(c) } cm.startedLeader = true @@ -551,19 +585,37 @@ func (cm *controllerManager) waitForCache() { if cm.startCache == nil { cm.startCache = cm.cache.Start } - go func() { - if err := cm.startCache(cm.internalStop); err != nil { - cm.errSignal.SignalError(err) - } - }() + cm.startRunnable(RunnableFunc(func(stop <-chan struct{}) error { + return cm.startCache(stop) + })) // Wait for the caches to sync. // TODO(community): Check the return value and write a test cm.cache.WaitForCacheSync(cm.internalStop) + // TODO: This should be the return value of cm.cache.WaitForCacheSync but we abuse + // cm.started as check if we already started the cache so it must always become true. + // Making sure that the cache doesn't get started twice is needed to not get a "close + // of closed channel" panic cm.started = true } func (cm *controllerManager) startLeaderElection() (err error) { + ctx, cancel := context.WithCancel(context.Background()) + cm.mu.Lock() + cm.leaderElectionCancel = cancel + cm.mu.Unlock() + + if cm.onStoppedLeading == nil { + cm.onStoppedLeading = func() { + // Make sure graceful shutdown is skipped if we lost the leader lock without + // intending to. + cm.gracefulShutdownTimeout = time.Duration(0) + // Most implementations of leader election log.Fatal() here. + // Since Start is wrapped in log.Fatal when called, we can just return + // an error here which will cause the program to exit. 
+ cm.errChan <- errors.New("leader election lost") + } + } l, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{ Lock: cm.resourceLock, LeaseDuration: cm.leaseDuration, @@ -574,27 +626,13 @@ func (cm *controllerManager) startLeaderElection() (err error) { close(cm.elected) cm.startLeaderElectionRunnables() }, - OnStoppedLeading: func() { - // Most implementations of leader election log.Fatal() here. - // Since Start is wrapped in log.Fatal when called, we can just return - // an error here which will cause the program to exit. - cm.errSignal.SignalError(fmt.Errorf("leader election lost")) - }, + OnStoppedLeading: cm.onStoppedLeading, }, }) if err != nil { return err } - ctx, cancel := context.WithCancel(context.Background()) - go func() { - select { - case <-cm.internalStop: - cancel() - case <-ctx.Done(): - } - }() - // Start the leader elector process go l.Run(ctx) return nil @@ -603,3 +641,13 @@ func (cm *controllerManager) startLeaderElection() (err error) { func (cm *controllerManager) Elected() <-chan struct{} { return cm.elected } + +func (cm *controllerManager) startRunnable(r Runnable) { + cm.waitForRunnable.Add(1) + go func() { + defer cm.waitForRunnable.Done() + if err := r.Start(cm.internalStop); err != nil { + cm.errChan <- err + } + }() +} diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/manager/manager.go b/vendor/sigs.k8s.io/controller-runtime/pkg/manager/manager.go index 1526cef476..09e6298e73 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/manager/manager.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/manager/manager.go @@ -75,6 +75,9 @@ type Manager interface { // Start starts all registered Controllers and blocks until the Stop channel is closed. // Returns an error if there is an error starting any controller. + // If LeaderElection is used, the binary must be exited immediately after this returns, + // otherwise components that need leader election might continue to run after the leader + // lock was lost. Start(<-chan struct{}) error // GetConfig returns an initialized Config @@ -205,6 +208,12 @@ type Options struct { // Use this to customize the event correlator and spam filter EventBroadcaster record.EventBroadcaster + // GracefulShutdownTimeout is the duration given to runnable to stop before the manager actually returns on stop. + // To disable graceful shutdown, set to time.Duration(0) + // To use graceful shutdown without timeout, set to a negative duration, e.G. time.Duration(-1) + // The graceful shutdown is skipped for safety reasons in case the leadere election lease is lost. 
+ GracefulShutdownTimeout *time.Duration + // Dependency injection for testing newRecorderProvider func(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger, broadcaster record.EventBroadcaster) (recorder.Provider, error) newResourceLock func(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error) @@ -317,29 +326,30 @@ func New(config *rest.Config, options Options) (Manager, error) { stop := make(chan struct{}) return &controllerManager{ - config: config, - scheme: options.Scheme, - cache: cache, - fieldIndexes: cache, - client: writeObj, - apiReader: apiReader, - recorderProvider: recorderProvider, - resourceLock: resourceLock, - mapper: mapper, - metricsListener: metricsListener, - metricsExtraHandlers: metricsExtraHandlers, - internalStop: stop, - internalStopper: stop, - elected: make(chan struct{}), - port: options.Port, - host: options.Host, - certDir: options.CertDir, - leaseDuration: *options.LeaseDuration, - renewDeadline: *options.RenewDeadline, - retryPeriod: *options.RetryPeriod, - healthProbeListener: healthProbeListener, - readinessEndpointName: options.ReadinessEndpointName, - livenessEndpointName: options.LivenessEndpointName, + config: config, + scheme: options.Scheme, + cache: cache, + fieldIndexes: cache, + client: writeObj, + apiReader: apiReader, + recorderProvider: recorderProvider, + resourceLock: resourceLock, + mapper: mapper, + metricsListener: metricsListener, + metricsExtraHandlers: metricsExtraHandlers, + internalStop: stop, + internalStopper: stop, + elected: make(chan struct{}), + port: options.Port, + host: options.Host, + certDir: options.CertDir, + leaseDuration: *options.LeaseDuration, + renewDeadline: *options.RenewDeadline, + retryPeriod: *options.RetryPeriod, + healthProbeListener: healthProbeListener, + readinessEndpointName: options.ReadinessEndpointName, + livenessEndpointName: options.LivenessEndpointName, + gracefulShutdownTimeout: *options.GracefulShutdownTimeout, }, nil } @@ -439,5 +449,10 @@ func setOptionsDefaults(options Options) Options { options.newHealthProbeListener = defaultHealthProbeListener } + if options.GracefulShutdownTimeout == nil { + gracefulShutdownTimeout := defaultGracefulShutdownPeriod + options.GracefulShutdownTimeout = &gracefulShutdownTimeout + } + return options }
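Reviewer note (not part of the patch): for anyone picking up the vendor bump, the snippet below is a minimal, illustrative sketch of how a consumer opts into the new GracefulShutdownTimeout option, mirroring the change made to machine_webhook_test.go above. The surrounding wiring (GetConfigOrDie, SetupSignalHandler, the metrics address) is assumed boilerplate, not something this patch prescribes.

package main

import (
	"time"

	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/manager"
)

func main() {
	// Illustrative only: mirrors the option wiring used in machine_webhook_test.go above.
	// Half a second of grace: runnables get this long to return after the stop channel
	// closes. time.Duration(0) disables graceful shutdown entirely; a negative value
	// waits without a timeout.
	gracefulShutdownTimeout := 500 * time.Millisecond

	mgr, err := manager.New(ctrl.GetConfigOrDie(), manager.Options{
		GracefulShutdownTimeout: &gracefulShutdownTimeout,
		MetricsBindAddress:      "0", // disable the metrics listener, as the tests do
	})
	if err != nil {
		panic(err)
	}

	// Start blocks until the stop channel closes; a runnable that outlives the
	// grace period surfaces here as an error.
	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
		panic(err)
	}
}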
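Also for review context: the new shutdown flow in pkg/manager/internal.go is easiest to reason about in isolation. The condensed sketch below is not the vendored code (type and helper names such as mini and runnable are invented for illustration); it only shows the three moving parts the patch introduces: every runnable is started through a helper that tracks it in a sync.WaitGroup and forwards its error to an unbuffered channel, and stopping closes the internal stop channel, keeps draining that channel so no runnable can block on it, and then waits for the WaitGroup up to the configured grace period.

package main

// Condensed, illustrative sketch of the shutdown pattern from this patch; not the vendored code.

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// runnable mirrors the Runnable contract used in the patch: Start blocks until
// the stop channel closes, then returns.
type runnable func(stop <-chan struct{}) error

type mini struct {
	errChan         chan error // unbuffered, as in the patch
	internalStop    chan struct{}
	waitForRunnable sync.WaitGroup
	shutdownTimeout time.Duration
}

// startRunnable tracks the goroutine in the WaitGroup and forwards its error.
func (m *mini) startRunnable(r runnable) {
	m.waitForRunnable.Add(1)
	go func() {
		defer m.waitForRunnable.Done()
		if err := r(m.internalStop); err != nil {
			m.errChan <- err
		}
	}()
}

// engageStopProcedure closes the stop channel, keeps draining errChan so no
// runnable blocks on the unbuffered channel, and waits for the WaitGroup
// until the grace period expires.
func (m *mini) engageStopProcedure() error {
	close(m.internalStop)

	stopComplete := make(chan struct{})
	defer close(stopComplete)
	go func() {
		for {
			select {
			case err := <-m.errChan:
				fmt.Println("error received after stop sequence was engaged:", err)
			case <-stopComplete:
				return
			}
		}
	}()

	ctx, cancel := context.WithTimeout(context.Background(), m.shutdownTimeout)
	defer cancel()
	go func() {
		m.waitForRunnable.Wait()
		cancel()
	}()

	<-ctx.Done()
	if err := ctx.Err(); err != nil && !errors.Is(err, context.Canceled) {
		return fmt.Errorf("failed waiting for all runnables to end within grace period of %s: %w", m.shutdownTimeout, err)
	}
	return nil
}

func main() {
	m := &mini{
		errChan:         make(chan error),
		internalStop:    make(chan struct{}),
		shutdownTimeout: 500 * time.Millisecond,
	}
	m.startRunnable(func(stop <-chan struct{}) error {
		<-stop // a well-behaved runnable returns promptly once stop closes
		return nil
	})
	if err := m.engageStopProcedure(); err != nil {
		fmt.Println(err)
	}
}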