Making the gc allocator understand real disk usage
diptanu committed Dec 17, 2016
1 parent 79fdad8 commit 61e534d
Showing 4 changed files with 181 additions and 34 deletions.
12 changes: 8 additions & 4 deletions client/client.go
@@ -177,16 +177,13 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg
 		tlsWrap = tw
 	}
 
-	statsCollector := stats.NewHostStatsCollector(logger)
-
 	// Create the client
 	c := &Client{
 		config:             cfg,
 		consulSyncer:       consulSyncer,
 		start:              time.Now(),
 		connPool:           nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, tlsWrap),
 		logger:             logger,
-		hostStatsCollector: statsCollector,
 		allocs:             make(map[string]*AllocRunner),
 		blockedAllocations: make(map[string]*structs.Allocation),
 		allocUpdates:       make(chan *structs.Allocation, 64),
@@ -195,14 +192,18 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg
 		servers:             newServerList(),
 		triggerDiscoveryCh:  make(chan struct{}),
 		serversDiscoveredCh: make(chan struct{}),
-		garbageCollector:    NewAllocGarbageCollector(logger, statsCollector),
 	}
 
 	// Initialize the client
 	if err := c.init(); err != nil {
 		return nil, fmt.Errorf("failed to initialize client: %v", err)
 	}
 
+	// Add the stats collector and the garbage collector
+	statsCollector := stats.NewHostStatsCollector(logger, c.config.AllocDir)
+	c.hostStatsCollector = statsCollector
+	c.garbageCollector = NewAllocGarbageCollector(logger, statsCollector, cfg.Node.Reserved.DiskMB)
+
 	// Setup the node
 	if err := c.setupNode(); err != nil {
 		return nil, fmt.Errorf("node setup failed: %v", err)
@@ -372,6 +373,9 @@ func (c *Client) Shutdown() error {
 		c.vaultClient.Stop()
 	}
 
+	// Stop the garbage collector
+	c.garbageCollector.Stop()
+
 	// Destroy all the running allocations.
 	if c.config.DevMode {
 		c.allocLock.Lock()
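The garbage collector the client now owns is fed from call sites elsewhere in the client package that are not part of the hunks shown here. A rough sketch of the expected usage, with ar (*AllocRunner) and newAllocs ([]*structs.Allocation) as hypothetical stand-ins rather than lines from this commit:

    // When an allocation reaches a terminal state, queue it for GC instead
    // of destroying it inline.
    if err := c.garbageCollector.MarkForCollection(ar); err != nil {
        c.logger.Printf("[WARN] client: failed to mark alloc for GC: %v", err)
    }

    // Before starting newly assigned allocations, ask the GC to free enough
    // disk space for them.
    if err := c.garbageCollector.MakeRoomFor(newAllocs); err != nil {
        c.logger.Printf("[ERR] client: failed to make room for allocations: %v", err)
    }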
133 changes: 124 additions & 9 deletions client/gc.go
@@ -11,6 +11,20 @@ import (
 	"github.com/hashicorp/nomad/nomad/structs"
 )
 
+const (
+	// diskUsageThreshold is the percent of used disk space beyond which Nomad
+	// GCs terminated allocations
+	diskUsageThreshold = 80
+
+	// gcInterval is the interval at which Nomad runs the garbage collector
+	gcInterval = 1 * time.Minute
+
+	// inodeUsageThreshold is the percent of inode usage that Nomad tries to
+	// stay below; whenever we are over it we will attempt to GC terminal
+	// allocations
+	inodeUsageThreshold = 70
+)
+
 type GCAlloc struct {
 	timeStamp   time.Time
 	allocRunner *AllocRunner
@@ -54,6 +68,8 @@ func (pq *GCAllocPQImpl) Pop() interface{} {
 type IndexedGCAllocPQ struct {
 	index map[string]*GCAlloc
 	heap  GCAllocPQImpl
+
+	pqLock sync.Mutex
 }
 
 func NewIndexedGCAllocPQ() *IndexedGCAllocPQ {
@@ -64,6 +80,9 @@ func NewIndexedGCAllocPQ() *IndexedGCAllocPQ {
 }
 
 func (i *IndexedGCAllocPQ) Push(ar *AllocRunner) error {
+	i.pqLock.Lock()
+	defer i.pqLock.Unlock()
+
 	alloc := ar.Alloc()
 	if _, ok := i.index[alloc.ID]; ok {
 		return fmt.Errorf("alloc %v already being tracked for GC", alloc.ID)
@@ -78,6 +97,9 @@ func (i *IndexedGCAllocPQ) Push(ar *AllocRunner) error {
 }
 
 func (i *IndexedGCAllocPQ) Pop() *GCAlloc {
+	i.pqLock.Lock()
+	defer i.pqLock.Unlock()
+
 	if len(i.heap) == 0 {
 		return nil
 	}
@@ -88,6 +110,9 @@ func (i *IndexedGCAllocPQ) Pop() *GCAlloc {
 }
 
 func (i *IndexedGCAllocPQ) Remove(allocID string) (*GCAlloc, error) {
+	i.pqLock.Lock()
+	defer i.pqLock.Unlock()
+
 	if gcAlloc, ok := i.index[allocID]; ok {
 		heap.Remove(&i.heap, gcAlloc.index)
 		delete(i.index, allocID)
@@ -98,25 +123,90 @@ func (i *IndexedGCAllocPQ) Remove(allocID string) (*GCAlloc, error) {
 }
 
 func (i *IndexedGCAllocPQ) Length() int {
+	i.pqLock.Lock()
+	defer i.pqLock.Unlock()
+
 	return len(i.heap)
 }
 
 // AllocGarbageCollector garbage collects terminated allocations on a node
 type AllocGarbageCollector struct {
 	allocRunners   *IndexedGCAllocPQ
-	allocsLock     sync.Mutex
-	statsCollector *stats.HostStatsCollector
+	statsCollector stats.NodeStatsCollector
+	reservedDiskMB int
 	logger         *log.Logger
+	shutdownCh     chan struct{}
 }
 
 // NewAllocGarbageCollector returns a garbage collector for terminated
 // allocations on a node.
-func NewAllocGarbageCollector(logger *log.Logger, statsCollector *stats.HostStatsCollector) *AllocGarbageCollector {
-	return &AllocGarbageCollector{
+func NewAllocGarbageCollector(logger *log.Logger, statsCollector stats.NodeStatsCollector, reservedDiskMB int) *AllocGarbageCollector {
+	gc := &AllocGarbageCollector{
 		allocRunners:   NewIndexedGCAllocPQ(),
 		statsCollector: statsCollector,
+		reservedDiskMB: reservedDiskMB,
 		logger:         logger,
+		shutdownCh:     make(chan struct{}),
 	}
+	go gc.run()
+
+	return gc
+}
+
+func (a *AllocGarbageCollector) run() {
+	ticker := time.NewTicker(gcInterval)
+	for {
+		select {
+		case <-ticker.C:
+			if err := a.keepUsageBelowThreshold(); err != nil {
+				a.logger.Printf("[ERR] client: error GCing allocation: %v", err)
+			}
+		case <-a.shutdownCh:
+			ticker.Stop()
+			return
+		}
+	}
+}
+
+// keepUsageBelowThreshold collects disk usage information and garbage collects
+// allocations to make disk space available.
+func (a *AllocGarbageCollector) keepUsageBelowThreshold() error {
+	for {
+		// Check if we have enough free space
+		err := a.statsCollector.Collect()
+		if err != nil {
+			return err
+		}
+
+		// See if we are below thresholds for used disk space and inode usage
+		diskStats := a.statsCollector.Stats().AllocDirStats
+		if diskStats.UsedPercent <= diskUsageThreshold &&
+			diskStats.InodesUsedPercent <= inodeUsageThreshold {
+			break
+		}
+
+		// Collect an allocation
+		gcAlloc := a.allocRunners.Pop()
+		if gcAlloc == nil {
+			break
+		}
+
+		ar := gcAlloc.allocRunner
+		alloc := ar.Alloc()
+		a.logger.Printf("[INFO] client: garbage collecting allocation %v", alloc.ID)
+
+		// Destroy the alloc runner and wait until it exits
+		ar.Destroy()
+		select {
+		case <-ar.WaitCh():
+		case <-a.shutdownCh:
+		}
+	}
+	return nil
+}
+
+func (a *AllocGarbageCollector) Stop() {
+	a.shutdownCh <- struct{}{}
 }
 
 // Collect garbage collects a single allocation on a node
@@ -157,8 +247,30 @@ func (a *AllocGarbageCollector) MakeRoomFor(allocations []*structs.Allocation) e
 		}
 	}
 
+	// If the host has enough free space to accommodate the new allocations then
+	// we don't need to garbage collect terminated allocations
+	hostStats := a.statsCollector.Stats()
+	if hostStats != nil && uint64(totalResource.DiskMB*1024*1024) < hostStats.AllocDirStats.Available {
+		return nil
+	}
+
 	var diskCleared int
 	for {
+		// Collect host stats and see if we still need to remove older
+		// allocations
+		if err := a.statsCollector.Collect(); err == nil {
+			hostStats := a.statsCollector.Stats()
+			if hostStats.AllocDirStats.Available >= uint64(totalResource.DiskMB*1024*1024) {
+				break
+			}
+		} else {
+			// Fall back to a simpler model to know if we have enough disk
+			// space if stats collection fails
+			if diskCleared >= totalResource.DiskMB {
+				break
+			}
+		}
+
 		gcAlloc := a.allocRunners.Pop()
 		if gcAlloc == nil {
 			break
@@ -167,11 +279,16 @@ func (a *AllocGarbageCollector) MakeRoomFor(allocations []*structs.Allocation) e
 		ar := gcAlloc.allocRunner
 		alloc := ar.Alloc()
 		a.logger.Printf("[INFO] client: garbage collecting allocation %v", alloc.ID)
+
+		// Destroy the alloc runner and wait until it exits
 		ar.Destroy()
-		diskCleared += alloc.Resources.DiskMB
-		if diskCleared >= totalResource.DiskMB {
-			break
+		select {
+		case <-ar.WaitCh():
+		case <-a.shutdownCh:
 		}
+
+		// Track the disk freed, used as the fallback when stats collection fails
+		diskCleared += alloc.Resources.DiskMB
 	}
 	return nil
 }
@@ -187,7 +304,5 @@ func (a *AllocGarbageCollector) MarkForCollection(ar *AllocRunner) error {
 	}
 
 	a.logger.Printf("[INFO] client: marking allocation %v for GC", ar.Alloc().ID)
-	a.allocsLock.Lock()
-	defer a.allocsLock.Unlock()
 	return a.allocRunners.Push(ar)
 }
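Note that gc.go now programs against stats.NodeStatsCollector, an interface, rather than the concrete *stats.HostStatsCollector; the interface definition lives in the stats package and is not among the hunks shown here. Inferred purely from the calls above (Collect(), and Stats() exposing AllocDirStats with Available, UsedPercent, and InodesUsedPercent), a plausible shape is the following sketch; any names beyond those calls are assumptions, not a verified copy:

    // Hypothetical reconstruction of the interface gc.go depends on.
    type NodeStatsCollector interface {
        // Collect refreshes the host's resource usage statistics.
        Collect() error
        // Stats returns the most recently collected statistics.
        Stats() *HostStats
    }

    // HostStats carries at least the alloc dir disk numbers used above.
    type HostStats struct {
        AllocDirStats *DiskStats
    }

    type DiskStats struct {
        Available         uint64  // bytes free on the device backing the alloc dir
        UsedPercent       float64 // percent of disk space used
        InodesUsedPercent float64 // percent of inodes used
    }

Accepting an interface here is what makes the collector testable: a fake implementation can report arbitrary disk pressure without touching a real filesystem.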
8 changes: 4 additions & 4 deletions client/gc_test.go
@@ -19,22 +19,22 @@ func TestIndexedGCAllocPQ(t *testing.T) {
 	pq.Push(ar3)
 	pq.Push(ar4)
 
-	allocID := pq.Pop().alloc.Alloc().ID
+	allocID := pq.Pop().allocRunner.Alloc().ID
 	if allocID != ar1.Alloc().ID {
 		t.Fatalf("expected alloc %v, got %v", ar1.Alloc().ID, allocID)
 	}
 
-	allocID = pq.Pop().alloc.Alloc().ID
+	allocID = pq.Pop().allocRunner.Alloc().ID
 	if allocID != ar2.Alloc().ID {
 		t.Fatalf("expected alloc %v, got %v", ar2.Alloc().ID, allocID)
 	}
 
-	allocID = pq.Pop().alloc.Alloc().ID
+	allocID = pq.Pop().allocRunner.Alloc().ID
 	if allocID != ar3.Alloc().ID {
 		t.Fatalf("expected alloc %v, got %v", ar3.Alloc().ID, allocID)
 	}
 
-	allocID = pq.Pop().alloc.Alloc().ID
+	allocID = pq.Pop().allocRunner.Alloc().ID
 	if allocID != ar4.Alloc().ID {
 		t.Fatalf("expected alloc %v, got %v", ar4.Alloc().ID, allocID)
 	}
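The interface change pays off in tests like these: a fake collector can simulate any disk situation. A minimal sketch of that idea, assuming the hypothetical HostStats and DiskStats shapes from the earlier sketch plus the client package's usual test helpers (testLogger, mock.Alloc); this is illustrative, not code from this commit:

    // fakeStatsCollector returns whatever stats the test hands it.
    type fakeStatsCollector struct {
        stats *HostStats
    }

    func (f *fakeStatsCollector) Collect() error    { return nil }
    func (f *fakeStatsCollector) Stats() *HostStats { return f.stats }

    func TestMakeRoomFor_EnoughSpace(t *testing.T) {
        fake := &fakeStatsCollector{stats: &HostStats{
            // Report 10 GiB free in the alloc dir.
            AllocDirStats: &DiskStats{Available: 10 * 1024 * 1024 * 1024},
        }}
        gc := NewAllocGarbageCollector(testLogger(), fake, 20)
        defer gc.Stop()

        // Requesting 1024 MB is far below the reported 10 GiB free, so
        // MakeRoomFor should return without garbage collecting anything.
        alloc := mock.Alloc()
        alloc.Resources.DiskMB = 1024
        if err := gc.MakeRoomFor([]*structs.Allocation{alloc}); err != nil {
            t.Fatalf("unexpected error: %v", err)
        }
    }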