Skip to content

Commit

Permalink
Add support for connecting to InfluxDB over a unix domain socket (#3942)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnelson authored Mar 28, 2018
1 parent bc91238 commit 937c736
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 8 deletions.
47 changes: 40 additions & 7 deletions plugins/outputs/influxdb/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"io"
"log"
"net"
"net/http"
"net/url"
"path"
Expand Down Expand Up @@ -164,14 +165,32 @@ func NewHTTPClient(config *HTTPConfig) (*httpClient, error) {
config.Consistency)
queryURL := makeQueryURL(config.URL)

var transport *http.Transport
switch config.URL.Scheme {
case "http", "https":
transport = &http.Transport{
Proxy: proxy,
TLSClientConfig: config.TLSConfig,
}
case "unix":
transport = &http.Transport{
Dial: func(_, _ string) (net.Conn, error) {
return net.DialTimeout(
config.URL.Scheme,
config.URL.Path,
defaultRequestTimeout,
)
},
}
default:
return nil, fmt.Errorf("unsupported scheme %q", config.URL.Scheme)
}

client := &httpClient{
serializer: serializer,
client: &http.Client{
Timeout: timeout,
Transport: &http.Transport{
Proxy: proxy,
TLSClientConfig: config.TLSConfig,
},
Timeout: timeout,
Transport: transport,
},
database: database,
url: config.URL,
Expand Down Expand Up @@ -392,13 +411,27 @@ func makeWriteURL(loc *url.URL, db, rp, consistency string) string {
}

u := *loc
u.Path = path.Join(u.Path, "write")
switch u.Scheme {
case "unix":
u.Scheme = "http"
u.Host = "127.0.0.1"
u.Path = "/write"
case "http":
u.Path = path.Join(u.Path, "write")
}
u.RawQuery = params.Encode()
return u.String()
}

func makeQueryURL(loc *url.URL) string {
u := *loc
u.Path = path.Join(u.Path, "query")
switch u.Scheme {
case "unix":
u.Scheme = "http"
u.Host = "127.0.0.1"
u.Path = "/query"
case "http":
u.Path = path.Join(u.Path, "query")
}
return u.String()
}
80 changes: 80 additions & 0 deletions plugins/outputs/influxdb/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ import (
"fmt"
"io/ioutil"
"log"
"net"
"net/http"
"net/http/httptest"
"net/url"
"os"
"path"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -556,3 +559,80 @@ func TestHTTP_WriteContentEncodingGzip(t *testing.T) {
err = client.Write(ctx, metrics)
require.NoError(t, err)
}

func TestHTTP_UnixSocket(t *testing.T) {
tmpdir, err := ioutil.TempDir("", "telegraf-test")
if err != nil {
require.NoError(t, err)
}
defer os.RemoveAll(tmpdir)

sock := path.Join(tmpdir, "test.sock")
listener, err := net.Listen("unix", sock)
require.NoError(t, err)

ts := httptest.NewUnstartedServer(http.NotFoundHandler())
ts.Listener = listener
ts.Start()
defer ts.Close()

x, _ := url.Parse("unix://" + sock)
fmt.Println(x)

successResponse := []byte(`{"results": [{"statement_id": 0}]}`)

tests := []struct {
name string
config *influxdb.HTTPConfig
database string
queryHandlerFunc func(t *testing.T, w http.ResponseWriter, r *http.Request)
writeHandlerFunc func(t *testing.T, w http.ResponseWriter, r *http.Request)
errFunc func(t *testing.T, err error)
}{
{
name: "success",
config: &influxdb.HTTPConfig{
URL: &url.URL{Scheme: "unix", Path: sock},
Database: "xyzzy",
},
queryHandlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
require.Equal(t, `CREATE DATABASE "xyzzy"`, r.FormValue("q"))
w.WriteHeader(http.StatusOK)
w.Write(successResponse)
},
writeHandlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
w.Write(successResponse)
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/query":
tt.queryHandlerFunc(t, w, r)
return
case "/write":
tt.queryHandlerFunc(t, w, r)
return
default:
w.WriteHeader(http.StatusNotFound)
return
}
})

ctx := context.Background()

client, err := influxdb.NewHTTPClient(tt.config)
require.NoError(t, err)
err = client.CreateDatabase(ctx)
if tt.errFunc != nil {
tt.errFunc(t, err)
} else {
require.NoError(t, err)
}
})
}
}
3 changes: 2 additions & 1 deletion plugins/outputs/influxdb/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ var sampleConfig = `
##
## Multiple URLs can be specified for a single cluster, only ONE of the
## urls will be written to each interval.
# urls = ["unix:///var/run/influxdb.sock"]
# urls = ["udp://127.0.0.1:8089"]
# urls = ["http://127.0.0.1:8086"]
Expand Down Expand Up @@ -157,7 +158,7 @@ func (i *InfluxDB) Connect() error {
}

i.clients = append(i.clients, c)
case "http", "https":
case "http", "https", "unix":
c, err := i.httpClient(ctx, u, proxy)
if err != nil {
return err
Expand Down

0 comments on commit 937c736

Please sign in to comment.