Skip to content

Commit

Permalink
First attempt to connect QED server with auditors
Browse files Browse the repository at this point in the history
  • Loading branch information
aalda committed Nov 20, 2018
1 parent 529c14a commit 2326381
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 14 deletions.
1 change: 1 addition & 0 deletions cmd/agent_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func newAgentStartCommand() *cobra.Command {
config.BindAddr = bindAddr
config.AdvertiseAddr = advertiseAddr
config.Role = gossip.NewNodeType(role)
config.EnableCompression = true

var agent *gossip.Agent
var err error
Expand Down
5 changes: 2 additions & 3 deletions gossip/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ func NewTopology() *Topology {
func (t *Topology) Update(m *Member) error {
t.Lock()
defer t.Unlock()
log.Debugf("Updating topology with member: %+v", m)
t.members[m.Role][m.Name] = m
return nil
}
Expand Down Expand Up @@ -122,7 +123,6 @@ type Member struct {
Port uint16
Role AgentType
Status MemberStatus
Node *memberlist.Node
}

// MemberStatus is the state that a member is in.
Expand Down Expand Up @@ -165,7 +165,7 @@ type NopMessageHanler struct {
func (h *NopMessageHanler) HandleMsg([]byte) {}

func NewNopMessageHandler(*Agent) MessageHandler {
return nil // &NopMessageHanler{}
return &NopMessageHanler{}
}

func Create(conf *Config, handler MessageHandlerBuilder) (agent *Agent, err error) {
Expand Down Expand Up @@ -339,7 +339,6 @@ func (a *Agent) getMember(peer *memberlist.Node) *Member {
Addr: net.IP(peer.Addr),
Port: peer.Port,
Role: meta.Role,
Node: peer,
}
}

Expand Down
53 changes: 53 additions & 0 deletions gossip/auditor/auditor.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
package auditor

import (
"bytes"

"github.com/bbva/qed/gossip"
"github.com/bbva/qed/log"
"github.com/bbva/qed/protocol"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/memberlist"
)

type Auditor struct {
Expand Down Expand Up @@ -30,4 +36,51 @@ func NewAuditorHandlerBuilder(c *Config) gossip.MessageHandlerBuilder {

func (a *Auditor) HandleMsg(msg []byte) {

batch, err := a.decode(msg)
if err != nil {
log.Errorf("Unable to decode message: %v", err)
return
}

log.Infof("Batch received, TTL: %d: %v", batch.TTL, *batch)

if batch.TTL <= 0 {
return
}

peers := a.Agent.GetPeers(2, gossip.AuditorType)
peers = append(peers, a.Agent.GetPeers(2, gossip.MonitorType)...)
peers = append(peers, a.Agent.GetPeers(2, gossip.PublisherType)...)

batch.TTL--
newBatch, _ := encode(batch)

for _, peer := range peers {
err := a.Agent.Memberlist().SendReliable(&memberlist.Node{Addr: peer.Addr, Port: peer.Port}, newBatch)
if err != nil {
log.Errorf("Failed send message: %v", err)
}
}

}

func (a *Auditor) decode(buf []byte) (*protocol.BatchSnapshots, error) {
batch := &protocol.BatchSnapshots{}
reader := bytes.NewReader(buf)
decoder := codec.NewDecoder(reader, &codec.MsgpackHandle{})
if err := decoder.Decode(batch); err != nil {
log.Errorf("Failed to decode snapshots batch: %v", err)
return nil, err
}
return batch, nil
}

func encode(msg *protocol.BatchSnapshots) ([]byte, error) {
var buf bytes.Buffer
encoder := codec.NewEncoder(&buf, &codec.MsgpackHandle{})
if err := encoder.Encode(msg); err != nil {
log.Errorf("Failed to encode message: %v", err)
return nil, err
}
return buf.Bytes(), nil
}
27 changes: 20 additions & 7 deletions gossip/sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/bbva/qed/protocol"
"github.com/bbva/qed/sign"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/memberlist"
)

type Sender struct {
Expand Down Expand Up @@ -61,14 +62,22 @@ func (s Sender) Start(ch chan *protocol.Snapshot) {
for {
select {
case <-ticker.C:
msg, _ := encode(s.getBatch(ch))
batch := s.getBatch(ch)
if batch == nil {
continue
}
log.Debugf("Encoding batch: %+v", batch)
msg, _ := encode(batch)

peers := s.Agent.GetPeers(1, gossip.AuditorType)
peers = append(peers, s.Agent.GetPeers(1, gossip.MonitorType)...)
peers = append(peers, s.Agent.GetPeers(1, gossip.PublisherType)...)
log.Debugf("Peers selected: %+v", peers)

for _, peer := range peers {
err := s.Agent.Memberlist().SendReliable(peer.Node, msg)
log.Debugf("%+v", peer)
log.Debugf("Sending batch to peer: %s:%d", peer.Addr, peer.Port)
err := s.Agent.Memberlist().SendReliable(&memberlist.Node{Addr: peer.Addr, Port: peer.Port}, msg)
if err != nil {
log.Errorf("Failed send message: %v", err)
}
Expand All @@ -83,7 +92,7 @@ func (s Sender) Stop() {
s.quit <- true
}

func encode(msg protocol.BatchSnapshots) ([]byte, error) {
func encode(msg *protocol.BatchSnapshots) ([]byte, error) {
var buf bytes.Buffer
encoder := codec.NewEncoder(&buf, &codec.MsgpackHandle{})
if err := encoder.Encode(msg); err != nil {
Expand All @@ -93,21 +102,25 @@ func encode(msg protocol.BatchSnapshots) ([]byte, error) {
return buf.Bytes(), nil
}

func (s *Sender) getBatch(ch chan *protocol.Snapshot) protocol.BatchSnapshots {
func (s *Sender) getBatch(ch chan *protocol.Snapshot) *protocol.BatchSnapshots {

if len(ch) == 0 {
return nil
}

var snapshot *protocol.Snapshot
var batch protocol.BatchSnapshots
var batchSize int = 100
var counter int = 0
batch.Snapshots = make([]*protocol.SignedSnapshot, 0)
batch.TTL = 3
batch.TTL = s.Config.TTL

for {
select {
case snapshot = <-ch:
counter++
default:
return batch
return &batch
}

ss, err := s.doSign(snapshot)
Expand All @@ -117,7 +130,7 @@ func (s *Sender) getBatch(ch chan *protocol.Snapshot) protocol.BatchSnapshots {
batch.Snapshots = append(batch.Snapshots, ss)

if counter == batchSize {
return batch
return &batch
}

}
Expand Down
3 changes: 2 additions & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ func NewServer(
// Create gossip agent
config := gossip.DefaultConfig()
config.BindAddr = gossipAddr
config.Role = gossip.ServerType
server.agent, err = gossip.Create(config, gossip.NewNopMessageHandler)
if err != nil {
return nil, err
Expand Down Expand Up @@ -233,7 +234,7 @@ func (s *Server) Start() error {
log.Debugf(" ready on %s and %s\n", s.httpAddr, s.mgmtAddr)

if !s.bootstrap {
log.Debug(" * Joining existen cluster QED MGMT HTTP server in addr: ", s.mgmtAddr)
log.Debug(" * Joining existent cluster QED MGMT HTTP server in addr: ", s.mgmtAddr)
if err := join(s.joinAddr, s.raftAddr, s.nodeID); err != nil {
log.Fatalf("failed to join node at %s: %s", s.joinAddr, err.Error())
}
Expand Down
6 changes: 3 additions & 3 deletions tests/gossip/run_gossip.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.

master="127.0.0.1:9000"
go run $GOPATH/src/github.com/bbva/qed/main.go agent -k key -l silent --bind $master --node 0 --role server &
master="127.0.0.1:9100"
go run $GOPATH/src/github.com/bbva/qed/main.go start -k key -l debug --node-id server0 --gossip-addr $master --raft-addr 127.0.0.1:9000 -y $HOME/.ssh/id_ed25519 &
pids[0]=$!
sleep 1s

for i in `seq 1 $1`;
do
xterm -hold -e "go run $GOPATH/src/github.com/bbva/qed/main.go agent -k key -l silent --bind 127.0.0.1:900$i --join $master --node $i --role monitor" &
xterm -hold -e "go run $GOPATH/src/github.com/bbva/qed/main.go agent -k key -l debug --bind 127.0.0.1:910$i --join $master --node auditor$i --role auditor" &
pids[${i}]=$!
done

Expand Down

0 comments on commit 2326381

Please sign in to comment.