Skip to content

Commit

Permalink
Retry fetching committed offsets indefinitely (#3265)
Browse files Browse the repository at this point in the history
The consumer will now retry indefinitely (or until the assignment is changed)
to retrieve committed offsets. This fixes the issue where only two retries
were attempted when outstanding transactions were blocking OffsetFetch
requests with `ERR_UNSTABLE_OFFSET_COMMIT`.
  • Loading branch information
edenhill committed Feb 22, 2021
1 parent 1bba347 commit 439a5fc
Show file tree
Hide file tree
Showing 11 changed files with 281 additions and 17 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ librdkafka v1.6.1 is a maintenance release.
on the transaction coordinator. We now allow empty/no-op transactions to
be committed.

### Consumer fixes

* The consumer will now retry indefinitely (or until the assignment is changed)
to retrieve committed offsets. This fixes the issue where only two retries
were attempted when outstanding transactions were blocking OffsetFetch
requests with `ERR_UNSTABLE_OFFSET_COMMIT`. #3265




Expand Down
52 changes: 48 additions & 4 deletions src/rdkafka_assignment.c
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,27 @@ rd_kafka_assignment_apply_offsets (rd_kafka_t *rk,
continue;
}

if (rktpar->err) {
if (err == RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT ||
rktpar->err == RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT) {
/* Ongoing transactions are blocking offset retrieval.
* This is typically retried from the OffsetFetch
* handler but we can come here if the assignment
* (and thus the assignment.version) was changed while
* the OffsetFetch request was in-flight, in which case
* we put this partition back on the pending list for
* later handling by the assignment state machine. */

rd_kafka_dbg(rk, CGRP, "OFFSETFETCH",
"Adding %s [%"PRId32"] back to pending "
"list because on-going transaction is "
"blocking offset retrieval",
rktpar->topic,
rktpar->partition);

rd_kafka_topic_partition_list_add_copy(
rk->rk_consumer.assignment.pending, rktpar);

} else if (rktpar->err) {
/* Partition-level error */
rd_kafka_consumer_err(
rk->rk_consumer.q, RD_KAFKA_NODEID_UA,
Expand Down Expand Up @@ -202,6 +222,11 @@ rd_kafka_assignment_apply_offsets (rd_kafka_t *rk,

/**
* @brief Reply handler for OffsetFetch queries from the assignment code.
*
* @param opaque Heap-allocated int64_t * holding the assignment version at the
* time of the request. This handler frees it unless the request is retried.
*
* @locality rdkafka main thread
*/
static void
rd_kafka_assignment_handle_OffsetFetch (rd_kafka_t *rk,
Expand All @@ -211,25 +236,34 @@ rd_kafka_assignment_handle_OffsetFetch (rd_kafka_t *rk,
rd_kafka_buf_t *request,
void *opaque) {
rd_kafka_topic_partition_list_t *offsets = NULL;
int64_t *req_assignment_version = (int64_t *)opaque;
/* Only allow retries if there's been no change to the assignment,
* otherwise rely on assignment state machine to retry. */
rd_bool_t allow_retry = *req_assignment_version ==
rk->rk_consumer.assignment.version;

if (err == RD_KAFKA_RESP_ERR__DESTROY) {
/* Termination, quick cleanup. */
rd_free(req_assignment_version);
return;
}

err = rd_kafka_handle_OffsetFetch(rk, rkb, err,
reply, request, &offsets,
rd_true/* Update toppars */,
rd_true/* Add parts */);
rd_true/* Add parts */,
allow_retry);
if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS) {
if (offsets)
rd_kafka_topic_partition_list_destroy(offsets);
return; /* retrying */
}

rd_free(req_assignment_version);

/* offsets may be NULL for certain errors, such
* as ERR__TRANSPORT. */
if (!offsets) {
if (!offsets && !allow_retry) {
rd_dassert(err);
if (!err)
err = RD_KAFKA_RESP_ERR__NO_OFFSET;
Expand Down Expand Up @@ -505,6 +539,9 @@ rd_kafka_assignment_serve_pending (rd_kafka_t *rk) {


if (partitions_to_query->cnt > 0) {
int64_t *req_assignment_version = rd_malloc(sizeof(int64_t));
*req_assignment_version = rk->rk_consumer.assignment.version;

rd_kafka_dbg(rk, CGRP, "OFFSETFETCH",
"Fetching committed offsets for "
"%d pending partition(s) in assignment",
Expand All @@ -517,7 +554,8 @@ rd_kafka_assignment_serve_pending (rd_kafka_t *rk) {
RD_KAFKA_READ_COMMITTED/*require_stable*/,
RD_KAFKA_REPLYQ(rk->rk_ops, 0),
rd_kafka_assignment_handle_OffsetFetch,
NULL);
/* Must be freed by handler */
(void *)req_assignment_version);
}

if (coord)
Expand Down Expand Up @@ -629,6 +667,8 @@ int rd_kafka_assignment_clear (rd_kafka_t *rk) {
rk->rk_consumer.assignment.all);
rd_kafka_topic_partition_list_clear(rk->rk_consumer.assignment.all);

rk->rk_consumer.assignment.version++;

return cnt;
}

Expand Down Expand Up @@ -726,6 +766,8 @@ rd_kafka_assignment_add (rd_kafka_t *rk,
rk->rk_consumer.assignment.pending->cnt,
rk->rk_consumer.assignment.queried->cnt);

rk->rk_consumer.assignment.version++;

return NULL;
}

Expand Down Expand Up @@ -816,6 +858,8 @@ rd_kafka_assignment_subtract (rd_kafka_t *rk,
rd_assert(rk->rk_consumer.assignment.queried->cnt == 0);
}

rk->rk_consumer.assignment.version++;

return NULL;
}

Expand Down
6 changes: 6 additions & 0 deletions src/rdkafka_assignment.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ typedef struct rd_kafka_assignment_s {
int started_cnt;
/** Number of partitions being stopped. */
int wait_stop_cnt;
/** Assignment version: any change to the assignment will bump this
* version by one. This is used to know if a protocol response is
* outdated or not.
* @locks_required none
* @locality rdkafka main thread */
int64_t version;
} rd_kafka_assignment_t;


Expand Down
10 changes: 10 additions & 0 deletions src/rdkafka_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -712,4 +712,14 @@ void rd_kafka_op_offset_store (rd_kafka_t *rk, rd_kafka_op_t *rko);
!(rko)->rko_err && \
((rko)->rko_u.fetch.rkm.rkm_flags & RD_KAFKA_MSG_F_CONTROL))



/**
 * @brief Combined validity check for an op's reply queue and op version.
 *
 * @returns true if the rko's replyq is valid and the
 * rko's rktp version (if any) is not outdated.
 *
 * @remark The replyq check is evaluated first; the version check is only
 *         evaluated when the replyq check succeeds (short-circuit &&).
 * @remark \p RKO appears twice in the expansion, so pass an expression
 *         without side effects.
 */
#define rd_kafka_op_replyq_is_valid(RKO) \
(rd_kafka_replyq_is_valid(&(RKO)->rko_replyq) && \
!rd_kafka_op_version_outdated((RKO), 0))

#endif /* _RDKAFKA_OP_H_ */
5 changes: 3 additions & 2 deletions src/rdkafka_partition.c
Original file line number Diff line number Diff line change
Expand Up @@ -2209,7 +2209,9 @@ rd_kafka_toppar_op_serve (rd_kafka_t *rk,


/* Propagate error to application */
if (rko->rko_err != RD_KAFKA_RESP_ERR__WAIT_COORD) {
if (rko->rko_err != RD_KAFKA_RESP_ERR__WAIT_COORD &&
rko->rko_err !=
RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT)
rd_kafka_consumer_err(
rktp->rktp_fetchq,
RD_KAFKA_NODEID_UA,
Expand All @@ -2219,7 +2221,6 @@ rd_kafka_toppar_op_serve (rd_kafka_t *rk,
"Failed to fetch "
"offsets from brokers: %s",
rd_kafka_err2str(rko->rko_err));
}

rd_kafka_toppar_destroy(rktp);

Expand Down
20 changes: 20 additions & 0 deletions src/rdkafka_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,26 @@ rd_kafka_replyq_make (rd_kafka_q_t *rkq, int version, const char *id) {
#define RD_KAFKA_NO_REPLYQ (rd_kafka_replyq_t){NULL, 0}
#endif


/**
 * @brief Check whether a reply queue can currently be used.
 *
 * A replyq without an attached queue is never valid. Otherwise the
 * queue's readiness is sampled with rd_kafka_q_ready() while the queue
 * lock is held.
 *
 * @param replyq Reply queue to inspect.
 *
 * @returns rd_false if no queue is attached, else the readiness reported
 *          by rd_kafka_q_ready().
 */
static RD_INLINE RD_UNUSED rd_bool_t
rd_kafka_replyq_is_valid (rd_kafka_replyq_t *replyq) {
        rd_bool_t is_ready = rd_false;

        if (replyq->q) {
                /* Sample the ready state under the queue lock. */
                rd_kafka_q_lock(replyq->q);
                is_ready = rd_kafka_q_ready(replyq->q);
                rd_kafka_q_unlock(replyq->q);
        }

        return is_ready;
}



/**
* Set up replyq.
* Q refcnt is increased.
Expand Down
16 changes: 12 additions & 4 deletions src/rdkafka_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,8 @@ rd_kafka_handle_OffsetFetch (rd_kafka_t *rk,
rd_kafka_buf_t *request,
rd_kafka_topic_partition_list_t **offsets,
rd_bool_t update_toppar,
rd_bool_t add_part) {
rd_bool_t add_part,
rd_bool_t allow_retry) {
const int log_decode_errors = LOG_ERR;
int32_t TopicArrayCnt;
int64_t offset = RD_KAFKA_OFFSET_INVALID;
Expand Down Expand Up @@ -805,7 +806,7 @@ rd_kafka_handle_OffsetFetch (rd_kafka_t *rk,
}

if (actions & RD_KAFKA_ERR_ACTION_RETRY || retry_unstable) {
if (rd_kafka_buf_retry(rkb, request))
if (allow_retry && rd_kafka_buf_retry(rkb, request))
return RD_KAFKA_RESP_ERR__IN_PROGRESS;
/* FALLTHRU */
}
Expand Down Expand Up @@ -865,7 +866,11 @@ void rd_kafka_op_handle_OffsetFetch (rd_kafka_t *rk,
err = rd_kafka_handle_OffsetFetch(rkb->rkb_rk, rkb, err, rkbuf,
request, &offsets,
rd_false/*dont update rktp*/,
rd_false/*dont add part*/);
rd_false/*dont add part*/,
/* Allow retries if replyq
* is valid */
rd_kafka_op_replyq_is_valid(
rko));
if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS) {
if (offsets)
rd_kafka_topic_partition_list_destroy(offsets);
Expand Down Expand Up @@ -940,7 +945,7 @@ void rd_kafka_OffsetFetchRequest (rd_kafka_broker_t *rkb,

if (ApiVersion >= 7) {
/* RequireStable */
rd_kafka_buf_write_i8(rkbuf, 0xaa); //require_stable);
rd_kafka_buf_write_i8(rkbuf, require_stable);
}

rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
Expand All @@ -959,6 +964,9 @@ void rd_kafka_OffsetFetchRequest (rd_kafka_broker_t *rkb,
return;
}

/* Let handler decide if retries should be performed */
rkbuf->rkbuf_max_retries = RD_KAFKA_REQUEST_MAX_RETRIES;

rd_rkb_dbg(rkb, CGRP|RD_KAFKA_DBG_CONSUMER, "OFFSET",
"Fetch committed offsets for %d/%d partition(s)",
PartCnt, parts->cnt);
Expand Down
3 changes: 2 additions & 1 deletion src/rdkafka_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ rd_kafka_handle_OffsetFetch (rd_kafka_t *rk,
rd_kafka_buf_t *request,
rd_kafka_topic_partition_list_t **offsets,
rd_bool_t update_toppar,
rd_bool_t add_part);
rd_bool_t add_part,
rd_bool_t allow_retry);

void rd_kafka_op_handle_OffsetFetch (rd_kafka_t *rk,
rd_kafka_broker_t *rkb,
Expand Down
Loading

0 comments on commit 439a5fc

Please sign in to comment.