-
Notifications
You must be signed in to change notification settings - Fork 5.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add influx2.0 output plugin #4645
Merged
Merged
Changes from 7 commits
Commits
Show all changes
11 commits
Select commit
Hold shift + click to select a range
48f79ce
Add output plugin for influx2.0
glinton ae8498b
Add tests
glinton 93a47ff
Add Readme
glinton 5b4958d
Implement remaining spec
glinton a68e828
Address feedback
glinton 295b990
Better retry-time handling
glinton c3e3c42
Ignore if no retry-after header is returned
glinton 3bf912f
Update default encoding, set retry if greater than max
glinton b938d41
Update readme
glinton 66839d6
Force no uint support
glinton d561ef3
Match old pluign
glinton File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
# InfluxDB Output Plugin | ||
|
||
This InfluxDB output plugin writes metrics to the [InfluxDB 2.0](https://github.com/influxdata/platform) HTTP service. | ||
|
||
### Configuration: | ||
|
||
```toml | ||
# Configuration for sending metrics to InfluxDB 2.0 | ||
[[outputs.influxdb_v2]] | ||
## The URLs of the InfluxDB cluster nodes. | ||
## | ||
## Multiple URLs can be specified for a single cluster, only ONE of the | ||
## urls will be written to each interval. | ||
urls = ["http://127.0.0.1:9999"] | ||
|
||
## Token for authentication. | ||
token = "" | ||
|
||
## Organization is the name of the organization you wish to write to. | ||
organization = "" | ||
|
||
## Bucket to the name fo the bucketwrite into; must exist. | ||
bucket = "" | ||
|
||
## Timeout for HTTP messages. | ||
# timeout = "5s" | ||
|
||
## Additional HTTP headers | ||
# http_headers = {"X-Special-Header" = "Special-Value"} | ||
|
||
## HTTP Proxy override, if unset values the standard proxy environment | ||
## variables are consulted to determine which proxy, if any, should be used. | ||
# http_proxy = "http://corporate.proxy:3128" | ||
|
||
## HTTP User-Agent | ||
# user_agent = "telegraf" | ||
|
||
## Content-Encoding for write request body, can be set to "gzip" to | ||
## compress body or "identity" to apply no encoding. | ||
# content_encoding = "identity" | ||
|
||
## Optional TLS Config for use on HTTP connections. | ||
# tls_ca = "/etc/telegraf/ca.pem" | ||
# tls_cert = "/etc/telegraf/cert.pem" | ||
# tls_key = "/etc/telegraf/key.pem" | ||
## Use TLS but skip chain & host verification | ||
# insecure_skip_verify = false | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,293 @@ | ||
package influxdb_v2 | ||
|
||
import ( | ||
"compress/gzip" | ||
"context" | ||
"crypto/tls" | ||
"encoding/json" | ||
"errors" | ||
"fmt" | ||
"io" | ||
"log" | ||
"net" | ||
"net/http" | ||
"net/url" | ||
"path" | ||
"strconv" | ||
"time" | ||
|
||
"github.com/influxdata/telegraf" | ||
"github.com/influxdata/telegraf/plugins/serializers/influx" | ||
) | ||
|
||
type APIErrorType int | ||
|
||
type APIError struct { | ||
StatusCode int | ||
Title string | ||
Description string | ||
Type APIErrorType | ||
} | ||
|
||
func (e APIError) Error() string { | ||
if e.Description != "" { | ||
return fmt.Sprintf("%s: %s", e.Title, e.Description) | ||
} | ||
return e.Title | ||
} | ||
|
||
const ( | ||
defaultRequestTimeout = time.Second * 5 | ||
defaultMaxWait = 10 // seconds | ||
defaultDatabase = "telegraf" | ||
defaultUserAgent = "telegraf" | ||
) | ||
|
||
type HTTPConfig struct { | ||
URL *url.URL | ||
Token string | ||
Organization string | ||
Bucket string | ||
Timeout time.Duration | ||
Headers map[string]string | ||
Proxy *url.URL | ||
UserAgent string | ||
ContentEncoding string | ||
TLSConfig *tls.Config | ||
|
||
Serializer *influx.Serializer | ||
} | ||
|
||
type httpClient struct { | ||
WriteURL string | ||
ContentEncoding string | ||
Timeout time.Duration | ||
Headers map[string]string | ||
|
||
client *http.Client | ||
serializer *influx.Serializer | ||
url *url.URL | ||
retryTime time.Time | ||
} | ||
|
||
func NewHTTPClient(config *HTTPConfig) (*httpClient, error) { | ||
if config.URL == nil { | ||
return nil, ErrMissingURL | ||
} | ||
|
||
timeout := config.Timeout | ||
if timeout == 0 { | ||
timeout = defaultRequestTimeout | ||
} | ||
|
||
userAgent := config.UserAgent | ||
if userAgent == "" { | ||
userAgent = defaultUserAgent | ||
} | ||
|
||
var headers = make(map[string]string, len(config.Headers)+2) | ||
headers["User-Agent"] = userAgent | ||
headers["Authorization"] = "Token " + config.Token | ||
for k, v := range config.Headers { | ||
headers[k] = v | ||
} | ||
|
||
var proxy func(*http.Request) (*url.URL, error) | ||
if config.Proxy != nil { | ||
proxy = http.ProxyURL(config.Proxy) | ||
} else { | ||
proxy = http.ProxyFromEnvironment | ||
} | ||
|
||
serializer := config.Serializer | ||
if serializer == nil { | ||
serializer = influx.NewSerializer() | ||
} | ||
|
||
writeURL, err := makeWriteURL( | ||
*config.URL, | ||
config.Organization, | ||
config.Bucket) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
var transport *http.Transport | ||
switch config.URL.Scheme { | ||
case "http", "https": | ||
transport = &http.Transport{ | ||
Proxy: proxy, | ||
TLSClientConfig: config.TLSConfig, | ||
} | ||
case "unix": | ||
transport = &http.Transport{ | ||
Dial: func(_, _ string) (net.Conn, error) { | ||
return net.DialTimeout( | ||
config.URL.Scheme, | ||
config.URL.Path, | ||
timeout, | ||
) | ||
}, | ||
} | ||
default: | ||
return nil, fmt.Errorf("unsupported scheme %q", config.URL.Scheme) | ||
} | ||
|
||
client := &httpClient{ | ||
serializer: serializer, | ||
client: &http.Client{ | ||
Timeout: timeout, | ||
Transport: transport, | ||
}, | ||
url: config.URL, | ||
WriteURL: writeURL, | ||
ContentEncoding: config.ContentEncoding, | ||
Timeout: timeout, | ||
Headers: headers, | ||
} | ||
return client, nil | ||
} | ||
|
||
// URL returns the origin URL that this client connects too. | ||
func (c *httpClient) URL() string { | ||
return c.url.String() | ||
} | ||
|
||
type genericRespError struct { | ||
Code string | ||
Message string | ||
Line *int32 | ||
MaxLength *int32 | ||
} | ||
|
||
func (g genericRespError) Error() string { | ||
errString := fmt.Sprintf("%s: %s", g.Code, g.Message) | ||
if g.Line != nil { | ||
return fmt.Sprintf("%s - line[%d]", errString, g.Line) | ||
} else if g.MaxLength != nil { | ||
return fmt.Sprintf("%s - maxlen[%d]", errString, g.MaxLength) | ||
} | ||
return errString | ||
} | ||
|
||
func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error { | ||
if c.retryTime.After(time.Now()) { | ||
return errors.New("Retry time has not elapsed") | ||
} | ||
reader := influx.NewReader(metrics, c.serializer) | ||
req, err := c.makeWriteRequest(reader) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
resp, err := c.client.Do(req.WithContext(ctx)) | ||
if err != nil { | ||
return err | ||
} | ||
defer resp.Body.Close() | ||
|
||
if resp.StatusCode == http.StatusNoContent { | ||
return nil | ||
} | ||
|
||
writeResp := &genericRespError{} | ||
err = json.NewDecoder(resp.Body).Decode(writeResp) | ||
desc := writeResp.Error() | ||
if err != nil { | ||
desc = err.Error() | ||
} | ||
|
||
switch resp.StatusCode { | ||
glinton marked this conversation as resolved.
Show resolved
Hide resolved
|
||
case http.StatusBadRequest, http.StatusUnauthorized, | ||
http.StatusForbidden, http.StatusRequestEntityTooLarge: | ||
log.Printf("E! [outputs.influxdb_v2] Failed to write metric: %s\n", desc) | ||
return nil | ||
case http.StatusTooManyRequests, http.StatusServiceUnavailable: | ||
retryAfter := resp.Header.Get("Retry-After") | ||
retry, err := strconv.Atoi(retryAfter) | ||
if err != nil { | ||
retry = 0 | ||
} | ||
if retry > defaultMaxWait { | ||
log.Println("E! [outputs.influxdb_v2] Failed to write metric: retry interval too long") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Make this more like:
|
||
return nil | ||
} | ||
c.retryTime = time.Now().Add(time.Duration(retry) * time.Second) | ||
return fmt.Errorf("Waiting %ds for server before sending metric again", retry) | ||
} | ||
|
||
// This is only until platform spec is fully implemented. As of the | ||
// time of writing, there is no error body returned. | ||
if xErr := resp.Header.Get("X-Influx-Error"); xErr != "" { | ||
desc = fmt.Sprintf("%s; %s", desc, xErr) | ||
} | ||
|
||
return &APIError{ | ||
StatusCode: resp.StatusCode, | ||
Title: resp.Status, | ||
Description: desc, | ||
} | ||
} | ||
|
||
func (c *httpClient) makeWriteRequest(body io.Reader) (*http.Request, error) { | ||
var err error | ||
if c.ContentEncoding == "gzip" { | ||
body, err = compressWithGzip(body) | ||
if err != nil { | ||
return nil, err | ||
} | ||
} | ||
|
||
req, err := http.NewRequest("POST", c.WriteURL, body) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
req.Header.Set("Content-Type", "text/plain; charset=utf-8") | ||
c.addHeaders(req) | ||
|
||
if c.ContentEncoding == "gzip" { | ||
req.Header.Set("Content-Encoding", "gzip") | ||
} | ||
|
||
return req, nil | ||
} | ||
|
||
func (c *httpClient) addHeaders(req *http.Request) { | ||
for header, value := range c.Headers { | ||
req.Header.Set(header, value) | ||
} | ||
} | ||
|
||
func compressWithGzip(data io.Reader) (io.Reader, error) { | ||
pipeReader, pipeWriter := io.Pipe() | ||
gzipWriter := gzip.NewWriter(pipeWriter) | ||
var err error | ||
|
||
go func() { | ||
_, err = io.Copy(gzipWriter, data) | ||
gzipWriter.Close() | ||
pipeWriter.Close() | ||
}() | ||
|
||
return pipeReader, err | ||
} | ||
|
||
func makeWriteURL(loc url.URL, org, bucket string) (string, error) { | ||
params := url.Values{} | ||
params.Set("bucket", bucket) | ||
params.Set("org", org) | ||
|
||
switch loc.Scheme { | ||
case "unix": | ||
loc.Scheme = "http" | ||
loc.Host = "127.0.0.1" | ||
loc.Path = "v2/write" | ||
case "http", "https": | ||
loc.Path = path.Join(loc.Path, "v2/write") | ||
default: | ||
return "", fmt.Errorf("unsupported scheme: %q", loc.Scheme) | ||
} | ||
loc.RawQuery = params.Encode() | ||
return loc.String(), nil | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's change the default to gzip.