Skip to content

Commit

Permalink
Merge pull request #390 from Shopify/offset-commit-request
Browse files Browse the repository at this point in the history
Fix OffsetCommitRequest
  • Loading branch information
eapache committed Mar 24, 2015
2 parents b170778 + 7e0bb5a commit eb30a57
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 6 deletions.
20 changes: 16 additions & 4 deletions offset_commit_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sarama

// ReceiveTime is a special value for the timestamp field of Offset Commit Requests which
// tells the broker to set the timestamp to the time at which the request was received.
// The timestamp is only used if message version 1 is used, which requires kafka 0.8.2.
const ReceiveTime int64 = -1

type offsetCommitRequestBlock struct {
Expand All @@ -10,18 +11,25 @@ type offsetCommitRequestBlock struct {
metadata string
}

func (r *offsetCommitRequestBlock) encode(pe packetEncoder) error {
func (r *offsetCommitRequestBlock) encode(pe packetEncoder, version int16) error {
pe.putInt64(r.offset)
pe.putInt64(r.timestamp)
if version >= 1 {
pe.putInt64(r.timestamp)
}
return pe.putString(r.metadata)
}

type OffsetCommitRequest struct {
ConsumerGroup string
Version int16 // 0 (0.8.1 and later) or 1 (0.8.2 and later, includes timestamp field)
blocks map[string]map[int32]*offsetCommitRequestBlock
}

func (r *OffsetCommitRequest) encode(pe packetEncoder) error {
if r.Version < 0 || r.Version > 1 {
return PacketEncodingError{"invalid or unsupported OffsetCommitRequest version field"}
}

err := pe.putString(r.ConsumerGroup)
if err != nil {
return err
Expand All @@ -41,7 +49,7 @@ func (r *OffsetCommitRequest) encode(pe packetEncoder) error {
}
for partition, block := range partitions {
pe.putInt32(partition)
err = block.encode(pe)
err = block.encode(pe, r.Version)
if err != nil {
return err
}
Expand All @@ -55,7 +63,7 @@ func (r *OffsetCommitRequest) key() int16 {
}

func (r *OffsetCommitRequest) version() int16 {
return 0
return r.Version
}

func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset int64, timestamp int64, metadata string) {
Expand All @@ -67,5 +75,9 @@ func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset i
r.blocks[topic] = make(map[int32]*offsetCommitRequestBlock)
}

if r.Version == 0 && timestamp != 0 {
Logger.Println("Non-zero timestamp specified for OffsetCommitRequest v0, it will be ignored")
}

r.blocks[topic][partitionID] = &offsetCommitRequestBlock{offset, timestamp, metadata}
}
16 changes: 14 additions & 2 deletions offset_commit_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,16 @@ var (
0x00, 0x06, 'f', 'o', 'o', 'b', 'a', 'r',
0x00, 0x00, 0x00, 0x00}

offsetCommitRequestOneBlock = []byte{
offsetCommitRequestOneBlockV0 = []byte{
0x00, 0x06, 'f', 'o', 'o', 'b', 'a', 'r',
0x00, 0x00, 0x00, 0x01,
0x00, 0x05, 't', 'o', 'p', 'i', 'c',
0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x52, 0x21,
0x00, 0x00, 0x00, 0x00, 0xDE, 0xAD, 0xBE, 0xEF,
0x00, 0x08, 'm', 'e', 't', 'a', 'd', 'a', 't', 'a'}

offsetCommitRequestOneBlockV1 = []byte{
0x00, 0x06, 'f', 'o', 'o', 'b', 'a', 'r',
0x00, 0x00, 0x00, 0x01,
0x00, 0x05, 't', 'o', 'p', 'i', 'c',
Expand All @@ -30,5 +39,8 @@ func TestOffsetCommitRequest(t *testing.T) {
testEncodable(t, "no blocks", request, offsetCommitRequestNoBlocks)

request.AddBlock("topic", 0x5221, 0xDEADBEEF, ReceiveTime, "metadata")
testEncodable(t, "one block", request, offsetCommitRequestOneBlock)
testEncodable(t, "one block", request, offsetCommitRequestOneBlockV0)

request.Version = 1
testEncodable(t, "one block", request, offsetCommitRequestOneBlockV1)
}

0 comments on commit eb30a57

Please sign in to comment.