-
Notifications
You must be signed in to change notification settings - Fork 5.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add generic http input plugin which supports setting an input data format #3546
Changes from 1 commit
3c61498
48b5894
6f39654
7cfba8b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
# HTTP Input Plugin | ||
|
||
The HTTP input plugin gathers formatted metrics from one or more HTTP(S) endpoints. | ||
It requires `data_format` to be specified so it can use the corresponding Parser to convert the returned payload into measurements, fields and tags. | ||
See [DATA_FORMATS_INPUT.md](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md) for the list of supported formats. | ||
|
||
### Configuration: | ||
|
||
This section contains the default TOML to configure the plugin. You can | ||
generate it using `telegraf --usage http`. | ||
|
||
```toml | ||
# Read formatted metrics from one or more HTTP endpoints | ||
[[inputs.http]] | ||
## One or more URLs from which to read formatted metrics | ||
urls = [ | ||
"http://localhost:2015/simple.json" | ||
] | ||
|
||
## Optional HTTP Basic Auth Credentials | ||
# username = "username" | ||
# password = "pa$$word" | ||
|
||
## Optional SSL Config | ||
# ssl_ca = "/etc/telegraf/ca.pem" | ||
# ssl_cert = "/etc/telegraf/cert.pem" | ||
# ssl_key = "/etc/telegraf/key.pem" | ||
## Use SSL but skip chain & host verification | ||
# insecure_skip_verify = false | ||
|
||
## http request & header timeout | ||
## defaults to 5s if not set | ||
timeout = "10s" | ||
|
||
## Mandatory data_format | ||
## See available options at https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md | ||
data_format = "json" | ||
``` | ||
|
||
### Metrics: | ||
|
||
The metrics collected by this input plugin will depend on the configurated `data_format` and the payload returned by the HTTP endpoint(s). |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,165 @@ | ||
package http | ||
|
||
import ( | ||
"fmt" | ||
"io/ioutil" | ||
"net/http" | ||
"sync" | ||
"time" | ||
|
||
"github.com/influxdata/telegraf" | ||
"github.com/influxdata/telegraf/internal" | ||
"github.com/influxdata/telegraf/plugins/inputs" | ||
"github.com/influxdata/telegraf/plugins/parsers" | ||
) | ||
|
||
type HTTP struct { | ||
URLs []string `toml:"urls"` | ||
|
||
// HTTP Basic Auth Credentials | ||
Username string | ||
Password string | ||
|
||
// Path to CA file | ||
SSLCA string `toml:"ssl_ca"` | ||
// Path to host cert file | ||
SSLCert string `toml:"ssl_cert"` | ||
// Path to cert key file | ||
SSLKey string `toml:"ssl_key"` | ||
// Use SSL but skip chain & host verification | ||
InsecureSkipVerify bool | ||
|
||
Timeout internal.Duration | ||
|
||
client *http.Client | ||
|
||
// The parser will automatically be set by Telegraf core code because | ||
// this plugin implements the ParserInput interface (i.e. the SetParser method) | ||
parser parsers.Parser | ||
} | ||
|
||
var sampleConfig = ` | ||
## One or more URLs from which to read formatted metrics | ||
urls = [ | ||
"http://localhost:2015/simple.json" | ||
] | ||
|
||
## Optional HTTP Basic Auth Credentials | ||
# username = "username" | ||
# password = "pa$$word" | ||
|
||
## Optional SSL Config | ||
# ssl_ca = "/etc/telegraf/ca.pem" | ||
# ssl_cert = "/etc/telegraf/cert.pem" | ||
# ssl_key = "/etc/telegraf/key.pem" | ||
## Use SSL but skip chain & host verification | ||
# insecure_skip_verify = false | ||
|
||
## http request & header timeout | ||
## defaults to 5s if not set | ||
timeout = "10s" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Update the comment based on removing header timeout (see below). Use the default value and comment it out:
|
||
|
||
## Mandatory data_format | ||
## See available options at https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md | ||
data_format = "json" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is actually optional, if unset it will default to
|
||
` | ||
|
||
// SampleConfig returns the default configuration of the Input | ||
func (*HTTP) SampleConfig() string { | ||
return sampleConfig | ||
} | ||
|
||
// Description returns a one-sentence description on the Input | ||
func (*HTTP) Description() string { | ||
return "Read formatted metrics from one or more HTTP endpoints" | ||
} | ||
|
||
// Gather takes in an accumulator and adds the metrics that the Input | ||
// gathers. This is called every "interval" | ||
func (h *HTTP) Gather(acc telegraf.Accumulator) error { | ||
if h.client == nil { | ||
tlsCfg, err := internal.GetTLSConfig( | ||
h.SSLCert, h.SSLKey, h.SSLCA, h.InsecureSkipVerify) | ||
if err != nil { | ||
return err | ||
} | ||
h.client = &http.Client{ | ||
Transport: &http.Transport{ | ||
ResponseHeaderTimeout: h.Timeout.Duration, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't set a response header timeout since we have the client timeout. I need to exercise these from the code so they stop getting copied around. |
||
TLSClientConfig: tlsCfg, | ||
}, | ||
Timeout: h.Timeout.Duration, | ||
} | ||
} | ||
|
||
var wg sync.WaitGroup | ||
for _, u := range h.URLs { | ||
wg.Add(1) | ||
go func(url string) { | ||
defer wg.Done() | ||
if err := h.gatherURL(acc, url); err != nil { | ||
acc.AddError(fmt.Errorf("[url=%s]: %s", url, err)) | ||
} | ||
}(u) | ||
} | ||
|
||
wg.Wait() | ||
|
||
return nil | ||
} | ||
|
||
// SetParser takes the data_format from the config and finds the right parser for that format | ||
func (h *HTTP) SetParser(parser parsers.Parser) { | ||
h.parser = parser | ||
} | ||
|
||
// Gathers data from a particular URL | ||
// Parameters: | ||
// acc : The telegraf Accumulator to use | ||
// url : endpoint to send request to | ||
// | ||
// Returns: | ||
// error: Any error that may have occurred | ||
func (h *HTTP) gatherURL( | ||
acc telegraf.Accumulator, | ||
url string, | ||
) error { | ||
request, err := http.NewRequest("GET", url, nil) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
if h.Username != "" { | ||
request.SetBasicAuth(h.Username, h.Password) | ||
} | ||
|
||
resp, err := h.client.Do(request) | ||
if err != nil { | ||
return err | ||
} | ||
defer resp.Body.Close() | ||
|
||
b, err := ioutil.ReadAll(resp.Body) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
metrics, err := h.parser.Parse(b) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
for _, metric := range metrics { | ||
acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time()) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func init() { | ||
inputs.Add("http", func() telegraf.Input { | ||
return &HTTP{ | ||
Timeout: internal.Duration{Duration: time.Second * 5}, | ||
} | ||
}) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
package http_test | ||
|
||
import ( | ||
"net/http" | ||
"net/http/httptest" | ||
"testing" | ||
|
||
plugin "github.com/influxdata/telegraf/plugins/inputs/http" | ||
"github.com/influxdata/telegraf/plugins/parsers" | ||
"github.com/influxdata/telegraf/testutil" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestHTTPwithJSONFormat(t *testing.T) { | ||
fakeServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||
if r.URL.Path == "/endpoint" { | ||
_, _ = w.Write([]byte(simpleJSON)) | ||
} else { | ||
w.WriteHeader(http.StatusNotFound) | ||
} | ||
})) | ||
defer fakeServer.Close() | ||
|
||
plugin := &plugin.HTTP{ | ||
URLs: []string{fakeServer.URL + "/endpoint"}, | ||
} | ||
metricName := "metricName" | ||
p, _ := parsers.NewJSONParser(metricName, nil, nil) | ||
plugin.SetParser(p) | ||
|
||
var acc testutil.Accumulator | ||
require.NoError(t, acc.GatherError(plugin.Gather)) | ||
|
||
require.Len(t, acc.Metrics, 1) | ||
|
||
// basic check to see if we got the right field and value | ||
var metric = acc.Metrics[0] | ||
require.Equal(t, metric.Measurement, metricName) | ||
require.Len(t, acc.Metrics[0].Fields, 1) | ||
require.Equal(t, acc.Metrics[0].Fields["a"], 1.2) | ||
} | ||
|
||
const simpleJSON = ` | ||
{ | ||
"a": 1.2 | ||
} | ||
` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because of my comment about
data_format
we should set this to something more format neutral such ashttp://localhost/metrics