From e0edc7d93e66f34028660eb277b86ee92bc71a62 Mon Sep 17 00:00:00 2001 From: Lang Martin Date: Wed, 26 Jun 2019 16:14:36 -0400 Subject: [PATCH] node_endpoint preserve both messages as rpcs and in raft --- nomad/node_endpoint.go | 85 ++++++++++++++++++++++++++---------------- 1 file changed, 52 insertions(+), 33 deletions(-) diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index d87683aea94..b5c96c7cf2f 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -245,65 +245,93 @@ func (n *Node) constructNodeServerInfoResponse(snap *state.StateSnapshot, reply return nil } -// Deregister is the deprecated single deregistration endpoint +// Deregister is used to remove a client from the cluster. If a client should +// just be made unavailable for scheduling, a status update is preferred. func (n *Node) Deregister(args *structs.NodeDeregisterRequest, reply *structs.NodeUpdateResponse) error { - return n.DeregisterBatch(&structs.NodeBatchDeregisterRequest{ + if done, err := n.srv.forward("Node.Deregister", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "client", "deregister"}, time.Now()) + + if args.NodeID == "" { + return fmt.Errorf("missing node ID for client deregistration") + } + + // deregister takes a batch + repack := &structs.NodeBatchDeregisterRequest{ NodeIDs: []string{args.NodeID}, WriteRequest: args.WriteRequest, - }, reply) + } + + return n.deregister(repack, reply, func() (interface{}, uint64, error) { + return n.srv.raftApply(structs.NodeDeregisterRequestType, args) + }) } -// DeregisterBatch is used to remove client nodes from the cluster. If a client should just -// be made unavailable for scheduling, a status update is preferred. -func (n *Node) DeregisterBatch(args *structs.NodeBatchDeregisterRequest, reply *structs.NodeUpdateResponse) error { +// BatchDeregister is used to remove client nodes from the cluster. +func (n *Node) BatchDeregister(args *structs.NodeBatchDeregisterRequest, reply *structs.NodeUpdateResponse) error { if done, err := n.srv.forward("Node.BatchDeregister", args, args, reply); done { return err } - defer metrics.MeasureSince([]string{"nomad", "client", "deregister"}, time.Now()) + defer metrics.MeasureSince([]string{"nomad", "client", "batch_deregister"}, time.Now()) - // Check node permissions + if len(args.NodeIDs) == 0 { + return fmt.Errorf("missing node IDs for client deregistration") + } + + return n.deregister(args, reply, func() (interface{}, uint64, error) { + return n.srv.raftApply(structs.NodeBatchDeregisterRequestType, args) + }) +} + +// deregister takes a raftMessage closure, to support both Deregister and BatchDeregister +func (n *Node) deregister(args *structs.NodeBatchDeregisterRequest, + reply *structs.NodeUpdateResponse, + raftApplyFn func() (interface{}, uint64, error), +) error { + // Check request permissions if aclObj, err := n.srv.ResolveToken(args.AuthToken); err != nil { return err } else if aclObj != nil && !aclObj.AllowNodeWrite() { return structs.ErrPermissionDenied } - // Verify the arguments - if len(args.NodeIDs) == 0 { - return fmt.Errorf("missing node IDs for client deregistration") - } - - // Open state handles + // Look for the node snap, err := n.srv.fsm.State().Snapshot() if err != nil { return err } ws := memdb.NewWatchSet() - - // Assert that the state contains the nodes for _, nodeID := range args.NodeIDs { node, err := snap.NodeByID(ws, nodeID) if err != nil { - return fmt.Errorf("node lookup failed: %s: %v", nodeID, err) + return err } if node == nil { - return fmt.Errorf("node not found: %s", nodeID) + return fmt.Errorf("node not found") } } - // Commit this update to Raft, before we clear the heartbeatTimer so that failure - // leaves the node running - _, index, err := n.srv.raftApply(structs.NodeBatchDeregisterRequestType, args) + // Commit this update via Raft + _, index, err := raftApplyFn() if err != nil { - n.logger.Error("deregister failed", "error", err) + n.logger.Error("raft message failed", "error", err) return err } for _, nodeID := range args.NodeIDs { + // Clear the heartbeat timer if any n.srv.clearHeartbeatTimer(nodeID) - // If there are any Vault accessors on the node, revoke them + // Create the evaluations for this node + evalIDs, evalIndex, err := n.createNodeEvals(nodeID, index) + if err != nil { + n.logger.Error("eval creation failed", "error", err) + return err + } + + // Determine if there are any Vault accessors on the node accessors, err := snap.VaultAccessorsByNode(ws, nodeID) if err != nil { n.logger.Error("looking up accessors for node failed", "node_id", nodeID, "error", err) @@ -318,24 +346,15 @@ func (n *Node) DeregisterBatch(args *structs.NodeBatchDeregisterRequest, reply * } } - // Create the evaluations for these nodes - evalIDs, evalIndex, err := n.createNodeEvals(nodeID, index) - if err != nil { - n.logger.Error("eval creation failed", "error", err) - return err - } - reply.EvalIDs = append(reply.EvalIDs, evalIDs...) - // Set the reply evalIndex only the first time + // Set the reply eval create index just the first time if reply.EvalCreateIndex == 0 { reply.EvalCreateIndex = evalIndex } } - // Setup the reply reply.NodeModifyIndex = index reply.Index = index - return nil }