From 8d90c8b5d3f2345890a5aa542defcf3f9bdf92c5 Mon Sep 17 00:00:00 2001 From: Marco Pfatschbacher Date: Tue, 3 Jan 2023 10:07:57 +0100 Subject: [PATCH 01/22] Improve process shutdown handling If killing just the forked PID is not working, try to kill the entire process group. This should help in scenarios where a wrapper script is used, which doesn't pass the signal to all of its children. --- daemon/exec_runner.go | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/daemon/exec_runner.go b/daemon/exec_runner.go index 31adb5a2..920de8d4 100644 --- a/daemon/exec_runner.go +++ b/daemon/exec_runner.go @@ -208,6 +208,7 @@ func (r *ExecRunner) start() error { r.cmd = exec.Command(r.exec, quotedArgs...) r.cmd.Dir = r.daemon.Dir r.cmd.Env = append(os.Environ(), r.daemon.Env...) + r.cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} // start the actual process and don't block r.startTime = time.Now() @@ -240,11 +241,18 @@ func (r *ExecRunner) stop() error { } // in doubt kill the process + if r.Running() && runtime.GOOS != "windows" { + log.Debugf("[%s] PID SIGHUP ignored, sending SIGHUP to process group", r.Name()) + err := syscall.Kill(-r.cmd.Process.Pid, syscall.SIGHUP) + if err != nil { + log.Debugf("[%s] Failed to HUP process group %s", r.Name(), err) + } + time.Sleep(2 * time.Second) + } if r.Running() { - log.Debugf("[%s] SIGHUP ignored, killing process", r.Name()) - err := r.cmd.Process.Kill() + err := syscall.Kill(-r.cmd.Process.Pid, syscall.SIGKILL) if err != nil { - log.Debugf("[%s] Failed to kill process %s", r.Name(), err) + log.Debugf("[%s] Failed to kill process group %s", r.Name(), err) } } @@ -307,7 +315,10 @@ func (r *ExecRunner) run() { // wait for process exit in the background. Ensure single cmd.Wait() call go func() { r.setRunning(true) - r.cmd.Wait() + err := r.cmd.Wait() + if err != nil { + log.Warnf("[%s] Wait() error %s", r.name, err) + } r.setRunning(false) }() } From 9c66d67d0a201f814d2256d838ccd539f1f2ea2d Mon Sep 17 00:00:00 2001 From: Marco Pfatschbacher Date: Tue, 3 Jan 2023 11:36:26 +0100 Subject: [PATCH 02/22] remove stale code --- main.go | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/main.go b/main.go index 27e4f51b..410aff8d 100644 --- a/main.go +++ b/main.go @@ -120,9 +120,6 @@ func main() { } hooks.AddLogHooks(ctx, log) - // initialize backends - //backendSetup(ctx) - // start main loop services.StartPeriodicals(ctx) err = s.Run() @@ -150,19 +147,3 @@ func commandLineSetup() error { return nil } - -//func backendSetup(context *context.Ctx) { -// for _, collector := range context.UserConfig.Backends { -// backendCreator, err := backends.GetCreator(collector.Name) -// if err != nil { -// log.Error("Unsupported collector backend found in configuration: " + collector.Name) -// continue -// } -// backend := backendCreator(context) -// backends.Store.AddRunner(backend) -// if *collector.Enabled == true && backend.ValidatePreconditions() { -// log.Debug("Add collector backend: " + backend.Name()) -// daemon.Daemon.AddRunner(backend, context) -// } -// } -//} From ecc35a7bd6bb251d4db4dc915af111d9a0d96ef0 Mon Sep 17 00:00:00 2001 From: Marco Pfatschbacher Date: Tue, 3 Jan 2023 16:51:44 +0100 Subject: [PATCH 03/22] Fix build for windows This needed a bigger refactoring to deal with import cycles: - Move exec_helpers to daemon package - Extract some helper functions into new helper package --- api/graylog.go | 5 +- assignments/assignment.go | 4 +- backends/backend.go | 14 ++--- backends/registry.go | 8 +-- backends/render.go | 3 +- context/context.go | 7 ++- daemon/action_handler.go | 4 +- daemon/daemon.go | 4 +- daemon/exec_helper.go | 44 +++++++++++++ daemon/exec_helper_windows.go | 61 +++++++++++++++++++ daemon/exec_runner.go | 22 ++----- .../exec_helper.go => helpers/argv_helper.go | 3 +- .../argv_helper_windows.go | 3 +- {common => helpers}/helper.go | 10 ++- {common => helpers}/helper_test.go | 2 +- logger/hooks/hooks.go | 2 +- 16 files changed, 149 insertions(+), 47 deletions(-) create mode 100644 daemon/exec_helper.go create mode 100644 daemon/exec_helper_windows.go rename common/exec_helper.go => helpers/argv_helper.go (95%) rename common/exec_helper_windows.go => helpers/argv_helper_windows.go (97%) rename {common => helpers}/helper.go (95%) rename {common => helpers}/helper_test.go (99%) diff --git a/api/graylog.go b/api/graylog.go index 81a05a0d..bb78baa1 100644 --- a/api/graylog.go +++ b/api/graylog.go @@ -19,6 +19,7 @@ import ( "bytes" "crypto/tls" "encoding/json" + "github.com/Graylog2/collector-sidecar/helpers" "io" "net/http" "strconv" @@ -164,7 +165,7 @@ func UpdateRegistration(httpClient *http.Client, checksum string, ctx *context.C registration := graylog.RegistrationRequest{} registration.NodeName = ctx.UserConfig.NodeName - registration.NodeDetails.OperatingSystem = common.GetSystemName() + registration.NodeDetails.OperatingSystem = helpers.GetSystemName() if ctx.UserConfig.SendStatus { metrics := &graylog.MetricsRequest{ @@ -172,7 +173,7 @@ func UpdateRegistration(httpClient *http.Client, checksum string, ctx *context.C CpuIdle: common.GetCpuIdle(), Load1: common.GetLoad1(), } - registration.NodeDetails.IP = common.GetHostIP() + registration.NodeDetails.IP = helpers.GetHostIP() registration.NodeDetails.Status = status registration.NodeDetails.Metrics = metrics if len(ctx.UserConfig.ListLogFiles) > 0 { diff --git a/assignments/assignment.go b/assignments/assignment.go index 5a0b9719..b540e224 100644 --- a/assignments/assignment.go +++ b/assignments/assignment.go @@ -16,7 +16,7 @@ package assignments import ( - "github.com/Graylog2/collector-sidecar/common" + "github.com/Graylog2/collector-sidecar/helpers" "reflect" ) @@ -92,7 +92,7 @@ func (as *assignmentStore) Update(assignments []ConfigurationAssignment) bool { func (as *assignmentStore) cleanup(validBackendIds []string) { for backendId := range as.assignments { - if !common.IsInList(backendId, validBackendIds) { + if !helpers.IsInList(backendId, validBackendIds) { delete(as.assignments, backendId) } } diff --git a/backends/backend.go b/backends/backend.go index 83290808..551413f0 100644 --- a/backends/backend.go +++ b/backends/backend.go @@ -18,6 +18,7 @@ package backends import ( "bytes" "fmt" + "github.com/Graylog2/collector-sidecar/helpers" "os/exec" "path/filepath" "reflect" @@ -27,7 +28,6 @@ import ( "github.com/flynn-archive/go-shlex" "github.com/Graylog2/collector-sidecar/api/graylog" - "github.com/Graylog2/collector-sidecar/common" "github.com/Graylog2/collector-sidecar/context" "github.com/Graylog2/collector-sidecar/system" ) @@ -50,7 +50,7 @@ type Backend struct { func BackendFromResponse(response graylog.ResponseCollectorBackend, configId string, ctx *context.Ctx) *Backend { return &Backend{ - Enabled: common.NewTrue(), + Enabled: helpers.NewTrue(), Id: response.Id + "-" + configId, CollectorId: response.Id, ConfigId: configId, @@ -74,10 +74,10 @@ func (b *Backend) Equals(a *Backend) bool { } func (b *Backend) EqualSettings(a *Backend) bool { - executeParameters, _ := common.Sprintf( + executeParameters, _ := helpers.Sprintf( a.ExecuteParameters, a.ConfigurationPath) - validationParameters, _ := common.Sprintf( + validationParameters, _ := helpers.Sprintf( a.ValidationParameters, a.ConfigurationPath) @@ -104,7 +104,7 @@ func (b *Backend) CheckExecutableAgainstAccesslist(context *context.Ctx) error { if len(context.UserConfig.CollectorBinariesAccesslist) <= 0 { return nil } - isListed, err := common.PathMatch(b.ExecutablePath, context.UserConfig.CollectorBinariesAccesslist) + isListed, err := helpers.PathMatch(b.ExecutablePath, context.UserConfig.CollectorBinariesAccesslist) if err != nil { return fmt.Errorf("Can not validate binary path: %s", err) } @@ -121,7 +121,7 @@ func (b *Backend) CheckExecutableAgainstAccesslist(context *context.Ctx) error { } func (b *Backend) CheckConfigPathAgainstAccesslist(context *context.Ctx) bool { - configuration, err := common.PathMatch(b.ConfigurationPath, context.UserConfig.CollectorBinariesAccesslist) + configuration, err := helpers.PathMatch(b.ConfigurationPath, context.UserConfig.CollectorBinariesAccesslist) if err != nil { log.Errorf("Can not validate configuration path: %s", err) return false @@ -145,7 +145,7 @@ func (b *Backend) ValidateConfigurationFile(context *context.Ctx) (error, string var err error var quotedArgs []string if runtime.GOOS == "windows" { - quotedArgs = common.CommandLineToArgv(b.ValidationParameters) + quotedArgs = helpers.CommandLineToArgv(b.ValidationParameters) } else { quotedArgs, err = shlex.Split(b.ValidationParameters) } diff --git a/backends/registry.go b/backends/registry.go index 6a538313..69482b4e 100644 --- a/backends/registry.go +++ b/backends/registry.go @@ -16,7 +16,7 @@ package backends import ( - "github.com/Graylog2/collector-sidecar/common" + "github.com/Graylog2/collector-sidecar/helpers" "github.com/Graylog2/collector-sidecar/logger" ) @@ -32,13 +32,13 @@ type backendStore struct { func (bs *backendStore) SetBackend(backend Backend) { bs.backends[backend.Id] = &backend - executeParameters, err := common.Sprintf(backend.ExecuteParameters, backend.ConfigurationPath) + executeParameters, err := helpers.Sprintf(backend.ExecuteParameters, backend.ConfigurationPath) if err != nil { log.Errorf("Invalid execute parameters, skip adding backend: %s", backend.Name) return } bs.backends[backend.Id].ExecuteParameters = executeParameters - validationParameters, err := common.Sprintf(backend.ValidationParameters, backend.ConfigurationPath) + validationParameters, err := helpers.Sprintf(backend.ValidationParameters, backend.ConfigurationPath) if err != nil { log.Errorf("Invalid validation parameters, skip adding backend: %s", backend.Name) return @@ -84,7 +84,7 @@ func (bs *backendStore) Update(backends []Backend) { func (bs *backendStore) Cleanup(validBackendIds []string) { for _, backend := range bs.backends { - if !common.IsInList(backend.Id, validBackendIds) { + if !helpers.IsInList(backend.Id, validBackendIds) { log.Debug("Cleaning up backend: " + backend.Name) delete(bs.backends, backend.Id) } diff --git a/backends/render.go b/backends/render.go index 973405e5..51e8f25b 100644 --- a/backends/render.go +++ b/backends/render.go @@ -20,6 +20,7 @@ import ( "fmt" "github.com/Graylog2/collector-sidecar/common" "github.com/Graylog2/collector-sidecar/context" + "github.com/Graylog2/collector-sidecar/helpers" "io/ioutil" ) @@ -27,7 +28,7 @@ func (b *Backend) render() []byte { var result bytes.Buffer result.WriteString(b.Template) - return common.ConvertLineBreak(result.Bytes()) + return helpers.ConvertLineBreak(result.Bytes()) } func (b *Backend) renderToFile(context *context.Ctx) error { diff --git a/context/context.go b/context/context.go index c1bb6939..156b8837 100644 --- a/context/context.go +++ b/context/context.go @@ -16,6 +16,8 @@ package context import ( + "github.com/Graylog2/collector-sidecar/common" + "github.com/Graylog2/collector-sidecar/helpers" "github.com/docker/go-units" "net/url" "os" @@ -25,7 +27,6 @@ import ( "time" "github.com/Graylog2/collector-sidecar/cfgfile" - "github.com/Graylog2/collector-sidecar/common" "github.com/Graylog2/collector-sidecar/logger" "github.com/Graylog2/collector-sidecar/system" ) @@ -71,7 +72,7 @@ func (ctx *Ctx) LoadConfig(path *string) error { if ctx.UserConfig.NodeId == "" { log.Fatal("No node ID was configured.") } - ctx.NodeId = common.GetCollectorId(ctx.UserConfig.NodeId) + ctx.NodeId = helpers.GetCollectorId(ctx.UserConfig.NodeId) if ctx.NodeId == "" { log.Fatal("Empty node-id, exiting! Make sure a valid id is configured.") } @@ -79,7 +80,7 @@ func (ctx *Ctx) LoadConfig(path *string) error { // node_name if ctx.UserConfig.NodeName == "" { log.Info("No node name was configured, falling back to hostname") - ctx.UserConfig.NodeName, err = common.GetHostname() + ctx.UserConfig.NodeName, err = helpers.GetHostname() if err != nil { log.Fatal("No node name configured and not able to obtain hostname as alternative.") } diff --git a/daemon/action_handler.go b/daemon/action_handler.go index 22ddc16a..0e2a9c72 100644 --- a/daemon/action_handler.go +++ b/daemon/action_handler.go @@ -18,7 +18,7 @@ package daemon import ( "github.com/Graylog2/collector-sidecar/api/graylog" "github.com/Graylog2/collector-sidecar/backends" - "github.com/Graylog2/collector-sidecar/common" + "github.com/Graylog2/collector-sidecar/helpers" ) func HandleCollectorActions(actions []graylog.ResponseCollectorAction) { @@ -37,7 +37,7 @@ func HandleCollectorActions(actions []graylog.ResponseCollectorAction) { case action.Properties["stop"] == true: stopAction(backend) default: - log.Infof("Got unsupported collector command: %s", common.Inspect(action.Properties)) + log.Infof("Got unsupported collector command: %s", helpers.Inspect(action.Properties)) } } } diff --git a/daemon/daemon.go b/daemon/daemon.go index c998dc52..df84c92c 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -18,8 +18,8 @@ package daemon import ( "github.com/Graylog2/collector-sidecar/assignments" "github.com/Graylog2/collector-sidecar/backends" - "github.com/Graylog2/collector-sidecar/common" "github.com/Graylog2/collector-sidecar/context" + "github.com/Graylog2/collector-sidecar/helpers" "github.com/Graylog2/collector-sidecar/logger" ) @@ -45,7 +45,7 @@ func init() { } func NewConfig() *DaemonConfig { - rootDir, err := common.GetRootPath() + rootDir, err := helpers.GetRootPath() if err != nil { log.Error("Can not access root directory") } diff --git a/daemon/exec_helper.go b/daemon/exec_helper.go new file mode 100644 index 00000000..24b524b9 --- /dev/null +++ b/daemon/exec_helper.go @@ -0,0 +1,44 @@ +// Copyright (C) 2020 Graylog, Inc. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the Server Side Public License, version 1, +// as published by MongoDB, Inc. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// Server Side Public License for more details. +// +// You should have received a copy of the Server Side Public License +// along with this program. If not, see +// . + +//go:build !windows +// +build !windows + +package daemon + +import ( + "os" + "os/exec" + "syscall" + "time" +) + +func Setpgid(cmd *exec.Cmd) { + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} +} +func KillProcess(r *ExecRunner, proc *os.Process) { + log.Debugf("[%s] PID SIGHUP ignored, sending SIGHUP to process group", r.Name()) + err := syscall.Kill(-proc.Pid, syscall.SIGHUP) + if err != nil { + log.Debugf("[%s] Failed to HUP process group %s", r.Name(), err) + } + time.Sleep(2 * time.Second) + if r.Running() { + err := syscall.Kill(-proc.Pid, syscall.SIGKILL) + if err != nil { + log.Debugf("[%s] Failed to kill process group %s", r.Name(), err) + } + } +} diff --git a/daemon/exec_helper_windows.go b/daemon/exec_helper_windows.go new file mode 100644 index 00000000..c6238639 --- /dev/null +++ b/daemon/exec_helper_windows.go @@ -0,0 +1,61 @@ +// Copyright (C) 2020 Graylog, Inc. + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the Server Side Public License, version 1, +// as published by MongoDB, Inc. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// Server Side Public License for more details. +// +// You should have received a copy of the Server Side Public License +// along with this program. If not, see +// . + +// Copyright 2009 The Go Authors. All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +// extracted from github.com/golang/go/blob/master/src/os/exec_windows.go + +package daemon + +import ( + "os" + "os/exec" +) + +func Setpgid(cmd *exec.Cmd) { + panic("not implemented on this platform") +} +func KillProcess(r *ExecRunner, proc *os.Process) { + err := proc.Kill() + if err != nil { + log.Debugf("[%s] Failed to kill process %s", r.Name(), err) + } +} diff --git a/daemon/exec_runner.go b/daemon/exec_runner.go index 920de8d4..b40f7ef9 100644 --- a/daemon/exec_runner.go +++ b/daemon/exec_runner.go @@ -17,6 +17,7 @@ package daemon import ( "errors" + "github.com/Graylog2/collector-sidecar/helpers" "io/ioutil" "os" "os/exec" @@ -198,7 +199,7 @@ func (r *ExecRunner) start() error { var err error var quotedArgs []string if runtime.GOOS == "windows" { - quotedArgs = common.CommandLineToArgv(r.args) + quotedArgs = helpers.CommandLineToArgv(r.args) } else { quotedArgs, err = shlex.Split(r.args) } @@ -208,7 +209,9 @@ func (r *ExecRunner) start() error { r.cmd = exec.Command(r.exec, quotedArgs...) r.cmd.Dir = r.daemon.Dir r.cmd.Env = append(os.Environ(), r.daemon.Env...) - r.cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + if runtime.GOOS != "windows" { + Setpgid(r.cmd) + } // start the actual process and don't block r.startTime = time.Now() @@ -241,20 +244,7 @@ func (r *ExecRunner) stop() error { } // in doubt kill the process - if r.Running() && runtime.GOOS != "windows" { - log.Debugf("[%s] PID SIGHUP ignored, sending SIGHUP to process group", r.Name()) - err := syscall.Kill(-r.cmd.Process.Pid, syscall.SIGHUP) - if err != nil { - log.Debugf("[%s] Failed to HUP process group %s", r.Name(), err) - } - time.Sleep(2 * time.Second) - } - if r.Running() { - err := syscall.Kill(-r.cmd.Process.Pid, syscall.SIGKILL) - if err != nil { - log.Debugf("[%s] Failed to kill process group %s", r.Name(), err) - } - } + KillProcess(r, r.cmd.Process) r.backend.SetStatus(backends.StatusStopped, "Stopped", "") diff --git a/common/exec_helper.go b/helpers/argv_helper.go similarity index 95% rename from common/exec_helper.go rename to helpers/argv_helper.go index e0bc254b..d580c3ed 100644 --- a/common/exec_helper.go +++ b/helpers/argv_helper.go @@ -13,9 +13,10 @@ // along with this program. If not, see // . +//go:build !windows // +build !windows -package common +package helpers // Dummy function. Only used on Windows func CommandLineToArgv(cmd string) []string { diff --git a/common/exec_helper_windows.go b/helpers/argv_helper_windows.go similarity index 97% rename from common/exec_helper_windows.go rename to helpers/argv_helper_windows.go index a5c69a57..75dcd7a7 100644 --- a/common/exec_helper_windows.go +++ b/helpers/argv_helper_windows.go @@ -43,9 +43,8 @@ // extracted from github.com/golang/go/blob/master/src/os/exec_windows.go -package common +package helpers -// appendBSBytes appends n '\\' bytes to b and returns the resulting slice. func appendBSBytes(b []byte, n int) []byte { for ; n > 0; n-- { b = append(b, '\\') diff --git a/common/helper.go b/helpers/helper.go similarity index 95% rename from common/helper.go rename to helpers/helper.go index bda01e71..4c09b591 100644 --- a/common/helper.go +++ b/helpers/helper.go @@ -13,13 +13,15 @@ // along with this program. If not, see // . -package common +package helpers import ( "bytes" "encoding/json" "fmt" "github.com/Graylog2/collector-sidecar/cfgfile" + "github.com/Graylog2/collector-sidecar/common" + "github.com/Graylog2/collector-sidecar/logger" "github.com/pborman/uuid" "io/ioutil" "net" @@ -31,6 +33,8 @@ import ( "unicode" ) +var log = logger.Log() + func GetRootPath() (string, error) { return filepath.Abs("/") } @@ -90,10 +94,10 @@ func GetCollectorId(collectorId string) string { } func idFromFile(filePath string) string { - err := FileExists(filePath) + err := common.FileExists(filePath) if err != nil { log.Info("node-id file doesn't exist, generating a new one") - err = CreatePathToFile(filePath) + err = common.CreatePathToFile(filePath) if err == nil { err = ioutil.WriteFile(filePath, []byte(RandomUuid()), 0644) if err != nil { diff --git a/common/helper_test.go b/helpers/helper_test.go similarity index 99% rename from common/helper_test.go rename to helpers/helper_test.go index 80a9be42..5b8d6f70 100644 --- a/common/helper_test.go +++ b/helpers/helper_test.go @@ -13,7 +13,7 @@ // along with this program. If not, see // . -package common +package helpers import ( "io/ioutil" diff --git a/logger/hooks/hooks.go b/logger/hooks/hooks.go index 52c6f842..55e5ebd0 100644 --- a/logger/hooks/hooks.go +++ b/logger/hooks/hooks.go @@ -16,13 +16,13 @@ package hooks import ( + "github.com/Graylog2/collector-sidecar/common" "github.com/Graylog2/collector-sidecar/logger" "path/filepath" "github.com/Sirupsen/logrus" "github.com/rifflock/lfshook" - "github.com/Graylog2/collector-sidecar/common" "github.com/Graylog2/collector-sidecar/context" ) From ef02cedfb32e83c64a5f9bda644c020d76722ff9 Mon Sep 17 00:00:00 2001 From: Marco Pfatschbacher Date: Tue, 3 Jan 2023 17:01:12 +0100 Subject: [PATCH 04/22] Don't log wait errors as a warning --- daemon/exec_runner.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/daemon/exec_runner.go b/daemon/exec_runner.go index b40f7ef9..beaac1ad 100644 --- a/daemon/exec_runner.go +++ b/daemon/exec_runner.go @@ -307,7 +307,7 @@ func (r *ExecRunner) run() { r.setRunning(true) err := r.cmd.Wait() if err != nil { - log.Warnf("[%s] Wait() error %s", r.name, err) + log.Debugf("[%s] Wait() error %s", r.name, err) } r.setRunning(false) }() From 9393693b720a0ba5812494771984637088cfc24e Mon Sep 17 00:00:00 2001 From: Marco Pfatschbacher Date: Wed, 4 Jan 2023 14:39:07 +0100 Subject: [PATCH 05/22] Improve restart behaviour for hanging collectors Some collectors might be started wit a wrapper script, which runs the collector with elevated privileges (e.g. using sudo). Such collectors might not be killed from the sidecar. The restart func had to timeout loop to ignore that, but the logic was wrong and it would loop indefininatly once it passed the 5 second mark. Furthermore, the existing goroutine that is calling Wait() on the process might mess up the running state of the restarted collector. Thus we provide a channel that will abort the goroutine. Also improve the status of the backands reporting to Graylog. The GetRotatedLog() writers were closed immideatly after creating them. This had no effect and can be removed. --- daemon/exec_runner.go | 37 ++++++++++++++++++++++++++++++------- 1 file changed, 30 insertions(+), 7 deletions(-) diff --git a/daemon/exec_runner.go b/daemon/exec_runner.go index beaac1ad..9034cd39 100644 --- a/daemon/exec_runner.go +++ b/daemon/exec_runner.go @@ -46,6 +46,7 @@ type ExecRunner struct { startTime time.Time cmd *exec.Cmd signals chan string + terminate chan error } func init() { @@ -67,6 +68,7 @@ func NewExecRunner(backend backends.Backend, context *context.Ctx) Runner { signals: make(chan string), stderr: filepath.Join(context.UserConfig.LogPath, backend.Name+"_stderr.log"), stdout: filepath.Join(context.UserConfig.LogPath, backend.Name+"_stdout.log"), + terminate: make(chan error), } // set default state @@ -213,6 +215,7 @@ func (r *ExecRunner) start() error { Setpgid(r.cmd) } + r.terminate = make(chan error) // start the actual process and don't block r.startTime = time.Now() r.run() @@ -246,7 +249,11 @@ func (r *ExecRunner) stop() error { // in doubt kill the process KillProcess(r, r.cmd.Process) - r.backend.SetStatus(backends.StatusStopped, "Stopped", "") + if !r.Running() { + r.backend.SetStatus(backends.StatusStopped, "Stopped", "") + } else { + r.backend.SetStatus(backends.StatusError, "Failed to be stopped", "") + } return nil } @@ -259,15 +266,25 @@ func (r *ExecRunner) Restart() error { func (r *ExecRunner) restart() error { if r.Running() { r.stop() - for timeout := 0; r.Running() || timeout >= 5; timeout++ { - log.Debugf("[%s] waiting for process to finish...", r.Name()) + limit := 10 + for timeout := 0; r.Running() && timeout < limit; timeout++ { + log.Debugf("[%s] waiting %ds/%ds for process to finish...", r.Name(), timeout, limit) time.Sleep(1 * time.Second) } } + if r.Running() { + // skip the hanging r.cmd.Wait() goroutine + r.terminate <- errors.New("timeout") + <-r.terminate // wait for termination + } + // wipe collector log files after each try os.Truncate(r.stderr, 0) os.Truncate(r.stdout, 0) - r.start() + err := r.start() + if err != nil { + log.Errorf("[%s] got start error: %s", r.Name(), err) + } return nil } @@ -282,7 +299,6 @@ func (r *ExecRunner) run() { } f := logger.GetRotatedLog(r.stderr, r.context.UserConfig.LogRotateMaxFileSize, r.context.UserConfig.LogRotateKeepFiles) - defer f.Close() r.cmd.Stderr = f } if r.stdout != "" { @@ -292,7 +308,6 @@ func (r *ExecRunner) run() { } f := logger.GetRotatedLog(r.stdout, r.context.UserConfig.LogRotateMaxFileSize, r.context.UserConfig.LogRotateKeepFiles) - defer f.Close() r.cmd.Stdout = f } @@ -303,13 +318,21 @@ func (r *ExecRunner) run() { } // wait for process exit in the background. Ensure single cmd.Wait() call + // to deal with hanging processes, provide a termination channel go func() { r.setRunning(true) - err := r.cmd.Wait() + go func(terminate chan error) { + err := r.cmd.Wait() + terminate <- err + }(r.terminate) + + err := <-r.terminate if err != nil { log.Debugf("[%s] Wait() error %s", r.name, err) } r.setRunning(false) + r.backend.SetStatus(backends.StatusStopped, "Stopped", "") + r.terminate <- nil //confirm termination }() } From 8b00166eee99a95a3c252145a23908cc75929631 Mon Sep 17 00:00:00 2001 From: Marco Pfatschbacher Date: Wed, 4 Jan 2023 16:24:26 +0100 Subject: [PATCH 06/22] Don't prevent sidecar from shutting down with hanging collectors Timeout after 30 seconds --- daemon/distributor.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/daemon/distributor.go b/daemon/distributor.go index f0b3385e..a20be90e 100644 --- a/daemon/distributor.go +++ b/daemon/distributor.go @@ -56,9 +56,13 @@ func (dist *Distributor) Stop(s service.Service) error { runner.Shutdown() } for _, runner := range Daemon.Runner { - for runner.Running() { + limit := 100 + for timeout := 0; runner.Running() && timeout < limit; timeout++ { time.Sleep(300 * time.Millisecond) } + if runner.Running() { + log.Warnf("[%s] collector failed to exit", runner.Name()) + } } dist.Running = false From 0a70658b0162a716a021b08037b16ebd9ab824e0 Mon Sep 17 00:00:00 2001 From: Marco Pfatschbacher Date: Thu, 5 Jan 2023 17:24:04 +0100 Subject: [PATCH 07/22] Adjust timeouts And fix early review comments from @thll --- daemon/distributor.go | 2 +- daemon/exec_helper.go | 2 +- daemon/exec_helper_windows.go | 2 +- daemon/exec_runner.go | 6 ++---- 4 files changed, 5 insertions(+), 7 deletions(-) diff --git a/daemon/distributor.go b/daemon/distributor.go index a20be90e..b3cde19b 100644 --- a/daemon/distributor.go +++ b/daemon/distributor.go @@ -58,7 +58,7 @@ func (dist *Distributor) Stop(s service.Service) error { for _, runner := range Daemon.Runner { limit := 100 for timeout := 0; runner.Running() && timeout < limit; timeout++ { - time.Sleep(300 * time.Millisecond) + time.Sleep(100 * time.Millisecond) } if runner.Running() { log.Warnf("[%s] collector failed to exit", runner.Name()) diff --git a/daemon/exec_helper.go b/daemon/exec_helper.go index 24b524b9..46919a17 100644 --- a/daemon/exec_helper.go +++ b/daemon/exec_helper.go @@ -34,7 +34,7 @@ func KillProcess(r *ExecRunner, proc *os.Process) { if err != nil { log.Debugf("[%s] Failed to HUP process group %s", r.Name(), err) } - time.Sleep(2 * time.Second) + time.Sleep(5 * time.Second) if r.Running() { err := syscall.Kill(-proc.Pid, syscall.SIGKILL) if err != nil { diff --git a/daemon/exec_helper_windows.go b/daemon/exec_helper_windows.go index c6238639..6c016d49 100644 --- a/daemon/exec_helper_windows.go +++ b/daemon/exec_helper_windows.go @@ -51,7 +51,7 @@ import ( ) func Setpgid(cmd *exec.Cmd) { - panic("not implemented on this platform") + // nop on windows } func KillProcess(r *ExecRunner, proc *os.Process) { err := proc.Kill() diff --git a/daemon/exec_runner.go b/daemon/exec_runner.go index 9034cd39..7487ee18 100644 --- a/daemon/exec_runner.go +++ b/daemon/exec_runner.go @@ -211,9 +211,7 @@ func (r *ExecRunner) start() error { r.cmd = exec.Command(r.exec, quotedArgs...) r.cmd.Dir = r.daemon.Dir r.cmd.Env = append(os.Environ(), r.daemon.Env...) - if runtime.GOOS != "windows" { - Setpgid(r.cmd) - } + Setpgid(r.cmd) // run with a new process group (unix only) r.terminate = make(chan error) // start the actual process and don't block @@ -266,7 +264,7 @@ func (r *ExecRunner) Restart() error { func (r *ExecRunner) restart() error { if r.Running() { r.stop() - limit := 10 + limit := 5 for timeout := 0; r.Running() && timeout < limit; timeout++ { log.Debugf("[%s] waiting %ds/%ds for process to finish...", r.Name(), timeout, limit) time.Sleep(1 * time.Second) From e37056b7f7bdd6171c01bb4e3282c065fb54c431 Mon Sep 17 00:00:00 2001 From: Marco Pfatschbacher Date: Mon, 9 Jan 2023 10:21:58 +0100 Subject: [PATCH 08/22] Refactor KillProcess - Use SIGTERM instead of SIGHUP - Immediatly signal the process group instead of the process Co-authored-by: Bernd Ahlers --- daemon/exec_helper.go | 37 +++++++++++++++++++++++++++-------- daemon/exec_helper_windows.go | 5 ++--- daemon/exec_runner.go | 10 +--------- 3 files changed, 32 insertions(+), 20 deletions(-) diff --git a/daemon/exec_helper.go b/daemon/exec_helper.go index 46919a17..f494c5e2 100644 --- a/daemon/exec_helper.go +++ b/daemon/exec_helper.go @@ -19,7 +19,6 @@ package daemon import ( - "os" "os/exec" "syscall" "time" @@ -28,17 +27,39 @@ import ( func Setpgid(cmd *exec.Cmd) { cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} } -func KillProcess(r *ExecRunner, proc *os.Process) { - log.Debugf("[%s] PID SIGHUP ignored, sending SIGHUP to process group", r.Name()) - err := syscall.Kill(-proc.Pid, syscall.SIGHUP) + +func KillProcess(r *ExecRunner, timeout time.Duration) { + pid := r.cmd.Process.Pid + + if pid == -1 { + log.Debugf("[%s] Process already released", r.Name()) + return + } + if pid == 0 { + log.Debugf("[%s] Process not initialized", r.Name()) + return + } + + // Signal the process group (-pid) instead of just the process. Otherwise, forked child processes + // can keep running and cause cmd.Wait to hang. + log.Infof("[%s] SIGTERM process group", r.Name()) + err := syscall.Kill(-pid, syscall.SIGTERM) if err != nil { - log.Debugf("[%s] Failed to HUP process group %s", r.Name(), err) + log.Infof("[%s] Failed to SIGTERM process group %s", r.Name(), err) + } + + limit := timeout.Milliseconds() + tick := 100 * time.Millisecond + for t := tick.Milliseconds(); r.Running() && t < limit; t += tick.Milliseconds() { + log.Infof("[%s] Waiting for process group to finish (%vms / %vms)", r.Name(), t, limit) + time.Sleep(tick) } - time.Sleep(5 * time.Second) + if r.Running() { - err := syscall.Kill(-proc.Pid, syscall.SIGKILL) + log.Infof("[%s] SIGKILL process group", r.Name()) + err := syscall.Kill(-pid, syscall.SIGKILL) if err != nil { - log.Debugf("[%s] Failed to kill process group %s", r.Name(), err) + log.Debugf("[%s] Failed to SIGKILL process group %s", r.Name(), err) } } } diff --git a/daemon/exec_helper_windows.go b/daemon/exec_helper_windows.go index 6c016d49..589ac593 100644 --- a/daemon/exec_helper_windows.go +++ b/daemon/exec_helper_windows.go @@ -46,15 +46,14 @@ package daemon import ( - "os" "os/exec" ) func Setpgid(cmd *exec.Cmd) { // nop on windows } -func KillProcess(r *ExecRunner, proc *os.Process) { - err := proc.Kill() +func KillProcess(r *ExecRunner, _ time.Duration) { + err := r.cmd.Process.Kill() if err != nil { log.Debugf("[%s] Failed to kill process %s", r.Name(), err) } diff --git a/daemon/exec_runner.go b/daemon/exec_runner.go index 7487ee18..ebdd1c30 100644 --- a/daemon/exec_runner.go +++ b/daemon/exec_runner.go @@ -24,7 +24,6 @@ import ( "path/filepath" "runtime" "sync/atomic" - "syscall" "time" "github.com/flynn-archive/go-shlex" @@ -238,14 +237,7 @@ func (r *ExecRunner) stop() error { log.Infof("[%s] Stopping", r.name) - // give the chance to cleanup resources - if r.cmd.Process != nil && runtime.GOOS != "windows" { - r.cmd.Process.Signal(syscall.SIGHUP) - time.Sleep(2 * time.Second) - } - - // in doubt kill the process - KillProcess(r, r.cmd.Process) + KillProcess(r, 5*time.Second) if !r.Running() { r.backend.SetStatus(backends.StatusStopped, "Stopped", "") From c102f5e37019b792a577609c8868dd6290e4eb77 Mon Sep 17 00:00:00 2001 From: Marco Pfatschbacher Date: Mon, 9 Jan 2023 11:32:26 +0100 Subject: [PATCH 09/22] be less verbose --- daemon/exec_helper.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/daemon/exec_helper.go b/daemon/exec_helper.go index f494c5e2..f1966745 100644 --- a/daemon/exec_helper.go +++ b/daemon/exec_helper.go @@ -42,7 +42,7 @@ func KillProcess(r *ExecRunner, timeout time.Duration) { // Signal the process group (-pid) instead of just the process. Otherwise, forked child processes // can keep running and cause cmd.Wait to hang. - log.Infof("[%s] SIGTERM process group", r.Name()) + log.Debugf("[%s] SIGTERM process group", r.Name()) err := syscall.Kill(-pid, syscall.SIGTERM) if err != nil { log.Infof("[%s] Failed to SIGTERM process group %s", r.Name(), err) @@ -51,12 +51,12 @@ func KillProcess(r *ExecRunner, timeout time.Duration) { limit := timeout.Milliseconds() tick := 100 * time.Millisecond for t := tick.Milliseconds(); r.Running() && t < limit; t += tick.Milliseconds() { - log.Infof("[%s] Waiting for process group to finish (%vms / %vms)", r.Name(), t, limit) + log.Debugf("[%s] Waiting for process group to finish (%vms / %vms)", r.Name(), t, limit) time.Sleep(tick) } if r.Running() { - log.Infof("[%s] SIGKILL process group", r.Name()) + log.Infof("[%s] Still running after SIGTERM. Sending SIGKILL to the process group", r.Name()) err := syscall.Kill(-pid, syscall.SIGKILL) if err != nil { log.Debugf("[%s] Failed to SIGKILL process group %s", r.Name(), err) From 049b10146f2f474a132c0ad97c64f97524a9b187 Mon Sep 17 00:00:00 2001 From: Marco Pfatschbacher Date: Mon, 9 Jan 2023 11:44:28 +0100 Subject: [PATCH 10/22] Sleep a tiny bit after we send SIGKILL so we give the cmd.Wait() a chance to detect the finished process. --- daemon/exec_helper.go | 1 + 1 file changed, 1 insertion(+) diff --git a/daemon/exec_helper.go b/daemon/exec_helper.go index f1966745..2b3d77fc 100644 --- a/daemon/exec_helper.go +++ b/daemon/exec_helper.go @@ -61,5 +61,6 @@ func KillProcess(r *ExecRunner, timeout time.Duration) { if err != nil { log.Debugf("[%s] Failed to SIGKILL process group %s", r.Name(), err) } + time.Sleep(100 * time.Millisecond) } } From b38b85b3c1eb51e9eaa7f8c110bb6dcd05dbdf18 Mon Sep 17 00:00:00 2001 From: Marco Pfatschbacher Date: Mon, 9 Jan 2023 11:47:51 +0100 Subject: [PATCH 11/22] log if we finally failed to stop the process --- daemon/exec_runner.go | 1 + 1 file changed, 1 insertion(+) diff --git a/daemon/exec_runner.go b/daemon/exec_runner.go index ebdd1c30..cdc6be06 100644 --- a/daemon/exec_runner.go +++ b/daemon/exec_runner.go @@ -242,6 +242,7 @@ func (r *ExecRunner) stop() error { if !r.Running() { r.backend.SetStatus(backends.StatusStopped, "Stopped", "") } else { + log.Warnf("[%s] Failed to be stopped", r.Name()) r.backend.SetStatus(backends.StatusError, "Failed to be stopped", "") } From 024980c15933420c916e4d67058a0070d4f4b787 Mon Sep 17 00:00:00 2001 From: Marco Pfatschbacher Date: Mon, 9 Jan 2023 11:47:56 +0100 Subject: [PATCH 12/22] Remove second wait loop in restart It's enough to have one wait loop in stop() --- daemon/exec_runner.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/daemon/exec_runner.go b/daemon/exec_runner.go index cdc6be06..ec45a516 100644 --- a/daemon/exec_runner.go +++ b/daemon/exec_runner.go @@ -257,11 +257,6 @@ func (r *ExecRunner) Restart() error { func (r *ExecRunner) restart() error { if r.Running() { r.stop() - limit := 5 - for timeout := 0; r.Running() && timeout < limit; timeout++ { - log.Debugf("[%s] waiting %ds/%ds for process to finish...", r.Name(), timeout, limit) - time.Sleep(1 * time.Second) - } } if r.Running() { // skip the hanging r.cmd.Wait() goroutine From e9fc465e7c321bf8de2cdef089e7e412357fb414 Mon Sep 17 00:00:00 2001 From: Marco Pfatschbacher Date: Mon, 9 Jan 2023 11:51:15 +0100 Subject: [PATCH 13/22] Move cmd.Wait() abortion into stop() Otherwise we could only restart() hanging processes. stop() and start() would still fail. --- daemon/exec_runner.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/daemon/exec_runner.go b/daemon/exec_runner.go index ec45a516..a934e775 100644 --- a/daemon/exec_runner.go +++ b/daemon/exec_runner.go @@ -244,6 +244,9 @@ func (r *ExecRunner) stop() error { } else { log.Warnf("[%s] Failed to be stopped", r.Name()) r.backend.SetStatus(backends.StatusError, "Failed to be stopped", "") + // skip the hanging r.cmd.Wait() goroutine + r.terminate <- errors.New("timeout") + <-r.terminate // wait for termination } return nil @@ -258,11 +261,6 @@ func (r *ExecRunner) restart() error { if r.Running() { r.stop() } - if r.Running() { - // skip the hanging r.cmd.Wait() goroutine - r.terminate <- errors.New("timeout") - <-r.terminate // wait for termination - } // wipe collector log files after each try os.Truncate(r.stderr, 0) From 4aab929287421948573a045d52b6db750074f732 Mon Sep 17 00:00:00 2001 From: Marco Pfatschbacher Date: Mon, 9 Jan 2023 12:14:18 +0100 Subject: [PATCH 14/22] Make collector shutdown timeout configurable `collector_shutdown_timeout: "10s"` --- cfgfile/schema.go | 3 +++ context/context.go | 4 ++++ daemon/exec_helper.go | 4 ++-- daemon/exec_helper_windows.go | 2 +- daemon/exec_runner.go | 2 +- sidecar-example.yml | 4 ++++ 6 files changed, 15 insertions(+), 4 deletions(-) diff --git a/cfgfile/schema.go b/cfgfile/schema.go index ef70b624..0810ba5d 100644 --- a/cfgfile/schema.go +++ b/cfgfile/schema.go @@ -28,6 +28,8 @@ type SidecarConfig struct { CollectorValidationTimeoutString string `config:"collector_validation_timeout"` CollectorValidationTimeout time.Duration // set from CollectorValidationTimeoutString CollectorConfigurationDirectory string `config:"collector_configuration_directory"` + CollectorShutdownTimeoutString string `config:"collector_shutdown_timeout"` + CollectorShutdownTimeout time.Duration // set from CollectorShutdownTimeoutString LogRotateMaxFileSizeString string `config:"log_rotate_max_file_size"` LogRotateMaxFileSize int64 // set from LogRotateMaxFileSizeString LogRotateKeepFiles int `config:"log_rotate_keep_files"` @@ -54,6 +56,7 @@ log_path: "/var/log/graylog-sidecar" log_rotate_max_file_size: "10MiB" log_rotate_keep_files: 10 collector_validation_timeout: "1m" +collector_shutdown_timeout: "10s" collector_configuration_directory: "/var/lib/graylog-sidecar/generated" collector_binaries_accesslist: - "/usr/bin/filebeat" diff --git a/context/context.go b/context/context.go index 156b8837..24c9304b 100644 --- a/context/context.go +++ b/context/context.go @@ -112,6 +112,10 @@ func (ctx *Ctx) LoadConfig(path *string) error { if err != nil { log.Fatal("Cannot parse validation timeout duration: ", err) } + ctx.UserConfig.CollectorShutdownTimeout, err = time.ParseDuration(ctx.UserConfig.CollectorShutdownTimeoutString) + if err != nil { + log.Fatal("Cannot parse shutdown timeout duration: ", err) + } // collector_configuration_directory if ctx.UserConfig.CollectorConfigurationDirectory == "" { diff --git a/daemon/exec_helper.go b/daemon/exec_helper.go index 2b3d77fc..435e705d 100644 --- a/daemon/exec_helper.go +++ b/daemon/exec_helper.go @@ -28,7 +28,7 @@ func Setpgid(cmd *exec.Cmd) { cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} } -func KillProcess(r *ExecRunner, timeout time.Duration) { +func KillProcess(r *ExecRunner) { pid := r.cmd.Process.Pid if pid == -1 { @@ -48,7 +48,7 @@ func KillProcess(r *ExecRunner, timeout time.Duration) { log.Infof("[%s] Failed to SIGTERM process group %s", r.Name(), err) } - limit := timeout.Milliseconds() + limit := r.context.UserConfig.CollectorShutdownTimeout.Milliseconds() tick := 100 * time.Millisecond for t := tick.Milliseconds(); r.Running() && t < limit; t += tick.Milliseconds() { log.Debugf("[%s] Waiting for process group to finish (%vms / %vms)", r.Name(), t, limit) diff --git a/daemon/exec_helper_windows.go b/daemon/exec_helper_windows.go index 589ac593..b5c3308e 100644 --- a/daemon/exec_helper_windows.go +++ b/daemon/exec_helper_windows.go @@ -52,7 +52,7 @@ import ( func Setpgid(cmd *exec.Cmd) { // nop on windows } -func KillProcess(r *ExecRunner, _ time.Duration) { +func KillProcess(r *ExecRunner) { err := r.cmd.Process.Kill() if err != nil { log.Debugf("[%s] Failed to kill process %s", r.Name(), err) diff --git a/daemon/exec_runner.go b/daemon/exec_runner.go index a934e775..e9d313ac 100644 --- a/daemon/exec_runner.go +++ b/daemon/exec_runner.go @@ -237,7 +237,7 @@ func (r *ExecRunner) stop() error { log.Infof("[%s] Stopping", r.name) - KillProcess(r, 5*time.Second) + KillProcess(r) if !r.Running() { r.backend.SetStatus(backends.StatusStopped, "Stopped", "") diff --git a/sidecar-example.yml b/sidecar-example.yml index 399b40bb..fa1746b7 100644 --- a/sidecar-example.yml +++ b/sidecar-example.yml @@ -59,6 +59,10 @@ server_api_token: "" # How long to wait for the config validation command. #collector_validation_timeout: "1m" +# How long to wait for the collector to gracefully shutdown. +# After this timeout the sidecar tries to terminate the collector with SIGKILL +#collector_shutdown_timeout: "10s" + # Directory where the sidecar generates configurations for collectors. #collector_configuration_directory: "/var/lib/graylog-sidecar/generated" From 3bbe8557b04fe2c8b41cd389293c0c14f1ea8f4d Mon Sep 17 00:00:00 2001 From: Marco Pfatschbacher Date: Mon, 9 Jan 2023 12:26:42 +0100 Subject: [PATCH 15/22] Determine backend stopped status in wait goroutine --- daemon/exec_runner.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/daemon/exec_runner.go b/daemon/exec_runner.go index e9d313ac..ae4657cf 100644 --- a/daemon/exec_runner.go +++ b/daemon/exec_runner.go @@ -243,7 +243,6 @@ func (r *ExecRunner) stop() error { r.backend.SetStatus(backends.StatusStopped, "Stopped", "") } else { log.Warnf("[%s] Failed to be stopped", r.Name()) - r.backend.SetStatus(backends.StatusError, "Failed to be stopped", "") // skip the hanging r.cmd.Wait() goroutine r.terminate <- errors.New("timeout") <-r.terminate // wait for termination @@ -313,9 +312,13 @@ func (r *ExecRunner) run() { err := <-r.terminate if err != nil { log.Debugf("[%s] Wait() error %s", r.name, err) + if err.Error() == "timeout" { + r.backend.SetStatus(backends.StatusError, "Failed to be stopped", "") + } else { + r.backend.SetStatus(backends.StatusStopped, "Stopped", "") + } } r.setRunning(false) - r.backend.SetStatus(backends.StatusStopped, "Stopped", "") r.terminate <- nil //confirm termination }() } From be24a0d76d9aa873c89a03fa7445fc283fbaec0a Mon Sep 17 00:00:00 2001 From: Marco Pfatschbacher Date: Mon, 9 Jan 2023 14:12:34 +0100 Subject: [PATCH 16/22] read termination ack --- daemon/exec_runner.go | 1 + 1 file changed, 1 insertion(+) diff --git a/daemon/exec_runner.go b/daemon/exec_runner.go index ae4657cf..b286e568 100644 --- a/daemon/exec_runner.go +++ b/daemon/exec_runner.go @@ -307,6 +307,7 @@ func (r *ExecRunner) run() { go func(terminate chan error) { err := r.cmd.Wait() terminate <- err + <-terminate // read acknowledgement }(r.terminate) err := <-r.terminate From 59e1cc743b41901646e53ab354fa51fd48484826 Mon Sep 17 00:00:00 2001 From: Marco Pfatschbacher Date: Mon, 9 Jan 2023 15:36:21 +0100 Subject: [PATCH 17/22] Add changelog --- changelog/unreleased/issue-463.toml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 changelog/unreleased/issue-463.toml diff --git a/changelog/unreleased/issue-463.toml b/changelog/unreleased/issue-463.toml new file mode 100644 index 00000000..d35468cd --- /dev/null +++ b/changelog/unreleased/issue-463.toml @@ -0,0 +1,6 @@ +type = "fixed" +message = "Make collector shutdown handling more resilient to hanging child processes" + +issues = ["463"] +pulls = ["462"] + From 298bd7c87a4c8ceb59d72b03298cc800a6fec1df Mon Sep 17 00:00:00 2001 From: Bernd Ahlers Date: Tue, 10 Jan 2023 10:42:41 +0100 Subject: [PATCH 18/22] Change log statements from Infof to Warnf --- daemon/exec_helper.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/daemon/exec_helper.go b/daemon/exec_helper.go index 435e705d..a5f229fe 100644 --- a/daemon/exec_helper.go +++ b/daemon/exec_helper.go @@ -45,7 +45,7 @@ func KillProcess(r *ExecRunner) { log.Debugf("[%s] SIGTERM process group", r.Name()) err := syscall.Kill(-pid, syscall.SIGTERM) if err != nil { - log.Infof("[%s] Failed to SIGTERM process group %s", r.Name(), err) + log.Warnf("[%s] Failed to SIGTERM process group %s", r.Name(), err) } limit := r.context.UserConfig.CollectorShutdownTimeout.Milliseconds() @@ -59,7 +59,7 @@ func KillProcess(r *ExecRunner) { log.Infof("[%s] Still running after SIGTERM. Sending SIGKILL to the process group", r.Name()) err := syscall.Kill(-pid, syscall.SIGKILL) if err != nil { - log.Debugf("[%s] Failed to SIGKILL process group %s", r.Name(), err) + log.Warnf("[%s] Failed to SIGKILL process group %s", r.Name(), err) } time.Sleep(100 * time.Millisecond) } From a8a1c39aa382f175d71482623e54f535331cdda5 Mon Sep 17 00:00:00 2001 From: Marco Pfatschbacher Date: Tue, 10 Jan 2023 10:56:22 +0100 Subject: [PATCH 19/22] Moar changelogs --- changelog/unreleased/issue-463-bis.toml | 6 ++++++ changelog/unreleased/issue-463-quater.toml | 6 ++++++ changelog/unreleased/issue-463-ter.toml | 6 ++++++ 3 files changed, 18 insertions(+) create mode 100644 changelog/unreleased/issue-463-bis.toml create mode 100644 changelog/unreleased/issue-463-quater.toml create mode 100644 changelog/unreleased/issue-463-ter.toml diff --git a/changelog/unreleased/issue-463-bis.toml b/changelog/unreleased/issue-463-bis.toml new file mode 100644 index 00000000..8e8e5601 --- /dev/null +++ b/changelog/unreleased/issue-463-bis.toml @@ -0,0 +1,6 @@ +type = "changed" +message = "Terminate collector processes with SIGTERM instead of SIGHUP" + +issues = ["463"] +pulls = ["462"] + diff --git a/changelog/unreleased/issue-463-quater.toml b/changelog/unreleased/issue-463-quater.toml new file mode 100644 index 00000000..2d18b797 --- /dev/null +++ b/changelog/unreleased/issue-463-quater.toml @@ -0,0 +1,6 @@ +type = "added" +message = "Provide a new configuration option `collector_shutdown_timeout:10s` to determine how long to wait before sending a SIGKILL when shutting down collector processes" + +issues = ["463"] +pulls = ["462"] + diff --git a/changelog/unreleased/issue-463-ter.toml b/changelog/unreleased/issue-463-ter.toml new file mode 100644 index 00000000..ad9cbe11 --- /dev/null +++ b/changelog/unreleased/issue-463-ter.toml @@ -0,0 +1,6 @@ +type = "changed" +message = "Terminate collector processes by killing the process group instead of just the PID" + +issues = ["463"] +pulls = ["462"] + From 8b8db9e6ae6b00844d4d1bb28ac4ebce86ca7caf Mon Sep 17 00:00:00 2001 From: Marco Pfatschbacher Date: Tue, 10 Jan 2023 10:59:00 +0100 Subject: [PATCH 20/22] add dots --- changelog/unreleased/issue-463-bis.toml | 2 +- changelog/unreleased/issue-463-quater.toml | 2 +- changelog/unreleased/issue-463-ter.toml | 2 +- changelog/unreleased/issue-463.toml | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/changelog/unreleased/issue-463-bis.toml b/changelog/unreleased/issue-463-bis.toml index 8e8e5601..47b2153a 100644 --- a/changelog/unreleased/issue-463-bis.toml +++ b/changelog/unreleased/issue-463-bis.toml @@ -1,5 +1,5 @@ type = "changed" -message = "Terminate collector processes with SIGTERM instead of SIGHUP" +message = "Terminate collector processes with SIGTERM instead of SIGHUP." issues = ["463"] pulls = ["462"] diff --git a/changelog/unreleased/issue-463-quater.toml b/changelog/unreleased/issue-463-quater.toml index 2d18b797..5143e667 100644 --- a/changelog/unreleased/issue-463-quater.toml +++ b/changelog/unreleased/issue-463-quater.toml @@ -1,5 +1,5 @@ type = "added" -message = "Provide a new configuration option `collector_shutdown_timeout:10s` to determine how long to wait before sending a SIGKILL when shutting down collector processes" +message = "Provide a new configuration option `collector_shutdown_timeout:10s` to determine how long to wait before sending a SIGKILL when shutting down collector processes." issues = ["463"] pulls = ["462"] diff --git a/changelog/unreleased/issue-463-ter.toml b/changelog/unreleased/issue-463-ter.toml index ad9cbe11..bd6f24e4 100644 --- a/changelog/unreleased/issue-463-ter.toml +++ b/changelog/unreleased/issue-463-ter.toml @@ -1,5 +1,5 @@ type = "changed" -message = "Terminate collector processes by killing the process group instead of just the PID" +message = "Terminate collector processes by killing the process group instead of just the PID." issues = ["463"] pulls = ["462"] diff --git a/changelog/unreleased/issue-463.toml b/changelog/unreleased/issue-463.toml index d35468cd..105143e7 100644 --- a/changelog/unreleased/issue-463.toml +++ b/changelog/unreleased/issue-463.toml @@ -1,5 +1,5 @@ type = "fixed" -message = "Make collector shutdown handling more resilient to hanging child processes" +message = "Make collector shutdown handling more resilient to hanging child processes." issues = ["463"] pulls = ["462"] From cb9b378b414aeb0b27a46accaa8aa5b04e232a3f Mon Sep 17 00:00:00 2001 From: Bernd Ahlers Date: Wed, 11 Jan 2023 09:18:12 +0100 Subject: [PATCH 21/22] Improve log message formatting --- daemon/exec_helper.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/daemon/exec_helper.go b/daemon/exec_helper.go index a5f229fe..00f40f79 100644 --- a/daemon/exec_helper.go +++ b/daemon/exec_helper.go @@ -45,7 +45,7 @@ func KillProcess(r *ExecRunner) { log.Debugf("[%s] SIGTERM process group", r.Name()) err := syscall.Kill(-pid, syscall.SIGTERM) if err != nil { - log.Warnf("[%s] Failed to SIGTERM process group %s", r.Name(), err) + log.Warnf("[%s] Failed to SIGTERM process group: %s", r.Name(), err) } limit := r.context.UserConfig.CollectorShutdownTimeout.Milliseconds() @@ -59,7 +59,7 @@ func KillProcess(r *ExecRunner) { log.Infof("[%s] Still running after SIGTERM. Sending SIGKILL to the process group", r.Name()) err := syscall.Kill(-pid, syscall.SIGKILL) if err != nil { - log.Warnf("[%s] Failed to SIGKILL process group %s", r.Name(), err) + log.Warnf("[%s] Failed to SIGKILL process group: %s", r.Name(), err) } time.Sleep(100 * time.Millisecond) } From 9c588ac992e21b11f303a03ee629998e2a2de71a Mon Sep 17 00:00:00 2001 From: Marco Pfatschbacher Date: Wed, 11 Jan 2023 12:00:10 +0100 Subject: [PATCH 22/22] Remove timout from sidecar shutdown routine We don't need a separate timeout here, because each runner is guaranteed to stop with their individual stop timeouts. --- daemon/distributor.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/daemon/distributor.go b/daemon/distributor.go index b3cde19b..2b51dcd1 100644 --- a/daemon/distributor.go +++ b/daemon/distributor.go @@ -49,20 +49,17 @@ func (dist *Distributor) Start(s service.Service) error { return nil } -// stop all backend runner parallel and wait till they are finished +// stop all backend runners in parallel and wait until they are finished func (dist *Distributor) Stop(s service.Service) error { log.Info("Stopping signal distributor") for _, runner := range Daemon.Runner { runner.Shutdown() } for _, runner := range Daemon.Runner { - limit := 100 - for timeout := 0; runner.Running() && timeout < limit; timeout++ { + for runner.Running() { + log.Debugf("[%s] Waiting for runner to finish", runner.Name()) time.Sleep(100 * time.Millisecond) } - if runner.Running() { - log.Warnf("[%s] collector failed to exit", runner.Name()) - } } dist.Running = false