Skip to content

Commit

Permalink
adding tcp keepalives to the broker's socket connection
Browse files Browse the repository at this point in the history
  • Loading branch information
eric committed Apr 3, 2015
1 parent a6c0681 commit 76aa551
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 2 deletions.
2 changes: 1 addition & 1 deletion broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (b *Broker) Open(conf *Config) error {
go withRecover(func() {
defer b.lock.Unlock()

b.conn, b.connErr = net.DialTimeout("tcp", b.addr, conf.Net.DialTimeout)
b.conn, b.connErr = DialWithTiming("tcp", b.addr, conf.Net.DialTimeout, conf.Net.KeepAlive)
if b.connErr != nil {
b.conn = nil
atomic.StoreInt32(&b.opened, 0)
Expand Down
2 changes: 2 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"io"
"sync"
"testing"
"time"
)

func safeClose(t *testing.T, c io.Closer) {
Expand Down Expand Up @@ -78,6 +79,7 @@ func TestClientDoesntCachePartitionsForTopicsWithErrors(t *testing.T) {
seedBroker.Returns(metadataResponse)

config := NewConfig()
config.Net.KeepAlive = 12 * time.Millisecond
config.Metadata.Retry.Max = 0
client, err := NewClient([]string{seedBroker.Addr()}, config)
if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ type Config struct {
DialTimeout time.Duration // How long to wait for the initial connection to succeed before timing out and returning an error (default 30s).
ReadTimeout time.Duration // How long to wait for a response before timing out and returning an error (default 30s).
WriteTimeout time.Duration // How long to wait for a transmit to succeed before timing out and returning an error (default 30s).

// KeepAlive specifies the keep-alive period for an active
// network connection.
// If zero, keep-alives are not enabled.
KeepAlive time.Duration
}

// Metadata is the namespace for metadata management properties used by the Client, and shared by the Producer/Consumer.
Expand Down Expand Up @@ -126,6 +131,7 @@ func NewConfig() *Config {
c.Net.DialTimeout = 30 * time.Second
c.Net.ReadTimeout = 30 * time.Second
c.Net.WriteTimeout = 30 * time.Second
c.Net.KeepAlive = 0 * time.Second

c.Metadata.Retry.Max = 3
c.Metadata.Retry.Backoff = 250 * time.Millisecond
Expand Down
17 changes: 16 additions & 1 deletion utils.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package sarama

import "sort"
import (
"net"
"sort"
"time"
)

type none struct{}

Expand Down Expand Up @@ -87,3 +91,14 @@ func (b ByteEncoder) Encode() ([]byte, error) {
func (b ByteEncoder) Length() int {
return len(b)
}

//DialWithTiming is exactly like net.DialTimeout from the net package, but adds support for tcp keepalives
// Some cloud providers, like google compute engine, kill idle connections after a few mins. Adding the keepalive
// keeps the connection open.
func DialWithTiming(network, address string, timeout time.Duration, keepalive time.Duration) (net.Conn, error) {
d := net.Dialer{
Timeout: timeout,
KeepAlive: keepalive,
}
return d.Dial(network, address)
}

0 comments on commit 76aa551

Please sign in to comment.