From 55c5b59be80c48697fb78f922076fbdc0488cca6 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Fri, 11 May 2018 17:26:25 -0700 Subject: [PATCH] Register events --- command/agent/node_endpoint_test.go | 2 +- nomad/node_endpoint.go | 12 ++++ nomad/node_endpoint_test.go | 53 +++++++++++++++- nomad/server_test.go | 18 +++--- nomad/state/state_store.go | 26 ++++++-- nomad/state/state_store_test.go | 61 ++++++++----------- nomad/structs/structs.go | 9 ++- website/source/api/nodes.html.md | 2 +- .../docs/commands/node/status.html.md.erb | 8 +-- 9 files changed, 133 insertions(+), 58 deletions(-) diff --git a/command/agent/node_endpoint_test.go b/command/agent/node_endpoint_test.go index ad69c4b5a1e..6c94af9effb 100644 --- a/command/agent/node_endpoint_test.go +++ b/command/agent/node_endpoint_test.go @@ -468,7 +468,7 @@ func TestHTTP_NodeQuery(t *testing.T) { if len(n.Events) < 1 { t.Fatalf("Expected node registration event to be populated: %#v", n) } - if n.Events[0].Message != "Node Registered" { + if n.Events[0].Message != "Node registered" { t.Fatalf("Expected node registration event to be first node event: %#v", n) } }) diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index 1a0cb3262ff..730a60f5f3c 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -32,6 +32,10 @@ const ( NodeDrainEventDrainSet = "Node drain strategy set" NodeDrainEventDrainDisabled = "Node drain disabled" NodeDrainEventDrainUpdated = "Node drain stategy updated" + + // NodeHeartbeatEventReregistered is the message used when the node becomes + // reregistered by the heartbeat. 
+ NodeHeartbeatEventReregistered = "Node reregistered by heartbeat" ) // Node endpoint is used for client interactions @@ -359,6 +363,14 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct // Commit this update via Raft var index uint64 if node.Status != args.Status { + // Attach an event if we are updating the node status to ready when it + // is down via a heartbeat + if node.Status == structs.NodeStatusDown && args.NodeEvent == nil { + args.NodeEvent = structs.NewNodeEvent(). + SetSubsystem(structs.NodeEventSubsystemCluster). + SetMessage(NodeHeartbeatEventReregistered) + } + _, index, err = n.srv.raftApply(structs.NodeUpdateStatusRequestType, args) if err != nil { n.srv.logger.Printf("[ERR] nomad.client: status update failed: %v", err) diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index 1279640fbc9..a2bc2dd32ad 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -13,6 +13,7 @@ import ( "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" vapi "github.com/hashicorp/vault/api" @@ -508,6 +509,56 @@ func TestClientEndpoint_UpdateStatus_Vault(t *testing.T) { } } +func TestClientEndpoint_UpdateStatus_HeartbeatRecovery(t *testing.T) { + t.Parallel() + require := require.New(t) + s1 := TestServer(t, nil) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Check that we have no client connections + require.Empty(s1.connectedNodes()) + + // Create the register request but make the node down + node := mock.Node() + node.Status = structs.NodeStatusDown + reg := &structs.NodeRegisterRequest{ + Node: node, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Fetch the response + var resp structs.NodeUpdateResponse + 
require.NoError(msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp)) + + // Update the status + dereg := &structs.NodeUpdateStatusRequest{ + NodeID: node.ID, + Status: structs.NodeStatusInit, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp2 structs.NodeUpdateResponse + require.NoError(msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", dereg, &resp2)) + require.NotZero(resp2.Index) + + // Check for heartbeat interval + ttl := resp2.HeartbeatTTL + if ttl < s1.config.MinHeartbeatTTL || ttl > 2*s1.config.MinHeartbeatTTL { + t.Fatalf("bad: %#v", ttl) + } + + // Check for the node in the FSM + state := s1.fsm.State() + ws := memdb.NewWatchSet() + out, err := state.NodeByID(ws, node.ID) + require.NoError(err) + require.NotNil(out) + require.EqualValues(resp2.Index, out.ModifyIndex) + require.Len(out.Events, 2) + require.Equal(NodeHeartbeatEventReregistered, out.Events[1].Message) +} + func TestClientEndpoint_Register_GetEvals(t *testing.T) { t.Parallel() s1 := TestServer(t, nil) @@ -1215,7 +1266,7 @@ func TestClientEndpoint_GetNode(t *testing.T) { if len(resp2.Node.Events) != 1 { t.Fatalf("Did not set node events: %#v", resp2.Node) } - if resp2.Node.Events[0].Message != "Node Registered" { + if resp2.Node.Events[0].Message != state.NodeRegisterEventRegistered { t.Fatalf("Did not set node register event correctly: %#v", resp2.Node) } diff --git a/nomad/server_test.go b/nomad/server_test.go index 3bdb58e7e36..ef143904759 100644 --- a/nomad/server_test.go +++ b/nomad/server_test.go @@ -3,7 +3,6 @@ package nomad import ( "fmt" "io/ioutil" - "log" "os" "path" "strings" @@ -11,7 +10,6 @@ import ( "time" msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" - "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" @@ -531,22 +529,22 @@ func TestServer_InvalidSchedulers(t *testing.T) { t.Parallel() require := require.New(t) + // 
Set the config to not have the core scheduler config := DefaultConfig() - config.DevMode = true - config.LogOutput = testlog.NewWriter(t) - config.SerfConfig.MemberlistConfig.BindAddr = "127.0.0.1" - logger := log.New(config.LogOutput, "", log.LstdFlags) - catalog := consul.NewMockCatalog(logger) + logger := testlog.Logger(t) + s := &Server{ + config: config, + logger: logger, + } - // Set the config to not have the core scheduler config.EnabledSchedulers = []string{"batch"} - _, err := NewServer(config, catalog, logger) + err := s.setupWorkers() require.NotNil(err) require.Contains(err.Error(), "scheduler not enabled") // Set the config to have an unknown scheduler config.EnabledSchedulers = []string{"batch", structs.JobTypeCore, "foo"} - _, err = NewServer(config, catalog, logger) + err = s.setupWorkers() require.NotNil(err) require.Contains(err.Error(), "foo") } diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 2573f42e2cb..7272452e608 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -14,6 +14,16 @@ import ( "github.com/hashicorp/nomad/nomad/structs" ) +const ( + // NodeRegisterEventRegistered is the message used when the node becomes + // registered. + NodeRegisterEventRegistered = "Node registered" + + // NodeRegisterEventReregistered is the message used when the node becomes + // reregistered. + NodeRegisterEventReregistered = "Node re-registered" +) + // IndexEntry is used with the "index" table // for managing the latest Raft index affecting a table. 
type IndexEntry struct { @@ -530,17 +540,23 @@ func (s *StateStore) UpsertNode(index uint64, node *structs.Node) error { // Retain node events that have already been set on the node node.Events = exist.Events + // If we are transitioning from down, record the re-registration + if exist.Status == structs.NodeStatusDown && node.Status != structs.NodeStatusDown { + appendNodeEvents(index, node, []*structs.NodeEvent{ + structs.NewNodeEvent().SetSubsystem(structs.NodeEventSubsystemCluster). + SetMessage(NodeRegisterEventReregistered). + SetTimestamp(time.Unix(node.StatusUpdatedAt, 0))}) + } + node.Drain = exist.Drain // Retain the drain mode node.SchedulingEligibility = exist.SchedulingEligibility // Retain the eligibility node.DrainStrategy = exist.DrainStrategy // Retain the drain strategy } else { // Because this is the first time the node is being registered, we should // also create a node registration event - nodeEvent := &structs.NodeEvent{ - Message: "Node Registered", - Subsystem: "Cluster", - Timestamp: time.Unix(node.StatusUpdatedAt, 0), - } + nodeEvent := structs.NewNodeEvent().SetSubsystem(structs.NodeEventSubsystemCluster). + SetMessage(NodeRegisterEventRegistered). 
+ SetTimestamp(time.Unix(node.StatusUpdatedAt, 0)) node.Events = []*structs.NodeEvent{nodeEvent} node.CreateIndex = index node.ModifyIndex = index diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 83065d097f7..95c8b34c4dd 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -551,54 +551,45 @@ func TestStateStore_DeploymentsByIDPrefix(t *testing.T) { } func TestStateStore_UpsertNode_Node(t *testing.T) { + require := require.New(t) state := testStateStore(t) node := mock.Node() // Create a watchset so we can test that upsert fires the watch ws := memdb.NewWatchSet() _, err := state.NodeByID(ws, node.ID) - if err != nil { - t.Fatalf("bad: %v", err) - } + require.NoError(err) - err = state.UpsertNode(1000, node) - if err != nil { - t.Fatalf("err: %v", err) - } - if !watchFired(ws) { - t.Fatalf("bad") - } + require.NoError(state.UpsertNode(1000, node)) + require.True(watchFired(ws)) ws = memdb.NewWatchSet() out, err := state.NodeByID(ws, node.ID) - if err != nil { - t.Fatalf("err: %v", err) - } + require.NoError(err) out2, err := state.NodeBySecretID(ws, node.SecretID) - if err != nil { - t.Fatalf("err: %v", err) - } - - if !reflect.DeepEqual(node, out2) { - t.Fatalf("bad: %#v %#v", node, out2) - } - - if !reflect.DeepEqual(node, out) { - t.Fatalf("bad: %#v %#v", node, out) - } + require.NoError(err) + require.EqualValues(node, out) + require.EqualValues(node, out2) + require.Len(out.Events, 1) + require.Equal(NodeRegisterEventRegistered, out.Events[0].Message) index, err := state.Index("nodes") - if err != nil { - t.Fatalf("err: %v", err) - } - if index != 1000 { - t.Fatalf("bad: %d", index) - } + require.NoError(err) + require.EqualValues(1000, index) + require.False(watchFired(ws)) - if watchFired(ws) { - t.Fatalf("bad") - } + // Transition the node to down and then up and ensure we get a re-register + // event + down := out.Copy() + down.Status = structs.NodeStatusDown + 
require.NoError(state.UpsertNode(1001, down)) + require.NoError(state.UpsertNode(1002, out)) + + out, err = state.NodeByID(ws, node.ID) + require.NoError(err) + require.Len(out.Events, 2) + require.Equal(NodeRegisterEventReregistered, out.Events[1].Message) } func TestStateStore_DeleteNode_Node(t *testing.T) { @@ -794,7 +785,7 @@ func TestStateStore_AddSingleNodeEvent(t *testing.T) { require.Equal(1, len(node.Events)) require.Equal(structs.NodeEventSubsystemCluster, node.Events[0].Subsystem) - require.Equal("Node Registered", node.Events[0].Message) + require.Equal(NodeRegisterEventRegistered, node.Events[0].Message) // Create a watchset so we can test that AddNodeEvent fires the watch ws := memdb.NewWatchSet() @@ -836,7 +827,7 @@ func TestStateStore_NodeEvents_RetentionWindow(t *testing.T) { } require.Equal(1, len(node.Events)) require.Equal(structs.NodeEventSubsystemCluster, node.Events[0].Subsystem) - require.Equal("Node Registered", node.Events[0].Message) + require.Equal(NodeRegisterEventRegistered, node.Events[0].Message) var out *structs.Node for i := 1; i <= 20; i++ { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index f821bcd5ad7..cd385a02998 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -293,7 +293,8 @@ type WriteMeta struct { // NodeRegisterRequest is used for Node.Register endpoint // to register a node as being a schedulable entity. 
type NodeRegisterRequest struct { - Node *Node + Node *Node + NodeEvent *NodeEvent WriteRequest } @@ -1248,6 +1249,12 @@ func (ne *NodeEvent) SetSubsystem(sys string) *NodeEvent { return ne } +// SetTimestamp is used to set the timestamp on the node event +func (ne *NodeEvent) SetTimestamp(ts time.Time) *NodeEvent { + ne.Timestamp = ts + return ne +} + // AddDetail is used to add a detail to the node event func (ne *NodeEvent) AddDetail(k, v string) *NodeEvent { if ne.Details == nil { diff --git a/website/source/api/nodes.html.md b/website/source/api/nodes.html.md index 1220f3413e4..1cf87d04788 100644 --- a/website/source/api/nodes.html.md +++ b/website/source/api/nodes.html.md @@ -257,7 +257,7 @@ $ curl \ { "CreateIndex": 0, "Details": null, - "Message": "Node Registered", + "Message": "Node registered", "Subsystem": "Cluster", "Timestamp": "2018-04-10T23:43:17Z" } diff --git a/website/source/docs/commands/node/status.html.md.erb b/website/source/docs/commands/node/status.html.md.erb index f3713e045c2..cd6d1e2c8ce 100644 --- a/website/source/docs/commands/node/status.html.md.erb +++ b/website/source/docs/commands/node/status.html.md.erb @@ -111,7 +111,7 @@ rkt true true Node Events Time Subsystem Message 2018-03-29T17:24:42Z Driver: docker Driver docker is not detected -2018-03-29T17:23:42Z Cluster Node Registered +2018-03-29T17:23:42Z Cluster Node registered Allocated Resources CPU Memory Disk IOPS @@ -154,7 +154,7 @@ rkt true true Node Events Time Subsystem Message 2018-03-29T17:24:42Z Driver: docker Driver docker is not detected -2018-03-29T17:23:42Z Cluster Node Registered +2018-03-29T17:23:42Z Cluster Node registered Allocated Resources CPU Memory Disk IOPS @@ -219,7 +219,7 @@ rkt true true Node Events Time Subsystem Message 2018-03-29T17:24:42Z Driver: docker Driver docker is not detected -2018-03-29T17:23:42Z Cluster Node Registered +2018-03-29T17:23:42Z Cluster Node registered Allocated Resources CPU Memory Disk IOPS @@ -300,7 +300,7 @@ rkt true true 
2018-03-29T17:23:42Z Node Events Time Subsystem Message Details 2018-03-29T17:24:42Z Driver: docker Driver docker is not detected driver: docker, -2018-03-29T17:23:42Z Cluster Node Registered +2018-03-29T17:23:42Z Cluster Node registered Allocated Resources CPU Memory Disk IOPS