
Commit

Merge pull request #5784 from hashicorp/b-batch-node-dereg
batch node deregistration
langmartin authored Jul 10, 2019
2 parents acd762d + 17f4951 commit 8da0b91
Showing 13 changed files with 264 additions and 105 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -2,6 +2,7 @@

IMPROVEMENTS:
* core: Removed deprecated upgrade path code pertaining to older versions of Nomad [[GH-5894](https://github.com/hashicorp/nomad/issues/5894)]
+ * core: Deregister nodes in batches rather than one at a time [[GH-5784](https://github.com/hashicorp/nomad/pull/5784)]
* client: Improved task event display message to include kill time out [[GH-5943](https://github.com/hashicorp/nomad/issues/5943)]
* api: Used region from job hcl when not provided as query parameter in job registration and plan endpoints [[GH-5664](https://github.com/hashicorp/nomad/pull/5664)]
* api: Inferred content type of file in alloc filesystem stat endpoint [[GH-5907](https://github.com/hashicorp/nomad/issues/5907)]
29 changes: 29 additions & 0 deletions contributing/checklist-rpc-endpoint.md
@@ -0,0 +1,29 @@
# New RPC Endpoint Checklist

Prefer adding a new message to changing any existing RPC messages.

## Code

* [ ] `Request` struct and `*RequestType` constant in
  `nomad/structs/structs.go`. Append the constant; old constant
  values must remain unchanged
* [ ] In `nomad/fsm.go`, add a dispatch case to the switch statement in `Apply`
  * `*nomadFSM` method to decode the request and call the state method
* [ ] State method for modifying objects in a `Txn` in `nomad/state/state_store.go`
  * `nomad/state/state_store_test.go`
* [ ] Handler for the request in `nomad/foo_endpoint.go`
  * RPCs are resolved by matching the method name for bound structs;
    see [net/rpc](https://golang.org/pkg/net/rpc/)
* [ ] Wrapper for the HTTP request in `command/agent/foo_endpoint.go`
  * Backwards compatibility requires a new endpoint; an upgraded
    client or server may be forwarding this request to an old server
    without support for the new RPC
  * RPCs triggered by an internal process may not need support
* [ ] `nomad/core_sched.go` sends many RPCs
  * `ServersMeetMinimumVersion` asserts that the server cluster is
    upgraded, so use it to guard sending the new RPC; otherwise send
    the old RPC (see the sketch after this checklist)
  * Version must match the actual release version!

## Docs

* [ ] Changelog
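
The `ServersMeetMinimumVersion` item above is the pattern this PR applies in `nomad/core_sched.go` below. A minimal sketch of the guard, reusing the checklist's hypothetical `Foo` names — `FooDeregisterRequest`, `FooBatchDeregisterRequest`, and the `Foo.*` RPC names do not exist in the codebase; only `ServersMeetMinimumVersion` is real:

```go
// Sketch of version-guarding a new RPC; not the actual implementation.
// COMPAT: remove the fallback once pre-batch servers can no longer be
// part of an upgraded cluster.
func (c *CoreScheduler) fooReap(ids []string) error {
	// The version that first ships the new message type.
	minVersionBatchFoo := version.Must(version.NewVersion("0.9.4"))

	// Mixed cluster: an old server cannot decode the new raft message,
	// so fall back to one old-style RPC per item.
	if !ServersMeetMinimumVersion(c.srv.Members(), minVersionBatchFoo, true) {
		for _, id := range ids {
			req := structs.FooDeregisterRequest{ID: id}
			var resp structs.GenericResponse
			if err := c.srv.RPC("Foo.Deregister", &req, &resp); err != nil {
				return err
			}
		}
		return nil
	}

	// Every server is upgraded; safe to send the new batch RPC.
	req := structs.FooBatchDeregisterRequest{IDs: ids}
	var resp structs.GenericResponse
	return c.srv.RPC("Foo.BatchDeregister", &req, &resp)
}
```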
34 changes: 29 additions & 5 deletions nomad/core_sched.go
@@ -7,6 +7,7 @@ import (

log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
version "github.com/hashicorp/go-version"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/scheduler"
@@ -482,19 +483,42 @@ OUTER:
return nil
}
c.logger.Debug("node GC found eligible nodes", "nodes", len(gcNode))
+ return c.nodeReap(eval, gcNode)
+}
+
+func (c *CoreScheduler) nodeReap(eval *structs.Evaluation, nodeIDs []string) error {
+ // For old clusters, send single deregistration messages COMPAT(0.11)
+ minVersionBatchNodeDeregister := version.Must(version.NewVersion("0.9.4"))
+ if !ServersMeetMinimumVersion(c.srv.Members(), minVersionBatchNodeDeregister, true) {
+ for _, id := range nodeIDs {
+ req := structs.NodeDeregisterRequest{
+ NodeID: id,
+ WriteRequest: structs.WriteRequest{
+ Region: c.srv.config.Region,
+ AuthToken: eval.LeaderACL,
+ },
+ }
+ var resp structs.NodeUpdateResponse
+ if err := c.srv.RPC("Node.Deregister", &req, &resp); err != nil {
+ c.logger.Error("node reap failed", "node_id", id, "error", err)
+ return err
+ }
+ }
+ return nil
+ }

// Call to the leader to issue the reap
- for _, nodeID := range gcNode {
- req := structs.NodeDeregisterRequest{
- NodeID: nodeID,
+ for _, ids := range partitionAll(maxIdsPerReap, nodeIDs) {
+ req := structs.NodeBatchDeregisterRequest{
+ NodeIDs: ids,
WriteRequest: structs.WriteRequest{
Region: c.srv.config.Region,
AuthToken: eval.LeaderACL,
},
}
var resp structs.NodeUpdateResponse
if err := c.srv.RPC("Node.Deregister", &req, &resp); err != nil {
c.logger.Error("node reap failed", "node_id", nodeID, "error", err)
if err := c.srv.RPC("Node.BatchDeregister", &req, &resp); err != nil {
c.logger.Error("node reap failed", "node_ids", ids, "error", err)
return err
}
}
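`partitionAll` and `maxIdsPerReap` are defined elsewhere in the `nomad` package and are not shown in this diff. A minimal sketch of what such a chunking helper looks like (the real implementation may differ):

```go
// partitionAll splits xs into chunks of at most size elements, so one
// raft message never carries an unbounded number of node IDs.
func partitionAll(size int, xs []string) [][]string {
	if size < 1 {
		return nil
	}

	out := [][]string{}
	for i := 0; i < len(xs); i += size {
		end := i + size
		if end > len(xs) {
			end = len(xs)
		}
		out = append(out, xs[i:end])
	}
	return out
}
```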
2 changes: 1 addition & 1 deletion nomad/drainer/watch_nodes_test.go
@@ -126,7 +126,7 @@ func TestNodeDrainWatcher_Remove_Nonexistent(t *testing.T) {
require.Equal(n, tracked[n.ID])

// Delete the node
- require.Nil(state.DeleteNode(101, n.ID))
+ require.Nil(state.DeleteNode(101, []string{n.ID}))
testutil.WaitForResult(func() (bool, error) {
return len(m.events()) == 2, nil
}, func(err error) {
20 changes: 19 additions & 1 deletion nomad/fsm.go
@@ -249,6 +249,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
return n.applyBatchDrainUpdate(buf[1:], log.Index)
case structs.SchedulerConfigRequestType:
return n.applySchedulerConfigUpdate(buf[1:], log.Index)
+ case structs.NodeBatchDeregisterRequestType:
+ return n.applyDeregisterNodeBatch(buf[1:], log.Index)
}

// Check enterprise only message types.
@@ -296,10 +298,26 @@ func (n *nomadFSM) applyDeregisterNode(buf []byte, index uint64) interface{} {
panic(fmt.Errorf("failed to decode request: %v", err))
}

- if err := n.state.DeleteNode(index, req.NodeID); err != nil {
+ if err := n.state.DeleteNode(index, []string{req.NodeID}); err != nil {
n.logger.Error("DeleteNode failed", "error", err)
return err
}

return nil
}

+ func (n *nomadFSM) applyDeregisterNodeBatch(buf []byte, index uint64) interface{} {
+ defer metrics.MeasureSince([]string{"nomad", "fsm", "batch_deregister_node"}, time.Now())
+ var req structs.NodeBatchDeregisterRequest
+ if err := structs.Decode(buf, &req); err != nil {
+ panic(fmt.Errorf("failed to decode request: %v", err))
+ }
+
+ if err := n.state.DeleteNode(index, req.NodeIDs); err != nil {
+ n.logger.Error("DeleteNode failed", "error", err)
+ return err
+ }
+
+ return nil
+ }

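For context on the `buf[1:]` slicing above: the first byte of each raft log entry is the `MessageType`, which `Apply` switches on before handing the remainder to the decoder. A round-trip sketch using the helpers from `nomad/structs` (the node IDs are hypothetical):

```go
// Encode prepends the message-type byte; Apply strips it off again
// before decoding. This mirrors what fsm_test.go does below.
req := structs.NodeBatchDeregisterRequest{
	NodeIDs: []string{"node-1", "node-2"}, // hypothetical IDs
}

buf, err := structs.Encode(structs.NodeBatchDeregisterRequestType, req)
if err != nil {
	panic(err)
}

var decoded structs.NodeBatchDeregisterRequest
if err := structs.Decode(buf[1:], &decoded); err != nil {
	panic(err)
}
```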
6 changes: 3 additions & 3 deletions nomad/fsm_test.go
@@ -224,10 +224,10 @@ func TestFSM_DeregisterNode(t *testing.T) {
t.Fatalf("resp: %v", resp)
}

- req2 := structs.NodeDeregisterRequest{
- NodeID: node.ID,
+ req2 := structs.NodeBatchDeregisterRequest{
+ NodeIDs: []string{node.ID},
}
- buf, err = structs.Encode(structs.NodeDeregisterRequestType, req2)
+ buf, err = structs.Encode(structs.NodeBatchDeregisterRequestType, req2)
if err != nil {
t.Fatalf("err: %v", err)
}
109 changes: 74 additions & 35 deletions nomad/node_endpoint.go
Expand Up @@ -253,67 +253,106 @@ func (n *Node) Deregister(args *structs.NodeDeregisterRequest, reply *structs.No
}
defer metrics.MeasureSince([]string{"nomad", "client", "deregister"}, time.Now())

- // Check node permissions
+ if args.NodeID == "" {
+ return fmt.Errorf("missing node ID for client deregistration")
+ }
+
+ // deregister takes a batch
+ repack := &structs.NodeBatchDeregisterRequest{
+ NodeIDs: []string{args.NodeID},
+ WriteRequest: args.WriteRequest,
+ }
+
+ return n.deregister(repack, reply, func() (interface{}, uint64, error) {
+ return n.srv.raftApply(structs.NodeDeregisterRequestType, args)
+ })
+}

+// BatchDeregister is used to remove client nodes from the cluster.
+func (n *Node) BatchDeregister(args *structs.NodeBatchDeregisterRequest, reply *structs.NodeUpdateResponse) error {
+ if done, err := n.srv.forward("Node.BatchDeregister", args, args, reply); done {
+ return err
+ }
+ defer metrics.MeasureSince([]string{"nomad", "client", "batch_deregister"}, time.Now())
+
+ if len(args.NodeIDs) == 0 {
+ return fmt.Errorf("missing node IDs for client deregistration")
+ }
+
+ return n.deregister(args, reply, func() (interface{}, uint64, error) {
+ return n.srv.raftApply(structs.NodeBatchDeregisterRequestType, args)
+ })
+}
+
+// deregister takes a raftMessage closure, to support both Deregister and BatchDeregister
+func (n *Node) deregister(args *structs.NodeBatchDeregisterRequest,
+ reply *structs.NodeUpdateResponse,
+ raftApplyFn func() (interface{}, uint64, error),
+) error {
+ // Check request permissions
if aclObj, err := n.srv.ResolveToken(args.AuthToken); err != nil {
return err
} else if aclObj != nil && !aclObj.AllowNodeWrite() {
return structs.ErrPermissionDenied
}

- // Verify the arguments
- if args.NodeID == "" {
- return fmt.Errorf("missing node ID for client deregistration")
- }
- // Look for the node
snap, err := n.srv.fsm.State().Snapshot()
if err != nil {
return err
}

ws := memdb.NewWatchSet()
- node, err := snap.NodeByID(ws, args.NodeID)
- if err != nil {
- return err
- }
- if node == nil {
- return fmt.Errorf("node not found")
+ for _, nodeID := range args.NodeIDs {
+ node, err := snap.NodeByID(ws, nodeID)
+ if err != nil {
+ return err
+ }
+ if node == nil {
+ return fmt.Errorf("node not found")
+ }
}

// Commit this update via Raft
- _, index, err := n.srv.raftApply(structs.NodeDeregisterRequestType, args)
+ _, index, err := raftApplyFn()
if err != nil {
n.logger.Error("deregister failed", "error", err)
n.logger.Error("raft message failed", "error", err)
return err
}

- // Clear the heartbeat timer if any
- n.srv.clearHeartbeatTimer(args.NodeID)
-
- // Create the evaluations for this node
- evalIDs, evalIndex, err := n.createNodeEvals(args.NodeID, index)
- if err != nil {
- n.logger.Error("eval creation failed", "error", err)
- return err
- }
-
- // Determine if there are any Vault accessors on the node
- accessors, err := snap.VaultAccessorsByNode(ws, args.NodeID)
- if err != nil {
- n.logger.Error("looking up accessors for node failed", "node_id", args.NodeID, "error", err)
- return err
- }
-
- if l := len(accessors); l != 0 {
- n.logger.Debug("revoking accessors on node due to deregister", "num_accessors", l, "node_id", args.NodeID)
- if err := n.srv.vault.RevokeTokens(context.Background(), accessors, true); err != nil {
- n.logger.Error("revoking accessors for node failed", "node_id", args.NodeID, "error", err)
- return err
- }
- }
+ for _, nodeID := range args.NodeIDs {
+ // Clear the heartbeat timer if any
+ n.srv.clearHeartbeatTimer(nodeID)
+
+ // Create the evaluations for this node
+ evalIDs, evalIndex, err := n.createNodeEvals(nodeID, index)
+ if err != nil {
+ n.logger.Error("eval creation failed", "error", err)
+ return err
+ }
+
+ // Determine if there are any Vault accessors on the node
+ accessors, err := snap.VaultAccessorsByNode(ws, nodeID)
+ if err != nil {
+ n.logger.Error("looking up accessors for node failed", "node_id", nodeID, "error", err)
+ return err
+ }
+
+ if l := len(accessors); l != 0 {
+ n.logger.Debug("revoking accessors on node due to deregister", "num_accessors", l, "node_id", nodeID)
+ if err := n.srv.vault.RevokeTokens(context.Background(), accessors, true); err != nil {
+ n.logger.Error("revoking accessors for node failed", "node_id", nodeID, "error", err)
+ return err
+ }
+ }
+
+ reply.EvalIDs = append(reply.EvalIDs, evalIDs...)
+ // Set the reply eval create index just the first time
+ if reply.EvalCreateIndex == 0 {
+ reply.EvalCreateIndex = evalIndex
+ }
+ }

- // Setup the reply
- reply.EvalIDs = evalIDs
- reply.EvalCreateIndex = evalIndex
reply.NodeModifyIndex = index
reply.Index = index
return nil
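The `raftApplyFn` closure above is what lets one worker back both endpoints: each public RPC picks its raft message type, while validation, heartbeat cleanup, eval creation, and Vault revocation stay in one place. A stripped-down sketch of the shape, with hypothetical names independent of Nomad's types:

```go
package main

import "fmt"

// applyFn abstracts which raft message the endpoint commits.
type applyFn func() (index uint64, err error)

// deregister is the shared worker; the closure is the only
// endpoint-specific step.
func deregister(ids []string, apply applyFn) error {
	if len(ids) == 0 {
		return fmt.Errorf("missing ids")
	}
	index, err := apply() // commit the endpoint's chosen message type
	if err != nil {
		return err
	}
	// All per-ID cleanup observes the same raft index.
	for _, id := range ids {
		fmt.Printf("cleaned up %s at index %d\n", id, index)
	}
	return nil
}

func main() {
	// Old single-item endpoint and new batch endpoint share the worker.
	single := func() (uint64, error) { return 7, nil }
	batch := func() (uint64, error) { return 8, nil }
	_ = deregister([]string{"node-1"}, single)
	_ = deregister([]string{"node-1", "node-2"}, batch)
}
```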
29 changes: 15 additions & 14 deletions nomad/node_endpoint_test.go
@@ -201,7 +201,8 @@ func TestClientEndpoint_Register_SecretMismatch(t *testing.T) {
}
}

- func TestClientEndpoint_Deregister(t *testing.T) {
+ // Test the deprecated single node deregistration path
+ func TestClientEndpoint_DeregisterOne(t *testing.T) {
t.Parallel()
s1 := TestServer(t, nil)
defer s1.Shutdown()
@@ -269,18 +270,18 @@ func TestClientEndpoint_Deregister_ACL(t *testing.T) {
invalidToken := mock.CreatePolicyAndToken(t, state, 1003, "test-invalid", mock.NodePolicy(acl.PolicyRead))

// Deregister without any token and expect it to fail
dereg := &structs.NodeDeregisterRequest{
NodeID: node.ID,
dereg := &structs.NodeBatchDeregisterRequest{
NodeIDs: []string{node.ID},
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.GenericResponse
- if err := msgpackrpc.CallWithCodec(codec, "Node.Deregister", dereg, &resp); err == nil {
+ if err := msgpackrpc.CallWithCodec(codec, "Node.BatchDeregister", dereg, &resp); err == nil {
t.Fatalf("node de-register succeeded")
}

// Deregister with a valid token
dereg.AuthToken = validToken.SecretID
- if err := msgpackrpc.CallWithCodec(codec, "Node.Deregister", dereg, &resp); err != nil {
+ if err := msgpackrpc.CallWithCodec(codec, "Node.BatchDeregister", dereg, &resp); err != nil {
t.Fatalf("err: %v", err)
}

@@ -295,18 +296,18 @@ func TestClientEndpoint_Deregister_ACL(t *testing.T) {
}

// Deregister with an invalid token.
dereg1 := &structs.NodeDeregisterRequest{
NodeID: node1.ID,
dereg1 := &structs.NodeBatchDeregisterRequest{
NodeIDs: []string{node1.ID},
WriteRequest: structs.WriteRequest{Region: "global"},
}
dereg1.AuthToken = invalidToken.SecretID
- if err := msgpackrpc.CallWithCodec(codec, "Node.Deregister", dereg1, &resp); err == nil {
+ if err := msgpackrpc.CallWithCodec(codec, "Node.BatchDeregister", dereg1, &resp); err == nil {
t.Fatalf("rpc should not have succeeded")
}

// Try with a root token
dereg1.AuthToken = root.SecretID
- if err := msgpackrpc.CallWithCodec(codec, "Node.Deregister", dereg1, &resp); err != nil {
+ if err := msgpackrpc.CallWithCodec(codec, "Node.BatchDeregister", dereg1, &resp); err != nil {
t.Fatalf("err: %v", err)
}
}
@@ -344,12 +345,12 @@ func TestClientEndpoint_Deregister_Vault(t *testing.T) {
state.UpsertVaultAccessor(100, []*structs.VaultAccessor{va1, va2})

// Deregister
dereg := &structs.NodeDeregisterRequest{
NodeID: node.ID,
dereg := &structs.NodeBatchDeregisterRequest{
NodeIDs: []string{node.ID},
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp2 structs.GenericResponse
- if err := msgpackrpc.CallWithCodec(codec, "Node.Deregister", dereg, &resp2); err != nil {
+ if err := msgpackrpc.CallWithCodec(codec, "Node.BatchDeregister", dereg, &resp2); err != nil {
t.Fatalf("err: %v", err)
}
if resp2.Index == 0 {
@@ -1447,7 +1448,7 @@ func TestClientEndpoint_GetNode_Blocking(t *testing.T) {

// Node delete triggers watches
time.AfterFunc(100*time.Millisecond, func() {
- if err := state.DeleteNode(400, node2.ID); err != nil {
+ if err := state.DeleteNode(400, []string{node2.ID}); err != nil {
t.Fatalf("err: %v", err)
}
})
@@ -2714,7 +2715,7 @@ func TestClientEndpoint_ListNodes_Blocking(t *testing.T) {

// Node delete triggers watches.
time.AfterFunc(100*time.Millisecond, func() {
errCh <- state.DeleteNode(50, node.ID)
errCh <- state.DeleteNode(50, []string{node.ID})
})

req.MinQueryIndex = 45
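`nomad/state/state_store.go` is among the changed files not rendered on this page, but the updated call sites (`state.DeleteNode(50, []string{node.ID})`) show that `DeleteNode` now takes a slice and removes the whole batch at one raft index. A minimal sketch of that shape, assuming the go-memdb transaction style used throughout the state store; the real method may differ in details:

```go
// DeleteNode deregisters a batch of nodes in one memdb transaction,
// bumping the "nodes" table index once for the whole batch.
// Sketch based on the call sites in this diff, not the real code.
func (s *StateStore) DeleteNode(index uint64, nodes []string) error {
	txn := s.db.Txn(true)
	defer txn.Abort()

	for _, nodeID := range nodes {
		existing, err := txn.First("nodes", "id", nodeID)
		if err != nil {
			return fmt.Errorf("node lookup failed: %v", err)
		}
		if existing == nil {
			return fmt.Errorf("node not found")
		}
		if err := txn.Delete("nodes", existing); err != nil {
			return fmt.Errorf("node delete failed: %v", err)
		}
	}

	if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil {
		return fmt.Errorf("index update failed: %v", err)
	}

	txn.Commit()
	return nil
}
```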
