Skip to content

Commit

Permalink
Merge pull request #985 from buyology/create-partitions
Browse files Browse the repository at this point in the history
add CreatePartitionsRequest/Response
  • Loading branch information
eapache authored Dec 8, 2017
2 parents cd645bf + 7957e72 commit 0e0650f
Show file tree
Hide file tree
Showing 10 changed files with 366 additions and 15 deletions.
121 changes: 121 additions & 0 deletions create_partitions_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package sarama

import "time"

type CreatePartitionsRequest struct {
TopicPartitions map[string]*TopicPartition
Timeout time.Duration
ValidateOnly bool
}

func (c *CreatePartitionsRequest) encode(pe packetEncoder) error {
if err := pe.putArrayLength(len(c.TopicPartitions)); err != nil {
return err
}

for topic, partition := range c.TopicPartitions {
if err := pe.putString(topic); err != nil {
return err
}
if err := partition.encode(pe); err != nil {
return err
}
}

pe.putInt32(int32(c.Timeout / time.Millisecond))

pe.putBool(c.ValidateOnly)

return nil
}

func (c *CreatePartitionsRequest) decode(pd packetDecoder, version int16) (err error) {
n, err := pd.getArrayLength()
if err != nil {
return err
}
c.TopicPartitions = make(map[string]*TopicPartition, n)
for i := 0; i < n; i++ {
topic, err := pd.getString()
if err != nil {
return err
}
c.TopicPartitions[topic] = new(TopicPartition)
if err := c.TopicPartitions[topic].decode(pd, version); err != nil {
return err
}
}

timeout, err := pd.getInt32()
if err != nil {
return err
}
c.Timeout = time.Duration(timeout) * time.Millisecond

if c.ValidateOnly, err = pd.getBool(); err != nil {
return err
}

return nil
}

func (r *CreatePartitionsRequest) key() int16 {
return 37
}

func (r *CreatePartitionsRequest) version() int16 {
return 0
}

func (r *CreatePartitionsRequest) requiredVersion() KafkaVersion {
return V1_0_0_0
}

type TopicPartition struct {
Count int32
Assignment [][]int32
}

func (t *TopicPartition) encode(pe packetEncoder) error {
pe.putInt32(t.Count)

if len(t.Assignment) == 0 {
pe.putInt32(-1)
return nil
}

if err := pe.putArrayLength(len(t.Assignment)); err != nil {
return err
}

for _, assign := range t.Assignment {
if err := pe.putInt32Array(assign); err != nil {
return err
}
}

return nil
}

func (t *TopicPartition) decode(pd packetDecoder, version int16) (err error) {
if t.Count, err = pd.getInt32(); err != nil {
return err
}

n, err := pd.getInt32()
if err != nil {
return err
}
if n <= 0 {
return nil
}
t.Assignment = make([][]int32, n)

for i := 0; i < int(n); i++ {
if t.Assignment[i], err = pd.getInt32Array(); err != nil {
return err
}
}

return nil
}
50 changes: 50 additions & 0 deletions create_partitions_request_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package sarama

import (
"testing"
"time"
)

var (
createPartitionRequestNoAssignment = []byte{
0, 0, 0, 1, // one topic
0, 5, 't', 'o', 'p', 'i', 'c',
0, 0, 0, 3, // 3 partitions
255, 255, 255, 255, // no assignments
0, 0, 0, 100, // timeout
0, // validate only = false
}

createPartitionRequestAssignment = []byte{
0, 0, 0, 1,
0, 5, 't', 'o', 'p', 'i', 'c',
0, 0, 0, 3, // 3 partitions
0, 0, 0, 2,
0, 0, 0, 2,
0, 0, 0, 2, 0, 0, 0, 3,
0, 0, 0, 2,
0, 0, 0, 3, 0, 0, 0, 1,
0, 0, 0, 100,
1, // validate only = true
}
)

func TestCreatePartitionsRequest(t *testing.T) {
req := &CreatePartitionsRequest{
TopicPartitions: map[string]*TopicPartition{
"topic": &TopicPartition{
Count: 3,
},
},
Timeout: 100 * time.Millisecond,
}

buf := testRequestEncode(t, "no assignment", req, createPartitionRequestNoAssignment)
testRequestDecode(t, "no assignment", req, buf)

req.ValidateOnly = true
req.TopicPartitions["topic"].Assignment = [][]int32{{2, 3}, {3, 1}}

buf = testRequestEncode(t, "assignment", req, createPartitionRequestAssignment)
testRequestDecode(t, "assignment", req, buf)
}
94 changes: 94 additions & 0 deletions create_partitions_response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package sarama

import "time"

type CreatePartitionsResponse struct {
ThrottleTime time.Duration
TopicPartitionErrors map[string]*TopicPartitionError
}

func (c *CreatePartitionsResponse) encode(pe packetEncoder) error {
pe.putInt32(int32(c.ThrottleTime / time.Millisecond))
if err := pe.putArrayLength(len(c.TopicPartitionErrors)); err != nil {
return err
}

for topic, partitionError := range c.TopicPartitionErrors {
if err := pe.putString(topic); err != nil {
return err
}
if err := partitionError.encode(pe); err != nil {
return err
}
}

return nil
}

func (c *CreatePartitionsResponse) decode(pd packetDecoder, version int16) (err error) {
throttleTime, err := pd.getInt32()
if err != nil {
return err
}
c.ThrottleTime = time.Duration(throttleTime) * time.Millisecond

n, err := pd.getArrayLength()
if err != nil {
return err
}

c.TopicPartitionErrors = make(map[string]*TopicPartitionError, n)
for i := 0; i < n; i++ {
topic, err := pd.getString()
if err != nil {
return err
}
c.TopicPartitionErrors[topic] = new(TopicPartitionError)
if err := c.TopicPartitionErrors[topic].decode(pd, version); err != nil {
return err
}
}

return nil
}

func (r *CreatePartitionsResponse) key() int16 {
return 37
}

func (r *CreatePartitionsResponse) version() int16 {
return 0
}

func (r *CreatePartitionsResponse) requiredVersion() KafkaVersion {
return V1_0_0_0
}

type TopicPartitionError struct {
Err KError
ErrMsg *string
}

func (t *TopicPartitionError) encode(pe packetEncoder) error {
pe.putInt16(int16(t.Err))

if err := pe.putNullableString(t.ErrMsg); err != nil {
return err
}

return nil
}

func (t *TopicPartitionError) decode(pd packetDecoder, version int16) (err error) {
kerr, err := pd.getInt16()
if err != nil {
return err
}
t.Err = KError(kerr)

if t.ErrMsg, err = pd.getNullableString(); err != nil {
return err
}

return nil
}
52 changes: 52 additions & 0 deletions create_partitions_response_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package sarama

import (
"reflect"
"testing"
"time"
)

var (
createPartitionResponseSuccess = []byte{
0, 0, 0, 100, // throttleTimeMs
0, 0, 0, 1,
0, 5, 't', 'o', 'p', 'i', 'c',
0, 0, // no error
255, 255, // no error message
}

createPartitionResponseFail = []byte{
0, 0, 0, 100, // throttleTimeMs
0, 0, 0, 1,
0, 5, 't', 'o', 'p', 'i', 'c',
0, 37, // partition error
0, 5, 'e', 'r', 'r', 'o', 'r',
}
)

func TestCreatePartitionsResponse(t *testing.T) {
resp := &CreatePartitionsResponse{
ThrottleTime: 100 * time.Millisecond,
TopicPartitionErrors: map[string]*TopicPartitionError{
"topic": &TopicPartitionError{},
},
}

testResponse(t, "success", resp, createPartitionResponseSuccess)
decodedresp := new(CreatePartitionsResponse)
testVersionDecodable(t, "success", decodedresp, createPartitionResponseSuccess, 0)
if !reflect.DeepEqual(decodedresp, resp) {
t.Errorf("Decoding error: expected %v but got %v", decodedresp, resp)
}

errMsg := "error"
resp.TopicPartitionErrors["topic"].Err = ErrInvalidPartitions
resp.TopicPartitionErrors["topic"].ErrMsg = &errMsg

testResponse(t, "with errors", resp, createPartitionResponseFail)
decodedresp = new(CreatePartitionsResponse)
testVersionDecodable(t, "with errors", decodedresp, createPartitionResponseFail, 0)
if !reflect.DeepEqual(decodedresp, resp) {
t.Errorf("Decoding error: expected %v but got %v", decodedresp, resp)
}
}
1 change: 1 addition & 0 deletions packet_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type packetDecoder interface {
getInt64() (int64, error)
getVarint() (int64, error)
getArrayLength() (int, error)
getBool() (bool, error)

// Collections
getBytes() ([]byte, error)
Expand Down
1 change: 1 addition & 0 deletions packet_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type packetEncoder interface {
putInt64(in int64)
putVarint(in int64)
putArrayLength(in int) error
putBool(in bool)

// Collections
putBytes(in []byte) error
Expand Down
4 changes: 4 additions & 0 deletions prep_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ func (pe *prepEncoder) putArrayLength(in int) error {
return nil
}

func (pe *prepEncoder) putBool(in bool) {
pe.length++
}

// arrays

func (pe *prepEncoder) putBytes(in []byte) error {
Expand Down
Loading

0 comments on commit 0e0650f

Please sign in to comment.