diff --git a/serf/serf.go b/serf/serf.go index 28909e8a..57d91dc7 100644 --- a/serf/serf.go +++ b/serf/serf.go @@ -45,17 +45,16 @@ func New(opts ...OptionFn) (*Serf, error) { // Bootstrap saves the node metadata and starts the serf agent // Info of node updates is returned on reconcileCh channel -func (b *Serf) Bootstrap(node *jocko.ClusterMember, reconcileCh chan<- *jocko.ClusterMember) error { - addr, strPort, err := net.SplitHostPort(b.addr) +func (s *Serf) Bootstrap(node *jocko.ClusterMember, reconcileCh chan<- *jocko.ClusterMember) error { + addr, strPort, err := net.SplitHostPort(s.addr) if err != nil { return err } - port, err := strconv.Atoi(strPort) if err != nil { return err } - b.nodeID = node.ID + s.nodeID = node.ID eventCh := make(chan serf.Event, 256) conf := serf.DefaultConfig() conf.Init() @@ -67,78 +66,78 @@ func (b *Serf) Bootstrap(node *jocko.ClusterMember, reconcileCh chan<- *jocko.Cl conf.Tags["id"] = strconv.Itoa(int(node.ID)) conf.Tags["port"] = strconv.Itoa(node.Port) conf.Tags["raft_port"] = strconv.Itoa(node.RaftPort) - s, err := serf.Create(conf) + sserf, err := serf.Create(conf) if err != nil { return err } - b.serf = s - b.reconcileCh = reconcileCh - if _, err := b.Join(b.initMembers...); err != nil { + s.serf = sserf + s.reconcileCh = reconcileCh + if _, err := s.Join(s.initMembers...); err != nil { // b.Shutdown() return err } // ingest events for serf - go b.serfEventHandler(eventCh) + go s.serfEventHandler(eventCh) return nil } // serfEventHandler is used to handle events from the serf cluster -func (b *Serf) serfEventHandler(eventCh <-chan serf.Event) { +func (s *Serf) serfEventHandler(eventCh <-chan serf.Event) { for { select { case e := <-eventCh: switch e.EventType() { case serf.EventMemberJoin: - b.nodeJoinEvent(e.(serf.MemberEvent)) - b.localMemberEvent(e.(serf.MemberEvent)) + s.nodeJoinEvent(e.(serf.MemberEvent)) + s.localMemberEvent(e.(serf.MemberEvent)) case serf.EventMemberLeave, serf.EventMemberFailed: - b.nodeFailedEvent(e.(serf.MemberEvent)) - b.localMemberEvent(e.(serf.MemberEvent)) + s.nodeFailedEvent(e.(serf.MemberEvent)) + s.localMemberEvent(e.(serf.MemberEvent)) case serf.EventMemberUpdate, serf.EventMemberReap, serf.EventUser, serf.EventQuery: // ignore default: - b.logger.Info("unhandled serf event: %#v", e) + s.logger.Info("unhandled serf event: %#v", e) } - case <-b.shutdownCh: + case <-s.shutdownCh: return } } } // nodeJoinEvent is used to handle join events on the serf cluster -func (b *Serf) nodeJoinEvent(me serf.MemberEvent) { +func (s *Serf) nodeJoinEvent(me serf.MemberEvent) { for _, m := range me.Members { // TODO: need to change these parts peer, err := clusterMember(m) if err != nil { - b.logger.Info("failed to parse peer from serf member: %s", m.Name) + s.logger.Info("failed to parse peer from serf member: %s", m.Name) continue } - b.logger.Info("adding peer: %+v", peer) - b.peerLock.Lock() - b.peers[peer.ID] = peer - b.peerLock.Unlock() + s.logger.Info("adding peer: %+v", peer) + s.peerLock.Lock() + s.peers[peer.ID] = peer + s.peerLock.Unlock() } } // nodeFailedEvent is used to handle fail events on the serf cluster. -func (b *Serf) nodeFailedEvent(me serf.MemberEvent) { +func (s *Serf) nodeFailedEvent(me serf.MemberEvent) { for _, m := range me.Members { - b.logger.Info("removing peer: %s", me) + s.logger.Info("removing peer: %s", me) peer, err := clusterMember(m) if err != nil { continue } - b.peerLock.Lock() - delete(b.peers, peer.ID) - b.peerLock.Unlock() + s.peerLock.Lock() + delete(s.peers, peer.ID) + s.peerLock.Unlock() } } // localMemberEvent is used to reconcile Serf events with the store if we are the leader. -func (b *Serf) localMemberEvent(me serf.MemberEvent) error { +func (s *Serf) localMemberEvent(me serf.MemberEvent) error { isReap := me.EventType() == serf.EventMemberReap for _, m := range me.Members { if isReap { @@ -146,66 +145,66 @@ func (b *Serf) localMemberEvent(me serf.MemberEvent) error { } conn, err := clusterMember(m) if err != nil { - b.logger.Info("failed to parse serf member event: %s", m) + s.logger.Info("failed to parse serf member event: %s", m) continue } - b.reconcileCh <- conn + s.reconcileCh <- conn } return nil } // ID of this serf node -func (b *Serf) ID() int32 { - return b.nodeID +func (s *Serf) ID() int32 { + return s.nodeID } // Addr of serf agent -func (b *Serf) Addr() string { - return b.addr +func (s *Serf) Addr() string { + return s.addr } // Join an existing serf cluster -func (b *Serf) Join(addrs ...string) (int, error) { +func (s *Serf) Join(addrs ...string) (int, error) { if len(addrs) == 0 { return 0, nil } - return b.serf.Join(addrs, true) + return s.serf.Join(addrs, true) } // Cluster is the list of all nodes connected to Serf -func (b *Serf) Cluster() []*jocko.ClusterMember { - b.peerLock.RLock() - defer b.peerLock.RUnlock() +func (s *Serf) Cluster() []*jocko.ClusterMember { + s.peerLock.RLock() + defer s.peerLock.RUnlock() - cluster := make([]*jocko.ClusterMember, 0, len(b.peers)) - for _, v := range b.peers { + cluster := make([]*jocko.ClusterMember, 0, len(s.peers)) + for _, v := range s.peers { cluster = append(cluster, v) } return cluster } // Member returns broker details of node with given ID -func (b *Serf) Member(memberID int32) *jocko.ClusterMember { - b.peerLock.RLock() - defer b.peerLock.RUnlock() - return b.peers[memberID] +func (s *Serf) Member(memberID int32) *jocko.ClusterMember { + s.peerLock.RLock() + defer s.peerLock.RUnlock() + return s.peers[memberID] } // leave the serf cluster -func (b *Serf) leave() error { - if err := b.serf.Leave(); err != nil { +func (s *Serf) leave() error { + if err := s.serf.Leave(); err != nil { return err } return nil } // Shutdown Serf agent -func (b *Serf) Shutdown() error { - close(b.shutdownCh) - if err := b.leave(); err != nil { +func (s *Serf) Shutdown() error { + close(s.shutdownCh) + if err := s.leave(); err != nil { return err } - if err := b.serf.Shutdown(); err != nil { + if err := s.serf.Shutdown(); err != nil { return err } return nil