Skip to content

Commit

Permalink
client: do not restart dead tasks until server is contacted
Browse files Browse the repository at this point in the history
Fixes #1795

Running restored allocations and pulling what allocations to run from
the server happen concurrently. This means that if a client is rebooted,
and has its allocations rescheduled, it may restart the dead allocations
before it contacts the server and determines they should be dead.

This commit makes tasks that fail to reattach on restore wait until the
server is contacted before restarting.
  • Loading branch information
schmichael committed May 8, 2019
1 parent 62c84e0 commit 104067b
Show file tree
Hide file tree
Showing 7 changed files with 438 additions and 3 deletions.
9 changes: 9 additions & 0 deletions client/allocrunner/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,15 @@ func (ar *allocRunner) handleAllocUpdate(update *structs.Allocation) {

}

// MarkLive unblocks restored tasks that failed to reattach and are waiting to
// contact a server before restarting the dead task. The Client will call this
// method when the task should run, otherwise the task will be killed.
func (ar *allocRunner) MarkLive() {
for _, tr := range ar.tasks {
tr.MarkLive()
}
}

func (ar *allocRunner) Listener() *cstructs.AllocListener {
return ar.allocBroadcaster.Listen()
}
Expand Down
48 changes: 47 additions & 1 deletion client/allocrunner/taskrunner/task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/client/taskenv"
"github.com/hashicorp/nomad/client/vaultclient"
"github.com/hashicorp/nomad/helper/gate"
"github.com/hashicorp/nomad/helper/pluginutils/hclspecutils"
"github.com/hashicorp/nomad/helper/pluginutils/hclutils"
"github.com/hashicorp/nomad/helper/uuid"
Expand Down Expand Up @@ -193,6 +194,10 @@ type TaskRunner struct {
// maxEvents is the capacity of the TaskEvents on the TaskState.
// Defaults to defaultMaxEvents but overrideable for testing.
maxEvents int

// restoreGate is used to block restored tasks that failed to reattach
// from restarting until servers are contacted. #1795
restoreGate *gate.G
}

type Config struct {
Expand Down Expand Up @@ -245,6 +250,12 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {
tstate = ts.Copy()
}

// Initialize restoreGate as open. It will only be closed if Restore is
// called and fails to reconnect to the task handle. In that case the
// we must wait until contact with the server is made before restarting
// or killing the task. #1795
restoreGate := gate.NewOpen()

tr := &TaskRunner{
alloc: config.Alloc,
allocID: config.Alloc.ID,
Expand All @@ -270,6 +281,7 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {
devicemanager: config.DeviceManager,
driverManager: config.DriverManager,
maxEvents: defaultMaxEvents,
restoreGate: restoreGate,
}

// Create the logger based on the allocation ID
Expand Down Expand Up @@ -381,6 +393,18 @@ func (tr *TaskRunner) Run() {
// - should be handled serially.
go tr.handleUpdates()

// If restore failed, don't proceed until servers are contacted
if tr.restoreGate.IsClosed() {
tr.logger.Info("task failed to restore; waiting to contact server before restarting")
select {
case <-tr.killCtx.Done():
case <-tr.shutdownCtx.Done():
return
case <-tr.restoreGate.Wait():
tr.logger.Trace("server contacted; unblocking waiting task")
}
}

MAIN:
for !tr.Alloc().TerminalStatus() {
select {
Expand Down Expand Up @@ -858,7 +882,18 @@ func (tr *TaskRunner) Restore() error {
if taskHandle := tr.localState.TaskHandle; taskHandle != nil {
//TODO if RecoverTask returned the DriverNetwork we wouldn't
// have to persist it at all!
tr.restoreHandle(taskHandle, tr.localState.DriverNetwork)
restored := tr.restoreHandle(taskHandle, tr.localState.DriverNetwork)
if !restored && !tr.Alloc().TerminalStatus() {
// Restore failed, close the restore gate to block
// until server is contacted to prevent restarting
// terminal allocs. #1795
tr.logger.Trace("failed to reattach to task; will not run until server is contacted")
tr.restoreGate.Close()

ev := structs.NewTaskEvent(structs.TaskRestoreFailed).
SetDisplayMessage("failed to restore task; will not run until server is contacted")
tr.UpdateState(structs.TaskStatePending, ev)
}
}
return nil
}
Expand Down Expand Up @@ -1073,6 +1108,10 @@ func (tr *TaskRunner) Update(update *structs.Allocation) {
// Trigger update hooks if not terminal
if !update.TerminalStatus() {
tr.triggerUpdateHooks()

// MarkLive in case task had failed to restore and were waiting
// to hear from the server.
tr.MarkLive()
}
}

Expand All @@ -1089,6 +1128,13 @@ func (tr *TaskRunner) triggerUpdateHooks() {
}
}

// MarkLive unblocks restored tasks that failed to reattach and are waiting to
// contact a server before restarting the dead task. The Client will call this
// method when the task should run, otherwise the task will be killed.
func (tr *TaskRunner) MarkLive() {
tr.restoreGate.Open()
}

// Shutdown TaskRunner gracefully without affecting the state of the task.
// Shutdown blocks until the main Run loop exits.
func (tr *TaskRunner) Shutdown() {
Expand Down
142 changes: 140 additions & 2 deletions client/allocrunner/taskrunner/task_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ import (
"github.com/hashicorp/nomad/client/vaultclient"
agentconsul "github.com/hashicorp/nomad/command/agent/consul"
mockdriver "github.com/hashicorp/nomad/drivers/mock"
"github.com/hashicorp/nomad/drivers/rawexec"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/device"
"github.com/hashicorp/nomad/plugins/drivers"
"github.com/hashicorp/nomad/testutil"
"github.com/kr/pretty"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -124,8 +126,8 @@ func runTestTaskRunner(t *testing.T, alloc *structs.Allocation, taskName string)
}
}

// TestTaskRunner_Restore asserts restoring a running task does not rerun the
// task.
// TestTaskRunner_Restore_Running asserts restoring a running task does not
// rerun the task.
func TestTaskRunner_Restore_Running(t *testing.T) {
t.Parallel()
require := require.New(t)
Expand Down Expand Up @@ -178,6 +180,142 @@ func TestTaskRunner_Restore_Running(t *testing.T) {
assert.Equal(t, 1, started)
}

// setupRestoreFailureTest starts a service, shuts down the task runner, and
// kills the task before restarting a new TaskRunner. The new TaskRunner is
// returned once it is running and waiting in pending along with a cleanup
// func.
func setupRestoreFailureTest(t *testing.T) (*TaskRunner, func()) {
t.Parallel()

alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "raw_exec"
task.Config = map[string]interface{}{
"command": "sleep",
"args": []string{"30"},
}
conf, cleanup1 := testTaskRunnerConfig(t, alloc, task.Name)
conf.StateDB = cstate.NewMemDB(conf.Logger) // "persist" state between runs

// Run the first TaskRunner
origTR, err := NewTaskRunner(conf)
require.NoError(t, err)
go origTR.Run()
cleanup2 := func() {
origTR.Kill(context.Background(), structs.NewTaskEvent("cleanup"))
cleanup1()
}

// Wait for it to be running
testWaitForTaskToStart(t, origTR)

handle := origTR.getDriverHandle()
require.NotNil(t, handle)
taskID := handle.taskID

// Cause TR to exit without shutting down task
origTR.Shutdown()

// Get the mock driver plugin
driverPlugin, err := conf.DriverManager.Dispense(rawexec.PluginID.Name)
require.NoError(t, err)
rawexecDriver := driverPlugin.(*rawexec.Driver)

// Assert the task is still running despite TR having exited
taskStatus, err := rawexecDriver.InspectTask(taskID)
require.NoError(t, err)
require.Equal(t, drivers.TaskStateRunning, taskStatus.State)

// Kill the task so it fails to recover when restore is called
require.NoError(t, rawexecDriver.DestroyTask(taskID, true))
_, err = rawexecDriver.InspectTask(taskID)
require.EqualError(t, err, drivers.ErrTaskNotFound.Error())

// Create a new TaskRunner and Restore the task
newTR, err := NewTaskRunner(conf)
require.NoError(t, err)
require.NoError(t, newTR.Restore())

// Assert the restore gate is *closed* because reattachment failed
require.True(t, newTR.restoreGate.IsClosed())

// Start new TR
go newTR.Run()
cleanup3 := func() {
newTR.Kill(context.Background(), structs.NewTaskEvent("cleanup"))
cleanup2()
cleanup1()
}

// Assert task has not been restarted
_, err = rawexecDriver.InspectTask(taskID)
require.EqualError(t, err, drivers.ErrTaskNotFound.Error())
ts := newTR.TaskState()
require.Equal(t, structs.TaskStatePending, ts.State)

return newTR, cleanup3
}

// TestTaskRunner_Restore_Restart asserts restoring a dead task blocks until
// MarkAlive is called. #1795
func TestTaskRunner_Restore_Restart(t *testing.T) {
newTR, cleanup := setupRestoreFailureTest(t)
defer cleanup()

// Fake contacting the server by opening the restore gate
newTR.MarkLive()

testutil.WaitForResult(func() (bool, error) {
ts := newTR.TaskState().State
return ts == structs.TaskStateRunning, fmt.Errorf("expected task to be running but found %q", ts)
}, func(err error) {
require.NoError(t, err)
})
}

// TestTaskRunner_Restore_Kill asserts restoring a dead task blocks until
// the task is killed. #1795
func TestTaskRunner_Restore_Kill(t *testing.T) {
newTR, cleanup := setupRestoreFailureTest(t)
defer cleanup()

// Sending the task a terminal update shouldn't kill it or mark it live
alloc := newTR.Alloc().Copy()
alloc.DesiredStatus = structs.AllocDesiredStatusStop
newTR.Update(alloc)

require.Equal(t, structs.TaskStatePending, newTR.TaskState().State)

// AllocRunner will immediately kill tasks after sending a terminal
// update.
newTR.Kill(context.Background(), structs.NewTaskEvent(structs.TaskKilling))

select {
case <-newTR.WaitCh():
// It died as expected!
case <-time.After(10 * time.Second):
require.Fail(t, "timeout waiting for task to die")
}
}

// TestTaskRunner_Restore_Update asserts restoring a dead task blocks until
// Update is called. #1795
func TestTaskRunner_Restore_Update(t *testing.T) {
newTR, cleanup := setupRestoreFailureTest(t)
defer cleanup()

// Fake contacting the server by opening the restore gate
alloc := newTR.Alloc().Copy()
newTR.Update(alloc)

testutil.WaitForResult(func() (bool, error) {
ts := newTR.TaskState().State
return ts == structs.TaskStateRunning, fmt.Errorf("expected task to be running but found %q", ts)
}, func(err error) {
require.NoError(t, err)
})
}

// TestTaskRunner_TaskEnv_Interpolated asserts driver configurations are
// interpolated.
func TestTaskRunner_TaskEnv_Interpolated(t *testing.T) {
Expand Down
25 changes: 25 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ type AllocRunner interface {
Run()
StatsReporter() interfaces.AllocStatsReporter
Update(*structs.Allocation)
MarkLive()
WaitCh() <-chan struct{}
DestroyCh() <-chan struct{}
ShutdownCh() <-chan struct{}
Expand Down Expand Up @@ -1989,6 +1990,12 @@ func (c *Client) runAllocs(update *allocUpdates) {

errs := 0

// Mark existing allocations as live in case they failed to reattach on
// restore and are waiting to hear from the server before restarting.
for _, live := range diff.ignore {
c.markAllocLive(live)
}

// Remove the old allocations
for _, remove := range diff.removed {
c.removeAlloc(remove)
Expand Down Expand Up @@ -2072,6 +2079,24 @@ func makeFailedAlloc(add *structs.Allocation, err error) *structs.Allocation {
return stripped
}

// markAllocLive is invoked when an alloc should be running but has not been
// updated or just been added. This allows unblocking tasks that failed to
// reattach on restored and are waiting to hear from the server.
func (c *Client) markAllocLive(allocID string) {
c.allocLock.Lock()
defer c.allocLock.Unlock()

ar, ok := c.allocs[allocID]
if !ok {
// This should never happen as alloc diffing should cause
// unknown allocs to be added, not marked live.
c.logger.Warn("unknown alloc should be running but is not", "alloc_id", allocID)
return
}

ar.MarkLive()
}

// removeAlloc is invoked when we should remove an allocation because it has
// been removed by the server.
func (c *Client) removeAlloc(allocID string) {
Expand Down
Loading

0 comments on commit 104067b

Please sign in to comment.