From a6604f8926aa1608be116f17de8549b5595e6ba3 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Fri, 12 Jul 2019 14:32:52 +0800 Subject: [PATCH 1/2] rpc: add positive tests for server streaming RPC --- nomad/rpc_test.go | 132 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 132 insertions(+) diff --git a/nomad/rpc_test.go b/nomad/rpc_test.go index a9413e525b7..5a2a6fb8406 100644 --- a/nomad/rpc_test.go +++ b/nomad/rpc_test.go @@ -10,8 +10,10 @@ import ( "time" msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" + cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/helper/pool" "github.com/hashicorp/nomad/helper/testlog" + "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs/config" @@ -20,6 +22,7 @@ import ( "github.com/hashicorp/yamux" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/ugorji/go/codec" ) // rpcClient is a test helper method to return a ClientCodec to use to make rpc @@ -267,6 +270,135 @@ func TestRPC_streamingRpcConn_badMethod_TLS(t *testing.T) { require.True(structs.IsErrUnknownMethod(err)) } +func TestRPC_streamingRpcConn_goodMethod_Plaintext(t *testing.T) { + t.Parallel() + require := require.New(t) + dir := tmpDir(t) + defer os.RemoveAll(dir) + s1 := TestServer(t, func(c *Config) { + c.Region = "regionFoo" + c.BootstrapExpect = 2 + c.DevMode = false + c.DevDisableBootstrap = true + c.DataDir = path.Join(dir, "node1") + }) + defer s1.Shutdown() + + s2 := TestServer(t, func(c *Config) { + c.Region = "regionFoo" + c.BootstrapExpect = 2 + c.DevMode = false + c.DevDisableBootstrap = true + c.DataDir = path.Join(dir, "node2") + }) + defer s2.Shutdown() + + TestJoin(t, s1, s2) + testutil.WaitForLeader(t, s1.RPC) + + s1.peerLock.RLock() + ok, parts := isNomadServer(s2.LocalMember()) + require.True(ok) + server := s1.localPeers[raft.ServerAddress(parts.Addr.String())] + require.NotNil(server) + s1.peerLock.RUnlock() + + conn, err := s1.streamingRpc(server, "FileSystem.Logs") + require.NotNil(conn) + require.NoError(err) + + decoder := codec.NewDecoder(conn, structs.MsgpackHandle) + encoder := codec.NewEncoder(conn, structs.MsgpackHandle) + + allocID := uuid.Generate() + require.NoError(encoder.Encode(cstructs.FsStreamRequest{ + AllocID: allocID, + QueryOptions: structs.QueryOptions{ + Region: "regionFoo", + }, + })) + + var result cstructs.StreamErrWrapper + require.NoError(decoder.Decode(&result)) + require.Empty(result.Payload) + require.True(structs.IsErrUnknownAllocation(result.Error)) +} + +func TestRPC_streamingRpcConn_goodMethod_TLS(t *testing.T) { + t.Parallel() + require := require.New(t) + const ( + cafile = "../helper/tlsutil/testdata/ca.pem" + foocert = "../helper/tlsutil/testdata/nomad-foo.pem" + fookey = "../helper/tlsutil/testdata/nomad-foo-key.pem" + ) + dir := tmpDir(t) + defer os.RemoveAll(dir) + s1 := TestServer(t, func(c *Config) { + c.Region = "regionFoo" + c.BootstrapExpect = 2 + c.DevMode = false + c.DevDisableBootstrap = true + c.DataDir = path.Join(dir, "node1") + c.TLSConfig = &config.TLSConfig{ + EnableHTTP: true, + EnableRPC: true, + VerifyServerHostname: true, + CAFile: cafile, + CertFile: foocert, + KeyFile: fookey, + } + }) + defer s1.Shutdown() + + s2 := TestServer(t, func(c *Config) { + c.Region = "regionFoo" + c.BootstrapExpect = 2 + c.DevMode = false + c.DevDisableBootstrap = true + c.DataDir = path.Join(dir, "node2") + c.TLSConfig = &config.TLSConfig{ + EnableHTTP: true, + EnableRPC: true, + VerifyServerHostname: true, + CAFile: cafile, + CertFile: foocert, + KeyFile: fookey, + } + }) + defer s2.Shutdown() + + TestJoin(t, s1, s2) + testutil.WaitForLeader(t, s1.RPC) + + s1.peerLock.RLock() + ok, parts := isNomadServer(s2.LocalMember()) + require.True(ok) + server := s1.localPeers[raft.ServerAddress(parts.Addr.String())] + require.NotNil(server) + s1.peerLock.RUnlock() + + conn, err := s1.streamingRpc(server, "FileSystem.Logs") + require.NotNil(conn) + require.NoError(err) + + decoder := codec.NewDecoder(conn, structs.MsgpackHandle) + encoder := codec.NewEncoder(conn, structs.MsgpackHandle) + + allocID := uuid.Generate() + require.NoError(encoder.Encode(cstructs.FsStreamRequest{ + AllocID: allocID, + QueryOptions: structs.QueryOptions{ + Region: "regionFoo", + }, + })) + + var result cstructs.StreamErrWrapper + require.NoError(decoder.Decode(&result)) + require.Empty(result.Payload) + require.True(structs.IsErrUnknownAllocation(result.Error)) +} + // COMPAT: Remove in 0.10 // This is a very low level test to assert that the V2 handling works. It is // making manual RPC calls since no helpers exist at this point since we are From b0d98d17b5e5ac9ae15d186fd28b013300a8ae89 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Fri, 12 Jul 2019 14:41:44 +0800 Subject: [PATCH 2/2] rpc: use tls wrapped connection for streaming rpc This ensures that server-to-server streaming RPC calls use the tls wrapped connections. Prior to this, `streamingRpcImpl` function uses tls for setting header and invoking the rpc method, but returns unwrapped tls connection. Thus, streaming writes fail with tls errors. This tls streaming bug existed since 0.8.0[1], but PR #5654[2] exacerbated it in 0.9.2. Prior to PR #5654, nomad client used to shuffle servers at every heartbeat -- `servers.Manager.setServers`[3] always shuffled servers and was called by heartbeat code[4]. Shuffling servers meant that a nomad client would heartbeat and establish a connection against all nomad servers eventually. When handling streaming RPC calls, nomad servers used these local connection to communicate directly to the client. The server-to-server forwarding logic was left mostly unexercised. PR #5654 means that a nomad client may connect to a single server only and caused the server-to-server forward streaming RPC code to get exercised more and unearthed the problem. [1] https://github.com/hashicorp/nomad/blob/v0.8.0/nomad/rpc.go#L501-L515 [2] https://github.com/hashicorp/nomad/pull/5654 [3] https://github.com/hashicorp/nomad/blob/v0.9.1/client/servers/manager.go#L198-L216 [4] https://github.com/hashicorp/nomad/blob/v0.9.1/client/client.go#L1603 --- nomad/rpc.go | 22 +++++++++------------- nomad/rpc_test.go | 2 +- 2 files changed, 10 insertions(+), 14 deletions(-) diff --git a/nomad/rpc.go b/nomad/rpc.go index ed662762a94..e173b02e1ae 100644 --- a/nomad/rpc.go +++ b/nomad/rpc.go @@ -540,18 +540,14 @@ func (r *rpcHandler) streamingRpc(server *serverParts, method string) (net.Conn, tcp.SetNoDelay(true) } - if err := r.streamingRpcImpl(conn, server.Region, method); err != nil { - return nil, err - } - - return conn, nil + return r.streamingRpcImpl(conn, server.Region, method) } // streamingRpcImpl takes a pre-established connection to a server and conducts // the handshake to establish a streaming RPC for the given method. If an error // is returned, the underlying connection has been closed. Otherwise it is // assumed that the connection has been hijacked by the RPC method. -func (r *rpcHandler) streamingRpcImpl(conn net.Conn, region, method string) error { +func (r *rpcHandler) streamingRpcImpl(conn net.Conn, region, method string) (net.Conn, error) { // Check if TLS is enabled r.tlsWrapLock.RLock() tlsWrap := r.tlsWrap @@ -561,14 +557,14 @@ func (r *rpcHandler) streamingRpcImpl(conn net.Conn, region, method string) erro // Switch the connection into TLS mode if _, err := conn.Write([]byte{byte(pool.RpcTLS)}); err != nil { conn.Close() - return err + return nil, err } // Wrap the connection in a TLS client tlsConn, err := tlsWrap(region, conn) if err != nil { conn.Close() - return err + return nil, err } conn = tlsConn } @@ -576,7 +572,7 @@ func (r *rpcHandler) streamingRpcImpl(conn net.Conn, region, method string) erro // Write the multiplex byte to set the mode if _, err := conn.Write([]byte{byte(pool.RpcStreaming)}); err != nil { conn.Close() - return err + return nil, err } // Send the header @@ -587,22 +583,22 @@ func (r *rpcHandler) streamingRpcImpl(conn net.Conn, region, method string) erro } if err := encoder.Encode(header); err != nil { conn.Close() - return err + return nil, err } // Wait for the acknowledgement var ack structs.StreamingRpcAck if err := decoder.Decode(&ack); err != nil { conn.Close() - return err + return nil, err } if ack.Error != "" { conn.Close() - return errors.New(ack.Error) + return nil, errors.New(ack.Error) } - return nil + return conn, nil } // raftApplyFuture is used to encode a message, run it through raft, and return the Raft future. diff --git a/nomad/rpc_test.go b/nomad/rpc_test.go index 5a2a6fb8406..81354aea93d 100644 --- a/nomad/rpc_test.go +++ b/nomad/rpc_test.go @@ -453,7 +453,7 @@ func TestRPC_handleMultiplexV2(t *testing.T) { require.NotEmpty(l) // Make a streaming RPC - err = s.streamingRpcImpl(s2, s.Region(), "Bogus") + _, err = s.streamingRpcImpl(s2, s.Region(), "Bogus") require.NotNil(err) require.Contains(err.Error(), "Bogus") require.True(structs.IsErrUnknownMethod(err))