Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement alloc runner task restart hook #9869

Merged
merged 3 commits into from
Jan 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

BUG FIXES:

* consul: Fixed a bug where failing tasks with group services would only cause the allocation to restart once instead of respecting the `restart` field. [[GH-9869](https://github.com/hashicorp/nomad/issues/9869)]
* consul/connect: Fixed a bug where the gateway proxy connection default timeout was not set [[GH-9851](https://github.com/hashicorp/nomad/pull/9851)]
* consul/connect: Fixed a bug preventing more than one connect gateway per Nomad client [[GH-9849](https://github.com/hashicorp/nomad/pull/9849)]
* scheduler: Fixed a bug where shared ports were not persisted during inplace updates for service jobs. [[GH-9830](https://github.com/hashicorp/nomad/issues/9830)]
Expand All @@ -16,7 +17,7 @@ IMPROVEMENTS:
* consul/connect: Interpolate the connect, service meta, and service canary meta blocks with the task environment [[GH-9586](https://github.com/hashicorp/nomad/pull/9586)]
* consul/connect: enable configuring custom gateway task [[GH-9639](https://github.com/hashicorp/nomad/pull/9639)]
* cli: Added JSON/go template formatting to agent-info command. [[GH-9788](https://github.com/hashicorp/nomad/pull/9788)]


BUG FIXES:
* client: Fixed a bug where non-`docker` tasks with network isolation were restarted on client restart. [[GH-9757](https://github.com/hashicorp/nomad/issues/9757)]
Expand Down
6 changes: 6 additions & 0 deletions client/allocrunner/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -1138,6 +1138,9 @@ func (ar *allocRunner) Restart(ctx context.Context, event *structs.TaskEvent, fa
var err *multierror.Error
var errMutex sync.Mutex

// run alloc task restart hooks
ar.taskRestartHooks()

go func() {
var wg sync.WaitGroup
defer close(waitCh)
Expand Down Expand Up @@ -1170,6 +1173,9 @@ func (ar *allocRunner) Restart(ctx context.Context, event *structs.TaskEvent, fa
func (ar *allocRunner) RestartAll(taskEvent *structs.TaskEvent) error {
var err *multierror.Error

// run alloc task restart hooks
ar.taskRestartHooks()

for tn := range ar.tasks {
rerr := ar.RestartTask(tn, taskEvent.Copy())
if rerr != nil {
Expand Down
25 changes: 25 additions & 0 deletions client/allocrunner/alloc_runner_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,3 +360,28 @@ func (ar *allocRunner) shutdownHooks() {
}
}
}

// taskRestartHooks invokes PreTaskRestart on every alloc runner hook in
// ar.runnerHooks that implements interfaces.RunnerTaskRestartHook. Hooks that
// do not implement the interface are skipped.
//
// This function has no error return, so a failing hook does not stop the
// remaining hooks from running; its error is logged instead of being
// silently discarded.
func (ar *allocRunner) taskRestartHooks() {
	for _, hook := range ar.runnerHooks {
		re, ok := hook.(interfaces.RunnerTaskRestartHook)
		if !ok {
			continue
		}

		name := re.Name()

		// Only pay for the timestamps when trace logging is enabled.
		var start time.Time
		if ar.logger.IsTrace() {
			start = time.Now()
			ar.logger.Trace("running alloc task restart hook",
				"name", name, "start", start)
		}

		// PreTaskRestart returns an error; surface it in the logs rather than
		// dropping it on the floor.
		if err := re.PreTaskRestart(); err != nil {
			ar.logger.Error("alloc task restart hook failed",
				"name", name, "error", err)
		}

		if ar.logger.IsTrace() {
			end := time.Now()
			ar.logger.Trace("finished alloc task restart hook",
				"name", name, "end", end, "duration", end.Sub(start))
		}
	}
}
19 changes: 19 additions & 0 deletions client/allocrunner/groupservice_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,10 @@ func (h *groupServiceHook) Prerun() error {
h.prerun = true
h.mu.Unlock()
}()
return h.prerunLocked()
}

func (h *groupServiceHook) prerunLocked() error {
if len(h.services) == 0 {
return nil
}
Expand Down Expand Up @@ -145,10 +148,26 @@ func (h *groupServiceHook) Update(req *interfaces.RunnerUpdateRequest) error {
return h.consulClient.UpdateWorkload(oldWorkloadServices, newWorkloadServices)
}

// PreTaskRestart runs the pre-kill logic followed by the prerun logic while
// holding h.mu, and returns whatever prerunLocked returns.
func (h *groupServiceHook) PreTaskRestart() error {
	h.mu.Lock()
	// Deferred calls run last-in-first-out: prerun is flipped to true first
	// (which unblocks Updates), and only then is the lock released.
	defer h.mu.Unlock()
	defer func() { h.prerun = true }()

	h.preKillLocked()

	return h.prerunLocked()
}

// PreKill acquires h.mu and runs preKillLocked; the lock is released when
// preKillLocked returns.
func (h *groupServiceHook) PreKill() {
h.mu.Lock()
defer h.mu.Unlock()
h.preKillLocked()
}

// implements the PreKill hook but requires the caller hold the lock
func (h *groupServiceHook) preKillLocked() {
// If we have a shutdown delay deregister
// group services and then wait
// before continuing to kill tasks
Expand Down
45 changes: 32 additions & 13 deletions client/allocrunner/groupservice_hook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ var _ interfaces.RunnerPrerunHook = (*groupServiceHook)(nil)
var _ interfaces.RunnerUpdateHook = (*groupServiceHook)(nil)
var _ interfaces.RunnerPostrunHook = (*groupServiceHook)(nil)
var _ interfaces.RunnerPreKillHook = (*groupServiceHook)(nil)
var _ interfaces.RunnerTaskRestartHook = (*groupServiceHook)(nil)

// TestGroupServiceHook_NoGroupServices asserts calling group service hooks
// without group services does not error.
Expand Down Expand Up @@ -50,11 +51,17 @@ func TestGroupServiceHook_NoGroupServices(t *testing.T) {

require.NoError(t, h.Postrun())

require.NoError(t, h.PreTaskRestart())

ops := consulClient.GetOps()
require.Len(t, ops, 4)
require.Equal(t, "add", ops[0].Op)
require.Equal(t, "update", ops[1].Op)
require.Equal(t, "remove", ops[2].Op)
require.Len(t, ops, 7)
require.Equal(t, "add", ops[0].Op) // Prerun
require.Equal(t, "update", ops[1].Op) // Update
require.Equal(t, "remove", ops[2].Op) // Postrun (1st)
require.Equal(t, "remove", ops[3].Op) // Postrun (2nd)
require.Equal(t, "remove", ops[4].Op) // Restart -> preKill (1st)
require.Equal(t, "remove", ops[5].Op) // Restart -> preKill (2nd)
require.Equal(t, "add", ops[6].Op) // Restart -> preRun
}

// TestGroupServiceHook_ShutdownDelayUpdate asserts calling group service hooks
Expand Down Expand Up @@ -117,15 +124,21 @@ func TestGroupServiceHook_GroupServices(t *testing.T) {

require.NoError(t, h.Postrun())

require.NoError(t, h.PreTaskRestart())

ops := consulClient.GetOps()
require.Len(t, ops, 4)
require.Equal(t, "add", ops[0].Op)
require.Equal(t, "update", ops[1].Op)
require.Equal(t, "remove", ops[2].Op)
require.Len(t, ops, 7)
require.Equal(t, "add", ops[0].Op) // Prerun
require.Equal(t, "update", ops[1].Op) // Update
require.Equal(t, "remove", ops[2].Op) // Postrun (1st)
require.Equal(t, "remove", ops[3].Op) // Postrun (2nd)
require.Equal(t, "remove", ops[4].Op) // Restart -> preKill (1st)
require.Equal(t, "remove", ops[5].Op) // Restart -> preKill (2nd)
require.Equal(t, "add", ops[6].Op) // Restart -> preRun
}

// TestGroupServiceHook_NoNetwork asserts group service hooks with group
// services but no group network returns an error.
// services but no group network is handled gracefully.
func TestGroupServiceHook_NoNetwork(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -159,11 +172,17 @@ func TestGroupServiceHook_NoNetwork(t *testing.T) {

require.NoError(t, h.Postrun())

require.NoError(t, h.PreTaskRestart())

ops := consulClient.GetOps()
require.Len(t, ops, 4)
require.Equal(t, "add", ops[0].Op)
require.Equal(t, "update", ops[1].Op)
require.Equal(t, "remove", ops[2].Op)
require.Len(t, ops, 7)
require.Equal(t, "add", ops[0].Op) // Prerun
require.Equal(t, "update", ops[1].Op) // Update
require.Equal(t, "remove", ops[2].Op) // Postrun (1st)
require.Equal(t, "remove", ops[3].Op) // Postrun (2nd)
require.Equal(t, "remove", ops[4].Op) // Restart -> preKill (1st)
require.Equal(t, "remove", ops[5].Op) // Restart -> preKill (2nd)
require.Equal(t, "add", ops[6].Op) // Restart -> preRun
}

func TestGroupServiceHook_getWorkloadServices(t *testing.T) {
Expand Down
8 changes: 8 additions & 0 deletions client/allocrunner/interfaces/runner_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ type RunnerUpdateRequest struct {
Alloc *structs.Allocation
}

// RunnerTaskRestartHook is implemented by alloc runner hooks that need to be
// executed just before the allocation runner is going to restart all tasks.
type RunnerTaskRestartHook interface {
RunnerHook

// PreTaskRestart is called before the tasks are restarted. It returns a
// non-nil error if the hook failed.
PreTaskRestart() error
}

// ShutdownHook may be implemented by AllocRunner or TaskRunner hooks and will
// be called when the agent process is being shutdown gracefully.
type ShutdownHook interface {
Expand Down
106 changes: 106 additions & 0 deletions e2e/consul/check_restart.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package consul

import (
"fmt"
"os"
"reflect"
"regexp"
"strings"
"time"

e2e "github.com/hashicorp/nomad/e2e/e2eutil"
"github.com/hashicorp/nomad/e2e/framework"
"github.com/hashicorp/nomad/helper/uuid"
)

// ns is the namespace argument handed to the e2e helper calls in this file.
// It is the empty string (presumably selecting the default namespace — TODO
// confirm against e2eutil).
const ns = ""

// CheckRestartE2ETest is the e2e test case for check_restart behavior on
// group-level and task-level services.
type CheckRestartE2ETest struct {
framework.TC
// jobIds records the IDs of jobs registered by the tests so that AfterEach
// can stop and purge them.
jobIds []string
}

// BeforeAll blocks until the Nomad cluster has a leader and nodes are ready
// (the trailing 1 is presumably the minimum node count — confirm against
// e2eutil.WaitForNodesReady).
func (tc *CheckRestartE2ETest) BeforeAll(f *framework.F) {
e2e.WaitForLeader(f.T(), tc.Nomad())
e2e.WaitForNodesReady(f.T(), tc.Nomad(), 1)
}

// AfterEach stops and purges every job recorded in tc.jobIds, resets the
// list, and then runs a system garbage collection. Cleanup is skipped
// entirely when NOMAD_TEST_SKIPCLEANUP is set to "1".
func (tc *CheckRestartE2ETest) AfterEach(f *framework.F) {
	if skip := os.Getenv("NOMAD_TEST_SKIPCLEANUP"); skip == "1" {
		return
	}

	for i := range tc.jobIds {
		_, stopErr := e2e.Command("nomad", "job", "stop", "-purge", tc.jobIds[i])
		f.Assert().NoError(stopErr)
	}
	tc.jobIds = []string{}

	_, gcErr := e2e.Command("nomad", "system", "gc")
	f.Assert().NoError(gcErr)
}

// TestGroupCheckRestart runs a job with a group service that will never
// become healthy. Both tasks should be restarted up to the 'restart' limit.
func (tc *CheckRestartE2ETest) TestGroupCheckRestart(f *framework.F) {
	jobID := "test-group-check-restart-" + uuid.Short()
	f.NoError(e2e.Register(jobID, "consul/input/checks_group_restart.nomad"))
	tc.jobIds = append(tc.jobIds, jobID)

	// Wait until the job's single allocation is reported as failed.
	f.NoError(
		e2e.WaitForAllocStatusComparison(
			func() ([]string, error) { return e2e.AllocStatuses(jobID, ns) },
			func(got []string) bool { return reflect.DeepEqual(got, []string{"failed"}) },
			&e2e.WaitConfig{Interval: time.Second * 10, Retries: 30},
		))

	// Resolve the allocation ID for the job. Previously the ID was declared
	// but never assigned, so `nomad alloc status` was run with an empty ID.
	allocs, err := e2e.AllocsForJob(jobID, ns)
	f.NoError(err, "could not get allocations for job")
	f.Len(allocs, 1, "expected exactly one allocation for job %q", jobID)
	allocID := allocs[0]["ID"]

	expected := "Exceeded allowed attempts 2 in interval 5m0s and mode is \"fail\""

	out, err := e2e.Command("nomad", "alloc", "status", allocID)
	f.NoError(err, "could not get allocation status")
	f.Contains(out, expected,
		fmt.Errorf("expected '%s', got\n%v", expected, out))

	re := regexp.MustCompile(`Total Restarts += (.*)\n`)
	match := re.FindAllStringSubmatch(out, -1)

	// The job defines two tasks; without this check the loop below would
	// pass vacuously if the status output contained no restart counters.
	f.Len(match, 2, "expected restart counts for both tasks, got:\n%v", out)
	for _, m := range match {
		f.Equal("2", strings.TrimSpace(m[1]),
			fmt.Errorf("expected exactly 2 restarts for both tasks, got:\n%v", out))
	}
}

// TestTaskCheckRestart runs a job with a task service that will never become
// healthy. Only the failed task should be restarted up to the 'restart'
// limit.
func (tc *CheckRestartE2ETest) TestTaskCheckRestart(f *framework.F) {
	jobID := "test-task-check-restart-" + uuid.Short()
	f.NoError(e2e.Register(jobID, "consul/input/checks_task_restart.nomad"))
	tc.jobIds = append(tc.jobIds, jobID)

	// Wait until the job's single allocation is reported as failed.
	f.NoError(
		e2e.WaitForAllocStatusComparison(
			func() ([]string, error) { return e2e.AllocStatuses(jobID, ns) },
			func(got []string) bool { return reflect.DeepEqual(got, []string{"failed"}) },
			&e2e.WaitConfig{Interval: time.Second * 10, Retries: 30},
		))

	// Resolve the allocation ID for the job. Previously the ID was declared
	// but never assigned, so `nomad alloc status` was run with an empty ID.
	allocs, err := e2e.AllocsForJob(jobID, ns)
	f.NoError(err, "could not get allocations for job")
	f.Len(allocs, 1, "expected exactly one allocation for job %q", jobID)
	allocID := allocs[0]["ID"]

	expected := "Exceeded allowed attempts 2 in interval 5m0s and mode is \"fail\""

	out, err := e2e.Command("nomad", "alloc", "status", allocID)
	f.NoError(err, "could not get allocation status")
	f.Contains(out, expected,
		fmt.Errorf("expected '%s', got\n%v", expected, out))

	re := regexp.MustCompile(`Total Restarts += (.*)\n`)
	match := re.FindAllStringSubmatch(out, -1)

	// Guard the fixed-index accesses below: fail with a readable message
	// instead of panicking when the output has fewer restart counters.
	f.Len(match, 2, "expected restart counts for two tasks, got:\n%v", out)

	f.Equal("2", strings.TrimSpace(match[0][1]),
		fmt.Errorf("expected exactly 2 restarts for failed task, got:\n%v", out))

	f.Equal("0", strings.TrimSpace(match[1][1]),
		fmt.Errorf("expected exactly no restarts for healthy task, got:\n%v", out))
}
1 change: 1 addition & 0 deletions e2e/consul/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func init() {
Cases: []framework.TestCase{
new(ConsulE2ETest),
new(ScriptChecksE2ETest),
new(CheckRestartE2ETest),
},
})
}
Expand Down
64 changes: 64 additions & 0 deletions e2e/consul/input/checks_group_restart.nomad
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Job with one group containing two raw_exec tasks and a single group-level
# service whose script check can never pass (it sleeps longer than its
# timeout).
job "group_check_restart" {
datacenters = ["dc1"]
type = "service"

# only place this job on Linux clients
constraint {
attribute = "${attr.kernel.name}"
value = "linux"
}

group "group_check_restart" {
network {
mode = "bridge"
}

# at most 2 restart attempts within a 5m interval; mode "fail" once the
# attempts are exhausted
restart {
attempts = 2
delay = "1s"
interval = "5m"
mode = "fail"
}

service {
name = "group-service-1"
port = "9003"

# this check should always time out and so the service
# should not be marked healthy, resulting in the tasks
# getting restarted
check {
name = "always-dead"
type = "script"
task = "fail"
interval = "2s"
timeout = "1s"
command = "sleep"
args = ["10"]

# restart settings for the failing check: limit of 2, with a 5s grace
# period (see the Nomad check_restart documentation for exact semantics)
check_restart {
limit = 2
grace = "5s"
ignore_warnings = false
}
}
}

# task named by the script check above (check.task = "fail")
task "fail" {
driver = "raw_exec"

config {
command = "bash"
args = ["-c", "sleep 15000"]
}
}

# second task in the same group; it runs the same long sleep
task "ok" {
driver = "raw_exec"

config {
command = "bash"
args = ["-c", "sleep 15000"]
}
}
}
}
Loading