Skip to content

Commit

Permalink
Fix to main loop timeout calculation (#4671)
Browse files Browse the repository at this point in the history
leading to a tight loop for a max period of 1 ms

When the main thread loop was awakened less than 1 ms
before the expiration of a timeout, it was serving with a zero timeout,
leading to increased CPU usage until the timeout was reached.
Happening since 1.x
  • Loading branch information
emasab authored Apr 5, 2024
1 parent a6c8cec commit 807b23a
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 1 deletion.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ librdkafka v2.3.1 is a maintenance release:
* Integration tests can be started in KRaft mode and run against any
GitHub Kafka branch other than the released versions.
* Fix pipeline inclusion of static binaries (#4666)
* Fix to main loop timeout calculation leading to a tight loop for a
max period of 1 ms (#4671).


## Fixes
Expand All @@ -20,6 +22,10 @@ librdkafka v2.3.1 is a maintenance release:
Solved by correctly excluding the binary configured with that library,
when targeting a static build.
Happening since v2.0.2, with specified platforms, when using static binaries (#4666).
* When the main thread loop was awakened less than 1 ms
before the expiration of a timeout, it was serving with a zero timeout,
leading to increased CPU usage until the timeout was reached.
Happening since 1.x (#4671).



Expand Down
5 changes: 4 additions & 1 deletion src/rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -2120,7 +2120,10 @@ static int rd_kafka_thread_main(void *arg) {
RD_KAFKA_CGRP_STATE_TERM)))) {
rd_ts_t sleeptime = rd_kafka_timers_next(
&rk->rk_timers, 1000 * 1000 /*1s*/, 1 /*lock*/);
rd_kafka_q_serve(rk->rk_ops, (int)(sleeptime / 1000), 0,
/* Use ceiling division to avoid calling serve with a 0 ms
* timeout in a tight loop until 1 ms has passed. */
int timeout_ms = (sleeptime + 999) / 1000;
rd_kafka_q_serve(rk->rk_ops, timeout_ms, 0,
RD_KAFKA_Q_CB_CALLBACK, NULL, NULL);
if (rk->rk_cgrp) /* FIXME: move to timer-triggered */
rd_kafka_cgrp_serve(rk->rk_cgrp);
Expand Down

0 comments on commit 807b23a

Please sign in to comment.