diff --git a/go.mod b/go.mod index 0f7e99309b3b2..d4e65701ff044 100644 --- a/go.mod +++ b/go.mod @@ -186,6 +186,7 @@ require ( github.com/apache/arrow/go/arrow v0.0.0-20211006091945-a69884db78f4 // indirect github.com/aristanetworks/glog v0.0.0-20191112221043-67e8567f59f3 // indirect github.com/armon/go-metrics v0.3.3 // indirect + github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 // indirect github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.2.0 // indirect github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.5.3 // indirect github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.3 // indirect diff --git a/go.sum b/go.sum index 9590ddd2a2529..b617c66dae94c 100644 --- a/go.sum +++ b/go.sum @@ -287,6 +287,8 @@ github.com/armon/go-metrics v0.3.3 h1:a9F4rlj7EWWrbj7BYw8J8+x+ZZkJeqzNyRk8hdPF+r github.com/armon/go-metrics v0.3.3/go.mod h1:4O98XIr/9W0sxpJ8UaYkvjk10Iff7SnFrb4QAOwNTFc= github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= +github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= +github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= github.com/aryann/difflib v0.0.0-20170710044230-e206f873d14a/go.mod h1:DAHtR1m6lCRdSC2Tm3DSWRPvIPr6xNKyeHdqDQSQT+A= github.com/asaskevich/govalidator v0.0.0-20180720115003-f9ffefc3facf/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY= github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY= diff --git a/plugins/common/proxy/socks5.go b/plugins/common/proxy/socks5.go new file mode 100644 index 0000000000000..07ca8647f6e57 --- /dev/null +++ b/plugins/common/proxy/socks5.go @@ -0,0 +1,22 @@ +package proxy + +import ( + "golang.org/x/net/proxy" +) + +type Socks5ProxyConfig struct { + Socks5ProxyEnable *bool `toml:"socks5_enable"` + Socks5ProxyAddress string `toml:"socks5_address"` + Socks5ProxyUsername string `toml:"socks5_username"` + Socks5ProxyPassword string `toml:"socks5_password"` +} + +func (c *Socks5ProxyConfig) GetDialer() (proxy.Dialer, error) { + var auth *proxy.Auth + if c.Socks5ProxyPassword != "" || c.Socks5ProxyUsername != "" { + auth = new(proxy.Auth) + auth.User = c.Socks5ProxyUsername + auth.Password = c.Socks5ProxyPassword + } + return proxy.SOCKS5("tcp", c.Socks5ProxyAddress, auth, proxy.Direct) +} diff --git a/plugins/common/proxy/socks5_test.go b/plugins/common/proxy/socks5_test.go new file mode 100644 index 0000000000000..29573cffd3e59 --- /dev/null +++ b/plugins/common/proxy/socks5_test.go @@ -0,0 +1,68 @@ +package proxy + +import ( + "net" + "testing" + "time" + + "github.com/armon/go-socks5" + "github.com/stretchr/testify/require" +) + +func TestSocks5ProxyConfig(t *testing.T) { + const ( + proxyAddress = "0.0.0.0:12345" + proxyUsername = "user" + proxyPassword = "password" + ) + + l, err := net.Listen("tcp", "0.0.0.0:0") + require.NoError(t, err) + + server, err := socks5.New(&socks5.Config{ + AuthMethods: []socks5.Authenticator{socks5.UserPassAuthenticator{ + Credentials: socks5.StaticCredentials{ + proxyUsername: proxyPassword, + }, + }}, + }) + require.NoError(t, err) + + go func() { require.NoError(t, server.ListenAndServe("tcp", proxyAddress)) }() + + enabled := true + conf := Socks5ProxyConfig{ + Socks5ProxyEnable: &enabled, + Socks5ProxyAddress: proxyAddress, + Socks5ProxyUsername: proxyUsername, + Socks5ProxyPassword: proxyPassword, + } + dialer, err := conf.GetDialer() + require.NoError(t, err) + + var proxyConn net.Conn + for i := 0; i < 10; i++ { + proxyConn, err = dialer.Dial("tcp", l.Addr().String()) + if err != nil { + time.Sleep(10 * time.Millisecond) + continue + } + break + } + require.NotNil(t, proxyConn) + defer func() { require.NoError(t, proxyConn.Close()) }() + + serverConn, err := l.Accept() + require.NoError(t, err) + defer func() { require.NoError(t, serverConn.Close()) }() + + writePayload := []byte("test") + _, err = proxyConn.Write(writePayload) + require.NoError(t, err) + + receivePayload := make([]byte, 4) + _, err = serverConn.Read(receivePayload) + require.NoError(t, err) + + require.Equal(t, writePayload, receivePayload) +} diff --git a/plugins/outputs/kafka/kafka.go b/plugins/outputs/kafka/kafka.go index 90fd7259e107e..8950f1c975302 100644 --- a/plugins/outputs/kafka/kafka.go +++ b/plugins/outputs/kafka/kafka.go @@ -11,6 +11,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/common/kafka" + "github.com/influxdata/telegraf/plugins/common/proxy" "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/serializers" ) @@ -32,6 +33,8 @@ type Kafka struct { RoutingTag string `toml:"routing_tag"` RoutingKey string `toml:"routing_key"` + proxy.Socks5ProxyConfig + // Legacy TLS config options // TLS client certificate Certificate string @@ -189,6 +192,12 @@ var sampleConfig = ` ## Use TLS but skip chain & host verification # insecure_skip_verify = false + ## Optional SOCKS5 proxy to use when connecting to brokers + # socks5_enable = true + # socks5_address = "127.0.0.1:1080" + # socks5_username = "alice" + # socks5_password = "pass123" + ## Optional SASL Config # sasl_username = "kafka" # sasl_password = "secret" @@ -292,6 +301,16 @@ func (k *Kafka) Init() error { k.TLSKey = k.Key } + if k.Socks5ProxyEnable != nil && *k.Socks5ProxyEnable { + config.Net.Proxy.Enable = true + + dialer, err := k.Socks5ProxyConfig.GetDialer() + if err != nil { + return fmt.Errorf("Error while connecting to proxy server: %s", err) + } + config.Net.Proxy.Dialer = dialer + } + return nil }