Skip to content

Commit

Permalink
Misc. fixes and cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
eapache committed Sep 27, 2015
1 parent 3a54e97 commit 8a9748b
Showing 1 changed file with 33 additions and 25 deletions.
58 changes: 33 additions & 25 deletions async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ func (bp *brokerProducer) run() {
}

if reason := bp.retryReason(msg); reason != nil {
bp.parent.retryMessages([]*ProducerMessage{msg}, reason)
bp.parent.retryMessage(msg, reason)
continue
}

Expand Down Expand Up @@ -620,26 +620,27 @@ func (bp *brokerProducer) retryReason(msg *ProducerMessage) error {
if bp.currentRetries[msg.Topic] != nil {
err := bp.currentRetries[msg.Topic][msg.Partition]
if err != nil && msg.flags&chaser == chaser {
// we're currently retrying this partition so we need to filter out this message
// but now we can start processing future messages again
// we were retrying this partition but we can start processing again
Logger.Printf("producer/broker/%d state change to [normal] on %s/%d\n",
bp.broker.ID(), msg.Topic, msg.Partition)
delete(bp.currentRetries[msg.Topic], msg.Partition)
}
return err
}

return nil
}

func (bp *brokerProducer) wait() {
if bp.pending != nil {
select {
case response := <-bp.responses:
bp.handleResponse(response)
case err := <-bp.errors:
bp.handleError(err)
}
if bp.pending == nil {
return
}

select {
case response := <-bp.responses:
bp.handleResponse(response)
case err := <-bp.errors:
bp.handleError(err)
}
}

Expand Down Expand Up @@ -674,12 +675,8 @@ func (bp *brokerProducer) handleResponse(response *ProduceResponse) {
switch block.Err {
// Success
case ErrNoError:
i := 0
for _, msg := range msgs {
if msg != nil {
msg.Offset = block.Offset + int64(i)
i++
}
for i, msg := range msgs {
msg.Offset = block.Offset + int64(i)
}
bp.parent.returnSuccesses(msgs)
// Retriable errors
Expand All @@ -692,11 +689,12 @@ func (bp *brokerProducer) handleResponse(response *ProduceResponse) {
}
bp.currentRetries[topic][partition] = block.Err
bp.parent.retryMessages(msgs, block.Err)
// Other non-retriable errors
// Other non-retriable errors
default:
bp.parent.returnErrors(msgs, block.Err)
}
})

bp.pending = nil
}

Expand All @@ -715,6 +713,8 @@ func (bp *brokerProducer) handleError(err error) {
bp.parent.retryMessages(msgs, err)
})
}

bp.pending = nil
}

// takes a set at a time from the brokerProducer and sends to the broker
Expand Down Expand Up @@ -776,8 +776,8 @@ type partitionSet struct {
}

type produceSet struct {
msgs map[string]map[int32]*partitionSet
parent *asyncProducer
msgs map[string]map[int32]*partitionSet

bufferBytes int
bufferCount int
Expand Down Expand Up @@ -845,7 +845,11 @@ func (ps *produceSet) buildRequest() *ProduceRequest {
Logger.Println(err) // if this happens, it's basically our fault.
panic(err)
}
req.AddMessage(topic, partition, &Message{Codec: ps.parent.conf.Producer.Compression, Key: nil, Value: valBytes})
req.AddMessage(topic, partition, &Message{
Codec: ps.parent.conf.Producer.Compression,
Key: nil,
Value: valBytes,
})

This comment has been minimized.

Copy link
@wvanbergen

wvanbergen Sep 27, 2015

Contributor

Might be nice to add some documentation comments here to explain how a set of compressed messages works. When I first saw this, I was thinking: "why a we hardcoding nil as key here?

This comment has been minimized.

Copy link
@eapache

eapache Sep 27, 2015

Author Contributor

done

}
}
}
Expand Down Expand Up @@ -948,14 +952,18 @@ func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) {
}
}

func (p *asyncProducer) retryMessage(msg *ProducerMessage, err error) {
if msg.retries >= p.conf.Producer.Retry.Max {
p.returnError(msg, err)
} else {
msg.retries++
p.retries <- msg
}
}

func (p *asyncProducer) retryMessages(batch []*ProducerMessage, err error) {
for _, msg := range batch {
if msg.retries >= p.conf.Producer.Retry.Max {
p.returnError(msg, err)
} else {
msg.retries++
p.retries <- msg
}
p.retryMessage(msg, err)
}
}

Expand Down

0 comments on commit 8a9748b

Please sign in to comment.