-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor blocking/migrating allocations #3007
Merged
+902
−705
Merged
Changes from 7 commits
Commits
Show all changes
12 commits
Select commit
Hold shift + click to select a range
de15046
Only set alloc status if it's not already terminal
schmichael 0f584a0
initial attempt at refactoring blocked/migrating
schmichael 8c18119
switch from alloc blocker to new interface
schmichael 113d8e3
Set failed status instead of panic'ing
schmichael 8253439
Update tests for new blocking/migrating code
schmichael 4f7c5e6
Exit if alloc listener closes
schmichael 537d0e5
Soft fail on migration errors
schmichael 85b9dd9
Move migrating state into prevAllocWatcher
schmichael ce4b919
Return move errors from local Migrate like remote
schmichael 8983bc0
spelling
schmichael d55b4c1
Cleanup comments and return val
schmichael 44b6988
Fix formatting
schmichael File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -80,7 +80,14 @@ type AllocRunner struct { | |
vaultClient vaultclient.VaultClient | ||
consulClient ConsulServiceAPI | ||
|
||
otherAllocDir *allocdir.AllocDir | ||
prevAlloc prevAllocWatcher | ||
|
||
// blocked and migrating are true when alloc runner is waiting on the | ||
// prevAllocWatcher. Writers must acquire the waitingLock and readers | ||
// should use the helper methods Blocked and Migrating. | ||
blocked bool | ||
migrating bool | ||
waitingLock sync.RWMutex | ||
|
||
ctx context.Context | ||
exitFn context.CancelFunc | ||
|
@@ -149,8 +156,8 @@ type allocRunnerMutableState struct { | |
|
||
// NewAllocRunner is used to create a new allocation context | ||
func NewAllocRunner(logger *log.Logger, config *config.Config, stateDB *bolt.DB, updater AllocStateUpdater, | ||
alloc *structs.Allocation, vaultClient vaultclient.VaultClient, | ||
consulClient ConsulServiceAPI) *AllocRunner { | ||
alloc *structs.Allocation, vaultClient vaultclient.VaultClient, consulClient ConsulServiceAPI, | ||
prevAlloc prevAllocWatcher) *AllocRunner { | ||
|
||
ar := &AllocRunner{ | ||
config: config, | ||
|
@@ -160,6 +167,7 @@ func NewAllocRunner(logger *log.Logger, config *config.Config, stateDB *bolt.DB, | |
alloc: alloc, | ||
allocID: alloc.ID, | ||
allocBroadcast: cstructs.NewAllocBroadcaster(8), | ||
prevAlloc: prevAlloc, | ||
dirtyCh: make(chan struct{}, 1), | ||
allocDir: allocdir.NewAllocDir(logger, filepath.Join(config.AllocDir, alloc.ID)), | ||
tasks: make(map[string]*TaskRunner), | ||
|
@@ -476,6 +484,12 @@ func (r *AllocRunner) GetAllocDir() *allocdir.AllocDir { | |
return r.allocDir | ||
} | ||
|
||
// GetListener returns a listener for updates broadcast by this alloc runner. | ||
// Callers are responsible for calling Close on their Listener. | ||
func (r *AllocRunner) GetListener() *cstructs.AllocListener { | ||
return r.allocBroadcast.Listen() | ||
} | ||
|
||
// copyTaskStates returns a copy of the passed task states. | ||
func copyTaskStates(states map[string]*structs.TaskState) map[string]*structs.TaskState { | ||
copy := make(map[string]*structs.TaskState, len(states)) | ||
|
@@ -742,33 +756,59 @@ func (r *AllocRunner) Run() { | |
return | ||
} | ||
|
||
// Create the execution context | ||
r.allocDirLock.Lock() | ||
// Build allocation directory (idempotent) | ||
if err := r.allocDir.Build(); err != nil { | ||
r.allocDirLock.Lock() | ||
err := r.allocDir.Build() | ||
r.allocDirLock.Unlock() | ||
|
||
if err != nil { | ||
r.logger.Printf("[ERR] client: failed to build task directories: %v", err) | ||
r.setStatus(structs.AllocClientStatusFailed, fmt.Sprintf("failed to build task dirs for '%s'", alloc.TaskGroup)) | ||
r.allocDirLock.Unlock() | ||
return | ||
} | ||
|
||
if r.otherAllocDir != nil { | ||
if err := r.allocDir.Move(r.otherAllocDir, tg.Tasks); err != nil { | ||
r.logger.Printf("[ERR] client: failed to move alloc dir into alloc %q: %v", r.allocID, err) | ||
// Wait for a previous alloc - if any - to terminate | ||
r.waitingLock.Lock() | ||
r.blocked = true | ||
r.waitingLock.Unlock() | ||
|
||
if err := r.prevAlloc.Wait(r.ctx); err != nil { | ||
if err == context.Canceled { | ||
return | ||
} | ||
if err := r.otherAllocDir.Destroy(); err != nil { | ||
r.logger.Printf("[ERR] client: error destroying allocdir %v: %v", r.otherAllocDir.AllocDir, err) | ||
r.setStatus(structs.AllocClientStatusFailed, fmt.Sprintf("error while waiting for previous alloc to terminate: %v", err)) | ||
return | ||
} | ||
|
||
r.waitingLock.Lock() | ||
r.blocked = false | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See comment in watcher. Would like to not have this |
||
r.migrating = true | ||
r.waitingLock.Unlock() | ||
|
||
// Wait for data to be migrated from a previous alloc if applicable | ||
if err := r.prevAlloc.Migrate(r.ctx, r.allocDir); err != nil { | ||
if err == context.Canceled { | ||
return | ||
} | ||
|
||
// Soft-fail on migration errors | ||
r.logger.Printf("[WARN] client: alloc %q error while migrating data from previous alloc: %v", r.allocID, err) | ||
} | ||
r.allocDirLock.Unlock() | ||
|
||
r.waitingLock.Lock() | ||
r.migrating = false | ||
r.waitingLock.Unlock() | ||
|
||
// Check if the allocation is in a terminal status. In this case, we don't | ||
// start any of the task runners and directly wait for the destroy signal to | ||
// clean up the allocation. | ||
if alloc.TerminalStatus() { | ||
r.logger.Printf("[DEBUG] client: alloc %q in terminal status, waiting for destroy", r.allocID) | ||
// mark this allocation as completed. | ||
r.setStatus(structs.AllocClientStatusComplete, "cancelled running tasks for allocation in terminal state") | ||
// mark this allocation as completed if it is not already in a | ||
// terminal state | ||
if !alloc.Terminated() { | ||
r.setStatus(structs.AllocClientStatusComplete, "canceled running tasks for allocation in terminal state") | ||
} | ||
r.handleDestroy() | ||
r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.allocID) | ||
return | ||
|
@@ -857,12 +897,6 @@ OUTER: | |
r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.allocID) | ||
} | ||
|
||
// SetPreviousAllocDir sets the previous allocation directory of the current | ||
// allocation | ||
func (r *AllocRunner) SetPreviousAllocDir(allocDir *allocdir.AllocDir) { | ||
r.otherAllocDir = allocDir | ||
} | ||
|
||
// destroyTaskRunners destroys the task runners, waits for them to terminate and | ||
// then saves state. | ||
func (r *AllocRunner) destroyTaskRunners(destroyEvent *structs.TaskEvent) { | ||
|
@@ -945,6 +979,24 @@ func (r *AllocRunner) handleDestroy() { | |
} | ||
} | ||
|
||
// Blocked returns true if this alloc is waiting on a previous allocation to | ||
// terminate. | ||
func (r *AllocRunner) Blocked() bool { | ||
r.waitingLock.RLock() | ||
b := r.blocked | ||
r.waitingLock.RUnlock() | ||
return b | ||
} | ||
|
||
// Migrating returns true if this alloc is migrating data from a previous | ||
// allocation. | ||
func (r *AllocRunner) Migrating() bool { | ||
r.waitingLock.RLock() | ||
m := r.migrating | ||
r.waitingLock.RUnlock() | ||
return m | ||
} | ||
|
||
// Update is used to update the allocation of the context | ||
func (r *AllocRunner) Update(update *structs.Allocation) { | ||
select { | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment