Group shutdown_delay #6746

Merged: 4 commits, Dec 18, 2019
Changes from all commits
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,10 @@
## 0.10.3 (Unreleased)

FEATURES:

* jobspec: Add `shutdown_delay` to task groups so task groups can delay shutdown
after deregistering from Consul [[GH-6746](https://github.com/hashicorp/nomad/issues/6746)]

IMPROVEMENTS:

* scheduler: Removed penalty for allocation's previous node if the allocation did not fail. [[GH-6781](https://github.com/hashicorp/nomad/issues/6781)]
1 change: 1 addition & 0 deletions api/tasks.go
@@ -411,6 +411,7 @@ type TaskGroup struct {
Networks []*NetworkResource
Meta map[string]string
Services []*Service
ShutdownDelay *time.Duration `mapstructure:"shutdown_delay"`
}

// NewTaskGroup creates a new TaskGroup.
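For context, the new `ShutdownDelay` field can be set directly when building a job through the `api` package. The sketch below is illustrative only (job, group, and datacenter names are made up) and assumes the existing `api.NewTaskGroup`, `api.NewServiceJob`, `AddDatacenter`, and `AddTaskGroup` helpers:

```go
package main

import (
	"time"

	"github.com/hashicorp/nomad/api"
)

func main() {
	// Group-level shutdown delay: wait 10s between deregistering the
	// group's Consul services and killing its tasks.
	delay := 10 * time.Second

	tg := api.NewTaskGroup("web", 1)
	tg.ShutdownDelay = &delay // field added in this PR

	job := api.NewServiceJob("example", "example", "global", 50)
	job.AddDatacenter("dc1")
	job.AddTaskGroup(tg)

	// In a real program, register via an api.Client, e.g.:
	// client, _ := api.NewClient(api.DefaultConfig())
	// client.Jobs().Register(job, nil)
	_ = job
}
```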
3 changes: 3 additions & 0 deletions client/allocrunner/alloc_runner.go
@@ -500,6 +500,9 @@ func (ar *allocRunner) killTasks() map[string]*structs.TaskState {
var mu sync.Mutex
states := make(map[string]*structs.TaskState, len(ar.tasks))

// Run any alloc pre-kill hooks before killing tasks
ar.preKillHooks()

// Kill leader first, synchronously
for name, tr := range ar.tasks {
if !tr.IsLeader() {
23 changes: 23 additions & 0 deletions client/allocrunner/alloc_runner_hooks.go
@@ -295,6 +295,29 @@ func (ar *allocRunner) destroy() error {
return merr.ErrorOrNil()
}

// preKillHooks calls registered pre-kill hooks before the alloc's tasks are killed.
func (ar *allocRunner) preKillHooks() {
for _, hook := range ar.runnerHooks {
pre, ok := hook.(interfaces.RunnerPreKillHook)
if !ok {
continue
}

name := pre.Name()
var start time.Time
if ar.logger.IsTrace() {
start = time.Now()
ar.logger.Trace("running alloc pre shutdown hook", "name", name, "start", start)
}

pre.PreKill()

if ar.logger.IsTrace() {
end := time.Now()
ar.logger.Trace("finished alloc pre shutdown hook", "name", name, "end", end, "duration", end.Sub(start))
}
}
}

// shutdownHooks calls graceful shutdown hooks for when the agent is exiting.
func (ar *allocRunner) shutdownHooks() {
for _, hook := range ar.runnerHooks {
127 changes: 127 additions & 0 deletions client/allocrunner/alloc_runner_test.go
@@ -141,6 +141,133 @@ func TestAllocRunner_TaskLeader_KillTG(t *testing.T) {
})
}

func TestAllocRunner_TaskGroup_ShutdownDelay(t *testing.T) {
t.Parallel()

alloc := mock.Alloc()
tr := alloc.AllocatedResources.Tasks[alloc.Job.TaskGroups[0].Tasks[0].Name]
alloc.Job.TaskGroups[0].RestartPolicy.Attempts = 0

// Create a group service
tg := alloc.Job.TaskGroups[0]
tg.Services = []*structs.Service{
{
Name: "shutdown_service",
},
}

// Create two tasks in the group
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Name = "follower1"
task.Driver = "mock_driver"
task.Config = map[string]interface{}{
"run_for": "10s",
}

task2 := alloc.Job.TaskGroups[0].Tasks[0].Copy()
task2.Name = "leader"
task2.Driver = "mock_driver"
task2.Leader = true
task2.Config = map[string]interface{}{
"run_for": "10s",
}

alloc.Job.TaskGroups[0].Tasks = append(alloc.Job.TaskGroups[0].Tasks, task2)
alloc.AllocatedResources.Tasks[task.Name] = tr
alloc.AllocatedResources.Tasks[task2.Name] = tr

// Set a shutdown delay
shutdownDelay := 1 * time.Second
alloc.Job.TaskGroups[0].ShutdownDelay = &shutdownDelay

conf, cleanup := testAllocRunnerConfig(t, alloc)
defer cleanup()
ar, err := NewAllocRunner(conf)
require.NoError(t, err)
defer destroy(ar)
go ar.Run()

// Wait for tasks to start
upd := conf.StateUpdater.(*MockStateUpdater)
last := upd.Last()
testutil.WaitForResult(func() (bool, error) {
last = upd.Last()
if last == nil {
return false, fmt.Errorf("No updates")
}
if n := len(last.TaskStates); n != 2 {
return false, fmt.Errorf("Not enough task states (want: 2; found %d)", n)
}
for name, state := range last.TaskStates {
if state.State != structs.TaskStateRunning {
return false, fmt.Errorf("Task %q is not running yet (it's %q)", name, state.State)
}
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})

// Reset updates
upd.Reset()

// Stop alloc
shutdownInit := time.Now()
update := alloc.Copy()
update.DesiredStatus = structs.AllocDesiredStatusStop
ar.Update(update)

// Wait for tasks to stop
testutil.WaitForResult(func() (bool, error) {
last := upd.Last()
if last == nil {
return false, fmt.Errorf("No updates")
}

fin := last.TaskStates["leader"].FinishedAt

if fin.IsZero() {
return false, nil
}

return true, nil
}, func(err error) {
last := upd.Last()
for name, state := range last.TaskStates {
t.Logf("%s: %s", name, state.State)
}
t.Fatalf("err: %v", err)
})

// Get consul client operations
consulClient := conf.Consul.(*cconsul.MockConsulServiceClient)
consulOpts := consulClient.GetOps()
var groupRemoveOp cconsul.MockConsulOp
for _, op := range consulOpts {
// Grab the first deregistration request
if op.Op == "remove" && op.Name == "group-web" {
groupRemoveOp = op
break
}
}

// Ensure remove operation is close to shutdown initiation
require.True(t, groupRemoveOp.OccurredAt.Sub(shutdownInit) < 100*time.Millisecond)

last = upd.Last()
minShutdown := shutdownInit.Add(task.ShutdownDelay)
leaderFinished := last.TaskStates["leader"].FinishedAt
followerFinished := last.TaskStates["follower1"].FinishedAt

// Check that both tasks shut down after min possible shutdown time
require.Greater(t, leaderFinished.UnixNano(), minShutdown.UnixNano())
require.Greater(t, followerFinished.UnixNano(), minShutdown.UnixNano())

// Check that there is at least shutdown_delay between consul
// remove operation and task finished at time
require.True(t, leaderFinished.Sub(groupRemoveOp.OccurredAt) > shutdownDelay)
}

// TestAllocRunner_TaskLeader_StopTG asserts that when stopping an alloc with a
// leader the leader is stopped before other tasks.
func TestAllocRunner_TaskLeader_StopTG(t *testing.T) {
37 changes: 36 additions & 1 deletion client/allocrunner/groupservice_hook.go
@@ -2,6 +2,7 @@ package allocrunner

import (
"sync"
"time"

log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
@@ -20,6 +21,8 @@ type groupServiceHook struct {
restarter agentconsul.WorkloadRestarter
consulClient consul.ConsulServiceAPI
prerun bool
delay time.Duration
deregistered bool

logger log.Logger

Expand All @@ -43,12 +46,20 @@ type groupServiceHookConfig struct {
}

func newGroupServiceHook(cfg groupServiceHookConfig) *groupServiceHook {
var shutdownDelay time.Duration
tg := cfg.alloc.Job.LookupTaskGroup(cfg.alloc.TaskGroup)

if tg.ShutdownDelay != nil {
shutdownDelay = *tg.ShutdownDelay
}

h := &groupServiceHook{
allocID: cfg.alloc.ID,
group: cfg.alloc.TaskGroup,
restarter: cfg.restarter,
consulClient: cfg.consul,
taskEnvBuilder: cfg.taskEnvBuilder,
delay: shutdownDelay,
}
h.logger = cfg.logger.Named(h.Name())
h.services = cfg.alloc.Job.LookupTaskGroup(h.group).Services
@@ -117,10 +128,34 @@ func (h *groupServiceHook) Update(req *interfaces.RunnerUpdateRequest) error {
return h.consulClient.UpdateWorkload(oldWorkloadServices, newWorkloadServices)
}

func (h *groupServiceHook) Postrun() error {
func (h *groupServiceHook) PreKill() {
h.mu.Lock()
defer h.mu.Unlock()

// Deregister group services first, then wait for any configured
// shutdown_delay before the alloc runner continues killing tasks
h.deregister()
h.deregistered = true

if h.delay == 0 {
return
}

h.logger.Debug("waiting before removing group service", "shutdown_delay", h.delay)

// Wait for the specified shutdown_delay. Note that this also blocks
// a client agent that is shutting down.
<-time.After(h.delay)
}

func (h *groupServiceHook) Postrun() error {
h.mu.Lock()
defer h.mu.Unlock()

if !h.deregistered {
h.deregister()
}
return nil
}

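In other words, PreKill deregisters the group's services first and only then blocks for the configured delay, giving Consul time to stop routing traffic while in-flight requests drain; Postrun's `deregistered` check avoids a second removal. A minimal standalone sketch of that ordering (the function and names below are illustrative, not the hook's real dependencies):

```go
package main

import (
	"fmt"
	"time"
)

// drainThenWait mirrors the shape of PreKill above: deregister first,
// then block for the delay before the caller proceeds to kill tasks.
func drainThenWait(deregister func(), delay time.Duration) {
	deregister()
	if delay == 0 {
		return
	}
	<-time.After(delay)
}

func main() {
	start := time.Now()
	drainThenWait(func() { fmt.Println("group services deregistered") }, 500*time.Millisecond)
	fmt.Printf("tasks may be killed after %v\n", time.Since(start).Round(time.Millisecond))
}
```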
1 change: 1 addition & 0 deletions client/allocrunner/groupservice_hook_test.go
@@ -20,6 +20,7 @@ import (
var _ interfaces.RunnerPrerunHook = (*groupServiceHook)(nil)
var _ interfaces.RunnerUpdateHook = (*groupServiceHook)(nil)
var _ interfaces.RunnerPostrunHook = (*groupServiceHook)(nil)
var _ interfaces.RunnerPreKillHook = (*groupServiceHook)(nil)

// TestGroupServiceHook_NoGroupServices asserts calling group service hooks
// without group services does not error.
9 changes: 9 additions & 0 deletions client/allocrunner/interfaces/runner_lifecycle.go
@@ -16,6 +16,15 @@ type RunnerPrerunHook interface {
Prerun() error
}

// RunnerPreKillHooks are executed inside killTasks before the alloc
// runner iterates over and kills each task. They run before the leader
// task is killed.
type RunnerPreKillHook interface {
RunnerHook

PreKill()
}

// RunnerPostrunHooks are executed after calling TaskRunner.Run, even for
// terminal allocations. Therefore Postrun hooks must be safe to call without
// first calling Prerun hooks.
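Any runner hook can opt into the new phase simply by adding a `PreKill` method; `preKillHooks` discovers it through the type assertion shown earlier. A hypothetical no-op hook, sketched only to illustrate the contract (it is not part of this change):

```go
package allocrunner

import (
	log "github.com/hashicorp/go-hclog"

	"github.com/hashicorp/nomad/client/allocrunner/interfaces"
)

// noopPreKillHook is a hypothetical hook used to illustrate the
// RunnerPreKillHook contract.
type noopPreKillHook struct {
	logger log.Logger
}

func (*noopPreKillHook) Name() string { return "noop_pre_kill" }

// PreKill runs before any task in the allocation is killed.
func (h *noopPreKillHook) PreKill() {
	h.logger.Debug("pre-kill phase reached; nothing to do")
}

// Compile-time check, mirroring the assertions in groupservice_hook_test.go.
var _ interfaces.RunnerPreKillHook = (*noopPreKillHook)(nil)
```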
15 changes: 9 additions & 6 deletions client/consul/consul_testing.go
@@ -3,6 +3,7 @@ package consul
import (
"fmt"
"sync"
"time"

log "github.com/hashicorp/go-hclog"

@@ -12,9 +13,10 @@ import (

// MockConsulOp represents the register/deregister operations.
type MockConsulOp struct {
Op string // add, remove, or update
AllocID string
Name string // task or group name
Op string // add, remove, or update
AllocID string
Name string // task or group name
OccurredAt time.Time
}

func NewMockConsulOp(op, allocID, name string) MockConsulOp {
@@ -25,9 +27,10 @@ func NewMockConsulOp(op, allocID, name string) MockConsulOp {
panic(fmt.Errorf("invalid consul op: %s", op))
}
return MockConsulOp{
Op: op,
AllocID: allocID,
Name: name,
Op: op,
AllocID: allocID,
Name: name,
OccurredAt: time.Now(),
}
}

4 changes: 4 additions & 0 deletions command/agent/job_endpoint.go
@@ -708,6 +708,10 @@ func ApiTgToStructsTG(taskGroup *api.TaskGroup, tg *structs.TaskGroup) {
Mode: *taskGroup.RestartPolicy.Mode,
}

if taskGroup.ShutdownDelay != nil {
tg.ShutdownDelay = taskGroup.ShutdownDelay
}

if taskGroup.ReschedulePolicy != nil {
tg.ReschedulePolicy = &structs.ReschedulePolicy{
Attempts: *taskGroup.ReschedulePolicy.Attempts,
14 changes: 12 additions & 2 deletions jobspec/parse_group.go
@@ -51,6 +51,7 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error {
"vault",
"migrate",
"spread",
"shutdown_delay",
"network",
"service",
"volume",
@@ -63,6 +64,7 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error {
if err := hcl.DecodeObject(&m, item.Val); err != nil {
return err
}

delete(m, "constraint")
delete(m, "affinity")
delete(m, "meta")
@@ -80,7 +82,16 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error {
// Build the group with the basic decode
var g api.TaskGroup
g.Name = helper.StringToPtr(n)
if err := mapstructure.WeakDecode(m, &g); err != nil {
dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
DecodeHook: mapstructure.StringToTimeDurationHookFunc(),
WeaklyTypedInput: true,
Result: &g,
})

if err != nil {
return err
}
if err := dec.Decode(m); err != nil {
return err
}

@@ -201,7 +212,6 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error {
return multierror.Prefix(err, fmt.Sprintf("'%s',", n))
}
}

collection = append(collection, &g)
}

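The switch from `mapstructure.WeakDecode` to an explicit decoder is what lets an HCL string such as `"14s"` land in the `*time.Duration` field. A minimal standalone sketch of the same decoding setup (the struct and input map here are illustrative, not the parser's real types):

```go
package main

import (
	"fmt"
	"time"

	"github.com/mitchellh/mapstructure"
)

type group struct {
	ShutdownDelay *time.Duration `mapstructure:"shutdown_delay"`
	Count         int            `mapstructure:"count"`
}

func main() {
	// Values as they arrive from the HCL decode: durations are plain strings.
	m := map[string]interface{}{
		"shutdown_delay": "14s",
		"count":          "3", // weakly typed: string -> int
	}

	var g group
	dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
		DecodeHook:       mapstructure.StringToTimeDurationHookFunc(),
		WeaklyTypedInput: true,
		Result:           &g,
	})
	if err != nil {
		panic(err)
	}
	if err := dec.Decode(m); err != nil {
		panic(err)
	}

	fmt.Println(*g.ShutdownDelay, g.Count) // 14s 3
}
```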
5 changes: 3 additions & 2 deletions jobspec/parse_test.go
@@ -926,8 +926,9 @@ func TestParse(t *testing.T) {
Datacenters: []string{"dc1"},
TaskGroups: []*api.TaskGroup{
{
Name: helper.StringToPtr("bar"),
Count: helper.IntToPtr(3),
Name: helper.StringToPtr("bar"),
ShutdownDelay: helper.TimeToPtr(14 * time.Second),
Count: helper.IntToPtr(3),
Networks: []*api.NetworkResource{
{
Mode: "bridge",