Skip to content

Commit

Permalink
Support ListOffsetRequest v1 [KIP-79]
Browse files Browse the repository at this point in the history
Makes it possible to lookup offsets based on a Timestamp.

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65868090
  • Loading branch information
jurriaan committed Oct 31, 2016
1 parent fd49817 commit 06ff93b
Show file tree
Hide file tree
Showing 9 changed files with 147 additions and 26 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ env:
- KAFKA_VERSION=0.8.2.2
- KAFKA_VERSION=0.9.0.1
- KAFKA_VERSION=0.10.0.1
- KAFKA_VERSION=0.10.1.0

before_install:
- export REPOSITORY_ROOT=${TRAVIS_BUILD_DIR}
Expand Down
3 changes: 3 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,9 @@ func (client *client) getOffset(topic string, partitionID int32, time int64) (in
}

request := &OffsetRequest{}
if client.conf.Version.IsAtLeast(V0_10_1_0) {
request.Version = 1
}
request.AddBlock(topic, partitionID, time, 1)

response, err := broker.GetAvailableOffsets(request)
Expand Down
3 changes: 3 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ const (
ErrUnsupportedSASLMechanism KError = 33
ErrIllegalSASLState KError = 34
ErrUnsupportedVersion KError = 35
ErrUnsupportedForMessageFormat KError = 43
)

func (err KError) Error() string {
Expand Down Expand Up @@ -188,6 +189,8 @@ func (err KError) Error() string {
return "kafka server: Request is not valid given the current SASL state."
case ErrUnsupportedVersion:
return "kafka server: The version of API is not supported."
case ErrUnsupportedForMessageFormat:
return "kafka server: The requested operation is not supported by the message format version."
}

return fmt.Sprintf("Unknown error, how did this happen? Error code = %d", err)
Expand Down
39 changes: 27 additions & 12 deletions offset_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,33 @@ package sarama

type offsetRequestBlock struct {
time int64
maxOffsets int32
maxOffsets int32 // Only used in version 0
}

func (b *offsetRequestBlock) encode(pe packetEncoder) error {
func (b *offsetRequestBlock) encode(pe packetEncoder, version int16) error {
pe.putInt64(int64(b.time))
pe.putInt32(b.maxOffsets)
if version == 0 {
pe.putInt32(b.maxOffsets)
}

return nil
}

func (b *offsetRequestBlock) decode(pd packetDecoder) (err error) {
func (b *offsetRequestBlock) decode(pd packetDecoder, version int16) (err error) {
if b.time, err = pd.getInt64(); err != nil {
return err
}
if b.maxOffsets, err = pd.getInt32(); err != nil {
return err
if version == 0 {
if b.maxOffsets, err = pd.getInt32(); err != nil {
return err
}
}
return nil
}

type OffsetRequest struct {
blocks map[string]map[int32]*offsetRequestBlock
Version int16
blocks map[string]map[int32]*offsetRequestBlock
}

func (r *OffsetRequest) encode(pe packetEncoder) error {
Expand All @@ -42,7 +48,7 @@ func (r *OffsetRequest) encode(pe packetEncoder) error {
}
for partition, block := range partitions {
pe.putInt32(partition)
if err = block.encode(pe); err != nil {
if err = block.encode(pe, r.Version); err != nil {
return err
}
}
Expand All @@ -51,6 +57,8 @@ func (r *OffsetRequest) encode(pe packetEncoder) error {
}

func (r *OffsetRequest) decode(pd packetDecoder, version int16) error {
r.Version = version

// Ignore replica ID
if _, err := pd.getInt32(); err != nil {
return err
Expand Down Expand Up @@ -79,7 +87,7 @@ func (r *OffsetRequest) decode(pd packetDecoder, version int16) error {
return err
}
block := &offsetRequestBlock{}
if err := block.decode(pd); err != nil {
if err := block.decode(pd, version); err != nil {
return err
}
r.blocks[topic][partition] = block
Expand All @@ -93,11 +101,16 @@ func (r *OffsetRequest) key() int16 {
}

func (r *OffsetRequest) version() int16 {
return 0
return r.Version
}

func (r *OffsetRequest) requiredVersion() KafkaVersion {
return minVersion
switch r.Version {
case 1:
return V0_10_1_0
default:
return minVersion
}
}

func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time int64, maxOffsets int32) {
Expand All @@ -111,7 +124,9 @@ func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time int64, ma

tmp := new(offsetRequestBlock)
tmp.time = time
tmp.maxOffsets = maxOffsets
if r.Version == 0 {
tmp.maxOffsets = maxOffsets
}

r.blocks[topic][partitionID] = tmp
}
17 changes: 17 additions & 0 deletions offset_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ var (
0x00, 0x00, 0x00, 0x04,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x02}

offsetRequestOneBlockV1 = []byte{
0xFF, 0xFF, 0xFF, 0xFF,
0x00, 0x00, 0x00, 0x01,
0x00, 0x03, 'b', 'a', 'r',
0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x04,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01}
)

func TestOffsetRequest(t *testing.T) {
Expand All @@ -24,3 +32,12 @@ func TestOffsetRequest(t *testing.T) {
request.AddBlock("foo", 4, 1, 2)
testRequest(t, "one block", request, offsetRequestOneBlock)
}

func TestOffsetRequestV1(t *testing.T) {
request := new(OffsetRequest)
request.Version = 1
testRequest(t, "no blocks", request, offsetRequestNoBlocks)

request.AddBlock("bar", 4, 1, 2) // Last argument is ignored for V1
testRequest(t, "one block", request, offsetRequestOneBlockV1)
}
58 changes: 45 additions & 13 deletions offset_response.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,57 @@
package sarama

type OffsetResponseBlock struct {
Err KError
Offsets []int64
Err KError
Offsets []int64 // Version 0
Offset int64 // Version 1
Timestamp int64 // Version 1
}

func (b *OffsetResponseBlock) decode(pd packetDecoder) (err error) {
func (b *OffsetResponseBlock) decode(pd packetDecoder, version int16) (err error) {
tmp, err := pd.getInt16()
if err != nil {
return err
}
b.Err = KError(tmp)

b.Offsets, err = pd.getInt64Array()
if version == 0 {
b.Offsets, err = pd.getInt64Array()

return err
return err
}

b.Timestamp, err = pd.getInt64()
if err != nil {
return err
}

b.Offset, err = pd.getInt64()
if err != nil {
return err
}

// For backwards compatibility put the offset in the offsets array too
b.Offsets = []int64{b.Offset}

return nil
}

func (b *OffsetResponseBlock) encode(pe packetEncoder) (err error) {
func (b *OffsetResponseBlock) encode(pe packetEncoder, version int16) (err error) {
pe.putInt16(int16(b.Err))

return pe.putInt64Array(b.Offsets)
if version == 0 {
return pe.putInt64Array(b.Offsets)
}

pe.putInt64(b.Timestamp)
pe.putInt64(b.Offset)

return nil
}

type OffsetResponse struct {
Blocks map[string]map[int32]*OffsetResponseBlock
Version int16
Blocks map[string]map[int32]*OffsetResponseBlock
}

func (r *OffsetResponse) decode(pd packetDecoder, version int16) (err error) {
Expand Down Expand Up @@ -54,7 +81,7 @@ func (r *OffsetResponse) decode(pd packetDecoder, version int16) (err error) {
}

block := new(OffsetResponseBlock)
err = block.decode(pd)
err = block.decode(pd, version)
if err != nil {
return err
}
Expand Down Expand Up @@ -106,7 +133,7 @@ func (r *OffsetResponse) encode(pe packetEncoder) (err error) {
}
for partition, block := range partitions {
pe.putInt32(partition)
if err = block.encode(pe); err != nil {
if err = block.encode(pe, r.version()); err != nil {
return err
}
}
Expand All @@ -120,11 +147,16 @@ func (r *OffsetResponse) key() int16 {
}

func (r *OffsetResponse) version() int16 {
return 0
return r.Version
}

func (r *OffsetResponse) requiredVersion() KafkaVersion {
return minVersion
switch r.Version {
case 1:
return V0_10_1_0
default:
return minVersion
}
}

// testing API
Expand All @@ -138,5 +170,5 @@ func (r *OffsetResponse) AddTopicPartition(topic string, partition int32, offset
byTopic = make(map[int32]*OffsetResponseBlock)
r.Blocks[topic] = byTopic
}
byTopic[partition] = &OffsetResponseBlock{Offsets: []int64{offset}}
byTopic[partition] = &OffsetResponseBlock{Offsets: []int64{offset}, Offset: offset}
}
49 changes: 49 additions & 0 deletions offset_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,19 @@ var (
0x00, 0x00, 0x00, 0x02,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06}

normalOffsetResponseV1 = []byte{
0x00, 0x00, 0x00, 0x02,

0x00, 0x01, 'a',
0x00, 0x00, 0x00, 0x00,

0x00, 0x01, 'z',
0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x02,
0x00, 0x00,
0x00, 0x00, 0x01, 0x58, 0x1A, 0xE6, 0x48, 0x86,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06}
)

func TestEmptyOffsetResponse(t *testing.T) {
Expand All @@ -28,6 +41,13 @@ func TestEmptyOffsetResponse(t *testing.T) {
if len(response.Blocks) != 0 {
t.Error("Decoding produced", len(response.Blocks), "topics where there were none.")
}

response = OffsetResponse{}

testVersionDecodable(t, "empty", &response, emptyOffsetResponse, 1)
if len(response.Blocks) != 0 {
t.Error("Decoding produced", len(response.Blocks), "topics where there were none.")
}
}

func TestNormalOffsetResponse(t *testing.T) {
Expand Down Expand Up @@ -58,5 +78,34 @@ func TestNormalOffsetResponse(t *testing.T) {
if response.Blocks["z"][2].Offsets[0] != 5 || response.Blocks["z"][2].Offsets[1] != 6 {
t.Fatal("Decoding produced invalid offsets for topic z partition 2.")
}
}

func TestNormalOffsetResponseV1(t *testing.T) {
response := OffsetResponse{}

testVersionDecodable(t, "normal", &response, normalOffsetResponseV1, 1)

if len(response.Blocks) != 2 {
t.Fatal("Decoding produced", len(response.Blocks), "topics where there were two.")
}

if len(response.Blocks["a"]) != 0 {
t.Fatal("Decoding produced", len(response.Blocks["a"]), "partitions for topic 'a' where there were none.")
}

if len(response.Blocks["z"]) != 1 {
t.Fatal("Decoding produced", len(response.Blocks["z"]), "partitions for topic 'z' where there was one.")
}

if response.Blocks["z"][2].Err != ErrNoError {
t.Fatal("Decoding produced invalid error for topic z partition 2.")
}

if response.Blocks["z"][2].Timestamp != 1477920049286 {
t.Fatal("Decoding produced invalid timestamp for topic z partition 2.", response.Blocks["z"][2].Timestamp)
}

if response.Blocks["z"][2].Offset != 6 {
t.Fatal("Decoding produced invalid offsets for topic z partition 2.")
}
}
2 changes: 1 addition & 1 deletion request.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func allocateBody(key, version int16) protocolBody {
case 1:
return &FetchRequest{}
case 2:
return &OffsetRequest{}
return &OffsetRequest{Version: version}
case 3:
return &MetadataRequest{}
case 8:
Expand Down
1 change: 1 addition & 0 deletions utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,5 +147,6 @@ var (
V0_9_0_1 = newKafkaVersion(0, 9, 0, 1)
V0_10_0_0 = newKafkaVersion(0, 10, 0, 0)
V0_10_0_1 = newKafkaVersion(0, 10, 0, 1)
V0_10_1_0 = newKafkaVersion(0, 10, 1, 0)
minVersion = V0_8_2_0
)

0 comments on commit 06ff93b

Please sign in to comment.