From 57f03c7bfa824b74b6782f5ba7614a4e70298be2 Mon Sep 17 00:00:00 2001 From: faceair Date: Wed, 24 Jan 2018 15:15:20 +0800 Subject: [PATCH] Consumer: reduce ticker allocations --- consumer.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/consumer.go b/consumer.go index 506e657f0..71ab48727 100644 --- a/consumer.go +++ b/consumer.go @@ -441,20 +441,20 @@ func (child *partitionConsumer) HighWaterMarkOffset() int64 { func (child *partitionConsumer) responseFeeder() { var msgs []*ConsumerMessage - msgSent := false + expiryTicker := time.NewTicker(child.conf.Consumer.MaxProcessingTime) + firstAttempt := true feederLoop: for response := range child.feeder { msgs, child.responseResult = child.parseResponse(response) - expiryTicker := time.NewTicker(child.conf.Consumer.MaxProcessingTime) for i, msg := range msgs { messageSelect: select { case child.messages <- msg: - msgSent = true + firstAttempt = true case <-expiryTicker.C: - if !msgSent { + if !firstAttempt { child.responseResult = errTimedOut child.broker.acks.Done() for _, msg = range msgs[i:] { @@ -465,16 +465,16 @@ feederLoop: } else { // current message has not been sent, return to select // statement - msgSent = false + firstAttempt = false goto messageSelect } } } - expiryTicker.Stop() child.broker.acks.Done() } + expiryTicker.Stop() close(child.messages) close(child.errors) }