diff --git a/src/rdkafka_broker.c b/src/rdkafka_broker.c
index 37934e0846..5b015ba82e 100644
--- a/src/rdkafka_broker.c
+++ b/src/rdkafka_broker.c
@@ -2601,6 +2601,14 @@ static int rd_kafka_compress_MessageSet_buf (rd_kafka_broker_t *rkb,
 	}
 
+	if (unlikely(RD_KAFKAP_MESSAGE_OVERHEAD + coutlen >
+		     (size_t)MessageSetSize)) {
+		/* If the compressed data is larger than the
+		 * uncompressed data, throw it away and send as
+		 * uncompressed. */
+		rd_free(siov.iov_base);
+		return 0;
+	}
+
 	/* Rewind rkbuf to the pre-message checkpoint.
	 * This is to replace all the original Messages with just the
	 * Message containing the compressed payload. */
diff --git a/tests/0017-compression.c b/tests/0017-compression.c
index 9f3edbbbf9..318d975518 100644
--- a/tests/0017-compression.c
+++ b/tests/0017-compression.c
@@ -75,9 +75,16 @@ int main_0017_compression(int argc, char **argv) {
 		rkt_p = test_create_producer_topic(rk_p, topics[i],
			"compression.codec", codecs[i], NULL);
 
+		/* Produce a small message that will not shrink with
+		 * compression (issue #781) */
 		test_produce_msgs(rk_p, rkt_p, testid, partition,
-				  msg_base + (partition*msg_cnt), msg_cnt,
-				  NULL, 0);
+				  msg_base + (partition*msg_cnt), 1,
+				  NULL, 5);
+
+		/* Produce standard-sized messages */
+		test_produce_msgs(rk_p, rkt_p, testid, partition,
+				  msg_base + (partition*msg_cnt) + 1, msg_cnt-1,
+				  NULL, 512);
 		rd_kafka_topic_destroy(rkt_p);
 	}
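
The guard added above implements a common pattern: compress a payload, but keep the result only if it (plus framing overhead) is actually smaller than the original; otherwise discard it and send the data uncompressed. Below is a minimal, self-contained sketch of that pattern using zlib rather than librdkafka's internal codec path; MESSAGE_OVERHEAD and compress_if_smaller are hypothetical stand-ins for RD_KAFKAP_MESSAGE_OVERHEAD and the patched code, not part of either library.

#include <stdio.h>
#include <stdlib.h>
#include <zlib.h>

#define MESSAGE_OVERHEAD 34  /* hypothetical per-message framing cost,
                              * analogous to RD_KAFKAP_MESSAGE_OVERHEAD */

/* Returns a malloc'd compressed buffer (caller frees) and sets *outlen,
 * or returns NULL when compression fails or would not reduce the
 * payload size, in which case the caller sends the original as-is. */
static unsigned char *compress_if_smaller(const unsigned char *in,
                                          size_t inlen, size_t *outlen) {
        uLongf clen = compressBound(inlen);
        unsigned char *cbuf = malloc(clen);

        if (!cbuf || compress(cbuf, &clen, in, inlen) != Z_OK) {
                free(cbuf);
                return NULL;
        }

        if (MESSAGE_OVERHEAD + (size_t)clen >= inlen) {
                /* Compressed output plus framing is no smaller than
                 * the original: throw it away, same as the patch's
                 * "return 0" fallback to uncompressed sending. */
                free(cbuf);
                return NULL;
        }

        *outlen = (size_t)clen;
        return cbuf;
}

int main(void) {
        const unsigned char small[] = "hello"; /* too small to shrink */
        size_t outlen;
        unsigned char *c = compress_if_smaller(small, sizeof(small),
                                               &outlen);

        printf("small payload compressed: %s\n", c ? "yes" : "no, send raw");
        free(c);
        return 0;
}

This also mirrors why the test change produces one 5-byte message first: tiny payloads gain nothing from compression, so they exercise the discard branch, while the subsequent 512-byte messages exercise the normal compressed path.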