Skip to content

Commit

Permalink
Merge pull request #5669 from hashicorp/b-block-on-servers-4
Browse files Browse the repository at this point in the history
client: do not restart restored tasks until server is contacted
  • Loading branch information
schmichael authored May 14, 2019
2 parents 315beb6 + abd809d commit 5b65afb
Show file tree
Hide file tree
Showing 10 changed files with 339 additions and 31 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ __BACKWARDS INCOMPATIBILITIES:__
to update your code. [[GH-5536](https://github.com/hashicorp/nomad/pull/5536)]
* client: The format of check IDs in Consul has changed. If you rely upon
Nomad's check IDs you will need to update your code. [[GH-5536](https://github.com/hashicorp/nomad/pull/5536)]
* client: On startup a client will reattach to running tasks as before but
will not restart exited tasks. Exited tasks will be restarted only after the
client has reestablished communication with servers. System jobs will always
be restarted. [[GH-5669](https://github.com/hashicorp/nomad/pull/5669)]

FEATURES:

Expand Down
7 changes: 7 additions & 0 deletions client/allocrunner/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,11 @@ type allocRunner struct {
// driverManager is responsible for dispensing driver plugins and registering
// event handlers
driverManager drivermanager.Manager

// serversContactedCh is passed to TaskRunners so they can detect when
// servers have been contacted for the first time in case of a failed
// restore.
serversContactedCh chan struct{}
}

// NewAllocRunner returns a new allocation runner.
Expand Down Expand Up @@ -167,6 +172,7 @@ func NewAllocRunner(config *Config) (*allocRunner, error) {
prevAllocMigrator: config.PrevAllocMigrator,
devicemanager: config.DeviceManager,
driverManager: config.DriverManager,
serversContactedCh: config.ServersContactedCh,
}

// Create the logger based on the allocation ID
Expand Down Expand Up @@ -205,6 +211,7 @@ func (ar *allocRunner) initTaskRunners(tasks []*structs.Task) error {
DeviceStatsReporter: ar.deviceStatsReporter,
DeviceManager: ar.devicemanager,
DriverManager: ar.driverManager,
ServersContactedCh: ar.serversContactedCh,
}

// Create, but do not Run, the task runner
Expand Down
4 changes: 4 additions & 0 deletions client/allocrunner/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,8 @@ type Config struct {

// DriverManager handles dispensing of driver plugins
DriverManager drivermanager.Manager

// ServersContactedCh is closed when the first GetClientAllocs call to
// servers succeeds and allocs are synced.
ServersContactedCh chan struct{}
}
49 changes: 48 additions & 1 deletion client/allocrunner/taskrunner/task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,15 @@ type TaskRunner struct {
// maxEvents is the capacity of the TaskEvents on the TaskState.
// Defaults to defaultMaxEvents but overrideable for testing.
maxEvents int

// serversContactedCh is passed to TaskRunners so they can detect when
// GetClientAllocs has been called in case of a failed restore.
serversContactedCh <-chan struct{}

// waitOnServers defaults to false but will be set true if a restore
// fails and the Run method should wait until serversContactedCh is
// closed.
waitOnServers bool
}

type Config struct {
Expand Down Expand Up @@ -222,6 +231,10 @@ type Config struct {
// DriverManager is used to dispense driver plugins and register event
// handlers
DriverManager drivermanager.Manager

// ServersContactedCh is closed when the first GetClientAllocs call to
// servers succeeds and allocs are synced.
ServersContactedCh chan struct{}
}

func NewTaskRunner(config *Config) (*TaskRunner, error) {
Expand Down Expand Up @@ -270,6 +283,7 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {
devicemanager: config.DeviceManager,
driverManager: config.DriverManager,
maxEvents: defaultMaxEvents,
serversContactedCh: config.ServersContactedCh,
}

// Create the logger based on the allocation ID
Expand Down Expand Up @@ -381,6 +395,19 @@ func (tr *TaskRunner) Run() {
// - should be handled serially.
go tr.handleUpdates()

// If restore failed wait until servers are contacted before running.
// #1795
if tr.waitOnServers {
tr.logger.Info("task failed to restore; waiting to contact server before restarting")
select {
case <-tr.killCtx.Done():
case <-tr.shutdownCtx.Done():
return
case <-tr.serversContactedCh:
tr.logger.Trace("server contacted; unblocking waiting task")
}
}

MAIN:
for !tr.Alloc().TerminalStatus() {
select {
Expand Down Expand Up @@ -858,8 +885,28 @@ func (tr *TaskRunner) Restore() error {
if taskHandle := tr.localState.TaskHandle; taskHandle != nil {
//TODO if RecoverTask returned the DriverNetwork we wouldn't
// have to persist it at all!
tr.restoreHandle(taskHandle, tr.localState.DriverNetwork)
restored := tr.restoreHandle(taskHandle, tr.localState.DriverNetwork)

// If the handle could not be restored, the alloc is
// non-terminal, and the task isn't a system job: wait until
// servers have been contacted before running. #1795
if restored {
return nil
}

alloc := tr.Alloc()
if alloc.TerminalStatus() || alloc.Job.Type == structs.JobTypeSystem {
return nil
}

tr.logger.Trace("failed to reattach to task; will not run until server is contacted")
tr.waitOnServers = true

ev := structs.NewTaskEvent(structs.TaskRestoreFailed).
SetDisplayMessage("failed to restore task; will not run until server is contacted")
tr.UpdateState(structs.TaskStatePending, ev)
}

return nil
}

Expand Down
Loading

0 comments on commit 5b65afb

Please sign in to comment.