Skip to content

Commit

Permalink
Merge pull request #3147 from hashicorp/b-tagged-metrics
Browse files Browse the repository at this point in the history
Add support for tagged metrics
  • Loading branch information
chelseakomlo authored Sep 5, 2017
2 parents 2111a84 + 0d434d0 commit 949df54
Show file tree
Hide file tree
Showing 28 changed files with 1,236 additions and 321 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 0.7 (Unreleased)

IMPROVEMENTS:
* telemetry: Add support for tagged metrics for Nomad clients [GH-3147]

## 0.6.3 (Unreleased)

BUG FIXES:
Expand Down
184 changes: 148 additions & 36 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"sync"
"time"

"github.com/armon/go-metrics"
metrics "github.com/armon/go-metrics"
"github.com/boltdb/bolt"
consulapi "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
Expand Down Expand Up @@ -153,6 +153,10 @@ type Client struct {

// clientACLResolver holds the ACL resolution state
clientACLResolver

// baseLabels are used when emitting tagged metrics. All client metrics will
// have these tags, and optionally more.
baseLabels []metrics.Label
}

var (
Expand Down Expand Up @@ -1843,6 +1847,10 @@ DISCOLOOP:

// emitStats collects host resource usage stats periodically
func (c *Client) emitStats() {
// Assign labels directly before emitting stats so the information expected
// is ready
c.baseLabels = []metrics.Label{{Name: "node_id", Value: c.Node().ID}, {Name: "datacenter", Value: c.Node().Datacenter}}

// Start collecting host stats right away and then keep collecting every
// collection interval
next := time.NewTimer(0)
Expand All @@ -1859,7 +1867,7 @@ func (c *Client) emitStats() {

// Publish Node metrics if operator has opted in
if c.config.PublishNodeMetrics {
c.emitHostStats(c.hostStatsCollector.Stats())
c.emitHostStats()
}

c.emitClientMetrics()
Expand All @@ -1869,58 +1877,120 @@ func (c *Client) emitStats() {
}
}

// emitHostStats pushes host resource usage stats to remote metrics collection sinks
func (c *Client) emitHostStats(hStats *stats.HostStats) {
nodeID := c.Node().ID
metrics.SetGauge([]string{"client", "host", "memory", nodeID, "total"}, float32(hStats.Memory.Total))
metrics.SetGauge([]string{"client", "host", "memory", nodeID, "available"}, float32(hStats.Memory.Available))
metrics.SetGauge([]string{"client", "host", "memory", nodeID, "used"}, float32(hStats.Memory.Used))
metrics.SetGauge([]string{"client", "host", "memory", nodeID, "free"}, float32(hStats.Memory.Free))
// setGaugeForMemoryStats proxies metrics for memory specific statistics
func (c *Client) setGaugeForMemoryStats(nodeID string, hStats *stats.HostStats) {
if !c.config.DisableTaggedMetrics {
metrics.SetGaugeWithLabels([]string{"client", "host", "memory", "total"}, float32(hStats.Memory.Total), c.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "host", "memory", "available"}, float32(hStats.Memory.Available), c.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "host", "memory", "used"}, float32(hStats.Memory.Used), c.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "host", "memory", "free"}, float32(hStats.Memory.Free), c.baseLabels)
}

metrics.SetGauge([]string{"uptime"}, float32(hStats.Uptime))
if c.config.BackwardsCompatibleMetrics {
metrics.SetGauge([]string{"client", "host", "memory", nodeID, "total"}, float32(hStats.Memory.Total))
metrics.SetGauge([]string{"client", "host", "memory", nodeID, "available"}, float32(hStats.Memory.Available))
metrics.SetGauge([]string{"client", "host", "memory", nodeID, "used"}, float32(hStats.Memory.Used))
metrics.SetGauge([]string{"client", "host", "memory", nodeID, "free"}, float32(hStats.Memory.Free))
}
}

// setGaugeForCPUStats proxies metrics for CPU specific statistics
func (c *Client) setGaugeForCPUStats(nodeID string, hStats *stats.HostStats) {
for _, cpu := range hStats.CPU {
metrics.SetGauge([]string{"client", "host", "cpu", nodeID, cpu.CPU, "total"}, float32(cpu.Total))
metrics.SetGauge([]string{"client", "host", "cpu", nodeID, cpu.CPU, "user"}, float32(cpu.User))
metrics.SetGauge([]string{"client", "host", "cpu", nodeID, cpu.CPU, "idle"}, float32(cpu.Idle))
metrics.SetGauge([]string{"client", "host", "cpu", nodeID, cpu.CPU, "system"}, float32(cpu.System))
if !c.config.DisableTaggedMetrics {
labels := append(c.baseLabels, metrics.Label{"cpu", cpu.CPU})

metrics.SetGaugeWithLabels([]string{"client", "host", "cpu", "total"}, float32(cpu.Total), labels)
metrics.SetGaugeWithLabels([]string{"client", "host", "cpu", "user"}, float32(cpu.User), labels)
metrics.SetGaugeWithLabels([]string{"client", "host", "cpu", "idle"}, float32(cpu.Idle), labels)
metrics.SetGaugeWithLabels([]string{"client", "host", "cpu", "system"}, float32(cpu.System), labels)
}

if c.config.BackwardsCompatibleMetrics {
metrics.SetGauge([]string{"client", "host", "cpu", nodeID, cpu.CPU, "total"}, float32(cpu.Total))
metrics.SetGauge([]string{"client", "host", "cpu", nodeID, cpu.CPU, "user"}, float32(cpu.User))
metrics.SetGauge([]string{"client", "host", "cpu", nodeID, cpu.CPU, "idle"}, float32(cpu.Idle))
metrics.SetGauge([]string{"client", "host", "cpu", nodeID, cpu.CPU, "system"}, float32(cpu.System))
}
}
}

// setGaugeForDiskStats proxies metrics for disk specific statistics
func (c *Client) setGaugeForDiskStats(nodeID string, hStats *stats.HostStats) {
for _, disk := range hStats.DiskStats {
metrics.SetGauge([]string{"client", "host", "disk", nodeID, disk.Device, "size"}, float32(disk.Size))
metrics.SetGauge([]string{"client", "host", "disk", nodeID, disk.Device, "used"}, float32(disk.Used))
metrics.SetGauge([]string{"client", "host", "disk", nodeID, disk.Device, "available"}, float32(disk.Available))
metrics.SetGauge([]string{"client", "host", "disk", nodeID, disk.Device, "used_percent"}, float32(disk.UsedPercent))
metrics.SetGauge([]string{"client", "host", "disk", nodeID, disk.Device, "inodes_percent"}, float32(disk.InodesUsedPercent))
if !c.config.DisableTaggedMetrics {
labels := append(c.baseLabels, metrics.Label{"disk", disk.Device})

metrics.SetGaugeWithLabels([]string{"client", "host", "disk", "size"}, float32(disk.Size), labels)
metrics.SetGaugeWithLabels([]string{"client", "host", "disk", "used"}, float32(disk.Used), labels)
metrics.SetGaugeWithLabels([]string{"client", "host", "disk", "available"}, float32(disk.Available), labels)
metrics.SetGaugeWithLabels([]string{"client", "host", "disk", "used_percent"}, float32(disk.UsedPercent), labels)
metrics.SetGaugeWithLabels([]string{"client", "host", "disk", "inodes_percent"}, float32(disk.InodesUsedPercent), labels)
}

if c.config.BackwardsCompatibleMetrics {
metrics.SetGauge([]string{"client", "host", "disk", nodeID, disk.Device, "size"}, float32(disk.Size))
metrics.SetGauge([]string{"client", "host", "disk", nodeID, disk.Device, "used"}, float32(disk.Used))
metrics.SetGauge([]string{"client", "host", "disk", nodeID, disk.Device, "available"}, float32(disk.Available))
metrics.SetGauge([]string{"client", "host", "disk", nodeID, disk.Device, "used_percent"}, float32(disk.UsedPercent))
metrics.SetGauge([]string{"client", "host", "disk", nodeID, disk.Device, "inodes_percent"}, float32(disk.InodesUsedPercent))
}
}
}

// Get all the resources for the node
c.configLock.RLock()
// setGaugeForAllocationStats proxies metrics for allocation specific statistics
func (c *Client) setGaugeForAllocationStats(nodeID string) {
node := c.configCopy.Node
c.configLock.RUnlock()
total := node.Resources
res := node.Reserved
allocated := c.getAllocatedResources(node)

// Emit allocated
metrics.SetGauge([]string{"client", "allocated", "memory", nodeID}, float32(allocated.MemoryMB))
metrics.SetGauge([]string{"client", "allocated", "disk", nodeID}, float32(allocated.DiskMB))
metrics.SetGauge([]string{"client", "allocated", "cpu", nodeID}, float32(allocated.CPU))
metrics.SetGauge([]string{"client", "allocated", "iops", nodeID}, float32(allocated.IOPS))
if !c.config.DisableTaggedMetrics {
metrics.SetGaugeWithLabels([]string{"client", "allocated", "memory"}, float32(allocated.MemoryMB), c.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "allocated", "disk"}, float32(allocated.DiskMB), c.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "allocated", "cpu"}, float32(allocated.CPU), c.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "allocated", "iops"}, float32(allocated.IOPS), c.baseLabels)
}

if c.config.BackwardsCompatibleMetrics {
metrics.SetGauge([]string{"client", "allocated", "memory", nodeID}, float32(allocated.MemoryMB))
metrics.SetGauge([]string{"client", "allocated", "disk", nodeID}, float32(allocated.DiskMB))
metrics.SetGauge([]string{"client", "allocated", "cpu", nodeID}, float32(allocated.CPU))
metrics.SetGauge([]string{"client", "allocated", "iops", nodeID}, float32(allocated.IOPS))
}

for _, n := range allocated.Networks {
metrics.SetGauge([]string{"client", "allocated", "network", n.Device, nodeID}, float32(n.MBits))
if !c.config.DisableTaggedMetrics {
labels := append(c.baseLabels, metrics.Label{"device", n.Device})
metrics.SetGaugeWithLabels([]string{"client", "allocated", "network"}, float32(n.MBits), labels)
}

if c.config.BackwardsCompatibleMetrics {
metrics.SetGauge([]string{"client", "allocated", "network", n.Device, nodeID}, float32(n.MBits))
}
}

// Emit unallocated
unallocatedMem := total.MemoryMB - res.MemoryMB - allocated.MemoryMB
unallocatedDisk := total.DiskMB - res.DiskMB - allocated.DiskMB
unallocatedCpu := total.CPU - res.CPU - allocated.CPU
unallocatedIops := total.IOPS - res.IOPS - allocated.IOPS
metrics.SetGauge([]string{"client", "unallocated", "memory", nodeID}, float32(unallocatedMem))
metrics.SetGauge([]string{"client", "unallocated", "disk", nodeID}, float32(unallocatedDisk))
metrics.SetGauge([]string{"client", "unallocated", "cpu", nodeID}, float32(unallocatedCpu))
metrics.SetGauge([]string{"client", "unallocated", "iops", nodeID}, float32(unallocatedIops))

if !c.config.DisableTaggedMetrics {
metrics.SetGaugeWithLabels([]string{"client", "unallocated", "memory"}, float32(unallocatedMem), c.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "unallocated", "disk"}, float32(unallocatedDisk), c.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "unallocated", "cpu"}, float32(unallocatedCpu), c.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "unallocated", "iops"}, float32(unallocatedIops), c.baseLabels)
}

if c.config.BackwardsCompatibleMetrics {
metrics.SetGauge([]string{"client", "unallocated", "memory", nodeID}, float32(unallocatedMem))
metrics.SetGauge([]string{"client", "unallocated", "disk", nodeID}, float32(unallocatedDisk))
metrics.SetGauge([]string{"client", "unallocated", "cpu", nodeID}, float32(unallocatedCpu))
metrics.SetGauge([]string{"client", "unallocated", "iops", nodeID}, float32(unallocatedIops))
}

for _, n := range allocated.Networks {
totalMbits := 0
Expand All @@ -1932,10 +2002,42 @@ func (c *Client) emitHostStats(hStats *stats.HostStats) {
}

unallocatedMbits := totalMbits - n.MBits
metrics.SetGauge([]string{"client", "unallocated", "network", n.Device, nodeID}, float32(unallocatedMbits))

if !c.config.DisableTaggedMetrics {
labels := append(c.baseLabels, metrics.Label{"device", n.Device})
metrics.SetGaugeWithLabels([]string{"client", "unallocated", "network"}, float32(unallocatedMbits), labels)
}

if c.config.BackwardsCompatibleMetrics {
metrics.SetGauge([]string{"client", "unallocated", "network", n.Device, nodeID}, float32(unallocatedMbits))
}
}
}

// No lables are required so we emit with only a key/value syntax
func (c *Client) setGaugeForUptime(hStats *stats.HostStats) {
if !c.config.DisableTaggedMetrics {
metrics.SetGaugeWithLabels([]string{"uptime"}, float32(hStats.Uptime), c.baseLabels)
}
if c.config.BackwardsCompatibleMetrics {
metrics.SetGauge([]string{"uptime"}, float32(hStats.Uptime))
}
}

// emitHostStats pushes host resource usage stats to remote metrics collection sinks
func (c *Client) emitHostStats() {
nodeID := c.Node().ID
hStats := c.hostStatsCollector.Stats()

c.setGaugeForMemoryStats(nodeID, hStats)
c.setGaugeForUptime(hStats)
c.setGaugeForCPUStats(nodeID, hStats)
c.setGaugeForDiskStats(nodeID, hStats)

// TODO: This should be moved to emitClientMetrics
c.setGaugeForAllocationStats(nodeID)
}

// emitClientMetrics emits lower volume client metrics
func (c *Client) emitClientMetrics() {
nodeID := c.Node().ID
Expand All @@ -1960,11 +2062,21 @@ func (c *Client) emitClientMetrics() {
}
}

metrics.SetGauge([]string{"client", "allocations", "migrating", nodeID}, float32(migrating))
metrics.SetGauge([]string{"client", "allocations", "blocked", nodeID}, float32(blocked))
metrics.SetGauge([]string{"client", "allocations", "pending", nodeID}, float32(pending))
metrics.SetGauge([]string{"client", "allocations", "running", nodeID}, float32(running))
metrics.SetGauge([]string{"client", "allocations", "terminal", nodeID}, float32(terminal))
if !c.config.DisableTaggedMetrics {
metrics.SetGaugeWithLabels([]string{"client", "allocations", "migrating"}, float32(migrating), c.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "allocations", "blocked"}, float32(blocked), c.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "allocations", "pending"}, float32(pending), c.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "allocations", "running"}, float32(running), c.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "allocations", "terminal"}, float32(terminal), c.baseLabels)
}

if c.config.BackwardsCompatibleMetrics {
metrics.SetGauge([]string{"client", "allocations", "migrating", nodeID}, float32(migrating))
metrics.SetGauge([]string{"client", "allocations", "blocked", nodeID}, float32(blocked))
metrics.SetGauge([]string{"client", "allocations", "pending", nodeID}, float32(pending))
metrics.SetGauge([]string{"client", "allocations", "running", nodeID}, float32(running))
metrics.SetGauge([]string{"client", "allocations", "terminal", nodeID}, float32(terminal))
}
}

func (c *Client) getAllocatedResources(selfNode *structs.Node) *structs.Resources {
Expand Down
27 changes: 27 additions & 0 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
nconfig "github.com/hashicorp/nomad/nomad/structs/config"
"github.com/hashicorp/nomad/testutil"
"github.com/mitchellh/hashstructure"
"github.com/stretchr/testify/assert"

ctestutil "github.com/hashicorp/nomad/client/testutil"
)
Expand Down Expand Up @@ -136,6 +137,32 @@ func TestClient_StartStop(t *testing.T) {
}
}

// Certain labels for metrics are dependant on client intial setup. This tests
// that the client has properly initialized before we assign values to labels
func TestClient_BaseLabels(t *testing.T) {
t.Parallel()
assert := assert.New(t)

client := testClient(t, nil)
if err := client.Shutdown(); err != nil {
t.Fatalf("err: %v", err)
}

// directly invoke this function, as otherwise this will fail on a CI build
// due to a race condition
client.emitStats()

baseLabels := client.baseLabels
assert.NotEqual(0, len(baseLabels))

nodeID := client.Node().ID
for _, e := range baseLabels {
if e.Name == "node_id" {
assert.Equal(nodeID, e.Value)
}
}
}

func TestClient_RPC(t *testing.T) {
t.Parallel()
s1, addr := testServer(t, nil)
Expand Down
38 changes: 24 additions & 14 deletions client/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,14 @@ type Config struct {

// ACLPolicyTTL is how long we cache policy values for
ACLPolicyTTL time.Duration

// DisableTaggedMetrics determines whether metrics will be displayed via a
// key/value/tag format, or simply a key/value format
DisableTaggedMetrics bool

// BackwardsCompatibleMetrics determines whether to show methods of
// displaying metrics for older verions, or to only show the new format
BackwardsCompatibleMetrics bool
}

func (c *Config) Copy() *Config {
Expand All @@ -205,20 +213,22 @@ func (c *Config) Copy() *Config {
// DefaultConfig returns the default configuration
func DefaultConfig() *Config {
return &Config{
Version: version.GetVersion(),
VaultConfig: config.DefaultVaultConfig(),
ConsulConfig: config.DefaultConsulConfig(),
LogOutput: os.Stderr,
Region: "global",
StatsCollectionInterval: 1 * time.Second,
TLSConfig: &config.TLSConfig{},
LogLevel: "DEBUG",
GCInterval: 1 * time.Minute,
GCParallelDestroys: 2,
GCDiskUsageThreshold: 80,
GCInodeUsageThreshold: 70,
GCMaxAllocs: 50,
NoHostUUID: true,
Version: version.GetVersion(),
VaultConfig: config.DefaultVaultConfig(),
ConsulConfig: config.DefaultConsulConfig(),
LogOutput: os.Stderr,
Region: "global",
StatsCollectionInterval: 1 * time.Second,
TLSConfig: &config.TLSConfig{},
LogLevel: "DEBUG",
GCInterval: 1 * time.Minute,
GCParallelDestroys: 2,
GCDiskUsageThreshold: 80,
GCInodeUsageThreshold: 70,
GCMaxAllocs: 50,
NoHostUUID: true,
DisableTaggedMetrics: false,
BackwardsCompatibleMetrics: false,
}
}

Expand Down
Loading

0 comments on commit 949df54

Please sign in to comment.