From 4771de646f6f84d32d5abecf41c35f819251b337 Mon Sep 17 00:00:00 2001 From: Lang Martin Date: Mon, 3 Jun 2019 13:18:10 -0400 Subject: [PATCH 01/34] struct NodeDeregisterRequest has a batch of NodeIDs --- nomad/structs/structs.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index dee93eee733..16afdd1d20b 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -316,9 +316,9 @@ type NodeRegisterRequest struct { } // NodeDeregisterRequest is used for Node.Deregister endpoint -// to deregister a node as being a schedulable entity. +// to deregister a batch of nodes from being schedulable entities. type NodeDeregisterRequest struct { - NodeID string + NodeIDs []string WriteRequest } From 47203067c35bf0e36b6275df8bf8e7ebb2905a20 Mon Sep 17 00:00:00 2001 From: Lang Martin Date: Mon, 3 Jun 2019 13:20:09 -0400 Subject: [PATCH 02/34] state_store DeleteNode operates on a batch of ids --- nomad/state/state_store.go | 35 ++++++++++++++++----------------- nomad/state/state_store_test.go | 2 +- 2 files changed, 18 insertions(+), 19 deletions(-) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 9763d6c1231..e68b7186a1c 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -677,28 +677,27 @@ func (s *StateStore) UpsertNode(index uint64, node *structs.Node) error { return nil } -// DeleteNode is used to deregister a node -func (s *StateStore) DeleteNode(index uint64, nodeID string) error { +// DeleteNode deregisters a batch of nodes +func (s *StateStore) DeleteNode(index uint64, nodes []string) error { txn := s.db.Txn(true) defer txn.Abort() + for _, nodeID := range nodes { + existing, err := txn.First("nodes", "id", nodeID) + if err != nil { + return fmt.Errorf("node lookup failed: %v", err) + } + if existing == nil { + return fmt.Errorf("node not found") + } - // Lookup the node - existing, err := txn.First("nodes", "id", nodeID) - if err != 
nil { - return fmt.Errorf("node lookup failed: %v", err) - } - if existing == nil { - return fmt.Errorf("node not found") - } - - // Delete the node - if err := txn.Delete("nodes", existing); err != nil { - return fmt.Errorf("node delete failed: %v", err) - } - if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil { - return fmt.Errorf("index update failed: %v", err) + // Delete the node + if err := txn.Delete("nodes", existing); err != nil { + return fmt.Errorf("node delete failed: %v", err) + } + if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } } - txn.Commit() return nil } diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index d21e4bbdefb..f1822ce7b04 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -822,7 +822,7 @@ func TestStateStore_DeleteNode_Node(t *testing.T) { t.Fatalf("bad: %v", err) } - err = state.DeleteNode(1001, node.ID) + err = state.DeleteNode(1001, []string{node.ID}) if err != nil { t.Fatalf("err: %v", err) } From 851691181da54a03c02b6c527a263cb7de843005 Mon Sep 17 00:00:00 2001 From: Lang Martin Date: Mon, 3 Jun 2019 14:30:27 -0400 Subject: [PATCH 03/34] util partitionAll for paging --- nomad/util.go | 28 ++++++++++++++++++++++++++++ nomad/util_test.go | 16 ++++++++++++++++ 2 files changed, 44 insertions(+) diff --git a/nomad/util.go b/nomad/util.go index ccd4504af05..809d0a37f9b 100644 --- a/nomad/util.go +++ b/nomad/util.go @@ -196,6 +196,34 @@ func shuffleStrings(list []string) { } } +// partitionAll splits a slice of strings into a slice of slices of strings, each with a max +// size of `size`. All entries from the original slice are preserved. The last slice may be +// smaller than `size`. 
The input slice is unmodified +func partitionAll(size int, xs []string) [][]string { + out := make([][]string, 0) + if size < 1 { + return append(out, xs) + } + + got, part, i, j := 0, 0, 0, 0 + for got < len(xs) { + i = size * part + j = minInt(size*(part+1), len(xs)) + out = append(out, xs[i:j]) + part = part + 1 + got = j + } + + return out +} + +func minInt(x, y int) int { + if x < y { + return x + } + return y +} + // maxUint64 returns the maximum value func maxUint64(inputs ...uint64) uint64 { l := len(inputs) diff --git a/nomad/util_test.go b/nomad/util_test.go index f216d5e4243..b1df2e52344 100644 --- a/nomad/util_test.go +++ b/nomad/util_test.go @@ -8,6 +8,7 @@ import ( version "github.com/hashicorp/go-version" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/serf/serf" + "github.com/stretchr/testify/require" ) func TestIsNomadServer(t *testing.T) { @@ -230,6 +231,21 @@ func TestShuffleStrings(t *testing.T) { } } +func Test_partitionAll(t *testing.T) { + xs := []string{"a", "b", "c", "d", "e", "f"} + // evenly divisible + require.Equal(t, [][]string{{"a", "b"}, {"c", "d"}, {"e", "f"}}, partitionAll(2, xs)) + require.Equal(t, [][]string{{"a", "b", "c"}, {"d", "e", "f"}}, partitionAll(3, xs)) + // whole thing fits int the last part + require.Equal(t, [][]string{{"a", "b", "c", "d", "e", "f"}}, partitionAll(7, xs)) + // odd remainder + require.Equal(t, [][]string{{"a", "b", "c", "d"}, {"e", "f"}}, partitionAll(4, xs)) + // zero size + require.Equal(t, [][]string{{"a", "b", "c", "d", "e", "f"}}, partitionAll(0, xs)) + // one size + require.Equal(t, [][]string{{"a"}, {"b"}, {"c"}, {"d"}, {"e"}, {"f"}}, partitionAll(1, xs)) +} + func TestMaxUint64(t *testing.T) { t.Parallel() if maxUint64(1, 2) != 2 { From fbc7fb80a5dce29ff27d1eac8375e5278f0950e5 Mon Sep 17 00:00:00 2001 From: Lang Martin Date: Wed, 5 Jun 2019 10:19:21 -0400 Subject: [PATCH 04/34] core_sched batch node deregistration requests --- nomad/core_sched.go | 6 +++--- 1 file changed, 3 
insertions(+), 3 deletions(-) diff --git a/nomad/core_sched.go b/nomad/core_sched.go index 3d691d605f9..b0af20edbd7 100644 --- a/nomad/core_sched.go +++ b/nomad/core_sched.go @@ -484,9 +484,9 @@ OUTER: c.logger.Debug("node GC found eligible nodes", "nodes", len(gcNode)) // Call to the leader to issue the reap - for _, nodeID := range gcNode { + for _, ids := range partitionAll(maxIdsPerReap, gcNode) { req := structs.NodeDeregisterRequest{ - NodeID: nodeID, + NodeIDs: ids, WriteRequest: structs.WriteRequest{ Region: c.srv.config.Region, AuthToken: eval.LeaderACL, @@ -494,7 +494,7 @@ OUTER: } var resp structs.NodeUpdateResponse if err := c.srv.RPC("Node.Deregister", &req, &resp); err != nil { - c.logger.Error("node reap failed", "node_id", nodeID, "error", err) + c.logger.Error("node reap failed", "node_ids", ids, "error", err) return err } } From f685b00367ee938b2cced86e54ea590e11d4abb5 Mon Sep 17 00:00:00 2001 From: Lang Martin Date: Wed, 5 Jun 2019 10:47:29 -0400 Subject: [PATCH 05/34] fsm NodeDeregisterRequest is now a batch --- nomad/fsm.go | 2 +- nomad/fsm_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/nomad/fsm.go b/nomad/fsm.go index 49392d292f7..f1a7dd1d03d 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -296,7 +296,7 @@ func (n *nomadFSM) applyDeregisterNode(buf []byte, index uint64) interface{} { panic(fmt.Errorf("failed to decode request: %v", err)) } - if err := n.state.DeleteNode(index, req.NodeID); err != nil { + if err := n.state.DeleteNode(index, req.NodeIDs); err != nil { n.logger.Error("DeleteNode failed", "error", err) return err } diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index ef79d146198..4316fbe128e 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -225,7 +225,7 @@ func TestFSM_DeregisterNode(t *testing.T) { } req2 := structs.NodeDeregisterRequest{ - NodeID: node.ID, + NodeIDs: []string{node.ID}, } buf, err = structs.Encode(structs.NodeDeregisterRequestType, req2) if err != nil { From 
64fd8b3e9f9ed53eb8cde5b3ad8013e43f4f6d58 Mon Sep 17 00:00:00 2001 From: Lang Martin Date: Wed, 5 Jun 2019 10:49:57 -0400 Subject: [PATCH 06/34] node_endpoint deregister the batch of nodes --- nomad/node_endpoint.go | 74 ++++++++++++++++++++++--------------- nomad/node_endpoint_test.go | 12 +++--- 2 files changed, 50 insertions(+), 36 deletions(-) diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index 1fed74bbc28..c6bfae09e3c 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -261,22 +261,45 @@ func (n *Node) Deregister(args *structs.NodeDeregisterRequest, reply *structs.No } // Verify the arguments - if args.NodeID == "" { - return fmt.Errorf("missing node ID for client deregistration") + if len(args.NodeIDs) == 0 { + return fmt.Errorf("missing node IDs for client deregistration") } - // Look for the node + + // Open state handles snap, err := n.srv.fsm.State().Snapshot() if err != nil { return err } ws := memdb.NewWatchSet() - node, err := snap.NodeByID(ws, args.NodeID) - if err != nil { - return err - } - if node == nil { - return fmt.Errorf("node not found") + + for _, nodeID := range args.NodeIDs { + // Look for the node + node, err := snap.NodeByID(ws, nodeID) + if err != nil { + return err + } + if node == nil { + return fmt.Errorf("node not found") + } + + // Clear the heartbeat timer if any + n.srv.clearHeartbeatTimer(nodeID) + + // Determine if there are any Vault accessors on the node + accessors, err := snap.VaultAccessorsByNode(ws, nodeID) + if err != nil { + n.logger.Error("looking up accessors for node failed", "node_id", nodeID, "error", err) + return err + } + + if l := len(accessors); l != 0 { + n.logger.Debug("revoking accessors on node due to deregister", "num_accessors", l, "node_id", nodeID) + if err := n.srv.vault.RevokeTokens(context.Background(), accessors, true); err != nil { + n.logger.Error("revoking accessors for node failed", "node_id", nodeID, "error", err) + return err + } + } } // Commit this update via 
Raft @@ -286,36 +309,27 @@ func (n *Node) Deregister(args *structs.NodeDeregisterRequest, reply *structs.No return err } - // Clear the heartbeat timer if any - n.srv.clearHeartbeatTimer(args.NodeID) - - // Create the evaluations for this node - evalIDs, evalIndex, err := n.createNodeEvals(args.NodeID, index) - if err != nil { - n.logger.Error("eval creation failed", "error", err) - return err - } + // Create the evaluations for these nodes + for _, nodeID := range args.NodeIDs { - // Determine if there are any Vault accessors on the node - accessors, err := snap.VaultAccessorsByNode(ws, args.NodeID) - if err != nil { - n.logger.Error("looking up accessors for node failed", "node_id", args.NodeID, "error", err) - return err - } + // QUESTION createNodeEvals opens it's own state and watch handles, does + // that break atomicity? - if l := len(accessors); l != 0 { - n.logger.Debug("revoking accessors on node due to deregister", "num_accessors", l, "node_id", args.NodeID) - if err := n.srv.vault.RevokeTokens(context.Background(), accessors, true); err != nil { - n.logger.Error("revoking accessors for node failed", "node_id", args.NodeID, "error", err) + evalIDs, evalIndex, err := n.createNodeEvals(nodeID, index) + if err != nil { + n.logger.Error("eval creation failed", "error", err) return err } + reply.EvalIDs = append(reply.EvalIDs, evalIDs...) 
+ reply.EvalCreateIndex = evalIndex } // Setup the reply - reply.EvalIDs = evalIDs - reply.EvalCreateIndex = evalIndex + // reply.EvalIDs = evalIDs + // reply.EvalCreateIndex = evalIndex reply.NodeModifyIndex = index reply.Index = index + return nil } diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index 6b7b90e2f40..29ec2cc0fc3 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -223,7 +223,7 @@ func TestClientEndpoint_Deregister(t *testing.T) { // Deregister dereg := &structs.NodeDeregisterRequest{ - NodeID: node.ID, + NodeIDs: []string{node.ID}, WriteRequest: structs.WriteRequest{Region: "global"}, } var resp2 structs.GenericResponse @@ -270,7 +270,7 @@ func TestClientEndpoint_Deregister_ACL(t *testing.T) { // Deregister without any token and expect it to fail dereg := &structs.NodeDeregisterRequest{ - NodeID: node.ID, + NodeIDs: []string{node.ID}, WriteRequest: structs.WriteRequest{Region: "global"}, } var resp structs.GenericResponse @@ -296,7 +296,7 @@ func TestClientEndpoint_Deregister_ACL(t *testing.T) { // Deregister with an invalid token. 
dereg1 := &structs.NodeDeregisterRequest{ - NodeID: node1.ID, + NodeIDs: []string{node1.ID}, WriteRequest: structs.WriteRequest{Region: "global"}, } dereg1.AuthToken = invalidToken.SecretID @@ -345,7 +345,7 @@ func TestClientEndpoint_Deregister_Vault(t *testing.T) { // Deregister dereg := &structs.NodeDeregisterRequest{ - NodeID: node.ID, + NodeIDs: []string{node.ID}, WriteRequest: structs.WriteRequest{Region: "global"}, } var resp2 structs.GenericResponse @@ -1447,7 +1447,7 @@ func TestClientEndpoint_GetNode_Blocking(t *testing.T) { // Node delete triggers watches time.AfterFunc(100*time.Millisecond, func() { - if err := state.DeleteNode(400, node2.ID); err != nil { + if err := state.DeleteNode(400, []string{node2.ID}); err != nil { t.Fatalf("err: %v", err) } }) @@ -2714,7 +2714,7 @@ func TestClientEndpoint_ListNodes_Blocking(t *testing.T) { // Node delete triggers watches. time.AfterFunc(100*time.Millisecond, func() { - errCh <- state.DeleteNode(50, node.ID) + errCh <- state.DeleteNode(50, []string{node.ID}) }) req.MinQueryIndex = 45 From 94cd86f37729cec8e8cb7da20a88bede5fbfa17e Mon Sep 17 00:00:00 2001 From: Lang Martin Date: Wed, 5 Jun 2019 11:33:30 -0400 Subject: [PATCH 07/34] drainer watch_nodes_test batch of 1 --- nomad/drainer/watch_nodes_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nomad/drainer/watch_nodes_test.go b/nomad/drainer/watch_nodes_test.go index efabefc7c3c..d29086dce03 100644 --- a/nomad/drainer/watch_nodes_test.go +++ b/nomad/drainer/watch_nodes_test.go @@ -126,7 +126,7 @@ func TestNodeDrainWatcher_Remove_Nonexistent(t *testing.T) { require.Equal(n, tracked[n.ID]) // Delete the node - require.Nil(state.DeleteNode(101, n.ID)) + require.Nil(state.DeleteNode(101, []string{n.ID})) testutil.WaitForResult(func() (bool, error) { return len(m.events()) == 2, nil }, func(err error) { From 0dd5335a70e619e5da8e65803d36384626f65c11 Mon Sep 17 00:00:00 2001 From: Lang Martin Date: Wed, 5 Jun 2019 12:45:20 -0400 Subject: [PATCH 
08/34] state_store improve error messages --- nomad/state/state_store.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index e68b7186a1c..754b1aa1c07 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -684,18 +684,18 @@ func (s *StateStore) DeleteNode(index uint64, nodes []string) error { for _, nodeID := range nodes { existing, err := txn.First("nodes", "id", nodeID) if err != nil { - return fmt.Errorf("node lookup failed: %v", err) + return fmt.Errorf("node lookup failed: %s: %v", nodeID, err) } if existing == nil { - return fmt.Errorf("node not found") + return fmt.Errorf("node not found: %s", nodeID) } // Delete the node if err := txn.Delete("nodes", existing); err != nil { - return fmt.Errorf("node delete failed: %v", err) + return fmt.Errorf("node delete failed: %s: %v", nodeID, err) } if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil { - return fmt.Errorf("index update failed: %v", err) + return fmt.Errorf("index update failed: %s: %v", nodeID, err) } } txn.Commit() From fbd4c25e05f42c7587303080b41ecc72a2239fd7 Mon Sep 17 00:00:00 2001 From: Lang Martin Date: Wed, 5 Jun 2019 12:45:42 -0400 Subject: [PATCH 09/34] node_endpoint improve error messages --- nomad/node_endpoint.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index c6bfae09e3c..7459f83d714 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -277,16 +277,16 @@ func (n *Node) Deregister(args *structs.NodeDeregisterRequest, reply *structs.No // Look for the node node, err := snap.NodeByID(ws, nodeID) if err != nil { - return err + return fmt.Errorf("node lookup failed: %s: %v", nodeID, err) } if node == nil { - return fmt.Errorf("node not found") + return fmt.Errorf("node not found: %s", nodeID) } // Clear the heartbeat timer if any n.srv.clearHeartbeatTimer(nodeID) - // 
Determine if there are any Vault accessors on the node + // If there are any Vault accessors on the node, revoke them accessors, err := snap.VaultAccessorsByNode(ws, nodeID) if err != nil { n.logger.Error("looking up accessors for node failed", "node_id", nodeID, "error", err) @@ -311,22 +311,22 @@ func (n *Node) Deregister(args *structs.NodeDeregisterRequest, reply *structs.No // Create the evaluations for these nodes for _, nodeID := range args.NodeIDs { - // QUESTION createNodeEvals opens it's own state and watch handles, does // that break atomicity? - evalIDs, evalIndex, err := n.createNodeEvals(nodeID, index) if err != nil { n.logger.Error("eval creation failed", "error", err) return err } + reply.EvalIDs = append(reply.EvalIDs, evalIDs...) - reply.EvalCreateIndex = evalIndex + // Set the reply evalIndex only the first time + if reply.EvalCreateIndex == 0 { + reply.EvalCreateIndex = evalIndex + } } // Setup the reply - // reply.EvalIDs = evalIDs - // reply.EvalCreateIndex = evalIndex reply.NodeModifyIndex = index reply.Index = index From 5810a097093bcc88d9761adb415d34a7361303db Mon Sep 17 00:00:00 2001 From: Lang Martin Date: Wed, 5 Jun 2019 15:29:52 -0400 Subject: [PATCH 10/34] state_store just one index update, test deletion --- nomad/state/state_store.go | 13 +++++-- nomad/state/state_store_test.go | 62 +++++++++++++++------------------ 2 files changed, 39 insertions(+), 36 deletions(-) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 754b1aa1c07..fa6cb084078 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -679,8 +679,13 @@ func (s *StateStore) UpsertNode(index uint64, node *structs.Node) error { // DeleteNode deregisters a batch of nodes func (s *StateStore) DeleteNode(index uint64, nodes []string) error { + if len(nodes) == 0 { + return nil + } + txn := s.db.Txn(true) defer txn.Abort() + for _, nodeID := range nodes { existing, err := txn.First("nodes", "id", nodeID) if err != nil { @@ -694,10 +699,12 
@@ func (s *StateStore) DeleteNode(index uint64, nodes []string) error { if err := txn.Delete("nodes", existing); err != nil { return fmt.Errorf("node delete failed: %s: %v", nodeID, err) } - if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil { - return fmt.Errorf("index update failed: %s: %v", nodeID, err) - } } + + if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } + txn.Commit() return nil } diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index f1822ce7b04..68e6d4ab35b 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -809,49 +809,45 @@ func TestStateStore_UpsertNode_Node(t *testing.T) { func TestStateStore_DeleteNode_Node(t *testing.T) { state := testStateStore(t) - node := mock.Node() - err := state.UpsertNode(1000, node) - if err != nil { - t.Fatalf("err: %v", err) - } + // Create and insert two nodes, which we'll delete + node0 := mock.Node() + node1 := mock.Node() + err := state.UpsertNode(1000, node0) + require.NoError(t, err) + err = state.UpsertNode(1001, node1) + require.NoError(t, err) // Create a watchset so we can test that delete fires the watch ws := memdb.NewWatchSet() - if _, err := state.NodeByID(ws, node.ID); err != nil { - t.Fatalf("bad: %v", err) - } - err = state.DeleteNode(1001, []string{node.ID}) - if err != nil { - t.Fatalf("err: %v", err) - } + // Check that both nodes are not nil + out, err := state.NodeByID(ws, node0.ID) + require.NoError(t, err) + require.NotNil(t, out) + out, err = state.NodeByID(ws, node1.ID) + require.NoError(t, err) + require.NotNil(t, out) - if !watchFired(ws) { - t.Fatalf("bad") - } + // Delete both nodes in a batch, fires the watch + err = state.DeleteNode(1002, []string{node0.ID, node1.ID}) + require.NoError(t, err) + require.True(t, watchFired(ws)) + // Check that both nodes are nil ws = memdb.NewWatchSet() - out, err := state.NodeByID(ws, 
node.ID) - if err != nil { - t.Fatalf("err: %v", err) - } - - if out != nil { - t.Fatalf("bad: %#v %#v", node, out) - } + out, err = state.NodeByID(ws, node0.ID) + require.NoError(t, err) + require.Nil(t, out) + out, err = state.NodeByID(ws, node1.ID) + require.NoError(t, err) + require.Nil(t, out) + // Ensure that the index is still at 1002, from DeleteNode index, err := state.Index("nodes") - if err != nil { - t.Fatalf("err: %v", err) - } - if index != 1001 { - t.Fatalf("bad: %d", index) - } - - if watchFired(ws) { - t.Fatalf("bad") - } + require.NoError(t, err) + require.Equal(t, uint64(1002), index) + require.False(t, watchFired(ws)) } func TestStateStore_UpdateNodeStatus_Node(t *testing.T) { From 3c97bdcb827a8cbf6150fc95a38644c98a942c1c Mon Sep 17 00:00:00 2001 From: Lang Martin Date: Thu, 6 Jun 2019 15:59:14 -0400 Subject: [PATCH 11/34] core_sched check ServersMeetMinimumVersion, send old node deregister --- nomad/core_sched.go | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/nomad/core_sched.go b/nomad/core_sched.go index b0af20edbd7..a5b88bc2da6 100644 --- a/nomad/core_sched.go +++ b/nomad/core_sched.go @@ -482,6 +482,28 @@ OUTER: return nil } c.logger.Debug("node GC found eligible nodes", "nodes", len(gcNode)) + return c.nodeReap(gcNode) +} + +func (c *CoreScheduler) nodeReap(nodeIDs []string) error { + // For pre 0.9.3 clusters, send single deregistration messages + if !ServersMeetMinimumVersion(c.srv.Members(), "0.9.3", true) { + for _, id := range nodeIDs { + req := structs.NodeDeregisterRequest{ + NodeID: id, + WriteRequest: structs.WriteRequest{ + Region: c.srv.config.Region, + AuthToken: eval.LeaderACL, + }, + } + var resp structs.NodeUpdateResponse + if err := c.srv.RPC("Node.Deregister", &req, &resp); err != nil { + c.logger.Error("node reap failed", "node_id", id, "error", err) + return err + } + } + return nil + } // Call to the leader to issue the reap for _, ids := range partitionAll(maxIdsPerReap, gcNode) { From 
7258fabfc8694617b2944249749b4f55a19945b7 Mon Sep 17 00:00:00 2001 From: Lang Martin Date: Thu, 6 Jun 2019 16:00:25 -0400 Subject: [PATCH 12/34] structs add back NodeDeregisterRequest.NodeID, compatibility --- nomad/structs/structs.go | 1 + 1 file changed, 1 insertion(+) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 16afdd1d20b..99b8fb0d2f7 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -318,6 +318,7 @@ type NodeRegisterRequest struct { // NodeDeregisterRequest is used for Node.Deregister endpoint // to deregister a batch of nodes from being schedulable entities. type NodeDeregisterRequest struct { + NodeID string // Deprecated in 0.9.3 NodeIDs []string WriteRequest } From 97a76afe21c2ad7355c9b8cb3eca8fed9e125728 Mon Sep 17 00:00:00 2001 From: Lang Martin Date: Thu, 6 Jun 2019 16:01:29 -0400 Subject: [PATCH 13/34] fsm honor new and old style NodeDeregisterRequests --- nomad/fsm.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/nomad/fsm.go b/nomad/fsm.go index f1a7dd1d03d..77675e125d8 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -296,7 +296,15 @@ func (n *nomadFSM) applyDeregisterNode(buf []byte, index uint64) interface{} { panic(fmt.Errorf("failed to decode request: %v", err)) } - if err := n.state.DeleteNode(index, req.NodeIDs); err != nil { + // Messages pre 0.9.3 use a single NodeID + var ids []string + if len(req.NodeIDs) == 0 { + ids = []string{req.NodeID} + } else { + ids = req.NodeIDs + } + + if err := n.state.DeleteNode(index, ids); err != nil { n.logger.Error("DeleteNode failed", "error", err) return err } From aa5193baf952695ae5655fbf5353e07746e3c8b3 Mon Sep 17 00:00:00 2001 From: Lang Martin Date: Fri, 7 Jun 2019 10:57:57 -0400 Subject: [PATCH 14/34] core_sched check ServersMeetMinimumVersion --- nomad/core_sched.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/nomad/core_sched.go b/nomad/core_sched.go index a5b88bc2da6..8295e518304 100644 --- 
a/nomad/core_sched.go +++ b/nomad/core_sched.go @@ -7,6 +7,7 @@ import ( log "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" + version "github.com/hashicorp/go-version" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/scheduler" @@ -482,12 +483,13 @@ OUTER: return nil } c.logger.Debug("node GC found eligible nodes", "nodes", len(gcNode)) - return c.nodeReap(gcNode) + return c.nodeReap(eval, gcNode) } -func (c *CoreScheduler) nodeReap(nodeIDs []string) error { +func (c *CoreScheduler) nodeReap(eval *structs.Evaluation, nodeIDs []string) error { // For pre 0.9.3 clusters, send single deregistration messages - if !ServersMeetMinimumVersion(c.srv.Members(), "0.9.3", true) { + version, _ := version.NewVersion("0.9.3") + if !ServersMeetMinimumVersion(c.srv.Members(), version, true) { for _, id := range nodeIDs { req := structs.NodeDeregisterRequest{ NodeID: id, @@ -506,7 +508,7 @@ func (c *CoreScheduler) nodeReap(nodeIDs []string) error { } // Call to the leader to issue the reap - for _, ids := range partitionAll(maxIdsPerReap, gcNode) { + for _, ids := range partitionAll(maxIdsPerReap, nodeIDs) { req := structs.NodeDeregisterRequest{ NodeIDs: ids, WriteRequest: structs.WriteRequest{ From 48c1859b55ad51f3b135adec60a52d0a3fb5834f Mon Sep 17 00:00:00 2001 From: Lang Martin Date: Fri, 7 Jun 2019 10:58:16 -0400 Subject: [PATCH 15/34] util simplify partitionAll --- nomad/util.go | 23 ++++++++--------------- 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/nomad/util.go b/nomad/util.go index 809d0a37f9b..055b39e2ef4 100644 --- a/nomad/util.go +++ b/nomad/util.go @@ -200,30 +200,23 @@ func shuffleStrings(list []string) { // size of `size`. All entries from the original slice are preserved. The last slice may be // smaller than `size`. 
The input slice is unmodified func partitionAll(size int, xs []string) [][]string { - out := make([][]string, 0) if size < 1 { - return append(out, xs) + return [][]string{xs} } - got, part, i, j := 0, 0, 0, 0 - for got < len(xs) { - i = size * part - j = minInt(size*(part+1), len(xs)) + out := [][]string{} + + for i := 0; i < len(xs); i += size { + j := i + size + if j > len(xs) { + j = len(xs) + } out = append(out, xs[i:j]) - part = part + 1 - got = j } return out } -func minInt(x, y int) int { - if x < y { - return x - } - return y -} - // maxUint64 returns the maximum value func maxUint64(inputs ...uint64) uint64 { l := len(inputs) From d55e79efd15446367f04f8a57c3226d29ecffd6e Mon Sep 17 00:00:00 2001 From: Lang Martin Date: Fri, 7 Jun 2019 11:25:55 -0400 Subject: [PATCH 16/34] node_endpoint raft store then shutdown, test deprecation --- nomad/node_endpoint.go | 36 +++++++++++++++++--------------- nomad/node_endpoint_test.go | 41 +++++++++++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 16 deletions(-) diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index 7459f83d714..cc0965e49d7 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -261,8 +261,14 @@ func (n *Node) Deregister(args *structs.NodeDeregisterRequest, reply *structs.No } // Verify the arguments + var nodeIDs []string if len(args.NodeIDs) == 0 { - return fmt.Errorf("missing node IDs for client deregistration") + if args.NodeID == "" { + return fmt.Errorf("missing node IDs for client deregistration") + } + nodeIDs = append(nodeIDs, args.NodeID) + } else if args.NodeID != "" { + return fmt.Errorf("use only NodeIDs, the NodeID field is deprecated") } // Open state handles @@ -273,8 +279,8 @@ func (n *Node) Deregister(args *structs.NodeDeregisterRequest, reply *structs.No ws := memdb.NewWatchSet() - for _, nodeID := range args.NodeIDs { - // Look for the node + // Assert that the state contains the nodes + for _, nodeID := range nodeIDs { node, err := 
snap.NodeByID(ws, nodeID) if err != nil { return fmt.Errorf("node lookup failed: %s: %v", nodeID, err) @@ -282,8 +288,17 @@ func (n *Node) Deregister(args *structs.NodeDeregisterRequest, reply *structs.No if node == nil { return fmt.Errorf("node not found: %s", nodeID) } + } + + // Commit this update to Raft, before we clear the heartbeatTimer so that failure + // leaves the node running + _, index, err := n.srv.raftApply(structs.NodeDeregisterRequestType, args) + if err != nil { + n.logger.Error("deregister failed", "error", err) + return err + } - // Clear the heartbeat timer if any + for _, nodeID := range nodeIDs { n.srv.clearHeartbeatTimer(nodeID) // If there are any Vault accessors on the node, revoke them @@ -300,19 +315,8 @@ func (n *Node) Deregister(args *structs.NodeDeregisterRequest, reply *structs.No return err } } - } - - // Commit this update via Raft - _, index, err := n.srv.raftApply(structs.NodeDeregisterRequestType, args) - if err != nil { - n.logger.Error("deregister failed", "error", err) - return err - } - // Create the evaluations for these nodes - for _, nodeID := range args.NodeIDs { - // QUESTION createNodeEvals opens it's own state and watch handles, does - // that break atomicity? 
+ // Create the evaluations for these nodes evalIDs, evalIndex, err := n.createNodeEvals(nodeID, index) if err != nil { n.logger.Error("eval creation failed", "error", err) diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index 29ec2cc0fc3..84642d66240 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -246,6 +246,47 @@ func TestClientEndpoint_Deregister(t *testing.T) { } } +func TestClientEndpoint_DeregisterDeprecatedField(t *testing.T) { + t.Parallel() + s1 := TestServer(t, nil) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + node := mock.Node() + reg := &structs.NodeRegisterRequest{ + Node: node, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Fetch the response + var resp structs.GenericResponse + require.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp)) + + // Deregister fails, because the DeregisterRequest should use only the batch field + dereg := &structs.NodeDeregisterRequest{ + NodeID: "foo", + NodeIDs: []string{node.ID}, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + require.Error(t, msgpackrpc.CallWithCodec(codec, "Node.Deregister", dereg, &resp)) + + // Deregister succeeds using the pre-0.9.3 single node request + dereg = &structs.NodeDeregisterRequest{ + NodeID: node.ID, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + require.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Deregister", dereg, &resp)) + + // Check for the node in the FSM + state := s1.fsm.State() + ws := memdb.NewWatchSet() + out, err := state.NodeByID(ws, node.ID) + require.NoError(t, err) + require.Nil(t, out) +} + func TestClientEndpoint_Deregister_ACL(t *testing.T) { t.Parallel() s1, root := TestACLServer(t, nil) From 5921e6ff8fc9ad2dc07c665c5f9309640a7aa42c Mon Sep 17 00:00:00 2001 From: Lang Martin Date: Fri, 7 Jun 2019 11:28:17 -0400 Subject: [PATCH 17/34] fsm variable names for 
consistency --- nomad/fsm.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/nomad/fsm.go b/nomad/fsm.go index 77675e125d8..8a417c4268e 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -297,14 +297,14 @@ func (n *nomadFSM) applyDeregisterNode(buf []byte, index uint64) interface{} { } // Messages pre 0.9.3 use a single NodeID - var ids []string + var nodeIDs []string if len(req.NodeIDs) == 0 { - ids = []string{req.NodeID} + nodeIDs = append(nodeIDs, req.NodeID) } else { - ids = req.NodeIDs + nodeIDs = req.NodeIDs } - if err := n.state.DeleteNode(index, ids); err != nil { + if err := n.state.DeleteNode(index, nodeIDs); err != nil { n.logger.Error("DeleteNode failed", "error", err) return err } From 3e097c424558fb8cd42cad812f07af524512d8d9 Mon Sep 17 00:00:00 2001 From: Lang Martin Date: Fri, 7 Jun 2019 11:55:04 -0400 Subject: [PATCH 18/34] fsm return an error on both NodeDeregisterRequest fields set --- nomad/fsm.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/nomad/fsm.go b/nomad/fsm.go index 8a417c4268e..c72f4059986 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -300,6 +300,8 @@ func (n *nomadFSM) applyDeregisterNode(buf []byte, index uint64) interface{} { var nodeIDs []string if len(req.NodeIDs) == 0 { nodeIDs = append(nodeIDs, req.NodeID) + } else if req.NodeID != "" { + return fmt.Errorf("invalid request: set NodeIDs instead of NodeID") } else { nodeIDs = req.NodeIDs } @@ -308,6 +310,7 @@ func (n *nomadFSM) applyDeregisterNode(buf []byte, index uint64) interface{} { n.logger.Error("DeleteNode failed", "error", err) return err } + return nil } From bc5e5cfc65380e73b637a4a6e6f70ce6818b8fb4 Mon Sep 17 00:00:00 2001 From: Lang Martin Date: Fri, 7 Jun 2019 14:26:29 -0400 Subject: [PATCH 19/34] node_endpoint argument setup --- nomad/node_endpoint.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index cc0965e49d7..b0db5faf30a 100644 --- a/nomad/node_endpoint.go +++ 
b/nomad/node_endpoint.go @@ -269,6 +269,8 @@ func (n *Node) Deregister(args *structs.NodeDeregisterRequest, reply *structs.No nodeIDs = append(nodeIDs, args.NodeID) } else if args.NodeID != "" { return fmt.Errorf("use only NodeIDs, the NodeID field is deprecated") + } else { + nodeIDs = args.NodeIDs } // Open state handles From f32dceb934028c5d41f5ce7f0e3327e83eff8430 Mon Sep 17 00:00:00 2001 From: Lang Martin Date: Wed, 12 Jun 2019 16:41:26 -0400 Subject: [PATCH 20/34] structs add NodeDeregisterBatchRequest --- nomad/structs/structs.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 99b8fb0d2f7..44a7b6ef9cb 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -54,6 +54,7 @@ type MessageType uint8 const ( NodeRegisterRequestType MessageType = iota NodeDeregisterRequestType + NodeDeregisterBatchRequestType // QUESTION does iota make it important to append this list? NodeUpdateStatusRequestType NodeUpdateDrainRequestType JobRegisterRequestType @@ -317,8 +318,15 @@ type NodeRegisterRequest struct { // NodeDeregisterRequest is used for Node.Deregister endpoint // to deregister a batch of nodes from being schedulable entities. +// Deprecated in 0.9.3 type NodeDeregisterRequest struct { - NodeID string // Deprecated in 0.9.3 + NodeID string + WriteRequest +} + +// NodeDeregisterBatchRequest is used for Node.DeregisterBatch endpoint +// to deregister a batch of nodes from being schedulable entities. 
+type NodeDeregisterBatchRequest struct { NodeIDs []string WriteRequest } From 23fdadfd95e29026ae0cb7c7355a194a1b26fc8c Mon Sep 17 00:00:00 2001 From: Lang Martin Date: Thu, 13 Jun 2019 09:46:05 -0400 Subject: [PATCH 21/34] node endpoint support new NodeDeregisterBatchRequest --- nomad/node_endpoint.go | 30 +++++++++--------- nomad/node_endpoint_test.go | 62 +++++++------------------------------ 2 files changed, 26 insertions(+), 66 deletions(-) diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index b0db5faf30a..2a18dd7a3a8 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -245,10 +245,18 @@ func (n *Node) constructNodeServerInfoResponse(snap *state.StateSnapshot, reply return nil } -// Deregister is used to remove a client from the cluster. If a client should -// just be made unavailable for scheduling, a status update is preferred. +// Deregister is the deprecated single deregistration endpoint func (n *Node) Deregister(args *structs.NodeDeregisterRequest, reply *structs.NodeUpdateResponse) error { - if done, err := n.srv.forward("Node.Deregister", args, args, reply); done { + return n.DeregisterBatch(&structs.NodeDeregisterBatchRequest{ + NodeIDs: []string{args.NodeID}, + WriteRequest: args.WriteRequest, + }, reply) +} + +// DeregisterBatch is used to remove client nodes from the cluster. If a client should just +// be made unavailable for scheduling, a status update is preferred. 
+func (n *Node) DeregisterBatch(args *structs.NodeDeregisterBatchRequest, reply *structs.NodeUpdateResponse) error { + if done, err := n.srv.forward("Node.DeregisterBatch", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"nomad", "client", "deregister"}, time.Now()) @@ -261,16 +269,8 @@ func (n *Node) Deregister(args *structs.NodeDeregisterRequest, reply *structs.No } // Verify the arguments - var nodeIDs []string if len(args.NodeIDs) == 0 { - if args.NodeID == "" { - return fmt.Errorf("missing node IDs for client deregistration") - } - nodeIDs = append(nodeIDs, args.NodeID) - } else if args.NodeID != "" { - return fmt.Errorf("use only NodeIDs, the NodeID field is deprecated") - } else { - nodeIDs = args.NodeIDs + return fmt.Errorf("missing node IDs for client deregistration") } // Open state handles @@ -282,7 +282,7 @@ func (n *Node) Deregister(args *structs.NodeDeregisterRequest, reply *structs.No ws := memdb.NewWatchSet() // Assert that the state contains the nodes - for _, nodeID := range nodeIDs { + for _, nodeID := range args.NodeIDs { node, err := snap.NodeByID(ws, nodeID) if err != nil { return fmt.Errorf("node lookup failed: %s: %v", nodeID, err) @@ -294,13 +294,13 @@ func (n *Node) Deregister(args *structs.NodeDeregisterRequest, reply *structs.No // Commit this update to Raft, before we clear the heartbeatTimer so that failure // leaves the node running - _, index, err := n.srv.raftApply(structs.NodeDeregisterRequestType, args) + _, index, err := n.srv.raftApply(structs.NodeDeregisterBatchRequestType, args) if err != nil { n.logger.Error("deregister failed", "error", err) return err } - for _, nodeID := range nodeIDs { + for _, nodeID := range args.NodeIDs { n.srv.clearHeartbeatTimer(nodeID) // If there are any Vault accessors on the node, revoke them diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index 84642d66240..18ff327f01f 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ 
-201,7 +201,8 @@ func TestClientEndpoint_Register_SecretMismatch(t *testing.T) { } } -func TestClientEndpoint_Deregister(t *testing.T) { +// Test the deprecated single node deregistration path +func TestClientEndpoint_DeregisterOne(t *testing.T) { t.Parallel() s1 := TestServer(t, nil) defer s1.Shutdown() @@ -223,7 +224,7 @@ func TestClientEndpoint_Deregister(t *testing.T) { // Deregister dereg := &structs.NodeDeregisterRequest{ - NodeIDs: []string{node.ID}, + NodeID: node.ID, WriteRequest: structs.WriteRequest{Region: "global"}, } var resp2 structs.GenericResponse @@ -246,47 +247,6 @@ func TestClientEndpoint_Deregister(t *testing.T) { } } -func TestClientEndpoint_DeregisterDeprecatedField(t *testing.T) { - t.Parallel() - s1 := TestServer(t, nil) - defer s1.Shutdown() - codec := rpcClient(t, s1) - testutil.WaitForLeader(t, s1.RPC) - - // Create the register request - node := mock.Node() - reg := &structs.NodeRegisterRequest{ - Node: node, - WriteRequest: structs.WriteRequest{Region: "global"}, - } - - // Fetch the response - var resp structs.GenericResponse - require.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp)) - - // Deregister fails, because the DeregisterRequest should use only the batch field - dereg := &structs.NodeDeregisterRequest{ - NodeID: "foo", - NodeIDs: []string{node.ID}, - WriteRequest: structs.WriteRequest{Region: "global"}, - } - require.Error(t, msgpackrpc.CallWithCodec(codec, "Node.Deregister", dereg, &resp)) - - // Deregister succeeds using the pre-0.9.3 single node request - dereg = &structs.NodeDeregisterRequest{ - NodeID: node.ID, - WriteRequest: structs.WriteRequest{Region: "global"}, - } - require.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Deregister", dereg, &resp)) - - // Check for the node in the FSM - state := s1.fsm.State() - ws := memdb.NewWatchSet() - out, err := state.NodeByID(ws, node.ID) - require.NoError(t, err) - require.Nil(t, out) -} - func TestClientEndpoint_Deregister_ACL(t *testing.T) { 
t.Parallel() s1, root := TestACLServer(t, nil) @@ -310,18 +270,18 @@ func TestClientEndpoint_Deregister_ACL(t *testing.T) { invalidToken := mock.CreatePolicyAndToken(t, state, 1003, "test-invalid", mock.NodePolicy(acl.PolicyRead)) // Deregister without any token and expect it to fail - dereg := &structs.NodeDeregisterRequest{ + dereg := &structs.NodeDeregisterBatchRequest{ NodeIDs: []string{node.ID}, WriteRequest: structs.WriteRequest{Region: "global"}, } var resp structs.GenericResponse - if err := msgpackrpc.CallWithCodec(codec, "Node.Deregister", dereg, &resp); err == nil { + if err := msgpackrpc.CallWithCodec(codec, "Node.DeregisterBatch", dereg, &resp); err == nil { t.Fatalf("node de-register succeeded") } // Deregister with a valid token dereg.AuthToken = validToken.SecretID - if err := msgpackrpc.CallWithCodec(codec, "Node.Deregister", dereg, &resp); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Node.DeregisterBatch", dereg, &resp); err != nil { t.Fatalf("err: %v", err) } @@ -336,18 +296,18 @@ func TestClientEndpoint_Deregister_ACL(t *testing.T) { } // Deregister with an invalid token. 
- dereg1 := &structs.NodeDeregisterRequest{ + dereg1 := &structs.NodeDeregisterBatchRequest{ NodeIDs: []string{node1.ID}, WriteRequest: structs.WriteRequest{Region: "global"}, } dereg1.AuthToken = invalidToken.SecretID - if err := msgpackrpc.CallWithCodec(codec, "Node.Deregister", dereg1, &resp); err == nil { + if err := msgpackrpc.CallWithCodec(codec, "Node.DeregisterBatch", dereg1, &resp); err == nil { t.Fatalf("rpc should not have succeeded") } // Try with a root token dereg1.AuthToken = root.SecretID - if err := msgpackrpc.CallWithCodec(codec, "Node.Deregister", dereg1, &resp); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Node.DeregisterBatch", dereg1, &resp); err != nil { t.Fatalf("err: %v", err) } } @@ -385,12 +345,12 @@ func TestClientEndpoint_Deregister_Vault(t *testing.T) { state.UpsertVaultAccessor(100, []*structs.VaultAccessor{va1, va2}) // Deregister - dereg := &structs.NodeDeregisterRequest{ + dereg := &structs.NodeDeregisterBatchRequest{ NodeIDs: []string{node.ID}, WriteRequest: structs.WriteRequest{Region: "global"}, } var resp2 structs.GenericResponse - if err := msgpackrpc.CallWithCodec(codec, "Node.Deregister", dereg, &resp2); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Node.DeregisterBatch", dereg, &resp2); err != nil { t.Fatalf("err: %v", err) } if resp2.Index == 0 { From 9c42d03366883b67a7d8b643b193f6e5ea50bcea Mon Sep 17 00:00:00 2001 From: Lang Martin Date: Thu, 13 Jun 2019 09:47:07 -0400 Subject: [PATCH 22/34] fsm support new NodeDeregisterBatchRequest --- nomad/fsm.go | 25 ++++++++++++++++--------- nomad/fsm_test.go | 4 ++-- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/nomad/fsm.go b/nomad/fsm.go index c72f4059986..fcc1b1426a9 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -191,6 +191,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} { return n.applyUpsertNode(buf[1:], log.Index) case structs.NodeDeregisterRequestType: return n.applyDeregisterNode(buf[1:], log.Index) + case 
structs.NodeDeregisterBatchRequestType: + return n.applyDeregisterNodeBatch(buf[1:], log.Index) case structs.NodeUpdateStatusRequestType: return n.applyStatusUpdate(buf[1:], log.Index) case structs.NodeUpdateDrainRequestType: @@ -296,17 +298,22 @@ func (n *nomadFSM) applyDeregisterNode(buf []byte, index uint64) interface{} { panic(fmt.Errorf("failed to decode request: %v", err)) } - // Messages pre 0.9.3 use a single NodeID - var nodeIDs []string - if len(req.NodeIDs) == 0 { - nodeIDs = append(nodeIDs, req.NodeID) - } else if req.NodeID != "" { - return fmt.Errorf("invalid request: set NodeIDs instead of NodeID") - } else { - nodeIDs = req.NodeIDs + if err := n.state.DeleteNode(index, []string{req.NodeID}); err != nil { + n.logger.Error("DeleteNode failed", "error", err) + return err + } + + return nil +} + +func (n *nomadFSM) applyDeregisterNodeBatch(buf []byte, index uint64) interface{} { + defer metrics.MeasureSince([]string{"nomad", "fsm", "deregister_node"}, time.Now()) + var req structs.NodeDeregisterBatchRequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) } - if err := n.state.DeleteNode(index, nodeIDs); err != nil { + if err := n.state.DeleteNode(index, req.NodeIDs); err != nil { n.logger.Error("DeleteNode failed", "error", err) return err } diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 4316fbe128e..4278992e01f 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -224,10 +224,10 @@ func TestFSM_DeregisterNode(t *testing.T) { t.Fatalf("resp: %v", resp) } - req2 := structs.NodeDeregisterRequest{ + req2 := structs.NodeDeregisterBatchRequest{ NodeIDs: []string{node.ID}, } - buf, err = structs.Encode(structs.NodeDeregisterRequestType, req2) + buf, err = structs.Encode(structs.NodeDeregisterBatchRequestType, req2) if err != nil { t.Fatalf("err: %v", err) } From 4d11c32e19b44068a569caf7f2cbefabb03b6903 Mon Sep 17 00:00:00 2001 From: Lang Martin Date: Thu, 13 Jun 2019 15:04:38 -0400 
Subject: [PATCH 23/34] core_sched use the new rpc names --- nomad/core_sched.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/nomad/core_sched.go b/nomad/core_sched.go index 8295e518304..5022c2b2eb5 100644 --- a/nomad/core_sched.go +++ b/nomad/core_sched.go @@ -487,9 +487,9 @@ OUTER: } func (c *CoreScheduler) nodeReap(eval *structs.Evaluation, nodeIDs []string) error { - // For pre 0.9.3 clusters, send single deregistration messages - version, _ := version.NewVersion("0.9.3") - if !ServersMeetMinimumVersion(c.srv.Members(), version, true) { + // For old clusters, send single deregistration messages + minVersionBatchNodeDeregister := version.Must(version.NewVersion("0.9.4")) + if !ServersMeetMinimumVersion(c.srv.Members(), minVersionBatchNodeDeregister, true) { for _, id := range nodeIDs { req := structs.NodeDeregisterRequest{ NodeID: id, @@ -509,7 +509,7 @@ func (c *CoreScheduler) nodeReap(eval *structs.Evaluation, nodeIDs []string) err // Call to the leader to issue the reap for _, ids := range partitionAll(maxIdsPerReap, nodeIDs) { - req := structs.NodeDeregisterRequest{ + req := structs.NodeDeregisterBatchRequest{ NodeIDs: ids, WriteRequest: structs.WriteRequest{ Region: c.srv.config.Region, @@ -517,7 +517,7 @@ func (c *CoreScheduler) nodeReap(eval *structs.Evaluation, nodeIDs []string) err }, } var resp structs.NodeUpdateResponse - if err := c.srv.RPC("Node.Deregister", &req, &resp); err != nil { + if err := c.srv.RPC("Node.DeregisterBatch", &req, &resp); err != nil { c.logger.Error("node reap failed", "node_ids", ids, "error", err) return err } From c915bf08a08a32ef91cb959f9567d5e661cd3bf0 Mon Sep 17 00:00:00 2001 From: Lang Martin Date: Thu, 13 Jun 2019 15:08:33 -0400 Subject: [PATCH 24/34] new file: contributing/checklist-rpc-endpoint.md --- contributing/checklist-rpc-endpoint.md | 27 ++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 contributing/checklist-rpc-endpoint.md diff --git 
a/contributing/checklist-rpc-endpoint.md b/contributing/checklist-rpc-endpoint.md new file mode 100644 index 00000000000..fbcbd74b886 --- /dev/null +++ b/contributing/checklist-rpc-endpoint.md @@ -0,0 +1,27 @@ +# New RPC Endpoint Checklist + +Prefer adding a new message to changing any existing RPC messages. + +## Code + +* [ ] `Request` struct and `RequestType` constant in `nomad/structs/structs.go` +* [ ] In `nomad/fsm.go`, add a dispatch case to the switch statement in `Apply` + * `*nomadFSM` method to decode the request and call the state method +* [ ] State method for modifying objects in a `Txn` in `nomad/state/state_store.go` + * `nomad/state/state_store_test.go` +* [ ] Handler for the request in `nomad/foo_endpoint.go` + * RPCs are resolved by matching the method name for bound structs + [net/rpc](https://golang.org/pkg/net/rpc/) +* Wrapper for the HTTP request in `command/agent/foo_endpoint.go` + * Backwards compatibility requires a new endpoint, an upgraded + client or server may be forwarding this request to an old server, + without support for the new RPC + * RPCs triggered by an internal process may not need support +* [ ] `nomad/core_sched.go` sends many RPCs + * `ServersMeetMinimumVersion` asserts that the server cluster is + upgraded, so use this to guard sending the new RPC, else send the old RPC + * Version must match the actual release version! 
+ +## Docs + +* [ ] Changelog From c20c232eab3902c4b7994c19164c9de4368a0e60 Mon Sep 17 00:00:00 2001 From: Lang Martin Date: Wed, 26 Jun 2019 10:44:56 -0400 Subject: [PATCH 25/34] fsm label batch_deregister_node metrics explicitly Co-Authored-By: Mahmood Ali --- nomad/fsm.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nomad/fsm.go b/nomad/fsm.go index fcc1b1426a9..64f96b4dbb7 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -307,7 +307,7 @@ func (n *nomadFSM) applyDeregisterNode(buf []byte, index uint64) interface{} { } func (n *nomadFSM) applyDeregisterNodeBatch(buf []byte, index uint64) interface{} { - defer metrics.MeasureSince([]string{"nomad", "fsm", "deregister_node"}, time.Now()) + defer metrics.MeasureSince([]string{"nomad", "fsm", "batch_deregister_node"}, time.Now()) var req structs.NodeDeregisterBatchRequest if err := structs.Decode(buf, &req); err != nil { panic(fmt.Errorf("failed to decode request: %v", err)) From f5632ba98a5d85acb02627ca21925775e40f9394 Mon Sep 17 00:00:00 2001 From: Lang Martin Date: Wed, 26 Jun 2019 10:40:04 -0400 Subject: [PATCH 26/34] structs NodeDeregisterBatchRequestType must go at the end --- nomad/structs/structs.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 44a7b6ef9cb..14a2147aa80 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -54,7 +54,6 @@ type MessageType uint8 const ( NodeRegisterRequestType MessageType = iota NodeDeregisterRequestType - NodeDeregisterBatchRequestType // QUESTION does iota make it important to append this list? 
NodeUpdateStatusRequestType NodeUpdateDrainRequestType JobRegisterRequestType @@ -84,6 +83,7 @@ const ( NodeUpdateEligibilityRequestType BatchNodeUpdateDrainRequestType SchedulerConfigRequestType + NodeDeregisterBatchRequestType ) const ( From 0deb209d99c1f6e1822d2b6152a62ec411267d54 Mon Sep 17 00:00:00 2001 From: Lang Martin Date: Wed, 26 Jun 2019 10:45:29 -0400 Subject: [PATCH 27/34] checklist NodeDeregisterBatchRequestType must go at the end --- contributing/checklist-rpc-endpoint.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/contributing/checklist-rpc-endpoint.md b/contributing/checklist-rpc-endpoint.md index fbcbd74b886..53e3578e262 100644 --- a/contributing/checklist-rpc-endpoint.md +++ b/contributing/checklist-rpc-endpoint.md @@ -4,7 +4,9 @@ Prefer adding a new message to changing any existing RPC messages. ## Code -* [ ] `Request` struct and `RequestType` constant in `nomad/structs/structs.go` +* [ ] `Request` struct and `*RequestType` constant in + `nomad/structs/structs.go`. 
Append the constant, old constant + values must remain unchanged * [ ] In `nomad/fsm.go`, add a dispatch case to the switch statement in `Apply` * `*nomadFSM` method to decode the request and call the state method * [ ] State method for modifying objects in a `Txn` in `nomad/state/state_store.go` From 838460e70c5512fc972fbfc2b57059cf386582d5 Mon Sep 17 00:00:00 2001 From: Lang Martin Date: Wed, 26 Jun 2019 10:46:37 -0400 Subject: [PATCH 28/34] fsm new NodeDeregisterBatchRequestType sorted at the end of the case --- nomad/fsm.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nomad/fsm.go b/nomad/fsm.go index 64f96b4dbb7..93976c41c2f 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -191,8 +191,6 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} { return n.applyUpsertNode(buf[1:], log.Index) case structs.NodeDeregisterRequestType: return n.applyDeregisterNode(buf[1:], log.Index) - case structs.NodeDeregisterBatchRequestType: - return n.applyDeregisterNodeBatch(buf[1:], log.Index) case structs.NodeUpdateStatusRequestType: return n.applyStatusUpdate(buf[1:], log.Index) case structs.NodeUpdateDrainRequestType: @@ -251,6 +249,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} { return n.applyBatchDrainUpdate(buf[1:], log.Index) case structs.SchedulerConfigRequestType: return n.applySchedulerConfigUpdate(buf[1:], log.Index) + case structs.NodeDeregisterBatchRequestType: + return n.applyDeregisterNodeBatch(buf[1:], log.Index) } // Check enterprise only message types. 
From 3ad8a2498cf26d63dadf62b699cf1c09da2ec525 Mon Sep 17 00:00:00 2001 From: Lang Martin Date: Wed, 26 Jun 2019 10:52:31 -0400 Subject: [PATCH 29/34] state_store error if called without node_ids --- nomad/state/state_store.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index fa6cb084078..a384e2b4dca 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -680,7 +680,7 @@ func (s *StateStore) UpsertNode(index uint64, node *structs.Node) error { // DeleteNode deregisters a batch of nodes func (s *StateStore) DeleteNode(index uint64, nodes []string) error { if len(nodes) == 0 { - return nil + return fmt.Errorf("node ids missing") } txn := s.db.Txn(true) From 9bdf35ad4d5a4ffb05748049df2b55e3c34c8191 Mon Sep 17 00:00:00 2001 From: Lang Martin Date: Wed, 26 Jun 2019 10:57:58 -0400 Subject: [PATCH 30/34] NodeDeregisterBatch -> NodeBatchDeregister match JobBatch pattern --- nomad/core_sched.go | 4 ++-- nomad/fsm.go | 4 ++-- nomad/fsm_test.go | 4 ++-- nomad/node_endpoint.go | 8 ++++---- nomad/node_endpoint_test.go | 16 ++++++++-------- nomad/structs/structs.go | 6 +++--- 6 files changed, 21 insertions(+), 21 deletions(-) diff --git a/nomad/core_sched.go b/nomad/core_sched.go index 5022c2b2eb5..ebd4a19e2e4 100644 --- a/nomad/core_sched.go +++ b/nomad/core_sched.go @@ -509,7 +509,7 @@ func (c *CoreScheduler) nodeReap(eval *structs.Evaluation, nodeIDs []string) err // Call to the leader to issue the reap for _, ids := range partitionAll(maxIdsPerReap, nodeIDs) { - req := structs.NodeDeregisterBatchRequest{ + req := structs.NodeBatchDeregisterRequest{ NodeIDs: ids, WriteRequest: structs.WriteRequest{ Region: c.srv.config.Region, @@ -517,7 +517,7 @@ func (c *CoreScheduler) nodeReap(eval *structs.Evaluation, nodeIDs []string) err }, } var resp structs.NodeUpdateResponse - if err := c.srv.RPC("Node.DeregisterBatch", &req, &resp); err != nil { + if err := 
c.srv.RPC("Node.BatchDeregister", &req, &resp); err != nil { c.logger.Error("node reap failed", "node_ids", ids, "error", err) return err } diff --git a/nomad/fsm.go b/nomad/fsm.go index 93976c41c2f..50a397e0b8e 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -249,7 +249,7 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} { return n.applyBatchDrainUpdate(buf[1:], log.Index) case structs.SchedulerConfigRequestType: return n.applySchedulerConfigUpdate(buf[1:], log.Index) - case structs.NodeDeregisterBatchRequestType: + case structs.NodeBatchDeregisterRequestType: return n.applyDeregisterNodeBatch(buf[1:], log.Index) } @@ -308,7 +308,7 @@ func (n *nomadFSM) applyDeregisterNode(buf []byte, index uint64) interface{} { func (n *nomadFSM) applyDeregisterNodeBatch(buf []byte, index uint64) interface{} { defer metrics.MeasureSince([]string{"nomad", "fsm", "batch_deregister_node"}, time.Now()) - var req structs.NodeDeregisterBatchRequest + var req structs.NodeBatchDeregisterRequest if err := structs.Decode(buf, &req); err != nil { panic(fmt.Errorf("failed to decode request: %v", err)) } diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 4278992e01f..d75ba1ee396 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -224,10 +224,10 @@ func TestFSM_DeregisterNode(t *testing.T) { t.Fatalf("resp: %v", resp) } - req2 := structs.NodeDeregisterBatchRequest{ + req2 := structs.NodeBatchDeregisterRequest{ NodeIDs: []string{node.ID}, } - buf, err = structs.Encode(structs.NodeDeregisterBatchRequestType, req2) + buf, err = structs.Encode(structs.NodeBatchDeregisterRequestType, req2) if err != nil { t.Fatalf("err: %v", err) } diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index 2a18dd7a3a8..d87683aea94 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -247,7 +247,7 @@ func (n *Node) constructNodeServerInfoResponse(snap *state.StateSnapshot, reply // Deregister is the deprecated single deregistration endpoint func (n *Node) Deregister(args 
*structs.NodeDeregisterRequest, reply *structs.NodeUpdateResponse) error { - return n.DeregisterBatch(&structs.NodeDeregisterBatchRequest{ + return n.DeregisterBatch(&structs.NodeBatchDeregisterRequest{ NodeIDs: []string{args.NodeID}, WriteRequest: args.WriteRequest, }, reply) @@ -255,8 +255,8 @@ func (n *Node) Deregister(args *structs.NodeDeregisterRequest, reply *structs.No // DeregisterBatch is used to remove client nodes from the cluster. If a client should just // be made unavailable for scheduling, a status update is preferred. -func (n *Node) DeregisterBatch(args *structs.NodeDeregisterBatchRequest, reply *structs.NodeUpdateResponse) error { - if done, err := n.srv.forward("Node.DeregisterBatch", args, args, reply); done { +func (n *Node) DeregisterBatch(args *structs.NodeBatchDeregisterRequest, reply *structs.NodeUpdateResponse) error { + if done, err := n.srv.forward("Node.BatchDeregister", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"nomad", "client", "deregister"}, time.Now()) @@ -294,7 +294,7 @@ func (n *Node) DeregisterBatch(args *structs.NodeDeregisterBatchRequest, reply * // Commit this update to Raft, before we clear the heartbeatTimer so that failure // leaves the node running - _, index, err := n.srv.raftApply(structs.NodeDeregisterBatchRequestType, args) + _, index, err := n.srv.raftApply(structs.NodeBatchDeregisterRequestType, args) if err != nil { n.logger.Error("deregister failed", "error", err) return err diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index 18ff327f01f..00e7706f897 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -270,18 +270,18 @@ func TestClientEndpoint_Deregister_ACL(t *testing.T) { invalidToken := mock.CreatePolicyAndToken(t, state, 1003, "test-invalid", mock.NodePolicy(acl.PolicyRead)) // Deregister without any token and expect it to fail - dereg := &structs.NodeDeregisterBatchRequest{ + dereg := &structs.NodeBatchDeregisterRequest{ 
NodeIDs: []string{node.ID}, WriteRequest: structs.WriteRequest{Region: "global"}, } var resp structs.GenericResponse - if err := msgpackrpc.CallWithCodec(codec, "Node.DeregisterBatch", dereg, &resp); err == nil { + if err := msgpackrpc.CallWithCodec(codec, "Node.BatchDeregister", dereg, &resp); err == nil { t.Fatalf("node de-register succeeded") } // Deregister with a valid token dereg.AuthToken = validToken.SecretID - if err := msgpackrpc.CallWithCodec(codec, "Node.DeregisterBatch", dereg, &resp); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Node.BatchDeregister", dereg, &resp); err != nil { t.Fatalf("err: %v", err) } @@ -296,18 +296,18 @@ func TestClientEndpoint_Deregister_ACL(t *testing.T) { } // Deregister with an invalid token. - dereg1 := &structs.NodeDeregisterBatchRequest{ + dereg1 := &structs.NodeBatchDeregisterRequest{ NodeIDs: []string{node1.ID}, WriteRequest: structs.WriteRequest{Region: "global"}, } dereg1.AuthToken = invalidToken.SecretID - if err := msgpackrpc.CallWithCodec(codec, "Node.DeregisterBatch", dereg1, &resp); err == nil { + if err := msgpackrpc.CallWithCodec(codec, "Node.BatchDeregister", dereg1, &resp); err == nil { t.Fatalf("rpc should not have succeeded") } // Try with a root token dereg1.AuthToken = root.SecretID - if err := msgpackrpc.CallWithCodec(codec, "Node.DeregisterBatch", dereg1, &resp); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Node.BatchDeregister", dereg1, &resp); err != nil { t.Fatalf("err: %v", err) } } @@ -345,12 +345,12 @@ func TestClientEndpoint_Deregister_Vault(t *testing.T) { state.UpsertVaultAccessor(100, []*structs.VaultAccessor{va1, va2}) // Deregister - dereg := &structs.NodeDeregisterBatchRequest{ + dereg := &structs.NodeBatchDeregisterRequest{ NodeIDs: []string{node.ID}, WriteRequest: structs.WriteRequest{Region: "global"}, } var resp2 structs.GenericResponse - if err := msgpackrpc.CallWithCodec(codec, "Node.DeregisterBatch", dereg, &resp2); err != nil { + if err := 
msgpackrpc.CallWithCodec(codec, "Node.BatchDeregister", dereg, &resp2); err != nil { t.Fatalf("err: %v", err) } if resp2.Index == 0 { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 14a2147aa80..921d2185c89 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -83,7 +83,7 @@ const ( NodeUpdateEligibilityRequestType BatchNodeUpdateDrainRequestType SchedulerConfigRequestType - NodeDeregisterBatchRequestType + NodeBatchDeregisterRequestType ) const ( @@ -324,9 +324,9 @@ type NodeDeregisterRequest struct { WriteRequest } -// NodeDeregisterBatchRequest is used for Node.DeregisterBatch endpoint +// NodeBatchDeregisterRequest is used for Node.BatchDeregister endpoint // to deregister a batch of nodes from being schedulable entities. -type NodeDeregisterBatchRequest struct { +type NodeBatchDeregisterRequest struct { NodeIDs []string WriteRequest } From 675f750038d2ddee003bc6a2efdef2c50b5f4907 Mon Sep 17 00:00:00 2001 From: Lang Martin Date: Wed, 26 Jun 2019 11:08:41 -0400 Subject: [PATCH 31/34] structs drop deprecation warning, revert unnecessary comment change --- nomad/structs/structs.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 921d2185c89..c6145e14e7c 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -317,8 +317,7 @@ type NodeRegisterRequest struct { } // NodeDeregisterRequest is used for Node.Deregister endpoint -// to deregister a batch of nodes from being schedulable entities. -// Deprecated in 0.9.3 +// to deregister a node as being a schedulable entity. 
type NodeDeregisterRequest struct { NodeID string WriteRequest From 5eac71913388121a49bafd0b33cf40fd76c03844 Mon Sep 17 00:00:00 2001 From: Lang Martin Date: Thu, 27 Jun 2019 15:16:27 -0400 Subject: [PATCH 32/34] core_sched add compat comment for later removal --- nomad/core_sched.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nomad/core_sched.go b/nomad/core_sched.go index ebd4a19e2e4..1fb7330ea02 100644 --- a/nomad/core_sched.go +++ b/nomad/core_sched.go @@ -487,7 +487,7 @@ OUTER: } func (c *CoreScheduler) nodeReap(eval *structs.Evaluation, nodeIDs []string) error { - // For old clusters, send single deregistration messages + // For old clusters, send single deregistration messages COMPAT(0.11) minVersionBatchNodeDeregister := version.Must(version.NewVersion("0.9.4")) if !ServersMeetMinimumVersion(c.srv.Members(), minVersionBatchNodeDeregister, true) { for _, id := range nodeIDs { From e0edc7d93e66f34028660eb277b86ee92bc71a62 Mon Sep 17 00:00:00 2001 From: Lang Martin Date: Wed, 26 Jun 2019 16:14:36 -0400 Subject: [PATCH 33/34] node_endpoint preserve both messages as rpcs and in raft --- nomad/node_endpoint.go | 85 ++++++++++++++++++++++++++---------------- 1 file changed, 52 insertions(+), 33 deletions(-) diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index d87683aea94..b5c96c7cf2f 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -245,65 +245,93 @@ func (n *Node) constructNodeServerInfoResponse(snap *state.StateSnapshot, reply return nil } -// Deregister is the deprecated single deregistration endpoint +// Deregister is used to remove a client from the cluster. If a client should +// just be made unavailable for scheduling, a status update is preferred. 
func (n *Node) Deregister(args *structs.NodeDeregisterRequest, reply *structs.NodeUpdateResponse) error { - return n.DeregisterBatch(&structs.NodeBatchDeregisterRequest{ + if done, err := n.srv.forward("Node.Deregister", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "client", "deregister"}, time.Now()) + + if args.NodeID == "" { + return fmt.Errorf("missing node ID for client deregistration") + } + + // deregister takes a batch + repack := &structs.NodeBatchDeregisterRequest{ NodeIDs: []string{args.NodeID}, WriteRequest: args.WriteRequest, - }, reply) + } + + return n.deregister(repack, reply, func() (interface{}, uint64, error) { + return n.srv.raftApply(structs.NodeDeregisterRequestType, args) + }) } -// DeregisterBatch is used to remove client nodes from the cluster. If a client should just -// be made unavailable for scheduling, a status update is preferred. -func (n *Node) DeregisterBatch(args *structs.NodeBatchDeregisterRequest, reply *structs.NodeUpdateResponse) error { +// BatchDeregister is used to remove client nodes from the cluster. 
+func (n *Node) BatchDeregister(args *structs.NodeBatchDeregisterRequest, reply *structs.NodeUpdateResponse) error { if done, err := n.srv.forward("Node.BatchDeregister", args, args, reply); done { return err } - defer metrics.MeasureSince([]string{"nomad", "client", "deregister"}, time.Now()) + defer metrics.MeasureSince([]string{"nomad", "client", "batch_deregister"}, time.Now()) - // Check node permissions + if len(args.NodeIDs) == 0 { + return fmt.Errorf("missing node IDs for client deregistration") + } + + return n.deregister(args, reply, func() (interface{}, uint64, error) { + return n.srv.raftApply(structs.NodeBatchDeregisterRequestType, args) + }) +} + +// deregister takes a raftMessage closure, to support both Deregister and BatchDeregister +func (n *Node) deregister(args *structs.NodeBatchDeregisterRequest, + reply *structs.NodeUpdateResponse, + raftApplyFn func() (interface{}, uint64, error), +) error { + // Check request permissions if aclObj, err := n.srv.ResolveToken(args.AuthToken); err != nil { return err } else if aclObj != nil && !aclObj.AllowNodeWrite() { return structs.ErrPermissionDenied } - // Verify the arguments - if len(args.NodeIDs) == 0 { - return fmt.Errorf("missing node IDs for client deregistration") - } - - // Open state handles + // Look for the node snap, err := n.srv.fsm.State().Snapshot() if err != nil { return err } ws := memdb.NewWatchSet() - - // Assert that the state contains the nodes for _, nodeID := range args.NodeIDs { node, err := snap.NodeByID(ws, nodeID) if err != nil { - return fmt.Errorf("node lookup failed: %s: %v", nodeID, err) + return err } if node == nil { - return fmt.Errorf("node not found: %s", nodeID) + return fmt.Errorf("node not found") } } - // Commit this update to Raft, before we clear the heartbeatTimer so that failure - // leaves the node running - _, index, err := n.srv.raftApply(structs.NodeBatchDeregisterRequestType, args) + // Commit this update via Raft + _, index, err := raftApplyFn() if err != 
nil { - n.logger.Error("deregister failed", "error", err) + n.logger.Error("raft message failed", "error", err) return err } for _, nodeID := range args.NodeIDs { + // Clear the heartbeat timer if any n.srv.clearHeartbeatTimer(nodeID) - // If there are any Vault accessors on the node, revoke them + // Create the evaluations for this node + evalIDs, evalIndex, err := n.createNodeEvals(nodeID, index) + if err != nil { + n.logger.Error("eval creation failed", "error", err) + return err + } + + // Determine if there are any Vault accessors on the node accessors, err := snap.VaultAccessorsByNode(ws, nodeID) if err != nil { n.logger.Error("looking up accessors for node failed", "node_id", nodeID, "error", err) @@ -318,24 +346,15 @@ func (n *Node) DeregisterBatch(args *structs.NodeBatchDeregisterRequest, reply * } } - // Create the evaluations for these nodes - evalIDs, evalIndex, err := n.createNodeEvals(nodeID, index) - if err != nil { - n.logger.Error("eval creation failed", "error", err) - return err - } - reply.EvalIDs = append(reply.EvalIDs, evalIDs...) 
- // Set the reply evalIndex only the first time + // Set the reply eval create index just the first time if reply.EvalCreateIndex == 0 { reply.EvalCreateIndex = evalIndex } } - // Setup the reply reply.NodeModifyIndex = index reply.Index = index - return nil } From 17f49512eea1f94211a8006e9efadcba567e3a23 Mon Sep 17 00:00:00 2001 From: Lang Martin Date: Thu, 27 Jun 2019 15:44:28 -0400 Subject: [PATCH 34/34] Changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e0402054257..c605789ec6e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ IMPROVEMENTS: * core: Removed deprecated upgrade path code pertaining to older versions of Nomad [[GH-5894](https://github.com/hashicorp/nomad/issues/5894)] + * core: Deregister nodes in batches rather than one at a time [[GH-5784](https://github.com/hashicorp/nomad/pull/5784)] * client: Improved task event display message to include kill time out [[GH-5943](https://github.com/hashicorp/nomad/issues/5943)] * api: Used region from job hcl when not provided as query parameter in job registration and plan endpoints [[GH-5664](https://github.com/hashicorp/nomad/pull/5664)] * api: Inferred content type of file in alloc filesystem stat endpoint [[GH-5907](https://github.com/hashicorp/nomad/issues/5907)]