Skip to content

Commit

Permalink
Merge pull request #79 from gdiazlo/qedgossip
Browse files Browse the repository at this point in the history
Fix issued with gossip network
  • Loading branch information
iknite authored Mar 7, 2019
2 parents a471714 + 59dadd9 commit 8232b61
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 35 deletions.
6 changes: 3 additions & 3 deletions deploy/aws/provision/templates/qed-config.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ server:
{% endif %}
{% if 'role_monitor' in group_names %}
agent:
node: "monitor"
node: "monitor-{{ ansible_hostname }}"
bind: "{{ ansible_eth0.ipv4.address }}:8200"
advertise: ""
join:
Expand All @@ -56,7 +56,7 @@ agent:
{% endif %}
{% if 'role_auditor' in group_names %}
agent:
node: "auditor"
node: "auditor-{{ ansible_hostname }}"
bind: "{{ ansible_eth0.ipv4.address }}:8100"
advertise: ""
join:
Expand All @@ -76,7 +76,7 @@ agent:
{% endif %}
{% if 'role_publisher' in group_names %}
agent:
node: "publisher"
node: "publisher-{{ ansible_hostname }}"
bind: "{{ ansible_eth0.ipv4.address }}:8300"
advertise: ""
join:
Expand Down
39 changes: 20 additions & 19 deletions gossip/sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package sender

import (
"fmt"
"sync"
"time"

"github.com/bbva/qed/gossip"
Expand All @@ -40,10 +39,11 @@ type Sender struct {
}

type Config struct {
BatchSize uint
BatchSize int
BatchInterval time.Duration
TTL int
EachN int
SendTimer time.Duration
}

func DefaultConfig() *Config {
Expand All @@ -52,6 +52,7 @@ func DefaultConfig() *Config {
BatchInterval: 1 * time.Second,
TTL: 2,
EachN: 1,
SendTimer: 500 * time.Millisecond,
}
}

Expand All @@ -66,15 +67,14 @@ func NewSender(a *gossip.Agent, c *Config, s sign.Signer) *Sender {
}

func (s Sender) Start(ch chan *protocol.Snapshot) {
ticker := time.NewTicker(1000 * time.Millisecond)

for i := 0; i < NumSenders; i++ {
go s.batcherSender(i, ch, s.quit)
}

for {
select {
case <-ticker.C:
case <-time.After(s.Config.BatchInterval * 60):
log.Debug("Messages in sender queue: ", len(ch))
case <-s.quit:
return
Expand All @@ -90,9 +90,7 @@ func (s Sender) batcherSender(id int, ch chan *protocol.Snapshot, quit chan bool
Snapshots: make([]*protocol.SignedSnapshot, 0),
}

ticker := time.NewTicker(500 * time.Millisecond)

resetBatches := func() {
nextBatch := func() {
batches = append(batches, batch)
batch = &protocol.BatchSnapshots{
TTL: s.Config.TTL,
Expand All @@ -101,12 +99,13 @@ func (s Sender) batcherSender(id int, ch chan *protocol.Snapshot, quit chan bool
}
}

ticker := time.NewTicker(s.Config.SendTimer)

for {
select {
case snap := <-ch:
// TODO: batchSize 100 must be configurable
if len(batch.Snapshots) == 100 {
resetBatches()
if len(batch.Snapshots) == s.Config.BatchSize {
nextBatch()
}
ss, err := s.doSign(snap)
if err != nil {
Expand All @@ -116,7 +115,7 @@ func (s Sender) batcherSender(id int, ch chan *protocol.Snapshot, quit chan bool

case <-ticker.C:
if len(batch.Snapshots) > 0 {
resetBatches()
nextBatch()
}
for _, b := range batches {
go s.sender(*b)
Expand All @@ -130,7 +129,6 @@ func (s Sender) batcherSender(id int, ch chan *protocol.Snapshot, quit chan bool
}

func (s Sender) sender(batch protocol.BatchSnapshots) {
var wg sync.WaitGroup
msg, _ := batch.Encode()

peers := s.Agent.Topology.Each(s.Config.EachN, nil)
Expand All @@ -139,23 +137,26 @@ func (s Sender) sender(batch protocol.BatchSnapshots) {
metrics.QedSenderBatchesSentTotal.Inc()

dst := peer.Node()
log.Infof("Sending batch %+v to node %+v\n", batch, dst.Name)
wg.Add(1)
log.Debugf("Sending batch %+v to node %+v\n", batch, dst.Name)

for retries := uint(0); retries < 5; retries++ {
retries := uint(5)
for {
err := s.Agent.Memberlist().SendReliable(dst, msg)
if err == nil {
break
}
if err != nil {
if retries == 5 {
log.Errorf("Failed send message: %v", err)
if retries == 0 {
log.Infof("Failed send message: %v", err)
break
}
delay := (10 << retries) * time.Millisecond
time.Sleep(delay)
retries -= 1
continue
}
}
}
wg.Wait()
log.Infof("Sent batch %+v to nodes %+v\n", batch, peers.L)
}

func (s Sender) Stop() {
Expand Down
2 changes: 1 addition & 1 deletion raftwal/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package raftwal

import (
"bytes"
// "encoding/binary"

"encoding/json"
"fmt"
"io"
Expand Down
3 changes: 2 additions & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ func NewServer(conf *Config) (*Server, error) {
config := gossip.DefaultConfig()
config.BindAddr = conf.GossipAddr
config.Role = member.Server
config.NodeName = conf.NodeID
server.agent, err = gossip.NewAgent(config, nil)
if err != nil {
return nil, err
Expand All @@ -140,7 +141,7 @@ func NewServer(conf *Config) (*Server, error) {
}

// TODO: add queue size to config
server.agentsQueue = make(chan *protocol.Snapshot, 100000)
server.agentsQueue = make(chan *protocol.Snapshot, 2<<16)

// Create sender
server.sender = sender.NewSender(server.agent, sender.DefaultConfig(), server.signer)
Expand Down
21 changes: 10 additions & 11 deletions tests/gossip/run_gossip.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,32 +23,31 @@ if [ ! -f "$keyFile" ]; then
echo -e 'y\n' | ssh-keygen -t ed25519 -N '' -f /var/tmp/id_ed25519
fi

$QED start \
-k key \
-l silent \
--node-id server0 \
--gossip-addr $qedGossipEndpoint \
--raft-addr 127.0.0.1:8500 \
--keypath $keyFile &

xterm -hold -e "$QED start -k key -l debug -p $(mktemp -d) --node-id server0 --raft-addr 127.0.0.1:8500 --gossip-addr 127.0.0.1:8400 --mgmt-addr 127.0.0.1:8700 --metrics-addr 127.0.0.1:8600 --http-addr 127.0.0.1:8800 --keypath $keyFile" &
pids[0]=$!

sleep 3s

xterm -hold -e "$QED start -k key -l debug -p $(mktemp -d) --node-id server1 --gossip-addr 127.0.0.2:8401 --raft-addr 127.0.0.2:8501 --keypath $keyFile --join-addr 127.0.0.1:8700 --gossip-join-addr 127.0.0.1:8400 --http-addr 127.0.0.2:8801 --mgmt-addr 127.0.0.2:8701 --metrics-addr 127.0.0.2:8601" &
pids+=($!)

sleep 2s

for i in `seq 1 $1`;
do
xterm -hold -e "$QED agent --alertsUrls $alertsStoreEndpoint auditor -k key -l info --bind 127.0.0.1:810$i --join $qedGossipEndpoint --qedUrls $qedHTTPEndpoint --pubUrls $snapshotStoreEndpoint --node auditor$i" &
xterm -hold -e "$QED agent --alertsUrls $alertsStoreEndpoint auditor -k key -l debug --bind 127.0.0.1:810$i --join $qedGossipEndpoint --qedUrls $qedHTTPEndpoint --pubUrls $snapshotStoreEndpoint --node auditor$i" &
pids+=($!)
done

for i in `seq 1 $2`;
do
xterm -hold -e "$QED agent --alertsUrls $alertsStoreEndpoint monitor -k key -l info --bind 127.0.0.1:820$i --join $qedGossipEndpoint --qedUrls $qedHTTPEndpoint --pubUrls $snapshotStoreEndpoint --node monitor$i" &
xterm -hold -e "$QED agent --alertsUrls $alertsStoreEndpoint monitor -k key -l debug --bind 127.0.0.1:820$i --join $qedGossipEndpoint --qedUrls $qedHTTPEndpoint --pubUrls $snapshotStoreEndpoint --node monitor$i" &
pids+=($!)
done

for i in `seq 1 $3`;
do
xterm -hold -e "$QED agent --alertsUrls $alertsStoreEndpoint publisher -k key -l info --bind 127.0.0.1:830$i --join $qedGossipEndpoint --pubUrls $snapshotStoreEndpoint --node publisher$i" &
xterm -hold -e "$QED agent --alertsUrls $alertsStoreEndpoint publisher -k key -l debug --bind 127.0.0.1:830$i --join $qedGossipEndpoint --pubUrls $snapshotStoreEndpoint --node publisher$i" &
pids+=($!)
done

Expand Down

0 comments on commit 8232b61

Please sign in to comment.