Skip to content

Commit

Permalink
Consumer: reduce ticker allocations
Browse files Browse the repository at this point in the history
  • Loading branch information
faceair committed Feb 7, 2018
1 parent 0f4f8ca commit 57f03c7
Showing 1 changed file with 6 additions and 6 deletions.
12 changes: 6 additions & 6 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,20 +441,20 @@ func (child *partitionConsumer) HighWaterMarkOffset() int64 {

func (child *partitionConsumer) responseFeeder() {
var msgs []*ConsumerMessage
msgSent := false
expiryTicker := time.NewTicker(child.conf.Consumer.MaxProcessingTime)
firstAttempt := true

feederLoop:
for response := range child.feeder {
msgs, child.responseResult = child.parseResponse(response)
expiryTicker := time.NewTicker(child.conf.Consumer.MaxProcessingTime)

for i, msg := range msgs {
messageSelect:
select {
case child.messages <- msg:
msgSent = true
firstAttempt = true
case <-expiryTicker.C:
if !msgSent {
if !firstAttempt {
child.responseResult = errTimedOut
child.broker.acks.Done()
for _, msg = range msgs[i:] {
Expand All @@ -465,16 +465,16 @@ feederLoop:
} else {
// current message has not been sent, return to select
// statement
msgSent = false
firstAttempt = false
goto messageSelect
}
}
}

expiryTicker.Stop()
child.broker.acks.Done()
}

expiryTicker.Stop()
close(child.messages)
close(child.errors)
}
Expand Down

0 comments on commit 57f03c7

Please sign in to comment.