-
Notifications
You must be signed in to change notification settings - Fork 5.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add http input plugin which supports any input data format (#3546)
- Loading branch information
1 parent
42ccc9f
commit f82f03b
Showing
5 changed files
with
361 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
# HTTP Input Plugin | ||
|
||
The HTTP input plugin collects metrics from one or more HTTP(S) endpoints. The metrics need to be formatted in one of the supported data formats. Each data format has its own unique set of configuration options, read more about them here: | ||
https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md | ||
|
||
|
||
### Configuration: | ||
|
||
This section contains the default TOML to configure the plugin. You can | ||
generate it using `telegraf --usage http`. | ||
|
||
```toml | ||
# Read formatted metrics from one or more HTTP endpoints | ||
[[inputs.http]] | ||
## One or more URLs from which to read formatted metrics | ||
urls = [ | ||
"http://localhost/metrics" | ||
] | ||
|
||
## Optional HTTP Basic Auth Credentials | ||
# username = "username" | ||
# password = "pa$$word" | ||
|
||
## Optional SSL Config | ||
# ssl_ca = "/etc/telegraf/ca.pem" | ||
# ssl_cert = "/etc/telegraf/cert.pem" | ||
# ssl_key = "/etc/telegraf/key.pem" | ||
## Use SSL but skip chain & host verification | ||
# insecure_skip_verify = false | ||
|
||
# timeout = "5s" | ||
|
||
## Data format to consume. | ||
## Each data format has its own unique set of configuration options, read | ||
## more about them here: | ||
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md | ||
# data_format = "influx" | ||
``` | ||
|
||
### Metrics: | ||
|
||
The metrics collected by this input plugin will depend on the configurated `data_format` and the payload returned by the HTTP endpoint(s). |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,200 @@ | ||
package http | ||
|
||
import ( | ||
"errors" | ||
"fmt" | ||
"io/ioutil" | ||
"net/http" | ||
"strings" | ||
"sync" | ||
"time" | ||
|
||
"github.com/influxdata/telegraf" | ||
"github.com/influxdata/telegraf/internal" | ||
"github.com/influxdata/telegraf/plugins/inputs" | ||
"github.com/influxdata/telegraf/plugins/parsers" | ||
) | ||
|
||
type HTTP struct { | ||
URLs []string `toml:"urls"` | ||
|
||
Headers map[string]string | ||
|
||
// HTTP Basic Auth Credentials | ||
Username string | ||
Password string | ||
|
||
// Option to add "url" tag to each metric | ||
TagURL bool `toml:"tag_url"` | ||
|
||
// Path to CA file | ||
SSLCA string `toml:"ssl_ca"` | ||
// Path to host cert file | ||
SSLCert string `toml:"ssl_cert"` | ||
// Path to cert key file | ||
SSLKey string `toml:"ssl_key"` | ||
// Use SSL but skip chain & host verification | ||
InsecureSkipVerify bool | ||
|
||
Timeout internal.Duration | ||
|
||
client *http.Client | ||
|
||
// The parser will automatically be set by Telegraf core code because | ||
// this plugin implements the ParserInput interface (i.e. the SetParser method) | ||
parser parsers.Parser | ||
} | ||
|
||
var sampleConfig = ` | ||
## One or more URLs from which to read formatted metrics | ||
urls = [ | ||
"http://localhost/metrics" | ||
] | ||
## Optional HTTP headers | ||
# headers = {"X-Special-Header" = "Special-Value"} | ||
## Optional HTTP Basic Auth Credentials | ||
# username = "username" | ||
# password = "pa$$word" | ||
## Tag all metrics with the url | ||
# tag_url = true | ||
## Optional SSL Config | ||
# ssl_ca = "/etc/telegraf/ca.pem" | ||
# ssl_cert = "/etc/telegraf/cert.pem" | ||
# ssl_key = "/etc/telegraf/key.pem" | ||
## Use SSL but skip chain & host verification | ||
# insecure_skip_verify = false | ||
# timeout = "5s" | ||
## Data format to consume. | ||
## Each data format has its own unique set of configuration options, read | ||
## more about them here: | ||
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md | ||
# data_format = "influx" | ||
` | ||
|
||
// SampleConfig returns the default configuration of the Input | ||
func (*HTTP) SampleConfig() string { | ||
return sampleConfig | ||
} | ||
|
||
// Description returns a one-sentence description on the Input | ||
func (*HTTP) Description() string { | ||
return "Read formatted metrics from one or more HTTP endpoints" | ||
} | ||
|
||
// Gather takes in an accumulator and adds the metrics that the Input | ||
// gathers. This is called every "interval" | ||
func (h *HTTP) Gather(acc telegraf.Accumulator) error { | ||
if h.client == nil { | ||
tlsCfg, err := internal.GetTLSConfig( | ||
h.SSLCert, h.SSLKey, h.SSLCA, h.InsecureSkipVerify) | ||
if err != nil { | ||
return err | ||
} | ||
h.client = &http.Client{ | ||
Transport: &http.Transport{ | ||
TLSClientConfig: tlsCfg, | ||
}, | ||
Timeout: h.Timeout.Duration, | ||
} | ||
} | ||
|
||
var wg sync.WaitGroup | ||
for _, u := range h.URLs { | ||
wg.Add(1) | ||
go func(url string) { | ||
defer wg.Done() | ||
if err := h.gatherURL(acc, url); err != nil { | ||
acc.AddError(fmt.Errorf("[url=%s]: %s", url, err)) | ||
} | ||
}(u) | ||
} | ||
|
||
wg.Wait() | ||
|
||
return nil | ||
} | ||
|
||
// SetParser takes the data_format from the config and finds the right parser for that format | ||
func (h *HTTP) SetParser(parser parsers.Parser) { | ||
h.parser = parser | ||
} | ||
|
||
// Gathers data from a particular URL | ||
// Parameters: | ||
// acc : The telegraf Accumulator to use | ||
// url : endpoint to send request to | ||
// | ||
// Returns: | ||
// error: Any error that may have occurred | ||
func (h *HTTP) gatherURL( | ||
acc telegraf.Accumulator, | ||
url string, | ||
) error { | ||
request, err := http.NewRequest("GET", url, nil) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
for k, v := range h.Headers { | ||
if strings.ToLower(k) == "host" { | ||
request.Host = v | ||
} else { | ||
request.Header.Add(k, v) | ||
} | ||
} | ||
|
||
if h.Username != "" { | ||
request.SetBasicAuth(h.Username, h.Password) | ||
} | ||
|
||
resp, err := h.client.Do(request) | ||
if err != nil { | ||
return err | ||
} | ||
defer resp.Body.Close() | ||
|
||
if resp.StatusCode != http.StatusOK { | ||
return fmt.Errorf("Received status code %d (%s), expected %d (%s)", | ||
resp.StatusCode, | ||
http.StatusText(resp.StatusCode), | ||
http.StatusOK, | ||
http.StatusText(http.StatusOK)) | ||
} | ||
|
||
b, err := ioutil.ReadAll(resp.Body) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
if h.parser == nil { | ||
return errors.New("Parser is not set") | ||
} | ||
|
||
metrics, err := h.parser.Parse(b) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
for _, metric := range metrics { | ||
if h.TagURL { | ||
metric.AddTag("url", url) | ||
} | ||
acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time()) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func init() { | ||
inputs.Add("http", func() telegraf.Input { | ||
return &HTTP{ | ||
Timeout: internal.Duration{Duration: time.Second * 5}, | ||
} | ||
}) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
package http_test | ||
|
||
import ( | ||
"net/http" | ||
"net/http/httptest" | ||
"testing" | ||
|
||
plugin "github.com/influxdata/telegraf/plugins/inputs/http" | ||
"github.com/influxdata/telegraf/plugins/parsers" | ||
"github.com/influxdata/telegraf/testutil" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestHTTPwithJSONFormat(t *testing.T) { | ||
fakeServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||
if r.URL.Path == "/endpoint" { | ||
_, _ = w.Write([]byte(simpleJSON)) | ||
} else { | ||
w.WriteHeader(http.StatusNotFound) | ||
} | ||
})) | ||
defer fakeServer.Close() | ||
|
||
url := fakeServer.URL + "/endpoint" | ||
plugin := &plugin.HTTP{ | ||
URLs: []string{url}, | ||
TagURL: true, | ||
} | ||
metricName := "metricName" | ||
p, _ := parsers.NewJSONParser(metricName, nil, nil) | ||
plugin.SetParser(p) | ||
|
||
var acc testutil.Accumulator | ||
require.NoError(t, acc.GatherError(plugin.Gather)) | ||
|
||
require.Len(t, acc.Metrics, 1) | ||
|
||
// basic check to see if we got the right field, value and tag | ||
var metric = acc.Metrics[0] | ||
require.Equal(t, metric.Measurement, metricName) | ||
require.Len(t, acc.Metrics[0].Fields, 1) | ||
require.Equal(t, acc.Metrics[0].Fields["a"], 1.2) | ||
require.Equal(t, acc.Metrics[0].Tags["url"], url) | ||
} | ||
|
||
func TestHTTPHeaders(t *testing.T) { | ||
header := "X-Special-Header" | ||
headerValue := "Special-Value" | ||
fakeServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||
if r.URL.Path == "/endpoint" { | ||
if r.Header.Get(header) == headerValue { | ||
_, _ = w.Write([]byte(simpleJSON)) | ||
} else { | ||
w.WriteHeader(http.StatusForbidden) | ||
} | ||
} else { | ||
w.WriteHeader(http.StatusNotFound) | ||
} | ||
})) | ||
defer fakeServer.Close() | ||
|
||
url := fakeServer.URL + "/endpoint" | ||
plugin := &plugin.HTTP{ | ||
URLs: []string{url}, | ||
Headers: map[string]string{header: headerValue}, | ||
} | ||
metricName := "metricName" | ||
p, _ := parsers.NewJSONParser(metricName, nil, nil) | ||
plugin.SetParser(p) | ||
|
||
var acc testutil.Accumulator | ||
require.NoError(t, acc.GatherError(plugin.Gather)) | ||
} | ||
|
||
func TestInvalidStatusCode(t *testing.T) { | ||
fakeServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||
w.WriteHeader(http.StatusNotFound) | ||
})) | ||
defer fakeServer.Close() | ||
|
||
url := fakeServer.URL + "/endpoint" | ||
plugin := &plugin.HTTP{ | ||
URLs: []string{url}, | ||
} | ||
|
||
metricName := "metricName" | ||
p, _ := parsers.NewJSONParser(metricName, nil, nil) | ||
plugin.SetParser(p) | ||
|
||
var acc testutil.Accumulator | ||
require.Error(t, acc.GatherError(plugin.Gather)) | ||
} | ||
|
||
func TestParserNotSet(t *testing.T) { | ||
fakeServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||
if r.URL.Path == "/endpoint" { | ||
_, _ = w.Write([]byte(simpleJSON)) | ||
} else { | ||
w.WriteHeader(http.StatusNotFound) | ||
} | ||
})) | ||
defer fakeServer.Close() | ||
|
||
url := fakeServer.URL + "/endpoint" | ||
plugin := &plugin.HTTP{ | ||
URLs: []string{url}, | ||
} | ||
|
||
var acc testutil.Accumulator | ||
require.Error(t, acc.GatherError(plugin.Gather)) | ||
} | ||
|
||
const simpleJSON = ` | ||
{ | ||
"a": 1.2 | ||
} | ||
` |