Skip to content

Commit

Permalink
Add fluentd input plugin (#2661)
Browse files Browse the repository at this point in the history
  • Loading branch information
DanKans authored and danielnelson committed Jul 13, 2017
1 parent 7857986 commit f4d67d8
Show file tree
Hide file tree
Showing 4 changed files with 407 additions and 0 deletions.
1 change: 1 addition & 0 deletions plugins/inputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/exec"
_ "github.com/influxdata/telegraf/plugins/inputs/fail2ban"
_ "github.com/influxdata/telegraf/plugins/inputs/filestat"
_ "github.com/influxdata/telegraf/plugins/inputs/fluentd"
_ "github.com/influxdata/telegraf/plugins/inputs/graylog"
_ "github.com/influxdata/telegraf/plugins/inputs/haproxy"
_ "github.com/influxdata/telegraf/plugins/inputs/hddtemp"
Expand Down
64 changes: 64 additions & 0 deletions plugins/inputs/fluentd/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Fluentd Input Plugin

The fluentd plugin gathers metrics from plugin endpoint provided by [in_monitor plugin](http://docs.fluentd.org/v0.12/articles/monitoring).
This plugin understands data provided by /api/plugin.json resource (/api/config.json is not covered).

You might need to adjust your fluentd configuration, in order to reduce series cardinality in case whene your fluentd restarts frequently. Every time when fluentd starts, `plugin_id` value is given a new random value.
According to [fluentd documentation](http://docs.fluentd.org/v0.12/articles/config-file), you are able to add `@id` parameter for each plugin to avoid this behaviour and define custom `plugin_id`.

example configuratio with `@id` parameter for http plugin:
```
<source>
@type http
@id http
port 8888
</source>
```

### Configuration:

```toml
# Read metrics exposed by fluentd in_monitor plugin
[[inputs.fluentd]]
## This plugin reads information exposed by fluentd (using /api/plugins.json endpoint).
##
## Endpoint:
## - only one URI is allowed
## - https is not supported
endpoint = "http://localhost:24220/api/plugins.json"

## Define which plugins have to be excluded (based on "type" field - e.g. monitor_agent)
exclude = [
"monitor_agent",
"dummy",
]
```

### Measurements & Fields:

Fields may vary depends on type of the plugin

- fluentd
- retry_count (float, unit)
- buffer_queue_length (float, unit)
- buffer_total_queued_size (float, unit)

### Tags:

- All measurements have the following tags:
- plugin_id (unique plugin id)
- plugin_type (type of the plugin e.g. s3)
- plugin_category (plugin category e.g. output)

### Example Output:

```
$ telegraf --config fluentd.conf --input-filter fluentd --test
* Plugin: inputs.fluentd, Collection 1
> fluentd,host=T440s,plugin_id=object:9f748c,plugin_category=input,plugin_type=dummy buffer_total_queued_size=0,buffer_queue_length=0,retry_count=0 1492006105000000000
> fluentd,plugin_category=input,plugin_type=dummy,host=T440s,plugin_id=object:8da98c buffer_queue_length=0,retry_count=0,buffer_total_queued_size=0 1492006105000000000
> fluentd,plugin_id=object:820190,plugin_category=input,plugin_type=monitor_agent,host=T440s retry_count=0,buffer_total_queued_size=0,buffer_queue_length=0 1492006105000000000
> fluentd,plugin_id=object:c5e054,plugin_category=output,plugin_type=stdout,host=T440s buffer_queue_length=0,retry_count=0,buffer_total_queued_size=0 1492006105000000000
> fluentd,plugin_type=s3,host=T440s,plugin_id=object:bd7a90,plugin_category=output buffer_queue_length=0,retry_count=0,buffer_total_queued_size=0 1492006105000000000
```
173 changes: 173 additions & 0 deletions plugins/inputs/fluentd/fluentd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
package fluentd

import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)

const (
measurement = "fluentd"
description = "Read metrics exposed by fluentd in_monitor plugin"
sampleConfig = `
## This plugin reads information exposed by fluentd (using /api/plugins.json endpoint).
##
## Endpoint:
## - only one URI is allowed
## - https is not supported
endpoint = "http://localhost:24220/api/plugins.json"
## Define which plugins have to be excluded (based on "type" field - e.g. monitor_agent)
exclude = [
"monitor_agent",
"dummy",
]
`
)

// Fluentd - plugin main structure
type Fluentd struct {
Endpoint string
Exclude []string
client *http.Client
}

type endpointInfo struct {
Payload []pluginData `json:"plugins"`
}

type pluginData struct {
PluginID string `json:"plugin_id"`
PluginType string `json:"type"`
PluginCategory string `json:"plugin_category"`
RetryCount *float64 `json:"retry_count"`
BufferQueueLength *float64 `json:"buffer_queue_length"`
BufferTotalQueuedSize *float64 `json:"buffer_total_queued_size"`
}

// parse JSON from fluentd Endpoint
// Parameters:
// data: unprocessed json recivied from endpoint
//
// Returns:
// pluginData: slice that contains parsed plugins
// error: error that may have occurred
func parse(data []byte) (datapointArray []pluginData, err error) {
var endpointData endpointInfo

if err = json.Unmarshal(data, &endpointData); err != nil {
err = fmt.Errorf("Processing JSON structure")
return
}

for _, point := range endpointData.Payload {
datapointArray = append(datapointArray, point)
}

return
}

// Description - display description
func (h *Fluentd) Description() string { return description }

// SampleConfig - generate configuretion
func (h *Fluentd) SampleConfig() string { return sampleConfig }

// Gather - Main code responsible for gathering, processing and creating metrics
func (h *Fluentd) Gather(acc telegraf.Accumulator) error {

_, err := url.Parse(h.Endpoint)
if err != nil {
return fmt.Errorf("Invalid URL \"%s\"", h.Endpoint)
}

if h.client == nil {

tr := &http.Transport{
ResponseHeaderTimeout: time.Duration(3 * time.Second),
}

client := &http.Client{
Transport: tr,
Timeout: time.Duration(4 * time.Second),
}

h.client = client
}

resp, err := h.client.Get(h.Endpoint)

if err != nil {
return fmt.Errorf("Unable to perform HTTP client GET on \"%s\": %s", h.Endpoint, err)
}

defer resp.Body.Close()

body, err := ioutil.ReadAll(resp.Body)

if err != nil {
return fmt.Errorf("Unable to read the HTTP body \"%s\": %s", string(body), err)
}

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("http status ok not met")
}

dataPoints, err := parse(body)

if err != nil {
return fmt.Errorf("Problem with parsing")
}

// Go through all plugins one by one
for _, p := range dataPoints {

skip := false

// Check if this specific type was excluded in configuration
for _, exclude := range h.Exclude {
if exclude == p.PluginType {
skip = true
}
}

// If not, create new metric and add it to Accumulator
if !skip {
tmpFields := make(map[string]interface{})

tmpTags := map[string]string{
"plugin_id": p.PluginID,
"plugin_category": p.PluginCategory,
"plugin_type": p.PluginType,
}

if p.BufferQueueLength != nil {
tmpFields["buffer_queue_length"] = p.BufferQueueLength

}
if p.RetryCount != nil {
tmpFields["retry_count"] = p.RetryCount
}

if p.BufferTotalQueuedSize != nil {
tmpFields["buffer_total_queued_size"] = p.BufferTotalQueuedSize
}

if !((p.BufferQueueLength == nil) && (p.RetryCount == nil) && (p.BufferTotalQueuedSize == nil)) {
acc.AddFields(measurement, tmpFields, tmpTags)
}
}
}

return nil
}

func init() {
inputs.Add("fluentd", func() telegraf.Input { return &Fluentd{} })
}
Loading

0 comments on commit f4d67d8

Please sign in to comment.