Skip to content

Commit

Permalink
Enhance mock broker
Browse files Browse the repository at this point in the history
  • Loading branch information
horkhe committed Jul 22, 2015
1 parent 2b0d726 commit da6bc48
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 73 deletions.
25 changes: 16 additions & 9 deletions async_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,10 @@ func TestAsyncProducerFailureRetry(t *testing.T) {
closeProducer(t, producer)
}

// If a Kafka broker becomes unavailable and then returns back in service, then
// producer reconnects to it and continues sending messages.
func TestAsyncProducerBrokerBounce(t *testing.T) {
// Given
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 2)
leaderAddr := leader.Addr()
Expand All @@ -295,30 +298,34 @@ func TestAsyncProducerBrokerBounce(t *testing.T) {
metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
seedBroker.Returns(metadataResponse)

prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)

config := NewConfig()
config.Producer.Flush.Messages = 10
config.Producer.Flush.Messages = 1
config.Producer.Return.Successes = true
config.Producer.Retry.Backoff = 0
producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
leader.Returns(prodSuccess)
expectResults(t, producer, 1, 0)

for i := 0; i < 10; i++ {
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
}
// When: a broker connection gets reset by a broker (network glitch, restart, you name it).
leader.Close() // producer should get EOF
leader = newMockBrokerAddr(t, 2, leaderAddr) // start it up again right away for giggles
seedBroker.Returns(metadataResponse) // tell it to go to broker 2 again

prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
// Then: a produced message goes through the new broker connection.
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
leader.Returns(prodSuccess)
expectResults(t, producer, 10, 0)
seedBroker.Close()
leader.Close()
expectResults(t, producer, 1, 0)

closeProducer(t, producer)
seedBroker.Close()
leader.Close()
}

func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) {
Expand Down
10 changes: 3 additions & 7 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,13 +447,13 @@ func TestConsumerBounceWithReferenceOpen(t *testing.T) {
}

//redirect partition 1 back to main leader
fetchResponse := new(FetchResponse)
fetchResponse.AddError("my_topic", 1, ErrNotLeaderForPartition)
tmp.Returns(fetchResponse)
metadataResponse = new(MetadataResponse)
metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
seedBroker.Returns(metadataResponse)
fetchResponse := new(FetchResponse)
fetchResponse.AddError("my_topic", 1, ErrNotLeaderForPartition)
tmp.Returns(fetchResponse)
time.Sleep(5 * time.Millisecond)

// now send one message to each partition to make sure everything is primed
Expand Down Expand Up @@ -493,10 +493,6 @@ func TestConsumerBounceWithReferenceOpen(t *testing.T) {
case <-c0.Errors():
case <-c1.Errors():
}
// send it back to the same broker
seedBroker.Returns(metadataResponse)

time.Sleep(5 * time.Millisecond)

select {
case <-c0.Messages():
Expand Down
192 changes: 135 additions & 57 deletions mockbroker_test.go
Original file line number Diff line number Diff line change
@@ -1,39 +1,66 @@
package sarama

import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"io"
"net"
"strconv"
"sync"
"testing"
"time"

"github.com/davecgh/go-spew/spew"
)

const (
expectationTimeout = 250 * time.Millisecond
)

// mockBroker is a mock Kafka broker. It consists of a TCP server on a kernel-selected localhost port that
// accepts a single connection. It reads Kafka requests from that connection and returns each response
// from the channel provided at creation-time (if a response has a len of 0, nothing is sent, if a response
// the server sleeps for 250ms instead of reading a request).
type requestHandlerFunc func(req *request) (res encoder)

// mockBroker is a mock Kafka broker. It consists of a TCP server on a
// kernel-selected localhost port that can accept many connections. It reads
// Kafka requests from that connection and passes them to the user specified
// handler function (see SetHandler) that generates respective responses. If
// the handler has not been explicitly specified then the broker returns
// responses set by the Returns function in the exact order they were provided.
// (if a response has a len of 0, nothing is sent, and the client request will
// timeout in this case).
//
// When running tests with one of these, it is strongly recommended to specify a timeout to `go test` so that if the broker hangs
// waiting for a response, the test panics.
// When running tests with one of these, it is strongly recommended to specify
// a timeout to `go test` so that if the broker hangs waiting for a response,
// the test panics.
//
// It is not necessary to prefix message length or correlation ID to your response bytes, the server does that
// automatically as a convenience.
// It is not necessary to prefix message length or correlation ID to your
// response bytes, the server does that automatically as a convenience.
type mockBroker struct {
brokerID int32
port int32
stopper chan bool
closing chan none
stopper chan none
expectations chan encoder
listener net.Listener
t *testing.T
latency time.Duration
handler requestHandlerFunc
handlerMux sync.Mutex
}

func (b *mockBroker) SetLatency(latency time.Duration) {
b.latency = latency
}

// SetHandler sets the specified function as the request handler. Whenever
// a mock broker reads a request from the wire it passes the request to the
// function and sends back whatever the handler function returns.
func (b *mockBroker) SetHandler(handler requestHandlerFunc) {
b.handlerMux.Lock()
b.handler = handler
b.handlerMux.Unlock()
}

func (b *mockBroker) BrokerID() int32 {
return b.brokerID
}
Expand All @@ -47,80 +74,129 @@ func (b *mockBroker) Addr() string {
}

func (b *mockBroker) Close() {
close(b.expectations)
if len(b.expectations) > 0 {
b.t.Errorf("Not all expectations were satisfied in mockBroker with ID=%d! Still waiting on %d", b.BrokerID(), len(b.expectations))
buf := bytes.NewBufferString(fmt.Sprintf("mockbroker/%d: not all expectations were satisfied! Still waiting on:\n", b.BrokerID()))
for e := range b.expectations {
_, _ = buf.WriteString(spew.Sdump(e))
}
b.t.Error(buf.String())
}
close(b.expectations)
close(b.closing)
<-b.stopper
}

func (b *mockBroker) serverLoop() (ok bool) {
var (
err error
conn net.Conn
)

func (b *mockBroker) serverLoop() {
defer close(b.stopper)
if conn, err = b.listener.Accept(); err != nil {
return b.serverError(err, conn)
var err error
var conn net.Conn

go func() {
<-b.closing
safeClose(b.t, b.listener)
}()

wg := &sync.WaitGroup{}
i := 0
for conn, err = b.listener.Accept(); err == nil; conn, err = b.listener.Accept() {
wg.Add(1)
go b.handleRequests(conn, i, wg)
i++
}
reqHeader := make([]byte, 4)
wg.Wait()
Logger.Printf("*** mockbroker/%d: listener closed, err=%v", b.BrokerID(), err)
}

func (b *mockBroker) handleRequests(conn net.Conn, idx int, wg *sync.WaitGroup) {
defer wg.Done()
defer func() {
_ = conn.Close()
}()
Logger.Printf("*** mockbroker/%d/%d: connection opened", b.BrokerID(), idx)
var err error

abort := make(chan none)
defer close(abort)
go func() {
select {
case <-b.closing:
_ = conn.Close()
case <-abort:
}
}()

resHeader := make([]byte, 8)
for expectation := range b.expectations {
_, err = io.ReadFull(conn, reqHeader)
for {
req, err := decodeRequest(conn)
if err != nil {
return b.serverError(err, conn)
}
body := make([]byte, binary.BigEndian.Uint32(reqHeader))
if len(body) < 10 {
return b.serverError(errors.New("Kafka request too short."), conn)
}
if _, err = io.ReadFull(conn, body); err != nil {
return b.serverError(err, conn)
Logger.Printf("*** mockbroker/%d/%d: invalid request: err=%+v, %+v", b.brokerID, idx, err, spew.Sdump(req))
b.serverError(err)
break
}

if b.latency > 0 {
time.Sleep(b.latency)
}

response, err := encode(expectation)
res := b.requestHandler()(req)
Logger.Printf("*** mockbroker/%d/%d: served %+v -> %+v", b.brokerID, idx, req, res)

encodedRes, err := encode(res)
if err != nil {
return false
b.serverError(err)
break
}
if len(response) == 0 {
if len(encodedRes) == 0 {
continue
}

binary.BigEndian.PutUint32(resHeader, uint32(len(response)+4))
binary.BigEndian.PutUint32(resHeader[4:], binary.BigEndian.Uint32(body[4:]))
binary.BigEndian.PutUint32(resHeader, uint32(len(encodedRes)+4))
binary.BigEndian.PutUint32(resHeader[4:], uint32(req.correlationID))
if _, err = conn.Write(resHeader); err != nil {
return b.serverError(err, conn)
b.serverError(err)
break
}
if _, err = conn.Write(response); err != nil {
return b.serverError(err, conn)
if _, err = conn.Write(encodedRes); err != nil {
b.serverError(err)
break
}
}
if err = conn.Close(); err != nil {
return b.serverError(err, nil)
}
if err = b.listener.Close(); err != nil {
b.t.Error(err)
return false
}
return true
Logger.Printf("*** mockbroker/%d/%d: connection closed, err=%v", b.BrokerID(), idx, err)
}

func (b *mockBroker) serverError(err error, conn net.Conn) bool {
b.t.Error(err)
if conn != nil {
if err := conn.Close(); err != nil {
b.t.Error(err)
func (b *mockBroker) requestHandler() requestHandlerFunc {
b.handlerMux.Lock()
defer b.handlerMux.Unlock()
return b.handler
}

func (b *mockBroker) defaultRequestHandler(req *request) (res encoder) {
select {
case res, ok := <-b.expectations:
if !ok {
return nil
}
return res
case <-time.After(expectationTimeout):
return nil
}
if err := b.listener.Close(); err != nil {
b.t.Error(err)
}

func (b *mockBroker) serverError(err error) {
isConnectionClosedError := false
if _, ok := err.(*net.OpError); ok {
isConnectionClosedError = true
} else if err == io.EOF {
isConnectionClosedError = true
} else if err.Error() == "use of closed network connection" {
isConnectionClosedError = true
}
return false

if isConnectionClosedError {
return
}

b.t.Errorf(err.Error())
}

// newMockBroker launches a fake Kafka broker. It takes a *testing.T as provided by the
Expand All @@ -136,17 +212,19 @@ func newMockBrokerAddr(t *testing.T, brokerID int32, addr string) *mockBroker {
var err error

broker := &mockBroker{
stopper: make(chan bool),
closing: make(chan none),
stopper: make(chan none),
t: t,
brokerID: brokerID,
expectations: make(chan encoder, 512),
}
broker.handler = broker.defaultRequestHandler

broker.listener, err = net.Listen("tcp", addr)
if err != nil {
t.Fatal(err)
}
Logger.Printf("mockbroker/%d listening on %s\n", brokerID, broker.listener.Addr().String())
Logger.Printf("*** mockbroker/%d listening on %s\n", brokerID, broker.listener.Addr().String())
_, portStr, err := net.SplitHostPort(broker.listener.Addr().String())
if err != nil {
t.Fatal(err)
Expand Down

0 comments on commit da6bc48

Please sign in to comment.