Skip to content

Commit

Permalink
node_endpoint preserve both messages as rpcs and in raft
Browse files Browse the repository at this point in the history
  • Loading branch information
langmartin committed Jul 8, 2019
1 parent 49f693d commit bd12b75
Showing 1 changed file with 52 additions and 33 deletions.
85 changes: 52 additions & 33 deletions nomad/node_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,65 +245,93 @@ func (n *Node) constructNodeServerInfoResponse(snap *state.StateSnapshot, reply
return nil
}

// Deregister is the deprecated single deregistration endpoint
// Deregister is used to remove a client from the cluster. If a client should
// just be made unavailable for scheduling, a status update is preferred.
func (n *Node) Deregister(args *structs.NodeDeregisterRequest, reply *structs.NodeUpdateResponse) error {
return n.DeregisterBatch(&structs.NodeBatchDeregisterRequest{
if done, err := n.srv.forward("Node.Deregister", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "client", "deregister"}, time.Now())

if args.NodeID == "" {
return fmt.Errorf("missing node ID for client deregistration")
}

// deregister takes a batch
repack := &structs.NodeBatchDeregisterRequest{
NodeIDs: []string{args.NodeID},
WriteRequest: args.WriteRequest,
}, reply)
}

return n.deregister(repack, reply, func() (interface{}, uint64, error) {
return n.srv.raftApply(structs.NodeDeregisterRequestType, args)
})
}

// DeregisterBatch is used to remove client nodes from the cluster. If a client should just
// be made unavailable for scheduling, a status update is preferred.
func (n *Node) DeregisterBatch(args *structs.NodeBatchDeregisterRequest, reply *structs.NodeUpdateResponse) error {
// BatchDeregister is used to remove client nodes from the cluster.
func (n *Node) BatchDeregister(args *structs.NodeBatchDeregisterRequest, reply *structs.NodeUpdateResponse) error {
if done, err := n.srv.forward("Node.BatchDeregister", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "client", "deregister"}, time.Now())
defer metrics.MeasureSince([]string{"nomad", "client", "batch_deregister"}, time.Now())

// Check node permissions
if len(args.NodeIDs) == 0 {
return fmt.Errorf("missing node IDs for client deregistration")
}

return n.deregister(args, reply, func() (interface{}, uint64, error) {
return n.srv.raftApply(structs.NodeBatchDeregisterRequestType, args)
})
}

// deregister takes a raftMessage closure, to support both Deregister and BatchDeregister
func (n *Node) deregister(args *structs.NodeBatchDeregisterRequest,
reply *structs.NodeUpdateResponse,
raftApplyFn func() (interface{}, uint64, error),
) error {
// Check request permissions
if aclObj, err := n.srv.ResolveToken(args.AuthToken); err != nil {
return err
} else if aclObj != nil && !aclObj.AllowNodeWrite() {
return structs.ErrPermissionDenied
}

// Verify the arguments
if len(args.NodeIDs) == 0 {
return fmt.Errorf("missing node IDs for client deregistration")
}

// Open state handles
// Look for the node
snap, err := n.srv.fsm.State().Snapshot()
if err != nil {
return err
}

ws := memdb.NewWatchSet()

// Assert that the state contains the nodes
for _, nodeID := range args.NodeIDs {
node, err := snap.NodeByID(ws, nodeID)
if err != nil {
return fmt.Errorf("node lookup failed: %s: %v", nodeID, err)
return err
}
if node == nil {
return fmt.Errorf("node not found: %s", nodeID)
return fmt.Errorf("node not found")
}
}

// Commit this update to Raft, before we clear the heartbeatTimer so that failure
// leaves the node running
_, index, err := n.srv.raftApply(structs.NodeBatchDeregisterRequestType, args)
// Commit this update via Raft
_, index, err := raftApplyFn()
if err != nil {
n.logger.Error("deregister failed", "error", err)
n.logger.Error("raft message failed", "error", err)
return err
}

for _, nodeID := range args.NodeIDs {
// Clear the heartbeat timer if any
n.srv.clearHeartbeatTimer(nodeID)

// If there are any Vault accessors on the node, revoke them
// Create the evaluations for this node
evalIDs, evalIndex, err := n.createNodeEvals(nodeID, index)
if err != nil {
n.logger.Error("eval creation failed", "error", err)
return err
}

// Determine if there are any Vault accessors on the node
accessors, err := snap.VaultAccessorsByNode(ws, nodeID)
if err != nil {
n.logger.Error("looking up accessors for node failed", "node_id", nodeID, "error", err)
Expand All @@ -318,24 +346,15 @@ func (n *Node) DeregisterBatch(args *structs.NodeBatchDeregisterRequest, reply *
}
}

// Create the evaluations for these nodes
evalIDs, evalIndex, err := n.createNodeEvals(nodeID, index)
if err != nil {
n.logger.Error("eval creation failed", "error", err)
return err
}

reply.EvalIDs = append(reply.EvalIDs, evalIDs...)
// Set the reply evalIndex only the first time
// Set the reply eval create index just the first time
if reply.EvalCreateIndex == 0 {
reply.EvalCreateIndex = evalIndex
}
}

// Setup the reply
reply.NodeModifyIndex = index
reply.Index = index

return nil
}

Expand Down

0 comments on commit bd12b75

Please sign in to comment.