Skip to content

Commit

Permalink
Fix batchSender behaviour
Browse files Browse the repository at this point in the history
Co-authored-by: iknite <[email protected]>
  • Loading branch information
Jose Luis Lucas and iknite committed Dec 18, 2018
1 parent b71e16c commit 72931cc
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 89 deletions.
1 change: 1 addition & 0 deletions gossip/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func NewAgent(conf *Config, p []Processor) (agent *Agent, err error) {
RetransmitMult: 2,
}

fmt.Println(">>>>>>>>>< AGENT ", agent.memberlist.Members())
if p != nil {
go agent.start()
}
Expand Down
2 changes: 1 addition & 1 deletion gossip/publisher/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (p *Publisher) dispatchTasks() {
func (p *Publisher) executeTask(task *PublishTask) {
log.Debug("Executing task: %+v", task)
fmt.Printf("Executing task: %+v\n", task)

fmt.Println(">>>>>>>>> PUBLISHER EXECUTE TASK")
buf, err := task.Batch.Encode()
if err != nil {
log.Debug("\nPublisher: Error marshalling: %s", err.Error())
Expand Down
77 changes: 32 additions & 45 deletions gossip/sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,50 +58,71 @@ func NewSender(a *gossip.Agent, c *Config, s sign.Signer) *Sender {
quit: make(chan bool),
}
}

func (s Sender) batcherSender(id int, ch chan *protocol.Snapshot, quit chan bool) {
batches := []*protocol.BatchSnapshots{}
batch := &protocol.BatchSnapshots{
TTL: s.Config.TTL,
From: s.Agent.Self,
Snapshots: make([]*protocol.SignedSnapshot, 0),
}

ticker := time.NewTicker(500 * time.Millisecond)

resetBatches := func() {
batches = append(batches, batch)
batch = &protocol.BatchSnapshots{
TTL: s.Config.TTL,
From: s.Agent.Self,
Snapshots: make([]*protocol.SignedSnapshot, 0),
}
}

for {
select {
case snap := <-ch:
// batchSize 100 must be configurable
// TODO: batchSize 100 must be configurable
if len(batch.Snapshots) == 100 {
go s.sender(batch)
batch = &protocol.BatchSnapshots{
TTL: s.Config.TTL,
From: s.Agent.Self,
Snapshots: make([]*protocol.SignedSnapshot, 0),
}
resetBatches()
}
ss, err := s.doSign(snap)
if err != nil {
log.Errorf("Failed signing message: %v", err)
}
batch.Snapshots = append(batch.Snapshots, ss)
fmt.Printf(">>>>>>>>>>>>>SENDER %d: ADD SNAPSHOT TO BATCH \n", id)

case <-ticker.C:
if len(batch.Snapshots) > 0 {
resetBatches()
}
for _, b := range batches {
go s.sender(b)
fmt.Printf(">>>>>>>>>>>>>SENDER: SEND BATCH LEN(%d)\n", len(b.Snapshots))
}
batches = []*protocol.BatchSnapshots{}

case <-quit:
return
// default:
// fmt.Println("Doing nothing", id)
}
}

}

func (s Sender) sender(batch *protocol.BatchSnapshots) {
var wg sync.WaitGroup
msg, _ := batch.Encode()

// fmt.Println("BATCH: ", batch.Snapshots)
peers := s.Agent.Topology.Each(s.Config.EachN, nil)

for _, peer := range peers.L {
fmt.Println(">>>>>> PEERS ", peers)
dst := peer.Node()
log.Infof("Sending batch %+v to node %+v\n", batch, dst.Name)
wg.Add(1)
go func() {
err := s.Agent.Memberlist().SendReliable(dst, msg)
fmt.Println(">>>>>>>>> MESSAGE SENT")
if err != nil {
log.Errorf("Failed send message: %v", err)
}
Expand All @@ -114,7 +135,7 @@ func (s Sender) sender(batch *protocol.BatchSnapshots) {
func (s Sender) Start(ch chan *protocol.Snapshot) {
ticker := time.NewTicker(1000 * time.Millisecond)

for i := 0; i < 10; i++ {
for i := 0; i < 1; i++ {
go s.batcherSender(i, ch, s.quit)
}

Expand All @@ -132,40 +153,6 @@ func (s Sender) Stop() {
s.quit <- true
}

func (s *Sender) getBatch(ch chan *protocol.Snapshot) *protocol.BatchSnapshots {

if len(ch) == 0 {
return nil
}

var snapshot *protocol.Snapshot
var batch protocol.BatchSnapshots

var batchSize int = 100
var counter int = 0

batch.Snapshots = make([]*protocol.SignedSnapshot, 0)
batch.TTL = s.Config.TTL
batch.From = s.Agent.Self

for {
select {
case snapshot = <-ch:
counter++
default:
return &batch
}

batch.Snapshots = append(batch.Snapshots, &protocol.SignedSnapshot{snapshot, []byte{0x0}})

if counter == batchSize {
return &batch
}

}

}

func (s *Sender) doSign(snapshot *protocol.Snapshot) (*protocol.SignedSnapshot, error) {

signature, err := s.signer.Sign([]byte(fmt.Sprintf("%v", snapshot)))
Expand Down
2 changes: 1 addition & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (s *Server) Start() error {
}

go func() {
log.Debug(" * Starting QED agent.")
log.Debug(" * Starting QED gossip agent.")
s.sender.Start(s.agentsQueue)
}()

Expand Down
11 changes: 6 additions & 5 deletions tests/e2e/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ var apiKey, storageType, keyFile string
var cacheSize uint64

const (
QEDUrl = "http://127.0.0.1:8080"
PubUrl = "http://127.0.0.1:8888"
APIKey = "my-key"
QEDUrl = "http://127.0.0.1:8080"
PubUrl = "http://127.0.0.1:8888"
APIKey = "my-key"
QEDGossip = "127.0.0.1:8600"
)

func init() {
Expand Down Expand Up @@ -73,7 +74,7 @@ func newAgent(id int, name string, role member.Type, p gossip.Processor, t *test
agentConf.BindAddr = fmt.Sprintf("127.0.0.1:930%d", id)
}

agentConf.StartJoin = []string{QEDUrl}
agentConf.StartJoin = []string{QEDGossip}
agentConf.EnableCompression = true
agentConf.AlertsUrls = []string{PubUrl}
agentConf.Role = role
Expand Down Expand Up @@ -220,7 +221,7 @@ func setupServer(id int, joinAddr string, t *testing.T) (scope.TestF, scope.Test
t.Log(err)
}
})()
time.Sleep(2 * time.Second)
time.Sleep(5 * time.Second)
}

after := func(t *testing.T) {
Expand Down
77 changes: 40 additions & 37 deletions tests/e2e/tamper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"testing"
"time"

"github.com/bbva/qed/hashing"
// "github.com/bbva/qed/hashing"
"github.com/bbva/qed/protocol"
"github.com/bbva/qed/testutils/rand"
"github.com/bbva/qed/testutils/scope"
Expand Down Expand Up @@ -64,50 +64,53 @@ func TestTamper(t *testing.T) {

event := rand.RandomString(10)

scenario("Add one event and get its membership proof", func() {
// scenario("Add one event and get its membership proof", func() {
// var snapshot *protocol.Snapshot
// var err error

// let("Add event", func(t *testing.T) {
// snapshot, err = client.Add(event)
// assert.NoError(t, err)

// assert.Equal(t, snapshot.EventDigest, hashing.NewSha256Hasher().Do([]byte(event)),
// "The snapshot's event doesn't match: expected %s, actual %s", event, snapshot.EventDigest)
// assert.False(t, snapshot.Version < 0, "The snapshot's version must be greater or equal to 0")
// assert.False(t, len(snapshot.HyperDigest) == 0, "The snapshot's hyperDigest cannot be empty")
// assert.False(t, len(snapshot.HistoryDigest) == 0, "The snapshot's hyperDigest cannot be empt")
// })

// let("Get membership proof for first inserted event", func(t *testing.T) {
// result, err := client.Membership([]byte(event), snapshot.Version)
// assert.NoError(t, err)

// assert.True(t, result.Exists, "The queried key should be a member")
// assert.Equal(t, result.QueryVersion, snapshot.Version,
// "The query version doest't match the queried one: expected %d, actual %d", snapshot.Version, result.QueryVersion)
// assert.Equal(t, result.ActualVersion, snapshot.Version,
// "The actual version should match the queried one: expected %d, actual %d", snapshot.Version, result.ActualVersion)
// assert.Equal(t, result.CurrentVersion, snapshot.Version,
// "The current version should match the queried one: expected %d, actual %d", snapshot.Version, result.CurrentVersion)
// assert.Equal(t, []byte(event), result.Key,
// "The returned event doesn't math the original one: expected %s, actual %s", event, result.Key)
// assert.False(t, len(result.KeyDigest) == 0, "The key digest cannot be empty")
// assert.False(t, len(result.Hyper) == 0, "The hyper proof cannot be empty")
// assert.False(t, result.ActualVersion > 0 && len(result.History) == 0,
// "The history proof cannot be empty when version is greater than 0")

// })
// })

scenario("Add one event and check that it has been published", func() {
var snapshot *protocol.Snapshot
var err error

let("Add event", func(t *testing.T) {
snapshot, err = client.Add(event)
assert.NoError(t, err)

assert.Equal(t, snapshot.EventDigest, hashing.NewSha256Hasher().Do([]byte(event)),
"The snapshot's event doesn't match: expected %s, actual %s", event, snapshot.EventDigest)
assert.False(t, snapshot.Version < 0, "The snapshot's version must be greater or equal to 0")
assert.False(t, len(snapshot.HyperDigest) == 0, "The snapshot's hyperDigest cannot be empty")
assert.False(t, len(snapshot.HistoryDigest) == 0, "The snapshot's hyperDigest cannot be empt")
})

let("Get membership proof for first inserted event", func(t *testing.T) {
result, err := client.Membership([]byte(event), snapshot.Version)
assert.NoError(t, err)

assert.True(t, result.Exists, "The queried key should be a member")
assert.Equal(t, result.QueryVersion, snapshot.Version,
"The query version doest't match the queried one: expected %d, actual %d", snapshot.Version, result.QueryVersion)
assert.Equal(t, result.ActualVersion, snapshot.Version,
"The actual version should match the queried one: expected %d, actual %d", snapshot.Version, result.ActualVersion)
assert.Equal(t, result.CurrentVersion, snapshot.Version,
"The current version should match the queried one: expected %d, actual %d", snapshot.Version, result.CurrentVersion)
assert.Equal(t, []byte(event), result.Key,
"The returned event doesn't math the original one: expected %s, actual %s", event, result.Key)
assert.False(t, len(result.KeyDigest) == 0, "The key digest cannot be empty")
assert.False(t, len(result.Hyper) == 0, "The hyper proof cannot be empty")
assert.False(t, result.ActualVersion > 0 && len(result.History) == 0,
"The history proof cannot be empty when version is greater than 0")

})
})

scenario("S", func() {
var snapshot *protocol.Snapshot
var err error

let("Add event", func(t *testing.T) {
snapshot, err = client.Add(event)
assert.NoError(t, err)
time.Sleep(100 * time.Second)
let("Get signed snapshot from snapshot public storage", func(t *testing.T) {
time.Sleep(2 * time.Second)
ss, err := getSnapshot(0)
if err != nil {
fmt.Println("Error: ", err)
Expand Down
1 change: 1 addition & 0 deletions tests/e2e/test_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ func (s *Service) postBatchHandler() func(http.ResponseWriter, *http.Request) {
atomic.AddUint64(&s.stats.count[RPS], 1)
atomic.AddUint64(&s.stats.count[SNAP], 1)
if r.Method == "POST" {
fmt.Println(">>>>>>> REQUEST RECEIVED")
// Decode batch to get signed snapshots and batch version.
var b protocol.BatchSnapshots
buf, err := ioutil.ReadAll(r.Body)
Expand Down
Empty file modified tests/gossip/run_gossip.sh
100644 → 100755
Empty file.

0 comments on commit 72931cc

Please sign in to comment.