From 5576b5a928e079518cd049234963f5b7c599c58d Mon Sep 17 00:00:00 2001 From: Yunjing Xu Date: Mon, 24 Oct 2016 19:02:38 -0700 Subject: [PATCH] Fix timestamp extraction for compressed messages --- src/rdkafka_broker.c | 36 ++++++++++++++++++++++++------------ 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/src/rdkafka_broker.c b/src/rdkafka_broker.c index 78196c9c67..b8a6625842 100644 --- a/src/rdkafka_broker.c +++ b/src/rdkafka_broker.c @@ -3471,6 +3471,17 @@ static char *rd_kafka_snappy_java_decompress (rd_kafka_broker_t *rkb, #endif +/** + * Message header + */ +struct msg_hdr { + int64_t Offset; + int32_t MessageSize; + uint32_t Crc; + int8_t MagicByte; /* MsgVersion */ + int8_t Attributes; + int64_t Timestamp; +}; /** * Parses a MessageSet and enqueues internal ops on the local @@ -3480,6 +3491,7 @@ static rd_kafka_resp_err_t rd_kafka_messageset_handle (rd_kafka_broker_t *rkb, rd_kafka_toppar_t *rktp, rd_kafka_q_t *rkq, + struct msg_hdr *blk_hdr, struct rd_kafka_toppar_ver *tver, int16_t ApiVersion, rd_kafka_buf_t *rkbuf_orig, @@ -3502,14 +3514,6 @@ rd_kafka_messageset_handle (rd_kafka_broker_t *rkb, rktp->rktp_partition); while (rd_kafka_buf_remain(rkbuf) > 0) { - struct { - int64_t Offset; - int32_t MessageSize; - uint32_t Crc; - int8_t MagicByte; /* MsgVersion */ - int8_t Attributes; - int64_t Timestamp; - } hdr; rd_kafkap_bytes_t Key; rd_kafkap_bytes_t Value; int32_t Value_len; @@ -3520,6 +3524,7 @@ rd_kafka_messageset_handle (rd_kafka_broker_t *rkb, int relative_offsets; rd_kafka_resp_err_t err RD_UNUSED = RD_KAFKA_RESP_ERR_NO_ERROR; + struct msg_hdr hdr; rd_kafka_buf_read_i64(rkbuf, &hdr.Offset); rd_kafka_buf_read_i32(rkbuf, &hdr.MessageSize); rd_kafka_buf_read_i32(rkbuf, &hdr.Crc); @@ -3601,8 +3606,15 @@ rd_kafka_messageset_handle (rd_kafka_broker_t *rkb, rd_kafka_buf_keep(rkbuf_orig); if (hdr.MagicByte >= 1 && hdr.Timestamp) { - rkm->rkm_timestamp = hdr.Timestamp; - if (hdr.Attributes & RD_KAFKA_MSG_ATTR_LOG_APPEND_TIME) + int8_t attributes; + if (blk_hdr && hdr.Timestamp == -1) { + rkm->rkm_timestamp = blk_hdr->Timestamp; + attributes = blk_hdr->Attributes; + } else { + rkm->rkm_timestamp = hdr.Timestamp; + attributes = hdr.Attributes; + } + if (attributes & RD_KAFKA_MSG_ATTR_LOG_APPEND_TIME) rkm->rkm_tstype = RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME; else rkm->rkm_tstype = RD_KAFKA_TIMESTAMP_CREATE_TIME; @@ -3787,7 +3799,7 @@ rd_kafka_messageset_handle (rd_kafka_broker_t *rkb, /* Now parse the contained Messages */ rd_kafka_messageset_handle(rkb, rktp, relative_offsets ? - &relq : rkq, tver, + &relq : rkq, &hdr, tver, ApiVersion, rkbufz, outbuf, outlen); @@ -4076,7 +4088,7 @@ rd_kafka_fetch_reply_handle (rd_kafka_broker_t *rkb, /* Parse and handle the message set */ err2 = rd_kafka_messageset_handle( - rkb, rktp, &tmp_opq, tver, + rkb, rktp, &tmp_opq, NULL, tver, request->rkbuf_reqhdr.ApiVersion, rkbuf, rkbuf->rkbuf_rbuf+rkbuf->rkbuf_of, hdr.MessageSetSize);