From 8cff4a645f6c26e58df7a9774d7006679d1d18bb Mon Sep 17 00:00:00 2001 From: Alexander Olekhnovich Date: Tue, 29 Sep 2020 16:02:41 +0200 Subject: [PATCH] feat: add socks5 proxy support for kafka output plugin Co-authored-by: Sven Rebhan <36194019+srebhan@users.noreply.github.com> --- go.mod | 1 + go.sum | 2 + plugins/common/proxy/socks5.go | 22 +++++++++ plugins/common/proxy/socks5_test.go | 70 +++++++++++++++++++++++++++++ plugins/outputs/kafka/README.md | 6 +++ plugins/outputs/kafka/kafka.go | 19 ++++++++ 6 files changed, 120 insertions(+) create mode 100644 plugins/common/proxy/socks5.go create mode 100644 plugins/common/proxy/socks5_test.go diff --git a/go.mod b/go.mod index 721bb5eacb7f1..bfbf85086257e 100644 --- a/go.mod +++ b/go.mod @@ -26,6 +26,7 @@ require ( github.com/antchfx/xpath v1.2.0 github.com/apache/thrift v0.15.0 github.com/aristanetworks/goarista v0.0.0-20190325233358-a123909ec740 + github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 github.com/aws/aws-sdk-go-v2 v1.13.0 github.com/aws/aws-sdk-go-v2/config v1.8.3 github.com/aws/aws-sdk-go-v2/credentials v1.4.3 diff --git a/go.sum b/go.sum index c399cf4d855ec..52f5e1c181794 100644 --- a/go.sum +++ b/go.sum @@ -287,6 +287,8 @@ github.com/armon/go-metrics v0.3.3 h1:a9F4rlj7EWWrbj7BYw8J8+x+ZZkJeqzNyRk8hdPF+r github.com/armon/go-metrics v0.3.3/go.mod h1:4O98XIr/9W0sxpJ8UaYkvjk10Iff7SnFrb4QAOwNTFc= github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= +github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= +github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= github.com/aryann/difflib v0.0.0-20170710044230-e206f873d14a/go.mod h1:DAHtR1m6lCRdSC2Tm3DSWRPvIPr6xNKyeHdqDQSQT+A= github.com/asaskevich/govalidator v0.0.0-20180720115003-f9ffefc3facf/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY= github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY= diff --git a/plugins/common/proxy/socks5.go b/plugins/common/proxy/socks5.go new file mode 100644 index 0000000000000..e69dd5f3294d1 --- /dev/null +++ b/plugins/common/proxy/socks5.go @@ -0,0 +1,22 @@ +package proxy + +import ( + "golang.org/x/net/proxy" +) + +type Socks5ProxyConfig struct { + Socks5ProxyEnabled bool `toml:"socks5_enabled"` + Socks5ProxyAddress string `toml:"socks5_address"` + Socks5ProxyUsername string `toml:"socks5_username"` + Socks5ProxyPassword string `toml:"socks5_password"` +} + +func (c *Socks5ProxyConfig) GetDialer() (proxy.Dialer, error) { + var auth *proxy.Auth + if c.Socks5ProxyPassword != "" || c.Socks5ProxyUsername != "" { + auth = new(proxy.Auth) + auth.User = c.Socks5ProxyUsername + auth.Password = c.Socks5ProxyPassword + } + return proxy.SOCKS5("tcp", c.Socks5ProxyAddress, auth, proxy.Direct) +} diff --git a/plugins/common/proxy/socks5_test.go b/plugins/common/proxy/socks5_test.go new file mode 100644 index 0000000000000..a82ebf1098890 --- /dev/null +++ b/plugins/common/proxy/socks5_test.go @@ -0,0 +1,70 @@ +package proxy + +import ( + "net" + "testing" + "time" + + "github.com/armon/go-socks5" + "github.com/stretchr/testify/require" +) + +func TestSocks5ProxyConfig(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + const ( + proxyAddress = "0.0.0.0:12345" + proxyUsername = "user" + proxyPassword = "password" + ) + + l, err := net.Listen("tcp", "0.0.0.0:0") + require.NoError(t, err) + + server, err := socks5.New(&socks5.Config{ + AuthMethods: []socks5.Authenticator{socks5.UserPassAuthenticator{ + Credentials: socks5.StaticCredentials{ + proxyUsername: proxyPassword, + }, + }}, + }) + require.NoError(t, err) + + go func() { require.NoError(t, server.ListenAndServe("tcp", proxyAddress)) }() + + conf := Socks5ProxyConfig{ + Socks5ProxyEnabled: true, + Socks5ProxyAddress: proxyAddress, + Socks5ProxyUsername: proxyUsername, + Socks5ProxyPassword: proxyPassword, + } + dialer, err := conf.GetDialer() + require.NoError(t, err) + + var proxyConn net.Conn + for i := 0; i < 10; i++ { + proxyConn, err = dialer.Dial("tcp", l.Addr().String()) + if err == nil { + break + } + time.Sleep(10 * time.Millisecond) + } + require.NotNil(t, proxyConn) + defer func() { require.NoError(t, proxyConn.Close()) }() + + serverConn, err := l.Accept() + require.NoError(t, err) + defer func() { require.NoError(t, serverConn.Close()) }() + + writePayload := []byte("test") + _, err = proxyConn.Write(writePayload) + require.NoError(t, err) + + receivePayload := make([]byte, 4) + _, err = serverConn.Read(receivePayload) + require.NoError(t, err) + + require.Equal(t, writePayload, receivePayload) +} diff --git a/plugins/outputs/kafka/README.md b/plugins/outputs/kafka/README.md index 5f3c2f5eac381..6186eeebd43cf 100644 --- a/plugins/outputs/kafka/README.md +++ b/plugins/outputs/kafka/README.md @@ -113,6 +113,12 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm ## Use TLS but skip chain & host verification # insecure_skip_verify = false + ## Optional SOCKS5 proxy to use when connecting to brokers + # socks5_enabled = true + # socks5_address = "127.0.0.1:1080" + # socks5_username = "alice" + # socks5_password = "pass123" + ## Optional SASL Config # sasl_username = "kafka" # sasl_password = "secret" diff --git a/plugins/outputs/kafka/kafka.go b/plugins/outputs/kafka/kafka.go index 90fd7259e107e..118af9868748c 100644 --- a/plugins/outputs/kafka/kafka.go +++ b/plugins/outputs/kafka/kafka.go @@ -11,6 +11,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/common/kafka" + "github.com/influxdata/telegraf/plugins/common/proxy" "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/serializers" ) @@ -32,6 +33,8 @@ type Kafka struct { RoutingTag string `toml:"routing_tag"` RoutingKey string `toml:"routing_key"` + proxy.Socks5ProxyConfig + // Legacy TLS config options // TLS client certificate Certificate string @@ -189,6 +192,12 @@ var sampleConfig = ` ## Use TLS but skip chain & host verification # insecure_skip_verify = false + ## Optional SOCKS5 proxy to use when connecting to brokers + # socks5_enabled = true + # socks5_address = "127.0.0.1:1080" + # socks5_username = "alice" + # socks5_password = "pass123" + ## Optional SASL Config # sasl_username = "kafka" # sasl_password = "secret" @@ -292,6 +301,16 @@ func (k *Kafka) Init() error { k.TLSKey = k.Key } + if k.Socks5ProxyEnabled { + config.Net.Proxy.Enable = true + + dialer, err := k.Socks5ProxyConfig.GetDialer() + if err != nil { + return fmt.Errorf("connecting to proxy server failed: %s", err) + } + config.Net.Proxy.Dialer = dialer + } + return nil }