diff --git a/command/agent/node_endpoint_test.go b/command/agent/node_endpoint_test.go index ad69c4b5a1e..6c94af9effb 100644 --- a/command/agent/node_endpoint_test.go +++ b/command/agent/node_endpoint_test.go @@ -468,7 +468,7 @@ func TestHTTP_NodeQuery(t *testing.T) { if len(n.Events) < 1 { t.Fatalf("Expected node registration event to be populated: %#v", n) } - if n.Events[0].Message != "Node Registered" { + if n.Events[0].Message != "Node registered" { t.Fatalf("Expected node registration event to be first node event: %#v", n) } }) diff --git a/nomad/fsm.go b/nomad/fsm.go index abf858af279..faa4e14bf5b 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -307,7 +307,7 @@ func (n *nomadFSM) applyStatusUpdate(buf []byte, index uint64) interface{} { panic(fmt.Errorf("failed to decode request: %v", err)) } - if err := n.state.UpdateNodeStatus(index, req.NodeID, req.Status); err != nil { + if err := n.state.UpdateNodeStatus(index, req.NodeID, req.Status, req.NodeEvent); err != nil { n.logger.Printf("[ERR] nomad.fsm: UpdateNodeStatus failed: %v", err) return err } diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 2440b50075a..14f984b2b60 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -249,6 +249,7 @@ func TestFSM_DeregisterNode(t *testing.T) { func TestFSM_UpdateNodeStatus(t *testing.T) { t.Parallel() + require := require.New(t) fsm := testFSM(t) fsm.blockedEvals.SetEnabled(true) @@ -257,43 +258,39 @@ func TestFSM_UpdateNodeStatus(t *testing.T) { Node: node, } buf, err := structs.Encode(structs.NodeRegisterRequestType, req) - if err != nil { - t.Fatalf("err: %v", err) - } + require.NoError(err) resp := fsm.Apply(makeLog(buf)) - if resp != nil { - t.Fatalf("resp: %v", resp) - } + require.Nil(resp) // Mark an eval as blocked. 
eval := mock.Eval() eval.ClassEligibility = map[string]bool{node.ComputedClass: true} fsm.blockedEvals.Block(eval) + event := &structs.NodeEvent{ + Message: "Node ready foo", + Subsystem: structs.NodeEventSubsystemCluster, + Timestamp: time.Now(), + } req2 := structs.NodeUpdateStatusRequest{ - NodeID: node.ID, - Status: structs.NodeStatusReady, + NodeID: node.ID, + Status: structs.NodeStatusReady, + NodeEvent: event, } buf, err = structs.Encode(structs.NodeUpdateStatusRequestType, req2) - if err != nil { - t.Fatalf("err: %v", err) - } + require.NoError(err) resp = fsm.Apply(makeLog(buf)) - if resp != nil { - t.Fatalf("resp: %v", resp) - } + require.Nil(resp) // Verify the status is ready. ws := memdb.NewWatchSet() node, err = fsm.State().NodeByID(ws, req.Node.ID) - if err != nil { - t.Fatalf("err: %v", err) - } - if node.Status != structs.NodeStatusReady { - t.Fatalf("bad node: %#v", node) - } + require.NoError(err) + require.Equal(structs.NodeStatusReady, node.Status) + require.Len(node.Events, 2) + require.Equal(event.Message, node.Events[1].Message) // Verify the eval was unblocked. testutil.WaitForResult(func() (bool, error) { diff --git a/nomad/heartbeat.go b/nomad/heartbeat.go index b41e3540a8f..8468c67bd45 100644 --- a/nomad/heartbeat.go +++ b/nomad/heartbeat.go @@ -14,6 +14,10 @@ const ( // heartbeatNotLeader is the error string returned when the heartbeat request // couldn't be completed since the server is not the leader. heartbeatNotLeader = "failed to reset heartbeat since server is not leader" + + // NodeHeartbeatEventMissed is the event used when the node's heartbeat is + // missed. 
+ NodeHeartbeatEventMissed = "Node heartbeat missed" ) var ( @@ -123,8 +127,9 @@ func (s *Server) invalidateHeartbeat(id string) { // Make a request to update the node status req := structs.NodeUpdateStatusRequest{ - NodeID: id, - Status: structs.NodeStatusDown, + NodeID: id, + Status: structs.NodeStatusDown, + NodeEvent: structs.NewNodeEvent().SetSubsystem(structs.NodeEventSubsystemCluster).SetMessage(NodeHeartbeatEventMissed), WriteRequest: structs.WriteRequest{ Region: s.config.Region, }, diff --git a/nomad/heartbeat_test.go b/nomad/heartbeat_test.go index 94daca013b8..72a21377ca3 100644 --- a/nomad/heartbeat_test.go +++ b/nomad/heartbeat_test.go @@ -140,6 +140,7 @@ func TestHeartbeat_ResetHeartbeatTimerLocked_Renew(t *testing.T) { func TestHeartbeat_InvalidateHeartbeat(t *testing.T) { t.Parallel() + require := require.New(t) s1 := TestServer(t, nil) defer s1.Shutdown() testutil.WaitForLeader(t, s1.RPC) @@ -147,10 +148,7 @@ func TestHeartbeat_InvalidateHeartbeat(t *testing.T) { // Create a node node := mock.Node() state := s1.fsm.State() - err := state.UpsertNode(1, node) - if err != nil { - t.Fatalf("err: %v", err) - } + require.NoError(state.UpsertNode(1, node)) // This should cause a status update s1.invalidateHeartbeat(node.ID) @@ -158,12 +156,10 @@ func TestHeartbeat_InvalidateHeartbeat(t *testing.T) { // Check it is updated ws := memdb.NewWatchSet() out, err := state.NodeByID(ws, node.ID) - if err != nil { - t.Fatalf("err: %v", err) - } - if !out.TerminalStatus() { - t.Fatalf("should update node: %#v", out) - } + require.NoError(err) + require.True(out.TerminalStatus()) + require.Len(out.Events, 2) + require.Equal(NodeHeartbeatEventMissed, out.Events[1].Message) } func TestHeartbeat_ClearHeartbeatTimer(t *testing.T) { diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index 65b9374392f..df3265a37cd 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -40,6 +40,10 @@ const ( // NodeEligibilityEventIneligible is used when the nodes 
eligiblity is marked // ineligible NodeEligibilityEventIneligible = "Node marked as ineligible for scheduling" + + // NodeHeartbeatEventReregistered is the message used when the node becomes + // reregistered by the heartbeat. + NodeHeartbeatEventReregistered = "Node reregistered by heartbeat" ) // Node endpoint is used for client interactions @@ -367,6 +371,14 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct // Commit this update via Raft var index uint64 if node.Status != args.Status { + // Attach an event if we are updating the node status to ready when it + // is down via a heartbeat + if node.Status == structs.NodeStatusDown && args.NodeEvent == nil { + args.NodeEvent = structs.NewNodeEvent(). + SetSubsystem(structs.NodeEventSubsystemCluster). + SetMessage(NodeHeartbeatEventReregistered) + } + _, index, err = n.srv.raftApply(structs.NodeUpdateStatusRequestType, args) if err != nil { n.srv.logger.Printf("[ERR] nomad.client: status update failed: %v", err) diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index 7d2616a502e..64a2ab17d7e 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -13,6 +13,7 @@ import ( "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" vapi "github.com/hashicorp/vault/api" @@ -508,6 +509,56 @@ func TestClientEndpoint_UpdateStatus_Vault(t *testing.T) { } } +func TestClientEndpoint_UpdateStatus_HeartbeatRecovery(t *testing.T) { + t.Parallel() + require := require.New(t) + s1 := TestServer(t, nil) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Check that we have no client connections + require.Empty(s1.connectedNodes()) + + // Create the register request but make the node down + node := mock.Node() + node.Status = 
structs.NodeStatusDown + reg := &structs.NodeRegisterRequest{ + Node: node, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Fetch the response + var resp structs.NodeUpdateResponse + require.NoError(msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp)) + + // Update the status + dereg := &structs.NodeUpdateStatusRequest{ + NodeID: node.ID, + Status: structs.NodeStatusInit, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp2 structs.NodeUpdateResponse + require.NoError(msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", dereg, &resp2)) + require.NotZero(resp2.Index) + + // Check for heartbeat interval + ttl := resp2.HeartbeatTTL + if ttl < s1.config.MinHeartbeatTTL || ttl > 2*s1.config.MinHeartbeatTTL { + t.Fatalf("bad: %#v", ttl) + } + + // Check for the node in the FSM + state := s1.fsm.State() + ws := memdb.NewWatchSet() + out, err := state.NodeByID(ws, node.ID) + require.NoError(err) + require.NotNil(out) + require.EqualValues(resp2.Index, out.ModifyIndex) + require.Len(out.Events, 2) + require.Equal(NodeHeartbeatEventReregistered, out.Events[1].Message) +} + func TestClientEndpoint_Register_GetEvals(t *testing.T) { t.Parallel() s1 := TestServer(t, nil) @@ -1222,7 +1273,7 @@ func TestClientEndpoint_GetNode(t *testing.T) { if len(resp2.Node.Events) != 1 { t.Fatalf("Did not set node events: %#v", resp2.Node) } - if resp2.Node.Events[0].Message != "Node Registered" { + if resp2.Node.Events[0].Message != state.NodeRegisterEventRegistered { t.Fatalf("Did not set node register event correctly: %#v", resp2.Node) } @@ -2622,7 +2673,7 @@ func TestClientEndpoint_ListNodes_Blocking(t *testing.T) { // Node status update triggers watches time.AfterFunc(100*time.Millisecond, func() { - errCh <- state.UpdateNodeStatus(40, node.ID, structs.NodeStatusDown) + errCh <- state.UpdateNodeStatus(40, node.ID, structs.NodeStatusDown, nil) }) req.MinQueryIndex = 38 diff --git a/nomad/server_test.go b/nomad/server_test.go index 
3bdb58e7e36..ef143904759 100644 --- a/nomad/server_test.go +++ b/nomad/server_test.go @@ -3,7 +3,6 @@ package nomad import ( "fmt" "io/ioutil" - "log" "os" "path" "strings" @@ -11,7 +10,6 @@ import ( "time" msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" - "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" @@ -531,22 +529,22 @@ func TestServer_InvalidSchedulers(t *testing.T) { t.Parallel() require := require.New(t) + // Set the config to not have the core scheduler config := DefaultConfig() - config.DevMode = true - config.LogOutput = testlog.NewWriter(t) - config.SerfConfig.MemberlistConfig.BindAddr = "127.0.0.1" - logger := log.New(config.LogOutput, "", log.LstdFlags) - catalog := consul.NewMockCatalog(logger) + logger := testlog.Logger(t) + s := &Server{ + config: config, + logger: logger, + } - // Set the config to not have the core scheduler config.EnabledSchedulers = []string{"batch"} - _, err := NewServer(config, catalog, logger) + err := s.setupWorkers() require.NotNil(err) require.Contains(err.Error(), "scheduler not enabled") // Set the config to have an unknown scheduler config.EnabledSchedulers = []string{"batch", structs.JobTypeCore, "foo"} - _, err = NewServer(config, catalog, logger) + err = s.setupWorkers() require.NotNil(err) require.Contains(err.Error(), "foo") } diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 142b63b9ac3..ac88328b9e2 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -14,6 +14,16 @@ import ( "github.com/hashicorp/nomad/nomad/structs" ) +const ( + // NodeRegisterEventRegistered is the message used when the node is first + // registered. + NodeRegisterEventRegistered = "Node registered" + + // NodeRegisterEventReregistered is the message used when the node becomes + // reregistered. 
+ NodeRegisterEventReregistered = "Node re-registered" +) + // IndexEntry is used with the "index" table // for managing the latest Raft index affecting a table. type IndexEntry struct { @@ -530,17 +540,23 @@ func (s *StateStore) UpsertNode(index uint64, node *structs.Node) error { // Retain node events that have already been set on the node node.Events = exist.Events + // If we are transitioning from down, record the re-registration + if exist.Status == structs.NodeStatusDown && node.Status != structs.NodeStatusDown { + appendNodeEvents(index, node, []*structs.NodeEvent{ + structs.NewNodeEvent().SetSubsystem(structs.NodeEventSubsystemCluster). + SetMessage(NodeRegisterEventReregistered). + SetTimestamp(time.Unix(node.StatusUpdatedAt, 0))}) + } + node.Drain = exist.Drain // Retain the drain mode node.SchedulingEligibility = exist.SchedulingEligibility // Retain the eligibility node.DrainStrategy = exist.DrainStrategy // Retain the drain strategy } else { // Because this is the first time the node is being registered, we should // also create a node registration event - nodeEvent := &structs.NodeEvent{ - Message: "Node Registered", - Subsystem: "Cluster", - Timestamp: time.Unix(node.StatusUpdatedAt, 0), - } + nodeEvent := structs.NewNodeEvent().SetSubsystem(structs.NodeEventSubsystemCluster). + SetMessage(NodeRegisterEventRegistered). 
+ SetTimestamp(time.Unix(node.StatusUpdatedAt, 0)) node.Events = []*structs.NodeEvent{nodeEvent} node.CreateIndex = index node.ModifyIndex = index @@ -585,7 +601,7 @@ func (s *StateStore) DeleteNode(index uint64, nodeID string) error { } // UpdateNodeStatus is used to update the status of a node -func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string) error { +func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string, event *structs.NodeEvent) error { txn := s.db.Txn(true) defer txn.Abort() @@ -602,6 +618,11 @@ func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string) error existingNode := existing.(*structs.Node) copyNode := existingNode.Copy() + // Add the event if given + if event != nil { + appendNodeEvents(index, copyNode, []*structs.NodeEvent{event}) + } + // Update the status in the copy copyNode.Status = status copyNode.ModifyIndex = index diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index ec9e76076b8..18c56363852 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -551,54 +551,45 @@ func TestStateStore_DeploymentsByIDPrefix(t *testing.T) { } func TestStateStore_UpsertNode_Node(t *testing.T) { + require := require.New(t) state := testStateStore(t) node := mock.Node() // Create a watchset so we can test that upsert fires the watch ws := memdb.NewWatchSet() _, err := state.NodeByID(ws, node.ID) - if err != nil { - t.Fatalf("bad: %v", err) - } + require.NoError(err) - err = state.UpsertNode(1000, node) - if err != nil { - t.Fatalf("err: %v", err) - } - if !watchFired(ws) { - t.Fatalf("bad") - } + require.NoError(state.UpsertNode(1000, node)) + require.True(watchFired(ws)) ws = memdb.NewWatchSet() out, err := state.NodeByID(ws, node.ID) - if err != nil { - t.Fatalf("err: %v", err) - } + require.NoError(err) out2, err := state.NodeBySecretID(ws, node.SecretID) - if err != nil { - t.Fatalf("err: %v", err) - } - - if !reflect.DeepEqual(node, 
out2) { - t.Fatalf("bad: %#v %#v", node, out2) - } - - if !reflect.DeepEqual(node, out) { - t.Fatalf("bad: %#v %#v", node, out) - } + require.NoError(err) + require.EqualValues(node, out) + require.EqualValues(node, out2) + require.Len(out.Events, 1) + require.Equal(NodeRegisterEventRegistered, out.Events[0].Message) index, err := state.Index("nodes") - if err != nil { - t.Fatalf("err: %v", err) - } - if index != 1000 { - t.Fatalf("bad: %d", index) - } + require.NoError(err) + require.EqualValues(1000, index) + require.False(watchFired(ws)) - if watchFired(ws) { - t.Fatalf("bad") - } + // Transition the node to down and then up and ensure we get a re-register + // event + down := out.Copy() + down.Status = structs.NodeStatusDown + require.NoError(state.UpsertNode(1001, down)) + require.NoError(state.UpsertNode(1002, out)) + + out, err = state.NodeByID(ws, node.ID) + require.NoError(err) + require.Len(out.Events, 2) + require.Equal(NodeRegisterEventReregistered, out.Events[1].Message) } func TestStateStore_DeleteNode_Node(t *testing.T) { @@ -649,53 +640,38 @@ func TestStateStore_DeleteNode_Node(t *testing.T) { } func TestStateStore_UpdateNodeStatus_Node(t *testing.T) { + require := require.New(t) state := testStateStore(t) node := mock.Node() - err := state.UpsertNode(800, node) - if err != nil { - t.Fatalf("err: %v", err) - } + require.NoError(state.UpsertNode(800, node)) // Create a watchset so we can test that update node status fires the watch ws := memdb.NewWatchSet() - if _, err := state.NodeByID(ws, node.ID); err != nil { - t.Fatalf("bad: %v", err) - } + _, err := state.NodeByID(ws, node.ID) + require.NoError(err) - err = state.UpdateNodeStatus(801, node.ID, structs.NodeStatusReady) - if err != nil { - t.Fatalf("err: %v", err) + event := &structs.NodeEvent{ + Message: "Node ready foo", + Subsystem: structs.NodeEventSubsystemCluster, + Timestamp: time.Now(), } - if !watchFired(ws) { - t.Fatalf("bad") - } + require.NoError(state.UpdateNodeStatus(801, node.ID, 
structs.NodeStatusReady, event)) + require.True(watchFired(ws)) ws = memdb.NewWatchSet() out, err := state.NodeByID(ws, node.ID) - if err != nil { - t.Fatalf("err: %v", err) - } - - if out.Status != structs.NodeStatusReady { - t.Fatalf("bad: %#v", out) - } - if out.ModifyIndex != 801 { - t.Fatalf("bad: %#v", out) - } + require.NoError(err) + require.Equal(structs.NodeStatusReady, out.Status) + require.EqualValues(801, out.ModifyIndex) + require.Len(out.Events, 2) + require.Equal(event.Message, out.Events[1].Message) index, err := state.Index("nodes") - if err != nil { - t.Fatalf("err: %v", err) - } - if index != 801 { - t.Fatalf("bad: %d", index) - } - - if watchFired(ws) { - t.Fatalf("bad") - } + require.NoError(err) + require.EqualValues(801, index) + require.False(watchFired(ws)) } func TestStateStore_BatchUpdateNodeDrain(t *testing.T) { @@ -809,7 +785,7 @@ func TestStateStore_AddSingleNodeEvent(t *testing.T) { require.Equal(1, len(node.Events)) require.Equal(structs.NodeEventSubsystemCluster, node.Events[0].Subsystem) - require.Equal("Node Registered", node.Events[0].Message) + require.Equal(NodeRegisterEventRegistered, node.Events[0].Message) // Create a watchset so we can test that AddNodeEvent fires the watch ws := memdb.NewWatchSet() @@ -851,7 +827,7 @@ func TestStateStore_NodeEvents_RetentionWindow(t *testing.T) { } require.Equal(1, len(node.Events)) require.Equal(structs.NodeEventSubsystemCluster, node.Events[0].Subsystem) - require.Equal("Node Registered", node.Events[0].Message) + require.Equal(NodeRegisterEventRegistered, node.Events[0].Message) var out *structs.Node for i := 1; i <= 20; i++ { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index dee23e42ba4..658b75479cb 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -293,7 +293,8 @@ type WriteMeta struct { // NodeRegisterRequest is used for Node.Register endpoint // to register a node as being a schedulable entity. 
type NodeRegisterRequest struct { - Node *Node + Node *Node + NodeEvent *NodeEvent WriteRequest } @@ -326,8 +327,9 @@ type NodeServerInfo struct { // NodeUpdateStatusRequest is used for Node.UpdateStatus endpoint // to update the status of a node. type NodeUpdateStatusRequest struct { - NodeID string - Status string + NodeID string + Status string + NodeEvent *NodeEvent WriteRequest } @@ -1257,6 +1259,12 @@ func (ne *NodeEvent) SetSubsystem(sys string) *NodeEvent { return ne } +// SetTimestamp is used to set the timestamp on the node event +func (ne *NodeEvent) SetTimestamp(ts time.Time) *NodeEvent { + ne.Timestamp = ts + return ne +} + // AddDetail is used to add a detail to the node event func (ne *NodeEvent) AddDetail(k, v string) *NodeEvent { if ne.Details == nil { diff --git a/website/source/api/nodes.html.md b/website/source/api/nodes.html.md index 1220f3413e4..1cf87d04788 100644 --- a/website/source/api/nodes.html.md +++ b/website/source/api/nodes.html.md @@ -257,7 +257,7 @@ $ curl \ { "CreateIndex": 0, "Details": null, - "Message": "Node Registered", + "Message": "Node registered", "Subsystem": "Cluster", "Timestamp": "2018-04-10T23:43:17Z" } diff --git a/website/source/docs/commands/node/status.html.md.erb b/website/source/docs/commands/node/status.html.md.erb index f3713e045c2..cd6d1e2c8ce 100644 --- a/website/source/docs/commands/node/status.html.md.erb +++ b/website/source/docs/commands/node/status.html.md.erb @@ -111,7 +111,7 @@ rkt true true Node Events Time Subsystem Message 2018-03-29T17:24:42Z Driver: docker Driver docker is not detected -2018-03-29T17:23:42Z Cluster Node Registered +2018-03-29T17:23:42Z Cluster Node registered Allocated Resources CPU Memory Disk IOPS @@ -154,7 +154,7 @@ rkt true true Node Events Time Subsystem Message 2018-03-29T17:24:42Z Driver: docker Driver docker is not detected -2018-03-29T17:23:42Z Cluster Node Registered +2018-03-29T17:23:42Z Cluster Node registered Allocated Resources CPU Memory Disk IOPS @@ -219,7 +219,7 
@@ rkt true true Node Events Time Subsystem Message 2018-03-29T17:24:42Z Driver: docker Driver docker is not detected -2018-03-29T17:23:42Z Cluster Node Registered +2018-03-29T17:23:42Z Cluster Node registered Allocated Resources CPU Memory Disk IOPS @@ -300,7 +300,7 @@ rkt true true 2018-03-29T17:23:42Z Node Events Time Subsystem Message Details 2018-03-29T17:24:42Z Driver: docker Driver docker is not detected driver: docker, -2018-03-29T17:23:42Z Cluster Node Registered +2018-03-29T17:23:42Z Cluster Node registered Allocated Resources CPU Memory Disk IOPS