Skip to content

Commit

Permalink
shutdown delay for task groups
Browse files Browse the repository at this point in the history
copy struct values

ensure groupServiceHook implements RunnerPreKillHook

run deregister first

test that shutdown times are delayed

move magic number into variable

address feedback, reduce stutter

wip

wip

bug fixes
  • Loading branch information
drewbailey committed Nov 25, 2019
1 parent ca736e6 commit 91d8faa
Show file tree
Hide file tree
Showing 12 changed files with 189 additions and 6 deletions.
1 change: 1 addition & 0 deletions api/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,7 @@ type TaskGroup struct {
Networks []*NetworkResource
Meta map[string]string
Services []*Service
ShutdownDelay *time.Duration `mapstructure:"shutdown_delay"`
}

// NewTaskGroup creates a new TaskGroup.
Expand Down
3 changes: 3 additions & 0 deletions client/allocrunner/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,9 @@ func (ar *allocRunner) killTasks() map[string]*structs.TaskState {
var mu sync.Mutex
states := make(map[string]*structs.TaskState, len(ar.tasks))

// run alloc prekill hooks
ar.preKillHooks()

// Kill leader first, synchronously
for name, tr := range ar.tasks {
if !tr.IsLeader() {
Expand Down
23 changes: 23 additions & 0 deletions client/allocrunner/alloc_runner_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,29 @@ func (ar *allocRunner) destroy() error {
return merr.ErrorOrNil()
}

// preKillHooks runs, in registration order, every runner hook that
// implements interfaces.RunnerPreKillHook. Hooks that do not implement
// the interface are skipped. When trace logging is enabled, the start
// time and duration of each hook are recorded.
func (ar *allocRunner) preKillHooks() {
	for _, h := range ar.runnerHooks {
		preKiller, ok := h.(interfaces.RunnerPreKillHook)
		if !ok {
			continue
		}

		hookName := preKiller.Name()
		var begin time.Time
		if ar.logger.IsTrace() {
			begin = time.Now()
			ar.logger.Trace("running alloc pre shutdown hook", "name", hookName, "start", begin)
		}

		preKiller.PreKill()

		if ar.logger.IsTrace() {
			finish := time.Now()
			ar.logger.Trace("finished alloc pre shutdown hook", "name", hookName, "end", finish, "duration", finish.Sub(begin))
		}
	}
}

// shutdownHooks calls graceful shutdown hooks for when the agent is exiting.
func (ar *allocRunner) shutdownHooks() {
for _, hook := range ar.runnerHooks {
Expand Down
95 changes: 95 additions & 0 deletions client/allocrunner/alloc_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,101 @@ func TestAllocRunner_TaskLeader_KillTG(t *testing.T) {
})
}

// TestAllocRunner_TaskGroup_ShutdownDelay asserts that when a task group
// configures shutdown_delay, stopping the allocation delays task shutdown:
// every task's FinishedAt must be later than stop-time + shutdown_delay.
func TestAllocRunner_TaskGroup_ShutdownDelay(t *testing.T) {
	t.Parallel()
	shutdownDelay := 1 * time.Second

	alloc := mock.Alloc()
	tr := alloc.AllocatedResources.Tasks[alloc.Job.TaskGroups[0].Tasks[0].Name]
	alloc.Job.TaskGroups[0].RestartPolicy.Attempts = 0

	// Build a follower task on the mock driver that runs long enough to
	// still be running when the alloc is stopped.
	task := alloc.Job.TaskGroups[0].Tasks[0]
	task.Name = "follower1"
	task.Driver = "mock_driver"
	task.Config = map[string]interface{}{
		"run_for": "10s",
	}

	// Add a leader task so leader-first kill ordering is exercised
	// together with the group shutdown delay.
	task2 := alloc.Job.TaskGroups[0].Tasks[0].Copy()
	task2.Name = "leader"
	task2.Driver = "mock_driver"
	task2.Leader = true
	task2.Config = map[string]interface{}{
		"run_for": "10s",
	}

	alloc.Job.TaskGroups[0].Tasks = append(alloc.Job.TaskGroups[0].Tasks, task2)
	alloc.AllocatedResources.Tasks[task.Name] = tr
	alloc.AllocatedResources.Tasks[task2.Name] = tr

	// Set a shutdown delay
	alloc.Job.TaskGroups[0].ShutdownDelay = &shutdownDelay

	conf, cleanup := testAllocRunnerConfig(t, alloc)
	defer cleanup()
	ar, err := NewAllocRunner(conf)
	require.NoError(t, err)
	defer destroy(ar)
	go ar.Run()

	// Wait for tasks to start
	upd := conf.StateUpdater.(*MockStateUpdater)
	last := upd.Last()
	testutil.WaitForResult(func() (bool, error) {
		last = upd.Last()
		if last == nil {
			return false, fmt.Errorf("No updates")
		}
		if n := len(last.TaskStates); n != 2 {
			return false, fmt.Errorf("Not enough task states (want: 2; found %d)", n)
		}
		for name, state := range last.TaskStates {
			if state.State != structs.TaskStateRunning {
				return false, fmt.Errorf("Task %q is not running yet (it's %q)", name, state.State)
			}
		}
		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})

	// Reset updates
	upd.Reset()

	// Stop alloc; record the stop time so finish times can be compared
	// against now + shutdownDelay below.
	now := time.Now()
	update := alloc.Copy()
	update.DesiredStatus = structs.AllocDesiredStatusStop
	ar.Update(update)

	// Wait for tasks to stop: the leader's FinishedAt becoming non-zero
	// signals the shutdown sequence has completed.
	testutil.WaitForResult(func() (bool, error) {
		last := upd.Last()
		if last == nil {
			return false, fmt.Errorf("No updates")
		}

		fin := last.TaskStates["leader"].FinishedAt

		if fin.IsZero() {
			return false, nil
		}

		return true, nil
	}, func(err error) {
		last := upd.Last()
		for name, state := range last.TaskStates {
			t.Logf("%s: %s", name, state.State)
		}
		t.Fatalf("err: %v", err)
	})

	// Both tasks must have finished only after the shutdown delay
	// elapsed, proving the delay was applied before killing them.
	last = upd.Last()
	require.Greater(t, last.TaskStates["leader"].FinishedAt.UnixNano(), now.Add(shutdownDelay).UnixNano())
	require.Greater(t, last.TaskStates["follower1"].FinishedAt.UnixNano(), now.Add(shutdownDelay).UnixNano())
}

// TestAllocRunner_TaskLeader_StopTG asserts that when stopping an alloc with a
// leader the leader is stopped before other tasks.
func TestAllocRunner_TaskLeader_StopTG(t *testing.T) {
Expand Down
32 changes: 31 additions & 1 deletion client/allocrunner/groupservice_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package allocrunner

import (
"sync"
"time"

log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
Expand All @@ -20,6 +21,8 @@ type groupServiceHook struct {
restarter agentconsul.WorkloadRestarter
consulClient consul.ConsulServiceAPI
prerun bool
delay time.Duration
deregistered bool

logger log.Logger

Expand All @@ -43,12 +46,20 @@ type groupServiceHookConfig struct {
}

func newGroupServiceHook(cfg groupServiceHookConfig) *groupServiceHook {
var shutdownDelay time.Duration
tg := cfg.alloc.Job.LookupTaskGroup(cfg.alloc.TaskGroup)

if tg != nil && tg.ShutdownDelay != nil {
shutdownDelay = *tg.ShutdownDelay
}

h := &groupServiceHook{
allocID: cfg.alloc.ID,
group: cfg.alloc.TaskGroup,
restarter: cfg.restarter,
consulClient: cfg.consul,
taskEnvBuilder: cfg.taskEnvBuilder,
delay: shutdownDelay,
}
h.logger = cfg.logger.Named(h.Name())
h.services = cfg.alloc.Job.LookupTaskGroup(h.group).Services
Expand Down Expand Up @@ -117,10 +128,29 @@ func (h *groupServiceHook) Update(req *interfaces.RunnerUpdateRequest) error {
return h.consulClient.UpdateWorkload(oldWorkloadServices, newWorkloadServices)
}

func (h *groupServiceHook) Postrun() error {
// PreKill implements interfaces.RunnerPreKillHook. It deregisters the
// group's Consul services before the allocation's tasks are killed, then
// waits for the configured group shutdown_delay so traffic can drain
// while the services are already out of the catalog.
//
// The hook's mutex is held for the full delay, so a concurrent Postrun
// blocks until the wait completes.
func (h *groupServiceHook) PreKill() {
	h.mu.Lock()
	defer h.mu.Unlock()

	// If we have a shutdown delay deregister
	// group services and then wait
	// before continuing to kill tasks
	h.deregister()
	h.deregistered = true

	h.logger.Debug("waiting before removing group service", "shutdown_delay", h.delay)

	// A single-case select over time.After is just a sleep (staticcheck
	// S1000); a zero or negative delay returns immediately.
	time.Sleep(h.delay)
}

// Postrun implements interfaces.RunnerPostrunHook. It removes the
// group's Consul services unless PreKill has already deregistered them.
func (h *groupServiceHook) Postrun() error {
	h.mu.Lock()
	defer h.mu.Unlock()

	if h.deregistered {
		return nil
	}

	h.deregister()
	return nil
}

Expand Down
1 change: 1 addition & 0 deletions client/allocrunner/groupservice_hook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
var _ interfaces.RunnerPrerunHook = (*groupServiceHook)(nil)
var _ interfaces.RunnerUpdateHook = (*groupServiceHook)(nil)
var _ interfaces.RunnerPostrunHook = (*groupServiceHook)(nil)
var _ interfaces.RunnerPreKillHook = (*groupServiceHook)(nil)

// TestGroupServiceHook_NoGroupServices asserts calling group service hooks
// without group services does not error.
Expand Down
7 changes: 7 additions & 0 deletions client/allocrunner/interfaces/runner_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,10 @@ type ShutdownHook interface {

Shutdown()
}

// RunnerPreKillHook is a lifecycle hook run by the alloc runner before
// its tasks are killed, allowing hooks to perform work that must happen
// ahead of task shutdown (e.g. deregistering group services and waiting
// out a configured shutdown_delay).
type RunnerPreKillHook interface {
	RunnerHook

	// PreKill is called synchronously before the allocation's tasks are
	// killed.
	PreKill()
}
1 change: 1 addition & 0 deletions command/agent/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,7 @@ func ApiTgToStructsTG(taskGroup *api.TaskGroup, tg *structs.TaskGroup) {
tg.Name = *taskGroup.Name
tg.Count = *taskGroup.Count
tg.Meta = taskGroup.Meta
tg.ShutdownDelay = taskGroup.ShutdownDelay
tg.Constraints = ApiConstraintsToStructs(taskGroup.Constraints)
tg.Affinities = ApiAffinitiesToStructs(taskGroup.Affinities)
tg.Networks = ApiNetworkResourceToStructs(taskGroup.Networks)
Expand Down
14 changes: 12 additions & 2 deletions jobspec/parse_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error {
"vault",
"migrate",
"spread",
"shutdown_delay",
"network",
"service",
"volume",
Expand All @@ -63,6 +64,7 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error {
if err := hcl.DecodeObject(&m, item.Val); err != nil {
return err
}

delete(m, "constraint")
delete(m, "affinity")
delete(m, "meta")
Expand All @@ -80,7 +82,16 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error {
// Build the group with the basic decode
var g api.TaskGroup
g.Name = helper.StringToPtr(n)
if err := mapstructure.WeakDecode(m, &g); err != nil {
dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
DecodeHook: mapstructure.StringToTimeDurationHookFunc(),
WeaklyTypedInput: true,
Result: &g,
})

if err != nil {
return err
}
if err := dec.Decode(m); err != nil {
return err
}

Expand Down Expand Up @@ -201,7 +212,6 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error {
return multierror.Prefix(err, fmt.Sprintf("'%s',", n))
}
}

collection = append(collection, &g)
}

Expand Down
5 changes: 3 additions & 2 deletions jobspec/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -926,8 +926,9 @@ func TestParse(t *testing.T) {
Datacenters: []string{"dc1"},
TaskGroups: []*api.TaskGroup{
{
Name: helper.StringToPtr("bar"),
Count: helper.IntToPtr(3),
Name: helper.StringToPtr("bar"),
ShutdownDelay: 14 * time.Second,
Count: helper.IntToPtr(3),
Networks: []*api.NetworkResource{
{
Mode: "bridge",
Expand Down
3 changes: 2 additions & 1 deletion jobspec/test-fixtures/tg-network.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ job "foo" {
datacenters = ["dc1"]

group "bar" {
count = 3
count = 3
shutdown_delay = "14s"

network {
mode = "bridge"
Expand Down
10 changes: 10 additions & 0 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3547,6 +3547,10 @@ func (j *Job) Validate() error {
taskGroups[tg.Name] = idx
}

// if tg.ShutdownDelay < 0 {
// mErr.Errors = append(mErr.Errors, errors.New("ShutdownDelay must be a positive value"))
// }

if j.Type == "system" && tg.Count > 1 {
mErr.Errors = append(mErr.Errors,
fmt.Errorf("Job task group %s has count %d. Count cannot exceed 1 with system scheduler",
Expand Down Expand Up @@ -4736,6 +4740,8 @@ type TaskGroup struct {

// Volumes is a map of volumes that have been requested by the task group.
Volumes map[string]*VolumeRequest

ShutdownDelay *time.Duration
}

func (tg *TaskGroup) Copy() *TaskGroup {
Expand Down Expand Up @@ -4782,6 +4788,10 @@ func (tg *TaskGroup) Copy() *TaskGroup {
}
}

if tg.ShutdownDelay != nil {
ntg.ShutdownDelay = helper.TimeToPtr(*tg.ShutdownDelay)
}

return ntg
}

Expand Down

0 comments on commit 91d8faa

Please sign in to comment.