Skip to content

Commit

Permalink
Exclude sender from to-send nodes list
Browse files Browse the repository at this point in the history
  • Loading branch information
Jose Luis Lucas committed Nov 21, 2018
1 parent 7608abc commit f897f64
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 32 deletions.
46 changes: 26 additions & 20 deletions gossip/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,36 +394,42 @@ func memberToNode(members []*Member) []*memberlist.Node {
return list
}

func (a *Agent) GetPeers(max int, agentType AgentType, excluded *protocol.Source) []*memberlist.Node {

func (a *Agent) GetPeers(max int, agentType AgentType, excluded []*protocol.Source) []*memberlist.Node {
fullList := a.topology.Get(agentType)
fullList = excludePeers(fullList, excluded)

var included []*Member
if excluded != nil && agentType.String() == excluded.Role {
included = excludePeers(fullList, excluded)
} else {
included = fullList
}

if len(included) <= max {
return memberToNode(included)
if len(fullList) <= max {
return memberToNode(fullList)
}

var filteredList []*Member
var randomPeers []*Member
for i := 0; i < max; i++ {
filteredList = append(filteredList, included[i])
randomPeers = append(randomPeers, fullList[i])
}

return memberToNode(filteredList)
return memberToNode(randomPeers)
}

func excludePeers(peers []*Member, excluded *protocol.Source) []*Member {
result := make([]*Member, 0)
func excludePeers(peers []*Member, excluded []*protocol.Source) []*Member {
if excluded == nil {
return peers
}

filteredList := make([]*Member, 0)
var exclude bool

for _, p := range peers {
if bytes.Equal(p.Addr, excluded.Addr) && p.Port == excluded.Port {
continue
exclude = false
for _, e := range excluded {
if bytes.Equal(p.Addr, e.Addr) && p.Port == e.Port {
exclude = true
}
}

if !exclude {
filteredList = append(filteredList, p)
}
result = append(result, p)
}
return result

return filteredList
}
18 changes: 12 additions & 6 deletions gossip/auditor/auditor.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,23 @@ func (a *Auditor) HandleMsg(msg []byte) {
return
}

peers := a.Agent.GetPeers(2, gossip.AuditorType, batch.From)
peers = append(peers, a.Agent.GetPeers(2, gossip.MonitorType, batch.From)...)
peers = append(peers, a.Agent.GetPeers(2, gossip.PublisherType, batch.From)...)

batch.TTL--
// Exclude origin and myself from peer list to send message.
var excludedPeers []*protocol.Source
excludedPeers = append(excludedPeers, batch.From)
addr, port := a.Agent.GetAddrPort()
batch.From = &protocol.Source{
myself := &protocol.Source{
Addr: addr,
Port: port,
Role: a.Agent.Metadata().Role.String(),
}
excludedPeers = append(excludedPeers, myself)

peers := a.Agent.GetPeers(2, gossip.AuditorType, excludedPeers)
peers = append(peers, a.Agent.GetPeers(2, gossip.MonitorType, excludedPeers)...)
peers = append(peers, a.Agent.GetPeers(2, gossip.PublisherType, excludedPeers)...)

batch.TTL--
batch.From = myself
newMsg, _ := batch.Encode()

for _, peer := range peers {
Expand Down
18 changes: 12 additions & 6 deletions gossip/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,23 @@ func (m *Monitor) HandleMsg(msg []byte) {
return
}

peers := m.Agent.GetPeers(2, gossip.AuditorType, batch.From)
peers = append(peers, m.Agent.GetPeers(2, gossip.MonitorType, batch.From)...)
peers = append(peers, m.Agent.GetPeers(2, gossip.PublisherType, batch.From)...)

batch.TTL--
// Exclude origin and myself from peer list to send message.
var excludedPeers []*protocol.Source
excludedPeers = append(excludedPeers, batch.From)
addr, port := m.Agent.GetAddrPort()
batch.From = &protocol.Source{
myself := &protocol.Source{
Addr: addr,
Port: port,
Role: m.Agent.Metadata().Role.String(),
}
excludedPeers = append(excludedPeers, myself)

peers := m.Agent.GetPeers(2, gossip.AuditorType, excludedPeers)
peers = append(peers, m.Agent.GetPeers(2, gossip.MonitorType, excludedPeers)...)
peers = append(peers, m.Agent.GetPeers(2, gossip.PublisherType, excludedPeers)...)

batch.TTL--
batch.From = myself
newMsg, _ := batch.Encode()

for _, peer := range peers {
Expand Down

0 comments on commit f897f64

Please sign in to comment.