diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go
index f628ef5017a..18ed0d658c7 100644
--- a/client/allocrunner/alloc_runner.go
+++ b/client/allocrunner/alloc_runner.go
@@ -141,6 +141,11 @@ type allocRunner struct {
 	// servers have been contacted for the first time in case of a failed
 	// restore.
 	serversContactedCh chan struct{}
+
+	// waitOnServers defaults to false but will be set true if a restore
+	// fails and the Run method should wait until serversContactedCh is
+	// closed.
+	waitOnServers bool
 }
 
 // NewAllocRunner returns a new allocation runner.
@@ -243,6 +248,16 @@ func (ar *allocRunner) Run() {
 	// Start the alloc update handler
 	go ar.handleAllocUpdates()
 
+	if ar.waitOnServers {
+		ar.logger.Info("waiting to contact server before restarting")
+		select {
+		case <-ar.taskStateUpdateHandlerCh:
+			return
+		case <-ar.serversContactedCh:
+			ar.logger.Info("server contacted; unblocking waiting alloc")
+		}
+	}
+
 	// If task update chan has been closed, that means we've been shutdown.
 	select {
 	case <-ar.taskStateUpdateHandlerCh:
@@ -353,9 +368,50 @@ func (ar *allocRunner) Restore() error {
 		}
 	}
 
+	ar.waitOnServers = ar.shouldWaitForServers(ds)
 	return nil
 }
 
+// shouldWaitForServers returns true if we suspect the alloc is a completed
+// alloc that got resurrected after the alloc runner was destroyed. In such
+// cases, rerunning the alloc can lead to process and task exhaustion.
+//
+// The heuristic used here is that an alloc is suspect if it's in a pending
+// state and no other task/status info is found.
+//
+// See:
+//  * https://github.com/hashicorp/nomad/pull/6207
+//  * https://github.com/hashicorp/nomad/issues/5984
+//
+// COMPAT(0.12): remove once upgrading from 0.9.5 is no longer supported
+func (ar *allocRunner) shouldWaitForServers(ds *structs.AllocDeploymentStatus) bool {
+	alloc := ar.Alloc()
+
+	if alloc.ClientStatus != structs.AllocClientStatusPending {
+		return false
+	}
+
+	// a restored deployment status counts as task/status info
+	if ds != nil {
+		return false
+	}
+
+	tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
+	if tg == nil {
+		// corrupt alloc?!
+		return true
+	}
+
+	for _, task := range tg.Tasks {
+		ls, tr, _ := ar.stateDB.GetTaskRunnerState(alloc.ID, task.Name)
+		if ls != nil || tr != nil {
+			return false
+		}
+	}
+
+	return true
+}
+
 // persistDeploymentStatus stores AllocDeploymentStatus.
 func (ar *allocRunner) persistDeploymentStatus(ds *structs.AllocDeploymentStatus) {
 	if err := ar.stateDB.PutDeploymentStatus(ar.id, ds); err != nil {
diff --git a/client/allocrunner/alloc_runner_test.go b/client/allocrunner/alloc_runner_test.go
index 548602f1ebb..047ce06c11d 100644
--- a/client/allocrunner/alloc_runner_test.go
+++ b/client/allocrunner/alloc_runner_test.go
@@ -1059,3 +1059,43 @@ func TestAllocRunner_PersistState_Destroyed(t *testing.T) {
 	require.NoError(t, err)
 	require.Nil(t, ts)
 }
+
+// COMPAT(0.12): remove once upgrading from 0.9.5 is no longer supported
+func TestAllocRunner_WaitForServer_Detects_Suspicious_Allocs(t *testing.T) {
+	t.Parallel()
+	alloc := mock.BatchAlloc()
+
+	conf, cleanup := testAllocRunnerConfig(t, alloc)
+	conf.StateDB = state.NewMemDB(conf.Logger)
+
+	defer cleanup()
+	ar, err := NewAllocRunner(conf)
+	require.NoError(t, err)
+	defer destroy(ar)
+
+	go ar.Run()
+
+	select {
+	case <-ar.WaitCh():
+	case <-time.After(10 * time.Second):
+		require.Fail(t, "timed out waiting for alloc to complete")
+	}
+
+	// shouldn't wait after successful completion
+	require.False(t, ar.shouldWaitForServers(nil))
+
+	// a new alloc runner restoring the completed alloc shouldn't wait either
+	ar, err = NewAllocRunner(conf)
+	require.NoError(t, err)
+	require.NoError(t, ar.Restore())
+	require.False(t, ar.shouldWaitForServers(nil))
+
+	// simulate 0.9.5 behavior: the alloc is present but all runner state is gone
+	require.NoError(t, conf.StateDB.DeleteAllocationBucket(alloc.ID))
+	require.NoError(t, conf.StateDB.PutAllocation(alloc))
+
+	ar, err = NewAllocRunner(conf)
+	require.NoError(t, err)
+	require.NoError(t, ar.Restore())
+	require.True(t, ar.shouldWaitForServers(nil))
+}
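Note (aside, not part of the patch): the patch only *consumes* `serversContactedCh`; the channel is closed elsewhere in the client once a server is reached for the first time. The sketch below only illustrates that close-once pattern under assumed names — the `client` type and `markServersContacted` here are hypothetical stand-ins, not Nomad's actual API.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// client is a minimal stand-in for the piece of the Nomad client that
// owns serversContactedCh. Hypothetical type for illustration only.
type client struct {
	serversContactedCh   chan struct{}
	serversContactedOnce sync.Once
}

// markServersContacted closes serversContactedCh exactly once, unblocking
// any alloc runner waiting in Run; sync.Once makes repeated calls safe.
func (c *client) markServersContacted() {
	c.serversContactedOnce.Do(func() { close(c.serversContactedCh) })
}

func main() {
	c := &client{serversContactedCh: make(chan struct{})}

	// a restored "suspicious" alloc would block here instead of re-running tasks
	go func() {
		<-c.serversContactedCh
		fmt.Println("server contacted; unblocking waiting alloc")
	}()

	c.markServersContacted() // e.g. after the first successful registration
	c.markServersContacted() // safe: the channel is only closed once

	time.Sleep(50 * time.Millisecond) // give the goroutine time to print
}
```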