Skip to content

Commit

Permalink
Merge pull request #311 from Shopify/unify-config
Browse files Browse the repository at this point in the history
Unify config
  • Loading branch information
eapache committed Mar 6, 2015
2 parents 54af88d + 0e95e22 commit 20700b5
Show file tree
Hide file tree
Showing 12 changed files with 513 additions and 675 deletions.
96 changes: 27 additions & 69 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,54 +10,12 @@ import (
"time"
)

// BrokerConfig is used to pass multiple configuration options to Broker.Open.
type BrokerConfig struct {
MaxOpenRequests int // How many outstanding requests the broker is allowed to have before blocking attempts to send (default 5).

// All three of the below configurations are similar to the `socket.timeout.ms` setting in JVM kafka.
DialTimeout time.Duration // How long to wait for the initial connection to succeed before timing out and returning an error (default 30s).
ReadTimeout time.Duration // How long to wait for a response before timing out and returning an error (default 30s).
WriteTimeout time.Duration // How long to wait for a transmit to succeed before timing out and returning an error (default 30s).
}

// NewBrokerConfig returns a new broker configuration with sane defaults.
func NewBrokerConfig() *BrokerConfig {
return &BrokerConfig{
MaxOpenRequests: 5,
DialTimeout: 30 * time.Second,
ReadTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second,
}
}

// Validate checks a BrokerConfig instance. This will return a
// ConfigurationError if the specified values don't make sense.
func (config *BrokerConfig) Validate() error {
if config.MaxOpenRequests <= 0 {
return ConfigurationError("Invalid MaxOpenRequests")
}

if config.DialTimeout <= 0 {
return ConfigurationError("Invalid DialTimeout")
}

if config.ReadTimeout <= 0 {
return ConfigurationError("Invalid ReadTimeout")
}

if config.WriteTimeout <= 0 {
return ConfigurationError("Invalid WriteTimeout")
}

return nil
}

// Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
type Broker struct {
id int32
addr string

conf *BrokerConfig
conf *Config
correlationID int32
conn net.Conn
connErr error
Expand All @@ -84,10 +42,10 @@ func NewBroker(addr string) *Broker {
// waiting for the connection to complete. This means that any subsequent operations on the broker will
// block waiting for the connection to succeed or fail. To get the effect of a fully synchronous Open call,
// follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or
// AlreadyConnected. If conf is nil, the result of NewBrokerConfig() is used.
func (b *Broker) Open(conf *BrokerConfig) error {
// AlreadyConnected. If conf is nil, the result of NewConfig() is used.
func (b *Broker) Open(conf *Config) error {
if conf == nil {
conf = NewBrokerConfig()
conf = NewConfig()
}

err := conf.Validate()
Expand All @@ -110,7 +68,7 @@ func (b *Broker) Open(conf *BrokerConfig) error {
go withRecover(func() {
defer b.lock.Unlock()

b.conn, b.connErr = net.DialTimeout("tcp", b.addr, conf.DialTimeout)
b.conn, b.connErr = net.DialTimeout("tcp", b.addr, conf.Net.DialTimeout)
if b.connErr != nil {
b.conn = nil
atomic.StoreInt32(&b.opened, 0)
Expand All @@ -120,7 +78,7 @@ func (b *Broker) Open(conf *BrokerConfig) error {

b.conf = conf
b.done = make(chan bool)
b.responses = make(chan responsePromise, b.conf.MaxOpenRequests-1)
b.responses = make(chan responsePromise, b.conf.Net.MaxOpenRequests-1)

Logger.Printf("Connected to broker %s\n", b.addr)
go withRecover(b.responseReceiver)
Expand Down Expand Up @@ -178,10 +136,10 @@ func (b *Broker) Addr() string {
return b.addr
}

func (b *Broker) GetMetadata(clientID string, request *MetadataRequest) (*MetadataResponse, error) {
func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) {
response := new(MetadataResponse)

err := b.sendAndReceive(clientID, request, response)
err := b.sendAndReceive(request, response)

if err != nil {
return nil, err
Expand All @@ -190,10 +148,10 @@ func (b *Broker) GetMetadata(clientID string, request *MetadataRequest) (*Metada
return response, nil
}

func (b *Broker) GetConsumerMetadata(clientID string, request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) {
func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) {
response := new(ConsumerMetadataResponse)

err := b.sendAndReceive(clientID, request, response)
err := b.sendAndReceive(request, response)

if err != nil {
return nil, err
Expand All @@ -202,10 +160,10 @@ func (b *Broker) GetConsumerMetadata(clientID string, request *ConsumerMetadataR
return response, nil
}

func (b *Broker) GetAvailableOffsets(clientID string, request *OffsetRequest) (*OffsetResponse, error) {
func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) {
response := new(OffsetResponse)

err := b.sendAndReceive(clientID, request, response)
err := b.sendAndReceive(request, response)

if err != nil {
return nil, err
Expand All @@ -214,15 +172,15 @@ func (b *Broker) GetAvailableOffsets(clientID string, request *OffsetRequest) (*
return response, nil
}

func (b *Broker) Produce(clientID string, request *ProduceRequest) (*ProduceResponse, error) {
func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
var response *ProduceResponse
var err error

if request.RequiredAcks == NoResponse {
err = b.sendAndReceive(clientID, request, nil)
err = b.sendAndReceive(request, nil)
} else {
response = new(ProduceResponse)
err = b.sendAndReceive(clientID, request, response)
err = b.sendAndReceive(request, response)
}

if err != nil {
Expand All @@ -232,10 +190,10 @@ func (b *Broker) Produce(clientID string, request *ProduceRequest) (*ProduceResp
return response, nil
}

func (b *Broker) Fetch(clientID string, request *FetchRequest) (*FetchResponse, error) {
func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) {
response := new(FetchResponse)

err := b.sendAndReceive(clientID, request, response)
err := b.sendAndReceive(request, response)

if err != nil {
return nil, err
Expand All @@ -244,10 +202,10 @@ func (b *Broker) Fetch(clientID string, request *FetchRequest) (*FetchResponse,
return response, nil
}

func (b *Broker) CommitOffset(clientID string, request *OffsetCommitRequest) (*OffsetCommitResponse, error) {
func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error) {
response := new(OffsetCommitResponse)

err := b.sendAndReceive(clientID, request, response)
err := b.sendAndReceive(request, response)

if err != nil {
return nil, err
Expand All @@ -256,10 +214,10 @@ func (b *Broker) CommitOffset(clientID string, request *OffsetCommitRequest) (*O
return response, nil
}

func (b *Broker) FetchOffset(clientID string, request *OffsetFetchRequest) (*OffsetFetchResponse, error) {
func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) {
response := new(OffsetFetchResponse)

err := b.sendAndReceive(clientID, request, response)
err := b.sendAndReceive(request, response)

if err != nil {
return nil, err
Expand All @@ -268,7 +226,7 @@ func (b *Broker) FetchOffset(clientID string, request *OffsetFetchRequest) (*Off
return response, nil
}

func (b *Broker) send(clientID string, req requestEncoder, promiseResponse bool) (*responsePromise, error) {
func (b *Broker) send(req requestEncoder, promiseResponse bool) (*responsePromise, error) {
b.lock.Lock()
defer b.lock.Unlock()

Expand All @@ -279,13 +237,13 @@ func (b *Broker) send(clientID string, req requestEncoder, promiseResponse bool)
return nil, ErrNotConnected
}

fullRequest := request{b.correlationID, clientID, req}
fullRequest := request{b.correlationID, b.conf.ClientID, req}
buf, err := encode(&fullRequest)
if err != nil {
return nil, err
}

err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.WriteTimeout))
err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
if err != nil {
return nil, err
}
Expand All @@ -306,8 +264,8 @@ func (b *Broker) send(clientID string, req requestEncoder, promiseResponse bool)
return &promise, nil
}

func (b *Broker) sendAndReceive(clientID string, req requestEncoder, res decoder) error {
promise, err := b.send(clientID, req, res != nil)
func (b *Broker) sendAndReceive(req requestEncoder, res decoder) error {
promise, err := b.send(req, res != nil)

if err != nil {
return err
Expand Down Expand Up @@ -372,7 +330,7 @@ func (b *Broker) encode(pe packetEncoder) (err error) {
func (b *Broker) responseReceiver() {
header := make([]byte, 8)
for response := range b.responses {
err := b.conn.SetReadDeadline(time.Now().Add(b.conf.ReadTimeout))
err := b.conn.SetReadDeadline(time.Now().Add(b.conf.Net.ReadTimeout))
if err != nil {
response.errors <- err
continue
Expand Down
18 changes: 9 additions & 9 deletions broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func ExampleBroker() error {
}

request := MetadataRequest{Topics: []string{"myTopic"}}
response, err := broker.GetMetadata("myClient", &request)
response, err := broker.GetMetadata(&request)
if err != nil {
_ = broker.Close()
return err
Expand Down Expand Up @@ -80,7 +80,7 @@ var brokerTestTable = []struct {
{[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := MetadataRequest{}
response, err := broker.GetMetadata("clientID", &request)
response, err := broker.GetMetadata(&request)
if err != nil {
t.Error(err)
}
Expand All @@ -92,7 +92,7 @@ var brokerTestTable = []struct {
{[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 't', 0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := ConsumerMetadataRequest{}
response, err := broker.GetConsumerMetadata("clientID", &request)
response, err := broker.GetConsumerMetadata(&request)
if err != nil {
t.Error(err)
}
Expand All @@ -105,7 +105,7 @@ var brokerTestTable = []struct {
func(t *testing.T, broker *Broker) {
request := ProduceRequest{}
request.RequiredAcks = NoResponse
response, err := broker.Produce("clientID", &request)
response, err := broker.Produce(&request)
if err != nil {
t.Error(err)
}
Expand All @@ -118,7 +118,7 @@ var brokerTestTable = []struct {
func(t *testing.T, broker *Broker) {
request := ProduceRequest{}
request.RequiredAcks = WaitForLocal
response, err := broker.Produce("clientID", &request)
response, err := broker.Produce(&request)
if err != nil {
t.Error(err)
}
Expand All @@ -130,7 +130,7 @@ var brokerTestTable = []struct {
{[]byte{0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := FetchRequest{}
response, err := broker.Fetch("clientID", &request)
response, err := broker.Fetch(&request)
if err != nil {
t.Error(err)
}
Expand All @@ -142,7 +142,7 @@ var brokerTestTable = []struct {
{[]byte{0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := OffsetFetchRequest{}
response, err := broker.FetchOffset("clientID", &request)
response, err := broker.FetchOffset(&request)
if err != nil {
t.Error(err)
}
Expand All @@ -154,7 +154,7 @@ var brokerTestTable = []struct {
{[]byte{0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := OffsetCommitRequest{}
response, err := broker.CommitOffset("clientID", &request)
response, err := broker.CommitOffset(&request)
if err != nil {
t.Error(err)
}
Expand All @@ -166,7 +166,7 @@ var brokerTestTable = []struct {
{[]byte{0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := OffsetRequest{}
response, err := broker.GetAvailableOffsets("clientID", &request)
response, err := broker.GetAvailableOffsets(&request)
if err != nil {
t.Error(err)
}
Expand Down
Loading

0 comments on commit 20700b5

Please sign in to comment.