Skip to content

Commit

Permalink
Add support for Graphite 1.1.x tags (#4165)
Browse files Browse the repository at this point in the history
  • Loading branch information
R4scal authored and leodido committed May 22, 2018
1 parent ae90d72 commit 09e809a
Show file tree
Hide file tree
Showing 8 changed files with 580 additions and 23 deletions.
46 changes: 46 additions & 0 deletions docs/DATA_FORMATS_OUTPUT.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,15 @@ tars.cpu-total.us-east-1.cpu.usage_idle 98.09 1455320690
Fields with string values will be skipped. Boolean fields will be converted
to 1 (true) or 0 (false).

With enable `graphite_tag_support` option following influx metric -> graphite conversion would happen:

```
cpu,cpu=cpu-total,dc=us-east-1,host=tars usage_idle=98.09,usage_user=0.89 1455320660004257758
=>
cpu.usage_user;cpu=cpu-total;dc=us-east-1;host=tars 0.89 1455320690
cpu.usage_idle;cpu=cpu-total;dc=us-east-1;host=tars 98.09 1455320690
```

### Graphite Configuration

```toml
Expand All @@ -106,6 +115,43 @@ to 1 (true) or 0 (false).
prefix = "telegraf"
# graphite template
template = "host.tags.measurement.field"
# Enable Graphite tags support
# Defaults to "false"
graphite_tag_support = true
```


## Graphite 1.1

The Graphite11 data format translates Telegraf metrics into Graphite protocol which supports storing data using tags to identify each series. [Graphite Tag Support](http://graphite.readthedocs.io/en/latest/tags.html)

Which means the following influx metric -> graphite 1.1.x conversion would happen:

```
cpu,cpu=cpu-total,dc=us-east-1,host=tars usage_idle=98.09,usage_user=0.89 1455320660004257758
=>
cpu.usage_user;cpu=cpu-total;dc=us-east-1;host=tars 0.89 1455320690
cpu.usage_idle;cpu=cpu-total;dc=us-east-1;host=tars 98.09 1455320690
```

Fields with string values will be skipped. Boolean fields will be converted
to 1 (true) or 0 (false).

### Graphite Configuration

```toml
[[outputs.file]]
## Files to write to, "stdout" is a specially handled file.
files = ["stdout", "/tmp/metrics.out"]

## Data format to output.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "graphite11"

# prefix each graphite bucket
prefix = "telegraf"
```

## JSON
Expand Down
9 changes: 9 additions & 0 deletions plugins/outputs/graphite/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@
This plugin writes to [Graphite](http://graphite.readthedocs.org/en/latest/index.html)
via raw TCP.

<aside class="notice">
When `graphite_tag_support` is enabled, `name` as tag name is reserved for graphite metric and will be replaced with `_name`.
</aside>


## Configuration:

```toml
Expand All @@ -17,6 +22,10 @@ via raw TCP.
## Graphite output template
## see https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
template = "host.tags.measurement.field"
## Enable Graphite tags support
## see http://graphite.readthedocs.io/en/latest/tags.html
## Defaults to "false"
# graphite_tag_support = true
## timeout in seconds for the write connection to graphite
timeout = 2

Expand Down
7 changes: 6 additions & 1 deletion plugins/outputs/graphite/graphite.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
)

type Graphite struct {
GraphiteTagSupport bool
// URL is only for backwards compatibility
Servers []string
Prefix string
Expand All @@ -35,6 +36,10 @@ var sampleConfig = `
## Graphite output template
## see https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
template = "host.tags.measurement.field"
## Enable Graphite tags support
## see http://graphite.readthedocs.io/en/latest/tags.html
## Defaults to "false"
# graphite_tag_support = true
## timeout in seconds for the write connection to graphite
timeout = 2
Expand Down Expand Up @@ -129,7 +134,7 @@ func checkEOF(conn net.Conn) {
func (g *Graphite) Write(metrics []telegraf.Metric) error {
// Prepare data
var batch []byte
s, err := serializers.NewGraphiteSerializer(g.Prefix, g.Template)
s, err := serializers.NewGraphiteSerializer(g.Prefix, g.Template, g.GraphiteTagSupport)
if err != nil {
return err
}
Expand Down
90 changes: 90 additions & 0 deletions plugins/outputs/graphite/graphite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,66 @@ func TestGraphiteOK(t *testing.T) {
g.Close()
}

func TestGraphiteOkWithTags(t *testing.T) {
var wg sync.WaitGroup
// Start TCP server
wg.Add(1)
t.Log("Starting server")
TCPServer1WithTags(t, &wg)

// Init plugin
g := Graphite{
Prefix: "my.prefix",
GraphiteTagSupport: true,
}

// Init metrics
m1, _ := metric.New(
"mymeasurement",
map[string]string{"host": "192.168.0.1"},
map[string]interface{}{"myfield": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)
m2, _ := metric.New(
"mymeasurement",
map[string]string{"host": "192.168.0.1"},
map[string]interface{}{"value": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)
m3, _ := metric.New(
"my_measurement",
map[string]string{"host": "192.168.0.1"},
map[string]interface{}{"value": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)

// Prepare point list
metrics := []telegraf.Metric{m1}
metrics2 := []telegraf.Metric{m2, m3}
err1 := g.Connect()
require.NoError(t, err1)
// Send Data
t.Log("Send first data")
err2 := g.Write(metrics)
require.NoError(t, err2)

// Waiting TCPserver, should reconnect and resend
wg.Wait()
t.Log("Finished Waiting for first data")
var wg2 sync.WaitGroup
// Start TCP server
wg2.Add(1)
TCPServer2WithTags(t, &wg2)
//Write but expect an error, but reconnect
err3 := g.Write(metrics2)
t.Log("Finished writing second data, it should have reconnected automatically")

require.NoError(t, err3)
t.Log("Finished writing third data")
wg2.Wait()
g.Close()
}

func TCPServer1(t *testing.T, wg *sync.WaitGroup) {
tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003")
go func() {
Expand Down Expand Up @@ -127,3 +187,33 @@ func TCPServer2(t *testing.T, wg *sync.WaitGroup) {
tcpServer.Close()
}()
}

func TCPServer1WithTags(t *testing.T, wg *sync.WaitGroup) {
tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003")
go func() {
defer wg.Done()
conn, _ := (tcpServer).Accept()
reader := bufio.NewReader(conn)
tp := textproto.NewReader(reader)
data1, _ := tp.ReadLine()
assert.Equal(t, "my.prefix.mymeasurement.myfield;host=192.168.0.1 3.14 1289430000", data1)
conn.Close()
tcpServer.Close()
}()
}

func TCPServer2WithTags(t *testing.T, wg *sync.WaitGroup) {
tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003")
go func() {
defer wg.Done()
conn2, _ := (tcpServer).Accept()
reader := bufio.NewReader(conn2)
tp := textproto.NewReader(reader)
data2, _ := tp.ReadLine()
assert.Equal(t, "my.prefix.mymeasurement;host=192.168.0.1 3.14 1289430000", data2)
data3, _ := tp.ReadLine()
assert.Equal(t, "my.prefix.my_measurement;host=192.168.0.1 3.14 1289430000", data3)
conn2.Close()
tcpServer.Close()
}()
}
2 changes: 1 addition & 1 deletion plugins/outputs/instrumental/instrumental.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error {
}
}

s, err := serializers.NewGraphiteSerializer(i.Prefix, i.Template)
s, err := serializers.NewGraphiteSerializer(i.Prefix, i.Template, false)
if err != nil {
return err
}
Expand Down
92 changes: 75 additions & 17 deletions plugins/serializers/graphite/graphite.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ var (
)

type GraphiteSerializer struct {
Prefix string
Template string
Prefix string
Template string
TagSupport bool
}

func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]byte, error) {
Expand All @@ -40,23 +41,42 @@ func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]byte, error) {
// Convert UnixNano to Unix timestamps
timestamp := metric.Time().UnixNano() / 1000000000

bucket := SerializeBucketName(metric.Name(), metric.Tags(), s.Template, s.Prefix)
if bucket == "" {
return out, nil
}
switch s.TagSupport {
case true:
for fieldName, value := range metric.Fields() {
fieldValue := formatValue(value)
if fieldValue == "" {
continue
}
bucket := SerializeBucketNameWithTags(metric.Name(), metric.Tags(), s.Prefix, fieldName)
metricString := fmt.Sprintf("%s %s %d\n",
// insert "field" section of template
bucket,
//bucket,
fieldValue,
timestamp)
point := []byte(metricString)
out = append(out, point...)
}
default:
bucket := SerializeBucketName(metric.Name(), metric.Tags(), s.Template, s.Prefix)
if bucket == "" {
return out, nil
}

for fieldName, value := range metric.Fields() {
fieldValue := formatValue(value)
if fieldValue == "" {
continue
for fieldName, value := range metric.Fields() {
fieldValue := formatValue(value)
if fieldValue == "" {
continue
}
metricString := fmt.Sprintf("%s %s %d\n",
// insert "field" section of template
sanitize(InsertField(bucket, fieldName)),
fieldValue,
timestamp)
point := []byte(metricString)
out = append(out, point...)
}
metricString := fmt.Sprintf("%s %s %d\n",
// insert "field" section of template
sanitize(InsertField(bucket, fieldName)),
fieldValue,
timestamp)
point := []byte(metricString)
out = append(out, point...)
}
return out, nil
}
Expand Down Expand Up @@ -165,6 +185,44 @@ func SerializeBucketName(
return prefix + "." + strings.Join(out, ".")
}

// SerializeBucketNameWithTags will take the given measurement name and tags and
// produce a graphite bucket. It will use the Graphite11Serializer.
// http://graphite.readthedocs.io/en/latest/tags.html
func SerializeBucketNameWithTags(
measurement string,
tags map[string]string,
prefix string,
field string,
) string {
var out string
var tagsCopy []string
for k, v := range tags {
if k == "name" {
k = "_name"
}
tagsCopy = append(tagsCopy, sanitize(k+"="+v))
}
sort.Strings(tagsCopy)

if prefix != "" {
out = prefix + "."
}

out += measurement

if field != "value" {
out += "." + field
}

out = sanitize(out)

if len(tagsCopy) > 0 {
out += ";" + strings.Join(tagsCopy, ";")
}

return out
}

// InsertField takes the bucket string from SerializeBucketName and replaces the
// FIELDNAME portion. If fieldName == "value", it will simply delete the
// FIELDNAME portion.
Expand Down
Loading

0 comments on commit 09e809a

Please sign in to comment.