Add node events #3945

Merged · 19 commits · Mar 14, 2018
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -10,6 +10,8 @@ __BACKWARDS INCOMPATIBILITIES:__

IMPROVEMENTS:
* core: Allow upgrading/downgrading TLS via SIGHUP on both servers and clients [[GH-3492](https://github.com/hashicorp/nomad/issues/3492)]
* core: Node events are emitted for events such as node registration and
heartbeating [[GH-3945](https://github.com/hashicorp/nomad/issues/3945)]
* core: A set of features (Autopilot) has been added to allow for automatic operator-friendly management of Nomad servers. For more information about Autopilot, see the [Autopilot Guide](https://www.nomadproject.io/guides/cluster/autopilot.html). [[GH-3670](https://github.com/hashicorp/nomad/pull/3670)]
* core: Servers can now service client HTTP endpoints [[GH-3892](https://github.com/hashicorp/nomad/issues/3892)]
* core: Servers can now retry connecting to Vault to verify tokens without requiring a SIGHUP to do so [[GH-3957](https://github.com/hashicorp/nomad/issues/3957)]
17 changes: 17 additions & 0 deletions api/nodes.go
@@ -113,10 +113,27 @@ type Node struct {
Status string
StatusDescription string
StatusUpdatedAt int64
Events []*NodeEvent
CreateIndex uint64
ModifyIndex uint64
}

const (
NodeEventSubsystemDrain = "Drain"
NodeEventSubsystemDriver = "Driver"
NodeEventSubsystemHeartbeat = "Heartbeat"
NodeEventSubsystemCluster = "Cluster"
)

// NodeEvent is a single unit representing a node’s state change
type NodeEvent struct {
Message string
Subsystem string
Details map[string]string
Timestamp int64
CreateIndex uint64
}

// HostStats represents resource usage stats of the host running a Nomad client
type HostStats struct {
Memory *HostMemoryStats
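The new Events field rides along on the existing node-info API, so no new endpoint is needed. Below is a minimal sketch (not part of this diff; the node ID is a placeholder) of reading a node's events through the Go api package:

```go
package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/nomad/api"
)

func main() {
	// Connect to the local Nomad agent (default address http://127.0.0.1:4646).
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	// "9d2f2a5b" is a placeholder; use a real node ID from `nomad node-status`.
	node, _, err := client.Nodes().Info("9d2f2a5b", nil)
	if err != nil {
		log.Fatal(err)
	}

	// Every node carries at least the initial "Node Registered" event.
	for _, e := range node.Events {
		fmt.Printf("%d [%s] %s\n", e.Timestamp, e.Subsystem, e.Message)
	}
}
```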
4 changes: 4 additions & 0 deletions api/nodes_test.go
@@ -134,6 +134,10 @@ func TestNodes_Info(t *testing.T) {
if result.StatusUpdatedAt < startTime {
t.Fatalf("start time: %v, status updated: %v", startTime, result.StatusUpdatedAt)
}

if len(result.Events) < 1 {
t.Fatalf("Expected at minimum the node register event to be populated: %+v", result)
}
}

func TestNodes_ToggleDrain(t *testing.T) {
102 changes: 88 additions & 14 deletions client/client.go
@@ -133,6 +133,10 @@ type Client struct {
// update it.
triggerNodeUpdate chan struct{}

// triggerEmitNodeEvent queues a node event and triggers the client to
// submit the event to the server
triggerEmitNodeEvent chan *structs.NodeEvent

// discovered will be ticked whenever Consul discovery completes
// successfully
serversDiscoveredCh chan struct{}
@@ -200,20 +204,21 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic

// Create the client
c := &Client{
config: cfg,
consulCatalog: consulCatalog,
consulService: consulService,
start: time.Now(),
connPool: pool.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, tlsWrap),
tlsWrap: tlsWrap,
streamingRpcs: structs.NewStreamingRpcRegistry(),
logger: logger,
allocs: make(map[string]*AllocRunner),
allocUpdates: make(chan *structs.Allocation, 64),
shutdownCh: make(chan struct{}),
triggerDiscoveryCh: make(chan struct{}),
triggerNodeUpdate: make(chan struct{}, 8),
serversDiscoveredCh: make(chan struct{}),
triggerEmitNodeEvent: make(chan *structs.NodeEvent, 8),
}

// Initialize the server manager
@@ -1052,6 +1057,9 @@ func (c *Client) registerAndHeartbeat() {
// Start watching changes for node changes
go c.watchNodeUpdates()

// Start watching for node events to emit
go c.watchNodeEvents()

// Setup the heartbeat timer, for the initial registration
// we want to do this quickly. We want to do it extra quickly
// in development mode.
@@ -1131,6 +1139,72 @@ func (c *Client) run() {
}
}

// submitNodeEvents is used to submit client-side node events to the server.
// Examples of these kinds of events include a driver moving from healthy to
// unhealthy (and vice versa)
func (c *Client) submitNodeEvents(events []*structs.NodeEvent) error {
nodeID := c.NodeID()
nodeEvents := map[string][]*structs.NodeEvent{
nodeID: events,
}
req := structs.EmitNodeEventsRequest{
NodeEvents: nodeEvents,
WriteRequest: structs.WriteRequest{Region: c.Region()},
}
var resp structs.EmitNodeEventsResponse
if err := c.RPC("Node.EmitEvents", &req, &resp); err != nil {
return fmt.Errorf("Emitting node events failed: %v", err)
}
return nil
}

// watchNodeEvents is a handler that receives node events and, on an
// interval, submits them in batches to the server
func (c *Client) watchNodeEvents() {
// batchEvents stores events that have yet to be published
var batchEvents []*structs.NodeEvent

// Create and drain the timer
timer := time.NewTimer(0)
timer.Stop()
select {
case <-timer.C:
default:
}
defer timer.Stop()

for {
select {
case event := <-c.triggerEmitNodeEvent:
if l := len(batchEvents); l <= structs.MaxRetainedNodeEvents {
batchEvents = append(batchEvents, event)
} else {
// Drop the oldest event
c.logger.Printf("[WARN] client: dropping node event: %v", batchEvents[0])
batchEvents = append(batchEvents[1:], event)
}
timer.Reset(c.retryIntv(nodeUpdateRetryIntv))
case <-timer.C:
if err := c.submitNodeEvents(batchEvents); err != nil {
c.logger.Printf("[ERR] client: submitting node events failed: %v", err)
timer.Reset(c.retryIntv(nodeUpdateRetryIntv))
} else {
// Batch submitted successfully; clear it so events are not resent
batchEvents = []*structs.NodeEvent{}
}
case <-c.shutdownCh:
return
}
}
}

// triggerNodeEvent queues a node event for the watcher goroutine
func (c *Client) triggerNodeEvent(nodeEvent *structs.NodeEvent) {
select {
case c.triggerEmitNodeEvent <- nodeEvent:
// event queued; the watcher goroutine will pick it up
default:
// buffer is full; drop the event rather than block
}
}

Contributor review comment: If possible, consider adding a test to client_test that verifies that the node events correctly propagate from the client via this goroutine to the server.

// retryRegisterNode is used to register the node or update the registration and
// retry in case of failure.
func (c *Client) retryRegisterNode() {
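watchNodeEvents relies on a stopped-and-drained timer: time.NewTimer(0) may already have fired, so the timer is stopped and its channel drained before the loop, and it is only armed once an event actually arrives. A self-contained sketch of that idiom (illustrative only; the work/done channels and flush interval are hypothetical, not code from this PR):

```go
package main

import (
	"fmt"
	"time"
)

// batchLoop demonstrates the drain-then-reset timer idiom used by
// watchNodeEvents: batch incoming items and flush them after a quiet period.
func batchLoop(workCh <-chan string, doneCh <-chan struct{}, flushAfter time.Duration) {
	var batch []string

	// Create the timer stopped and drained so the first Reset arms it cleanly.
	timer := time.NewTimer(0)
	timer.Stop()
	select {
	case <-timer.C: // discard a pending fire, if any
	default:
	}
	defer timer.Stop()

	for {
		select {
		case item := <-workCh:
			batch = append(batch, item)
			timer.Reset(flushAfter) // (re)arm the flush deadline
		case <-timer.C:
			fmt.Println("flushing", batch)
			batch = nil // cleared on success, as in the client code above
		case <-doneCh:
			return
		}
	}
}

func main() {
	work, done := make(chan string), make(chan struct{})
	go batchLoop(work, done, 50*time.Millisecond)
	work <- "node event"
	time.Sleep(100 * time.Millisecond)
	close(done)
}
```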
6 changes: 6 additions & 0 deletions command/agent/node_endpoint_test.go
@@ -396,5 +396,11 @@ func TestHTTP_NodeQuery(t *testing.T) {
if n.ID != node.ID {
t.Fatalf("bad: %#v", n)
}
if len(n.Events) < 1 {
t.Fatalf("Expected node registration event to be populated: %#v", n)
}
if n.Events[0].Message != "Node Registered" {
t.Fatalf("Expected node registration event to be first node event: %#v", n)
}
})
}
38 changes: 38 additions & 0 deletions command/node_status.go
@@ -332,6 +332,8 @@ func (c *NodeStatusCommand) formatNode(client *api.Client, node *api.Node) int {
}
c.Ui.Output(c.Colorize().Color(formatKV(basic)))

c.outputNodeStatusEvents(node)

// Get list of running allocations on the node
runningAllocs, err := getRunningAllocs(client, node.ID)
if err != nil {
@@ -386,6 +388,42 @@ func (c *NodeStatusCommand) formatNode(client *api.Client, node *api.Node) int {

}

func (c *NodeStatusCommand) outputNodeStatusEvents(node *api.Node) {
c.Ui.Output(c.Colorize().Color("\n[bold]Node Events "))
c.outputNodeEvent(node.Events)
}

func (c *NodeStatusCommand) outputNodeEvent(events []*api.NodeEvent) {
size := len(events)
nodeEvents := make([]string, size+1)
if c.verbose {
nodeEvents[0] = "Time|Subsystem|Message|Details"
} else {
nodeEvents[0] = "Time|Subsystem|Message"
}

for i, event := range events {
timestamp := formatUnixNanoTime(event.Timestamp)
subsystem := event.Subsystem
msg := event.Message
if c.verbose {
details := formatEventDetails(event.Details)
nodeEvents[size-i] = fmt.Sprintf("%s|%s|%s|%s", timestamp, subsystem, msg, details)
} else {
nodeEvents[size-i] = fmt.Sprintf("%s|%s|%s", timestamp, subsystem, msg)
}
}
c.Ui.Output(formatList(nodeEvents))
}

func formatEventDetails(details map[string]string) string {
output := make([]string, 0, len(details))
for k, v := range details {
output = append(output, fmt.Sprintf("%s: %s", k, v))
}
return strings.Join(output, ", ")
}

func (c *NodeStatusCommand) formatAttributes(node *api.Node) {
// Print the attributes
keys := make([]string, len(node.Attributes))
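For reference, outputNodeEvent lists events newest-first (note the nodeEvents[size-i] indexing) and hands the pipe-delimited rows to formatList for column alignment. A hand-written illustration of the resulting `nomad node-status` section (timestamp rendering is approximate, not captured output):

```
Node Events
Time                  Subsystem  Message
2018-03-14T10:30:05Z  Cluster    Node Registered
```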
19 changes: 19 additions & 0 deletions nomad/fsm.go
@@ -236,6 +236,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
return n.applyACLTokenBootstrap(buf[1:], log.Index)
case structs.AutopilotRequestType:
return n.applyAutopilotUpdate(buf[1:], log.Index)
case structs.UpsertNodeEventsType:
return n.applyUpsertNodeEvent(buf[1:], log.Index)
}

// Check enterprise only message types.
@@ -628,6 +630,23 @@ func (n *nomadFSM) applyReconcileSummaries(buf []byte, index uint64) interface{}
return n.reconcileQueuedAllocations(index)
}

// applyUpsertNodeEvent tracks the given node events.
func (n *nomadFSM) applyUpsertNodeEvent(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "upsert_node_events"}, time.Now())
var req structs.EmitNodeEventsRequest
if err := structs.Decode(buf, &req); err != nil {
n.logger.Printf("[ERR] nomad.fsm: failed to decode EmitNodeEventsRequest: %v", err)
return err
}

if err := n.state.UpsertNodeEvents(index, req.NodeEvents); err != nil {
n.logger.Printf("[ERR] nomad.fsm: failed to add node events: %v", err)
return err
}

return nil
}

// applyUpsertVaultAccessor stores the Vault accessors for a given allocation
// and task
func (n *nomadFSM) applyUpsertVaultAccessor(buf []byte, index uint64) interface{} {
44 changes: 44 additions & 0 deletions nomad/fsm_test.go
@@ -74,6 +74,50 @@ func makeLog(buf []byte) *raft.Log {
}
}

func TestFSM_UpsertNodeEvents(t *testing.T) {
t.Parallel()
require := require.New(t)
fsm := testFSM(t)
state := fsm.State()

node := mock.Node()

err := state.UpsertNode(1000, node)
if err != nil {
t.Fatalf("err: %v", err)
}

nodeEvent := &structs.NodeEvent{
Message: "Heartbeating failed",
Subsystem: "Heartbeat",
Timestamp: time.Now().Unix(),
}

nodeEvents := []*structs.NodeEvent{nodeEvent}
allEvents := map[string][]*structs.NodeEvent{node.ID: nodeEvents}

req := structs.EmitNodeEventsRequest{
NodeEvents: allEvents,
WriteRequest: structs.WriteRequest{Region: "global"},
}
buf, err := structs.Encode(structs.UpsertNodeEventsType, req)
require.Nil(err)

// Apply returns an error value on failure; a nil response means success
resp := fsm.Apply(makeLog(buf))
require.Nil(resp)

ws := memdb.NewWatchSet()
out, err := state.NodeByID(ws, node.ID)
require.Nil(err)

require.Equal(2, len(out.Events))

// Events[0] is the initial registration event; Events[1] is the emitted one
first := out.Events[1]
require.Equal(uint64(1), first.CreateIndex)
require.Equal("Heartbeating failed", first.Message)
}

func TestFSM_UpsertNode(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
27 changes: 27 additions & 0 deletions nomad/node_endpoint.go
@@ -1357,3 +1357,30 @@ func (n *Node) DeriveVaultToken(args *structs.DeriveVaultTokenRequest,
n.srv.setQueryMeta(&reply.QueryMeta)
return nil
}

// EmitEvents applies the given node events, keyed by node ID, to the state
// store via Raft
func (n *Node) EmitEvents(args *structs.EmitNodeEventsRequest, reply *structs.EmitNodeEventsResponse) error {
if done, err := n.srv.forward("Node.EmitEvents", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "client", "emit_events"}, time.Now())

if len(args.NodeEvents) == 0 {
return fmt.Errorf("no node events given")
}
for nodeID, events := range args.NodeEvents {
if len(events) == 0 {
return fmt.Errorf("no node events given for node %q", nodeID)
}
}

// TODO ACLs

_, index, err := n.srv.raftApply(structs.UpsertNodeEventsType, args)
if err != nil {
n.srv.logger.Printf("[ERR] nomad.node upserting node events failed: %v", err)
return err
}

reply.Index = index
return nil
}