Making the gc allocator understand real disk usage
diptanu committed Dec 16, 2016
1 parent 79fdad8 commit 439f283
Showing 4 changed files with 166 additions and 31 deletions.
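
In short, the client now points its host stats collector at the alloc directory and tells the garbage collector how much disk the node reserves; gc.go gains a background loop that evicts terminated allocations, oldest first, whenever the alloc dir runs low on free space. The following is condensed from the client.go hunks below as a sketch rather than the verbatim code (unchanged Client fields are elided):

// In NewClient: the stats collector watches cfg.AllocDir, and the garbage
// collector is constructed with the node's reserved disk.
statsCollector := stats.NewHostStatsCollector(logger, cfg.AllocDir)

c := &Client{
	// ...other fields unchanged...
	garbageCollector: NewAllocGarbageCollector(logger, statsCollector, cfg.Node.Reserved.DiskMB),
}

// In (*Client).Shutdown: stop the background GC loop before the running
// allocations are destroyed.
c.garbageCollector.Stop()
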
client/client.go: 7 changes (5 additions, 2 deletions)
@@ -177,7 +177,7 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg
tlsWrap = tw
}

statsCollector := stats.NewHostStatsCollector(logger)
statsCollector := stats.NewHostStatsCollector(logger, cfg.AllocDir)

// Create the client
c := &Client{
@@ -195,7 +195,7 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg
servers: newServerList(),
triggerDiscoveryCh: make(chan struct{}),
serversDiscoveredCh: make(chan struct{}),
garbageCollector: NewAllocGarbageCollector(logger, statsCollector),
garbageCollector: NewAllocGarbageCollector(logger, statsCollector, cfg.Node.Reserved.DiskMB),
}

// Initialize the client
@@ -372,6 +372,9 @@ func (c *Client) Shutdown() error {
c.vaultClient.Stop()
}

// Stop Garbage collector
c.garbageCollector.Stop()

// Destroy all the running allocations.
if c.config.DevMode {
c.allocLock.Lock()
client/gc.go: 125 changes (117 additions, 8 deletions)
@@ -11,6 +11,14 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
)

const (
// availableDiskPercent is the percent of disk that Nomad tries to keep free
availableDiskPercent = 0.2

// gcInterval is the interval at which Nomad runs the garbage collector
gcInterval = 5 * time.Second
)

type GCAlloc struct {
timeStamp time.Time
allocRunner *AllocRunner
Expand Down Expand Up @@ -54,6 +62,8 @@ func (pq *GCAllocPQImpl) Pop() interface{} {
type IndexedGCAllocPQ struct {
index map[string]*GCAlloc
heap GCAllocPQImpl

pqLock sync.Mutex
}

func NewIndexedGCAllocPQ() *IndexedGCAllocPQ {
@@ -64,6 +74,9 @@ func NewIndexedGCAllocPQ() *IndexedGCAllocPQ {
}

func (i *IndexedGCAllocPQ) Push(ar *AllocRunner) error {
i.pqLock.Lock()
defer i.pqLock.Unlock()

alloc := ar.Alloc()
if _, ok := i.index[alloc.ID]; ok {
return fmt.Errorf("alloc %v already being tracked for GC", alloc.ID)
@@ -78,6 +91,9 @@ func (i *IndexedGCAllocPQ) Push(ar *AllocRunner) error {
}

func (i *IndexedGCAllocPQ) Pop() *GCAlloc {
i.pqLock.Lock()
defer i.pqLock.Unlock()

if len(i.heap) == 0 {
return nil
}
@@ -88,6 +104,9 @@ func (i *IndexedGCAllocPQ) Pop() *GCAlloc {
}

func (i *IndexedGCAllocPQ) Remove(allocID string) (*GCAlloc, error) {
i.pqLock.Lock()
defer i.pqLock.Unlock()

if gcAlloc, ok := i.index[allocID]; ok {
heap.Remove(&i.heap, gcAlloc.index)
delete(i.index, allocID)
@@ -98,25 +117,90 @@ func (i *IndexedGCAllocPQ) Remove(allocID string) (*GCAlloc, error) {
}

func (i *IndexedGCAllocPQ) Length() int {
i.pqLock.Lock()
defer i.pqLock.Unlock()

return len(i.heap)
}

// AllocGarbageCollector garbage collects terminated allocations on a node
type AllocGarbageCollector struct {
allocRunners *IndexedGCAllocPQ
allocsLock sync.Mutex
statsCollector *stats.HostStatsCollector
reservedDiskMB int
logger *log.Logger
shutdownCh chan struct{}
}

// NewAllocGarbageCollector returns a garbage collector for terminated
// allocations on a node.
func NewAllocGarbageCollector(logger *log.Logger, statsCollector *stats.HostStatsCollector) *AllocGarbageCollector {
return &AllocGarbageCollector{
func NewAllocGarbageCollector(logger *log.Logger, statsCollector *stats.HostStatsCollector, reservedDiskMB int) *AllocGarbageCollector {
gc := &AllocGarbageCollector{
allocRunners: NewIndexedGCAllocPQ(),
statsCollector: statsCollector,
reservedDiskMB: reservedDiskMB,
logger: logger,
shutdownCh: make(chan struct{}),
}
go gc.run()

return gc
}

func (a *AllocGarbageCollector) run() {
ticker := time.NewTicker(gcInterval)
for {
select {
case <-ticker.C:
if err := a.keepUsageBelowThreshold(); err != nil {
a.logger.Printf("[ERR] client: error GCing allocation: %v", err)
}
case <-a.shutdownCh:
ticker.Stop()
return
}
}
}

// keepUsageBelowThreshold collects disk usage information and garbage collects
// allocations to make disk space available.
func (a *AllocGarbageCollector) keepUsageBelowThreshold() error {
for {
// Check if we have enough free space
err := a.statsCollector.Collect()
if err != nil {
return err
}

// See if we have enough available disk space
diskStats := a.statsCollector.Stats().AllocDirStats
freeSpaceNeeded := float64(diskStats.Available-uint64(a.reservedDiskMB*1024*1024)) * 0.2
if diskStats.Available >= uint64(freeSpaceNeeded) {
break
}

// Collect an allocation
gcAlloc := a.allocRunners.Pop()
if gcAlloc == nil {
break
}

ar := gcAlloc.allocRunner
alloc := ar.Alloc()
a.logger.Printf("[INFO] client: garbage collecting allocation %v", alloc.ID)

// Destroy the alloc runner and wait until it exits
ar.Destroy()
select {
case <-ar.WaitCh():
case <-a.shutdownCh:
}
}
return nil
}

func (a *AllocGarbageCollector) Stop() {
a.shutdownCh <- struct{}{}
}

// Collect garbage collects a single allocation on a node
@@ -157,8 +241,30 @@ func (a *AllocGarbageCollector) MakeRoomFor(allocations []*structs.Allocation) e
}
}

// If the host has enough free space to accommodate the new allocations then
// we don't need to garbage collect terminated allocations
hostStats := a.statsCollector.Stats()
if hostStats != nil && uint64(totalResource.DiskMB*1024*1024) < hostStats.AllocDirStats.Available {
return nil
}

var diskCleared int
for {
// Collect host stats and see if we still need to remove older
// allocations
if err := a.statsCollector.Collect(); err == nil {
hostStats := a.statsCollector.Stats()
if hostStats.AllocDirStats.Available >= uint64(totalResource.DiskMB*1024*1024) {
break
}
} else {
// Falling back to a simpler model to know if we have enough disk
// space if stats collection fails
if diskCleared >= totalResource.DiskMB {
break
}
}

gcAlloc := a.allocRunners.Pop()
if gcAlloc == nil {
break
@@ -167,11 +273,16 @@ func (a *AllocGarbageCollector) MakeRoomFor(allocations []*structs.Allocation) e
ar := gcAlloc.allocRunner
alloc := ar.Alloc()
a.logger.Printf("[INFO] client: garbage collecting allocation %v", alloc.ID)

// Destroy the alloc runner and wait until it exits
ar.Destroy()
diskCleared += alloc.Resources.DiskMB
if diskCleared >= totalResource.DiskMB {
break
select {
case <-ar.WaitCh():
case <-a.shutdownCh:
}

// Call stats collect again
diskCleared += alloc.Resources.DiskMB
}
return nil
}
@@ -187,7 +298,5 @@ func (a *AllocGarbageCollector) MarkForCollection(ar *AllocRunner) error {
}

a.logger.Printf("[INFO] client: marking allocation %v for GC", ar.Alloc().ID)
a.allocsLock.Lock()
defer a.allocsLock.Unlock()
return a.allocRunners.Push(ar)
}
client/gc_test.go: 8 changes (4 additions, 4 deletions)
@@ -19,22 +19,22 @@ func TestIndexedGCAllocPQ(t *testing.T) {
pq.Push(ar3)
pq.Push(ar4)

allocID := pq.Pop().alloc.Alloc().ID
allocID := pq.Pop().allocRunner.Alloc().ID
if allocID != ar1.Alloc().ID {
t.Fatalf("expected alloc %v, got %v", allocID, ar1.Alloc().ID)
}

allocID = pq.Pop().alloc.Alloc().ID
allocID = pq.Pop().allocRunner.Alloc().ID
if allocID != ar2.Alloc().ID {
t.Fatalf("expected alloc %v, got %v", allocID, ar1.Alloc().ID)
}

allocID = pq.Pop().alloc.Alloc().ID
allocID = pq.Pop().allocRunner.Alloc().ID
if allocID != ar3.Alloc().ID {
t.Fatalf("expected alloc %v, got %v", allocID, ar1.Alloc().ID)
}

allocID = pq.Pop().alloc.Alloc().ID
allocID = pq.Pop().allocRunner.Alloc().ID
if allocID != ar4.Alloc().ID {
t.Fatalf("expected alloc %v, got %v", allocID, ar1.Alloc().ID)
}
client/stats/host.go: 57 changes (40 additions, 17 deletions)
Expand Up @@ -20,6 +20,7 @@ type HostStats struct {
Memory *MemoryStats
CPU []*CPUStats
DiskStats []*DiskStats
AllocDirStats *DiskStats
Uptime uint64
Timestamp int64
CPUTicksConsumed float64
@@ -61,16 +62,18 @@ type HostStatsCollector struct {
logger *log.Logger
hostStats *HostStats
hostStatsLock sync.RWMutex
allocDir string
}

// NewHostStatsCollector returns a HostStatsCollector
func NewHostStatsCollector(logger *log.Logger) *HostStatsCollector {
func NewHostStatsCollector(logger *log.Logger, allocDir string) *HostStatsCollector {
numCores := runtime.NumCPU()
statsCalculator := make(map[string]*HostCpuStatsCalculator)
collector := &HostStatsCollector{
statsCalculator: statsCalculator,
numCores: numCores,
logger: logger,
allocDir: allocDir,
}
return collector
}
@@ -125,25 +128,17 @@ func (h *HostStatsCollector) Collect() error {
h.logger.Printf("[WARN] client: error fetching host disk usage stats for %v: %v", partition.Mountpoint, err)
continue
}
ds := DiskStats{
Device: partition.Device,
Mountpoint: partition.Mountpoint,
Size: usage.Total,
Used: usage.Used,
Available: usage.Free,
UsedPercent: usage.UsedPercent,
InodesUsedPercent: usage.InodesUsedPercent,
}
if math.IsNaN(ds.UsedPercent) {
ds.UsedPercent = 0.0
}
if math.IsNaN(ds.InodesUsedPercent) {
ds.InodesUsedPercent = 0.0
}
diskStats = append(diskStats, &ds)
ds := h.toDiskStats(usage, &partition)
diskStats = append(diskStats, ds)
}
hs.DiskStats = diskStats

usage, err := disk.Usage(h.allocDir)
if err != nil {
return err
}
hs.AllocDirStats = h.toDiskStats(usage, nil)

uptime, err := host.Uptime()
if err != nil {
return err
@@ -156,12 +151,40 @@ func (h *HostStatsCollector) Collect() error {
return nil
}

// Stats returns the host stats that have been collected
func (h *HostStatsCollector) Stats() *HostStats {
h.hostStatsLock.RLock()
defer h.hostStatsLock.RUnlock()
return h.hostStats
}

// toDiskStats merges UsageStat and PartitionStat to create a DiskStat
func (h *HostStatsCollector) toDiskStats(usage *disk.UsageStat, partitionStat *disk.PartitionStat) *DiskStats {
if usage == nil {
return nil
}
ds := DiskStats{
Size: usage.Total,
Used: usage.Used,
Available: usage.Free,
UsedPercent: usage.UsedPercent,
InodesUsedPercent: usage.InodesUsedPercent,
}
if math.IsNaN(ds.UsedPercent) {
ds.UsedPercent = 0.0
}
if math.IsNaN(ds.InodesUsedPercent) {
ds.InodesUsedPercent = 0.0
}

if partitionStat != nil {
ds.Device = partitionStat.Device
ds.Mountpoint = partitionStat.Mountpoint
}

return &ds
}

// HostCpuStatsCalculator calculates cpu usage percentages
type HostCpuStatsCalculator struct {
prevIdle float64
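
The new AllocDirStats field comes from pointing the same disk usage call the partition loop uses at the configured alloc dir and normalizing the result through toDiskStats. A minimal standalone illustration of that call, assuming the github.com/shirou/gopsutil/disk package these disk.Usage/UsageStat types come from and a made-up alloc dir path:

package main

import (
	"fmt"
	"log"

	"github.com/shirou/gopsutil/disk"
)

func main() {
	// Stand-in for cfg.AllocDir; any existing directory works for the demo.
	usage, err := disk.Usage("/var/lib/nomad/alloc")
	if err != nil {
		log.Fatal(err)
	}
	// These are the fields toDiskStats copies into a DiskStats value.
	fmt.Printf("size=%d used=%d free=%d used=%.1f%% inodes=%.1f%%\n",
		usage.Total, usage.Used, usage.Free, usage.UsedPercent, usage.InodesUsedPercent)
}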
