Skip to content

Commit

Permalink
Merge pull request #3445 from hashicorp/b-gc
Browse files Browse the repository at this point in the history
Fix GC'd alloc tracking
  • Loading branch information
schmichael authored Oct 30, 2017
2 parents cf817c8 + 6c36f76 commit 84b9c3e
Show file tree
Hide file tree
Showing 7 changed files with 288 additions and 184 deletions.
26 changes: 21 additions & 5 deletions client/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,13 @@ type AllocRunner struct {
// to call.
prevAlloc prevAllocWatcher

// ctx is cancelled with exitFn to cause the alloc to be destroyed
// (stopped and GC'd).
ctx context.Context
exitFn context.CancelFunc

// waitCh is closed when the Run method exits. At that point the alloc
// has stopped and been GC'd.
waitCh chan struct{}

// State related fields
Expand Down Expand Up @@ -917,11 +922,6 @@ func (r *AllocRunner) handleDestroy() {
// state as we wait for a destroy.
alloc := r.Alloc()

//TODO(schmichael) updater can cause a GC which can block on this alloc
// runner shutting down. Since handleDestroy can be called by Run() we
// can't block shutdown here as it would cause a deadlock.
go r.updater(alloc)

// Broadcast and persist state synchronously
r.sendBroadcast(alloc)
if err := r.saveAllocRunnerState(); err != nil {
Expand All @@ -935,6 +935,11 @@ func (r *AllocRunner) handleDestroy() {
r.logger.Printf("[ERR] client: alloc %q unable unmount task directories: %v", r.allocID, err)
}

// Update the server with the alloc's status -- also marks the alloc as
// being eligible for GC, so from this point on the alloc can be gc'd
// at any time.
r.updater(alloc)

for {
select {
case <-r.ctx.Done():
Expand Down Expand Up @@ -1065,6 +1070,17 @@ func (r *AllocRunner) Destroy() {
r.allocBroadcast.Close()
}

// IsDestroyed returns true if the AllocRunner is not running and has been
// destroyed (GC'd).
func (r *AllocRunner) IsDestroyed() bool {
select {
case <-r.waitCh:
return true
default:
return false
}
}

// WaitCh returns a channel to wait for termination
func (r *AllocRunner) WaitCh() <-chan struct{} {
return r.waitCh
Expand Down
72 changes: 46 additions & 26 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ type Client struct {
// successfully
serversDiscoveredCh chan struct{}

// allocs is the current set of allocations
// allocs maps alloc IDs to their AllocRunner. This map includes all
// AllocRunners - running and GC'd - until the server GCs them.
allocs map[string]*AllocRunner
allocLock sync.RWMutex

Expand Down Expand Up @@ -486,15 +487,16 @@ func (c *Client) Stats() map[string]map[string]string {
return stats
}

// CollectAllocation garbage collects a single allocation
func (c *Client) CollectAllocation(allocID string) error {
// CollectAllocation garbage collects a single allocation on a node. Returns
// true if alloc was found and garbage collected; otherwise false.
func (c *Client) CollectAllocation(allocID string) bool {
return c.garbageCollector.Collect(allocID)
}

// CollectAllAllocs garbage collects all allocations on a node in the terminal
// state
func (c *Client) CollectAllAllocs() error {
return c.garbageCollector.CollectAll()
func (c *Client) CollectAllAllocs() {
c.garbageCollector.CollectAll()
}

// Node returns the locally registered node
Expand Down Expand Up @@ -721,11 +723,16 @@ func (c *Client) getAllocRunners() map[string]*AllocRunner {
return runners
}

// NumAllocs returns the number of allocs this client has. Used to
// NumAllocs returns the number of un-GC'd allocs this client has. Used to
// fulfill the AllocCounter interface for the GC.
func (c *Client) NumAllocs() int {
n := 0
c.allocLock.RLock()
n := len(c.allocs)
for _, a := range c.allocs {
if !a.IsDestroyed() {
n++
}
}
c.allocLock.RUnlock()
return n
}
Expand Down Expand Up @@ -1205,6 +1212,7 @@ func (c *Client) updateNodeStatus() error {
for _, s := range resp.Servers {
addr, err := resolveServer(s.RPCAdvertiseAddr)
if err != nil {
c.logger.Printf("[WARN] client: ignoring invalid server %q: %v", s.RPCAdvertiseAddr, err)
continue
}
e := endpoint{name: s.RPCAdvertiseAddr, addr: addr}
Expand Down Expand Up @@ -1234,9 +1242,19 @@ func (c *Client) updateNodeStatus() error {
// updateAllocStatus is used to update the status of an allocation
func (c *Client) updateAllocStatus(alloc *structs.Allocation) {
if alloc.Terminated() {
// Terminated, mark for GC
if ar, ok := c.getAllocRunners()[alloc.ID]; ok {
// Terminated, mark for GC if we're still tracking this alloc
// runner. If it's not being tracked that means the server has
// already GC'd it (see removeAlloc).
c.allocLock.RLock()
ar, ok := c.allocs[alloc.ID]
c.allocLock.RUnlock()

if ok {
c.garbageCollector.MarkForCollection(ar)

// Trigger a GC in case we're over thresholds and just
// waiting for eligible allocs.
c.garbageCollector.Trigger()
}
}

Expand Down Expand Up @@ -1531,9 +1549,7 @@ func (c *Client) runAllocs(update *allocUpdates) {

// Remove the old allocations
for _, remove := range diff.removed {
if err := c.removeAlloc(remove); err != nil {
c.logger.Printf("[ERR] client: failed to remove alloc '%s': %v", remove.ID, err)
}
c.removeAlloc(remove)
}

// Update the existing allocations
Expand All @@ -1544,6 +1560,11 @@ func (c *Client) runAllocs(update *allocUpdates) {
}
}

// Make room for new allocations before running
if err := c.garbageCollector.MakeRoomFor(diff.added); err != nil {
c.logger.Printf("[ERR] client: error making room for new allocations: %v", err)
}

// Start the new allocations
for _, add := range diff.added {
migrateToken := update.migrateTokens[add.ID]
Expand All @@ -1552,26 +1573,33 @@ func (c *Client) runAllocs(update *allocUpdates) {
add.ID, err)
}
}

// Trigger the GC once more now that new allocs are started that could
// have caused thesholds to be exceeded
c.garbageCollector.Trigger()
}

// removeAlloc is invoked when we should remove an allocation
func (c *Client) removeAlloc(alloc *structs.Allocation) error {
// removeAlloc is invoked when we should remove an allocation because it has
// been removed by the server.
func (c *Client) removeAlloc(alloc *structs.Allocation) {
c.allocLock.Lock()
ar, ok := c.allocs[alloc.ID]
if !ok {
c.allocLock.Unlock()
c.logger.Printf("[WARN] client: missing context for alloc '%s'", alloc.ID)
return nil
return
}

// Stop tracking alloc runner as it's been GC'd by the server
delete(c.allocs, alloc.ID)
c.allocLock.Unlock()

// Ensure the GC has a reference and then collect. Collecting through the GC
// applies rate limiting
c.garbageCollector.MarkForCollection(ar)
go c.garbageCollector.Collect(alloc.ID)

return nil
// GC immediately since the server has GC'd it
go c.garbageCollector.Collect(alloc.ID)
}

// updateAlloc is invoked when we should update an allocation
Expand All @@ -1592,9 +1620,9 @@ func (c *Client) updateAlloc(exist, update *structs.Allocation) error {
func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error {
// Check if we already have an alloc runner
c.allocLock.Lock()
defer c.allocLock.Unlock()
if _, ok := c.allocs[alloc.ID]; ok {
c.logger.Printf("[DEBUG]: client: dropping duplicate add allocation request: %q", alloc.ID)
c.allocLock.Unlock()
return nil
}

Expand All @@ -1618,14 +1646,6 @@ func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error
c.logger.Printf("[WARN] client: initial save state for alloc %q failed: %v", alloc.ID, err)
}

// Must release allocLock as GC acquires it to count allocs
c.allocLock.Unlock()

// Make room for the allocation before running it
if err := c.garbageCollector.MakeRoomFor([]*structs.Allocation{alloc}); err != nil {
c.logger.Printf("[ERR] client: error making room for allocation: %v", err)
}

go ar.Run()
return nil
}
Expand Down
17 changes: 8 additions & 9 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ func testServer(t *testing.T, cb func(*nomad.Config)) (*nomad.Server, string) {
cb(config)
}

// Enable raft as leader if we have bootstrap on
config.RaftConfig.StartAsLeader = !config.DevDisableBootstrap

for i := 10; i >= 0; i-- {
ports := freeport.GetT(t, 2)
config.RPCAddr = &net.TCPAddr{
Expand Down Expand Up @@ -657,7 +660,6 @@ func TestClient_WatchAllocs(t *testing.T) {
alloc2.JobID = job.ID
alloc2.Job = job

// Insert at zero so they are pulled
state := s1.State()
if err := state.UpsertJob(100, job); err != nil {
t.Fatal(err)
Expand All @@ -681,23 +683,20 @@ func TestClient_WatchAllocs(t *testing.T) {
})

// Delete one allocation
err = state.DeleteEval(103, nil, []string{alloc1.ID})
if err != nil {
if err := state.DeleteEval(103, nil, []string{alloc1.ID}); err != nil {
t.Fatalf("err: %v", err)
}

// Update the other allocation. Have to make a copy because the allocs are
// shared in memory in the test and the modify index would be updated in the
// alloc runner.
alloc2_2 := new(structs.Allocation)
*alloc2_2 = *alloc2
alloc2_2 := alloc2.Copy()
alloc2_2.DesiredStatus = structs.AllocDesiredStatusStop
err = state.UpsertAllocs(104, []*structs.Allocation{alloc2_2})
if err != nil {
t.Fatalf("err: %v", err)
if err := state.UpsertAllocs(104, []*structs.Allocation{alloc2_2}); err != nil {
t.Fatalf("err upserting stopped alloc: %v", err)
}

// One allocations should get de-registered
// One allocation should get GC'd and removed
testutil.WaitForResult(func() (bool, error) {
c1.allocLock.RLock()
num := len(c1.allocs)
Expand Down
Loading

0 comments on commit 84b9c3e

Please sign in to comment.