Skip to content

Commit

Permalink
Message err was not set for on_ack interceptors on broker reply (#1892)
Browse files Browse the repository at this point in the history
  • Loading branch information
edenhill committed Aug 24, 2018
1 parent c16998b commit ee8729a
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 3 deletions.
2 changes: 1 addition & 1 deletion src/rdkafka_broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -2076,7 +2076,7 @@ void rd_kafka_dr_msgq (rd_kafka_itopic_t *rkt,
return;

/* Call on_acknowledgement() interceptors */
rd_kafka_interceptors_on_acknowledgement_queue(rk, rkmq);
rd_kafka_interceptors_on_acknowledgement_queue(rk, rkmq, err);

if ((rk->rk_conf.enabled_events & RD_KAFKA_EVENT_DR) &&
(!rk->rk_conf.dr_err_only || err)) {
Expand Down
8 changes: 7 additions & 1 deletion src/rdkafka_interceptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -453,14 +453,20 @@ rd_kafka_interceptors_on_acknowledgement (rd_kafka_t *rk,

/**
* @brief Call on_acknowledgement methods for all messages in queue.
*
* @param force_err If non-zero, sets this error on each message.
*
* @locality broker thread
*/
void
rd_kafka_interceptors_on_acknowledgement_queue (rd_kafka_t *rk,
rd_kafka_msgq_t *rkmq) {
rd_kafka_msgq_t *rkmq,
rd_kafka_resp_err_t force_err) {
rd_kafka_msg_t *rkm;

RD_KAFKA_MSGQ_FOREACH(rkm, rkmq) {
if (force_err)
rkm->rkm_err = force_err;
rd_kafka_interceptors_on_acknowledgement(rk,
&rkm->rkm_rkmessage);
}
Expand Down
3 changes: 2 additions & 1 deletion src/rdkafka_interceptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ rd_kafka_interceptors_on_acknowledgement (rd_kafka_t *rk,
rd_kafka_message_t *rkmessage);
void
rd_kafka_interceptors_on_acknowledgement_queue (rd_kafka_t *rk,
rd_kafka_msgq_t *rkmq);
rd_kafka_msgq_t *rkmq,
rd_kafka_resp_err_t force_err);

void rd_kafka_interceptors_on_consume (rd_kafka_t *rk,
rd_kafka_message_t *rkmessage);
Expand Down

0 comments on commit ee8729a

Please sign in to comment.