batch node deregistration #5784

Merged — 34 commits, Jul 10, 2019
Changes from all commits
4771de6
struct NodeDeregisterRequest has a batch of NodeIDs
langmartin Jun 3, 2019
4720306
state_store DeleteNode operates on a batch of ids
langmartin Jun 3, 2019
8516911
util partitionAll for paging
langmartin Jun 3, 2019
fbc7fb8
core_sched batch node deregistration requests
langmartin Jun 5, 2019
f685b00
fsm NodeDeregisterRequest is now a batch
langmartin Jun 5, 2019
64fd8b3
node_endpoint deregister the batch of nodes
langmartin Jun 5, 2019
94cd86f
drainer watch_nodes_test batch of 1
langmartin Jun 5, 2019
0dd5335
state_store improve error messages
langmartin Jun 5, 2019
fbd4c25
node_endpoint improve error messages
langmartin Jun 5, 2019
5810a09
state_store just one index update, test deletion
langmartin Jun 5, 2019
3c97bdc
core_sched check ServersMeetMinimumVersion, send old node deregister
langmartin Jun 6, 2019
7258fab
structs add back NodeDeregisterRequest.NodeID, compatibility
langmartin Jun 6, 2019
97a76af
fsm honor new and old style NodeDeregisterRequests
langmartin Jun 6, 2019
aa5193b
core_sched check ServersMeetMinimumVersion
langmartin Jun 7, 2019
48c1859
util simplify partitionAll
langmartin Jun 7, 2019
d55e79e
node_endpoint raft store then shutdown, test deprecation
langmartin Jun 7, 2019
5921e6f
fsm variable names for consistency
langmartin Jun 7, 2019
3e097c4
fsm return an error on both NodeDeregisterRequest fields set
langmartin Jun 7, 2019
bc5e5cf
node_endpoint argument setup
langmartin Jun 7, 2019
f32dceb
structs add NodeDeregisterBatchRequest
langmartin Jun 12, 2019
23fdadf
node endpoint support new NodeDeregisterBatchRequest
langmartin Jun 13, 2019
9c42d03
fsm support new NodeDeregisterBatchRequest
langmartin Jun 13, 2019
4d11c32
core_sched use the new rpc names
langmartin Jun 13, 2019
c915bf0
new file: contributing/checklist-rpc-endpoint.md
langmartin Jun 13, 2019
c20c232
fsm label batch_deregister_node metrics explicitly
langmartin Jun 26, 2019
f5632ba
structs NodeDeregisterBatchRequestType must go at the end
langmartin Jun 26, 2019
0deb209
checklist NodeDeregisterBatchRequestType must go at the end
langmartin Jun 26, 2019
838460e
fsm new NodeDeregisterBatchRequestType sorted at the end of the case
langmartin Jun 26, 2019
3ad8a24
state_store error if called without node_ids
langmartin Jun 26, 2019
9bdf35a
NodeDeregisterBatch -> NodeBatchDeregister match JobBatch pattern
langmartin Jun 26, 2019
675f750
structs drop deprecation warning, revert unnecessary comment change
langmartin Jun 26, 2019
5eac719
core_sched add compat comment for later removal
langmartin Jun 27, 2019
e0edc7d
node_endpoint preserve both messages as rpcs and in raft
langmartin Jun 26, 2019
17f4951
Changelog
langmartin Jun 27, 2019
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -2,6 +2,7 @@

IMPROVEMENTS:
* core: Removed deprecated upgrade path code pertaining to older versions of Nomad [[GH-5894](https://github.com/hashicorp/nomad/issues/5894)]
* core: Deregister nodes in batches rather than one at a time [[GH-5784](https://github.com/hashicorp/nomad/pull/5784)]
* client: Improved task event display message to include kill time out [[GH-5943](https://github.com/hashicorp/nomad/issues/5943)]
* api: Used region from job hcl when not provided as query parameter in job registration and plan endpoints [[GH-5664](https://github.com/hashicorp/nomad/pull/5664)]
* api: Inferred content type of file in alloc filesystem stat endpoint [[GH-5907](https://github.com/hashicorp/nomad/issues/5907)]
29 changes: 29 additions & 0 deletions contributing/checklist-rpc-endpoint.md
@@ -0,0 +1,29 @@
# New RPC Endpoint Checklist

Prefer adding a new message over changing any existing RPC messages.

## Code

* [ ] `Request` struct and `*RequestType` constant in
`nomad/structs/structs.go`. Append the constant; existing constant
values must remain unchanged (see the sketch after this checklist)
* [ ] In `nomad/fsm.go`, add a dispatch case to the switch statement in `Apply`
* `*nomadFSM` method to decode the request and call the state method
* [ ] State method for modifying objects in a `Txn` in `nomad/state/state_store.go`
* `nomad/state/state_store_test.go`
* [ ] Handler for the request in `nomad/foo_endpoint.go`
* RPCs are resolved by matching the method name for bound structs
[net/rpc](https://golang.org/pkg/net/rpc/)
* Wrapper for the HTTP request in `command/agent/foo_endpoint.go`
* Backwards compatibility requires a new endpoint; an upgraded
client or server may be forwarding this request to an old server
that lacks support for the new RPC
* RPCs triggered by an internal process may not need support
* [ ] `nomad/core_sched.go` sends many RPCs
* `ServersMeetMinimumVersion` asserts that the server cluster is
upgraded, so use it to guard sending the new RPC; otherwise send the old RPC
* Version must match the actual release version!

## Docs

* [ ] Changelog
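To make the first checklist item concrete, here is a minimal sketch of the struct-plus-constant pattern, with the surrounding constants elided and names drawn from this PR (treat the exact constant list as illustrative, not verbatim):

```go
// nomad/structs/structs.go (sketch). MessageType values are persisted
// in the raft log, so new constants are appended at the end; reordering
// would silently change the meaning of previously written log entries.
const (
	NodeRegisterRequestType MessageType = iota
	NodeDeregisterRequestType
	// ... existing message types elided ...
	NodeBatchDeregisterRequestType // new type, appended last
)

// NodeBatchDeregisterRequest is used to batch the removal of nodes.
type NodeBatchDeregisterRequest struct {
	// NodeIDs is the batch of nodes to deregister.
	NodeIDs []string

	WriteRequest
}
```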
34 changes: 29 additions & 5 deletions nomad/core_sched.go
@@ -7,6 +7,7 @@ import (

log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
version "github.com/hashicorp/go-version"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/scheduler"
@@ -482,19 +483,42 @@ OUTER:
return nil
}
c.logger.Debug("node GC found eligible nodes", "nodes", len(gcNode))
return c.nodeReap(eval, gcNode)
}

func (c *CoreScheduler) nodeReap(eval *structs.Evaluation, nodeIDs []string) error {
// For old clusters, send single deregistration messages COMPAT(0.11)
minVersionBatchNodeDeregister := version.Must(version.NewVersion("0.9.4"))
if !ServersMeetMinimumVersion(c.srv.Members(), minVersionBatchNodeDeregister, true) {
for _, id := range nodeIDs {
req := structs.NodeDeregisterRequest{
NodeID: id,
WriteRequest: structs.WriteRequest{
Region: c.srv.config.Region,
AuthToken: eval.LeaderACL,
},
}
var resp structs.NodeUpdateResponse
if err := c.srv.RPC("Node.Deregister", &req, &resp); err != nil {
c.logger.Error("node reap failed", "node_id", id, "error", err)
return err
}
}
return nil
}

// Call to the leader to issue the reap
for _, nodeID := range gcNode {
req := structs.NodeDeregisterRequest{
NodeID: nodeID,
for _, ids := range partitionAll(maxIdsPerReap, nodeIDs) {
req := structs.NodeBatchDeregisterRequest{
NodeIDs: ids,
WriteRequest: structs.WriteRequest{
Region: c.srv.config.Region,
AuthToken: eval.LeaderACL,
},
}
var resp structs.NodeUpdateResponse
if err := c.srv.RPC("Node.Deregister", &req, &resp); err != nil {
c.logger.Error("node reap failed", "node_id", nodeID, "error", err)
if err := c.srv.RPC("Node.BatchDeregister", &req, &resp); err != nil {
c.logger.Error("node reap failed", "node_ids", ids, "error", err)
return err
}
}
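`ServersMeetMinimumVersion` itself is not part of this diff. As a rough sketch of the guard's shape, assuming the serf-member parsing helpers nomad uses elsewhere (`isNomadServer` and the parsed `parts` struct are assumptions here):

```go
// Sketch: return false if any live server (optionally counting failed
// servers too) advertises a build version older than minVersion.
func ServersMeetMinimumVersion(members []serf.Member, minVersion *version.Version, checkFailedServers bool) bool {
	for _, member := range members {
		ok, parts := isNomadServer(member) // parse version from serf tags
		if !ok {
			continue
		}
		alive := parts.Status == serf.StatusAlive
		failed := checkFailedServers && parts.Status == serf.StatusFailed
		if (alive || failed) && parts.Build.LessThan(minVersion) {
			return false
		}
	}
	return true
}
```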
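The batching loop above also relies on the `partitionAll` helper added in commit 8516911, whose diff is likewise not shown on this page. A minimal sketch, assuming it simply chunks a slice into groups of at most `size`:

```go
// partitionAll splits xs into sub-slices of at most size elements,
// preserving order; the final partition may be shorter.
func partitionAll(size int, xs []string) [][]string {
	if size < 1 {
		return nil
	}

	out := make([][]string, 0, (len(xs)+size-1)/size)
	for i := 0; i < len(xs); i += size {
		end := i + size
		if end > len(xs) {
			end = len(xs)
		}
		out = append(out, xs[i:end])
	}
	return out
}
```

With `maxIdsPerReap` bounding each partition, a GC pass over thousands of dead nodes becomes a handful of raft entries rather than one entry per node.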
2 changes: 1 addition & 1 deletion nomad/drainer/watch_nodes_test.go
@@ -126,7 +126,7 @@ func TestNodeDrainWatcher_Remove_Nonexistent(t *testing.T) {
require.Equal(n, tracked[n.ID])

// Delete the node
require.Nil(state.DeleteNode(101, n.ID))
require.Nil(state.DeleteNode(101, []string{n.ID}))
testutil.WaitForResult(func() (bool, error) {
return len(m.events()) == 2, nil
}, func(err error) {
20 changes: 19 additions & 1 deletion nomad/fsm.go
@@ -249,6 +249,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
return n.applyBatchDrainUpdate(buf[1:], log.Index)
case structs.SchedulerConfigRequestType:
return n.applySchedulerConfigUpdate(buf[1:], log.Index)
case structs.NodeBatchDeregisterRequestType:
return n.applyDeregisterNodeBatch(buf[1:], log.Index)
}

// Check enterprise only message types.
@@ -296,10 +298,26 @@ func (n *nomadFSM) applyDeregisterNode(buf []byte, index uint64) interface{} {
panic(fmt.Errorf("failed to decode request: %v", err))
}

if err := n.state.DeleteNode(index, req.NodeID); err != nil {
if err := n.state.DeleteNode(index, []string{req.NodeID}); err != nil {
n.logger.Error("DeleteNode failed", "error", err)
return err
}

return nil
}

func (n *nomadFSM) applyDeregisterNodeBatch(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "batch_deregister_node"}, time.Now())
var req structs.NodeBatchDeregisterRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}

if err := n.state.DeleteNode(index, req.NodeIDs); err != nil {
n.logger.Error("DeleteNode failed", "error", err)
return err
}

return nil
}

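Both the single and batch FSM paths now funnel into the batched `state.DeleteNode`, whose `nomad/state/state_store.go` diff is not included on this page. Going by the commit messages (operate on a batch of ids, just one index update, error if called without node IDs), a hedged sketch of its likely shape — the table names and `IndexEntry` helper follow nomad's usual memdb conventions but are assumptions here:

```go
// DeleteNode deregisters a batch of nodes in a single transaction.
func (s *StateStore) DeleteNode(index uint64, nodes []string) error {
	if len(nodes) == 0 {
		return fmt.Errorf("node ids missing")
	}

	txn := s.db.Txn(true)
	defer txn.Abort()

	for _, nodeID := range nodes {
		existing, err := txn.First("nodes", "id", nodeID)
		if err != nil {
			return fmt.Errorf("node lookup failed: %s: %v", nodeID, err)
		}
		if existing == nil {
			return fmt.Errorf("node not found: %s", nodeID)
		}
		if err := txn.Delete("nodes", existing); err != nil {
			return fmt.Errorf("node delete failed: %s: %v", nodeID, err)
		}
	}

	// One index-table update for the whole batch, not one per node.
	if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil {
		return fmt.Errorf("index update failed: %v", err)
	}

	txn.Commit()
	return nil
}
```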
6 changes: 3 additions & 3 deletions nomad/fsm_test.go
@@ -224,10 +224,10 @@ func TestFSM_DeregisterNode(t *testing.T) {
t.Fatalf("resp: %v", resp)
}

req2 := structs.NodeDeregisterRequest{
NodeID: node.ID,
req2 := structs.NodeBatchDeregisterRequest{
NodeIDs: []string{node.ID},
}
buf, err = structs.Encode(structs.NodeDeregisterRequestType, req2)
buf, err = structs.Encode(structs.NodeBatchDeregisterRequestType, req2)
if err != nil {
t.Fatalf("err: %v", err)
}
109 changes: 74 additions & 35 deletions nomad/node_endpoint.go
@@ -253,67 +253,106 @@ func (n *Node) Deregister(args *structs.NodeDeregisterRequest, reply *structs.NodeUpdateResponse) error {
}
defer metrics.MeasureSince([]string{"nomad", "client", "deregister"}, time.Now())

// Check node permissions
if args.NodeID == "" {
return fmt.Errorf("missing node ID for client deregistration")
}

// deregister takes a batch
repack := &structs.NodeBatchDeregisterRequest{
NodeIDs: []string{args.NodeID},
WriteRequest: args.WriteRequest,
}

return n.deregister(repack, reply, func() (interface{}, uint64, error) {
return n.srv.raftApply(structs.NodeDeregisterRequestType, args)
})
}

// BatchDeregister is used to remove client nodes from the cluster.
func (n *Node) BatchDeregister(args *structs.NodeBatchDeregisterRequest, reply *structs.NodeUpdateResponse) error {
if done, err := n.srv.forward("Node.BatchDeregister", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "client", "batch_deregister"}, time.Now())

if len(args.NodeIDs) == 0 {
return fmt.Errorf("missing node IDs for client deregistration")
}

return n.deregister(args, reply, func() (interface{}, uint64, error) {
return n.srv.raftApply(structs.NodeBatchDeregisterRequestType, args)
})
}

// deregister takes a raftMessage closure, to support both Deregister and BatchDeregister
func (n *Node) deregister(args *structs.NodeBatchDeregisterRequest,
reply *structs.NodeUpdateResponse,
raftApplyFn func() (interface{}, uint64, error),
) error {
// Check request permissions
if aclObj, err := n.srv.ResolveToken(args.AuthToken); err != nil {
return err
} else if aclObj != nil && !aclObj.AllowNodeWrite() {
return structs.ErrPermissionDenied
}

// Verify the arguments
if args.NodeID == "" {
return fmt.Errorf("missing node ID for client deregistration")
}
// Look for the node
snap, err := n.srv.fsm.State().Snapshot()
if err != nil {
return err
}

ws := memdb.NewWatchSet()
node, err := snap.NodeByID(ws, args.NodeID)
if err != nil {
return err
}
if node == nil {
return fmt.Errorf("node not found")
for _, nodeID := range args.NodeIDs {
node, err := snap.NodeByID(ws, nodeID)
if err != nil {
return err
}
if node == nil {
return fmt.Errorf("node not found")
}
}

// Commit this update via Raft
_, index, err := n.srv.raftApply(structs.NodeDeregisterRequestType, args)
_, index, err := raftApplyFn()
if err != nil {
n.logger.Error("deregister failed", "error", err)
n.logger.Error("raft message failed", "error", err)
return err
}

// Clear the heartbeat timer if any
n.srv.clearHeartbeatTimer(args.NodeID)

// Create the evaluations for this node
evalIDs, evalIndex, err := n.createNodeEvals(args.NodeID, index)
if err != nil {
n.logger.Error("eval creation failed", "error", err)
return err
}
for _, nodeID := range args.NodeIDs {
// Clear the heartbeat timer if any
n.srv.clearHeartbeatTimer(nodeID)

// Determine if there are any Vault accessors on the node
accessors, err := snap.VaultAccessorsByNode(ws, args.NodeID)
if err != nil {
n.logger.Error("looking up accessors for node failed", "node_id", args.NodeID, "error", err)
return err
}
// Create the evaluations for this node
evalIDs, evalIndex, err := n.createNodeEvals(nodeID, index)
if err != nil {
n.logger.Error("eval creation failed", "error", err)
return err
}

if l := len(accessors); l != 0 {
n.logger.Debug("revoking accessors on node due to deregister", "num_accessors", l, "node_id", args.NodeID)
if err := n.srv.vault.RevokeTokens(context.Background(), accessors, true); err != nil {
n.logger.Error("revoking accessors for node failed", "node_id", args.NodeID, "error", err)
// Determine if there are any Vault accessors on the node
accessors, err := snap.VaultAccessorsByNode(ws, nodeID)
if err != nil {
n.logger.Error("looking up accessors for node failed", "node_id", nodeID, "error", err)
return err
}

if l := len(accessors); l != 0 {
n.logger.Debug("revoking accessors on node due to deregister", "num_accessors", l, "node_id", nodeID)
if err := n.srv.vault.RevokeTokens(context.Background(), accessors, true); err != nil {
n.logger.Error("revoking accessors for node failed", "node_id", nodeID, "error", err)
return err
}
}

reply.EvalIDs = append(reply.EvalIDs, evalIDs...)
// Set the reply eval create index just the first time
if reply.EvalCreateIndex == 0 {
reply.EvalCreateIndex = evalIndex
}
}

// Setup the reply
reply.EvalIDs = evalIDs
reply.EvalCreateIndex = evalIndex
reply.NodeModifyIndex = index
reply.Index = index
return nil
29 changes: 15 additions & 14 deletions nomad/node_endpoint_test.go
@@ -201,7 +201,8 @@ func TestClientEndpoint_Register_SecretMismatch(t *testing.T) {
}
}

func TestClientEndpoint_Deregister(t *testing.T) {
// Test the deprecated single node deregistration path
func TestClientEndpoint_DeregisterOne(t *testing.T) {
t.Parallel()
s1 := TestServer(t, nil)
defer s1.Shutdown()
@@ -269,18 +270,18 @@ func TestClientEndpoint_Deregister_ACL(t *testing.T) {
invalidToken := mock.CreatePolicyAndToken(t, state, 1003, "test-invalid", mock.NodePolicy(acl.PolicyRead))

// Deregister without any token and expect it to fail
dereg := &structs.NodeDeregisterRequest{
NodeID: node.ID,
dereg := &structs.NodeBatchDeregisterRequest{
NodeIDs: []string{node.ID},
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.GenericResponse
if err := msgpackrpc.CallWithCodec(codec, "Node.Deregister", dereg, &resp); err == nil {
if err := msgpackrpc.CallWithCodec(codec, "Node.BatchDeregister", dereg, &resp); err == nil {
t.Fatalf("node de-register succeeded")
}

// Deregister with a valid token
dereg.AuthToken = validToken.SecretID
if err := msgpackrpc.CallWithCodec(codec, "Node.Deregister", dereg, &resp); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Node.BatchDeregister", dereg, &resp); err != nil {
t.Fatalf("err: %v", err)
}

@@ -295,18 +296,18 @@ }
}

// Deregister with an invalid token.
dereg1 := &structs.NodeDeregisterRequest{
NodeID: node1.ID,
dereg1 := &structs.NodeBatchDeregisterRequest{
NodeIDs: []string{node1.ID},
WriteRequest: structs.WriteRequest{Region: "global"},
}
dereg1.AuthToken = invalidToken.SecretID
if err := msgpackrpc.CallWithCodec(codec, "Node.Deregister", dereg1, &resp); err == nil {
if err := msgpackrpc.CallWithCodec(codec, "Node.BatchDeregister", dereg1, &resp); err == nil {
t.Fatalf("rpc should not have succeeded")
}

// Try with a root token
dereg1.AuthToken = root.SecretID
if err := msgpackrpc.CallWithCodec(codec, "Node.Deregister", dereg1, &resp); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Node.BatchDeregister", dereg1, &resp); err != nil {
t.Fatalf("err: %v", err)
}
}
@@ -344,12 +345,12 @@ func TestClientEndpoint_Deregister_Vault(t *testing.T) {
state.UpsertVaultAccessor(100, []*structs.VaultAccessor{va1, va2})

// Deregister
dereg := &structs.NodeDeregisterRequest{
NodeID: node.ID,
dereg := &structs.NodeBatchDeregisterRequest{
NodeIDs: []string{node.ID},
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp2 structs.GenericResponse
if err := msgpackrpc.CallWithCodec(codec, "Node.Deregister", dereg, &resp2); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Node.BatchDeregister", dereg, &resp2); err != nil {
t.Fatalf("err: %v", err)
}
if resp2.Index == 0 {
@@ -1447,7 +1448,7 @@ func TestClientEndpoint_GetNode_Blocking(t *testing.T) {

// Node delete triggers watches
time.AfterFunc(100*time.Millisecond, func() {
if err := state.DeleteNode(400, node2.ID); err != nil {
if err := state.DeleteNode(400, []string{node2.ID}); err != nil {
t.Fatalf("err: %v", err)
}
})
@@ -2714,7 +2715,7 @@ func TestClientEndpoint_ListNodes_Blocking(t *testing.T) {

// Node delete triggers watches.
time.AfterFunc(100*time.Millisecond, func() {
errCh <- state.DeleteNode(50, node.ID)
errCh <- state.DeleteNode(50, []string{node.ID})
})

req.MinQueryIndex = 45