Merge pull request #6746 from hashicorp/f-shutdown-delay-tg
Group shutdown_delay
drewbailey authored Dec 18, 2019
2 parents f697529 + 22d521c commit f575742
Showing 16 changed files with 260 additions and 13 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,10 @@
## 0.10.3 (Unreleased)

FEATURES:

* jobspec: Add `shutdown_delay` to task groups so a group can delay killing
  its tasks after its Consul services are deregistered [[GH-6746](https://github.com/hashicorp/nomad/issues/6746)]

IMPROVEMENTS:

* scheduler: Removed penalty for allocation's previous node if the allocation did not fail. [[GH-6781](https://github.com/hashicorp/nomad/issues/6781)]
1 change: 1 addition & 0 deletions api/tasks.go
@@ -411,6 +411,7 @@ type TaskGroup struct {
Networks []*NetworkResource
Meta map[string]string
Services []*Service
ShutdownDelay *time.Duration `mapstructure:"shutdown_delay"`
}

// NewTaskGroup creates a new TaskGroup.
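The pointer type lets callers distinguish an unset delay from an explicit zero. A rough sketch of setting the new field through the api package (the program around it is illustrative only; api.NewTaskGroup and helper.TimeToPtr are existing helpers, the latter used in parse_test.go below):

package main

import (
	"fmt"
	"time"

	"github.com/hashicorp/nomad/api"
	"github.com/hashicorp/nomad/helper"
)

func main() {
	// Group-level shutdown_delay: after the group's Consul services are
	// deregistered, killing the group's tasks is delayed by this duration.
	tg := api.NewTaskGroup("web", 1)
	tg.ShutdownDelay = helper.TimeToPtr(5 * time.Second)

	fmt.Println("group shutdown_delay:", *tg.ShutdownDelay)
}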
3 changes: 3 additions & 0 deletions client/allocrunner/alloc_runner.go
@@ -500,6 +500,9 @@ func (ar *allocRunner) killTasks() map[string]*structs.TaskState {
var mu sync.Mutex
states := make(map[string]*structs.TaskState, len(ar.tasks))

// Run alloc pre-kill hooks before killing any tasks
ar.preKillHooks()

// Kill leader first, synchronously
for name, tr := range ar.tasks {
if !tr.IsLeader() {
23 changes: 23 additions & 0 deletions client/allocrunner/alloc_runner_hooks.go
@@ -295,6 +295,29 @@ func (ar *allocRunner) destroy() error {
return merr.ErrorOrNil()
}

func (ar *allocRunner) preKillHooks() {
for _, hook := range ar.runnerHooks {
pre, ok := hook.(interfaces.RunnerPreKillHook)
if !ok {
continue
}

name := pre.Name()
var start time.Time
if ar.logger.IsTrace() {
start = time.Now()
ar.logger.Trace("running alloc pre shutdown hook", "name", name, "start", start)
}

pre.PreKill()

if ar.logger.IsTrace() {
end := time.Now()
ar.logger.Trace("finished alloc pre shutdown hook", "name", name, "end", end, "duration", end.Sub(start))
}
}
}

// shutdownHooks calls graceful shutdown hooks for when the agent is exiting.
func (ar *allocRunner) shutdownHooks() {
for _, hook := range ar.runnerHooks {
127 changes: 127 additions & 0 deletions client/allocrunner/alloc_runner_test.go
@@ -141,6 +141,133 @@ func TestAllocRunner_TaskLeader_KillTG(t *testing.T) {
})
}

func TestAllocRunner_TaskGroup_ShutdownDelay(t *testing.T) {
t.Parallel()

alloc := mock.Alloc()
tr := alloc.AllocatedResources.Tasks[alloc.Job.TaskGroups[0].Tasks[0].Name]
alloc.Job.TaskGroups[0].RestartPolicy.Attempts = 0

// Create a group service
tg := alloc.Job.TaskGroups[0]
tg.Services = []*structs.Service{
{
Name: "shutdown_service",
},
}

// Create two tasks in the group
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Name = "follower1"
task.Driver = "mock_driver"
task.Config = map[string]interface{}{
"run_for": "10s",
}

task2 := alloc.Job.TaskGroups[0].Tasks[0].Copy()
task2.Name = "leader"
task2.Driver = "mock_driver"
task2.Leader = true
task2.Config = map[string]interface{}{
"run_for": "10s",
}

alloc.Job.TaskGroups[0].Tasks = append(alloc.Job.TaskGroups[0].Tasks, task2)
alloc.AllocatedResources.Tasks[task.Name] = tr
alloc.AllocatedResources.Tasks[task2.Name] = tr

// Set a shutdown delay
shutdownDelay := 1 * time.Second
alloc.Job.TaskGroups[0].ShutdownDelay = &shutdownDelay

conf, cleanup := testAllocRunnerConfig(t, alloc)
defer cleanup()
ar, err := NewAllocRunner(conf)
require.NoError(t, err)
defer destroy(ar)
go ar.Run()

// Wait for tasks to start
upd := conf.StateUpdater.(*MockStateUpdater)
last := upd.Last()
testutil.WaitForResult(func() (bool, error) {
last = upd.Last()
if last == nil {
return false, fmt.Errorf("No updates")
}
if n := len(last.TaskStates); n != 2 {
return false, fmt.Errorf("Not enough task states (want: 2; found %d)", n)
}
for name, state := range last.TaskStates {
if state.State != structs.TaskStateRunning {
return false, fmt.Errorf("Task %q is not running yet (it's %q)", name, state.State)
}
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})

// Reset updates
upd.Reset()

// Stop alloc
shutdownInit := time.Now()
update := alloc.Copy()
update.DesiredStatus = structs.AllocDesiredStatusStop
ar.Update(update)

// Wait for tasks to stop
testutil.WaitForResult(func() (bool, error) {
last := upd.Last()
if last == nil {
return false, fmt.Errorf("No updates")
}

fin := last.TaskStates["leader"].FinishedAt

if fin.IsZero() {
return false, nil
}

return true, nil
}, func(err error) {
last := upd.Last()
for name, state := range last.TaskStates {
t.Logf("%s: %s", name, state.State)
}
t.Fatalf("err: %v", err)
})

// Get consul client operations
consulClient := conf.Consul.(*cconsul.MockConsulServiceClient)
consulOpts := consulClient.GetOps()
var groupRemoveOp cconsul.MockConsulOp
for _, op := range consulOpts {
// Grab the first deregistration request
if op.Op == "remove" && op.Name == "group-web" {
groupRemoveOp = op
break
}
}

// Ensure remove operation is close to shutdown initiation
require.True(t, groupRemoveOp.OccurredAt.Sub(shutdownInit) < 100*time.Millisecond)

last = upd.Last()
minShutdown := shutdownInit.Add(shutdownDelay)
leaderFinished := last.TaskStates["leader"].FinishedAt
followerFinished := last.TaskStates["follower1"].FinishedAt

// Check that both tasks shut down after min possible shutdown time
require.Greater(t, leaderFinished.UnixNano(), minShutdown.UnixNano())
require.Greater(t, followerFinished.UnixNano(), minShutdown.UnixNano())

// Check that there is at least shutdown_delay between the consul
// remove operation and the task's FinishedAt time
require.True(t, leaderFinished.Sub(groupRemoveOp.OccurredAt) > shutdownDelay)
}

// TestAllocRunner_TaskLeader_StopTG asserts that when stopping an alloc with a
// leader the leader is stopped before other tasks.
func TestAllocRunner_TaskLeader_StopTG(t *testing.T) {
37 changes: 36 additions & 1 deletion client/allocrunner/groupservice_hook.go
@@ -2,6 +2,7 @@ package allocrunner

import (
"sync"
"time"

log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
@@ -20,6 +21,8 @@ type groupServiceHook struct {
restarter agentconsul.WorkloadRestarter
consulClient consul.ConsulServiceAPI
prerun bool
delay time.Duration
deregistered bool

logger log.Logger

@@ -43,12 +46,20 @@ type groupServiceHookConfig struct {
}

func newGroupServiceHook(cfg groupServiceHookConfig) *groupServiceHook {
var shutdownDelay time.Duration
tg := cfg.alloc.Job.LookupTaskGroup(cfg.alloc.TaskGroup)

if tg.ShutdownDelay != nil {
shutdownDelay = *tg.ShutdownDelay
}

h := &groupServiceHook{
allocID: cfg.alloc.ID,
group: cfg.alloc.TaskGroup,
restarter: cfg.restarter,
consulClient: cfg.consul,
taskEnvBuilder: cfg.taskEnvBuilder,
delay: shutdownDelay,
}
h.logger = cfg.logger.Named(h.Name())
h.services = cfg.alloc.Job.LookupTaskGroup(h.group).Services
@@ -117,10 +128,34 @@ func (h *groupServiceHook) Update(req *interfaces.RunnerUpdateRequest) error {
return h.consulClient.UpdateWorkload(oldWorkloadServices, newWorkloadServices)
}

func (h *groupServiceHook) Postrun() error {
func (h *groupServiceHook) PreKill() {
h.mu.Lock()
defer h.mu.Unlock()

// Deregister group services, then wait out any configured
// shutdown delay before tasks are killed
h.deregister()
h.deregistered = true

if h.delay == 0 {
return
}

h.logger.Debug("waiting before removing group service", "shutdown_delay", h.delay)

// Wait out the configured shutdown_delay; note that this
// also blocks the agent from shutting down until it elapses
<-time.After(h.delay)
}

func (h *groupServiceHook) Postrun() error {
h.mu.Lock()
defer h.mu.Unlock()

if !h.deregistered {
h.deregister()
}
return nil
}

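Taken together, PreKill gives the group a graceful drain: services are removed from Consul right away, the hook then blocks for shutdown_delay, and only afterwards does killTasks begin signalling tasks. A standalone sketch of that ordering (the function names here are illustrative, not from this diff):

package main

import (
	"fmt"
	"time"
)

// preKill mimics the hook's ordering: deregister immediately, then wait
// out the delay so in-flight requests can drain before tasks are killed.
func preKill(deregister func(), delay time.Duration) {
	deregister()
	if delay == 0 {
		return
	}
	<-time.After(delay)
}

func main() {
	start := time.Now()
	preKill(func() {
		fmt.Println("deregistered after", time.Since(start).Round(time.Millisecond))
	}, time.Second)
	fmt.Println("proceeding to kill tasks after", time.Since(start).Round(time.Millisecond))
}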
1 change: 1 addition & 0 deletions client/allocrunner/groupservice_hook_test.go
@@ -20,6 +20,7 @@ import (
var _ interfaces.RunnerPrerunHook = (*groupServiceHook)(nil)
var _ interfaces.RunnerUpdateHook = (*groupServiceHook)(nil)
var _ interfaces.RunnerPostrunHook = (*groupServiceHook)(nil)
var _ interfaces.RunnerPreKillHook = (*groupServiceHook)(nil)

// TestGroupServiceHook_NoGroupServices asserts calling group service hooks
// without group services does not error.
9 changes: 9 additions & 0 deletions client/allocrunner/interfaces/runner_lifecycle.go
@@ -16,6 +16,15 @@ type RunnerPrerunHook interface {
Prerun() error
}

// RunnerPreKillHooks are executed inside killTasks before it iterates
// over and kills each task, so they run before the leader task is killed.
type RunnerPreKillHook interface {
RunnerHook

PreKill()
}

// RunnerPostrunHooks are executed after calling TaskRunner.Run, even for
// terminal allocations. Therefore Postrun hooks must be safe to call without
// first calling Prerun hooks.
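Any runner hook opts into this phase by also implementing PreKill; preKillHooks in alloc_runner_hooks.go discovers it with a type assertion. A hypothetical minimal implementation (assuming RunnerHook's usual Name() string method), mirroring the compile-time check added to groupservice_hook_test.go above:

package main

import (
	"fmt"

	"github.com/hashicorp/nomad/client/allocrunner/interfaces"
)

// noopPreKillHook is a hypothetical hook: Name() satisfies RunnerHook,
// and PreKill() opts it into the new pre-kill phase.
type noopPreKillHook struct{}

func (*noopPreKillHook) Name() string { return "noop_pre_kill" }

// PreKill blocks killTasks until it returns; a real hook does its
// pre-stop work here (e.g. the group service deregister and delay).
func (*noopPreKillHook) PreKill() {}

// Compile-time interface check, as in groupservice_hook_test.go.
var _ interfaces.RunnerPreKillHook = (*noopPreKillHook)(nil)

func main() { fmt.Println((&noopPreKillHook{}).Name()) }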
15 changes: 9 additions & 6 deletions client/consul/consul_testing.go
@@ -3,6 +3,7 @@ package consul
import (
"fmt"
"sync"
"time"

log "github.com/hashicorp/go-hclog"

@@ -12,9 +13,10 @@ import (

// MockConsulOp represents the register/deregister operations.
type MockConsulOp struct {
Op string // add, remove, or update
AllocID string
Name string // task or group name
Op string // add, remove, or update
AllocID string
Name string // task or group name
OccurredAt time.Time
}

func NewMockConsulOp(op, allocID, name string) MockConsulOp {
@@ -25,9 +27,10 @@ func NewMockConsulOp(op, allocID, name string) MockConsulOp {
panic(fmt.Errorf("invalid consul op: %s", op))
}
return MockConsulOp{
Op: op,
AllocID: allocID,
Name: name,
Op: op,
AllocID: allocID,
Name: name,
OccurredAt: time.Now(),
}
}

4 changes: 4 additions & 0 deletions command/agent/job_endpoint.go
@@ -708,6 +708,10 @@ func ApiTgToStructsTG(taskGroup *api.TaskGroup, tg *structs.TaskGroup) {
Mode: *taskGroup.RestartPolicy.Mode,
}

if taskGroup.ShutdownDelay != nil {
tg.ShutdownDelay = taskGroup.ShutdownDelay
}

if taskGroup.ReschedulePolicy != nil {
tg.ReschedulePolicy = &structs.ReschedulePolicy{
Attempts: *taskGroup.ReschedulePolicy.Attempts,
14 changes: 12 additions & 2 deletions jobspec/parse_group.go
@@ -51,6 +51,7 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error {
"vault",
"migrate",
"spread",
"shutdown_delay",
"network",
"service",
"volume",
@@ -63,6 +64,7 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error {
if err := hcl.DecodeObject(&m, item.Val); err != nil {
return err
}

delete(m, "constraint")
delete(m, "affinity")
delete(m, "meta")
@@ -80,7 +82,16 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error {
// Build the group with the basic decode
var g api.TaskGroup
g.Name = helper.StringToPtr(n)
if err := mapstructure.WeakDecode(m, &g); err != nil {
dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
DecodeHook: mapstructure.StringToTimeDurationHookFunc(),
WeaklyTypedInput: true,
Result: &g,
})

if err != nil {
return err
}
if err := dec.Decode(m); err != nil {
return err
}

@@ -201,7 +212,6 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error {
return multierror.Prefix(err, fmt.Sprintf("'%s',", n))
}
}

collection = append(collection, &g)
}

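The switch from a bare mapstructure.WeakDecode to an explicit decoder matters because HCL delivers shutdown_delay as a string such as "14s"; StringToTimeDurationHookFunc is what converts it into the *time.Duration field on api.TaskGroup. A minimal sketch of that decode path (the group struct here stands in for api.TaskGroup):

package main

import (
	"fmt"
	"time"

	"github.com/mitchellh/mapstructure"
)

type group struct {
	ShutdownDelay *time.Duration `mapstructure:"shutdown_delay"`
}

func main() {
	var g group
	dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
		DecodeHook:       mapstructure.StringToTimeDurationHookFunc(),
		WeaklyTypedInput: true,
		Result:           &g,
	})
	if err != nil {
		panic(err)
	}

	// HCL decoding yields string values; the hook turns "14s"
	// into a time.Duration while decoding into the struct.
	if err := dec.Decode(map[string]interface{}{"shutdown_delay": "14s"}); err != nil {
		panic(err)
	}
	fmt.Println(*g.ShutdownDelay) // 14s
}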
5 changes: 3 additions & 2 deletions jobspec/parse_test.go
@@ -926,8 +926,9 @@ func TestParse(t *testing.T) {
Datacenters: []string{"dc1"},
TaskGroups: []*api.TaskGroup{
{
Name: helper.StringToPtr("bar"),
Count: helper.IntToPtr(3),
Name: helper.StringToPtr("bar"),
ShutdownDelay: helper.TimeToPtr(14 * time.Second),
Count: helper.IntToPtr(3),
Networks: []*api.NetworkResource{
{
Mode: "bridge",
(Diffs for the remaining 4 changed files are not shown.)