Skip to content

Commit

Permalink
Merge pull request #62 from travisjeffery/fix-listeners
Browse files Browse the repository at this point in the history
Separate protocol and http listeners
  • Loading branch information
Travis Jeffery authored Oct 16, 2017
2 parents ade5ff4 + c1603ab commit eab1e08
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 123 deletions.
30 changes: 9 additions & 21 deletions cmd/jocko/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,6 @@ import (
"os"
"time"

"net/http"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/tj/go-gracefully"
"github.com/travisjeffery/jocko/broker"
"github.com/travisjeffery/jocko/protocol"
Expand All @@ -25,14 +21,14 @@ var (
cli = kingpin.New("jocko", "Jocko, Go implementation of Kafka")
debugLogs = cli.Flag("debug", "Enable debug logs").Default("false").Bool()

brokerCmd = cli.Command("broker", "Run a Jocko broker")
brokerCmdRaftAddr = brokerCmd.Flag("raft-addr", "Address for Raft to bind and advertise on").Default("127.0.0.1:9093").String()
brokerCmdLogDir = brokerCmd.Flag("log-dir", "A comma separated list of directories under which to store log files").Default("/tmp/jocko").String()
brokerCmdBrokerAddr = brokerCmd.Flag("broker-addr", "Address for broker to bind on").Default("0.0.0.0:9092").String()
brokerCmdSerfAddr = brokerCmd.Flag("serf-addr", "Address for Serf to bind on").Default("0.0.0.0:9094").String()
brokerCmdPrometheusAddr = brokerCmd.Flag("prometheus-addr", "Address for Prometheus to serve metrics on").Default(":9095").String()
brokerCmdSerfMembers = brokerCmd.Flag("serf-members", "List of existing Serf members").Strings()
brokerCmdBrokerID = brokerCmd.Flag("id", "Broker ID").Int32()
brokerCmd = cli.Command("broker", "Run a Jocko broker")
brokerCmdRaftAddr = brokerCmd.Flag("raft-addr", "Address for Raft to bind and advertise on").Default("127.0.0.1:9093").String()
brokerCmdLogDir = brokerCmd.Flag("log-dir", "A comma separated list of directories under which to store log files").Default("/tmp/jocko").String()
brokerCmdBrokerAddr = brokerCmd.Flag("broker-addr", "Address for broker to bind on").Default("0.0.0.0:9092").String()
brokerCmdSerfAddr = brokerCmd.Flag("serf-addr", "Address for Serf to bind on").Default("0.0.0.0:9094").String()
brokerCmdHTTPAddr = brokerCmd.Flag("http-addr", "Address for HTTP handlers to serve metrics on, like Prometheus").Default(":9095").String()
brokerCmdSerfMembers = brokerCmd.Flag("serf-members", "List of existing Serf members").Strings()
brokerCmdBrokerID = brokerCmd.Flag("id", "Broker ID").Int32()

topic = cli.Command("topic", "Manage topics")
topicBrokerAddr = topic.Flag("broker-addr", "Address for Broker to bind on").Default("0.0.0.0:9092").String()
Expand Down Expand Up @@ -90,20 +86,12 @@ func CmdBrokers(logger *simplelog.Logger) int {
os.Exit(1)
}

prom := prometheus.DefaultRegisterer
srv := server.New(*brokerCmdBrokerAddr, store, logger, prom)
srv := server.New(*brokerCmdBrokerAddr, store, *brokerCmdHTTPAddr, logger)
if err := srv.Start(context.Background()); err != nil {
fmt.Fprintf(os.Stderr, "Error starting server: %s\n", err)
os.Exit(1)
}

http.Handle("/metrics", promhttp.Handler())
go func() {
err := http.ListenAndServe(*brokerCmdPrometheusAddr, nil)
fmt.Fprintf(os.Stderr, "Error: %s\n", err)
os.Exit(1)
}()

defer srv.Close()

gracefully.Timeout = 10 * time.Second
Expand Down
6 changes: 3 additions & 3 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@ services:
jocko_a:
build:
context: .
image: jocko
image: travisjeffery:jocko

jocko_b:
build:
context: .
image: jocko
image: travisjeffery:jocko
command: jocko broker --serf-members=jocko_a:9094

jocko_c:
build:
context: .
image: jocko
image: travisjeffery:jocko
command: jocko broker --serf-members=jocko_a:9094
14 changes: 2 additions & 12 deletions examples/sarama/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,7 @@ import (
"os"
"time"

"net/http"

"github.com/Shopify/sarama"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/travisjeffery/jocko/broker"
"github.com/travisjeffery/jocko/protocol"
"github.com/travisjeffery/jocko/raft"
Expand All @@ -34,7 +30,7 @@ const (
brokerAddr = "127.0.0.1:9092"
raftAddr = "127.0.0.1:9093"
serfAddr = "127.0.0.1:9094"
promAddr = "127.0.0.1:9095"
httpAddr = "127.0.0.1:9095"
logDir = "logdir"
brokerID = 0
)
Expand Down Expand Up @@ -139,18 +135,12 @@ func setup() func() {
fmt.Fprintf(os.Stderr, "Error opening raft store: %s\n", err)
os.Exit(1)
}
r := prometheus.DefaultRegisterer
server := server.New(brokerAddr, store, logger, r)
server := server.New(brokerAddr, store, httpAddr, logger)
if err := server.Start(context.Background()); err != nil {
fmt.Fprintf(os.Stderr, "Error starting server: %s\n", err)
os.Exit(1)
}

if r != nil {
http.Handle("/metrics", promhttp.Handler())
http.ListenAndServe(promAddr, nil)
}

if _, err := store.WaitForLeader(10 * time.Second); err != nil {
panic(err)
}
Expand Down
101 changes: 50 additions & 51 deletions serf/serf.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,16 @@ func New(opts ...OptionFn) (*Serf, error) {

// Bootstrap saves the node metadata and starts the serf agent
// Info of node updates is returned on reconcileCh channel
func (b *Serf) Bootstrap(node *jocko.ClusterMember, reconcileCh chan<- *jocko.ClusterMember) error {
addr, strPort, err := net.SplitHostPort(b.addr)
func (s *Serf) Bootstrap(node *jocko.ClusterMember, reconcileCh chan<- *jocko.ClusterMember) error {
addr, strPort, err := net.SplitHostPort(s.addr)
if err != nil {
return err
}

port, err := strconv.Atoi(strPort)
if err != nil {
return err
}
b.nodeID = node.ID
s.nodeID = node.ID
eventCh := make(chan serf.Event, 256)
conf := serf.DefaultConfig()
conf.Init()
Expand All @@ -67,145 +66,145 @@ func (b *Serf) Bootstrap(node *jocko.ClusterMember, reconcileCh chan<- *jocko.Cl
conf.Tags["id"] = strconv.Itoa(int(node.ID))
conf.Tags["port"] = strconv.Itoa(node.Port)
conf.Tags["raft_port"] = strconv.Itoa(node.RaftPort)
s, err := serf.Create(conf)
sserf, err := serf.Create(conf)
if err != nil {
return err
}
b.serf = s
b.reconcileCh = reconcileCh
if _, err := b.Join(b.initMembers...); err != nil {
s.serf = sserf
s.reconcileCh = reconcileCh
if _, err := s.Join(s.initMembers...); err != nil {
// b.Shutdown()
return err
}

// ingest events for serf
go b.serfEventHandler(eventCh)
go s.serfEventHandler(eventCh)

return nil
}

// serfEventHandler is used to handle events from the serf cluster
func (b *Serf) serfEventHandler(eventCh <-chan serf.Event) {
func (s *Serf) serfEventHandler(eventCh <-chan serf.Event) {
for {
select {
case e := <-eventCh:
switch e.EventType() {
case serf.EventMemberJoin:
b.nodeJoinEvent(e.(serf.MemberEvent))
b.localMemberEvent(e.(serf.MemberEvent))
s.nodeJoinEvent(e.(serf.MemberEvent))
s.localMemberEvent(e.(serf.MemberEvent))
case serf.EventMemberLeave, serf.EventMemberFailed:
b.nodeFailedEvent(e.(serf.MemberEvent))
b.localMemberEvent(e.(serf.MemberEvent))
s.nodeFailedEvent(e.(serf.MemberEvent))
s.localMemberEvent(e.(serf.MemberEvent))
case serf.EventMemberUpdate, serf.EventMemberReap, serf.EventUser, serf.EventQuery:
// ignore
default:
b.logger.Info("unhandled serf event: %#v", e)
s.logger.Info("unhandled serf event: %#v", e)
}
case <-b.shutdownCh:
case <-s.shutdownCh:
return
}
}
}

// nodeJoinEvent is used to handle join events on the serf cluster
func (b *Serf) nodeJoinEvent(me serf.MemberEvent) {
func (s *Serf) nodeJoinEvent(me serf.MemberEvent) {
for _, m := range me.Members {
// TODO: need to change these parts
peer, err := clusterMember(m)
if err != nil {
b.logger.Info("failed to parse peer from serf member: %s", m.Name)
s.logger.Info("failed to parse peer from serf member: %s", m.Name)
continue
}
b.logger.Info("adding peer: %+v", peer)
b.peerLock.Lock()
b.peers[peer.ID] = peer
b.peerLock.Unlock()
s.logger.Info("adding peer: %+v", peer)
s.peerLock.Lock()
s.peers[peer.ID] = peer
s.peerLock.Unlock()
}
}

// nodeFailedEvent is used to handle fail events on the serf cluster.
func (b *Serf) nodeFailedEvent(me serf.MemberEvent) {
func (s *Serf) nodeFailedEvent(me serf.MemberEvent) {
for _, m := range me.Members {
b.logger.Info("removing peer: %s", me)
s.logger.Info("removing peer: %s", me)
peer, err := clusterMember(m)
if err != nil {
continue
}
b.peerLock.Lock()
delete(b.peers, peer.ID)
b.peerLock.Unlock()
s.peerLock.Lock()
delete(s.peers, peer.ID)
s.peerLock.Unlock()
}
}

// localMemberEvent is used to reconcile Serf events with the store if we are the leader.
func (b *Serf) localMemberEvent(me serf.MemberEvent) error {
func (s *Serf) localMemberEvent(me serf.MemberEvent) error {
isReap := me.EventType() == serf.EventMemberReap
for _, m := range me.Members {
if isReap {
m.Status = statusReap
}
conn, err := clusterMember(m)
if err != nil {
b.logger.Info("failed to parse serf member event: %s", m)
s.logger.Info("failed to parse serf member event: %s", m)
continue
}
b.reconcileCh <- conn
s.reconcileCh <- conn
}
return nil
}

// ID of this serf node
func (b *Serf) ID() int32 {
return b.nodeID
func (s *Serf) ID() int32 {
return s.nodeID
}

// Addr of serf agent
func (b *Serf) Addr() string {
return b.addr
func (s *Serf) Addr() string {
return s.addr
}

// Join an existing serf cluster
func (b *Serf) Join(addrs ...string) (int, error) {
func (s *Serf) Join(addrs ...string) (int, error) {
if len(addrs) == 0 {
return 0, nil
}
return b.serf.Join(addrs, true)
return s.serf.Join(addrs, true)
}

// Cluster is the list of all nodes connected to Serf
func (b *Serf) Cluster() []*jocko.ClusterMember {
b.peerLock.RLock()
defer b.peerLock.RUnlock()
func (s *Serf) Cluster() []*jocko.ClusterMember {
s.peerLock.RLock()
defer s.peerLock.RUnlock()

cluster := make([]*jocko.ClusterMember, 0, len(b.peers))
for _, v := range b.peers {
cluster := make([]*jocko.ClusterMember, 0, len(s.peers))
for _, v := range s.peers {
cluster = append(cluster, v)
}
return cluster
}

// Member returns broker details of node with given ID
func (b *Serf) Member(memberID int32) *jocko.ClusterMember {
b.peerLock.RLock()
defer b.peerLock.RUnlock()
return b.peers[memberID]
func (s *Serf) Member(memberID int32) *jocko.ClusterMember {
s.peerLock.RLock()
defer s.peerLock.RUnlock()
return s.peers[memberID]
}

// leave the serf cluster
func (b *Serf) leave() error {
if err := b.serf.Leave(); err != nil {
func (s *Serf) leave() error {
if err := s.serf.Leave(); err != nil {
return err
}
return nil
}

// Shutdown Serf agent
func (b *Serf) Shutdown() error {
close(b.shutdownCh)
if err := b.leave(); err != nil {
func (s *Serf) Shutdown() error {
close(s.shutdownCh)
if err := s.leave(); err != nil {
return err
}
if err := b.serf.Shutdown(); err != nil {
if err := s.serf.Shutdown(); err != nil {
return err
}
return nil
Expand Down
6 changes: 3 additions & 3 deletions server/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package server

import "github.com/prometheus/client_golang/prometheus"

type serverMetrics struct {
type metrics struct {
requestsHandled prometheus.Counter
}

func newServerMetrics(r prometheus.Registerer) *serverMetrics {
m := &serverMetrics{
func newMetrics(r prometheus.Registerer) *metrics {
m := &metrics{
requestsHandled: prometheus.NewCounter(prometheus.CounterOpts{
Name: "requests_handled",
Help: "Number of requests handled by the server.",
Expand Down
Loading

0 comments on commit eab1e08

Please sign in to comment.