From 2c0a9ac87ccbe46194e20e62e33479d92c8162b5 Mon Sep 17 00:00:00 2001 From: Fatih Arslan Date: Fri, 20 Aug 2021 12:14:43 +0300 Subject: [PATCH] servenv: add `--onclose_timeout` flag This PR adds a new `--onclose_timeout` flag that adds a timeout for `onClose` hooks. It's very similar to the existing `--onterm_timeout` flag. This will be useful for services who use the `--lameduck-period` flag in conjuction with `onClose` hooks. In this scenario, a service doesn't have the ability to add timeouts to `onClose` hooks, hence it could lead a `onClose` hook to block the shutdown of a service. Signed-off-by: Fatih Arslan --- go/vt/servenv/run.go | 2 +- go/vt/servenv/servenv.go | 22 ++++++++++++++++++---- go/vt/servenv/servenv_test.go | 14 ++++++++++++++ 3 files changed, 33 insertions(+), 5 deletions(-) diff --git a/go/vt/servenv/run.go b/go/vt/servenv/run.go index 25b1e189d1b..811511da63f 100644 --- a/go/vt/servenv/run.go +++ b/go/vt/servenv/run.go @@ -69,7 +69,7 @@ func Run(port int) { } log.Info("Shutting down gracefully") - Close() + fireOnCloseHooks(*onCloseTimeout) } // Close runs any registered exit hooks in parallel. diff --git a/go/vt/servenv/servenv.go b/go/vt/servenv/servenv.go index 9695b48c1e5..3d44f29ba5c 100644 --- a/go/vt/servenv/servenv.go +++ b/go/vt/servenv/servenv.go @@ -57,6 +57,7 @@ var ( // Flags to alter the behavior of the library. lameduckPeriod = flag.Duration("lameduck-period", 50*time.Millisecond, "keep running at least this long after SIGTERM before stopping") onTermTimeout = flag.Duration("onterm_timeout", 10*time.Second, "wait no more than this for OnTermSync handlers before stopping") + onCloseTimeout = flag.Duration("onclose_timeout", time.Nanosecond, "wait no more than this for OnClose handlers before stopping") _ = flag.Int("mem-profile-rate", 512*1024, "deprecated: use '-pprof=mem' instead") _ = flag.Int("mutex-profile-fraction", 0, "deprecated: use '-pprof=mutex' instead") catchSigpipe = flag.Bool("catch-sigpipe", false, "catch and ignore SIGPIPE on stdout and stderr if specified") @@ -165,24 +166,37 @@ func OnTermSync(f func()) { // fireOnTermSyncHooks returns true iff all the hooks finish before the timeout. func fireOnTermSyncHooks(timeout time.Duration) bool { + return fireHooksWithTimeout(timeout, "OnTermSync", onTermSyncHooks.Fire) +} + +// fireOnCloseHooks returns true iff all the hooks finish before the timeout. +func fireOnCloseHooks(timeout time.Duration) bool { + return fireHooksWithTimeout(timeout, "OnClose", func() { + onCloseHooks.Fire() + ListeningURL = url.URL{} + }) +} + +// fireHooksWithTimeout returns true iff all the hooks finish before the timeout. +func fireHooksWithTimeout(timeout time.Duration, name string, hookFn func()) bool { defer log.Flush() - log.Infof("Firing synchronous OnTermSync hooks and waiting up to %v for them", timeout) + log.Infof("Firing %s hooks and waiting up to %v for them", name, timeout) timer := time.NewTimer(timeout) defer timer.Stop() done := make(chan struct{}) go func() { - onTermSyncHooks.Fire() + hookFn() close(done) }() select { case <-done: - log.Infof("OnTermSync hooks finished") + log.Infof("%s hooks finished", name) return true case <-timer.C: - log.Infof("OnTermSync hooks timed out") + log.Infof("%s hooks timed out", name) return false } } diff --git a/go/vt/servenv/servenv_test.go b/go/vt/servenv/servenv_test.go index 2c3509a2228..7e9c904b677 100644 --- a/go/vt/servenv/servenv_test.go +++ b/go/vt/servenv/servenv_test.go @@ -57,3 +57,17 @@ func TestFireOnTermSyncHooksTimeout(t *testing.T) { t.Errorf("finished = %v, want %v", finished, want) } } + +func TestFireOnCloseHooksTimeout(t *testing.T) { + onCloseHooks = event.Hooks{} + + OnClose(func() { + time.Sleep(1 * time.Second) + }) + + // we deliberatly test the flag to make sure it's not accidently set to a + // high value. + if finished, want := fireOnCloseHooks(*onCloseTimeout), false; finished != want { + t.Errorf("finished = %v, want %v", finished, want) + } +}