Add new gc_max_allocs tuneable
More than gc_max_allocs allocations may be running on a node, but terminal
allocs will be garbage collected to try to keep the total number below the
limit.
schmichael committed May 12, 2017
1 parent 616d5c2 commit cc11d9a
Showing 10 changed files with 234 additions and 65 deletions.
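Before the diffs, a worked sketch of the rule the commit message describes. This is illustrative only: the helper and its names are hypothetical, not code from the commit — it just models the soft cap, under which terminal allocs are destroyed but running allocs are never touched, so the total can legitimately stay above the limit.

```go
package main

import "fmt"

// allocsAfterGC models the gc_max_allocs soft cap: terminal allocs are
// collected until the total drops to maxAllocs, but running allocs are
// left alone, so the total may remain above the limit.
func allocsAfterGC(running, terminal, maxAllocs int) int {
	total := running + terminal
	for total > maxAllocs && terminal > 0 {
		terminal-- // GC one terminal alloc runner
		total--
	}
	return total
}

func main() {
	// With gc_max_allocs = 50, 60 running and 10 terminal allocs:
	// all 10 terminal allocs are collected, yet 60 keep running.
	fmt.Println(allocsAfterGC(60, 10, 50)) // prints 60
}
```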
19 changes: 13 additions & 6 deletions client/client.go
@@ -240,13 +240,15 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic

// Add the garbage collector
gcConfig := &GCConfig{
+ MaxAllocs: cfg.GCMaxAllocs,
DiskUsageThreshold: cfg.GCDiskUsageThreshold,
InodeUsageThreshold: cfg.GCInodeUsageThreshold,
Interval: cfg.GCInterval,
ParallelDestroys: cfg.GCParallelDestroys,
ReservedDiskMB: cfg.Node.Reserved.DiskMB,
}
- c.garbageCollector = NewAllocGarbageCollector(logger, statsCollector, gcConfig)
+ c.garbageCollector = NewAllocGarbageCollector(logger, statsCollector, c, gcConfig)
+ go c.garbageCollector.Run()

// Setup the node
if err := c.setupNode(); err != nil {
@@ -482,17 +484,13 @@ func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
// Stats is used to return statistics for debugging and insight
// for various sub-systems
func (c *Client) Stats() map[string]map[string]string {
- c.allocLock.RLock()
- numAllocs := len(c.allocs)
- c.allocLock.RUnlock()
-
c.heartbeatLock.Lock()
defer c.heartbeatLock.Unlock()
stats := map[string]map[string]string{
"client": map[string]string{
"node_id": c.Node().ID,
"known_servers": c.servers.all().String(),
"num_allocations": strconv.Itoa(numAllocs),
"num_allocations": strconv.Itoa(c.NumAllocs()),
"last_heartbeat": fmt.Sprintf("%v", time.Since(c.lastHeartbeat)),
"heartbeat_ttl": fmt.Sprintf("%v", c.heartbeatTTL),
},
@@ -722,6 +720,15 @@ func (c *Client) getAllocRunners() map[string]*AllocRunner {
return runners
}

+ // NumAllocs returns the number of allocs this client has. Used to
+ // fulfill the AllocCounter interface for the GC.
+ func (c *Client) NumAllocs() int {
+ c.allocLock.RLock()
+ n := len(c.allocs)
+ c.allocLock.RUnlock()
+ return n
+ }
+
// nodeID restores, or generates if necessary, a unique node ID and SecretID.
// The node ID is, if available, a persistent unique ID. The secret ID is a
// high-entropy random UUID.
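The NumAllocs method added above exists to satisfy the GC's new AllocCounter interface (defined in client/gc.go below). A minimal sketch of what a test stub might look like — the fakeAllocCounter type is hypothetical, not taken from this commit:

```go
package client

// fakeAllocCounter is a hypothetical test stub: any type with a
// NumAllocs() int method satisfies the new AllocCounter interface,
// so tests can control the reported count without a full Client.
type fakeAllocCounter struct{ n int }

func (f fakeAllocCounter) NumAllocs() int { return f.n }
```

Such a stub could stand in for the `c` argument in the NewAllocGarbageCollector call shown above.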
5 changes: 5 additions & 0 deletions client/config/config.go
@@ -171,6 +171,10 @@ type Config struct {
// beyond which the Nomad client triggers GC of the terminal allocations
GCInodeUsageThreshold float64

+ // GCMaxAllocs is the maximum number of allocations a node can have
+ // before garbage collection is triggered.
+ GCMaxAllocs int
+
// LogLevel is the level of the logs to put out
LogLevel string

@@ -205,6 +209,7 @@ func DefaultConfig() *Config {
GCParallelDestroys: 2,
GCDiskUsageThreshold: 80,
GCInodeUsageThreshold: 70,
+ GCMaxAllocs: 200,
}
}

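For reference, a sketch of lowering the new default programmatically, using only identifiers from this diff and assuming the standard Nomad import path; the surrounding program is hypothetical:

```go
package main

import "github.com/hashicorp/nomad/client/config"

func main() {
	cfg := config.DefaultConfig() // GCMaxAllocs defaults to 200 per the change above
	cfg.GCMaxAllocs = 50          // begin GC'ing terminal allocs beyond ~50 total
	_ = cfg                       // hand cfg to NewClient as usual
}
```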
94 changes: 70 additions & 24 deletions client/gc.go
@@ -18,26 +18,37 @@ const (

// GCConfig allows changing the behaviour of the garbage collector
type GCConfig struct {
+ // MaxAllocs is the maximum number of allocations to track before a GC
+ // is triggered.
+ MaxAllocs int
DiskUsageThreshold float64
InodeUsageThreshold float64
Interval time.Duration
ReservedDiskMB int
ParallelDestroys int
}

+ // AllocCounter is used by AllocGarbageCollector to discover how many
+ // allocations a node has and is generally fulfilled by the Client.
+ type AllocCounter interface {
+ NumAllocs() int
+ }
+
// AllocGarbageCollector garbage collects terminated allocations on a node
type AllocGarbageCollector struct {
allocRunners *IndexedGCAllocPQ
statsCollector stats.NodeStatsCollector
+ allocCounter AllocCounter
config *GCConfig
logger *log.Logger
destroyCh chan struct{}
shutdownCh chan struct{}
}

// NewAllocGarbageCollector returns a garbage collector for terminated
- // allocations on a node.
- func NewAllocGarbageCollector(logger *log.Logger, statsCollector stats.NodeStatsCollector, config *GCConfig) *AllocGarbageCollector {
+ // allocations on a node. Must call Run() in a goroutine to enable periodic
+ // garbage collection.
+ func NewAllocGarbageCollector(logger *log.Logger, statsCollector stats.NodeStatsCollector, ac AllocCounter, config *GCConfig) *AllocGarbageCollector {
// Require at least 1 to make progress
if config.ParallelDestroys <= 0 {
logger.Printf("[WARN] client: garbage collector defaulting parallism to 1 due to invalid input value of %d", config.ParallelDestroys)
@@ -47,17 +58,18 @@ func NewAllocGarbageCollector(logger *log.Logger, statsCollector stats.NodeStats
gc := &AllocGarbageCollector{
allocRunners: NewIndexedGCAllocPQ(),
statsCollector: statsCollector,
+ allocCounter: ac,
config: config,
logger: logger,
destroyCh: make(chan struct{}, config.ParallelDestroys),
shutdownCh: make(chan struct{}),
}

- go gc.run()
return gc
}

- func (a *AllocGarbageCollector) run() {
+ // Run the periodic garbage collector.
+ func (a *AllocGarbageCollector) Run() {
ticker := time.NewTicker(a.config.Interval)
for {
select {
@@ -100,31 +112,47 @@ func (a *AllocGarbageCollector) keepUsageBelowThreshold() error {
break
}

- if diskStats.UsedPercent <= a.config.DiskUsageThreshold &&
- diskStats.InodesUsedPercent <= a.config.InodeUsageThreshold {
+ reason := ""
+
+ switch {
+ case diskStats.UsedPercent > a.config.DiskUsageThreshold:
+ reason = fmt.Sprintf("disk usage of %.0f is over gc threshold of %.0f",
+ diskStats.UsedPercent, a.config.DiskUsageThreshold)
+ case diskStats.InodesUsedPercent > a.config.InodeUsageThreshold:
+ reason = fmt.Sprintf("inode usage of %.0f is over gc threshold of %.0f",
+ diskStats.InodesUsedPercent, a.config.InodeUsageThreshold)
+ case a.numAllocs() > a.config.MaxAllocs:
+ reason = fmt.Sprintf("number of allocations is over the limit (%d)", a.config.MaxAllocs)
+ }
+
+ // No reason to gc, exit
+ if reason == "" {
break
}

// Collect an allocation
gcAlloc := a.allocRunners.Pop()
if gcAlloc == nil {
a.logger.Printf("[WARN] client: garbage collection due to %s skipped because no terminal allocations", reason)
break
}

- ar := gcAlloc.allocRunner
- alloc := ar.Alloc()
- a.logger.Printf("[INFO] client: garbage collecting allocation %v", alloc.ID)
-
// Destroy the alloc runner and wait until it exits
- a.destroyAllocRunner(ar)
+ a.destroyAllocRunner(gcAlloc.allocRunner, reason)
}
return nil
}

// destroyAllocRunner is used to destroy an allocation runner. It will acquire a
// lock to restrict parallelism and then destroy the alloc runner, returning
// once the allocation has been destroyed.
- func (a *AllocGarbageCollector) destroyAllocRunner(ar *AllocRunner) {
+ func (a *AllocGarbageCollector) destroyAllocRunner(ar *AllocRunner, reason string) {
+ id := "<nil>"
+ if alloc := ar.Alloc(); alloc != nil {
+ id = alloc.ID
+ }
+ a.logger.Printf("[INFO] client: garbage collecting allocation %s due to %s", id, reason)
+
// Acquire the destroy lock
select {
case <-a.shutdownCh:
@@ -155,11 +183,7 @@ func (a *AllocGarbageCollector) Collect(allocID string) error {
if err != nil {
return fmt.Errorf("unable to collect allocation %q: %v", allocID, err)
}

- ar := gcAlloc.allocRunner
- a.logger.Printf("[INFO] client: garbage collecting allocation %q", ar.Alloc().ID)
-
- a.destroyAllocRunner(ar)
+ a.destroyAllocRunner(gcAlloc.allocRunner, "forced collection")
return nil
}

@@ -177,16 +201,34 @@ func (a *AllocGarbageCollector) CollectAll() error {
break
}

- ar := gcAlloc.allocRunner
- a.logger.Printf("[INFO] client: garbage collecting alloc runner for alloc %q", ar.Alloc().ID)
- go a.destroyAllocRunner(ar)
+ go a.destroyAllocRunner(gcAlloc.allocRunner, "forced full collection")
}
return nil
}

// MakeRoomFor garbage collects enough allocations in the terminal
// state to make room for new allocations
func (a *AllocGarbageCollector) MakeRoomFor(allocations []*structs.Allocation) error {
+ // GC allocs until there's room for the new allocations under the max limit
+ max := a.config.MaxAllocs - len(allocations)
+ for a.numAllocs() > max {
+ select {
+ case <-a.shutdownCh:
+ return nil
+ default:
+ }
+
+ gcAlloc := a.allocRunners.Pop()
+ if gcAlloc == nil {
+ // It's fine if we can't lower below the limit here as
+ // we'll keep trying to drop below the limit with each
+ // periodic gc
+ break
+ }
+
+ // Destroy the alloc runner and wait until it exits
+ a.destroyAllocRunner(gcAlloc.allocRunner, "new allocations")
+ }
totalResource := &structs.Resources{}
for _, alloc := range allocations {
if err := totalResource.Add(alloc.Resources); err != nil {
@@ -244,10 +286,9 @@ func (a *AllocGarbageCollector) MakeRoomFor(allocations []*structs.Allocation) error {

ar := gcAlloc.allocRunner
alloc := ar.Alloc()
a.logger.Printf("[INFO] client: garbage collecting allocation %v", alloc.ID)

// Destroy the alloc runner and wait until it exits
- a.destroyAllocRunner(ar)
+ a.destroyAllocRunner(ar, fmt.Sprintf("freeing %d MB for new allocations", alloc.Resources.DiskMB))

// Call stats collect again
diskCleared += alloc.Resources.DiskMB
@@ -261,8 +302,7 @@ func (a *AllocGarbageCollector) MarkForCollection(ar *AllocRunner) error {
return fmt.Errorf("nil allocation runner inserted for garbage collection")
}
if ar.Alloc() == nil {
a.logger.Printf("[INFO] client: alloc is nil, so garbage collecting")
a.destroyAllocRunner(ar)
a.destroyAllocRunner(ar, "alloc is nil")
}

a.logger.Printf("[INFO] client: marking allocation %v for GC", ar.Alloc().ID)
@@ -281,6 +321,12 @@ func (a *AllocGarbageCollector) Remove(ar *AllocRunner) {
}
}

+ // numAllocs returns the total number of allocs tracked by the client as well
+ // as those marked for GC.
+ func (a *AllocGarbageCollector) numAllocs() int {
+ return a.allocRunners.Length() + a.allocCounter.NumAllocs()
+ }
+
// GCAlloc wraps an allocation runner and an index enabling it to be used within
// a PQ
type GCAlloc struct {
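Note how numAllocs() above (GC queue length plus the client's own count) combines with MakeRoomFor: the target is MaxAllocs minus the number of incoming allocations. A worked sketch of that arithmetic, with hypothetical numbers and plain variables standing in for the collector's state:

```go
package main

import "fmt"

func main() {
	// Say MaxAllocs = 50, numAllocs() currently reports 48, and 4 new
	// allocations are arriving: the target is 50 - 4 = 46, so MakeRoomFor
	// pops terminal allocs until the count is at most 46 (or the terminal
	// queue empties, leaving the periodic GC to keep trying later).
	maxAllocs, numAllocs, incoming := 50, 48, 4
	max := maxAllocs - incoming
	for numAllocs > max {
		numAllocs-- // destroyAllocRunner(pop(), "new allocations")
	}
	fmt.Println(numAllocs) // prints 46
}
```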