diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index 18ed0d658c7..f628ef5017a 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -141,11 +141,6 @@ type allocRunner struct { // servers have been contacted for the first time in case of a failed // restore. serversContactedCh chan struct{} - - // waitOnServers defaults to false but will be set true if a restore - // fails and the Run method should wait until serversContactedCh is - // closed. - waitOnServers bool } // NewAllocRunner returns a new allocation runner. @@ -248,16 +243,6 @@ func (ar *allocRunner) Run() { // Start the alloc update handler go ar.handleAllocUpdates() - if ar.waitOnServers { - ar.logger.Info(" waiting to contact server before restarting") - select { - case <-ar.taskStateUpdateHandlerCh: - return - case <-ar.serversContactedCh: - ar.logger.Info("server contacted; unblocking waiting alloc") - } - } - // If task update chan has been closed, that means we've been shutdown. select { case <-ar.taskStateUpdateHandlerCh: @@ -368,50 +353,9 @@ func (ar *allocRunner) Restore() error { } } - ar.waitOnServers = ar.shouldWaitForServers(ds) return nil } -// shouldWaitForServers returns true if we suspect the alloc -// is potentially a completed alloc that got resurrected after AR was destroyed. -// In such cases, rerunning the alloc can lead to process and task exhaustion. -// -// The heaurstic used here is an alloc is suspect if it's in a pending state -// and no other task/status info is found. -// -// See: -// * https://github.com/hashicorp/nomad/pull/6207 -// * https://github.com/hashicorp/nomad/issues/5984 -// -// COMPAT(0.12): remove once upgrading from 0.9.5 is no longer supported -func (ar *allocRunner) shouldWaitForServers(ds *structs.AllocDeploymentStatus) bool { - alloc := ar.Alloc() - - if alloc.ClientStatus != structs.AllocClientStatusPending { - return false - } - - // check if we restore a task but see no other data - if ds != nil { - return false - } - - tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) - if tg == nil { - // corrupt alloc?! - return true - } - - for _, task := range tg.Tasks { - ls, tr, _ := ar.stateDB.GetTaskRunnerState(alloc.ID, task.Name) - if ls != nil || tr != nil { - return false - } - } - - return true -} - // persistDeploymentStatus stores AllocDeploymentStatus. func (ar *allocRunner) persistDeploymentStatus(ds *structs.AllocDeploymentStatus) { if err := ar.stateDB.PutDeploymentStatus(ar.id, ds); err != nil { diff --git a/client/allocrunner/alloc_runner_test.go b/client/allocrunner/alloc_runner_test.go index 047ce06c11d..548602f1ebb 100644 --- a/client/allocrunner/alloc_runner_test.go +++ b/client/allocrunner/alloc_runner_test.go @@ -1059,44 +1059,3 @@ func TestAllocRunner_PersistState_Destroyed(t *testing.T) { require.NoError(t, err) require.Nil(t, ts) } - -// COMPAT(0.12): remove once upgrading from 0.9.5 is no longer supported -func TestAllocRunner_WaitForServer_Detects_Suspicious_Allocs(t *testing.T) { - t.Parallel() - alloc := mock.BatchAlloc() - - conf, cleanup := testAllocRunnerConfig(t, alloc) - conf.StateDB = state.NewMemDB(conf.Logger) - - defer cleanup() - ar, err := NewAllocRunner(conf) - require.NoError(t, err) - defer destroy(ar) - - defer destroy(ar) - go ar.Run() - - select { - case <-ar.WaitCh(): - case <-time.After(10 * time.Second): - require.Fail(t, "timed out waiting for alloc to complete") - } - - // shouldn't wait after successful completion - require.False(t, ar.shouldWaitForServers(nil)) - - // new alloc runner shouldn't restore completed alloc - ar, err = NewAllocRunner(conf) - require.NoError(t, err) - ar.Restore() - require.False(t, ar.shouldWaitForServers(nil)) - - // simulate 0.9.5 behavior - require.NoError(t, conf.StateDB.DeleteAllocationBucket(alloc.ID)) - require.NoError(t, conf.StateDB.PutAllocation(alloc)) - - ar, err = NewAllocRunner(conf) - require.NoError(t, err) - ar.Restore() - require.True(t, ar.shouldWaitForServers(nil)) -} diff --git a/client/client.go b/client/client.go index ce6c1f2afcd..b9eba48e817 100644 --- a/client/client.go +++ b/client/client.go @@ -14,11 +14,11 @@ import ( "sync" "time" - "github.com/armon/go-metrics" + metrics "github.com/armon/go-metrics" consulapi "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/lib" - "github.com/hashicorp/go-hclog" - "github.com/hashicorp/go-multierror" + hclog "github.com/hashicorp/go-hclog" + multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/allocrunner" "github.com/hashicorp/nomad/client/allocrunner/interfaces" @@ -1006,6 +1006,15 @@ func (c *Client) restoreState() error { // Load each alloc back for _, alloc := range allocs { + // COMPAT(0.12): remove once upgrading from 0.9.5 is no longer supported + // See isPotentiallyCompletedAlloc for details. Skipping suspicious allocs + // now. If allocs should be run, they will be started when the client + // gets allocs from servers. + if c.isPotentiallyCompletedAlloc(alloc) { + c.logger.Warn("found a alloc that may have been completed already, skipping restore", "alloc_id", alloc.ID) + continue + } + //XXX On Restore we give up on watching previous allocs because // we need the local AllocRunners initialized first. We could // add a second loop to initialize just the alloc watcher. @@ -1062,6 +1071,44 @@ func (c *Client) restoreState() error { return nil } +// isPotentiallyCompletedAlloc returns true if we suspect the alloc +// is potentially a completed alloc that got resurrected after AR was destroyed. +// In such cases, rerunning the alloc can lead to process and task exhaustion. +// +// The heuristic used here is an alloc is suspect if we see no other information +// and no other task/status info is found. +// +// See: +// * https://github.com/hashicorp/nomad/pull/6207 +// * https://github.com/hashicorp/nomad/issues/5984 +// +// COMPAT(0.12): remove once upgrading from 0.9.5 is no longer supported +func (c *Client) isPotentiallyCompletedAlloc(alloc *structs.Allocation) bool { + if alloc.ClientStatus != structs.AllocClientStatusPending { + return false + } + + ds, _ := c.stateDB.GetDeploymentStatus(alloc.ID) + if ds != nil { + return false + } + + tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) + if tg == nil { + // corrupt alloc?! + return true + } + + for _, task := range tg.Tasks { + ls, tr, _ := c.stateDB.GetTaskRunnerState(alloc.ID, task.Name) + if ls != nil || tr != nil { + return false + } + } + + return true +} + func (c *Client) handleInvalidAllocs(alloc *structs.Allocation, err error) { c.invalidAllocsLock.Lock() c.invalidAllocs[alloc.ID] = struct{}{} diff --git a/client/client_test.go b/client/client_test.go index d9b2013b56f..b324bc78753 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -10,10 +10,12 @@ import ( "testing" "time" - "github.com/hashicorp/go-memdb" + memdb "github.com/hashicorp/go-memdb" + trstate "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state" "github.com/hashicorp/nomad/client/config" consulApi "github.com/hashicorp/nomad/client/consul" "github.com/hashicorp/nomad/client/fingerprint" + "github.com/hashicorp/nomad/client/state" "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/helper/pluginutils/catalog" "github.com/hashicorp/nomad/helper/testlog" @@ -27,7 +29,7 @@ import ( "github.com/hashicorp/nomad/testutil" "github.com/stretchr/testify/assert" - "github.com/hashicorp/go-hclog" + hclog "github.com/hashicorp/go-hclog" cstate "github.com/hashicorp/nomad/client/state" ctestutil "github.com/hashicorp/nomad/client/testutil" "github.com/stretchr/testify/require" @@ -1644,3 +1646,44 @@ func TestClient_updateNodeFromDriverUpdatesAll(t *testing.T) { assert.EqualValues(t, n, un) } } + +// COMPAT(0.12): remove once upgrading from 0.9.5 is no longer supported +func TestClient_Restore_PotentiallyCompletedAlloc(t *testing.T) { + t.Parallel() + + c, cleanup := TestClient(t, nil) + defer cleanup() + + c.stateDB = state.NewMemDB(c.logger) + + t.Run("plain alloc", func(t *testing.T) { + alloc := mock.BatchAlloc() + c.stateDB.PutAllocation(alloc) + + require.True(t, c.isPotentiallyCompletedAlloc(alloc)) + }) + + t.Run("alloc with a task with local state", func(t *testing.T) { + alloc := mock.BatchAlloc() + taskName := alloc.Job.LookupTaskGroup(alloc.TaskGroup).Tasks[0].Name + ls := &trstate.LocalState{} + + c.stateDB.PutAllocation(alloc) + c.stateDB.PutTaskRunnerLocalState(alloc.ID, taskName, ls) + + require.False(t, c.isPotentiallyCompletedAlloc(alloc)) + }) + + t.Run("alloc with a task with local state", func(t *testing.T) { + alloc := mock.BatchAlloc() + taskName := alloc.Job.LookupTaskGroup(alloc.TaskGroup).Tasks[0].Name + ts := &structs.TaskState{ + State: structs.TaskStateRunning, + } + + c.stateDB.PutAllocation(alloc) + c.stateDB.PutTaskState(alloc.ID, taskName, ts) + + require.False(t, c.isPotentiallyCompletedAlloc(alloc)) + }) +}