Clean up the flusher via helper methods #512

Merged · 1 commit · Aug 13, 2015
132 changes: 74 additions & 58 deletions async_producer.go
@@ -504,9 +504,10 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessage
go withRecover(a.run)

f := &flusher{
parent: p,
broker: broker,
input: bridge,
parent: p,
broker: broker,
input: bridge,
currentRetries: make(map[string]map[int32]error),
}
go withRecover(f.run)
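The retry bookkeeping now lives on the struct and is initialized once at construction, so run and the new helper methods share one map. As a hedged aside (not part of this diff), here is a minimal, self-contained Go sketch of the topic -> partition nested-map pattern that currentRetries relies on; all names besides the map itself are illustrative:

package main

import "fmt"

func main() {
	// topic -> partition -> the error that put the partition into retry state
	currentRetries := make(map[string]map[int32]error)

	// Marking a partition as retrying lazily allocates the inner map.
	if currentRetries["t"] == nil {
		currentRetries["t"] = make(map[int32]error)
	}
	currentRetries["t"][0] = fmt.Errorf("leader not available")

	// Lookups on missing keys are safe: a nil inner map yields a nil error.
	fmt.Println(currentRetries["t"][0] != nil)     // true: retrying
	fmt.Println(currentRetries["other"][5] != nil) // false: healthy

	// Clearing the state once the partition recovers.
	delete(currentRetries["t"], 0)
	fmt.Println(currentRetries["t"][0] != nil) // false again
}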

@@ -582,17 +583,18 @@ shutdown:
close(a.output)
}

// one per broker
// takes a batch at a time from the messageAggregator and sends to the broker
// takes a batch at a time from the aggregator and sends to the broker
type flusher struct {
parent *asyncProducer
broker *Broker
input <-chan []*ProducerMessage

currentRetries map[string]map[int32]error
}

func (f *flusher) run() {
var closing error
currentRetries := make(map[string]map[int32]error)

Logger.Printf("producer/flusher/%d starting up\n", f.broker.ID())

for batch := range f.input {
@@ -601,30 +603,7 @@ func (f *flusher) run() {
continue
}

// group messages by topic/partition
msgSets := make(map[string]map[int32][]*ProducerMessage)
for i, msg := range batch {
if currentRetries[msg.Topic] != nil && currentRetries[msg.Topic][msg.Partition] != nil {
if msg.flags&chaser == chaser {
// we can start processing this topic/partition again
Logger.Printf("producer/flusher/%d state change to [normal] on %s/%d\n",
f.broker.ID(), msg.Topic, msg.Partition)
currentRetries[msg.Topic][msg.Partition] = nil
}
f.parent.retryMessages([]*ProducerMessage{msg}, currentRetries[msg.Topic][msg.Partition])
batch[i] = nil // to prevent it being returned/retried twice
continue
}

partitionSet := msgSets[msg.Topic]
if partitionSet == nil {
partitionSet = make(map[int32][]*ProducerMessage)
msgSets[msg.Topic] = partitionSet
}

partitionSet[msg.Partition] = append(partitionSet[msg.Partition], msg)
}

msgSets := f.groupAndFilter(batch)
request := f.parent.buildRequest(msgSets)
if request == nil {
continue
@@ -653,40 +632,77 @@ func (f *flusher) run() {
continue
}

// we iterate through the blocks in the request, not the response, so that we notice
// if the response is missing a block completely
for topic, partitionSet := range msgSets {
for partition, msgs := range partitionSet {
f.parseResponse(msgSets, response)
}
Logger.Printf("producer/flusher/%d shut down\n", f.broker.ID())
}

block := response.GetBlock(topic, partition)
if block == nil {
f.parent.returnErrors(msgs, ErrIncompleteResponse)
continue
}
func (f *flusher) groupAndFilter(batch []*ProducerMessage) map[string]map[int32][]*ProducerMessage {
msgSets := make(map[string]map[int32][]*ProducerMessage)

switch block.Err {
case ErrNoError:
// All the messages for this topic-partition were delivered successfully!
for i := range msgs {
msgs[i].Offset = block.Offset + int64(i)
}
f.parent.returnSuccesses(msgs)
case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable,
ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
Logger.Printf("producer/flusher/%d state change to [retrying] on %s/%d because %v\n",
f.broker.ID(), topic, partition, block.Err)
if currentRetries[topic] == nil {
currentRetries[topic] = make(map[int32]error)
}
currentRetries[topic][partition] = block.Err
f.parent.retryMessages(msgs, block.Err)
default:
f.parent.returnErrors(msgs, block.Err)
for i, msg := range batch {

if f.currentRetries[msg.Topic] != nil && f.currentRetries[msg.Topic][msg.Partition] != nil {
// we're currently retrying this partition so we need to filter out this message
f.parent.retryMessages([]*ProducerMessage{msg}, f.currentRetries[msg.Topic][msg.Partition])
batch[i] = nil
Contributor Author:

actually some functional changes here; I re-orged a few of these lines so that we retry here before we do the chaser check; I don't think this really makes any difference

Contributor:

👍 I can't really think that would disrupt anything - if anything this ordering is more logical.

if msg.flags&chaser == chaser {
// ...but now we can start processing future messages again
Logger.Printf("producer/flusher/%d state change to [normal] on %s/%d\n",
f.broker.ID(), msg.Topic, msg.Partition)
delete(f.currentRetries[msg.Topic], msg.Partition)
}

continue
}

partitionSet := msgSets[msg.Topic]
if partitionSet == nil {
partitionSet = make(map[int32][]*ProducerMessage)
msgSets[msg.Topic] = partitionSet
}

partitionSet[msg.Partition] = append(partitionSet[msg.Partition], msg)
}

return msgSets
}
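
As a hedged illustration (not part of the diff), the grouping half of groupAndFilter and the msg.flags&chaser test above boil down to the following self-contained sketch; the flagSet type and chaser constant are simplified stand-ins for the ones defined elsewhere in async_producer.go:

package main

import "fmt"

type flagSet int8

const chaser flagSet = 1 // stand-in bit flag; the real values live in async_producer.go

type ProducerMessage struct {
	Topic     string
	Partition int32
	flags     flagSet
}

// groupByTopicPartition mirrors the grouping loop above, minus the retry filtering.
func groupByTopicPartition(batch []*ProducerMessage) map[string]map[int32][]*ProducerMessage {
	msgSets := make(map[string]map[int32][]*ProducerMessage)
	for _, msg := range batch {
		partitionSet := msgSets[msg.Topic]
		if partitionSet == nil {
			partitionSet = make(map[int32][]*ProducerMessage)
			msgSets[msg.Topic] = partitionSet
		}
		partitionSet[msg.Partition] = append(partitionSet[msg.Partition], msg)
	}
	return msgSets
}

func main() {
	batch := []*ProducerMessage{
		{Topic: "a", Partition: 0},
		{Topic: "a", Partition: 1},
		{Topic: "a", Partition: 0},
		{Topic: "b", Partition: 0, flags: chaser},
	}
	msgSets := groupByTopicPartition(batch)
	fmt.Println(len(msgSets["a"][0]))                      // 2: both a/0 messages grouped together
	fmt.Println(msgSets["b"][0][0].flags&chaser == chaser) // true: the bitmask test used above
}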

func (f *flusher) parseResponse(msgSets map[string]map[int32][]*ProducerMessage, response *ProduceResponse) {
// we iterate through the blocks in the request set, not the response, so that we notice
// if the response is missing a block completely
for topic, partitionSet := range msgSets {
for partition, msgs := range partitionSet {
block := response.GetBlock(topic, partition)
if block == nil {
f.parent.returnErrors(msgs, ErrIncompleteResponse)
continue
}

switch block.Err {
// Success
case ErrNoError:
for i := range msgs {
msgs[i].Offset = block.Offset + int64(i)
}
f.parent.returnSuccesses(msgs)
// Retriable errors
case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable,
ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
Logger.Printf("producer/flusher/%d state change to [retrying] on %s/%d because %v\n",
f.broker.ID(), topic, partition, block.Err)
if f.currentRetries[topic] == nil {
f.currentRetries[topic] = make(map[int32]error)
}
f.currentRetries[topic][partition] = block.Err
f.parent.retryMessages(msgs, block.Err)
// Other non-retriable errors
default:
f.parent.returnErrors(msgs, block.Err)
}
}
}
Logger.Printf("producer/flusher/%d shut down\n", f.broker.ID())
}
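
Two details in parseResponse are worth spelling out. First, the broker returns one base offset per topic/partition block, and each message recovers its own offset as the base plus its index in the batch, which works because the batch is appended to the log contiguously. Second, errors fall into exactly three buckets: success, retriable, and fatal. A hedged, self-contained sketch of both (the KError values below are stand-ins, not Sarama's real numeric codes):

package main

import "fmt"

// KError stands in for Sarama's error-code type; only the behavior of the
// switch above is reproduced, not the real wire values.
type KError int16

const (
	ErrNoError KError = iota
	ErrUnknownTopicOrPartition
	ErrNotLeaderForPartition
	ErrLeaderNotAvailable
	ErrRequestTimedOut
	ErrNotEnoughReplicas
	ErrNotEnoughReplicasAfterAppend
	ErrMessageTooLarge // an example of a non-retriable error
)

// classify mirrors the three-way switch in parseResponse.
func classify(err KError) string {
	switch err {
	case ErrNoError:
		return "success"
	case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable,
		ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
		return "retry"
	default:
		return "fail"
	}
}

func main() {
	// Offset arithmetic: a three-message batch acked with base offset 1000
	// (hypothetical) lands at offsets 1000, 1001, 1002.
	base := int64(1000)
	for i := 0; i < 3; i++ {
		fmt.Println("offset:", base+int64(i))
	}

	fmt.Println(classify(ErrNoError))            // success
	fmt.Println(classify(ErrLeaderNotAvailable)) // retry
	fmt.Println(classify(ErrMessageTooLarge))    // fail
}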

// singleton