From 01fbca0338061a88e5addedb55e56ad8a720ac63 Mon Sep 17 00:00:00 2001 From: DanKans Date: Wed, 12 Apr 2017 15:51:38 +0100 Subject: [PATCH 1/7] Add fluentd input plugin Plugin is scraping metrics from JSON endpoint exposed by in_monitor plugin. I've used custom function to parse JSON output due to output incoherency. Please review and share your thoughts. Thanks! --- plugins/inputs/all/all.go | 1 + plugins/inputs/fluentd/README.md | 54 +++++++ plugins/inputs/fluentd/fluentd.go | 196 +++++++++++++++++++++++++ plugins/inputs/fluentd/fluentd_test.go | 177 ++++++++++++++++++++++ 4 files changed, 428 insertions(+) create mode 100644 plugins/inputs/fluentd/README.md create mode 100644 plugins/inputs/fluentd/fluentd.go create mode 100644 plugins/inputs/fluentd/fluentd_test.go diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 0796a8422aafb..9bc7afaff1337 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -23,6 +23,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/exec" _ "github.com/influxdata/telegraf/plugins/inputs/fail2ban" _ "github.com/influxdata/telegraf/plugins/inputs/filestat" + _ "github.com/influxdata/telegraf/plugins/inputs/fluentd" _ "github.com/influxdata/telegraf/plugins/inputs/graylog" _ "github.com/influxdata/telegraf/plugins/inputs/haproxy" _ "github.com/influxdata/telegraf/plugins/inputs/hddtemp" diff --git a/plugins/inputs/fluentd/README.md b/plugins/inputs/fluentd/README.md new file mode 100644 index 0000000000000..cb72baab6497c --- /dev/null +++ b/plugins/inputs/fluentd/README.md @@ -0,0 +1,54 @@ +# Fluentd Input Plugin + +The fluentd plugin gathers metrics from plugin endpoint provided by [in_monitor plugin](http://docs.fluentd.org/v0.12/articles/monitoring). +This plugin understands data provided by /api/plugin.json resource (/api/config.json is not covered). + +### Configuration: + +```toml +# Read metrics exposed by fluentd in_monitor plugin +[[inputs.fluentd]] + ## + ## This plugin only reads information exposed by fluentd using /api/plugins.json. + ## Tested using 'fluentd' version '0.14.9' + ## + ## Endpoint: + ## - only one URI is allowed + ## - https is not supported + # Endpoint = "http://localhost:24220/api/plugins.json" + + ## Define which plugins have to be excluded (based on "type" field - e.g. monitor_agent) + # exclude = [ + # "monitor_agent", + # "dummy", + # ] +``` + +### Measurements & Fields: + +Fields may vary depends on type of the plugin + +- fluentd + - RetryCount (float, unit) + - BufferQueueLength (float, unit) + - BufferTotalQueuedSize (float, unit) + +### Tags: + +- All measurements have the following tags: + - PluginID (unique plugin id) + - PluginType (type of the plugin e.g. s3) + - PluginCategory (plugin category e.g. output) + +### Example Output: + +``` +$ telegraf --config fluentd.conf --input-filter fluentd --test +* Plugin: inputs.fluentd, Collection 1 +> fluentd,host=T440s,PluginID=object:9f748c,PluginCategory=input,PluginType=dummy BufferTotalQueuedSize=0,BufferQueueLength=0,RetryCount=0 1492006105000000000 +> fluentd,PluginCategory=input,PluginType=dummy,host=T440s,PluginID=object:8da98c BufferQueueLength=0,RetryCount=0,BufferTotalQueuedSize=0 1492006105000000000 +> fluentd,PluginID=object:820190,PluginCategory=input,PluginType=monitor_agent,host=T440s RetryCount=0,BufferTotalQueuedSize=0,BufferQueueLength=0 1492006105000000000 +> fluentd,PluginID=object:c5e054,PluginCategory=output,PluginType=stdout,host=T440s BufferQueueLength=0,RetryCount=0,BufferTotalQueuedSize=0 1492006105000000000 +> fluentd,PluginType=s3,host=T440s,PluginID=object:bd7a90,PluginCategory=output BufferQueueLength=0,RetryCount=0,BufferTotalQueuedSize=0 1492006105000000000 + +``` \ No newline at end of file diff --git a/plugins/inputs/fluentd/fluentd.go b/plugins/inputs/fluentd/fluentd.go new file mode 100644 index 0000000000000..e3d15db0c7b21 --- /dev/null +++ b/plugins/inputs/fluentd/fluentd.go @@ -0,0 +1,196 @@ +package fluentd + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "reflect" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" +) + +const ( + measurement = "fluentd" + description = "Read metrics exposed by fluentd in_monitor plugin" + sampleConfig = ` + ## + ## This plugin only reads information exposed by fluentd using /api/plugins.json. + ## Tested using 'fluentd' version '0.14.9' + ## + ## Endpoint: + ## - only one URI is allowed + ## - https is not supported + # Endpoint = "http://localhost:24220/api/plugins.json" + + ## Define which plugins have to be excluded (based on "type" field - e.g. monitor_agent) + # exclude = [ + # "monitor_agent", + # "dummy", + # ] +` +) + +// Fluentd - plugin main structure +type Fluentd struct { + Endpoint string + Exclude []string + client *http.Client +} + +type pluginData struct { + PluginID string `json:"plugin_id"` + PluginType string `json:"type"` + PluginCategory string `json:"plugin_category"` + RetryCount float64 `json:"retry_count"` + BufferQueueLength float64 `json:"buffer_queue_length"` + BufferTotalQueuedSize float64 `json:"buffer_total_queued_size"` +} + +// parse JSON from fluentd Endpoint +// Parameters: +// data: unprocessed json recivied from endpoint +// +// Returns: +// pluginData: slice that contains parsed plugins +// error: error that may have occurred +func parse(data []byte) ([]pluginData, error) { + var ( + pdPoint pluginData + pdPointArray []pluginData + parsed map[string]interface{} + err error + ) + + if err = json.Unmarshal(data, &parsed); err != nil { + return pdPointArray, err + } + + switch parsed["plugins"].(type) { + case []interface{}: + // Iterate through all plugins in array + for _, plugin := range parsed["plugins"].([]interface{}) { + + tmpInterface := make(map[string]interface{}) + + // Go through all fields in plugin + for name, value := range plugin.(map[string]interface{}) { + + tags := reflect.ValueOf(pdPoint) + // Iterate through pluginData structure and assign field in case + // when we have field that name is coresponing with field tagged in JSON structure + for i := 0; i < tags.Type().NumField(); i++ { + if tag, ok := tags.Type().Field(i).Tag.Lookup("json"); ok { + if tag == name && value != nil { + tmpInterface[tag] = value + } + } + } + } + + // Marshal each plugin and Unmarshal it to fit into pluginData structure + tmpByte, err := json.Marshal(tmpInterface) + if err = json.Unmarshal(tmpByte, &pdPoint); err != nil { + return pdPointArray, fmt.Errorf("Processing JSON structure") + } + + pdPointArray = append(pdPointArray, pdPoint) + } + default: + return pdPointArray, fmt.Errorf("Unknown JSON structure") + } + + return pdPointArray, err +} + +// Description - display description +func (h *Fluentd) Description() string { return description } + +// SampleConfig - generate configuretion +func (h *Fluentd) SampleConfig() string { return sampleConfig } + +// Gather - Main code responsible for gathering, processing and creating metrics +func (h *Fluentd) Gather(acc telegraf.Accumulator) error { + + _, err := url.Parse(h.Endpoint) + if err != nil { + return fmt.Errorf("Invalid URL \"%s\"", h.Endpoint) + } + + if h.client == nil { + + tr := &http.Transport{ + ResponseHeaderTimeout: time.Duration(3 * time.Second), + } + + client := &http.Client{ + Transport: tr, + Timeout: time.Duration(4 * time.Second), + } + + h.client = client + } + + resp, err := h.client.Get(h.Endpoint) + + if err != nil { + return fmt.Errorf("Unable to perform HTTP client GET on \"%s\": %s", h.Endpoint, err) + } + + defer resp.Body.Close() + + body, err := ioutil.ReadAll(resp.Body) + + if err != nil { + return fmt.Errorf("Unable to read the HTTP body \"%s\": %s", string(body), err) + } + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("http status ok not met") + } + + dataPoints, err := parse(body) + + if err != nil { + return fmt.Errorf("Problem with parsing") + } + + // Go through all plugins one by one + for _, p := range dataPoints { + + skip := false + + // Check if this specific type was excluded in configuration + for _, exclude := range h.Exclude { + if exclude == p.PluginType { + skip = true + } + } + + // If not, create new metric and add it to Accumulator + if !skip { + tmpFields := make(map[string]interface{}) + + tmpTags := map[string]string{ + "PluginID": p.PluginID, + "PluginCategory": p.PluginCategory, + "PluginType": p.PluginType, + } + + tmpFields["BufferQueueLength"] = p.BufferQueueLength + tmpFields["RetryCount"] = p.RetryCount + tmpFields["BufferTotalQueuedSize"] = p.BufferTotalQueuedSize + + acc.AddFields(measurement, tmpFields, tmpTags) + } + } + + return nil +} + +func init() { + inputs.Add("fluentd", func() telegraf.Input { return &Fluentd{} }) +} diff --git a/plugins/inputs/fluentd/fluentd_test.go b/plugins/inputs/fluentd/fluentd_test.go new file mode 100644 index 0000000000000..9230a0b284cb0 --- /dev/null +++ b/plugins/inputs/fluentd/fluentd_test.go @@ -0,0 +1,177 @@ +package fluentd + +import ( + "fmt" + "net" + "net/http" + "net/http/httptest" + "net/url" + "reflect" + "testing" + + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" +) + +const sampleJSON = ` +{ + "plugins": [ + { + "plugin_id": "object:f48698", + "plugin_category": "input", + "type": "dummy", + "config": { + "@type": "dummy", + "@log_level": "info", + "tag": "stdout.page.node", + "rate": "", + "dummy": "{\"hello\":\"world_from_first_dummy\"}", + "auto_increment_key": "id1" + }, + "output_plugin": false, + "retry_count": null + }, + { + "plugin_id": "object:e27138", + "plugin_category": "input", + "type": "dummy", + "config": { + "@type": "dummy", + "@log_level": "info", + "tag": "stdout.superproject.supercontainer", + "rate": "", + "dummy": "{\"hello\":\"world_from_second_dummy\"}", + "auto_increment_key": "id1" + }, + "output_plugin": false, + "retry_count": null + }, + { + "plugin_id": "object:d74060", + "plugin_category": "input", + "type": "monitor_agent", + "config": { + "@type": "monitor_agent", + "@log_level": "error", + "bind": "0.0.0.0", + "port": "24220" + }, + "output_plugin": false, + "retry_count": null + }, + { + "plugin_id": "object:11a5e2c", + "plugin_category": "output", + "type": "stdout", + "config": { + "@type": "stdout" + }, + "output_plugin": true, + "retry_count": 0 + }, + { + "plugin_id": "object:11237ec", + "plugin_category": "output", + "type": "s3", + "config": { + "@type": "s3", + "@log_level": "info", + "aws_key_id": "xxxxxx", + "aws_sec_key": "xxxxxx", + "s3_bucket": "bucket", + "s3_endpoint": "http://mock:4567", + "path": "logs/%Y%m%d_%H/${tag[1]}/", + "time_slice_format": "%M", + "s3_object_key_format": "%{path}%{time_slice}_%{hostname}_%{index}_%{hex_random}.%{file_extension}", + "store_as": "gzip" + }, + "output_plugin": true, + "buffer_queue_length": 0, + "buffer_total_queued_size": 0, + "retry_count": 0 + } + ] +} +` + +var ( + err error + pluginOutput []pluginData + expectedOutput = []pluginData{ + {"object:f48698", "dummy", "input", 0, 0, 0}, + {"object:e27138", "dummy", "input", 0, 0, 0}, + {"object:d74060", "monitor_agent", "input", 0, 0, 0}, + {"object:11a5e2c", "stdout", "output", 0, 0, 0}, + {"object:11237ec", "s3", "output", 0, 0, 0}, + } + fluentdTest = &Fluentd{ + Endpoint: "http://localhost:8081", + } +) + +func Test_parse(t *testing.T) { + + t.Log("Testing parser function") + pluginOutput, err := parse([]byte(sampleJSON)) + + if err != nil { + t.Error(err) + } + + if len(pluginOutput) != len(expectedOutput) { + t.Errorf("lengthOfPluginOutput: expected %d, actual %d", len(pluginOutput), len(expectedOutput)) + } + + if !reflect.DeepEqual(pluginOutput, expectedOutput) { + t.Errorf("pluginOutput: expected\n %s, \nactual\n %s\n", pluginOutput, expectedOutput) + } + +} + +func Test_Gather(t *testing.T) { + if testing.Short() { + t.Skip("Skipping Gather function test") + } + + t.Log("Testing Gather function") + + t.Logf("Start HTTP mock (%s) with sampleJSON", fluentdTest.Endpoint) + + ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + fmt.Fprintf(w, "%s", string(sampleJSON)) + })) + + requestURL, err := url.Parse(fluentdTest.Endpoint) + + ts.Listener, _ = net.Listen("tcp", fmt.Sprintf("%s:%s", requestURL.Hostname(), requestURL.Port())) + + ts.Start() + + defer ts.Close() + + var acc testutil.Accumulator + err = fluentdTest.Gather(&acc) + + if err != nil { + t.Error(err) + } + + if !acc.HasMeasurement("fluentd") { + t.Errorf("acc.HasMeasurement: expected fluentd") + } + + if len(expectedOutput) != len(acc.Metrics) { + t.Errorf("acc.Metrics: expected %d, actual %d", len(expectedOutput), len(acc.Metrics)) + } + + for i := 0; i < len(acc.Metrics); i++ { + assert.Equal(t, expectedOutput[i].PluginID, acc.Metrics[i].Tags["PluginID"]) + assert.Equal(t, expectedOutput[i].PluginType, acc.Metrics[i].Tags["PluginType"]) + assert.Equal(t, expectedOutput[i].PluginCategory, acc.Metrics[i].Tags["PluginCategory"]) + assert.Equal(t, expectedOutput[i].RetryCount, acc.Metrics[i].Fields["RetryCount"]) + assert.Equal(t, expectedOutput[i].BufferQueueLength, acc.Metrics[i].Fields["BufferQueueLength"]) + assert.Equal(t, expectedOutput[i].BufferTotalQueuedSize, acc.Metrics[i].Fields["BufferTotalQueuedSize"]) + } + +} From 6c71de815b4304095c10b3d103654a654e6711be Mon Sep 17 00:00:00 2001 From: DanKans Date: Wed, 12 Apr 2017 16:20:01 +0100 Subject: [PATCH 2/7] Simplify error for slice comparision --- plugins/inputs/fluentd/fluentd_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/inputs/fluentd/fluentd_test.go b/plugins/inputs/fluentd/fluentd_test.go index 9230a0b284cb0..3391928085fec 100644 --- a/plugins/inputs/fluentd/fluentd_test.go +++ b/plugins/inputs/fluentd/fluentd_test.go @@ -123,7 +123,7 @@ func Test_parse(t *testing.T) { } if !reflect.DeepEqual(pluginOutput, expectedOutput) { - t.Errorf("pluginOutput: expected\n %s, \nactual\n %s\n", pluginOutput, expectedOutput) + t.Errorf("pluginOutput is different from expectedOutput") } } From 115d26813b7c8fb0cbd0c2a94a0c35351eebdbab Mon Sep 17 00:00:00 2001 From: DanKans Date: Wed, 12 Apr 2017 18:20:27 +0100 Subject: [PATCH 3/7] Fix formatting --- plugins/inputs/fluentd/README.md | 27 +++++++++++++------------- plugins/inputs/fluentd/fluentd.go | 17 ++++++++-------- plugins/inputs/fluentd/fluentd_test.go | 2 +- 3 files changed, 22 insertions(+), 24 deletions(-) diff --git a/plugins/inputs/fluentd/README.md b/plugins/inputs/fluentd/README.md index cb72baab6497c..b43787a0eb667 100644 --- a/plugins/inputs/fluentd/README.md +++ b/plugins/inputs/fluentd/README.md @@ -8,20 +8,19 @@ This plugin understands data provided by /api/plugin.json resource (/api/config. ```toml # Read metrics exposed by fluentd in_monitor plugin [[inputs.fluentd]] - ## - ## This plugin only reads information exposed by fluentd using /api/plugins.json. - ## Tested using 'fluentd' version '0.14.9' - ## - ## Endpoint: - ## - only one URI is allowed - ## - https is not supported - # Endpoint = "http://localhost:24220/api/plugins.json" - - ## Define which plugins have to be excluded (based on "type" field - e.g. monitor_agent) - # exclude = [ - # "monitor_agent", - # "dummy", - # ] + ## This plugin only reads information exposed by fluentd using /api/plugins.json. + ## Tested using 'fluentd' version '0.14.9' + ## + ## Endpoint: + ## - only one URI is allowed + ## - https is not supported + endpoint = "http://localhost:24220/api/plugins.json" + + ## Define which plugins have to be excluded (based on "type" field - e.g. monitor_agent) + exclude = [ + "monitor_agent", + "dummy", + ] ``` ### Measurements & Fields: diff --git a/plugins/inputs/fluentd/fluentd.go b/plugins/inputs/fluentd/fluentd.go index e3d15db0c7b21..7f19c88d93b65 100644 --- a/plugins/inputs/fluentd/fluentd.go +++ b/plugins/inputs/fluentd/fluentd.go @@ -17,20 +17,19 @@ const ( measurement = "fluentd" description = "Read metrics exposed by fluentd in_monitor plugin" sampleConfig = ` - ## - ## This plugin only reads information exposed by fluentd using /api/plugins.json. - ## Tested using 'fluentd' version '0.14.9' + ## This plugin only reads information exposed by fluentd using /api/plugins.json. + ## Tested using 'fluentd' version '0.14.9' ## - ## Endpoint: + ## Endpoint: ## - only one URI is allowed ## - https is not supported - # Endpoint = "http://localhost:24220/api/plugins.json" + endpoint = "http://localhost:24220/api/plugins.json" ## Define which plugins have to be excluded (based on "type" field - e.g. monitor_agent) - # exclude = [ - # "monitor_agent", - # "dummy", - # ] + exclude = [ + "monitor_agent", + "dummy", + ] ` ) diff --git a/plugins/inputs/fluentd/fluentd_test.go b/plugins/inputs/fluentd/fluentd_test.go index 3391928085fec..ad606cb8fde47 100644 --- a/plugins/inputs/fluentd/fluentd_test.go +++ b/plugins/inputs/fluentd/fluentd_test.go @@ -123,7 +123,7 @@ func Test_parse(t *testing.T) { } if !reflect.DeepEqual(pluginOutput, expectedOutput) { - t.Errorf("pluginOutput is different from expectedOutput") + t.Errorf("pluginOutput is different from expectedOutput") } } From 2ecbdc15106744d460375b3a6bb59202817a1b6c Mon Sep 17 00:00:00 2001 From: Daniel Salbert Date: Wed, 12 Jul 2017 15:16:17 +0100 Subject: [PATCH 4/7] apply required changes - Change CamelCase tags, measurements and values names to sneak_case - Move information about used version during tests to test file --- plugins/inputs/fluentd/README.md | 24 ++++++++++++------------ plugins/inputs/fluentd/fluentd.go | 13 ++++++------- plugins/inputs/fluentd/fluentd_test.go | 13 +++++++------ 3 files changed, 25 insertions(+), 25 deletions(-) diff --git a/plugins/inputs/fluentd/README.md b/plugins/inputs/fluentd/README.md index b43787a0eb667..a8b5782b5054d 100644 --- a/plugins/inputs/fluentd/README.md +++ b/plugins/inputs/fluentd/README.md @@ -28,26 +28,26 @@ This plugin understands data provided by /api/plugin.json resource (/api/config. Fields may vary depends on type of the plugin - fluentd - - RetryCount (float, unit) - - BufferQueueLength (float, unit) - - BufferTotalQueuedSize (float, unit) + - retry_count (float, unit) + - buffer_queue_length (float, unit) + - buffer_total_queued_size (float, unit) ### Tags: - All measurements have the following tags: - - PluginID (unique plugin id) - - PluginType (type of the plugin e.g. s3) - - PluginCategory (plugin category e.g. output) + - plugin_id (unique plugin id) + - plugin_type (type of the plugin e.g. s3) + - plugin_category (plugin category e.g. output) ### Example Output: ``` $ telegraf --config fluentd.conf --input-filter fluentd --test * Plugin: inputs.fluentd, Collection 1 -> fluentd,host=T440s,PluginID=object:9f748c,PluginCategory=input,PluginType=dummy BufferTotalQueuedSize=0,BufferQueueLength=0,RetryCount=0 1492006105000000000 -> fluentd,PluginCategory=input,PluginType=dummy,host=T440s,PluginID=object:8da98c BufferQueueLength=0,RetryCount=0,BufferTotalQueuedSize=0 1492006105000000000 -> fluentd,PluginID=object:820190,PluginCategory=input,PluginType=monitor_agent,host=T440s RetryCount=0,BufferTotalQueuedSize=0,BufferQueueLength=0 1492006105000000000 -> fluentd,PluginID=object:c5e054,PluginCategory=output,PluginType=stdout,host=T440s BufferQueueLength=0,RetryCount=0,BufferTotalQueuedSize=0 1492006105000000000 -> fluentd,PluginType=s3,host=T440s,PluginID=object:bd7a90,PluginCategory=output BufferQueueLength=0,RetryCount=0,BufferTotalQueuedSize=0 1492006105000000000 +> fluentd,host=T440s,plugin_id=object:9f748c,plugin_category=input,plugin_type=dummy buffer_total_queued_size=0,buffer_queue_length=0,retry_count=0 1492006105000000000 +> fluentd,plugin_category=input,plugin_type=dummy,host=T440s,plugin_id=object:8da98c buffer_queue_length=0,retry_count=0,buffer_total_queued_size=0 1492006105000000000 +> fluentd,plugin_id=object:820190,plugin_category=input,plugin_type=monitor_agent,host=T440s retry_count=0,buffer_total_queued_size=0,buffer_queue_length=0 1492006105000000000 +> fluentd,plugin_id=object:c5e054,plugin_category=output,plugin_type=stdout,host=T440s buffer_queue_length=0,retry_count=0,buffer_total_queued_size=0 1492006105000000000 +> fluentd,plugin_type=s3,host=T440s,plugin_id=object:bd7a90,plugin_category=output buffer_queue_length=0,retry_count=0,buffer_total_queued_size=0 1492006105000000000 -``` \ No newline at end of file +``` diff --git a/plugins/inputs/fluentd/fluentd.go b/plugins/inputs/fluentd/fluentd.go index 7f19c88d93b65..156660d41917d 100644 --- a/plugins/inputs/fluentd/fluentd.go +++ b/plugins/inputs/fluentd/fluentd.go @@ -18,7 +18,6 @@ const ( description = "Read metrics exposed by fluentd in_monitor plugin" sampleConfig = ` ## This plugin only reads information exposed by fluentd using /api/plugins.json. - ## Tested using 'fluentd' version '0.14.9' ## ## Endpoint: ## - only one URI is allowed @@ -174,14 +173,14 @@ func (h *Fluentd) Gather(acc telegraf.Accumulator) error { tmpFields := make(map[string]interface{}) tmpTags := map[string]string{ - "PluginID": p.PluginID, - "PluginCategory": p.PluginCategory, - "PluginType": p.PluginType, + "plugin_id": p.PluginID, + "plugin_category": p.PluginCategory, + "plugin_type": p.PluginType, } - tmpFields["BufferQueueLength"] = p.BufferQueueLength - tmpFields["RetryCount"] = p.RetryCount - tmpFields["BufferTotalQueuedSize"] = p.BufferTotalQueuedSize + tmpFields["buffer_queue_length"] = p.BufferQueueLength + tmpFields["retry_count"] = p.RetryCount + tmpFields["buffer_total_queued_size"] = p.BufferTotalQueuedSize acc.AddFields(measurement, tmpFields, tmpTags) } diff --git a/plugins/inputs/fluentd/fluentd_test.go b/plugins/inputs/fluentd/fluentd_test.go index ad606cb8fde47..316579d5777cd 100644 --- a/plugins/inputs/fluentd/fluentd_test.go +++ b/plugins/inputs/fluentd/fluentd_test.go @@ -13,6 +13,7 @@ import ( "github.com/stretchr/testify/assert" ) +// sampleJSON from fluentd version '0.14.9' const sampleJSON = ` { "plugins": [ @@ -166,12 +167,12 @@ func Test_Gather(t *testing.T) { } for i := 0; i < len(acc.Metrics); i++ { - assert.Equal(t, expectedOutput[i].PluginID, acc.Metrics[i].Tags["PluginID"]) - assert.Equal(t, expectedOutput[i].PluginType, acc.Metrics[i].Tags["PluginType"]) - assert.Equal(t, expectedOutput[i].PluginCategory, acc.Metrics[i].Tags["PluginCategory"]) - assert.Equal(t, expectedOutput[i].RetryCount, acc.Metrics[i].Fields["RetryCount"]) - assert.Equal(t, expectedOutput[i].BufferQueueLength, acc.Metrics[i].Fields["BufferQueueLength"]) - assert.Equal(t, expectedOutput[i].BufferTotalQueuedSize, acc.Metrics[i].Fields["BufferTotalQueuedSize"]) + assert.Equal(t, expectedOutput[i].PluginID, acc.Metrics[i].Tags["plugin_id"]) + assert.Equal(t, expectedOutput[i].PluginType, acc.Metrics[i].Tags["plugin_type"]) + assert.Equal(t, expectedOutput[i].PluginCategory, acc.Metrics[i].Tags["plugin_category"]) + assert.Equal(t, expectedOutput[i].RetryCount, acc.Metrics[i].Fields["retry_count"]) + assert.Equal(t, expectedOutput[i].BufferQueueLength, acc.Metrics[i].Fields["buffer_queue_length"]) + assert.Equal(t, expectedOutput[i].BufferTotalQueuedSize, acc.Metrics[i].Fields["buffer_total_queued_size"]) } } From 24cf872b241dc277f19dbe270af38c2930a7af90 Mon Sep 17 00:00:00 2001 From: Daniel Salbert Date: Thu, 13 Jul 2017 10:54:00 +0100 Subject: [PATCH 5/7] simplify parse method and update README.md As danielnelson noticed, there is no need to use relfect and we can simplify whole parser using just Unmarshal method from json library, what I have done. --- plugins/inputs/fluentd/README.md | 25 +++++----- plugins/inputs/fluentd/fluentd.go | 81 ++++++++++--------------------- 2 files changed, 37 insertions(+), 69 deletions(-) diff --git a/plugins/inputs/fluentd/README.md b/plugins/inputs/fluentd/README.md index a8b5782b5054d..70222a62a4720 100644 --- a/plugins/inputs/fluentd/README.md +++ b/plugins/inputs/fluentd/README.md @@ -8,19 +8,18 @@ This plugin understands data provided by /api/plugin.json resource (/api/config. ```toml # Read metrics exposed by fluentd in_monitor plugin [[inputs.fluentd]] - ## This plugin only reads information exposed by fluentd using /api/plugins.json. - ## Tested using 'fluentd' version '0.14.9' - ## - ## Endpoint: - ## - only one URI is allowed - ## - https is not supported - endpoint = "http://localhost:24220/api/plugins.json" - - ## Define which plugins have to be excluded (based on "type" field - e.g. monitor_agent) - exclude = [ - "monitor_agent", - "dummy", - ] + ## This plugin reads information exposed by fluentd (using /api/plugins.json endpoint). + ## + ## Endpoint: + ## - only one URI is allowed + ## - https is not supported + endpoint = "http://localhost:24220/api/plugins.json" + + ## Define which plugins have to be excluded (based on "type" field - e.g. monitor_agent) + exclude = [ + "monitor_agent", + "dummy", + ] ``` ### Measurements & Fields: diff --git a/plugins/inputs/fluentd/fluentd.go b/plugins/inputs/fluentd/fluentd.go index 156660d41917d..963bc14002028 100644 --- a/plugins/inputs/fluentd/fluentd.go +++ b/plugins/inputs/fluentd/fluentd.go @@ -6,7 +6,6 @@ import ( "io/ioutil" "net/http" "net/url" - "reflect" "time" "github.com/influxdata/telegraf" @@ -17,18 +16,18 @@ const ( measurement = "fluentd" description = "Read metrics exposed by fluentd in_monitor plugin" sampleConfig = ` - ## This plugin only reads information exposed by fluentd using /api/plugins.json. - ## - ## Endpoint: - ## - only one URI is allowed - ## - https is not supported - endpoint = "http://localhost:24220/api/plugins.json" - - ## Define which plugins have to be excluded (based on "type" field - e.g. monitor_agent) - exclude = [ - "monitor_agent", - "dummy", - ] + ## This plugin reads information exposed by fluentd (using /api/plugins.json endpoint). + ## + ## Endpoint: + ## - only one URI is allowed + ## - https is not supported + endpoint = "http://localhost:24220/api/plugins.json" + + ## Define which plugins have to be excluded (based on "type" field - e.g. monitor_agent) + exclude = [ + "monitor_agent", + "dummy", + ] ` ) @@ -39,6 +38,10 @@ type Fluentd struct { client *http.Client } +type endpointInfo struct { + Payload []pluginData `json:"plugins"` +} + type pluginData struct { PluginID string `json:"plugin_id"` PluginType string `json:"type"` @@ -55,53 +58,19 @@ type pluginData struct { // Returns: // pluginData: slice that contains parsed plugins // error: error that may have occurred -func parse(data []byte) ([]pluginData, error) { - var ( - pdPoint pluginData - pdPointArray []pluginData - parsed map[string]interface{} - err error - ) - - if err = json.Unmarshal(data, &parsed); err != nil { - return pdPointArray, err - } - - switch parsed["plugins"].(type) { - case []interface{}: - // Iterate through all plugins in array - for _, plugin := range parsed["plugins"].([]interface{}) { - - tmpInterface := make(map[string]interface{}) - - // Go through all fields in plugin - for name, value := range plugin.(map[string]interface{}) { - - tags := reflect.ValueOf(pdPoint) - // Iterate through pluginData structure and assign field in case - // when we have field that name is coresponing with field tagged in JSON structure - for i := 0; i < tags.Type().NumField(); i++ { - if tag, ok := tags.Type().Field(i).Tag.Lookup("json"); ok { - if tag == name && value != nil { - tmpInterface[tag] = value - } - } - } - } +func parse(data []byte) (datapointArray []pluginData, err error) { + var endpointData endpointInfo - // Marshal each plugin and Unmarshal it to fit into pluginData structure - tmpByte, err := json.Marshal(tmpInterface) - if err = json.Unmarshal(tmpByte, &pdPoint); err != nil { - return pdPointArray, fmt.Errorf("Processing JSON structure") - } + if err = json.Unmarshal(data, &endpointData); err != nil { + err = fmt.Errorf("Processing JSON structure") + return + } - pdPointArray = append(pdPointArray, pdPoint) - } - default: - return pdPointArray, fmt.Errorf("Unknown JSON structure") + for _, point := range endpointData.Payload { + datapointArray = append(datapointArray, point) } - return pdPointArray, err + return } // Description - display description From ce9d8df233e4ae64525223878cb3ab22645785e5 Mon Sep 17 00:00:00 2001 From: Daniel Salbert Date: Thu, 13 Jul 2017 22:40:52 +0100 Subject: [PATCH 6/7] add information about @id parameter for fluentd --- plugins/inputs/fluentd/README.md | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/plugins/inputs/fluentd/README.md b/plugins/inputs/fluentd/README.md index 70222a62a4720..e316bddf222f5 100644 --- a/plugins/inputs/fluentd/README.md +++ b/plugins/inputs/fluentd/README.md @@ -3,6 +3,18 @@ The fluentd plugin gathers metrics from plugin endpoint provided by [in_monitor plugin](http://docs.fluentd.org/v0.12/articles/monitoring). This plugin understands data provided by /api/plugin.json resource (/api/config.json is not covered). +You might need to adjust your fluentd configuration, in order to reduce series cardinality in case whene your fluentd restarts frequently. Every time when fluentd starts, `plugin_id` value is given a new random value. +According to [fluentd documentation](http://docs.fluentd.org/v0.12/articles/config-file), you are able to add `@id` parameter for each plugin to avoid this behaviour and define custom `plugin_id`. + +example configuratio with `@id` parameter for http plugin: +``` + + @type http + @id http + port 8888 + +``` + ### Configuration: ```toml From 9ee35dffba2a73eeb01d8127515d8d345104d35f Mon Sep 17 00:00:00 2001 From: Daniel Salbert Date: Thu, 13 Jul 2017 23:12:51 +0100 Subject: [PATCH 7/7] change logic for adding metric to accumulator New logic is checking if one of 3 fields (retry_count, buffer_queue_length and buffer_total_queued_size) is present. If there is at least one field, metric will be emitted and missing fields won't be added as values. In case where plugin doesn't have any of 3 fields, metric is skipped (not added to accumulator). Also unit test has been simplify. --- plugins/inputs/fluentd/fluentd.go | 30 +++++++++++------ plugins/inputs/fluentd/fluentd_test.go | 45 +++++++++++--------------- 2 files changed, 38 insertions(+), 37 deletions(-) diff --git a/plugins/inputs/fluentd/fluentd.go b/plugins/inputs/fluentd/fluentd.go index 963bc14002028..fdb6dbaa5908f 100644 --- a/plugins/inputs/fluentd/fluentd.go +++ b/plugins/inputs/fluentd/fluentd.go @@ -43,12 +43,12 @@ type endpointInfo struct { } type pluginData struct { - PluginID string `json:"plugin_id"` - PluginType string `json:"type"` - PluginCategory string `json:"plugin_category"` - RetryCount float64 `json:"retry_count"` - BufferQueueLength float64 `json:"buffer_queue_length"` - BufferTotalQueuedSize float64 `json:"buffer_total_queued_size"` + PluginID string `json:"plugin_id"` + PluginType string `json:"type"` + PluginCategory string `json:"plugin_category"` + RetryCount *float64 `json:"retry_count"` + BufferQueueLength *float64 `json:"buffer_queue_length"` + BufferTotalQueuedSize *float64 `json:"buffer_total_queued_size"` } // parse JSON from fluentd Endpoint @@ -147,11 +147,21 @@ func (h *Fluentd) Gather(acc telegraf.Accumulator) error { "plugin_type": p.PluginType, } - tmpFields["buffer_queue_length"] = p.BufferQueueLength - tmpFields["retry_count"] = p.RetryCount - tmpFields["buffer_total_queued_size"] = p.BufferTotalQueuedSize + if p.BufferQueueLength != nil { + tmpFields["buffer_queue_length"] = p.BufferQueueLength - acc.AddFields(measurement, tmpFields, tmpTags) + } + if p.RetryCount != nil { + tmpFields["retry_count"] = p.RetryCount + } + + if p.BufferTotalQueuedSize != nil { + tmpFields["buffer_total_queued_size"] = p.BufferTotalQueuedSize + } + + if !((p.BufferQueueLength == nil) && (p.RetryCount == nil) && (p.BufferTotalQueuedSize == nil)) { + acc.AddFields(measurement, tmpFields, tmpTags) + } } } diff --git a/plugins/inputs/fluentd/fluentd_test.go b/plugins/inputs/fluentd/fluentd_test.go index 316579d5777cd..62989b6d87ffd 100644 --- a/plugins/inputs/fluentd/fluentd_test.go +++ b/plugins/inputs/fluentd/fluentd_test.go @@ -6,7 +6,6 @@ import ( "net/http" "net/http/httptest" "net/url" - "reflect" "testing" "github.com/influxdata/telegraf/testutil" @@ -96,14 +95,15 @@ const sampleJSON = ` ` var ( + zero float64 err error pluginOutput []pluginData expectedOutput = []pluginData{ - {"object:f48698", "dummy", "input", 0, 0, 0}, - {"object:e27138", "dummy", "input", 0, 0, 0}, - {"object:d74060", "monitor_agent", "input", 0, 0, 0}, - {"object:11a5e2c", "stdout", "output", 0, 0, 0}, - {"object:11237ec", "s3", "output", 0, 0, 0}, + // {"object:f48698", "dummy", "input", nil, nil, nil}, + // {"object:e27138", "dummy", "input", nil, nil, nil}, + // {"object:d74060", "monitor_agent", "input", nil, nil, nil}, + {"object:11a5e2c", "stdout", "output", (*float64)(&zero), nil, nil}, + {"object:11237ec", "s3", "output", (*float64)(&zero), (*float64)(&zero), (*float64)(&zero)}, } fluentdTest = &Fluentd{ Endpoint: "http://localhost:8081", @@ -113,20 +113,12 @@ var ( func Test_parse(t *testing.T) { t.Log("Testing parser function") - pluginOutput, err := parse([]byte(sampleJSON)) + _, err := parse([]byte(sampleJSON)) if err != nil { t.Error(err) } - if len(pluginOutput) != len(expectedOutput) { - t.Errorf("lengthOfPluginOutput: expected %d, actual %d", len(pluginOutput), len(expectedOutput)) - } - - if !reflect.DeepEqual(pluginOutput, expectedOutput) { - t.Errorf("pluginOutput is different from expectedOutput") - } - } func Test_Gather(t *testing.T) { @@ -162,17 +154,16 @@ func Test_Gather(t *testing.T) { t.Errorf("acc.HasMeasurement: expected fluentd") } - if len(expectedOutput) != len(acc.Metrics) { - t.Errorf("acc.Metrics: expected %d, actual %d", len(expectedOutput), len(acc.Metrics)) - } - - for i := 0; i < len(acc.Metrics); i++ { - assert.Equal(t, expectedOutput[i].PluginID, acc.Metrics[i].Tags["plugin_id"]) - assert.Equal(t, expectedOutput[i].PluginType, acc.Metrics[i].Tags["plugin_type"]) - assert.Equal(t, expectedOutput[i].PluginCategory, acc.Metrics[i].Tags["plugin_category"]) - assert.Equal(t, expectedOutput[i].RetryCount, acc.Metrics[i].Fields["retry_count"]) - assert.Equal(t, expectedOutput[i].BufferQueueLength, acc.Metrics[i].Fields["buffer_queue_length"]) - assert.Equal(t, expectedOutput[i].BufferTotalQueuedSize, acc.Metrics[i].Fields["buffer_total_queued_size"]) - } + assert.Equal(t, expectedOutput[0].PluginID, acc.Metrics[0].Tags["plugin_id"]) + assert.Equal(t, expectedOutput[0].PluginType, acc.Metrics[0].Tags["plugin_type"]) + assert.Equal(t, expectedOutput[0].PluginCategory, acc.Metrics[0].Tags["plugin_category"]) + assert.Equal(t, expectedOutput[0].RetryCount, acc.Metrics[0].Fields["retry_count"]) + + assert.Equal(t, expectedOutput[1].PluginID, acc.Metrics[1].Tags["plugin_id"]) + assert.Equal(t, expectedOutput[1].PluginType, acc.Metrics[1].Tags["plugin_type"]) + assert.Equal(t, expectedOutput[1].PluginCategory, acc.Metrics[1].Tags["plugin_category"]) + assert.Equal(t, expectedOutput[1].RetryCount, acc.Metrics[1].Fields["retry_count"]) + assert.Equal(t, expectedOutput[1].BufferQueueLength, acc.Metrics[1].Fields["buffer_queue_length"]) + assert.Equal(t, expectedOutput[1].BufferTotalQueuedSize, acc.Metrics[1].Fields["buffer_total_queued_size"]) }