Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Emit heartbeat and node registration events #4292

Merged
merged 2 commits into from
May 22, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion command/agent/node_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ func TestHTTP_NodeQuery(t *testing.T) {
if len(n.Events) < 1 {
t.Fatalf("Expected node registration event to be populated: %#v", n)
}
if n.Events[0].Message != "Node Registered" {
if n.Events[0].Message != "Node registered" {
t.Fatalf("Expected node registration event to be first node event: %#v", n)
}
})
Expand Down
2 changes: 1 addition & 1 deletion nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func (n *nomadFSM) applyStatusUpdate(buf []byte, index uint64) interface{} {
panic(fmt.Errorf("failed to decode request: %v", err))
}

if err := n.state.UpdateNodeStatus(index, req.NodeID, req.Status); err != nil {
if err := n.state.UpdateNodeStatus(index, req.NodeID, req.Status, req.NodeEvent); err != nil {
n.logger.Printf("[ERR] nomad.fsm: UpdateNodeStatus failed: %v", err)
return err
}
Expand Down
37 changes: 17 additions & 20 deletions nomad/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ func TestFSM_DeregisterNode(t *testing.T) {

func TestFSM_UpdateNodeStatus(t *testing.T) {
t.Parallel()
require := require.New(t)
fsm := testFSM(t)
fsm.blockedEvals.SetEnabled(true)

Expand All @@ -257,43 +258,39 @@ func TestFSM_UpdateNodeStatus(t *testing.T) {
Node: node,
}
buf, err := structs.Encode(structs.NodeRegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
require.NoError(err)

resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
require.Nil(resp)

// Mark an eval as blocked.
eval := mock.Eval()
eval.ClassEligibility = map[string]bool{node.ComputedClass: true}
fsm.blockedEvals.Block(eval)

event := &structs.NodeEvent{
Message: "Node ready foo",
Subsystem: structs.NodeEventSubsystemCluster,
Timestamp: time.Now(),
}
req2 := structs.NodeUpdateStatusRequest{
NodeID: node.ID,
Status: structs.NodeStatusReady,
NodeID: node.ID,
Status: structs.NodeStatusReady,
NodeEvent: event,
}
buf, err = structs.Encode(structs.NodeUpdateStatusRequestType, req2)
if err != nil {
t.Fatalf("err: %v", err)
}
require.NoError(err)

resp = fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
require.Nil(resp)

// Verify the status is ready.
ws := memdb.NewWatchSet()
node, err = fsm.State().NodeByID(ws, req.Node.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if node.Status != structs.NodeStatusReady {
t.Fatalf("bad node: %#v", node)
}
require.NoError(err)
require.Equal(structs.NodeStatusReady, node.Status)
require.Len(node.Events, 2)
require.Equal(event.Message, node.Events[1].Message)

// Verify the eval was unblocked.
testutil.WaitForResult(func() (bool, error) {
Expand Down
9 changes: 7 additions & 2 deletions nomad/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ const (
// heartbeatNotLeader is the error string returned when the heartbeat request
// couldn't be completed since the server is not the leader.
heartbeatNotLeader = "failed to reset heartbeat since server is not leader"

// NodeHeartbeatEventMissed is the event used when the Nodes heartbeat is
// missed.
NodeHeartbeatEventMissed = "Node heartbeat missed"
)

var (
Expand Down Expand Up @@ -123,8 +127,9 @@ func (s *Server) invalidateHeartbeat(id string) {

// Make a request to update the node status
req := structs.NodeUpdateStatusRequest{
NodeID: id,
Status: structs.NodeStatusDown,
NodeID: id,
Status: structs.NodeStatusDown,
NodeEvent: structs.NewNodeEvent().SetSubsystem(structs.NodeEventSubsystemCluster).SetMessage(NodeHeartbeatEventMissed),
WriteRequest: structs.WriteRequest{
Region: s.config.Region,
},
Expand Down
16 changes: 6 additions & 10 deletions nomad/heartbeat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,30 +140,26 @@ func TestHeartbeat_ResetHeartbeatTimerLocked_Renew(t *testing.T) {

func TestHeartbeat_InvalidateHeartbeat(t *testing.T) {
t.Parallel()
require := require.New(t)
s1 := TestServer(t, nil)
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)

// Create a node
node := mock.Node()
state := s1.fsm.State()
err := state.UpsertNode(1, node)
if err != nil {
t.Fatalf("err: %v", err)
}
require.NoError(state.UpsertNode(1, node))

// This should cause a status update
s1.invalidateHeartbeat(node.ID)

// Check it is updated
ws := memdb.NewWatchSet()
out, err := state.NodeByID(ws, node.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if !out.TerminalStatus() {
t.Fatalf("should update node: %#v", out)
}
require.NoError(err)
require.True(out.TerminalStatus())
require.Len(out.Events, 2)
require.Equal(NodeHeartbeatEventMissed, out.Events[1].Message)
}

func TestHeartbeat_ClearHeartbeatTimer(t *testing.T) {
Expand Down
12 changes: 12 additions & 0 deletions nomad/node_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ const (
// NodeEligibilityEventIneligible is used when the nodes eligiblity is marked
// ineligible
NodeEligibilityEventIneligible = "Node marked as ineligible for scheduling"

// NodeHeartbeatEventReregistered is the message used when the node becomes
// reregistered by the heartbeat.
NodeHeartbeatEventReregistered = "Node reregistered by heartbeat"
)

// Node endpoint is used for client interactions
Expand Down Expand Up @@ -367,6 +371,14 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct
// Commit this update via Raft
var index uint64
if node.Status != args.Status {
// Attach an event if we are updating the node status to ready when it
// is down via a heartbeat
if node.Status == structs.NodeStatusDown && args.NodeEvent == nil {
args.NodeEvent = structs.NewNodeEvent().
SetSubsystem(structs.NodeEventSubsystemCluster).
SetMessage(NodeHeartbeatEventReregistered)
}

_, index, err = n.srv.raftApply(structs.NodeUpdateStatusRequestType, args)
if err != nil {
n.srv.logger.Printf("[ERR] nomad.client: status update failed: %v", err)
Expand Down
55 changes: 53 additions & 2 deletions nomad/node_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
vapi "github.com/hashicorp/vault/api"
Expand Down Expand Up @@ -508,6 +509,56 @@ func TestClientEndpoint_UpdateStatus_Vault(t *testing.T) {
}
}

func TestClientEndpoint_UpdateStatus_HeartbeatRecovery(t *testing.T) {
t.Parallel()
require := require.New(t)
s1 := TestServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

// Check that we have no client connections
require.Empty(s1.connectedNodes())

// Create the register request but make the node down
node := mock.Node()
node.Status = structs.NodeStatusDown
reg := &structs.NodeRegisterRequest{
Node: node,
WriteRequest: structs.WriteRequest{Region: "global"},
}

// Fetch the response
var resp structs.NodeUpdateResponse
require.NoError(msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp))

// Update the status
dereg := &structs.NodeUpdateStatusRequest{
NodeID: node.ID,
Status: structs.NodeStatusInit,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp2 structs.NodeUpdateResponse
require.NoError(msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", dereg, &resp2))
require.NotZero(resp2.Index)

// Check for heartbeat interval
ttl := resp2.HeartbeatTTL
if ttl < s1.config.MinHeartbeatTTL || ttl > 2*s1.config.MinHeartbeatTTL {
t.Fatalf("bad: %#v", ttl)
}

// Check for the node in the FSM
state := s1.fsm.State()
ws := memdb.NewWatchSet()
out, err := state.NodeByID(ws, node.ID)
require.NoError(err)
require.NotNil(out)
require.EqualValues(resp2.Index, out.ModifyIndex)
require.Len(out.Events, 2)
require.Equal(NodeHeartbeatEventReregistered, out.Events[1].Message)
}

func TestClientEndpoint_Register_GetEvals(t *testing.T) {
t.Parallel()
s1 := TestServer(t, nil)
Expand Down Expand Up @@ -1222,7 +1273,7 @@ func TestClientEndpoint_GetNode(t *testing.T) {
if len(resp2.Node.Events) != 1 {
t.Fatalf("Did not set node events: %#v", resp2.Node)
}
if resp2.Node.Events[0].Message != "Node Registered" {
if resp2.Node.Events[0].Message != state.NodeRegisterEventRegistered {
t.Fatalf("Did not set node register event correctly: %#v", resp2.Node)
}

Expand Down Expand Up @@ -2622,7 +2673,7 @@ func TestClientEndpoint_ListNodes_Blocking(t *testing.T) {

// Node status update triggers watches
time.AfterFunc(100*time.Millisecond, func() {
errCh <- state.UpdateNodeStatus(40, node.ID, structs.NodeStatusDown)
errCh <- state.UpdateNodeStatus(40, node.ID, structs.NodeStatusDown, nil)
})

req.MinQueryIndex = 38
Expand Down
18 changes: 8 additions & 10 deletions nomad/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,13 @@ package nomad
import (
"fmt"
"io/ioutil"
"log"
"os"
"path"
"strings"
"testing"
"time"

msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
Expand Down Expand Up @@ -531,22 +529,22 @@ func TestServer_InvalidSchedulers(t *testing.T) {
t.Parallel()
require := require.New(t)

// Set the config to not have the core scheduler
config := DefaultConfig()
config.DevMode = true
config.LogOutput = testlog.NewWriter(t)
config.SerfConfig.MemberlistConfig.BindAddr = "127.0.0.1"
logger := log.New(config.LogOutput, "", log.LstdFlags)
catalog := consul.NewMockCatalog(logger)
logger := testlog.Logger(t)
s := &Server{
config: config,
logger: logger,
}

// Set the config to not have the core scheduler
config.EnabledSchedulers = []string{"batch"}
_, err := NewServer(config, catalog, logger)
err := s.setupWorkers()
require.NotNil(err)
require.Contains(err.Error(), "scheduler not enabled")

// Set the config to have an unknown scheduler
config.EnabledSchedulers = []string{"batch", structs.JobTypeCore, "foo"}
_, err = NewServer(config, catalog, logger)
err = s.setupWorkers()
require.NotNil(err)
require.Contains(err.Error(), "foo")
}
33 changes: 27 additions & 6 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,16 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
)

const (
// NodeRegisterEventReregistered is the message used when the node becomes
// reregistered.
NodeRegisterEventRegistered = "Node registered"

// NodeRegisterEventReregistered is the message used when the node becomes
// reregistered.
NodeRegisterEventReregistered = "Node re-registered"
)

// IndexEntry is used with the "index" table
// for managing the latest Raft index affecting a table.
type IndexEntry struct {
Expand Down Expand Up @@ -530,17 +540,23 @@ func (s *StateStore) UpsertNode(index uint64, node *structs.Node) error {
// Retain node events that have already been set on the node
node.Events = exist.Events

// If we are transitioning from down, record the re-registration
if exist.Status == structs.NodeStatusDown && node.Status != structs.NodeStatusDown {
appendNodeEvents(index, node, []*structs.NodeEvent{
structs.NewNodeEvent().SetSubsystem(structs.NodeEventSubsystemCluster).
SetMessage(NodeRegisterEventReregistered).
SetTimestamp(time.Unix(node.StatusUpdatedAt, 0))})
}

node.Drain = exist.Drain // Retain the drain mode
node.SchedulingEligibility = exist.SchedulingEligibility // Retain the eligibility
node.DrainStrategy = exist.DrainStrategy // Retain the drain strategy
} else {
// Because this is the first time the node is being registered, we should
// also create a node registration event
nodeEvent := &structs.NodeEvent{
Message: "Node Registered",
Subsystem: "Cluster",
Timestamp: time.Unix(node.StatusUpdatedAt, 0),
}
nodeEvent := structs.NewNodeEvent().SetSubsystem(structs.NodeEventSubsystemCluster).
SetMessage(NodeRegisterEventRegistered).
SetTimestamp(time.Unix(node.StatusUpdatedAt, 0))
node.Events = []*structs.NodeEvent{nodeEvent}
node.CreateIndex = index
node.ModifyIndex = index
Expand Down Expand Up @@ -585,7 +601,7 @@ func (s *StateStore) DeleteNode(index uint64, nodeID string) error {
}

// UpdateNodeStatus is used to update the status of a node
func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string) error {
func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string, event *structs.NodeEvent) error {
txn := s.db.Txn(true)
defer txn.Abort()

Expand All @@ -602,6 +618,11 @@ func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string) error
existingNode := existing.(*structs.Node)
copyNode := existingNode.Copy()

// Add the event if given
if event != nil {
appendNodeEvents(index, copyNode, []*structs.NodeEvent{event})
}

// Update the status in the copy
copyNode.Status = status
copyNode.ModifyIndex = index
Expand Down
Loading