From 35288b09361d6a4a4c4b520076eab3d7568861f3 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Sun, 15 Nov 2015 18:27:02 -0800 Subject: [PATCH] Update Nomad Client/Server RPC codecs to use custom msgpackHandle --- nomad/pool.go | 2 +- nomad/rpc.go | 24 +++++++++++++++++++++++- nomad/rpc_test.go | 16 ++++++++++++++++ nomad/status_endpoint_test.go | 14 -------------- 4 files changed, 40 insertions(+), 16 deletions(-) diff --git a/nomad/pool.go b/nomad/pool.go index 395cdea218b..22708ab0f27 100644 --- a/nomad/pool.go +++ b/nomad/pool.go @@ -71,7 +71,7 @@ func (c *Conn) getClient() (*StreamClient, error) { } // Create a client codec - codec := msgpackrpc.NewClientCodec(stream) + codec := NewClientCodec(stream) // Return a new stream client sc := &StreamClient{ diff --git a/nomad/rpc.go b/nomad/rpc.go index 21f9c9dc671..123c35028fd 100644 --- a/nomad/rpc.go +++ b/nomad/rpc.go @@ -6,10 +6,12 @@ import ( "io" "math/rand" "net" + "net/rpc" "strings" "time" "github.com/armon/go-metrics" + "github.com/hashicorp/go-msgpack/codec" "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" @@ -51,6 +53,26 @@ const ( enqueueLimit = 30 * time.Second ) +var ( + // rpcHandle is the MsgpackHandle to be used by both Client and Server codecs. + rpcHandle = &codec.MsgpackHandle{ + // Enables proper encoding of strings within nil interfaces. + RawToString: true, + } +) + +// NewClientCodec returns a new rpc.ClientCodec to be used to make RPC calls to +// the Nomad Server. +func NewClientCodec(conn io.ReadWriteCloser) rpc.ClientCodec { + return msgpackrpc.NewCodecFromHandle(true, true, conn, rpcHandle) +} + +// NewServerCodec returns a new rpc.ServerCodec to be used by the Nomad Server +// to handle rpcs. +func NewServerCodec(conn io.ReadWriteCloser) rpc.ServerCodec { + return msgpackrpc.NewCodecFromHandle(true, true, conn, rpcHandle) +} + // listen is used to listen for incoming RPC connections func (s *Server) listen() { for { @@ -139,7 +161,7 @@ func (s *Server) handleMultiplex(conn net.Conn) { // handleNomadConn is used to service a single Nomad RPC connection func (s *Server) handleNomadConn(conn net.Conn) { defer conn.Close() - rpcCodec := msgpackrpc.NewServerCodec(conn) + rpcCodec := NewServerCodec(conn) for { select { case <-s.shutdownCh: diff --git a/nomad/rpc_test.go b/nomad/rpc_test.go index a369b1ebf0c..3a4fda39492 100644 --- a/nomad/rpc_test.go +++ b/nomad/rpc_test.go @@ -1,11 +1,27 @@ package nomad import ( + "net" + "net/rpc" "testing" + "time" "github.com/hashicorp/nomad/testutil" ) +// rpcClient is a test helper method to return a ClientCodec to use to make rpc +// calls to the passed server. +func rpcClient(t *testing.T, s *Server) rpc.ClientCodec { + addr := s.config.RPCAddr + conn, err := net.DialTimeout("tcp", addr.String(), time.Second) + if err != nil { + t.Fatalf("err: %v", err) + } + // Write the Consul RPC byte to set the mode + conn.Write([]byte{byte(rpcNomad)}) + return NewClientCodec(conn) +} + func TestRPC_forwardLeader(t *testing.T) { s1 := testServer(t, nil) defer s1.Shutdown() diff --git a/nomad/status_endpoint_test.go b/nomad/status_endpoint_test.go index 1439bad5f17..ebbab2ead18 100644 --- a/nomad/status_endpoint_test.go +++ b/nomad/status_endpoint_test.go @@ -1,27 +1,13 @@ package nomad import ( - "net" - "net/rpc" "testing" - "time" "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" ) -func rpcClient(t *testing.T, s *Server) rpc.ClientCodec { - addr := s.config.RPCAddr - conn, err := net.DialTimeout("tcp", addr.String(), time.Second) - if err != nil { - t.Fatalf("err: %v", err) - } - // Write the Consul RPC byte to set the mode - conn.Write([]byte{byte(rpcNomad)}) - return msgpackrpc.NewClientCodec(conn) -} - func TestStatusVersion(t *testing.T) { s1 := testServer(t, nil) defer s1.Shutdown()