Skip to content

Commit

Permalink
Modify sender to empty the queue faster
Browse files Browse the repository at this point in the history
  • Loading branch information
gdiazlo committed Dec 5, 2018
1 parent 668b5dd commit 9bd3f62
Showing 1 changed file with 60 additions and 25 deletions.
85 changes: 60 additions & 25 deletions gossip/sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package sender

import (
"fmt"
"sync"
"time"

"github.com/bbva/qed/gossip"
Expand Down Expand Up @@ -57,32 +58,70 @@ func NewSender(a *gossip.Agent, c *Config, s sign.Signer) *Sender {
quit: make(chan bool),
}
}
func (s Sender) batcherSender(id int, ch chan *protocol.Snapshot, quit chan bool) {
batch := &protocol.BatchSnapshots{
TTL: s.Config.TTL,
From: s.Agent.Self,
Snapshots: make([]*protocol.SignedSnapshot, 0),
}

for {
select {
case snap := <-ch:
// batchSize 100 must be configurable
if len(batch.Snapshots) == 100 {
go s.sender(batch)
batch = &protocol.BatchSnapshots{
TTL: s.Config.TTL,
From: s.Agent.Self,
Snapshots: make([]*protocol.SignedSnapshot, 0),
}
}
ss, err := s.doSign(snap)
if err != nil {
log.Errorf("Failed signing message: %v", err)
}
batch.Snapshots = append(batch.Snapshots, ss)
case <-quit:
return
// default:
// fmt.Println("Doing nothing", id)
}
}

}
func (s Sender) sender(batch *protocol.BatchSnapshots) {
var wg sync.WaitGroup
msg, _ := batch.Encode()

peers := s.Agent.Topology.Each(s.Config.EachN, nil)

for _, peer := range peers.L {
dst := peer.Node()
log.Infof("Sending batch %+v to node %+v\n", batch, dst.Name)
wg.Add(1)
go func() {
err := s.Agent.Memberlist().SendReliable(dst, msg)
if err != nil {
log.Errorf("Failed send message: %v", err)
}
}()
}
wg.Wait()
log.Infof("Sent batch %+v to nodes %+v\n", batch, peers.L)
}

func (s Sender) Start(ch chan *protocol.Snapshot) {
ticker := time.NewTicker(1 * time.Second)
ticker := time.NewTicker(1000 * time.Millisecond)

for i := 0; i < 10; i++ {
go s.batcherSender(i, ch, s.quit)
}

for {
select {
case <-ticker.C:
batch := s.getBatch(ch)
if batch == nil {
continue
}
log.Debugf("Encoding batch: %+v", batch)
msg, _ := batch.Encode()

peers := s.Agent.Topology.Each(s.Config.EachN, nil)

for _, peer := range peers.L {
dst := peer.Node()
log.Infof("Sending batch %+v to node %+v\n", batch, dst.Name)
go func() {
err := s.Agent.Memberlist().SendReliable(dst, msg)
if err != nil {
log.Errorf("Failed send message: %v", err)
}
}()
}
fmt.Println("QUEUE LENGTH: ", len(ch))
case <-s.quit:
return
}
Expand Down Expand Up @@ -117,11 +156,7 @@ func (s *Sender) getBatch(ch chan *protocol.Snapshot) *protocol.BatchSnapshots {
return &batch
}

ss, err := s.doSign(snapshot)
if err != nil {
log.Errorf("Failed signing message: %v", err)
}
batch.Snapshots = append(batch.Snapshots, ss)
batch.Snapshots = append(batch.Snapshots, &protocol.SignedSnapshot{snapshot, []byte{0x0}})

if counter == batchSize {
return &batch
Expand Down

0 comments on commit 9bd3f62

Please sign in to comment.