Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor blocking/migrating allocations #3007

Merged
merged 12 commits into from
Aug 15, 2017
72 changes: 51 additions & 21 deletions client/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,11 @@ type AllocRunner struct {
vaultClient vaultclient.VaultClient
consulClient ConsulServiceAPI

otherAllocDir *allocdir.AllocDir
// prevAlloc allows for Waiting until a previous allocation exits and
// the migrates it data. If sticky volumes aren't used and there's no
// previous allocation a noop implementation is used so it always safe
// to call.
prevAlloc prevAllocWatcher

ctx context.Context
exitFn context.CancelFunc
Expand Down Expand Up @@ -149,8 +153,8 @@ type allocRunnerMutableState struct {

// NewAllocRunner is used to create a new allocation context
func NewAllocRunner(logger *log.Logger, config *config.Config, stateDB *bolt.DB, updater AllocStateUpdater,
alloc *structs.Allocation, vaultClient vaultclient.VaultClient,
consulClient ConsulServiceAPI) *AllocRunner {
alloc *structs.Allocation, vaultClient vaultclient.VaultClient, consulClient ConsulServiceAPI,
prevAlloc prevAllocWatcher) *AllocRunner {

ar := &AllocRunner{
config: config,
Expand All @@ -160,6 +164,7 @@ func NewAllocRunner(logger *log.Logger, config *config.Config, stateDB *bolt.DB,
alloc: alloc,
allocID: alloc.ID,
allocBroadcast: cstructs.NewAllocBroadcaster(8),
prevAlloc: prevAlloc,
dirtyCh: make(chan struct{}, 1),
allocDir: allocdir.NewAllocDir(logger, filepath.Join(config.AllocDir, alloc.ID)),
tasks: make(map[string]*TaskRunner),
Expand Down Expand Up @@ -476,6 +481,12 @@ func (r *AllocRunner) GetAllocDir() *allocdir.AllocDir {
return r.allocDir
}

// GetListener returns a listener for updates broadcast by this alloc runner.
// Callers are responsible for calling Close on their Listener.
func (r *AllocRunner) GetListener() *cstructs.AllocListener {
return r.allocBroadcast.Listen()
}

// copyTaskStates returns a copy of the passed task states.
func copyTaskStates(states map[string]*structs.TaskState) map[string]*structs.TaskState {
copy := make(map[string]*structs.TaskState, len(states))
Expand Down Expand Up @@ -742,33 +753,46 @@ func (r *AllocRunner) Run() {
return
}

// Create the execution context
r.allocDirLock.Lock()
// Build allocation directory (idempotent)
if err := r.allocDir.Build(); err != nil {
r.allocDirLock.Lock()
err := r.allocDir.Build()
r.allocDirLock.Unlock()

if err != nil {
r.logger.Printf("[ERR] client: failed to build task directories: %v", err)
r.setStatus(structs.AllocClientStatusFailed, fmt.Sprintf("failed to build task dirs for '%s'", alloc.TaskGroup))
r.allocDirLock.Unlock()
return
}

if r.otherAllocDir != nil {
if err := r.allocDir.Move(r.otherAllocDir, tg.Tasks); err != nil {
r.logger.Printf("[ERR] client: failed to move alloc dir into alloc %q: %v", r.allocID, err)
// Wait for a previous alloc - if any - to terminate
if err := r.prevAlloc.Wait(r.ctx); err != nil {
if err == context.Canceled {
return
}
if err := r.otherAllocDir.Destroy(); err != nil {
r.logger.Printf("[ERR] client: error destroying allocdir %v: %v", r.otherAllocDir.AllocDir, err)
r.setStatus(structs.AllocClientStatusFailed, fmt.Sprintf("error while waiting for previous alloc to terminate: %v", err))
return
}

// Wait for data to be migrated from a previous alloc if applicable
if err := r.prevAlloc.Migrate(r.ctx, r.allocDir); err != nil {
if err == context.Canceled {
return
}

// Soft-fail on migration errors
r.logger.Printf("[WARN] client: alloc %q error while migrating data from previous alloc: %v", r.allocID, err)
}
r.allocDirLock.Unlock()

// Check if the allocation is in a terminal status. In this case, we don't
// start any of the task runners and directly wait for the destroy signal to
// clean up the allocation.
if alloc.TerminalStatus() {
r.logger.Printf("[DEBUG] client: alloc %q in terminal status, waiting for destroy", r.allocID)
// mark this allocation as completed.
r.setStatus(structs.AllocClientStatusComplete, "cancelled running tasks for allocation in terminal state")
// mark this allocation as completed if it is not already in a
// terminal state
if !alloc.Terminated() {
r.setStatus(structs.AllocClientStatusComplete, "canceled running tasks for allocation in terminal state")
}
r.handleDestroy()
r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.allocID)
return
Expand Down Expand Up @@ -857,12 +881,6 @@ OUTER:
r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.allocID)
}

// SetPreviousAllocDir sets the previous allocation directory of the current
// allocation
func (r *AllocRunner) SetPreviousAllocDir(allocDir *allocdir.AllocDir) {
r.otherAllocDir = allocDir
}

// destroyTaskRunners destroys the task runners, waits for them to terminate and
// then saves state.
func (r *AllocRunner) destroyTaskRunners(destroyEvent *structs.TaskEvent) {
Expand Down Expand Up @@ -945,6 +963,18 @@ func (r *AllocRunner) handleDestroy() {
}
}

// IsWaiting returns true if this alloc is waiting on a previous allocation to
// terminate.
func (r *AllocRunner) IsWaiting() bool {
return r.prevAlloc.IsWaiting()
}

// IsMigrating returns true if this alloc is migrating data from a previous
// allocation.
func (r *AllocRunner) IsMigrating() bool {
return r.prevAlloc.IsMigrating()
}

// Update is used to update the allocation of the context
func (r *AllocRunner) Update(update *structs.Allocation) {
select {
Expand Down
51 changes: 32 additions & 19 deletions client/alloc_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func testAllocRunnerFromAlloc(alloc *structs.Allocation, restarts bool) (*MockAl
alloc.Job.Type = structs.JobTypeBatch
}
vclient := vaultclient.NewMockVaultClient()
ar := NewAllocRunner(logger, conf, db, upd.Update, alloc, vclient, newMockConsulServiceClient())
ar := NewAllocRunner(logger, conf, db, upd.Update, alloc, vclient, newMockConsulServiceClient(), noopPrevAlloc{})
return upd, ar
}

Expand Down Expand Up @@ -640,9 +640,10 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) {

// Create a new alloc runner
l2 := prefixedTestLogger("----- ar2: ")
alloc2 := &structs.Allocation{ID: ar.alloc.ID}
prevAlloc := newAllocWatcher(alloc2, ar, nil, ar.config, l2)
ar2 := NewAllocRunner(l2, ar.config, ar.stateDB, upd.Update,
&structs.Allocation{ID: ar.alloc.ID}, ar.vaultClient,
ar.consulClient)
alloc2, ar.vaultClient, ar.consulClient, prevAlloc)
err = ar2.RestoreState()
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -733,9 +734,11 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) {
defer ar.allocLock.Unlock()

// Create a new alloc runner
ar2 := NewAllocRunner(ar.logger, ar.config, ar.stateDB, upd.Update,
&structs.Allocation{ID: ar.alloc.ID}, ar.vaultClient, ar.consulClient)
ar2.logger = prefixedTestLogger("ar2: ")
l2 := prefixedTestLogger("ar2: ")
alloc2 := &structs.Allocation{ID: ar.alloc.ID}
prevAlloc := newAllocWatcher(alloc2, ar, nil, ar.config, l2)
ar2 := NewAllocRunner(l2, ar.config, ar.stateDB, upd.Update,
alloc2, ar.vaultClient, ar.consulClient, prevAlloc)
err = ar2.RestoreState()
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -846,8 +849,10 @@ func TestAllocRunner_SaveRestoreState_Upgrade(t *testing.T) {
}

// Create a new alloc runner
l2 := prefixedTestLogger("----- ar2: ")
ar2 := NewAllocRunner(l2, origConfig, ar.stateDB, upd.Update, &structs.Allocation{ID: ar.alloc.ID}, ar.vaultClient, ar.consulClient)
l2 := prefixedTestLogger("ar2: ")
alloc2 := &structs.Allocation{ID: ar.alloc.ID}
prevAlloc := newAllocWatcher(alloc2, ar, nil, origConfig, l2)
ar2 := NewAllocRunner(l2, origConfig, ar.stateDB, upd.Update, alloc2, ar.vaultClient, ar.consulClient, prevAlloc)
err = ar2.RestoreState()
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -1000,7 +1005,7 @@ func TestAllocRunner_RestoreOldState(t *testing.T) {
alloc.Job.Type = structs.JobTypeBatch
vclient := vaultclient.NewMockVaultClient()
cclient := newMockConsulServiceClient()
ar := NewAllocRunner(logger, conf, db, upd.Update, alloc, vclient, cclient)
ar := NewAllocRunner(logger, conf, db, upd.Update, alloc, vclient, cclient, noopPrevAlloc{})
defer ar.Destroy()

// RestoreState should fail on the task state since we only test the
Expand Down Expand Up @@ -1252,6 +1257,9 @@ func TestAllocRunner_TaskLeader_StopTG(t *testing.T) {
})
}

// TestAllocRunner_MoveAllocDir asserts that a file written to an alloc's
// local/ dir will be moved to a replacement alloc's local/ dir if sticky
// volumes is on.
func TestAllocRunner_MoveAllocDir(t *testing.T) {
t.Parallel()
// Create an alloc runner
Expand Down Expand Up @@ -1286,19 +1294,24 @@ func TestAllocRunner_MoveAllocDir(t *testing.T) {
ioutil.WriteFile(taskLocalFile, []byte("good bye world"), os.ModePerm)

// Create another alloc runner
alloc1 := mock.Alloc()
task = alloc1.Job.TaskGroups[0].Tasks[0]
alloc2 := mock.Alloc()
alloc2.PreviousAllocation = ar.allocID
alloc2.Job.TaskGroups[0].EphemeralDisk.Sticky = true
task = alloc2.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config = map[string]interface{}{
"run_for": "1s",
}
upd1, ar1 := testAllocRunnerFromAlloc(alloc1, false)
ar1.SetPreviousAllocDir(ar.allocDir)
go ar1.Run()
defer ar1.Destroy()
upd2, ar2 := testAllocRunnerFromAlloc(alloc2, false)

// Set prevAlloc like Client does
ar2.prevAlloc = newAllocWatcher(alloc2, ar, nil, ar2.config, ar2.logger)

go ar2.Run()
defer ar2.Destroy()

testutil.WaitForResult(func() (bool, error) {
_, last := upd1.Last()
_, last := upd2.Last()
if last == nil {
return false, fmt.Errorf("No updates")
}
Expand All @@ -1310,14 +1323,14 @@ func TestAllocRunner_MoveAllocDir(t *testing.T) {
t.Fatalf("err: %v", err)
})

// Ensure that data from ar1 was moved to ar
taskDir = ar1.allocDir.TaskDirs[task.Name]
// Ensure that data from ar was moved to ar2
taskDir = ar2.allocDir.TaskDirs[task.Name]
taskLocalFile = filepath.Join(taskDir.LocalDir, "local_file")
if fileInfo, _ := os.Stat(taskLocalFile); fileInfo == nil {
t.Fatalf("file %v not found", taskLocalFile)
}

dataFile = filepath.Join(ar1.allocDir.SharedDir, "data", "data_file")
dataFile = filepath.Join(ar2.allocDir.SharedDir, "data", "data_file")
if fileInfo, _ := os.Stat(dataFile); fileInfo == nil {
t.Fatalf("file %v not found", dataFile)
}
Expand Down
Loading