Skip to content

Commit

Permalink
consumer: Correctly set timestamp for compressed messages (#858)
Browse files Browse the repository at this point in the history
  • Loading branch information
edenhill committed Nov 16, 2016
1 parent 999a7d3 commit 6aa20e1
Showing 1 changed file with 18 additions and 1 deletion.
19 changes: 18 additions & 1 deletion src/rdkafka_broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -3482,6 +3482,8 @@ rd_kafka_messageset_handle (rd_kafka_broker_t *rkb,
rd_kafka_q_t *rkq,
struct rd_kafka_toppar_ver *tver,
int16_t ApiVersion,
rd_kafka_timestamp_type_t outer_tstype,
int64_t outer_timestamp,
rd_kafka_buf_t *rkbuf_orig,
void *buf, size_t size) {
rd_kafka_buf_t *rkbuf; /* Slice of rkbuf_orig */
Expand Down Expand Up @@ -3600,7 +3602,10 @@ rd_kafka_messageset_handle (rd_kafka_broker_t *rkb,
rko->rko_u.fetch.rkbuf = rkbuf_orig; /* original rkbuf */
rd_kafka_buf_keep(rkbuf_orig);

if (hdr.MagicByte >= 1 && hdr.Timestamp) {
if (outer_tstype != RD_KAFKA_TIMESTAMP_NOT_AVAILABLE) {
rkm->rkm_timestamp = outer_timestamp;
rkm->rkm_tstype = outer_tstype;
} else if (hdr.MagicByte >= 1 && hdr.Timestamp) {
rkm->rkm_timestamp = hdr.Timestamp;
if (hdr.Attributes & RD_KAFKA_MSG_ATTR_LOG_APPEND_TIME)
rkm->rkm_tstype = RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME;
Expand Down Expand Up @@ -3775,6 +3780,15 @@ rd_kafka_messageset_handle (rd_kafka_broker_t *rkb,
rd_kafka_q_t relq; /* Temporary queue for use with
* relative offsets. */
int relative_offsets = ApiVersion == 2;
rd_kafka_timestamp_type_t use_tstype =
RD_KAFKA_TIMESTAMP_NOT_AVAILABLE;
int64_t use_timestamp = 0;

if (hdr.MagicByte >= 1 &&
(hdr.Attributes&RD_KAFKA_MSG_ATTR_LOG_APPEND_TIME)){
use_tstype = RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME;
use_timestamp = hdr.Timestamp;
}

/* With a new allocated buffer (outbuf) we need
* a separate rkbuf for it to allow multiple fetch ops
Expand All @@ -3789,6 +3803,8 @@ rd_kafka_messageset_handle (rd_kafka_broker_t *rkb,
relative_offsets ?
&relq : rkq, tver,
ApiVersion,
use_tstype,
use_timestamp,
rkbufz, outbuf, outlen);


Expand Down Expand Up @@ -4078,6 +4094,7 @@ rd_kafka_fetch_reply_handle (rd_kafka_broker_t *rkb,
err2 = rd_kafka_messageset_handle(
rkb, rktp, &tmp_opq, tver,
request->rkbuf_reqhdr.ApiVersion,
RD_KAFKA_TIMESTAMP_NOT_AVAILABLE, -1,
rkbuf, rkbuf->rkbuf_rbuf+rkbuf->rkbuf_of,
hdr.MessageSetSize);
if (err2) {
Expand Down

0 comments on commit 6aa20e1

Please sign in to comment.