diff --git a/consumer.go b/consumer.go index 78d7fa2ca..ea6ccd88a 100644 --- a/consumer.go +++ b/consumer.go @@ -10,11 +10,12 @@ import ( // ConsumerMessage encapsulates a Kafka message returned by the consumer. type ConsumerMessage struct { - Key, Value []byte - Topic string - Partition int32 - Offset int64 - Timestamp time.Time // only set if kafka is version 0.10+ + Key, Value []byte + Topic string + Partition int32 + Offset int64 + Timestamp time.Time // only set if kafka is version 0.10+, inner message timestamp + BlockTimestamp time.Time // only set if kafka is version 0.10+, outer (compressed) block timestamp } // ConsumerError is what is provided to the user when an error occurs. @@ -520,12 +521,13 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu if offset >= child.offset { messages = append(messages, &ConsumerMessage{ - Topic: child.topic, - Partition: child.partition, - Key: msg.Msg.Key, - Value: msg.Msg.Value, - Offset: offset, - Timestamp: msg.Msg.Timestamp, + Topic: child.topic, + Partition: child.partition, + Key: msg.Msg.Key, + Value: msg.Msg.Value, + Offset: offset, + Timestamp: msg.Msg.Timestamp, + BlockTimestamp: msgBlock.Msg.Timestamp, }) child.offset = offset + 1 } else {