From 99f4fd70a5d2a441915a9504a907c79175fc88c9 Mon Sep 17 00:00:00 2001 From: Maxim Vladimirsky Date: Fri, 13 Nov 2015 01:16:24 -0800 Subject: [PATCH] Introduce mockSequence and further simplify tests --- consumer_test.go | 131 +++++++++++++++--------------------------- mockbroker_test.go | 20 +++---- mockresponses_test.go | 35 +++++++++++ 3 files changed, 92 insertions(+), 94 deletions(-) diff --git a/consumer_test.go b/consumer_test.go index c6a5b24d4..5286ef212 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -342,24 +342,16 @@ func TestConsumerClosePartitionWithoutLeader(t *testing.T) { func TestConsumerShutsDownOutOfRange(t *testing.T) { // Given broker0 := newMockBroker(t, 0) - broker0.SetHandler(func(req *request) (res encoder) { - switch reqBody := req.body.(type) { - case *MetadataRequest: - return newMockMetadataResponse(t). - SetBroker(broker0.Addr(), broker0.BrokerID()). - SetLeader("my_topic", 0, broker0.BrokerID()). - For(reqBody) - case *OffsetRequest: - return newMockOffsetResponse(t). - SetOffset("my_topic", 0, OffsetNewest, 1234). - SetOffset("my_topic", 0, OffsetOldest, 7). - For(reqBody) - case *FetchRequest: - fetchResponse := new(FetchResponse) - fetchResponse.AddError("my_topic", 0, ErrOffsetOutOfRange) - return fetchResponse - } - return nil + fetchResponse := new(FetchResponse) + fetchResponse.AddError("my_topic", 0, ErrOffsetOutOfRange) + broker0.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": newMockMetadataResponse(t). + SetBroker(broker0.Addr(), broker0.BrokerID()). + SetLeader("my_topic", 0, broker0.BrokerID()), + "OffsetRequest": newMockOffsetResponse(t). + SetOffset("my_topic", 0, OffsetNewest, 1234). + SetOffset("my_topic", 0, OffsetOldest, 7), + "FetchRequest": newMockWrapper(fetchResponse), }) master, err := NewConsumer([]string{broker0.Addr()}, nil) @@ -388,31 +380,21 @@ func TestConsumerShutsDownOutOfRange(t *testing.T) { func TestConsumerExtraOffsets(t *testing.T) { // Given broker0 := newMockBroker(t, 0) - called := 0 - broker0.SetHandler(func(req *request) (res encoder) { - switch req.body.(type) { - case *MetadataRequest: - return newMockMetadataResponse(t). - SetBroker(broker0.Addr(), broker0.BrokerID()). - SetLeader("my_topic", 0, broker0.BrokerID()).For(req.body) - case *OffsetRequest: - return newMockOffsetResponse(t). - SetOffset("my_topic", 0, OffsetNewest, 1234). - SetOffset("my_topic", 0, OffsetOldest, 0).For(req.body) - case *FetchRequest: - fetchResponse := &FetchResponse{} - called++ - if called > 1 { - fetchResponse.AddError("my_topic", 0, ErrNoError) - return fetchResponse - } - fetchResponse.AddMessage("my_topic", 0, nil, testMsg, 1) - fetchResponse.AddMessage("my_topic", 0, nil, testMsg, 2) - fetchResponse.AddMessage("my_topic", 0, nil, testMsg, 3) - fetchResponse.AddMessage("my_topic", 0, nil, testMsg, 4) - return fetchResponse - } - return nil + fetchResponse1 := &FetchResponse{} + fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1) + fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 2) + fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 3) + fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 4) + fetchResponse2 := &FetchResponse{} + fetchResponse2.AddError("my_topic", 0, ErrNoError) + broker0.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": newMockMetadataResponse(t). + SetBroker(broker0.Addr(), broker0.BrokerID()). + SetLeader("my_topic", 0, broker0.BrokerID()), + "OffsetRequest": newMockOffsetResponse(t). + SetOffset("my_topic", 0, OffsetNewest, 1234). + SetOffset("my_topic", 0, OffsetOldest, 0), + "FetchRequest": newMockSequence(fetchResponse1, fetchResponse2), }) master, err := NewConsumer([]string{broker0.Addr()}, nil) @@ -441,30 +423,20 @@ func TestConsumerExtraOffsets(t *testing.T) { func TestConsumerNonSequentialOffsets(t *testing.T) { // Given broker0 := newMockBroker(t, 0) - called := 0 - broker0.SetHandler(func(req *request) (res encoder) { - switch req.body.(type) { - case *MetadataRequest: - return newMockMetadataResponse(t). - SetBroker(broker0.Addr(), broker0.BrokerID()). - SetLeader("my_topic", 0, broker0.BrokerID()).For(req.body) - case *OffsetRequest: - return newMockOffsetResponse(t). - SetOffset("my_topic", 0, OffsetNewest, 1234). - SetOffset("my_topic", 0, OffsetOldest, 0).For(req.body) - case *FetchRequest: - called++ - fetchResponse := &FetchResponse{} - if called > 1 { - fetchResponse.AddError("my_topic", 0, ErrNoError) - return fetchResponse - } - fetchResponse.AddMessage("my_topic", 0, nil, testMsg, 5) - fetchResponse.AddMessage("my_topic", 0, nil, testMsg, 7) - fetchResponse.AddMessage("my_topic", 0, nil, testMsg, 11) - return fetchResponse - } - return nil + fetchResponse1 := &FetchResponse{} + fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 5) + fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 7) + fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 11) + fetchResponse2 := &FetchResponse{} + fetchResponse2.AddError("my_topic", 0, ErrNoError) + broker0.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": newMockMetadataResponse(t). + SetBroker(broker0.Addr(), broker0.BrokerID()). + SetLeader("my_topic", 0, broker0.BrokerID()), + "OffsetRequest": newMockOffsetResponse(t). + SetOffset("my_topic", 0, OffsetNewest, 1234). + SetOffset("my_topic", 0, OffsetOldest, 0), + "FetchRequest": newMockSequence(fetchResponse1, fetchResponse2), }) master, err := NewConsumer([]string{broker0.Addr()}, nil) @@ -585,14 +557,10 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) { }) // leader0 says no longer leader of partition 0 - leader0.SetHandler(func(req *request) (res encoder) { - switch req.body.(type) { - case *FetchRequest: - fetchResponse := new(FetchResponse) - fetchResponse.AddError("my_topic", 0, ErrNotLeaderForPartition) - return fetchResponse - } - return nil + fetchResponse := new(FetchResponse) + fetchResponse.AddError("my_topic", 0, ErrNotLeaderForPartition) + leader0.SetHandlerByMap(map[string]MockResponse{ + "FetchRequest": newMockWrapper(fetchResponse), }) time.Sleep(50 * time.Millisecond) @@ -632,15 +600,10 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) { SetMessage("my_topic", 0, int64(7), testMsg). SetMessage("my_topic", 0, int64(8), testMsg). SetMessage("my_topic", 0, int64(9), testMsg) - leader1.SetHandler(func(req *request) (res encoder) { - switch reqBody := req.body.(type) { - case *FetchRequest: - res := mockFetchResponse3.For(reqBody).(*FetchResponse) - res.AddError("my_topic", 1, ErrNotLeaderForPartition) - return res - - } - return nil + fetchResponse4 := new(FetchResponse) + fetchResponse4.AddError("my_topic", 1, ErrNotLeaderForPartition) + leader1.SetHandlerByMap(map[string]MockResponse{ + "FetchRequest": newMockSequence(mockFetchResponse3, fetchResponse4), }) // leader0 provides two messages on partition 1 diff --git a/mockbroker_test.go b/mockbroker_test.go index 987697380..28f85ba35 100644 --- a/mockbroker_test.go +++ b/mockbroker_test.go @@ -59,17 +59,8 @@ func (b *mockBroker) SetLatency(latency time.Duration) { b.latency = latency } -// SetHandler sets the specified function as the request handler. Whenever -// a mock broker reads a request from the wire it passes the request to the -// function and sends back whatever the handler function returns. -func (b *mockBroker) SetHandler(handler requestHandlerFunc) { - b.lock.Lock() - b.handler = handler - b.lock.Unlock() -} - func (b *mockBroker) SetHandlerByMap(handlerMap map[string]MockResponse) { - b.SetHandler(func(req *request) (res encoder) { + b.setHandler(func(req *request) (res encoder) { reqTypeName := reflect.TypeOf(req.body).Elem().Name() mockResponse := handlerMap[reqTypeName] if mockResponse == nil { @@ -112,6 +103,15 @@ func (b *mockBroker) Close() { <-b.stopper } +// setHandler sets the specified function as the request handler. Whenever +// a mock broker reads a request from the wire it passes the request to the +// function and sends back whatever the handler function returns. +func (b *mockBroker) setHandler(handler requestHandlerFunc) { + b.lock.Lock() + b.handler = handler + b.lock.Unlock() +} + func (b *mockBroker) serverLoop() { defer close(b.stopper) var err error diff --git a/mockresponses_test.go b/mockresponses_test.go index 118759d76..55b648d6f 100644 --- a/mockresponses_test.go +++ b/mockresponses_test.go @@ -1,6 +1,7 @@ package sarama import ( + "fmt" "testing" ) @@ -10,6 +11,8 @@ type MockResponse interface { For(reqBody decoder) (res encoder) } +// mockWrapper is a mock response builder that returns a particular concrete +// response regardless of the actual request passed to the `For` method. type mockWrapper struct { res encoder } @@ -22,6 +25,38 @@ func newMockWrapper(res encoder) *mockWrapper { return &mockWrapper{res: res} } +// mockSequence is a mock response builder that is created from a sequence of +// concrete responses. Every time when a `MockBroker` calls its `For` method +// the next response from the sequence is returned. When the end of the +// sequence is reached the last element from the sequence is returned. +type mockSequence struct { + responses []MockResponse +} + +func newMockSequence(responses ...interface{}) *mockSequence { + ms := &mockSequence{} + ms.responses = make([]MockResponse, len(responses)) + for i, res := range responses { + switch res := res.(type) { + case MockResponse: + ms.responses[i] = res + case encoder: + ms.responses[i] = newMockWrapper(res) + default: + panic(fmt.Sprintf("Unexpected response type: %T", res)) + } + } + return ms +} + +func (mc *mockSequence) For(reqBody decoder) (res encoder) { + res = mc.responses[0].For(reqBody) + if len(mc.responses) > 1 { + mc.responses = mc.responses[1:] + } + return res +} + // mockMetadataResponse is a `MetadataResponse` builder. type mockMetadataResponse struct { leaders map[string]map[int32]int32