Skip to content

Commit

Permalink
Merge pull request #1189 from hashicorp/f-alloc-resource-usage
Browse files Browse the repository at this point in the history
Client APIs for reporting resource usage of host and allocations
  • Loading branch information
diptanu committed May 31, 2016
2 parents 6ef5cfc + f1a5348 commit 9496242
Show file tree
Hide file tree
Showing 115 changed files with 5,996 additions and 840 deletions.
27 changes: 27 additions & 0 deletions api/allocations.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package api

import (
"fmt"
"sort"
"time"

"github.com/hashicorp/go-cleanhttp"
)

// Allocations is used to query the alloc-related endpoints.
Expand Down Expand Up @@ -40,6 +43,30 @@ func (a *Allocations) Info(allocID string, q *QueryOptions) (*Allocation, *Query
return &resp, qm, nil
}

func (a *Allocations) Stats(alloc *Allocation, q *QueryOptions) (map[string]*TaskResourceUsage, error) {
node, _, err := a.client.Nodes().Info(alloc.NodeID, q)
if err != nil {
return nil, err
}
if node.HTTPAddr == "" {
return nil, fmt.Errorf("http addr of the node where alloc %q is running is not advertised", alloc.ID)
}
client, err := NewClient(&Config{
Address: fmt.Sprintf("http://%s", node.HTTPAddr),
HttpClient: cleanhttp.DefaultClient(),
})
if err != nil {
return nil, err
}
resp := make(map[string][]*TaskResourceUsage)
client.query("/v1/client/allocation/"+alloc.ID+"/stats", &resp, nil)
res := make(map[string]*TaskResourceUsage)
for task, ru := range resp {
res[task] = ru[0]
}
return res, nil
}

// Allocation is used for serialization of allocations.
type Allocation struct {
ID string
Expand Down
58 changes: 58 additions & 0 deletions api/nodes.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package api

import (
"fmt"
"sort"
"strconv"

"github.com/hashicorp/go-cleanhttp"
)

// Nodes is used to query node-related API endpoints
Expand Down Expand Up @@ -71,6 +74,29 @@ func (n *Nodes) ForceEvaluate(nodeID string, q *WriteOptions) (string, *WriteMet
return resp.EvalID, wm, nil
}

func (n *Nodes) Stats(nodeID string, q *QueryOptions) (*HostStats, error) {
node, _, err := n.client.Nodes().Info(nodeID, q)
if err != nil {
return nil, err
}
if node.HTTPAddr == "" {
return nil, fmt.Errorf("http addr of the node %q is running is not advertised", nodeID)
}
client, err := NewClient(&Config{
Address: fmt.Sprintf("http://%s", node.HTTPAddr),
HttpClient: cleanhttp.DefaultClient(),
})
if err != nil {
return nil, err
}
var resp []HostStats
if _, err := client.query("/v1/client/stats/", &resp, nil); err != nil {
return nil, err
}

return &resp[0], nil
}

// Node is used to deserialize a node entry.
type Node struct {
ID string
Expand All @@ -90,6 +116,38 @@ type Node struct {
ModifyIndex uint64
}

// HostStats represents resource usage stats of the host running a Nomad client
type HostStats struct {
Memory *HostMemoryStats
CPU []*HostCPUStats
DiskStats []*HostDiskStats
Uptime uint64
}

type HostMemoryStats struct {
Total uint64
Available uint64
Used uint64
Free uint64
}

type HostCPUStats struct {
CPU string
User float64
System float64
Idle float64
}

type HostDiskStats struct {
Device string
Mountpoint string
Size uint64
Used uint64
Available uint64
UsedPercent float64
InodesUsedPercent float64
}

// NodeListStub is a subset of information returned during
// node list operations.
type NodeListStub struct {
Expand Down
33 changes: 33 additions & 0 deletions api/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,39 @@ import (
"time"
)

// MemoryStats holds memory usage related stats
type MemoryStats struct {
RSS uint64
Cache uint64
Swap uint64
MaxUsage uint64
KernelUsage uint64
KernelMaxUsage uint64
}

// CpuStats holds cpu usage related stats
type CpuStats struct {
SystemMode float64
UserMode float64
ThrottledPeriods uint64
ThrottledTime uint64
Percent float64
}

// ResourceUsage holds information related to cpu and memory stats
type ResourceUsage struct {
MemoryStats *MemoryStats
CpuStats *CpuStats
}

// TaskResourceUsage holds aggregated resource usage of all processes in a Task
// and the resource usage of the individual pids
type TaskResourceUsage struct {
ResourceUsage *ResourceUsage
Timestamp int64
Pids map[string]*ResourceUsage
}

// RestartPolicy defines how the Nomad client restarts
// tasks in a taskgroup when they fail
type RestartPolicy struct {
Expand Down
34 changes: 34 additions & 0 deletions client/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ const (
// AllocStateUpdater is used to update the status of an allocation
type AllocStateUpdater func(alloc *structs.Allocation)

// AllocStatsReporter exposes stats related APIs of an allocation runner
type AllocStatsReporter interface {
AllocStats() map[string]TaskStatsReporter
}

// AllocRunner is used to wrap an allocation and provide the execution context.
type AllocRunner struct {
config *config.Config
Expand Down Expand Up @@ -471,6 +476,35 @@ func (r *AllocRunner) Update(update *structs.Allocation) {
}
}

// StatsReporter returns an interface to query resource usage statistics of an
// allocation
func (r *AllocRunner) StatsReporter() AllocStatsReporter {
return r
}

// AllocStats returns the stats reporter of all the tasks running in the
// allocation
func (r *AllocRunner) AllocStats() map[string]TaskStatsReporter {
r.taskLock.RLock()
defer r.taskLock.RUnlock()
res := make(map[string]TaskStatsReporter)
for task, tr := range r.tasks {
res[task] = tr.StatsReporter()
}
return res
}

// TaskStats returns the stats reporter for a specific task running in the
// allocation
func (r *AllocRunner) TaskStats(task string) (TaskStatsReporter, error) {
tr, ok := r.tasks[task]
if !ok {
return nil, fmt.Errorf("task %q not running in allocation %v", task, r.alloc.ID)
}

return tr.StatsReporter(), nil
}

// shouldUpdate takes the AllocModifyIndex of an allocation sent from the server and
// checks if the current running allocation is behind and should be updated.
func (r *AllocRunner) shouldUpdate(serverIndex uint64) bool {
Expand Down
135 changes: 126 additions & 9 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/client/driver"
"github.com/hashicorp/nomad/client/fingerprint"
"github.com/hashicorp/nomad/client/stats"
"github.com/hashicorp/nomad/nomad"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/mitchellh/hashstructure"
Expand Down Expand Up @@ -76,11 +77,27 @@ const (
// DefaultConfig returns the default configuration
func DefaultConfig() *config.Config {
return &config.Config{
LogOutput: os.Stderr,
Region: "global",
LogOutput: os.Stderr,
Region: "global",
StatsDataPoints: 60,
StatsCollectionInterval: 1 * time.Second,
}
}

// ClientStatsReporter exposes all the APIs related to resource usage of a Nomad
// Client
type ClientStatsReporter interface {
// AllocStats returns a map of alloc ids and their corresponding stats
// collector
AllocStats() map[string]AllocStatsReporter

// HostStats returns resource usage stats for the host
HostStats() []*stats.HostStats

// HostStatsTS returns a time series of host resource usage stats
HostStatsTS(since int64) []*stats.HostStats
}

// Client is used to implement the client interaction with Nomad. Clients
// are expected to register as a schedulable node to the servers, and to
// run allocations as determined by the servers.
Expand Down Expand Up @@ -116,6 +133,11 @@ type Client struct {

consulService *consul.ConsulService

// HostStatsCollector collects host resource usage stats
hostStatsCollector *stats.HostStatsCollector
resourceUsage *stats.RingBuff
resourceUsageLock sync.RWMutex

shutdown bool
shutdownCh chan struct{}
shutdownLock sync.Mutex
Expand All @@ -126,15 +148,22 @@ func NewClient(cfg *config.Config) (*Client, error) {
// Create a logger
logger := log.New(cfg.LogOutput, "", log.LstdFlags)

resourceUsage, err := stats.NewRingBuff(cfg.StatsDataPoints)
if err != nil {
return nil, err
}

// Create the client
c := &Client{
config: cfg,
start: time.Now(),
connPool: nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, nil),
logger: logger,
allocs: make(map[string]*AllocRunner),
allocUpdates: make(chan *structs.Allocation, 64),
shutdownCh: make(chan struct{}),
config: cfg,
start: time.Now(),
connPool: nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, nil),
logger: logger,
hostStatsCollector: stats.NewHostStatsCollector(),
resourceUsage: resourceUsage,
allocs: make(map[string]*AllocRunner),
allocUpdates: make(chan *structs.Allocation, 64),
shutdownCh: make(chan struct{}),
}

// Initialize the client
Expand Down Expand Up @@ -189,6 +218,9 @@ func NewClient(cfg *config.Config) (*Client, error) {
// Start the client!
go c.run()

// Start collecting stats
go c.collectHostStats()

// Start the consul sync
go c.syncConsul()

Expand Down Expand Up @@ -394,6 +426,67 @@ func (c *Client) Node() *structs.Node {
return c.config.Node
}

// StatsReporter exposes the various APIs related resource usage of a Nomad
// client
func (c *Client) StatsReporter() ClientStatsReporter {
return c
}

// AllocStats returns all the stats reporter of the allocations running on a
// Nomad client
func (c *Client) AllocStats() map[string]AllocStatsReporter {
res := make(map[string]AllocStatsReporter)
allocRunners := c.getAllocRunners()
for alloc, ar := range allocRunners {
res[alloc] = ar
}
return res
}

// HostStats returns all the stats related to a Nomad client
func (c *Client) HostStats() []*stats.HostStats {
c.resourceUsageLock.RLock()
defer c.resourceUsageLock.RUnlock()
val := c.resourceUsage.Peek()
ru, _ := val.(*stats.HostStats)
return []*stats.HostStats{ru}
}

func (c *Client) HostStatsTS(since int64) []*stats.HostStats {
c.resourceUsageLock.RLock()
defer c.resourceUsageLock.RUnlock()

values := c.resourceUsage.Values()
low := 0
high := len(values) - 1
var idx int

for {
mid := (low + high) >> 1
midVal, _ := values[mid].(*stats.HostStats)
if midVal.Timestamp < since {
low = mid + 1
} else if midVal.Timestamp > since {
high = mid - 1
} else if midVal.Timestamp == since {
idx = mid
break
}
if low > high {
idx = low
break
}
}
values = values[idx:]
ts := make([]*stats.HostStats, len(values))
for index, val := range values {
ru, _ := val.(*stats.HostStats)
ts[index] = ru
}
return ts

}

// GetAllocFS returns the AllocFS interface for the alloc dir of an allocation
func (c *Client) GetAllocFS(allocID string) (allocdir.AllocDirFS, error) {
ar, ok := c.allocs[allocID]
Expand Down Expand Up @@ -1227,3 +1320,27 @@ func (c *Client) syncConsul() {

}
}

// collectHostStats collects host resource usage stats periodically
func (c *Client) collectHostStats() {
// Start collecting host stats right away and then keep collecting every
// collection interval
next := time.NewTimer(0)
defer next.Stop()
for {
select {
case <-next.C:
ru, err := c.hostStatsCollector.Collect()
if err != nil {
c.logger.Printf("[DEBUG] client: error fetching host resource usage stats: %v", err)
continue
}
c.resourceUsageLock.RLock()
c.resourceUsage.Enqueue(ru)
c.resourceUsageLock.RUnlock()
next.Reset(c.config.StatsCollectionInterval)
case <-c.shutdownCh:
return
}
}
}
Loading

0 comments on commit 9496242

Please sign in to comment.