Skip to content

Commit

Permalink
Alternative approach: avoid restoring
Browse files Browse the repository at this point in the history
This uses an alternative approach where we avoid restoring the alloc
runner in the first place, if we suspect that the alloc may have been
completed already.
  • Loading branch information
Mahmood Ali committed Aug 27, 2019
1 parent cbc521e commit 493945a
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 102 deletions.
56 changes: 0 additions & 56 deletions client/allocrunner/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,6 @@ type allocRunner struct {
// servers have been contacted for the first time in case of a failed
// restore.
serversContactedCh chan struct{}

// waitOnServers defaults to false but will be set true if a restore
// fails and the Run method should wait until serversContactedCh is
// closed.
waitOnServers bool
}

// NewAllocRunner returns a new allocation runner.
Expand Down Expand Up @@ -248,16 +243,6 @@ func (ar *allocRunner) Run() {
// Start the alloc update handler
go ar.handleAllocUpdates()

if ar.waitOnServers {
ar.logger.Info(" waiting to contact server before restarting")
select {
case <-ar.taskStateUpdateHandlerCh:
return
case <-ar.serversContactedCh:
ar.logger.Info("server contacted; unblocking waiting alloc")
}
}

// If task update chan has been closed, that means we've been shutdown.
select {
case <-ar.taskStateUpdateHandlerCh:
Expand Down Expand Up @@ -368,50 +353,9 @@ func (ar *allocRunner) Restore() error {
}
}

ar.waitOnServers = ar.shouldWaitForServers(ds)
return nil
}

// shouldWaitForServers returns true if we suspect the alloc
// is potentially a completed alloc that got resurrected after AR was destroyed.
// In such cases, rerunning the alloc can lead to process and task exhaustion.
//
// The heaurstic used here is an alloc is suspect if it's in a pending state
// and no other task/status info is found.
//
// See:
// * https://github.com/hashicorp/nomad/pull/6207
// * https://github.com/hashicorp/nomad/issues/5984
//
// COMPAT(0.12): remove once upgrading from 0.9.5 is no longer supported
func (ar *allocRunner) shouldWaitForServers(ds *structs.AllocDeploymentStatus) bool {
alloc := ar.Alloc()

if alloc.ClientStatus != structs.AllocClientStatusPending {
return false
}

// check if we restore a task but see no other data
if ds != nil {
return false
}

tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
if tg == nil {
// corrupt alloc?!
return true
}

for _, task := range tg.Tasks {
ls, tr, _ := ar.stateDB.GetTaskRunnerState(alloc.ID, task.Name)
if ls != nil || tr != nil {
return false
}
}

return true
}

// persistDeploymentStatus stores AllocDeploymentStatus.
func (ar *allocRunner) persistDeploymentStatus(ds *structs.AllocDeploymentStatus) {
if err := ar.stateDB.PutDeploymentStatus(ar.id, ds); err != nil {
Expand Down
41 changes: 0 additions & 41 deletions client/allocrunner/alloc_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1059,44 +1059,3 @@ func TestAllocRunner_PersistState_Destroyed(t *testing.T) {
require.NoError(t, err)
require.Nil(t, ts)
}

// COMPAT(0.12): remove once upgrading from 0.9.5 is no longer supported
func TestAllocRunner_WaitForServer_Detects_Suspicious_Allocs(t *testing.T) {
t.Parallel()
alloc := mock.BatchAlloc()

conf, cleanup := testAllocRunnerConfig(t, alloc)
conf.StateDB = state.NewMemDB(conf.Logger)

defer cleanup()
ar, err := NewAllocRunner(conf)
require.NoError(t, err)
defer destroy(ar)

defer destroy(ar)
go ar.Run()

select {
case <-ar.WaitCh():
case <-time.After(10 * time.Second):
require.Fail(t, "timed out waiting for alloc to complete")
}

// shouldn't wait after successful completion
require.False(t, ar.shouldWaitForServers(nil))

// new alloc runner shouldn't restore completed alloc
ar, err = NewAllocRunner(conf)
require.NoError(t, err)
ar.Restore()
require.False(t, ar.shouldWaitForServers(nil))

// simulate 0.9.5 behavior
require.NoError(t, conf.StateDB.DeleteAllocationBucket(alloc.ID))
require.NoError(t, conf.StateDB.PutAllocation(alloc))

ar, err = NewAllocRunner(conf)
require.NoError(t, err)
ar.Restore()
require.True(t, ar.shouldWaitForServers(nil))
}
53 changes: 50 additions & 3 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ import (
"sync"
"time"

"github.com/armon/go-metrics"
metrics "github.com/armon/go-metrics"
consulapi "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-multierror"
hclog "github.com/hashicorp/go-hclog"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/allocrunner"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
Expand Down Expand Up @@ -1006,6 +1006,15 @@ func (c *Client) restoreState() error {
// Load each alloc back
for _, alloc := range allocs {

// COMPAT(0.12): remove once upgrading from 0.9.5 is no longer supported
// See isPotentiallyCompletedAlloc for details. Skipping suspicious allocs
// now. If allocs should be run, they will be started when the client
// gets allocs from servers.
if c.isPotentiallyCompletedAlloc(alloc) {
c.logger.Warn("found a alloc that may have been completed already, skipping restore", "alloc_id", alloc.ID)
continue
}

//XXX On Restore we give up on watching previous allocs because
// we need the local AllocRunners initialized first. We could
// add a second loop to initialize just the alloc watcher.
Expand Down Expand Up @@ -1062,6 +1071,44 @@ func (c *Client) restoreState() error {
return nil
}

// isPotentiallyCompletedAlloc returns true if we suspect the alloc
// is potentially a completed alloc that got resurrected after AR was destroyed.
// In such cases, rerunning the alloc can lead to process and task exhaustion.
//
// The heuristic used here is an alloc is suspect if we see no other information
// and no other task/status info is found.
//
// See:
// * https://github.com/hashicorp/nomad/pull/6207
// * https://github.com/hashicorp/nomad/issues/5984
//
// COMPAT(0.12): remove once upgrading from 0.9.5 is no longer supported
func (c *Client) isPotentiallyCompletedAlloc(alloc *structs.Allocation) bool {
if alloc.ClientStatus != structs.AllocClientStatusPending {
return false
}

ds, _ := c.stateDB.GetDeploymentStatus(alloc.ID)
if ds != nil {
return false
}

tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
if tg == nil {
// corrupt alloc?!
return true
}

for _, task := range tg.Tasks {
ls, tr, _ := c.stateDB.GetTaskRunnerState(alloc.ID, task.Name)
if ls != nil || tr != nil {
return false
}
}

return true
}

func (c *Client) handleInvalidAllocs(alloc *structs.Allocation, err error) {
c.invalidAllocsLock.Lock()
c.invalidAllocs[alloc.ID] = struct{}{}
Expand Down
47 changes: 45 additions & 2 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import (
"testing"
"time"

"github.com/hashicorp/go-memdb"
memdb "github.com/hashicorp/go-memdb"
trstate "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
"github.com/hashicorp/nomad/client/config"
consulApi "github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/client/fingerprint"
"github.com/hashicorp/nomad/client/state"
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper/pluginutils/catalog"
"github.com/hashicorp/nomad/helper/testlog"
Expand All @@ -27,7 +29,7 @@ import (
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/assert"

"github.com/hashicorp/go-hclog"
hclog "github.com/hashicorp/go-hclog"
cstate "github.com/hashicorp/nomad/client/state"
ctestutil "github.com/hashicorp/nomad/client/testutil"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -1644,3 +1646,44 @@ func TestClient_updateNodeFromDriverUpdatesAll(t *testing.T) {
assert.EqualValues(t, n, un)
}
}

// COMPAT(0.12): remove once upgrading from 0.9.5 is no longer supported
func TestClient_Restore_PotentiallyCompletedAlloc(t *testing.T) {
t.Parallel()

c, cleanup := TestClient(t, nil)
defer cleanup()

c.stateDB = state.NewMemDB(c.logger)

t.Run("plain alloc", func(t *testing.T) {
alloc := mock.BatchAlloc()
c.stateDB.PutAllocation(alloc)

require.True(t, c.isPotentiallyCompletedAlloc(alloc))
})

t.Run("alloc with a task with local state", func(t *testing.T) {
alloc := mock.BatchAlloc()
taskName := alloc.Job.LookupTaskGroup(alloc.TaskGroup).Tasks[0].Name
ls := &trstate.LocalState{}

c.stateDB.PutAllocation(alloc)
c.stateDB.PutTaskRunnerLocalState(alloc.ID, taskName, ls)

require.False(t, c.isPotentiallyCompletedAlloc(alloc))
})

t.Run("alloc with a task with local state", func(t *testing.T) {
alloc := mock.BatchAlloc()
taskName := alloc.Job.LookupTaskGroup(alloc.TaskGroup).Tasks[0].Name
ts := &structs.TaskState{
State: structs.TaskStateRunning,
}

c.stateDB.PutAllocation(alloc)
c.stateDB.PutTaskState(alloc.ID, taskName, ts)

require.False(t, c.isPotentiallyCompletedAlloc(alloc))
})
}

0 comments on commit 493945a

Please sign in to comment.