Skip to content

Commit

Permalink
reload raft transport layer
Browse files Browse the repository at this point in the history
fix up linting
  • Loading branch information
chelseakomlo committed Jan 8, 2018
1 parent c70702e commit 11089b2
Show file tree
Hide file tree
Showing 7 changed files with 253 additions and 59 deletions.
11 changes: 6 additions & 5 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,9 +365,9 @@ func (c *Client) init() error {
return nil
}

// ReloadTLSConnections allows a client to reload RPC connections if the
// client's TLS configuration changes from plaintext to TLS
func (c *Client) ReloadTLSConnections(newConfig *nconfig.TLSConfig) error {
// reloadTLSConnections allows a client to reload its TLS configuration on the
// fly
func (c *Client) reloadTLSConnections(newConfig *nconfig.TLSConfig) error {
var tlsWrap tlsutil.RegionWrapper
if newConfig != nil && newConfig.EnableRPC {
tw, err := c.config.NewTLSConfiguration(newConfig).OutgoingTLSWrapper()
Expand All @@ -378,6 +378,8 @@ func (c *Client) ReloadTLSConnections(newConfig *nconfig.TLSConfig) error {
tlsWrap = tw
}

// Keep the client configuration up to date as we use configuration values to
// decide on what type of connections to accept
c.configLock.Lock()
c.config.TLSConfig = newConfig
c.configLock.Unlock()
Expand All @@ -387,8 +389,7 @@ func (c *Client) ReloadTLSConnections(newConfig *nconfig.TLSConfig) error {
return nil
}

// Reload allows a client to reload RPC connections if the
// client's TLS configuration changes
// Reload allows a client to reload its configuration on the fly
func (c *Client) Reload(newConfig *config.Config) error {
return c.reloadTLSConnections(newConfig.TLSConfig)
}
Expand Down
6 changes: 3 additions & 3 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1007,7 +1007,7 @@ func TestClient_ReloadTLS_UpgradePlaintextToTLS(t *testing.T) {
assert := assert.New(t)

s1, addr := testServer(t, func(c *nomad.Config) {
c.Region = "dc1"
c.Region = "foo"
})
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
Expand Down Expand Up @@ -1059,7 +1059,7 @@ func TestClient_ReloadTLS_DowngradeTLSToPlaintext(t *testing.T) {
assert := assert.New(t)

s1, addr := testServer(t, func(c *nomad.Config) {
c.Region = "dc1"
c.Region = "foo"
})
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
Expand Down Expand Up @@ -1090,7 +1090,7 @@ func TestClient_ReloadTLS_DowngradeTLSToPlaintext(t *testing.T) {

req := structs.NodeSpecificRequest{
NodeID: c1.Node().ID,
QueryOptions: structs.QueryOptions{Region: "dc1"},
QueryOptions: structs.QueryOptions{Region: "foo"},
}
var out structs.SingleNodeResponse
testutil.AssertUntil(100*time.Millisecond,
Expand Down
2 changes: 1 addition & 1 deletion client/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ func (c *Config) TLSConfiguration() *tlsutil.Config {
}

// NewTLSConfiguration returns a TLSUtil Config for a new TLS config object
// This allows a TLSConfig object to be created without first explicitely
// This allows a TLSConfig object to be created without first explicitly
// setting it
func (c *Config) NewTLSConfiguration(tlsConfig *config.TLSConfig) *tlsutil.Config {
return &tlsutil.Config{
Expand Down
8 changes: 4 additions & 4 deletions command/agent/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"sort"
"strconv"
"strings"
"sync"
"syscall"
"time"

Expand Down Expand Up @@ -47,7 +46,6 @@ type Command struct {
args []string
agent *Agent
httpServer *HTTPServer
httpServerLock sync.Mutex
logFilter *logutils.LevelFilter
logOutput io.Writer
retryJoinErrCh chan struct{}
Expand Down Expand Up @@ -602,8 +600,6 @@ WAIT:

func (c *Command) reloadHTTPServer(newConfig *Config) error {
c.agent.logger.Println("[INFO] agent: Reloading HTTP server with new TLS configuration")
c.httpServerLock.Lock()
defer c.httpServerLock.Unlock()

c.httpServer.Shutdown()

Expand Down Expand Up @@ -640,6 +636,7 @@ func (c *Command) handleReload() {

shouldReload := c.agent.ShouldReload(newConf)
if shouldReload {
c.agent.logger.Printf("[DEBUG] agent: starting reload of agent config")
err := c.agent.Reload(newConf)
if err != nil {
c.agent.logger.Printf("[ERR] agent: failed to reload the config: %v", err)
Expand All @@ -649,8 +646,10 @@ func (c *Command) handleReload() {

if s := c.agent.Server(); s != nil {
sconf, err := convertServerConfig(newConf, c.logOutput)
c.agent.logger.Printf("[DEBUG] agent: starting reload of server config")
if err != nil {
c.agent.logger.Printf("[ERR] agent: failed to convert server config: %v", err)
return
} else {
if err := s.Reload(sconf); err != nil {
c.agent.logger.Printf("[ERR] agent: reloading server config failed: %v", err)
Expand All @@ -661,6 +660,7 @@ func (c *Command) handleReload() {

if s := c.agent.Client(); s != nil {
clientConfig, err := c.agent.clientConfig()
c.agent.logger.Printf("[DEBUG] agent: starting reload of client config")
if err != nil {
c.agent.logger.Printf("[ERR] agent: reloading client config failed: %v", err)
return
Expand Down
79 changes: 37 additions & 42 deletions nomad/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,9 @@ const (
// schedulers, and notification bus for agents.
type Server struct {
config *Config
configLock sync.RWMutex
logger *log.Logger
configLock sync.Mutex

logger *log.Logger

// Connection pool to other Nomad servers
connPool *ConnPool
Expand All @@ -96,32 +97,28 @@ type Server struct {

// The raft instance is used among Nomad nodes within the
// region to protect operations that require strong consistency
leaderCh <-chan bool
raft *raft.Raft
raftLayer *RaftLayer
raftLayerLock sync.Mutex
leaderCh <-chan bool
raft *raft.Raft
raftLayer *RaftLayer

raftStore *raftboltdb.BoltStore
raftInmem *raft.InmemStore

raftTransport *raft.NetworkTransport
raftTransportLock sync.Mutex
raftTransport *raft.NetworkTransport

// fsm is the state machine used with Raft
fsm *nomadFSM

// rpcListener is used to listen for incoming connections
rpcListener net.Listener
rpcListenerLock sync.Mutex
listenerCh chan struct{}
rpcListener net.Listener
listenerCh chan struct{}

rpcServer *rpc.Server
rpcAdvertise net.Addr

// rpcTLS is the TLS config for incoming TLS requests
rpcTLS *tls.Config
rpcCancel context.CancelFunc
rpcTLSLock sync.Mutex
rpcTLS *tls.Config
rpcCancel context.CancelFunc

// peers is used to track the known Nomad servers. This is
// used for region forwarding and clustering.
Expand Down Expand Up @@ -329,6 +326,7 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, logger *log.Logg
// Start ingesting events for Serf
go s.serfEventHandler()

// start the RPC listener for the server
s.startRPCListener()

// Emit metrics for the eval broker
Expand All @@ -353,10 +351,8 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, logger *log.Logg
return s, nil
}

// Start the RPC listeners
// startRPCListener starts the server's the RPC listener
func (s *Server) startRPCListener() {
s.rpcListenerLock.Lock()
defer s.rpcListenerLock.Unlock()
ctx, cancel := context.WithCancel(context.Background())
s.rpcCancel = cancel
go func() {
Expand All @@ -365,10 +361,8 @@ func (s *Server) startRPCListener() {
}()
}

// createRPCListener creates the server's RPC listener
func (s *Server) createRPCListener() error {
s.rpcListenerLock.Lock()
defer s.rpcListenerLock.Unlock()

s.listenerCh = make(chan struct{})
list, err := net.ListenTCP("tcp", s.config.RPCAddr)
if err != nil || list == nil {
Expand All @@ -380,6 +374,8 @@ func (s *Server) createRPCListener() error {
return nil
}

// getTLSConf gets the server's TLS configuration based on the config supplied
// by the operator
func getTLSConf(enableRPC bool, tlsConf *tlsutil.Config) (*tls.Config, tlsutil.RegionWrapper, error) {
var tlsWrap tlsutil.RegionWrapper
var incomingTLS *tls.Config
Expand All @@ -399,18 +395,27 @@ func getTLSConf(enableRPC bool, tlsConf *tlsutil.Config) (*tls.Config, tlsutil.R
return incomingTLS, tlsWrap, nil
}

// ReloadTLSConnections updates a server's TLS configuration and reloads RPC
// connections. This will handle both TLS upgrades and downgrades.
// reloadTLSConnections updates a server's TLS configuration and reloads RPC
// connections.
func (s *Server) reloadTLSConnections(newTLSConfig *config.TLSConfig) error {
s.logger.Printf("[INFO] nomad: reloading server connections due to configuration changes")

// the server config must be in sync with the latest config changes, due to
// testing for TLS configuration settings in rpc.go
tlsConf := s.config.newTLSConfig(newTLSConfig)
incomingTLS, tlsWrap, err := getTLSConf(newTLSConfig.EnableRPC, tlsConf)
if err != nil {
s.logger.Printf("[ERR] nomad: unable to reset TLS context")
return err
}

// Keeping configuration in sync is important for other places that require
// access to config information, such as rpc.go, where we decide on what kind
// of network connections to accept depending on the server configuration
s.configLock.Lock()
s.config.TLSConfig = newTLSConfig
s.configLock.Unlock()

if s.rpcCancel == nil {
s.logger.Printf("[ERR] nomad: No TLS Context to reset")
return fmt.Errorf("Unable to reset tls context")
Expand All @@ -422,38 +427,27 @@ func (s *Server) reloadTLSConnections(newTLSConfig *config.TLSConfig) error {
s.config.TLSConfig = newTLSConfig
s.configLock.Unlock()

s.rpcTLSLock.Lock()
s.rpcTLS = incomingTLS
s.rpcTLSLock.Unlock()

s.raftTransportLock.Lock()
defer s.raftTransportLock.Unlock()
s.raftTransport.Close()

s.connPool.ReloadTLS(tlsWrap)

// reinitialize our rpc listener
s.rpcListenerLock.Lock()
s.rpcListener.Close()
<-s.listenerCh
s.rpcListenerLock.Unlock()
s.raftTransport.Pause()
s.raftLayer.Close()

err = s.createRPCListener()
if err != nil {
return err
}
s.startRPCListener()

s.raftLayerLock.Lock()
s.raftLayer.Close()
// CLose existing streams
wrapper := tlsutil.RegionSpecificWrapper(s.config.Region, tlsWrap)
s.raftLayer = NewRaftLayer(s.rpcAdvertise, wrapper)
s.raftLayerLock.Unlock()

// re-initialize the network transport with a re-initialized stream layer
trans := raft.NewNetworkTransport(s.raftLayer, 3, s.config.RaftTimeout,
s.config.LogOutput)
s.raftTransport = trans
s.startRPCListener()

time.Sleep(3 * time.Second)
if err != nil {
return err
}

s.logger.Printf("[DEBUG] nomad: finished reloading server connections")
return nil
Expand Down Expand Up @@ -621,6 +615,7 @@ func (s *Server) Reload(newConfig *Config) error {

if !newConfig.TLSConfig.Equals(s.config.TLSConfig) {
if err := s.reloadTLSConnections(newConfig.TLSConfig); err != nil {
s.logger.Printf("[DEBUG] nomad: reloading server TLS configuration")
multierror.Append(&mErr, err)
}
}
Expand Down
Loading

0 comments on commit 11089b2

Please sign in to comment.