Skip to content

Commit

Permalink
fixing up raft reload tests
Browse files Browse the repository at this point in the history
close second goroutine in raft-net
  • Loading branch information
chelseakomlo committed Jan 17, 2018
1 parent d97c91c commit 0805c41
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 97 deletions.
4 changes: 2 additions & 2 deletions command/agent/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ WAIT:
}
}

func (c *Command) reloadHTTPServer(newConfig *Config) error {
func (c *Command) reloadHTTPServer() error {
c.agent.logger.Println("[INFO] agent: Reloading HTTP server with new TLS configuration")

c.httpServer.Shutdown()
Expand Down Expand Up @@ -676,7 +676,7 @@ func (c *Command) handleReload() {
// server to a TLS connection could succeed, while reloading the server's rpc
// connections could fail.
if shouldReloadHTTPServer {
err := c.reloadHTTPServer(newConf)
err := c.reloadHTTPServer()
if err != nil {
c.agent.logger.Printf("[ERR] http: failed to reload the config: %v", err)
return
Expand Down
71 changes: 0 additions & 71 deletions command/agent/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -673,74 +673,3 @@ func encodeReq(obj interface{}) io.ReadCloser {
enc.Encode(obj)
return ioutil.NopCloser(buf)
}

func TestHTTP_VerifyHTTPSClientUpgrade_AfterConfigReload(t *testing.T) {
t.Parallel()
assert := assert.New(t)

const (
cafile = "../../helper/tlsutil/testdata/ca.pem"
foocert = "../../helper/tlsutil/testdata/nomad-foo.pem"
fookey = "../../helper/tlsutil/testdata/nomad-foo-key.pem"
)

newConfig := &Config{
TLSConfig: &config.TLSConfig{
EnableHTTP: true,
VerifyHTTPSClient: true,
CAFile: cafile,
CertFile: foocert,
KeyFile: fookey,
},
}

s := makeHTTPServer(t, func(c *Config) {
c.TLSConfig = newConfig.TLSConfig
})
defer s.Shutdown()

// HTTP plaintext request should succeed
reqURL := fmt.Sprintf("http://%s/v1/agent/self", s.Agent.config.AdvertiseAddrs.HTTP)

// First test with a plaintext request
transport := &http.Transport{}
client := &http.Client{Transport: transport}
_, err := http.NewRequest("GET", reqURL, nil)
assert.Nil(err)

// Next, reload the TLS configuration
err = s.Agent.Reload(newConfig)
assert.Nil(err)

// PASS: Requests that specify a valid hostname, CA cert, and client
// certificate succeed.
tlsConf := &tls.Config{
ServerName: "client.regionFoo.nomad",
RootCAs: x509.NewCertPool(),
GetClientCertificate: func(*tls.CertificateRequestInfo) (*tls.Certificate, error) {
c, err := tls.LoadX509KeyPair(foocert, fookey)
if err != nil {
return nil, err
}
return &c, nil
},
}

// HTTPS request should succeed
httpsReqURL := fmt.Sprintf("https://%s/v1/agent/self", s.Agent.config.AdvertiseAddrs.HTTP)

cacertBytes, err := ioutil.ReadFile(cafile)
assert.Nil(err)
tlsConf.RootCAs.AppendCertsFromPEM(cacertBytes)

transport = &http.Transport{TLSClientConfig: tlsConf}
client = &http.Client{Transport: transport}
req, err := http.NewRequest("GET", httpsReqURL, nil)
assert.Nil(err)

resp, err := client.Do(req)
assert.Nil(err)

resp.Body.Close()
assert.Equal(resp.StatusCode, 200)
}
15 changes: 5 additions & 10 deletions nomad/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,8 +400,6 @@ func getTLSConf(enableRPC bool, tlsConf *tlsutil.Config) (*tls.Config, tlsutil.R
func (s *Server) reloadTLSConnections(newTLSConfig *config.TLSConfig) error {
s.logger.Printf("[INFO] nomad: reloading server connections due to configuration changes")

// the server config must be in sync with the latest config changes, due to
// testing for TLS configuration settings in rpc.go
tlsConf := tlsutil.NewTLSConfiguration(newTLSConfig)
incomingTLS, tlsWrap, err := getTLSConf(newTLSConfig.EnableRPC, tlsConf)
if err != nil {
Expand Down Expand Up @@ -430,25 +428,22 @@ func (s *Server) reloadTLSConnections(newTLSConfig *config.TLSConfig) error {
// reinitialize our rpc listener
s.rpcListener.Close()
<-s.listenerCh

// Close existing Raft connections
s.raftTransport.Pause()
s.raftLayer.Close()
s.startRPCListener()

listener, err := s.createRPCListener()
if err != nil {
listener.Close()
return err
}

// Close existing streams
// Close and reload existing Raft connections
s.raftTransport.Pause()
s.raftLayer.Close()
wrapper := tlsutil.RegionSpecificWrapper(s.config.Region, tlsWrap)
s.raftLayer = NewRaftLayer(s.rpcAdvertise, wrapper)

// Reload raft connections
s.raftTransport.Reload(s.raftLayer)

s.startRPCListener()
time.Sleep(3 * time.Second)

s.logger.Printf("[DEBUG] nomad: finished reloading server connections")
return nil
Expand Down
23 changes: 13 additions & 10 deletions nomad/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ func TestServer_Reload_TLSConnections_PlaintextToTLS(t *testing.T) {
)
dir := tmpDir(t)
defer os.RemoveAll(dir)

s1 := testServer(t, func(c *Config) {
c.DataDir = path.Join(dir, "nodeA")
})
Expand All @@ -312,10 +313,8 @@ func TestServer_Reload_TLSConnections_PlaintextToTLS(t *testing.T) {

err := s1.reloadTLSConnections(newTLSConfig)
assert.Nil(err)

assert.True(s1.config.TLSConfig.Equals(newTLSConfig))

time.Sleep(10 * time.Second)
codec := rpcClient(t, s1)

node := mock.Node()
Expand All @@ -327,6 +326,7 @@ func TestServer_Reload_TLSConnections_PlaintextToTLS(t *testing.T) {
var resp structs.GenericResponse
err = msgpackrpc.CallWithCodec(codec, "Node.Register", req, &resp)
assert.NotNil(err)
assert.Contains("rpc error: EOF", err.Error())
}

// Tests that the server will successfully reload its network connections,
Expand All @@ -343,6 +343,7 @@ func TestServer_Reload_TLSConnections_TLSToPlaintext_RPC(t *testing.T) {

dir := tmpDir(t)
defer os.RemoveAll(dir)

s1 := testServer(t, func(c *Config) {
c.DataDir = path.Join(dir, "nodeB")
c.TLSConfig = &config.TLSConfig{
Expand All @@ -362,8 +363,6 @@ func TestServer_Reload_TLSConnections_TLSToPlaintext_RPC(t *testing.T) {
assert.Nil(err)
assert.True(s1.config.TLSConfig.Equals(newTLSConfig))

time.Sleep(10 * time.Second)

codec := rpcClient(t, s1)

node := mock.Node()
Expand Down Expand Up @@ -391,6 +390,7 @@ func TestServer_Reload_TLSConnections_Raft(t *testing.T) {
)
dir := tmpDir(t)
defer os.RemoveAll(dir)

s1 := testServer(t, func(c *Config) {
c.BootstrapExpect = 2
c.DevMode = false
Expand Down Expand Up @@ -420,7 +420,6 @@ func TestServer_Reload_TLSConnections_Raft(t *testing.T) {
t.Fatalf("should have 2 peers")
})

// the server should be connected to the rest of the cluster
testutil.WaitForLeader(t, s2.RPC)

{
Expand All @@ -439,6 +438,7 @@ func TestServer_Reload_TLSConnections_Raft(t *testing.T) {
var resp structs.JobRegisterResponse
err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)
assert.Nil(err)
assert.NotEqual(0, resp.Index)

// Check for the job in the FSM of each server in the cluster
{
Expand All @@ -454,7 +454,7 @@ func TestServer_Reload_TLSConnections_Raft(t *testing.T) {
ws := memdb.NewWatchSet()
out, err := state.JobByID(ws, job.Namespace, job.ID)
assert.Nil(err)
assert.NotNil(out) // TODO Occasionally is flaky
assert.NotNil(out)
assert.Equal(out.CreateIndex, resp.JobModifyIndex)
}
}
Expand All @@ -478,17 +478,19 @@ func TestServer_Reload_TLSConnections_Raft(t *testing.T) {
req := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Region: "global",
Region: "regionFoo",
Namespace: job.Namespace,
},
}

// TODO(CK) This occasionally is flaky
var resp structs.JobRegisterResponse
err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)
assert.NotNil(err)
assert.Contains("rpc error: EOF", err.Error())

// Check that the job was not persisted
state := s2.fsm.State()
state := s1.fsm.State()
ws := memdb.NewWatchSet()
out, _ := state.JobByID(ws, job.Namespace, job.ID)
assert.Nil(out)
Expand All @@ -507,12 +509,12 @@ func TestServer_Reload_TLSConnections_Raft(t *testing.T) {
err = s2.reloadTLSConnections(secondNewTLSConfig)
assert.Nil(err)

// the server should be connected to the rest of the cluster
testutil.WaitForLeader(t, s2.RPC)

{
// assert that a job register request will succeed
codec := rpcClient(t, s2)

job := mock.Job()
req := &structs.JobRegisterRequest{
Job: job,
Expand All @@ -526,14 +528,15 @@ func TestServer_Reload_TLSConnections_Raft(t *testing.T) {
var resp structs.JobRegisterResponse
err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)
assert.Nil(err)
assert.NotEqual(0, resp.Index)

// Check for the job in the FSM of each server in the cluster
{
state := s2.fsm.State()
ws := memdb.NewWatchSet()
out, err := state.JobByID(ws, job.Namespace, job.ID)
assert.Nil(err)
assert.NotNil(out)
assert.NotNil(out) // TODO(CK) This occasionally is flaky
assert.Equal(out.CreateIndex, resp.JobModifyIndex)
}
{
Expand Down
21 changes: 17 additions & 4 deletions vendor/github.com/hashicorp/raft/net_transport.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 0805c41

Please sign in to comment.