diff --git a/broker.go b/broker.go index 992931bd0..8b6aa07e1 100644 --- a/broker.go +++ b/broker.go @@ -231,6 +231,21 @@ func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) { } func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) { + switch request.version() { + case 0: + break + case 1: + if !b.conf.Version.IsAtLeast(V0_9_0_0) { + return nil, ErrUnsupportedVersion + } + case 2: + if !b.conf.Version.IsAtLeast(V0_10_0_0) { + return nil, ErrUnsupportedVersion + } + default: + return nil, ErrUnsupportedVersion + } + response := new(FetchResponse) err := b.sendAndReceive(request, response) diff --git a/consumer.go b/consumer.go index 869452754..f6c979766 100644 --- a/consumer.go +++ b/consumer.go @@ -14,6 +14,7 @@ type ConsumerMessage struct { Topic string Partition int32 Offset int64 + Timestamp time.Time // only set if kafka is version 0.10+ } // ConsumerError is what is provided to the user when an error occurs. @@ -489,6 +490,7 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu Key: msg.Msg.Key, Value: msg.Msg.Value, Offset: msg.Offset, + Timestamp: msg.Msg.Timestamp, }) child.offset = msg.Offset + 1 } else { @@ -682,6 +684,9 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) { MinBytes: bc.consumer.conf.Consumer.Fetch.Min, MaxWaitTime: int32(bc.consumer.conf.Consumer.MaxWaitTime / time.Millisecond), } + if bc.consumer.conf.Version.IsAtLeast(V0_10_0_0) { + request.Version = 2 + } for child := range bc.subscriptions { request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize) diff --git a/fetch_request.go b/fetch_request.go index bd4b403fe..a7dce47bf 100644 --- a/fetch_request.go +++ b/fetch_request.go @@ -24,6 +24,7 @@ func (f *fetchRequestBlock) decode(pd packetDecoder) (err error) { type FetchRequest struct { MaxWaitTime int32 MinBytes int32 + Version int16 blocks map[string]map[int32]*fetchRequestBlock } @@ -56,6 +57,7 @@ func (f *FetchRequest) encode(pe packetEncoder) (err error) { } func (f *FetchRequest) decode(pd packetDecoder, version int16) (err error) { + f.Version = version if _, err = pd.getInt32(); err != nil { return err } @@ -103,7 +105,7 @@ func (f *FetchRequest) key() int16 { } func (f *FetchRequest) version() int16 { - return 0 + return f.Version } func (f *FetchRequest) AddBlock(topic string, partitionID int32, fetchOffset int64, maxBytes int32) { diff --git a/fetch_response.go b/fetch_response.go index 13889d52b..91cf20da2 100644 --- a/fetch_response.go +++ b/fetch_response.go @@ -1,5 +1,7 @@ package sarama +import "time" + type FetchResponseBlock struct { Err KError HighWaterMarkOffset int64 @@ -33,7 +35,9 @@ func (pr *FetchResponseBlock) decode(pd packetDecoder) (err error) { } type FetchResponse struct { - Blocks map[string]map[int32]*FetchResponseBlock + Blocks map[string]map[int32]*FetchResponseBlock + ThrottleTime time.Duration + Version int16 // v1 requires 0.9+, v2 requires 0.10+ } func (pr *FetchResponseBlock) encode(pe packetEncoder) (err error) { @@ -50,6 +54,16 @@ func (pr *FetchResponseBlock) encode(pe packetEncoder) (err error) { } func (fr *FetchResponse) decode(pd packetDecoder, version int16) (err error) { + fr.Version = version + + if fr.Version >= 1 { + throttle, err := pd.getInt64() + if err != nil { + return err + } + fr.ThrottleTime = time.Duration(throttle) + } + numTopics, err := pd.getArrayLength() if err != nil { return err @@ -88,6 +102,10 @@ func (fr *FetchResponse) decode(pd packetDecoder, version int16) (err error) { } func (fr *FetchResponse) encode(pe packetEncoder) (err error) { + if fr.Version >= 1 { + pe.putInt64(int64(fr.ThrottleTime)) + } + err = pe.putArrayLength(len(fr.Blocks)) if err != nil { return err @@ -121,7 +139,7 @@ func (r *FetchResponse) key() int16 { } func (r *FetchResponse) version() int16 { - return 0 + return r.Version } func (fr *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock { diff --git a/message.go b/message.go index 55edc9d52..de616c3c6 100644 --- a/message.go +++ b/message.go @@ -5,6 +5,7 @@ import ( "compress/gzip" "fmt" "io/ioutil" + "time" "github.com/eapache/go-xerial-snappy" ) @@ -21,15 +22,13 @@ const ( CompressionSnappy CompressionCodec = 2 ) -// The spec just says: "This is a version id used to allow backwards compatible evolution of the message -// binary format." but it doesn't say what the current value is, so presumably 0... -const messageFormat int8 = 0 - type Message struct { - Codec CompressionCodec // codec used to compress the message contents - Key []byte // the message key, may be nil - Value []byte // the message contents - Set *MessageSet // the message set a message might wrap + Codec CompressionCodec // codec used to compress the message contents + Key []byte // the message key, may be nil + Value []byte // the message contents + Set *MessageSet // the message set a message might wrap + Version int8 // v1 requires Kafka 0.10 + Timestamp time.Time // the timestamp of the message (version 1+ only) compressedCache []byte } @@ -37,11 +36,15 @@ type Message struct { func (m *Message) encode(pe packetEncoder) error { pe.push(&crc32Field{}) - pe.putInt8(messageFormat) + pe.putInt8(m.Version) attributes := int8(m.Codec) & compressionCodecMask pe.putInt8(attributes) + if m.Version >= 1 { + pe.putInt64(m.Timestamp.Unix()) + } + err := pe.putBytes(m.Key) if err != nil { return err @@ -89,13 +92,10 @@ func (m *Message) decode(pd packetDecoder) (err error) { return err } - format, err := pd.getInt8() + m.Version, err = pd.getInt8() if err != nil { return err } - if format != messageFormat { - return PacketDecodingError{"unexpected messageFormat"} - } attribute, err := pd.getInt8() if err != nil { @@ -103,6 +103,14 @@ func (m *Message) decode(pd packetDecoder) (err error) { } m.Codec = CompressionCodec(attribute & compressionCodecMask) + if m.Version >= 1 { + timestamp, err := pd.getInt64() + if err != nil { + return err + } + m.Timestamp = time.Unix(timestamp, 0) + } + m.Key, err = pd.getBytes() if err != nil { return err