Skip to content

Commit

Permalink
Merge pull request #895 from dvsekhvalnov/master
Browse files Browse the repository at this point in the history
#885: added BlockTimestamp to ConsumerMessage
  • Loading branch information
eapache authored Jun 15, 2017
2 parents ac03dfa + ce6585f commit f7e3024
Showing 1 changed file with 13 additions and 11 deletions.
24 changes: 13 additions & 11 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ import (

// ConsumerMessage encapsulates a Kafka message returned by the consumer.
type ConsumerMessage struct {
Key, Value []byte
Topic string
Partition int32
Offset int64
Timestamp time.Time // only set if kafka is version 0.10+
Key, Value []byte
Topic string
Partition int32
Offset int64
Timestamp time.Time // only set if kafka is version 0.10+, inner message timestamp
BlockTimestamp time.Time // only set if kafka is version 0.10+, outer (compressed) block timestamp
}

// ConsumerError is what is provided to the user when an error occurs.
Expand Down Expand Up @@ -520,12 +521,13 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu

if offset >= child.offset {
messages = append(messages, &ConsumerMessage{
Topic: child.topic,
Partition: child.partition,
Key: msg.Msg.Key,
Value: msg.Msg.Value,
Offset: offset,
Timestamp: msg.Msg.Timestamp,
Topic: child.topic,
Partition: child.partition,
Key: msg.Msg.Key,
Value: msg.Msg.Value,
Offset: offset,
Timestamp: msg.Msg.Timestamp,
BlockTimestamp: msgBlock.Msg.Timestamp,
})
child.offset = offset + 1
} else {
Expand Down

0 comments on commit f7e3024

Please sign in to comment.