Skip to content

Commit

Permalink
provide -no-shutdown-delay flag for job/alloc stop (#11596)
Browse files Browse the repository at this point in the history
Some operators use very long group/task `shutdown_delay` settings to
safely drain network connections to their workloads after service
deregistration. But during incident response, they may want to cause
that drain to be skipped so they can quickly shed load.

Provide a `-no-shutdown-delay` flag on the `nomad alloc stop` and
`nomad job stop` commands that bypasses the delay. This sets a new
desired transition state on the affected allocations that the
allocation/task runner will identify during pre-kill on the client.

Note (as documented here) that using this flag will almost always
result in failed inbound network connections for workloads, because
the tasks will exit before clients receive updated service discovery
information, so in-flight connections won't be gracefully drained.
  • Loading branch information
tgross authored Dec 13, 2021
1 parent 43b3e16 commit 35c22bc
Show file tree
Hide file tree
Showing 18 changed files with 372 additions and 47 deletions.
3 changes: 3 additions & 0 deletions .changelog/11596.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
cli: provide `-no-shutdown-delay` option to `job stop` and `alloc stop` commands to ignore `shutdown_delay`
```
9 changes: 7 additions & 2 deletions api/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,11 @@ type DeregisterOptions struct {
// is useful when an operator wishes to push through a job deregistration
// in busy clusters with a large evaluation backlog.
EvalPriority int

// NoShutdownDelay, if set to true, will override the group and
// task shutdown_delay configuration and ignore the delay for any
// allocations stopped as a result of this Deregister call.
NoShutdownDelay bool
}

// DeregisterOpts is used to remove an existing job. See DeregisterOptions
Expand All @@ -312,8 +317,8 @@ func (j *Jobs) DeregisterOpts(jobID string, opts *DeregisterOptions, q *WriteOpt
// Protect against nil opts. url.Values expects a string, and so using
// fmt.Sprintf is the best way to do this.
if opts != nil {
endpoint += fmt.Sprintf("?purge=%t&global=%t&eval_priority=%v",
opts.Purge, opts.Global, opts.EvalPriority)
endpoint += fmt.Sprintf("?purge=%t&global=%t&eval_priority=%v&no_shutdown_delay=%t",
opts.Purge, opts.Global, opts.EvalPriority, opts.NoShutdownDelay)
}

wm, err := j.client.delete(endpoint, &resp, q)
Expand Down
12 changes: 12 additions & 0 deletions client/allocrunner/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,9 @@ type allocRunner struct {

taskHookCoordinator *taskHookCoordinator

shutdownDelayCtx context.Context
shutdownDelayCancelFn context.CancelFunc

// rpcClient is the RPC Client that should be used by the allocrunner and its
// hooks to communicate with Nomad Servers.
rpcClient RPCer
Expand Down Expand Up @@ -230,6 +233,10 @@ func NewAllocRunner(config *Config) (*allocRunner, error) {

ar.taskHookCoordinator = newTaskHookCoordinator(ar.logger, tg.Tasks)

shutdownDelayCtx, shutdownDelayCancel := context.WithCancel(context.Background())
ar.shutdownDelayCtx = shutdownDelayCtx
ar.shutdownDelayCancelFn = shutdownDelayCancel

// Initialize the runners hooks.
if err := ar.initRunnerHooks(config.ClientConfig); err != nil {
return nil, err
Expand Down Expand Up @@ -265,6 +272,7 @@ func (ar *allocRunner) initTaskRunners(tasks []*structs.Task) error {
DriverManager: ar.driverManager,
ServersContactedCh: ar.serversContactedCh,
StartConditionMetCtx: ar.taskHookCoordinator.startConditionForTask(task),
ShutdownDelayCtx: ar.shutdownDelayCtx,
}

if ar.cpusetManager != nil {
Expand Down Expand Up @@ -824,6 +832,10 @@ func (ar *allocRunner) Update(update *structs.Allocation) {
default:
}

if update.DesiredTransition.ShouldIgnoreShutdownDelay() {
ar.shutdownDelayCancelFn()
}

// Queue the new update
ar.allocUpdatedCh <- update
}
Expand Down
1 change: 1 addition & 0 deletions client/allocrunner/alloc_runner_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
taskEnvBuilder: envBuilder,
networkStatusGetter: ar,
logger: hookLogger,
shutdownDelayCtx: ar.shutdownDelayCtx,
}),
newConsulGRPCSocketHook(hookLogger, alloc, ar.allocDir, config.ConsulConfig),
newConsulHTTPSocketHook(hookLogger, alloc, ar.allocDir, config.ConsulConfig),
Expand Down
13 changes: 10 additions & 3 deletions client/allocrunner/groupservice_hook.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package allocrunner

import (
"context"
"sync"
"time"

Expand Down Expand Up @@ -29,9 +30,9 @@ type groupServiceHook struct {
consulClient consul.ConsulServiceAPI
consulNamespace string
prerun bool
delay time.Duration
deregistered bool
networkStatusGetter networkStatusGetter
shutdownDelayCtx context.Context

logger log.Logger

Expand All @@ -41,6 +42,7 @@ type groupServiceHook struct {
networks structs.Networks
ports structs.AllocatedPorts
taskEnvBuilder *taskenv.Builder
delay time.Duration

// Since Update() may be called concurrently with any other hook all
// hook methods must be fully serialized
Expand All @@ -54,6 +56,7 @@ type groupServiceHookConfig struct {
restarter agentconsul.WorkloadRestarter
taskEnvBuilder *taskenv.Builder
networkStatusGetter networkStatusGetter
shutdownDelayCtx context.Context
logger log.Logger
}

Expand All @@ -76,6 +79,7 @@ func newGroupServiceHook(cfg groupServiceHookConfig) *groupServiceHook {
networkStatusGetter: cfg.networkStatusGetter,
logger: cfg.logger.Named(groupServiceHookName),
services: cfg.alloc.Job.LookupTaskGroup(cfg.alloc.TaskGroup).Services,
shutdownDelayCtx: cfg.shutdownDelayCtx,
}

if cfg.alloc.AllocatedResources != nil {
Expand Down Expand Up @@ -187,9 +191,12 @@ func (h *groupServiceHook) preKillLocked() {

h.logger.Debug("delay before killing tasks", "group", h.group, "shutdown_delay", h.delay)

// Wait for specified shutdown_delay
select {
// Wait for specified shutdown_delay unless ignored
// This will block an agent from shutting down.
<-time.After(h.delay)
case <-time.After(h.delay):
case <-h.shutdownDelayCtx.Done():
}
}

func (h *groupServiceHook) Postrun() error {
Expand Down
22 changes: 22 additions & 0 deletions client/allocrunner/taskrunner/task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ type TaskRunner struct {
killErr error
killErrLock sync.Mutex

// shutdownDelayCtx is a context from the alloc runner which will
// tell us to exit early from shutdown_delay
shutdownDelayCtx context.Context
shutdownDelayCancelFn context.CancelFunc

// Logger is the logger for the task runner.
logger log.Logger

Expand Down Expand Up @@ -287,6 +292,13 @@ type Config struct {

// StartConditionMetCtx is closed when TR should start the task
StartConditionMetCtx <-chan struct{}

// ShutdownDelayCtx is a context from the alloc runner which will
// tell us to exit early from shutdown_delay
ShutdownDelayCtx context.Context

// ShutdownDelayCancelFn should only be used in testing.
ShutdownDelayCancelFn context.CancelFunc
}

func NewTaskRunner(config *Config) (*TaskRunner, error) {
Expand Down Expand Up @@ -342,6 +354,8 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {
maxEvents: defaultMaxEvents,
serversContactedCh: config.ServersContactedCh,
startConditionMetCtx: config.StartConditionMetCtx,
shutdownDelayCtx: config.ShutdownDelayCtx,
shutdownDelayCancelFn: config.ShutdownDelayCancelFn,
}

// Create the logger based on the allocation ID
Expand Down Expand Up @@ -895,6 +909,8 @@ func (tr *TaskRunner) handleKill(resultCh <-chan *drivers.ExitResult) *drivers.E
select {
case result := <-resultCh:
return result
case <-tr.shutdownDelayCtx.Done():
break
case <-time.After(delay):
}
}
Expand Down Expand Up @@ -1478,3 +1494,9 @@ func (tr *TaskRunner) DriverCapabilities() (*drivers.Capabilities, error) {
// SetAllocHookResources stores res as the task runner's alloc hook
// resources, replacing whatever value was set before.
func (tr *TaskRunner) SetAllocHookResources(res *cstructs.AllocHookResources) {
tr.allocHookResources = res
}

// shutdownDelayCancel is used for testing only and cancels the
// shutdownDelayCtx.
//
// The cancel function is copied from Config.ShutdownDelayCancelFn, which
// is optional, so it may be nil; in that case this is a no-op rather than
// a nil function call panic.
func (tr *TaskRunner) shutdownDelayCancel() {
	if tr.shutdownDelayCancelFn == nil {
		return
	}
	tr.shutdownDelayCancelFn()
}
116 changes: 99 additions & 17 deletions client/allocrunner/taskrunner/task_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ import (
"time"

"github.com/golang/snappy"
"github.com/kr/pretty"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/client/config"
Expand All @@ -26,16 +30,14 @@ import (
agentconsul "github.com/hashicorp/nomad/command/agent/consul"
mockdriver "github.com/hashicorp/nomad/drivers/mock"
"github.com/hashicorp/nomad/drivers/rawexec"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/device"
"github.com/hashicorp/nomad/plugins/drivers"
"github.com/hashicorp/nomad/testutil"
"github.com/kr/pretty"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

type MockTaskStateUpdater struct {
Expand Down Expand Up @@ -94,26 +96,30 @@ func testTaskRunnerConfig(t *testing.T, alloc *structs.Allocation, taskName stri
cleanup()
}

shutdownDelayCtx, shutdownDelayCancelFn := context.WithCancel(context.Background())

// Create a closed channel to mock TaskHookCoordinator.startConditionForTask.
// Closed channel indicates this task is not blocked on prestart hooks.
closedCh := make(chan struct{})
close(closedCh)

conf := &Config{
Alloc: alloc,
ClientConfig: clientConf,
Task: thisTask,
TaskDir: taskDir,
Logger: clientConf.Logger,
Consul: consulapi.NewMockConsulServiceClient(t, logger),
ConsulSI: consulapi.NewMockServiceIdentitiesClient(),
Vault: vaultclient.NewMockVaultClient(),
StateDB: cstate.NoopDB{},
StateUpdater: NewMockTaskStateUpdater(),
DeviceManager: devicemanager.NoopMockManager(),
DriverManager: drivermanager.TestDriverManager(t),
ServersContactedCh: make(chan struct{}),
StartConditionMetCtx: closedCh,
Alloc: alloc,
ClientConfig: clientConf,
Task: thisTask,
TaskDir: taskDir,
Logger: clientConf.Logger,
Consul: consulapi.NewMockConsulServiceClient(t, logger),
ConsulSI: consulapi.NewMockServiceIdentitiesClient(),
Vault: vaultclient.NewMockVaultClient(),
StateDB: cstate.NoopDB{},
StateUpdater: NewMockTaskStateUpdater(),
DeviceManager: devicemanager.NoopMockManager(),
DriverManager: drivermanager.TestDriverManager(t),
ServersContactedCh: make(chan struct{}),
StartConditionMetCtx: closedCh,
ShutdownDelayCtx: shutdownDelayCtx,
ShutdownDelayCancelFn: shutdownDelayCancelFn,
}
return conf, trCleanup
}
Expand Down Expand Up @@ -996,6 +1002,82 @@ WAIT:
}
}

// TestTaskRunner_NoShutdownDelay asserts services are removed from
// Consul and tasks are killed without waiting for ${shutdown_delay}
// when the alloc has the NoShutdownDelay transition flag set.
func TestTaskRunner_NoShutdownDelay(t *testing.T) {
	t.Parallel()

	// don't set this too high so that we don't block the test runner
	// on shutting down the agent if the test fails
	maxTestDuration := time.Duration(testutil.TestMultiplier()*10) * time.Second
	maxTimeToFailDuration := time.Duration(testutil.TestMultiplier()) * time.Second

	alloc := mock.Alloc()
	alloc.DesiredTransition = structs.DesiredTransition{NoShutdownDelay: helper.BoolToPtr(true)}
	task := alloc.Job.TaskGroups[0].Tasks[0]
	task.Services[0].Tags = []string{"tag1"}
	task.Services = task.Services[:1] // only need 1 for this test
	task.Driver = "mock_driver"
	task.Config = map[string]interface{}{
		"run_for": "1000s",
	}
	// The configured delay is far longer than the time we allow the kill
	// to take, so the test only passes if the delay is actually skipped.
	task.ShutdownDelay = maxTestDuration

	tr, conf, cleanup := runTestTaskRunner(t, alloc, task.Name)
	defer cleanup()

	mockConsul := conf.Consul.(*consulapi.MockConsulServiceClient)

	testWaitForTaskToStart(t, tr)

	// The service must be registered before we kill the task.
	testutil.WaitForResult(func() (bool, error) {
		ops := mockConsul.GetOps()
		if n := len(ops); n != 1 {
			return false, fmt.Errorf("expected 1 consul operation. Found %d", n)
		}
		return ops[0].Op == "add", fmt.Errorf("consul operation was not a registration: %#v", ops[0])
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})

	testCtx, cancel := context.WithTimeout(context.Background(), maxTimeToFailDuration)
	defer cancel()

	// Buffered so the goroutine below can always deliver its result and
	// exit, even if the test fails before it reaches the final receive.
	killed := make(chan error, 1)
	go func() {
		tr.shutdownDelayCancel()
		err := tr.Kill(testCtx, structs.NewTaskEvent("test"))
		killed <- err
	}()

	// Wait for first de-registration call. Note that unlike
	// TestTaskRunner_ShutdownDelay, we're racing with task exit
	// and can't assert that we only get the first deregistration op
	// (from serviceHook.PreKill).
	testutil.WaitForResult(func() (bool, error) {
		ops := mockConsul.GetOps()
		if n := len(ops); n < 2 {
			return false, fmt.Errorf("expected at least 2 consul operations, found %d", n)
		}
		return ops[1].Op == "remove", fmt.Errorf(
			"consul operation was not a deregistration: %#v", ops[1])
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})

	// Wait for the task to exit
	select {
	case <-tr.WaitCh():
	case <-time.After(maxTimeToFailDuration):
		t.Fatalf("task kill did not ignore shutdown delay")
	}

	err := <-killed
	require.NoError(t, err, "killing task returned unexpected error")
}

// TestTaskRunner_Dispatch_Payload asserts that a dispatch job runs and the
// payload was written to disk.
func TestTaskRunner_Dispatch_Payload(t *testing.T) {
Expand Down
12 changes: 11 additions & 1 deletion command/agent/alloc_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,18 @@ func (s *HTTPServer) allocStop(allocID string, resp http.ResponseWriter, req *ht
return nil, CodedError(405, ErrInvalidMethod)
}

noShutdownDelay := false
if noShutdownDelayQS := req.URL.Query().Get("no_shutdown_delay"); noShutdownDelayQS != "" {
var err error
noShutdownDelay, err = strconv.ParseBool(noShutdownDelayQS)
if err != nil {
return nil, fmt.Errorf("no_shutdown_delay value is not a boolean: %v", err)
}
}

sr := &structs.AllocStopRequest{
AllocID: allocID,
AllocID: allocID,
NoShutdownDelay: noShutdownDelay,
}
s.parseWriteRequest(req, &sr.WriteRequest)

Expand Down
12 changes: 12 additions & 0 deletions command/agent/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,18 @@ func (s *HTTPServer) jobDelete(resp http.ResponseWriter, req *http.Request,
return nil, err
}

// Identify the no_shutdown_delay query param and parse. An absent or
// empty value leaves the flag at its default of false.
noShutdownDelayStr := req.URL.Query().Get("no_shutdown_delay")
var noShutdownDelay bool
if noShutdownDelayStr != "" {
	var err error
	noShutdownDelay, err = strconv.ParseBool(noShutdownDelayStr)
	if err != nil {
		// Use %q (not %qq) so the parameter name is quoted without a
		// stray trailing "q" in the message returned to the caller.
		return nil, fmt.Errorf("Failed to parse value of %q (%v) as a bool: %v", "no_shutdown_delay", noShutdownDelayStr, err)
	}
}
args.NoShutdownDelay = noShutdownDelay

// Validate the evaluation priority if the user supplied a non-default
// value. It's more efficient to do it here, within the agent rather than
// sending a bad request for the server to reject.
Expand Down
Loading

0 comments on commit 35c22bc

Please sign in to comment.