From de15046cd64f0d34b1ba312b03f51391207601d2 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Thu, 3 Aug 2017 17:37:27 -0700 Subject: [PATCH 01/12] Only set alloc status if it's not already terminal --- client/alloc_runner.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/client/alloc_runner.go b/client/alloc_runner.go index 3782f603941..50e25e19cda 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -767,8 +767,14 @@ func (r *AllocRunner) Run() { // clean up the allocation. if alloc.TerminalStatus() { r.logger.Printf("[DEBUG] client: alloc %q in terminal status, waiting for destroy", r.allocID) - // mark this allocation as completed. - r.setStatus(structs.AllocClientStatusComplete, "cancelled running tasks for allocation in terminal state") + // mark this allocation as completed if it is not already in a + // terminal state + r.allocLock.Lock() + terminal := r.alloc.Terminated() + r.allocLock.Unlock() + if !terminal { + r.setStatus(structs.AllocClientStatusComplete, "canceled running tasks for allocation in terminal state") + } r.handleDestroy() r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.allocID) return From 0f584a0143bb38a6c519ce07b6045feb9f96a790 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Tue, 8 Aug 2017 14:50:14 -0700 Subject: [PATCH 02/12] initial attempt at refactoring blocked/migrating --- client/alloc_blocker.go | 185 ++++++++++++++++++++++++++++++++++++++++ client/alloc_runner.go | 82 ++++++++++++++---- client/client.go | 87 +++++++------------ client/gc.go | 15 ++-- 4 files changed, 286 insertions(+), 83 deletions(-) create mode 100644 client/alloc_blocker.go diff --git a/client/alloc_blocker.go b/client/alloc_blocker.go new file mode 100644 index 00000000000..0ec74ea60c9 --- /dev/null +++ b/client/alloc_blocker.go @@ -0,0 +1,185 @@ +package client + +import ( + "context" + "fmt" + "log" + "sync" + "time" + + "github.com/hashicorp/consul/lib" + "github.com/hashicorp/nomad/nomad/structs" +) + +// allocGetter is able to retrieve local and remote allocs. +type allocGetter interface { + // GetClientAlloc returns the alloc if an alloc ID is found locally, + // otherwise an error. + GetClientAlloc(allocID string) (*structs.Allocation, error) + + // RPC allows retrieving remote allocs. + RPC(method string, args interface{}, reply interface{}) error +} + +type allocBlocker struct { + // blocking is a map of allocs being watched to chans to signal their + // termination and optionally the node they were running on. + blocking map[string]chan string + blockingLock sync.Mutex + + // allocs is used to retrieve local and remote allocs + allocs allocGetter + + // region for making rpc calls + region string + + logger *log.Logger +} + +func newAllocBlocker(l *log.Logger, allocs allocGetter, region string) *allocBlocker { + return &allocBlocker{ + blocking: make(map[string]chan string), + allocs: allocs, + region: region, + logger: l, + } +} + +// allocTerminated marks a local allocation as terminated or GC'd. +func (a *allocBlocker) allocTerminated(allocID string) { + a.blockingLock.Lock() + defer a.blockingLock.Unlock() + if ch, ok := a.blocking[allocID]; ok { + //TODO(schmichael) REMOVE + a.logger.Printf("[TRACE] client: XXX closing and deleting terminated blocking alloc %q", allocID) + ch <- "" + delete(a.blocking, allocID) + } else { + //TODO(schmichael) REMOVE + a.logger.Printf("[TRACE] client: XXX no waiting on terminated alloc %q", allocID) + } +} + +// BlockOnAlloc blocks on an alloc terminating. +func (a *allocBlocker) BlockOnAlloc(ctx context.Context, allocID string) (string, error) { + // Register an intent to block until an alloc exists to prevent races + // between checking to see if it has already exited and waiting for it + // to exit + terminatedCh, err := a.watch(allocID) + if err != nil { + return "", err + } + + if alloc, err := a.allocs.GetClientAlloc(allocID); err == nil { + // Local alloc, return early if already terminated + if alloc.Terminated() { + return "", nil + } + } else { + // Remote alloc, setup blocking rpc call + go a.watchRemote(ctx, allocID) + } + + select { + case node := <-terminatedCh: + a.logger.Printf("[DEBUG] client: blocking alloc %q exited", allocID) + //TODO migrate?! + return node, nil + case <-ctx.Done(): + return "", ctx.Err() + } +} + +// watch for an alloc to terminate. Returns an error if there's already a +// watcher as blocked allocs to blockers should be 1:1. +func (a *allocBlocker) watch(allocID string) (<-chan string, error) { + a.blockingLock.Lock() + defer a.blockingLock.Unlock() + + ch, ok := a.blocking[allocID] + if ok { + return nil, fmt.Errorf("multiple blockers on alloc %q", allocID) + } + + ch = make(chan string) + a.blocking[allocID] = ch + return ch +} + +// watch for a non-local alloc to terminate using a blocking rpc call +func (a *allocBlocker) watchRemote(ctx context.Context, allocID string) { + req := structs.AllocSpecificRequest{ + AllocID: allocID, + QueryOptions: structs.QueryOptions{ + Region: a.region, + AllowStale: true, + }, + } + + for { + resp := structs.SingleAllocResponse{} + err := a.allocs.RPC("Alloc.GetAlloc", &req, &resp) + if err != nil { + a.logger.Printf("[ERR] client: failed to query allocation %q: %v", allocID, err) + retry := getAllocRetryIntv + lib.RandomStagger(getAllocRetryIntv) + select { + case <-time.After(retry): + continue + case <-ctx.Done(): + return + } + } + if resp.Alloc == nil { + //TODO(schmichael) confirm this assumption + a.logger.Printf("[DEBUG] client: blocking alloc %q has been GC'd", allocID) + a.allocTerminated(allocID, "") + } + if resp.Alloc.Terminated() { + // Terminated! + a.allocTerminated(allocID, resp.Alloc.NodeID) + } + + // Update the query index and requery. + if resp.Index > req.MinQueryIndex { + req.MinQueryIndex = resp.Index + } + } + +} + +// GetNodeAddr gets the node from the server with the given Node ID +func (a *allocBlocker) GetNodeAddr(ctx context.Context, nodeID string) (*structs.Node, error) { + req := structs.NodeSpecificRequest{ + NodeID: nodeID, + QueryOptions: structs.QueryOptions{ + Region: c.region, + AllowStale: true, + }, + } + + resp := structs.SingleNodeResponse{} + for { + err := c.allocs.RPC("Node.GetNode", &req, &resp) + if err != nil { + c.logger.Printf("[ERR] client: failed to query node info %q: %v", nodeID, err) + retry := getAllocRetryIntv + lib.RandomStagger(getAllocRetryIntv) + select { + case <-time.After(retry): + continue + case <-ctx.Done(): + return nil, ctx.Err() + } + } + break + } + + if resp.Node == nil { + return nil, fmt.Errorf("node %q not found", nodeID) + } + + scheme := "http://" + if node.TLSEnabled { + scheme = "https://" + } + return scheme + node.HTTPAdrr, nil +} diff --git a/client/alloc_runner.go b/client/alloc_runner.go index 50e25e19cda..6c1c58080cc 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -82,6 +82,9 @@ type AllocRunner struct { otherAllocDir *allocdir.AllocDir + // blocker blocks until a previous allocation has terminated + blocker *allocBlocker + ctx context.Context exitFn context.CancelFunc waitCh chan struct{} @@ -149,8 +152,8 @@ type allocRunnerMutableState struct { // NewAllocRunner is used to create a new allocation context func NewAllocRunner(logger *log.Logger, config *config.Config, stateDB *bolt.DB, updater AllocStateUpdater, - alloc *structs.Allocation, vaultClient vaultclient.VaultClient, - consulClient ConsulServiceAPI) *AllocRunner { + alloc *structs.Allocation, vaultClient vaultclient.VaultClient, consulClient ConsulServiceAPI, + blocker *allocBlocker, prevAllocDir *allocdir.AllocDir) *AllocRunner { ar := &AllocRunner{ config: config, @@ -160,6 +163,8 @@ func NewAllocRunner(logger *log.Logger, config *config.Config, stateDB *bolt.DB, alloc: alloc, allocID: alloc.ID, allocBroadcast: cstructs.NewAllocBroadcaster(8), + blocker: blocker, + otherAllocDir: prevAllocDir, dirtyCh: make(chan struct{}, 1), allocDir: allocdir.NewAllocDir(logger, filepath.Join(config.AllocDir, alloc.ID)), tasks: make(map[string]*TaskRunner), @@ -742,7 +747,6 @@ func (r *AllocRunner) Run() { return } - // Create the execution context r.allocDirLock.Lock() // Build allocation directory (idempotent) if err := r.allocDir.Build(); err != nil { @@ -751,16 +755,40 @@ func (r *AllocRunner) Run() { r.allocDirLock.Unlock() return } + r.allocDirLock.Unlock() - if r.otherAllocDir != nil { - if err := r.allocDir.Move(r.otherAllocDir, tg.Tasks); err != nil { - r.logger.Printf("[ERR] client: failed to move alloc dir into alloc %q: %v", r.allocID, err) + // If there was a previous allocation block until it has terminated + if alloc.PreviousAllocation != "" { + //TODO remove me + r.logger.Printf("[TRACE] client: XXXX blocking %q on -> %q", alloc.ID, alloc.PreviousAllocation) + if nodeID, err := r.blocker.BlockOnAlloc(r.ctx, alloc.PreviousAllocation); err != nil { + if err == context.Canceled { + // Exiting + return + } + + // Non-canceled errors are fatal as an invariant has been broken. + r.logger.Printf("[ERR] client: alloc %q encountered an error waiting for alloc %q to terminate: %v", + alloc.ID, alloc.PreviousAllocation, err) + r.setStatus(structs.AllocClientStatusFailed, fmt.Sprintf("error waiting for alloc %q to terminate: %v", + alloc.PreviousAllocation, err)) + return } - if err := r.otherAllocDir.Destroy(); err != nil { - r.logger.Printf("[ERR] client: error destroying allocdir %v: %v", r.otherAllocDir.AllocDir, err) + + // Move data if there's a previous alloc dir and sticky volumes is on + if tg.EphemeralDisk != nil && tg.EphemeralDisk.Sticky { + if err := r.migrateAllocDir(r.ctx, nodeID); err != nil { + if err == context.Canceled { + // Exiting + return + } + + //TODO(schmichael) task event? + r.logger.Printf("[WARN] client: alloc %q encountered an error migrating data from previous alloc %q: %v", + alloc.ID, alloc.PreviousAllocation, err) + } } } - r.allocDirLock.Unlock() // Check if the allocation is in a terminal status. In this case, we don't // start any of the task runners and directly wait for the destroy signal to @@ -769,10 +797,7 @@ func (r *AllocRunner) Run() { r.logger.Printf("[DEBUG] client: alloc %q in terminal status, waiting for destroy", r.allocID) // mark this allocation as completed if it is not already in a // terminal state - r.allocLock.Lock() - terminal := r.alloc.Terminated() - r.allocLock.Unlock() - if !terminal { + if !alloc.Terminated() { r.setStatus(structs.AllocClientStatusComplete, "canceled running tasks for allocation in terminal state") } r.handleDestroy() @@ -863,10 +888,33 @@ OUTER: r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.allocID) } -// SetPreviousAllocDir sets the previous allocation directory of the current -// allocation -func (r *AllocRunner) SetPreviousAllocDir(allocDir *allocdir.AllocDir) { - r.otherAllocDir = allocDir +// migrateAllocDir handles migrating data from a previous alloc. Only call from +// Run. +func (r *AllocRunner) migrateAllocDir(ctx context.Context, nodeID string) error { + if r.otherAllocDir != nil { + // The other alloc was local, move it over + if err := r.allocDir.Move(r.otherAllocDir, tg.Tasks); err != nil { + r.logger.Printf("[ERR] client: failed to move alloc dir into alloc %q: %v", alloc.ID, err) + } + //TODO(schmichael) task event? + if err := r.otherAllocDir.Destroy(); err != nil { + r.logger.Printf("[ERR] client: error destroying allocdir %v: %v", r.otherAllocDir.AllocDir, err) + } + return nil + } + + // No previous alloc dir set, try to migrate from a remote node + if tg.EphemeralDisk.Migrate { + if nodeID == "" { + return fmt.Errorf("unable to find remote node") + } + + //TODO(schmichael) add a timeout to context? + nodeAddr, err := r.blocker.GetNodeAddr(ctx, nodeID) + if err != nil { + return err + } + } } // destroyTaskRunners destroys the task runners, waits for them to terminate and diff --git a/client/client.go b/client/client.go index 1e81fe79d1f..68750d48129 100644 --- a/client/client.go +++ b/client/client.go @@ -130,6 +130,9 @@ type Client struct { allocs map[string]*AllocRunner allocLock sync.RWMutex + // allocBlocker is used to block until local or remote allocations terminate + allocBlocker *allocBlocker + // blockedAllocations are allocations which are blocked because their // chained allocations haven't finished running blockedAllocations map[string]*structs.Allocation @@ -297,6 +300,10 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic return nil, fmt.Errorf("failed to setup vault client: %v", err) } + // Create an alloc blocker for tracking alloc terminations; must exist + // before NewAllocRunner is called by methods like restoreState and run + c.allocBlocker = newAllocBlocker(c.logger, c, c.config.Region) + // Restore the state if err := c.restoreState(); err != nil { logger.Printf("[ERR] client: failed to restore state: %v", err) @@ -658,7 +665,7 @@ func (c *Client) restoreState() error { alloc := &structs.Allocation{ID: id} c.configLock.RLock() - ar := NewAllocRunner(c.logger, c.configCopy, c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService) + ar := NewAllocRunner(c.logger, c.configCopy, c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService, c.allocBlocker) c.configLock.RUnlock() c.allocLock.Lock() @@ -1254,36 +1261,20 @@ func (c *Client) updateAllocStatus(alloc *structs.Allocation) { // If this alloc was blocking another alloc and transitioned to a // terminal state then start the blocked allocation if alloc.Terminated() { - c.blockedAllocsLock.Lock() - blockedAlloc, ok := c.blockedAllocations[alloc.ID] - if ok { - var prevAllocDir *allocdir.AllocDir + // Mark the allocation for GC if it is in terminal state and won't be migrated + tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) + if tg == nil || tg.EphemeralDisk == nil || !tg.EphemeralDisk.Sticky { if ar, ok := c.getAllocRunners()[alloc.ID]; ok { - tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) - if tg != nil && tg.EphemeralDisk != nil && tg.EphemeralDisk.Sticky { - prevAllocDir = ar.GetAllocDir() - } - } - - delete(c.blockedAllocations, blockedAlloc.PreviousAllocation) - c.blockedAllocsLock.Unlock() - - c.logger.Printf("[DEBUG] client: unblocking alloc %q because alloc %q terminated", blockedAlloc.ID, alloc.ID) - - // Need to call addAlloc without holding the lock - if err := c.addAlloc(blockedAlloc, prevAllocDir); err != nil { - c.logger.Printf("[ERR] client: failed to add alloc which was previously blocked %q: %v", - blockedAlloc.ID, err) + c.garbageCollector.MarkForCollection(ar) } - } else { - c.blockedAllocsLock.Unlock() } + } - // Mark the allocation for GC if it is in terminal state - if ar, ok := c.getAllocRunners()[alloc.ID]; ok { - if err := c.garbageCollector.MarkForCollection(ar); err != nil { - c.logger.Printf("[DEBUG] client: couldn't add alloc %q for GC: %v", alloc.ID, err) - } + // If this allocation isn't pending and had a previous allocation, the + // previous allocation is now free to be GC'd. + if alloc.ClientStatus != structs.AllocClientStatusPending && alloc.PreviousAllocation != "" { + if ar, ok := c.getAllocRunners()[alloc.PreviousAllocation]; ok { + c.garbageCollector.MarkForCollection(ar) } } @@ -1919,35 +1910,6 @@ func (c *Client) unarchiveAllocDir(resp io.ReadCloser, allocID string, pathToAll } } -// getNode gets the node from the server with the given Node ID -func (c *Client) getNode(nodeID string) (*structs.Node, error) { - req := structs.NodeSpecificRequest{ - NodeID: nodeID, - QueryOptions: structs.QueryOptions{ - Region: c.Region(), - AllowStale: true, - }, - } - - resp := structs.SingleNodeResponse{} - for { - err := c.RPC("Node.GetNode", &req, &resp) - if err != nil { - c.logger.Printf("[ERR] client: failed to query node info %q: %v", nodeID, err) - retry := c.retryIntv(getAllocRetryIntv) - select { - case <-time.After(retry): - continue - case <-c.shutdownCh: - return nil, fmt.Errorf("aborting because client is shutting down") - } - } - break - } - - return resp.Node, nil -} - // removeAlloc is invoked when we should remove an allocation func (c *Client) removeAlloc(alloc *structs.Allocation) error { c.allocLock.Lock() @@ -1983,7 +1945,7 @@ func (c *Client) updateAlloc(exist, update *structs.Allocation) error { } // addAlloc is invoked when we should add an allocation -func (c *Client) addAlloc(alloc *structs.Allocation, prevAllocDir *allocdir.AllocDir) error { +func (c *Client) addAlloc(alloc *structs.Allocation) error { // Check if we already have an alloc runner c.allocLock.Lock() if _, ok := c.allocs[alloc.ID]; ok { @@ -1992,8 +1954,17 @@ func (c *Client) addAlloc(alloc *structs.Allocation, prevAllocDir *allocdir.Allo return nil } + // If the previous allocation is local, pass in its allocation dir + var prevAllocDir *allocdir.AllocDir + tg := alloc.Job.LookupTaskGroup(add.TaskGroup) + if tg != nil && tg.EphemeralDisk != nil && tg.EphemeralDisk.Sticky && ar != nil { + if prevAR, ok := c.allocs[alloc.PreviousAllocation]; ok { + prevAllocDir = prevAR.GetAllocDir() + } + } + c.configLock.RLock() - ar := NewAllocRunner(c.logger, c.configCopy, c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService) + ar := NewAllocRunner(c.logger, c.configCopy, c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService, c.allocBlocker, prevAllocDir) ar.SetPreviousAllocDir(prevAllocDir) c.configLock.RUnlock() diff --git a/client/gc.go b/client/gc.go index cc2b923144f..728b9c920ac 100644 --- a/client/gc.go +++ b/client/gc.go @@ -297,16 +297,14 @@ func (a *AllocGarbageCollector) MakeRoomFor(allocations []*structs.Allocation) e } // MarkForCollection starts tracking an allocation for Garbage Collection -func (a *AllocGarbageCollector) MarkForCollection(ar *AllocRunner) error { - if ar == nil { - return fmt.Errorf("nil allocation runner inserted for garbage collection") - } +func (a *AllocGarbageCollector) MarkForCollection(ar *AllocRunner) { if ar.Alloc() == nil { a.destroyAllocRunner(ar, "alloc is nil") + return } a.logger.Printf("[INFO] client: marking allocation %v for GC", ar.Alloc().ID) - return a.allocRunners.Push(ar) + a.allocRunners.Push(ar) } // Remove removes an alloc runner without garbage collecting it @@ -383,14 +381,15 @@ func NewIndexedGCAllocPQ() *IndexedGCAllocPQ { } } -func (i *IndexedGCAllocPQ) Push(ar *AllocRunner) error { +// Push an alloc runner into the GC queue +func (i *IndexedGCAllocPQ) Push(ar *AllocRunner) { i.pqLock.Lock() defer i.pqLock.Unlock() alloc := ar.Alloc() if _, ok := i.index[alloc.ID]; ok { // No work to do - return nil + return } gcAlloc := &GCAlloc{ timeStamp: time.Now(), @@ -398,7 +397,7 @@ func (i *IndexedGCAllocPQ) Push(ar *AllocRunner) error { } i.index[alloc.ID] = gcAlloc heap.Push(&i.heap, gcAlloc) - return nil + return } func (i *IndexedGCAllocPQ) Pop() *GCAlloc { From 8c1811911e99e6980d8092965c059ef8bd0c8358 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Thu, 10 Aug 2017 10:56:51 -0700 Subject: [PATCH 03/12] switch from alloc blocker to new interface interface has 3 implementations: 1. local for blocking and moving data locally 2. remote for blocking and moving data from another node 3. noop for allocs that don't need to block --- client/alloc_blocker.go | 185 -------------- client/alloc_runner.go | 117 +++++---- client/alloc_watcher.go | 401 ++++++++++++++++++++++++++++++ client/allocdir/alloc_dir.go | 10 + client/client.go | 459 +++-------------------------------- 5 files changed, 495 insertions(+), 677 deletions(-) delete mode 100644 client/alloc_blocker.go create mode 100644 client/alloc_watcher.go diff --git a/client/alloc_blocker.go b/client/alloc_blocker.go deleted file mode 100644 index 0ec74ea60c9..00000000000 --- a/client/alloc_blocker.go +++ /dev/null @@ -1,185 +0,0 @@ -package client - -import ( - "context" - "fmt" - "log" - "sync" - "time" - - "github.com/hashicorp/consul/lib" - "github.com/hashicorp/nomad/nomad/structs" -) - -// allocGetter is able to retrieve local and remote allocs. -type allocGetter interface { - // GetClientAlloc returns the alloc if an alloc ID is found locally, - // otherwise an error. - GetClientAlloc(allocID string) (*structs.Allocation, error) - - // RPC allows retrieving remote allocs. - RPC(method string, args interface{}, reply interface{}) error -} - -type allocBlocker struct { - // blocking is a map of allocs being watched to chans to signal their - // termination and optionally the node they were running on. - blocking map[string]chan string - blockingLock sync.Mutex - - // allocs is used to retrieve local and remote allocs - allocs allocGetter - - // region for making rpc calls - region string - - logger *log.Logger -} - -func newAllocBlocker(l *log.Logger, allocs allocGetter, region string) *allocBlocker { - return &allocBlocker{ - blocking: make(map[string]chan string), - allocs: allocs, - region: region, - logger: l, - } -} - -// allocTerminated marks a local allocation as terminated or GC'd. -func (a *allocBlocker) allocTerminated(allocID string) { - a.blockingLock.Lock() - defer a.blockingLock.Unlock() - if ch, ok := a.blocking[allocID]; ok { - //TODO(schmichael) REMOVE - a.logger.Printf("[TRACE] client: XXX closing and deleting terminated blocking alloc %q", allocID) - ch <- "" - delete(a.blocking, allocID) - } else { - //TODO(schmichael) REMOVE - a.logger.Printf("[TRACE] client: XXX no waiting on terminated alloc %q", allocID) - } -} - -// BlockOnAlloc blocks on an alloc terminating. -func (a *allocBlocker) BlockOnAlloc(ctx context.Context, allocID string) (string, error) { - // Register an intent to block until an alloc exists to prevent races - // between checking to see if it has already exited and waiting for it - // to exit - terminatedCh, err := a.watch(allocID) - if err != nil { - return "", err - } - - if alloc, err := a.allocs.GetClientAlloc(allocID); err == nil { - // Local alloc, return early if already terminated - if alloc.Terminated() { - return "", nil - } - } else { - // Remote alloc, setup blocking rpc call - go a.watchRemote(ctx, allocID) - } - - select { - case node := <-terminatedCh: - a.logger.Printf("[DEBUG] client: blocking alloc %q exited", allocID) - //TODO migrate?! - return node, nil - case <-ctx.Done(): - return "", ctx.Err() - } -} - -// watch for an alloc to terminate. Returns an error if there's already a -// watcher as blocked allocs to blockers should be 1:1. -func (a *allocBlocker) watch(allocID string) (<-chan string, error) { - a.blockingLock.Lock() - defer a.blockingLock.Unlock() - - ch, ok := a.blocking[allocID] - if ok { - return nil, fmt.Errorf("multiple blockers on alloc %q", allocID) - } - - ch = make(chan string) - a.blocking[allocID] = ch - return ch -} - -// watch for a non-local alloc to terminate using a blocking rpc call -func (a *allocBlocker) watchRemote(ctx context.Context, allocID string) { - req := structs.AllocSpecificRequest{ - AllocID: allocID, - QueryOptions: structs.QueryOptions{ - Region: a.region, - AllowStale: true, - }, - } - - for { - resp := structs.SingleAllocResponse{} - err := a.allocs.RPC("Alloc.GetAlloc", &req, &resp) - if err != nil { - a.logger.Printf("[ERR] client: failed to query allocation %q: %v", allocID, err) - retry := getAllocRetryIntv + lib.RandomStagger(getAllocRetryIntv) - select { - case <-time.After(retry): - continue - case <-ctx.Done(): - return - } - } - if resp.Alloc == nil { - //TODO(schmichael) confirm this assumption - a.logger.Printf("[DEBUG] client: blocking alloc %q has been GC'd", allocID) - a.allocTerminated(allocID, "") - } - if resp.Alloc.Terminated() { - // Terminated! - a.allocTerminated(allocID, resp.Alloc.NodeID) - } - - // Update the query index and requery. - if resp.Index > req.MinQueryIndex { - req.MinQueryIndex = resp.Index - } - } - -} - -// GetNodeAddr gets the node from the server with the given Node ID -func (a *allocBlocker) GetNodeAddr(ctx context.Context, nodeID string) (*structs.Node, error) { - req := structs.NodeSpecificRequest{ - NodeID: nodeID, - QueryOptions: structs.QueryOptions{ - Region: c.region, - AllowStale: true, - }, - } - - resp := structs.SingleNodeResponse{} - for { - err := c.allocs.RPC("Node.GetNode", &req, &resp) - if err != nil { - c.logger.Printf("[ERR] client: failed to query node info %q: %v", nodeID, err) - retry := getAllocRetryIntv + lib.RandomStagger(getAllocRetryIntv) - select { - case <-time.After(retry): - continue - case <-ctx.Done(): - return nil, ctx.Err() - } - } - break - } - - if resp.Node == nil { - return nil, fmt.Errorf("node %q not found", nodeID) - } - - scheme := "http://" - if node.TLSEnabled { - scheme = "https://" - } - return scheme + node.HTTPAdrr, nil -} diff --git a/client/alloc_runner.go b/client/alloc_runner.go index 6c1c58080cc..922d2e8e276 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -80,10 +80,14 @@ type AllocRunner struct { vaultClient vaultclient.VaultClient consulClient ConsulServiceAPI - otherAllocDir *allocdir.AllocDir + prevAlloc prevAllocWatcher - // blocker blocks until a previous allocation has terminated - blocker *allocBlocker + // blocked and migrating are true when alloc runner is waiting on the + // prevAllocWatcher. Writers must acquire the waitingLock and readers + // should use the helper methods Blocked and Migrating. + blocked bool + migrating bool + waitingLock sync.RWMutex ctx context.Context exitFn context.CancelFunc @@ -153,7 +157,7 @@ type allocRunnerMutableState struct { // NewAllocRunner is used to create a new allocation context func NewAllocRunner(logger *log.Logger, config *config.Config, stateDB *bolt.DB, updater AllocStateUpdater, alloc *structs.Allocation, vaultClient vaultclient.VaultClient, consulClient ConsulServiceAPI, - blocker *allocBlocker, prevAllocDir *allocdir.AllocDir) *AllocRunner { + prevAlloc prevAllocWatcher) *AllocRunner { ar := &AllocRunner{ config: config, @@ -163,8 +167,7 @@ func NewAllocRunner(logger *log.Logger, config *config.Config, stateDB *bolt.DB, alloc: alloc, allocID: alloc.ID, allocBroadcast: cstructs.NewAllocBroadcaster(8), - blocker: blocker, - otherAllocDir: prevAllocDir, + prevAlloc: prevAlloc, dirtyCh: make(chan struct{}, 1), allocDir: allocdir.NewAllocDir(logger, filepath.Join(config.AllocDir, alloc.ID)), tasks: make(map[string]*TaskRunner), @@ -481,6 +484,12 @@ func (r *AllocRunner) GetAllocDir() *allocdir.AllocDir { return r.allocDir } +// GetListener returns a listener for updates broadcast by this alloc runner. +// Callers are responsible for calling Close on their Listener. +func (r *AllocRunner) GetListener() *cstructs.AllocListener { + return r.allocBroadcast.Listen() +} + // copyTaskStates returns a copy of the passed task states. func copyTaskStates(states map[string]*structs.TaskState) map[string]*structs.TaskState { copy := make(map[string]*structs.TaskState, len(states)) @@ -757,39 +766,36 @@ func (r *AllocRunner) Run() { } r.allocDirLock.Unlock() - // If there was a previous allocation block until it has terminated - if alloc.PreviousAllocation != "" { - //TODO remove me - r.logger.Printf("[TRACE] client: XXXX blocking %q on -> %q", alloc.ID, alloc.PreviousAllocation) - if nodeID, err := r.blocker.BlockOnAlloc(r.ctx, alloc.PreviousAllocation); err != nil { - if err == context.Canceled { - // Exiting - return - } - - // Non-canceled errors are fatal as an invariant has been broken. - r.logger.Printf("[ERR] client: alloc %q encountered an error waiting for alloc %q to terminate: %v", - alloc.ID, alloc.PreviousAllocation, err) - r.setStatus(structs.AllocClientStatusFailed, fmt.Sprintf("error waiting for alloc %q to terminate: %v", - alloc.PreviousAllocation, err)) + // Wait for a previous alloc - if any - to terminate + r.waitingLock.Lock() + r.blocked = true + r.waitingLock.Unlock() + if err := r.prevAlloc.Wait(r.ctx); err != nil { + if err == context.Canceled { return } + //TODO(schmichael) + panic("todo") + } - // Move data if there's a previous alloc dir and sticky volumes is on - if tg.EphemeralDisk != nil && tg.EphemeralDisk.Sticky { - if err := r.migrateAllocDir(r.ctx, nodeID); err != nil { - if err == context.Canceled { - // Exiting - return - } + r.waitingLock.Lock() + r.blocked = false + r.migrating = true + r.waitingLock.Unlock() - //TODO(schmichael) task event? - r.logger.Printf("[WARN] client: alloc %q encountered an error migrating data from previous alloc %q: %v", - alloc.ID, alloc.PreviousAllocation, err) - } + // Wait for data to be migrated from a previous alloc if applicable + if err := r.prevAlloc.Migrate(r.ctx, r.allocDir); err != nil { + if err == context.Canceled { + return } + //TODO(schmichael) + panic("todo") } + r.waitingLock.Lock() + r.migrating = false + r.waitingLock.Unlock() + // Check if the allocation is in a terminal status. In this case, we don't // start any of the task runners and directly wait for the destroy signal to // clean up the allocation. @@ -888,35 +894,6 @@ OUTER: r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.allocID) } -// migrateAllocDir handles migrating data from a previous alloc. Only call from -// Run. -func (r *AllocRunner) migrateAllocDir(ctx context.Context, nodeID string) error { - if r.otherAllocDir != nil { - // The other alloc was local, move it over - if err := r.allocDir.Move(r.otherAllocDir, tg.Tasks); err != nil { - r.logger.Printf("[ERR] client: failed to move alloc dir into alloc %q: %v", alloc.ID, err) - } - //TODO(schmichael) task event? - if err := r.otherAllocDir.Destroy(); err != nil { - r.logger.Printf("[ERR] client: error destroying allocdir %v: %v", r.otherAllocDir.AllocDir, err) - } - return nil - } - - // No previous alloc dir set, try to migrate from a remote node - if tg.EphemeralDisk.Migrate { - if nodeID == "" { - return fmt.Errorf("unable to find remote node") - } - - //TODO(schmichael) add a timeout to context? - nodeAddr, err := r.blocker.GetNodeAddr(ctx, nodeID) - if err != nil { - return err - } - } -} - // destroyTaskRunners destroys the task runners, waits for them to terminate and // then saves state. func (r *AllocRunner) destroyTaskRunners(destroyEvent *structs.TaskEvent) { @@ -999,6 +976,24 @@ func (r *AllocRunner) handleDestroy() { } } +// Blocked returns true if this alloc is waiting on a previous allocation to +// terminate. +func (r *AllocRunner) Blocked() bool { + r.waitingLock.RLock() + b := r.blocked + r.waitingLock.RUnlock() + return b +} + +// Migrating returns true if this alloc is migrating data from a previous +// allocation. +func (r *AllocRunner) Migrating() bool { + r.waitingLock.RLock() + m := r.migrating + r.waitingLock.RUnlock() + return m +} + // Update is used to update the allocation of the context func (r *AllocRunner) Update(update *structs.Allocation) { select { diff --git a/client/alloc_watcher.go b/client/alloc_watcher.go new file mode 100644 index 00000000000..9dae47da515 --- /dev/null +++ b/client/alloc_watcher.go @@ -0,0 +1,401 @@ +package client + +import ( + "archive/tar" + "context" + "fmt" + "io" + "log" + "os" + "path/filepath" + "time" + + "github.com/hashicorp/consul/lib" + nomadapi "github.com/hashicorp/nomad/api" + "github.com/hashicorp/nomad/client/allocdir" + "github.com/hashicorp/nomad/client/config" + cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/nomad/structs" +) + +type rpcer interface { + // RPC allows retrieving remote allocs. + RPC(method string, args interface{}, reply interface{}) error +} + +type prevAllocWatcher interface { + // Wait for previous alloc to terminate + Wait(context.Context) error + + // Migrate data from previous alloc + Migrate(ctx context.Context, dest *allocdir.AllocDir) error +} + +// newAllocWatcher creates a prevAllocWatcher appropriate for whether this +// allocs previous allocation was local or remote. If this alloc has no +// previous alloc then a noop implementation is returned. +func newAllocWatcher(alloc *structs.Allocation, prevAR *AllocRunner, rpc rpcer, config *config.Config, l *log.Logger) prevAllocWatcher { + if alloc.PreviousAllocation == "" { + // No previous allocation, use noop transitioner + return noopPrevAlloc{} + } + + tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) + + if prevAR != nil { + // Previous allocation is local, use local transitioner + return &localPrevAlloc{ + allocID: alloc.ID, + prevAllocID: alloc.PreviousAllocation, + tasks: tg.Tasks, + sticky: tg.EphemeralDisk != nil && tg.EphemeralDisk.Sticky, + prevAllocDir: prevAR.GetAllocDir(), + prevListener: prevAR.GetListener(), + prevWaitCh: prevAR.WaitCh(), + logger: l, + } + } + + return &remotePrevAlloc{ + allocID: alloc.ID, + prevAllocID: alloc.PreviousAllocation, + tasks: tg.Tasks, + config: config, + migrate: tg.EphemeralDisk != nil && tg.EphemeralDisk.Migrate, + rpc: rpc, + logger: l, + } +} + +// localPrevAlloc is a prevAllocWatcher for previous allocations on the same +// node as an updated allocation. +type localPrevAlloc struct { + allocID string + prevAllocID string + tasks []*structs.Task + + sticky bool + prevAllocDir *allocdir.AllocDir + prevListener *cstructs.AllocListener + prevWaitCh <-chan struct{} + + logger *log.Logger +} + +// Wait on a local alloc to become terminal, exit, or the context to be done. +func (p *localPrevAlloc) Wait(ctx context.Context) error { + defer p.prevListener.Close() + p.logger.Printf("[DEBUG] client: alloc %q waiting for previous alloc %q to terminate", p.allocID, p.prevAllocID) + for { + select { + case prevAlloc := <-p.prevListener.Ch: + if prevAlloc.Terminated() { + return nil + } + case <-p.prevWaitCh: + return nil + case <-ctx.Done(): + return ctx.Err() + } + } +} + +// Migrate from previous local alloc dir to destination alloc dir. +func (p *localPrevAlloc) Migrate(ctx context.Context, dest *allocdir.AllocDir) error { + if !p.sticky { + // Not a sticky volume, nothing to migrate + return nil + } + p.logger.Printf("[DEBUG] client: alloc %q copying previous alloc %q", p.allocID, p.prevAllocID) + + if err := dest.Move(p.prevAllocDir, p.tasks); err != nil { + p.logger.Printf("[ERR] client: failed to move previous alloc dir %q: %v", p.prevAllocDir.AllocDir, err) + } + if err := p.prevAllocDir.Destroy(); err != nil { + p.logger.Printf("[ERR] client: error destroying allocdir %v: %v", p.prevAllocDir.AllocDir, err) + } + return nil +} + +// remotePrevAlloc is a prevAllcWatcher for previous allocations on remote +// nodes as an updated allocation. +type remotePrevAlloc struct { + allocID string + prevAllocID string + tasks []*structs.Task + + config *config.Config + migrate bool + rpc rpcer + + // nodeID is the node the previous alloc. Set by Wait() for use in + // Migrate() iff the previous alloc has not already been GC'd. + nodeID string + + logger *log.Logger +} + +func (p *remotePrevAlloc) Wait(ctx context.Context) error { + p.logger.Printf("[DEBUG] client: alloc %q waiting for remote previous alloc %q to terminate", p.allocID, p.prevAllocID) + req := structs.AllocSpecificRequest{ + AllocID: p.prevAllocID, + QueryOptions: structs.QueryOptions{ + Region: p.config.Region, + AllowStale: true, + }, + } + + done := func() bool { + select { + case <-ctx.Done(): + return true + default: + return false + } + } + + for !done() { + resp := structs.SingleAllocResponse{} + err := p.rpc.RPC("Alloc.GetAlloc", &req, &resp) + if err != nil { + p.logger.Printf("[ERR] client: failed to query previous alloc %q: %v", p.prevAllocID, err) + retry := getAllocRetryIntv + lib.RandomStagger(getAllocRetryIntv) + select { + case <-time.After(retry): + continue + case <-ctx.Done(): + return ctx.Err() + } + } + if resp.Alloc == nil { + p.logger.Printf("[DEBUG] client: blocking alloc %q has been GC'd", p.prevAllocID) + return nil + } + if resp.Alloc.Terminated() { + // Terminated! + p.nodeID = resp.Alloc.NodeID + return nil + } + + // Update the query index and requery. + if resp.Index > req.MinQueryIndex { + req.MinQueryIndex = resp.Index + } + } + + if done() { + return ctx.Err() + } + + return nil +} + +// Migrate alloc data from a remote node if the new alloc has migration enabled +// and the old alloc hasn't been GC'd. +func (p *remotePrevAlloc) Migrate(ctx context.Context, dest *allocdir.AllocDir) error { + if !p.migrate { + // Volume wasn't configured to be migrated, return early + return nil + } + p.logger.Printf("[DEBUG] client: alloc %q copying from remote previous alloc %q", p.allocID, p.prevAllocID) + + if p.nodeID == "" { + // NodeID couldn't be found; likely alloc was GC'd + p.logger.Printf("[WARN] client: alloc %q couldn't migrate data from previous alloc %q; previous alloc may have been GC'd", + p.allocID, p.prevAllocID) + return nil + } + + addr, err := p.getNodeAddr(ctx, p.nodeID) + if err != nil { + return err + } + + prevAllocDir, err := p.migrateAllocDir(ctx, addr) + if err != nil { + return err + } + + if err := dest.Move(prevAllocDir, p.tasks); err != nil { + // cleanup on error + prevAllocDir.Destroy() + return err + } + + if err := prevAllocDir.Destroy(); err != nil { + p.logger.Printf("[ERR] client: error destroying allocdir %q: %v", prevAllocDir.AllocDir, err) + } + return nil +} + +// getNodeAddr gets the node from the server with the given Node ID +func (p *remotePrevAlloc) getNodeAddr(ctx context.Context, nodeID string) (string, error) { + req := structs.NodeSpecificRequest{ + NodeID: nodeID, + QueryOptions: structs.QueryOptions{ + Region: p.config.Region, + AllowStale: true, + }, + } + + resp := structs.SingleNodeResponse{} + for { + err := p.rpc.RPC("Node.GetNode", &req, &resp) + if err != nil { + p.logger.Printf("[ERR] client: failed to query node info %q: %v", nodeID, err) + retry := getAllocRetryIntv + lib.RandomStagger(getAllocRetryIntv) + select { + case <-time.After(retry): + continue + case <-ctx.Done(): + return "", ctx.Err() + } + } + break + } + + if resp.Node == nil { + return "", fmt.Errorf("node %q not found", nodeID) + } + + scheme := "http://" + if resp.Node.TLSEnabled { + scheme = "https://" + } + return scheme + resp.Node.HTTPAddr, nil +} + +// migrate a remote alloc dir to local node +func (p *remotePrevAlloc) migrateAllocDir(ctx context.Context, nodeAddr string) (*allocdir.AllocDir, error) { + // Create the previous alloc dir + prevAllocDir := allocdir.NewAllocDir(p.logger, filepath.Join(p.config.AllocDir, p.prevAllocID)) + if err := prevAllocDir.Build(); err != nil { + return nil, fmt.Errorf("error building alloc dir for previous alloc %q: %v", p.prevAllocID, err) + } + + // Create an API client + apiConfig := nomadapi.DefaultConfig() + apiConfig.Address = nodeAddr + apiConfig.TLSConfig = &nomadapi.TLSConfig{ + CACert: p.config.TLSConfig.CAFile, + ClientCert: p.config.TLSConfig.CertFile, + ClientKey: p.config.TLSConfig.KeyFile, + } + apiClient, err := nomadapi.NewClient(apiConfig) + if err != nil { + return nil, err + } + + url := fmt.Sprintf("/v1/client/allocation/%v/snapshot", p.prevAllocID) + resp, err := apiClient.Raw().Response(url, nil) + if err != nil { + prevAllocDir.Destroy() + return nil, fmt.Errorf("error getting snapshot from previous alloc %q: %v", p.prevAllocID, err) + } + + if err := p.streamAllocDir(ctx, resp, prevAllocDir.AllocDir); err != nil { + prevAllocDir.Destroy() + return nil, err + } + + return prevAllocDir, nil +} + +// stream remote alloc to dir to a local path. Caller should cleanup dest on +// error. +func (p *remotePrevAlloc) streamAllocDir(ctx context.Context, resp io.ReadCloser, dest string) error { + p.logger.Printf("[DEBUG] client: alloc %q streaming snapshot of previous alloc %q to %q", p.allocID, p.prevAllocID, dest) + tr := tar.NewReader(resp) + defer resp.Close() + + canceled := func() bool { + select { + case <-ctx.Done(): + p.logger.Printf("[INFO] client: stopping migration of previous alloc %q for new alloc: %v", + p.prevAllocID, p.allocID) + return true + default: + return false + } + } + + buf := make([]byte, 1024) + for !canceled() { + // Get the next header + hdr, err := tr.Next() + + // Snapshot has ended + if err == io.EOF { + return nil + } + + if err != nil { + return fmt.Errorf("error streaming previous alloc %p for new alloc %q: %v", + p.prevAllocID, p.allocID, err) + } + + // If the header is for a directory we create the directory + if hdr.Typeflag == tar.TypeDir { + os.MkdirAll(filepath.Join(dest, hdr.Name), os.FileMode(hdr.Mode)) + continue + } + // If the header is for a symlink we create the symlink + if hdr.Typeflag == tar.TypeSymlink { + if err = os.Symlink(hdr.Linkname, filepath.Join(dest, hdr.Name)); err != nil { + return fmt.Errorf("error creating symlink: %v", err) + } + continue + } + // If the header is a file, we write to a file + if hdr.Typeflag == tar.TypeReg { + f, err := os.Create(filepath.Join(dest, hdr.Name)) + if err != nil { + return fmt.Errorf("error creating file: %v", err) + } + + // Setting the permissions of the file as the origin. + if err := f.Chmod(os.FileMode(hdr.Mode)); err != nil { + f.Close() + return fmt.Errorf("error chmoding file %v", err) + } + if err := f.Chown(hdr.Uid, hdr.Gid); err != nil { + f.Close() + return fmt.Errorf("error chowning file %v", err) + } + + // We write in chunks so that we can test if the client + // is still alive + for !canceled() { + n, err := tr.Read(buf) + if err != nil { + f.Close() + if err != io.EOF { + return fmt.Errorf("error reading snapshot: %v", err) + } + break + } + if _, err := f.Write(buf[:n]); err != nil { + f.Close() + return fmt.Errorf("error writing to file %q: %v", f.Name(), err) + } + } + + } + } + + if canceled() { + return ctx.Err() + } + + return nil +} + +// noopPrevAlloc does not block or migrate on a previous allocation and never +// returns an error. +type noopPrevAlloc struct{} + +// Wait returns nil immediately. +func (noopPrevAlloc) Wait(context.Context) error { return nil } + +// Migrate returns nil immediately. +func (noopPrevAlloc) Migrate(context.Context, *allocdir.AllocDir) error { return nil } diff --git a/client/allocdir/alloc_dir.go b/client/allocdir/alloc_dir.go index ece467c1b39..a16d35f02d1 100644 --- a/client/allocdir/alloc_dir.go +++ b/client/allocdir/alloc_dir.go @@ -65,6 +65,9 @@ type AllocDir struct { // TaskDirs is a mapping of task names to their non-shared directory. TaskDirs map[string]*TaskDir + // built is true if Build has successfully run + built bool + logger *log.Logger } @@ -188,6 +191,11 @@ func (d *AllocDir) Snapshot(w io.Writer) error { // Move other alloc directory's shared path and local dir to this alloc dir. func (d *AllocDir) Move(other *AllocDir, tasks []*structs.Task) error { + if !d.built { + // Enfornce the invariant that Build is called before Move + return fmt.Errorf("unable to move to %q - alloc dir is not built", d.AllocDir) + } + // Move the data directory otherDataDir := filepath.Join(other.SharedDir, SharedDataDir) dataDir := filepath.Join(d.SharedDir, SharedDataDir) @@ -296,6 +304,8 @@ func (d *AllocDir) Build() error { } } + // Mark as built + d.built = true return nil } diff --git a/client/client.go b/client/client.go index 68750d48129..bd2125df11c 100644 --- a/client/client.go +++ b/client/client.go @@ -1,10 +1,8 @@ package client import ( - "archive/tar" "errors" "fmt" - "io" "io/ioutil" "log" "net" @@ -20,7 +18,6 @@ import ( consulapi "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/lib" "github.com/hashicorp/go-multierror" - nomadapi "github.com/hashicorp/nomad/api" "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/driver" @@ -130,18 +127,6 @@ type Client struct { allocs map[string]*AllocRunner allocLock sync.RWMutex - // allocBlocker is used to block until local or remote allocations terminate - allocBlocker *allocBlocker - - // blockedAllocations are allocations which are blocked because their - // chained allocations haven't finished running - blockedAllocations map[string]*structs.Allocation - blockedAllocsLock sync.RWMutex - - // migratingAllocs is the set of allocs whose data migration is in flight - migratingAllocs map[string]*migrateAllocCtrl - migratingAllocsLock sync.RWMutex - // allocUpdates stores allocations that need to be synced to the server. allocUpdates chan *structs.Allocation @@ -167,34 +152,6 @@ type Client struct { garbageCollector *AllocGarbageCollector } -// migrateAllocCtrl indicates whether migration is complete -type migrateAllocCtrl struct { - alloc *structs.Allocation - ch chan struct{} - closed bool - chLock sync.Mutex -} - -func newMigrateAllocCtrl(alloc *structs.Allocation) *migrateAllocCtrl { - return &migrateAllocCtrl{ - ch: make(chan struct{}), - alloc: alloc, - } -} - -func (m *migrateAllocCtrl) closeCh() { - m.chLock.Lock() - defer m.chLock.Unlock() - - if m.closed { - return - } - - // If channel is not closed then close it - m.closed = true - close(m.ch) -} - var ( // noServersErr is returned by the RPC method when the client has no // configured servers. This is used to trigger Consul discovery if @@ -223,10 +180,8 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic connPool: nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, tlsWrap), logger: logger, allocs: make(map[string]*AllocRunner), - blockedAllocations: make(map[string]*structs.Allocation), allocUpdates: make(chan *structs.Allocation, 64), shutdownCh: make(chan struct{}), - migratingAllocs: make(map[string]*migrateAllocCtrl), servers: newServerList(), triggerDiscoveryCh: make(chan struct{}), serversDiscoveredCh: make(chan struct{}), @@ -300,10 +255,6 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic return nil, fmt.Errorf("failed to setup vault client: %v", err) } - // Create an alloc blocker for tracking alloc terminations; must exist - // before NewAllocRunner is called by methods like restoreState and run - c.allocBlocker = newAllocBlocker(c.logger, c, c.config.Region) - // Restore the state if err := c.restoreState(); err != nil { logger.Printf("[ERR] client: failed to restore state: %v", err) @@ -664,8 +615,11 @@ func (c *Client) restoreState() error { for _, id := range allocs { alloc := &structs.Allocation{ID: id} + // don't worry about blocking/migrating when restoring + watcher := noopPrevAlloc{} + c.configLock.RLock() - ar := NewAllocRunner(c.logger, c.configCopy, c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService, c.allocBlocker) + ar := NewAllocRunner(c.logger, c.configCopy, c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService, watcher) c.configLock.RUnlock() c.allocLock.Lock() @@ -742,15 +696,6 @@ func (c *Client) NumAllocs() int { c.allocLock.RLock() n := len(c.allocs) c.allocLock.RUnlock() - - c.blockedAllocsLock.RLock() - n += len(c.blockedAllocations) - c.blockedAllocsLock.RUnlock() - - c.migratingAllocsLock.RLock() - n += len(c.migratingAllocs) - c.migratingAllocsLock.RUnlock() - return n } @@ -1258,22 +1203,9 @@ func (c *Client) updateNodeStatus() error { // updateAllocStatus is used to update the status of an allocation func (c *Client) updateAllocStatus(alloc *structs.Allocation) { - // If this alloc was blocking another alloc and transitioned to a - // terminal state then start the blocked allocation if alloc.Terminated() { - // Mark the allocation for GC if it is in terminal state and won't be migrated - tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) - if tg == nil || tg.EphemeralDisk == nil || !tg.EphemeralDisk.Sticky { - if ar, ok := c.getAllocRunners()[alloc.ID]; ok { - c.garbageCollector.MarkForCollection(ar) - } - } - } - - // If this allocation isn't pending and had a previous allocation, the - // previous allocation is now free to be GC'd. - if alloc.ClientStatus != structs.AllocClientStatusPending && alloc.PreviousAllocation != "" { - if ar, ok := c.getAllocRunners()[alloc.PreviousAllocation]; ok { + // Terminated, mark for GC + if ar, ok := c.getAllocRunners()[alloc.ID]; ok { c.garbageCollector.MarkForCollection(ar) } } @@ -1576,340 +1508,17 @@ func (c *Client) runAllocs(update *allocUpdates) { c.logger.Printf("[ERR] client: failed to update alloc %q: %v", update.exist.ID, err) } - - // See if the updated alloc is getting migrated - c.migratingAllocsLock.RLock() - ch, ok := c.migratingAllocs[update.updated.ID] - c.migratingAllocsLock.RUnlock() - if ok { - // Stopping the migration if the allocation doesn't need any - // migration - if !update.updated.ShouldMigrate() { - ch.closeCh() - } - } } // Start the new allocations for _, add := range diff.added { - // If the allocation is chained and the previous allocation hasn't - // terminated yet, then add the alloc to the blocked queue. - c.blockedAllocsLock.Lock() - ar, ok := c.getAllocRunners()[add.PreviousAllocation] - if ok && !ar.Alloc().Terminated() { - // Check if the alloc is already present in the blocked allocations - // map - if _, ok := c.blockedAllocations[add.PreviousAllocation]; !ok { - c.logger.Printf("[DEBUG] client: added alloc %q to blocked queue for previous alloc %q", - add.ID, add.PreviousAllocation) - c.blockedAllocations[add.PreviousAllocation] = add - } - c.blockedAllocsLock.Unlock() - continue - } - c.blockedAllocsLock.Unlock() - - // This means the allocation has a previous allocation on another node - // so we will block for the previous allocation to complete - if add.PreviousAllocation != "" && !ok { - // Ensure that we are not blocking for the remote allocation if we - // have already blocked - c.migratingAllocsLock.Lock() - if _, ok := c.migratingAllocs[add.ID]; !ok { - // Check that we don't have an alloc runner already. This - // prevents a race between a finishing blockForRemoteAlloc and - // another invocation of runAllocs - if _, ok := c.getAllocRunners()[add.PreviousAllocation]; !ok { - c.migratingAllocs[add.ID] = newMigrateAllocCtrl(add) - go c.blockForRemoteAlloc(add) - } - } - c.migratingAllocsLock.Unlock() - continue - } - - // Setting the previous allocdir if the allocation had a terminal - // previous allocation - var prevAllocDir *allocdir.AllocDir - tg := add.Job.LookupTaskGroup(add.TaskGroup) - if tg != nil && tg.EphemeralDisk != nil && tg.EphemeralDisk.Sticky && ar != nil { - prevAllocDir = ar.GetAllocDir() - } - - if err := c.addAlloc(add, prevAllocDir); err != nil { + if err := c.addAlloc(add); err != nil { c.logger.Printf("[ERR] client: failed to add alloc '%s': %v", add.ID, err) } } } -// blockForRemoteAlloc blocks until the previous allocation of an allocation has -// been terminated and migrates the snapshot data -func (c *Client) blockForRemoteAlloc(alloc *structs.Allocation) { - // Removing the allocation from the set of allocs which are currently - // undergoing migration - defer func() { - c.migratingAllocsLock.Lock() - delete(c.migratingAllocs, alloc.ID) - c.migratingAllocsLock.Unlock() - }() - - // prevAllocDir is the allocation directory of the previous allocation - var prevAllocDir *allocdir.AllocDir - - // If the allocation is not sticky then we won't wait for the previous - // allocation to be terminal - tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) - if tg == nil { - c.logger.Printf("[ERR] client: task group %q not found in job %q", tg.Name, alloc.Job.ID) - goto ADDALLOC - } - - // Wait for the remote previous alloc to be terminal if the alloc is sticky - if tg.EphemeralDisk != nil && tg.EphemeralDisk.Sticky && tg.EphemeralDisk.Migrate { - c.logger.Printf("[DEBUG] client: blocking alloc %q for previous allocation %q", alloc.ID, alloc.PreviousAllocation) - // Block until the previous allocation migrates to terminal state - stopCh := c.migratingAllocs[alloc.ID] - prevAlloc, err := c.waitForAllocTerminal(alloc.PreviousAllocation, stopCh) - if err != nil { - c.logger.Printf("[ERR] client: error waiting for allocation %q: %v", - alloc.PreviousAllocation, err) - } - - // Migrate the data from the remote node - prevAllocDir, err = c.migrateRemoteAllocDir(prevAlloc, alloc.ID) - if err != nil { - c.logger.Printf("[ERR] client: error migrating data from remote alloc %q: %v", - alloc.PreviousAllocation, err) - } - } - -ADDALLOC: - // Add the allocation - if err := c.addAlloc(alloc, prevAllocDir); err != nil { - c.logger.Printf("[ERR] client: error adding alloc: %v", err) - } -} - -// waitForAllocTerminal waits for an allocation with the given alloc id to -// transition to terminal state and blocks the caller until then. -func (c *Client) waitForAllocTerminal(allocID string, stopCh *migrateAllocCtrl) (*structs.Allocation, error) { - req := structs.AllocSpecificRequest{ - AllocID: allocID, - QueryOptions: structs.QueryOptions{ - Region: c.Region(), - AllowStale: true, - }, - } - - for { - resp := structs.SingleAllocResponse{} - err := c.RPC("Alloc.GetAlloc", &req, &resp) - if err != nil { - c.logger.Printf("[ERR] client: failed to query allocation %q: %v", allocID, err) - retry := c.retryIntv(getAllocRetryIntv) - select { - case <-time.After(retry): - continue - case <-stopCh.ch: - return nil, fmt.Errorf("giving up waiting on alloc %q since migration is not needed", allocID) - case <-c.shutdownCh: - return nil, fmt.Errorf("aborting because client is shutting down") - } - } - if resp.Alloc == nil { - return nil, nil - } - if resp.Alloc.Terminated() { - return resp.Alloc, nil - } - - // Update the query index. - if resp.Index > req.MinQueryIndex { - req.MinQueryIndex = resp.Index - } - - } -} - -// migrateRemoteAllocDir migrates the allocation directory from a remote node to -// the current node -func (c *Client) migrateRemoteAllocDir(alloc *structs.Allocation, allocID string) (*allocdir.AllocDir, error) { - if alloc == nil { - return nil, nil - } - - tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) - if tg == nil { - return nil, fmt.Errorf("Task Group %q not found in job %q", tg.Name, alloc.Job.ID) - } - - // Skip migration of data if the ephemeral disk is not sticky or - // migration is turned off. - if tg.EphemeralDisk == nil || !tg.EphemeralDisk.Sticky || !tg.EphemeralDisk.Migrate { - return nil, nil - } - - node, err := c.getNode(alloc.NodeID) - - // If the node is down then skip migrating the data - if err != nil { - return nil, fmt.Errorf("error retreiving node %v: %v", alloc.NodeID, err) - } - - // Check if node is nil - if node == nil { - return nil, fmt.Errorf("node %q doesn't exist", alloc.NodeID) - } - - // skip migration if the remote node is down - if node.Status == structs.NodeStatusDown { - c.logger.Printf("[INFO] client: not migrating data from alloc %q since node %q is down", alloc.ID, alloc.NodeID) - return nil, nil - } - - // Create the previous alloc dir - pathToAllocDir := filepath.Join(c.config.AllocDir, alloc.ID) - if err := os.MkdirAll(pathToAllocDir, 0777); err != nil { - c.logger.Printf("[ERR] client: error creating previous allocation dir: %v", err) - } - - // Get the snapshot - scheme := "http" - if node.TLSEnabled { - scheme = "https" - } - // Create an API client - apiConfig := nomadapi.DefaultConfig() - apiConfig.Address = fmt.Sprintf("%s://%s", scheme, node.HTTPAddr) - apiConfig.TLSConfig = &nomadapi.TLSConfig{ - CACert: c.config.TLSConfig.CAFile, - ClientCert: c.config.TLSConfig.CertFile, - ClientKey: c.config.TLSConfig.KeyFile, - } - apiClient, err := nomadapi.NewClient(apiConfig) - if err != nil { - return nil, err - } - - url := fmt.Sprintf("/v1/client/allocation/%v/snapshot", alloc.ID) - resp, err := apiClient.Raw().Response(url, nil) - if err != nil { - os.RemoveAll(pathToAllocDir) - c.logger.Printf("[ERR] client: error getting snapshot for alloc %q: %v", alloc.ID, err) - return nil, fmt.Errorf("error getting snapshot for alloc %q: %v", alloc.ID, err) - } - - if err := c.unarchiveAllocDir(resp, allocID, pathToAllocDir); err != nil { - return nil, err - } - - // If there were no errors then we create the allocdir - prevAllocDir := allocdir.NewAllocDir(c.logger, pathToAllocDir) - return prevAllocDir, nil -} - -// unarchiveAllocDir reads the stream of a compressed allocation directory and -// writes them to the disk. -func (c *Client) unarchiveAllocDir(resp io.ReadCloser, allocID string, pathToAllocDir string) error { - tr := tar.NewReader(resp) - defer resp.Close() - - buf := make([]byte, 1024) - - stopMigrating, ok := c.migratingAllocs[allocID] - if !ok { - os.RemoveAll(pathToAllocDir) - return fmt.Errorf("Allocation %q is not marked for remote migration", allocID) - } - for { - // See if the alloc still needs migration - select { - case <-stopMigrating.ch: - os.RemoveAll(pathToAllocDir) - c.logger.Printf("[INFO] client: stopping migration of allocdir for alloc: %v", allocID) - return nil - case <-c.shutdownCh: - os.RemoveAll(pathToAllocDir) - c.logger.Printf("[INFO] client: stopping migration of alloc %q since client is shutting down", allocID) - return nil - default: - } - - // Get the next header - hdr, err := tr.Next() - - // Snapshot has ended - if err == io.EOF { - return nil - } - // If there is an error then we avoid creating the alloc dir - if err != nil { - os.RemoveAll(pathToAllocDir) - return fmt.Errorf("error creating alloc dir for alloc %q: %v", allocID, err) - } - - // If the header is for a directory we create the directory - if hdr.Typeflag == tar.TypeDir { - os.MkdirAll(filepath.Join(pathToAllocDir, hdr.Name), os.FileMode(hdr.Mode)) - continue - } - // If the header is for a symlink we create the symlink - if hdr.Typeflag == tar.TypeSymlink { - if err = os.Symlink(hdr.Linkname, filepath.Join(pathToAllocDir, hdr.Name)); err != nil { - c.logger.Printf("[ERR] client: error creating symlink: %v", err) - } - continue - } - // If the header is a file, we write to a file - if hdr.Typeflag == tar.TypeReg { - f, err := os.Create(filepath.Join(pathToAllocDir, hdr.Name)) - if err != nil { - c.logger.Printf("[ERR] client: error creating file: %v", err) - continue - } - - // Setting the permissions of the file as the origin. - if err := f.Chmod(os.FileMode(hdr.Mode)); err != nil { - f.Close() - c.logger.Printf("[ERR] client: error chmod-ing file %s: %v", f.Name(), err) - return fmt.Errorf("error chmoding file %v", err) - } - if err := f.Chown(hdr.Uid, hdr.Gid); err != nil { - f.Close() - c.logger.Printf("[ERR] client: error chown-ing file %s: %v", f.Name(), err) - return fmt.Errorf("error chowning file %v", err) - } - - // We write in chunks of 32 bytes so that we can test if - // the client is still alive - for { - if c.shutdown { - f.Close() - os.RemoveAll(pathToAllocDir) - c.logger.Printf("[INFO] client: stopping migration of alloc %q because client is shutting down", allocID) - return nil - } - - n, err := tr.Read(buf) - if err != nil { - f.Close() - if err != io.EOF { - return fmt.Errorf("error reading snapshot: %v", err) - } - break - } - if _, err := f.Write(buf[:n]); err != nil { - f.Close() - os.RemoveAll(pathToAllocDir) - return fmt.Errorf("error writing to file %q: %v", f.Name(), err) - } - } - - } - } -} - // removeAlloc is invoked when we should remove an allocation func (c *Client) removeAlloc(alloc *structs.Allocation) error { c.allocLock.Lock() @@ -1954,18 +1563,17 @@ func (c *Client) addAlloc(alloc *structs.Allocation) error { return nil } - // If the previous allocation is local, pass in its allocation dir - var prevAllocDir *allocdir.AllocDir - tg := alloc.Job.LookupTaskGroup(add.TaskGroup) - if tg != nil && tg.EphemeralDisk != nil && tg.EphemeralDisk.Sticky && ar != nil { - if prevAR, ok := c.allocs[alloc.PreviousAllocation]; ok { - prevAllocDir = prevAR.GetAllocDir() - } + // get the previous alloc runner - if one exists - for the + // blocking/migrating watcher + var prevAR *AllocRunner + if alloc.PreviousAllocation != "" { + prevAR = c.allocs[alloc.PreviousAllocation] } c.configLock.RLock() - ar := NewAllocRunner(c.logger, c.configCopy, c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService, c.allocBlocker, prevAllocDir) - ar.SetPreviousAllocDir(prevAllocDir) + prevAlloc := newAllocWatcher(alloc, prevAR, c, c.configCopy, c.logger) + + ar := NewAllocRunner(c.logger, c.configCopy, c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService, prevAlloc) c.configLock.RUnlock() // Store the alloc runner. @@ -1991,8 +1599,8 @@ func (c *Client) addAlloc(alloc *structs.Allocation) error { // with vault. func (c *Client) setupVaultClient() error { var err error - if c.vaultClient, err = - vaultclient.NewVaultClient(c.config.VaultConfig, c.logger, c.deriveToken); err != nil { + c.vaultClient, err = vaultclient.NewVaultClient(c.config.VaultConfig, c.logger, c.deriveToken) + if err != nil { return err } @@ -2325,19 +1933,18 @@ func (c *Client) emitClientMetrics() { nodeID := c.Node().ID // Emit allocation metrics - c.blockedAllocsLock.RLock() - blocked := len(c.blockedAllocations) - c.blockedAllocsLock.RUnlock() - - c.migratingAllocsLock.RLock() - migrating := len(c.migratingAllocs) - c.migratingAllocsLock.RUnlock() - - pending, running, terminal := 0, 0, 0 + blocked, migrating, pending, running, terminal := 0, 0, 0, 0, 0 for _, ar := range c.getAllocRunners() { switch ar.Alloc().ClientStatus { case structs.AllocClientStatusPending: - pending++ + switch { + case ar.Blocked(): + blocked++ + case ar.Migrating(): + migrating++ + default: + pending++ + } case structs.AllocClientStatusRunning: running++ case structs.AllocClientStatusComplete, structs.AllocClientStatusFailed: @@ -2398,22 +2005,12 @@ func (c *Client) getAllocatedResources(selfNode *structs.Node) *structs.Resource // allAllocs returns all the allocations managed by the client func (c *Client) allAllocs() map[string]*structs.Allocation { - allocs := make(map[string]*structs.Allocation, 16) + ars := c.getAllocRunners() + allocs := make(map[string]*structs.Allocation, len(ars)) for _, ar := range c.getAllocRunners() { a := ar.Alloc() allocs[a.ID] = a } - c.blockedAllocsLock.RLock() - for _, alloc := range c.blockedAllocations { - allocs[alloc.ID] = alloc - } - c.blockedAllocsLock.RUnlock() - - c.migratingAllocsLock.RLock() - for _, ctrl := range c.migratingAllocs { - allocs[ctrl.alloc.ID] = ctrl.alloc - } - c.migratingAllocsLock.RUnlock() return allocs } From 113d8e3667ef6cd4975fbe5645f6c04c183b3a87 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Thu, 10 Aug 2017 17:50:00 -0700 Subject: [PATCH 04/12] Set failed status instead of panic'ing Fixup some TODOs and formatting left from new prevAllocWatcher code. --- client/alloc_runner.go | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/client/alloc_runner.go b/client/alloc_runner.go index 922d2e8e276..e4a38b02eed 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -756,26 +756,28 @@ func (r *AllocRunner) Run() { return } - r.allocDirLock.Lock() // Build allocation directory (idempotent) - if err := r.allocDir.Build(); err != nil { + r.allocDirLock.Lock() + err := r.allocDir.Build() + r.allocDirLock.Unlock() + + if err != nil { r.logger.Printf("[ERR] client: failed to build task directories: %v", err) r.setStatus(structs.AllocClientStatusFailed, fmt.Sprintf("failed to build task dirs for '%s'", alloc.TaskGroup)) - r.allocDirLock.Unlock() return } - r.allocDirLock.Unlock() // Wait for a previous alloc - if any - to terminate r.waitingLock.Lock() r.blocked = true r.waitingLock.Unlock() + if err := r.prevAlloc.Wait(r.ctx); err != nil { if err == context.Canceled { return } - //TODO(schmichael) - panic("todo") + r.setStatus(structs.AllocClientStatusFailed, fmt.Sprintf("error while waiting for previous alloc to terminate: %v", err)) + return } r.waitingLock.Lock() @@ -788,8 +790,8 @@ func (r *AllocRunner) Run() { if err == context.Canceled { return } - //TODO(schmichael) - panic("todo") + r.setStatus(structs.AllocClientStatusFailed, fmt.Sprintf("error while migrating data from previous alloc: %v", err)) + return } r.waitingLock.Lock() From 8253439de288c4b2c2c4859bcb54a02bb886b611 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Fri, 11 Aug 2017 10:27:21 -0700 Subject: [PATCH 05/12] Update tests for new blocking/migrating code --- client/alloc_runner_test.go | 51 +++++++----- client/alloc_watcher.go | 20 +++++ client/alloc_watcher_test.go | 142 ++++++++++++++++++++++++++++++++ client/client_test.go | 151 ++++------------------------------- client/gc_test.go | 76 +++++------------- 5 files changed, 227 insertions(+), 213 deletions(-) create mode 100644 client/alloc_watcher_test.go diff --git a/client/alloc_runner_test.go b/client/alloc_runner_test.go index 33cf07bad01..acbbcf4c606 100644 --- a/client/alloc_runner_test.go +++ b/client/alloc_runner_test.go @@ -61,7 +61,7 @@ func testAllocRunnerFromAlloc(alloc *structs.Allocation, restarts bool) (*MockAl alloc.Job.Type = structs.JobTypeBatch } vclient := vaultclient.NewMockVaultClient() - ar := NewAllocRunner(logger, conf, db, upd.Update, alloc, vclient, newMockConsulServiceClient()) + ar := NewAllocRunner(logger, conf, db, upd.Update, alloc, vclient, newMockConsulServiceClient(), noopPrevAlloc{}) return upd, ar } @@ -640,9 +640,10 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) { // Create a new alloc runner l2 := prefixedTestLogger("----- ar2: ") + alloc2 := &structs.Allocation{ID: ar.alloc.ID} + prevAlloc := newAllocWatcher(alloc2, ar, nil, ar.config, l2) ar2 := NewAllocRunner(l2, ar.config, ar.stateDB, upd.Update, - &structs.Allocation{ID: ar.alloc.ID}, ar.vaultClient, - ar.consulClient) + alloc2, ar.vaultClient, ar.consulClient, prevAlloc) err = ar2.RestoreState() if err != nil { t.Fatalf("err: %v", err) @@ -733,9 +734,11 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) { defer ar.allocLock.Unlock() // Create a new alloc runner - ar2 := NewAllocRunner(ar.logger, ar.config, ar.stateDB, upd.Update, - &structs.Allocation{ID: ar.alloc.ID}, ar.vaultClient, ar.consulClient) - ar2.logger = prefixedTestLogger("ar2: ") + l2 := prefixedTestLogger("ar2: ") + alloc2 := &structs.Allocation{ID: ar.alloc.ID} + prevAlloc := newAllocWatcher(alloc2, ar, nil, ar.config, l2) + ar2 := NewAllocRunner(l2, ar.config, ar.stateDB, upd.Update, + alloc2, ar.vaultClient, ar.consulClient, prevAlloc) err = ar2.RestoreState() if err != nil { t.Fatalf("err: %v", err) @@ -846,8 +849,10 @@ func TestAllocRunner_SaveRestoreState_Upgrade(t *testing.T) { } // Create a new alloc runner - l2 := prefixedTestLogger("----- ar2: ") - ar2 := NewAllocRunner(l2, origConfig, ar.stateDB, upd.Update, &structs.Allocation{ID: ar.alloc.ID}, ar.vaultClient, ar.consulClient) + l2 := prefixedTestLogger("ar2: ") + alloc2 := &structs.Allocation{ID: ar.alloc.ID} + prevAlloc := newAllocWatcher(alloc2, ar, nil, origConfig, l2) + ar2 := NewAllocRunner(l2, origConfig, ar.stateDB, upd.Update, alloc2, ar.vaultClient, ar.consulClient, prevAlloc) err = ar2.RestoreState() if err != nil { t.Fatalf("err: %v", err) @@ -1000,7 +1005,7 @@ func TestAllocRunner_RestoreOldState(t *testing.T) { alloc.Job.Type = structs.JobTypeBatch vclient := vaultclient.NewMockVaultClient() cclient := newMockConsulServiceClient() - ar := NewAllocRunner(logger, conf, db, upd.Update, alloc, vclient, cclient) + ar := NewAllocRunner(logger, conf, db, upd.Update, alloc, vclient, cclient, noopPrevAlloc{}) defer ar.Destroy() // RestoreState should fail on the task state since we only test the @@ -1252,6 +1257,9 @@ func TestAllocRunner_TaskLeader_StopTG(t *testing.T) { }) } +// TestAllocRunner_MoveAllocDir asserts that a file written to an alloc's +// local/ dir will be moved to a replacement alloc's local/ dir if sticky +// volumes is on. func TestAllocRunner_MoveAllocDir(t *testing.T) { t.Parallel() // Create an alloc runner @@ -1286,19 +1294,24 @@ func TestAllocRunner_MoveAllocDir(t *testing.T) { ioutil.WriteFile(taskLocalFile, []byte("good bye world"), os.ModePerm) // Create another alloc runner - alloc1 := mock.Alloc() - task = alloc1.Job.TaskGroups[0].Tasks[0] + alloc2 := mock.Alloc() + alloc2.PreviousAllocation = ar.allocID + alloc2.Job.TaskGroups[0].EphemeralDisk.Sticky = true + task = alloc2.Job.TaskGroups[0].Tasks[0] task.Driver = "mock_driver" task.Config = map[string]interface{}{ "run_for": "1s", } - upd1, ar1 := testAllocRunnerFromAlloc(alloc1, false) - ar1.SetPreviousAllocDir(ar.allocDir) - go ar1.Run() - defer ar1.Destroy() + upd2, ar2 := testAllocRunnerFromAlloc(alloc2, false) + + // Set prevAlloc like Client does + ar2.prevAlloc = newAllocWatcher(alloc2, ar, nil, ar2.config, ar2.logger) + + go ar2.Run() + defer ar2.Destroy() testutil.WaitForResult(func() (bool, error) { - _, last := upd1.Last() + _, last := upd2.Last() if last == nil { return false, fmt.Errorf("No updates") } @@ -1310,14 +1323,14 @@ func TestAllocRunner_MoveAllocDir(t *testing.T) { t.Fatalf("err: %v", err) }) - // Ensure that data from ar1 was moved to ar - taskDir = ar1.allocDir.TaskDirs[task.Name] + // Ensure that data from ar was moved to ar2 + taskDir = ar2.allocDir.TaskDirs[task.Name] taskLocalFile = filepath.Join(taskDir.LocalDir, "local_file") if fileInfo, _ := os.Stat(taskLocalFile); fileInfo == nil { t.Fatalf("file %v not found", taskLocalFile) } - dataFile = filepath.Join(ar1.allocDir.SharedDir, "data", "data_file") + dataFile = filepath.Join(ar2.allocDir.SharedDir, "data", "data_file") if fileInfo, _ := os.Stat(dataFile); fileInfo == nil { t.Fatalf("file %v not found", dataFile) } diff --git a/client/alloc_watcher.go b/client/alloc_watcher.go index 9dae47da515..02e1a7c76d6 100644 --- a/client/alloc_watcher.go +++ b/client/alloc_watcher.go @@ -18,11 +18,18 @@ import ( "github.com/hashicorp/nomad/nomad/structs" ) +// rpcer is the interface needed by a prevAllocWatcher to make RPC calls. type rpcer interface { // RPC allows retrieving remote allocs. RPC(method string, args interface{}, reply interface{}) error } +// terminated is the interface needed by a prevAllocWatcher to check if an +// alloc is terminated. +type terminated interface { + Terminated() bool +} + type prevAllocWatcher interface { // Wait for previous alloc to terminate Wait(context.Context) error @@ -52,6 +59,7 @@ func newAllocWatcher(alloc *structs.Allocation, prevAR *AllocRunner, rpc rpcer, prevAllocDir: prevAR.GetAllocDir(), prevListener: prevAR.GetListener(), prevWaitCh: prevAR.WaitCh(), + prevStatus: prevAR.Alloc(), logger: l, } } @@ -77,6 +85,7 @@ type localPrevAlloc struct { sticky bool prevAllocDir *allocdir.AllocDir prevListener *cstructs.AllocListener + prevStatus terminated prevWaitCh <-chan struct{} logger *log.Logger @@ -85,16 +94,27 @@ type localPrevAlloc struct { // Wait on a local alloc to become terminal, exit, or the context to be done. func (p *localPrevAlloc) Wait(ctx context.Context) error { defer p.prevListener.Close() + + if p.prevStatus.Terminated() { + // Fast path - previous alloc already terminated! + return nil + } + + // Block until previous alloc exits p.logger.Printf("[DEBUG] client: alloc %q waiting for previous alloc %q to terminate", p.allocID, p.prevAllocID) for { + p.logger.Printf("[DEBUG] client: alloc %q waiting for previous alloc %q to terminate - XXX LOOP", p.allocID, p.prevAllocID) select { case prevAlloc := <-p.prevListener.Ch: + p.logger.Printf("[DEBUG] client: alloc %q waiting for previous alloc %q to terminate - XXX UPDATE %v", p.allocID, p.prevAllocID, prevAlloc.Terminated()) if prevAlloc.Terminated() { return nil } case <-p.prevWaitCh: + p.logger.Printf("[DEBUG] client: alloc %q waiting for previous alloc %q to terminate - XXX CLOSED", p.allocID, p.prevAllocID) return nil case <-ctx.Done(): + p.logger.Printf("[DEBUG] client: alloc %q waiting for previous alloc %q to terminate - XXX DONE", p.allocID, p.prevAllocID) return ctx.Err() } } diff --git a/client/alloc_watcher_test.go b/client/alloc_watcher_test.go new file mode 100644 index 00000000000..132bd1966be --- /dev/null +++ b/client/alloc_watcher_test.go @@ -0,0 +1,142 @@ +package client + +import ( + "archive/tar" + "bytes" + "context" + "fmt" + "io" + "io/ioutil" + "os" + "path/filepath" + "testing" + + "github.com/hashicorp/nomad/client/config" +) + +func TestPrevAlloc_StreamAllocDir(t *testing.T) { + t.Parallel() + dir, err := ioutil.TempDir("", "") + if err != nil { + t.Fatalf("err: %v", err) + } + defer os.RemoveAll(dir) + + if err := os.Mkdir(filepath.Join(dir, "foo"), 0777); err != nil { + t.Fatalf("err: %v", err) + } + dirInfo, err := os.Stat(filepath.Join(dir, "foo")) + if err != nil { + t.Fatalf("err: %v", err) + } + f, err := os.Create(filepath.Join(dir, "foo", "bar")) + if err != nil { + t.Fatalf("err: %v", err) + } + if _, err := f.WriteString("foo"); err != nil { + t.Fatalf("err: %v", err) + } + if err := f.Chmod(0644); err != nil { + t.Fatalf("err: %v", err) + } + fInfo, err := f.Stat() + if err != nil { + t.Fatalf("err: %v", err) + } + f.Close() + if err := os.Symlink("bar", filepath.Join(dir, "foo", "baz")); err != nil { + t.Fatalf("err: %v", err) + } + linkInfo, err := os.Lstat(filepath.Join(dir, "foo", "baz")) + if err != nil { + t.Fatalf("err: %v", err) + } + + buf := new(bytes.Buffer) + tw := tar.NewWriter(buf) + + walkFn := func(path string, fileInfo os.FileInfo, err error) error { + // Include the path of the file name relative to the alloc dir + // so that we can put the files in the right directories + link := "" + if fileInfo.Mode()&os.ModeSymlink != 0 { + target, err := os.Readlink(path) + if err != nil { + return fmt.Errorf("error reading symlink: %v", err) + } + link = target + } + hdr, err := tar.FileInfoHeader(fileInfo, link) + if err != nil { + return fmt.Errorf("error creating file header: %v", err) + } + hdr.Name = fileInfo.Name() + tw.WriteHeader(hdr) + + // If it's a directory or symlink we just write the header into the tar + if fileInfo.IsDir() || (fileInfo.Mode()&os.ModeSymlink != 0) { + return nil + } + + // Write the file into the archive + file, err := os.Open(path) + if err != nil { + return err + } + defer file.Close() + + if _, err := io.Copy(tw, file); err != nil { + return err + } + + return nil + } + + if err := filepath.Walk(dir, walkFn); err != nil { + t.Fatalf("err: %v", err) + } + tw.Close() + + dir1, err := ioutil.TempDir("", "nomadtest-") + if err != nil { + t.Fatalf("err: %v", err) + } + defer os.RemoveAll(dir1) + + c1 := testClient(t, func(c *config.Config) { + c.RPCHandler = nil + }) + defer c1.Shutdown() + + rc := ioutil.NopCloser(buf) + + prevAlloc := &remotePrevAlloc{logger: testLogger()} + if err := prevAlloc.streamAllocDir(context.Background(), rc, dir1); err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure foo is present + fi, err := os.Stat(filepath.Join(dir1, "foo")) + if err != nil { + t.Fatalf("err: %v", err) + } + if fi.Mode() != dirInfo.Mode() { + t.Fatalf("mode: %v", fi.Mode()) + } + + fi1, err := os.Stat(filepath.Join(dir1, "bar")) + if err != nil { + t.Fatalf("err: %v", err) + } + if fi1.Mode() != fInfo.Mode() { + t.Fatalf("mode: %v", fi1.Mode()) + } + + fi2, err := os.Lstat(filepath.Join(dir1, "baz")) + if err != nil { + t.Fatalf("err: %v", err) + } + if fi2.Mode() != linkInfo.Mode() { + t.Fatalf("mode: %v", fi2.Mode()) + } +} diff --git a/client/client_test.go b/client/client_test.go index 15462a230cb..601998849e4 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -1,10 +1,7 @@ package client import ( - "archive/tar" - "bytes" "fmt" - "io" "io/ioutil" "log" "math/rand" @@ -879,11 +876,14 @@ func TestClient_BlockedAllocations(t *testing.T) { // Enusre that the chained allocation is being tracked as blocked testutil.WaitForResult(func() (bool, error) { - alloc, ok := c1.blockedAllocations[alloc2.PreviousAllocation] - if ok && alloc.ID == alloc2.ID { - return true, nil + ar := c1.getAllocRunners()[alloc2.ID] + if ar == nil { + return false, fmt.Errorf("alloc 2's alloc runner does not exist") + } + if !ar.Blocked() { + return false, fmt.Errorf("alloc 2 is not blocked") } - return false, fmt.Errorf("no blocked allocations") + return true, nil }, func(err error) { t.Fatalf("err: %v", err) }) @@ -897,9 +897,13 @@ func TestClient_BlockedAllocations(t *testing.T) { // Ensure that there are no blocked allocations testutil.WaitForResult(func() (bool, error) { - _, ok := c1.blockedAllocations[alloc2.PreviousAllocation] - if ok { - return false, fmt.Errorf("blocked evals present") + for id, ar := range c1.getAllocRunners() { + if ar.Blocked() { + return false, fmt.Errorf("%q still blocked", id) + } + if ar.Migrating() { + return false, fmt.Errorf("%q still migrating", id) + } } return true, nil }, func(err error) { @@ -915,130 +919,3 @@ func TestClient_BlockedAllocations(t *testing.T) { <-ar.WaitCh() } } - -func TestClient_UnarchiveAllocDir(t *testing.T) { - t.Parallel() - dir, err := ioutil.TempDir("", "") - if err != nil { - t.Fatalf("err: %v", err) - } - defer os.RemoveAll(dir) - - if err := os.Mkdir(filepath.Join(dir, "foo"), 0777); err != nil { - t.Fatalf("err: %v", err) - } - dirInfo, err := os.Stat(filepath.Join(dir, "foo")) - if err != nil { - t.Fatalf("err: %v", err) - } - f, err := os.Create(filepath.Join(dir, "foo", "bar")) - if err != nil { - t.Fatalf("err: %v", err) - } - if _, err := f.WriteString("foo"); err != nil { - t.Fatalf("err: %v", err) - } - if err := f.Chmod(0644); err != nil { - t.Fatalf("err: %v", err) - } - fInfo, err := f.Stat() - if err != nil { - t.Fatalf("err: %v", err) - } - f.Close() - if err := os.Symlink("bar", filepath.Join(dir, "foo", "baz")); err != nil { - t.Fatalf("err: %v", err) - } - linkInfo, err := os.Lstat(filepath.Join(dir, "foo", "baz")) - if err != nil { - t.Fatalf("err: %v", err) - } - - buf := new(bytes.Buffer) - tw := tar.NewWriter(buf) - - walkFn := func(path string, fileInfo os.FileInfo, err error) error { - // Include the path of the file name relative to the alloc dir - // so that we can put the files in the right directories - link := "" - if fileInfo.Mode()&os.ModeSymlink != 0 { - target, err := os.Readlink(path) - if err != nil { - return fmt.Errorf("error reading symlink: %v", err) - } - link = target - } - hdr, err := tar.FileInfoHeader(fileInfo, link) - if err != nil { - return fmt.Errorf("error creating file header: %v", err) - } - hdr.Name = fileInfo.Name() - tw.WriteHeader(hdr) - - // If it's a directory or symlink we just write the header into the tar - if fileInfo.IsDir() || (fileInfo.Mode()&os.ModeSymlink != 0) { - return nil - } - - // Write the file into the archive - file, err := os.Open(path) - if err != nil { - return err - } - defer file.Close() - - if _, err := io.Copy(tw, file); err != nil { - return err - } - - return nil - } - - if err := filepath.Walk(dir, walkFn); err != nil { - t.Fatalf("err: %v", err) - } - tw.Close() - - dir1, err := ioutil.TempDir("", "") - if err != nil { - t.Fatalf("err: %v", err) - } - defer os.RemoveAll(dir1) - - c1 := testClient(t, func(c *config.Config) { - c.RPCHandler = nil - }) - defer c1.Shutdown() - - rc := ioutil.NopCloser(buf) - - c1.migratingAllocs["123"] = newMigrateAllocCtrl(mock.Alloc()) - if err := c1.unarchiveAllocDir(rc, "123", dir1); err != nil { - t.Fatalf("err: %v", err) - } - - // Ensure foo is present - fi, err := os.Stat(filepath.Join(dir1, "foo")) - if err != nil { - t.Fatalf("err: %v", err) - } - if fi.Mode() != dirInfo.Mode() { - t.Fatalf("mode: %v", fi.Mode()) - } - - fi1, err := os.Stat(filepath.Join(dir1, "bar")) - if err != nil { - t.Fatalf("err: %v", err) - } - if fi1.Mode() != fInfo.Mode() { - t.Fatalf("mode: %v", fi1.Mode()) - } - - fi2, err := os.Lstat(filepath.Join(dir1, "baz")) - if err != nil { - t.Fatalf("err: %v", err) - } - if fi2.Mode() != linkInfo.Mode() { - t.Fatalf("mode: %v", fi2.Mode()) - } -} diff --git a/client/gc_test.go b/client/gc_test.go index 7425eede0d3..960fb2a808e 100644 --- a/client/gc_test.go +++ b/client/gc_test.go @@ -106,9 +106,7 @@ func TestAllocGarbageCollector_MarkForCollection(t *testing.T) { gc := NewAllocGarbageCollector(logger, &MockStatsCollector{}, &MockAllocCounter{}, gcConfig()) _, ar1 := testAllocRunnerFromAlloc(mock.Alloc(), false) - if err := gc.MarkForCollection(ar1); err != nil { - t.Fatalf("err: %v", err) - } + gc.MarkForCollection(ar1) gcAlloc := gc.allocRunners.Pop() if gcAlloc == nil || gcAlloc.allocRunner != ar1 { @@ -123,12 +121,8 @@ func TestAllocGarbageCollector_Collect(t *testing.T) { _, ar1 := testAllocRunnerFromAlloc(mock.Alloc(), false) _, ar2 := testAllocRunnerFromAlloc(mock.Alloc(), false) - if err := gc.MarkForCollection(ar1); err != nil { - t.Fatalf("err: %v", err) - } - if err := gc.MarkForCollection(ar2); err != nil { - t.Fatalf("err: %v", err) - } + gc.MarkForCollection(ar1) + gc.MarkForCollection(ar2) // Fake that ar.Run() exits close(ar1.waitCh) @@ -150,12 +144,8 @@ func TestAllocGarbageCollector_CollectAll(t *testing.T) { _, ar1 := testAllocRunnerFromAlloc(mock.Alloc(), false) _, ar2 := testAllocRunnerFromAlloc(mock.Alloc(), false) - if err := gc.MarkForCollection(ar1); err != nil { - t.Fatalf("err: %v", err) - } - if err := gc.MarkForCollection(ar2); err != nil { - t.Fatalf("err: %v", err) - } + gc.MarkForCollection(ar1) + gc.MarkForCollection(ar2) if err := gc.CollectAll(); err != nil { t.Fatalf("err: %v", err) @@ -178,12 +168,8 @@ func TestAllocGarbageCollector_MakeRoomForAllocations_EnoughSpace(t *testing.T) close(ar1.waitCh) _, ar2 := testAllocRunnerFromAlloc(mock.Alloc(), false) close(ar2.waitCh) - if err := gc.MarkForCollection(ar1); err != nil { - t.Fatalf("err: %v", err) - } - if err := gc.MarkForCollection(ar2); err != nil { - t.Fatalf("err: %v", err) - } + gc.MarkForCollection(ar1) + gc.MarkForCollection(ar2) // Make stats collector report 200MB free out of which 20MB is reserved statsCollector.availableValues = []uint64{200 * MB} @@ -217,12 +203,8 @@ func TestAllocGarbageCollector_MakeRoomForAllocations_GC_Partial(t *testing.T) { close(ar1.waitCh) _, ar2 := testAllocRunnerFromAlloc(mock.Alloc(), false) close(ar2.waitCh) - if err := gc.MarkForCollection(ar1); err != nil { - t.Fatalf("err: %v", err) - } - if err := gc.MarkForCollection(ar2); err != nil { - t.Fatalf("err: %v", err) - } + gc.MarkForCollection(ar1) + gc.MarkForCollection(ar2) // Make stats collector report 80MB and 175MB free in subsequent calls statsCollector.availableValues = []uint64{80 * MB, 80 * MB, 175 * MB} @@ -257,12 +239,8 @@ func TestAllocGarbageCollector_MakeRoomForAllocations_GC_All(t *testing.T) { close(ar1.waitCh) _, ar2 := testAllocRunnerFromAlloc(mock.Alloc(), false) close(ar2.waitCh) - if err := gc.MarkForCollection(ar1); err != nil { - t.Fatalf("err: %v", err) - } - if err := gc.MarkForCollection(ar2); err != nil { - t.Fatalf("err: %v", err) - } + gc.MarkForCollection(ar1) + gc.MarkForCollection(ar2) // Make stats collector report 80MB and 95MB free in subsequent calls statsCollector.availableValues = []uint64{80 * MB, 80 * MB, 95 * MB} @@ -293,12 +271,8 @@ func TestAllocGarbageCollector_MakeRoomForAllocations_GC_Fallback(t *testing.T) close(ar1.waitCh) _, ar2 := testAllocRunnerFromAlloc(mock.Alloc(), false) close(ar2.waitCh) - if err := gc.MarkForCollection(ar1); err != nil { - t.Fatalf("err: %v", err) - } - if err := gc.MarkForCollection(ar2); err != nil { - t.Fatalf("err: %v", err) - } + gc.MarkForCollection(ar1) + gc.MarkForCollection(ar2) alloc := mock.Alloc() alloc.Resources.DiskMB = 150 @@ -339,9 +313,7 @@ func TestAllocGarbageCollector_MakeRoomForAllocations_MaxAllocs(t *testing.T) { for i := 0; i < gcAllocs; i++ { _, ar := testAllocRunnerFromAlloc(mock.Alloc(), false) close(ar.waitCh) - if err := gc.MarkForCollection(ar); err != nil { - t.Fatalf("error marking alloc for gc: %v", err) - } + gc.MarkForCollection(ar) } if err := gc.MakeRoomFor([]*structs.Allocation{mock.Alloc(), mock.Alloc()}); err != nil { @@ -366,12 +338,8 @@ func TestAllocGarbageCollector_UsageBelowThreshold(t *testing.T) { close(ar1.waitCh) _, ar2 := testAllocRunnerFromAlloc(mock.Alloc(), false) close(ar2.waitCh) - if err := gc.MarkForCollection(ar1); err != nil { - t.Fatalf("err: %v", err) - } - if err := gc.MarkForCollection(ar2); err != nil { - t.Fatalf("err: %v", err) - } + gc.MarkForCollection(ar1) + gc.MarkForCollection(ar2) statsCollector.availableValues = []uint64{1000} statsCollector.usedPercents = []float64{20} @@ -402,12 +370,8 @@ func TestAllocGarbageCollector_UsedPercentThreshold(t *testing.T) { close(ar1.waitCh) _, ar2 := testAllocRunnerFromAlloc(mock.Alloc(), false) close(ar2.waitCh) - if err := gc.MarkForCollection(ar1); err != nil { - t.Fatalf("err: %v", err) - } - if err := gc.MarkForCollection(ar2); err != nil { - t.Fatalf("err: %v", err) - } + gc.MarkForCollection(ar1) + gc.MarkForCollection(ar2) statsCollector.availableValues = []uint64{1000, 800} statsCollector.usedPercents = []float64{85, 60} @@ -451,9 +415,7 @@ func TestAllocGarbageCollector_MaxAllocsThreshold(t *testing.T) { for i := 0; i < gcAllocs; i++ { _, ar := testAllocRunnerFromAlloc(mock.Alloc(), false) close(ar.waitCh) - if err := gc.MarkForCollection(ar); err != nil { - t.Fatalf("error marking alloc for gc: %v", err) - } + gc.MarkForCollection(ar) } if err := gc.keepUsageBelowThreshold(); err != nil { From 4f7c5e6ed1a75e57ab8d2c744270829f775d92f9 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Fri, 11 Aug 2017 16:14:58 -0700 Subject: [PATCH 06/12] Exit if alloc listener closes Add test for that case, add comments, remove debug logging --- client/alloc_watcher.go | 59 ++++++++++++++++++++++++++---------- client/alloc_watcher_test.go | 56 ++++++++++++++++++++++++++++++++++ 2 files changed, 99 insertions(+), 16 deletions(-) diff --git a/client/alloc_watcher.go b/client/alloc_watcher.go index 02e1a7c76d6..07bec976d10 100644 --- a/client/alloc_watcher.go +++ b/client/alloc_watcher.go @@ -30,6 +30,9 @@ type terminated interface { Terminated() bool } +// prevAllocWatcher allows AllocRunners to wait for a previous allocation to +// terminate and migrate its data whether or not the previous allocation is +// local or remote. type prevAllocWatcher interface { // Wait for previous alloc to terminate Wait(context.Context) error @@ -39,7 +42,7 @@ type prevAllocWatcher interface { } // newAllocWatcher creates a prevAllocWatcher appropriate for whether this -// allocs previous allocation was local or remote. If this alloc has no +// alloc's previous allocation was local or remote. If this alloc has no // previous alloc then a noop implementation is returned. func newAllocWatcher(alloc *structs.Allocation, prevAR *AllocRunner, rpc rpcer, config *config.Config, l *log.Logger) prevAllocWatcher { if alloc.PreviousAllocation == "" { @@ -78,15 +81,31 @@ func newAllocWatcher(alloc *structs.Allocation, prevAR *AllocRunner, rpc rpcer, // localPrevAlloc is a prevAllocWatcher for previous allocations on the same // node as an updated allocation. type localPrevAlloc struct { - allocID string + // allocID is the ID of the alloc being blocked + allocID string + + // prevAllocID is the ID of the alloc being replaced prevAllocID string - tasks []*structs.Task - sticky bool + // tasks on the new alloc + tasks []*structs.Task + + // sticky is true if data should be moved + sticky bool + + // prevAllocDir is the alloc dir for the previous alloc prevAllocDir *allocdir.AllocDir + + // prevListener allows blocking for updates to the previous alloc prevListener *cstructs.AllocListener - prevStatus terminated - prevWaitCh <-chan struct{} + + // prevStatus allows checking if the previous alloc has already + // terminated (and therefore won't send updates to the listener) + prevStatus terminated + + // prevWaitCh is closed when the previous alloc is GC'd which is a + // failsafe against blocking the new alloc forever + prevWaitCh <-chan struct{} logger *log.Logger } @@ -103,18 +122,14 @@ func (p *localPrevAlloc) Wait(ctx context.Context) error { // Block until previous alloc exits p.logger.Printf("[DEBUG] client: alloc %q waiting for previous alloc %q to terminate", p.allocID, p.prevAllocID) for { - p.logger.Printf("[DEBUG] client: alloc %q waiting for previous alloc %q to terminate - XXX LOOP", p.allocID, p.prevAllocID) select { - case prevAlloc := <-p.prevListener.Ch: - p.logger.Printf("[DEBUG] client: alloc %q waiting for previous alloc %q to terminate - XXX UPDATE %v", p.allocID, p.prevAllocID, prevAlloc.Terminated()) - if prevAlloc.Terminated() { + case prevAlloc, ok := <-p.prevListener.Ch: + if !ok || prevAlloc.Terminated() { return nil } case <-p.prevWaitCh: - p.logger.Printf("[DEBUG] client: alloc %q waiting for previous alloc %q to terminate - XXX CLOSED", p.allocID, p.prevAllocID) return nil case <-ctx.Done(): - p.logger.Printf("[DEBUG] client: alloc %q waiting for previous alloc %q to terminate - XXX DONE", p.allocID, p.prevAllocID) return ctx.Err() } } @@ -140,13 +155,24 @@ func (p *localPrevAlloc) Migrate(ctx context.Context, dest *allocdir.AllocDir) e // remotePrevAlloc is a prevAllcWatcher for previous allocations on remote // nodes as an updated allocation. type remotePrevAlloc struct { - allocID string + // allocID is the ID of the alloc being blocked + allocID string + + // prevAllocID is the ID of the alloc being replaced prevAllocID string - tasks []*structs.Task - config *config.Config + // tasks on the new alloc + tasks []*structs.Task + + // config for the Client to get AllocDir and Region + config *config.Config + + // migrate is true if data should be moved between nodes migrate bool - rpc rpcer + + // rpc provides an RPC method for watching for updates to the previous + // alloc and determining what node it was on. + rpc rpcer // nodeID is the node the previous alloc. Set by Wait() for use in // Migrate() iff the previous alloc has not already been GC'd. @@ -155,6 +181,7 @@ type remotePrevAlloc struct { logger *log.Logger } +// Wait until the remote previousl allocation has terminated. func (p *remotePrevAlloc) Wait(ctx context.Context) error { p.logger.Printf("[DEBUG] client: alloc %q waiting for remote previous alloc %q to terminate", p.allocID, p.prevAllocID) req := structs.AllocSpecificRequest{ diff --git a/client/alloc_watcher_test.go b/client/alloc_watcher_test.go index 132bd1966be..01e9f1d7e95 100644 --- a/client/alloc_watcher_test.go +++ b/client/alloc_watcher_test.go @@ -10,10 +10,66 @@ import ( "os" "path/filepath" "testing" + "time" "github.com/hashicorp/nomad/client/config" + "github.com/hashicorp/nomad/nomad/mock" ) +// TestPrevAlloc_LocalPrevAlloc asserts that when a previous alloc runner is +// set a localPrevAlloc will block on it. +func TestPrevAlloc_LocalPrevAlloc(t *testing.T) { + _, prevAR := testAllocRunner(false) + prevAR.alloc.Job.TaskGroups[0].Tasks[0].Config["run_for"] = "10s" + + newAlloc := mock.Alloc() + newAlloc.PreviousAllocation = prevAR.Alloc().ID + newAlloc.Job.TaskGroups[0].EphemeralDisk.Sticky = false + task := newAlloc.Job.TaskGroups[0].Tasks[0] + task.Driver = "mock_driver" + task.Config["run_for"] = "500ms" + + waiter := newAllocWatcher(newAlloc, prevAR, nil, nil, testLogger()) + + // Wait in a goroutine with a context to make sure it exits at the right time + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + defer cancel() + waiter.Wait(ctx) + }() + + select { + case <-ctx.Done(): + t.Fatalf("Wait exited too early") + case <-time.After(33 * time.Millisecond): + // Good! It's blocking + } + + // Start the previous allocs to cause it to update but not terminate + go prevAR.Run() + defer prevAR.Destroy() + + select { + case <-ctx.Done(): + t.Fatalf("Wait exited too early") + case <-time.After(33 * time.Millisecond): + // Good! It's still blocking + } + + // Stop the previous alloc + prevAR.Destroy() + + select { + case <-ctx.Done(): + // Good! We unblocked when the previous alloc stopped + case <-time.After(time.Second): + t.Fatalf("Wait exited too early") + } +} + +// TestPrevAlloc_StreamAllocDir asserts that streaming a tar to an alloc dir +// works. func TestPrevAlloc_StreamAllocDir(t *testing.T) { t.Parallel() dir, err := ioutil.TempDir("", "") From 537d0e5ab52050ab1cf08c751093798bff876e74 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Fri, 11 Aug 2017 16:50:30 -0700 Subject: [PATCH 07/12] Soft fail on migration errors --- client/alloc_runner.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/client/alloc_runner.go b/client/alloc_runner.go index e4a38b02eed..6459e9dfcc9 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -790,8 +790,9 @@ func (r *AllocRunner) Run() { if err == context.Canceled { return } - r.setStatus(structs.AllocClientStatusFailed, fmt.Sprintf("error while migrating data from previous alloc: %v", err)) - return + + // Soft-fail on migration errors + r.logger.Printf("[WARN] client: alloc %q error while migrating data from previous alloc: %v", r.allocID, err) } r.waitingLock.Lock() From 85b9dd9cce8db8b1a0193ae28c4daae0839b3c1c Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Mon, 14 Aug 2017 16:02:28 -0700 Subject: [PATCH 08/12] Move migrating state into prevAllocWatcher --- client/alloc_runner.go | 42 +++++------------- client/alloc_watcher.go | 94 +++++++++++++++++++++++++++++++++++++++++ client/client.go | 4 +- client/client_test.go | 6 +-- 4 files changed, 109 insertions(+), 37 deletions(-) diff --git a/client/alloc_runner.go b/client/alloc_runner.go index 6459e9dfcc9..877fc95ee64 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -80,15 +80,12 @@ type AllocRunner struct { vaultClient vaultclient.VaultClient consulClient ConsulServiceAPI + // prevAlloc allows for Waiting until a previous allocation exits and + // the migrates it data. If sticky volumes aren't used and there's no + // previous allocation a noop implementation is used so it always safe + // to call. prevAlloc prevAllocWatcher - // blocked and migrating are true when alloc runner is waiting on the - // prevAllocWatcher. Writers must acquire the waitingLock and readers - // should use the helper methods Blocked and Migrating. - blocked bool - migrating bool - waitingLock sync.RWMutex - ctx context.Context exitFn context.CancelFunc waitCh chan struct{} @@ -768,10 +765,6 @@ func (r *AllocRunner) Run() { } // Wait for a previous alloc - if any - to terminate - r.waitingLock.Lock() - r.blocked = true - r.waitingLock.Unlock() - if err := r.prevAlloc.Wait(r.ctx); err != nil { if err == context.Canceled { return @@ -780,11 +773,6 @@ func (r *AllocRunner) Run() { return } - r.waitingLock.Lock() - r.blocked = false - r.migrating = true - r.waitingLock.Unlock() - // Wait for data to be migrated from a previous alloc if applicable if err := r.prevAlloc.Migrate(r.ctx, r.allocDir); err != nil { if err == context.Canceled { @@ -795,10 +783,6 @@ func (r *AllocRunner) Run() { r.logger.Printf("[WARN] client: alloc %q error while migrating data from previous alloc: %v", r.allocID, err) } - r.waitingLock.Lock() - r.migrating = false - r.waitingLock.Unlock() - // Check if the allocation is in a terminal status. In this case, we don't // start any of the task runners and directly wait for the destroy signal to // clean up the allocation. @@ -979,22 +963,16 @@ func (r *AllocRunner) handleDestroy() { } } -// Blocked returns true if this alloc is waiting on a previous allocation to +// IsWaiting returns true if this alloc is waiting on a previous allocation to // terminate. -func (r *AllocRunner) Blocked() bool { - r.waitingLock.RLock() - b := r.blocked - r.waitingLock.RUnlock() - return b +func (r *AllocRunner) IsWaiting() bool { + return r.prevAlloc.IsWaiting() } -// Migrating returns true if this alloc is migrating data from a previous +// IsMigrating returns true if this alloc is migrating data from a previous // allocation. -func (r *AllocRunner) Migrating() bool { - r.waitingLock.RLock() - m := r.migrating - r.waitingLock.RUnlock() - return m +func (r *AllocRunner) IsMigrating() bool { + return r.prevAlloc.IsMigrating() } // Update is used to update the allocation of the context diff --git a/client/alloc_watcher.go b/client/alloc_watcher.go index 07bec976d10..d05789a54b8 100644 --- a/client/alloc_watcher.go +++ b/client/alloc_watcher.go @@ -8,6 +8,7 @@ import ( "log" "os" "path/filepath" + "sync" "time" "github.com/hashicorp/consul/lib" @@ -39,6 +40,12 @@ type prevAllocWatcher interface { // Migrate data from previous alloc Migrate(ctx context.Context, dest *allocdir.AllocDir) error + + // IsWaiting returns true if a concurrent caller is blocked in Wait + IsWaiting() bool + + // IsMigrating returns true if a concurrent caller is in Migrate + IsMigrating() bool } // newAllocWatcher creates a prevAllocWatcher appropriate for whether this @@ -107,11 +114,43 @@ type localPrevAlloc struct { // failsafe against blocking the new alloc forever prevWaitCh <-chan struct{} + // waiting and migrating are true when alloc runner is waiting on the + // prevAllocWatcher. Writers must acquire the waitingLock and readers + // should use the helper methods IsWaiting and IsMigrating. + waiting bool + migrating bool + waitingLock sync.RWMutex + logger *log.Logger } +// IsWaiting returns true if there's a concurrent call inside Wait +func (p *localPrevAlloc) IsWaiting() bool { + p.waitingLock.RLock() + b := p.waiting + p.waitingLock.RUnlock() + return b +} + +// IsMigrating returns true if there's a concurrent call inside Migrate +func (p *localPrevAlloc) IsMigrating() bool { + p.waitingLock.RLock() + b := p.migrating + p.waitingLock.RUnlock() + return b +} + // Wait on a local alloc to become terminal, exit, or the context to be done. func (p *localPrevAlloc) Wait(ctx context.Context) error { + p.waitingLock.Lock() + p.waiting = true + p.waitingLock.Unlock() + defer func() { + p.waitingLock.Lock() + p.waiting = false + p.waitingLock.Unlock() + }() + defer p.prevListener.Close() if p.prevStatus.Terminated() { @@ -141,6 +180,16 @@ func (p *localPrevAlloc) Migrate(ctx context.Context, dest *allocdir.AllocDir) e // Not a sticky volume, nothing to migrate return nil } + + p.waitingLock.Lock() + p.migrating = true + p.waitingLock.Unlock() + defer func() { + p.waitingLock.Lock() + p.migrating = false + p.waitingLock.Unlock() + }() + p.logger.Printf("[DEBUG] client: alloc %q copying previous alloc %q", p.allocID, p.prevAllocID) if err := dest.Move(p.prevAllocDir, p.tasks); err != nil { @@ -178,11 +227,43 @@ type remotePrevAlloc struct { // Migrate() iff the previous alloc has not already been GC'd. nodeID string + // waiting and migrating are true when alloc runner is waiting on the + // prevAllocWatcher. Writers must acquire the waitingLock and readers + // should use the helper methods IsWaiting and IsMigrating. + waiting bool + migrating bool + waitingLock sync.RWMutex + logger *log.Logger } +// IsWaiting returns true if there's a concurrent call inside Wait +func (p *remotePrevAlloc) IsWaiting() bool { + p.waitingLock.RLock() + b := p.waiting + p.waitingLock.RUnlock() + return b +} + +// IsMigrating returns true if there's a concurrent call inside Migrate +func (p *remotePrevAlloc) IsMigrating() bool { + p.waitingLock.RLock() + b := p.migrating + p.waitingLock.RUnlock() + return b +} + // Wait until the remote previousl allocation has terminated. func (p *remotePrevAlloc) Wait(ctx context.Context) error { + p.waitingLock.Lock() + p.waiting = true + p.waitingLock.Unlock() + defer func() { + p.waitingLock.Lock() + p.waiting = false + p.waitingLock.Unlock() + }() + p.logger.Printf("[DEBUG] client: alloc %q waiting for remote previous alloc %q to terminate", p.allocID, p.prevAllocID) req := structs.AllocSpecificRequest{ AllocID: p.prevAllocID, @@ -244,6 +325,16 @@ func (p *remotePrevAlloc) Migrate(ctx context.Context, dest *allocdir.AllocDir) // Volume wasn't configured to be migrated, return early return nil } + + p.waitingLock.Lock() + p.migrating = true + p.waitingLock.Unlock() + defer func() { + p.waitingLock.Lock() + p.migrating = false + p.waitingLock.Unlock() + }() + p.logger.Printf("[DEBUG] client: alloc %q copying from remote previous alloc %q", p.allocID, p.prevAllocID) if p.nodeID == "" { @@ -446,3 +537,6 @@ func (noopPrevAlloc) Wait(context.Context) error { return nil } // Migrate returns nil immediately. func (noopPrevAlloc) Migrate(context.Context, *allocdir.AllocDir) error { return nil } + +func (noopPrevAlloc) IsWaiting() bool { return false } +func (noopPrevAlloc) IsMigrating() bool { return false } diff --git a/client/client.go b/client/client.go index bd2125df11c..b72e54a1a9e 100644 --- a/client/client.go +++ b/client/client.go @@ -1938,9 +1938,9 @@ func (c *Client) emitClientMetrics() { switch ar.Alloc().ClientStatus { case structs.AllocClientStatusPending: switch { - case ar.Blocked(): + case ar.IsWaiting(): blocked++ - case ar.Migrating(): + case ar.IsMigrating(): migrating++ default: pending++ diff --git a/client/client_test.go b/client/client_test.go index 601998849e4..30baff15b57 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -880,7 +880,7 @@ func TestClient_BlockedAllocations(t *testing.T) { if ar == nil { return false, fmt.Errorf("alloc 2's alloc runner does not exist") } - if !ar.Blocked() { + if !ar.IsWaiting() { return false, fmt.Errorf("alloc 2 is not blocked") } return true, nil @@ -898,10 +898,10 @@ func TestClient_BlockedAllocations(t *testing.T) { // Ensure that there are no blocked allocations testutil.WaitForResult(func() (bool, error) { for id, ar := range c1.getAllocRunners() { - if ar.Blocked() { + if ar.IsWaiting() { return false, fmt.Errorf("%q still blocked", id) } - if ar.Migrating() { + if ar.IsMigrating() { return false, fmt.Errorf("%q still migrating", id) } } From ce4b919de51b12136fd447a0231e561fea916c1a Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Mon, 14 Aug 2017 16:48:56 -0700 Subject: [PATCH 09/12] Return move errors from local Migrate like remote Since alloc runner just logs these errors and continues there's no reason not to return it. --- client/alloc_watcher.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/client/alloc_watcher.go b/client/alloc_watcher.go index d05789a54b8..17a32852d9d 100644 --- a/client/alloc_watcher.go +++ b/client/alloc_watcher.go @@ -110,8 +110,8 @@ type localPrevAlloc struct { // terminated (and therefore won't send updates to the listener) prevStatus terminated - // prevWaitCh is closed when the previous alloc is GC'd which is a - // failsafe against blocking the new alloc forever + // prevWaitCh is closed when the previous alloc is garbage collected + // which is a failsafe against blocking the new alloc forever prevWaitCh <-chan struct{} // waiting and migrating are true when alloc runner is waiting on the @@ -192,13 +192,14 @@ func (p *localPrevAlloc) Migrate(ctx context.Context, dest *allocdir.AllocDir) e p.logger.Printf("[DEBUG] client: alloc %q copying previous alloc %q", p.allocID, p.prevAllocID) - if err := dest.Move(p.prevAllocDir, p.tasks); err != nil { - p.logger.Printf("[ERR] client: failed to move previous alloc dir %q: %v", p.prevAllocDir.AllocDir, err) - } + moveErr := dest.Move(p.prevAllocDir, p.tasks) + + // Always cleanup previous alloc if err := p.prevAllocDir.Destroy(); err != nil { p.logger.Printf("[ERR] client: error destroying allocdir %v: %v", p.prevAllocDir.AllocDir, err) } - return nil + + return moveErr } // remotePrevAlloc is a prevAllcWatcher for previous allocations on remote From 8983bc07cceea8e5426a954f2de8ba2a1c91717d Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Mon, 14 Aug 2017 16:55:59 -0700 Subject: [PATCH 10/12] spelling --- client/allocdir/alloc_dir.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/allocdir/alloc_dir.go b/client/allocdir/alloc_dir.go index a16d35f02d1..780e1b8ccc3 100644 --- a/client/allocdir/alloc_dir.go +++ b/client/allocdir/alloc_dir.go @@ -192,7 +192,7 @@ func (d *AllocDir) Snapshot(w io.Writer) error { // Move other alloc directory's shared path and local dir to this alloc dir. func (d *AllocDir) Move(other *AllocDir, tasks []*structs.Task) error { if !d.built { - // Enfornce the invariant that Build is called before Move + // Enforce the invariant that Build is called before Move return fmt.Errorf("unable to move to %q - alloc dir is not built", d.AllocDir) } From d55b4c1a2bcf638ce0f5504797ac45cca328b8d1 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Mon, 14 Aug 2017 16:59:03 -0700 Subject: [PATCH 11/12] Cleanup comments and return val --- client/alloc_watcher.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/client/alloc_watcher.go b/client/alloc_watcher.go index 17a32852d9d..cbd35fe45f4 100644 --- a/client/alloc_watcher.go +++ b/client/alloc_watcher.go @@ -312,11 +312,7 @@ func (p *remotePrevAlloc) Wait(ctx context.Context) error { } } - if done() { - return ctx.Err() - } - - return nil + return ctx.Err() } // Migrate alloc data from a remote node if the new alloc has migration enabled @@ -404,7 +400,8 @@ func (p *remotePrevAlloc) getNodeAddr(ctx context.Context, nodeID string) (strin return scheme + resp.Node.HTTPAddr, nil } -// migrate a remote alloc dir to local node +// migrate a remote alloc dir to local node. Caller is responsible for calling +// Destroy on the returned allocdir if no error occurs. func (p *remotePrevAlloc) migrateAllocDir(ctx context.Context, nodeAddr string) (*allocdir.AllocDir, error) { // Create the previous alloc dir prevAllocDir := allocdir.NewAllocDir(p.logger, filepath.Join(p.config.AllocDir, p.prevAllocID)) From 44b69882a1bf2d47fe871e7d2a4133720facfc3f Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Tue, 15 Aug 2017 10:37:02 -0700 Subject: [PATCH 12/12] Fix formatting --- client/alloc_watcher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/alloc_watcher.go b/client/alloc_watcher.go index cbd35fe45f4..8b029a7f7cc 100644 --- a/client/alloc_watcher.go +++ b/client/alloc_watcher.go @@ -466,7 +466,7 @@ func (p *remotePrevAlloc) streamAllocDir(ctx context.Context, resp io.ReadCloser } if err != nil { - return fmt.Errorf("error streaming previous alloc %p for new alloc %q: %v", + return fmt.Errorf("error streaming previous alloc %q for new alloc %q: %v", p.prevAllocID, p.allocID, err) }