Skip to content

Commit

Permalink
Add process function to measure performance with test_service.go, and…
Browse files Browse the repository at this point in the history
… adapt to changes in BatchSnapshots encode and decode
  • Loading branch information
gdiazlo committed Nov 20, 2018
1 parent a25e7c6 commit 6322dc5
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 55 deletions.
49 changes: 23 additions & 26 deletions gossip/auditor/auditor.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package auditor

import (
"bytes"
"fmt"
"io"
"io/ioutil"
"net/http"

"github.com/bbva/qed/gossip"
"github.com/bbva/qed/log"
"github.com/bbva/qed/protocol"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/memberlist"
)

type Auditor struct {
Expand Down Expand Up @@ -35,14 +36,16 @@ func NewAuditorHandlerBuilder(c *Config) gossip.MessageHandlerBuilder {
}

func (a *Auditor) HandleMsg(msg []byte) {

batch, err := a.decode(msg)
var batch protocol.BatchSnapshots
err := batch.Decode(msg)
if err != nil {
log.Errorf("Unable to decode message: %v", err)
return
}

log.Infof("Batch received, TTL: %d: %v", batch.TTL, *batch)
log.Infof("Batch received, TTL: %d: %v", batch.TTL, batch)

a.Process(&batch)

if batch.TTL <= 0 {
return
Expand All @@ -53,34 +56,28 @@ func (a *Auditor) HandleMsg(msg []byte) {
peers = append(peers, a.Agent.GetPeers(2, gossip.PublisherType)...)

batch.TTL--
newBatch, _ := encode(batch)
newMsg, _ := batch.Encode()

for _, peer := range peers {
err := a.Agent.Memberlist().SendReliable(&memberlist.Node{Addr: peer.Addr, Port: peer.Port}, newBatch)
err := a.Agent.Memberlist().SendReliable(peer, newMsg)
if err != nil {
log.Errorf("Failed send message: %v", err)
}
}

}
func (a *Auditor) Process(b *protocol.BatchSnapshots) {
for i := 0; i < len(b.Snapshots); i++ {
res, err := http.Get(fmt.Sprintf("http://127.0.0.1:8888/?nodeType=auditor&id=%d", b.Snapshots[0].Snapshot.Version))
if err != nil || res == nil {
log.Debugf("Error contacting service with error %v", err)
}
// to reuse connections we need to do this
io.Copy(ioutil.Discard, res.Body)
res.Body.Close()

func (a *Auditor) decode(buf []byte) (*protocol.BatchSnapshots, error) {
batch := &protocol.BatchSnapshots{}
reader := bytes.NewReader(buf)
decoder := codec.NewDecoder(reader, &codec.MsgpackHandle{})
if err := decoder.Decode(batch); err != nil {
log.Errorf("Failed to decode snapshots batch: %v", err)
return nil, err
// time.Sleep(1 * time.Second)
}
return batch, nil
}

func encode(msg *protocol.BatchSnapshots) ([]byte, error) {
var buf bytes.Buffer
encoder := codec.NewEncoder(&buf, &codec.MsgpackHandle{})
if err := encoder.Encode(msg); err != nil {
log.Errorf("Failed to encode message: %v", err)
return nil, err
}
return buf.Bytes(), nil
}
log.Debugf("process(): Processed %v elements of batch id %v", len(b.Snapshots), b.Snapshots[0].Snapshot.Version)
}
50 changes: 24 additions & 26 deletions gossip/monitor/monitor.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package monitor

import (
"bytes"
"fmt"
"io"
"io/ioutil"
"net/http"

"github.com/bbva/qed/gossip"
"github.com/bbva/qed/log"
"github.com/bbva/qed/protocol"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/memberlist"
)

type Monitor struct {
Expand Down Expand Up @@ -35,14 +36,16 @@ func NewMonitorHandlerBuilder(c *Config) gossip.MessageHandlerBuilder {
}

func (m *Monitor) HandleMsg(msg []byte) {

batch, err := decode(msg)
var batch protocol.BatchSnapshots
err := batch.Decode(msg)
if err != nil {
log.Errorf("Unable to decode message: %v", err)
return
}

log.Infof("Batch received, TTL: %d: %v", batch.TTL, *batch)
log.Infof("Batch received, TTL: %d: %v", batch.TTL, batch)

m.Process(&batch)

if batch.TTL <= 0 {
return
Expand All @@ -53,34 +56,29 @@ func (m *Monitor) HandleMsg(msg []byte) {
peers = append(peers, m.Agent.GetPeers(2, gossip.PublisherType)...)

batch.TTL--
newBatch, _ := encode(batch)
newMsg, _ := batch.Encode()

for _, peer := range peers {
err := m.Agent.Memberlist().SendReliable(&memberlist.Node{Addr: peer.Addr, Port: peer.Port}, newBatch)
err := m.Agent.Memberlist().SendReliable(peer, newMsg)
if err != nil {
log.Errorf("Failed send message: %v", err)
}
}

}

func decode(buf []byte) (*protocol.BatchSnapshots, error) {
batch := &protocol.BatchSnapshots{}
reader := bytes.NewReader(buf)
decoder := codec.NewDecoder(reader, &codec.MsgpackHandle{})
if err := decoder.Decode(batch); err != nil {
log.Errorf("Failed to decode snapshots batch: %v", err)
return nil, err
}
return batch, nil
}
func (m *Monitor) Process(b *protocol.BatchSnapshots) {
for i := 0; i < len(b.Snapshots); i++ {
res, err := http.Get(fmt.Sprintf("http://127.0.0.1:8888/?nodeType=monitor&id=%d", b.Snapshots[0].Snapshot.Version))
if err != nil || res == nil {
log.Debugf("Error contacting service with error %v", err)
}
// to reuse connections we need to do this
io.Copy(ioutil.Discard, res.Body)
res.Body.Close()

func encode(msg *protocol.BatchSnapshots) ([]byte, error) {
var buf bytes.Buffer
encoder := codec.NewEncoder(&buf, &codec.MsgpackHandle{})
if err := encoder.Encode(msg); err != nil {
log.Errorf("Failed to encode message: %v", err)
return nil, err
// time.Sleep(1 * time.Second)
}
return buf.Bytes(), nil
}

log.Debugf("process(): Processed %v elements of batch id %v", len(b.Snapshots), b.Snapshots[0].Snapshot.Version)
}
5 changes: 2 additions & 3 deletions gossip/sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/bbva/qed/protocol"
"github.com/bbva/qed/sign"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/memberlist"
)

type Sender struct {
Expand Down Expand Up @@ -67,7 +66,7 @@ func (s Sender) Start(ch chan *protocol.Snapshot) {
continue
}
log.Debugf("Encoding batch: %+v", batch)
msg, _ := encode(batch)
msg, _ := batch.Encode()

peers := s.Agent.GetPeers(1, gossip.AuditorType)
peers = append(peers, s.Agent.GetPeers(1, gossip.MonitorType)...)
Expand All @@ -77,7 +76,7 @@ func (s Sender) Start(ch chan *protocol.Snapshot) {
for _, peer := range peers {
log.Debugf("%+v", peer)
log.Debugf("Sending batch to peer: %s:%d", peer.Addr, peer.Port)
err := s.Agent.Memberlist().SendReliable(&memberlist.Node{Addr: peer.Addr, Port: peer.Port}, msg)
err := s.Agent.Memberlist().SendReliable(peer, msg)
if err != nil {
log.Errorf("Failed send message: %v", err)
}
Expand Down

0 comments on commit 6322dc5

Please sign in to comment.