diff --git a/offset_commit_request.go b/offset_commit_request.go index c6e8717e8..f807e9b76 100644 --- a/offset_commit_request.go +++ b/offset_commit_request.go @@ -2,6 +2,7 @@ package sarama // ReceiveTime is a special value for the timestamp field of Offset Commit Requests which // tells the broker to set the timestamp to the time at which the request was received. +// The timestamp is only used if message version 1 is used, which requires kafka 0.8.2. const ReceiveTime int64 = -1 type offsetCommitRequestBlock struct { @@ -10,18 +11,25 @@ type offsetCommitRequestBlock struct { metadata string } -func (r *offsetCommitRequestBlock) encode(pe packetEncoder) error { +func (r *offsetCommitRequestBlock) encode(pe packetEncoder, version int16) error { pe.putInt64(r.offset) - pe.putInt64(r.timestamp) + if version >= 1 { + pe.putInt64(r.timestamp) + } return pe.putString(r.metadata) } type OffsetCommitRequest struct { ConsumerGroup string + Version int16 // 0 (0.8.1 and later) or 1 (0.8.2 and later, includes timestamp field) blocks map[string]map[int32]*offsetCommitRequestBlock } func (r *OffsetCommitRequest) encode(pe packetEncoder) error { + if r.Version < 0 || r.Version > 1 { + return PacketEncodingError{"invalid or unsupported OffsetCommitRequest version field"} + } + err := pe.putString(r.ConsumerGroup) if err != nil { return err @@ -41,7 +49,7 @@ func (r *OffsetCommitRequest) encode(pe packetEncoder) error { } for partition, block := range partitions { pe.putInt32(partition) - err = block.encode(pe) + err = block.encode(pe, r.Version) if err != nil { return err } @@ -55,7 +63,7 @@ func (r *OffsetCommitRequest) key() int16 { } func (r *OffsetCommitRequest) version() int16 { - return 0 + return r.Version } func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset int64, timestamp int64, metadata string) { @@ -67,5 +75,9 @@ func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset i r.blocks[topic] = make(map[int32]*offsetCommitRequestBlock) } + if r.Version == 0 && timestamp != 0 { + Logger.Println("Non-zero timestamp specified for OffsetCommitRequest v0, it will be ignored") + } + r.blocks[topic][partitionID] = &offsetCommitRequestBlock{offset, timestamp, metadata} } diff --git a/offset_commit_request_test.go b/offset_commit_request_test.go index 2d33ae31f..89dd4d974 100644 --- a/offset_commit_request_test.go +++ b/offset_commit_request_test.go @@ -11,7 +11,16 @@ var ( 0x00, 0x06, 'f', 'o', 'o', 'b', 'a', 'r', 0x00, 0x00, 0x00, 0x00} - offsetCommitRequestOneBlock = []byte{ + offsetCommitRequestOneBlockV0 = []byte{ + 0x00, 0x06, 'f', 'o', 'o', 'b', 'a', 'r', + 0x00, 0x00, 0x00, 0x01, + 0x00, 0x05, 't', 'o', 'p', 'i', 'c', + 0x00, 0x00, 0x00, 0x01, + 0x00, 0x00, 0x52, 0x21, + 0x00, 0x00, 0x00, 0x00, 0xDE, 0xAD, 0xBE, 0xEF, + 0x00, 0x08, 'm', 'e', 't', 'a', 'd', 'a', 't', 'a'} + + offsetCommitRequestOneBlockV1 = []byte{ 0x00, 0x06, 'f', 'o', 'o', 'b', 'a', 'r', 0x00, 0x00, 0x00, 0x01, 0x00, 0x05, 't', 'o', 'p', 'i', 'c', @@ -30,5 +39,8 @@ func TestOffsetCommitRequest(t *testing.T) { testEncodable(t, "no blocks", request, offsetCommitRequestNoBlocks) request.AddBlock("topic", 0x5221, 0xDEADBEEF, ReceiveTime, "metadata") - testEncodable(t, "one block", request, offsetCommitRequestOneBlock) + testEncodable(t, "one block", request, offsetCommitRequestOneBlockV0) + + request.Version = 1 + testEncodable(t, "one block", request, offsetCommitRequestOneBlockV1) }