Send messages and errors directly in consumer mock #555

Merged 2 commits on Oct 28, 2015

Changes from 1 commit
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 20 additions & 38 deletions mocks/consumer.go
@@ -60,7 +60,6 @@ func (c *Consumer) ConsumePartition(topic string, partition int32, offset int64)
 	}
 
 	pc.consumed = true
-	go pc.handleExpectations()
 	return pc, nil
 }

@@ -141,13 +140,12 @@ func (c *Consumer) ExpectConsumePartition(topic string, partition int32, offset
 
 	if c.partitionConsumers[topic][partition] == nil {
 		c.partitionConsumers[topic][partition] = &PartitionConsumer{
-			t:            c.t,
-			topic:        topic,
-			partition:    partition,
-			offset:       offset,
-			expectations: make(chan *consumerExpectation, 1000),
-			messages:     make(chan *sarama.ConsumerMessage, c.config.ChannelBufferSize),
-			errors:       make(chan *sarama.ConsumerError, c.config.ChannelBufferSize),
+			t:         c.t,
+			topic:     topic,
+			partition: partition,
+			offset:    offset,
+			messages:  make(chan *sarama.ConsumerMessage, c.config.ChannelBufferSize),
+			errors:    make(chan *sarama.ConsumerError, c.config.ChannelBufferSize),
 		}
 	}
 
@@ -169,7 +167,6 @@ type PartitionConsumer struct {
 	topic        string
 	partition    int32
 	offset       int64
-	expectations chan *consumerExpectation
 	messages     chan *sarama.ConsumerMessage
 	errors       chan *sarama.ConsumerError
 	singleClose  sync.Once
@@ -179,40 +176,15 @@ type PartitionConsumer struct {
 	highWaterMarkOffset int64
 }
 
-func (pc *PartitionConsumer) handleExpectations() {
-	pc.l.Lock()
-	defer pc.l.Unlock()
-
-	for ex := range pc.expectations {
-		if ex.Err != nil {
-			pc.errors <- &sarama.ConsumerError{
-				Topic:     pc.topic,
-				Partition: pc.partition,
-				Err:       ex.Err,
-			}
-		} else {
-			atomic.AddInt64(&pc.highWaterMarkOffset, 1)
-
-			ex.Msg.Topic = pc.topic
-			ex.Msg.Partition = pc.partition
-			ex.Msg.Offset = atomic.LoadInt64(&pc.highWaterMarkOffset)
-
-			pc.messages <- ex.Msg
-		}
-	}
-
-	close(pc.messages)
-	close(pc.errors)
-}
-
 ///////////////////////////////////////////////////
 // PartitionConsumer interface implementation
 ///////////////////////////////////////////////////
 
 // AsyncClose implements the AsyncClose method from the sarama.PartitionConsumer interface.
 func (pc *PartitionConsumer) AsyncClose() {
 	pc.singleClose.Do(func() {
-		close(pc.expectations)
 		close(pc.messages)
 		close(pc.errors)
 	})
 }

@@ -289,7 +261,13 @@ func (pc *PartitionConsumer) HighWaterMarkOffset() int64 {
 // reasons for this not to happen. You can call ExpectMessagesDrainedOnClose so it will
 // verify that the channel is empty on close.
 func (pc *PartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage) {
-	pc.expectations <- &consumerExpectation{Msg: msg}
+	pc.highWaterMarkOffset += 1

Contributor: Does this still need to be atomic and/or locked somehow? I guess it depends on whether people expect to be able to call this method multiple times concurrently, but I don't know if that's true (it was safe to call the old one concurrently).

Contributor: It's atomic in sarama, so I prefer the mock to be atomic as well, to match the sarama API as closely as possible.

Contributor (Author): I removed the atomic operations because this code is no longer executed in a goroutine... then again, handleExpectations uses a for loop anyway, so that alone would not lead to race conditions. So it's not totally clear to me why they were there. If atomic operations are needed in case someone calls this code concurrently, that means plain assignments are never safe in public APIs.

Contributor: In sarama it's atomic because it's possible to have the processing-lag monitoring in a separate goroutine. So it's not so much about internal use inside the mock, but more about how the API is used.


+	msg.Topic = pc.topic
+	msg.Partition = pc.partition
+	msg.Offset = pc.highWaterMarkOffset
+
+	pc.messages <- msg
 }
 
 // YieldError will yield an error on the Errors channel of this partition consumer
@@ -298,7 +276,11 @@ func (pc *PartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage) {
 // not to happen. You can call ExpectErrorsDrainedOnClose so it will verify that
 // the channel is empty on close.
 func (pc *PartitionConsumer) YieldError(err error) {
-	pc.expectations <- &consumerExpectation{Err: err}
+	pc.errors <- &sarama.ConsumerError{
+		Topic:     pc.topic,
+		Partition: pc.partition,
+		Err:       err,
+	}
 }
 
 // ExpectMessagesDrainedOnClose sets an expectation on the partition consumer
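
For reference, a minimal sketch of how a test might drive the mock consumer after this change. The test body, topic name, and error value are illustrative assumptions; the mocks calls themselves (NewConsumer, ExpectConsumePartition, YieldMessage, YieldError, ConsumePartition) are the ones touched by this diff:

```go
package example

import (
	"testing"

	"github.com/Shopify/sarama"
	"github.com/Shopify/sarama/mocks"
)

func TestPartitionConsumerMock(t *testing.T) {
	// A nil config makes the mock fall back to sarama's defaults,
	// including the ChannelBufferSize used for the messages/errors channels.
	consumer := mocks.NewConsumer(t, nil)

	pc := consumer.ExpectConsumePartition("my_topic", 0, sarama.OffsetOldest)

	// After this PR these calls write straight into the buffered channels,
	// so the message and error are available immediately.
	pc.YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello")})
	pc.YieldError(sarama.ErrOutOfBrokers)

	partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, sarama.OffsetOldest)
	if err != nil {
		t.Fatal(err)
	}

	msg := <-partitionConsumer.Messages()
	if string(msg.Value) != "hello" {
		t.Errorf("unexpected message: %s", msg.Value)
	}

	consumerErr := <-partitionConsumer.Errors()
	if consumerErr.Err != sarama.ErrOutOfBrokers {
		t.Errorf("unexpected error: %v", consumerErr.Err)
	}

	if err := consumer.Close(); err != nil {
		t.Error(err)
	}
}
```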