Skip to content

Commit

Permalink
Fix for idempotent producer fatal errors, triggered after a possibly …
Browse files Browse the repository at this point in the history
…persisted message state (#4438)
  • Loading branch information
emasab authored and anchitj committed Nov 26, 2024
1 parent e75de5b commit 7ff425f
Show file tree
Hide file tree
Showing 7 changed files with 406 additions and 16 deletions.
6 changes: 3 additions & 3 deletions .semaphore/semaphore.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: v1.0
name: 'librdkafka build and release artifact pipeline'
agent:
machine:
type: s1-prod-macos-arm64
type: s1-prod-macos-13-5-arm64
global_job_config:
prologue:
commands:
Expand All @@ -15,7 +15,7 @@ blocks:
task:
agent:
machine:
type: s1-prod-macos-arm64
type: s1-prod-macos-13-5-arm64
env_vars:
- name: ARTIFACT_KEY
value: p-librdkafka__plat-osx__arch-arm64__lnk-all
Expand All @@ -41,7 +41,7 @@ blocks:
task:
agent:
machine:
type: s1-prod-macos
type: s1-prod-macos-13-5-amd64
env_vars:
- name: ARTIFACT_KEY
value: p-librdkafka__plat-osx__arch-x64__lnk-all
Expand Down
17 changes: 16 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ librdkafka v2.2.0 is a feature release:
Add DNS alias support for secured connection (#4292).
* [KIP-339](https://cwiki.apache.org/confluence/display/KAFKA/KIP-339%3A+Create+a+new+IncrementalAlterConfigs+API):
IncrementalAlterConfigs API (started by @PrasanthV454, #4110).
* [KIP-554](https://cwiki.apache.org/confluence/display/KAFKA/KIP-554%3A+Add+Broker-side+SCRAM+Config+API): Add Broker-side SCRAM Config API (#4241).
* [KIP-554](https://cwiki.apache.org/confluence/display/KAFKA/KIP-554%3A+Add+Broker-side+SCRAM+Config+API): Add Broker-side SCRAM Config API (#4241).
* Fix for idempotent producer fatal errors, triggered after a possibly persisted message state (#4438).


## Enhancements
Expand Down Expand Up @@ -72,6 +73,20 @@ librdkafka v2.2.0 is a feature release:
assignment completely.


### Idempotent producer fixes

* After a possibly persisted error, such as a disconnection or a timeout, next expected sequence
used to increase, leading to a fatal error if the message wasn't persisted and
the second one in queue failed with an `OUT_OF_ORDER_SEQUENCE_NUMBER`.
The error could contain the message "sequence desynchronization" with
just one possibly persisted error or "rewound sequence number" in case of
multiple errored messages.
Solved by treating the possible persisted message as _not_ persisted,
and expecting a `DUPLICATE_SEQUENCE_NUMBER` error in case it was or
`NO_ERROR` in case it wasn't, in both cases the message will be considered
delivered (#4438).



# librdkafka v2.1.1

Expand Down
19 changes: 7 additions & 12 deletions src/rdkafka_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -3336,17 +3336,12 @@ static int rd_kafka_handle_Produce_error(rd_kafka_broker_t *rkb,
* which should not be treated as a fatal error
* since this request and sub-sequent requests
* will be retried and thus return to order.
* Unless the error was a timeout, or similar,
* in which case the request might have made it
* and the messages are considered possibly persisted:
* in this case we allow the next in-flight response
* to be successful, in which case we mark
* this request's messages as succesfully delivered. */
if (perr->status &
RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED)
perr->update_next_ack = rd_true;
else
perr->update_next_ack = rd_false;
* In case the message is possibly persisted
* we still treat it as not persisted,
* expecting DUPLICATE_SEQUENCE_NUMBER
* in case it was persisted or NO_ERROR in case
* it wasn't. */
perr->update_next_ack = rd_false;
perr->update_next_err = rd_true;

/* Drain outstanding requests so that retries
Expand Down Expand Up @@ -3627,7 +3622,7 @@ static void rd_kafka_msgbatch_handle_Produce_result(
.err = err,
.incr_retry = 1,
.status = status,
.update_next_ack = rd_true,
.update_next_ack = rd_false,
.update_next_err = rd_true,
.last_seq = (batch->first_seq +
rd_kafka_msgq_len(&batch->msgq) - 1)};
Expand Down
Loading

0 comments on commit 7ff425f

Please sign in to comment.