Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Topics() and Partitions() to Consumer interface, and mock Consumer implementation. #431

Merged
merged 1 commit into from
Apr 20, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,15 @@ func (ce ConsumerErrors) Error() string {
// on a consumer to avoid leaks, it will not be garbage-collected automatically when it passes out of
// scope.
type Consumer interface {

// Topics returns the set of available topics as retrieved from the cluster metadata.
// This method is the same as Client.Topics(), and is provided for convenience.
Topics() ([]string, error)

// Partitions returns the sorted list of all partition IDs for the given topic.
// This method is the same as Client.Pertitions(), and is provided for convenience.
Partitions(topic string) ([]int32, error)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than mention delegation (technical aspect) mention that they are "the same as" and "provided for convenience" or something (UX aspect).


// ConsumePartition creates a PartitionConsumer on the given topic/partition with the given offset. It will
// return an error if this Consumer is already consuming on the given topic/partition. Offset can be a
// literal offset, or OffsetNewest or OffsetOldest
Expand Down Expand Up @@ -98,6 +107,14 @@ func (c *consumer) Close() error {
return nil
}

func (c *consumer) Topics() ([]string, error) {
return c.client.Topics()
}

func (c *consumer) Partitions(topic string) ([]int32, error) {
return c.client.Partitions(topic)
}

func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) {
child := &partitionConsumer{
consumer: c,
Expand Down
43 changes: 43 additions & 0 deletions mocks/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type Consumer struct {
t ErrorReporter
config *sarama.Config
partitionConsumers map[string]map[int32]*PartitionConsumer
metadata map[string][]int32
}

// NewConsumer returns a new mock Consumer instance. The t argument should
Expand Down Expand Up @@ -62,6 +63,39 @@ func (c *Consumer) ConsumePartition(topic string, partition int32, offset int64)
return pc, nil
}

// Topics returns a list of topics, as registered with SetMetadata
func (c *Consumer) Topics() ([]string, error) {
c.l.Lock()
defer c.l.Unlock()

if c.metadata == nil {
c.t.Errorf("Unexpected call to Topics. Initialize the mock's topic metadata with SetMetadata.")
return nil, sarama.ErrOutOfBrokers
}

var result []string
for topic, _ := range c.metadata {
result = append(result, topic)
}
return result, nil
}

// Partitions returns the list of parititons for the given topic, as registered with SetMetadata
func (c *Consumer) Partitions(topic string) ([]int32, error) {
c.l.Lock()
defer c.l.Unlock()

if c.metadata == nil {
c.t.Errorf("Unexpected call to Partitions. Initialize the mock's topic metadata with SetMetadata.")
return nil, sarama.ErrOutOfBrokers
}
if c.metadata[topic] == nil {
return nil, sarama.ErrUnknownTopicOrPartition
}

return c.metadata[topic], nil
}

// Close implements the Close method from the sarama.Consumer interface. It will close
// all registered PartitionConsumer instances.
func (c *Consumer) Close() error {
Expand All @@ -81,6 +115,15 @@ func (c *Consumer) Close() error {
// Expectation API
///////////////////////////////////////////////////

// SetMetadata sets the clusters topic/partition metadata,
// which will be returned by Topics() and Partitions().
func (c *Consumer) SetTopicMetadata(metadata map[string][]int32) {
c.l.Lock()
defer c.l.Unlock()

c.metadata = metadata
}

// ExpectConsumePartition will register a topic/partition, so you can set expectations on it.
// The registered PartitionConsumer will be returned, so you can set expectations
// on it using method chanining. Once a topic/partition is registered, you are
Expand Down
59 changes: 58 additions & 1 deletion mocks/consumer_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package mocks

import (
"sort"
"testing"

"github.com/Shopify/sarama"
Expand Down Expand Up @@ -187,6 +188,62 @@ func TestConsumerMeetsErrorsDrainedExpectation(t *testing.T) {
}

if len(trm.errors) != 0 {
t.Errorf("Expected ano expectation failures to be set on the error reporter.")
t.Errorf("Expected no expectation failures to be set on the error reporter.")
}
}

func TestConsumerTopicMetadata(t *testing.T) {
trm := newTestReporterMock()
consumer := NewConsumer(trm, nil)

consumer.SetTopicMetadata(map[string][]int32{
"test1": []int32{0, 1, 2, 3},
"test2": []int32{0, 1, 2, 3, 4, 5, 6, 7},
})

topics, err := consumer.Topics()
if err != nil {
t.Error(t)
}

sortedTopics := sort.StringSlice(topics)
sortedTopics.Sort()
if len(sortedTopics) != 2 || sortedTopics[0] != "test1" || sortedTopics[1] != "test2" {
t.Error("Unexpected topics returned:", sortedTopics)
}

partitions1, err := consumer.Partitions("test1")
if err != nil {
t.Error(t)
}

if len(partitions1) != 4 {
t.Error("Unexpected partitions returned:", len(partitions1))
}

partitions2, err := consumer.Partitions("test2")
if err != nil {
t.Error(t)
}

if len(partitions2) != 8 {
t.Error("Unexpected partitions returned:", len(partitions2))
}

if len(trm.errors) != 0 {
t.Errorf("Expected no expectation failures to be set on the error reporter.")
}
}

func TestConsumerUnexpectedTopicMetadata(t *testing.T) {
trm := newTestReporterMock()
consumer := NewConsumer(trm, nil)

if _, err := consumer.Topics(); err != sarama.ErrOutOfBrokers {
t.Error("Expected sarama.ErrOutOfBrokers, found", err)
}

if len(trm.errors) != 1 {
t.Errorf("Expected an expectation failure to be set on the error reporter.")
}
}