Skip to content

Commit

Permalink
Merge pull request #3945 from hashicorp/f-add-node-events
Browse files Browse the repository at this point in the history
Add node events
  • Loading branch information
chelseakomlo authored Mar 14, 2018
2 parents 48ae5eb + 8ba5ba3 commit 9cb3a02
Show file tree
Hide file tree
Showing 13 changed files with 523 additions and 16 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ __BACKWARDS INCOMPATIBILITIES:__

IMPROVEMENTS:
* core: Allow upgrading/downgrading TLS via SIGHUP on both servers and clients [[GH-3492](https://github.com/hashicorp/nomad/issues/3492)]
* core: Node events are emitted for events such as node registration and
heartbeating [[GH-3945](https://github.com/hashicorp/nomad/issues/3945)]
* core: A set of features (Autopilot) has been added to allow for automatic operator-friendly management of Nomad servers. For more information about Autopilot, see the [Autopilot Guide](https://www.nomadproject.io/guides/cluster/autopilot.html). [[GH-3670](https://github.com/hashicorp/nomad/pull/3670)]
* core: Servers can now service client HTTP endpoints [[GH-3892](https://github.com/hashicorp/nomad/issues/3892)]
* core: Servers can now retry connecting to Vault to verify tokens without requiring a SIGHUP to do so [[GH-3957](https://github.com/hashicorp/nomad/issues/3957)]
Expand Down
17 changes: 17 additions & 0 deletions api/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,27 @@ type Node struct {
Status string
StatusDescription string
StatusUpdatedAt int64
Events []*NodeEvent
CreateIndex uint64
ModifyIndex uint64
}

const (
NodeEventSubsystemDrain = "Drain"
NodeEventSubsystemDriver = "Driver"
NodeEventSubsystemHeartbeat = "Heartbeat"
NodeEventSubsystemCluster = "Cluster"
)

// NodeEvent is a single unit representing a node’s state change
type NodeEvent struct {
Message string
Subsystem string
Details map[string]string
Timestamp int64
CreateIndex uint64
}

// HostStats represents resource usage stats of the host running a Nomad client
type HostStats struct {
Memory *HostMemoryStats
Expand Down
4 changes: 4 additions & 0 deletions api/nodes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ func TestNodes_Info(t *testing.T) {
if result.StatusUpdatedAt < startTime {
t.Fatalf("start time: %v, status updated: %v", startTime, result.StatusUpdatedAt)
}

if len(result.Events) < 1 {
t.Fatalf("Expected at minimum the node register event to be populated: %+v", result)
}
}

func TestNodes_ToggleDrain(t *testing.T) {
Expand Down
102 changes: 88 additions & 14 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ type Client struct {
// update it.
triggerNodeUpdate chan struct{}

// triggerEmitNodeEvent sends an event and triggers the client to update the
// server for the node event
triggerEmitNodeEvent chan *structs.NodeEvent

// discovered will be ticked whenever Consul discovery completes
// successfully
serversDiscoveredCh chan struct{}
Expand Down Expand Up @@ -200,20 +204,21 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic

// Create the client
c := &Client{
config: cfg,
consulCatalog: consulCatalog,
consulService: consulService,
start: time.Now(),
connPool: pool.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, tlsWrap),
tlsWrap: tlsWrap,
streamingRpcs: structs.NewStreamingRpcRegistry(),
logger: logger,
allocs: make(map[string]*AllocRunner),
allocUpdates: make(chan *structs.Allocation, 64),
shutdownCh: make(chan struct{}),
triggerDiscoveryCh: make(chan struct{}),
triggerNodeUpdate: make(chan struct{}, 8),
serversDiscoveredCh: make(chan struct{}),
config: cfg,
consulCatalog: consulCatalog,
consulService: consulService,
start: time.Now(),
connPool: pool.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, tlsWrap),
tlsWrap: tlsWrap,
streamingRpcs: structs.NewStreamingRpcRegistry(),
logger: logger,
allocs: make(map[string]*AllocRunner),
allocUpdates: make(chan *structs.Allocation, 64),
shutdownCh: make(chan struct{}),
triggerDiscoveryCh: make(chan struct{}),
triggerNodeUpdate: make(chan struct{}, 8),
serversDiscoveredCh: make(chan struct{}),
triggerEmitNodeEvent: make(chan *structs.NodeEvent, 8),
}

// Initialize the server manager
Expand Down Expand Up @@ -1052,6 +1057,9 @@ func (c *Client) registerAndHeartbeat() {
// Start watching changes for node changes
go c.watchNodeUpdates()

// Start watching for emitting node events
go c.watchNodeEvents()

// Setup the heartbeat timer, for the initial registration
// we want to do this quickly. We want to do it extra quickly
// in development mode.
Expand Down Expand Up @@ -1131,6 +1139,72 @@ func (c *Client) run() {
}
}

// submitNodeEvents is used to submit a client-side node event. Examples of
// these kinds of events include when a driver moves from healthy to unhealthy
// (and vice versa)
func (c *Client) submitNodeEvents(events []*structs.NodeEvent) error {
nodeID := c.NodeID()
nodeEvents := map[string][]*structs.NodeEvent{
nodeID: events,
}
req := structs.EmitNodeEventsRequest{
NodeEvents: nodeEvents,
WriteRequest: structs.WriteRequest{Region: c.Region()},
}
var resp structs.EmitNodeEventsResponse
if err := c.RPC("Node.EmitEvents", &req, &resp); err != nil {
return fmt.Errorf("Emitting node events failed: %v", err)
}
return nil
}

// watchNodeEvents is a handler which receives node events and on a interval
// and submits them in batch format to the server
func (c *Client) watchNodeEvents() {
// batchEvents stores events that have yet to be published
var batchEvents []*structs.NodeEvent

// Create and drain the timer
timer := time.NewTimer(0)
timer.Stop()
select {
case <-timer.C:
default:
}
defer timer.Stop()

for {
select {
case event := <-c.triggerEmitNodeEvent:
if l := len(batchEvents); l <= structs.MaxRetainedNodeEvents {
batchEvents = append(batchEvents, event)
} else {
// Drop the oldest event
c.logger.Printf("[WARN] client: dropping node event: %v", batchEvents[0])
batchEvents = append(batchEvents[1:], event)
}
timer.Reset(c.retryIntv(nodeUpdateRetryIntv))
case <-timer.C:
if err := c.submitNodeEvents(batchEvents); err != nil {
c.logger.Printf("[ERR] client: submitting node events failed: %v", err)
timer.Reset(c.retryIntv(nodeUpdateRetryIntv))
}
case <-c.shutdownCh:
return
}
}
}

// triggerNodeEvent triggers a emit node event
func (c *Client) triggerNodeEvent(nodeEvent *structs.NodeEvent) {
select {
case c.triggerEmitNodeEvent <- nodeEvent:
// emit node event goroutine was released to execute
default:
// emit node event goroutine was already running
}
}

// retryRegisterNode is used to register the node or update the registration and
// retry in case of failure.
func (c *Client) retryRegisterNode() {
Expand Down
6 changes: 6 additions & 0 deletions command/agent/node_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,5 +396,11 @@ func TestHTTP_NodeQuery(t *testing.T) {
if n.ID != node.ID {
t.Fatalf("bad: %#v", n)
}
if len(n.Events) < 1 {
t.Fatalf("Expected node registration event to be populated: %#v", n)
}
if n.Events[0].Message != "Node Registered" {
t.Fatalf("Expected node registration event to be first node event: %#v", n)
}
})
}
38 changes: 38 additions & 0 deletions command/node_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,8 @@ func (c *NodeStatusCommand) formatNode(client *api.Client, node *api.Node) int {
}
c.Ui.Output(c.Colorize().Color(formatKV(basic)))

c.outputNodeStatusEvents(node)

// Get list of running allocations on the node
runningAllocs, err := getRunningAllocs(client, node.ID)
if err != nil {
Expand Down Expand Up @@ -386,6 +388,42 @@ func (c *NodeStatusCommand) formatNode(client *api.Client, node *api.Node) int {

}

func (c *NodeStatusCommand) outputNodeStatusEvents(node *api.Node) {
c.Ui.Output(c.Colorize().Color("\n[bold]Node Events "))
c.outputNodeEvent(node.Events)
}

func (c *NodeStatusCommand) outputNodeEvent(events []*api.NodeEvent) {
size := len(events)
nodeEvents := make([]string, size+1)
if c.verbose {
nodeEvents[0] = "Time|Subsystem|Message|Details"
} else {
nodeEvents[0] = "Time|Subsystem|Message"
}

for i, event := range events {
timestamp := formatUnixNanoTime(event.Timestamp)
subsystem := event.Subsystem
msg := event.Message
if c.verbose {
details := formatEventDetails(event.Details)
nodeEvents[size-i] = fmt.Sprintf("%s|%s|%s|%s", timestamp, subsystem, msg, details)
} else {
nodeEvents[size-i] = fmt.Sprintf("%s|%s|%s", timestamp, subsystem, msg)
}
}
c.Ui.Output(formatList(nodeEvents))
}

func formatEventDetails(details map[string]string) string {
output := make([]string, 0, len(details))
for k, v := range details {
output = append(output, fmt.Sprintf("%s: %s, ", k, v))
}
return strings.Join(output, ", ")
}

func (c *NodeStatusCommand) formatAttributes(node *api.Node) {
// Print the attributes
keys := make([]string, len(node.Attributes))
Expand Down
19 changes: 19 additions & 0 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
return n.applyACLTokenBootstrap(buf[1:], log.Index)
case structs.AutopilotRequestType:
return n.applyAutopilotUpdate(buf[1:], log.Index)
case structs.UpsertNodeEventsType:
return n.applyUpsertNodeEvent(buf[1:], log.Index)
}

// Check enterprise only message types.
Expand Down Expand Up @@ -628,6 +630,23 @@ func (n *nomadFSM) applyReconcileSummaries(buf []byte, index uint64) interface{}
return n.reconcileQueuedAllocations(index)
}

// applyUpsertNodeEvent tracks the given node events.
func (n *nomadFSM) applyUpsertNodeEvent(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "upsert_node_events"}, time.Now())
var req structs.EmitNodeEventsRequest
if err := structs.Decode(buf, &req); err != nil {
n.logger.Printf("[ERR] nomad.fsm: failed to decode EmitNodeEventsRequest: %v", err)
return err
}

if err := n.state.UpsertNodeEvents(index, req.NodeEvents); err != nil {
n.logger.Printf("[ERR] nomad.fsm: failed to add node events: %v", err)
return err
}

return nil
}

// applyUpsertVaultAccessor stores the Vault accessors for a given allocation
// and task
func (n *nomadFSM) applyUpsertVaultAccessor(buf []byte, index uint64) interface{} {
Expand Down
44 changes: 44 additions & 0 deletions nomad/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,50 @@ func makeLog(buf []byte) *raft.Log {
}
}

func TestFSM_UpsertNodeEvents(t *testing.T) {
t.Parallel()
require := require.New(t)
fsm := testFSM(t)
state := fsm.State()

node := mock.Node()

err := state.UpsertNode(1000, node)
if err != nil {
t.Fatalf("err: %v", err)
}

nodeEvent := &structs.NodeEvent{
Message: "Heartbeating failed",
Subsystem: "Heartbeat",
Timestamp: time.Now().Unix(),
}

nodeEvents := []*structs.NodeEvent{nodeEvent}
allEvents := map[string][]*structs.NodeEvent{node.ID: nodeEvents}

req := structs.EmitNodeEventsRequest{
NodeEvents: allEvents,
WriteRequest: structs.WriteRequest{Region: "global"},
}
buf, err := structs.Encode(structs.UpsertNodeEventsType, req)
require.Nil(err)

// the response in this case will be an error
resp := fsm.Apply(makeLog(buf))
require.Nil(resp)

ws := memdb.NewWatchSet()
out, err := state.NodeByID(ws, node.ID)
require.Nil(err)

require.Equal(2, len(out.Events))

first := out.Events[1]
require.Equal(uint64(1), first.CreateIndex)
require.Equal("Heartbeating failed", first.Message)
}

func TestFSM_UpsertNode(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
Expand Down
27 changes: 27 additions & 0 deletions nomad/node_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1363,3 +1363,30 @@ func (n *Node) DeriveVaultToken(args *structs.DeriveVaultTokenRequest,
n.srv.setQueryMeta(&reply.QueryMeta)
return nil
}

func (n *Node) EmitEvents(args *structs.EmitNodeEventsRequest, reply *structs.EmitNodeEventsResponse) error {
if done, err := n.srv.forward("Node.EmitEvents", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "client", "emit_events"}, time.Now())

if len(args.NodeEvents) == 0 {
return fmt.Errorf("no node events given")
}
for nodeID, events := range args.NodeEvents {
if len(events) == 0 {
return fmt.Errorf("no node events given for node %q", nodeID)
}
}

// TODO ACLs

_, index, err := n.srv.raftApply(structs.UpsertNodeEventsType, args)
if err != nil {
n.srv.logger.Printf("[ERR] nomad.node upserting node events failed: %v", err)
return err
}

reply.Index = index
return nil
}
Loading

0 comments on commit 9cb3a02

Please sign in to comment.