Skip to content

Commit

Permalink
Register events
Browse files Browse the repository at this point in the history
  • Loading branch information
dadgar committed May 22, 2018
1 parent f01c0d7 commit cfc9d1f
Show file tree
Hide file tree
Showing 9 changed files with 133 additions and 58 deletions.
2 changes: 1 addition & 1 deletion command/agent/node_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ func TestHTTP_NodeQuery(t *testing.T) {
if len(n.Events) < 1 {
t.Fatalf("Expected node registration event to be populated: %#v", n)
}
if n.Events[0].Message != "Node Registered" {
if n.Events[0].Message != "Node registered" {
t.Fatalf("Expected node registration event to be first node event: %#v", n)
}
})
Expand Down
12 changes: 12 additions & 0 deletions nomad/node_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ const (
// NodeEligibilityEventIneligible is used when the nodes eligiblity is marked
// ineligible
NodeEligibilityEventIneligible = "Node marked as ineligible for scheduling"

// NodeHeartbeatEventReregistered is the message used when the node becomes
// reregistered by the heartbeat.
NodeHeartbeatEventReregistered = "Node reregistered by heartbeat"
)

// Node endpoint is used for client interactions
Expand Down Expand Up @@ -367,6 +371,14 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct
// Commit this update via Raft
var index uint64
if node.Status != args.Status {
// Attach an event if we are updating the node status to ready when it
// is down via a heartbeat
if node.Status == structs.NodeStatusDown && args.NodeEvent == nil {
args.NodeEvent = structs.NewNodeEvent().
SetSubsystem(structs.NodeEventSubsystemCluster).
SetMessage(NodeHeartbeatEventReregistered)
}

_, index, err = n.srv.raftApply(structs.NodeUpdateStatusRequestType, args)
if err != nil {
n.srv.logger.Printf("[ERR] nomad.client: status update failed: %v", err)
Expand Down
53 changes: 52 additions & 1 deletion nomad/node_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
vapi "github.com/hashicorp/vault/api"
Expand Down Expand Up @@ -508,6 +509,56 @@ func TestClientEndpoint_UpdateStatus_Vault(t *testing.T) {
}
}

func TestClientEndpoint_UpdateStatus_HeartbeatRecovery(t *testing.T) {
t.Parallel()
require := require.New(t)
s1 := TestServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

// Check that we have no client connections
require.Empty(s1.connectedNodes())

// Create the register request but make the node down
node := mock.Node()
node.Status = structs.NodeStatusDown
reg := &structs.NodeRegisterRequest{
Node: node,
WriteRequest: structs.WriteRequest{Region: "global"},
}

// Fetch the response
var resp structs.NodeUpdateResponse
require.NoError(msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp))

// Update the status
dereg := &structs.NodeUpdateStatusRequest{
NodeID: node.ID,
Status: structs.NodeStatusInit,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp2 structs.NodeUpdateResponse
require.NoError(msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", dereg, &resp2))
require.NotZero(resp2.Index)

// Check for heartbeat interval
ttl := resp2.HeartbeatTTL
if ttl < s1.config.MinHeartbeatTTL || ttl > 2*s1.config.MinHeartbeatTTL {
t.Fatalf("bad: %#v", ttl)
}

// Check for the node in the FSM
state := s1.fsm.State()
ws := memdb.NewWatchSet()
out, err := state.NodeByID(ws, node.ID)
require.NoError(err)
require.NotNil(out)
require.EqualValues(resp2.Index, out.ModifyIndex)
require.Len(out.Events, 2)
require.Equal(NodeHeartbeatEventReregistered, out.Events[1].Message)
}

func TestClientEndpoint_Register_GetEvals(t *testing.T) {
t.Parallel()
s1 := TestServer(t, nil)
Expand Down Expand Up @@ -1222,7 +1273,7 @@ func TestClientEndpoint_GetNode(t *testing.T) {
if len(resp2.Node.Events) != 1 {
t.Fatalf("Did not set node events: %#v", resp2.Node)
}
if resp2.Node.Events[0].Message != "Node Registered" {
if resp2.Node.Events[0].Message != state.NodeRegisterEventRegistered {
t.Fatalf("Did not set node register event correctly: %#v", resp2.Node)
}

Expand Down
18 changes: 8 additions & 10 deletions nomad/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,13 @@ package nomad
import (
"fmt"
"io/ioutil"
"log"
"os"
"path"
"strings"
"testing"
"time"

msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
Expand Down Expand Up @@ -531,22 +529,22 @@ func TestServer_InvalidSchedulers(t *testing.T) {
t.Parallel()
require := require.New(t)

// Set the config to not have the core scheduler
config := DefaultConfig()
config.DevMode = true
config.LogOutput = testlog.NewWriter(t)
config.SerfConfig.MemberlistConfig.BindAddr = "127.0.0.1"
logger := log.New(config.LogOutput, "", log.LstdFlags)
catalog := consul.NewMockCatalog(logger)
logger := testlog.Logger(t)
s := &Server{
config: config,
logger: logger,
}

// Set the config to not have the core scheduler
config.EnabledSchedulers = []string{"batch"}
_, err := NewServer(config, catalog, logger)
err := s.setupWorkers()
require.NotNil(err)
require.Contains(err.Error(), "scheduler not enabled")

// Set the config to have an unknown scheduler
config.EnabledSchedulers = []string{"batch", structs.JobTypeCore, "foo"}
_, err = NewServer(config, catalog, logger)
err = s.setupWorkers()
require.NotNil(err)
require.Contains(err.Error(), "foo")
}
26 changes: 21 additions & 5 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,16 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
)

const (
// NodeRegisterEventReregistered is the message used when the node becomes
// reregistered.
NodeRegisterEventRegistered = "Node registered"

// NodeRegisterEventReregistered is the message used when the node becomes
// reregistered.
NodeRegisterEventReregistered = "Node re-registered"
)

// IndexEntry is used with the "index" table
// for managing the latest Raft index affecting a table.
type IndexEntry struct {
Expand Down Expand Up @@ -530,17 +540,23 @@ func (s *StateStore) UpsertNode(index uint64, node *structs.Node) error {
// Retain node events that have already been set on the node
node.Events = exist.Events

// If we are transitioning from down, record the re-registration
if exist.Status == structs.NodeStatusDown && node.Status != structs.NodeStatusDown {
appendNodeEvents(index, node, []*structs.NodeEvent{
structs.NewNodeEvent().SetSubsystem(structs.NodeEventSubsystemCluster).
SetMessage(NodeRegisterEventReregistered).
SetTimestamp(time.Unix(node.StatusUpdatedAt, 0))})
}

node.Drain = exist.Drain // Retain the drain mode
node.SchedulingEligibility = exist.SchedulingEligibility // Retain the eligibility
node.DrainStrategy = exist.DrainStrategy // Retain the drain strategy
} else {
// Because this is the first time the node is being registered, we should
// also create a node registration event
nodeEvent := &structs.NodeEvent{
Message: "Node Registered",
Subsystem: "Cluster",
Timestamp: time.Unix(node.StatusUpdatedAt, 0),
}
nodeEvent := structs.NewNodeEvent().SetSubsystem(structs.NodeEventSubsystemCluster).
SetMessage(NodeRegisterEventRegistered).
SetTimestamp(time.Unix(node.StatusUpdatedAt, 0))
node.Events = []*structs.NodeEvent{nodeEvent}
node.CreateIndex = index
node.ModifyIndex = index
Expand Down
61 changes: 26 additions & 35 deletions nomad/state/state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,54 +551,45 @@ func TestStateStore_DeploymentsByIDPrefix(t *testing.T) {
}

func TestStateStore_UpsertNode_Node(t *testing.T) {
require := require.New(t)
state := testStateStore(t)
node := mock.Node()

// Create a watchset so we can test that upsert fires the watch
ws := memdb.NewWatchSet()
_, err := state.NodeByID(ws, node.ID)
if err != nil {
t.Fatalf("bad: %v", err)
}
require.NoError(err)

err = state.UpsertNode(1000, node)
if err != nil {
t.Fatalf("err: %v", err)
}
if !watchFired(ws) {
t.Fatalf("bad")
}
require.NoError(state.UpsertNode(1000, node))
require.True(watchFired(ws))

ws = memdb.NewWatchSet()
out, err := state.NodeByID(ws, node.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
require.NoError(err)

out2, err := state.NodeBySecretID(ws, node.SecretID)
if err != nil {
t.Fatalf("err: %v", err)
}

if !reflect.DeepEqual(node, out2) {
t.Fatalf("bad: %#v %#v", node, out2)
}

if !reflect.DeepEqual(node, out) {
t.Fatalf("bad: %#v %#v", node, out)
}
require.NoError(err)
require.EqualValues(node, out)
require.EqualValues(node, out2)
require.Len(out.Events, 1)
require.Equal(NodeRegisterEventRegistered, out.Events[0].Message)

index, err := state.Index("nodes")
if err != nil {
t.Fatalf("err: %v", err)
}
if index != 1000 {
t.Fatalf("bad: %d", index)
}
require.NoError(err)
require.EqualValues(1000, index)
require.False(watchFired(ws))

if watchFired(ws) {
t.Fatalf("bad")
}
// Transition the node to down and then up and ensure we get a re-register
// event
down := out.Copy()
down.Status = structs.NodeStatusDown
require.NoError(state.UpsertNode(1001, down))
require.NoError(state.UpsertNode(1002, out))

out, err = state.NodeByID(ws, node.ID)
require.NoError(err)
require.Len(out.Events, 2)
require.Equal(NodeRegisterEventReregistered, out.Events[1].Message)
}

func TestStateStore_DeleteNode_Node(t *testing.T) {
Expand Down Expand Up @@ -794,7 +785,7 @@ func TestStateStore_AddSingleNodeEvent(t *testing.T) {

require.Equal(1, len(node.Events))
require.Equal(structs.NodeEventSubsystemCluster, node.Events[0].Subsystem)
require.Equal("Node Registered", node.Events[0].Message)
require.Equal(NodeRegisterEventRegistered, node.Events[0].Message)

// Create a watchset so we can test that AddNodeEvent fires the watch
ws := memdb.NewWatchSet()
Expand Down Expand Up @@ -836,7 +827,7 @@ func TestStateStore_NodeEvents_RetentionWindow(t *testing.T) {
}
require.Equal(1, len(node.Events))
require.Equal(structs.NodeEventSubsystemCluster, node.Events[0].Subsystem)
require.Equal("Node Registered", node.Events[0].Message)
require.Equal(NodeRegisterEventRegistered, node.Events[0].Message)

var out *structs.Node
for i := 1; i <= 20; i++ {
Expand Down
9 changes: 8 additions & 1 deletion nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,8 @@ type WriteMeta struct {
// NodeRegisterRequest is used for Node.Register endpoint
// to register a node as being a schedulable entity.
type NodeRegisterRequest struct {
Node *Node
Node *Node
NodeEvent *NodeEvent
WriteRequest
}

Expand Down Expand Up @@ -1258,6 +1259,12 @@ func (ne *NodeEvent) SetSubsystem(sys string) *NodeEvent {
return ne
}

// SetTimestamp is used to set the timestamp on the node event
func (ne *NodeEvent) SetTimestamp(ts time.Time) *NodeEvent {
ne.Timestamp = ts
return ne
}

// AddDetail is used to add a detail to the node event
func (ne *NodeEvent) AddDetail(k, v string) *NodeEvent {
if ne.Details == nil {
Expand Down
2 changes: 1 addition & 1 deletion website/source/api/nodes.html.md
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ $ curl \
{
"CreateIndex": 0,
"Details": null,
"Message": "Node Registered",
"Message": "Node registered",
"Subsystem": "Cluster",
"Timestamp": "2018-04-10T23:43:17Z"
}
Expand Down
8 changes: 4 additions & 4 deletions website/source/docs/commands/node/status.html.md.erb
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ rkt true true
Node Events
Time Subsystem Message
2018-03-29T17:24:42Z Driver: docker Driver docker is not detected
2018-03-29T17:23:42Z Cluster Node Registered
2018-03-29T17:23:42Z Cluster Node registered

Allocated Resources
CPU Memory Disk IOPS
Expand Down Expand Up @@ -154,7 +154,7 @@ rkt true true
Node Events
Time Subsystem Message
2018-03-29T17:24:42Z Driver: docker Driver docker is not detected
2018-03-29T17:23:42Z Cluster Node Registered
2018-03-29T17:23:42Z Cluster Node registered

Allocated Resources
CPU Memory Disk IOPS
Expand Down Expand Up @@ -219,7 +219,7 @@ rkt true true
Node Events
Time Subsystem Message
2018-03-29T17:24:42Z Driver: docker Driver docker is not detected
2018-03-29T17:23:42Z Cluster Node Registered
2018-03-29T17:23:42Z Cluster Node registered

Allocated Resources
CPU Memory Disk IOPS
Expand Down Expand Up @@ -300,7 +300,7 @@ rkt true true <none> 2018-03-29T17:23:42Z
Node Events
Time Subsystem Message Details
2018-03-29T17:24:42Z Driver: docker Driver docker is not detected driver: docker,
2018-03-29T17:23:42Z Cluster Node Registered <none>
2018-03-29T17:23:42Z Cluster Node registered <none>

Allocated Resources
CPU Memory Disk IOPS
Expand Down

0 comments on commit cfc9d1f

Please sign in to comment.