Skip to content

Commit

Permalink
Ensure prometheus metrics have same set of labels (influxdata#2857)
Browse files Browse the repository at this point in the history
(cherry picked from commit 949072e)
  • Loading branch information
danielnelson authored and Nevins Bartolomeo committed Aug 23, 2017
1 parent b503f16 commit 5123ff6
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 63 deletions.
59 changes: 47 additions & 12 deletions plugins/outputs/prometheus_client/prometheus_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,26 @@ func (p *PrometheusClient) Collect(ch chan<- prometheus.Metric) {
}
desc := prometheus.NewDesc(name, "Telegraf collected metric", labelNames, nil)

for _, sample := range family.Samples {
// Get labels for this sample; unset labels will be set to the
// empty string
var labels []string
for _, label := range labelNames {
v := sample.Labels[label]
labels = append(labels, v)
}

metric, err := prometheus.NewConstMetric(desc, family.ValueType, sample.Value, labels...)
if err != nil {
log.Printf("E! Error creating prometheus metric, "+
"key: %s, labels: %v,\nerr: %s\n",
name, labels, err.Error())
}

ch <- metric
}
desc := prometheus.NewDesc(name, "Telegraf collected metric", labelNames, nil)

for _, sample := range family.Samples {
// Get labels for this sample; unset labels will be set to the
// empty string
Expand Down Expand Up @@ -201,6 +221,31 @@ func CreateSampleID(tags map[string]string) SampleID {
return SampleID(strings.Join(pairs, ","))
}

func sanitize(value string) string {
return invalidNameCharRE.ReplaceAllString(value, "_")
}

func valueType(tt telegraf.ValueType) prometheus.ValueType {
switch tt {
case telegraf.Counter:
return prometheus.CounterValue
case telegraf.Gauge:
return prometheus.GaugeValue
default:
return prometheus.UntypedValue
}
}

// CreateSampleID creates a SampleID based on the tags of a telegraf.Metric.
func CreateSampleID(tags map[string]string) SampleID {
pairs := make([]string, 0, len(tags))
for k, v := range tags {
pairs = append(pairs, fmt.Sprintf("%s=%s", k, v))
}
sort.Strings(pairs)
return SampleID(strings.Join(pairs, ","))
}

func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
p.Lock()
defer p.Unlock()
Expand All @@ -214,7 +259,7 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {

labels := make(map[string]string)
for k, v := range tags {
labels[sanitize(k)] = v
labels[sanitize(k)] = sanitize(v)
}

for fn, fv := range point.Fields() {
Expand Down Expand Up @@ -254,17 +299,7 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
}
p.fam[mname] = fam
} else {
// Metrics can be untyped even though the corresponding plugin
// creates them with a type. This happens when the metric was
// transferred over the network in a format that does not
// preserve value type and received using an input such as a
// queue consumer. To avoid issues we automatically upgrade
// value type from untyped to a typed metric.
if fam.ValueType == prometheus.UntypedValue {
fam.ValueType = vt
}

if vt != prometheus.UntypedValue && fam.ValueType != vt {
if fam.ValueType != vt {
// Don't return an error since this would be a permanent error
log.Printf("Mixed ValueType for measurement %q; dropping point", point.Name())
break
Expand Down
52 changes: 1 addition & 51 deletions plugins/outputs/prometheus_client/prometheus_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func TestWrite_Sanitize(t *testing.T) {
require.True(t, ok)

require.Equal(t, map[string]string{
"tag_with_dash": "localhost.local"}, sample1.Labels)
"tag_with_dash": "localhost_local"}, sample1.Labels)
}

func TestWrite_Gauge(t *testing.T) {
Expand Down Expand Up @@ -189,56 +189,6 @@ func TestWrite_MixedValueType(t *testing.T) {
require.Equal(t, 1, len(fam.Samples))
}

func TestWrite_MixedValueTypeUpgrade(t *testing.T) {
now := time.Now()
p1, err := metric.New(
"foo",
map[string]string{"a": "x"},
map[string]interface{}{"value": 1.0},
now,
telegraf.Untyped)
p2, err := metric.New(
"foo",
map[string]string{"a": "y"},
map[string]interface{}{"value": 2.0},
now,
telegraf.Gauge)
var metrics = []telegraf.Metric{p1, p2}

client := NewClient()
err = client.Write(metrics)
require.NoError(t, err)

fam, ok := client.fam["foo"]
require.True(t, ok)
require.Equal(t, 2, len(fam.Samples))
}

func TestWrite_MixedValueTypeDowngrade(t *testing.T) {
now := time.Now()
p1, err := metric.New(
"foo",
map[string]string{"a": "x"},
map[string]interface{}{"value": 1.0},
now,
telegraf.Gauge)
p2, err := metric.New(
"foo",
map[string]string{"a": "y"},
map[string]interface{}{"value": 2.0},
now,
telegraf.Untyped)
var metrics = []telegraf.Metric{p1, p2}

client := NewClient()
err = client.Write(metrics)
require.NoError(t, err)

fam, ok := client.fam["foo"]
require.True(t, ok)
require.Equal(t, 2, len(fam.Samples))
}

func TestWrite_Tags(t *testing.T) {
now := time.Now()
p1, err := metric.New(
Expand Down

0 comments on commit 5123ff6

Please sign in to comment.