Skip to content

Commit

Permalink
node heartbeat missed event
Browse files Browse the repository at this point in the history
  • Loading branch information
dadgar committed May 22, 2018
1 parent 774dd0f commit f01c0d7
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 70 deletions.
2 changes: 1 addition & 1 deletion nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func (n *nomadFSM) applyStatusUpdate(buf []byte, index uint64) interface{} {
panic(fmt.Errorf("failed to decode request: %v", err))
}

if err := n.state.UpdateNodeStatus(index, req.NodeID, req.Status); err != nil {
if err := n.state.UpdateNodeStatus(index, req.NodeID, req.Status, req.NodeEvent); err != nil {
n.logger.Printf("[ERR] nomad.fsm: UpdateNodeStatus failed: %v", err)
return err
}
Expand Down
37 changes: 17 additions & 20 deletions nomad/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ func TestFSM_DeregisterNode(t *testing.T) {

func TestFSM_UpdateNodeStatus(t *testing.T) {
t.Parallel()
require := require.New(t)
fsm := testFSM(t)
fsm.blockedEvals.SetEnabled(true)

Expand All @@ -257,43 +258,39 @@ func TestFSM_UpdateNodeStatus(t *testing.T) {
Node: node,
}
buf, err := structs.Encode(structs.NodeRegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
require.NoError(err)

resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
require.Nil(resp)

// Mark an eval as blocked.
eval := mock.Eval()
eval.ClassEligibility = map[string]bool{node.ComputedClass: true}
fsm.blockedEvals.Block(eval)

event := &structs.NodeEvent{
Message: "Node ready foo",
Subsystem: structs.NodeEventSubsystemCluster,
Timestamp: time.Now(),
}
req2 := structs.NodeUpdateStatusRequest{
NodeID: node.ID,
Status: structs.NodeStatusReady,
NodeID: node.ID,
Status: structs.NodeStatusReady,
NodeEvent: event,
}
buf, err = structs.Encode(structs.NodeUpdateStatusRequestType, req2)
if err != nil {
t.Fatalf("err: %v", err)
}
require.NoError(err)

resp = fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
require.Nil(resp)

// Verify the status is ready.
ws := memdb.NewWatchSet()
node, err = fsm.State().NodeByID(ws, req.Node.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if node.Status != structs.NodeStatusReady {
t.Fatalf("bad node: %#v", node)
}
require.NoError(err)
require.Equal(structs.NodeStatusReady, node.Status)
require.Len(node.Events, 2)
require.Equal(event.Message, node.Events[1].Message)

// Verify the eval was unblocked.
testutil.WaitForResult(func() (bool, error) {
Expand Down
9 changes: 7 additions & 2 deletions nomad/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ const (
// heartbeatNotLeader is the error string returned when the heartbeat request
// couldn't be completed since the server is not the leader.
heartbeatNotLeader = "failed to reset heartbeat since server is not leader"

// NodeHeartbeatEventMissed is the event used when the Nodes heartbeat is
// missed.
NodeHeartbeatEventMissed = "Node heartbeat missed"
)

var (
Expand Down Expand Up @@ -123,8 +127,9 @@ func (s *Server) invalidateHeartbeat(id string) {

// Make a request to update the node status
req := structs.NodeUpdateStatusRequest{
NodeID: id,
Status: structs.NodeStatusDown,
NodeID: id,
Status: structs.NodeStatusDown,
NodeEvent: structs.NewNodeEvent().SetSubsystem(structs.NodeEventSubsystemCluster).SetMessage(NodeHeartbeatEventMissed),
WriteRequest: structs.WriteRequest{
Region: s.config.Region,
},
Expand Down
16 changes: 6 additions & 10 deletions nomad/heartbeat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,30 +140,26 @@ func TestHeartbeat_ResetHeartbeatTimerLocked_Renew(t *testing.T) {

func TestHeartbeat_InvalidateHeartbeat(t *testing.T) {
t.Parallel()
require := require.New(t)
s1 := TestServer(t, nil)
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)

// Create a node
node := mock.Node()
state := s1.fsm.State()
err := state.UpsertNode(1, node)
if err != nil {
t.Fatalf("err: %v", err)
}
require.NoError(state.UpsertNode(1, node))

// This should cause a status update
s1.invalidateHeartbeat(node.ID)

// Check it is updated
ws := memdb.NewWatchSet()
out, err := state.NodeByID(ws, node.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if !out.TerminalStatus() {
t.Fatalf("should update node: %#v", out)
}
require.NoError(err)
require.True(out.TerminalStatus())
require.Len(out.Events, 2)
require.Equal(NodeHeartbeatEventMissed, out.Events[1].Message)
}

func TestHeartbeat_ClearHeartbeatTimer(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion nomad/node_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2622,7 +2622,7 @@ func TestClientEndpoint_ListNodes_Blocking(t *testing.T) {

// Node status update triggers watches
time.AfterFunc(100*time.Millisecond, func() {
errCh <- state.UpdateNodeStatus(40, node.ID, structs.NodeStatusDown)
errCh <- state.UpdateNodeStatus(40, node.ID, structs.NodeStatusDown, nil)
})

req.MinQueryIndex = 38
Expand Down
7 changes: 6 additions & 1 deletion nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,7 @@ func (s *StateStore) DeleteNode(index uint64, nodeID string) error {
}

// UpdateNodeStatus is used to update the status of a node
func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string) error {
func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string, event *structs.NodeEvent) error {
txn := s.db.Txn(true)
defer txn.Abort()

Expand All @@ -602,6 +602,11 @@ func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string) error
existingNode := existing.(*structs.Node)
copyNode := existingNode.Copy()

// Add the event if given
if event != nil {
appendNodeEvents(index, copyNode, []*structs.NodeEvent{event})
}

// Update the status in the copy
copyNode.Status = status
copyNode.ModifyIndex = index
Expand Down
51 changes: 18 additions & 33 deletions nomad/state/state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,53 +649,38 @@ func TestStateStore_DeleteNode_Node(t *testing.T) {
}

func TestStateStore_UpdateNodeStatus_Node(t *testing.T) {
require := require.New(t)
state := testStateStore(t)
node := mock.Node()

err := state.UpsertNode(800, node)
if err != nil {
t.Fatalf("err: %v", err)
}
require.NoError(state.UpsertNode(800, node))

// Create a watchset so we can test that update node status fires the watch
ws := memdb.NewWatchSet()
if _, err := state.NodeByID(ws, node.ID); err != nil {
t.Fatalf("bad: %v", err)
}
_, err := state.NodeByID(ws, node.ID)
require.NoError(err)

err = state.UpdateNodeStatus(801, node.ID, structs.NodeStatusReady)
if err != nil {
t.Fatalf("err: %v", err)
event := &structs.NodeEvent{
Message: "Node ready foo",
Subsystem: structs.NodeEventSubsystemCluster,
Timestamp: time.Now(),
}

if !watchFired(ws) {
t.Fatalf("bad")
}
require.NoError(state.UpdateNodeStatus(801, node.ID, structs.NodeStatusReady, event))
require.True(watchFired(ws))

ws = memdb.NewWatchSet()
out, err := state.NodeByID(ws, node.ID)
if err != nil {
t.Fatalf("err: %v", err)
}

if out.Status != structs.NodeStatusReady {
t.Fatalf("bad: %#v", out)
}
if out.ModifyIndex != 801 {
t.Fatalf("bad: %#v", out)
}
require.NoError(err)
require.Equal(structs.NodeStatusReady, out.Status)
require.EqualValues(801, out.ModifyIndex)
require.Len(out.Events, 2)
require.Equal(event.Message, out.Events[1].Message)

index, err := state.Index("nodes")
if err != nil {
t.Fatalf("err: %v", err)
}
if index != 801 {
t.Fatalf("bad: %d", index)
}

if watchFired(ws) {
t.Fatalf("bad")
}
require.NoError(err)
require.EqualValues(801, index)
require.False(watchFired(ws))
}

func TestStateStore_BatchUpdateNodeDrain(t *testing.T) {
Expand Down
5 changes: 3 additions & 2 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,8 +326,9 @@ type NodeServerInfo struct {
// NodeUpdateStatusRequest is used for Node.UpdateStatus endpoint
// to update the status of a node.
type NodeUpdateStatusRequest struct {
NodeID string
Status string
NodeID string
Status string
NodeEvent *NodeEvent
WriteRequest
}

Expand Down

0 comments on commit f01c0d7

Please sign in to comment.