diff --git a/client/alloc_runner.go b/client/alloc_runner.go
index 72aefe4b7b6..1a396ac058e 100644
--- a/client/alloc_runner.go
+++ b/client/alloc_runner.go
@@ -76,8 +76,13 @@ type AllocRunner struct {
 	// to call.
 	prevAlloc prevAllocWatcher

+	// ctx is cancelled with exitFn to cause the alloc to be destroyed
+	// (stopped and GC'd).
 	ctx    context.Context
 	exitFn context.CancelFunc
+
+	// waitCh is closed when the Run method exits. At that point the alloc
+	// has stopped and been GC'd.
 	waitCh chan struct{}

 	// State related fields
@@ -917,11 +922,6 @@ func (r *AllocRunner) handleDestroy() {
 	// state as we wait for a destroy.
 	alloc := r.Alloc()

-	//TODO(schmichael) updater can cause a GC which can block on this alloc
-	// runner shutting down. Since handleDestroy can be called by Run() we
-	// can't block shutdown here as it would cause a deadlock.
-	go r.updater(alloc)
-
 	// Broadcast and persist state synchronously
 	r.sendBroadcast(alloc)
 	if err := r.saveAllocRunnerState(); err != nil {
@@ -935,6 +935,11 @@ func (r *AllocRunner) handleDestroy() {
 		r.logger.Printf("[ERR] client: alloc %q unable unmount task directories: %v", r.allocID, err)
 	}

+	// Update the server with the alloc's status -- also marks the alloc as
+	// being eligible for GC, so from this point on the alloc can be gc'd
+	// at any time.
+	r.updater(alloc)
+
 	for {
 		select {
 		case <-r.ctx.Done():
@@ -1065,6 +1070,17 @@ func (r *AllocRunner) Destroy() {
 	r.allocBroadcast.Close()
 }

+// IsDestroyed returns true if the AllocRunner is not running and has been
+// destroyed (GC'd).
+func (r *AllocRunner) IsDestroyed() bool {
+	select {
+	case <-r.waitCh:
+		return true
+	default:
+		return false
+	}
+}
+
 // WaitCh returns a channel to wait for termination
 func (r *AllocRunner) WaitCh() <-chan struct{} {
 	return r.waitCh
diff --git a/client/client.go b/client/client.go
index 796d6e6ed5f..3c0faa5f1ef 100644
--- a/client/client.go
+++ b/client/client.go
@@ -124,7 +124,8 @@ type Client struct {
 	// successfully
 	serversDiscoveredCh chan struct{}

-	// allocs is the current set of allocations
+	// allocs maps alloc IDs to their AllocRunner. This map includes all
+	// AllocRunners - running and GC'd - until the server GCs them.
 	allocs    map[string]*AllocRunner
 	allocLock sync.RWMutex

@@ -486,15 +487,16 @@ func (c *Client) Stats() map[string]map[string]string {
 	return stats
 }

-// CollectAllocation garbage collects a single allocation
-func (c *Client) CollectAllocation(allocID string) error {
+// CollectAllocation garbage collects a single allocation on a node. Returns
+// true if alloc was found and garbage collected; otherwise false.
+func (c *Client) CollectAllocation(allocID string) bool {
 	return c.garbageCollector.Collect(allocID)
 }

 // CollectAllAllocs garbage collects all allocations on a node in the terminal
 // state
-func (c *Client) CollectAllAllocs() error {
-	return c.garbageCollector.CollectAll()
+func (c *Client) CollectAllAllocs() {
+	c.garbageCollector.CollectAll()
 }

 // Node returns the locally registered node
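The new IsDestroyed accessor leans on a Go property that is easy to miss: a receive from a closed channel always succeeds immediately, so a select with a default branch turns waitCh into a non-blocking "has Run exited?" check. A minimal standalone sketch of the idiom (hypothetical names, not part of this patch):

package main

import "fmt"

// watcher mimics AllocRunner: waitCh is closed when its Run method exits.
type watcher struct {
	waitCh chan struct{}
}

// done reports whether waitCh has been closed without ever blocking: a
// receive from a closed channel is always ready, otherwise the default
// case is taken.
func (w *watcher) done() bool {
	select {
	case <-w.waitCh:
		return true
	default:
		return false
	}
}

func main() {
	w := &watcher{waitCh: make(chan struct{})}
	fmt.Println(w.done()) // false: channel still open
	close(w.waitCh)       // Run has "exited"
	fmt.Println(w.done()) // true: closed channel is always ready
}

Callers pay no synchronization cost beyond the channel read, which is why the reworked NumAllocs below can cheaply skip destroyed runners while holding only a read lock.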
@@ -721,11 +723,16 @@ func (c *Client) getAllocRunners() map[string]*AllocRunner {
 	return runners
 }

-// NumAllocs returns the number of allocs this client has. Used to
+// NumAllocs returns the number of un-GC'd allocs this client has. Used to
 // fulfill the AllocCounter interface for the GC.
 func (c *Client) NumAllocs() int {
+	n := 0
 	c.allocLock.RLock()
-	n := len(c.allocs)
+	for _, a := range c.allocs {
+		if !a.IsDestroyed() {
+			n++
+		}
+	}
 	c.allocLock.RUnlock()
 	return n
 }

@@ -1205,6 +1212,7 @@ func (c *Client) updateNodeStatus() error {
 	for _, s := range resp.Servers {
 		addr, err := resolveServer(s.RPCAdvertiseAddr)
 		if err != nil {
+			c.logger.Printf("[WARN] client: ignoring invalid server %q: %v", s.RPCAdvertiseAddr, err)
 			continue
 		}
 		e := endpoint{name: s.RPCAdvertiseAddr, addr: addr}
@@ -1234,9 +1242,19 @@ func (c *Client) updateNodeStatus() error {

 // updateAllocStatus is used to update the status of an allocation
 func (c *Client) updateAllocStatus(alloc *structs.Allocation) {
 	if alloc.Terminated() {
-		// Terminated, mark for GC
-		if ar, ok := c.getAllocRunners()[alloc.ID]; ok {
+		// Terminated, mark for GC if we're still tracking this alloc
+		// runner. If it's not being tracked that means the server has
+		// already GC'd it (see removeAlloc).
+		c.allocLock.RLock()
+		ar, ok := c.allocs[alloc.ID]
+		c.allocLock.RUnlock()
+
+		if ok {
 			c.garbageCollector.MarkForCollection(ar)
+
+			// Trigger a GC in case we're over thresholds and just
+			// waiting for eligible allocs.
+			c.garbageCollector.Trigger()
 		}
 	}

@@ -1531,9 +1549,7 @@ func (c *Client) runAllocs(update *allocUpdates) {

 	// Remove the old allocations
 	for _, remove := range diff.removed {
-		if err := c.removeAlloc(remove); err != nil {
-			c.logger.Printf("[ERR] client: failed to remove alloc '%s': %v", remove.ID, err)
-		}
+		c.removeAlloc(remove)
 	}

 	// Update the existing allocations
@@ -1544,6 +1560,11 @@ func (c *Client) runAllocs(update *allocUpdates) {
 		}
 	}

+	// Make room for new allocations before running
+	if err := c.garbageCollector.MakeRoomFor(diff.added); err != nil {
+		c.logger.Printf("[ERR] client: error making room for new allocations: %v", err)
+	}
+
 	// Start the new allocations
 	for _, add := range diff.added {
 		migrateToken := update.migrateTokens[add.ID]
@@ -1552,26 +1573,33 @@ func (c *Client) runAllocs(update *allocUpdates) {
 				add.ID, err)
 		}
 	}
+
+	// Trigger the GC once more now that new allocs are started and could
+	// have caused thresholds to be exceeded
+	c.garbageCollector.Trigger()
 }

-// removeAlloc is invoked when we should remove an allocation
-func (c *Client) removeAlloc(alloc *structs.Allocation) error {
+// removeAlloc is invoked when we should remove an allocation because it has
+// been removed by the server.
+func (c *Client) removeAlloc(alloc *structs.Allocation) {
 	c.allocLock.Lock()
 	ar, ok := c.allocs[alloc.ID]
 	if !ok {
 		c.allocLock.Unlock()
 		c.logger.Printf("[WARN] client: missing context for alloc '%s'", alloc.ID)
-		return nil
+		return
 	}
+
+	// Stop tracking alloc runner as it's been GC'd by the server
 	delete(c.allocs, alloc.ID)
 	c.allocLock.Unlock()

 	// Ensure the GC has a reference and then collect. Collecting through the GC
 	// applies rate limiting
 	c.garbageCollector.MarkForCollection(ar)
-	go c.garbageCollector.Collect(alloc.ID)
-	return nil
+
+	// GC immediately since the server has GC'd it
+	go c.garbageCollector.Collect(alloc.ID)
 }
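Both updateAllocStatus and runAllocs above end with a call to Trigger rather than running a GC pass inline. Trigger (added in gc.go further down) does a non-blocking send on a buffered channel of capacity one, so any number of calls made while a pass is already pending collapse into a single wake-up. A standalone sketch of that coalescing behavior, with hypothetical names:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Capacity-1 channel: many trigger calls collapse into one wake-up.
	triggerCh := make(chan struct{}, 1)

	trigger := func() {
		select {
		case triggerCh <- struct{}{}:
		default: // a wake-up is already pending; drop this one
		}
	}

	passes := 0
	done := make(chan struct{})
	go func() {
		defer close(done)
		for range triggerCh {
			passes++
			time.Sleep(10 * time.Millisecond) // simulate one GC pass
		}
	}()

	for i := 0; i < 100; i++ {
		trigger() // 100 calls while passes are pending...
	}
	time.Sleep(50 * time.Millisecond)
	close(triggerCh)
	<-done
	fmt.Println("passes:", passes) // ...run only a pass or two, not 100
}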

 // updateAlloc is invoked when we should update an allocation
@@ -1592,9 +1620,9 @@ func (c *Client) updateAlloc(exist, update *structs.Allocation) error {
 func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error {
 	// Check if we already have an alloc runner
 	c.allocLock.Lock()
+	defer c.allocLock.Unlock()
 	if _, ok := c.allocs[alloc.ID]; ok {
 		c.logger.Printf("[DEBUG]: client: dropping duplicate add allocation request: %q", alloc.ID)
-		c.allocLock.Unlock()
 		return nil
 	}

@@ -1618,14 +1646,6 @@ func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error
 		c.logger.Printf("[WARN] client: initial save state for alloc %q failed: %v", alloc.ID, err)
 	}

-	// Must release allocLock as GC acquires it to count allocs
-	c.allocLock.Unlock()
-
-	// Make room for the allocation before running it
-	if err := c.garbageCollector.MakeRoomFor([]*structs.Allocation{alloc}); err != nil {
-		c.logger.Printf("[ERR] client: error making room for allocation: %v", err)
-	}
-
 	go ar.Run()
 	return nil
 }
diff --git a/client/client_test.go b/client/client_test.go
index 407009de24b..c04e33f28e1 100644
--- a/client/client_test.go
+++ b/client/client_test.go
@@ -74,6 +74,9 @@ func testServer(t *testing.T, cb func(*nomad.Config)) (*nomad.Server, string) {
 		cb(config)
 	}

+	// Enable raft as leader if we have bootstrap on
+	config.RaftConfig.StartAsLeader = !config.DevDisableBootstrap
+
 	for i := 10; i >= 0; i-- {
 		ports := freeport.GetT(t, 2)
 		config.RPCAddr = &net.TCPAddr{
@@ -657,7 +660,6 @@ func TestClient_WatchAllocs(t *testing.T) {
 	alloc2.JobID = job.ID
 	alloc2.Job = job

-	// Insert at zero so they are pulled
 	state := s1.State()
 	if err := state.UpsertJob(100, job); err != nil {
 		t.Fatal(err)
@@ -681,23 +683,20 @@ func TestClient_WatchAllocs(t *testing.T) {
 	})

 	// Delete one allocation
-	err = state.DeleteEval(103, nil, []string{alloc1.ID})
-	if err != nil {
+	if err := state.DeleteEval(103, nil, []string{alloc1.ID}); err != nil {
 		t.Fatalf("err: %v", err)
 	}

 	// Update the other allocation. Have to make a copy because the allocs are
 	// shared in memory in the test and the modify index would be updated in the
 	// alloc runner.
-	alloc2_2 := new(structs.Allocation)
-	*alloc2_2 = *alloc2
+	alloc2_2 := alloc2.Copy()
 	alloc2_2.DesiredStatus = structs.AllocDesiredStatusStop
-	err = state.UpsertAllocs(104, []*structs.Allocation{alloc2_2})
-	if err != nil {
-		t.Fatalf("err: %v", err)
+	if err := state.UpsertAllocs(104, []*structs.Allocation{alloc2_2}); err != nil {
+		t.Fatalf("err upserting stopped alloc: %v", err)
 	}

-	// One allocations should get de-registered
+	// One allocation should get GC'd and removed
 	testutil.WaitForResult(func() (bool, error) {
 		c1.allocLock.RLock()
 		num := len(c1.allocs)
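The test's switch from `*alloc2_2 = *alloc2` to `alloc2.Copy()` matters because Go struct assignment is shallow: pointer, map, and slice fields still alias the original value, which is exactly the in-memory sharing the comment warns about. A toy demonstration of the difference (hypothetical types, not Nomad's):

package main

import "fmt"

// alloc stands in for structs.Allocation: value fields plus a reference
// field (a map) that plain struct assignment does NOT duplicate.
type alloc struct {
	DesiredStatus string
	TaskStates    map[string]string
}

// copyDeep mirrors what a hand-written Copy() method does: duplicate the
// nested map so the clone is independent of the original.
func (a *alloc) copyDeep() *alloc {
	c := *a // copies value fields, but c.TaskStates still aliases a's map
	c.TaskStates = make(map[string]string, len(a.TaskStates))
	for k, v := range a.TaskStates {
		c.TaskStates[k] = v
	}
	return &c
}

func main() {
	orig := &alloc{DesiredStatus: "run", TaskStates: map[string]string{"web": "running"}}

	shallow := new(alloc)
	*shallow = *orig // the old test's approach
	shallow.TaskStates["web"] = "dead"
	fmt.Println(orig.TaskStates["web"]) // "dead" -- original mutated through shared map

	orig.TaskStates["web"] = "running"
	deep := orig.copyDeep()
	deep.TaskStates["web"] = "dead"
	fmt.Println(orig.TaskStates["web"]) // "running" -- deep copy is independent
}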
diff --git a/client/gc.go b/client/gc.go
index 728b9c920ac..a7332ebc4cf 100644
--- a/client/gc.go
+++ b/client/gc.go
@@ -28,21 +28,36 @@ type GCConfig struct {
 	ParallelDestroys int
 }

-// AllocCounter is used by AllocGarbageCollector to discover how many
-// allocations a node has and is generally fulfilled by the Client.
+// AllocCounter is used by AllocGarbageCollector to discover how many un-GC'd
+// allocations a client has and is generally fulfilled by the Client.
 type AllocCounter interface {
 	NumAllocs() int
 }

 // AllocGarbageCollector garbage collects terminated allocations on a node
 type AllocGarbageCollector struct {
-	allocRunners   *IndexedGCAllocPQ
+	config *GCConfig
+
+	// allocRunners marked for GC
+	allocRunners *IndexedGCAllocPQ
+
+	// statsCollector for node-based thresholds (e.g. disk)
 	statsCollector stats.NodeStatsCollector
-	allocCounter   AllocCounter
-	config         *GCConfig
-	logger         *log.Logger
-	destroyCh      chan struct{}
-	shutdownCh     chan struct{}
+
+	// allocCounter returns the number of un-GC'd allocs on this node
+	allocCounter AllocCounter
+
+	// destroyCh is a semaphore for rate limiting concurrent garbage
+	// collections
+	destroyCh chan struct{}
+
+	// shutdownCh is closed when the GC's run method should exit
+	shutdownCh chan struct{}
+
+	// triggerCh is ticked by the Trigger method to cause a GC
+	triggerCh chan struct{}
+
+	logger *log.Logger
 }

 // NewAllocGarbageCollector returns a garbage collector for terminated
@@ -51,7 +66,7 @@ type AllocGarbageCollector struct {
 func NewAllocGarbageCollector(logger *log.Logger, statsCollector stats.NodeStatsCollector, ac AllocCounter, config *GCConfig) *AllocGarbageCollector {
 	// Require at least 1 to make progress
 	if config.ParallelDestroys <= 0 {
-		logger.Printf("[WARN] client: garbage collector defaulting parallism to 1 due to invalid input value of %d", config.ParallelDestroys)
+		logger.Printf("[WARN] client.gc: garbage collector defaulting parallelism to 1 due to invalid input value of %d", config.ParallelDestroys)
 		config.ParallelDestroys = 1
 	}

@@ -63,6 +78,7 @@ func NewAllocGarbageCollector(logger *log.Logger, statsCollector stats.NodeStats
 		logger:         logger,
 		destroyCh:      make(chan struct{}, config.ParallelDestroys),
 		shutdownCh:     make(chan struct{}),
+		triggerCh:      make(chan struct{}, 1),
 	}

 	return gc
@@ -71,16 +87,28 @@ func NewAllocGarbageCollector(logger *log.Logger, statsCollector stats.NodeStats

 // Run the periodic garbage collector.
 func (a *AllocGarbageCollector) Run() {
 	ticker := time.NewTicker(a.config.Interval)
+	a.logger.Printf("[DEBUG] client.gc: GC'ing every %v", a.config.Interval)
 	for {
 		select {
+		case <-a.triggerCh:
 		case <-ticker.C:
-			if err := a.keepUsageBelowThreshold(); err != nil {
-				a.logger.Printf("[ERR] client: error garbage collecting allocation: %v", err)
-			}
 		case <-a.shutdownCh:
 			ticker.Stop()
 			return
 		}

+		if err := a.keepUsageBelowThreshold(); err != nil {
+			a.logger.Printf("[ERR] client.gc: error garbage collecting allocation: %v", err)
+		}
 	}
 }
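The reworked Run loop above is the standard shape for a worker with both periodic and on-demand wake-ups: the select only decides why the loop woke (tick, trigger, or shutdown), and the pass itself runs once per iteration after the select so both wake-up sources share a single code path. Reduced to its moving parts (sketch only, hypothetical names):

package main

import (
	"fmt"
	"time"
)

// run wakes on either a periodic tick or a manual trigger and performs one
// pass per wake-up; closing shutdownCh ends the loop.
func run(interval time.Duration, triggerCh, shutdownCh <-chan struct{}, pass func()) {
	ticker := time.NewTicker(interval)
	for {
		select {
		case <-triggerCh: // manual wake-up
		case <-ticker.C: // periodic wake-up
		case <-shutdownCh:
			ticker.Stop()
			return
		}

		// Shared body: runs for ticks and triggers alike.
		pass()
	}
}

func main() {
	triggerCh := make(chan struct{}, 1)
	shutdownCh := make(chan struct{})
	go run(time.Hour, triggerCh, shutdownCh, func() { fmt.Println("gc pass") })

	triggerCh <- struct{}{} // force a pass without waiting an hour
	time.Sleep(100 * time.Millisecond)
	close(shutdownCh)
}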
+
+// Force the garbage collector to run.
+func (a *AllocGarbageCollector) Trigger() {
+	select {
+	case a.triggerCh <- struct{}{}:
+	default:
+		// already triggered
+	}
+}

@@ -95,25 +123,16 @@ func (a *AllocGarbageCollector) keepUsageBelowThreshold() error {
 	}

 	// Check if we have enough free space
-	err := a.statsCollector.Collect()
-	if err != nil {
+	if err := a.statsCollector.Collect(); err != nil {
 		return err
 	}

 	// See if we are below thresholds for used disk space and inode usage
-	// TODO(diptanu) figure out why this is nil
-	stats := a.statsCollector.Stats()
-	if stats == nil {
-		break
-	}
-
-	diskStats := stats.AllocDirStats
-	if diskStats == nil {
-		break
-	}
-
+	diskStats := a.statsCollector.Stats().AllocDirStats
 	reason := ""
+	liveAllocs := a.allocCounter.NumAllocs()
+
 	switch {
 	case diskStats.UsedPercent > a.config.DiskUsageThreshold:
 		reason = fmt.Sprintf("disk usage of %.0f is over gc threshold of %.0f",
@@ -121,19 +140,19 @@ func (a *AllocGarbageCollector) keepUsageBelowThreshold() error {
 			diskStats.UsedPercent, a.config.DiskUsageThreshold)
 	case diskStats.InodesUsedPercent > a.config.InodeUsageThreshold:
 		reason = fmt.Sprintf("inode usage of %.0f is over gc threshold of %.0f",
 			diskStats.InodesUsedPercent, a.config.InodeUsageThreshold)
-	case a.numAllocs() > a.config.MaxAllocs:
-		reason = fmt.Sprintf("number of allocations is over the limit (%d)", a.config.MaxAllocs)
+	case liveAllocs > a.config.MaxAllocs:
+		reason = fmt.Sprintf("number of allocations (%d) is over the limit (%d)", liveAllocs, a.config.MaxAllocs)
 	}

-	// No reason to gc, exit
 	if reason == "" {
+		// No reason to gc, exit
 		break
 	}

 	// Collect an allocation
 	gcAlloc := a.allocRunners.Pop()
 	if gcAlloc == nil {
-		a.logger.Printf("[WARN] client: garbage collection due to %s skipped because no terminal allocations", reason)
+		a.logger.Printf("[WARN] client.gc: garbage collection due to %s skipped because no terminal allocations", reason)
 		break
 	}

@@ -151,7 +170,7 @@ func (a *AllocGarbageCollector) destroyAllocRunner(ar *AllocRunner, reason strin
 	if alloc := ar.Alloc(); alloc != nil {
 		id = alloc.ID
 	}
-	a.logger.Printf("[INFO] client: garbage collecting allocation %s due to %s", id, reason)
+	a.logger.Printf("[INFO] client.gc: garbage collecting allocation %s due to %s", id, reason)

 	// Acquire the destroy lock
 	select {
@@ -167,7 +186,7 @@ func (a *AllocGarbageCollector) destroyAllocRunner(ar *AllocRunner, reason strin
 	case <-a.shutdownCh:
 	}

-	a.logger.Printf("[DEBUG] client: garbage collected %q", ar.Alloc().ID)
+	a.logger.Printf("[DEBUG] client.gc: garbage collected %q", ar.Alloc().ID)

 	// Release the lock
 	<-a.destroyCh
@@ -177,41 +196,47 @@ func (a *AllocGarbageCollector) Stop() {
 	close(a.shutdownCh)
 }

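destroyCh, described in the struct comment as a semaphore for rate limiting concurrent collections, is the classic buffered-channel counting semaphore that destroyAllocRunner acquires and releases above: a send takes a slot (blocking once ParallelDestroys collections are in flight) and a receive returns it. A compact sketch of the pattern (hypothetical names, not part of this patch):

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	const parallelDestroys = 2

	// Buffered channel as a counting semaphore: capacity = max concurrency.
	destroyCh := make(chan struct{}, parallelDestroys)

	var wg sync.WaitGroup
	for i := 0; i < 6; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			destroyCh <- struct{}{} // acquire a slot (blocks when full)
			fmt.Printf("destroying alloc %d\n", id)
			time.Sleep(20 * time.Millisecond) // simulate the destroy
			<-destroyCh                       // release the slot
		}(i)
	}
	wg.Wait() // at most 2 destroys ever ran at once
}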
-// Collect garbage collects a single allocation on a node
-func (a *AllocGarbageCollector) Collect(allocID string) error {
-	gcAlloc, err := a.allocRunners.Remove(allocID)
-	if err != nil {
-		return fmt.Errorf("unable to collect allocation %q: %v", allocID, err)
+// Collect garbage collects a single allocation on a node. Returns true if
+// alloc was found and garbage collected; otherwise false.
+func (a *AllocGarbageCollector) Collect(allocID string) bool {
+	if gcAlloc := a.allocRunners.Remove(allocID); gcAlloc != nil {
+		a.destroyAllocRunner(gcAlloc.allocRunner, "forced collection")
+		return true
 	}
-	a.destroyAllocRunner(gcAlloc.allocRunner, "forced collection")
-	return nil
+
+	a.logger.Printf("[DEBUG] client.gc: alloc %s is invalid or was already garbage collected", allocID)
+	return false
 }

 // CollectAll garbage collects all termianated allocations on a node
-func (a *AllocGarbageCollector) CollectAll() error {
+func (a *AllocGarbageCollector) CollectAll() {
 	for {
 		select {
 		case <-a.shutdownCh:
-			return nil
+			return
 		default:
 		}

 		gcAlloc := a.allocRunners.Pop()
 		if gcAlloc == nil {
-			break
+			return
 		}

-		go a.destroyAllocRunner(gcAlloc.allocRunner, "forced full collection")
+		go a.destroyAllocRunner(gcAlloc.allocRunner, "forced full node collection")
 	}
-	return nil
 }

 // MakeRoomFor garbage collects enough number of allocations in the terminal
 // state to make room for new allocations
 func (a *AllocGarbageCollector) MakeRoomFor(allocations []*structs.Allocation) error {
+	if len(allocations) == 0 {
+		// Nothing to make room for!
+		return nil
+	}
+
+	// GC allocs until below the max limit + the new allocations
 	max := a.config.MaxAllocs - len(allocations)
-	for a.numAllocs() > max {
+	for a.allocCounter.NumAllocs() > max {
 		select {
 		case <-a.shutdownCh:
 			return nil
@@ -227,8 +252,9 @@ func (a *AllocGarbageCollector) MakeRoomFor(allocations []*structs.Allocation) e
 		}

 		// Destroy the alloc runner and wait until it exits
-		a.destroyAllocRunner(gcAlloc.allocRunner, "new allocations")
+		a.destroyAllocRunner(gcAlloc.allocRunner, fmt.Sprintf("new allocations and over max (%d)", a.config.MaxAllocs))
 	}
+
 	totalResource := &structs.Resources{}
 	for _, alloc := range allocations {
 		if err := totalResource.Add(alloc.Resources); err != nil {
@@ -303,28 +329,11 @@ func (a *AllocGarbageCollector) MarkForCollection(ar *AllocRunner) {
 		return
 	}

-	a.logger.Printf("[INFO] client: marking allocation %v for GC", ar.Alloc().ID)
-	a.allocRunners.Push(ar)
-}
-
-// Remove removes an alloc runner without garbage collecting it
-func (a *AllocGarbageCollector) Remove(ar *AllocRunner) {
-	if ar == nil || ar.Alloc() == nil {
-		return
-	}
-
-	alloc := ar.Alloc()
-	if _, err := a.allocRunners.Remove(alloc.ID); err == nil {
-		a.logger.Printf("[INFO] client: removed alloc runner %v from garbage collector", alloc.ID)
+	if a.allocRunners.Push(ar) {
+		a.logger.Printf("[INFO] client.gc: marking allocation %v for GC", ar.Alloc().ID)
 	}
 }

-// numAllocs returns the total number of allocs tracked by the client as well
-// as those marked for GC.
-func (a *AllocGarbageCollector) numAllocs() int {
-	return a.allocRunners.Length() + a.allocCounter.NumAllocs()
-}
-
 // GCAlloc wraps an allocation runner and an index enabling it to be used within
 // a PQ
 type GCAlloc struct {
@@ -381,15 +390,16 @@ func NewIndexedGCAllocPQ() *IndexedGCAllocPQ {
 	}
 }

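IndexedGCAllocPQ, whose Push and Remove are reworked below, pairs container/heap (oldest-first ordering for Pop) with a map index (constant-time duplicate detection and removal by alloc ID, instead of a linear scan). A self-contained miniature of the same structure (sketch with simplified types, not Nomad's):

package main

import (
	"container/heap"
	"fmt"
	"time"
)

type item struct {
	id    string
	stamp time.Time
	index int // maintained by the heap so remove can find the item
}

type pq []*item

func (p pq) Len() int            { return len(p) }
func (p pq) Less(i, j int) bool  { return p[i].stamp.Before(p[j].stamp) }
func (p pq) Swap(i, j int)       { p[i], p[j] = p[j], p[i]; p[i].index = i; p[j].index = j }
func (p *pq) Push(x interface{}) { it := x.(*item); it.index = len(*p); *p = append(*p, it) }
func (p *pq) Pop() interface{} {
	old := *p
	it := old[len(old)-1]
	*p = old[:len(old)-1]
	return it
}

// indexedPQ: heap for oldest-first ordering, map for O(1) lookup by ID.
type indexedPQ struct {
	heap  pq
	index map[string]*item
}

func (q *indexedPQ) push(id string) bool {
	if _, ok := q.index[id]; ok {
		return false // already queued; mirrors Push returning false
	}
	it := &item{id: id, stamp: time.Now()}
	q.index[id] = it
	heap.Push(&q.heap, it)
	return true
}

func (q *indexedPQ) remove(id string) *item {
	it, ok := q.index[id]
	if !ok {
		return nil // mirrors Remove returning nil
	}
	heap.Remove(&q.heap, it.index)
	delete(q.index, id)
	return it
}

func main() {
	q := &indexedPQ{index: map[string]*item{}}
	fmt.Println(q.push("a"), q.push("b"), q.push("a")) // true true false
	fmt.Println(q.remove("a") != nil)                  // true: removed by ID, no scan
	fmt.Println(q.remove("a"))                         // <nil>
}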
-// Push an alloc runner into the GC queue
-func (i *IndexedGCAllocPQ) Push(ar *AllocRunner) {
+// Push an alloc runner into the GC queue. Returns true if alloc was added,
+// false if the alloc already existed.
+func (i *IndexedGCAllocPQ) Push(ar *AllocRunner) bool {
 	i.pqLock.Lock()
 	defer i.pqLock.Unlock()

 	alloc := ar.Alloc()
 	if _, ok := i.index[alloc.ID]; ok {
 		// No work to do
-		return
+		return false
 	}
 	gcAlloc := &GCAlloc{
 		timeStamp:   time.Now(),
@@ -397,7 +407,7 @@
 		allocRunner: ar,
 	}
 	i.index[alloc.ID] = gcAlloc
 	heap.Push(&i.heap, gcAlloc)
-	return
+	return true
 }

 func (i *IndexedGCAllocPQ) Pop() *GCAlloc {
@@ -413,17 +423,18 @@ func (i *IndexedGCAllocPQ) Pop() *GCAlloc {
 	return gcAlloc
 }

-func (i *IndexedGCAllocPQ) Remove(allocID string) (*GCAlloc, error) {
+// Remove alloc from GC. Returns nil if alloc doesn't exist.
+func (i *IndexedGCAllocPQ) Remove(allocID string) *GCAlloc {
 	i.pqLock.Lock()
 	defer i.pqLock.Unlock()

 	if gcAlloc, ok := i.index[allocID]; ok {
 		heap.Remove(&i.heap, gcAlloc.index)
 		delete(i.index, allocID)
-		return gcAlloc, nil
+		return gcAlloc
 	}

-	return nil, fmt.Errorf("alloc %q not present", allocID)
+	return nil
 }

 func (i *IndexedGCAllocPQ) Length() int {
diff --git a/client/gc_test.go b/client/gc_test.go
index 960fb2a808e..ac28239a193 100644
--- a/client/gc_test.go
+++ b/client/gc_test.go
@@ -1,12 +1,15 @@
 package client

 import (
+	"fmt"
 	"testing"
 	"time"

+	"github.com/hashicorp/nomad/client/config"
 	"github.com/hashicorp/nomad/client/stats"
 	"github.com/hashicorp/nomad/nomad/mock"
 	"github.com/hashicorp/nomad/nomad/structs"
+	"github.com/hashicorp/nomad/testutil"
 )

 func gcConfig() *GCConfig {
@@ -128,9 +131,7 @@ func TestAllocGarbageCollector_Collect(t *testing.T) {
 	close(ar1.waitCh)
 	close(ar2.waitCh)

-	if err := gc.Collect(ar1.Alloc().ID); err != nil {
-		t.Fatalf("err: %v", err)
-	}
+	gc.Collect(ar1.Alloc().ID)
 	gcAlloc := gc.allocRunners.Pop()
 	if gcAlloc == nil || gcAlloc.allocRunner != ar2 {
 		t.Fatalf("bad gcAlloc: %v", gcAlloc)
@@ -147,9 +148,7 @@ func TestAllocGarbageCollector_CollectAll(t *testing.T) {
 	gc.MarkForCollection(ar1)
 	gc.MarkForCollection(ar2)

-	if err := gc.CollectAll(); err != nil {
-		t.Fatalf("err: %v", err)
-	}
+	gc.CollectAll()
 	gcAlloc := gc.allocRunners.Pop()
 	if gcAlloc != nil {
 		t.Fatalf("bad gcAlloc: %v", gcAlloc)
@@ -290,40 +289,132 @@ func TestAllocGarbageCollector_MakeRoomForAllocations_GC_Fallback(t *testing.T)
 	}
 }

-func TestAllocGarbageCollector_MakeRoomForAllocations_MaxAllocs(t *testing.T) {
+// TestAllocGarbageCollector_MaxAllocs asserts that when making room for new
+// allocs, terminal allocs are GC'd until old_allocs + new_allocs <= limit
+func TestAllocGarbageCollector_MaxAllocs(t *testing.T) {
 	t.Parallel()
-	const (
-		liveAllocs   = 3
-		maxAllocs    = 6
-		gcAllocs     = 4
-		gcAllocsLeft = 1
-	)
-
-	logger := testLogger()
-	statsCollector := &MockStatsCollector{
-		availableValues: []uint64{10 * 1024 * MB},
-		usedPercents:    []float64{0},
-		inodePercents:   []float64{0},
+	server, serverAddr := testServer(t, nil)
+	defer server.Shutdown()
+	testutil.WaitForLeader(t, server.RPC)
+
+	const maxAllocs = 6
+	client := testClient(t, func(c *config.Config) {
+		c.GCMaxAllocs = maxAllocs
+		c.GCDiskUsageThreshold = 100
+		c.GCInodeUsageThreshold = 100
+		c.GCParallelDestroys = 1
+		c.GCInterval = time.Hour
+
+		c.RPCHandler = server
+		c.Servers = []string{serverAddr}
+		c.ConsulConfig.ClientAutoJoin = new(bool) // squelch logs
+	})
+	defer client.Shutdown()
+	waitTilNodeReady(client, t)
+
+	callN := 0
+	assertAllocs := func(expectedAll, expectedDestroyed int) {
+		// Wait for allocs to be started
+		callN++
+		client.logger.Printf("[TEST] %d -- Waiting for %d total allocs, %d GC'd", callN, expectedAll, expectedDestroyed)
+		testutil.WaitForResult(func() (bool, error) {
+			all, destroyed := 0, 0
+			for _, ar := range client.getAllocRunners() {
+				all++
+				if ar.IsDestroyed() {
+					destroyed++
+				}
+			}
+			return all == expectedAll && destroyed == expectedDestroyed, fmt.Errorf(
+				"expected %d allocs (found %d); expected %d destroy (found %d)",
+				expectedAll, all, expectedDestroyed, destroyed,
+			)
+		}, func(err error) {
+			client.logger.Printf("[TEST] %d -- FAILED to find %d total allocs, %d GC'd!", callN, expectedAll, expectedDestroyed)
+			t.Fatalf("%d alloc state: %v", callN, err)
+		})
+		client.logger.Printf("[TEST] %d -- Found %d total allocs, %d GC'd!", callN, expectedAll, expectedDestroyed)
+	}
+
+	// Create a job
+	state := server.State()
+	job := mock.Job()
+	job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
+	job.TaskGroups[0].Tasks[0].Config["run_for"] = "30s"
+	nodeID := client.Node().ID
+	if err := state.UpsertJob(98, job); err != nil {
+		t.Fatalf("error upserting job: %v", err)
+	}
+	if err := state.UpsertJobSummary(99, mock.JobSummary(job.ID)); err != nil {
+		t.Fatalf("error upserting job summary: %v", err)
+	}
+
+	newAlloc := func() *structs.Allocation {
+		alloc := mock.Alloc()
+		alloc.JobID = job.ID
+		alloc.Job = job
+		alloc.NodeID = nodeID
+		return alloc
+	}
+
+	// Create the allocations
+	allocs := make([]*structs.Allocation, 7)
+	for i := 0; i < len(allocs); i++ {
+		allocs[i] = newAlloc()
+	}
+
+	// Upsert a copy of the allocs as modifying the originals later would
+	// cause a race
+	{
+		allocsCopy := make([]*structs.Allocation, len(allocs))
+		for i, a := range allocs {
+			allocsCopy[i] = a.Copy()
+		}
+		if err := state.UpsertAllocs(100, allocsCopy); err != nil {
+			t.Fatalf("error upserting initial allocs: %v", err)
+		}
 	}

-	allocCounter := &MockAllocCounter{allocs: liveAllocs}
-	conf := gcConfig()
-	conf.MaxAllocs = maxAllocs
-	gc := NewAllocGarbageCollector(logger, statsCollector, allocCounter, conf)
-	for i := 0; i < gcAllocs; i++ {
-		_, ar := testAllocRunnerFromAlloc(mock.Alloc(), false)
-		close(ar.waitCh)
-		gc.MarkForCollection(ar)
+	// 7 total, 0 GC'd
+	assertAllocs(7, 0)
+
+	// Set the first few as terminal so they're marked for gc
+	const terminalN = 4
+	for i := 0; i < terminalN; i++ {
+		// Copy the alloc so the pointers aren't shared
+		alloc := allocs[i].Copy()
+		alloc.DesiredStatus = structs.AllocDesiredStatusStop
+		allocs[i] = alloc
 	}
+	if err := state.UpsertAllocs(101, allocs[:terminalN]); err != nil {
+		t.Fatalf("error upserting stopped allocs: %v", err)
+	}
+
+	// 7 total, 1 GC'd to get down to limit of 6
+	assertAllocs(7, 1)

-	if err := gc.MakeRoomFor([]*structs.Allocation{mock.Alloc(), mock.Alloc()}); err != nil {
-		t.Fatalf("error making room for 2 new allocs: %v", err)
+	// Add one more alloc
+	if err := state.UpsertAllocs(102, []*structs.Allocation{newAlloc()}); err != nil {
+		t.Fatalf("error upserting new alloc: %v", err)
 	}

-	// There should be gcAllocsLeft alloc runners left to be collected
-	if n := len(gc.allocRunners.index); n != gcAllocsLeft {
-		t.Fatalf("expected %d remaining GC-able alloc runners but found %d", gcAllocsLeft, n)
+	// 8 total, 1 GC'd to get down to limit of 6
+	// If this fails it may be due to the gc's Run and MakeRoomFor methods
+	// gc'ing concurrently. May have to disable gc's run loop if this test
+	// is flaky.
+	assertAllocs(8, 2)
+
+	// Add new allocs to cause the gc of old terminal ones
+	newAllocs := make([]*structs.Allocation, 4)
+	for i := 0; i < len(newAllocs); i++ {
+		newAllocs[i] = newAlloc()
+	}
+	if err := state.UpsertAllocs(200, newAllocs); err != nil {
+		t.Fatalf("error upserting %d new allocs: %v", len(newAllocs), err)
 	}
+
+	// 12 total, 4 GC'd total because all other allocs are alive
+	assertAllocs(12, 4)
 }

 func TestAllocGarbageCollector_UsageBelowThreshold(t *testing.T) {
@@ -391,39 +482,3 @@ func TestAllocGarbageCollector_UsedPercentThreshold(t *testing.T) {
 		t.Fatalf("gcAlloc: %v", gcAlloc)
 	}
 }
-
-func TestAllocGarbageCollector_MaxAllocsThreshold(t *testing.T) {
-	t.Parallel()
-	const (
-		liveAllocs   = 3
-		maxAllocs    = 6
-		gcAllocs     = 4
-		gcAllocsLeft = 1
-	)
-
-	logger := testLogger()
-	statsCollector := &MockStatsCollector{
-		availableValues: []uint64{1000},
-		usedPercents:    []float64{0},
-		inodePercents:   []float64{0},
-	}
-	allocCounter := &MockAllocCounter{allocs: liveAllocs}
-	conf := gcConfig()
-	conf.MaxAllocs = 4
-	gc := NewAllocGarbageCollector(logger, statsCollector, allocCounter, conf)
-
-	for i := 0; i < gcAllocs; i++ {
-		_, ar := testAllocRunnerFromAlloc(mock.Alloc(), false)
-		close(ar.waitCh)
-		gc.MarkForCollection(ar)
-	}
-
-	if err := gc.keepUsageBelowThreshold(); err != nil {
-		t.Fatalf("error gc'ing: %v", err)
-	}
-
-	// We should have gc'd down to MaxAllocs
-	if n := len(gc.allocRunners.index); n != gcAllocsLeft {
-		t.Fatalf("expected remaining gc allocs (%d) to equal %d", n, gcAllocsLeft)
-	}
-}
diff --git a/client/stats/host.go b/client/stats/host.go
index d284973e754..8f0f92377db 100644
--- a/client/stats/host.go
+++ b/client/stats/host.go
@@ -190,9 +190,6 @@ func (h *HostStatsCollector) Stats() *HostStats {

 // toDiskStats merges UsageStat and PartitionStat to create a DiskStat
 func (h *HostStatsCollector) toDiskStats(usage *disk.UsageStat, partitionStat *disk.PartitionStat) *DiskStats {
-	if usage == nil {
-		return nil
-	}
 	ds := DiskStats{
 		Size: usage.Total,
 		Used: usage.Used,
diff --git a/command/agent/alloc_endpoint.go b/command/agent/alloc_endpoint.go
index 7011a3cfc47..65bcbd0116f 100644
--- a/command/agent/alloc_endpoint.go
+++ b/command/agent/alloc_endpoint.go
@@ -115,7 +115,8 @@ func (s *HTTPServer) ClientGCRequest(resp http.ResponseWriter, req *http.Request
 		return nil, structs.ErrPermissionDenied
 	}

-	return nil, s.agent.Client().CollectAllAllocs()
+	s.agent.Client().CollectAllAllocs()
+	return nil, nil
 }

 func (s *HTTPServer) allocGC(allocID string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
@@ -131,7 +132,12 @@ func (s *HTTPServer) allocGC(allocID string, resp http.ResponseWriter, req *http
 	} else if aclObj != nil && !aclObj.AllowNsOp(namespace, acl.NamespaceCapabilitySubmitJob) {
 		return nil, structs.ErrPermissionDenied
 	}
-	return nil, s.agent.Client().CollectAllocation(allocID)
+
+	if !s.agent.Client().CollectAllocation(allocID) {
+		// Could not find alloc
+		return nil, fmt.Errorf("unable to collect allocation: not present")
+	}
+	return nil, nil
 }

 func (s *HTTPServer) allocSnapshot(allocID string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
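Taken together, the alloc_endpoint.go changes give the two GC endpoints different failure semantics: node-wide GC stays fire-and-forget, while per-alloc GC now surfaces an error when the alloc is unknown or already collected. A sketch of the resulting caller-side contract (hypothetical wrapper; only the CollectAllocation and CollectAllAllocs signatures come from the diff):

package main

import "fmt"

// gcClient captures just the two methods whose signatures changed in this
// diff; *client.Client satisfies it after the patch.
type gcClient interface {
	CollectAllocation(allocID string) bool
	CollectAllAllocs()
}

// forceGC mirrors the new endpoint logic: per-alloc GC can fail (alloc
// unknown or already collected), node-wide GC cannot.
func forceGC(c gcClient, allocID string) error {
	if allocID == "" {
		c.CollectAllAllocs() // best-effort, nothing to report
		return nil
	}
	if !c.CollectAllocation(allocID) {
		return fmt.Errorf("unable to collect allocation: not present")
	}
	return nil
}

func main() {
	// "c0fa4260-..." is a placeholder alloc ID for the example.
	fmt.Println(forceGC(noopClient{}, "c0fa4260-..."))
}

// noopClient is a stand-in implementation for the example.
type noopClient struct{}

func (noopClient) CollectAllocation(string) bool { return false }
func (noopClient) CollectAllAllocs()             {}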