Skip to content

Commit

Permalink
Reduce internal client poll timeout for consumer iterator interface (#…
Browse files Browse the repository at this point in the history
…1824)

More attempts to address heartbeat timing issues in consumers, especially with the iterator interface. Here we can reduce the `client.poll` timeout to at most the retry backoff (typically 100ms) so that the consumer iterator interface doesn't block for longer than the heartbeat timeout.
  • Loading branch information
dpkp authored and jeffwidman committed Aug 16, 2019
1 parent ace6af5 commit 5bc2529
Showing 1 changed file with 1 addition and 3 deletions.
4 changes: 1 addition & 3 deletions kafka/consumer/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -1086,9 +1086,7 @@ def _message_generator(self):
partitions = self._subscription.missing_fetch_positions()
self._update_fetch_positions(partitions)

poll_ms = 1000 * (self._consumer_timeout - time.time())
if not self._fetcher.in_flight_fetches():
poll_ms = min(poll_ms, self.config['reconnect_backoff_ms'])
poll_ms = min((1000 * (self._consumer_timeout - time.time())), self.config['retry_backoff_ms'])
self._client.poll(timeout_ms=poll_ms)

# after the long poll, we should check whether the group needs to rebalance
Expand Down

0 comments on commit 5bc2529

Please sign in to comment.