Skip to content

Commit

Permalink
Merge pull request #2425 from hashicorp/f-client-metrics
Browse files Browse the repository at this point in the history
Add metrics to show allocations on the client
  • Loading branch information
dadgar authored Mar 10, 2017
2 parents 85fffa1 + 0797379 commit 20b03df
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 37 deletions.
40 changes: 22 additions & 18 deletions api/evaluations.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,24 +54,28 @@ func (e *Evaluations) Allocations(evalID string, q *QueryOptions) ([]*Allocation

// Evaluation is used to serialize an evaluation.
type Evaluation struct {
ID string
Priority int
Type string
TriggeredBy string
JobID string
JobModifyIndex uint64
NodeID string
NodeModifyIndex uint64
Status string
StatusDescription string
Wait time.Duration
NextEval string
PreviousEval string
BlockedEval string
FailedTGAllocs map[string]*AllocationMetric
QueuedAllocations map[string]int
CreateIndex uint64
ModifyIndex uint64
ID string
Priority int
Type string
TriggeredBy string
JobID string
JobModifyIndex uint64
NodeID string
NodeModifyIndex uint64
Status string
StatusDescription string
Wait time.Duration
NextEval string
PreviousEval string
BlockedEval string
FailedTGAllocs map[string]*AllocationMetric
ClassEligibility map[string]bool
EscapedComputedClass bool
AnnotatePlan bool
QueuedAllocations map[string]int
SnapshotIndex uint64
CreateIndex uint64
ModifyIndex uint64
}

// EvalIndexSort is a wrapper to sort evaluations by CreateIndex.
Expand Down
24 changes: 15 additions & 9 deletions client/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,15 +574,21 @@ func (r *AllocRunner) destroyTaskRunners(destroyEvent *structs.TaskEvent) {
// handleDestroy blocks till the AllocRunner should be destroyed and does the
// necessary cleanup.
func (r *AllocRunner) handleDestroy() {
select {
case <-r.destroyCh:
if err := r.DestroyContext(); err != nil {
r.logger.Printf("[ERR] client: failed to destroy context for alloc '%s': %v",
r.alloc.ID, err)
}
if err := r.DestroyState(); err != nil {
r.logger.Printf("[ERR] client: failed to destroy state for alloc '%s': %v",
r.alloc.ID, err)
for {
select {
case <-r.destroyCh:
if err := r.DestroyContext(); err != nil {
r.logger.Printf("[ERR] client: failed to destroy context for alloc '%s': %v",
r.alloc.ID, err)
}
if err := r.DestroyState(); err != nil {
r.logger.Printf("[ERR] client: failed to destroy state for alloc '%s': %v",
r.alloc.ID, err)
}

return
case <-r.updateCh:
r.logger.Printf("[ERR] client: dropping update to terminal alloc '%s'", r.alloc.ID)
}
}
}
Expand Down
46 changes: 40 additions & 6 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg
go c.run()

// Start collecting stats
go c.collectHostStats()
go c.emitStats()

c.logger.Printf("[INFO] client: Node ID %q", c.Node().ID)
return c, nil
Expand Down Expand Up @@ -2170,8 +2170,8 @@ func (c *Client) consulReaperImpl() error {
return c.consulSyncer.ReapUnmatched(domains)
}

// collectHostStats collects host resource usage stats periodically
func (c *Client) collectHostStats() {
// emitStats collects host resource usage stats periodically, publishes them
// as node metrics when the operator has opted in, and emits client metrics
func (c *Client) emitStats() {
// Start collecting host stats right away and then keep collecting every
// collection interval
next := time.NewTimer(0)
Expand All @@ -2188,16 +2188,18 @@ func (c *Client) collectHostStats() {

// Publish Node metrics if operator has opted in
if c.config.PublishNodeMetrics {
c.emitStats(c.hostStatsCollector.Stats())
c.emitHostStats(c.hostStatsCollector.Stats())
}

c.emitClientMetrics()
case <-c.shutdownCh:
return
}
}
}

// emitStats pushes host resource usage stats to remote metrics collection sinks
func (c *Client) emitStats(hStats *stats.HostStats) {
// emitHostStats pushes host resource usage stats to remote metrics collection sinks
func (c *Client) emitHostStats(hStats *stats.HostStats) {
nodeID := c.Node().ID
metrics.SetGauge([]string{"client", "host", "memory", nodeID, "total"}, float32(hStats.Memory.Total))
metrics.SetGauge([]string{"client", "host", "memory", nodeID, "available"}, float32(hStats.Memory.Available))
Expand Down Expand Up @@ -2263,6 +2265,38 @@ func (c *Client) emitStats(hStats *stats.HostStats) {
}
}

// emitClientMetrics publishes the lower volume client gauges: the number of
// migrating and blocked allocations, plus a tally of the client's alloc
// runners grouped by client status (pending, running, terminal). Every gauge
// key ends with this node's ID.
func (c *Client) emitClientMetrics() {
	nodeID := c.Node().ID

	// Read the sizes of the migrating and blocked collections while holding
	// their respective locks.
	c.migratingAllocsLock.Lock()
	numMigrating := len(c.migratingAllocs)
	c.migratingAllocsLock.Unlock()

	c.blockedAllocsLock.Lock()
	numBlocked := len(c.blockedAllocations)
	c.blockedAllocsLock.Unlock()

	// Tally alloc runners by client status. Complete and failed are both
	// counted as terminal; any other status is left uncounted.
	var numPending, numRunning, numTerminal int
	for _, runner := range c.getAllocRunners() {
		status := runner.Alloc().ClientStatus
		if status == structs.AllocClientStatusPending {
			numPending++
		} else if status == structs.AllocClientStatusRunning {
			numRunning++
		} else if status == structs.AllocClientStatusComplete || status == structs.AllocClientStatusFailed {
			numTerminal++
		}
	}

	// Emit one gauge per category, in a fixed order.
	gauges := []struct {
		name  string
		count int
	}{
		{"migrating", numMigrating},
		{"blocked", numBlocked},
		{"pending", numPending},
		{"running", numRunning},
		{"terminal", numTerminal},
	}
	for _, g := range gauges {
		metrics.SetGauge([]string{"client", "allocations", g.name, nodeID}, float32(g.count))
	}
}

func (c *Client) getAllocatedResources(selfNode *structs.Node) *structs.Resources {
// Unfortunately the allocs only have IP so we need to match them to the
// device
Expand Down
8 changes: 4 additions & 4 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3827,15 +3827,15 @@ type Evaluation struct {
// during the evaluation. This should not be set during normal operations.
AnnotatePlan bool

// QueuedAllocations is the number of unplaced allocations at the time the
// evaluation was processed. The map is keyed by Task Group names.
QueuedAllocations map[string]int

// SnapshotIndex is the Raft index of the snapshot used to process the
// evaluation. As such it will only be set once it has gone through the
// scheduler.
SnapshotIndex uint64

// QueuedAllocations is the number of unplaced allocations at the time the
// evaluation was processed. The map is keyed by Task Group names.
QueuedAllocations map[string]int

// Raft Indexes
CreateIndex uint64
ModifyIndex uint64
Expand Down

0 comments on commit 20b03df

Please sign in to comment.