Skip to content

Commit

Permalink
Merge pull request #4291 from hashicorp/f-eligibility
Browse files Browse the repository at this point in the history
Emit events when node eligibility is set
  • Loading branch information
dadgar authored May 22, 2018
2 parents b9d9b44 + adb5808 commit 774dd0f
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 4 deletions.
2 changes: 1 addition & 1 deletion nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ func (n *nomadFSM) applyNodeEligibilityUpdate(buf []byte, index uint64) interfac
return err
}

if err := n.state.UpdateNodeEligibility(index, req.NodeID, req.Eligibility); err != nil {
if err := n.state.UpdateNodeEligibility(index, req.NodeID, req.Eligibility, req.NodeEvent); err != nil {
n.logger.Printf("[ERR] nomad.fsm: UpdateNodeEligibility failed: %v", err)
return err
}
Expand Down
9 changes: 9 additions & 0 deletions nomad/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,10 +451,17 @@ func TestFSM_UpdateNodeEligibility(t *testing.T) {
resp := fsm.Apply(makeLog(buf))
require.Nil(resp)

event := &structs.NodeEvent{
Message: "Node marked as ineligible",
Subsystem: structs.NodeEventSubsystemCluster,
Timestamp: time.Now(),
}

// Set the eligibility
req2 := structs.NodeUpdateEligibilityRequest{
NodeID: node.ID,
Eligibility: structs.NodeSchedulingIneligible,
NodeEvent: event,
}
buf, err = structs.Encode(structs.NodeUpdateEligibilityRequestType, req2)
require.Nil(err)
Expand All @@ -466,6 +473,8 @@ func TestFSM_UpdateNodeEligibility(t *testing.T) {
node, err = fsm.State().NodeByID(nil, req.Node.ID)
require.Nil(err)
require.Equal(node.SchedulingEligibility, structs.NodeSchedulingIneligible)
require.Len(node.Events, 2)
require.Equal(event.Message, node.Events[1].Message)

// Update the drain
strategy := &structs.DrainStrategy{
Expand Down
21 changes: 21 additions & 0 deletions nomad/node_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ const (
NodeDrainEventDrainSet = "Node drain strategy set"
NodeDrainEventDrainDisabled = "Node drain disabled"
NodeDrainEventDrainUpdated = "Node drain stategy updated"

// NodeEligibilityEventEligible is used when the nodes eligiblity is marked
// eligible
NodeEligibilityEventEligible = "Node marked as eligible for scheduling"

// NodeEligibilityEventIneligible is used when the nodes eligiblity is marked
// ineligible
NodeEligibilityEventIneligible = "Node marked as ineligible for scheduling"
)

// Node endpoint is used for client interactions
Expand Down Expand Up @@ -532,6 +540,9 @@ func (n *Node) UpdateEligibility(args *structs.NodeUpdateEligibilityRequest,
if args.NodeID == "" {
return fmt.Errorf("missing node ID for setting scheduling eligibility")
}
if args.NodeEvent != nil {
return fmt.Errorf("node event must not be set")
}

// Check that only allowed types are set
switch args.Eligibility {
Expand Down Expand Up @@ -563,6 +574,16 @@ func (n *Node) UpdateEligibility(args *structs.NodeUpdateEligibilityRequest,
return fmt.Errorf("invalid scheduling eligibility %q", args.Eligibility)
}

// Construct the node event
args.NodeEvent = structs.NewNodeEvent().SetSubsystem(structs.NodeEventSubsystemCluster)
if node.SchedulingEligibility == args.Eligibility {
return nil // Nothing to do
} else if args.Eligibility == structs.NodeSchedulingEligible {
args.NodeEvent.SetMessage(NodeEligibilityEventEligible)
} else {
args.NodeEvent.SetMessage(NodeEligibilityEventIneligible)
}

// Commit this update via Raft
outErr, index, err := n.srv.raftApply(structs.NodeUpdateEligibilityRequestType, args)
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions nomad/node_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1095,6 +1095,8 @@ func TestClientEndpoint_UpdateEligibility(t *testing.T) {
out, err := state.NodeByID(nil, node.ID)
require.Nil(err)
require.Equal(out.SchedulingEligibility, structs.NodeSchedulingIneligible)
require.Len(out.Events, 2)
require.Equal(NodeEligibilityEventIneligible, out.Events[1].Message)

// Register a system job
job := mock.SystemJob()
Expand All @@ -1107,6 +1109,11 @@ func TestClientEndpoint_UpdateEligibility(t *testing.T) {
require.NotZero(resp3.Index)
require.NotZero(resp3.EvalCreateIndex)
require.Len(resp3.EvalIDs, 1)

out, err = state.NodeByID(nil, node.ID)
require.Nil(err)
require.Len(out.Events, 3)
require.Equal(NodeEligibilityEventEligible, out.Events[2].Message)
}

func TestClientEndpoint_UpdateEligibility_ACL(t *testing.T) {
Expand Down
7 changes: 6 additions & 1 deletion nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,7 @@ func (s *StateStore) updateNodeDrainImpl(txn *memdb.Txn, index uint64, nodeID st
}

// UpdateNodeEligibility is used to update the scheduling eligibility of a node
func (s *StateStore) UpdateNodeEligibility(index uint64, nodeID string, eligibility string) error {
func (s *StateStore) UpdateNodeEligibility(index uint64, nodeID string, eligibility string, event *structs.NodeEvent) error {

txn := s.db.Txn(true)
defer txn.Abort()
Expand All @@ -706,6 +706,11 @@ func (s *StateStore) UpdateNodeEligibility(index uint64, nodeID string, eligibil
existingNode := existing.(*structs.Node)
copyNode := existingNode.Copy()

// Add the event if given
if event != nil {
appendNodeEvents(index, copyNode, []*structs.NodeEvent{event})
}

// Check if this is a valid action
if copyNode.DrainStrategy != nil && eligibility == structs.NodeSchedulingEligible {
return fmt.Errorf("can not set node's scheduling eligibility to eligible while it is draining")
Expand Down
11 changes: 9 additions & 2 deletions nomad/state/state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -952,13 +952,20 @@ func TestStateStore_UpdateNodeEligibility(t *testing.T) {
t.Fatalf("bad: %v", err)
}

require.Nil(state.UpdateNodeEligibility(1001, node.ID, expectedEligibility))
event := &structs.NodeEvent{
Message: "Node marked as ineligible",
Subsystem: structs.NodeEventSubsystemCluster,
Timestamp: time.Now(),
}
require.Nil(state.UpdateNodeEligibility(1001, node.ID, expectedEligibility, event))
require.True(watchFired(ws))

ws = memdb.NewWatchSet()
out, err := state.NodeByID(ws, node.ID)
require.Nil(err)
require.Equal(out.SchedulingEligibility, expectedEligibility)
require.Len(out.Events, 2)
require.Equal(out.Events[1], event)
require.EqualValues(1001, out.ModifyIndex)

index, err := state.Index("nodes")
Expand All @@ -975,7 +982,7 @@ func TestStateStore_UpdateNodeEligibility(t *testing.T) {
require.Nil(state.UpdateNodeDrain(1002, node.ID, expectedDrain, false, nil))

// Try to set the node to eligible
err = state.UpdateNodeEligibility(1003, node.ID, structs.NodeSchedulingEligible)
err = state.UpdateNodeEligibility(1003, node.ID, structs.NodeSchedulingEligible, nil)
require.NotNil(err)
require.Contains(err.Error(), "while it is draining")
}
Expand Down
4 changes: 4 additions & 0 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,10 @@ type DrainUpdate struct {
type NodeUpdateEligibilityRequest struct {
NodeID string
Eligibility string

// NodeEvent is the event added to the node
NodeEvent *NodeEvent

WriteRequest
}

Expand Down

0 comments on commit 774dd0f

Please sign in to comment.