From 1dab7b55e93eaad973aed90963b59604d35455e0 Mon Sep 17 00:00:00 2001 From: Chelsea Holland Komlo Date: Tue, 16 Jan 2018 14:16:35 -0500 Subject: [PATCH] fixing up raft reload tests close second goroutine in raft-net --- nomad/server.go | 13 ++++------- nomad/server_test.go | 23 +++++++++++-------- .../hashicorp/raft/net_transport.go | 21 +++++++++++++---- 3 files changed, 35 insertions(+), 22 deletions(-) diff --git a/nomad/server.go b/nomad/server.go index a2523caf169..5762d49b601 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -430,10 +430,7 @@ func (s *Server) reloadTLSConnections(newTLSConfig *config.TLSConfig) error { // reinitialize our rpc listener s.rpcListener.Close() <-s.listenerCh - - // Close existing Raft connections - s.raftTransport.Pause() - s.raftLayer.Close() + s.startRPCListener() listener, err := s.createRPCListener() if err != nil { @@ -441,14 +438,14 @@ func (s *Server) reloadTLSConnections(newTLSConfig *config.TLSConfig) error { return err } - // Close existing streams + // Close and reload existing Raft connections + s.raftTransport.Pause() + s.raftLayer.Close() wrapper := tlsutil.RegionSpecificWrapper(s.config.Region, tlsWrap) s.raftLayer = NewRaftLayer(s.rpcAdvertise, wrapper) - - // Reload raft connections s.raftTransport.Reload(s.raftLayer) - s.startRPCListener() + time.Sleep(3 * time.Second) s.logger.Printf("[DEBUG] nomad: finished reloading server connections") return nil diff --git a/nomad/server_test.go b/nomad/server_test.go index e7d063b97a4..5de6b8c6b7d 100644 --- a/nomad/server_test.go +++ b/nomad/server_test.go @@ -293,6 +293,7 @@ func TestServer_Reload_TLSConnections_PlaintextToTLS(t *testing.T) { ) dir := tmpDir(t) defer os.RemoveAll(dir) + s1 := testServer(t, func(c *Config) { c.DataDir = path.Join(dir, "nodeA") }) @@ -312,10 +313,8 @@ func TestServer_Reload_TLSConnections_PlaintextToTLS(t *testing.T) { err := s1.reloadTLSConnections(newTLSConfig) assert.Nil(err) - assert.True(s1.config.TLSConfig.Equals(newTLSConfig)) - time.Sleep(10 * time.Second) codec := rpcClient(t, s1) node := mock.Node() @@ -327,6 +326,7 @@ func TestServer_Reload_TLSConnections_PlaintextToTLS(t *testing.T) { var resp structs.GenericResponse err = msgpackrpc.CallWithCodec(codec, "Node.Register", req, &resp) assert.NotNil(err) + assert.Contains("rpc error: EOF", err.Error()) } // Tests that the server will successfully reload its network connections, @@ -343,6 +343,7 @@ func TestServer_Reload_TLSConnections_TLSToPlaintext_RPC(t *testing.T) { dir := tmpDir(t) defer os.RemoveAll(dir) + s1 := testServer(t, func(c *Config) { c.DataDir = path.Join(dir, "nodeB") c.TLSConfig = &config.TLSConfig{ @@ -362,8 +363,6 @@ func TestServer_Reload_TLSConnections_TLSToPlaintext_RPC(t *testing.T) { assert.Nil(err) assert.True(s1.config.TLSConfig.Equals(newTLSConfig)) - time.Sleep(10 * time.Second) - codec := rpcClient(t, s1) node := mock.Node() @@ -391,6 +390,7 @@ func TestServer_Reload_TLSConnections_Raft(t *testing.T) { ) dir := tmpDir(t) defer os.RemoveAll(dir) + s1 := testServer(t, func(c *Config) { c.BootstrapExpect = 2 c.DevMode = false @@ -420,7 +420,6 @@ func TestServer_Reload_TLSConnections_Raft(t *testing.T) { t.Fatalf("should have 2 peers") }) - // the server should be connected to the rest of the cluster testutil.WaitForLeader(t, s2.RPC) { @@ -439,6 +438,7 @@ func TestServer_Reload_TLSConnections_Raft(t *testing.T) { var resp structs.JobRegisterResponse err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp) assert.Nil(err) + assert.NotEqual(0, resp.Index) // Check for the job in the FSM of each server in the cluster { @@ -454,7 +454,7 @@ func TestServer_Reload_TLSConnections_Raft(t *testing.T) { ws := memdb.NewWatchSet() out, err := state.JobByID(ws, job.Namespace, job.ID) assert.Nil(err) - assert.NotNil(out) // TODO Occasionally is flaky + assert.NotNil(out) assert.Equal(out.CreateIndex, resp.JobModifyIndex) } } @@ -478,17 +478,19 @@ func TestServer_Reload_TLSConnections_Raft(t *testing.T) { req := &structs.JobRegisterRequest{ Job: job, WriteRequest: structs.WriteRequest{ - Region: "global", + Region: "regionFoo", Namespace: job.Namespace, }, } + // TODO(CK) This occasionally is flaky var resp structs.JobRegisterResponse err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp) assert.NotNil(err) + assert.Contains("rpc error: EOF", err.Error()) // Check that the job was not persisted - state := s2.fsm.State() + state := s1.fsm.State() ws := memdb.NewWatchSet() out, _ := state.JobByID(ws, job.Namespace, job.ID) assert.Nil(out) @@ -507,12 +509,12 @@ func TestServer_Reload_TLSConnections_Raft(t *testing.T) { err = s2.reloadTLSConnections(secondNewTLSConfig) assert.Nil(err) - // the server should be connected to the rest of the cluster testutil.WaitForLeader(t, s2.RPC) { // assert that a job register request will succeed codec := rpcClient(t, s2) + job := mock.Job() req := &structs.JobRegisterRequest{ Job: job, @@ -526,6 +528,7 @@ func TestServer_Reload_TLSConnections_Raft(t *testing.T) { var resp structs.JobRegisterResponse err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp) assert.Nil(err) + assert.NotEqual(0, resp.Index) // Check for the job in the FSM of each server in the cluster { @@ -533,7 +536,7 @@ func TestServer_Reload_TLSConnections_Raft(t *testing.T) { ws := memdb.NewWatchSet() out, err := state.JobByID(ws, job.Namespace, job.ID) assert.Nil(err) - assert.NotNil(out) + assert.NotNil(out) // TODO(CK) This occasionally is flaky assert.Equal(out.CreateIndex, resp.JobModifyIndex) } { diff --git a/vendor/github.com/hashicorp/raft/net_transport.go b/vendor/github.com/hashicorp/raft/net_transport.go index a2ab9def108..b45d03e2402 100644 --- a/vendor/github.com/hashicorp/raft/net_transport.go +++ b/vendor/github.com/hashicorp/raft/net_transport.go @@ -161,13 +161,20 @@ func NewNetworkTransportWithLogger( return trans } -// Pause closes the current stream for a NetworkTransport instance +// Pause closes the current stream and existing connections for a +// NetworkTransport instance func (n *NetworkTransport) Pause() { + for _, e := range n.connPool { + for _, conn := range e { + conn.Release() + } + } n.streamCancel() } // Pause creates a new stream for a NetworkTransport instance func (n *NetworkTransport) Reload(s StreamLayer) { + n.stream = s ctx, cancel := context.WithCancel(context.Background()) n.streamCancel = cancel go n.listen(ctx) @@ -389,19 +396,18 @@ func (n *NetworkTransport) listen(ctx context.Context) { if n.IsShutdown() { return } - // TODO Getting an error here n.logger.Printf("[ERR] raft-net: Failed to accept connection: %v", err) continue } n.logger.Printf("[DEBUG] raft-net: %v accepted connection from: %v", n.LocalAddr(), conn.RemoteAddr()) // Handle the connection in dedicated routine - go n.handleConn(conn) + go n.handleConn(ctx, conn) } } // handleConn is used to handle an inbound connection for its lifespan. -func (n *NetworkTransport) handleConn(conn net.Conn) { +func (n *NetworkTransport) handleConn(ctx context.Context, conn net.Conn) { defer conn.Close() r := bufio.NewReader(conn) w := bufio.NewWriter(conn) @@ -409,6 +415,13 @@ func (n *NetworkTransport) handleConn(conn net.Conn) { enc := codec.NewEncoder(w, &codec.MsgpackHandle{}) for { + select { + case <-ctx.Done(): + n.logger.Println("[INFO] raft-net: stream layer is closed for handleConn") + return + default: + } + if err := n.handleCommand(r, dec, enc); err != nil { if err != io.EOF { n.logger.Printf("[ERR] raft-net: Failed to decode incoming command: %v", err)