Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dynamic RPC Server and Node connection map #3721

Merged
merged 5 commits into from
Jan 11, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion nomad/endpoints_oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

package nomad

import "net/rpc"

// EnterpriseEndpoints holds the set of enterprise only endpoints to register
type EnterpriseEndpoints struct{}

Expand All @@ -12,4 +14,4 @@ func NewEnterpriseEndpoints(s *Server) *EnterpriseEndpoints {
}

// Register is a no-op in oss.
func (e *EnterpriseEndpoints) Register(s *Server) {}
func (e *EnterpriseEndpoints) Register(s *rpc.Server) {}
2 changes: 1 addition & 1 deletion nomad/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (s *Server) invalidateHeartbeat(id string) {
},
}
var resp structs.NodeUpdateResponse
if err := s.endpoints.Node.UpdateStatus(&req, &resp); err != nil {
if err := s.staticEndpoints.Node.UpdateStatus(&req, &resp); err != nil {
s.logger.Printf("[ERR] nomad.heartbeat: update status failed: %v", err)
}
}
Expand Down
24 changes: 24 additions & 0 deletions nomad/node_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ const (
type Node struct {
srv *Server

// ctx provides context regarding the underlying connection
ctx *RPCContext

// updates holds pending client status updates for allocations
updates []*structs.Allocation

Expand Down Expand Up @@ -114,6 +117,13 @@ func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUp
}
}

// We have a valid node connection, so add the mapping to cache the
// connection and allow the server to send RPCs to the client.
if n.ctx != nil && n.ctx.NodeID == "" {
n.ctx.NodeID = args.Node.ID
n.srv.addNodeConn(n.ctx)
}

// Commit this update via Raft
_, index, err := n.srv.raftApply(structs.NodeRegisterRequestType, args)
if err != nil {
Expand Down Expand Up @@ -305,6 +315,13 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct
return fmt.Errorf("node not found")
}

// We have a valid node connection, so add the mapping to cache the
// connection and allow the server to send RPCs to the client.
if n.ctx != nil && n.ctx.NodeID == "" {
n.ctx.NodeID = args.NodeID
n.srv.addNodeConn(n.ctx)
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason why only a few endpoints attempt to add a valid connection like this? for e.g why not do the same thing in GetNode?

Copy link
Contributor Author

@dadgar dadgar Jan 10, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am only adding it to RPCs that are only ever called by a Nomad node. The mapping should only map connections to clients. So that is why I only added to a few.

// XXX: Could use the SecretID here but have to update the heartbeat system
// to track SecretIDs.

Expand Down Expand Up @@ -724,6 +741,13 @@ func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest,
return fmt.Errorf("node secret ID does not match")
}

// We have a valid node connection, so add the mapping to cache the
// connection and allow the server to send RPCs to the client.
if n.ctx != nil && n.ctx.NodeID == "" {
n.ctx.NodeID = args.NodeID
n.srv.addNodeConn(n.ctx)
}

var err error
allocs, err = state.AllocsByNode(ws, args.NodeID)
if err != nil {
Expand Down
74 changes: 59 additions & 15 deletions nomad/node_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,20 @@ import (
"github.com/hashicorp/nomad/testutil"
vapi "github.com/hashicorp/vault/api"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestClientEndpoint_Register(t *testing.T) {
t.Parallel()
require := require.New(t)
s1 := testServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

// Check that we have no client connections
require.Empty(s1.connectedNodes())

// Create the register request
node := mock.Node()
req := &structs.NodeRegisterRequest{
Expand All @@ -41,6 +46,11 @@ func TestClientEndpoint_Register(t *testing.T) {
t.Fatalf("bad index: %d", resp.Index)
}

// Check that we have the client connections
nodes := s1.connectedNodes()
require.Len(nodes, 1)
require.Equal(node.ID, nodes[0])

// Check for the node in the FSM
state := s1.fsm.State()
ws := memdb.NewWatchSet()
Expand All @@ -57,6 +67,15 @@ func TestClientEndpoint_Register(t *testing.T) {
if out.ComputedClass == "" {
t.Fatal("ComputedClass not set")
}

// Close the connection and check that we remove the client connections
require.Nil(codec.Close())
testutil.WaitForResult(func() (bool, error) {
nodes := s1.connectedNodes()
return len(nodes) == 0, nil
}, func(err error) {
t.Fatalf("should have no clients")
})
}

func TestClientEndpoint_Register_SecretMismatch(t *testing.T) {
Expand Down Expand Up @@ -260,11 +279,15 @@ func TestClientEndpoint_Deregister_Vault(t *testing.T) {

func TestClientEndpoint_UpdateStatus(t *testing.T) {
t.Parallel()
require := require.New(t)
s1 := testServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

// Check that we have no client connections
require.Empty(s1.connectedNodes())

// Create the register request
node := mock.Node()
reg := &structs.NodeRegisterRequest{
Expand Down Expand Up @@ -304,6 +327,11 @@ func TestClientEndpoint_UpdateStatus(t *testing.T) {
t.Fatalf("bad: %#v", ttl)
}

// Check that we have the client connections
nodes := s1.connectedNodes()
require.Len(nodes, 1)
require.Equal(node.ID, nodes[0])

// Check for the node in the FSM
state := s1.fsm.State()
ws := memdb.NewWatchSet()
Expand All @@ -317,6 +345,15 @@ func TestClientEndpoint_UpdateStatus(t *testing.T) {
if out.ModifyIndex != resp2.Index {
t.Fatalf("index mis-match")
}

// Close the connection and check that we remove the client connections
require.Nil(codec.Close())
testutil.WaitForResult(func() (bool, error) {
nodes := s1.connectedNodes()
return len(nodes) == 0, nil
}, func(err error) {
t.Fatalf("should have no clients")
})
}

func TestClientEndpoint_UpdateStatus_Vault(t *testing.T) {
Expand Down Expand Up @@ -1230,30 +1267,23 @@ func TestClientEndpoint_GetAllocs_ACL_Basic(t *testing.T) {

func TestClientEndpoint_GetClientAllocs(t *testing.T) {
t.Parallel()
require := require.New(t)
s1 := testServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

// Check that we have no client connections
require.Empty(s1.connectedNodes())

// Create the register request
node := mock.Node()
reg := &structs.NodeRegisterRequest{
Node: node,
WriteRequest: structs.WriteRequest{Region: "global"},
}

// Fetch the response
var resp structs.GenericResponse
if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil {
t.Fatalf("err: %v", err)
}
node.CreateIndex = resp.Index
node.ModifyIndex = resp.Index
state := s1.fsm.State()
require.Nil(state.UpsertNode(98, node))

// Inject fake evaluations
alloc := mock.Alloc()
alloc.NodeID = node.ID
state := s1.fsm.State()
state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID))
err := state.UpsertAllocs(100, []*structs.Allocation{alloc})
if err != nil {
Expand All @@ -1278,6 +1308,11 @@ func TestClientEndpoint_GetClientAllocs(t *testing.T) {
t.Fatalf("bad: %#v", resp2.Allocs)
}

// Check that we have the client connections
nodes := s1.connectedNodes()
require.Len(nodes, 1)
require.Equal(node.ID, nodes[0])

// Lookup node with bad SecretID
get.SecretID = "foobarbaz"
var resp3 structs.NodeClientAllocsResponse
Expand All @@ -1298,6 +1333,15 @@ func TestClientEndpoint_GetClientAllocs(t *testing.T) {
if len(resp4.Allocs) != 0 {
t.Fatalf("unexpected node %#v", resp3.Allocs)
}

// Close the connection and check that we remove the client connections
require.Nil(codec.Close())
testutil.WaitForResult(func() (bool, error) {
nodes := s1.connectedNodes()
return len(nodes) == 0, nil
}, func(err error) {
t.Fatalf("should have no clients")
})
}

func TestClientEndpoint_GetClientAllocs_Blocking(t *testing.T) {
Expand Down Expand Up @@ -1746,7 +1790,7 @@ func TestClientEndpoint_BatchUpdate(t *testing.T) {

// Call to do the batch update
bf := NewBatchFuture()
endpoint := s1.endpoints.Node
endpoint := s1.staticEndpoints.Node
endpoint.batchUpdate(bf, []*structs.Allocation{clientAlloc})
if err := bf.Wait(); err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -1864,7 +1908,7 @@ func TestClientEndpoint_CreateNodeEvals(t *testing.T) {
}

// Create some evaluations
ids, index, err := s1.endpoints.Node.createNodeEvals(alloc.NodeID, 1)
ids, index, err := s1.staticEndpoints.Node.createNodeEvals(alloc.NodeID, 1)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down
Loading