Skip to content

Commit

Permalink
serf: fix pointer names
Browse files Browse the repository at this point in the history
  • Loading branch information
travisjeffery committed Oct 16, 2017
1 parent 4e49c78 commit c1603ab
Showing 1 changed file with 50 additions and 51 deletions.
101 changes: 50 additions & 51 deletions serf/serf.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,16 @@ func New(opts ...OptionFn) (*Serf, error) {

// Bootstrap saves the node metadata and starts the serf agent
// Info of node updates is returned on reconcileCh channel
func (b *Serf) Bootstrap(node *jocko.ClusterMember, reconcileCh chan<- *jocko.ClusterMember) error {
addr, strPort, err := net.SplitHostPort(b.addr)
func (s *Serf) Bootstrap(node *jocko.ClusterMember, reconcileCh chan<- *jocko.ClusterMember) error {
addr, strPort, err := net.SplitHostPort(s.addr)
if err != nil {
return err
}

port, err := strconv.Atoi(strPort)
if err != nil {
return err
}
b.nodeID = node.ID
s.nodeID = node.ID
eventCh := make(chan serf.Event, 256)
conf := serf.DefaultConfig()
conf.Init()
Expand All @@ -67,145 +66,145 @@ func (b *Serf) Bootstrap(node *jocko.ClusterMember, reconcileCh chan<- *jocko.Cl
conf.Tags["id"] = strconv.Itoa(int(node.ID))
conf.Tags["port"] = strconv.Itoa(node.Port)
conf.Tags["raft_port"] = strconv.Itoa(node.RaftPort)
s, err := serf.Create(conf)
sserf, err := serf.Create(conf)
if err != nil {
return err
}
b.serf = s
b.reconcileCh = reconcileCh
if _, err := b.Join(b.initMembers...); err != nil {
s.serf = sserf
s.reconcileCh = reconcileCh
if _, err := s.Join(s.initMembers...); err != nil {
// b.Shutdown()
return err
}

// ingest events for serf
go b.serfEventHandler(eventCh)
go s.serfEventHandler(eventCh)

return nil
}

// serfEventHandler is used to handle events from the serf cluster
func (b *Serf) serfEventHandler(eventCh <-chan serf.Event) {
func (s *Serf) serfEventHandler(eventCh <-chan serf.Event) {
for {
select {
case e := <-eventCh:
switch e.EventType() {
case serf.EventMemberJoin:
b.nodeJoinEvent(e.(serf.MemberEvent))
b.localMemberEvent(e.(serf.MemberEvent))
s.nodeJoinEvent(e.(serf.MemberEvent))
s.localMemberEvent(e.(serf.MemberEvent))
case serf.EventMemberLeave, serf.EventMemberFailed:
b.nodeFailedEvent(e.(serf.MemberEvent))
b.localMemberEvent(e.(serf.MemberEvent))
s.nodeFailedEvent(e.(serf.MemberEvent))
s.localMemberEvent(e.(serf.MemberEvent))
case serf.EventMemberUpdate, serf.EventMemberReap, serf.EventUser, serf.EventQuery:
// ignore
default:
b.logger.Info("unhandled serf event: %#v", e)
s.logger.Info("unhandled serf event: %#v", e)
}
case <-b.shutdownCh:
case <-s.shutdownCh:
return
}
}
}

// nodeJoinEvent is used to handle join events on the serf cluster
func (b *Serf) nodeJoinEvent(me serf.MemberEvent) {
func (s *Serf) nodeJoinEvent(me serf.MemberEvent) {
for _, m := range me.Members {
// TODO: need to change these parts
peer, err := clusterMember(m)
if err != nil {
b.logger.Info("failed to parse peer from serf member: %s", m.Name)
s.logger.Info("failed to parse peer from serf member: %s", m.Name)
continue
}
b.logger.Info("adding peer: %+v", peer)
b.peerLock.Lock()
b.peers[peer.ID] = peer
b.peerLock.Unlock()
s.logger.Info("adding peer: %+v", peer)
s.peerLock.Lock()
s.peers[peer.ID] = peer
s.peerLock.Unlock()
}
}

// nodeFailedEvent is used to handle fail events on the serf cluster.
func (b *Serf) nodeFailedEvent(me serf.MemberEvent) {
func (s *Serf) nodeFailedEvent(me serf.MemberEvent) {
for _, m := range me.Members {
b.logger.Info("removing peer: %s", me)
s.logger.Info("removing peer: %s", me)
peer, err := clusterMember(m)
if err != nil {
continue
}
b.peerLock.Lock()
delete(b.peers, peer.ID)
b.peerLock.Unlock()
s.peerLock.Lock()
delete(s.peers, peer.ID)
s.peerLock.Unlock()
}
}

// localMemberEvent is used to reconcile Serf events with the store if we are the leader.
func (b *Serf) localMemberEvent(me serf.MemberEvent) error {
func (s *Serf) localMemberEvent(me serf.MemberEvent) error {
isReap := me.EventType() == serf.EventMemberReap
for _, m := range me.Members {
if isReap {
m.Status = statusReap
}
conn, err := clusterMember(m)
if err != nil {
b.logger.Info("failed to parse serf member event: %s", m)
s.logger.Info("failed to parse serf member event: %s", m)
continue
}
b.reconcileCh <- conn
s.reconcileCh <- conn
}
return nil
}

// ID of this serf node
func (b *Serf) ID() int32 {
return b.nodeID
func (s *Serf) ID() int32 {
return s.nodeID
}

// Addr of serf agent
func (b *Serf) Addr() string {
return b.addr
func (s *Serf) Addr() string {
return s.addr
}

// Join an existing serf cluster
func (b *Serf) Join(addrs ...string) (int, error) {
func (s *Serf) Join(addrs ...string) (int, error) {
if len(addrs) == 0 {
return 0, nil
}
return b.serf.Join(addrs, true)
return s.serf.Join(addrs, true)
}

// Cluster is the list of all nodes connected to Serf
func (b *Serf) Cluster() []*jocko.ClusterMember {
b.peerLock.RLock()
defer b.peerLock.RUnlock()
func (s *Serf) Cluster() []*jocko.ClusterMember {
s.peerLock.RLock()
defer s.peerLock.RUnlock()

cluster := make([]*jocko.ClusterMember, 0, len(b.peers))
for _, v := range b.peers {
cluster := make([]*jocko.ClusterMember, 0, len(s.peers))
for _, v := range s.peers {
cluster = append(cluster, v)
}
return cluster
}

// Member returns broker details of node with given ID
func (b *Serf) Member(memberID int32) *jocko.ClusterMember {
b.peerLock.RLock()
defer b.peerLock.RUnlock()
return b.peers[memberID]
func (s *Serf) Member(memberID int32) *jocko.ClusterMember {
s.peerLock.RLock()
defer s.peerLock.RUnlock()
return s.peers[memberID]
}

// leave the serf cluster
func (b *Serf) leave() error {
if err := b.serf.Leave(); err != nil {
func (s *Serf) leave() error {
if err := s.serf.Leave(); err != nil {
return err
}
return nil
}

// Shutdown Serf agent
func (b *Serf) Shutdown() error {
close(b.shutdownCh)
if err := b.leave(); err != nil {
func (s *Serf) Shutdown() error {
close(s.shutdownCh)
if err := s.leave(); err != nil {
return err
}
if err := b.serf.Shutdown(); err != nil {
if err := s.serf.Shutdown(); err != nil {
return err
}
return nil
Expand Down

0 comments on commit c1603ab

Please sign in to comment.