Skip to content

Commit

Permalink
Add monitor skeleton
Browse files Browse the repository at this point in the history
  • Loading branch information
aalda committed Nov 20, 2018
1 parent 2326381 commit 7749711
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 1 deletion.
3 changes: 3 additions & 0 deletions cmd/agent_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/bbva/qed/gossip"
"github.com/bbva/qed/gossip/auditor"
"github.com/bbva/qed/gossip/monitor"
"github.com/bbva/qed/log"
"github.com/spf13/cobra"
)
Expand Down Expand Up @@ -51,6 +52,8 @@ func newAgentStartCommand() *cobra.Command {
switch config.Role {
case gossip.PublisherType:
case gossip.MonitorType:
conf := monitor.DefaultConfig()
agent, err = gossip.Create(config, monitor.NewMonitorHandlerBuilder(conf))
case gossip.AuditorType:
conf := auditor.DefaultConfig()
agent, err = gossip.Create(config, auditor.NewAuditorHandlerBuilder(conf))
Expand Down
86 changes: 86 additions & 0 deletions gossip/monitor/monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package monitor

import (
"bytes"

"github.com/bbva/qed/gossip"
"github.com/bbva/qed/log"
"github.com/bbva/qed/protocol"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/memberlist"
)

type Monitor struct {
Agent *gossip.Agent
Config *Config
quit chan bool
}

type Config struct {
}

func DefaultConfig() *Config {
return &Config{}
}

func NewMonitorHandlerBuilder(c *Config) gossip.MessageHandlerBuilder {
monitor := &Monitor{
Config: c,
quit: make(chan bool),
}
return func(a *gossip.Agent) gossip.MessageHandler {
monitor.Agent = a
return monitor
}
}

func (m *Monitor) HandleMsg(msg []byte) {

batch, err := decode(msg)
if err != nil {
log.Errorf("Unable to decode message: %v", err)
return
}

log.Infof("Batch received, TTL: %d: %v", batch.TTL, *batch)

if batch.TTL <= 0 {
return
}

peers := m.Agent.GetPeers(2, gossip.AuditorType)
peers = append(peers, m.Agent.GetPeers(2, gossip.MonitorType)...)
peers = append(peers, m.Agent.GetPeers(2, gossip.PublisherType)...)

batch.TTL--
newBatch, _ := encode(batch)

for _, peer := range peers {
err := m.Agent.Memberlist().SendReliable(&memberlist.Node{Addr: peer.Addr, Port: peer.Port}, newBatch)
if err != nil {
log.Errorf("Failed send message: %v", err)
}
}

}

func decode(buf []byte) (*protocol.BatchSnapshots, error) {
batch := &protocol.BatchSnapshots{}
reader := bytes.NewReader(buf)
decoder := codec.NewDecoder(reader, &codec.MsgpackHandle{})
if err := decoder.Decode(batch); err != nil {
log.Errorf("Failed to decode snapshots batch: %v", err)
return nil, err
}
return batch, nil
}

func encode(msg *protocol.BatchSnapshots) ([]byte, error) {
var buf bytes.Buffer
encoder := codec.NewEncoder(&buf, &codec.MsgpackHandle{})
if err := encoder.Encode(msg); err != nil {
log.Errorf("Failed to encode message: %v", err)
return nil, err
}
return buf.Bytes(), nil
}
8 changes: 7 additions & 1 deletion tests/gossip/run_gossip.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,13 @@ sleep 1s
for i in `seq 1 $1`;
do
xterm -hold -e "go run $GOPATH/src/github.com/bbva/qed/main.go agent -k key -l debug --bind 127.0.0.1:910$i --join $master --node auditor$i --role auditor" &
pids[${i}]=$!
pids+=($!)
done

for i in `seq 1 $2`;
do
xterm -hold -e "go run $GOPATH/src/github.com/bbva/qed/main.go agent -k key -l debug --bind 127.0.0.1:920$i --join $master --node monitor$i --role monitor" &
pids+=($!)
done

for pid in ${pids[*]}; do
Expand Down

0 comments on commit 7749711

Please sign in to comment.