Skip to content

Commit

Permalink
Send uncompressed if compression messageset is larger than uncompress…
Browse files Browse the repository at this point in the history
…ed (issue #781)

This also caused a crash (from recent additions)
  • Loading branch information
edenhill committed Sep 28, 2016
1 parent 0cec538 commit 45b730a
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 2 deletions.
8 changes: 8 additions & 0 deletions src/rdkafka_broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -2601,6 +2601,14 @@ static int rd_kafka_compress_MessageSet_buf (rd_kafka_broker_t *rkb,

}

if (unlikely(RD_KAFKAP_MESSAGE_OVERHEAD + coutlen >
(size_t)MessageSetSize)) {
/* If the compressed data is larger than the
* uncompressed throw it away and send as uncompressed. */
rd_free(siov.iov_base);
return 0;
}

/* Rewind rkbuf to the pre-message checkpoint.
* This is to replace all the original Messages with just the
* Message containing the compressed payload. */
Expand Down
11 changes: 9 additions & 2 deletions tests/0017-compression.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,16 @@ int main_0017_compression(int argc, char **argv) {
rkt_p = test_create_producer_topic(rk_p, topics[i],
"compression.codec", codecs[i], NULL);

/* Produce small message that will not decrease with
* compression (issue #781) */
test_produce_msgs(rk_p, rkt_p, testid, partition,
msg_base + (partition*msg_cnt), msg_cnt,
NULL, 0);
msg_base + (partition*msg_cnt), 1,
NULL, 5);

/* Produce standard sized messages */
test_produce_msgs(rk_p, rkt_p, testid, partition,
msg_base + (partition*msg_cnt) + 1, msg_cnt-1,
NULL, 512);
rd_kafka_topic_destroy(rkt_p);
}

Expand Down

0 comments on commit 45b730a

Please sign in to comment.