Skip to content

Commit

Permalink
Update configuration options, make agent and config non public atribu…
Browse files Browse the repository at this point in the history
…tes, clean up design, implement out queue as a timedout channel, simplify stop code
  • Loading branch information
gdiazlo committed Mar 19, 2019
1 parent e861b74 commit a3b276f
Showing 1 changed file with 49 additions and 69 deletions.
118 changes: 49 additions & 69 deletions gossip/sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package sender

import (
"fmt"
"sync"
"time"

"github.com/bbva/qed/gossip"
Expand All @@ -27,148 +28,127 @@ import (
"github.com/bbva/qed/sign"
)

const (
NumSenders = 10
)

type Sender struct {
Agent *gossip.Agent
Config *Config
agent *gossip.Agent
config *Config
signer sign.Signer
out chan *protocol.BatchSnapshots
quit chan bool
}

type Config struct {
BatchSize int
BatchSize uint
BatchInterval time.Duration
NumSenders int
TTL int
EachN int
SendTimer time.Duration
}

func DefaultConfig() *Config {
return &Config{
BatchSize: 100,
BatchInterval: 1 * time.Second,
NumSenders: 3,
TTL: 1,
EachN: 1,
SendTimer: 500 * time.Millisecond,
SendTimer: 1000 * time.Millisecond,
}
}

func NewSender(a *gossip.Agent, c *Config, s sign.Signer) *Sender {
metrics.QedSenderInstancesCount.Inc()
return &Sender{
Agent: a,
Config: c,
agent: a,
config: c,
signer: s,
out: make(chan *protocol.BatchSnapshots, 1<<16),
quit: make(chan bool),
}
}

// Start NumSenders concurrent senders and waits for them
// to finish
func (s Sender) Start(ch chan *protocol.Snapshot) {

for i := 0; i < NumSenders; i++ {
for i := 0; i < s.config.NumSenders; i++ {
go s.batcherSender(i, ch, s.quit)
}

for {
select {
case <-time.After(s.Config.BatchInterval * 60):
log.Debug("Messages in sender queue: ", len(ch))
case <-s.quit:
return
}
}
<-s.quit
}

func (s Sender) batcherSender(id int, ch chan *protocol.Snapshot, quit chan bool) {
batches := []*protocol.BatchSnapshots{}
batch := &protocol.BatchSnapshots{
TTL: s.Config.TTL,
From: s.Agent.Self,
func (s Sender) newBatch() *protocol.BatchSnapshots {
return &protocol.BatchSnapshots{
TTL: s.config.TTL,
From: s.agent.Self,
Snapshots: make([]*protocol.SignedSnapshot, 0),
}
}

nextBatch := func() {
batches = append(batches, batch)
batch = &protocol.BatchSnapshots{
TTL: s.Config.TTL,
From: s.Agent.Self,
Snapshots: make([]*protocol.SignedSnapshot, 0),
}
}

ticker := time.NewTicker(s.Config.SendTimer)
// Sign snapshots, build batches of signed snapshots and send those batches
// to other members of the gossip network.
// If the out queue is full, we drop the current batch and pray other sender will
// send the batches to the gossip network.
func (s Sender) batcherSender(id int, ch chan *protocol.Snapshot, quit chan bool) {
batch := s.newBatch()

for {
select {
case snap := <-ch:
if len(batch.Snapshots) == s.Config.BatchSize {
nextBatch()
if len(batch.Snapshots) == s.config.BatchSize {
s.agent.ChTimedSend(batch, s.out)
batch = s.newBatch()
}
ss, err := s.doSign(snap)
if err != nil {
log.Errorf("Failed signing message: %v", err)
}
batch.Snapshots = append(batch.Snapshots, ss)

case <-ticker.C:
case <-time.After(s.config.SendTimer):
// send whatever we have on each tick, do not wait
// to have complete batches
if len(batch.Snapshots) > 0 {
nextBatch()
s.agent.ChTimedSend(batch, s.out)
batch = s.newBatch()
}
for _, b := range batches {
for b := range s.out {
go s.sender(*b)
}
batches = []*protocol.BatchSnapshots{}

case <-quit:
return
}
}
}

// Send a batch to the peers it selects based on the gossip
// network topology.
// Do not retry sending to faulty agents, and pray other
// sender will.
func (s Sender) sender(batch protocol.BatchSnapshots) {
var wg sync.WaitGroup
msg, _ := batch.Encode()

peers := s.Agent.Topology.Each(s.Config.EachN, nil)
peers := s.agent.Topology.Each(s.config.EachN, nil)
for _, peer := range peers.L {
// Metrics
metrics.QedSenderBatchesSentTotal.Inc()

dst := peer.Node()

log.Debugf("Sending batch %+v to node %+v\n", batch, dst.Name)

retries := uint(5)
for {
err := s.Agent.Memberlist().SendReliable(dst, msg)
if err == nil {
break
} else {
if retries == 0 {
log.Infof("Failed send message: %v", err)
break
}
delay := (10 << retries) * time.Millisecond
time.Sleep(delay)
retries -= 1
continue
}
err := s.agent.Memberlist().SendReliable(dst, msg)
if err == nil {
log.Infof("Failed send message to %+v because: %v", peer, err)
break
}
}
wg.Wait()
log.Infof("Sent batch %+v to nodes %+v\n", batch, peers.L)
}

func (s Sender) Stop() {
metrics.QedSenderInstancesCount.Dec()

for i := 0; i < NumSenders+1; i++ {
// INFO: we need NumSenders+1 for the debug ticker in Start function
s.quit <- true
}
close(s.quit)
}

func (s *Sender) doSign(snapshot *protocol.Snapshot) (*protocol.SignedSnapshot, error) {

signature, err := s.signer.Sign([]byte(fmt.Sprintf("%v", snapshot)))
if err != nil {
log.Info("Publisher: error signing snapshot")
Expand Down

0 comments on commit a3b276f

Please sign in to comment.