Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Graphite 1.1.x protocol #4165

Merged
merged 4 commits into from
May 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions docs/DATA_FORMATS_OUTPUT.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,15 @@ tars.cpu-total.us-east-1.cpu.usage_idle 98.09 1455320690
Fields with string values will be skipped. Boolean fields will be converted
to 1 (true) or 0 (false).

With enable `graphite_tag_support` option following influx metric -> graphite conversion would happen:

```
cpu,cpu=cpu-total,dc=us-east-1,host=tars usage_idle=98.09,usage_user=0.89 1455320660004257758
=>
cpu.usage_user;cpu=cpu-total;dc=us-east-1;host=tars 0.89 1455320690
cpu.usage_idle;cpu=cpu-total;dc=us-east-1;host=tars 98.09 1455320690
```

### Graphite Configuration

```toml
Expand All @@ -106,6 +115,43 @@ to 1 (true) or 0 (false).
prefix = "telegraf"
# graphite template
template = "host.tags.measurement.field"
# Enable Graphite tags support
# Defaults to "false"
graphite_tag_support = true
```


## Graphite 1.1

The Graphite11 data format translates Telegraf metrics into Graphite protocol which supports storing data using tags to identify each series. [Graphite Tag Support](http://graphite.readthedocs.io/en/latest/tags.html)

Which means the following influx metric -> graphite 1.1.x conversion would happen:

```
cpu,cpu=cpu-total,dc=us-east-1,host=tars usage_idle=98.09,usage_user=0.89 1455320660004257758
=>
cpu.usage_user;cpu=cpu-total;dc=us-east-1;host=tars 0.89 1455320690
cpu.usage_idle;cpu=cpu-total;dc=us-east-1;host=tars 98.09 1455320690
```

Fields with string values will be skipped. Boolean fields will be converted
to 1 (true) or 0 (false).

### Graphite Configuration

```toml
[[outputs.file]]
## Files to write to, "stdout" is a specially handled file.
files = ["stdout", "/tmp/metrics.out"]

## Data format to output.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "graphite11"

# prefix each graphite bucket
prefix = "telegraf"
```

## JSON
Expand Down
9 changes: 9 additions & 0 deletions plugins/outputs/graphite/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@
This plugin writes to [Graphite](http://graphite.readthedocs.org/en/latest/index.html)
via raw TCP.

<aside class="notice">
When `graphite_tag_support` is enabled, `name` as tag name is reserved for graphite metric and will be replaced with `_name`.
</aside>


## Configuration:

```toml
Expand All @@ -17,6 +22,10 @@ via raw TCP.
## Graphite output template
## see https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
template = "host.tags.measurement.field"
## Enable Graphite tags support
## see http://graphite.readthedocs.io/en/latest/tags.html
## Defaults to "false"
# graphite_tag_support = true
## timeout in seconds for the write connection to graphite
timeout = 2

Expand Down
7 changes: 6 additions & 1 deletion plugins/outputs/graphite/graphite.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
)

type Graphite struct {
GraphiteTagSupport bool
// URL is only for backwards compatibility
Servers []string
Prefix string
Expand All @@ -35,6 +36,10 @@ var sampleConfig = `
## Graphite output template
## see https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
template = "host.tags.measurement.field"
## Enable Graphite tags support
## see http://graphite.readthedocs.io/en/latest/tags.html
## Defaults to "false"
# graphite_tag_support = true
## timeout in seconds for the write connection to graphite
timeout = 2

Expand Down Expand Up @@ -129,7 +134,7 @@ func checkEOF(conn net.Conn) {
func (g *Graphite) Write(metrics []telegraf.Metric) error {
// Prepare data
var batch []byte
s, err := serializers.NewGraphiteSerializer(g.Prefix, g.Template)
s, err := serializers.NewGraphiteSerializer(g.Prefix, g.Template, g.GraphiteTagSupport)
if err != nil {
return err
}
Expand Down
90 changes: 90 additions & 0 deletions plugins/outputs/graphite/graphite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,66 @@ func TestGraphiteOK(t *testing.T) {
g.Close()
}

func TestGraphiteOkWithTags(t *testing.T) {
var wg sync.WaitGroup
// Start TCP server
wg.Add(1)
t.Log("Starting server")
TCPServer1WithTags(t, &wg)

// Init plugin
g := Graphite{
Prefix: "my.prefix",
GraphiteTagSupport: true,
}

// Init metrics
m1, _ := metric.New(
"mymeasurement",
map[string]string{"host": "192.168.0.1"},
map[string]interface{}{"myfield": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)
m2, _ := metric.New(
"mymeasurement",
map[string]string{"host": "192.168.0.1"},
map[string]interface{}{"value": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)
m3, _ := metric.New(
"my_measurement",
map[string]string{"host": "192.168.0.1"},
map[string]interface{}{"value": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)

// Prepare point list
metrics := []telegraf.Metric{m1}
metrics2 := []telegraf.Metric{m2, m3}
err1 := g.Connect()
require.NoError(t, err1)
// Send Data
t.Log("Send first data")
err2 := g.Write(metrics)
require.NoError(t, err2)

// Waiting TCPserver, should reconnect and resend
wg.Wait()
t.Log("Finished Waiting for first data")
var wg2 sync.WaitGroup
// Start TCP server
wg2.Add(1)
TCPServer2WithTags(t, &wg2)
//Write but expect an error, but reconnect
err3 := g.Write(metrics2)
t.Log("Finished writing second data, it should have reconnected automatically")

require.NoError(t, err3)
t.Log("Finished writing third data")
wg2.Wait()
g.Close()
}

func TCPServer1(t *testing.T, wg *sync.WaitGroup) {
tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003")
go func() {
Expand Down Expand Up @@ -127,3 +187,33 @@ func TCPServer2(t *testing.T, wg *sync.WaitGroup) {
tcpServer.Close()
}()
}

func TCPServer1WithTags(t *testing.T, wg *sync.WaitGroup) {
tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003")
go func() {
defer wg.Done()
conn, _ := (tcpServer).Accept()
reader := bufio.NewReader(conn)
tp := textproto.NewReader(reader)
data1, _ := tp.ReadLine()
assert.Equal(t, "my.prefix.mymeasurement.myfield;host=192.168.0.1 3.14 1289430000", data1)
conn.Close()
tcpServer.Close()
}()
}

func TCPServer2WithTags(t *testing.T, wg *sync.WaitGroup) {
tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003")
go func() {
defer wg.Done()
conn2, _ := (tcpServer).Accept()
reader := bufio.NewReader(conn2)
tp := textproto.NewReader(reader)
data2, _ := tp.ReadLine()
assert.Equal(t, "my.prefix.mymeasurement;host=192.168.0.1 3.14 1289430000", data2)
data3, _ := tp.ReadLine()
assert.Equal(t, "my.prefix.my_measurement;host=192.168.0.1 3.14 1289430000", data3)
conn2.Close()
tcpServer.Close()
}()
}
2 changes: 1 addition & 1 deletion plugins/outputs/instrumental/instrumental.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error {
}
}

s, err := serializers.NewGraphiteSerializer(i.Prefix, i.Template)
s, err := serializers.NewGraphiteSerializer(i.Prefix, i.Template, false)
if err != nil {
return err
}
Expand Down
92 changes: 75 additions & 17 deletions plugins/serializers/graphite/graphite.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ var (
)

type GraphiteSerializer struct {
Prefix string
Template string
Prefix string
Template string
TagSupport bool
}

func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]byte, error) {
Expand All @@ -40,23 +41,42 @@ func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]byte, error) {
// Convert UnixNano to Unix timestamps
timestamp := metric.Time().UnixNano() / 1000000000

bucket := SerializeBucketName(metric.Name(), metric.Tags(), s.Template, s.Prefix)
if bucket == "" {
return out, nil
}
switch s.TagSupport {
case true:
for fieldName, value := range metric.Fields() {
fieldValue := formatValue(value)
if fieldValue == "" {
continue
}
bucket := SerializeBucketNameWithTags(metric.Name(), metric.Tags(), s.Prefix, fieldName)
metricString := fmt.Sprintf("%s %s %d\n",
// insert "field" section of template
bucket,
//bucket,
fieldValue,
timestamp)
point := []byte(metricString)
out = append(out, point...)
}
default:
bucket := SerializeBucketName(metric.Name(), metric.Tags(), s.Template, s.Prefix)
if bucket == "" {
return out, nil
}

for fieldName, value := range metric.Fields() {
fieldValue := formatValue(value)
if fieldValue == "" {
continue
for fieldName, value := range metric.Fields() {
fieldValue := formatValue(value)
if fieldValue == "" {
continue
}
metricString := fmt.Sprintf("%s %s %d\n",
// insert "field" section of template
sanitize(InsertField(bucket, fieldName)),
fieldValue,
timestamp)
point := []byte(metricString)
out = append(out, point...)
}
metricString := fmt.Sprintf("%s %s %d\n",
// insert "field" section of template
sanitize(InsertField(bucket, fieldName)),
fieldValue,
timestamp)
point := []byte(metricString)
out = append(out, point...)
}
return out, nil
}
Expand Down Expand Up @@ -165,6 +185,44 @@ func SerializeBucketName(
return prefix + "." + strings.Join(out, ".")
}

// SerializeBucketNameWithTags will take the given measurement name and tags and
// produce a graphite bucket. It will use the Graphite11Serializer.
// http://graphite.readthedocs.io/en/latest/tags.html
func SerializeBucketNameWithTags(
measurement string,
tags map[string]string,
prefix string,
field string,
) string {
var out string
var tagsCopy []string
for k, v := range tags {
if k == "name" {
k = "_name"
}
tagsCopy = append(tagsCopy, sanitize(k+"="+v))
}
sort.Strings(tagsCopy)

if prefix != "" {
out = prefix + "."
}

out += measurement

if field != "value" {
out += "." + field
}

out = sanitize(out)

if len(tagsCopy) > 0 {
out += ";" + strings.Join(tagsCopy, ";")
}

return out
}

// InsertField takes the bucket string from SerializeBucketName and replaces the
// FIELDNAME portion. If fieldName == "value", it will simply delete the
// FIELDNAME portion.
Expand Down
Loading