Skip to content

Commit

Permalink
topic: starts zk topic admin. wip. #50
Browse files Browse the repository at this point in the history
  • Loading branch information
fgeller committed May 4, 2017
1 parent a90b67f commit 586494a
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 0 deletions.
155 changes: 155 additions & 0 deletions topic.go
Original file line number Diff line number Diff line change
@@ -1,37 +1,48 @@
package main

import (
"encoding/json"
"flag"
"fmt"
"log"
"net/url"
"os"
"os/user"
"regexp"
"strconv"
"strings"
"sync"
"time"

"github.com/Shopify/sarama"
"github.com/samuel/go-zookeeper/zk"
)

type topicArgs struct {
brokers string
zookeepers string
filter string
partitions bool
leaders bool
replicas bool
verbose bool
pretty bool
create bool
name string
version string
}

type topicCmd struct {
brokers []string
zookeepers []string
filter *regexp.Regexp
partitions bool
leaders bool
replicas bool
verbose bool
pretty bool
create bool
name string
version sarama.KafkaVersion

client sarama.Client
Expand All @@ -58,12 +69,15 @@ func (cmd *topicCmd) parseFlags(as []string) topicArgs {
)

flags.StringVar(&args.brokers, "brokers", "", "Comma separated list of brokers. Port defaults to 9092 when omitted.")
flags.StringVar(&args.zookeepers, "zookeepers", "", "Comma separated list of zookeeper nodes. Defaults to brokers and port 2181 when omitted.")
flags.BoolVar(&args.partitions, "partitions", false, "Include information per partition.")
flags.BoolVar(&args.leaders, "leaders", false, "Include leader information per partition.")
flags.BoolVar(&args.replicas, "replicas", false, "Include replica ids per partition.")
flags.StringVar(&args.filter, "filter", "", "Regex to filter topics by name.")
flags.BoolVar(&args.verbose, "verbose", false, "More verbose logging to stderr.")
flags.BoolVar(&args.pretty, "pretty", true, "Control output pretty printing.")
flags.BoolVar(&args.create, "create", false, "Create the specified topic.")
flags.StringVar(&args.name, "name", "", "Exact name of the topic, required for topic management.")
flags.StringVar(&args.version, "version", "", "Kafka protocol version")
flags.Usage = func() {
fmt.Fprintln(os.Stderr, "Usage of topic:")
Expand Down Expand Up @@ -95,16 +109,37 @@ func (cmd *topicCmd) parseArgs(as []string) {
}
}
cmd.brokers = strings.Split(args.brokers, ",")
cmd.zookeepers = []string{}
for i, b := range cmd.brokers {
if !strings.Contains(b, ":") {
cmd.brokers[i] = b + ":9092"
}
if args.zookeepers == "" {
if strings.Contains(b, ":") {
cmd.zookeepers = append(cmd.zookeepers, strings.Split(b, ":")[0]+":2181")
} else {
cmd.zookeepers = append(cmd.zookeepers, b+":2181")
}
}
}

// TODO store zookeepers when supplied

if re, err = regexp.Compile(args.filter); err != nil {
failf("invalid regex for filter err=%s", err)
}

if args.create {
if args.name == "" {
failf("name required when creating a topic")
}
if len(cmd.zookeepers) == 0 {
failf("zookeepers required when creating a topic")
}
}
cmd.create = args.create
cmd.name = args.name

cmd.filter = re
cmd.partitions = args.partitions
cmd.leaders = args.leaders
Expand Down Expand Up @@ -136,6 +171,122 @@ func (cmd *topicCmd) connect() {
}
}

var (
zkPathBrokers = "/brokers"
zkPathBrokersIDs = fmt.Sprintf("%s/ids", zkPathBrokers)
)

type broker struct {
id int
endpoints []endpoint
rack string
}

type endpoint struct {
host string
port string
listener string
securityProtocol string

raw string
}

type brokerInfo struct {
ListenerSecurityProtocolMap map[string]string `json:"listener_security_protocol_map"`
Endpoints []string `json:"endpoints"`
JMXPort int `json:"jmx_port"`
Host string `json:"host"`
Port int `json:"port"`
Rack string `json:"rack"`
Timestamp string `json:"timestamp"`
Version int `json:"version"`
}

func (cmd *topicCmd) createTopic() {
// TODO replication factor
// TODO partitions
fmt.Printf("should create topic %#v connecting to %#v\n", cmd.name, cmd.zookeepers)
var (
conn *zk.Conn
evs <-chan zk.Event
err error
)

// TODO quiet mode for connect?
if conn, evs, err = zk.Connect(cmd.zookeepers, 15*time.Second); err != nil {
failf("failed to establish zookeeper connection err=%v", err)
}

if cmd.verbose {
go func() {
for {
fmt.Fprintf(os.Stderr, "received zookeeper event %#v", <-evs)
}
}()
}

var (
rawBrokerIDs []string
brokers []broker
)

if rawBrokerIDs, _, err = conn.Children(zkPathBrokersIDs); err != nil {
failf("failed to retrieve broker ids from zookeeper err=%v", err)
}

fmt.Printf("found broker ids %#v\n", rawBrokerIDs)
for _, rawID := range rawBrokerIDs {
id, err := strconv.Atoi(rawID)
if err != nil {
failf("failed to convert broker id %#v to int err=%v", rawID, err)
}

buf, _, err := conn.Get(fmt.Sprintf("%s/%v", zkPathBrokersIDs, id))
if err != nil {
failf("failed to read info for broker %#v err=%v", id, err)
}

brokers = append(brokers, newBroker(id, buf))
}

fmt.Printf("found brokers %#v\n", brokers)
}

func newBroker(id int, info []byte) broker {
var (
uInfo brokerInfo
err error
)

if err = json.Unmarshal(info, &uInfo); err != nil {
failf("failed to unmarshal broker info %s err=%v", info, err)
}

if uInfo.Version != 4 {
failf("unsupported broker info version %v, only 4 is supported", uInfo.Version)
}

// TODO newEndpoint
endpoints := []endpoint{}
for _, rawEP := range uInfo.Endpoints {
u, err := url.Parse(rawEP)
if err != nil {
failf("failed to parse endpoint uri %#v err=%v", rawEP, err)
}

lstn := strings.ToUpper(u.Scheme)
proto, ok := uInfo.ListenerSecurityProtocolMap[lstn]
if !ok {
fmt.Printf("found no matching security protocol for %#v in %#v\n", lstn, uInfo.ListenerSecurityProtocolMap)
}

ep := endpoint{u.Hostname(), u.Port(), lstn, proto, rawEP}
endpoints = append(endpoints, ep)
}

return broker{id, endpoints, uInfo.Rack}
}

func (cmd *topicCmd) run(as []string, q chan struct{}) {
var (
err error
Expand All @@ -150,6 +301,10 @@ func (cmd *topicCmd) run(as []string, q chan struct{}) {
sarama.Logger = log.New(os.Stderr, "", log.LstdFlags)
}

if cmd.create {
cmd.createTopic()
}

cmd.connect()
defer cmd.client.Close()

Expand Down
12 changes: 12 additions & 0 deletions vendor/vendor.json
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,18 @@
"revision": "1f30fe9094a513ce4c700b9a54458bbb0c96996c",
"revisionTime": "2016-11-28T21:05:44Z"
},
{
"checksumSHA1": "MFzYQgva79KXz61C+a09UqAd6s0=",
"path": "github.com/samuel/go-zookeeper",
"revision": "1d7be4effb13d2d908342d349d71a284a7542693",
"revisionTime": "2016-10-28T23:23:40Z"
},
{
"checksumSHA1": "5SYLEhADhdBVZAGPVHWggQl7H8k=",
"path": "github.com/samuel/go-zookeeper/zk",
"revision": "1d7be4effb13d2d908342d349d71a284a7542693",
"revisionTime": "2016-10-28T23:23:40Z"
},
{
"checksumSHA1": "hIEmcd7hIDqO/xWSp1rJJHd0TpE=",
"path": "github.com/stretchr/testify/assert",
Expand Down

0 comments on commit 586494a

Please sign in to comment.