Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve process shutdown handling #462

Merged
merged 22 commits into from
Jan 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions api/graylog.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"bytes"
"crypto/tls"
"encoding/json"
"github.com/Graylog2/collector-sidecar/helpers"
"io"
"net/http"
"strconv"
Expand Down Expand Up @@ -164,15 +165,15 @@ func UpdateRegistration(httpClient *http.Client, checksum string, ctx *context.C
registration := graylog.RegistrationRequest{}

registration.NodeName = ctx.UserConfig.NodeName
registration.NodeDetails.OperatingSystem = common.GetSystemName()
registration.NodeDetails.OperatingSystem = helpers.GetSystemName()

if ctx.UserConfig.SendStatus {
metrics := &graylog.MetricsRequest{
Disks75: common.GetFileSystemList75(ctx.UserConfig.WindowsDriveRange),
CpuIdle: common.GetCpuIdle(),
Load1: common.GetLoad1(),
}
registration.NodeDetails.IP = common.GetHostIP()
registration.NodeDetails.IP = helpers.GetHostIP()
registration.NodeDetails.Status = status
registration.NodeDetails.Metrics = metrics
if len(ctx.UserConfig.ListLogFiles) > 0 {
Expand Down
4 changes: 2 additions & 2 deletions assignments/assignment.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package assignments

import (
"github.com/Graylog2/collector-sidecar/common"
"github.com/Graylog2/collector-sidecar/helpers"
"reflect"
)

Expand Down Expand Up @@ -92,7 +92,7 @@ func (as *assignmentStore) Update(assignments []ConfigurationAssignment) bool {

func (as *assignmentStore) cleanup(validBackendIds []string) {
for backendId := range as.assignments {
if !common.IsInList(backendId, validBackendIds) {
if !helpers.IsInList(backendId, validBackendIds) {
delete(as.assignments, backendId)
}
}
Expand Down
14 changes: 7 additions & 7 deletions backends/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package backends
import (
"bytes"
"fmt"
"github.com/Graylog2/collector-sidecar/helpers"
"os/exec"
"path/filepath"
"reflect"
Expand All @@ -27,7 +28,6 @@ import (
"github.com/flynn-archive/go-shlex"

"github.com/Graylog2/collector-sidecar/api/graylog"
"github.com/Graylog2/collector-sidecar/common"
"github.com/Graylog2/collector-sidecar/context"
"github.com/Graylog2/collector-sidecar/system"
)
Expand All @@ -50,7 +50,7 @@ type Backend struct {

func BackendFromResponse(response graylog.ResponseCollectorBackend, configId string, ctx *context.Ctx) *Backend {
return &Backend{
Enabled: common.NewTrue(),
Enabled: helpers.NewTrue(),
Id: response.Id + "-" + configId,
CollectorId: response.Id,
ConfigId: configId,
Expand All @@ -74,10 +74,10 @@ func (b *Backend) Equals(a *Backend) bool {
}

func (b *Backend) EqualSettings(a *Backend) bool {
executeParameters, _ := common.Sprintf(
executeParameters, _ := helpers.Sprintf(
a.ExecuteParameters,
a.ConfigurationPath)
validationParameters, _ := common.Sprintf(
validationParameters, _ := helpers.Sprintf(
a.ValidationParameters,
a.ConfigurationPath)

Expand All @@ -104,7 +104,7 @@ func (b *Backend) CheckExecutableAgainstAccesslist(context *context.Ctx) error {
if len(context.UserConfig.CollectorBinariesAccesslist) <= 0 {
return nil
}
isListed, err := common.PathMatch(b.ExecutablePath, context.UserConfig.CollectorBinariesAccesslist)
isListed, err := helpers.PathMatch(b.ExecutablePath, context.UserConfig.CollectorBinariesAccesslist)
if err != nil {
return fmt.Errorf("Can not validate binary path: %s", err)
}
Expand All @@ -121,7 +121,7 @@ func (b *Backend) CheckExecutableAgainstAccesslist(context *context.Ctx) error {
}

func (b *Backend) CheckConfigPathAgainstAccesslist(context *context.Ctx) bool {
configuration, err := common.PathMatch(b.ConfigurationPath, context.UserConfig.CollectorBinariesAccesslist)
configuration, err := helpers.PathMatch(b.ConfigurationPath, context.UserConfig.CollectorBinariesAccesslist)
if err != nil {
log.Errorf("Can not validate configuration path: %s", err)
return false
Expand All @@ -145,7 +145,7 @@ func (b *Backend) ValidateConfigurationFile(context *context.Ctx) (error, string
var err error
var quotedArgs []string
if runtime.GOOS == "windows" {
quotedArgs = common.CommandLineToArgv(b.ValidationParameters)
quotedArgs = helpers.CommandLineToArgv(b.ValidationParameters)
} else {
quotedArgs, err = shlex.Split(b.ValidationParameters)
}
Expand Down
8 changes: 4 additions & 4 deletions backends/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package backends

import (
"github.com/Graylog2/collector-sidecar/common"
"github.com/Graylog2/collector-sidecar/helpers"
"github.com/Graylog2/collector-sidecar/logger"
)

Expand All @@ -32,13 +32,13 @@ type backendStore struct {

func (bs *backendStore) SetBackend(backend Backend) {
bs.backends[backend.Id] = &backend
executeParameters, err := common.Sprintf(backend.ExecuteParameters, backend.ConfigurationPath)
executeParameters, err := helpers.Sprintf(backend.ExecuteParameters, backend.ConfigurationPath)
if err != nil {
log.Errorf("Invalid execute parameters, skip adding backend: %s", backend.Name)
return
}
bs.backends[backend.Id].ExecuteParameters = executeParameters
validationParameters, err := common.Sprintf(backend.ValidationParameters, backend.ConfigurationPath)
validationParameters, err := helpers.Sprintf(backend.ValidationParameters, backend.ConfigurationPath)
if err != nil {
log.Errorf("Invalid validation parameters, skip adding backend: %s", backend.Name)
return
Expand Down Expand Up @@ -84,7 +84,7 @@ func (bs *backendStore) Update(backends []Backend) {

func (bs *backendStore) Cleanup(validBackendIds []string) {
for _, backend := range bs.backends {
if !common.IsInList(backend.Id, validBackendIds) {
if !helpers.IsInList(backend.Id, validBackendIds) {
log.Debug("Cleaning up backend: " + backend.Name)
delete(bs.backends, backend.Id)
}
Expand Down
3 changes: 2 additions & 1 deletion backends/render.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@ import (
"fmt"
"github.com/Graylog2/collector-sidecar/common"
"github.com/Graylog2/collector-sidecar/context"
"github.com/Graylog2/collector-sidecar/helpers"
"io/ioutil"
)

func (b *Backend) render() []byte {
var result bytes.Buffer
result.WriteString(b.Template)

return common.ConvertLineBreak(result.Bytes())
return helpers.ConvertLineBreak(result.Bytes())
}

func (b *Backend) renderToFile(context *context.Ctx) error {
Expand Down
3 changes: 3 additions & 0 deletions cfgfile/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type SidecarConfig struct {
CollectorValidationTimeoutString string `config:"collector_validation_timeout"`
CollectorValidationTimeout time.Duration // set from CollectorValidationTimeoutString
CollectorConfigurationDirectory string `config:"collector_configuration_directory"`
CollectorShutdownTimeoutString string `config:"collector_shutdown_timeout"`
CollectorShutdownTimeout time.Duration // set from CollectorShutdownTimeoutString
LogRotateMaxFileSizeString string `config:"log_rotate_max_file_size"`
LogRotateMaxFileSize int64 // set from LogRotateMaxFileSizeString
LogRotateKeepFiles int `config:"log_rotate_keep_files"`
Expand All @@ -54,6 +56,7 @@ log_path: "/var/log/graylog-sidecar"
log_rotate_max_file_size: "10MiB"
log_rotate_keep_files: 10
collector_validation_timeout: "1m"
collector_shutdown_timeout: "10s"
collector_configuration_directory: "/var/lib/graylog-sidecar/generated"
collector_binaries_accesslist:
- "/usr/bin/filebeat"
Expand Down
6 changes: 6 additions & 0 deletions changelog/unreleased/issue-463-bis.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
type = "changed"
message = "Terminate collector processes with SIGTERM instead of SIGHUP."

issues = ["463"]
pulls = ["462"]

6 changes: 6 additions & 0 deletions changelog/unreleased/issue-463-quater.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
type = "added"
message = "Provide a new configuration option `collector_shutdown_timeout:10s` to determine how long to wait before sending a SIGKILL when shutting down collector processes."

issues = ["463"]
pulls = ["462"]

6 changes: 6 additions & 0 deletions changelog/unreleased/issue-463-ter.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
type = "changed"
message = "Terminate collector processes by killing the process group instead of just the PID."

issues = ["463"]
pulls = ["462"]

6 changes: 6 additions & 0 deletions changelog/unreleased/issue-463.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
type = "fixed"
message = "Make collector shutdown handling more resilient to hanging child processes."

issues = ["463"]
pulls = ["462"]

11 changes: 8 additions & 3 deletions context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package context

import (
"github.com/Graylog2/collector-sidecar/common"
"github.com/Graylog2/collector-sidecar/helpers"
"github.com/docker/go-units"
"net/url"
"os"
Expand All @@ -25,7 +27,6 @@ import (
"time"

"github.com/Graylog2/collector-sidecar/cfgfile"
"github.com/Graylog2/collector-sidecar/common"
"github.com/Graylog2/collector-sidecar/logger"
"github.com/Graylog2/collector-sidecar/system"
)
Expand Down Expand Up @@ -71,15 +72,15 @@ func (ctx *Ctx) LoadConfig(path *string) error {
if ctx.UserConfig.NodeId == "" {
log.Fatal("No node ID was configured.")
}
ctx.NodeId = common.GetCollectorId(ctx.UserConfig.NodeId)
ctx.NodeId = helpers.GetCollectorId(ctx.UserConfig.NodeId)
if ctx.NodeId == "" {
log.Fatal("Empty node-id, exiting! Make sure a valid id is configured.")
}

// node_name
if ctx.UserConfig.NodeName == "" {
log.Info("No node name was configured, falling back to hostname")
ctx.UserConfig.NodeName, err = common.GetHostname()
ctx.UserConfig.NodeName, err = helpers.GetHostname()
if err != nil {
log.Fatal("No node name configured and not able to obtain hostname as alternative.")
}
Expand Down Expand Up @@ -111,6 +112,10 @@ func (ctx *Ctx) LoadConfig(path *string) error {
if err != nil {
log.Fatal("Cannot parse validation timeout duration: ", err)
}
ctx.UserConfig.CollectorShutdownTimeout, err = time.ParseDuration(ctx.UserConfig.CollectorShutdownTimeoutString)
if err != nil {
log.Fatal("Cannot parse shutdown timeout duration: ", err)
}

// collector_configuration_directory
if ctx.UserConfig.CollectorConfigurationDirectory == "" {
Expand Down
4 changes: 2 additions & 2 deletions daemon/action_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package daemon
import (
"github.com/Graylog2/collector-sidecar/api/graylog"
"github.com/Graylog2/collector-sidecar/backends"
"github.com/Graylog2/collector-sidecar/common"
"github.com/Graylog2/collector-sidecar/helpers"
)

func HandleCollectorActions(actions []graylog.ResponseCollectorAction) {
Expand All @@ -37,7 +37,7 @@ func HandleCollectorActions(actions []graylog.ResponseCollectorAction) {
case action.Properties["stop"] == true:
stopAction(backend)
default:
log.Infof("Got unsupported collector command: %s", common.Inspect(action.Properties))
log.Infof("Got unsupported collector command: %s", helpers.Inspect(action.Properties))
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ package daemon
import (
"github.com/Graylog2/collector-sidecar/assignments"
"github.com/Graylog2/collector-sidecar/backends"
"github.com/Graylog2/collector-sidecar/common"
"github.com/Graylog2/collector-sidecar/context"
"github.com/Graylog2/collector-sidecar/helpers"
"github.com/Graylog2/collector-sidecar/logger"
)

Expand All @@ -45,7 +45,7 @@ func init() {
}

func NewConfig() *DaemonConfig {
rootDir, err := common.GetRootPath()
rootDir, err := helpers.GetRootPath()
if err != nil {
log.Error("Can not access root directory")
}
Expand Down
5 changes: 3 additions & 2 deletions daemon/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,16 @@ func (dist *Distributor) Start(s service.Service) error {
return nil
}

// stop all backend runner parallel and wait till they are finished
// stop all backend runners in parallel and wait until they are finished
func (dist *Distributor) Stop(s service.Service) error {
log.Info("Stopping signal distributor")
for _, runner := range Daemon.Runner {
runner.Shutdown()
}
for _, runner := range Daemon.Runner {
for runner.Running() {
time.Sleep(300 * time.Millisecond)
log.Debugf("[%s] Waiting for runner to finish", runner.Name())
time.Sleep(100 * time.Millisecond)
}
}
dist.Running = false
Expand Down
66 changes: 66 additions & 0 deletions daemon/exec_helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright (C) 2020 Graylog, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the Server Side Public License, version 1,
// as published by MongoDB, Inc.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// Server Side Public License for more details.
//
// You should have received a copy of the Server Side Public License
// along with this program. If not, see
// <http://www.mongodb.com/licensing/server-side-public-license>.

//go:build !windows
// +build !windows

package daemon

import (
"os/exec"
"syscall"
"time"
)

// Setpgid configures cmd so that the child is started with the Setpgid
// process attribute enabled. Any SysProcAttr previously set on cmd is
// replaced by a fresh one.
func Setpgid(cmd *exec.Cmd) {
	attr := new(syscall.SysProcAttr)
	attr.Setpgid = true
	cmd.SysProcAttr = attr
}

// KillProcess terminates the process started by r together with its whole
// process group.
//
// It first sends SIGTERM to the process group and then polls r.Running()
// every 100ms until the runner has stopped or the configured
// CollectorShutdownTimeout has elapsed. If the runner is still running after
// that, SIGKILL is sent to the process group and the function waits another
// 100ms before returning.
//
// Nothing is signalled when the command was never started or when the
// process has already been released. Signal delivery errors are logged as
// warnings and do not abort the shutdown sequence.
func KillProcess(r *ExecRunner) {
	// exec.Cmd.Process stays nil until the command has been started, so it
	// must be checked before reading the PID to avoid a nil dereference.
	if r.cmd == nil || r.cmd.Process == nil {
		log.Debugf("[%s] Process not initialized", r.Name())
		return
	}
	pid := r.cmd.Process.Pid

	// Negating the PID below turns -1 into 1 and 0 into 0; kill(2) would then
	// target PID 1 or the caller's own process group, so both values must
	// never reach syscall.Kill.
	if pid == -1 {
		log.Debugf("[%s] Process already released", r.Name())
		return
	}
	if pid == 0 {
		log.Debugf("[%s] Process not initialized", r.Name())
		return
	}

	// Signal the process group (-pid) instead of just the process. Otherwise, forked child processes
	// can keep running and cause cmd.Wait to hang.
	log.Debugf("[%s] SIGTERM process group", r.Name())
	if err := syscall.Kill(-pid, syscall.SIGTERM); err != nil {
		log.Warnf("[%s] Failed to SIGTERM process group: %s", r.Name(), err)
	}

	// Give the process group up to the configured shutdown timeout to exit
	// on its own, checking in fixed 100ms steps.
	limit := r.context.UserConfig.CollectorShutdownTimeout.Milliseconds()
	tick := 100 * time.Millisecond
	for t := tick.Milliseconds(); r.Running() && t < limit; t += tick.Milliseconds() {
		log.Debugf("[%s] Waiting for process group to finish (%vms / %vms)", r.Name(), t, limit)
		time.Sleep(tick)
	}

	if !r.Running() {
		return
	}

	// Graceful shutdown did not finish in time: force-kill the whole group.
	log.Infof("[%s] Still running after SIGTERM. Sending SIGKILL to the process group", r.Name())
	if err := syscall.Kill(-pid, syscall.SIGKILL); err != nil {
		log.Warnf("[%s] Failed to SIGKILL process group: %s", r.Name(), err)
	}
	// Short grace period after SIGKILL before returning to the caller.
	time.Sleep(100 * time.Millisecond)
}
Loading