Skip to content

Commit

Permalink
Add configuration to support broker SSL
Browse files Browse the repository at this point in the history
Possibly implements #154, if my assumptions about the implementation are
correct.
  • Loading branch information
eapache committed Mar 6, 2015
1 parent 54af88d commit 72c818b
Showing 1 changed file with 13 additions and 2 deletions.
15 changes: 13 additions & 2 deletions broker.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package sarama

import (
"crypto/tls"
"fmt"
"io"
"net"
Expand All @@ -12,7 +13,9 @@ import (

// BrokerConfig is used to pass multiple configuration options to Broker.Open.
type BrokerConfig struct {
MaxOpenRequests int // How many outstanding requests the broker is allowed to have before blocking attempts to send (default 5).
MaxOpenRequests int // How many outstanding requests the broker is allowed to have before blocking attempts to send (default 5).
UseTLS bool // Whether or not to use TLS when connecting to the broker (defaults to false).
TLSConfig *tls.Config // The TLS configuration to use for secure connections if specified by UseTLS (defaults to nil).

// All three of the below configurations are similar to the `socket.timeout.ms` setting in JVM kafka.
DialTimeout time.Duration // How long to wait for the initial connection to succeed before timing out and returning an error (default 30s).
Expand Down Expand Up @@ -110,7 +113,15 @@ func (b *Broker) Open(conf *BrokerConfig) error {
go withRecover(func() {
defer b.lock.Unlock()

b.conn, b.connErr = net.DialTimeout("tcp", b.addr, conf.DialTimeout)
dialer := &net.Dialer{
Timeout: conf.DialTimeout,
}

if conf.UseTLS {
b.conn, b.connErr = tls.DialWithDialer(dialer, "tcp", b.addr, conf.TLSConfig)
} else {
b.conn, b.connErr = dialer.Dial("tcp", b.addr)
}
if b.connErr != nil {
b.conn = nil
atomic.StoreInt32(&b.opened, 0)
Expand Down

0 comments on commit 72c818b

Please sign in to comment.