Skip to content

Commit

Permalink
client: do not restart dead tasks until server is contacted (try 2)
Browse files Browse the repository at this point in the history
Refactoring of 104067b

Switch the MarkLive method for a chan that is closed by the client.
Thanks to @notnoop for the idea!

The old approach called a method on most existing ARs and TRs on every
runAllocs call. The new approach does a once.Do call in runAllocs to
accomplish the same thing with less work. Able to remove the gate
abstraction that did much more than was needed.
  • Loading branch information
schmichael committed May 10, 2019
1 parent 40e3e34 commit aced24f
Show file tree
Hide file tree
Showing 8 changed files with 98 additions and 316 deletions.
16 changes: 7 additions & 9 deletions client/allocrunner/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,11 @@ type allocRunner struct {
// driverManager is responsible for dispensing driver plugins and registering
// event handlers
driverManager drivermanager.Manager

// serversContactedCh is passed to TaskRunners so they can detect when
// servers have been contacted for the first time in case of a failed
// restore.
serversContactedCh chan struct{}
}

// NewAllocRunner returns a new allocation runner.
Expand Down Expand Up @@ -167,6 +172,7 @@ func NewAllocRunner(config *Config) (*allocRunner, error) {
prevAllocMigrator: config.PrevAllocMigrator,
devicemanager: config.DeviceManager,
driverManager: config.DriverManager,
serversContactedCh: config.ServersContactedCh,
}

// Create the logger based on the allocation ID
Expand Down Expand Up @@ -205,6 +211,7 @@ func (ar *allocRunner) initTaskRunners(tasks []*structs.Task) error {
DeviceStatsReporter: ar.deviceStatsReporter,
DeviceManager: ar.devicemanager,
DriverManager: ar.driverManager,
ServersContactedCh: ar.serversContactedCh,
}

// Create, but do not Run, the task runner
Expand Down Expand Up @@ -721,15 +728,6 @@ func (ar *allocRunner) handleAllocUpdate(update *structs.Allocation) {

}

// MarkLive unblocks restored tasks that failed to reattach and are waiting to
// contact a server before restarting the dead task. The Client will call this
// method when the task should run, otherwise the task will be killed.
func (ar *allocRunner) MarkLive() {
for _, tr := range ar.tasks {
tr.MarkLive()
}
}

func (ar *allocRunner) Listener() *cstructs.AllocListener {
return ar.allocBroadcaster.Listen()
}
Expand Down
4 changes: 4 additions & 0 deletions client/allocrunner/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,8 @@ type Config struct {

// DriverManager handles dispensing of driver plugins
DriverManager drivermanager.Manager

// ServersContactedCh is closed when the first GetClientAllocs call to
// servers succeeds and allocs are synced.
ServersContactedCh chan struct{}
}
72 changes: 37 additions & 35 deletions client/allocrunner/taskrunner/task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/client/taskenv"
"github.com/hashicorp/nomad/client/vaultclient"
"github.com/hashicorp/nomad/helper/gate"
"github.com/hashicorp/nomad/helper/pluginutils/hclspecutils"
"github.com/hashicorp/nomad/helper/pluginutils/hclutils"
"github.com/hashicorp/nomad/helper/uuid"
Expand Down Expand Up @@ -195,9 +194,15 @@ type TaskRunner struct {
// Defaults to defaultMaxEvents but overrideable for testing.
maxEvents int

// restoreGate is used to block restored tasks that failed to reattach
// from restarting until servers are contacted. #1795
restoreGate *gate.G
// serversContactedCh is passed to TaskRunners so they can detect when
// servers have been contacted for the first time in case of a failed
// restore.
serversContactedCh <-chan struct{}

// waitOnServers defaults to false but will be set true if a restore
// fails and the Run method should wait until serversContactedCh is
// closed.
waitOnServers bool
}

type Config struct {
Expand Down Expand Up @@ -227,6 +232,10 @@ type Config struct {
// DriverManager is used to dispense driver plugins and register event
// handlers
DriverManager drivermanager.Manager

// ServersContactedCh is closed when the first GetClientAllocs call to
// servers succeeds and allocs are synced.
ServersContactedCh chan struct{}
}

func NewTaskRunner(config *Config) (*TaskRunner, error) {
Expand All @@ -250,12 +259,6 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {
tstate = ts.Copy()
}

// Initialize restoreGate as open. It will only be closed if Restore is
// called and fails to reconnect to the task handle. In that case the
// we must wait until contact with the server is made before restarting
// or killing the task. #1795
restoreGate := gate.NewOpen()

tr := &TaskRunner{
alloc: config.Alloc,
allocID: config.Alloc.ID,
Expand All @@ -281,7 +284,7 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {
devicemanager: config.DeviceManager,
driverManager: config.DriverManager,
maxEvents: defaultMaxEvents,
restoreGate: restoreGate,
serversContactedCh: config.ServersContactedCh,
}

// Create the logger based on the allocation ID
Expand Down Expand Up @@ -393,14 +396,15 @@ func (tr *TaskRunner) Run() {
// - should be handled serially.
go tr.handleUpdates()

// If restore failed, don't proceed until servers are contacted
if tr.restoreGate.IsClosed() {
// If restore failed wait until servers are contacted before running.
// #1795
if tr.waitOnServers {
tr.logger.Info("task failed to restore; waiting to contact server before restarting")
select {
case <-tr.killCtx.Done():
case <-tr.shutdownCtx.Done():
return
case <-tr.restoreGate.Wait():
case <-tr.serversContactedCh:
tr.logger.Trace("server contacted; unblocking waiting task")
}
}
Expand Down Expand Up @@ -883,18 +887,27 @@ func (tr *TaskRunner) Restore() error {
//TODO if RecoverTask returned the DriverNetwork we wouldn't
// have to persist it at all!
restored := tr.restoreHandle(taskHandle, tr.localState.DriverNetwork)
if !restored && !tr.Alloc().TerminalStatus() {
// Restore failed, close the restore gate to block
// until server is contacted to prevent restarting
// terminal allocs. #1795
tr.logger.Trace("failed to reattach to task; will not run until server is contacted")
tr.restoreGate.Close()

ev := structs.NewTaskEvent(structs.TaskRestoreFailed).
SetDisplayMessage("failed to restore task; will not run until server is contacted")
tr.UpdateState(structs.TaskStatePending, ev)

// If the handle could not be restored, the alloc is
// non-terminal, and the task isn't a system job: wait until
// servers have been contacted before running. #1795
if restored {
return nil
}

alloc := tr.Alloc()
if alloc.TerminalStatus() || alloc.Job.Type == structs.JobTypeSystem {
return nil
}

tr.logger.Trace("failed to reattach to task; will not run until server is contacted")
tr.waitOnServers = true

ev := structs.NewTaskEvent(structs.TaskRestoreFailed).
SetDisplayMessage("failed to restore task; will not run until server is contacted")
tr.UpdateState(structs.TaskStatePending, ev)
}

return nil
}

Expand Down Expand Up @@ -1108,10 +1121,6 @@ func (tr *TaskRunner) Update(update *structs.Allocation) {
// Trigger update hooks if not terminal
if !update.TerminalStatus() {
tr.triggerUpdateHooks()

// MarkLive in case task had failed to restore and were waiting
// to hear from the server.
tr.MarkLive()
}
}

Expand All @@ -1128,13 +1137,6 @@ func (tr *TaskRunner) triggerUpdateHooks() {
}
}

// MarkLive unblocks restored tasks that failed to reattach and are waiting to
// contact a server before restarting the dead task. The Client will call this
// method when the task should run, otherwise the task will be killed.
func (tr *TaskRunner) MarkLive() {
tr.restoreGate.Open()
}

// Shutdown TaskRunner gracefully without affecting the state of the task.
// Shutdown blocks until the main Run loop exits.
func (tr *TaskRunner) Shutdown() {
Expand Down
48 changes: 25 additions & 23 deletions client/allocrunner/taskrunner/task_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,17 +95,18 @@ func testTaskRunnerConfig(t *testing.T, alloc *structs.Allocation, taskName stri
}

conf := &Config{
Alloc: alloc,
ClientConfig: clientConf,
Consul: consulapi.NewMockConsulServiceClient(t, logger),
Task: thisTask,
TaskDir: taskDir,
Logger: clientConf.Logger,
Vault: vaultclient.NewMockVaultClient(),
StateDB: cstate.NoopDB{},
StateUpdater: NewMockTaskStateUpdater(),
DeviceManager: devicemanager.NoopMockManager(),
DriverManager: drivermanager.TestDriverManager(t),
Alloc: alloc,
ClientConfig: clientConf,
Consul: consulapi.NewMockConsulServiceClient(t, logger),
Task: thisTask,
TaskDir: taskDir,
Logger: clientConf.Logger,
Vault: vaultclient.NewMockVaultClient(),
StateDB: cstate.NoopDB{},
StateUpdater: NewMockTaskStateUpdater(),
DeviceManager: devicemanager.NoopMockManager(),
DriverManager: drivermanager.TestDriverManager(t),
ServersContactedCh: make(chan struct{}),
}
return conf, trCleanup
}
Expand Down Expand Up @@ -184,7 +185,7 @@ func TestTaskRunner_Restore_Running(t *testing.T) {
// kills the task before restarting a new TaskRunner. The new TaskRunner is
// returned once it is running and waiting in pending along with a cleanup
// func.
func setupRestoreFailureTest(t *testing.T) (*TaskRunner, func()) {
func setupRestoreFailureTest(t *testing.T) (*TaskRunner, *Config, func()) {
t.Parallel()

alloc := mock.Alloc()
Expand Down Expand Up @@ -234,10 +235,10 @@ func setupRestoreFailureTest(t *testing.T) (*TaskRunner, func()) {
// Create a new TaskRunner and Restore the task
newTR, err := NewTaskRunner(conf)
require.NoError(t, err)
require.NoError(t, newTR.Restore())

// Assert the restore gate is *closed* because reattachment failed
require.True(t, newTR.restoreGate.IsClosed())
// Assert the TR will wait on servers because reattachment failed
require.NoError(t, newTR.Restore())
require.True(t, newTR.waitOnServers)

// Start new TR
go newTR.Run()
Expand All @@ -253,17 +254,17 @@ func setupRestoreFailureTest(t *testing.T) (*TaskRunner, func()) {
ts := newTR.TaskState()
require.Equal(t, structs.TaskStatePending, ts.State)

return newTR, cleanup3
return newTR, conf, cleanup3
}

// TestTaskRunner_Restore_Restart asserts restoring a dead task blocks until
// the servers-contacted chan is closed. #1795
func TestTaskRunner_Restore_Restart(t *testing.T) {
newTR, cleanup := setupRestoreFailureTest(t)
newTR, conf, cleanup := setupRestoreFailureTest(t)
defer cleanup()

// Fake contacting the server by opening the restore gate
newTR.MarkLive()
// Fake contacting the server by closing the chan
close(conf.ServersContactedCh)

testutil.WaitForResult(func() (bool, error) {
ts := newTR.TaskState().State
Expand All @@ -276,10 +277,10 @@ func TestTaskRunner_Restore_Restart(t *testing.T) {
// TestTaskRunner_Restore_Kill asserts restoring a dead task blocks until
// the task is killed. #1795
func TestTaskRunner_Restore_Kill(t *testing.T) {
newTR, cleanup := setupRestoreFailureTest(t)
newTR, _, cleanup := setupRestoreFailureTest(t)
defer cleanup()

// Sending the task a terminal update shouldn't kill it or mark it live
// Sending the task a terminal update shouldn't kill it or unblock it
alloc := newTR.Alloc().Copy()
alloc.DesiredStatus = structs.AllocDesiredStatusStop
newTR.Update(alloc)
Expand All @@ -301,12 +302,13 @@ func TestTaskRunner_Restore_Kill(t *testing.T) {
// TestTaskRunner_Restore_Update asserts restoring a dead task blocks until
// servers are contacted, even if an alloc Update is received first. #1795
func TestTaskRunner_Restore_Update(t *testing.T) {
newTR, cleanup := setupRestoreFailureTest(t)
newTR, conf, cleanup := setupRestoreFailureTest(t)
defer cleanup()

// Fake contacting the server by opening the restore gate
// Fake Client.runAllocs behavior by calling Update then closing chan
alloc := newTR.Alloc().Copy()
newTR.Update(alloc)
close(conf.ServersContactedCh)

testutil.WaitForResult(func() (bool, error) {
ts := newTR.TaskState().State
Expand Down
23 changes: 12 additions & 11 deletions client/allocrunner/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,17 +55,18 @@ func testAllocRunnerConfig(t *testing.T, alloc *structs.Allocation) (*Config, fu
clientConf, cleanup := clientconfig.TestClientConfig(t)
conf := &Config{
// Copy the alloc in case the caller edits and reuses it
Alloc: alloc.Copy(),
Logger: clientConf.Logger,
ClientConfig: clientConf,
StateDB: state.NoopDB{},
Consul: consul.NewMockConsulServiceClient(t, clientConf.Logger),
Vault: vaultclient.NewMockVaultClient(),
StateUpdater: &MockStateUpdater{},
PrevAllocWatcher: allocwatcher.NoopPrevAlloc{},
PrevAllocMigrator: allocwatcher.NoopPrevAlloc{},
DeviceManager: devicemanager.NoopMockManager(),
DriverManager: drivermanager.TestDriverManager(t),
Alloc: alloc.Copy(),
Logger: clientConf.Logger,
ClientConfig: clientConf,
StateDB: state.NoopDB{},
Consul: consul.NewMockConsulServiceClient(t, clientConf.Logger),
Vault: vaultclient.NewMockVaultClient(),
StateUpdater: &MockStateUpdater{},
PrevAllocWatcher: allocwatcher.NoopPrevAlloc{},
PrevAllocMigrator: allocwatcher.NoopPrevAlloc{},
DeviceManager: devicemanager.NoopMockManager(),
DriverManager: drivermanager.TestDriverManager(t),
ServersContactedCh: make(chan struct{}),
}
return conf, cleanup
}
Expand Down
Loading

0 comments on commit aced24f

Please sign in to comment.