From df3f1b74fae517f94ce08bce940fc74d00ac38f4 Mon Sep 17 00:00:00 2001 From: DanKans Date: Thu, 13 Jul 2017 23:58:20 +0100 Subject: [PATCH] Add fluentd input plugin (#2661) --- plugins/inputs/all/all.go | 1 + plugins/inputs/fluentd/README.md | 64 +++++++++ plugins/inputs/fluentd/fluentd.go | 173 +++++++++++++++++++++++++ plugins/inputs/fluentd/fluentd_test.go | 169 ++++++++++++++++++++++++ 4 files changed, 407 insertions(+) create mode 100644 plugins/inputs/fluentd/README.md create mode 100644 plugins/inputs/fluentd/fluentd.go create mode 100644 plugins/inputs/fluentd/fluentd_test.go diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 0796a8422aafb..9bc7afaff1337 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -23,6 +23,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/exec" _ "github.com/influxdata/telegraf/plugins/inputs/fail2ban" _ "github.com/influxdata/telegraf/plugins/inputs/filestat" + _ "github.com/influxdata/telegraf/plugins/inputs/fluentd" _ "github.com/influxdata/telegraf/plugins/inputs/graylog" _ "github.com/influxdata/telegraf/plugins/inputs/haproxy" _ "github.com/influxdata/telegraf/plugins/inputs/hddtemp" diff --git a/plugins/inputs/fluentd/README.md b/plugins/inputs/fluentd/README.md new file mode 100644 index 0000000000000..e316bddf222f5 --- /dev/null +++ b/plugins/inputs/fluentd/README.md @@ -0,0 +1,64 @@ +# Fluentd Input Plugin + +The fluentd plugin gathers metrics from plugin endpoint provided by [in_monitor plugin](http://docs.fluentd.org/v0.12/articles/monitoring). +This plugin understands data provided by /api/plugin.json resource (/api/config.json is not covered). + +You might need to adjust your fluentd configuration, in order to reduce series cardinality in case whene your fluentd restarts frequently. Every time when fluentd starts, `plugin_id` value is given a new random value. +According to [fluentd documentation](http://docs.fluentd.org/v0.12/articles/config-file), you are able to add `@id` parameter for each plugin to avoid this behaviour and define custom `plugin_id`. + +example configuratio with `@id` parameter for http plugin: +``` + + @type http + @id http + port 8888 + +``` + +### Configuration: + +```toml +# Read metrics exposed by fluentd in_monitor plugin +[[inputs.fluentd]] + ## This plugin reads information exposed by fluentd (using /api/plugins.json endpoint). + ## + ## Endpoint: + ## - only one URI is allowed + ## - https is not supported + endpoint = "http://localhost:24220/api/plugins.json" + + ## Define which plugins have to be excluded (based on "type" field - e.g. monitor_agent) + exclude = [ + "monitor_agent", + "dummy", + ] +``` + +### Measurements & Fields: + +Fields may vary depends on type of the plugin + +- fluentd + - retry_count (float, unit) + - buffer_queue_length (float, unit) + - buffer_total_queued_size (float, unit) + +### Tags: + +- All measurements have the following tags: + - plugin_id (unique plugin id) + - plugin_type (type of the plugin e.g. s3) + - plugin_category (plugin category e.g. output) + +### Example Output: + +``` +$ telegraf --config fluentd.conf --input-filter fluentd --test +* Plugin: inputs.fluentd, Collection 1 +> fluentd,host=T440s,plugin_id=object:9f748c,plugin_category=input,plugin_type=dummy buffer_total_queued_size=0,buffer_queue_length=0,retry_count=0 1492006105000000000 +> fluentd,plugin_category=input,plugin_type=dummy,host=T440s,plugin_id=object:8da98c buffer_queue_length=0,retry_count=0,buffer_total_queued_size=0 1492006105000000000 +> fluentd,plugin_id=object:820190,plugin_category=input,plugin_type=monitor_agent,host=T440s retry_count=0,buffer_total_queued_size=0,buffer_queue_length=0 1492006105000000000 +> fluentd,plugin_id=object:c5e054,plugin_category=output,plugin_type=stdout,host=T440s buffer_queue_length=0,retry_count=0,buffer_total_queued_size=0 1492006105000000000 +> fluentd,plugin_type=s3,host=T440s,plugin_id=object:bd7a90,plugin_category=output buffer_queue_length=0,retry_count=0,buffer_total_queued_size=0 1492006105000000000 + +``` diff --git a/plugins/inputs/fluentd/fluentd.go b/plugins/inputs/fluentd/fluentd.go new file mode 100644 index 0000000000000..fdb6dbaa5908f --- /dev/null +++ b/plugins/inputs/fluentd/fluentd.go @@ -0,0 +1,173 @@ +package fluentd + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" +) + +const ( + measurement = "fluentd" + description = "Read metrics exposed by fluentd in_monitor plugin" + sampleConfig = ` + ## This plugin reads information exposed by fluentd (using /api/plugins.json endpoint). + ## + ## Endpoint: + ## - only one URI is allowed + ## - https is not supported + endpoint = "http://localhost:24220/api/plugins.json" + + ## Define which plugins have to be excluded (based on "type" field - e.g. monitor_agent) + exclude = [ + "monitor_agent", + "dummy", + ] +` +) + +// Fluentd - plugin main structure +type Fluentd struct { + Endpoint string + Exclude []string + client *http.Client +} + +type endpointInfo struct { + Payload []pluginData `json:"plugins"` +} + +type pluginData struct { + PluginID string `json:"plugin_id"` + PluginType string `json:"type"` + PluginCategory string `json:"plugin_category"` + RetryCount *float64 `json:"retry_count"` + BufferQueueLength *float64 `json:"buffer_queue_length"` + BufferTotalQueuedSize *float64 `json:"buffer_total_queued_size"` +} + +// parse JSON from fluentd Endpoint +// Parameters: +// data: unprocessed json recivied from endpoint +// +// Returns: +// pluginData: slice that contains parsed plugins +// error: error that may have occurred +func parse(data []byte) (datapointArray []pluginData, err error) { + var endpointData endpointInfo + + if err = json.Unmarshal(data, &endpointData); err != nil { + err = fmt.Errorf("Processing JSON structure") + return + } + + for _, point := range endpointData.Payload { + datapointArray = append(datapointArray, point) + } + + return +} + +// Description - display description +func (h *Fluentd) Description() string { return description } + +// SampleConfig - generate configuretion +func (h *Fluentd) SampleConfig() string { return sampleConfig } + +// Gather - Main code responsible for gathering, processing and creating metrics +func (h *Fluentd) Gather(acc telegraf.Accumulator) error { + + _, err := url.Parse(h.Endpoint) + if err != nil { + return fmt.Errorf("Invalid URL \"%s\"", h.Endpoint) + } + + if h.client == nil { + + tr := &http.Transport{ + ResponseHeaderTimeout: time.Duration(3 * time.Second), + } + + client := &http.Client{ + Transport: tr, + Timeout: time.Duration(4 * time.Second), + } + + h.client = client + } + + resp, err := h.client.Get(h.Endpoint) + + if err != nil { + return fmt.Errorf("Unable to perform HTTP client GET on \"%s\": %s", h.Endpoint, err) + } + + defer resp.Body.Close() + + body, err := ioutil.ReadAll(resp.Body) + + if err != nil { + return fmt.Errorf("Unable to read the HTTP body \"%s\": %s", string(body), err) + } + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("http status ok not met") + } + + dataPoints, err := parse(body) + + if err != nil { + return fmt.Errorf("Problem with parsing") + } + + // Go through all plugins one by one + for _, p := range dataPoints { + + skip := false + + // Check if this specific type was excluded in configuration + for _, exclude := range h.Exclude { + if exclude == p.PluginType { + skip = true + } + } + + // If not, create new metric and add it to Accumulator + if !skip { + tmpFields := make(map[string]interface{}) + + tmpTags := map[string]string{ + "plugin_id": p.PluginID, + "plugin_category": p.PluginCategory, + "plugin_type": p.PluginType, + } + + if p.BufferQueueLength != nil { + tmpFields["buffer_queue_length"] = p.BufferQueueLength + + } + if p.RetryCount != nil { + tmpFields["retry_count"] = p.RetryCount + } + + if p.BufferTotalQueuedSize != nil { + tmpFields["buffer_total_queued_size"] = p.BufferTotalQueuedSize + } + + if !((p.BufferQueueLength == nil) && (p.RetryCount == nil) && (p.BufferTotalQueuedSize == nil)) { + acc.AddFields(measurement, tmpFields, tmpTags) + } + } + } + + return nil +} + +func init() { + inputs.Add("fluentd", func() telegraf.Input { return &Fluentd{} }) +} diff --git a/plugins/inputs/fluentd/fluentd_test.go b/plugins/inputs/fluentd/fluentd_test.go new file mode 100644 index 0000000000000..62989b6d87ffd --- /dev/null +++ b/plugins/inputs/fluentd/fluentd_test.go @@ -0,0 +1,169 @@ +package fluentd + +import ( + "fmt" + "net" + "net/http" + "net/http/httptest" + "net/url" + "testing" + + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" +) + +// sampleJSON from fluentd version '0.14.9' +const sampleJSON = ` +{ + "plugins": [ + { + "plugin_id": "object:f48698", + "plugin_category": "input", + "type": "dummy", + "config": { + "@type": "dummy", + "@log_level": "info", + "tag": "stdout.page.node", + "rate": "", + "dummy": "{\"hello\":\"world_from_first_dummy\"}", + "auto_increment_key": "id1" + }, + "output_plugin": false, + "retry_count": null + }, + { + "plugin_id": "object:e27138", + "plugin_category": "input", + "type": "dummy", + "config": { + "@type": "dummy", + "@log_level": "info", + "tag": "stdout.superproject.supercontainer", + "rate": "", + "dummy": "{\"hello\":\"world_from_second_dummy\"}", + "auto_increment_key": "id1" + }, + "output_plugin": false, + "retry_count": null + }, + { + "plugin_id": "object:d74060", + "plugin_category": "input", + "type": "monitor_agent", + "config": { + "@type": "monitor_agent", + "@log_level": "error", + "bind": "0.0.0.0", + "port": "24220" + }, + "output_plugin": false, + "retry_count": null + }, + { + "plugin_id": "object:11a5e2c", + "plugin_category": "output", + "type": "stdout", + "config": { + "@type": "stdout" + }, + "output_plugin": true, + "retry_count": 0 + }, + { + "plugin_id": "object:11237ec", + "plugin_category": "output", + "type": "s3", + "config": { + "@type": "s3", + "@log_level": "info", + "aws_key_id": "xxxxxx", + "aws_sec_key": "xxxxxx", + "s3_bucket": "bucket", + "s3_endpoint": "http://mock:4567", + "path": "logs/%Y%m%d_%H/${tag[1]}/", + "time_slice_format": "%M", + "s3_object_key_format": "%{path}%{time_slice}_%{hostname}_%{index}_%{hex_random}.%{file_extension}", + "store_as": "gzip" + }, + "output_plugin": true, + "buffer_queue_length": 0, + "buffer_total_queued_size": 0, + "retry_count": 0 + } + ] +} +` + +var ( + zero float64 + err error + pluginOutput []pluginData + expectedOutput = []pluginData{ + // {"object:f48698", "dummy", "input", nil, nil, nil}, + // {"object:e27138", "dummy", "input", nil, nil, nil}, + // {"object:d74060", "monitor_agent", "input", nil, nil, nil}, + {"object:11a5e2c", "stdout", "output", (*float64)(&zero), nil, nil}, + {"object:11237ec", "s3", "output", (*float64)(&zero), (*float64)(&zero), (*float64)(&zero)}, + } + fluentdTest = &Fluentd{ + Endpoint: "http://localhost:8081", + } +) + +func Test_parse(t *testing.T) { + + t.Log("Testing parser function") + _, err := parse([]byte(sampleJSON)) + + if err != nil { + t.Error(err) + } + +} + +func Test_Gather(t *testing.T) { + if testing.Short() { + t.Skip("Skipping Gather function test") + } + + t.Log("Testing Gather function") + + t.Logf("Start HTTP mock (%s) with sampleJSON", fluentdTest.Endpoint) + + ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + fmt.Fprintf(w, "%s", string(sampleJSON)) + })) + + requestURL, err := url.Parse(fluentdTest.Endpoint) + + ts.Listener, _ = net.Listen("tcp", fmt.Sprintf("%s:%s", requestURL.Hostname(), requestURL.Port())) + + ts.Start() + + defer ts.Close() + + var acc testutil.Accumulator + err = fluentdTest.Gather(&acc) + + if err != nil { + t.Error(err) + } + + if !acc.HasMeasurement("fluentd") { + t.Errorf("acc.HasMeasurement: expected fluentd") + } + + assert.Equal(t, expectedOutput[0].PluginID, acc.Metrics[0].Tags["plugin_id"]) + assert.Equal(t, expectedOutput[0].PluginType, acc.Metrics[0].Tags["plugin_type"]) + assert.Equal(t, expectedOutput[0].PluginCategory, acc.Metrics[0].Tags["plugin_category"]) + assert.Equal(t, expectedOutput[0].RetryCount, acc.Metrics[0].Fields["retry_count"]) + + assert.Equal(t, expectedOutput[1].PluginID, acc.Metrics[1].Tags["plugin_id"]) + assert.Equal(t, expectedOutput[1].PluginType, acc.Metrics[1].Tags["plugin_type"]) + assert.Equal(t, expectedOutput[1].PluginCategory, acc.Metrics[1].Tags["plugin_category"]) + assert.Equal(t, expectedOutput[1].RetryCount, acc.Metrics[1].Fields["retry_count"]) + assert.Equal(t, expectedOutput[1].BufferQueueLength, acc.Metrics[1].Fields["buffer_queue_length"]) + assert.Equal(t, expectedOutput[1].BufferTotalQueuedSize, acc.Metrics[1].Fields["buffer_total_queued_size"]) + +}