Skip to content

Commit

Permalink
Ensure prometheus metrics have same set of labels (#2857)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnelson authored Jun 14, 2017
1 parent 246f342 commit 949072e
Show file tree
Hide file tree
Showing 2 changed files with 455 additions and 129 deletions.
215 changes: 148 additions & 67 deletions plugins/outputs/prometheus_client/prometheus_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"log"
"net/http"
"regexp"
"sort"
"strings"
"sync"
"time"

Expand All @@ -17,19 +19,40 @@ import (

var invalidNameCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)

type MetricWithExpiration struct {
Metric prometheus.Metric
// SampleID uniquely identifies a Sample
type SampleID string

// Sample represents the current value of a series.
type Sample struct {
// Labels are the Prometheus labels.
Labels map[string]string
// Value is the value in the Prometheus output.
Value float64
// Expiration is the deadline that this Sample is valid until.
Expiration time.Time
}

// MetricFamily contains the data required to build valid prometheus Metrics.
type MetricFamily struct {
// Samples are the Sample belonging to this MetricFamily.
Samples map[SampleID]*Sample
// Type of the Value.
ValueType prometheus.ValueType
// LabelSet is the label counts for all Samples.
LabelSet map[string]int
}

type PrometheusClient struct {
Listen string
ExpirationInterval internal.Duration `toml:"expiration_interval"`
server *http.Server

metrics map[string]*MetricWithExpiration
server *http.Server

sync.Mutex
// fam is the non-expired MetricFamily by Prometheus metric name.
fam map[string]*MetricFamily
// now returns the current time.
now func() time.Time
}

var sampleConfig = `
Expand All @@ -41,7 +64,6 @@ var sampleConfig = `
`

func (p *PrometheusClient) Start() error {
p.metrics = make(map[string]*MetricWithExpiration)
prometheus.Register(p)

if p.Listen == "" {
Expand Down Expand Up @@ -88,96 +110,153 @@ func (p *PrometheusClient) Describe(ch chan<- *prometheus.Desc) {
prometheus.NewGauge(prometheus.GaugeOpts{Name: "Dummy", Help: "Dummy"}).Describe(ch)
}

// Implements prometheus.Collector
// Expire removes Samples that have expired.
func (p *PrometheusClient) Expire() {
now := p.now()
for name, family := range p.fam {
for key, sample := range family.Samples {
if p.ExpirationInterval.Duration != 0 && now.After(sample.Expiration) {
for k, _ := range sample.Labels {
family.LabelSet[k]--
}
delete(family.Samples, key)

if len(family.Samples) == 0 {
delete(p.fam, name)
}
}
}
}
}

// Collect implements prometheus.Collector
func (p *PrometheusClient) Collect(ch chan<- prometheus.Metric) {
p.Lock()
defer p.Unlock()

for key, m := range p.metrics {
if p.ExpirationInterval.Duration != 0 && time.Now().After(m.Expiration) {
delete(p.metrics, key)
} else {
ch <- m.Metric
p.Expire()

for name, family := range p.fam {
// Get list of all labels on MetricFamily
var labelNames []string
for k, v := range family.LabelSet {
if v > 0 {
labelNames = append(labelNames, k)
}
}
desc := prometheus.NewDesc(name, "Telegraf collected metric", labelNames, nil)

for _, sample := range family.Samples {
// Get labels for this sample; unset labels will be set to the
// empty string
var labels []string
for _, label := range labelNames {
v := sample.Labels[label]
labels = append(labels, v)
}

metric, err := prometheus.NewConstMetric(desc, family.ValueType, sample.Value, labels...)
if err != nil {
log.Printf("E! Error creating prometheus metric, "+
"key: %s, labels: %v,\nerr: %s\n",
name, labels, err.Error())
}

ch <- metric
}
}
}

func sanitize(value string) string {
return invalidNameCharRE.ReplaceAllString(value, "_")
}

func valueType(tt telegraf.ValueType) prometheus.ValueType {
switch tt {
case telegraf.Counter:
return prometheus.CounterValue
case telegraf.Gauge:
return prometheus.GaugeValue
default:
return prometheus.UntypedValue
}
}

// CreateSampleID creates a SampleID based on the tags of a telegraf.Metric.
func CreateSampleID(tags map[string]string) SampleID {
pairs := make([]string, 0, len(tags))
for k, v := range tags {
pairs = append(pairs, fmt.Sprintf("%s=%s", k, v))
}
sort.Strings(pairs)
return SampleID(strings.Join(pairs, ","))
}

func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
p.Lock()
defer p.Unlock()

if len(metrics) == 0 {
return nil
}
now := p.now()

for _, point := range metrics {
key := point.Name()
key = invalidNameCharRE.ReplaceAllString(key, "_")

// convert tags into prometheus labels
var labels []string
l := prometheus.Labels{}
for k, v := range point.Tags() {
k = invalidNameCharRE.ReplaceAllString(k, "_")
if len(k) == 0 {
continue
}
labels = append(labels, k)
l[k] = v
}
tags := point.Tags()
vt := valueType(point.Type())
sampleID := CreateSampleID(tags)

// Get a type if it's available, defaulting to Untyped
var mType prometheus.ValueType
switch point.Type() {
case telegraf.Counter:
mType = prometheus.CounterValue
case telegraf.Gauge:
mType = prometheus.GaugeValue
default:
mType = prometheus.UntypedValue
labels := make(map[string]string)
for k, v := range tags {
labels[sanitize(k)] = sanitize(v)
}

for n, val := range point.Fields() {
for fn, fv := range point.Fields() {
// Ignore string and bool fields.
switch val.(type) {
case string:
continue
case bool:
var value float64
switch fv := fv.(type) {
case int64:
value = float64(fv)
case float64:
value = fv
default:
continue
}

// sanitize the measurement name
n = invalidNameCharRE.ReplaceAllString(n, "_")
sample := &Sample{
Labels: labels,
Value: value,
Expiration: now.Add(p.ExpirationInterval.Duration),
}

// Special handling of value field; supports passthrough from
// the prometheus input.
var mname string
if n == "value" {
mname = key
if fn == "value" {
mname = sanitize(point.Name())
} else {
mname = fmt.Sprintf("%s_%s", key, n)
mname = sanitize(fmt.Sprintf("%s_%s", point.Name(), fn))
}

desc := prometheus.NewDesc(mname, "Telegraf collected metric", nil, l)
var metric prometheus.Metric
var err error

// switch for field type
switch val := val.(type) {
case int64:
metric, err = prometheus.NewConstMetric(desc, mType, float64(val))
case float64:
metric, err = prometheus.NewConstMetric(desc, mType, val)
default:
continue
}
if err != nil {
log.Printf("E! Error creating prometheus metric, "+
"key: %s, labels: %v,\nerr: %s\n",
mname, l, err.Error())
var fam *MetricFamily
var ok bool
if fam, ok = p.fam[mname]; !ok {
fam = &MetricFamily{
Samples: make(map[SampleID]*Sample),
ValueType: vt,
LabelSet: make(map[string]int),
}
p.fam[mname] = fam
} else {
if fam.ValueType != vt {
// Don't return an error since this would be a permanent error
log.Printf("Mixed ValueType for measurement %q; dropping point", point.Name())
break
}
}

p.metrics[desc.String()] = &MetricWithExpiration{
Metric: metric,
Expiration: time.Now().Add(p.ExpirationInterval.Duration),
for k, _ := range sample.Labels {
fam.LabelSet[k]++
}

fam.Samples[sampleID] = sample
}
}
return nil
Expand All @@ -187,6 +266,8 @@ func init() {
outputs.Add("prometheus_client", func() telegraf.Output {
return &PrometheusClient{
ExpirationInterval: internal.Duration{Duration: time.Second * 60},
fam: make(map[string]*MetricFamily),
now: time.Now,
}
})
}
Loading

0 comments on commit 949072e

Please sign in to comment.