Merge pull request confluentinc#8 from kphelps/kphelps/multibatch
Add support for multi-batch Produce request and response
kphelps authored Oct 10, 2024
2 parents 10f988f + b4f99ab commit e05f845
Showing 13 changed files with 628 additions and 89 deletions.
1 change: 1 addition & 0 deletions CONFIGURATION.md
@@ -155,6 +155,7 @@ delivery.report.only.error | P | true, false | false
dr_cb | P | | | low | Delivery report callback (set with rd_kafka_conf_set_dr_cb()) <br>*Type: see dedicated API*
dr_msg_cb | P | | | low | Delivery report callback (set with rd_kafka_conf_set_dr_msg_cb()) <br>*Type: see dedicated API*
sticky.partitioning.linger.ms | P | 0 .. 900000 | 10 | low | Delay in milliseconds to wait to assign new sticky partitions for each topic. By default, set to double the time of linger.ms. To disable sticky behavior, set to 0. This behavior affects messages with the key NULL in all cases, and messages with key lengths of zero when the consistent_random partitioner is in use. These messages would otherwise be assigned randomly. A higher value allows for more effective batching of these messages. <br>*Type: integer*
multibatch | P | true, false | false | low | Batch produce requests across multiple partitions. <br>*Type: boolean*
client.dns.lookup | * | use_all_dns_ips, resolve_canonical_bootstrap_servers_only | use_all_dns_ips | low | Controls how the client uses DNS lookups. By default, when the lookup returns multiple IP addresses for a hostname, they will all be attempted for connection before the connection is considered failed. This applies to both bootstrap and advertised servers. If the value is set to `resolve_canonical_bootstrap_servers_only`, each entry will be resolved and expanded into a list of canonical names. **WARNING**: `resolve_canonical_bootstrap_servers_only` must only be used with `GSSAPI` (Kerberos) as `sasl.mechanism`, as it's the only purpose of this configuration value. **NOTE**: Default here is different from the Java client's default behavior, which connects only to the first IP address returned for a hostname. <br>*Type: enum value*
enable.metrics.push | * | true, false | true | low | Whether to enable pushing of client metrics to the cluster, if the cluster has a client metrics subscription which matches this client <br>*Type: boolean*

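For reference, enabling the new property from application code would look roughly like this: a minimal sketch using the public rd_kafka_conf_set() API, where the broker address and error handling are illustrative only.

#include <librdkafka/rdkafka.h>
#include <stdio.h>

/* Minimal sketch: create a producer with cross-partition batching
 * enabled. Note the diff below disables multibatch internally for
 * idempotent producers. */
int main(void) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();

        if (rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092",
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
            rd_kafka_conf_set(conf, "multibatch", "true",
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
                fprintf(stderr, "config error: %s\n", errstr);
                return 1;
        }

        rd_kafka_t *rk =
            rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
        if (!rk) {
                fprintf(stderr, "producer creation failed: %s\n", errstr);
                return 1;
        }
        /* ... produce as usual; rd_kafka_new() took ownership of conf. */
        rd_kafka_destroy(rk);
        return 0;
}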
38 changes: 38 additions & 0 deletions src/rdbuf.c
@@ -938,6 +938,44 @@ size_t rd_slice_read(rd_slice_t *slice, void *dst, size_t size) {
return size;
}

/**
* @brief This is mostly a copy of rd_slice_read(). Both
* functions read \p size bytes from the current read position,
* advancing the read offset by the number of bytes copied. The
* difference is that instead of copying into a plain memory buffer,
* this function writes into a \p rd_buf_t.
* If there are fewer than \p size bytes remaining in the slice,
* 0 is returned and no bytes are copied.
*
* @returns \p size, or 0 if \p size bytes are not available in the slice.
*
* @remark This performs a complete read, never a partial read.
*/
size_t rd_slice_read_into_buf(rd_slice_t *slice, rd_buf_t *rbuf, size_t size) {
size_t remains = size;
size_t rlen;
const void *p;
size_t orig_end = slice->end;

if (unlikely(rd_slice_remains(slice) < size))
return 0;

/* Temporarily shrink slice to offset + \p size */
slice->end = rd_slice_abs_offset(slice) + size;

while ((rlen = rd_slice_reader(slice, &p))) {
rd_dassert(remains >= rlen);
rd_buf_write(rbuf, p, rlen);
remains -= rlen;
}

rd_dassert(remains == 0);

/* Restore original size */
slice->end = orig_end;

return size;
}

/**
* @brief Read \p size bytes from absolute slice offset \p offset
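A minimal usage sketch for the new helper (not part of this diff), assuming the rd_buf_t/rd_slice_t API declared in rdbuf.h (rd_buf_init(), rd_buf_write(), rd_slice_init_full()):

/* Copy the first 4 bytes of one buffer into another via
 * rd_slice_read_into_buf(). */
static void example_read_into_buf(void) {
        rd_buf_t src, dst;
        rd_slice_t slice;
        const char payload[] = "0123456789";

        rd_buf_init(&src, 1, sizeof(payload));
        rd_buf_write(&src, payload, sizeof(payload));
        rd_buf_init(&dst, 1, sizeof(payload));

        rd_slice_init_full(&slice, &src);

        /* Consumes 4 bytes from the slice, appending them to dst;
         * returns 0 and copies nothing if fewer than 4 bytes remain. */
        if (rd_slice_read_into_buf(&slice, &dst, 4) == 4)
                rd_assert(rd_buf_write_pos(&dst) == 4);

        rd_buf_destroy(&src);
        rd_buf_destroy(&dst);
}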
2 changes: 2 additions & 0 deletions src/rdbuf.h
@@ -308,6 +308,8 @@ size_t rd_slice_reader(rd_slice_t *slice, const void **p);
size_t rd_slice_peeker(const rd_slice_t *slice, const void **p);

size_t rd_slice_read(rd_slice_t *slice, void *dst, size_t size);
size_t rd_slice_read_into_buf(rd_slice_t *slice, rd_buf_t *rbuf, size_t size);

size_t
rd_slice_peek(const rd_slice_t *slice, size_t offset, void *dst, size_t size);

48 changes: 43 additions & 5 deletions src/rdkafka_broker.c
@@ -768,6 +768,26 @@ void rd_kafka_broker_conn_closed(rd_kafka_broker_t *rkb,
rd_kafka_broker_fail(rkb, log_level, err, "%s", errstr);
}

rd_bool_t buf_contains_toppar(rd_kafka_buf_t *rkbuf, rd_kafka_toppar_t *rktp) {

if (rd_list_cnt(&rkbuf->rkbuf_u.Produce.batch_list) > 0) {
/* This is a multi-batch request: loop through all batches.
The size of this list is bounded by the number of topic+partitions
this broker thread manages. In practice it is likely to be
significantly smaller, since not every toppar has a ready
batch (linger, batch-size constraints) in each processing
iteration. */
rd_kafka_msgbatch_t *msgbatch;
int i;
RD_LIST_FOREACH(msgbatch, &rkbuf->rkbuf_u.Produce.batch_list, i) {
if (msgbatch->rktp == rktp)
return rd_true;
}
return rd_false;

} else
return rkbuf->rkbuf_u.Produce.batch.rktp == rktp;
}

/**
* @brief Purge requests in \p rkbq matching request \p ApiKey
@@ -792,7 +812,7 @@ static int rd_kafka_broker_bufq_purge_by_toppar(rd_kafka_broker_t *rkb,
TAILQ_FOREACH_SAFE(rkbuf, &rkbq->rkbq_bufs, rkbuf_link, tmp) {

if (rkbuf->rkbuf_reqhdr.ApiKey != ApiKey ||
rkbuf->rkbuf_u.Produce.batch.rktp != rktp ||
!buf_contains_toppar(rkbuf, rktp) ||
/* Skip partially sent buffers and let them transmit.
* The alternative would be to kill the connection here,
* which is more drastic and costly. */
@@ -3890,7 +3910,9 @@ static int rd_kafka_toppar_producer_serve(rd_kafka_broker_t *rkb,
rd_ts_t *next_wakeup,
rd_bool_t do_timeout_scan,
rd_bool_t may_send,
rd_bool_t flushing) {
rd_bool_t flushing,
rd_bool_t multi_batch_request,
rd_list_t *batch_bufq) {
int cnt = 0;
int r;
rd_kafka_msg_t *rkm;
@@ -4139,9 +4161,16 @@ static int rd_kafka_toppar_producer_serve(rd_kafka_broker_t *rkb,
/* Send Produce requests for this toppar, honouring the
* queue backpressure threshold. */
for (reqcnt = 0; reqcnt < max_requests; reqcnt++) {
r = rd_kafka_ProduceRequest(rkb, rktp, pid, epoch_base_msgid);
if (likely(r > 0))
r = rd_kafka_ProduceRequest(rkb, rktp, pid, epoch_base_msgid, multi_batch_request, batch_bufq);
if (likely(r > 0)) {
cnt += r;
if (multi_batch_request)
/* In the multi-batch case we can't have multiple
batches from the same toppar in a single request, so
break out of the loop; this effectively means
rd_kafka_ProduceRequest() is called only once here. */
break;
}
else
break;
}
@@ -4183,6 +4212,9 @@ static int rd_kafka_broker_produce_toppars(rd_kafka_broker_t *rkb,
rd_kafka_pid_t pid = RD_KAFKA_PID_INITIALIZER;
rd_bool_t may_send = rd_true;
rd_bool_t flushing = rd_false;
rd_list_t batch_bufq;
rd_bool_t multi_batch_request = rd_kafka_is_idempotent(rkb->rkb_rk) ?
rd_false : rkb->rkb_rk->rk_conf.multibatch;

/* Round-robin serve each toppar. */
rktp = rkb->rkb_active_toppar_next;
@@ -4210,20 +4242,26 @@

flushing = may_send && rd_atomic32_get(&rkb->rkb_rk->rk_flushing) > 0;

rd_list_init(&batch_bufq, 0, NULL);
do {
rd_ts_t this_next_wakeup = ret_next_wakeup;

/* Try producing toppar */
cnt += rd_kafka_toppar_producer_serve(
rkb, rktp, pid, now, &this_next_wakeup, do_timeout_scan,
may_send, flushing);
may_send, flushing, multi_batch_request, &batch_bufq);

rd_kafka_set_next_wakeup(&ret_next_wakeup, this_next_wakeup);

} while ((rktp = CIRCLEQ_LOOP_NEXT(&rkb->rkb_active_toppars, rktp,
rktp_activelink)) !=
rkb->rkb_active_toppar_next);

if (multi_batch_request && !rd_list_empty(&batch_bufq)) {
rd_kafka_MultiBatchProduceRequest(rkb, pid, &batch_bufq);
}
rd_list_destroy(&batch_bufq);

/* Update next starting toppar to produce in round-robin list. */
rd_kafka_broker_active_toppar_next(
rkb,
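In outline, the new produce path accumulates at most one ready batch per partition into batch_bufq during the round-robin pass, then flushes them all as a single request. A self-contained toy illustration of that accumulate-then-coalesce pattern (hypothetical code, not librdkafka internals):

#include <stdio.h>

#define MAX_BATCHES 8

int main(void) {
        /* One slot per partition; NULL means no batch is ready yet
         * (linger / batch-size constraints not satisfied). */
        const char *ready[] = {"p0:batchA", NULL, "p2:batchB", "p3:batchC"};
        const char *bufq[MAX_BATCHES];
        int cnt = 0;
        size_t i;

        /* Round-robin pass over the partitions this broker serves:
         * at most one batch per partition per request. */
        for (i = 0; i < sizeof(ready) / sizeof(ready[0]); i++)
                if (ready[i])
                        bufq[cnt++] = ready[i];

        /* One coalesced request instead of one request per partition. */
        if (cnt > 0) {
                int j;
                printf("multi-batch request with %d batches:\n", cnt);
                for (j = 0; j < cnt; j++)
                        printf("  %s\n", bufq[j]);
        }
        return 0;
}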
7 changes: 7 additions & 0 deletions src/rdkafka_buf.c
@@ -57,6 +57,13 @@ void rd_kafka_buf_destroy_final(rd_kafka_buf_t *rkbuf) {

case RD_KAFKAP_Produce:
rd_kafka_msgbatch_destroy(&rkbuf->rkbuf_batch);
int i;
rd_kafka_msgbatch_t *msgbatch;
RD_LIST_FOREACH(msgbatch, &rkbuf->rkbuf_u.Produce.batch_list, i) {
rd_kafka_msgbatch_destroy(msgbatch);
rd_free(msgbatch);
}
rd_list_destroy(&rkbuf->rkbuf_u.Produce.batch_list);
break;
}

5 changes: 5 additions & 0 deletions src/rdkafka_buf.h
@@ -399,6 +399,11 @@ struct rd_kafka_buf_s { /* rd_kafka_buf_t */
} Metadata;
struct {
rd_kafka_msgbatch_t batch; /**< MessageSet/batch */
rd_list_t batch_list; /**< MessageSet/batch list */
size_t batch_start_pos; /* Pos where Record batch
* starts in the buf */
size_t batch_end_pos; /* Pos after Record batch +
* Partition tags in the buf */
} Produce;
struct {
rd_bool_t commit; /**< true = txn commit,
3 changes: 3 additions & 0 deletions src/rdkafka_conf.c
@@ -1470,6 +1470,9 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
"A higher value allows for more effective batching of these "
"messages.",
0, 900000, 10},
{_RK_GLOBAL | _RK_PRODUCER, "multibatch", _RK_C_BOOL,
_RK(multibatch), "Batch produce requests across multiple partitions.", 0,
1, 0},
{_RK_GLOBAL, "client.dns.lookup", _RK_C_S2I, _RK(client_dns_lookup),
"Controls how the client uses DNS lookups. By default, when the lookup "
"returns multiple IP addresses for a hostname, they will all be attempted "
1 change: 1 addition & 0 deletions src/rdkafka_conf.h
@@ -439,6 +439,7 @@ struct rd_kafka_conf_s {
rd_kafka_compression_t compression_codec;
int dr_err_only;
int sticky_partition_linger_ms;
int multibatch;

/* Message delivery report callback.
* Called once for each produced message, either on
1 change: 1 addition & 0 deletions src/rdkafka_msg.h
@@ -84,6 +84,7 @@ typedef struct rd_kafka_Produce_result {
*record_errors; /**< Errors for records that caused the batch to be
dropped */
int32_t record_errors_cnt; /**< record_errors count */
rd_kafka_resp_err_t errorcode; /**< error code from the response */
} rd_kafka_Produce_result_t;

typedef struct rd_kafka_msg_s {
7 changes: 7 additions & 0 deletions src/rdkafka_msgset_writer.c
@@ -460,6 +460,9 @@ rd_kafka_msgset_writer_write_Produce_header(rd_kafka_msgset_writer_t *msetw) {
/* MessageSetSize: Will be finalized later*/
msetw->msetw_of_MessageSetSize = rd_kafka_buf_write_arraycnt_pos(rkbuf);

/* For multi-batch requests, this is where the batch copy starts */
rkbuf->rkbuf_u.Produce.batch_start_pos = msetw->msetw_of_MessageSetSize;

if (msetw->msetw_MsgVersion == 2) {
/* MessageSet v2 header */
rd_kafka_msgset_writer_write_MessageSet_v2_header(msetw);
@@ -1399,6 +1402,10 @@ rd_kafka_msgset_writer_finalize(rd_kafka_msgset_writer_t *msetw,

/* Partition tags */
rd_kafka_buf_write_tags_empty(rkbuf);

/* Save the position of the end of the batch + Partition tags for multi-batch requests */
rkbuf->rkbuf_u.Produce.batch_end_pos = rd_buf_write_pos(&rkbuf->rkbuf_buf);

/* Topics tags */
rd_kafka_buf_write_tags_empty(rkbuf);

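Together, batch_start_pos and batch_end_pos delimit the byte range (RecordBatch plus Partition tags) that multi-batch coalescing can lift out of a per-partition buffer with the new rd_slice_read_into_buf() helper. A sketch of that copy step, assuming rd_slice_init() from rdbuf.h; the PR's actual coalescing routine is not shown in this excerpt:

/* Hypothetical copy step: rkbuf is one partition's Produce buffer,
 * dest is the combined multi-batch buffer being assembled. */
rd_slice_t slice;
size_t start = rkbuf->rkbuf_u.Produce.batch_start_pos;
size_t len   = rkbuf->rkbuf_u.Produce.batch_end_pos - start;

if (rd_slice_init(&slice, &rkbuf->rkbuf_buf, start, len) == 0)
        rd_slice_read_into_buf(&slice, dest, len);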
