Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix OffsetCommitRequest #390

Merged
merged 1 commit into from
Mar 24, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions offset_commit_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sarama

// ReceiveTime is a special value for the timestamp field of Offset Commit Requests which
// tells the broker to set the timestamp to the time at which the request was received.
// The timestamp is only used if message version 1 is used, which requires kafka 0.8.2.
const ReceiveTime int64 = -1

type offsetCommitRequestBlock struct {
Expand All @@ -10,18 +11,25 @@ type offsetCommitRequestBlock struct {
metadata string
}

func (r *offsetCommitRequestBlock) encode(pe packetEncoder) error {
func (r *offsetCommitRequestBlock) encode(pe packetEncoder, version int16) error {
pe.putInt64(r.offset)
pe.putInt64(r.timestamp)
if version >= 1 {
pe.putInt64(r.timestamp)
}
return pe.putString(r.metadata)
}

type OffsetCommitRequest struct {
ConsumerGroup string
Version int16 // 0 (0.8.1 and later) or 1 (0.8.2 and later, includes timestamp field)
blocks map[string]map[int32]*offsetCommitRequestBlock
}

func (r *OffsetCommitRequest) encode(pe packetEncoder) error {
if r.Version < 0 || r.Version > 1 {
return PacketEncodingError{"invalid or unsupported OffsetCommitRequest version field"}
}

err := pe.putString(r.ConsumerGroup)
if err != nil {
return err
Expand All @@ -41,7 +49,7 @@ func (r *OffsetCommitRequest) encode(pe packetEncoder) error {
}
for partition, block := range partitions {
pe.putInt32(partition)
err = block.encode(pe)
err = block.encode(pe, r.Version)
if err != nil {
return err
}
Expand All @@ -55,7 +63,7 @@ func (r *OffsetCommitRequest) key() int16 {
}

func (r *OffsetCommitRequest) version() int16 {
return 0
return r.Version
}

func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset int64, timestamp int64, metadata string) {
Expand All @@ -67,5 +75,9 @@ func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset i
r.blocks[topic] = make(map[int32]*offsetCommitRequestBlock)
}

if r.Version == 0 && timestamp != 0 {
Logger.Println("Non-zero timestamp specified for OffsetCommitRequest v0, it will be ignored")
}

r.blocks[topic][partitionID] = &offsetCommitRequestBlock{offset, timestamp, metadata}
}
16 changes: 14 additions & 2 deletions offset_commit_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,16 @@ var (
0x00, 0x06, 'f', 'o', 'o', 'b', 'a', 'r',
0x00, 0x00, 0x00, 0x00}

offsetCommitRequestOneBlock = []byte{
offsetCommitRequestOneBlockV0 = []byte{
0x00, 0x06, 'f', 'o', 'o', 'b', 'a', 'r',
0x00, 0x00, 0x00, 0x01,
0x00, 0x05, 't', 'o', 'p', 'i', 'c',
0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x52, 0x21,
0x00, 0x00, 0x00, 0x00, 0xDE, 0xAD, 0xBE, 0xEF,
0x00, 0x08, 'm', 'e', 't', 'a', 'd', 'a', 't', 'a'}

offsetCommitRequestOneBlockV1 = []byte{
0x00, 0x06, 'f', 'o', 'o', 'b', 'a', 'r',
0x00, 0x00, 0x00, 0x01,
0x00, 0x05, 't', 'o', 'p', 'i', 'c',
Expand All @@ -30,5 +39,8 @@ func TestOffsetCommitRequest(t *testing.T) {
testEncodable(t, "no blocks", request, offsetCommitRequestNoBlocks)

request.AddBlock("topic", 0x5221, 0xDEADBEEF, ReceiveTime, "metadata")
testEncodable(t, "one block", request, offsetCommitRequestOneBlock)
testEncodable(t, "one block", request, offsetCommitRequestOneBlockV0)

request.Version = 1
testEncodable(t, "one block", request, offsetCommitRequestOneBlockV1)
}