Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix timestamp extraction for compressed messages #858

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 24 additions & 12 deletions src/rdkafka_broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -3471,6 +3471,17 @@ static char *rd_kafka_snappy_java_decompress (rd_kafka_broker_t *rkb,
#endif


/**
* Message header
*/
struct msg_hdr {
int64_t Offset;
int32_t MessageSize;
uint32_t Crc;
int8_t MagicByte; /* MsgVersion */
int8_t Attributes;
int64_t Timestamp;
};

/**
* Parses a MessageSet and enqueues internal ops on the local
Expand All @@ -3480,6 +3491,7 @@ static rd_kafka_resp_err_t
rd_kafka_messageset_handle (rd_kafka_broker_t *rkb,
rd_kafka_toppar_t *rktp,
rd_kafka_q_t *rkq,
struct msg_hdr *blk_hdr,
struct rd_kafka_toppar_ver *tver,
int16_t ApiVersion,
rd_kafka_buf_t *rkbuf_orig,
Expand All @@ -3502,14 +3514,6 @@ rd_kafka_messageset_handle (rd_kafka_broker_t *rkb,
rktp->rktp_partition);

while (rd_kafka_buf_remain(rkbuf) > 0) {
struct {
int64_t Offset;
int32_t MessageSize;
uint32_t Crc;
int8_t MagicByte; /* MsgVersion */
int8_t Attributes;
int64_t Timestamp;
} hdr;
rd_kafkap_bytes_t Key;
rd_kafkap_bytes_t Value;
int32_t Value_len;
Expand All @@ -3520,6 +3524,7 @@ rd_kafka_messageset_handle (rd_kafka_broker_t *rkb,
int relative_offsets;
rd_kafka_resp_err_t err RD_UNUSED = RD_KAFKA_RESP_ERR_NO_ERROR;

struct msg_hdr hdr;
rd_kafka_buf_read_i64(rkbuf, &hdr.Offset);
rd_kafka_buf_read_i32(rkbuf, &hdr.MessageSize);
rd_kafka_buf_read_i32(rkbuf, &hdr.Crc);
Expand Down Expand Up @@ -3601,8 +3606,15 @@ rd_kafka_messageset_handle (rd_kafka_broker_t *rkb,
rd_kafka_buf_keep(rkbuf_orig);

if (hdr.MagicByte >= 1 && hdr.Timestamp) {
rkm->rkm_timestamp = hdr.Timestamp;
if (hdr.Attributes & RD_KAFKA_MSG_ATTR_LOG_APPEND_TIME)
int8_t attributes;
if (blk_hdr && hdr.Timestamp == -1) {
rkm->rkm_timestamp = blk_hdr->Timestamp;
attributes = blk_hdr->Attributes;
} else {
rkm->rkm_timestamp = hdr.Timestamp;
attributes = hdr.Attributes;
}
if (attributes & RD_KAFKA_MSG_ATTR_LOG_APPEND_TIME)
rkm->rkm_tstype = RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME;
else
rkm->rkm_tstype = RD_KAFKA_TIMESTAMP_CREATE_TIME;
Expand Down Expand Up @@ -3787,7 +3799,7 @@ rd_kafka_messageset_handle (rd_kafka_broker_t *rkb,
/* Now parse the contained Messages */
rd_kafka_messageset_handle(rkb, rktp,
relative_offsets ?
&relq : rkq, tver,
&relq : rkq, &hdr, tver,
ApiVersion,
rkbufz, outbuf, outlen);

Expand Down Expand Up @@ -4076,7 +4088,7 @@ rd_kafka_fetch_reply_handle (rd_kafka_broker_t *rkb,

/* Parse and handle the message set */
err2 = rd_kafka_messageset_handle(
rkb, rktp, &tmp_opq, tver,
rkb, rktp, &tmp_opq, NULL, tver,
request->rkbuf_reqhdr.ApiVersion,
rkbuf, rkbuf->rkbuf_rbuf+rkbuf->rkbuf_of,
hdr.MessageSetSize);
Expand Down