Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Garbage collector for allocations #2081

Merged
merged 9 commits into from
Dec 20, 2016
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 46 additions & 12 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,6 @@ type Client struct {

// HostStatsCollector collects host resource usage stats
hostStatsCollector *stats.HostStatsCollector
resourceUsage *stats.HostStats
resourceUsageLock sync.RWMutex

shutdown bool
shutdownCh chan struct{}
Expand All @@ -154,6 +152,10 @@ type Client struct {
// migratingAllocs is the set of allocs whose data migration is in flight
migratingAllocs map[string]chan struct{}
migratingAllocsLock sync.Mutex

// garbageCollector is used to garbage collect terminal allocations present
// in the node automatically
garbageCollector *AllocGarbageCollector
}

var (
Expand Down Expand Up @@ -182,7 +184,6 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg
start: time.Now(),
connPool: nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, tlsWrap),
logger: logger,
hostStatsCollector: stats.NewHostStatsCollector(logger),
allocs: make(map[string]*AllocRunner),
blockedAllocations: make(map[string]*structs.Allocation),
allocUpdates: make(chan *structs.Allocation, 64),
Expand All @@ -198,6 +199,11 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg
return nil, fmt.Errorf("failed to initialize client: %v", err)
}

// Add the stats collector and the garbage collector
statsCollector := stats.NewHostStatsCollector(logger, c.config.AllocDir)
c.hostStatsCollector = statsCollector
c.garbageCollector = NewAllocGarbageCollector(logger, statsCollector, cfg.Node.Reserved.DiskMB)

// Setup the node
if err := c.setupNode(); err != nil {
return nil, fmt.Errorf("node setup failed: %v", err)
Expand Down Expand Up @@ -367,6 +373,9 @@ func (c *Client) Shutdown() error {
c.vaultClient.Stop()
}

// Stop Garbage collector
c.garbageCollector.Stop()

// Destroy all the running allocations.
if c.config.DevMode {
c.allocLock.Lock()
Expand Down Expand Up @@ -434,6 +443,23 @@ func (c *Client) Stats() map[string]map[string]string {
return stats
}

// CollectAllocation garbage collects a single allocation
func (c *Client) CollectAllocation(allocID string) error {
if err := c.garbageCollector.Collect(allocID); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return c.garbageCollector.Collect(allocID)

And below. Or wrap the errors

return err
}
return nil
}

// CollectAllAllocs garbage collects all allocations on a node in the terminal
// state
func (c *Client) CollectAllAllocs() error {
if err := c.garbageCollector.CollectAll(); err != nil {
return err
}
return nil
}

// Node returns the locally registered node
func (c *Client) Node() *structs.Node {
c.configLock.RLock()
Expand All @@ -459,9 +485,7 @@ func (c *Client) GetAllocStats(allocID string) (AllocStatsReporter, error) {

// HostStats returns all the stats related to a Nomad client
func (c *Client) LatestHostStats() *stats.HostStats {
c.resourceUsageLock.RLock()
defer c.resourceUsageLock.RUnlock()
return c.resourceUsage
return c.hostStatsCollector.Stats()
}

// GetAllocFS returns the AllocFS interface for the alloc dir of an allocation
Expand Down Expand Up @@ -1088,6 +1112,15 @@ func (c *Client) updateAllocStatus(alloc *structs.Allocation) {
}
c.blockedAllocsLock.Unlock()

// Mark the allocation for GC if it is in terminal state
if alloc.Terminated() {
if ar, ok := c.getAllocRunners()[alloc.ID]; ok {
if err := c.garbageCollector.MarkForCollection(ar); err != nil {
c.logger.Printf("[DEBUG] client: couldn't add alloc %v for GC: %v", alloc.ID, err)
}
}
}

// Strip all the information that can be reconstructed at the server. Only
// send the fields that are updatable by the client.
stripped := new(structs.Allocation)
Expand Down Expand Up @@ -1761,6 +1794,11 @@ func (c *Client) addAlloc(alloc *structs.Allocation, prevAllocDir *allocdir.Allo
return nil
}

// Make room for the allocation
if err := c.garbageCollector.MakeRoomFor([]*structs.Allocation{alloc}); err != nil {
c.logger.Printf("[ERR] client: error making room for allocation: %v", err)
}

c.configLock.RLock()
ar := NewAllocRunner(c.logger, c.configCopy, c.updateAllocStatus, alloc, c.vaultClient)
ar.SetPreviousAllocDir(prevAllocDir)
Expand Down Expand Up @@ -2068,20 +2106,16 @@ func (c *Client) collectHostStats() {
for {
select {
case <-next.C:
ru, err := c.hostStatsCollector.Collect()
err := c.hostStatsCollector.Collect()
next.Reset(c.config.StatsCollectionInterval)
if err != nil {
c.logger.Printf("[WARN] client: error fetching host resource usage stats: %v", err)
continue
}

c.resourceUsageLock.Lock()
c.resourceUsage = ru
c.resourceUsageLock.Unlock()

// Publish Node metrics if operator has opted in
if c.config.PublishNodeMetrics {
c.emitStats(ru)
c.emitStats(c.hostStatsCollector.Stats())
}
case <-c.shutdownCh:
return
Expand Down
Loading