diff --git a/consumer_test.go b/consumer_test.go index cad709b537..c92c483647 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -1,38 +1,37 @@ package sarama import ( - "log" - "os" - "os/signal" "sync" "testing" "time" ) -func TestConsumerOffsetManual(t *testing.T) { - seedBroker := newMockBroker(t, 1) - leader := newMockBroker(t, 2) - - metadataResponse := new(MetadataResponse) - metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) - metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError) - seedBroker.Returns(metadataResponse) +var testMsg = StringEncoder("Foo") - offsetResponseNewest := new(OffsetResponse) - offsetResponseNewest.AddTopicPartition("my_topic", 0, 2345) - leader.Returns(offsetResponseNewest) - - offsetResponseOldest := new(OffsetResponse) - offsetResponseOldest.AddTopicPartition("my_topic", 0, 0) - leader.Returns(offsetResponseOldest) +// If a particular offset is provided then messages are consumed starting from +// that offset. +func TestConsumerOffsetManual(t *testing.T) { + // Given + broker0 := newMockBroker(t, 0) + defer broker0.Close() + mockFetchResponse := newMockFetchResponse(t, 1) for i := 0; i < 10; i++ { - fetchResponse := new(FetchResponse) - fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+1234)) - leader.Returns(fetchResponse) + mockFetchResponse.SetMessage("my_topic", 0, int64(i+1234), testMsg) } - master, err := NewConsumer([]string{seedBroker.Addr()}, nil) + broker0.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": newMockMetadataResponse(t). + SetBroker(broker0.Addr(), broker0.BrokerID()). + SetLeader("my_topic", 0, broker0.BrokerID()), + "OffsetRequest": newMockOffsetResponse(t). + SetOffset("my_topic", 0, OffsetOldest, 0). + SetOffset("my_topic", 0, OffsetNewest, 2345), + "FetchRequest": mockFetchResponse, + }) + + // When + master, err := NewConsumer([]string{broker0.Addr()}, nil) if err != nil { t.Fatal(err) } @@ -41,14 +40,12 @@ func TestConsumerOffsetManual(t *testing.T) { if err != nil { t.Fatal(err) } - seedBroker.Close() + // Then: messages starting from offset 1234 are consumed. for i := 0; i < 10; i++ { select { case message := <-consumer.Messages(): - if message.Offset != int64(i+1234) { - t.Error("Incorrect message offset!") - } + assertMessageOffset(t, message, int64(i+1234)) case err := <-consumer.Errors(): t.Error(err) } @@ -56,195 +53,491 @@ func TestConsumerOffsetManual(t *testing.T) { safeClose(t, consumer) safeClose(t, master) - leader.Close() } +// If `OffsetNewest` is passed as the initial offset then the first consumed +// message is indeed corresponds to the offset that broker claims to be the +// newest in his metadata response. func TestConsumerOffsetNewest(t *testing.T) { - seedBroker := newMockBroker(t, 1) - leader := newMockBroker(t, 2) + // Given + broker0 := newMockBroker(t, 0) + defer broker0.Close() + + broker0.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": newMockMetadataResponse(t). + SetBroker(broker0.Addr(), broker0.BrokerID()). + SetLeader("my_topic", 0, broker0.BrokerID()), + "OffsetRequest": newMockOffsetResponse(t). + SetOffset("my_topic", 0, OffsetNewest, 10). + SetOffset("my_topic", 0, OffsetOldest, 7), + "FetchRequest": newMockFetchResponse(t, 1). + SetMessage("my_topic", 0, 9, testMsg). + SetMessage("my_topic", 0, 10, testMsg). + SetMessage("my_topic", 0, 11, testMsg). + SetHighWaterMark("my_topic", 0, 14), + }) + + master, err := NewConsumer([]string{broker0.Addr()}, nil) + if err != nil { + t.Fatal(err) + } - metadataResponse := new(MetadataResponse) - metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) - metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError) - seedBroker.Returns(metadataResponse) + // When + consumer, err := master.ConsumePartition("my_topic", 0, OffsetNewest) + if err != nil { + t.Fatal(err) + } - offsetResponseNewest := new(OffsetResponse) - offsetResponseNewest.AddTopicPartition("my_topic", 0, 10) - leader.Returns(offsetResponseNewest) + // Then + assertMessageOffset(t, <-consumer.Messages(), 10) + if hwmo := consumer.HighWaterMarkOffset(); hwmo != 14 { + t.Errorf("Expected high water mark offset 14, found %d", hwmo) + } - offsetResponseOldest := new(OffsetResponse) - offsetResponseOldest.AddTopicPartition("my_topic", 0, 7) - leader.Returns(offsetResponseOldest) + safeClose(t, consumer) + safeClose(t, master) +} - fetchResponse := new(FetchResponse) - fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), 10) - block := fetchResponse.GetBlock("my_topic", 0) - block.HighWaterMarkOffset = 14 - leader.Returns(fetchResponse) +// It is possible to close a partition consumer and create the same anew. +func TestConsumerRecreate(t *testing.T) { + // Given + broker0 := newMockBroker(t, 0) + defer broker0.Close() + + broker0.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": newMockMetadataResponse(t). + SetBroker(broker0.Addr(), broker0.BrokerID()). + SetLeader("my_topic", 0, broker0.BrokerID()), + "OffsetRequest": newMockOffsetResponse(t). + SetOffset("my_topic", 0, OffsetOldest, 0). + SetOffset("my_topic", 0, OffsetNewest, 1000), + "FetchRequest": newMockFetchResponse(t, 1). + SetMessage("my_topic", 0, 10, testMsg), + }) + + c, err := NewConsumer([]string{broker0.Addr()}, nil) + if err != nil { + t.Fatal(err) + } - master, err := NewConsumer([]string{seedBroker.Addr()}, nil) + pc, err := c.ConsumePartition("my_topic", 0, 10) if err != nil { t.Fatal(err) } - seedBroker.Close() + assertMessageOffset(t, <-pc.Messages(), 10) - consumer, err := master.ConsumePartition("my_topic", 0, OffsetNewest) + // When + safeClose(t, pc) + pc, err = c.ConsumePartition("my_topic", 0, 10) if err != nil { t.Fatal(err) } - msg := <-consumer.Messages() + // Then + assertMessageOffset(t, <-pc.Messages(), 10) + + safeClose(t, pc) + safeClose(t, c) +} + +// An attempt to consume the same partition twice should fail. +func TestConsumerDuplicate(t *testing.T) { + // Given + broker0 := newMockBroker(t, 0) + defer broker0.Close() + + broker0.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": newMockMetadataResponse(t). + SetBroker(broker0.Addr(), broker0.BrokerID()). + SetLeader("my_topic", 0, broker0.BrokerID()), + "OffsetRequest": newMockOffsetResponse(t). + SetOffset("my_topic", 0, OffsetOldest, 0). + SetOffset("my_topic", 0, OffsetNewest, 1000), + "FetchRequest": newMockFetchResponse(t, 1), + }) - // we deliver one message, so it should be one higher than we return in the OffsetResponse - if msg.Offset != 10 { - t.Error("Latest message offset not fetched correctly:", msg.Offset) + config := NewConfig() + config.ChannelBufferSize = 0 + c, err := NewConsumer([]string{broker0.Addr()}, config) + if err != nil { + t.Fatal(err) } - if hwmo := consumer.HighWaterMarkOffset(); hwmo != 14 { - t.Errorf("Expected high water mark offset 14, found %d", hwmo) + pc1, err := c.ConsumePartition("my_topic", 0, 0) + if err != nil { + t.Fatal(err) } - leader.Close() - safeClose(t, consumer) - safeClose(t, master) + // When + pc2, err := c.ConsumePartition("my_topic", 0, 0) - // We deliver one message, so it should be one higher than we return in the OffsetResponse. - // This way it is set correctly for the next FetchRequest. - if consumer.(*partitionConsumer).offset != 11 { - t.Error("Latest offset not fetched correctly:", consumer.(*partitionConsumer).offset) + // Then + if pc2 != nil || err != ConfigurationError("That topic/partition is already being consumed") { + t.Fatal("A partition cannot be consumed twice at the same time") } + + safeClose(t, pc1) + safeClose(t, c) } -func TestConsumerShutsDownOutOfRange(t *testing.T) { - seedBroker := newMockBroker(t, 1) - leader := newMockBroker(t, 2) +// If consumer fails to refresh metadata it keeps retrying every with frequency +// given in `Config.Consumer.Retry.Backoff`. +func TestConsumerLeaderRefreshError(t *testing.T) { + // Given + broker0 := newMockBroker(t, 100) + defer broker0.Close() + + // Stage 1: my_topic/0 served by broker0 + Logger.Printf(" STAGE 1") + + broker0.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": newMockMetadataResponse(t). + SetBroker(broker0.Addr(), broker0.BrokerID()). + SetLeader("my_topic", 0, broker0.BrokerID()), + "OffsetRequest": newMockOffsetResponse(t). + SetOffset("my_topic", 0, OffsetOldest, 123). + SetOffset("my_topic", 0, OffsetNewest, 1000), + "FetchRequest": newMockFetchResponse(t, 1). + SetMessage("my_topic", 0, 123, testMsg), + }) + + config := NewConfig() + config.Net.ReadTimeout = 100 * time.Millisecond + config.Consumer.Retry.Backoff = 500 * time.Millisecond + config.Metadata.Retry.Max = 0 + c, err := NewConsumer([]string{broker0.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest) + if err != nil { + t.Errorf("Failed to create a partition consumer, err=%v", err) + } + + assertMessageOffset(t, <-pc.Messages(), 123) + + // Stage 2: broker0 says that it is no longer the leader for my_topic/0, + // but the requests to retrieve metadata fail with network timeout. + Logger.Printf(" STAGE 2") + + fetchResponse2 := &FetchResponse{} + fetchResponse2.AddError("my_topic", 0, ErrNotLeaderForPartition) + + broker0.SetHandlerByMap(map[string]MockResponse{ + "FetchRequest": newMockWrapper(fetchResponse2), + }) - metadataResponse := new(MetadataResponse) - metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) - metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError) - seedBroker.Returns(metadataResponse) + // Stage 3: finally the metadata returned by broker0 tells that broker1 is + // a new leader for my_topic/0. Consumption resumes. - offsetResponseNewest := new(OffsetResponse) - offsetResponseNewest.AddTopicPartition("my_topic", 0, 1234) - leader.Returns(offsetResponseNewest) + // Unfortunately consumer does not propagate `ErrNotLeaderForPartition` + // error to PartitionConsumer.Errors() channel. So there is no other way to + // synchronize here by sleep. + time.Sleep(300 * time.Millisecond) + Logger.Printf(" STAGE 3") - offsetResponseOldest := new(OffsetResponse) - offsetResponseOldest.AddTopicPartition("my_topic", 0, 0) - leader.Returns(offsetResponseOldest) + broker1 := newMockBroker(t, 101) + defer broker1.Close() - fetchResponse := new(FetchResponse) - fetchResponse.AddError("my_topic", 0, ErrOffsetOutOfRange) - leader.Returns(fetchResponse) + broker1.SetHandlerByMap(map[string]MockResponse{ + "FetchRequest": newMockFetchResponse(t, 1). + SetMessage("my_topic", 0, 124, testMsg), + }) + broker0.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": newMockMetadataResponse(t). + SetBroker(broker0.Addr(), broker0.BrokerID()). + SetBroker(broker1.Addr(), broker1.BrokerID()). + SetLeader("my_topic", 0, broker1.BrokerID()), + }) - master, err := NewConsumer([]string{seedBroker.Addr()}, nil) + assertMessageOffset(t, <-pc.Messages(), 124) + + safeClose(t, pc) + safeClose(t, c) +} + +func TestConsumerInvalidTopic(t *testing.T) { + // Given + broker0 := newMockBroker(t, 100) + defer broker0.Close() + + broker0.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": newMockMetadataResponse(t). + SetBroker(broker0.Addr(), broker0.BrokerID()), + }) + + c, err := NewConsumer([]string{broker0.Addr()}, nil) + if err != nil { + t.Fatal(err) + } + + // When + pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest) + + // Then + if pc != nil || err != ErrUnknownTopicOrPartition { + t.Errorf("Should fail with, err=%v", err) + } + + safeClose(t, c) +} + +// Nothing bad happens if a partition consumer that has no leader assigned at +// the moment is closed. +func TestConsumerClosePartitionWithoutLeader(t *testing.T) { + // Given + broker0 := newMockBroker(t, 100) + defer broker0.Close() + + broker0.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": newMockMetadataResponse(t). + SetBroker(broker0.Addr(), broker0.BrokerID()). + SetLeader("my_topic", 0, broker0.BrokerID()), + "OffsetRequest": newMockOffsetResponse(t). + SetOffset("my_topic", 0, OffsetOldest, 123). + SetOffset("my_topic", 0, OffsetNewest, 1000), + "FetchRequest": newMockFetchResponse(t, 1). + SetMessage("my_topic", 0, 123, testMsg), + }) + + config := NewConfig() + config.Net.ReadTimeout = 100 * time.Millisecond + config.Consumer.Retry.Backoff = 100 * time.Millisecond + config.Metadata.Retry.Max = 0 + c, err := NewConsumer([]string{broker0.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest) + if err != nil { + t.Errorf("Failed to create a partition consumer, err=%v", err) + } + + assertMessageOffset(t, <-pc.Messages(), 123) + + // broker0 says that it is no longer the leader for my_topic/0, but the + // requests to retrieve metadata fail with network timeout. + fetchResponse2 := &FetchResponse{} + fetchResponse2.AddError("my_topic", 0, ErrNotLeaderForPartition) + + broker0.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": newMockMetadataResponse(t). + SetBroker(broker0.Addr(), broker0.BrokerID()). + SetLeader("my_topic", 0, broker0.BrokerID()), + "FetchRequest": newMockWrapper(fetchResponse2), + }) + + // When + + // Unfortunately consumer does not propagate `ErrNotLeaderForPartition` + // error to PartitionConsumer.Errors() channel. So there is no other way to + // synchronize here by sleep. + time.Sleep(200 * time.Millisecond) + + // Then: the partition consumer can be closed without any problem. + safeClose(t, pc) + safeClose(t, c) +} + +// If the initial offset passed on partition consumer creation is out of the +// actual offset range for the partition, then the partition consumer stops +// immediately closing its output channels. +func TestConsumerShutsDownOutOfRange(t *testing.T) { + // Given + broker0 := newMockBroker(t, 0) + defer broker0.Close() + + broker0.SetHandler(func(req *request) (res encoder) { + switch reqBody := req.body.(type) { + case *MetadataRequest: + return newMockMetadataResponse(t). + SetBroker(broker0.Addr(), broker0.BrokerID()). + SetLeader("my_topic", 0, broker0.BrokerID()). + For(reqBody) + case *OffsetRequest: + return newMockOffsetResponse(t). + SetOffset("my_topic", 0, OffsetNewest, 1234). + SetOffset("my_topic", 0, OffsetOldest, 7). + For(reqBody) + case *FetchRequest: + fetchResponse := new(FetchResponse) + fetchResponse.AddError("my_topic", 0, ErrOffsetOutOfRange) + return fetchResponse + } + return nil + }) + + master, err := NewConsumer([]string{broker0.Addr()}, nil) if err != nil { t.Fatal(err) } - seedBroker.Close() + // When consumer, err := master.ConsumePartition("my_topic", 0, 101) if err != nil { t.Fatal(err) } + // Then: consumer should shut down closing its messages and errors channels. if _, ok := <-consumer.Messages(); ok { t.Error("Expected the consumer to shut down") } + safeClose(t, consumer) - leader.Close() safeClose(t, master) } -func TestConsumerFunnyOffsets(t *testing.T) { - // for topics that are compressed and/or compacted (different things!) we have to be - // able to handle receiving offsets that are non-sequential (though still strictly increasing) and - // possibly starting prior to the actual value we requested - seedBroker := newMockBroker(t, 1) - leader := newMockBroker(t, 2) +// If a fetch response contains messages with offsets that are smaller then +// requested, then such messages are ignored. +func TestConsumerExtraOffsets(t *testing.T) { + // Given + broker0 := newMockBroker(t, 0) + defer broker0.Close() + + called := 0 + broker0.SetHandler(func(req *request) (res encoder) { + switch req.body.(type) { + case *MetadataRequest: + return newMockMetadataResponse(t). + SetBroker(broker0.Addr(), broker0.BrokerID()). + SetLeader("my_topic", 0, broker0.BrokerID()).For(req.body) + case *OffsetRequest: + return newMockOffsetResponse(t). + SetOffset("my_topic", 0, OffsetNewest, 1234). + SetOffset("my_topic", 0, OffsetOldest, 0).For(req.body) + case *FetchRequest: + fetchResponse := &FetchResponse{} + called++ + if called > 1 { + fetchResponse.AddError("my_topic", 0, ErrNoError) + return fetchResponse + } + fetchResponse.AddMessage("my_topic", 0, nil, testMsg, 1) + fetchResponse.AddMessage("my_topic", 0, nil, testMsg, 2) + fetchResponse.AddMessage("my_topic", 0, nil, testMsg, 3) + fetchResponse.AddMessage("my_topic", 0, nil, testMsg, 4) + return fetchResponse + } + return nil + }) - metadataResponse := new(MetadataResponse) - metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) - metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError) - seedBroker.Returns(metadataResponse) + master, err := NewConsumer([]string{broker0.Addr()}, nil) + if err != nil { + t.Fatal(err) + } - offsetResponseNewest := new(OffsetResponse) - offsetResponseNewest.AddTopicPartition("my_topic", 0, 1234) - leader.Returns(offsetResponseNewest) + // When + consumer, err := master.ConsumePartition("my_topic", 0, 3) + if err != nil { + t.Fatal(err) + } - offsetResponseOldest := new(OffsetResponse) - offsetResponseOldest.AddTopicPartition("my_topic", 0, 0) - leader.Returns(offsetResponseOldest) + // Then: messages with offsets 1 and 2 are not returned even though they + // are present in the response. + assertMessageOffset(t, <-consumer.Messages(), 3) + assertMessageOffset(t, <-consumer.Messages(), 4) - fetchResponse := new(FetchResponse) - fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1)) - fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(3)) - leader.Returns(fetchResponse) + safeClose(t, consumer) + safeClose(t, master) +} - fetchResponse = new(FetchResponse) - fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(5)) - leader.Returns(fetchResponse) +// It is fine if offsets of fetched messages are not sequential (although +// strictly increasing!). +func TestConsumerNonSequentialOffsets(t *testing.T) { + // Given + broker0 := newMockBroker(t, 0) + defer broker0.Close() + + called := 0 + broker0.SetHandler(func(req *request) (res encoder) { + switch req.body.(type) { + case *MetadataRequest: + return newMockMetadataResponse(t). + SetBroker(broker0.Addr(), broker0.BrokerID()). + SetLeader("my_topic", 0, broker0.BrokerID()).For(req.body) + case *OffsetRequest: + return newMockOffsetResponse(t). + SetOffset("my_topic", 0, OffsetNewest, 1234). + SetOffset("my_topic", 0, OffsetOldest, 0).For(req.body) + case *FetchRequest: + called++ + fetchResponse := &FetchResponse{} + if called > 1 { + fetchResponse.AddError("my_topic", 0, ErrNoError) + return fetchResponse + } + fetchResponse.AddMessage("my_topic", 0, nil, testMsg, 5) + fetchResponse.AddMessage("my_topic", 0, nil, testMsg, 7) + fetchResponse.AddMessage("my_topic", 0, nil, testMsg, 11) + return fetchResponse + } + return nil + }) - master, err := NewConsumer([]string{seedBroker.Addr()}, nil) + master, err := NewConsumer([]string{broker0.Addr()}, nil) if err != nil { t.Fatal(err) } - consumer, err := master.ConsumePartition("my_topic", 0, 2) + // When + consumer, err := master.ConsumePartition("my_topic", 0, 3) if err != nil { t.Fatal(err) } - if message := <-consumer.Messages(); message.Offset != 3 { - t.Error("Incorrect message offset!") - } - - if message := <-consumer.Messages(); message.Offset != 5 { - t.Error("Incorrect message offset!") - } + // Then: messages with offsets 1 and 2 are not returned even though they + // are present in the response. + assertMessageOffset(t, <-consumer.Messages(), 5) + assertMessageOffset(t, <-consumer.Messages(), 7) + assertMessageOffset(t, <-consumer.Messages(), 11) - leader.Close() - seedBroker.Close() safeClose(t, consumer) safeClose(t, master) } +// If leadership for a partition is changing then consumer resolves the new +// leader and switches to it. func TestConsumerRebalancingMultiplePartitions(t *testing.T) { // initial setup - seedBroker := newMockBroker(t, 1) - leader0 := newMockBroker(t, 2) - leader1 := newMockBroker(t, 3) - - metadataResponse := new(MetadataResponse) - metadataResponse.AddBroker(leader0.Addr(), leader0.BrokerID()) - metadataResponse.AddBroker(leader1.Addr(), leader1.BrokerID()) - metadataResponse.AddTopicPartition("my_topic", 0, leader0.BrokerID(), nil, nil, ErrNoError) - metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, ErrNoError) - seedBroker.Returns(metadataResponse) + seedBroker := newMockBroker(t, 10) + defer seedBroker.Close() + leader0 := newMockBroker(t, 0) + defer leader0.Close() + leader1 := newMockBroker(t, 1) + defer leader1.Close() + + seedBroker.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": newMockMetadataResponse(t). + SetBroker(leader0.Addr(), leader0.BrokerID()). + SetBroker(leader1.Addr(), leader1.BrokerID()). + SetLeader("my_topic", 0, leader0.BrokerID()). + SetLeader("my_topic", 1, leader1.BrokerID()), + }) + + mockOffsetResponse1 := newMockOffsetResponse(t). + SetOffset("my_topic", 0, OffsetOldest, 0). + SetOffset("my_topic", 0, OffsetNewest, 1000). + SetOffset("my_topic", 1, OffsetOldest, 0). + SetOffset("my_topic", 1, OffsetNewest, 1000) + leader0.SetHandlerByMap(map[string]MockResponse{ + "OffsetRequest": mockOffsetResponse1, + "FetchRequest": newMockFetchResponse(t, 1), + }) + leader1.SetHandlerByMap(map[string]MockResponse{ + "OffsetRequest": mockOffsetResponse1, + "FetchRequest": newMockFetchResponse(t, 1), + }) // launch test goroutines config := NewConfig() - config.Consumer.Retry.Backoff = 0 + config.Consumer.Retry.Backoff = 50 master, err := NewConsumer([]string{seedBroker.Addr()}, config) if err != nil { t.Fatal(err) } - offsetResponseNewest0 := new(OffsetResponse) - offsetResponseNewest0.AddTopicPartition("my_topic", 0, 1234) - leader0.Returns(offsetResponseNewest0) - - offsetResponseOldest0 := new(OffsetResponse) - offsetResponseOldest0.AddTopicPartition("my_topic", 0, 0) - leader0.Returns(offsetResponseOldest0) - - offsetResponseNewest1 := new(OffsetResponse) - offsetResponseNewest1.AddTopicPartition("my_topic", 1, 1234) - leader1.Returns(offsetResponseNewest1) - - offsetResponseOldest1 := new(OffsetResponse) - offsetResponseOldest1.AddTopicPartition("my_topic", 1, 0) - leader1.Returns(offsetResponseOldest1) - // we expect to end up (eventually) consuming exactly ten messages on each partition var wg sync.WaitGroup for i := int32(0); i < 2; i++ { @@ -275,423 +568,292 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) { }(i, consumer) } - // leader0 provides first four messages on partition 0 - fetchResponse := new(FetchResponse) + time.Sleep(50 * time.Millisecond) + Logger.Printf(" STAGE 1") + // Stage 1: + // * my_topic/0 -> leader0 serves 4 messages + // * my_topic/1 -> leader1 serves 0 messages + + mockFetchResponse := newMockFetchResponse(t, 1) for i := 0; i < 4; i++ { - fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i)) + mockFetchResponse.SetMessage("my_topic", 0, int64(i), testMsg) } - leader0.Returns(fetchResponse) + leader0.SetHandlerByMap(map[string]MockResponse{ + "FetchRequest": mockFetchResponse, + }) + + time.Sleep(50 * time.Millisecond) + Logger.Printf(" STAGE 2") + // Stage 2: + // * leader0 says that it is no longer serving my_topic/0 + // * seedBroker tells that leader1 is serving my_topic/0 now + + // seed broker tells that the new partition 0 leader is leader1 + seedBroker.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": newMockMetadataResponse(t). + SetLeader("my_topic", 0, leader1.BrokerID()). + SetLeader("my_topic", 1, leader1.BrokerID()), + }) // leader0 says no longer leader of partition 0 - fetchResponse = new(FetchResponse) - fetchResponse.AddError("my_topic", 0, ErrNotLeaderForPartition) - leader0.Returns(fetchResponse) + leader0.SetHandler(func(req *request) (res encoder) { + switch req.body.(type) { + case *FetchRequest: + fetchResponse := new(FetchResponse) + fetchResponse.AddError("my_topic", 0, ErrNotLeaderForPartition) + return fetchResponse + } + return nil + }) - // metadata assigns both partitions to leader1 - metadataResponse = new(MetadataResponse) - metadataResponse.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError) - metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, ErrNoError) - seedBroker.Returns(metadataResponse) - time.Sleep(50 * time.Millisecond) // dumbest way to force a particular response ordering + time.Sleep(50 * time.Millisecond) + Logger.Printf(" STAGE 3") + // Stage 3: + // * my_topic/0 -> leader1 serves 3 messages + // * my_topic/1 -> leader1 server 8 messages - // leader1 provides five messages on partition 1 - fetchResponse = new(FetchResponse) - for i := 0; i < 5; i++ { - fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i)) + // leader1 provides 3 message on partition 0, and 8 messages on partition 1 + mockFetchResponse2 := newMockFetchResponse(t, 2) + for i := 4; i < 7; i++ { + mockFetchResponse2.SetMessage("my_topic", 0, int64(i), testMsg) } - leader1.Returns(fetchResponse) - - // leader1 provides three more messages on both partitions - fetchResponse = new(FetchResponse) - for i := 0; i < 3; i++ { - fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+4)) - fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+5)) + for i := 0; i < 8; i++ { + mockFetchResponse2.SetMessage("my_topic", 1, int64(i), testMsg) } - leader1.Returns(fetchResponse) + leader1.SetHandlerByMap(map[string]MockResponse{ + "FetchRequest": mockFetchResponse2, + }) - // leader1 provides three more messages on partition0, says no longer leader of partition1 - fetchResponse = new(FetchResponse) - for i := 0; i < 3; i++ { - fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+7)) - } - fetchResponse.AddError("my_topic", 1, ErrNotLeaderForPartition) - leader1.Returns(fetchResponse) + time.Sleep(50 * time.Millisecond) + Logger.Printf(" STAGE 4") + // Stage 4: + // * my_topic/0 -> leader1 serves 3 messages + // * my_topic/1 -> leader1 tells that it is no longer the leader + // * seedBroker tells that leader0 is a new leader for my_topic/1 // metadata assigns 0 to leader1 and 1 to leader0 - metadataResponse = new(MetadataResponse) - metadataResponse.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError) - metadataResponse.AddTopicPartition("my_topic", 1, leader0.BrokerID(), nil, nil, ErrNoError) - seedBroker.Returns(metadataResponse) - time.Sleep(50 * time.Millisecond) // dumbest way to force a particular response ordering + seedBroker.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": newMockMetadataResponse(t). + SetLeader("my_topic", 0, leader1.BrokerID()). + SetLeader("my_topic", 1, leader0.BrokerID()), + }) + + // leader1 provides three more messages on partition0, says no longer leader of partition1 + mockFetchResponse3 := newMockFetchResponse(t, 3). + SetMessage("my_topic", 0, int64(7), testMsg). + SetMessage("my_topic", 0, int64(8), testMsg). + SetMessage("my_topic", 0, int64(9), testMsg) + leader1.SetHandler(func(req *request) (res encoder) { + switch reqBody := req.body.(type) { + case *FetchRequest: + res := mockFetchResponse3.For(reqBody).(*FetchResponse) + res.AddError("my_topic", 1, ErrNotLeaderForPartition) + return res + + } + return nil + }) // leader0 provides two messages on partition 1 - fetchResponse = new(FetchResponse) - fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(8)) - fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(9)) - leader0.Returns(fetchResponse) - time.Sleep(50 * time.Millisecond) // dumbest way to force a particular response ordering - - leader1.Close() - leader0.Close() + mockFetchResponse4 := newMockFetchResponse(t, 2) + for i := 8; i < 10; i++ { + mockFetchResponse4.SetMessage("my_topic", 1, int64(i), testMsg) + } + leader0.SetHandlerByMap(map[string]MockResponse{ + "FetchRequest": mockFetchResponse4, + }) + wg.Wait() - seedBroker.Close() safeClose(t, master) } +// When two partitions have the same broker as the leader, if one partition +// consumer channel buffer is full then that does not affect the ability to +// read messages by the other consumer. func TestConsumerInterleavedClose(t *testing.T) { - seedBroker := newMockBroker(t, 1) - leader := newMockBroker(t, 2) - - metadataResponse := new(MetadataResponse) - metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) - metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError) - metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError) - seedBroker.Returns(metadataResponse) + t.Skip("Enable when partition consumers stop affect each other") + + // Given + broker0 := newMockBroker(t, 0) + defer broker0.Close() + + broker0.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": newMockMetadataResponse(t). + SetBroker(broker0.Addr(), broker0.BrokerID()). + SetLeader("my_topic", 0, broker0.BrokerID()). + SetLeader("my_topic", 1, broker0.BrokerID()), + "OffsetRequest": newMockOffsetResponse(t). + SetOffset("my_topic", 0, OffsetOldest, 1000). + SetOffset("my_topic", 0, OffsetNewest, 1100). + SetOffset("my_topic", 1, OffsetOldest, 2000). + SetOffset("my_topic", 1, OffsetNewest, 2100), + "FetchRequest": newMockFetchResponse(t, 1). + SetMessage("my_topic", 0, 1000, testMsg). + SetMessage("my_topic", 0, 1001, testMsg). + SetMessage("my_topic", 0, 1002, testMsg). + SetMessage("my_topic", 1, 2000, testMsg), + }) config := NewConfig() config.ChannelBufferSize = 0 - master, err := NewConsumer([]string{seedBroker.Addr()}, config) + master, err := NewConsumer([]string{broker0.Addr()}, config) if err != nil { t.Fatal(err) } - offsetResponseNewest0 := new(OffsetResponse) - offsetResponseNewest0.AddTopicPartition("my_topic", 0, 1234) - leader.Returns(offsetResponseNewest0) - - offsetResponseOldest0 := new(OffsetResponse) - offsetResponseOldest0.AddTopicPartition("my_topic", 0, 0) - leader.Returns(offsetResponseOldest0) - - c0, err := master.ConsumePartition("my_topic", 0, 0) + c0, err := master.ConsumePartition("my_topic", 0, 1000) if err != nil { t.Fatal(err) } - fetchResponse := new(FetchResponse) - fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0)) - leader.Returns(fetchResponse) - time.Sleep(50 * time.Millisecond) - - offsetResponseNewest1 := new(OffsetResponse) - offsetResponseNewest1.AddTopicPartition("my_topic", 1, 1234) - leader.Returns(offsetResponseNewest1) - - offsetResponseOldest1 := new(OffsetResponse) - offsetResponseOldest1.AddTopicPartition("my_topic", 1, 0) - leader.Returns(offsetResponseOldest1) - - c1, err := master.ConsumePartition("my_topic", 1, 0) + c1, err := master.ConsumePartition("my_topic", 1, 2000) if err != nil { t.Fatal(err) } - <-c0.Messages() - fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1)) - fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0)) - leader.Returns(fetchResponse) + // When/Then: we can read from partition 0 even if nobody reads from partition 1 + assertMessageOffset(t, <-c0.Messages(), 1000) + assertMessageOffset(t, <-c0.Messages(), 1001) + assertMessageOffset(t, <-c0.Messages(), 1002) safeClose(t, c1) safeClose(t, c0) safeClose(t, master) - leader.Close() - seedBroker.Close() } func TestConsumerBounceWithReferenceOpen(t *testing.T) { - seedBroker := newMockBroker(t, 1) - leader := newMockBroker(t, 2) - leaderAddr := leader.Addr() - tmp := newMockBroker(t, 3) - - metadataResponse := new(MetadataResponse) - metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) - metadataResponse.AddBroker(tmp.Addr(), tmp.BrokerID()) - metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError) - metadataResponse.AddTopicPartition("my_topic", 1, tmp.BrokerID(), nil, nil, ErrNoError) - seedBroker.Returns(metadataResponse) + broker0 := newMockBroker(t, 0) + broker0Addr := broker0.Addr() + broker1 := newMockBroker(t, 1) + + mockMetadataResponse := newMockMetadataResponse(t). + SetBroker(broker0.Addr(), broker0.BrokerID()). + SetBroker(broker1.Addr(), broker1.BrokerID()). + SetLeader("my_topic", 0, broker0.BrokerID()). + SetLeader("my_topic", 1, broker1.BrokerID()) + + mockOffsetResponse := newMockOffsetResponse(t). + SetOffset("my_topic", 0, OffsetOldest, 1000). + SetOffset("my_topic", 0, OffsetNewest, 1100). + SetOffset("my_topic", 1, OffsetOldest, 2000). + SetOffset("my_topic", 1, OffsetNewest, 2100) + + mockFetchResponse := newMockFetchResponse(t, 1) + for i := 0; i < 10; i++ { + mockFetchResponse.SetMessage("my_topic", 0, int64(1000+i), testMsg) + mockFetchResponse.SetMessage("my_topic", 1, int64(2000+i), testMsg) + } + + broker0.SetHandlerByMap(map[string]MockResponse{ + "OffsetRequest": mockOffsetResponse, + "FetchRequest": mockFetchResponse, + }) + broker1.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": mockMetadataResponse, + "OffsetRequest": mockOffsetResponse, + "FetchRequest": mockFetchResponse, + }) config := NewConfig() config.Consumer.Return.Errors = true - config.Consumer.Retry.Backoff = 0 - config.ChannelBufferSize = 0 - master, err := NewConsumer([]string{seedBroker.Addr()}, config) + config.Consumer.Retry.Backoff = 100 * time.Millisecond + config.ChannelBufferSize = 1 + master, err := NewConsumer([]string{broker1.Addr()}, config) if err != nil { t.Fatal(err) } - offsetResponseNewest := new(OffsetResponse) - offsetResponseNewest.AddTopicPartition("my_topic", 0, 1234) - leader.Returns(offsetResponseNewest) - - offsetResponseOldest := new(OffsetResponse) - offsetResponseOldest.AddTopicPartition("my_topic", 0, 0) - leader.Returns(offsetResponseOldest) - - c0, err := master.ConsumePartition("my_topic", 0, 0) + c0, err := master.ConsumePartition("my_topic", 0, 1000) if err != nil { t.Fatal(err) } - offsetResponseNewest = new(OffsetResponse) - offsetResponseNewest.AddTopicPartition("my_topic", 1, 1234) - tmp.Returns(offsetResponseNewest) - - offsetResponseOldest = new(OffsetResponse) - offsetResponseOldest.AddTopicPartition("my_topic", 1, 0) - tmp.Returns(offsetResponseOldest) - - c1, err := master.ConsumePartition("my_topic", 1, 0) + c1, err := master.ConsumePartition("my_topic", 1, 2000) if err != nil { t.Fatal(err) } - //redirect partition 1 back to main leader - metadataResponse = new(MetadataResponse) - metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError) - metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError) - seedBroker.Returns(metadataResponse) - fetchResponse := new(FetchResponse) - fetchResponse.AddError("my_topic", 1, ErrNotLeaderForPartition) - tmp.Returns(fetchResponse) - time.Sleep(5 * time.Millisecond) - - // now send one message to each partition to make sure everything is primed - fetchResponse = new(FetchResponse) - fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0)) - fetchResponse.AddError("my_topic", 1, ErrNoError) - leader.Returns(fetchResponse) - <-c0.Messages() - - fetchResponse = new(FetchResponse) - fetchResponse.AddError("my_topic", 0, ErrNoError) - fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0)) - leader.Returns(fetchResponse) - <-c1.Messages() - - // bounce the broker - leader.Close() - leader = newMockBrokerAddr(t, 2, leaderAddr) - - // unblock one of the two (it doesn't matter which) - select { - case <-c0.Errors(): - case <-c1.Errors(): + // read messages from both partition to make sure that both brokers operate + // normally. + assertMessageOffset(t, <-c0.Messages(), 1000) + assertMessageOffset(t, <-c1.Messages(), 2000) + + // Simulate broker shutdown. Note that metadata response does not change, + // that is the leadership does not move to another broker. So partition + // consumer will keep retrying to restore the connection with the broker. + broker0.Close() + + // Make sure that while the partition/0 leader is down, consumer/partition/1 + // is capable of pulling messages from broker1. + for i := 1; i < 7; i++ { + offset := (<-c1.Messages()).Offset + if offset != int64(2000+i) { + t.Errorf("Expected offset %d from consumer/partition/1", int64(2000+i)) + } } - // send it back to the same broker - seedBroker.Returns(metadataResponse) - fetchResponse = new(FetchResponse) - fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1)) - fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1)) - leader.Returns(fetchResponse) + // Bring broker0 back to service. + broker0 = newMockBrokerAddr(t, 0, broker0Addr) + broker0.SetHandlerByMap(map[string]MockResponse{ + "FetchRequest": mockFetchResponse, + }) - time.Sleep(5 * time.Millisecond) + // Read the rest of messages from both partitions. + for i := 7; i < 10; i++ { + assertMessageOffset(t, <-c1.Messages(), int64(2000+i)) + } + for i := 1; i < 10; i++ { + assertMessageOffset(t, <-c0.Messages(), int64(1000+i)) + } - // unblock the other one select { case <-c0.Errors(): - case <-c1.Errors(): + default: + t.Errorf("Partition consumer should have detected broker restart") } - select { - case <-c0.Messages(): - case <-c1.Messages(): - } - - leader.Close() - seedBroker.Close() - wg := sync.WaitGroup{} - wg.Add(2) - go func() { - _ = c0.Close() - wg.Done() - }() - go func() { - _ = c1.Close() - wg.Done() - }() - wg.Wait() + safeClose(t, c1) + safeClose(t, c0) safeClose(t, master) - tmp.Close() + broker0.Close() + broker1.Close() } func TestConsumerOffsetOutOfRange(t *testing.T) { - seedBroker := newMockBroker(t, 1) - leader := newMockBroker(t, 2) - - metadataResponse := new(MetadataResponse) - metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) - metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError) - seedBroker.Returns(metadataResponse) - - master, err := NewConsumer([]string{seedBroker.Addr()}, nil) + // Given + broker0 := newMockBroker(t, 2) + defer broker0.Close() + + broker0.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": newMockMetadataResponse(t). + SetBroker(broker0.Addr(), broker0.BrokerID()). + SetLeader("my_topic", 0, broker0.BrokerID()), + "OffsetRequest": newMockOffsetResponse(t). + SetOffset("my_topic", 0, OffsetNewest, 1234). + SetOffset("my_topic", 0, OffsetOldest, 2345), + }) + + master, err := NewConsumer([]string{broker0.Addr()}, nil) if err != nil { t.Fatal(err) } - seedBroker.Close() - - offsetResponseNewest := new(OffsetResponse) - offsetResponseNewest.AddTopicPartition("my_topic", 0, 1234) - offsetResponseOldest := new(OffsetResponse) - offsetResponseOldest.AddTopicPartition("my_topic", 0, 2345) - - leader.Returns(offsetResponseNewest) - leader.Returns(offsetResponseOldest) + // When/Then if _, err := master.ConsumePartition("my_topic", 0, 0); err != ErrOffsetOutOfRange { t.Fatal("Should return ErrOffsetOutOfRange, got:", err) } - - leader.Returns(offsetResponseNewest) - leader.Returns(offsetResponseOldest) if _, err := master.ConsumePartition("my_topic", 0, 3456); err != ErrOffsetOutOfRange { t.Fatal("Should return ErrOffsetOutOfRange, got:", err) } - - leader.Returns(offsetResponseNewest) - leader.Returns(offsetResponseOldest) if _, err := master.ConsumePartition("my_topic", 0, -3); err != ErrOffsetOutOfRange { t.Fatal("Should return ErrOffsetOutOfRange, got:", err) } - leader.Close() safeClose(t, master) } -// This example has the simplest use case of the consumer. It simply -// iterates over the messages channel using a for/range loop. Because -// a producer never stopsunless requested, a signal handler is registered -// so we can trigger a clean shutdown of the consumer. -func ExampleConsumer_for_loop() { - master, err := NewConsumer([]string{"localhost:9092"}, nil) - if err != nil { - log.Fatalln(err) - } - defer func() { - if err := master.Close(); err != nil { - log.Fatalln(err) - } - }() - - consumer, err := master.ConsumePartition("my_topic", 0, 0) - if err != nil { - log.Fatalln(err) - } - - go func() { - // By default, the consumer will always keep going, unless we tell it to stop. - // In this case, we capture the SIGINT signal so we can tell the consumer to stop - signals := make(chan os.Signal, 1) - signal.Notify(signals, os.Interrupt) - <-signals - consumer.AsyncClose() - }() - - msgCount := 0 - for message := range consumer.Messages() { - log.Println(string(message.Value)) - msgCount++ +func assertMessageOffset(t *testing.T, msg *ConsumerMessage, expectedOffset int64) { + if msg.Offset != expectedOffset { + t.Errorf("Incorrect message offset: expected=%d, actual=%d", expectedOffset, msg.Offset) } - log.Println("Processed", msgCount, "messages.") -} - -// This example shows how to use a consumer with a select statement -// dealing with the different channels. -func ExampleConsumer_select() { - config := NewConfig() - config.Consumer.Return.Errors = true // Handle errors manually instead of letting Sarama log them. - - master, err := NewConsumer([]string{"localhost:9092"}, config) - if err != nil { - log.Fatalln(err) - } - defer func() { - if err := master.Close(); err != nil { - log.Fatalln(err) - } - }() - - consumer, err := master.ConsumePartition("my_topic", 0, 0) - if err != nil { - log.Fatalln(err) - } - defer func() { - if err := consumer.Close(); err != nil { - log.Fatalln(err) - } - }() - - msgCount := 0 - - signals := make(chan os.Signal, 1) - signal.Notify(signals, os.Interrupt) - -consumerLoop: - for { - select { - case err := <-consumer.Errors(): - log.Println(err) - case <-consumer.Messages(): - msgCount++ - case <-signals: - log.Println("Received interrupt") - break consumerLoop - } - } - log.Println("Processed", msgCount, "messages.") -} - -// This example shows how to use a consumer with different goroutines -// to read from the Messages and Errors channels. -func ExampleConsumer_goroutines() { - config := NewConfig() - config.Consumer.Return.Errors = true // Handle errors manually instead of letting Sarama log them. - - master, err := NewConsumer([]string{"localhost:9092"}, config) - if err != nil { - log.Fatalln(err) - } - defer func() { - if err := master.Close(); err != nil { - panic(err) - } - }() - - consumer, err := master.ConsumePartition("my_topic", 0, OffsetOldest) - if err != nil { - log.Fatalln(err) - } - - var ( - wg sync.WaitGroup - msgCount int - ) - - wg.Add(1) - go func() { - defer wg.Done() - for message := range consumer.Messages() { - log.Printf("Consumed message with offset %d", message.Offset) - msgCount++ - } - }() - - wg.Add(1) - go func() { - defer wg.Done() - for err := range consumer.Errors() { - log.Println(err) - } - }() - - // Wait for an interrupt signal to trigger the shutdown - signals := make(chan os.Signal, 1) - signal.Notify(signals, os.Interrupt) - <-signals - consumer.AsyncClose() - - // Wait for the Messages and Errors channel to be fully drained. - wg.Wait() - log.Println("Processed", msgCount, "messages.") } diff --git a/mockbroker_test.go b/mockbroker_test.go index 2c670b798e..2d8d8f27ab 100644 --- a/mockbroker_test.go +++ b/mockbroker_test.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "net" + "reflect" "strconv" "sync" "testing" @@ -61,6 +62,17 @@ func (b *mockBroker) SetHandler(handler requestHandlerFunc) { b.handlerMux.Unlock() } +func (b *mockBroker) SetHandlerByMap(handlerMap map[string]MockResponse) { + b.SetHandler(func(req *request) (res encoder) { + reqTypeName := reflect.TypeOf(req.body).Elem().Name() + mockResponse := handlerMap[reqTypeName] + if mockResponse == nil { + return nil + } + return mockResponse.For(req.body) + }) +} + func (b *mockBroker) BrokerID() int32 { return b.brokerID } @@ -139,7 +151,11 @@ func (b *mockBroker) handleRequests(conn net.Conn, idx int, wg *sync.WaitGroup) } res := b.requestHandler()(req) - Logger.Printf("*** mockbroker/%d/%d: served %+v -> %+v", b.brokerID, idx, req, res) + if res == nil { + Logger.Printf("*** mockbroker/%d/%d: ignored %v", b.brokerID, idx, req) + continue + } + Logger.Printf("*** mockbroker/%d/%d: served %v -> %v", b.brokerID, idx, req, res) encodedRes, err := encode(res) if err != nil { diff --git a/mockresponses_test.go b/mockresponses_test.go new file mode 100644 index 0000000000..fb47eef0a5 --- /dev/null +++ b/mockresponses_test.go @@ -0,0 +1,232 @@ +package sarama + +import ( + "testing" +) + +// MockResponse is a response builder interface it defines one method that +// allows generating a response based on a request body. +type MockResponse interface { + For(reqBody decoder) (res encoder) +} + +type mockWrapper struct { + res encoder +} + +func (mw *mockWrapper) For(reqBody decoder) (res encoder) { + return mw.res +} + +func newMockWrapper(res encoder) *mockWrapper { + return &mockWrapper{res: res} +} + +// mockMetadataResponse is a `MetadataResponse` builder. +type mockMetadataResponse struct { + leaders map[string]map[int32]int32 + brokers map[string]int32 + t *testing.T +} + +func newMockMetadataResponse(t *testing.T) *mockMetadataResponse { + return &mockMetadataResponse{ + leaders: make(map[string]map[int32]int32), + brokers: make(map[string]int32), + t: t, + } +} + +func (mmr *mockMetadataResponse) SetLeader(topic string, partition, brokerID int32) *mockMetadataResponse { + partitions := mmr.leaders[topic] + if partitions == nil { + partitions = make(map[int32]int32) + mmr.leaders[topic] = partitions + } + partitions[partition] = brokerID + return mmr +} + +func (mmr *mockMetadataResponse) SetBroker(addr string, brokerID int32) *mockMetadataResponse { + mmr.brokers[addr] = brokerID + return mmr +} + +func (mor *mockMetadataResponse) For(reqBody decoder) encoder { + metadataRequest := reqBody.(*MetadataRequest) + metadataResponse := &MetadataResponse{} + for addr, brokerID := range mor.brokers { + metadataResponse.AddBroker(addr, brokerID) + } + if len(metadataRequest.Topics) == 0 { + for topic, partitions := range mor.leaders { + for partition, brokerID := range partitions { + metadataResponse.AddTopicPartition(topic, partition, brokerID, nil, nil, ErrNoError) + } + } + return metadataResponse + } + for _, topic := range metadataRequest.Topics { + for partition, brokerID := range mor.leaders[topic] { + metadataResponse.AddTopicPartition(topic, partition, brokerID, nil, nil, ErrNoError) + } + } + return metadataResponse +} + +// mockOffsetResponse is an `OffsetResponse` builder. +type mockOffsetResponse struct { + offsets map[string]map[int32]map[int64]int64 + t *testing.T +} + +func newMockOffsetResponse(t *testing.T) *mockOffsetResponse { + return &mockOffsetResponse{ + offsets: make(map[string]map[int32]map[int64]int64), + t: t, + } +} + +func (mor *mockOffsetResponse) SetOffset(topic string, partition int32, time, offset int64) *mockOffsetResponse { + partitions := mor.offsets[topic] + if partitions == nil { + partitions = make(map[int32]map[int64]int64) + mor.offsets[topic] = partitions + } + times := partitions[partition] + if times == nil { + times = make(map[int64]int64) + partitions[partition] = times + } + times[time] = offset + return mor +} + +func (mor *mockOffsetResponse) For(reqBody decoder) encoder { + offsetRequest := reqBody.(*OffsetRequest) + offsetResponse := &OffsetResponse{} + for topic, partitions := range offsetRequest.blocks { + for partition, block := range partitions { + offset := mor.getOffset(topic, partition, block.time) + offsetResponse.AddTopicPartition(topic, partition, offset) + } + } + return offsetResponse +} + +func (mor *mockOffsetResponse) getOffset(topic string, partition int32, time int64) int64 { + partitions := mor.offsets[topic] + if partitions == nil { + mor.t.Errorf("missing topic: %s", topic) + } + times := partitions[partition] + if times == nil { + mor.t.Errorf("missing partition: %d", partition) + } + offset, ok := times[time] + if !ok { + mor.t.Errorf("missing time: %d", time) + } + return offset +} + +// mockFetchResponse is a `FetchResponse` builder. +type mockFetchResponse struct { + messages map[string]map[int32]map[int64]Encoder + highWaterMarks map[string]map[int32]int64 + t *testing.T + batchSize int +} + +func newMockFetchResponse(t *testing.T, batchSize int) *mockFetchResponse { + return &mockFetchResponse{ + messages: make(map[string]map[int32]map[int64]Encoder), + highWaterMarks: make(map[string]map[int32]int64), + t: t, + batchSize: batchSize, + } +} + +func (mfr *mockFetchResponse) SetMessage(topic string, partition int32, offset int64, msg Encoder) *mockFetchResponse { + partitions := mfr.messages[topic] + if partitions == nil { + partitions = make(map[int32]map[int64]Encoder) + mfr.messages[topic] = partitions + } + messages := partitions[partition] + if messages == nil { + messages = make(map[int64]Encoder) + partitions[partition] = messages + } + messages[offset] = msg + return mfr +} + +func (mfr *mockFetchResponse) SetHighWaterMark(topic string, partition int32, offset int64) *mockFetchResponse { + partitions := mfr.highWaterMarks[topic] + if partitions == nil { + partitions = make(map[int32]int64) + mfr.highWaterMarks[topic] = partitions + } + partitions[partition] = offset + return mfr +} + +func (mfr *mockFetchResponse) For(reqBody decoder) encoder { + fetchRequest := reqBody.(*FetchRequest) + res := &FetchResponse{} + for topic, partitions := range fetchRequest.blocks { + for partition, block := range partitions { + initialOffset := block.fetchOffset + offset := initialOffset + maxOffset := initialOffset + int64(mfr.getMessageCount(topic, partition)) + for i := 0; i < mfr.batchSize && offset < maxOffset; { + msg := mfr.getMessage(topic, partition, offset) + if msg != nil { + res.AddMessage(topic, partition, nil, msg, offset) + i++ + } + offset++ + } + fb := res.GetBlock(topic, partition) + if fb == nil { + res.AddError(topic, partition, ErrNoError) + fb = res.GetBlock(topic, partition) + } + fb.HighWaterMarkOffset = mfr.getHighWaterMark(topic, partition) + } + } + return res +} + +func (mfr *mockFetchResponse) getMessage(topic string, partition int32, offset int64) Encoder { + partitions := mfr.messages[topic] + if partitions == nil { + return nil + } + messages := partitions[partition] + if messages == nil { + return nil + } + return messages[offset] +} + +func (mfr *mockFetchResponse) getMessageCount(topic string, partition int32) int { + partitions := mfr.messages[topic] + if partitions == nil { + return 0 + } + messages := partitions[partition] + if messages == nil { + return 0 + } + return len(messages) +} + +func (mfr *mockFetchResponse) getHighWaterMark(topic string, partition int32) int64 { + partitions := mfr.highWaterMarks[topic] + if partitions == nil { + return 0 + } + return partitions[partition] +}