Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactored tmpabuf and fixed an insufficient buffer allocation #4449

Merged
merged 3 commits into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,16 @@ librdkafka v2.2.1 is a maintenance release:
are partition leader changes and a stale leader epoch is received (#4429).
* Fix a segmentation fault when closing a consumer using the
cooperative-sticky assignor before the first assignment (#4381).
* Fix for insufficient buffer allocation when allocating rack information (@wolfchimneyrock, #4449).


## Fixes

### General fixes

* An assertion failed with insufficient buffer size when allocating
rack information on 32bit architectures.
Solved by aligning all allocations to the maximum allowed word size (#4449).



Expand Down
25 changes: 20 additions & 5 deletions src/rdkafka_buf.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,36 @@ typedef struct rd_tmpabuf_s {
size_t of;
char *buf;
int failed;
int assert_on_fail;
rd_bool_t assert_on_fail;
} rd_tmpabuf_t;

/**
* @brief Allocate new tmpabuf with \p size bytes pre-allocated.
* @brief Initialize new tmpabuf of non-final \p size bytes.
*/
static RD_UNUSED void
rd_tmpabuf_new(rd_tmpabuf_t *tab, size_t size, int assert_on_fail) {
tab->buf = rd_malloc(size);
tab->size = size;
rd_tmpabuf_new(rd_tmpabuf_t *tab, size_t size, rd_bool_t assert_on_fail) {
tab->buf = NULL;
tab->size = RD_ROUNDUP(size, 8);
tab->of = 0;
tab->failed = 0;
tab->assert_on_fail = assert_on_fail;
}

/**
* @brief Add a new allocation of \p _size bytes,
* rounded up to maximum word size,
* for \p _times times.
*/
#define rd_tmpabuf_add_alloc_times(_tab, _size, _times) \
(_tab)->size += RD_ROUNDUP(_size, 8) * _times

#define rd_tmpabuf_add_alloc(_tab, _size) \
rd_tmpabuf_add_alloc_times(_tab, _size, 1)
/**
* @brief Finalize tmpabuf pre-allocating tab->size bytes.
*/
#define rd_tmpabuf_finalize(_tab) (_tab)->buf = rd_malloc((_tab)->size)
milindl marked this conversation as resolved.
Show resolved Hide resolved

/**
* @brief Free memory allocated by tmpabuf
*/
Expand Down
59 changes: 32 additions & 27 deletions src/rdkafka_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ static rd_kafka_metadata_internal_t *rd_kafka_metadata_copy_internal(
* Because of this we copy all the structs verbatim but
* any pointer fields needs to be copied explicitly to update
* the pointer address. */
rd_tmpabuf_new(&tbuf, size, 1 /*assert on fail*/);
rd_tmpabuf_new(&tbuf, size, rd_true /*assert on fail*/);
rd_tmpabuf_finalize(&tbuf);
mdi = rd_tmpabuf_write(&tbuf, src, sizeof(*mdi));
md = &mdi->metadata;

Expand Down Expand Up @@ -506,11 +507,13 @@ rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb,
* no more than 4 times larger than the wire representation.
* This is increased to 5 times in case if we want to compute partition
* to rack mapping. */
rd_tmpabuf_new(&tbuf,
sizeof(*mdi) + rkb_namelen +
(rkbuf->rkbuf_totlen * 4 +
(compute_racks ? rkbuf->rkbuf_totlen : 0)),
0 /*dont assert on fail*/);
rd_tmpabuf_new(&tbuf, 0, rd_false /*dont assert on fail*/);
rd_tmpabuf_add_alloc(&tbuf, sizeof(*mdi));
rd_tmpabuf_add_alloc(&tbuf, rkb_namelen);
rd_tmpabuf_add_alloc(&tbuf, rkbuf->rkbuf_totlen *
(4 + (compute_racks ? 1 : 0)));

rd_tmpabuf_finalize(&tbuf);

if (!(mdi = rd_tmpabuf_alloc(&tbuf, sizeof(*mdi)))) {
rd_kafka_broker_unlock(rkb);
Expand Down Expand Up @@ -1603,35 +1606,37 @@ rd_kafka_metadata_new_topic_mock(const rd_kafka_metadata_topic_t *topics,
rd_kafka_metadata_internal_t *mdi;
rd_kafka_metadata_t *md;
rd_tmpabuf_t tbuf;
size_t topic_names_size = 0;
int total_partition_cnt = 0;
size_t i;
int curr_broker = 0;

/* Calculate total partition count and topic names size before
* allocating memory. */
for (i = 0; i < topic_cnt; i++) {
topic_names_size += 1 + strlen(topics[i].topic);
total_partition_cnt += topics[i].partition_cnt;
}

/* If the replication factor is given, num_brokers must also be given */
rd_assert(replication_factor <= 0 || num_brokers > 0);

/* Allocate contiguous buffer which will back all the memory
* needed by the final metadata_t object */
rd_tmpabuf_new(
&tbuf,
sizeof(*mdi) + (sizeof(*md->topics) * topic_cnt) +
topic_names_size + (64 /*topic name size..*/ * topic_cnt) +
(sizeof(*md->topics[0].partitions) * total_partition_cnt) +
(sizeof(*mdi->topics) * topic_cnt) +
(sizeof(*mdi->topics[0].partitions) * total_partition_cnt) +
(sizeof(*mdi->brokers) * RD_ROUNDUP(num_brokers, 8)) +
(replication_factor > 0 ? RD_ROUNDUP(replication_factor, 8) *
total_partition_cnt * sizeof(int)
: 0),
1 /*assert on fail*/);
rd_tmpabuf_new(&tbuf, sizeof(*mdi), rd_true /*assert on fail*/);

rd_tmpabuf_add_alloc(&tbuf, topic_cnt * sizeof(*md->topics));
rd_tmpabuf_add_alloc(&tbuf, topic_cnt * sizeof(*mdi->topics));
rd_tmpabuf_add_alloc(&tbuf, num_brokers * sizeof(*md->brokers));

/* Calculate total partition count and topic names size before
* allocating memory. */
for (i = 0; i < topic_cnt; i++) {
rd_tmpabuf_add_alloc(&tbuf, 1 + strlen(topics[i].topic));
rd_tmpabuf_add_alloc(&tbuf,
topics[i].partition_cnt *
sizeof(*md->topics[i].partitions));
rd_tmpabuf_add_alloc(&tbuf,
topics[i].partition_cnt *
sizeof(*mdi->topics[i].partitions));
if (replication_factor > 0)
rd_tmpabuf_add_alloc_times(
&tbuf, replication_factor * sizeof(int),
topics[i].partition_cnt);
}

rd_tmpabuf_finalize(&tbuf);

mdi = rd_tmpabuf_alloc(&tbuf, sizeof(*mdi));
memset(mdi, 0, sizeof(*mdi));
Expand Down
38 changes: 17 additions & 21 deletions src/rdkafka_metadata_cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,6 @@ static struct rd_kafka_metadata_cache_entry *rd_kafka_metadata_cache_insert(
rd_kafka_metadata_broker_internal_t *brokers_internal,
size_t broker_cnt) {
struct rd_kafka_metadata_cache_entry *rkmce, *old;
size_t topic_len;
size_t racks_size = 0;
rd_tmpabuf_t tbuf;
int i;

Expand All @@ -261,34 +259,32 @@ static struct rd_kafka_metadata_cache_entry *rd_kafka_metadata_cache_insert(
* any pointer fields needs to be copied explicitly to update
* the pointer address.
* See also rd_kafka_metadata_cache_delete which frees this. */
topic_len = strlen(mtopic->topic) + 1;
rd_tmpabuf_new(&tbuf, 0, rd_true /*assert on fail*/);

rd_tmpabuf_add_alloc(&tbuf, sizeof(*rkmce));
rd_tmpabuf_add_alloc(&tbuf, strlen(mtopic->topic) + 1);
rd_tmpabuf_add_alloc(&tbuf, mtopic->partition_cnt *
sizeof(*mtopic->partitions));
rd_tmpabuf_add_alloc(&tbuf,
mtopic->partition_cnt *
sizeof(*metadata_internal_topic->partitions));

for (i = 0; include_racks && i < mtopic->partition_cnt; i++) {
size_t j;
racks_size += RD_ROUNDUP(
metadata_internal_topic->partitions[i].racks_cnt *
sizeof(char *),
8);
rd_tmpabuf_add_alloc(
&tbuf, metadata_internal_topic->partitions[i].racks_cnt *
sizeof(char *));
for (j = 0;
j < metadata_internal_topic->partitions[i].racks_cnt;
j++) {
racks_size += RD_ROUNDUP(
strlen(metadata_internal_topic->partitions[i]
.racks[j]) +
1,
8);
rd_tmpabuf_add_alloc(
&tbuf, strlen(metadata_internal_topic->partitions[i]
.racks[j]) +
1);
}
}

rd_tmpabuf_new(
&tbuf,
RD_ROUNDUP(sizeof(*rkmce), 8) + RD_ROUNDUP(topic_len, 8) +
(mtopic->partition_cnt *
RD_ROUNDUP(sizeof(*mtopic->partitions), 8)) +
(mtopic->partition_cnt *
RD_ROUNDUP(sizeof(*metadata_internal_topic->partitions), 8)) +
racks_size,
1 /*assert on fail*/);
rd_tmpabuf_finalize(&tbuf);

rkmce = rd_tmpabuf_alloc(&tbuf, sizeof(*rkmce));

Expand Down
36 changes: 21 additions & 15 deletions src/rdkafka_topic.c
Original file line number Diff line number Diff line change
Expand Up @@ -1831,38 +1831,44 @@ rd_kafka_topic_info_t *rd_kafka_topic_info_new_with_rack(
const rd_kafka_metadata_partition_internal_t *mdpi) {
rd_kafka_topic_info_t *ti;
rd_tmpabuf_t tbuf;
size_t tlen = RD_ROUNDUP(strlen(topic) + 1, 8);
size_t total_racks_size = 0;
int i;
rd_bool_t has_racks = rd_false;

rd_tmpabuf_new(&tbuf, 0, rd_true /* assert on fail */);

rd_tmpabuf_add_alloc(&tbuf, sizeof(*ti));
rd_tmpabuf_add_alloc(&tbuf, strlen(topic) + 1);
for (i = 0; i < partition_cnt; i++) {
size_t j;
if (!mdpi[i].racks)
continue;

if (unlikely(!has_racks))
has_racks = rd_true;

for (j = 0; j < mdpi[i].racks_cnt; j++) {
total_racks_size +=
RD_ROUNDUP(strlen(mdpi[i].racks[j]) + 1, 8);
rd_tmpabuf_add_alloc(&tbuf,
strlen(mdpi[i].racks[j]) + 1);
}
total_racks_size +=
RD_ROUNDUP(sizeof(char *) * mdpi[i].racks_cnt, 8);
rd_tmpabuf_add_alloc(&tbuf, sizeof(char *) * mdpi[i].racks_cnt);
}

/* Only bother allocating this if at least one
* rack is there. */
if (has_racks) {
rd_tmpabuf_add_alloc(
&tbuf, sizeof(rd_kafka_metadata_partition_internal_t) *
partition_cnt);
}

if (total_racks_size) /* Only bother allocating this if at least one
rack is there. */
total_racks_size +=
RD_ROUNDUP(sizeof(rd_kafka_metadata_partition_internal_t) *
partition_cnt,
8);
rd_tmpabuf_finalize(&tbuf);

rd_tmpabuf_new(&tbuf, sizeof(*ti) + tlen + total_racks_size,
1 /* assert on fail */);
ti = rd_tmpabuf_alloc(&tbuf, sizeof(*ti));
ti->topic = rd_tmpabuf_write_str(&tbuf, topic);
ti->partition_cnt = partition_cnt;
ti->partitions_internal = NULL;

if (total_racks_size) {
if (has_racks) {
ti->partitions_internal = rd_tmpabuf_alloc(
&tbuf, sizeof(*ti->partitions_internal) * partition_cnt);

Expand Down