Skip to content

Commit

Permalink
Merge pull request #3007 from hashicorp/b-blocking-refactor-2
Browse files Browse the repository at this point in the history
Refactor blocking/migrating allocations
  • Loading branch information
schmichael authored Aug 15, 2017
2 parents 6bf1ea5 + 44b6988 commit c459b6a
Show file tree
Hide file tree
Showing 9 changed files with 902 additions and 705 deletions.
72 changes: 51 additions & 21 deletions client/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,11 @@ type AllocRunner struct {
vaultClient vaultclient.VaultClient
consulClient ConsulServiceAPI

otherAllocDir *allocdir.AllocDir
// prevAlloc allows for waiting until a previous allocation exits and
// then migrates its data. If sticky volumes aren't used and there's no
// previous allocation, a noop implementation is used so it is always safe
// to call.
prevAlloc prevAllocWatcher

ctx context.Context
exitFn context.CancelFunc
Expand Down Expand Up @@ -149,8 +153,8 @@ type allocRunnerMutableState struct {

// NewAllocRunner is used to create a new allocation context
func NewAllocRunner(logger *log.Logger, config *config.Config, stateDB *bolt.DB, updater AllocStateUpdater,
alloc *structs.Allocation, vaultClient vaultclient.VaultClient,
consulClient ConsulServiceAPI) *AllocRunner {
alloc *structs.Allocation, vaultClient vaultclient.VaultClient, consulClient ConsulServiceAPI,
prevAlloc prevAllocWatcher) *AllocRunner {

ar := &AllocRunner{
config: config,
Expand All @@ -160,6 +164,7 @@ func NewAllocRunner(logger *log.Logger, config *config.Config, stateDB *bolt.DB,
alloc: alloc,
allocID: alloc.ID,
allocBroadcast: cstructs.NewAllocBroadcaster(8),
prevAlloc: prevAlloc,
dirtyCh: make(chan struct{}, 1),
allocDir: allocdir.NewAllocDir(logger, filepath.Join(config.AllocDir, alloc.ID)),
tasks: make(map[string]*TaskRunner),
Expand Down Expand Up @@ -476,6 +481,12 @@ func (r *AllocRunner) GetAllocDir() *allocdir.AllocDir {
return r.allocDir
}

// GetListener registers a new listener on this alloc runner's broadcaster
// and hands it back to the caller. The caller owns the returned listener and
// is responsible for calling Close on it.
func (r *AllocRunner) GetListener() *cstructs.AllocListener {
	listener := r.allocBroadcast.Listen()
	return listener
}

// copyTaskStates returns a copy of the passed task states.
func copyTaskStates(states map[string]*structs.TaskState) map[string]*structs.TaskState {
copy := make(map[string]*structs.TaskState, len(states))
Expand Down Expand Up @@ -742,33 +753,46 @@ func (r *AllocRunner) Run() {
return
}

// Create the execution context
r.allocDirLock.Lock()
// Build allocation directory (idempotent)
if err := r.allocDir.Build(); err != nil {
r.allocDirLock.Lock()
err := r.allocDir.Build()
r.allocDirLock.Unlock()

if err != nil {
r.logger.Printf("[ERR] client: failed to build task directories: %v", err)
r.setStatus(structs.AllocClientStatusFailed, fmt.Sprintf("failed to build task dirs for '%s'", alloc.TaskGroup))
r.allocDirLock.Unlock()
return
}

if r.otherAllocDir != nil {
if err := r.allocDir.Move(r.otherAllocDir, tg.Tasks); err != nil {
r.logger.Printf("[ERR] client: failed to move alloc dir into alloc %q: %v", r.allocID, err)
// Wait for a previous alloc - if any - to terminate
if err := r.prevAlloc.Wait(r.ctx); err != nil {
if err == context.Canceled {
return
}
if err := r.otherAllocDir.Destroy(); err != nil {
r.logger.Printf("[ERR] client: error destroying allocdir %v: %v", r.otherAllocDir.AllocDir, err)
r.setStatus(structs.AllocClientStatusFailed, fmt.Sprintf("error while waiting for previous alloc to terminate: %v", err))
return
}

// Wait for data to be migrated from a previous alloc if applicable
if err := r.prevAlloc.Migrate(r.ctx, r.allocDir); err != nil {
if err == context.Canceled {
return
}

// Soft-fail on migration errors
r.logger.Printf("[WARN] client: alloc %q error while migrating data from previous alloc: %v", r.allocID, err)
}
r.allocDirLock.Unlock()

// Check if the allocation is in a terminal status. In this case, we don't
// start any of the task runners and directly wait for the destroy signal to
// clean up the allocation.
if alloc.TerminalStatus() {
r.logger.Printf("[DEBUG] client: alloc %q in terminal status, waiting for destroy", r.allocID)
// mark this allocation as completed.
r.setStatus(structs.AllocClientStatusComplete, "cancelled running tasks for allocation in terminal state")
// mark this allocation as completed if it is not already in a
// terminal state
if !alloc.Terminated() {
r.setStatus(structs.AllocClientStatusComplete, "canceled running tasks for allocation in terminal state")
}
r.handleDestroy()
r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.allocID)
return
Expand Down Expand Up @@ -857,12 +881,6 @@ OUTER:
r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.allocID)
}

// SetPreviousAllocDir sets the previous allocation directory of the current
// allocation. The given allocDir is stored as-is in r.otherAllocDir; no copy
// is made and no locking is performed here.
func (r *AllocRunner) SetPreviousAllocDir(allocDir *allocdir.AllocDir) {
r.otherAllocDir = allocDir
}

// destroyTaskRunners destroys the task runners, waits for them to terminate and
// then saves state.
func (r *AllocRunner) destroyTaskRunners(destroyEvent *structs.TaskEvent) {
Expand Down Expand Up @@ -945,6 +963,18 @@ func (r *AllocRunner) handleDestroy() {
}
}

// IsWaiting reports whether this alloc is waiting on a previous allocation
// to terminate. The answer is delegated to the prevAlloc watcher.
func (r *AllocRunner) IsWaiting() bool {
	waiting := r.prevAlloc.IsWaiting()
	return waiting
}

// IsMigrating reports whether this alloc is migrating data from a previous
// allocation. The answer is delegated to the prevAlloc watcher.
func (r *AllocRunner) IsMigrating() bool {
	migrating := r.prevAlloc.IsMigrating()
	return migrating
}

// Update is used to update the allocation of the context
func (r *AllocRunner) Update(update *structs.Allocation) {
select {
Expand Down
51 changes: 32 additions & 19 deletions client/alloc_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func testAllocRunnerFromAlloc(alloc *structs.Allocation, restarts bool) (*MockAl
alloc.Job.Type = structs.JobTypeBatch
}
vclient := vaultclient.NewMockVaultClient()
ar := NewAllocRunner(logger, conf, db, upd.Update, alloc, vclient, newMockConsulServiceClient())
ar := NewAllocRunner(logger, conf, db, upd.Update, alloc, vclient, newMockConsulServiceClient(), noopPrevAlloc{})
return upd, ar
}

Expand Down Expand Up @@ -640,9 +640,10 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) {

// Create a new alloc runner
l2 := prefixedTestLogger("----- ar2: ")
alloc2 := &structs.Allocation{ID: ar.alloc.ID}
prevAlloc := newAllocWatcher(alloc2, ar, nil, ar.config, l2)
ar2 := NewAllocRunner(l2, ar.config, ar.stateDB, upd.Update,
&structs.Allocation{ID: ar.alloc.ID}, ar.vaultClient,
ar.consulClient)
alloc2, ar.vaultClient, ar.consulClient, prevAlloc)
err = ar2.RestoreState()
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -733,9 +734,11 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) {
defer ar.allocLock.Unlock()

// Create a new alloc runner
ar2 := NewAllocRunner(ar.logger, ar.config, ar.stateDB, upd.Update,
&structs.Allocation{ID: ar.alloc.ID}, ar.vaultClient, ar.consulClient)
ar2.logger = prefixedTestLogger("ar2: ")
l2 := prefixedTestLogger("ar2: ")
alloc2 := &structs.Allocation{ID: ar.alloc.ID}
prevAlloc := newAllocWatcher(alloc2, ar, nil, ar.config, l2)
ar2 := NewAllocRunner(l2, ar.config, ar.stateDB, upd.Update,
alloc2, ar.vaultClient, ar.consulClient, prevAlloc)
err = ar2.RestoreState()
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -846,8 +849,10 @@ func TestAllocRunner_SaveRestoreState_Upgrade(t *testing.T) {
}

// Create a new alloc runner
l2 := prefixedTestLogger("----- ar2: ")
ar2 := NewAllocRunner(l2, origConfig, ar.stateDB, upd.Update, &structs.Allocation{ID: ar.alloc.ID}, ar.vaultClient, ar.consulClient)
l2 := prefixedTestLogger("ar2: ")
alloc2 := &structs.Allocation{ID: ar.alloc.ID}
prevAlloc := newAllocWatcher(alloc2, ar, nil, origConfig, l2)
ar2 := NewAllocRunner(l2, origConfig, ar.stateDB, upd.Update, alloc2, ar.vaultClient, ar.consulClient, prevAlloc)
err = ar2.RestoreState()
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -1000,7 +1005,7 @@ func TestAllocRunner_RestoreOldState(t *testing.T) {
alloc.Job.Type = structs.JobTypeBatch
vclient := vaultclient.NewMockVaultClient()
cclient := newMockConsulServiceClient()
ar := NewAllocRunner(logger, conf, db, upd.Update, alloc, vclient, cclient)
ar := NewAllocRunner(logger, conf, db, upd.Update, alloc, vclient, cclient, noopPrevAlloc{})
defer ar.Destroy()

// RestoreState should fail on the task state since we only test the
Expand Down Expand Up @@ -1252,6 +1257,9 @@ func TestAllocRunner_TaskLeader_StopTG(t *testing.T) {
})
}

// TestAllocRunner_MoveAllocDir asserts that a file written to an alloc's
// local/ dir will be moved to a replacement alloc's local/ dir if sticky
// volumes are enabled.
func TestAllocRunner_MoveAllocDir(t *testing.T) {
t.Parallel()
// Create an alloc runner
Expand Down Expand Up @@ -1286,19 +1294,24 @@ func TestAllocRunner_MoveAllocDir(t *testing.T) {
ioutil.WriteFile(taskLocalFile, []byte("good bye world"), os.ModePerm)

// Create another alloc runner
alloc1 := mock.Alloc()
task = alloc1.Job.TaskGroups[0].Tasks[0]
alloc2 := mock.Alloc()
alloc2.PreviousAllocation = ar.allocID
alloc2.Job.TaskGroups[0].EphemeralDisk.Sticky = true
task = alloc2.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config = map[string]interface{}{
"run_for": "1s",
}
upd1, ar1 := testAllocRunnerFromAlloc(alloc1, false)
ar1.SetPreviousAllocDir(ar.allocDir)
go ar1.Run()
defer ar1.Destroy()
upd2, ar2 := testAllocRunnerFromAlloc(alloc2, false)

// Set prevAlloc like Client does
ar2.prevAlloc = newAllocWatcher(alloc2, ar, nil, ar2.config, ar2.logger)

go ar2.Run()
defer ar2.Destroy()

testutil.WaitForResult(func() (bool, error) {
_, last := upd1.Last()
_, last := upd2.Last()
if last == nil {
return false, fmt.Errorf("No updates")
}
Expand All @@ -1310,14 +1323,14 @@ func TestAllocRunner_MoveAllocDir(t *testing.T) {
t.Fatalf("err: %v", err)
})

// Ensure that data from ar1 was moved to ar
taskDir = ar1.allocDir.TaskDirs[task.Name]
// Ensure that data from ar was moved to ar2
taskDir = ar2.allocDir.TaskDirs[task.Name]
taskLocalFile = filepath.Join(taskDir.LocalDir, "local_file")
if fileInfo, _ := os.Stat(taskLocalFile); fileInfo == nil {
t.Fatalf("file %v not found", taskLocalFile)
}

dataFile = filepath.Join(ar1.allocDir.SharedDir, "data", "data_file")
dataFile = filepath.Join(ar2.allocDir.SharedDir, "data", "data_file")
if fileInfo, _ := os.Stat(dataFile); fileInfo == nil {
t.Fatalf("file %v not found", dataFile)
}
Expand Down
Loading

0 comments on commit c459b6a

Please sign in to comment.