Skip to content

Commit

Permalink
Join to every specified raft address
Browse files Browse the repository at this point in the history
  • Loading branch information
aalda committed Dec 3, 2018
1 parent 0d6c560 commit e650307
Showing 1 changed file with 15 additions and 20 deletions.
35 changes: 15 additions & 20 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ import (
"net/http"
_ "net/http/pprof" // this will enable the default profiling capabilities
"os"
"os/signal"
"syscall"

"github.com/bbva/qed/api/apihttp"
"github.com/bbva/qed/api/mgmthttp"
Expand All @@ -42,6 +40,7 @@ import (
"github.com/bbva/qed/sign"
"github.com/bbva/qed/storage"
"github.com/bbva/qed/storage/badger"
"github.com/bbva/qed/util"
)

// Server encapsulates the data and login to start/stop a QED server
Expand Down Expand Up @@ -195,9 +194,11 @@ func (s *Server) Start() error {
log.Debugf(" ready on %s and %s\n", s.conf.HttpAddr, s.conf.MgmtAddr)

if !s.bootstrap {
log.Debug(" * Joining existent cluster QED MGMT HTTP server in addr: ", s.conf.MgmtAddr)
if err := join(s.conf.RaftJoinAddr[0], s.conf.RaftAddr, s.conf.NodeID); err != nil {
log.Fatalf("failed to join node at %s: %s", s.conf.RaftJoinAddr[0], err.Error())
for _, addr := range s.conf.RaftJoinAddr {
log.Debug(" * Joining existent cluster QED MGMT HTTP server in addr: ", s.conf.MgmtAddr)
if err := join(addr, s.conf.RaftAddr, s.conf.NodeID); err != nil {
log.Fatalf("failed to join node at %s: %s", addr, err.Error())
}
}
}

Expand All @@ -206,20 +207,21 @@ func (s *Server) Start() error {
s.sender.Start(s.agentsQueue)
}()

awaitTermSignal(s.Stop)
util.AwaitTermSignal(s.Stop)

log.Debug("Stopping server, about to exit...")

return nil
}

// Stop will close all the channels from the mux servers.
func (s *Server) Stop() {
func (s *Server) Stop() error {

if s.tamperingServer != nil {
log.Debugf("Tampering enabled: stopping server...")
if err := s.tamperingServer.Shutdown(context.Background()); err != nil { // TODO include timeout instead nil
log.Error(err)
return err
}
log.Debugf("Done.\n")
}
Expand All @@ -228,24 +230,28 @@ func (s *Server) Stop() {
log.Debugf("Profiling enabled: stopping server...")
if err := s.profilingServer.Shutdown(context.Background()); err != nil { // TODO include timeout instead nil
log.Error(err)
return err
}
log.Debugf("Done.\n")
}

log.Debugf("Stopping MGMT server...")
if err := s.mgmtServer.Shutdown(context.Background()); err != nil { // TODO include timeout instead nil
log.Error(err)
return err
}

log.Debugf("Stopping HTTP server...")
if err := s.httpServer.Shutdown(context.Background()); err != nil { // TODO include timeout instead nil
log.Error(err)
return err
}

log.Debugf("Stopping RAFT server...")
err := s.raftBalloon.Close(true)
if err != nil {
log.Error(err)
return err
}

log.Debugf("Closing QED sender...")
Expand All @@ -255,9 +261,11 @@ func (s *Server) Stop() {
log.Debugf("Stopping QED agent...")
if err := s.agent.Shutdown(); err != nil {
log.Error(err)
return err
}

log.Debugf("Done. Exiting...\n")
return nil
}

func newHTTPServer(endpoint string, raftBalloon raftwal.RaftBalloonApi) *http.Server {
Expand Down Expand Up @@ -290,16 +298,3 @@ func newTamperingServer(endpoint string, store storage.DeletableStore, hasher ha
Handler: apihttp.LogHandler(router),
}
}

func awaitTermSignal(closeFn func()) {

signals := make(chan os.Signal, 1)
// sigint: Ctrl-C, sigterm: kill command
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)

// block main and wait for a signal
sig := <-signals
log.Infof("Signal received: %v", sig)

closeFn()
}

0 comments on commit e650307

Please sign in to comment.