Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

servenv: add --onclose_timeout flag #8651

Merged
merged 1 commit into from
Aug 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/vt/servenv/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func Run(port int) {
}

log.Info("Shutting down gracefully")
Close()
fireOnCloseHooks(*onCloseTimeout)
}

// Close runs any registered exit hooks in parallel.
Expand Down
22 changes: 18 additions & 4 deletions go/vt/servenv/servenv.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ var (
// Flags to alter the behavior of the library.
lameduckPeriod = flag.Duration("lameduck-period", 50*time.Millisecond, "keep running at least this long after SIGTERM before stopping")
onTermTimeout = flag.Duration("onterm_timeout", 10*time.Second, "wait no more than this for OnTermSync handlers before stopping")
onCloseTimeout = flag.Duration("onclose_timeout", time.Nanosecond, "wait no more than this for OnClose handlers before stopping")
_ = flag.Int("mem-profile-rate", 512*1024, "deprecated: use '-pprof=mem' instead")
_ = flag.Int("mutex-profile-fraction", 0, "deprecated: use '-pprof=mutex' instead")
catchSigpipe = flag.Bool("catch-sigpipe", false, "catch and ignore SIGPIPE on stdout and stderr if specified")
Expand Down Expand Up @@ -165,24 +166,37 @@ func OnTermSync(f func()) {

// fireOnTermSyncHooks returns true iff all the hooks finish before the timeout.
func fireOnTermSyncHooks(timeout time.Duration) bool {
return fireHooksWithTimeout(timeout, "OnTermSync", onTermSyncHooks.Fire)
}

// fireOnCloseHooks returns true iff all the hooks finish before the timeout.
func fireOnCloseHooks(timeout time.Duration) bool {
return fireHooksWithTimeout(timeout, "OnClose", func() {
onCloseHooks.Fire()
ListeningURL = url.URL{}
})
}

// fireHooksWithTimeout returns true iff all the hooks finish before the timeout.
func fireHooksWithTimeout(timeout time.Duration, name string, hookFn func()) bool {
defer log.Flush()
log.Infof("Firing synchronous OnTermSync hooks and waiting up to %v for them", timeout)
log.Infof("Firing %s hooks and waiting up to %v for them", name, timeout)

timer := time.NewTimer(timeout)
defer timer.Stop()

done := make(chan struct{})
go func() {
onTermSyncHooks.Fire()
hookFn()
close(done)
}()

select {
case <-done:
log.Infof("OnTermSync hooks finished")
log.Infof("%s hooks finished", name)
return true
case <-timer.C:
log.Infof("OnTermSync hooks timed out")
log.Infof("%s hooks timed out", name)
return false
}
}
Expand Down
14 changes: 14 additions & 0 deletions go/vt/servenv/servenv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,17 @@ func TestFireOnTermSyncHooksTimeout(t *testing.T) {
t.Errorf("finished = %v, want %v", finished, want)
}
}

func TestFireOnCloseHooksTimeout(t *testing.T) {
onCloseHooks = event.Hooks{}

OnClose(func() {
time.Sleep(1 * time.Second)
})

// we deliberatly test the flag to make sure it's not accidently set to a
// high value.
if finished, want := fireOnCloseHooks(*onCloseTimeout), false; finished != want {
t.Errorf("finished = %v, want %v", finished, want)
}
}