Skip to content

Commit

Permalink
Fix duplicate messages with
Browse files Browse the repository at this point in the history
cooperative assignor
  • Loading branch information
emasab committed Mar 6, 2024
1 parent 3451583 commit 94fd544
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 3 deletions.
16 changes: 15 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,23 @@
# librdkafka v2.3.1

librdkafka v2.3.1 is a feature release:
librdkafka v2.3.1 is a maintenance release:

* Upgrade OpenSSL to v3.0.12 (while building from source) with various security fixes,
check the [release notes](https://www.openssl.org/news/cl30.txt).
* Fixed a bug causing duplicate message consumption from a stale
fetch start offset in some particular cases (#4636)


## Fixes

### Consumer fixes

* In case of subscription change with a consumer using the cooperative assignor
it could resume fetching from a previous position.
That could also happen if resuming a partition that wasn't paused.
Fixed by ensuring that a resume operation is completely a no-op when
the partition isn't paused (#4636).



# librdkafka v2.3.0
Expand Down
18 changes: 16 additions & 2 deletions src/rdkafka_partition.c
Original file line number Diff line number Diff line change
Expand Up @@ -2299,7 +2299,22 @@ rd_kafka_resp_err_t rd_kafka_toppar_op_pause_resume(rd_kafka_toppar_t *rktp,
int flag,
rd_kafka_replyq_t replyq) {
int32_t version;
rd_kafka_op_t *rko;
rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_PAUSE);

if (!pause) {
/* If partitions isn't paused, avoid bumping its version,
* as it'll result in resuming fetches from a stale
* next_fetch_start */
rd_bool_t paused = rd_false;
rd_kafka_toppar_lock(rktp);
paused = RD_KAFKA_TOPPAR_IS_PAUSED(rktp);
rd_kafka_toppar_unlock(rktp);
if (!paused) {
rko->rko_replyq = replyq;
rd_kafka_op_reply(rko, RD_KAFKA_RESP_ERR_NO_ERROR);
return RD_KAFKA_RESP_ERR_NO_ERROR;
}
}

/* Bump version barrier. */
version = rd_kafka_toppar_version_new_barrier(rktp);
Expand All @@ -2310,7 +2325,6 @@ rd_kafka_resp_err_t rd_kafka_toppar_op_pause_resume(rd_kafka_toppar_t *rktp,
RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
rktp->rktp_partition, version);

rko = rd_kafka_op_new(RD_KAFKA_OP_PAUSE);
rko->rko_version = version;
rko->rko_u.pause.pause = pause;
rko->rko_u.pause.flag = flag;
Expand Down

0 comments on commit 94fd544

Please sign in to comment.