Skip to content

Commit

Permalink
ListOffsetsRequest should only be sent to the leader replica (CI) (#4754
Browse files Browse the repository at this point in the history
)

as replicas reply with "NOT_LEADER_OR_FOLLOWER" when using the replica id, Java clients sends requests to the leader too.


Co-authored-by: Kyle Phelps <[email protected]>
  • Loading branch information
emasab and kphelps authored Oct 18, 2024
1 parent 88604ca commit 2810e5c
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 3 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ librdkafka v2.6.1 is a maintenance release:
under some particular conditions (#4800).
* Fix for retrieving offset commit metadata when it contains
zeros and configured with `strndup` (#4876)
* Fix for a loop of ListOffset requests, happening in a Fetch From Follower
scenario, if such request is made to the follower (#4616, #4754, @kphelps).


## Fixes
Expand All @@ -32,6 +34,12 @@ librdkafka v2.6.1 is a maintenance release:
instead of rest of metadata. Solved by avoiding to use
`strndup` for copying metadata.
Happening since: 0.9.0 (#4876).
* Issues: #4616
When an out of range on a follower caused an offset reset, the corresponding
ListOffsets request is made to the follower, causing a repeated
"Not leader for partition" error. Fixed by sending the request always
to the leader.
Happening since 1.5.0 (tested version) or previous ones (#4616, #4754, @kphelps).



Expand Down
4 changes: 2 additions & 2 deletions src/rdkafka_partition.c
Original file line number Diff line number Diff line change
Expand Up @@ -1359,7 +1359,7 @@ static void rd_kafka_toppar_handle_Offset(rd_kafka_t *rk,

rd_kafka_toppar_lock(rktp);
/* Drop reply from previous partition leader */
if (err != RD_KAFKA_RESP_ERR__DESTROY && rktp->rktp_broker != rkb)
if (err != RD_KAFKA_RESP_ERR__DESTROY && rktp->rktp_leader != rkb)
err = RD_KAFKA_RESP_ERR__OUTDATED;
rd_kafka_toppar_unlock(rktp);

Expand Down Expand Up @@ -1549,7 +1549,7 @@ void rd_kafka_toppar_offset_request(rd_kafka_toppar_t *rktp,
rd_kafka_assert(NULL,
thrd_is_current(rktp->rktp_rkt->rkt_rk->rk_thread));

rkb = rktp->rktp_broker;
rkb = rktp->rktp_leader;

if (!backoff_ms && (!rkb || rkb->rkb_source == RD_KAFKA_INTERNAL))
backoff_ms = 500;
Expand Down
9 changes: 8 additions & 1 deletion tests/0104-fetch_from_follower_mock.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@

#include "test.h"


/**
* @name Fetch from follower tests using the mock broker.
*/
Expand Down Expand Up @@ -111,6 +110,14 @@ static void do_test_offset_reset(const char *auto_offset_reset) {
else
test_consumer_poll(auto_offset_reset, c, 0, 1, 0, msgcnt, NULL);

/* send another batch of messages to ensure the consumer isn't stuck
* sending ListOffsets to the replica and receiving
* NOT_LEADER_OR_FOLLOWER errors.
* See PR #4616 */
test_produce_msgs_easy_v(topic, 0, 0, 0, msgcnt, 1000,
"bootstrap.servers", bootstraps, NULL);
test_consumer_poll("ASSIGN", c, 0, 1, 0, msgcnt, NULL);

test_consumer_close(c);

rd_kafka_destroy(c);
Expand Down

0 comments on commit 2810e5c

Please sign in to comment.