diff --git a/client/client.go b/client/client.go index 442bd2cfa69..41267264cf4 100644 --- a/client/client.go +++ b/client/client.go @@ -150,7 +150,7 @@ type Client struct { vaultClient vaultclient.VaultClient // migratingAllocs is the set of allocs whose data migration is in flight - migratingAllocs map[string]chan struct{} + migratingAllocs map[string]*migrateAllocCtrl migratingAllocsLock sync.Mutex // garbageCollector is used to garbage collect terminal allocations present @@ -158,6 +158,32 @@ type Client struct { garbageCollector *AllocGarbageCollector } +// migrateAllocCtrl indicates whether migration is complete +type migrateAllocCtrl struct { + ch chan struct{} + closed bool + chLock sync.Mutex +} + +func newMigrateAllocCtrl() *migrateAllocCtrl { + return &migrateAllocCtrl{ + ch: make(chan struct{}), + } +} + +func (m *migrateAllocCtrl) closeCh() { + m.chLock.Lock() + defer m.chLock.Unlock() + + if m.closed { + return + } + + // If channel is not closed then close it + m.closed = true + close(m.ch) +} + var ( // noServersErr is returned by the RPC method when the client has no // configured servers. This is used to trigger Consul discovery if @@ -188,7 +214,7 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg blockedAllocations: make(map[string]*structs.Allocation), allocUpdates: make(chan *structs.Allocation, 64), shutdownCh: make(chan struct{}), - migratingAllocs: make(map[string]chan struct{}), + migratingAllocs: make(map[string]*migrateAllocCtrl), servers: newServerList(), triggerDiscoveryCh: make(chan struct{}), serversDiscoveredCh: make(chan struct{}), @@ -1420,7 +1446,7 @@ func (c *Client) runAllocs(update *allocUpdates) { // Stopping the migration if the allocation doesn't need any // migration if !update.updated.ShouldMigrate() { - close(ch) + ch.closeCh() } } } @@ -1455,7 +1481,7 @@ func (c *Client) runAllocs(update *allocUpdates) { // prevents a race between a finishing blockForRemoteAlloc and // another invocation of runAllocs if _, ok := c.getAllocRunners()[add.PreviousAllocation]; !ok { - c.migratingAllocs[add.ID] = make(chan struct{}) + c.migratingAllocs[add.ID] = newMigrateAllocCtrl() go c.blockForRemoteAlloc(add) } } @@ -1533,7 +1559,7 @@ ADDALLOC: // waitForAllocTerminal waits for an allocation with the given alloc id to // transition to terminal state and blocks the caller until then. -func (c *Client) waitForAllocTerminal(allocID string, stopCh chan struct{}) (*structs.Allocation, error) { +func (c *Client) waitForAllocTerminal(allocID string, stopCh *migrateAllocCtrl) (*structs.Allocation, error) { req := structs.AllocSpecificRequest{ AllocID: allocID, QueryOptions: structs.QueryOptions{ @@ -1551,7 +1577,7 @@ func (c *Client) waitForAllocTerminal(allocID string, stopCh chan struct{}) (*st select { case <-time.After(retry): continue - case <-stopCh: + case <-stopCh.ch: return nil, fmt.Errorf("giving up waiting on alloc %v since migration is not needed", allocID) case <-c.shutdownCh: return nil, fmt.Errorf("aborting because client is shutting down") @@ -1665,7 +1691,7 @@ func (c *Client) unarchiveAllocDir(resp io.ReadCloser, allocID string, pathToAll for { // See if the alloc still needs migration select { - case <-stopMigrating: + case <-stopMigrating.ch: os.RemoveAll(pathToAllocDir) c.logger.Printf("[INFO] client: stopping migration of allocdir for alloc: %v", allocID) return nil