Skip to content

Commit

Permalink
Refactor gossip.node -> gossip.agent
Browse files Browse the repository at this point in the history
Co-authored-by: Gabriel Díaz <[email protected]>
  • Loading branch information
Jose Luis Lucas and gdiazlo committed Nov 19, 2018
1 parent fc2735b commit 6fb93eb
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 97 deletions.
164 changes: 82 additions & 82 deletions gossip/node.go → gossip/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,57 +24,57 @@ import (
"github.com/hashicorp/memberlist"
)

type NodeMeta struct {
Role NodeType
type AgentMeta struct {
Role AgentType
}

type Node struct {
type Agent struct {
config *Config
meta *NodeMeta
meta *AgentMeta
memberlist *memberlist.Memberlist
broadcasts *memberlist.TransmitLimitedQueue

topology *Topology
topologyLock sync.RWMutex

stateLock sync.Mutex
state NodeState
state AgentState
}

// NodeState is the state of the Node instance.
type NodeState int
// AgentState is the state of the Node instance.
type AgentState int

const (
NodeAlive NodeState = iota
NodeLeaving
NodeLeft
NodeShutdown
AgentAlive AgentState = iota
AgentLeaving
AgentLeft
AgentShutdown
)

func (s NodeState) String() string {
func (s AgentState) String() string {
switch s {
case NodeAlive:
case AgentAlive:
return "alive"
case NodeLeaving:
case AgentLeaving:
return "leaving"
case NodeLeft:
case AgentLeft:
return "left"
case NodeShutdown:
case AgentShutdown:
return "shutdown"
default:
return "unknown"
}
}

type Topology struct {
members map[NodeType]map[string]*Member
members map[AgentType]map[string]*Member
sync.Mutex
}

func NewTopology() *Topology {
members := make(map[NodeType]map[string]*Member)
members := make(map[AgentType]map[string]*Member)
for i := 0; i < int(MaxType); i++ {
members[NodeType(i)] = make(map[string]*Member)
members[AgentType(i)] = make(map[string]*Member)
}
return &Topology{
members: members,
Expand All @@ -95,7 +95,7 @@ func (t *Topology) Delete(m *Member) error {
return nil
}

func (t *Topology) Get(kind NodeType) []*Member {
func (t *Topology) Get(kind AgentType) []*Member {
t.Lock()
defer t.Unlock()
members := make([]*Member, 0)
Expand All @@ -110,7 +110,7 @@ type Member struct {
Name string
Addr net.IP
Port uint16
Role NodeType
Role AgentType
Status MemberStatus
Node *memberlist.Node
}
Expand Down Expand Up @@ -143,19 +143,19 @@ func (s MemberStatus) String() string {
}
}

type DelegateBuilder func(*Node) memberlist.Delegate
type DelegateBuilder func(*Agent) memberlist.Delegate

func Create(conf *Config, delegate DelegateBuilder) (node *Node, err error) {
func Create(conf *Config, delegate DelegateBuilder) (agent *Agent, err error) {

meta := &NodeMeta{
meta := &AgentMeta{
Role: conf.Role,
}

node = &Node{
agent = &Agent{
config: conf,
meta: meta,
topology: NewTopology(),
state: NodeAlive,
state: AgentAlive,
}

bindIP, bindPort, err := conf.AddrParts(conf.BindAddr)
Expand All @@ -181,62 +181,62 @@ func Create(conf *Config, delegate DelegateBuilder) (node *Node, err error) {
conf.MemberlistConfig.Logger = log.GetLogger()

// Configure delegates
conf.MemberlistConfig.Delegate = delegate(node)
conf.MemberlistConfig.Events = &eventDelegate{node}
conf.MemberlistConfig.Delegate = delegate(agent)
conf.MemberlistConfig.Events = &eventDelegate{agent}

node.memberlist, err = memberlist.Create(conf.MemberlistConfig)
agent.memberlist, err = memberlist.Create(conf.MemberlistConfig)
if err != nil {
return nil, err
}

// Print local member info
localNode := node.memberlist.LocalNode()
localNode := agent.memberlist.LocalNode()
log.Infof("Local member %s:%d", localNode.Addr, localNode.Port)

// Set broadcast queue
node.broadcasts = &memberlist.TransmitLimitedQueue{
agent.broadcasts = &memberlist.TransmitLimitedQueue{
NumNodes: func() int {
return node.memberlist.NumMembers()
return agent.memberlist.NumMembers()
},
RetransmitMult: 2,
}

return node, nil
return agent, nil
}

// Join asks the Node instance to join.
func (n *Node) Join(addrs []string) (int, error) {
func (a *Agent) Join(addrs []string) (int, error) {

if n.State() != NodeAlive {
if a.State() != AgentAlive {
return 0, fmt.Errorf("Node can't join after Leave or Shutdown")
}

if len(addrs) > 0 {
log.Debugf("Trying to join the cluster using members: %v", addrs)
return n.memberlist.Join(addrs)
return a.memberlist.Join(addrs)
}
return 0, nil
}

func (n *Node) Leave() error {
func (a *Agent) Leave() error {

// Check the current state
n.stateLock.Lock()
if n.state == NodeLeft {
n.stateLock.Unlock()
a.stateLock.Lock()
if a.state == AgentLeft {
a.stateLock.Unlock()
return nil
} else if n.state == NodeLeaving {
n.stateLock.Unlock()
} else if a.state == AgentLeaving {
a.stateLock.Unlock()
return fmt.Errorf("Leave already in progress")
} else if n.state == NodeShutdown {
n.stateLock.Unlock()
} else if a.state == AgentShutdown {
a.stateLock.Unlock()
return fmt.Errorf("Leave called after Shutdown")
}
n.state = NodeLeaving
n.stateLock.Unlock()
a.state = AgentLeaving
a.stateLock.Unlock()

// Attempt the memberlist leave
err := n.memberlist.Leave(n.config.BroadcastTimeout)
err := a.memberlist.Leave(a.config.BroadcastTimeout)
if err != nil {
return err
}
Expand All @@ -246,14 +246,14 @@ func (n *Node) Leave() error {
// queue, but this wait is for that message to propagate through the
// cluster. In particular, we want to stay up long enough to service
// any probes from other nodes before they learn about us leaving.
time.Sleep(n.config.LeavePropagateDelay)
time.Sleep(a.config.LeavePropagateDelay)

// Transition to Left only if we not already shutdown
n.stateLock.Lock()
if n.state != NodeShutdown {
n.state = NodeLeft
a.stateLock.Lock()
if a.state != AgentShutdown {
a.state = AgentLeft
}
n.stateLock.Unlock()
a.stateLock.Unlock()
return nil

}
Expand All @@ -266,40 +266,40 @@ func (n *Node) Leave() error {
// exit as a node failure.
//
// It is safe to call this method multiple times.
func (n *Node) Shutdown() error {
func (a *Agent) Shutdown() error {

n.stateLock.Lock()
defer n.stateLock.Unlock()
a.stateLock.Lock()
defer a.stateLock.Unlock()

if n.state == NodeShutdown {
if a.state == AgentShutdown {
return nil
}

if n.state != NodeLeft {
if a.state != AgentLeft {
log.Info("node: Shutdown without a Leave")
}

n.state = NodeShutdown
err := n.memberlist.Shutdown()
a.state = AgentShutdown
err := a.memberlist.Shutdown()
if err != nil {
return err
}

return nil
}

func (n *Node) Memberlist() *memberlist.Memberlist {
return n.memberlist
func (a *Agent) Memberlist() *memberlist.Memberlist {
return a.memberlist
}

func (n *Node) State() NodeState {
n.stateLock.Lock()
defer n.stateLock.Unlock()
return n.state
func (a *Agent) State() AgentState {
a.stateLock.Lock()
defer a.stateLock.Unlock()
return a.state
}

func (n *Node) getMember(peer *memberlist.Node) *Member {
meta, err := n.decodeMetadata(peer.Meta)
func (a *Agent) getMember(peer *memberlist.Node) *Member {
meta, err := a.decodeMetadata(peer.Meta)
if err != nil {
panic(err)
}
Expand All @@ -312,35 +312,35 @@ func (n *Node) getMember(peer *memberlist.Node) *Member {
}
}

func (n *Node) handleNodeJoin(peer *memberlist.Node) {
member := n.getMember(peer)
func (a *Agent) handleNodeJoin(peer *memberlist.Node) {
member := a.getMember(peer)
member.Status = StatusAlive
n.topology.Update(member)
a.topology.Update(member)
log.Debugf("%s member joined: %s %s:%d", member.Role, member.Name, member.Addr, member.Port)
}

func (n *Node) handleNodeLeave(peer *memberlist.Node) {
member := n.getMember(peer)
n.topology.Delete(member)
func (a *Agent) handleNodeLeave(peer *memberlist.Node) {
member := a.getMember(peer)
a.topology.Delete(member)
log.Debugf("%s member left: %s %s:%d", member.Role, member.Name, member.Addr, member.Port)
}

func (n *Node) handleNodeUpdate(peer *memberlist.Node) {
func (a *Agent) handleNodeUpdate(peer *memberlist.Node) {
// ignore
}

func (n *Node) encodeMetadata() ([]byte, error) {
func (a *Agent) encodeMetadata() ([]byte, error) {
var buf bytes.Buffer
encoder := codec.NewEncoder(&buf, &codec.MsgpackHandle{})
if err := encoder.Encode(n.meta); err != nil {
if err := encoder.Encode(a.meta); err != nil {
log.Errorf("Failed to encode node metadata: %v", err)
return nil, err
}
return buf.Bytes(), nil
}

func (n *Node) decodeMetadata(buf []byte) (*NodeMeta, error) {
meta := &NodeMeta{}
func (a *Agent) decodeMetadata(buf []byte) (*AgentMeta, error) {
meta := &AgentMeta{}
reader := bytes.NewReader(buf)
decoder := codec.NewDecoder(reader, &codec.MsgpackHandle{})
if err := decoder.Decode(meta); err != nil {
Expand All @@ -350,14 +350,14 @@ func (n *Node) decodeMetadata(buf []byte) (*NodeMeta, error) {
return meta, nil
}

func (n *Node) GetPeers(numNodes int, nodeType NodeType) []*Member {
fullList := n.topology.Get(nodeType)
if len(fullList) <= numNodes {
func (a *Agent) GetPeers(max int, agentType AgentType) []*Member {
fullList := a.topology.Get(agentType)
if len(fullList) <= max {
return fullList
}

var filteredList []*Member
for i := 0; i < numNodes; i++ {
for i := 0; i < max; i++ {
filteredList = append(filteredList, fullList[i])
}

Expand Down
10 changes: 5 additions & 5 deletions gossip/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ import (
"github.com/hashicorp/memberlist"
)

type NodeType int
type AgentType int

func (t NodeType) String() string {
func (t AgentType) String() string {
switch t {
case AuditorType:
return "auditor"
Expand All @@ -36,7 +36,7 @@ func (t NodeType) String() string {
}
}

func NewNodeType(value string) NodeType {
func NewNodeType(value string) AgentType {
switch value {
case "auditor":
return AuditorType
Expand All @@ -50,7 +50,7 @@ func NewNodeType(value string) NodeType {
}

const (
AuditorType NodeType = iota
AuditorType AgentType = iota
MonitorType
PublisherType
ServerType
Expand Down Expand Up @@ -78,7 +78,7 @@ type Config struct {
// is not set, Auditor will set it to the hostname of the running machine.
NodeName string

Role NodeType
Role AgentType

// BindAddr is the address that the Auditor agent's communication ports
// will bind to. Auditor will use this address to bind to for both TCP
Expand Down
Loading

0 comments on commit 6fb93eb

Please sign in to comment.