Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce mockSequence and further simplify tests #569

Merged
merged 1 commit into from
Nov 16, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 47 additions & 84 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,24 +342,16 @@ func TestConsumerClosePartitionWithoutLeader(t *testing.T) {
func TestConsumerShutsDownOutOfRange(t *testing.T) {
// Given
broker0 := newMockBroker(t, 0)
broker0.SetHandler(func(req *request) (res encoder) {
switch reqBody := req.body.(type) {
case *MetadataRequest:
return newMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetLeader("my_topic", 0, broker0.BrokerID()).
For(reqBody)
case *OffsetRequest:
return newMockOffsetResponse(t).
SetOffset("my_topic", 0, OffsetNewest, 1234).
SetOffset("my_topic", 0, OffsetOldest, 7).
For(reqBody)
case *FetchRequest:
fetchResponse := new(FetchResponse)
fetchResponse.AddError("my_topic", 0, ErrOffsetOutOfRange)
return fetchResponse
}
return nil
fetchResponse := new(FetchResponse)
fetchResponse.AddError("my_topic", 0, ErrOffsetOutOfRange)
broker0.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": newMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetLeader("my_topic", 0, broker0.BrokerID()),
"OffsetRequest": newMockOffsetResponse(t).
SetOffset("my_topic", 0, OffsetNewest, 1234).
SetOffset("my_topic", 0, OffsetOldest, 7),
"FetchRequest": newMockWrapper(fetchResponse),
})

master, err := NewConsumer([]string{broker0.Addr()}, nil)
Expand Down Expand Up @@ -388,31 +380,21 @@ func TestConsumerShutsDownOutOfRange(t *testing.T) {
func TestConsumerExtraOffsets(t *testing.T) {
// Given
broker0 := newMockBroker(t, 0)
called := 0
broker0.SetHandler(func(req *request) (res encoder) {
switch req.body.(type) {
case *MetadataRequest:
return newMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetLeader("my_topic", 0, broker0.BrokerID()).For(req.body)
case *OffsetRequest:
return newMockOffsetResponse(t).
SetOffset("my_topic", 0, OffsetNewest, 1234).
SetOffset("my_topic", 0, OffsetOldest, 0).For(req.body)
case *FetchRequest:
fetchResponse := &FetchResponse{}
called++
if called > 1 {
fetchResponse.AddError("my_topic", 0, ErrNoError)
return fetchResponse
}
fetchResponse.AddMessage("my_topic", 0, nil, testMsg, 1)
fetchResponse.AddMessage("my_topic", 0, nil, testMsg, 2)
fetchResponse.AddMessage("my_topic", 0, nil, testMsg, 3)
fetchResponse.AddMessage("my_topic", 0, nil, testMsg, 4)
return fetchResponse
}
return nil
fetchResponse1 := &FetchResponse{}
fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1)
fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 2)
fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 3)
fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 4)
fetchResponse2 := &FetchResponse{}
fetchResponse2.AddError("my_topic", 0, ErrNoError)
broker0.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": newMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetLeader("my_topic", 0, broker0.BrokerID()),
"OffsetRequest": newMockOffsetResponse(t).
SetOffset("my_topic", 0, OffsetNewest, 1234).
SetOffset("my_topic", 0, OffsetOldest, 0),
"FetchRequest": newMockSequence(fetchResponse1, fetchResponse2),
})

master, err := NewConsumer([]string{broker0.Addr()}, nil)
Expand Down Expand Up @@ -441,30 +423,20 @@ func TestConsumerExtraOffsets(t *testing.T) {
func TestConsumerNonSequentialOffsets(t *testing.T) {
// Given
broker0 := newMockBroker(t, 0)
called := 0
broker0.SetHandler(func(req *request) (res encoder) {
switch req.body.(type) {
case *MetadataRequest:
return newMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetLeader("my_topic", 0, broker0.BrokerID()).For(req.body)
case *OffsetRequest:
return newMockOffsetResponse(t).
SetOffset("my_topic", 0, OffsetNewest, 1234).
SetOffset("my_topic", 0, OffsetOldest, 0).For(req.body)
case *FetchRequest:
called++
fetchResponse := &FetchResponse{}
if called > 1 {
fetchResponse.AddError("my_topic", 0, ErrNoError)
return fetchResponse
}
fetchResponse.AddMessage("my_topic", 0, nil, testMsg, 5)
fetchResponse.AddMessage("my_topic", 0, nil, testMsg, 7)
fetchResponse.AddMessage("my_topic", 0, nil, testMsg, 11)
return fetchResponse
}
return nil
fetchResponse1 := &FetchResponse{}
fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 5)
fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 7)
fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 11)
fetchResponse2 := &FetchResponse{}
fetchResponse2.AddError("my_topic", 0, ErrNoError)
broker0.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": newMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetLeader("my_topic", 0, broker0.BrokerID()),
"OffsetRequest": newMockOffsetResponse(t).
SetOffset("my_topic", 0, OffsetNewest, 1234).
SetOffset("my_topic", 0, OffsetOldest, 0),
"FetchRequest": newMockSequence(fetchResponse1, fetchResponse2),
})

master, err := NewConsumer([]string{broker0.Addr()}, nil)
Expand Down Expand Up @@ -585,14 +557,10 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
})

// leader0 says no longer leader of partition 0
leader0.SetHandler(func(req *request) (res encoder) {
switch req.body.(type) {
case *FetchRequest:
fetchResponse := new(FetchResponse)
fetchResponse.AddError("my_topic", 0, ErrNotLeaderForPartition)
return fetchResponse
}
return nil
fetchResponse := new(FetchResponse)
fetchResponse.AddError("my_topic", 0, ErrNotLeaderForPartition)
leader0.SetHandlerByMap(map[string]MockResponse{
"FetchRequest": newMockWrapper(fetchResponse),
})

time.Sleep(50 * time.Millisecond)
Expand Down Expand Up @@ -632,15 +600,10 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
SetMessage("my_topic", 0, int64(7), testMsg).
SetMessage("my_topic", 0, int64(8), testMsg).
SetMessage("my_topic", 0, int64(9), testMsg)
leader1.SetHandler(func(req *request) (res encoder) {
switch reqBody := req.body.(type) {
case *FetchRequest:
res := mockFetchResponse3.For(reqBody).(*FetchResponse)
res.AddError("my_topic", 1, ErrNotLeaderForPartition)
return res

}
return nil
fetchResponse4 := new(FetchResponse)
fetchResponse4.AddError("my_topic", 1, ErrNotLeaderForPartition)
leader1.SetHandlerByMap(map[string]MockResponse{
"FetchRequest": newMockSequence(mockFetchResponse3, fetchResponse4),
})

// leader0 provides two messages on partition 1
Expand Down
20 changes: 10 additions & 10 deletions mockbroker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,8 @@ func (b *mockBroker) SetLatency(latency time.Duration) {
b.latency = latency
}

// SetHandler sets the specified function as the request handler. Whenever
// a mock broker reads a request from the wire it passes the request to the
// function and sends back whatever the handler function returns.
func (b *mockBroker) SetHandler(handler requestHandlerFunc) {
b.lock.Lock()
b.handler = handler
b.lock.Unlock()
}

func (b *mockBroker) SetHandlerByMap(handlerMap map[string]MockResponse) {
b.SetHandler(func(req *request) (res encoder) {
b.setHandler(func(req *request) (res encoder) {
reqTypeName := reflect.TypeOf(req.body).Elem().Name()
mockResponse := handlerMap[reqTypeName]
if mockResponse == nil {
Expand Down Expand Up @@ -112,6 +103,15 @@ func (b *mockBroker) Close() {
<-b.stopper
}

// setHandler sets the specified function as the request handler. Whenever
// a mock broker reads a request from the wire it passes the request to the
// function and sends back whatever the handler function returns.
func (b *mockBroker) setHandler(handler requestHandlerFunc) {
b.lock.Lock()
b.handler = handler
b.lock.Unlock()
}

func (b *mockBroker) serverLoop() {
defer close(b.stopper)
var err error
Expand Down
35 changes: 35 additions & 0 deletions mockresponses_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package sarama

import (
"fmt"
"testing"
)

Expand All @@ -10,6 +11,8 @@ type MockResponse interface {
For(reqBody decoder) (res encoder)
}

// mockWrapper is a mock response builder that returns a particular concrete
// response regardless of the actual request passed to the `For` method.
type mockWrapper struct {
res encoder
}
Expand All @@ -22,6 +25,38 @@ func newMockWrapper(res encoder) *mockWrapper {
return &mockWrapper{res: res}
}

// mockSequence is a mock response builder that is created from a sequence of
// concrete responses. Every time when a `MockBroker` calls its `For` method
// the next response from the sequence is returned. When the end of the
// sequence is reached the last element from the sequence is returned.
type mockSequence struct {
responses []MockResponse
}

func newMockSequence(responses ...interface{}) *mockSequence {
ms := &mockSequence{}
ms.responses = make([]MockResponse, len(responses))
for i, res := range responses {
switch res := res.(type) {
case MockResponse:
ms.responses[i] = res
case encoder:
ms.responses[i] = newMockWrapper(res)
default:
panic(fmt.Sprintf("Unexpected response type: %T", res))
}
}
return ms
}

func (mc *mockSequence) For(reqBody decoder) (res encoder) {
res = mc.responses[0].For(reqBody)
if len(mc.responses) > 1 {
mc.responses = mc.responses[1:]
}
return res
}

// mockMetadataResponse is a `MetadataResponse` builder.
type mockMetadataResponse struct {
leaders map[string]map[int32]int32
Expand Down