Skip to content

Commit

Permalink
Merge pull request #1007 from buyology/create-topics
Browse files Browse the repository at this point in the history
add CreateTopicsRequest/Response
  • Loading branch information
eapache authored Dec 21, 2017
2 parents eaafcb0 + b5ace41 commit 541ca4a
Show file tree
Hide file tree
Showing 6 changed files with 401 additions and 0 deletions.
11 changes: 11 additions & 0 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,17 @@ func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse,
return response, nil
}

func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) {
response := new(CreateTopicsResponse)

err := b.sendAndReceive(request, response)
if err != nil {
return nil, err
}

return response, nil
}

func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) {
b.lock.Lock()
defer b.lock.Unlock()
Expand Down
174 changes: 174 additions & 0 deletions create_topics_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package sarama

import (
"time"
)

type CreateTopicsRequest struct {
Version int16

TopicDetails map[string]*TopicDetail
Timeout time.Duration
ValidateOnly bool
}

func (c *CreateTopicsRequest) encode(pe packetEncoder) error {
if err := pe.putArrayLength(len(c.TopicDetails)); err != nil {
return err
}
for topic, detail := range c.TopicDetails {
if err := pe.putString(topic); err != nil {
return err
}
if err := detail.encode(pe); err != nil {
return err
}
}

pe.putInt32(int32(c.Timeout / time.Millisecond))

if c.Version >= 1 {
pe.putBool(c.ValidateOnly)
}

return nil
}

func (c *CreateTopicsRequest) decode(pd packetDecoder, version int16) (err error) {
n, err := pd.getArrayLength()
if err != nil {
return err
}

c.TopicDetails = make(map[string]*TopicDetail, n)

for i := 0; i < n; i++ {
topic, err := pd.getString()
if err != nil {
return err
}
c.TopicDetails[topic] = new(TopicDetail)
if err = c.TopicDetails[topic].decode(pd, version); err != nil {
return err
}
}

timeout, err := pd.getInt32()
if err != nil {
return err
}
c.Timeout = time.Duration(timeout) * time.Millisecond

if version >= 1 {
c.ValidateOnly, err = pd.getBool()
if err != nil {
return err
}

c.Version = version
}

return nil
}

func (c *CreateTopicsRequest) key() int16 {
return 19
}

func (c *CreateTopicsRequest) version() int16 {
return c.Version
}

func (c *CreateTopicsRequest) requiredVersion() KafkaVersion {
switch c.Version {
case 2:
return V1_0_0_0
case 1:
return V0_11_0_0
default:
return V0_10_1_0
}
}

type TopicDetail struct {
NumPartitions int32
ReplicationFactor int16
ReplicaAssignment map[int32][]int32
ConfigEntries map[string]*string
}

func (t *TopicDetail) encode(pe packetEncoder) error {
pe.putInt32(t.NumPartitions)
pe.putInt16(t.ReplicationFactor)

if err := pe.putArrayLength(len(t.ReplicaAssignment)); err != nil {
return err
}
for partition, assignment := range t.ReplicaAssignment {
pe.putInt32(partition)
if err := pe.putInt32Array(assignment); err != nil {
return err
}
}

if err := pe.putArrayLength(len(t.ConfigEntries)); err != nil {
return err
}
for configKey, configValue := range t.ConfigEntries {
if err := pe.putString(configKey); err != nil {
return err
}
if err := pe.putNullableString(configValue); err != nil {
return err
}
}

return nil
}

func (t *TopicDetail) decode(pd packetDecoder, version int16) (err error) {
if t.NumPartitions, err = pd.getInt32(); err != nil {
return err
}
if t.ReplicationFactor, err = pd.getInt16(); err != nil {
return err
}

n, err := pd.getArrayLength()
if err != nil {
return err
}

if n > 0 {
t.ReplicaAssignment = make(map[int32][]int32, n)
for i := 0; i < n; i++ {
replica, err := pd.getInt32()
if err != nil {
return err
}
if t.ReplicaAssignment[replica], err = pd.getInt32Array(); err != nil {
return err
}
}
}

n, err = pd.getArrayLength()
if err != nil {
return err
}

if n > 0 {
t.ConfigEntries = make(map[string]*string, n)
for i := 0; i < n; i++ {
configKey, err := pd.getString()
if err != nil {
return err
}
if t.ConfigEntries[configKey], err = pd.getNullableString(); err != nil {
return err
}
}
}

return nil
}
50 changes: 50 additions & 0 deletions create_topics_request_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package sarama

import (
"testing"
"time"
)

var (
createTopicsRequestV0 = []byte{
0, 0, 0, 1,
0, 5, 't', 'o', 'p', 'i', 'c',
255, 255, 255, 255,
255, 255,
0, 0, 0, 1, // 1 replica assignment
0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2,
0, 0, 0, 1, // 1 config
0, 12, 'r', 'e', 't', 'e', 'n', 't', 'i', 'o', 'n', '.', 'm', 's',
0, 2, '-', '1',
0, 0, 0, 100,
}

createTopicsRequestV1 = append(createTopicsRequestV0, byte(1))
)

func TestCreateTopicsRequest(t *testing.T) {
retention := "-1"

req := &CreateTopicsRequest{
TopicDetails: map[string]*TopicDetail{
"topic": {
NumPartitions: -1,
ReplicationFactor: -1,
ReplicaAssignment: map[int32][]int32{
0: []int32{0, 1, 2},
},
ConfigEntries: map[string]*string{
"retention.ms": &retention,
},
},
},
Timeout: 100 * time.Millisecond,
}

testRequest(t, "version 0", req, createTopicsRequestV0)

req.Version = 1
req.ValidateOnly = true

testRequest(t, "version 1", req, createTopicsRequestV1)
}
112 changes: 112 additions & 0 deletions create_topics_response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package sarama

import "time"

type CreateTopicsResponse struct {
Version int16
ThrottleTime time.Duration
TopicErrors map[string]*TopicError
}

func (c *CreateTopicsResponse) encode(pe packetEncoder) error {
if c.Version >= 2 {
pe.putInt32(int32(c.ThrottleTime / time.Millisecond))
}

if err := pe.putArrayLength(len(c.TopicErrors)); err != nil {
return err
}
for topic, topicError := range c.TopicErrors {
if err := pe.putString(topic); err != nil {
return err
}
if err := topicError.encode(pe, c.Version); err != nil {
return err
}
}

return nil
}

func (c *CreateTopicsResponse) decode(pd packetDecoder, version int16) (err error) {
c.Version = version

if version >= 2 {
throttleTime, err := pd.getInt32()
if err != nil {
return err
}
c.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
}

n, err := pd.getArrayLength()
if err != nil {
return err
}

c.TopicErrors = make(map[string]*TopicError, n)
for i := 0; i < n; i++ {
topic, err := pd.getString()
if err != nil {
return err
}
c.TopicErrors[topic] = new(TopicError)
if err := c.TopicErrors[topic].decode(pd, version); err != nil {
return err
}
}

return nil
}

func (c *CreateTopicsResponse) key() int16 {
return 19
}

func (c *CreateTopicsResponse) version() int16 {
return c.Version
}

func (c *CreateTopicsResponse) requiredVersion() KafkaVersion {
switch c.Version {
case 2:
return V1_0_0_0
case 1:
return V0_11_0_0
default:
return V0_10_1_0
}
}

type TopicError struct {
Err KError
ErrMsg *string
}

func (t *TopicError) encode(pe packetEncoder, version int16) error {
pe.putInt16(int16(t.Err))

if version >= 1 {
if err := pe.putNullableString(t.ErrMsg); err != nil {
return err
}
}

return nil
}

func (t *TopicError) decode(pd packetDecoder, version int16) (err error) {
kErr, err := pd.getInt16()
if err != nil {
return err
}
t.Err = KError(kErr)

if version >= 1 {
if t.ErrMsg, err = pd.getNullableString(); err != nil {
return err
}
}

return nil
}
Loading

0 comments on commit 541ca4a

Please sign in to comment.