Skip to content

Commit

Permalink
Server start() does not block. Use stop() to shutitdown
Browse files Browse the repository at this point in the history
  • Loading branch information
gdiazlo committed Apr 10, 2019
1 parent 8729a04 commit 066b569
Showing 1 changed file with 10 additions and 24 deletions.
34 changes: 10 additions & 24 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,12 @@ import (
"github.com/bbva/qed/api/apihttp"
"github.com/bbva/qed/api/mgmthttp"
"github.com/bbva/qed/gossip"
"github.com/bbva/qed/gossip/member"
"github.com/bbva/qed/gossip/sender"
"github.com/bbva/qed/log"
"github.com/bbva/qed/metrics"
"github.com/bbva/qed/protocol"
"github.com/bbva/qed/raftwal"
"github.com/bbva/qed/sign"
"github.com/bbva/qed/storage/rocks"
"github.com/bbva/qed/util"
)

// Server encapsulates the data and login to start/stop a QED server
Expand All @@ -57,7 +54,7 @@ type Server struct {
metricsServer *metrics.Server
prometheusRegistry *prometheus.Registry
signer sign.Signer
sender *sender.Sender
sender *Sender
agent *gossip.Agent
snapshotsCh chan *protocol.Snapshot
}
Expand Down Expand Up @@ -125,26 +122,19 @@ func NewServer(conf *Config) (*Server, error) {
// Create gossip agent
config := gossip.DefaultConfig()
config.BindAddr = conf.GossipAddr
config.Role = member.Server
config.Role = "server"
config.NodeName = conf.NodeID

server.agent, err = gossip.NewAgent(config, nil, server.metricsServer)
server.agent, err = gossip.NewAgentFromConfig(config)
if err != nil {
return nil, err
}

if len(conf.GossipJoinAddr) > 0 {
_, err = server.agent.Join(conf.GossipJoinAddr)
if err != nil {
return nil, err
}
}

// TODO: add queue size to config
server.snapshotsCh = make(chan *protocol.Snapshot, 2<<16)
server.snapshotsCh = make(chan *protocol.Snapshot, 1<<16)

// Create sender
server.sender = sender.NewSender(server.agent, sender.DefaultConfig(), server.signer)
server.sender = NewSender(server.agent, server.signer, 100, 2, 3)

// Create RaftBalloon
server.raftBalloon, err = raftwal.NewRaftBalloon(conf.RaftPath, conf.RaftAddr, conf.NodeID, store, server.snapshotsCh)
Expand Down Expand Up @@ -253,14 +243,9 @@ func (s *Server) Start() error {
}
}

go func() {
log.Debug(" * Starting QED gossip agent.")
s.sender.Start(s.snapshotsCh)
}()

util.AwaitTermSignal(s.Stop)
s.sender.Start(s.snapshotsCh)

log.Debug("Stopping server, about to exit...")
s.agent.Start()

return nil
}
Expand Down Expand Up @@ -292,8 +277,9 @@ func (s *Server) Stop() error {
return err
}

log.Debugf("Closing QED sender...")
s.sender.Stop()
/*
log.Debugf("Closing QED sender...")
s.sender.Stop() */
close(s.snapshotsCh)

log.Debugf("Stopping QED agent...")
Expand Down

0 comments on commit 066b569

Please sign in to comment.