Skip to content

Commit

Permalink
Use config and remove unused code
Browse files Browse the repository at this point in the history
  • Loading branch information
gdiazlo committed Mar 7, 2019
1 parent 733065d commit 3f0bb62
Showing 1 changed file with 20 additions and 19 deletions.
39 changes: 20 additions & 19 deletions gossip/sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package sender

import (
"fmt"
"sync"
"time"

"github.com/bbva/qed/gossip"
Expand All @@ -40,10 +39,11 @@ type Sender struct {
}

type Config struct {
BatchSize uint
BatchSize int
BatchInterval time.Duration
TTL int
EachN int
SendTimer time.Duration
}

func DefaultConfig() *Config {
Expand All @@ -52,6 +52,7 @@ func DefaultConfig() *Config {
BatchInterval: 1 * time.Second,
TTL: 2,
EachN: 1,
SendTimer: 500 * time.Millisecond,
}
}

Expand All @@ -66,15 +67,14 @@ func NewSender(a *gossip.Agent, c *Config, s sign.Signer) *Sender {
}

func (s Sender) Start(ch chan *protocol.Snapshot) {
ticker := time.NewTicker(1000 * time.Millisecond)

for i := 0; i < NumSenders; i++ {
go s.batcherSender(i, ch, s.quit)
}

for {
select {
case <-ticker.C:
case <-time.After(s.Config.BatchInterval * 60):
log.Debug("Messages in sender queue: ", len(ch))
case <-s.quit:
return
Expand All @@ -90,9 +90,7 @@ func (s Sender) batcherSender(id int, ch chan *protocol.Snapshot, quit chan bool
Snapshots: make([]*protocol.SignedSnapshot, 0),
}

ticker := time.NewTicker(500 * time.Millisecond)

resetBatches := func() {
nextBatch := func() {
batches = append(batches, batch)
batch = &protocol.BatchSnapshots{
TTL: s.Config.TTL,
Expand All @@ -101,12 +99,13 @@ func (s Sender) batcherSender(id int, ch chan *protocol.Snapshot, quit chan bool
}
}

ticker := time.NewTicker(s.Config.SendTimer)

for {
select {
case snap := <-ch:
// TODO: batchSize 100 must be configurable
if len(batch.Snapshots) == 100 {
resetBatches()
if len(batch.Snapshots) == s.Config.BatchSize {
nextBatch()
}
ss, err := s.doSign(snap)
if err != nil {
Expand All @@ -116,7 +115,7 @@ func (s Sender) batcherSender(id int, ch chan *protocol.Snapshot, quit chan bool

case <-ticker.C:
if len(batch.Snapshots) > 0 {
resetBatches()
nextBatch()
}
for _, b := range batches {
go s.sender(*b)
Expand All @@ -130,7 +129,6 @@ func (s Sender) batcherSender(id int, ch chan *protocol.Snapshot, quit chan bool
}

func (s Sender) sender(batch protocol.BatchSnapshots) {
var wg sync.WaitGroup
msg, _ := batch.Encode()

peers := s.Agent.Topology.Each(s.Config.EachN, nil)
Expand All @@ -139,23 +137,26 @@ func (s Sender) sender(batch protocol.BatchSnapshots) {
metrics.QedSenderBatchesSentTotal.Inc()

dst := peer.Node()
log.Infof("Sending batch %+v to node %+v\n", batch, dst.Name)
wg.Add(1)
log.Debugf("Sending batch %+v to node %+v\n", batch, dst.Name)

for retries := uint(0); retries < 5; retries++ {
retries := uint(5)
for {
err := s.Agent.Memberlist().SendReliable(dst, msg)
if err == nil {
break
}
if err != nil {
if retries == 5 {
log.Errorf("Failed send message: %v", err)
if retries == 0 {
log.Infof("Failed send message: %v", err)
break
}
delay := (10 << retries) * time.Millisecond
time.Sleep(delay)
retries -= 1
continue
}
}
}
wg.Wait()
log.Infof("Sent batch %+v to nodes %+v\n", batch, peers.L)
}

func (s Sender) Stop() {
Expand Down

0 comments on commit 3f0bb62

Please sign in to comment.