From 72c818b2ca5ab4fd724375d9557774530ed442fa Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Fri, 10 Oct 2014 09:38:50 -0400 Subject: [PATCH] Add configuration to support broker SSL Possibly implements #154, if my assumptions about the implementation are correct. --- broker.go | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/broker.go b/broker.go index 7fb34e3b97..3a21f4ba4f 100644 --- a/broker.go +++ b/broker.go @@ -1,6 +1,7 @@ package sarama import ( + "crypto/tls" "fmt" "io" "net" @@ -12,7 +13,9 @@ import ( // BrokerConfig is used to pass multiple configuration options to Broker.Open. type BrokerConfig struct { - MaxOpenRequests int // How many outstanding requests the broker is allowed to have before blocking attempts to send (default 5). + MaxOpenRequests int // How many outstanding requests the broker is allowed to have before blocking attempts to send (default 5). + UseTLS bool // Whether or not to use TLS when connecting to the broker (defaults to false). + TLSConfig *tls.Config // The TLS configuration to use for secure connections if specified by UseTLS (defaults to nil). // All three of the below configurations are similar to the `socket.timeout.ms` setting in JVM kafka. DialTimeout time.Duration // How long to wait for the initial connection to succeed before timing out and returning an error (default 30s). @@ -110,7 +113,15 @@ func (b *Broker) Open(conf *BrokerConfig) error { go withRecover(func() { defer b.lock.Unlock() - b.conn, b.connErr = net.DialTimeout("tcp", b.addr, conf.DialTimeout) + dialer := &net.Dialer{ + Timeout: conf.DialTimeout, + } + + if conf.UseTLS { + b.conn, b.connErr = tls.DialWithDialer(dialer, "tcp", b.addr, conf.TLSConfig) + } else { + b.conn, b.connErr = dialer.Dial("tcp", b.addr) + } if b.connErr != nil { b.conn = nil atomic.StoreInt32(&b.opened, 0)