Skip to content

Commit

Permalink
Fix routing and processor.
Browse files Browse the repository at this point in the history
  • Loading branch information
gdiazlo committed Nov 27, 2018
1 parent eaa6102 commit 957220e
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 19 deletions.
29 changes: 18 additions & 11 deletions gossip/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,13 @@ type Agent struct {
func NewAgent(conf *Config, p []Processor) (agent *Agent, err error) {

agent = &Agent{
config: conf,
Topology: NewTopology(),
In: make(chan *protocol.BatchSnapshots, 1000),
Out: make(chan *protocol.BatchSnapshots, 1000),
Alerts: make(chan Alert, 100),
quit: make(chan bool),
config: conf,
Topology: NewTopology(),
processors: p,
In: make(chan *protocol.BatchSnapshots, 1000),
Out: make(chan *protocol.BatchSnapshots, 1000),
Alerts: make(chan Alert, 100),
quit: make(chan bool),
}

bindIP, bindPort, err := conf.AddrParts(conf.BindAddr)
Expand Down Expand Up @@ -108,16 +109,15 @@ func NewAgent(conf *Config, p []Processor) (agent *Agent, err error) {

func (a *Agent) Start() {

outTicker := time.NewTimer(2 * time.Second)
alertTicker := time.NewTimer(1 * time.Second)
outTicker := time.NewTicker(2 * time.Second)
alertTicker := time.NewTicker(1 * time.Second)

for {
select {
case batch := <-a.In:
for _, p := range a.processors {
go p.Process(batch)
}

a.Out <- batch
case <-outTicker.C:
a.sendOutQueue()
Expand All @@ -135,6 +135,10 @@ func (a *Agent) processAlertQueue() {

}

func batchId(b *protocol.BatchSnapshots) string {
return fmt.Sprintf("( ttl %d, lv %d)", b.TTL, b.Snapshots[len(b.Snapshots)-1].Snapshot.Version)
}

func (a *Agent) sendOutQueue() {
var batch *protocol.BatchSnapshots
for {
Expand All @@ -149,8 +153,11 @@ func (a *Agent) sendOutQueue() {
}

batch.TTL--
from := batch.From
batch.From = a.Self
msg, _ := batch.Encode()
for _, dst := range a.route(batch.From) {
for _, dst := range a.route(from) {
fmt.Printf("agent.sendOutQueue(): sending %+v to %+v\n", batchId(batch), dst.Name)
a.memberlist.SendReliable(dst, msg)
}
}
Expand All @@ -164,7 +171,7 @@ func (a Agent) route(src *member.Peer) []*memberlist.Node {
excluded.L = append(excluded.L, src)
excluded.L = append(excluded.L, a.Self)

peers := a.Topology.Each(2, &excluded)
peers := a.Topology.Each(1, &excluded)
for _, p := range peers.L {
dst = append(dst, p.Node())
}
Expand Down
4 changes: 3 additions & 1 deletion gossip/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
package gossip

import (
"fmt"

"github.com/bbva/qed/gossip/member"
"github.com/bbva/qed/log"
"github.com/bbva/qed/protocol"
Expand Down Expand Up @@ -81,7 +83,7 @@ func (d *agentDelegate) NotifyMsg(msg []byte) {
return
}

log.Infof("Batch received, TTL: %d: %v", batch.TTL, batch)
fmt.Printf("agentDelegate.NotifyMsg(): got batch %+v\n", batchId(&batch))
d.agent.In <- &batch
}

Expand Down
3 changes: 1 addition & 2 deletions gossip/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,5 @@ func (d DummyProcessor) Process(b *protocol.BatchSnapshots) {

// time.Sleep(1 * time.Second)
}

log.Debugf("process(): Processed %v elements of batch id %v", len(b.Snapshots), b.Snapshots[0].Snapshot.Version)
fmt.Printf("process(): Processed %v elements of batch id %v\n", len(b.Snapshots), b.Snapshots[0].Snapshot.Version)
}
6 changes: 4 additions & 2 deletions gossip/sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func DefaultConfig() *Config {
BatchSize: 100,
BatchInterval: 1 * time.Second,
TTL: 2,
EachN: 2,
EachN: 1,
}
}

Expand Down Expand Up @@ -71,7 +71,9 @@ func (s Sender) Start(ch chan *protocol.Snapshot) {
peers := s.Agent.Topology.Each(s.Config.EachN, nil)

for _, peer := range peers.L {
err := s.Agent.Memberlist().SendReliable(peer.Node(), msg)
dst := peer.Node()
fmt.Printf("sender(): Sending batch %+v to node %+v\n", batch, dst.Name)
err := s.Agent.Memberlist().SendReliable(dst, msg)
if err != nil {
log.Errorf("Failed send message: %v", err)
}
Expand Down
6 changes: 3 additions & 3 deletions tests/gossip/run_gossip.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,19 @@

master="127.0.0.1:9100"
qed="127.0.0.1:8080"
go run $GOPATH/src/github.com/bbva/qed/main.go start -k key -l debug --node-id server0 --gossip-addr $master --raft-addr 127.0.0.1:9000 -y $HOME/.ssh/id_ed25519 &
go run $GOPATH/src/github.com/bbva/qed/main.go start -k key -l silent --node-id server0 --gossip-addr $master --raft-addr 127.0.0.1:9000 -y $HOME/.ssh/id_ed25519 &
pids[0]=$!
sleep 1s

for i in `seq 1 $1`;
do
xterm -hold -e "go run $GOPATH/src/github.com/bbva/qed/main.go agent auditor -k key -l debug --bind 127.0.0.1:910$i --join $master --endpoints $qed --node auditor$i" &
xterm -hold -e "go run $GOPATH/src/github.com/bbva/qed/main.go agent auditor -k key -l silent --bind 127.0.0.1:910$i --join $master --endpoints $qed --node auditor$i" &
pids+=($!)
done

for i in `seq 1 $2`;
do
xterm -hold -e "go run $GOPATH/src/github.com/bbva/qed/main.go agent monitor -k key -l debug --bind 127.0.0.1:920$i --join $master --endpoints $qed --node monitor$i" &
xterm -hold -e "go run $GOPATH/src/github.com/bbva/qed/main.go agent monitor -k key -l silent --bind 127.0.0.1:920$i --join $master --endpoints $qed --node monitor$i" &
pids+=($!)
done

Expand Down

0 comments on commit 957220e

Please sign in to comment.