diff --git a/CONFIGURATION.md b/CONFIGURATION.md
index 4a44ee979..c23c872ff 100644
--- a/CONFIGURATION.md
+++ b/CONFIGURATION.md
@@ -155,6 +155,7 @@ delivery.report.only.error | P | true, false | false
dr_cb | P | | | low | Delivery report callback (set with rd_kafka_conf_set_dr_cb())
*Type: see dedicated API*
dr_msg_cb | P | | | low | Delivery report callback (set with rd_kafka_conf_set_dr_msg_cb())
*Type: see dedicated API*
sticky.partitioning.linger.ms | P | 0 .. 900000 | 10 | low | Delay in milliseconds to wait to assign new sticky partitions for each topic. By default, set to double the time of linger.ms. To disable sticky behavior, set to 0. This behavior affects messages with the key NULL in all cases, and messages with key lengths of zero when the consistent_random partitioner is in use. These messages would otherwise be assigned randomly. A higher value allows for more effective batching of these messages.
*Type: integer*
+multibatch | P | true, false | false | low | Batch produce requests across multiple partitions.
*Type: boolean*
client.dns.lookup | * | use_all_dns_ips, resolve_canonical_bootstrap_servers_only | use_all_dns_ips | low | Controls how the client uses DNS lookups. By default, when the lookup returns multiple IP addresses for a hostname, they will all be attempted for connection before the connection is considered failed. This applies to both bootstrap and advertised servers. If the value is set to `resolve_canonical_bootstrap_servers_only`, each entry will be resolved and expanded into a list of canonical names. **WARNING**: `resolve_canonical_bootstrap_servers_only` must only be used with `GSSAPI` (Kerberos) as `sasl.mechanism`, as it's the only purpose of this configuration value. **NOTE**: Default here is different from the Java client's default behavior, which connects only to the first IP address returned for a hostname.
*Type: enum value*
enable.metrics.push | * | true, false | true | low | Whether to enable pushing of client metrics to the cluster, if the cluster has a client metrics subscription which matches this client
*Type: boolean*
diff --git a/src/rdbuf.c b/src/rdbuf.c
index 427d632eb..08310dff0 100644
--- a/src/rdbuf.c
+++ b/src/rdbuf.c
@@ -938,6 +938,44 @@ size_t rd_slice_read(rd_slice_t *slice, void *dst, size_t size) {
return size;
}
+/**
+ * @brief This is mostly a copy/paste function of rd_slice_read. Both
+ * functions read \p size bytes from current read position,
+ * advancing the read offset by the number of bytes copied. The
+ * difference is that instead of copying into a buffer, this function
+ * writes into a \p rd_buf_t.
+ * If there are fewer than \p size bytes remaining in the buffer
+ * then 0 is returned and no bytes are copied.
+ *
+ * @returns \p size, or 0 if \p size bytes are not available in buffer.
+ *
+ * @remark This performs a complete read, no partial reads.
+ */
+size_t rd_slice_read_into_buf(rd_slice_t *slice, rd_buf_t *rbuf, size_t size) {
+ size_t remains = size;
+ size_t rlen;
+ const void *p;
+ size_t orig_end = slice->end;
+
+ if (unlikely(rd_slice_remains(slice) < size))
+ return 0;
+
+ /* Temporarily shrink slice to offset + \p size */
+ slice->end = rd_slice_abs_offset(slice) + size;
+
+ while ((rlen = rd_slice_reader(slice, &p))) {
+ rd_dassert(remains >= rlen);
+ rd_buf_write(rbuf, p, rlen);
+ remains -= rlen;
+ }
+
+ rd_dassert(remains == 0);
+
+ /* Restore original size */
+ slice->end = orig_end;
+
+ return size;
+}
/**
* @brief Read \p size bytes from absolute slice offset \p offset
diff --git a/src/rdbuf.h b/src/rdbuf.h
index d8f98422c..0e12a64f0 100644
--- a/src/rdbuf.h
+++ b/src/rdbuf.h
@@ -308,6 +308,8 @@ size_t rd_slice_reader(rd_slice_t *slice, const void **p);
size_t rd_slice_peeker(const rd_slice_t *slice, const void **p);
size_t rd_slice_read(rd_slice_t *slice, void *dst, size_t size);
+size_t rd_slice_read_into_buf(rd_slice_t *slice, rd_buf_t *rbuf, size_t size);
+
size_t
rd_slice_peek(const rd_slice_t *slice, size_t offset, void *dst, size_t size);
diff --git a/src/rdkafka_broker.c b/src/rdkafka_broker.c
index 1beeece2e..473acca81 100644
--- a/src/rdkafka_broker.c
+++ b/src/rdkafka_broker.c
@@ -768,6 +768,26 @@ void rd_kafka_broker_conn_closed(rd_kafka_broker_t *rkb,
rd_kafka_broker_fail(rkb, log_level, err, "%s", errstr);
}
+rd_bool_t buf_contains_toppar(rd_kafka_buf_t *rkbuf, rd_kafka_toppar_t *rktp) {
+
+ if (rd_list_cnt(&rkbuf->rkbuf_u.Produce.batch_list) > 0) {
+ /* this is a multi-batch request, loop through all batches.
+ The size of this list is bounded by the number of topic+partitions
+ this broker thread manages. In reality it's likely to be
+ significantly smaller than that, given not all toppars have a
+ ready batch (linger, batch-size constraints) in each iteration
+ of processing */
+ rd_kafka_msgbatch_t *msgbatch;
+ int i;
+ RD_LIST_FOREACH(msgbatch, &rkbuf->rkbuf_u.Produce.batch_list, i) {
+ if (msgbatch->rktp == rktp)
+ return rd_true;
+ }
+ return rd_false;
+
+ } else
+ return rkbuf->rkbuf_u.Produce.batch.rktp == rktp;
+}
/**
* @brief Purge requests in \p rkbq matching request \p ApiKey
@@ -792,7 +812,7 @@ static int rd_kafka_broker_bufq_purge_by_toppar(rd_kafka_broker_t *rkb,
TAILQ_FOREACH_SAFE(rkbuf, &rkbq->rkbq_bufs, rkbuf_link, tmp) {
if (rkbuf->rkbuf_reqhdr.ApiKey != ApiKey ||
- rkbuf->rkbuf_u.Produce.batch.rktp != rktp ||
+ !buf_contains_toppar(rkbuf, rktp) ||
/* Skip partially sent buffers and let them transmit.
* The alternative would be to kill the connection here,
* which is more drastic and costly. */
@@ -3890,7 +3910,9 @@ static int rd_kafka_toppar_producer_serve(rd_kafka_broker_t *rkb,
rd_ts_t *next_wakeup,
rd_bool_t do_timeout_scan,
rd_bool_t may_send,
- rd_bool_t flushing) {
+ rd_bool_t flushing,
+ rd_bool_t multi_batch_request,
+ rd_list_t *batch_bufq) {
int cnt = 0;
int r;
rd_kafka_msg_t *rkm;
@@ -4139,9 +4161,16 @@ static int rd_kafka_toppar_producer_serve(rd_kafka_broker_t *rkb,
/* Send Produce requests for this toppar, honouring the
* queue backpressure threshold. */
for (reqcnt = 0; reqcnt < max_requests; reqcnt++) {
- r = rd_kafka_ProduceRequest(rkb, rktp, pid, epoch_base_msgid);
- if (likely(r > 0))
+ r = rd_kafka_ProduceRequest(rkb, rktp, pid, epoch_base_msgid, multi_batch_request, batch_bufq);
+ if (likely(r > 0)) {
cnt += r;
+ if (multi_batch_request)
+ /* In case of multi-batch request, we can't have multiple
+ batches from the same toppar in a single Request, so
+ we break out of the loop, which essentially means we call
+ rd_kafka_ProduceRequest only once in this function */
+ break;
+ }
else
break;
}
@@ -4183,6 +4212,9 @@ static int rd_kafka_broker_produce_toppars(rd_kafka_broker_t *rkb,
rd_kafka_pid_t pid = RD_KAFKA_PID_INITIALIZER;
rd_bool_t may_send = rd_true;
rd_bool_t flushing = rd_false;
+ rd_list_t batch_bufq;
+ rd_bool_t multi_batch_request = rd_kafka_is_idempotent(rkb->rkb_rk) ?
+ rd_false : rkb->rkb_rk->rk_conf.multibatch;
/* Round-robin serve each toppar. */
rktp = rkb->rkb_active_toppar_next;
@@ -4210,13 +4242,14 @@ static int rd_kafka_broker_produce_toppars(rd_kafka_broker_t *rkb,
flushing = may_send && rd_atomic32_get(&rkb->rkb_rk->rk_flushing) > 0;
+ rd_list_init(&batch_bufq, 0, NULL);
do {
rd_ts_t this_next_wakeup = ret_next_wakeup;
/* Try producing toppar */
cnt += rd_kafka_toppar_producer_serve(
rkb, rktp, pid, now, &this_next_wakeup, do_timeout_scan,
- may_send, flushing);
+ may_send, flushing, multi_batch_request, &batch_bufq);
rd_kafka_set_next_wakeup(&ret_next_wakeup, this_next_wakeup);
@@ -4224,6 +4257,11 @@ static int rd_kafka_broker_produce_toppars(rd_kafka_broker_t *rkb,
rktp_activelink)) !=
rkb->rkb_active_toppar_next);
+ if (multi_batch_request && !rd_list_empty(&batch_bufq)) {
+ rd_kafka_MultiBatchProduceRequest(rkb, pid, &batch_bufq);
+ }
+ rd_list_destroy(&batch_bufq);
+
/* Update next starting toppar to produce in round-robin list. */
rd_kafka_broker_active_toppar_next(
rkb,
diff --git a/src/rdkafka_buf.c b/src/rdkafka_buf.c
index 292c21819..2d7b58394 100644
--- a/src/rdkafka_buf.c
+++ b/src/rdkafka_buf.c
@@ -57,6 +57,13 @@ void rd_kafka_buf_destroy_final(rd_kafka_buf_t *rkbuf) {
case RD_KAFKAP_Produce:
rd_kafka_msgbatch_destroy(&rkbuf->rkbuf_batch);
+ int i;
+ rd_kafka_msgbatch_t *msgbatch;
+ RD_LIST_FOREACH(msgbatch, &rkbuf->rkbuf_u.Produce.batch_list, i) {
+ rd_kafka_msgbatch_destroy(msgbatch);
+ rd_free(msgbatch);
+ }
+ rd_list_destroy(&rkbuf->rkbuf_u.Produce.batch_list);
break;
}
diff --git a/src/rdkafka_buf.h b/src/rdkafka_buf.h
index 37938999d..52ffc9e9d 100644
--- a/src/rdkafka_buf.h
+++ b/src/rdkafka_buf.h
@@ -399,6 +399,11 @@ struct rd_kafka_buf_s { /* rd_kafka_buf_t */
} Metadata;
struct {
rd_kafka_msgbatch_t batch; /**< MessageSet/batch */
+ rd_list_t batch_list; /* MessageSet/batch list*/
+ size_t batch_start_pos; /* Pos where Record batch
+ * starts in the buf */
+ size_t batch_end_pos; /* Pos after Record batch +
+ * Partition tags in the buf */
} Produce;
struct {
rd_bool_t commit; /**< true = txn commit,
diff --git a/src/rdkafka_conf.c b/src/rdkafka_conf.c
index 84262d56e..8c59fafa6 100644
--- a/src/rdkafka_conf.c
+++ b/src/rdkafka_conf.c
@@ -1470,6 +1470,9 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
"A higher value allows for more effective batching of these "
"messages.",
0, 900000, 10},
+ {_RK_GLOBAL | _RK_PRODUCER, "multibatch", _RK_C_BOOL,
+ _RK(multibatch), "Batch produce requests across multiple partitions.", 0,
+ 1, 0},
{_RK_GLOBAL, "client.dns.lookup", _RK_C_S2I, _RK(client_dns_lookup),
"Controls how the client uses DNS lookups. By default, when the lookup "
"returns multiple IP addresses for a hostname, they will all be attempted "
diff --git a/src/rdkafka_conf.h b/src/rdkafka_conf.h
index 5c4151304..ad06552ff 100644
--- a/src/rdkafka_conf.h
+++ b/src/rdkafka_conf.h
@@ -439,6 +439,7 @@ struct rd_kafka_conf_s {
rd_kafka_compression_t compression_codec;
int dr_err_only;
int sticky_partition_linger_ms;
+ int multibatch;
/* Message delivery report callback.
* Called once for each produced message, either on
diff --git a/src/rdkafka_msg.h b/src/rdkafka_msg.h
index 663aa005d..e3da1dfdc 100644
--- a/src/rdkafka_msg.h
+++ b/src/rdkafka_msg.h
@@ -84,6 +84,7 @@ typedef struct rd_kafka_Produce_result {
*record_errors; /**< Errors for records that caused the batch to be
dropped */
int32_t record_errors_cnt; /**< record_errors count */
+ rd_kafka_resp_err_t errorcode; /**< error code from the response */
} rd_kafka_Produce_result_t;
typedef struct rd_kafka_msg_s {
diff --git a/src/rdkafka_msgset_writer.c b/src/rdkafka_msgset_writer.c
index 6f71d827f..49c9e6eba 100644
--- a/src/rdkafka_msgset_writer.c
+++ b/src/rdkafka_msgset_writer.c
@@ -460,6 +460,9 @@ rd_kafka_msgset_writer_write_Produce_header(rd_kafka_msgset_writer_t *msetw) {
/* MessageSetSize: Will be finalized later*/
msetw->msetw_of_MessageSetSize = rd_kafka_buf_write_arraycnt_pos(rkbuf);
+ // For multi-batch requests, this is where we start copying the batch
+ rkbuf->rkbuf_u.Produce.batch_start_pos = msetw->msetw_of_MessageSetSize;
+
if (msetw->msetw_MsgVersion == 2) {
/* MessageSet v2 header */
rd_kafka_msgset_writer_write_MessageSet_v2_header(msetw);
@@ -1399,6 +1402,10 @@ rd_kafka_msgset_writer_finalize(rd_kafka_msgset_writer_t *msetw,
/* Partition tags */
rd_kafka_buf_write_tags_empty(rkbuf);
+
+ /* Save the position of end of batch + partition tags for multi-batch req */
+ rkbuf->rkbuf_u.Produce.batch_end_pos = rd_buf_write_pos(&rkbuf->rkbuf_buf);
+
/* Topics tags */
rd_kafka_buf_write_tags_empty(rkbuf);
diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c
index 8623be97d..511b8c676 100644
--- a/src/rdkafka_request.c
+++ b/src/rdkafka_request.c
@@ -3532,6 +3532,7 @@ rd_kafkap_Produce_reply_tags_parse(rd_kafka_buf_t *rkbuf,
static void rd_kafka_handle_Produce_metadata_update(
rd_kafka_broker_t *rkb,
+ rd_kafkap_Produce_reply_tags_t *reply_level_tags,
rd_kafkap_Produce_reply_tags_t *ProduceTags) {
if (ProduceTags->leader_change_cnt) {
rd_kafka_metadata_t *md = NULL;
@@ -3548,7 +3549,7 @@ static void rd_kafka_handle_Produce_metadata_update(
rd_tmpabuf_new(&tbuf, 0, rd_true /*assert on fail*/);
rd_tmpabuf_add_alloc(&tbuf, sizeof(*mdi));
rd_kafkap_leader_discovery_tmpabuf_add_alloc_brokers(
- &tbuf, &ProduceTags->NodeEndpoints);
+ &tbuf, &reply_level_tags->NodeEndpoints);
rd_kafkap_leader_discovery_tmpabuf_add_alloc_topics(&tbuf, 1);
rd_kafkap_leader_discovery_tmpabuf_add_alloc_topic(
&tbuf, ProduceTags->Topic.TopicName, 1);
@@ -3560,7 +3561,7 @@ static void rd_kafka_handle_Produce_metadata_update(
rd_kafkap_leader_discovery_metadata_init(mdi, nodeid);
rd_kafkap_leader_discovery_set_brokers(
- &tbuf, mdi, &ProduceTags->NodeEndpoints);
+ &tbuf, mdi, &reply_level_tags->NodeEndpoints);
rd_kafkap_leader_discovery_set_topic_cnt(&tbuf, mdi, 1);
@@ -3586,18 +3587,59 @@ static void rd_kafkap_Produce_reply_tags_destroy(
RD_IF_FREE(reply_tags->NodeEndpoints.NodeEndpoints, rd_free);
}
+static int rd_kafka_find_msgbatch(rd_list_t *msgbatch_list,
+ rd_kafkap_str_t *topic_in_response,
+ int32_t partition_id) {
+
+ rd_kafka_msgbatch_t * msgbatch;
+ int i;
+ RD_LIST_FOREACH(msgbatch, msgbatch_list, i) {
+ rd_kafkap_str_t *topic_in_request = msgbatch->rktp->rktp_rkt->rkt_topic;
+
+ if ((topic_in_request->len == topic_in_response->len) &&
+ !strncmp(topic_in_request->str, topic_in_response->str,
+ topic_in_request->len) &&
+ msgbatch->rktp->rktp_partition == partition_id) {
+ return i;
+ }
+ }
+ return -1;
+}
+
+static void
+produce_reply_tags_cleaup(rd_kafkap_Produce_reply_tags_t *reply_level_tags,
+ rd_kafkap_Produce_reply_tags_t *toppar_level_tags,
+ int num_batches) {
+ int i;
+
+ for (i = 0; i < num_batches; i++) {
+ rd_kafkap_Produce_reply_tags_destroy(&toppar_level_tags[i]);
+ }
+ rd_free(toppar_level_tags);
+ rd_kafkap_Produce_reply_tags_destroy(reply_level_tags);
+}
/**
* @brief Parses a Produce reply.
+ *
+ * @remarks This function has been updated to parse multi-batch
+ * ProducerResponse. Two variables \p results and
+ * \p ProduceTags have been updated to handle a list of
+ * rd_kafka_Produce_result_t* and rd_kafkap_Produce_reply_tags_t
+ * instead of the single one. As each partition's result is
+ * parsed, the topic+partition pair is used
+ * to find the corresponding msgbatch from the list of msgbatches
+ * stored in the request's rkbuf. The position index is used then
+ * to store the results and tags.
* @returns 0 on success or an error code on failure.
* @locality broker thread
*/
static rd_kafka_resp_err_t
rd_kafka_handle_Produce_parse(rd_kafka_broker_t *rkb,
- rd_kafka_toppar_t *rktp,
rd_kafka_buf_t *rkbuf,
rd_kafka_buf_t *request,
- rd_kafka_Produce_result_t *result) {
+ rd_kafka_Produce_result_t **results,
+ rd_bool_t multi_batch_request) {
int32_t TopicArrayCnt;
int32_t PartitionArrayCnt;
struct {
@@ -3608,89 +3650,110 @@ rd_kafka_handle_Produce_parse(rd_kafka_broker_t *rkb,
const int log_decode_errors = LOG_ERR;
int64_t log_start_offset = -1;
rd_kafkap_str_t TopicName = RD_ZERO_INIT;
- rd_kafkap_Produce_reply_tags_t ProduceTags = RD_ZERO_INIT;
+ rd_kafkap_Produce_reply_tags_t *ProduceTags;
+ rd_kafkap_Produce_reply_tags_t reply_level_tags = RD_ZERO_INIT;
+ int i, j, decoded_batch_cnt=0, pos=0;
+ rd_kafka_Produce_result_t *result;
+
+ int num_batch = multi_batch_request ?
+ rd_list_cnt(&request->rkbuf_u.Produce.batch_list) : 1;
+ ProduceTags = rd_calloc(num_batch, sizeof(rd_kafkap_Produce_reply_tags_t));
rd_kafka_buf_read_arraycnt(rkbuf, &TopicArrayCnt, RD_KAFKAP_TOPICS_MAX);
- if (TopicArrayCnt != 1)
+ if (!multi_batch_request && (TopicArrayCnt != 1))
goto err;
- /* Since we only produce to one single topic+partition in each
- * request we assume that the reply only contains one topic+partition
- * and that it is the same that we requested.
- * If not the broker is buggy. */
- if (request->rkbuf_reqhdr.ApiVersion >= 10)
+ for (i = 0; i < TopicArrayCnt; i++) {
+
rd_kafka_buf_read_str(rkbuf, &TopicName);
- else
- rd_kafka_buf_skip_str(rkbuf);
- rd_kafka_buf_read_arraycnt(rkbuf, &PartitionArrayCnt,
- RD_KAFKAP_PARTITIONS_MAX);
- if (PartitionArrayCnt != 1)
- goto err;
+ rd_kafka_buf_read_arraycnt(rkbuf, &PartitionArrayCnt,
+ RD_KAFKAP_PARTITIONS_MAX);
- rd_kafka_buf_read_i32(rkbuf, &hdr.Partition);
- rd_kafka_buf_read_i16(rkbuf, &hdr.ErrorCode);
- rd_kafka_buf_read_i64(rkbuf, &hdr.Offset);
+ if (!multi_batch_request && (PartitionArrayCnt != 1))
+ goto err;
- result->offset = hdr.Offset;
+ for (j = 0; j < PartitionArrayCnt; j++) {
- result->timestamp = -1;
- if (request->rkbuf_reqhdr.ApiVersion >= 2)
- rd_kafka_buf_read_i64(rkbuf, &result->timestamp);
+ rd_kafka_buf_read_i32(rkbuf, &hdr.Partition);
+ rd_kafka_buf_read_i16(rkbuf, &hdr.ErrorCode);
+ rd_kafka_buf_read_i64(rkbuf, &hdr.Offset);
- if (request->rkbuf_reqhdr.ApiVersion >= 5)
- rd_kafka_buf_read_i64(rkbuf, &log_start_offset);
+ if (multi_batch_request && ((pos=rd_kafka_find_msgbatch(
+ &request->rkbuf_u.Produce.batch_list,
+ &TopicName, hdr.Partition)) == -1)) {
+ /* Got a msgbatch in response that does not exist in the
+ request's msgbatch list */
+ rd_rkb_dbg(rkb, BROKER, "REQERR",
+ "Response does not match the request");
+ goto err;
+ }
+ result = results[pos];
+
+ result->errorcode = hdr.ErrorCode;
+ result->offset = hdr.Offset;
+
+ result->timestamp = -1;
+ if (request->rkbuf_reqhdr.ApiVersion >= 2)
+ rd_kafka_buf_read_i64(rkbuf, &result->timestamp);
+
+ if (request->rkbuf_reqhdr.ApiVersion >= 5)
+ rd_kafka_buf_read_i64(rkbuf, &log_start_offset);
+
+ if (request->rkbuf_reqhdr.ApiVersion >= 8) {
+ int i;
+ int32_t RecordErrorsCnt;
+ rd_kafkap_str_t ErrorMessage;
+ rd_kafka_buf_read_arraycnt(rkbuf, &RecordErrorsCnt, -1);
+ if (RecordErrorsCnt) {
+ result->record_errors = rd_calloc(
+ RecordErrorsCnt, sizeof(*result->record_errors));
+ result->record_errors_cnt = RecordErrorsCnt;
+ for (i = 0; i < RecordErrorsCnt; i++) {
+ int32_t BatchIndex;
+ rd_kafkap_str_t BatchIndexErrorMessage;
+ rd_kafka_buf_read_i32(rkbuf, &BatchIndex);
+ rd_kafka_buf_read_str(rkbuf,
+ &BatchIndexErrorMessage);
+ result->record_errors[i].batch_index =
+ BatchIndex;
+ if (!RD_KAFKAP_STR_IS_NULL(
+ &BatchIndexErrorMessage))
+ result->record_errors[i].errstr =
+ RD_KAFKAP_STR_DUP(
+ &BatchIndexErrorMessage);
+ /* RecordError tags */
+ rd_kafka_buf_skip_tags(rkbuf);
+ }
+ }
- if (request->rkbuf_reqhdr.ApiVersion >= 8) {
- int i;
- int32_t RecordErrorsCnt;
- rd_kafkap_str_t ErrorMessage;
- rd_kafka_buf_read_arraycnt(rkbuf, &RecordErrorsCnt, -1);
- if (RecordErrorsCnt) {
- result->record_errors = rd_calloc(
- RecordErrorsCnt, sizeof(*result->record_errors));
- result->record_errors_cnt = RecordErrorsCnt;
- for (i = 0; i < RecordErrorsCnt; i++) {
- int32_t BatchIndex;
- rd_kafkap_str_t BatchIndexErrorMessage;
- rd_kafka_buf_read_i32(rkbuf, &BatchIndex);
- rd_kafka_buf_read_str(rkbuf,
- &BatchIndexErrorMessage);
- result->record_errors[i].batch_index =
- BatchIndex;
- if (!RD_KAFKAP_STR_IS_NULL(
- &BatchIndexErrorMessage))
- result->record_errors[i].errstr =
- RD_KAFKAP_STR_DUP(
- &BatchIndexErrorMessage);
- /* RecordError tags */
- rd_kafka_buf_skip_tags(rkbuf);
+ rd_kafka_buf_read_str(rkbuf, &ErrorMessage);
+ if (!RD_KAFKAP_STR_IS_NULL(&ErrorMessage))
+ result->errstr = RD_KAFKAP_STR_DUP(&ErrorMessage);
}
- }
- rd_kafka_buf_read_str(rkbuf, &ErrorMessage);
- if (!RD_KAFKAP_STR_IS_NULL(&ErrorMessage))
- result->errstr = RD_KAFKAP_STR_DUP(&ErrorMessage);
- }
+ if (request->rkbuf_reqhdr.ApiVersion >= 10) {
+ rd_kafkap_Produce_reply_tags_Topic_t *TopicTags =
+ &ProduceTags[pos].Topic;
+ rd_kafkap_Produce_reply_tags_Partition_t *PartitionTags =
+ &TopicTags->Partition;
- if (request->rkbuf_reqhdr.ApiVersion >= 10) {
- rd_kafkap_Produce_reply_tags_Topic_t *TopicTags =
- &ProduceTags.Topic;
- rd_kafkap_Produce_reply_tags_Partition_t *PartitionTags =
- &TopicTags->Partition;
+ /* Partition tags count */
+ TopicTags->TopicName = RD_KAFKAP_STR_DUP(&TopicName);
+ PartitionTags->Partition = hdr.Partition;
+ }
- /* Partition tags count */
- TopicTags->TopicName = RD_KAFKAP_STR_DUP(&TopicName);
- PartitionTags->Partition = hdr.Partition;
- }
+ /* Partition tags */
+ rd_kafka_buf_read_tags(rkbuf,
+ rd_kafkap_Produce_reply_tags_partition_parse,
+ &ProduceTags[pos], &ProduceTags[pos].Topic.Partition);
- /* Partition tags */
- rd_kafka_buf_read_tags(rkbuf,
- rd_kafkap_Produce_reply_tags_partition_parse,
- &ProduceTags, &ProduceTags.Topic.Partition);
+ decoded_batch_cnt++;
+ }
- /* Topic tags */
- rd_kafka_buf_skip_tags(rkbuf);
+ /* Topic tags */
+ rd_kafka_buf_skip_tags(rkbuf);
+ }
if (request->rkbuf_reqhdr.ApiVersion >= 1) {
int32_t Throttle_Time;
@@ -3700,19 +3763,33 @@ rd_kafka_handle_Produce_parse(rd_kafka_broker_t *rkb,
Throttle_Time);
}
+ if (decoded_batch_cnt != num_batch) {
+ /* Number of decoded msgbatch in response does not match
+ the size of the request's msgbatch list */
+ rd_rkb_dbg(rkb, BROKER, "REQERR",
+ "Response does not match the request in number of batches");
+ goto err;
+ }
+
/* ProduceResponse tags */
rd_kafka_buf_read_tags(rkbuf, rd_kafkap_Produce_reply_tags_parse,
- &ProduceTags);
+ &reply_level_tags);
+ for (i = 0; i < num_batch; i++)
+ rd_kafka_handle_Produce_metadata_update(rkb, &reply_level_tags,
+ &ProduceTags[i]);
- rd_kafka_handle_Produce_metadata_update(rkb, &ProduceTags);
+ produce_reply_tags_cleaup(&reply_level_tags, ProduceTags, num_batch);
- rd_kafkap_Produce_reply_tags_destroy(&ProduceTags);
- return hdr.ErrorCode;
+ if (multi_batch_request) {
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+ } else {
+ return result->errorcode;
+ }
err_parse:
- rd_kafkap_Produce_reply_tags_destroy(&ProduceTags);
+ produce_reply_tags_cleaup(&reply_level_tags, ProduceTags, num_batch);
return rkbuf->rkbuf_err;
err:
- rd_kafkap_Produce_reply_tags_destroy(&ProduceTags);
+ produce_reply_tags_cleaup(&reply_level_tags, ProduceTags, num_batch);
return RD_KAFKA_RESP_ERR__BAD_MSG;
}
@@ -4694,7 +4771,6 @@ static void rd_kafka_handle_Produce(rd_kafka_t *rk,
rd_kafka_buf_t *request,
void *opaque) {
rd_kafka_msgbatch_t *batch = &request->rkbuf_batch;
- rd_kafka_toppar_t *rktp = batch->rktp;
rd_kafka_Produce_result_t *result =
rd_kafka_Produce_result_new(RD_KAFKA_OFFSET_INVALID, -1);
@@ -4706,8 +4782,8 @@ static void rd_kafka_handle_Produce(rd_kafka_t *rk,
/* Parse Produce reply (unless the request errored) */
if (!err && reply)
- err = rd_kafka_handle_Produce_parse(rkb, rktp, reply, request,
- result);
+ err = rd_kafka_handle_Produce_parse(rkb, reply, request,
+ &result, rd_false);
rd_kafka_msgbatch_handle_Produce_result(rkb, batch, err, result,
request);
@@ -4715,6 +4791,55 @@ static void rd_kafka_handle_Produce(rd_kafka_t *rk,
}
+/**
+ * @brief Handle MultiBatch ProduceResponse
+ * This is based on function rd_kafka_handle_Produce, where
+ * similar logic is executed, such as:
+ * - allocating rd_kafka_Produce_result_t
+ * - parsing ProduceResponse
+ * - invoke function to handle results
+ * - free rd_kafka_Produce_result_t
+ * The difference is this function creates/handles multiple
+ * rd_kafka_Produce_result_t, one for each batch
+ *
+ * @param reply is NULL when `acks=0` and on various local errors.
+ *
+ * @warning May be called on the old leader thread. Lock rktp appropriately!
+ *
+ * @locality broker thread (but not necessarily the leader broker thread)
+ */
+static void rd_kafka_handle_MultiBatchProduce(rd_kafka_t *rk,
+ rd_kafka_broker_t *rkb,
+ rd_kafka_resp_err_t err,
+ rd_kafka_buf_t *reply,
+ rd_kafka_buf_t *request,
+ void *opaque) {
+
+ rd_kafka_Produce_result_t **results;
+ rd_kafka_msgbatch_t *batch;
+ rd_list_t *msgbatches = &request->rkbuf_u.Produce.batch_list;
+ int i, num_batches = rd_list_cnt(msgbatches);
+
+ results = rd_calloc(num_batches, sizeof(rd_kafka_Produce_result_t*));
+
+ for (i = 0; i < num_batches; i++)
+ results[i] = rd_kafka_Produce_result_new(RD_KAFKA_OFFSET_INVALID, -1);
+
+ /* Parse Produce reply (unless the request errored) */
+ if (!err && reply)
+ err = rd_kafka_handle_Produce_parse(rkb, reply, request,
+ results, rd_true);
+
+ RD_LIST_FOREACH(batch, msgbatches, i) {
+ rd_kafka_resp_err_t final_err = err == RD_KAFKA_RESP_ERR_NO_ERROR ?
+ results[i]->errorcode : err;
+ rd_kafka_msgbatch_handle_Produce_result(rkb, batch, final_err,
+ results[i], request);
+ rd_kafka_Produce_result_destroy(results[i]);
+ }
+ rd_free(results);
+}
+
/**
* @brief Send ProduceRequest for messages in toppar queue.
*
@@ -4725,7 +4850,9 @@ static void rd_kafka_handle_Produce(rd_kafka_t *rk,
int rd_kafka_ProduceRequest(rd_kafka_broker_t *rkb,
rd_kafka_toppar_t *rktp,
const rd_kafka_pid_t pid,
- uint64_t epoch_base_msgid) {
+ uint64_t epoch_base_msgid,
+ rd_bool_t skip_sending,
+ rd_list_t *batch_bufq) {
rd_kafka_buf_t *rkbuf;
rd_kafka_topic_t *rkt = rktp->rktp_rkt;
size_t MessageSetSize = 0;
@@ -4773,12 +4900,313 @@ int rd_kafka_ProduceRequest(rd_kafka_broker_t *rkb,
* capped by socket.timeout.ms */
rd_kafka_buf_set_abs_timeout(rkbuf, tmout, now);
- rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, RD_KAFKA_NO_REPLYQ,
- rd_kafka_handle_Produce, NULL);
+ if (skip_sending) {
+ rd_list_add(batch_bufq, rkbuf);
+ } else {
+ rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, RD_KAFKA_NO_REPLYQ,
+ rd_kafka_handle_Produce, NULL);
+ }
return cnt;
}
+/**
+ * @brief Fill Request header fields for Multibatch Produce Request
+ */
+static void rd_kafka_fill_MultiBatch_header(rd_kafka_broker_t *rkb,
+ rd_kafka_buf_t *batch_rkbuf,
+ rd_kafka_buf_t *request_rkbuf) {
+
+ rd_kafka_t *rk = rkb->rkb_rk;
+ rd_kafka_topic_t *rkt = batch_rkbuf->rkbuf_batch.rktp->rktp_rkt;
+
+ if (rd_kafka_buf_ApiVersion(request_rkbuf) >= 3)
+ rd_kafka_buf_write_kstr(request_rkbuf, rk->rk_eos.transactional_id);
+
+ /* RequiredAcks */
+ rd_kafka_buf_write_i16(request_rkbuf, rkt->rkt_conf.required_acks);
+
+ /* Timeout */
+ rd_kafka_buf_write_i32(request_rkbuf, rkt->rkt_conf.request_timeout_ms);
+
+ if (!rkt->rkt_conf.required_acks)
+ request_rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_NO_RESPONSE;
+}
+
+/**
+ * @brief Comparison function used to sort batches by topic name
+ */
+static int rd_kafka_buf_cmp_by_topic(const void *_a, const void *_b) {
+ const rd_kafka_buf_t *a = _a, *b = _b;
+ return RD_CMP(a->rkbuf_batch.rktp->rktp_rkt,
+ b->rkbuf_batch.rktp->rktp_rkt);
+}
+
+/**
+ * @brief Copy the encoded RecordBatch from a single-batch ProduceRequest
+ * into a multi-batch ProduceRequest. The copy relies on two position
+ * markers recorded in the rd_kafka_buf_t during the single-batch
+ * request encoding process. Note: for ProduceRequest version > 9
+ * the copy portion contains not only the RecordBatch, but also the
+ * partition level TAG_BUFFER
+ *
+ * @param batch_rkbuf a rkbuf contains a single-batch ProduceRequest
+ * crafted by rd_kafka_ProduceRequest function
+ * @param request_rkbuf a rkbuf for multi-batch ProduceRequest
+ */
+static void rd_kafka_copy_batch_buf(rd_kafka_buf_t *batch_rkbuf,
+ rd_kafka_buf_t *request_rkbuf) {
+ rd_slice_t *rkbuf_reader = &batch_rkbuf->rkbuf_reader;
+ rd_buf_t *rkbuf_buf = &batch_rkbuf->rkbuf_buf;
+ size_t first_pos = batch_rkbuf->rkbuf_u.Produce.batch_start_pos;
+ size_t last_pos = batch_rkbuf->rkbuf_u.Produce.batch_end_pos;
+ size_t length = last_pos - first_pos;
+ rd_slice_init_full(rkbuf_reader, rkbuf_buf);
+ rd_slice_seek(rkbuf_reader, first_pos);
+ rd_slice_read_into_buf(rkbuf_reader, &request_rkbuf->rkbuf_buf, length);
+}
+
+/**
+ * @brief Given a list of encoded batches, starting from \p start_ind,
+ * select a number of batches to be included in ProduceRequest.
+ * The selected batches are [start_ind, next_ind). The total size of
+ * the selection is filled in \p estimated_size
+ */
+static void select_batches_to_include(rd_kafka_t *rk, rd_list_t *batch_bufq,
+ int start_ind, int *next_ind,
+ size_t *estimated_size) {
+
+ rd_kafka_buf_t *batch_rkbuf;
+ int i;
+ size_t total = 0;
+
+ /* TODO implement the selection criteria where topics with different
+ setting should not be included in the same request */
+
+ for (i = start_ind; i < rd_list_cnt(batch_bufq); i++) {
+ batch_rkbuf = rd_list_elem(batch_bufq, i);
+ size_t batch_size = rd_buf_len(&batch_rkbuf->rkbuf_buf);
+
+ /* We would assume at least 1 batch can go into the request,
+ even if it might be slightly larger than the max.message.size
+ due to header estimation inaccuracy etc. This is in line with
+ the assumption in librdkafka (and other Kafka clients in general)*/
+ if (i != start_ind) {
+ if ((total + batch_size) > (size_t)rk->rk_conf.max_msg_size)
+ break;
+ }
+ total += batch_size;
+ }
+
+ *estimated_size = total;
+ *next_ind = i;
+}
+
+/**
+ * @brief Given the list of batches, get the timeout of the earliest batch
+ */
+static int64_t get_first_msg_timeout (rd_list_t* msg_batch_list) {
+
+ rd_kafka_msgbatch_t *msgbatch;
+ int64_t timeout, result = INT64_MAX;
+ rd_ts_t now = rd_clock();
+ int i;
+
+ RD_LIST_FOREACH(msgbatch, msg_batch_list, i) {
+ timeout = (rd_kafka_msgq_first(&msgbatch->msgq)->rkm_ts_timeout
+ - now) / 1000;
+ result = timeout < result ? timeout : result;
+ }
+
+ return result;
+}
+
+/**
+ * @brief Finalize the encoding of a topic. This happens after multiple
+ * Partitions (Index + RecordBatch + Tags) had already by copied
+ * into the \p request_rkbuf. This function fills the number of
+ * partition \p part_cnt from pre-recorded position \p of_part_cnt,
+ * and for ProduceRequest version > 9, it copies topic-level
+ * TAG_BUFFER into the \p request_rkbuf as well.
+ */
+static void finalize_topic_encoding(rd_kafka_buf_t *request_rkbuf,
+ rd_kafka_buf_t *batch_rkbuf,
+ size_t of_part_cnt, int part_cnt) {
+
+ rd_kafka_buf_finalize_arraycnt(request_rkbuf,
+ of_part_cnt, part_cnt);
+
+ /* the topic tags span from the end of "record-batch + partition tags"
+ to the end of the write buffer; it's basically the last thing
+ in the single batched request. here we copy it into the multi-
+ batched request */
+ rd_slice_t *rkbuf_reader = &batch_rkbuf->rkbuf_reader;
+ rd_buf_t *rkbuf_buf = &batch_rkbuf->rkbuf_buf;
+ size_t first_pos = batch_rkbuf->rkbuf_u.Produce.batch_end_pos;
+ size_t last_pos = rd_buf_write_pos(rkbuf_buf);
+ size_t length = last_pos - first_pos;
+ if (length > 0)
+ rd_slice_read_into_buf(rkbuf_reader,
+ &request_rkbuf->rkbuf_buf, length);
+}
+
+/**
+ * @brief The main function to create and send multi-batch
+ * ProduceRequest
+ */
+int rd_kafka_MultiBatchProduceRequest(rd_kafka_broker_t *rkb,
+ const rd_kafka_pid_t pid,
+ rd_list_t *batch_bufq) {
+ rd_kafka_buf_t *cur_rkbuf, *first_rkbuf, *prev_rkbuf;
+ rd_kafka_buf_t *request_rkbuf;
+ rd_kafka_topic_t *prev_topic, *cur_topic;
+ int cur_ind, next_ind, i;
+ size_t estimated_size;
+ size_t of_topic_cnt = 0, of_part_cnt = 0;
+ int topic_cnt = 0, part_cnt = 0, req_cnt = 0;
+ rd_ts_t now;
+ int64_t first_msg_timeout;
+ int features, tmout;
+ int16_t api_version;
+
+ /* sort list of batch_buf by topic, the encoding loop below
+ depends on the fact that we iterate through a list
+ sorted by topics */
+ rd_list_sort(batch_bufq, rd_kafka_buf_cmp_by_topic);
+
+ cur_ind = 0;
+ while (cur_ind < rd_list_cnt(batch_bufq)) {
+
+ /* Here we decide which batches can be included in
+ the next ProduceRequest:
+ - list of batches must fit in the message.max.bytes
+ - batches with different topic level setting shouldn't
+ be in the same request
+ The batches selected are in range [cur_ind, next_ind) */
+ select_batches_to_include(rkb->rkb_rk, batch_bufq, cur_ind,
+ &next_ind, &estimated_size);
+
+ /* First batch of the request, we use it to initialize things */
+ first_rkbuf = rd_list_elem(batch_bufq, cur_ind);
+ api_version = rd_kafka_buf_ApiVersion(first_rkbuf);
+ features = first_rkbuf->rkbuf_features;
+
+ /* Allocate request buffer based on included batch count
+ and estimated size. For the size, we simplify the math
+ by adding all buf size. It leads to a bit over estimation
+ i.e. we counted request headers and topic name multiple
+ times, but it's not a big issue
+ The function call for creating the request is copy/paste
+ from rd_kafka_msgset_writer_alloc_buf */
+ request_rkbuf = rd_kafka_buf_new_flexver_request(
+ rkb, RD_KAFKAP_Produce,
+ next_ind - cur_ind,
+ estimated_size,
+ api_version >= 9);
+ rd_kafka_buf_ApiVersion_set(request_rkbuf, api_version, features);
+
+ /* init the list to hold rd_kafka_msgbatch_t for each batch*/
+ rd_list_init(&request_rkbuf->rkbuf_u.Produce.batch_list, next_ind-cur_ind, NULL);
+
+ prev_topic = first_rkbuf->rkbuf_batch.rktp->rktp_rkt;
+ prev_rkbuf = first_rkbuf;
+
+ /* Start to encode the request */
+ /* Fill ProducerRequest header */
+ rd_kafka_fill_MultiBatch_header(rkb, first_rkbuf, request_rkbuf);
+
+ /* Topic count: updated later */
+ of_topic_cnt = rd_kafka_buf_write_arraycnt_pos(request_rkbuf);
+ topic_cnt = 0;
+
+ /* Encodes batches[cur_ind, next_ind) in the request*/
+ for (i = cur_ind; i < next_ind; i++) {
+
+ cur_rkbuf = rd_list_elem(batch_bufq, i);
+ cur_topic = cur_rkbuf->rkbuf_batch.rktp->rktp_rkt;
+
+ /* starting a new topic: write its name and finalize the previous one if needed */
+ if (i == cur_ind || (cur_topic != prev_topic)) {
+ if (i != cur_ind) {
+ /* Finalize topic encoding:
+ - filling the partition count
+ - copy the topic level tags */
+ finalize_topic_encoding(request_rkbuf, prev_rkbuf,
+ of_part_cnt, part_cnt);
+ }
+ /* Write the topic name */
+ rd_kafka_buf_write_kstr(request_rkbuf, cur_topic->rkt_topic);
+ /* Partition Count: updated later */
+ of_part_cnt = rd_kafka_buf_write_arraycnt_pos(request_rkbuf);
+ part_cnt = 0;
+ topic_cnt++;
+ }
+
+ rd_kafka_toppar_t *rktp = cur_rkbuf->rkbuf_batch.rktp;
+
+ /* Write the partition */
+ rd_kafka_buf_write_i32(request_rkbuf, rktp->rktp_partition);
+
+ /* Copy the already encoded/compressed RecordBatch from buf */
+ rd_kafka_copy_batch_buf(cur_rkbuf, request_rkbuf);
+
+ /* Move the msgbatch from the cur_rkbuf into the new request's
+ batch_list. A new rd_kafka_msgbatch_t is created and we move
+ the each msgs in the msgq over. This is so that we can reused
+ as much as possible the existing memory accounting/cleanup.
+ The actual message is not copied.
+ This list of msgbatch will be cleaned up when the buf of this
+ MultiBatch request is destroyed */
+ rd_kafka_msgbatch_t *src = &cur_rkbuf->rkbuf_batch;
+ rd_kafka_msgbatch_t *dst = rd_malloc(sizeof(rd_kafka_msgbatch_t));
+ rd_kafka_msgbatch_init(dst, src->rktp, src->pid, src->epoch_base_msgid);
+ rd_kafka_msgq_move(&dst->msgq, &src->msgq);
+ rd_list_add(&request_rkbuf->rkbuf_u.Produce.batch_list, dst);
+
+ part_cnt++;
+ prev_topic = cur_topic;
+ prev_rkbuf = cur_rkbuf;
+ }
+
+ /* Finalize last topic level encodings */
+ finalize_topic_encoding(request_rkbuf, prev_rkbuf, of_part_cnt, part_cnt);
+
+ /* Topic count */
+ rd_kafka_buf_finalize_arraycnt(request_rkbuf, of_topic_cnt, topic_cnt);
+
+ /* The timeout setting is copy/pasted from rd_kafka_ProduceRequest func
+ Instead of taking the first msg from a batch, we take the first
+ msg from each batch and choose the closest one. */
+ now = rd_clock();
+ first_msg_timeout =
+ get_first_msg_timeout(&request_rkbuf->rkbuf_u.Produce.batch_list);
+ if (unlikely(first_msg_timeout <= 0)) {
+ /* Message has already timed out, allow 100 ms
+ * to produce anyway */
+ tmout = 100;
+ } else {
+ tmout = (int)RD_MIN(INT_MAX, first_msg_timeout);
+ }
+ rd_kafka_buf_set_abs_timeout(request_rkbuf, tmout, now);
+ /* END timeout copy/paste */
+
+ /* Send the multi-batch request to outbuf */
+ rd_kafka_broker_buf_enq_replyq(rkb, request_rkbuf, RD_KAFKA_NO_REPLYQ,
+ rd_kafka_handle_MultiBatchProduce, NULL);
+
+ req_cnt++;
+
+ /* prepare for next round in case there are more batches */
+ cur_ind = next_ind;
+ }
+
+ /* Done with the request from batch bufq, cleanup */
+ RD_LIST_FOREACH(cur_rkbuf, batch_bufq, i) {
+ rd_kafka_buf_destroy(cur_rkbuf);
+ }
+
+ return req_cnt;
+}
/**
* @brief Construct and send CreateTopicsRequest to \p rkb
diff --git a/src/rdkafka_request.h b/src/rdkafka_request.h
index b291a324a..aa9e1eb17 100644
--- a/src/rdkafka_request.h
+++ b/src/rdkafka_request.h
@@ -447,7 +447,13 @@ void rd_kafka_SaslAuthenticateRequest(rd_kafka_broker_t *rkb,
int rd_kafka_ProduceRequest(rd_kafka_broker_t *rkb,
rd_kafka_toppar_t *rktp,
const rd_kafka_pid_t pid,
- uint64_t epoch_base_msgid);
+ uint64_t epoch_base_msgid,
+ rd_bool_t skip_sending,
+ rd_list_t *batch_bufq);
+
+int rd_kafka_MultiBatchProduceRequest(rd_kafka_broker_t *rkb,
+ const rd_kafka_pid_t pid,
+ rd_list_t *batch_bufq);
rd_kafka_resp_err_t
rd_kafka_CreateTopicsRequest(rd_kafka_broker_t *rkb,
diff --git a/tests/0012-produce_consume.c b/tests/0012-produce_consume.c
index 97f592b3c..a7df70572 100644
--- a/tests/0012-produce_consume.c
+++ b/tests/0012-produce_consume.c
@@ -101,6 +101,8 @@ static void produce_messages(uint64_t testid,
if (!rkt)
TEST_FAIL("Failed to create topic: %s\n", rd_strerror(errno));
+ test_create_partitions(rk, topic, partition_cnt);
+
/* Create messages. */
prod_msg_remains = msgcnt;
rkmessages = calloc(sizeof(*rkmessages), msgcnt / partition_cnt);
@@ -492,7 +494,7 @@ static void consume_messages_with_queues(uint64_t testid,
*/
static void test_produce_consume(void) {
int msgcnt = test_quick ? 100 : 1000;
- int partition_cnt = 2;
+ int partition_cnt = 10;
int i;
uint64_t testid;
int msg_base = 0;