diff --git a/CHANGELOG.md b/CHANGELOG.md index fd27f461f77..9f689b19a59 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ ## 0.5.2 (Unreleased) +IMPROVEMENTS: + * client: Garbage collect Allocation Runners to free up disk resouces + [GH-2081] + BUG FIXES: * client: Fixed a race condition and remove panic when handling duplicate allocations [GH-2096] diff --git a/client/client.go b/client/client.go index 4f8c4832029..18fbb4ae803 100644 --- a/client/client.go +++ b/client/client.go @@ -141,8 +141,6 @@ type Client struct { // HostStatsCollector collects host resource usage stats hostStatsCollector *stats.HostStatsCollector - resourceUsage *stats.HostStats - resourceUsageLock sync.RWMutex shutdown bool shutdownCh chan struct{} @@ -154,6 +152,10 @@ type Client struct { // migratingAllocs is the set of allocs whose data migration is in flight migratingAllocs map[string]chan struct{} migratingAllocsLock sync.Mutex + + // garbageCollector is used to garbage collect terminal allocations present + // in the node automatically + garbageCollector *AllocGarbageCollector } var ( @@ -182,7 +184,6 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg start: time.Now(), connPool: nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, tlsWrap), logger: logger, - hostStatsCollector: stats.NewHostStatsCollector(logger), allocs: make(map[string]*AllocRunner), blockedAllocations: make(map[string]*structs.Allocation), allocUpdates: make(chan *structs.Allocation, 64), @@ -198,6 +199,11 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg return nil, fmt.Errorf("failed to initialize client: %v", err) } + // Add the stats collector and the garbage collector + statsCollector := stats.NewHostStatsCollector(logger, c.config.AllocDir) + c.hostStatsCollector = statsCollector + c.garbageCollector = NewAllocGarbageCollector(logger, statsCollector, cfg.Node.Reserved.DiskMB) + // Setup the node if err := c.setupNode(); err != nil { return nil, fmt.Errorf("node setup failed: %v", err) @@ -367,6 +373,9 @@ func (c *Client) Shutdown() error { c.vaultClient.Stop() } + // Stop Garbage collector + c.garbageCollector.Stop() + // Destroy all the running allocations. if c.config.DevMode { c.allocLock.Lock() @@ -434,6 +443,17 @@ func (c *Client) Stats() map[string]map[string]string { return stats } +// CollectAllocation garbage collects a single allocation +func (c *Client) CollectAllocation(allocID string) error { + return c.garbageCollector.Collect(allocID) +} + +// CollectAllAllocs garbage collects all allocations on a node in the terminal +// state +func (c *Client) CollectAllAllocs() error { + return c.garbageCollector.CollectAll() +} + // Node returns the locally registered node func (c *Client) Node() *structs.Node { c.configLock.RLock() @@ -459,9 +479,7 @@ func (c *Client) GetAllocStats(allocID string) (AllocStatsReporter, error) { // HostStats returns all the stats related to a Nomad client func (c *Client) LatestHostStats() *stats.HostStats { - c.resourceUsageLock.RLock() - defer c.resourceUsageLock.RUnlock() - return c.resourceUsage + return c.hostStatsCollector.Stats() } // GetAllocFS returns the AllocFS interface for the alloc dir of an allocation @@ -1088,6 +1106,15 @@ func (c *Client) updateAllocStatus(alloc *structs.Allocation) { } c.blockedAllocsLock.Unlock() + // Mark the allocation for GC if it is in terminal state + if alloc.Terminated() { + if ar, ok := c.getAllocRunners()[alloc.ID]; ok { + if err := c.garbageCollector.MarkForCollection(ar); err != nil { + c.logger.Printf("[DEBUG] client: couldn't add alloc %v for GC: %v", alloc.ID, err) + } + } + } + // Strip all the information that can be reconstructed at the server. Only // send the fields that are updatable by the client. stripped := new(structs.Allocation) @@ -1732,6 +1759,9 @@ func (c *Client) removeAlloc(alloc *structs.Allocation) error { delete(c.allocs, alloc.ID) c.allocLock.Unlock() + // Remove the allocrunner from garbage collector + c.garbageCollector.Remove(ar) + ar.Destroy() return nil } @@ -1761,6 +1791,11 @@ func (c *Client) addAlloc(alloc *structs.Allocation, prevAllocDir *allocdir.Allo return nil } + // Make room for the allocation + if err := c.garbageCollector.MakeRoomFor([]*structs.Allocation{alloc}); err != nil { + c.logger.Printf("[ERR] client: error making room for allocation: %v", err) + } + c.configLock.RLock() ar := NewAllocRunner(c.logger, c.configCopy, c.updateAllocStatus, alloc, c.vaultClient) ar.SetPreviousAllocDir(prevAllocDir) @@ -2068,20 +2103,16 @@ func (c *Client) collectHostStats() { for { select { case <-next.C: - ru, err := c.hostStatsCollector.Collect() + err := c.hostStatsCollector.Collect() next.Reset(c.config.StatsCollectionInterval) if err != nil { c.logger.Printf("[WARN] client: error fetching host resource usage stats: %v", err) continue } - c.resourceUsageLock.Lock() - c.resourceUsage = ru - c.resourceUsageLock.Unlock() - // Publish Node metrics if operator has opted in if c.config.PublishNodeMetrics { - c.emitStats(ru) + c.emitStats(c.hostStatsCollector.Stats()) } case <-c.shutdownCh: return diff --git a/client/gc.go b/client/gc.go new file mode 100644 index 00000000000..0e41d0950e4 --- /dev/null +++ b/client/gc.go @@ -0,0 +1,343 @@ +package client + +import ( + "container/heap" + "fmt" + "log" + "sync" + "time" + + "github.com/hashicorp/nomad/client/stats" + "github.com/hashicorp/nomad/nomad/structs" +) + +const ( + // diskUsageThreshold is the percent of used disk space beyond which Nomad + // GCs terminated allocations + diskUsageThreshold = 80 + + // gcInterval is the interval at which Nomad runs the garbage collector + gcInterval = 1 * time.Minute + + // inodeUsageThreshold is the percent of inode usage that Nomad tries to + // maintain, whenever we are over it we will attempt to GC terminal + // allocations + inodeUsageThreshold = 70 + + // MB is a constant which converts values in bytes to MB + MB = 1024 * 1024 +) + +// GCAlloc wraps an allocation runner and an index enabling it to be used within +// a PQ +type GCAlloc struct { + timeStamp time.Time + allocRunner *AllocRunner + index int +} + +type GCAllocPQImpl []*GCAlloc + +func (pq GCAllocPQImpl) Len() int { + return len(pq) +} + +func (pq GCAllocPQImpl) Less(i, j int) bool { + return pq[i].timeStamp.Before(pq[j].timeStamp) +} + +func (pq GCAllocPQImpl) Swap(i, j int) { + pq[i], pq[j] = pq[j], pq[i] + pq[i].index = i + pq[j].index = j +} + +func (pq *GCAllocPQImpl) Push(x interface{}) { + n := len(*pq) + item := x.(*GCAlloc) + item.index = n + *pq = append(*pq, item) +} + +func (pq *GCAllocPQImpl) Pop() interface{} { + old := *pq + n := len(old) + item := old[n-1] + item.index = -1 // for safety + *pq = old[0 : n-1] + return item +} + +// IndexedGCAllocPQ is an indexed PQ which maintains a list of allocation runner +// based on their termination time. +type IndexedGCAllocPQ struct { + index map[string]*GCAlloc + heap GCAllocPQImpl + + pqLock sync.Mutex +} + +func NewIndexedGCAllocPQ() *IndexedGCAllocPQ { + return &IndexedGCAllocPQ{ + index: make(map[string]*GCAlloc), + heap: make(GCAllocPQImpl, 0), + } +} + +func (i *IndexedGCAllocPQ) Push(ar *AllocRunner) error { + i.pqLock.Lock() + defer i.pqLock.Unlock() + + alloc := ar.Alloc() + if _, ok := i.index[alloc.ID]; ok { + return fmt.Errorf("alloc %v already being tracked for GC", alloc.ID) + } + gcAlloc := &GCAlloc{ + timeStamp: time.Now(), + allocRunner: ar, + } + i.index[alloc.ID] = gcAlloc + heap.Push(&i.heap, gcAlloc) + return nil +} + +func (i *IndexedGCAllocPQ) Pop() *GCAlloc { + i.pqLock.Lock() + defer i.pqLock.Unlock() + + if len(i.heap) == 0 { + return nil + } + + gcAlloc := heap.Pop(&i.heap).(*GCAlloc) + delete(i.index, gcAlloc.allocRunner.Alloc().ID) + return gcAlloc +} + +func (i *IndexedGCAllocPQ) Remove(allocID string) (*GCAlloc, error) { + i.pqLock.Lock() + defer i.pqLock.Unlock() + + if gcAlloc, ok := i.index[allocID]; ok { + heap.Remove(&i.heap, gcAlloc.index) + delete(i.index, allocID) + return gcAlloc, nil + } + + return nil, fmt.Errorf("alloc %q not present", allocID) +} + +func (i *IndexedGCAllocPQ) Length() int { + i.pqLock.Lock() + defer i.pqLock.Unlock() + + return len(i.heap) +} + +// AllocGarbageCollector garbage collects terminated allocations on a node +type AllocGarbageCollector struct { + allocRunners *IndexedGCAllocPQ + statsCollector stats.NodeStatsCollector + reservedDiskMB int + logger *log.Logger + shutdownCh chan struct{} +} + +// NewAllocGarbageCollector returns a garbage collector for terminated +// allocations on a node. +func NewAllocGarbageCollector(logger *log.Logger, statsCollector stats.NodeStatsCollector, reservedDiskMB int) *AllocGarbageCollector { + gc := &AllocGarbageCollector{ + allocRunners: NewIndexedGCAllocPQ(), + statsCollector: statsCollector, + reservedDiskMB: reservedDiskMB, + logger: logger, + shutdownCh: make(chan struct{}), + } + go gc.run() + + return gc +} + +func (a *AllocGarbageCollector) run() { + ticker := time.NewTicker(gcInterval) + for { + select { + case <-ticker.C: + if err := a.keepUsageBelowThreshold(); err != nil { + a.logger.Printf("[ERR] client: error garbage collecting allocation: %v", err) + } + case <-a.shutdownCh: + ticker.Stop() + return + } + } +} + +// keepUsageBelowThreshold collects disk usage information and garbage collects +// allocations to make disk space available. +func (a *AllocGarbageCollector) keepUsageBelowThreshold() error { + for { + // Check if we have enough free space + err := a.statsCollector.Collect() + if err != nil { + return err + } + + // See if we are below thresholds for used disk space and inode usage + diskStats := a.statsCollector.Stats().AllocDirStats + + if diskStats == nil { + break + } + + if diskStats.UsedPercent <= diskUsageThreshold && + diskStats.InodesUsedPercent <= inodeUsageThreshold { + break + } + + // Collect an allocation + gcAlloc := a.allocRunners.Pop() + if gcAlloc == nil { + break + } + + ar := gcAlloc.allocRunner + alloc := ar.Alloc() + a.logger.Printf("[INFO] client: garbage collecting allocation %v", alloc.ID) + + // Destroy the alloc runner and wait until it exits + ar.Destroy() + select { + case <-ar.WaitCh(): + case <-a.shutdownCh: + } + } + return nil +} + +func (a *AllocGarbageCollector) Stop() { + close(a.shutdownCh) +} + +// Collect garbage collects a single allocation on a node +func (a *AllocGarbageCollector) Collect(allocID string) error { + gcAlloc, err := a.allocRunners.Remove(allocID) + if err != nil { + return fmt.Errorf("unable to collect allocation %q: %v", allocID, err) + } + + ar := gcAlloc.allocRunner + a.logger.Printf("[INFO] client: garbage collecting allocation %q", ar.Alloc().ID) + ar.Destroy() + + return nil +} + +// CollectAll garbage collects all termianated allocations on a node +func (a *AllocGarbageCollector) CollectAll() error { + for { + gcAlloc := a.allocRunners.Pop() + if gcAlloc == nil { + break + } + ar := gcAlloc.allocRunner + a.logger.Printf("[INFO] client: garbage collecting alloc runner for alloc %q", ar.Alloc().ID) + ar.Destroy() + } + return nil +} + +// MakeRoomFor garbage collects enough number of allocations in the terminal +// state to make room for new allocations +func (a *AllocGarbageCollector) MakeRoomFor(allocations []*structs.Allocation) error { + totalResource := &structs.Resources{} + for _, alloc := range allocations { + if err := totalResource.Add(alloc.Resources); err != nil { + return err + } + } + + // If the host has enough free space to accomodate the new allocations then + // we don't need to garbage collect terminated allocations + if hostStats := a.statsCollector.Stats(); hostStats != nil { + var availableForAllocations uint64 + if hostStats.AllocDirStats.Available < uint64(a.reservedDiskMB*MB) { + availableForAllocations = 0 + } else { + availableForAllocations = hostStats.AllocDirStats.Available - uint64(a.reservedDiskMB*MB) + } + if uint64(totalResource.DiskMB*MB) < availableForAllocations { + return nil + } + } + + var diskCleared int + for { + // Collect host stats and see if we still need to remove older + // allocations + var allocDirStats *stats.DiskStats + if err := a.statsCollector.Collect(); err == nil { + if hostStats := a.statsCollector.Stats(); hostStats != nil { + allocDirStats = hostStats.AllocDirStats + } + } + + if allocDirStats != nil { + if allocDirStats.Available >= uint64(totalResource.DiskMB*MB) { + break + } + } else { + // Falling back to a simpler model to know if we have enough disk + // space if stats collection fails + if diskCleared >= totalResource.DiskMB { + break + } + } + + gcAlloc := a.allocRunners.Pop() + if gcAlloc == nil { + break + } + + ar := gcAlloc.allocRunner + alloc := ar.Alloc() + a.logger.Printf("[INFO] client: garbage collecting allocation %v", alloc.ID) + + // Destroy the alloc runner and wait until it exits + ar.Destroy() + select { + case <-ar.WaitCh(): + case <-a.shutdownCh: + } + + // Call stats collect again + diskCleared += alloc.Resources.DiskMB + } + return nil +} + +// MarkForCollection starts tracking an allocation for Garbage Collection +func (a *AllocGarbageCollector) MarkForCollection(ar *AllocRunner) error { + if ar == nil { + return fmt.Errorf("nil allocation runner inserted for garbage collection") + } + if ar.Alloc() == nil { + a.logger.Printf("[INFO] client: alloc is nil, so garbage collecting") + ar.Destroy() + } + + a.logger.Printf("[INFO] client: marking allocation %v for GC", ar.Alloc().ID) + return a.allocRunners.Push(ar) +} + +// Remove removes an alloc runner without garbage collecting it +func (a *AllocGarbageCollector) Remove(ar *AllocRunner) { + if ar == nil || ar.Alloc() == nil { + return + } + + alloc := ar.Alloc() + if _, err := a.allocRunners.Remove(alloc.ID); err == nil { + a.logger.Printf("[INFO] client: removed alloc runner %v from garbage collector", alloc.ID) + } +} diff --git a/client/gc_test.go b/client/gc_test.go new file mode 100644 index 00000000000..8ae97a3cc6e --- /dev/null +++ b/client/gc_test.go @@ -0,0 +1,347 @@ +package client + +import ( + "log" + "os" + "testing" + + "github.com/hashicorp/nomad/client/stats" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" +) + +func TestIndexedGCAllocPQ(t *testing.T) { + pq := NewIndexedGCAllocPQ() + + _, ar1 := testAllocRunnerFromAlloc(mock.Alloc(), false) + _, ar2 := testAllocRunnerFromAlloc(mock.Alloc(), false) + _, ar3 := testAllocRunnerFromAlloc(mock.Alloc(), false) + _, ar4 := testAllocRunnerFromAlloc(mock.Alloc(), false) + + pq.Push(ar1) + pq.Push(ar2) + pq.Push(ar3) + pq.Push(ar4) + + allocID := pq.Pop().allocRunner.Alloc().ID + if allocID != ar1.Alloc().ID { + t.Fatalf("expected alloc %v, got %v", allocID, ar1.Alloc().ID) + } + + allocID = pq.Pop().allocRunner.Alloc().ID + if allocID != ar2.Alloc().ID { + t.Fatalf("expected alloc %v, got %v", allocID, ar1.Alloc().ID) + } + + allocID = pq.Pop().allocRunner.Alloc().ID + if allocID != ar3.Alloc().ID { + t.Fatalf("expected alloc %v, got %v", allocID, ar1.Alloc().ID) + } + + allocID = pq.Pop().allocRunner.Alloc().ID + if allocID != ar4.Alloc().ID { + t.Fatalf("expected alloc %v, got %v", allocID, ar1.Alloc().ID) + } + + gcAlloc := pq.Pop() + if gcAlloc != nil { + t.Fatalf("expected nil, got %v", gcAlloc) + } +} + +type MockStatsCollector struct { + availableValues []uint64 + usedPercents []float64 + inodePercents []float64 + index int +} + +func (m *MockStatsCollector) Collect() error { + return nil +} + +func (m *MockStatsCollector) Stats() *stats.HostStats { + if len(m.availableValues) == 0 { + return nil + } + + available := m.availableValues[m.index] + usedPercent := m.usedPercents[m.index] + inodePercent := m.inodePercents[m.index] + + if m.index < len(m.availableValues)-1 { + m.index = m.index + 1 + } + return &stats.HostStats{ + AllocDirStats: &stats.DiskStats{ + Available: available, + UsedPercent: usedPercent, + InodesUsedPercent: inodePercent, + }, + } +} + +func TestAllocGarbageCollector_MarkForCollection(t *testing.T) { + logger := log.New(os.Stdout, "", 0) + gc := NewAllocGarbageCollector(logger, &MockStatsCollector{}, 0) + + _, ar1 := testAllocRunnerFromAlloc(mock.Alloc(), false) + if err := gc.MarkForCollection(ar1); err != nil { + t.Fatalf("err: %v", err) + } + + gcAlloc := gc.allocRunners.Pop() + if gcAlloc == nil || gcAlloc.allocRunner != ar1 { + t.Fatalf("bad gcAlloc: %v", gcAlloc) + } +} + +func TestAllocGarbageCollector_Collect(t *testing.T) { + logger := log.New(os.Stdout, "", 0) + gc := NewAllocGarbageCollector(logger, &MockStatsCollector{}, 0) + + _, ar1 := testAllocRunnerFromAlloc(mock.Alloc(), false) + _, ar2 := testAllocRunnerFromAlloc(mock.Alloc(), false) + if err := gc.MarkForCollection(ar1); err != nil { + t.Fatalf("err: %v", err) + } + if err := gc.MarkForCollection(ar2); err != nil { + t.Fatalf("err: %v", err) + } + + if err := gc.Collect(ar1.Alloc().ID); err != nil { + t.Fatalf("err: %v", err) + } + gcAlloc := gc.allocRunners.Pop() + if gcAlloc == nil || gcAlloc.allocRunner != ar2 { + t.Fatalf("bad gcAlloc: %v", gcAlloc) + } +} + +func TestAllocGarbageCollector_CollectAll(t *testing.T) { + logger := log.New(os.Stdout, "", 0) + gc := NewAllocGarbageCollector(logger, &MockStatsCollector{}, 0) + + _, ar1 := testAllocRunnerFromAlloc(mock.Alloc(), false) + _, ar2 := testAllocRunnerFromAlloc(mock.Alloc(), false) + if err := gc.MarkForCollection(ar1); err != nil { + t.Fatalf("err: %v", err) + } + if err := gc.MarkForCollection(ar2); err != nil { + t.Fatalf("err: %v", err) + } + + if err := gc.CollectAll(); err != nil { + t.Fatalf("err: %v", err) + } + gcAlloc := gc.allocRunners.Pop() + if gcAlloc != nil { + t.Fatalf("bad gcAlloc: %v", gcAlloc) + } +} + +func TestAllocGarbageCollector_MakeRoomForAllocations_EnoughSpace(t *testing.T) { + logger := log.New(os.Stdout, "", 0) + statsCollector := &MockStatsCollector{} + gc := NewAllocGarbageCollector(logger, statsCollector, 20) + + _, ar1 := testAllocRunnerFromAlloc(mock.Alloc(), false) + close(ar1.waitCh) + _, ar2 := testAllocRunnerFromAlloc(mock.Alloc(), false) + close(ar2.waitCh) + if err := gc.MarkForCollection(ar1); err != nil { + t.Fatalf("err: %v", err) + } + if err := gc.MarkForCollection(ar2); err != nil { + t.Fatalf("err: %v", err) + } + + // Make stats collector report 200MB free out of which 20MB is reserved + statsCollector.availableValues = []uint64{200 * MB} + statsCollector.usedPercents = []float64{0} + statsCollector.inodePercents = []float64{0} + + alloc := mock.Alloc() + alloc.Resources.DiskMB = 150 + if err := gc.MakeRoomFor([]*structs.Allocation{alloc}); err != nil { + t.Fatalf("err: %v", err) + } + + // When we have enough disk available and don't need to do any GC so we + // should have two ARs in the GC queue + for i := 0; i < 2; i++ { + if gcAlloc := gc.allocRunners.Pop(); gcAlloc == nil { + t.Fatalf("err: %v", gcAlloc) + } + } +} + +func TestAllocGarbageCollector_MakeRoomForAllocations_GC_Partial(t *testing.T) { + logger := log.New(os.Stdout, "", 0) + statsCollector := &MockStatsCollector{} + gc := NewAllocGarbageCollector(logger, statsCollector, 20) + + _, ar1 := testAllocRunnerFromAlloc(mock.Alloc(), false) + close(ar1.waitCh) + _, ar2 := testAllocRunnerFromAlloc(mock.Alloc(), false) + close(ar2.waitCh) + if err := gc.MarkForCollection(ar1); err != nil { + t.Fatalf("err: %v", err) + } + if err := gc.MarkForCollection(ar2); err != nil { + t.Fatalf("err: %v", err) + } + + // Make stats collector report 80MB and 175MB free in subsequent calls + statsCollector.availableValues = []uint64{80 * MB, 80 * MB, 175 * MB} + statsCollector.usedPercents = []float64{0, 0, 0} + statsCollector.inodePercents = []float64{0, 0, 0} + + alloc := mock.Alloc() + alloc.Resources.DiskMB = 150 + if err := gc.MakeRoomFor([]*structs.Allocation{alloc}); err != nil { + t.Fatalf("err: %v", err) + } + + // We should be GC-ing one alloc + if gcAlloc := gc.allocRunners.Pop(); gcAlloc == nil { + t.Fatalf("err: %v", gcAlloc) + } + + if gcAlloc := gc.allocRunners.Pop(); gcAlloc != nil { + t.Fatalf("gcAlloc: %v", gcAlloc) + } +} + +func TestAllocGarbageCollector_MakeRoomForAllocations_GC_All(t *testing.T) { + logger := log.New(os.Stdout, "", 0) + statsCollector := &MockStatsCollector{} + gc := NewAllocGarbageCollector(logger, statsCollector, 20) + + _, ar1 := testAllocRunnerFromAlloc(mock.Alloc(), false) + close(ar1.waitCh) + _, ar2 := testAllocRunnerFromAlloc(mock.Alloc(), false) + close(ar2.waitCh) + if err := gc.MarkForCollection(ar1); err != nil { + t.Fatalf("err: %v", err) + } + if err := gc.MarkForCollection(ar2); err != nil { + t.Fatalf("err: %v", err) + } + + // Make stats collector report 80MB and 95MB free in subsequent calls + statsCollector.availableValues = []uint64{80 * MB, 80 * MB, 95 * MB} + statsCollector.usedPercents = []float64{0, 0, 0} + statsCollector.inodePercents = []float64{0, 0, 0} + + alloc := mock.Alloc() + alloc.Resources.DiskMB = 150 + if err := gc.MakeRoomFor([]*structs.Allocation{alloc}); err != nil { + t.Fatalf("err: %v", err) + } + + // We should be GC-ing all the alloc runners + if gcAlloc := gc.allocRunners.Pop(); gcAlloc != nil { + t.Fatalf("gcAlloc: %v", gcAlloc) + } +} + +func TestAllocGarbageCollector_MakeRoomForAllocations_GC_Fallback(t *testing.T) { + logger := log.New(os.Stdout, "", 0) + statsCollector := &MockStatsCollector{} + gc := NewAllocGarbageCollector(logger, statsCollector, 20) + + _, ar1 := testAllocRunnerFromAlloc(mock.Alloc(), false) + close(ar1.waitCh) + _, ar2 := testAllocRunnerFromAlloc(mock.Alloc(), false) + close(ar2.waitCh) + if err := gc.MarkForCollection(ar1); err != nil { + t.Fatalf("err: %v", err) + } + if err := gc.MarkForCollection(ar2); err != nil { + t.Fatalf("err: %v", err) + } + + alloc := mock.Alloc() + alloc.Resources.DiskMB = 150 + if err := gc.MakeRoomFor([]*structs.Allocation{alloc}); err != nil { + t.Fatalf("err: %v", err) + } + + // We should be GC-ing one alloc + if gcAlloc := gc.allocRunners.Pop(); gcAlloc == nil { + t.Fatalf("err: %v", gcAlloc) + } + + if gcAlloc := gc.allocRunners.Pop(); gcAlloc != nil { + t.Fatalf("gcAlloc: %v", gcAlloc) + } +} + +func TestAllocGarbageCollector_UsageBelowThreshold(t *testing.T) { + logger := log.New(os.Stdout, "", 0) + statsCollector := &MockStatsCollector{} + gc := NewAllocGarbageCollector(logger, statsCollector, 20) + + _, ar1 := testAllocRunnerFromAlloc(mock.Alloc(), false) + close(ar1.waitCh) + _, ar2 := testAllocRunnerFromAlloc(mock.Alloc(), false) + close(ar2.waitCh) + if err := gc.MarkForCollection(ar1); err != nil { + t.Fatalf("err: %v", err) + } + if err := gc.MarkForCollection(ar2); err != nil { + t.Fatalf("err: %v", err) + } + + statsCollector.availableValues = []uint64{1000} + statsCollector.usedPercents = []float64{20} + statsCollector.inodePercents = []float64{10} + + if err := gc.keepUsageBelowThreshold(); err != nil { + t.Fatalf("err: %v", err) + } + + // We shouldn't GC any of the allocs since the used percent values are below + // threshold + for i := 0; i < 2; i++ { + if gcAlloc := gc.allocRunners.Pop(); gcAlloc == nil { + t.Fatalf("err: %v", gcAlloc) + } + } +} + +func TestAllocGarbageCollector_UsedPercentThreshold(t *testing.T) { + logger := log.New(os.Stdout, "", 0) + statsCollector := &MockStatsCollector{} + gc := NewAllocGarbageCollector(logger, statsCollector, 20) + + _, ar1 := testAllocRunnerFromAlloc(mock.Alloc(), false) + close(ar1.waitCh) + _, ar2 := testAllocRunnerFromAlloc(mock.Alloc(), false) + close(ar2.waitCh) + if err := gc.MarkForCollection(ar1); err != nil { + t.Fatalf("err: %v", err) + } + if err := gc.MarkForCollection(ar2); err != nil { + t.Fatalf("err: %v", err) + } + + statsCollector.availableValues = []uint64{1000, 800} + statsCollector.usedPercents = []float64{85, 60} + statsCollector.inodePercents = []float64{50, 30} + + if err := gc.keepUsageBelowThreshold(); err != nil { + t.Fatalf("err: %v", err) + } + + // We should be GC-ing only one of the alloc runners since the second time + // used percent returns a number below threshold. + if gcAlloc := gc.allocRunners.Pop(); gcAlloc == nil { + t.Fatalf("err: %v", gcAlloc) + } + + if gcAlloc := gc.allocRunners.Pop(); gcAlloc != nil { + t.Fatalf("gcAlloc: %v", gcAlloc) + } +} diff --git a/client/stats/host.go b/client/stats/host.go index 49fb0fdcb74..95f67c9449a 100644 --- a/client/stats/host.go +++ b/client/stats/host.go @@ -4,6 +4,7 @@ import ( "log" "math" "runtime" + "sync" "time" "github.com/shirou/gopsutil/cpu" @@ -19,6 +20,7 @@ type HostStats struct { Memory *MemoryStats CPU []*CPUStats DiskStats []*DiskStats + AllocDirStats *DiskStats Uptime uint64 Timestamp int64 CPUTicksConsumed float64 @@ -52,32 +54,45 @@ type DiskStats struct { InodesUsedPercent float64 } +// NodeStatsCollector is an interface which is used for the puproses of mocking +// the HostStatsCollector in the tests +type NodeStatsCollector interface { + Collect() error + Stats() *HostStats +} + // HostStatsCollector collects host resource usage stats type HostStatsCollector struct { clkSpeed float64 numCores int statsCalculator map[string]*HostCpuStatsCalculator logger *log.Logger + hostStats *HostStats + hostStatsLock sync.RWMutex + allocDir string } -// NewHostStatsCollector returns a HostStatsCollector -func NewHostStatsCollector(logger *log.Logger) *HostStatsCollector { +// NewHostStatsCollector returns a HostStatsCollector. The allocDir is passed in +// so that we can present the disk related statistics for the mountpoint where +// the allocation directory lives +func NewHostStatsCollector(logger *log.Logger, allocDir string) *HostStatsCollector { numCores := runtime.NumCPU() statsCalculator := make(map[string]*HostCpuStatsCalculator) collector := &HostStatsCollector{ statsCalculator: statsCalculator, numCores: numCores, logger: logger, + allocDir: allocDir, } return collector } // Collect collects stats related to resource usage of a host -func (h *HostStatsCollector) Collect() (*HostStats, error) { +func (h *HostStatsCollector) Collect() error { hs := &HostStats{Timestamp: time.Now().UTC().UnixNano()} memStats, err := mem.VirtualMemory() if err != nil { - return nil, err + return err } hs.Memory = &MemoryStats{ Total: memStats.Total, @@ -89,7 +104,7 @@ func (h *HostStatsCollector) Collect() (*HostStats, error) { ticksConsumed := 0.0 cpuStats, err := cpu.Times(true) if err != nil { - return nil, err + return err } cs := make([]*CPUStats, len(cpuStats)) for idx, cpuStat := range cpuStats { @@ -113,7 +128,7 @@ func (h *HostStatsCollector) Collect() (*HostStats, error) { partitions, err := disk.Partitions(false) if err != nil { - return nil, err + return err } var diskStats []*DiskStats for _, partition := range partitions { @@ -122,32 +137,62 @@ func (h *HostStatsCollector) Collect() (*HostStats, error) { h.logger.Printf("[WARN] client: error fetching host disk usage stats for %v: %v", partition.Mountpoint, err) continue } - ds := DiskStats{ - Device: partition.Device, - Mountpoint: partition.Mountpoint, - Size: usage.Total, - Used: usage.Used, - Available: usage.Free, - UsedPercent: usage.UsedPercent, - InodesUsedPercent: usage.InodesUsedPercent, - } - if math.IsNaN(ds.UsedPercent) { - ds.UsedPercent = 0.0 - } - if math.IsNaN(ds.InodesUsedPercent) { - ds.InodesUsedPercent = 0.0 - } - diskStats = append(diskStats, &ds) + ds := h.toDiskStats(usage, &partition) + diskStats = append(diskStats, ds) } hs.DiskStats = diskStats + // Getting the disk stats for the allocation directory + usage, err := disk.Usage(h.allocDir) + if err != nil { + return err + } + hs.AllocDirStats = h.toDiskStats(usage, nil) + uptime, err := host.Uptime() if err != nil { - return nil, err + return err } hs.Uptime = uptime - return hs, nil + h.hostStatsLock.Lock() + defer h.hostStatsLock.Unlock() + h.hostStats = hs + return nil +} + +// Stats returns the host stats that has been collected +func (h *HostStatsCollector) Stats() *HostStats { + h.hostStatsLock.RLock() + defer h.hostStatsLock.RUnlock() + return h.hostStats +} + +// toDiskStats merges UsageStat and PartitionStat to create a DiskStat +func (h *HostStatsCollector) toDiskStats(usage *disk.UsageStat, partitionStat *disk.PartitionStat) *DiskStats { + if usage == nil { + return nil + } + ds := DiskStats{ + Size: usage.Total, + Used: usage.Used, + Available: usage.Free, + UsedPercent: usage.UsedPercent, + InodesUsedPercent: usage.InodesUsedPercent, + } + if math.IsNaN(ds.UsedPercent) { + ds.UsedPercent = 0.0 + } + if math.IsNaN(ds.InodesUsedPercent) { + ds.InodesUsedPercent = 0.0 + } + + if partitionStat != nil { + ds.Device = partitionStat.Device + ds.Mountpoint = partitionStat.Mountpoint + } + + return &ds } // HostCpuStatsCalculator calculates cpu usage percentages diff --git a/command/agent/alloc_endpoint.go b/command/agent/alloc_endpoint.go index 19e2f67ed54..3afdfebadb1 100644 --- a/command/agent/alloc_endpoint.go +++ b/command/agent/alloc_endpoint.go @@ -79,11 +79,24 @@ func (s *HTTPServer) ClientAllocRequest(resp http.ResponseWriter, req *http.Requ return s.allocStats(allocID, resp, req) case "snapshot": return s.allocSnapshot(allocID, resp, req) + case "gc": + return s.allocGC(allocID, resp, req) } return nil, CodedError(404, resourceNotFoundErr) } +func (s *HTTPServer) ClientGCRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + if s.agent.client == nil { + return nil, clientNotRunning + } + return nil, s.agent.Client().CollectAllAllocs() +} + +func (s *HTTPServer) allocGC(allocID string, resp http.ResponseWriter, req *http.Request) (interface{}, error) { + return nil, s.agent.Client().CollectAllocation(allocID) +} + func (s *HTTPServer) allocSnapshot(allocID string, resp http.ResponseWriter, req *http.Request) (interface{}, error) { allocFS, err := s.agent.Client().GetAllocFS(allocID) if err != nil { diff --git a/command/agent/http.go b/command/agent/http.go index 90fbbcdbb00..5a31f7d4e29 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -156,6 +156,7 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) { s.mux.HandleFunc("/v1/client/fs/", s.wrap(s.FsRequest)) s.mux.HandleFunc("/v1/client/stats", s.wrap(s.ClientStatsRequest)) s.mux.HandleFunc("/v1/client/allocation/", s.wrap(s.ClientAllocRequest)) + s.mux.HandleFunc("/v1/client/gc", s.wrap(s.ClientGCRequest)) s.mux.HandleFunc("/v1/agent/self", s.wrap(s.AgentSelfRequest)) s.mux.HandleFunc("/v1/agent/join", s.wrap(s.AgentJoinRequest))