Skip to content

Commit

Permalink
Merge pull request #420 from hashicorp/f-rpc-codecs
Browse files Browse the repository at this point in the history
Update Nomad Client/Server RPC codecs to use custom msgpackHandle
  • Loading branch information
dadgar committed Nov 16, 2015
2 parents c4eabe2 + 35288b0 commit c4a2a9c
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 16 deletions.
2 changes: 1 addition & 1 deletion nomad/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (c *Conn) getClient() (*StreamClient, error) {
}

// Create a client codec
codec := msgpackrpc.NewClientCodec(stream)
codec := NewClientCodec(stream)

// Return a new stream client
sc := &StreamClient{
Expand Down
24 changes: 23 additions & 1 deletion nomad/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import (
"io"
"math/rand"
"net"
"net/rpc"
"strings"
"time"

"github.com/armon/go-metrics"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
Expand Down Expand Up @@ -51,6 +53,26 @@ const (
enqueueLimit = 30 * time.Second
)

var (
// rpcHandle is the MsgpackHandle to be used by both Client and Server codecs.
rpcHandle = &codec.MsgpackHandle{
// Enables proper encoding of strings within nil interfaces.
RawToString: true,
}
)

// NewClientCodec returns a new rpc.ClientCodec to be used to make RPC calls to
// the Nomad Server.
func NewClientCodec(conn io.ReadWriteCloser) rpc.ClientCodec {
return msgpackrpc.NewCodecFromHandle(true, true, conn, rpcHandle)
}

// NewServerCodec returns a new rpc.ServerCodec to be used by the Nomad Server
// to handle rpcs.
func NewServerCodec(conn io.ReadWriteCloser) rpc.ServerCodec {
return msgpackrpc.NewCodecFromHandle(true, true, conn, rpcHandle)
}

// listen is used to listen for incoming RPC connections
func (s *Server) listen() {
for {
Expand Down Expand Up @@ -139,7 +161,7 @@ func (s *Server) handleMultiplex(conn net.Conn) {
// handleNomadConn is used to service a single Nomad RPC connection
func (s *Server) handleNomadConn(conn net.Conn) {
defer conn.Close()
rpcCodec := msgpackrpc.NewServerCodec(conn)
rpcCodec := NewServerCodec(conn)
for {
select {
case <-s.shutdownCh:
Expand Down
16 changes: 16 additions & 0 deletions nomad/rpc_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,27 @@
package nomad

import (
"net"
"net/rpc"
"testing"
"time"

"github.com/hashicorp/nomad/testutil"
)

// rpcClient is a test helper method to return a ClientCodec to use to make rpc
// calls to the passed server.
func rpcClient(t *testing.T, s *Server) rpc.ClientCodec {
addr := s.config.RPCAddr
conn, err := net.DialTimeout("tcp", addr.String(), time.Second)
if err != nil {
t.Fatalf("err: %v", err)
}
// Write the Consul RPC byte to set the mode
conn.Write([]byte{byte(rpcNomad)})
return NewClientCodec(conn)
}

func TestRPC_forwardLeader(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
Expand Down
14 changes: 0 additions & 14 deletions nomad/status_endpoint_test.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,13 @@
package nomad

import (
"net"
"net/rpc"
"testing"
"time"

"github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
)

func rpcClient(t *testing.T, s *Server) rpc.ClientCodec {
addr := s.config.RPCAddr
conn, err := net.DialTimeout("tcp", addr.String(), time.Second)
if err != nil {
t.Fatalf("err: %v", err)
}
// Write the Consul RPC byte to set the mode
conn.Write([]byte{byte(rpcNomad)})
return msgpackrpc.NewClientCodec(conn)
}

func TestStatusVersion(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
Expand Down

0 comments on commit c4a2a9c

Please sign in to comment.