diff --git a/plugins/outputs/prometheus_client/prometheus_client.go b/plugins/outputs/prometheus_client/prometheus_client.go index c068a5fe7faf4..511b54aa7c545 100644 --- a/plugins/outputs/prometheus_client/prometheus_client.go +++ b/plugins/outputs/prometheus_client/prometheus_client.go @@ -155,6 +155,26 @@ func (p *PrometheusClient) Collect(ch chan<- prometheus.Metric) { } desc := prometheus.NewDesc(name, "Telegraf collected metric", labelNames, nil) + for _, sample := range family.Samples { + // Get labels for this sample; unset labels will be set to the + // empty string + var labels []string + for _, label := range labelNames { + v := sample.Labels[label] + labels = append(labels, v) + } + + metric, err := prometheus.NewConstMetric(desc, family.ValueType, sample.Value, labels...) + if err != nil { + log.Printf("E! Error creating prometheus metric, "+ + "key: %s, labels: %v,\nerr: %s\n", + name, labels, err.Error()) + } + + ch <- metric + } + desc := prometheus.NewDesc(name, "Telegraf collected metric", labelNames, nil) + for _, sample := range family.Samples { // Get labels for this sample; unset labels will be set to the // empty string @@ -201,6 +221,31 @@ func CreateSampleID(tags map[string]string) SampleID { return SampleID(strings.Join(pairs, ",")) } +func sanitize(value string) string { + return invalidNameCharRE.ReplaceAllString(value, "_") +} + +func valueType(tt telegraf.ValueType) prometheus.ValueType { + switch tt { + case telegraf.Counter: + return prometheus.CounterValue + case telegraf.Gauge: + return prometheus.GaugeValue + default: + return prometheus.UntypedValue + } +} + +// CreateSampleID creates a SampleID based on the tags of a telegraf.Metric. +func CreateSampleID(tags map[string]string) SampleID { + pairs := make([]string, 0, len(tags)) + for k, v := range tags { + pairs = append(pairs, fmt.Sprintf("%s=%s", k, v)) + } + sort.Strings(pairs) + return SampleID(strings.Join(pairs, ",")) +} + func (p *PrometheusClient) Write(metrics []telegraf.Metric) error { p.Lock() defer p.Unlock() @@ -214,7 +259,7 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error { labels := make(map[string]string) for k, v := range tags { - labels[sanitize(k)] = v + labels[sanitize(k)] = sanitize(v) } for fn, fv := range point.Fields() { @@ -254,17 +299,7 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error { } p.fam[mname] = fam } else { - // Metrics can be untyped even though the corresponding plugin - // creates them with a type. This happens when the metric was - // transferred over the network in a format that does not - // preserve value type and received using an input such as a - // queue consumer. To avoid issues we automatically upgrade - // value type from untyped to a typed metric. - if fam.ValueType == prometheus.UntypedValue { - fam.ValueType = vt - } - - if vt != prometheus.UntypedValue && fam.ValueType != vt { + if fam.ValueType != vt { // Don't return an error since this would be a permanent error log.Printf("Mixed ValueType for measurement %q; dropping point", point.Name()) break diff --git a/plugins/outputs/prometheus_client/prometheus_client_test.go b/plugins/outputs/prometheus_client/prometheus_client_test.go index 5be29a2412bcf..3ce211beb4d2a 100644 --- a/plugins/outputs/prometheus_client/prometheus_client_test.go +++ b/plugins/outputs/prometheus_client/prometheus_client_test.go @@ -144,7 +144,7 @@ func TestWrite_Sanitize(t *testing.T) { require.True(t, ok) require.Equal(t, map[string]string{ - "tag_with_dash": "localhost.local"}, sample1.Labels) + "tag_with_dash": "localhost_local"}, sample1.Labels) } func TestWrite_Gauge(t *testing.T) { @@ -189,56 +189,6 @@ func TestWrite_MixedValueType(t *testing.T) { require.Equal(t, 1, len(fam.Samples)) } -func TestWrite_MixedValueTypeUpgrade(t *testing.T) { - now := time.Now() - p1, err := metric.New( - "foo", - map[string]string{"a": "x"}, - map[string]interface{}{"value": 1.0}, - now, - telegraf.Untyped) - p2, err := metric.New( - "foo", - map[string]string{"a": "y"}, - map[string]interface{}{"value": 2.0}, - now, - telegraf.Gauge) - var metrics = []telegraf.Metric{p1, p2} - - client := NewClient() - err = client.Write(metrics) - require.NoError(t, err) - - fam, ok := client.fam["foo"] - require.True(t, ok) - require.Equal(t, 2, len(fam.Samples)) -} - -func TestWrite_MixedValueTypeDowngrade(t *testing.T) { - now := time.Now() - p1, err := metric.New( - "foo", - map[string]string{"a": "x"}, - map[string]interface{}{"value": 1.0}, - now, - telegraf.Gauge) - p2, err := metric.New( - "foo", - map[string]string{"a": "y"}, - map[string]interface{}{"value": 2.0}, - now, - telegraf.Untyped) - var metrics = []telegraf.Metric{p1, p2} - - client := NewClient() - err = client.Write(metrics) - require.NoError(t, err) - - fam, ok := client.fam["foo"] - require.True(t, ok) - require.Equal(t, 2, len(fam.Samples)) -} - func TestWrite_Tags(t *testing.T) { now := time.Now() p1, err := metric.New(