From b5ace411a03515614e31bfd5a3b41de1db7e17fc Mon Sep 17 00:00:00 2001 From: Robin Date: Mon, 18 Dec 2017 14:59:02 +0100 Subject: [PATCH] add CreateTopicsRequest/Response --- broker.go | 11 +++ create_topics_request.go | 174 +++++++++++++++++++++++++++++++++ create_topics_request_test.go | 50 ++++++++++ create_topics_response.go | 112 +++++++++++++++++++++ create_topics_response_test.go | 52 ++++++++++ request.go | 2 + 6 files changed, 401 insertions(+) create mode 100644 create_topics_request.go create mode 100644 create_topics_request_test.go create mode 100644 create_topics_response.go create mode 100644 create_topics_response_test.go diff --git a/broker.go b/broker.go index 923b07faf..3955ca2b4 100644 --- a/broker.go +++ b/broker.go @@ -373,6 +373,17 @@ func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse, return response, nil } +func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) { + response := new(CreateTopicsResponse) + + err := b.sendAndReceive(request, response) + if err != nil { + return nil, err + } + + return response, nil +} + func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) { b.lock.Lock() defer b.lock.Unlock() diff --git a/create_topics_request.go b/create_topics_request.go new file mode 100644 index 000000000..709c0a44e --- /dev/null +++ b/create_topics_request.go @@ -0,0 +1,174 @@ +package sarama + +import ( + "time" +) + +type CreateTopicsRequest struct { + Version int16 + + TopicDetails map[string]*TopicDetail + Timeout time.Duration + ValidateOnly bool +} + +func (c *CreateTopicsRequest) encode(pe packetEncoder) error { + if err := pe.putArrayLength(len(c.TopicDetails)); err != nil { + return err + } + for topic, detail := range c.TopicDetails { + if err := pe.putString(topic); err != nil { + return err + } + if err := detail.encode(pe); err != nil { + return err + } + } + + pe.putInt32(int32(c.Timeout / time.Millisecond)) + + if c.Version >= 1 { + pe.putBool(c.ValidateOnly) + } + + return nil +} + +func (c *CreateTopicsRequest) decode(pd packetDecoder, version int16) (err error) { + n, err := pd.getArrayLength() + if err != nil { + return err + } + + c.TopicDetails = make(map[string]*TopicDetail, n) + + for i := 0; i < n; i++ { + topic, err := pd.getString() + if err != nil { + return err + } + c.TopicDetails[topic] = new(TopicDetail) + if err = c.TopicDetails[topic].decode(pd, version); err != nil { + return err + } + } + + timeout, err := pd.getInt32() + if err != nil { + return err + } + c.Timeout = time.Duration(timeout) * time.Millisecond + + if version >= 1 { + c.ValidateOnly, err = pd.getBool() + if err != nil { + return err + } + + c.Version = version + } + + return nil +} + +func (c *CreateTopicsRequest) key() int16 { + return 19 +} + +func (c *CreateTopicsRequest) version() int16 { + return c.Version +} + +func (c *CreateTopicsRequest) requiredVersion() KafkaVersion { + switch c.Version { + case 2: + return V1_0_0_0 + case 1: + return V0_11_0_0 + default: + return V0_10_1_0 + } +} + +type TopicDetail struct { + NumPartitions int32 + ReplicationFactor int16 + ReplicaAssignment map[int32][]int32 + ConfigEntries map[string]*string +} + +func (t *TopicDetail) encode(pe packetEncoder) error { + pe.putInt32(t.NumPartitions) + pe.putInt16(t.ReplicationFactor) + + if err := pe.putArrayLength(len(t.ReplicaAssignment)); err != nil { + return err + } + for partition, assignment := range t.ReplicaAssignment { + pe.putInt32(partition) + if err := pe.putInt32Array(assignment); err != nil { + return err + } + } + + if err := pe.putArrayLength(len(t.ConfigEntries)); err != nil { + return err + } + for configKey, configValue := range t.ConfigEntries { + if err := pe.putString(configKey); err != nil { + return err + } + if err := pe.putNullableString(configValue); err != nil { + return err + } + } + + return nil +} + +func (t *TopicDetail) decode(pd packetDecoder, version int16) (err error) { + if t.NumPartitions, err = pd.getInt32(); err != nil { + return err + } + if t.ReplicationFactor, err = pd.getInt16(); err != nil { + return err + } + + n, err := pd.getArrayLength() + if err != nil { + return err + } + + if n > 0 { + t.ReplicaAssignment = make(map[int32][]int32, n) + for i := 0; i < n; i++ { + replica, err := pd.getInt32() + if err != nil { + return err + } + if t.ReplicaAssignment[replica], err = pd.getInt32Array(); err != nil { + return err + } + } + } + + n, err = pd.getArrayLength() + if err != nil { + return err + } + + if n > 0 { + t.ConfigEntries = make(map[string]*string, n) + for i := 0; i < n; i++ { + configKey, err := pd.getString() + if err != nil { + return err + } + if t.ConfigEntries[configKey], err = pd.getNullableString(); err != nil { + return err + } + } + } + + return nil +} diff --git a/create_topics_request_test.go b/create_topics_request_test.go new file mode 100644 index 000000000..56b1b80e5 --- /dev/null +++ b/create_topics_request_test.go @@ -0,0 +1,50 @@ +package sarama + +import ( + "testing" + "time" +) + +var ( + createTopicsRequestV0 = []byte{ + 0, 0, 0, 1, + 0, 5, 't', 'o', 'p', 'i', 'c', + 255, 255, 255, 255, + 255, 255, + 0, 0, 0, 1, // 1 replica assignment + 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2, + 0, 0, 0, 1, // 1 config + 0, 12, 'r', 'e', 't', 'e', 'n', 't', 'i', 'o', 'n', '.', 'm', 's', + 0, 2, '-', '1', + 0, 0, 0, 100, + } + + createTopicsRequestV1 = append(createTopicsRequestV0, byte(1)) +) + +func TestCreateTopicsRequest(t *testing.T) { + retention := "-1" + + req := &CreateTopicsRequest{ + TopicDetails: map[string]*TopicDetail{ + "topic": { + NumPartitions: -1, + ReplicationFactor: -1, + ReplicaAssignment: map[int32][]int32{ + 0: []int32{0, 1, 2}, + }, + ConfigEntries: map[string]*string{ + "retention.ms": &retention, + }, + }, + }, + Timeout: 100 * time.Millisecond, + } + + testRequest(t, "version 0", req, createTopicsRequestV0) + + req.Version = 1 + req.ValidateOnly = true + + testRequest(t, "version 1", req, createTopicsRequestV1) +} diff --git a/create_topics_response.go b/create_topics_response.go new file mode 100644 index 000000000..66207e00c --- /dev/null +++ b/create_topics_response.go @@ -0,0 +1,112 @@ +package sarama + +import "time" + +type CreateTopicsResponse struct { + Version int16 + ThrottleTime time.Duration + TopicErrors map[string]*TopicError +} + +func (c *CreateTopicsResponse) encode(pe packetEncoder) error { + if c.Version >= 2 { + pe.putInt32(int32(c.ThrottleTime / time.Millisecond)) + } + + if err := pe.putArrayLength(len(c.TopicErrors)); err != nil { + return err + } + for topic, topicError := range c.TopicErrors { + if err := pe.putString(topic); err != nil { + return err + } + if err := topicError.encode(pe, c.Version); err != nil { + return err + } + } + + return nil +} + +func (c *CreateTopicsResponse) decode(pd packetDecoder, version int16) (err error) { + c.Version = version + + if version >= 2 { + throttleTime, err := pd.getInt32() + if err != nil { + return err + } + c.ThrottleTime = time.Duration(throttleTime) * time.Millisecond + } + + n, err := pd.getArrayLength() + if err != nil { + return err + } + + c.TopicErrors = make(map[string]*TopicError, n) + for i := 0; i < n; i++ { + topic, err := pd.getString() + if err != nil { + return err + } + c.TopicErrors[topic] = new(TopicError) + if err := c.TopicErrors[topic].decode(pd, version); err != nil { + return err + } + } + + return nil +} + +func (c *CreateTopicsResponse) key() int16 { + return 19 +} + +func (c *CreateTopicsResponse) version() int16 { + return c.Version +} + +func (c *CreateTopicsResponse) requiredVersion() KafkaVersion { + switch c.Version { + case 2: + return V1_0_0_0 + case 1: + return V0_11_0_0 + default: + return V0_10_1_0 + } +} + +type TopicError struct { + Err KError + ErrMsg *string +} + +func (t *TopicError) encode(pe packetEncoder, version int16) error { + pe.putInt16(int16(t.Err)) + + if version >= 1 { + if err := pe.putNullableString(t.ErrMsg); err != nil { + return err + } + } + + return nil +} + +func (t *TopicError) decode(pd packetDecoder, version int16) (err error) { + kErr, err := pd.getInt16() + if err != nil { + return err + } + t.Err = KError(kErr) + + if version >= 1 { + if t.ErrMsg, err = pd.getNullableString(); err != nil { + return err + } + } + + return nil +} diff --git a/create_topics_response_test.go b/create_topics_response_test.go new file mode 100644 index 000000000..53790064f --- /dev/null +++ b/create_topics_response_test.go @@ -0,0 +1,52 @@ +package sarama + +import ( + "testing" + "time" +) + +var ( + createTopicsResponseV0 = []byte{ + 0, 0, 0, 1, + 0, 5, 't', 'o', 'p', 'i', 'c', + 0, 42, + } + + createTopicsResponseV1 = []byte{ + 0, 0, 0, 1, + 0, 5, 't', 'o', 'p', 'i', 'c', + 0, 42, + 0, 3, 'm', 's', 'g', + } + + createTopicsResponseV2 = []byte{ + 0, 0, 0, 100, + 0, 0, 0, 1, + 0, 5, 't', 'o', 'p', 'i', 'c', + 0, 42, + 0, 3, 'm', 's', 'g', + } +) + +func TestCreateTopicsResponse(t *testing.T) { + resp := &CreateTopicsResponse{ + TopicErrors: map[string]*TopicError{ + "topic": &TopicError{ + Err: ErrInvalidRequest, + }, + }, + } + + testResponse(t, "version 0", resp, createTopicsResponseV0) + + resp.Version = 1 + msg := "msg" + resp.TopicErrors["topic"].ErrMsg = &msg + + testResponse(t, "version 1", resp, createTopicsResponseV1) + + resp.Version = 2 + resp.ThrottleTime = 100 * time.Millisecond + + testResponse(t, "version 2", resp, createTopicsResponseV2) +} diff --git a/request.go b/request.go index 9c37ca78b..35d8c1d8b 100644 --- a/request.go +++ b/request.go @@ -114,6 +114,8 @@ func allocateBody(key, version int16) protocolBody { return &SaslHandshakeRequest{} case 18: return &ApiVersionsRequest{} + case 19: + return &CreateTopicsRequest{} case 37: return &CreatePartitionsRequest{} }