From 3c614988bc443ef268c9c350ee0cc4f523799823 Mon Sep 17 00:00:00 2001 From: Nicolas Grange Date: Wed, 6 Dec 2017 17:40:10 +1100 Subject: [PATCH 1/4] add generic http input plugin which supports setting an input data formats --- README.md | 1 + plugins/inputs/all/all.go | 1 + plugins/inputs/http/README.md | 42 ++++++++ plugins/inputs/http/http.go | 165 +++++++++++++++++++++++++++++++ plugins/inputs/http/http_test.go | 47 +++++++++ 5 files changed, 256 insertions(+) create mode 100644 plugins/inputs/http/README.md create mode 100644 plugins/inputs/http/http.go create mode 100644 plugins/inputs/http/http_test.go diff --git a/README.md b/README.md index c2a5fb0640fe0..414c3d96c7c66 100644 --- a/README.md +++ b/README.md @@ -152,6 +152,7 @@ configuration options. * [graylog](./plugins/inputs/graylog) * [haproxy](./plugins/inputs/haproxy) * [hddtemp](./plugins/inputs/hddtemp) +* [http](./plugins/inputs/http) (generic HTTP plugin, supports using input data formats) * [http_response](./plugins/inputs/http_response) * [httpjson](./plugins/inputs/httpjson) (generic JSON-emitting http service plugin) * [internal](./plugins/inputs/internal) diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index aaf5b6ae74dbc..d348c8c163568 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -29,6 +29,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/graylog" _ "github.com/influxdata/telegraf/plugins/inputs/haproxy" _ "github.com/influxdata/telegraf/plugins/inputs/hddtemp" + _ "github.com/influxdata/telegraf/plugins/inputs/http" _ "github.com/influxdata/telegraf/plugins/inputs/http_listener" _ "github.com/influxdata/telegraf/plugins/inputs/http_response" _ "github.com/influxdata/telegraf/plugins/inputs/httpjson" diff --git a/plugins/inputs/http/README.md b/plugins/inputs/http/README.md new file mode 100644 index 0000000000000..23964a646134e --- /dev/null +++ b/plugins/inputs/http/README.md @@ -0,0 +1,42 @@ +# HTTP Input Plugin + +The HTTP input plugin gathers formatted metrics from one or more HTTP(S) endpoints. +It requires `data_format` to be specified so it can use the corresponding Parser to convert the returned payload into measurements, fields and tags. +See [DATA_FORMATS_INPUT.md](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md) for the list of supported formats. + +### Configuration: + +This section contains the default TOML to configure the plugin. You can +generate it using `telegraf --usage http`. + +```toml +# Read formatted metrics from one or more HTTP endpoints +[[inputs.http]] + ## One or more URLs from which to read formatted metrics + urls = [ + "http://localhost:2015/simple.json" + ] + + ## Optional HTTP Basic Auth Credentials + # username = "username" + # password = "pa$$word" + + ## Optional SSL Config + # ssl_ca = "/etc/telegraf/ca.pem" + # ssl_cert = "/etc/telegraf/cert.pem" + # ssl_key = "/etc/telegraf/key.pem" + ## Use SSL but skip chain & host verification + # insecure_skip_verify = false + + ## http request & header timeout + ## defaults to 5s if not set + timeout = "10s" + + ## Mandatory data_format + ## See available options at https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + data_format = "json" +``` + +### Metrics: + +The metrics collected by this input plugin will depend on the configurated `data_format` and the payload returned by the HTTP endpoint(s). diff --git a/plugins/inputs/http/http.go b/plugins/inputs/http/http.go new file mode 100644 index 0000000000000..c3e9cfea74d4b --- /dev/null +++ b/plugins/inputs/http/http.go @@ -0,0 +1,165 @@ +package http + +import ( + "fmt" + "io/ioutil" + "net/http" + "sync" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf/plugins/parsers" +) + +type HTTP struct { + URLs []string `toml:"urls"` + + // HTTP Basic Auth Credentials + Username string + Password string + + // Path to CA file + SSLCA string `toml:"ssl_ca"` + // Path to host cert file + SSLCert string `toml:"ssl_cert"` + // Path to cert key file + SSLKey string `toml:"ssl_key"` + // Use SSL but skip chain & host verification + InsecureSkipVerify bool + + Timeout internal.Duration + + client *http.Client + + // The parser will automatically be set by Telegraf core code because + // this plugin implements the ParserInput interface (i.e. the SetParser method) + parser parsers.Parser +} + +var sampleConfig = ` + ## One or more URLs from which to read formatted metrics + urls = [ + "http://localhost:2015/simple.json" + ] + + ## Optional HTTP Basic Auth Credentials + # username = "username" + # password = "pa$$word" + + ## Optional SSL Config + # ssl_ca = "/etc/telegraf/ca.pem" + # ssl_cert = "/etc/telegraf/cert.pem" + # ssl_key = "/etc/telegraf/key.pem" + ## Use SSL but skip chain & host verification + # insecure_skip_verify = false + + ## http request & header timeout + ## defaults to 5s if not set + timeout = "10s" + + ## Mandatory data_format + ## See available options at https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + data_format = "json" +` + +// SampleConfig returns the default configuration of the Input +func (*HTTP) SampleConfig() string { + return sampleConfig +} + +// Description returns a one-sentence description on the Input +func (*HTTP) Description() string { + return "Read formatted metrics from one or more HTTP endpoints" +} + +// Gather takes in an accumulator and adds the metrics that the Input +// gathers. This is called every "interval" +func (h *HTTP) Gather(acc telegraf.Accumulator) error { + if h.client == nil { + tlsCfg, err := internal.GetTLSConfig( + h.SSLCert, h.SSLKey, h.SSLCA, h.InsecureSkipVerify) + if err != nil { + return err + } + h.client = &http.Client{ + Transport: &http.Transport{ + ResponseHeaderTimeout: h.Timeout.Duration, + TLSClientConfig: tlsCfg, + }, + Timeout: h.Timeout.Duration, + } + } + + var wg sync.WaitGroup + for _, u := range h.URLs { + wg.Add(1) + go func(url string) { + defer wg.Done() + if err := h.gatherURL(acc, url); err != nil { + acc.AddError(fmt.Errorf("[url=%s]: %s", url, err)) + } + }(u) + } + + wg.Wait() + + return nil +} + +// SetParser takes the data_format from the config and finds the right parser for that format +func (h *HTTP) SetParser(parser parsers.Parser) { + h.parser = parser +} + +// Gathers data from a particular URL +// Parameters: +// acc : The telegraf Accumulator to use +// url : endpoint to send request to +// +// Returns: +// error: Any error that may have occurred +func (h *HTTP) gatherURL( + acc telegraf.Accumulator, + url string, +) error { + request, err := http.NewRequest("GET", url, nil) + if err != nil { + return err + } + + if h.Username != "" { + request.SetBasicAuth(h.Username, h.Password) + } + + resp, err := h.client.Do(request) + if err != nil { + return err + } + defer resp.Body.Close() + + b, err := ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + + metrics, err := h.parser.Parse(b) + if err != nil { + return err + } + + for _, metric := range metrics { + acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time()) + } + + return nil +} + +func init() { + inputs.Add("http", func() telegraf.Input { + return &HTTP{ + Timeout: internal.Duration{Duration: time.Second * 5}, + } + }) +} diff --git a/plugins/inputs/http/http_test.go b/plugins/inputs/http/http_test.go new file mode 100644 index 0000000000000..9269bd1fa92f2 --- /dev/null +++ b/plugins/inputs/http/http_test.go @@ -0,0 +1,47 @@ +package http_test + +import ( + "net/http" + "net/http/httptest" + "testing" + + plugin "github.com/influxdata/telegraf/plugins/inputs/http" + "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +func TestHTTPwithJSONFormat(t *testing.T) { + fakeServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/endpoint" { + _, _ = w.Write([]byte(simpleJSON)) + } else { + w.WriteHeader(http.StatusNotFound) + } + })) + defer fakeServer.Close() + + plugin := &plugin.HTTP{ + URLs: []string{fakeServer.URL + "/endpoint"}, + } + metricName := "metricName" + p, _ := parsers.NewJSONParser(metricName, nil, nil) + plugin.SetParser(p) + + var acc testutil.Accumulator + require.NoError(t, acc.GatherError(plugin.Gather)) + + require.Len(t, acc.Metrics, 1) + + // basic check to see if we got the right field and value + var metric = acc.Metrics[0] + require.Equal(t, metric.Measurement, metricName) + require.Len(t, acc.Metrics[0].Fields, 1) + require.Equal(t, acc.Metrics[0].Fields["a"], 1.2) +} + +const simpleJSON = ` +{ + "a": 1.2 +} +` From 48b5894603c1d99b8e47c44d1f29d417c4de2768 Mon Sep 17 00:00:00 2001 From: Nicolas Grange Date: Sun, 10 Dec 2017 21:51:10 +1100 Subject: [PATCH 2/4] Clean up sample config and remove setting of ResponseHeaderTimeout --- plugins/inputs/http/README.md | 20 ++++++++++---------- plugins/inputs/http/http.go | 17 ++++++++--------- 2 files changed, 18 insertions(+), 19 deletions(-) diff --git a/plugins/inputs/http/README.md b/plugins/inputs/http/README.md index 23964a646134e..430a19b2b7e2b 100644 --- a/plugins/inputs/http/README.md +++ b/plugins/inputs/http/README.md @@ -1,8 +1,8 @@ # HTTP Input Plugin -The HTTP input plugin gathers formatted metrics from one or more HTTP(S) endpoints. -It requires `data_format` to be specified so it can use the corresponding Parser to convert the returned payload into measurements, fields and tags. -See [DATA_FORMATS_INPUT.md](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md) for the list of supported formats. +The HTTP input plugin collects metrics from one or more HTTP(S) endpoints. The metrics need to be formatted in one of the supported data formats. Each data format has its own unique set of configuration options, read more about them here: + https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + ### Configuration: @@ -14,7 +14,7 @@ generate it using `telegraf --usage http`. [[inputs.http]] ## One or more URLs from which to read formatted metrics urls = [ - "http://localhost:2015/simple.json" + "http://localhost/metrics" ] ## Optional HTTP Basic Auth Credentials @@ -28,13 +28,13 @@ generate it using `telegraf --usage http`. ## Use SSL but skip chain & host verification # insecure_skip_verify = false - ## http request & header timeout - ## defaults to 5s if not set - timeout = "10s" + # timeout = "5s" - ## Mandatory data_format - ## See available options at https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md - data_format = "json" + ## Data format to consume. + ## Each data format has its own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + # data_format = "influx" ``` ### Metrics: diff --git a/plugins/inputs/http/http.go b/plugins/inputs/http/http.go index c3e9cfea74d4b..c9678eee63aa0 100644 --- a/plugins/inputs/http/http.go +++ b/plugins/inputs/http/http.go @@ -41,7 +41,7 @@ type HTTP struct { var sampleConfig = ` ## One or more URLs from which to read formatted metrics urls = [ - "http://localhost:2015/simple.json" + "http://localhost/metrics" ] ## Optional HTTP Basic Auth Credentials @@ -55,13 +55,13 @@ var sampleConfig = ` ## Use SSL but skip chain & host verification # insecure_skip_verify = false - ## http request & header timeout - ## defaults to 5s if not set - timeout = "10s" + # timeout = "5s" - ## Mandatory data_format - ## See available options at https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md - data_format = "json" + ## Data format to consume. + ## Each data format has its own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + # data_format = "influx" ` // SampleConfig returns the default configuration of the Input @@ -85,8 +85,7 @@ func (h *HTTP) Gather(acc telegraf.Accumulator) error { } h.client = &http.Client{ Transport: &http.Transport{ - ResponseHeaderTimeout: h.Timeout.Duration, - TLSClientConfig: tlsCfg, + TLSClientConfig: tlsCfg, }, Timeout: h.Timeout.Duration, } From 6f39654a28ac5647f6e81e073619b7ad302e1f74 Mon Sep 17 00:00:00 2001 From: Nicolas Grange Date: Sat, 10 Feb 2018 23:14:42 +1100 Subject: [PATCH 3/4] add option to tag all metrics with the url --- plugins/inputs/http/http.go | 9 +++++++++ plugins/inputs/http/http_test.go | 7 +++++-- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/plugins/inputs/http/http.go b/plugins/inputs/http/http.go index c9678eee63aa0..f8e78fe226596 100644 --- a/plugins/inputs/http/http.go +++ b/plugins/inputs/http/http.go @@ -20,6 +20,9 @@ type HTTP struct { Username string Password string + // Option to add "url" tag to each metric + TagURL bool `toml:"tag_url"` + // Path to CA file SSLCA string `toml:"ssl_ca"` // Path to host cert file @@ -48,6 +51,9 @@ var sampleConfig = ` # username = "username" # password = "pa$$word" + ## Tag all metrics with the url + # tag_url = true + ## Optional SSL Config # ssl_ca = "/etc/telegraf/ca.pem" # ssl_cert = "/etc/telegraf/cert.pem" @@ -149,6 +155,9 @@ func (h *HTTP) gatherURL( } for _, metric := range metrics { + if h.TagURL { + metric.AddTag("url", url) + } acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time()) } diff --git a/plugins/inputs/http/http_test.go b/plugins/inputs/http/http_test.go index 9269bd1fa92f2..c8e2c73c1d066 100644 --- a/plugins/inputs/http/http_test.go +++ b/plugins/inputs/http/http_test.go @@ -21,8 +21,10 @@ func TestHTTPwithJSONFormat(t *testing.T) { })) defer fakeServer.Close() + url := fakeServer.URL + "/endpoint" plugin := &plugin.HTTP{ - URLs: []string{fakeServer.URL + "/endpoint"}, + URLs: []string{url}, + TagURL: true, } metricName := "metricName" p, _ := parsers.NewJSONParser(metricName, nil, nil) @@ -33,11 +35,12 @@ func TestHTTPwithJSONFormat(t *testing.T) { require.Len(t, acc.Metrics, 1) - // basic check to see if we got the right field and value + // basic check to see if we got the right field, value and tag var metric = acc.Metrics[0] require.Equal(t, metric.Measurement, metricName) require.Len(t, acc.Metrics[0].Fields, 1) require.Equal(t, acc.Metrics[0].Fields["a"], 1.2) + require.Equal(t, acc.Metrics[0].Tags["url"], url) } const simpleJSON = ` From 7cfba8b2b9a91cd765f87cbfc22a7965002ac927 Mon Sep 17 00:00:00 2001 From: Nicolas Grange Date: Sun, 11 Feb 2018 02:27:57 +1100 Subject: [PATCH 4/4] add support for setting custom HTTP headers --- plugins/inputs/http/http.go | 27 +++++++++++++ plugins/inputs/http/http_test.go | 67 ++++++++++++++++++++++++++++++++ 2 files changed, 94 insertions(+) diff --git a/plugins/inputs/http/http.go b/plugins/inputs/http/http.go index f8e78fe226596..6a98ab4fc8c34 100644 --- a/plugins/inputs/http/http.go +++ b/plugins/inputs/http/http.go @@ -1,9 +1,11 @@ package http import ( + "errors" "fmt" "io/ioutil" "net/http" + "strings" "sync" "time" @@ -16,6 +18,8 @@ import ( type HTTP struct { URLs []string `toml:"urls"` + Headers map[string]string + // HTTP Basic Auth Credentials Username string Password string @@ -47,6 +51,9 @@ var sampleConfig = ` "http://localhost/metrics" ] + ## Optional HTTP headers + # headers = {"X-Special-Header" = "Special-Value"} + ## Optional HTTP Basic Auth Credentials # username = "username" # password = "pa$$word" @@ -134,6 +141,14 @@ func (h *HTTP) gatherURL( return err } + for k, v := range h.Headers { + if strings.ToLower(k) == "host" { + request.Host = v + } else { + request.Header.Add(k, v) + } + } + if h.Username != "" { request.SetBasicAuth(h.Username, h.Password) } @@ -144,11 +159,23 @@ func (h *HTTP) gatherURL( } defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("Received status code %d (%s), expected %d (%s)", + resp.StatusCode, + http.StatusText(resp.StatusCode), + http.StatusOK, + http.StatusText(http.StatusOK)) + } + b, err := ioutil.ReadAll(resp.Body) if err != nil { return err } + if h.parser == nil { + return errors.New("Parser is not set") + } + metrics, err := h.parser.Parse(b) if err != nil { return err diff --git a/plugins/inputs/http/http_test.go b/plugins/inputs/http/http_test.go index c8e2c73c1d066..fd352829ca33a 100644 --- a/plugins/inputs/http/http_test.go +++ b/plugins/inputs/http/http_test.go @@ -43,6 +43,73 @@ func TestHTTPwithJSONFormat(t *testing.T) { require.Equal(t, acc.Metrics[0].Tags["url"], url) } +func TestHTTPHeaders(t *testing.T) { + header := "X-Special-Header" + headerValue := "Special-Value" + fakeServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/endpoint" { + if r.Header.Get(header) == headerValue { + _, _ = w.Write([]byte(simpleJSON)) + } else { + w.WriteHeader(http.StatusForbidden) + } + } else { + w.WriteHeader(http.StatusNotFound) + } + })) + defer fakeServer.Close() + + url := fakeServer.URL + "/endpoint" + plugin := &plugin.HTTP{ + URLs: []string{url}, + Headers: map[string]string{header: headerValue}, + } + metricName := "metricName" + p, _ := parsers.NewJSONParser(metricName, nil, nil) + plugin.SetParser(p) + + var acc testutil.Accumulator + require.NoError(t, acc.GatherError(plugin.Gather)) +} + +func TestInvalidStatusCode(t *testing.T) { + fakeServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusNotFound) + })) + defer fakeServer.Close() + + url := fakeServer.URL + "/endpoint" + plugin := &plugin.HTTP{ + URLs: []string{url}, + } + + metricName := "metricName" + p, _ := parsers.NewJSONParser(metricName, nil, nil) + plugin.SetParser(p) + + var acc testutil.Accumulator + require.Error(t, acc.GatherError(plugin.Gather)) +} + +func TestParserNotSet(t *testing.T) { + fakeServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/endpoint" { + _, _ = w.Write([]byte(simpleJSON)) + } else { + w.WriteHeader(http.StatusNotFound) + } + })) + defer fakeServer.Close() + + url := fakeServer.URL + "/endpoint" + plugin := &plugin.HTTP{ + URLs: []string{url}, + } + + var acc testutil.Accumulator + require.Error(t, acc.GatherError(plugin.Gather)) +} + const simpleJSON = ` { "a": 1.2