Skip to content

Commit

Permalink
Place agent inside Sender
Browse files Browse the repository at this point in the history
Co-authored-by: Gabriel Díaz <[email protected]>
  • Loading branch information
Jose Luis Lucas and gdiazlo committed Nov 19, 2018
1 parent 6fb93eb commit ac88a2a
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 8 deletions.
31 changes: 25 additions & 6 deletions gossip/sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ import (
"github.com/hashicorp/go-msgpack/codec"
)

type Sender struct {
Agent *gossip.Agent
Config *Config
quit chan bool
}

type Config struct {
BatchSize uint
BatchInterval time.Duration
Expand All @@ -39,29 +45,42 @@ func DefaultConfig() *Config {
}
}

func Start(n *gossip.Agent, ch chan *protocol.Snapshot) {
func NewSender(a *gossip.Agent, c *Config) *Sender {
return &Sender{
Agent: a,
Config: c,
quit: make(chan bool),
}
}

func (s Sender) Start(ch chan *protocol.Snapshot) {
ticker := time.NewTicker(1 * time.Second)

for {
select {
case <-ticker.C:
msg, _ := encode(getBatch(ch))

peers := n.GetPeers(1, gossip.AuditorType)
peers = append(peers, n.GetPeers(1, gossip.MonitorType)...)
peers = append(peers, n.GetPeers(1, gossip.PublisherType)...)
peers := s.Agent.GetPeers(1, gossip.AuditorType)
peers = append(peers, s.Agent.GetPeers(1, gossip.MonitorType)...)
peers = append(peers, s.Agent.GetPeers(1, gossip.PublisherType)...)

for _, peer := range peers {
err := n.Memberlist().SendReliable(peer.Node, msg)
err := s.Agent.Memberlist().SendReliable(peer.Node, msg)
if err != nil {
log.Errorf("Failed send message: %v", err)
}
}
// TODO: Implement graceful shutdown.
case <-s.quit:
return
}
}
}

func (s Sender) Stop() {
s.quit <- true
}

func encode(msg protocol.BatchSnapshots) ([]byte, error) {
var buf bytes.Buffer
encoder := codec.NewEncoder(&buf, &codec.MsgpackHandle{})
Expand Down
9 changes: 7 additions & 2 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type Server struct {
tamperingServer *http.Server
profilingServer *http.Server
signer sign.Signer
sender *sender.Sender
agent *gossip.Agent
agentsQueue chan *protocol.Snapshot
}
Expand Down Expand Up @@ -136,6 +137,9 @@ func NewServer(
// TODO: add queue size to config
server.agentsQueue = make(chan *protocol.Snapshot, 10000)

// Create sender
server.sender = sender.NewSender(server.agent, sender.DefaultConfig())

// Create RaftBalloon
server.raftBalloon, err = raftwal.NewRaftBalloon(raftPath, raftAddr, nodeID, store, server.agentsQueue)
if err != nil {
Expand Down Expand Up @@ -225,7 +229,7 @@ func (s *Server) Start() error {

go func() {
log.Debug(" * Starting QED agent.")
sender.Start(s.agent, s.agentsQueue)
s.sender.Start(s.agentsQueue)
}()

awaitTermSignal(s.Stop)
Expand Down Expand Up @@ -270,7 +274,8 @@ func (s *Server) Stop() {
log.Error(err)
}

log.Debugf("Closing QED agent queue...")
log.Debugf("Closing QED sender...")
s.sender.Stop()
close(s.agentsQueue)

log.Debugf("Stopping QED agent...")
Expand Down

0 comments on commit ac88a2a

Please sign in to comment.