Skip to content

Commit

Permalink
rename brokerconn to clustermember
Browse files Browse the repository at this point in the history
  • Loading branch information
travisjeffery committed Feb 13, 2017
1 parent 27056b7 commit 9ffedd8
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 35 deletions.
10 changes: 5 additions & 5 deletions broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type Broker struct {
leaderCh chan bool

serf jocko.Serf
reconcileCh chan *jocko.BrokerConn
reconcileCh chan *jocko.ClusterMember
reconcileInterval time.Duration

shutdownCh chan struct{}
Expand All @@ -42,7 +42,7 @@ func New(id int32, opts ...BrokerFn) (*Broker, error) {
replicationManager: newReplicationManager(),
id: id,
topics: make(map[string][]*jocko.Partition),
reconcileCh: make(chan *jocko.BrokerConn, 32),
reconcileCh: make(chan *jocko.ClusterMember, 32),
reconcileInterval: time.Second * 5,
shutdownCh: make(chan struct{}),
leaderCh: make(chan bool, 1),
Expand All @@ -61,7 +61,7 @@ func New(id int32, opts ...BrokerFn) (*Broker, error) {
return nil, err
}

conn := &jocko.BrokerConn{
conn := &jocko.ClusterMember{
ID: b.id,
Port: port,
RaftPort: raftPort,
Expand All @@ -87,7 +87,7 @@ func (b *Broker) ID() int32 {
return b.id
}

func (b *Broker) Cluster() []*jocko.BrokerConn {
func (b *Broker) Cluster() []*jocko.ClusterMember {
return b.serf.Cluster()
}

Expand Down Expand Up @@ -123,7 +123,7 @@ func (b *Broker) AddPartition(partition *jocko.Partition) error {
return b.raftApply(addPartition, partition)
}

func (b *Broker) BrokerConn(id int32) *jocko.BrokerConn {
func (b *Broker) ClusterMember(id int32) *jocko.ClusterMember {
return b.serf.Member(id)
}

Expand Down
8 changes: 4 additions & 4 deletions broker/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (b *Broker) revokeLeadership() error {
// leaderLoop runs as long as we are the leader to run maintainence duties
func (b *Broker) leaderLoop(stopCh chan struct{}) {
defer b.revokeLeadership()
var reconcileCh chan *jocko.BrokerConn
var reconcileCh chan *jocko.ClusterMember
establishedLeader := false

RECONCILE:
Expand Down Expand Up @@ -100,7 +100,7 @@ func (b *Broker) reconcile() error {
return nil
}

func (b *Broker) reconcileMember(member *jocko.BrokerConn) error {
func (b *Broker) reconcileMember(member *jocko.ClusterMember) error {
// don't reconcile ourself
if member.ID == b.id {
return nil
Expand All @@ -119,11 +119,11 @@ func (b *Broker) reconcileMember(member *jocko.BrokerConn) error {
return nil
}

func (b *Broker) addRaftPeer(member *jocko.BrokerConn) error {
func (b *Broker) addRaftPeer(member *jocko.ClusterMember) error {
addr := &net.TCPAddr{IP: net.ParseIP(member.IP), Port: member.RaftPort}
return b.raft.AddPeer(addr.String())
}

func (b *Broker) removeRaftPeer(member *jocko.BrokerConn) error {
func (b *Broker) removeRaftPeer(member *jocko.ClusterMember) error {
return b.raft.RemovePeer(member.IP)
}
22 changes: 11 additions & 11 deletions jocko/jocko.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ func (p *Partition) LeaderID() int32 {

// Serf manages the cluster membership for Jocko nodes
type Serf interface {
Bootstrap(node *BrokerConn, reconcileCh chan<- *BrokerConn) error
Cluster() []*BrokerConn
Member(memberID int32) *BrokerConn
Bootstrap(node *ClusterMember, reconcileCh chan<- *ClusterMember) error
Cluster() []*ClusterMember
Member(memberID int32) *ClusterMember
Join(addrs ...string) (int, error)
Leave() error
Shutdown() error
Expand All @@ -121,7 +121,7 @@ type RaftCommand struct {

// Raft manages consensus for Jocko cluster
type Raft interface {
Bootstrap(peers []*BrokerConn, fsm raft.FSM, leaderCh chan<- bool) (err error)
Bootstrap(peers []*ClusterMember, fsm raft.FSM, leaderCh chan<- bool) (err error)
Apply(cmd RaftCommand) error
IsLeader() bool
LeaderID() string
Expand All @@ -139,16 +139,16 @@ type Broker interface {
StartReplica(*Partition) error
DeleteTopic(topic string) error
Partition(topic string, id int32) (*Partition, error)
BrokerConn(brokerID int32) *BrokerConn
ClusterMember(brokerID int32) *ClusterMember
BecomeLeader(topic string, id int32, command *protocol.PartitionState) error
BecomeFollower(topic string, id int32, command *protocol.PartitionState) error
Join(addr ...string) (int, error)
Cluster() []*BrokerConn
Cluster() []*ClusterMember
TopicPartitions(topic string) ([]*Partition, error)
IsLeaderOfPartition(topic string, id int32, leaderID int32) bool
}

type BrokerConn struct {
type ClusterMember struct {
ID int32 `json:"id"`
Port int `json:"port"`
IP string `json:"addr"`
Expand All @@ -160,11 +160,11 @@ type BrokerConn struct {
conn net.Conn
}

func (b *BrokerConn) Addr() *net.TCPAddr {
func (b *ClusterMember) Addr() *net.TCPAddr {
return &net.TCPAddr{IP: net.ParseIP(b.IP), Port: b.Port}
}

func (b *BrokerConn) Write(p []byte) (int, error) {
func (b *ClusterMember) Write(p []byte) (int, error) {
if b.conn == nil {
if err := b.connect(); err != nil {
return 0, err
Expand All @@ -173,7 +173,7 @@ func (b *BrokerConn) Write(p []byte) (int, error) {
return b.conn.Write(p)
}

func (b *BrokerConn) Read(p []byte) (int, error) {
func (b *ClusterMember) Read(p []byte) (int, error) {
if b.conn == nil {
if err := b.connect(); err != nil {
return 0, err
Expand All @@ -182,7 +182,7 @@ func (b *BrokerConn) Read(p []byte) (int, error) {
return b.conn.Read(p)
}

func (b *BrokerConn) connect() error {
func (b *ClusterMember) connect() error {
addr := &net.TCPAddr{IP: net.ParseIP(b.IP), Port: b.Port}
conn, err := net.DialTCP("tcp", nil, addr)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func New(opts ...OptionFn) (*Raft, error) {

// Bootstrap the Raft agent using fsm and connect to peers
// Updates to leadership are returned on leaderCh channel
func (b *Raft) Bootstrap(peers []*jocko.BrokerConn, fsm raft.FSM, leaderCh chan<- bool) (err error) {
func (b *Raft) Bootstrap(peers []*jocko.ClusterMember, fsm raft.FSM, leaderCh chan<- bool) (err error) {
b.transport, err = raft.NewTCPTransport(b.addr, nil, 3, timeout, os.Stderr)
if err != nil {
return errors.Wrap(err, "tcp transport failed")
Expand Down
24 changes: 12 additions & 12 deletions serf/serf.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,19 @@ type Serf struct {
logger *simplelog.Logger
serf *serf.Serf
addr string
reconcileCh chan<- *jocko.BrokerConn
reconcileCh chan<- *jocko.ClusterMember
eventCh chan serf.Event
initMembers []string
shutdownCh chan struct{}

peers map[int32]*jocko.BrokerConn
peers map[int32]*jocko.ClusterMember
peerLock sync.RWMutex
}

// New Serf object
func New(opts ...OptionFn) (*Serf, error) {
b := &Serf{
peers: make(map[int32]*jocko.BrokerConn),
peers: make(map[int32]*jocko.ClusterMember),
eventCh: make(chan serf.Event, 256),
shutdownCh: make(chan struct{}),
}
Expand All @@ -47,7 +47,7 @@ func New(opts ...OptionFn) (*Serf, error) {

// Bootstrap saves the node metadata and starts the serf agent
// Info of node updates is returned on reconcileCh channel
func (b *Serf) Bootstrap(node *jocko.BrokerConn, reconcileCh chan<- *jocko.BrokerConn) error {
func (b *Serf) Bootstrap(node *jocko.ClusterMember, reconcileCh chan<- *jocko.ClusterMember) error {
addr, strPort, err := net.SplitHostPort(b.addr)
if err != nil {
return err
Expand Down Expand Up @@ -111,7 +111,7 @@ func (b *Serf) serfEventHandler() {
func (b *Serf) nodeJoinEvent(me serf.MemberEvent) {
for _, m := range me.Members {
// TODO: need to change these parts
peer, err := brokerConn(m)
peer, err := clusterMember(m)
if err != nil {
b.logger.Info("failed to parse peer from serf member: %s", m.Name)
continue
Expand All @@ -127,7 +127,7 @@ func (b *Serf) nodeJoinEvent(me serf.MemberEvent) {
func (b *Serf) nodeFailedEvent(me serf.MemberEvent) {
for _, m := range me.Members {
b.logger.Info("removing peer: %s", me)
peer, err := brokerConn(m)
peer, err := clusterMember(m)
if err != nil {
continue
}
Expand All @@ -144,7 +144,7 @@ func (b *Serf) localMemberEvent(me serf.MemberEvent) error {
if isReap {
m.Status = StatusReap
}
conn, err := brokerConn(m)
conn, err := clusterMember(m)
if err != nil {
b.logger.Info("failed to parse serf member event: %s", m)
continue
Expand All @@ -168,19 +168,19 @@ func (b *Serf) Join(addrs ...string) (int, error) {
}

// Cluster is the list of all nodes connected to Serf
func (b *Serf) Cluster() []*jocko.BrokerConn {
func (b *Serf) Cluster() []*jocko.ClusterMember {
b.peerLock.RLock()
defer b.peerLock.RUnlock()

cluster := make([]*jocko.BrokerConn, 0, len(b.peers))
cluster := make([]*jocko.ClusterMember, 0, len(b.peers))
for _, v := range b.peers {
cluster = append(cluster, v)
}
return cluster
}

// Member returns broker details of node with given ID
func (b *Serf) Member(memberID int32) *jocko.BrokerConn {
func (b *Serf) Member(memberID int32) *jocko.ClusterMember {
b.peerLock.RLock()
defer b.peerLock.RUnlock()
return b.peers[memberID]
Expand All @@ -203,7 +203,7 @@ func (b *Serf) Shutdown() error {
return nil
}

func brokerConn(m serf.Member) (*jocko.BrokerConn, error) {
func clusterMember(m serf.Member) (*jocko.ClusterMember, error) {
portStr := m.Tags["port"]
port, err := strconv.Atoi(portStr)
if err != nil {
Expand All @@ -222,7 +222,7 @@ func brokerConn(m serf.Member) (*jocko.BrokerConn, error) {
return nil, err
}

conn := &jocko.BrokerConn{
conn := &jocko.ClusterMember{
IP: m.Addr.String(),
ID: int32(id),
RaftPort: raftPort,
Expand Down
4 changes: 2 additions & 2 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ func (s *Server) handleLeaderAndISR(conn net.Conn, header *protocol.RequestHeade
body := &protocol.LeaderAndISRResponse{}
for _, p := range req.PartitionStates {
partition, err := s.broker.Partition(p.Topic, p.Partition)
broker := s.broker.BrokerConn(p.Leader)
broker := s.broker.ClusterMember(p.Leader)
if broker == nil {
// TODO: error cause we don't know who this broker is
}
Expand Down Expand Up @@ -309,7 +309,7 @@ func zero(p []byte) {
}

func (s *Server) handleJoin(w http.ResponseWriter, r *http.Request) {
b := new(jocko.BrokerConn)
b := new(jocko.ClusterMember)
if err := json.NewDecoder(r.Body).Decode(&b); err != nil {
w.WriteHeader(http.StatusBadRequest)
return
Expand Down

0 comments on commit 9ffedd8

Please sign in to comment.