Skip to content

Commit

Permalink
Filter source when gossiping snapshots
Browse files Browse the repository at this point in the history
  • Loading branch information
aalda committed Nov 20, 2018
1 parent 0736aa4 commit 7608abc
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 16 deletions.
33 changes: 29 additions & 4 deletions gossip/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/bbva/qed/log"
"github.com/bbva/qed/protocol"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/memberlist"
)
Expand Down Expand Up @@ -323,6 +324,11 @@ func (a *Agent) Broadcasts() *memberlist.TransmitLimitedQueue {
return a.broadcasts
}

func (a *Agent) GetAddrPort() (net.IP, uint16) {
n := a.memberlist.LocalNode()
return n.Addr, n.Port
}

func (a *Agent) State() AgentState {
a.stateLock.Lock()
defer a.stateLock.Unlock()
Expand Down Expand Up @@ -388,17 +394,36 @@ func memberToNode(members []*Member) []*memberlist.Node {
return list
}

func (a *Agent) GetPeers(max int, agentType AgentType) []*memberlist.Node {
func (a *Agent) GetPeers(max int, agentType AgentType, excluded *protocol.Source) []*memberlist.Node {

fullList := a.topology.Get(agentType)
if len(fullList) <= max {
return memberToNode(fullList)

var included []*Member
if excluded != nil && agentType.String() == excluded.Role {
included = excludePeers(fullList, excluded)
} else {
included = fullList
}

if len(included) <= max {
return memberToNode(included)
}

var filteredList []*Member
for i := 0; i < max; i++ {
filteredList = append(filteredList, fullList[i])
filteredList = append(filteredList, included[i])
}

return memberToNode(filteredList)
}

func excludePeers(peers []*Member, excluded *protocol.Source) []*Member {
result := make([]*Member, 0)
for _, p := range peers {
if bytes.Equal(p.Addr, excluded.Addr) && p.Port == excluded.Port {
continue
}
result = append(result, p)
}
return result
}
14 changes: 10 additions & 4 deletions gossip/auditor/auditor.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,17 @@ func (a *Auditor) HandleMsg(msg []byte) {
return
}

peers := a.Agent.GetPeers(2, gossip.AuditorType)
peers = append(peers, a.Agent.GetPeers(2, gossip.MonitorType)...)
peers = append(peers, a.Agent.GetPeers(2, gossip.PublisherType)...)
peers := a.Agent.GetPeers(2, gossip.AuditorType, batch.From)
peers = append(peers, a.Agent.GetPeers(2, gossip.MonitorType, batch.From)...)
peers = append(peers, a.Agent.GetPeers(2, gossip.PublisherType, batch.From)...)

batch.TTL--
addr, port := a.Agent.GetAddrPort()
batch.From = &protocol.Source{
Addr: addr,
Port: port,
Role: a.Agent.Metadata().Role.String(),
}
newMsg, _ := batch.Encode()

for _, peer := range peers {
Expand All @@ -80,4 +86,4 @@ func (a *Auditor) Process(b *protocol.BatchSnapshots) {
}

log.Debugf("process(): Processed %v elements of batch id %v", len(b.Snapshots), b.Snapshots[0].Snapshot.Version)
}
}
14 changes: 10 additions & 4 deletions gossip/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,17 @@ func (m *Monitor) HandleMsg(msg []byte) {
return
}

peers := m.Agent.GetPeers(2, gossip.AuditorType)
peers = append(peers, m.Agent.GetPeers(2, gossip.MonitorType)...)
peers = append(peers, m.Agent.GetPeers(2, gossip.PublisherType)...)
peers := m.Agent.GetPeers(2, gossip.AuditorType, batch.From)
peers = append(peers, m.Agent.GetPeers(2, gossip.MonitorType, batch.From)...)
peers = append(peers, m.Agent.GetPeers(2, gossip.PublisherType, batch.From)...)

batch.TTL--
addr, port := m.Agent.GetAddrPort()
batch.From = &protocol.Source{
Addr: addr,
Port: port,
Role: m.Agent.Metadata().Role.String(),
}
newMsg, _ := batch.Encode()

for _, peer := range peers {
Expand All @@ -81,4 +87,4 @@ func (m *Monitor) Process(b *protocol.BatchSnapshots) {
}

log.Debugf("process(): Processed %v elements of batch id %v", len(b.Snapshots), b.Snapshots[0].Snapshot.Version)
}
}
12 changes: 9 additions & 3 deletions gossip/sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ func (s Sender) Start(ch chan *protocol.Snapshot) {
log.Debugf("Encoding batch: %+v", batch)
msg, _ := batch.Encode()

peers := s.Agent.GetPeers(1, gossip.AuditorType)
peers = append(peers, s.Agent.GetPeers(1, gossip.MonitorType)...)
peers = append(peers, s.Agent.GetPeers(1, gossip.PublisherType)...)
peers := s.Agent.GetPeers(2, gossip.AuditorType, nil)
peers = append(peers, s.Agent.GetPeers(2, gossip.MonitorType, nil)...)
peers = append(peers, s.Agent.GetPeers(2, gossip.PublisherType, nil)...)
log.Debugf("Peers selected: %+v", peers)

for _, peer := range peers {
Expand Down Expand Up @@ -113,6 +113,12 @@ func (s *Sender) getBatch(ch chan *protocol.Snapshot) *protocol.BatchSnapshots {
var counter int = 0
batch.Snapshots = make([]*protocol.SignedSnapshot, 0)
batch.TTL = s.Config.TTL
addr, port := s.Agent.GetAddrPort()
batch.From = &protocol.Source{
Addr: addr,
Port: port,
Role: s.Agent.Metadata().Role.String(),
}

for {
select {
Expand Down
8 changes: 8 additions & 0 deletions protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package protocol

import (
"bytes"
"net"

"github.com/bbva/qed/balloon"
"github.com/bbva/qed/balloon/history"
Expand Down Expand Up @@ -58,6 +59,13 @@ type SignedSnapshot struct {
type BatchSnapshots struct {
Snapshots []*SignedSnapshot
TTL int
From *Source
}

type Source struct {
Addr net.IP
Port uint16
Role string
}

func (b *BatchSnapshots) Encode() ([]byte, error) {
Expand Down
2 changes: 1 addition & 1 deletion tests/gossip/test_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func main() {
select {
case <-ticker.C:
c := atomic.LoadUint64(&count)
fmt.Println("Reuqest per second: ", c/2)
fmt.Println("Request per second: ", c/2)
s.Print()
atomic.StoreUint64(&count, 0)
}
Expand Down

0 comments on commit 7608abc

Please sign in to comment.