Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Create Topics #852

Closed
wants to merge 11 commits into from
32 changes: 30 additions & 2 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import (
type Broker struct {
id int32
addr string
rack string

isController bool
conf *Config
correlationID int32
conn net.Conn
Expand Down Expand Up @@ -420,7 +422,7 @@ func (b *Broker) sendAndReceive(req protocolBody, res versionedDecoder) error {
}
}

func (b *Broker) decode(pd packetDecoder) (err error) {
func (b *Broker) decode(pd packetDecoder, version int16) (err error) {
b.id, err = pd.getInt32()
if err != nil {
return err
Expand All @@ -436,6 +438,13 @@ func (b *Broker) decode(pd packetDecoder) (err error) {
return err
}

if version == 1 {
// v1 metadata response adds a rack to the broker metadata
if b.rack, err = pd.getString(); err != nil {
return err
}
}

b.addr = net.JoinHostPort(host, fmt.Sprint(port))
if _, _, err := net.SplitHostPort(b.addr); err != nil {
return err
Expand All @@ -444,7 +453,7 @@ func (b *Broker) decode(pd packetDecoder) (err error) {
return nil
}

func (b *Broker) encode(pe packetEncoder) (err error) {
func (b *Broker) encode(pe packetEncoder, version int16) (err error) {

host, portstr, err := net.SplitHostPort(b.addr)
if err != nil {
Expand All @@ -464,6 +473,13 @@ func (b *Broker) encode(pe packetEncoder) (err error) {

pe.putInt32(int32(port))

if version == 1 {
// v1 metadata response adds a rack to the broker metadata
if err = pe.putString(b.rack); err != nil {
return err
}
}

return nil
}

Expand Down Expand Up @@ -672,3 +688,15 @@ func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
b.brokerRequestSize.Update(requestSize)
}
}

func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) {
response := new(CreateTopicsResponse)

err := b.sendAndReceive(request, response)

if err != nil {
return nil, err
}

return response, nil
}
62 changes: 61 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ type Client interface {

// Closed returns true if the client has already had Close called on it
Closed() bool

// Create a topic
CreateTopic(topic string, numPartitions int32, replicationFactor int16, configs map[string]string, timeout int32) error
}

const (
Expand Down Expand Up @@ -448,6 +451,20 @@ func (client *client) any() *Broker {
return nil
}

func (client *client) controller() *Broker {
client.lock.RLock()
defer client.lock.RUnlock()

for _, broker := range client.brokers {
if broker.isController {
_ = broker.Open(client.conf)
return broker
}
}

return nil
}

// private caching/lazy metadata helpers

type partitionType int
Expand Down Expand Up @@ -600,7 +617,7 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int)
} else {
Logger.Printf("client/metadata fetching metadata for all topics from broker %s\n", broker.addr)
}
response, err := broker.GetMetadata(&MetadataRequest{Topics: topics})
response, err := broker.GetMetadata(NewMetadataRequest(client.conf.Version, topics))

switch err.(type) {
case nil:
Expand Down Expand Up @@ -638,6 +655,7 @@ func (client *client) updateMetadata(data *MetadataResponse) (retry bool, err er
// - if it is an existing ID, but the address we have is stale, discard the old one and save it
// - otherwise ignore it, replacing our existing one would just bounce the connection
for _, broker := range data.Brokers {
broker.isController = (broker.id == data.ControllerId)
client.registerBroker(broker)
}

Expand Down Expand Up @@ -747,3 +765,45 @@ func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemainin
client.resurrectDeadBrokers()
return retry(ErrOutOfBrokers)
}

func (client *client) CreateTopic(topic string, numPartitions int32,
replicationFactor int16, configs map[string]string, timeout int32) error {
if client.Closed() {
return ErrClosedClient
}

createTopicsRequest := new(CreateTopicsRequest)
createTopicsRequest.CreateRequests = make([]CreateTopicRequest, 1)
createTopicRequest := CreateTopicRequest{}
createTopicRequest.Topic = topic
createTopicRequest.NumPartitions = numPartitions
createTopicRequest.ReplicationFactor = replicationFactor
createTopicRequest.ReplicaAssignments = make([]ReplicaAssignment, 0)
createTopicRequest.Configs = make([]ConfigKV, len(configs))
createTopicsRequest.CreateRequests[0] = createTopicRequest
createTopicsRequest.Timeout = timeout
i := 0
for key, value := range configs {
configKV := ConfigKV{}
configKV.Key = key
configKV.Value = value
createTopicRequest.Configs[i] = configKV
i = i + 1
}
broker := client.controller()
if broker != nil {
Logger.Printf("Creating topic %v on broker %v\n", topic, broker.addr)
createTopicsResponse, err := broker.CreateTopics(createTopicsRequest)
if err != nil {
return err
}

kafkaErr := createTopicsResponse.CreateTopicResponses[0].Err

if kafkaErr != ErrNoError {
return kafkaErr
}
return nil
}
return ErrOutOfBrokers
}
2 changes: 1 addition & 1 deletion consumer_metadata_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func (r *ConsumerMetadataResponse) decode(pd packetDecoder, version int16) (err
r.Err = KError(tmp)

coordinator := new(Broker)
if err := coordinator.decode(pd); err != nil {
if err := coordinator.decode(pd, version); err != nil {
return err
}
if coordinator.addr == ":0" {
Expand Down
202 changes: 202 additions & 0 deletions create_topics_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
package sarama

type ReplicaAssignment struct {
PartitionID int32
Replicas []int32
}

type ConfigKV struct {
Key string
Value string
}

type CreateTopicRequest struct {
Topic string
NumPartitions int32
ReplicationFactor int16
ReplicaAssignments []ReplicaAssignment
Configs []ConfigKV
}

type CreateTopicsRequest struct {
CreateRequests []CreateTopicRequest
Timeout int32
}

func (cr *CreateTopicRequest) encode(pe packetEncoder) error {
if err := pe.putString(cr.Topic); err != nil {
return err
}

pe.putInt32(cr.NumPartitions)
pe.putInt16(cr.ReplicationFactor)

if err := pe.putArrayLength(len(cr.ReplicaAssignments)); err != nil {
return err
}

for i := range cr.ReplicaAssignments {
replicaAssignment := cr.ReplicaAssignments[i]
pe.putInt32(replicaAssignment.PartitionID)

if err := pe.putArrayLength(len(replicaAssignment.Replicas)); err != nil {
return err
}

for j := range replicaAssignment.Replicas {
pe.putInt32(replicaAssignment.Replicas[j])
}
}

if err := pe.putArrayLength(len(cr.Configs)); err != nil {
return err
}

for i := range cr.Configs {
if err := pe.putString(cr.Configs[i].Key); err != nil {
return err
}

if err := pe.putString(cr.Configs[i].Value); err != nil {
return err
}
}

return nil
}

func (cr *CreateTopicRequest) decode(pd packetDecoder, version int16) error {
topic, err := pd.getString()
if err != nil {
return err
}
cr.Topic = topic

numPartitions, err := pd.getInt32()
if err != nil {
return err
}
cr.NumPartitions = numPartitions

replicationFactor, err := pd.getInt16()
if err != nil {
return err
}
cr.ReplicationFactor = replicationFactor

partitionCount, err := pd.getArrayLength()
if err != nil {
return err
}

if partitionCount > 0 {
cr.ReplicaAssignments = make([]ReplicaAssignment, partitionCount)

for i := range cr.ReplicaAssignments {
replicaAssignment := ReplicaAssignment{}

partitionID, err := pd.getInt32()
if err != nil {
return err
}
replicaAssignment.PartitionID = partitionID

replicaCount, err := pd.getArrayLength()
if err != nil {
return err
}

replicaAssignment.Replicas = make([]int32, replicaCount)

for j := range replicaAssignment.Replicas {
replica, err := pd.getInt32()
if err != nil {
return err
}
replicaAssignment.Replicas[j] = replica
}
cr.ReplicaAssignments[i] = replicaAssignment
}
}

configCount, err := pd.getArrayLength()
if err != nil {
return err
}

if configCount > 0 {
cr.Configs = make([]ConfigKV, configCount)

for i := range cr.Configs {
key, err := pd.getString()
if err != nil {
return err
}

value, err := pd.getString()
if err != nil {
return err
}

cr.Configs[i] = ConfigKV{Key: key, Value: value}
}
}

return nil
}

func (ct *CreateTopicsRequest) encode(pe packetEncoder) error {
if err := pe.putArrayLength(len(ct.CreateRequests)); err != nil {
return err
}

for i := range ct.CreateRequests {
if err := ct.CreateRequests[i].encode(pe); err != nil {
return err
}
}

pe.putInt32(ct.Timeout)

return nil
}

func (ct *CreateTopicsRequest) decode(pd packetDecoder, version int16) error {
createTopicRequestCount, err := pd.getArrayLength()
if err != nil {
return err
}
if createTopicRequestCount == 0 {
return nil
}

ct.CreateRequests = make([]CreateTopicRequest, createTopicRequestCount)
for i := range ct.CreateRequests {
ct.CreateRequests[i] = CreateTopicRequest{}
err = ct.CreateRequests[i].decode(pd, version)
if err != nil {
return err
}
}

timeout, err := pd.getInt32()
if err != nil {
return err
}

ct.Timeout = timeout

return nil
}

func (ct *CreateTopicsRequest) key() int16 {
return 19
}

func (ct *CreateTopicsRequest) version() int16 {
return 0
}

func (ct *CreateTopicsRequest) requiredVersion() KafkaVersion {
return V0_10_0_0
}
Loading