forked from influxdata/telegraf
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add influx v2 output plugin (influxdata#4645)
- Loading branch information
1 parent
b60f6a0
commit 1af5165
Showing
7 changed files
with
756 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
# InfluxDB Output Plugin | ||
|
||
This InfluxDB output plugin writes metrics to the [InfluxDB 2.0](https://github.com/influxdata/platform) HTTP service. | ||
|
||
### Configuration: | ||
|
||
```toml | ||
# Configuration for sending metrics to InfluxDB 2.0 | ||
[[outputs.influxdb_v2]] | ||
## The URLs of the InfluxDB cluster nodes. | ||
## | ||
## Multiple URLs can be specified for a single cluster, only ONE of the | ||
## urls will be written to each interval. | ||
urls = ["http://127.0.0.1:9999"] | ||
|
||
## Token for authentication. | ||
token = "" | ||
|
||
## Organization is the name of the organization you wish to write to. | ||
organization = "" | ||
|
||
## Bucket to the name fo the bucketwrite into; must exist. | ||
bucket = "" | ||
|
||
## Timeout for HTTP messages. | ||
# timeout = "5s" | ||
|
||
## Additional HTTP headers | ||
# http_headers = {"X-Special-Header" = "Special-Value"} | ||
|
||
## HTTP Proxy override, if unset values the standard proxy environment | ||
## variables are consulted to determine which proxy, if any, should be used. | ||
# http_proxy = "http://corporate.proxy:3128" | ||
|
||
## HTTP User-Agent | ||
# user_agent = "telegraf" | ||
|
||
## Content-Encoding for write request body, can be set to "gzip" to | ||
## compress body or "identity" to apply no encoding. | ||
# content_encoding = "gzip" | ||
|
||
## Enable or disable uint support for writing uints influxdb 2.0. | ||
# influx_uint_support = false | ||
|
||
## Optional TLS Config for use on HTTP connections. | ||
# tls_ca = "/etc/telegraf/ca.pem" | ||
# tls_cert = "/etc/telegraf/cert.pem" | ||
# tls_key = "/etc/telegraf/key.pem" | ||
## Use TLS but skip chain & host verification | ||
# insecure_skip_verify = false | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,292 @@ | ||
package influxdb_v2 | ||
|
||
import ( | ||
"compress/gzip" | ||
"context" | ||
"crypto/tls" | ||
"encoding/json" | ||
"errors" | ||
"fmt" | ||
"io" | ||
"log" | ||
"net" | ||
"net/http" | ||
"net/url" | ||
"path" | ||
"strconv" | ||
"time" | ||
|
||
"github.com/influxdata/telegraf" | ||
"github.com/influxdata/telegraf/plugins/serializers/influx" | ||
) | ||
|
||
type APIErrorType int | ||
|
||
type APIError struct { | ||
StatusCode int | ||
Title string | ||
Description string | ||
Type APIErrorType | ||
} | ||
|
||
func (e APIError) Error() string { | ||
if e.Description != "" { | ||
return fmt.Sprintf("%s: %s", e.Title, e.Description) | ||
} | ||
return e.Title | ||
} | ||
|
||
const ( | ||
defaultRequestTimeout = time.Second * 5 | ||
defaultMaxWait = 10 // seconds | ||
defaultDatabase = "telegraf" | ||
defaultUserAgent = "telegraf" | ||
) | ||
|
||
type HTTPConfig struct { | ||
URL *url.URL | ||
Token string | ||
Organization string | ||
Bucket string | ||
Timeout time.Duration | ||
Headers map[string]string | ||
Proxy *url.URL | ||
UserAgent string | ||
ContentEncoding string | ||
TLSConfig *tls.Config | ||
|
||
Serializer *influx.Serializer | ||
} | ||
|
||
type httpClient struct { | ||
WriteURL string | ||
ContentEncoding string | ||
Timeout time.Duration | ||
Headers map[string]string | ||
|
||
client *http.Client | ||
serializer *influx.Serializer | ||
url *url.URL | ||
retryTime time.Time | ||
} | ||
|
||
func NewHTTPClient(config *HTTPConfig) (*httpClient, error) { | ||
if config.URL == nil { | ||
return nil, ErrMissingURL | ||
} | ||
|
||
timeout := config.Timeout | ||
if timeout == 0 { | ||
timeout = defaultRequestTimeout | ||
} | ||
|
||
userAgent := config.UserAgent | ||
if userAgent == "" { | ||
userAgent = defaultUserAgent | ||
} | ||
|
||
var headers = make(map[string]string, len(config.Headers)+2) | ||
headers["User-Agent"] = userAgent | ||
headers["Authorization"] = "Token " + config.Token | ||
for k, v := range config.Headers { | ||
headers[k] = v | ||
} | ||
|
||
var proxy func(*http.Request) (*url.URL, error) | ||
if config.Proxy != nil { | ||
proxy = http.ProxyURL(config.Proxy) | ||
} else { | ||
proxy = http.ProxyFromEnvironment | ||
} | ||
|
||
serializer := config.Serializer | ||
if serializer == nil { | ||
serializer = influx.NewSerializer() | ||
} | ||
|
||
writeURL, err := makeWriteURL( | ||
*config.URL, | ||
config.Organization, | ||
config.Bucket) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
var transport *http.Transport | ||
switch config.URL.Scheme { | ||
case "http", "https": | ||
transport = &http.Transport{ | ||
Proxy: proxy, | ||
TLSClientConfig: config.TLSConfig, | ||
} | ||
case "unix": | ||
transport = &http.Transport{ | ||
Dial: func(_, _ string) (net.Conn, error) { | ||
return net.DialTimeout( | ||
config.URL.Scheme, | ||
config.URL.Path, | ||
timeout, | ||
) | ||
}, | ||
} | ||
default: | ||
return nil, fmt.Errorf("unsupported scheme %q", config.URL.Scheme) | ||
} | ||
|
||
client := &httpClient{ | ||
serializer: serializer, | ||
client: &http.Client{ | ||
Timeout: timeout, | ||
Transport: transport, | ||
}, | ||
url: config.URL, | ||
WriteURL: writeURL, | ||
ContentEncoding: config.ContentEncoding, | ||
Timeout: timeout, | ||
Headers: headers, | ||
} | ||
return client, nil | ||
} | ||
|
||
// URL returns the origin URL that this client connects too. | ||
func (c *httpClient) URL() string { | ||
return c.url.String() | ||
} | ||
|
||
type genericRespError struct { | ||
Code string | ||
Message string | ||
Line *int32 | ||
MaxLength *int32 | ||
} | ||
|
||
func (g genericRespError) Error() string { | ||
errString := fmt.Sprintf("%s: %s", g.Code, g.Message) | ||
if g.Line != nil { | ||
return fmt.Sprintf("%s - line[%d]", errString, g.Line) | ||
} else if g.MaxLength != nil { | ||
return fmt.Sprintf("%s - maxlen[%d]", errString, g.MaxLength) | ||
} | ||
return errString | ||
} | ||
|
||
func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error { | ||
if c.retryTime.After(time.Now()) { | ||
return errors.New("Retry time has not elapsed") | ||
} | ||
reader := influx.NewReader(metrics, c.serializer) | ||
req, err := c.makeWriteRequest(reader) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
resp, err := c.client.Do(req.WithContext(ctx)) | ||
if err != nil { | ||
return err | ||
} | ||
defer resp.Body.Close() | ||
|
||
if resp.StatusCode == http.StatusNoContent { | ||
return nil | ||
} | ||
|
||
writeResp := &genericRespError{} | ||
err = json.NewDecoder(resp.Body).Decode(writeResp) | ||
desc := writeResp.Error() | ||
if err != nil { | ||
desc = err.Error() | ||
} | ||
|
||
switch resp.StatusCode { | ||
case http.StatusBadRequest, http.StatusUnauthorized, | ||
http.StatusForbidden, http.StatusRequestEntityTooLarge: | ||
log.Printf("E! [outputs.influxdb_v2] Failed to write metric: %s\n", desc) | ||
return nil | ||
case http.StatusTooManyRequests, http.StatusServiceUnavailable: | ||
retryAfter := resp.Header.Get("Retry-After") | ||
retry, err := strconv.Atoi(retryAfter) | ||
if err != nil { | ||
retry = 0 | ||
} | ||
if retry > defaultMaxWait { | ||
retry = defaultMaxWait | ||
} | ||
c.retryTime = time.Now().Add(time.Duration(retry) * time.Second) | ||
return fmt.Errorf("Waiting %ds for server before sending metric again", retry) | ||
} | ||
|
||
// This is only until platform spec is fully implemented. As of the | ||
// time of writing, there is no error body returned. | ||
if xErr := resp.Header.Get("X-Influx-Error"); xErr != "" { | ||
desc = fmt.Sprintf("%s; %s", desc, xErr) | ||
} | ||
|
||
return &APIError{ | ||
StatusCode: resp.StatusCode, | ||
Title: resp.Status, | ||
Description: desc, | ||
} | ||
} | ||
|
||
func (c *httpClient) makeWriteRequest(body io.Reader) (*http.Request, error) { | ||
var err error | ||
if c.ContentEncoding == "gzip" { | ||
body, err = compressWithGzip(body) | ||
if err != nil { | ||
return nil, err | ||
} | ||
} | ||
|
||
req, err := http.NewRequest("POST", c.WriteURL, body) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
req.Header.Set("Content-Type", "text/plain; charset=utf-8") | ||
c.addHeaders(req) | ||
|
||
if c.ContentEncoding == "gzip" { | ||
req.Header.Set("Content-Encoding", "gzip") | ||
} | ||
|
||
return req, nil | ||
} | ||
|
||
func (c *httpClient) addHeaders(req *http.Request) { | ||
for header, value := range c.Headers { | ||
req.Header.Set(header, value) | ||
} | ||
} | ||
|
||
func compressWithGzip(data io.Reader) (io.Reader, error) { | ||
pipeReader, pipeWriter := io.Pipe() | ||
gzipWriter := gzip.NewWriter(pipeWriter) | ||
var err error | ||
|
||
go func() { | ||
_, err = io.Copy(gzipWriter, data) | ||
gzipWriter.Close() | ||
pipeWriter.Close() | ||
}() | ||
|
||
return pipeReader, err | ||
} | ||
|
||
func makeWriteURL(loc url.URL, org, bucket string) (string, error) { | ||
params := url.Values{} | ||
params.Set("bucket", bucket) | ||
params.Set("org", org) | ||
|
||
switch loc.Scheme { | ||
case "unix": | ||
loc.Scheme = "http" | ||
loc.Host = "127.0.0.1" | ||
loc.Path = "v2/write" | ||
case "http", "https": | ||
loc.Path = path.Join(loc.Path, "v2/write") | ||
default: | ||
return "", fmt.Errorf("unsupported scheme: %q", loc.Scheme) | ||
} | ||
loc.RawQuery = params.Encode() | ||
return loc.String(), nil | ||
} |
Oops, something went wrong.