Skip to content

Commit

Permalink
Add prometheus serializer and use it in prometheus output (influxdata…
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnelson authored and Mathieu Lecarme committed Apr 17, 2020
1 parent 23920c1 commit dd2cab4
Show file tree
Hide file tree
Showing 20 changed files with 2,515 additions and 1,143 deletions.
1 change: 1 addition & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions docs/DATA_FORMATS_OUTPUT.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ standard data formats that may be selected from when configuring many output
plugins.

1. [InfluxDB Line Protocol](/plugins/serializers/influx)
1. [JSON](/plugins/serializers/json)
1. [Carbon2](/plugins/serializers/carbon2)
1. [Graphite](/plugins/serializers/graphite)
1. [JSON](/plugins/serializers/json)
1. [Prometheus](/plugins/serializers/prometheus)
1. [SplunkMetric](/plugins/serializers/splunkmetric)
1. [Carbon2](/plugins/serializers/carbon2)
1. [Wavefront](/plugins/serializers/wavefront)

You will be able to identify the plugins with support by the presence of a
Expand Down
47 changes: 47 additions & 0 deletions internal/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package internal

import (
"crypto/subtle"
"net"
"net/http"
)

Expand Down Expand Up @@ -43,3 +44,49 @@ func (h *basicAuthHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request)

h.next.ServeHTTP(rw, req)
}

// IPRangeHandler returns a http handler that requires the remote address to be
// in the specified network.
func IPRangeHandler(network []*net.IPNet, onError ErrorFunc) func(h http.Handler) http.Handler {
return func(h http.Handler) http.Handler {
return &ipRangeHandler{
network: network,
onError: onError,
next: h,
}
}
}

type ipRangeHandler struct {
network []*net.IPNet
onError ErrorFunc
next http.Handler
}

func (h *ipRangeHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
if len(h.network) == 0 {
h.next.ServeHTTP(rw, req)
return
}

remoteIPString, _, err := net.SplitHostPort(req.RemoteAddr)
if err != nil {
h.onError(rw, http.StatusForbidden)
return
}

remoteIP := net.ParseIP(remoteIPString)
if remoteIP == nil {
h.onError(rw, http.StatusForbidden)
return
}

for _, net := range h.network {
if net.Contains(remoteIP) {
h.next.ServeHTTP(rw, req)
return
}
}

h.onError(rw, http.StatusForbidden)
}
4 changes: 2 additions & 2 deletions plugins/inputs/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ const sampleConfig = `
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
## Name a tag containing the name of the file the data was parsed from. Leave empty
## Name a tag containing the name of the file the data was parsed from. Leave empty
## to disable.
# file_tag = ""
`
Expand Down
10 changes: 8 additions & 2 deletions plugins/inputs/prometheus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,14 @@ in Prometheus format.
## An array of urls to scrape metrics from.
urls = ["http://localhost:9100/metrics"]

## Metric version (optional, default=1, supported values are 1 and 2)
# metric_version = 2
## Metric version controls the mapping from Prometheus metrics into
## Telegraf metrics. When using the prometheus_client output, use the same
## value in both plugins to ensure metrics are round-tripped without
## modification.
##
## example: metric_version = 1; deprecated in 1.13
## metric_version = 2; recommended version
# metric_version = 1

## An array of Kubernetes services to scrape metrics from.
# kubernetes_services = ["http://my-service-dns.my-namespace:9100/metrics"]
Expand Down
23 changes: 19 additions & 4 deletions plugins/inputs/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,14 @@ var sampleConfig = `
## An array of urls to scrape metrics from.
urls = ["http://localhost:9100/metrics"]
## Metric version (optional, default=1, supported values are 1 and 2)
# metric_version = 2
## Metric version controls the mapping from Prometheus metrics into
## Telegraf metrics. When using the prometheus_client output, use the same
## value in both plugins to ensure metrics are round-tripped without
## modification.
##
## example: metric_version = 1; deprecated in 1.13
## metric_version = 2; recommended version
# metric_version = 1
## Url tag name (tag containing scrapped url. optional, default is "url")
# url_tag = "scrapeUrl"
Expand Down Expand Up @@ -95,7 +101,7 @@ var sampleConfig = `
# username = ""
# password = ""
## Specify timeout duration for slower prometheus clients (default is 3s)
## Specify timeout duration for slower prometheus clients (default is 3s)
# response_timeout = "3s"
## Optional TLS Config
Expand All @@ -114,6 +120,13 @@ func (p *Prometheus) Description() string {
return "Read metrics from one or many prometheus clients"
}

func (p *Prometheus) Init() error {
if p.MetricVersion != 2 {
p.Log.Warnf("Use of deprecated configuration: 'metric_version = 1'; please update to 'metric_version = 2'")
}
return nil
}

var ErrProtocolError = errors.New("prometheus protocol error")

func (p *Prometheus) AddressToURL(u *url.URL, address string) *url.URL {
Expand Down Expand Up @@ -311,7 +324,9 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error
tags := metric.Tags()
// strip user and password from URL
u.OriginalURL.User = nil
tags[p.URLTag] = u.OriginalURL.String()
if p.URLTag != "" {
tags[p.URLTag] = u.OriginalURL.String()
}
if u.Address != "" {
tags["address"] = u.Address
}
Expand Down
5 changes: 5 additions & 0 deletions plugins/outputs/file/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ This plugin writes telegraf metrics to files
## Files to write to, "stdout" is a specially handled file.
files = ["stdout", "/tmp/metrics.out"]

## Use batch serialization format instead of line based delimiting. The
## batch format allows for the production of non line based output formats and
## may more effiently encode and write metrics.
# use_batch_format = false

## The file will be rotated after the time interval specified. When set
## to 0 no time based rotation is performed.
# rotation_interval = "0h"
Expand Down
30 changes: 24 additions & 6 deletions plugins/outputs/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package file
import (
"fmt"
"io"
"log"
"os"

"github.com/influxdata/telegraf"
Expand All @@ -18,6 +17,8 @@ type File struct {
RotationInterval internal.Duration `toml:"rotation_interval"`
RotationMaxSize internal.Size `toml:"rotation_max_size"`
RotationMaxArchives int `toml:"rotation_max_archives"`
UseBatchFormat bool `toml:"use_batch_format"`
Log telegraf.Logger `toml:"-"`

writer io.Writer
closers []io.Closer
Expand All @@ -28,6 +29,11 @@ var sampleConfig = `
## Files to write to, "stdout" is a specially handled file.
files = ["stdout", "/tmp/metrics.out"]
## Use batch serialization format instead of line based delimiting. The
## batch format allows for the production of non line based output formats and
## may more effiently encode metric groups.
# use_batch_format = false
## The file will be rotated after the time interval specified. When set
## to 0 no time based rotation is performed.
# rotation_interval = "0d"
Expand Down Expand Up @@ -98,15 +104,27 @@ func (f *File) Description() string {
func (f *File) Write(metrics []telegraf.Metric) error {
var writeErr error = nil

for _, metric := range metrics {
b, err := f.serializer.Serialize(metric)
if f.UseBatchFormat {
octets, err := f.serializer.SerializeBatch(metrics)
if err != nil {
log.Printf("D! [outputs.file] Could not serialize metric: %v", err)
f.Log.Errorf("Could not serialize metric: %v", err)
}

_, err = f.writer.Write(b)
_, err = f.writer.Write(octets)
if err != nil {
writeErr = fmt.Errorf("E! [outputs.file] failed to write message: %v", err)
f.Log.Errorf("Error writing to file: %v", err)
}
} else {
for _, metric := range metrics {
b, err := f.serializer.Serialize(metric)
if err != nil {
f.Log.Debugf("Could not serialize metric: %v", err)
}

_, err = f.writer.Write(b)
if err != nil {
writeErr = fmt.Errorf("E! [outputs.file] failed to write message: %v", err)
}
}
}

Expand Down
13 changes: 11 additions & 2 deletions plugins/outputs/prometheus_client/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Prometheus Client Service Output Plugin
# Prometheus Output Plugin

This plugin starts a [Prometheus](https://prometheus.io/) Client, it exposes all metrics on `/metrics` (default) to be polled by a Prometheus server.
This plugin starts a [Prometheus](https://prometheus.io/) Client, it exposes
all metrics on `/metrics` (default) to be polled by a Prometheus server.

## Configuration

Expand All @@ -10,6 +11,14 @@ This plugin starts a [Prometheus](https://prometheus.io/) Client, it exposes all
## Address to listen on.
listen = ":9273"

## Metric version controls the mapping from Telegraf metrics into
## Prometheus format. When using the prometheus input, use the same value in
## both plugins to ensure metrics are round-tripped without modification.
##
## example: metric_version = 1; deprecated in 1.13
## metric_version = 2; recommended version
# metric_version = 1

## Use HTTP Basic Authentication.
# basic_username = "Foo"
# basic_password = "Bar"
Expand Down
Loading

0 comments on commit dd2cab4

Please sign in to comment.