From 2052bd96dcab85d948d0160df9f9c812f39e7641 Mon Sep 17 00:00:00 2001 From: Willem van Bergen Date: Mon, 20 Apr 2015 13:19:15 -0400 Subject: [PATCH] Add Topics() and Partitions to Consumer, and mock Consumer. --- consumer.go | 17 ++++++++++++ mocks/consumer.go | 43 ++++++++++++++++++++++++++++++ mocks/consumer_test.go | 59 +++++++++++++++++++++++++++++++++++++++++- 3 files changed, 118 insertions(+), 1 deletion(-) diff --git a/consumer.go b/consumer.go index 99897b950..2a7a94a8d 100644 --- a/consumer.go +++ b/consumer.go @@ -39,6 +39,15 @@ func (ce ConsumerErrors) Error() string { // on a consumer to avoid leaks, it will not be garbage-collected automatically when it passes out of // scope. type Consumer interface { + + // Topics returns the set of available topics as retrieved from the cluster metadata. + // This method is the same as Client.Topics(), and is provided for convenience. + Topics() ([]string, error) + + // Partitions returns the sorted list of all partition IDs for the given topic. + // This method is the same as Client.Pertitions(), and is provided for convenience. + Partitions(topic string) ([]int32, error) + // ConsumePartition creates a PartitionConsumer on the given topic/partition with the given offset. It will // return an error if this Consumer is already consuming on the given topic/partition. Offset can be a // literal offset, or OffsetNewest or OffsetOldest @@ -98,6 +107,14 @@ func (c *consumer) Close() error { return nil } +func (c *consumer) Topics() ([]string, error) { + return c.client.Topics() +} + +func (c *consumer) Partitions(topic string) ([]int32, error) { + return c.client.Partitions(topic) +} + func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) { child := &partitionConsumer{ consumer: c, diff --git a/mocks/consumer.go b/mocks/consumer.go index 0aa8389c6..ff851e7f5 100644 --- a/mocks/consumer.go +++ b/mocks/consumer.go @@ -14,6 +14,7 @@ type Consumer struct { t ErrorReporter config *sarama.Config partitionConsumers map[string]map[int32]*PartitionConsumer + metadata map[string][]int32 } // NewConsumer returns a new mock Consumer instance. The t argument should @@ -62,6 +63,39 @@ func (c *Consumer) ConsumePartition(topic string, partition int32, offset int64) return pc, nil } +// Topics returns a list of topics, as registered with SetMetadata +func (c *Consumer) Topics() ([]string, error) { + c.l.Lock() + defer c.l.Unlock() + + if c.metadata == nil { + c.t.Errorf("Unexpected call to Topics. Initialize the mock's topic metadata with SetMetadata.") + return nil, sarama.ErrOutOfBrokers + } + + var result []string + for topic, _ := range c.metadata { + result = append(result, topic) + } + return result, nil +} + +// Partitions returns the list of parititons for the given topic, as registered with SetMetadata +func (c *Consumer) Partitions(topic string) ([]int32, error) { + c.l.Lock() + defer c.l.Unlock() + + if c.metadata == nil { + c.t.Errorf("Unexpected call to Partitions. Initialize the mock's topic metadata with SetMetadata.") + return nil, sarama.ErrOutOfBrokers + } + if c.metadata[topic] == nil { + return nil, sarama.ErrUnknownTopicOrPartition + } + + return c.metadata[topic], nil +} + // Close implements the Close method from the sarama.Consumer interface. It will close // all registered PartitionConsumer instances. func (c *Consumer) Close() error { @@ -81,6 +115,15 @@ func (c *Consumer) Close() error { // Expectation API /////////////////////////////////////////////////// +// SetMetadata sets the clusters topic/partition metadata, +// which will be returned by Topics() and Partitions(). +func (c *Consumer) SetTopicMetadata(metadata map[string][]int32) { + c.l.Lock() + defer c.l.Unlock() + + c.metadata = metadata +} + // ExpectConsumePartition will register a topic/partition, so you can set expectations on it. // The registered PartitionConsumer will be returned, so you can set expectations // on it using method chanining. Once a topic/partition is registered, you are diff --git a/mocks/consumer_test.go b/mocks/consumer_test.go index 5041256e0..50dad3a69 100644 --- a/mocks/consumer_test.go +++ b/mocks/consumer_test.go @@ -1,6 +1,7 @@ package mocks import ( + "sort" "testing" "github.com/Shopify/sarama" @@ -187,6 +188,62 @@ func TestConsumerMeetsErrorsDrainedExpectation(t *testing.T) { } if len(trm.errors) != 0 { - t.Errorf("Expected ano expectation failures to be set on the error reporter.") + t.Errorf("Expected no expectation failures to be set on the error reporter.") + } +} + +func TestConsumerTopicMetadata(t *testing.T) { + trm := newTestReporterMock() + consumer := NewConsumer(trm, nil) + + consumer.SetTopicMetadata(map[string][]int32{ + "test1": []int32{0, 1, 2, 3}, + "test2": []int32{0, 1, 2, 3, 4, 5, 6, 7}, + }) + + topics, err := consumer.Topics() + if err != nil { + t.Error(t) + } + + sortedTopics := sort.StringSlice(topics) + sortedTopics.Sort() + if len(sortedTopics) != 2 || sortedTopics[0] != "test1" || sortedTopics[1] != "test2" { + t.Error("Unexpected topics returned:", sortedTopics) + } + + partitions1, err := consumer.Partitions("test1") + if err != nil { + t.Error(t) + } + + if len(partitions1) != 4 { + t.Error("Unexpected partitions returned:", len(partitions1)) + } + + partitions2, err := consumer.Partitions("test2") + if err != nil { + t.Error(t) + } + + if len(partitions2) != 8 { + t.Error("Unexpected partitions returned:", len(partitions2)) + } + + if len(trm.errors) != 0 { + t.Errorf("Expected no expectation failures to be set on the error reporter.") + } +} + +func TestConsumerUnexpectedTopicMetadata(t *testing.T) { + trm := newTestReporterMock() + consumer := NewConsumer(trm, nil) + + if _, err := consumer.Topics(); err != sarama.ErrOutOfBrokers { + t.Error("Expected sarama.ErrOutOfBrokers, found", err) + } + + if len(trm.errors) != 1 { + t.Errorf("Expected an expectation failure to be set on the error reporter.") } }