Skip to content

Commit

Permalink
Merge pull request #2081 from hashicorp/f-gc
Browse files Browse the repository at this point in the history
Garbage collector for allocations
  • Loading branch information
diptanu authored Dec 20, 2016
2 parents 4b14845 + 8ba749a commit dabb8de
Show file tree
Hide file tree
Showing 7 changed files with 820 additions and 36 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ BUG FIXES:

## 0.5.2 (Unreleased)

IMPROVEMENTS:
* client: Garbage collect Allocation Runners to free up disk resouces
[GH-2081]

BUG FIXES:
* client: Fixed a race condition and remove panic when handling duplicate
allocations [GH-2096]
Expand Down
55 changes: 43 additions & 12 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,6 @@ type Client struct {

// HostStatsCollector collects host resource usage stats
hostStatsCollector *stats.HostStatsCollector
resourceUsage *stats.HostStats
resourceUsageLock sync.RWMutex

shutdown bool
shutdownCh chan struct{}
Expand All @@ -154,6 +152,10 @@ type Client struct {
// migratingAllocs is the set of allocs whose data migration is in flight
migratingAllocs map[string]chan struct{}
migratingAllocsLock sync.Mutex

// garbageCollector is used to garbage collect terminal allocations present
// in the node automatically
garbageCollector *AllocGarbageCollector
}

var (
Expand Down Expand Up @@ -182,7 +184,6 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg
start: time.Now(),
connPool: nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, tlsWrap),
logger: logger,
hostStatsCollector: stats.NewHostStatsCollector(logger),
allocs: make(map[string]*AllocRunner),
blockedAllocations: make(map[string]*structs.Allocation),
allocUpdates: make(chan *structs.Allocation, 64),
Expand All @@ -198,6 +199,11 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg
return nil, fmt.Errorf("failed to initialize client: %v", err)
}

// Add the stats collector and the garbage collector
statsCollector := stats.NewHostStatsCollector(logger, c.config.AllocDir)
c.hostStatsCollector = statsCollector
c.garbageCollector = NewAllocGarbageCollector(logger, statsCollector, cfg.Node.Reserved.DiskMB)

// Setup the node
if err := c.setupNode(); err != nil {
return nil, fmt.Errorf("node setup failed: %v", err)
Expand Down Expand Up @@ -367,6 +373,9 @@ func (c *Client) Shutdown() error {
c.vaultClient.Stop()
}

// Stop Garbage collector
c.garbageCollector.Stop()

// Destroy all the running allocations.
if c.config.DevMode {
c.allocLock.Lock()
Expand Down Expand Up @@ -434,6 +443,17 @@ func (c *Client) Stats() map[string]map[string]string {
return stats
}

// CollectAllocation garbage collects a single allocation
func (c *Client) CollectAllocation(allocID string) error {
return c.garbageCollector.Collect(allocID)
}

// CollectAllAllocs garbage collects all allocations on a node in the terminal
// state
func (c *Client) CollectAllAllocs() error {
return c.garbageCollector.CollectAll()
}

// Node returns the locally registered node
func (c *Client) Node() *structs.Node {
c.configLock.RLock()
Expand All @@ -459,9 +479,7 @@ func (c *Client) GetAllocStats(allocID string) (AllocStatsReporter, error) {

// HostStats returns all the stats related to a Nomad client
func (c *Client) LatestHostStats() *stats.HostStats {
c.resourceUsageLock.RLock()
defer c.resourceUsageLock.RUnlock()
return c.resourceUsage
return c.hostStatsCollector.Stats()
}

// GetAllocFS returns the AllocFS interface for the alloc dir of an allocation
Expand Down Expand Up @@ -1088,6 +1106,15 @@ func (c *Client) updateAllocStatus(alloc *structs.Allocation) {
}
c.blockedAllocsLock.Unlock()

// Mark the allocation for GC if it is in terminal state
if alloc.Terminated() {
if ar, ok := c.getAllocRunners()[alloc.ID]; ok {
if err := c.garbageCollector.MarkForCollection(ar); err != nil {
c.logger.Printf("[DEBUG] client: couldn't add alloc %v for GC: %v", alloc.ID, err)
}
}
}

// Strip all the information that can be reconstructed at the server. Only
// send the fields that are updatable by the client.
stripped := new(structs.Allocation)
Expand Down Expand Up @@ -1732,6 +1759,9 @@ func (c *Client) removeAlloc(alloc *structs.Allocation) error {
delete(c.allocs, alloc.ID)
c.allocLock.Unlock()

// Remove the allocrunner from garbage collector
c.garbageCollector.Remove(ar)

ar.Destroy()
return nil
}
Expand Down Expand Up @@ -1761,6 +1791,11 @@ func (c *Client) addAlloc(alloc *structs.Allocation, prevAllocDir *allocdir.Allo
return nil
}

// Make room for the allocation
if err := c.garbageCollector.MakeRoomFor([]*structs.Allocation{alloc}); err != nil {
c.logger.Printf("[ERR] client: error making room for allocation: %v", err)
}

c.configLock.RLock()
ar := NewAllocRunner(c.logger, c.configCopy, c.updateAllocStatus, alloc, c.vaultClient)
ar.SetPreviousAllocDir(prevAllocDir)
Expand Down Expand Up @@ -2068,20 +2103,16 @@ func (c *Client) collectHostStats() {
for {
select {
case <-next.C:
ru, err := c.hostStatsCollector.Collect()
err := c.hostStatsCollector.Collect()
next.Reset(c.config.StatsCollectionInterval)
if err != nil {
c.logger.Printf("[WARN] client: error fetching host resource usage stats: %v", err)
continue
}

c.resourceUsageLock.Lock()
c.resourceUsage = ru
c.resourceUsageLock.Unlock()

// Publish Node metrics if operator has opted in
if c.config.PublishNodeMetrics {
c.emitStats(ru)
c.emitStats(c.hostStatsCollector.Stats())
}
case <-c.shutdownCh:
return
Expand Down
Loading

0 comments on commit dabb8de

Please sign in to comment.