diff --git a/async_producer.go b/async_producer.go index e7ae8c2e5..2c28f1b99 100644 --- a/async_producer.go +++ b/async_producer.go @@ -135,6 +135,11 @@ type ProducerMessage struct { // Partition is the partition that the message was sent to. This is only // guaranteed to be defined if the message was successfully delivered. Partition int32 + // Timestamp is the timestamp assigned to the message by the broker. This + // is only guaranteed to be defined if the message was successfully + // delivered, RequiredAcks is not NoResponse, and the Kafka broker is at + // least version 0.10.0. + Timestamp time.Time retries int flags flagSet @@ -722,6 +727,12 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo switch block.Err { // Success case ErrNoError: + if bp.parent.conf.Version.IsAtLeast(V0_10_0_0) { + timestamp := time.Unix(block.Timestamp, 0) + for _, msg := range msgs { + msg.Timestamp = timestamp + } + } for i, msg := range msgs { msg.Offset = block.Offset + int64(i) } diff --git a/broker.go b/broker.go index 968b39993..992931bd0 100644 --- a/broker.go +++ b/broker.go @@ -198,6 +198,21 @@ func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, e } func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) { + switch request.version() { + case 0: + break + case 1: + if !b.conf.Version.IsAtLeast(V0_9_0_0) { + return nil, ErrUnsupportedVersion + } + case 2: + if !b.conf.Version.IsAtLeast(V0_10_0_0) { + return nil, ErrUnsupportedVersion + } + default: + return nil, ErrUnsupportedVersion + } + var response *ProduceResponse var err error diff --git a/produce_request.go b/produce_request.go index efcb98012..8bcefe933 100644 --- a/produce_request.go +++ b/produce_request.go @@ -19,6 +19,7 @@ const ( type ProduceRequest struct { RequiredAcks RequiredAcks Timeout int32 + Version int16 // v1 requires Kafka 0.9, v2 requires Kafka 0.10 msgSets map[string]map[int32]*MessageSet } @@ -110,7 +111,7 @@ func (p *ProduceRequest) key() int16 { } func (p *ProduceRequest) version() int16 { - return 0 + return p.Version } func (p *ProduceRequest) AddMessage(topic string, partition int32, msg *Message) { diff --git a/produce_response.go b/produce_response.go index 84029f264..2d90f123e 100644 --- a/produce_response.go +++ b/produce_response.go @@ -1,11 +1,12 @@ package sarama type ProduceResponseBlock struct { - Err KError - Offset int64 + Err KError + Offset int64 + Timestamp int64 // only provided if Version >= 2 } -func (pr *ProduceResponseBlock) decode(pd packetDecoder) (err error) { +func (pr *ProduceResponseBlock) decode(pd packetDecoder, version int16) (err error) { tmp, err := pd.getInt16() if err != nil { return err @@ -17,14 +18,24 @@ func (pr *ProduceResponseBlock) decode(pd packetDecoder) (err error) { return err } + if version >= 2 { + if pr.Timestamp, err = pd.getInt64(); err != nil { + return err + } + } + return nil } type ProduceResponse struct { - Blocks map[string]map[int32]*ProduceResponseBlock + Blocks map[string]map[int32]*ProduceResponseBlock + Version int16 + ThrottleTime int32 // only provided if Version >= 1 } func (pr *ProduceResponse) decode(pd packetDecoder, version int16) (err error) { + pr.Version = version + numTopics, err := pd.getArrayLength() if err != nil { return err @@ -51,7 +62,7 @@ func (pr *ProduceResponse) decode(pd packetDecoder, version int16) (err error) { } block := new(ProduceResponseBlock) - err = block.decode(pd) + err = block.decode(pd, version) if err != nil { return err } @@ -59,6 +70,12 @@ func (pr *ProduceResponse) decode(pd packetDecoder, version int16) (err error) { } } + if pr.Version >= 1 { + if pr.ThrottleTime, err = pd.getInt32(); err != nil { + return err + } + } + return nil } @@ -90,7 +107,7 @@ func (r *ProduceResponse) key() int16 { } func (r *ProduceResponse) version() int16 { - return 0 + return r.Version } func (pr *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock { diff --git a/produce_set.go b/produce_set.go index 9fe5f79d4..894b4aa30 100644 --- a/produce_set.go +++ b/produce_set.go @@ -67,6 +67,9 @@ func (ps *produceSet) buildRequest() *ProduceRequest { RequiredAcks: ps.parent.conf.Producer.RequiredAcks, Timeout: int32(ps.parent.conf.Producer.Timeout / time.Millisecond), } + if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) { + req.Version = 2 + } for topic, partitionSet := range ps.msgs { for partition, set := range partitionSet {