diff --git a/CHANGELOG.md b/CHANGELOG.md index 390d86c7a..4b4bde6ad 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,8 @@ Improvements: ([#634](https://github.com/Shopify/sarama/pull/634)). - Pre-allocate decoding errors, greatly reducing heap usage and GC time against misbehaving brokers ([#690](https://github.com/Shopify/sarama/pull/690)). + - Re-use consumer expiry timers, removing one allocation per consumed message + ([#707](https://github.com/Shopify/sarama/pull/707)). Bug Fixes: - Actually default the client ID to "sarama" like we say we do diff --git a/consumer.go b/consumer.go index c001794e1..c70b528f0 100644 --- a/consumer.go +++ b/consumer.go @@ -413,15 +413,18 @@ func (child *partitionConsumer) HighWaterMarkOffset() int64 { func (child *partitionConsumer) responseFeeder() { var msgs []*ConsumerMessage + expiryTimer := time.NewTimer(child.conf.Consumer.MaxProcessingTime) feederLoop: for response := range child.feeder { msgs, child.responseResult = child.parseResponse(response) for i, msg := range msgs { + expiryTimer.Reset(child.conf.Consumer.MaxProcessingTime) + select { case child.messages <- msg: - case <-time.After(child.conf.Consumer.MaxProcessingTime): + case <-expiryTimer.C: child.responseResult = errTimedOut child.broker.acks.Done() for _, msg = range msgs[i:] {