From 0805c411963afb6b58349dc26d063801be53abe8 Mon Sep 17 00:00:00 2001 From: Chelsea Holland Komlo Date: Tue, 16 Jan 2018 14:16:35 -0500 Subject: [PATCH] fixing up raft reload tests close second goroutine in raft-net --- command/agent/command.go | 4 +- command/agent/http_test.go | 71 ------------------- nomad/server.go | 15 ++-- nomad/server_test.go | 23 +++--- .../hashicorp/raft/net_transport.go | 21 ++++-- 5 files changed, 37 insertions(+), 97 deletions(-) diff --git a/command/agent/command.go b/command/agent/command.go index cac8616c4a0..98c6fa3a5be 100644 --- a/command/agent/command.go +++ b/command/agent/command.go @@ -598,7 +598,7 @@ WAIT: } } -func (c *Command) reloadHTTPServer(newConfig *Config) error { +func (c *Command) reloadHTTPServer() error { c.agent.logger.Println("[INFO] agent: Reloading HTTP server with new TLS configuration") c.httpServer.Shutdown() @@ -676,7 +676,7 @@ func (c *Command) handleReload() { // server to a TLS connection could succeed, while reloading the server's rpc // connections could fail. if shouldReloadHTTPServer { - err := c.reloadHTTPServer(newConf) + err := c.reloadHTTPServer() if err != nil { c.agent.logger.Printf("[ERR] http: failed to reload the config: %v", err) return diff --git a/command/agent/http_test.go b/command/agent/http_test.go index b145d43571f..5d4004c18e1 100644 --- a/command/agent/http_test.go +++ b/command/agent/http_test.go @@ -673,74 +673,3 @@ func encodeReq(obj interface{}) io.ReadCloser { enc.Encode(obj) return ioutil.NopCloser(buf) } - -func TestHTTP_VerifyHTTPSClientUpgrade_AfterConfigReload(t *testing.T) { - t.Parallel() - assert := assert.New(t) - - const ( - cafile = "../../helper/tlsutil/testdata/ca.pem" - foocert = "../../helper/tlsutil/testdata/nomad-foo.pem" - fookey = "../../helper/tlsutil/testdata/nomad-foo-key.pem" - ) - - newConfig := &Config{ - TLSConfig: &config.TLSConfig{ - EnableHTTP: true, - VerifyHTTPSClient: true, - CAFile: cafile, - CertFile: foocert, - KeyFile: fookey, - }, - } - - s := makeHTTPServer(t, func(c *Config) { - c.TLSConfig = newConfig.TLSConfig - }) - defer s.Shutdown() - - // HTTP plaintext request should succeed - reqURL := fmt.Sprintf("http://%s/v1/agent/self", s.Agent.config.AdvertiseAddrs.HTTP) - - // First test with a plaintext request - transport := &http.Transport{} - client := &http.Client{Transport: transport} - _, err := http.NewRequest("GET", reqURL, nil) - assert.Nil(err) - - // Next, reload the TLS configuration - err = s.Agent.Reload(newConfig) - assert.Nil(err) - - // PASS: Requests that specify a valid hostname, CA cert, and client - // certificate succeed. - tlsConf := &tls.Config{ - ServerName: "client.regionFoo.nomad", - RootCAs: x509.NewCertPool(), - GetClientCertificate: func(*tls.CertificateRequestInfo) (*tls.Certificate, error) { - c, err := tls.LoadX509KeyPair(foocert, fookey) - if err != nil { - return nil, err - } - return &c, nil - }, - } - - // HTTPS request should succeed - httpsReqURL := fmt.Sprintf("https://%s/v1/agent/self", s.Agent.config.AdvertiseAddrs.HTTP) - - cacertBytes, err := ioutil.ReadFile(cafile) - assert.Nil(err) - tlsConf.RootCAs.AppendCertsFromPEM(cacertBytes) - - transport = &http.Transport{TLSClientConfig: tlsConf} - client = &http.Client{Transport: transport} - req, err := http.NewRequest("GET", httpsReqURL, nil) - assert.Nil(err) - - resp, err := client.Do(req) - assert.Nil(err) - - resp.Body.Close() - assert.Equal(resp.StatusCode, 200) -} diff --git a/nomad/server.go b/nomad/server.go index a2523caf169..0b3c1fb1274 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -400,8 +400,6 @@ func getTLSConf(enableRPC bool, tlsConf *tlsutil.Config) (*tls.Config, tlsutil.R func (s *Server) reloadTLSConnections(newTLSConfig *config.TLSConfig) error { s.logger.Printf("[INFO] nomad: reloading server connections due to configuration changes") - // the server config must be in sync with the latest config changes, due to - // testing for TLS configuration settings in rpc.go tlsConf := tlsutil.NewTLSConfiguration(newTLSConfig) incomingTLS, tlsWrap, err := getTLSConf(newTLSConfig.EnableRPC, tlsConf) if err != nil { @@ -430,10 +428,7 @@ func (s *Server) reloadTLSConnections(newTLSConfig *config.TLSConfig) error { // reinitialize our rpc listener s.rpcListener.Close() <-s.listenerCh - - // Close existing Raft connections - s.raftTransport.Pause() - s.raftLayer.Close() + s.startRPCListener() listener, err := s.createRPCListener() if err != nil { @@ -441,14 +436,14 @@ func (s *Server) reloadTLSConnections(newTLSConfig *config.TLSConfig) error { return err } - // Close existing streams + // Close and reload existing Raft connections + s.raftTransport.Pause() + s.raftLayer.Close() wrapper := tlsutil.RegionSpecificWrapper(s.config.Region, tlsWrap) s.raftLayer = NewRaftLayer(s.rpcAdvertise, wrapper) - - // Reload raft connections s.raftTransport.Reload(s.raftLayer) - s.startRPCListener() + time.Sleep(3 * time.Second) s.logger.Printf("[DEBUG] nomad: finished reloading server connections") return nil diff --git a/nomad/server_test.go b/nomad/server_test.go index e7d063b97a4..5de6b8c6b7d 100644 --- a/nomad/server_test.go +++ b/nomad/server_test.go @@ -293,6 +293,7 @@ func TestServer_Reload_TLSConnections_PlaintextToTLS(t *testing.T) { ) dir := tmpDir(t) defer os.RemoveAll(dir) + s1 := testServer(t, func(c *Config) { c.DataDir = path.Join(dir, "nodeA") }) @@ -312,10 +313,8 @@ func TestServer_Reload_TLSConnections_PlaintextToTLS(t *testing.T) { err := s1.reloadTLSConnections(newTLSConfig) assert.Nil(err) - assert.True(s1.config.TLSConfig.Equals(newTLSConfig)) - time.Sleep(10 * time.Second) codec := rpcClient(t, s1) node := mock.Node() @@ -327,6 +326,7 @@ func TestServer_Reload_TLSConnections_PlaintextToTLS(t *testing.T) { var resp structs.GenericResponse err = msgpackrpc.CallWithCodec(codec, "Node.Register", req, &resp) assert.NotNil(err) + assert.Contains("rpc error: EOF", err.Error()) } // Tests that the server will successfully reload its network connections, @@ -343,6 +343,7 @@ func TestServer_Reload_TLSConnections_TLSToPlaintext_RPC(t *testing.T) { dir := tmpDir(t) defer os.RemoveAll(dir) + s1 := testServer(t, func(c *Config) { c.DataDir = path.Join(dir, "nodeB") c.TLSConfig = &config.TLSConfig{ @@ -362,8 +363,6 @@ func TestServer_Reload_TLSConnections_TLSToPlaintext_RPC(t *testing.T) { assert.Nil(err) assert.True(s1.config.TLSConfig.Equals(newTLSConfig)) - time.Sleep(10 * time.Second) - codec := rpcClient(t, s1) node := mock.Node() @@ -391,6 +390,7 @@ func TestServer_Reload_TLSConnections_Raft(t *testing.T) { ) dir := tmpDir(t) defer os.RemoveAll(dir) + s1 := testServer(t, func(c *Config) { c.BootstrapExpect = 2 c.DevMode = false @@ -420,7 +420,6 @@ func TestServer_Reload_TLSConnections_Raft(t *testing.T) { t.Fatalf("should have 2 peers") }) - // the server should be connected to the rest of the cluster testutil.WaitForLeader(t, s2.RPC) { @@ -439,6 +438,7 @@ func TestServer_Reload_TLSConnections_Raft(t *testing.T) { var resp structs.JobRegisterResponse err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp) assert.Nil(err) + assert.NotEqual(0, resp.Index) // Check for the job in the FSM of each server in the cluster { @@ -454,7 +454,7 @@ func TestServer_Reload_TLSConnections_Raft(t *testing.T) { ws := memdb.NewWatchSet() out, err := state.JobByID(ws, job.Namespace, job.ID) assert.Nil(err) - assert.NotNil(out) // TODO Occasionally is flaky + assert.NotNil(out) assert.Equal(out.CreateIndex, resp.JobModifyIndex) } } @@ -478,17 +478,19 @@ func TestServer_Reload_TLSConnections_Raft(t *testing.T) { req := &structs.JobRegisterRequest{ Job: job, WriteRequest: structs.WriteRequest{ - Region: "global", + Region: "regionFoo", Namespace: job.Namespace, }, } + // TODO(CK) This occasionally is flaky var resp structs.JobRegisterResponse err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp) assert.NotNil(err) + assert.Contains("rpc error: EOF", err.Error()) // Check that the job was not persisted - state := s2.fsm.State() + state := s1.fsm.State() ws := memdb.NewWatchSet() out, _ := state.JobByID(ws, job.Namespace, job.ID) assert.Nil(out) @@ -507,12 +509,12 @@ func TestServer_Reload_TLSConnections_Raft(t *testing.T) { err = s2.reloadTLSConnections(secondNewTLSConfig) assert.Nil(err) - // the server should be connected to the rest of the cluster testutil.WaitForLeader(t, s2.RPC) { // assert that a job register request will succeed codec := rpcClient(t, s2) + job := mock.Job() req := &structs.JobRegisterRequest{ Job: job, @@ -526,6 +528,7 @@ func TestServer_Reload_TLSConnections_Raft(t *testing.T) { var resp structs.JobRegisterResponse err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp) assert.Nil(err) + assert.NotEqual(0, resp.Index) // Check for the job in the FSM of each server in the cluster { @@ -533,7 +536,7 @@ func TestServer_Reload_TLSConnections_Raft(t *testing.T) { ws := memdb.NewWatchSet() out, err := state.JobByID(ws, job.Namespace, job.ID) assert.Nil(err) - assert.NotNil(out) + assert.NotNil(out) // TODO(CK) This occasionally is flaky assert.Equal(out.CreateIndex, resp.JobModifyIndex) } { diff --git a/vendor/github.com/hashicorp/raft/net_transport.go b/vendor/github.com/hashicorp/raft/net_transport.go index a2ab9def108..b45d03e2402 100644 --- a/vendor/github.com/hashicorp/raft/net_transport.go +++ b/vendor/github.com/hashicorp/raft/net_transport.go @@ -161,13 +161,20 @@ func NewNetworkTransportWithLogger( return trans } -// Pause closes the current stream for a NetworkTransport instance +// Pause closes the current stream and existing connections for a +// NetworkTransport instance func (n *NetworkTransport) Pause() { + for _, e := range n.connPool { + for _, conn := range e { + conn.Release() + } + } n.streamCancel() } // Pause creates a new stream for a NetworkTransport instance func (n *NetworkTransport) Reload(s StreamLayer) { + n.stream = s ctx, cancel := context.WithCancel(context.Background()) n.streamCancel = cancel go n.listen(ctx) @@ -389,19 +396,18 @@ func (n *NetworkTransport) listen(ctx context.Context) { if n.IsShutdown() { return } - // TODO Getting an error here n.logger.Printf("[ERR] raft-net: Failed to accept connection: %v", err) continue } n.logger.Printf("[DEBUG] raft-net: %v accepted connection from: %v", n.LocalAddr(), conn.RemoteAddr()) // Handle the connection in dedicated routine - go n.handleConn(conn) + go n.handleConn(ctx, conn) } } // handleConn is used to handle an inbound connection for its lifespan. -func (n *NetworkTransport) handleConn(conn net.Conn) { +func (n *NetworkTransport) handleConn(ctx context.Context, conn net.Conn) { defer conn.Close() r := bufio.NewReader(conn) w := bufio.NewWriter(conn) @@ -409,6 +415,13 @@ func (n *NetworkTransport) handleConn(conn net.Conn) { enc := codec.NewEncoder(w, &codec.MsgpackHandle{}) for { + select { + case <-ctx.Done(): + n.logger.Println("[INFO] raft-net: stream layer is closed for handleConn") + return + default: + } + if err := n.handleCommand(r, dec, enc); err != nil { if err != io.EOF { n.logger.Printf("[ERR] raft-net: Failed to decode incoming command: %v", err)