Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure prometheus metrics have same set of labels #2857

Merged
merged 3 commits into from
Jun 14, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
215 changes: 148 additions & 67 deletions plugins/outputs/prometheus_client/prometheus_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"log"
"net/http"
"regexp"
"sort"
"strings"
"sync"
"time"

Expand All @@ -17,19 +19,40 @@ import (

var invalidNameCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)

type MetricWithExpiration struct {
Metric prometheus.Metric
// SampleID uniquely identifies a Sample
type SampleID string

// Sample represents the current value of a series.
type Sample struct {
// Labels are the Prometheus labels.
Labels map[string]string
// Value is the value in the Prometheus output.
Value float64
// Expiration is the deadline that this Sample is valid until.
Expiration time.Time
}

// MetricFamily contains the data required to build valid prometheus Metrics.
type MetricFamily struct {
// Samples are the Sample belonging to this MetricFamily.
Samples map[SampleID]*Sample
// Type of the Value.
ValueType prometheus.ValueType
// LabelSet is the label counts for all Samples.
LabelSet map[string]int
}

type PrometheusClient struct {
Listen string
ExpirationInterval internal.Duration `toml:"expiration_interval"`
server *http.Server

metrics map[string]*MetricWithExpiration
server *http.Server

sync.Mutex
// fam is the non-expired MetricFamily by Prometheus metric name.
fam map[string]*MetricFamily
// now returns the current time.
now func() time.Time
}

var sampleConfig = `
Expand All @@ -41,7 +64,6 @@ var sampleConfig = `
`

func (p *PrometheusClient) Start() error {
p.metrics = make(map[string]*MetricWithExpiration)
prometheus.Register(p)

if p.Listen == "" {
Expand Down Expand Up @@ -88,96 +110,153 @@ func (p *PrometheusClient) Describe(ch chan<- *prometheus.Desc) {
prometheus.NewGauge(prometheus.GaugeOpts{Name: "Dummy", Help: "Dummy"}).Describe(ch)
}

// Implements prometheus.Collector
// Expire removes Samples that have expired.
func (p *PrometheusClient) Expire() {
now := p.now()
for name, family := range p.fam {
for key, sample := range family.Samples {
if p.ExpirationInterval.Duration != 0 && now.After(sample.Expiration) {
for k, _ := range sample.Labels {
family.LabelSet[k]--
}
delete(family.Samples, key)

if len(family.Samples) == 0 {
delete(p.fam, name)
}
}
}
}
}

// Collect implements prometheus.Collector
func (p *PrometheusClient) Collect(ch chan<- prometheus.Metric) {
p.Lock()
defer p.Unlock()

for key, m := range p.metrics {
if p.ExpirationInterval.Duration != 0 && time.Now().After(m.Expiration) {
delete(p.metrics, key)
} else {
ch <- m.Metric
p.Expire()

for name, family := range p.fam {
// Get list of all labels on MetricFamily
var labelNames []string
for k, v := range family.LabelSet {
if v > 0 {
labelNames = append(labelNames, k)
}
}
desc := prometheus.NewDesc(name, "Telegraf collected metric", labelNames, nil)

for _, sample := range family.Samples {
// Get labels for this sample; unset labels will be set to the
// empty string
var labels []string
for _, label := range labelNames {
v := sample.Labels[label]
labels = append(labels, v)
}

metric, err := prometheus.NewConstMetric(desc, family.ValueType, sample.Value, labels...)
if err != nil {
log.Printf("E! Error creating prometheus metric, "+
"key: %s, labels: %v,\nerr: %s\n",
name, labels, err.Error())
}

ch <- metric
}
}
}

func sanitize(value string) string {
return invalidNameCharRE.ReplaceAllString(value, "_")
}

func valueType(tt telegraf.ValueType) prometheus.ValueType {
switch tt {
case telegraf.Counter:
return prometheus.CounterValue
case telegraf.Gauge:
return prometheus.GaugeValue
default:
return prometheus.UntypedValue
}
}

// CreateSampleID creates a SampleID based on the tags of a telegraf.Metric.
func CreateSampleID(tags map[string]string) SampleID {
pairs := make([]string, 0, len(tags))
for k, v := range tags {
pairs = append(pairs, fmt.Sprintf("%s=%s", k, v))
}
sort.Strings(pairs)
return SampleID(strings.Join(pairs, ","))
}

func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
p.Lock()
defer p.Unlock()

if len(metrics) == 0 {
return nil
}
now := p.now()

for _, point := range metrics {
key := point.Name()
key = invalidNameCharRE.ReplaceAllString(key, "_")

// convert tags into prometheus labels
var labels []string
l := prometheus.Labels{}
for k, v := range point.Tags() {
k = invalidNameCharRE.ReplaceAllString(k, "_")
if len(k) == 0 {
continue
}
labels = append(labels, k)
l[k] = v
}
tags := point.Tags()
vt := valueType(point.Type())
sampleID := CreateSampleID(tags)

// Get a type if it's available, defaulting to Untyped
var mType prometheus.ValueType
switch point.Type() {
case telegraf.Counter:
mType = prometheus.CounterValue
case telegraf.Gauge:
mType = prometheus.GaugeValue
default:
mType = prometheus.UntypedValue
labels := make(map[string]string)
for k, v := range tags {
labels[sanitize(k)] = sanitize(v)
}

for n, val := range point.Fields() {
for fn, fv := range point.Fields() {
// Ignore string and bool fields.
switch val.(type) {
case string:
continue
case bool:
var value float64
switch fv := fv.(type) {
case int64:
value = float64(fv)
case float64:
value = fv
default:
continue
}

// sanitize the measurement name
n = invalidNameCharRE.ReplaceAllString(n, "_")
sample := &Sample{
Labels: labels,
Value: value,
Expiration: now.Add(p.ExpirationInterval.Duration),
}

// Special handling of value field; supports passthrough from
// the prometheus input.
var mname string
if n == "value" {
mname = key
if fn == "value" {
mname = sanitize(point.Name())
} else {
mname = fmt.Sprintf("%s_%s", key, n)
mname = sanitize(fmt.Sprintf("%s_%s", point.Name(), fn))
}

desc := prometheus.NewDesc(mname, "Telegraf collected metric", nil, l)
var metric prometheus.Metric
var err error

// switch for field type
switch val := val.(type) {
case int64:
metric, err = prometheus.NewConstMetric(desc, mType, float64(val))
case float64:
metric, err = prometheus.NewConstMetric(desc, mType, val)
default:
continue
}
if err != nil {
log.Printf("E! Error creating prometheus metric, "+
"key: %s, labels: %v,\nerr: %s\n",
mname, l, err.Error())
var fam *MetricFamily
var ok bool
if fam, ok = p.fam[mname]; !ok {
fam = &MetricFamily{
Samples: make(map[SampleID]*Sample),
ValueType: vt,
LabelSet: make(map[string]int),
}
p.fam[mname] = fam
} else {
if fam.ValueType != vt {
// Don't return an error since this would be a permanent error
log.Printf("Mixed ValueType for measurement %q; dropping point", point.Name())
break
}
}

p.metrics[desc.String()] = &MetricWithExpiration{
Metric: metric,
Expiration: time.Now().Add(p.ExpirationInterval.Duration),
for k, _ := range sample.Labels {
fam.LabelSet[k]++
}

fam.Samples[sampleID] = sample
}
}
return nil
Expand All @@ -187,6 +266,8 @@ func init() {
outputs.Add("prometheus_client", func() telegraf.Output {
return &PrometheusClient{
ExpirationInterval: internal.Duration{Duration: time.Second * 60},
fam: make(map[string]*MetricFamily),
now: time.Now,
}
})
}
Loading