Add node events #3945

Merged
merged 19 commits into from
Mar 14, 2018

Contributor

@chelseakomlo commented Mar 6, 2018

This PR adds the concept of node events. At most 10 node events are kept for a single node at any one time. Node events can be created for different purposes; this PR creates a node event when a node is registered for the first time.

A few considerations:

  • Should we create constant values for expected node event messages, such as registration?

Contributor

@jippi commented Mar 6, 2018

node event wish list:

  • node connect
  • node disconnect
  • node lost connection (roughly the same as a disconnect, except that a disconnect is "planned")

@@ -1153,6 +1154,9 @@ type Node struct {
// updated
StatusUpdatedAt int64

// NodeEvents is a list of the last 10 or lest events for this node
Contributor

NodeEvents is the most recent set of events generated for the node.
Spelling error. Also, generally don't include constant values in comments; keep it a bit more generic.

WriteRequest
}

// EmitNodeEventResponse is a server response to the client about the status of
Contributor

The client isn't the only one who can call this, so make it more generic. Here and on the request.

// EmitNodeEventRequest is a client request to update the node events source
// with a new client-side event
type EmitNodeEventRequest struct {
NodeID string
Contributor

map[string][]*NodeEvent please

// also create a node registration event
nodeEvent := &structs.NodeEvent{
Message: "Node Registered",
Subsystem: "Server",
Contributor

Cluster

@@ -0,0 +1,47 @@
package state
Contributor

Keep this in the same file. At some point we should break it up into higher-level groups: [node, eval, alloc, etc.]

var resp structs.GenericResponse
err = msgpackrpc.CallWithCodec(codec, "Node.EmitEvent", &req, &resp)
require.Nil(err)
require.NotEqual(0, resp.Index)
Contributor

Check that the node has the event.

// events once this bound has been reached.
func (s *StateStore) upsertNodeEvent(index uint64, nodeID string, event *structs.NodeEvent, txn *memdb.Txn) error {

ws := memdb.NewWatchSet()
Contributor

Look at how UpdateNodeStatus is done. This method needs to:

  1. Access the node directly, not via NodeByID.
  2. Copy the Node and reinsert it (treat objects as immutable). This data structure allows multiple concurrent readers, so if you don't copy and re-insert, you are modifying a view that other readers expect to stay consistent.
  3. Update the Node's ModifyIndex; otherwise blocking queries will not work.
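The three steps above can be sketched with a deliberately simplified, stdlib-only model. The `store` type, its plain map, and this `Node.Copy` are hypothetical stand-ins for go-memdb and Nomad's structs (the map is not safe for concurrent use; go-memdb supplies that in the real state store). What the sketch shows is the shape of the update: look the node up directly, copy it, append to the copy, bump `ModifyIndex`, and swap the copy in, so a reader still holding the old pointer keeps a consistent object.

```go
package main

import "fmt"

// NodeEvent is a simplified, hypothetical stand-in for structs.NodeEvent.
type NodeEvent struct {
	Message   string
	Subsystem string
	Timestamp int64
}

// Node is a simplified, hypothetical stand-in for structs.Node.
type Node struct {
	ID          string
	NodeEvents  []*NodeEvent
	ModifyIndex uint64
}

// Copy returns a copy of the node with its own NodeEvents slice, so that
// appending to the copy never writes into the original's backing array.
func (n *Node) Copy() *Node {
	nn := *n
	nn.NodeEvents = append([]*NodeEvent(nil), n.NodeEvents...)
	return &nn
}

// store is a hypothetical stand-in for the memdb-backed StateStore.
type store struct {
	nodes map[string]*Node
}

// upsertNodeEvent treats the stored node as immutable: it looks the node up
// directly, copies it, appends to the copy, bumps ModifyIndex so blocking
// queries notice the change, and swaps the copy in.
func (s *store) upsertNodeEvent(index uint64, nodeID string, ev *NodeEvent) error {
	existing, ok := s.nodes[nodeID]
	if !ok {
		return fmt.Errorf("node %q not found", nodeID)
	}
	copyNode := existing.Copy()
	copyNode.NodeEvents = append(copyNode.NodeEvents, ev)
	copyNode.ModifyIndex = index
	s.nodes[nodeID] = copyNode
	return nil
}

func main() {
	s := &store{nodes: map[string]*Node{"n1": {ID: "n1", ModifyIndex: 1}}}
	reader := s.nodes["n1"] // a reader holding the old object
	ev := &NodeEvent{Message: "Node Registered", Subsystem: "Cluster"}
	if err := s.upsertNodeEvent(2, "n1", ev); err != nil {
		panic(err)
	}
	fmt.Println(len(reader.NodeEvents), len(s.nodes["n1"].NodeEvents), s.nodes["n1"].ModifyIndex)
	// prints: 0 1 2
}
```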

nodeEvents = append(nodeEvents, event)
node.NodeEvents = nodeEvents

txn.Commit()
Contributor

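Don't commit in the implementation method; the caller should commit. Otherwise you can't use this method as follows:

func (s *StateStore) handleSomething(index uint64, nodeID string, event *structs.NodeEvent) error {
	txn := s.db.Txn(true)
	defer txn.Abort()
	// handleOne(txn)
	if err := s.upsertNodeEvent(index, nodeID, event, txn); err != nil {
		return err
	}
	// handleTwo(txn)
	txn.Commit()
	return nil
}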

"github.com/stretchr/testify/require"
)

func TestStateStore_AddSingleNodeEvent(t *testing.T) {
Contributor

Look at how the other tests first read the field being modified (which sets up the watch), then modify it, and then assert that the watch has fired. This verifies that the update to the object will unblock blocking queries.
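The test pattern described above (read first, modify, then assert the watch fired) can be sketched without go-memdb. In this hypothetical `watchStore`, each read returns a channel that the next write closes, playing the role of a fired `memdb.WatchSet`; `watchFired` is likewise a hypothetical helper. The point is the ordering: the read must happen before the write, or the test cannot tell whether a blocked query would have been woken.

```go
package main

import (
	"fmt"
	"sync"
)

type NodeEvent struct{ Message string }

// watchStore is a hypothetical store whose reads hand back a channel that is
// closed on the next write, mimicking how a memdb WatchSet fires.
type watchStore struct {
	mu     sync.Mutex
	events map[string][]*NodeEvent
	notify chan struct{}
}

func newWatchStore() *watchStore {
	return &watchStore{events: map[string][]*NodeEvent{}, notify: make(chan struct{})}
}

// NodeEvents returns the node's events plus a channel that closes on change.
func (s *watchStore) NodeEvents(nodeID string) ([]*NodeEvent, <-chan struct{}) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.events[nodeID], s.notify
}

// AddNodeEvent stores a copy-on-write update and wakes every watcher.
func (s *watchStore) AddNodeEvent(nodeID string, ev *NodeEvent) {
	s.mu.Lock()
	defer s.mu.Unlock()
	updated := append(append([]*NodeEvent(nil), s.events[nodeID]...), ev)
	s.events[nodeID] = updated
	close(s.notify)
	s.notify = make(chan struct{})
}

// watchFired reports, without blocking, whether the watch channel closed.
func watchFired(ch <-chan struct{}) bool {
	select {
	case <-ch:
		return true
	default:
		return false
	}
}

func main() {
	s := newWatchStore()
	_, watch := s.NodeEvents("n1")                               // 1. read first: sets up the watch
	s.AddNodeEvent("n1", &NodeEvent{Message: "Node Registered"}) // 2. modify
	fmt.Println(watchFired(watch))                               // 3. assert the watch fired
	// prints: true
}
```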

@@ -1153,6 +1154,9 @@ type Node struct {
// updated
StatusUpdatedAt int64

// NodeEvents is a list of the last 10 or lest events for this node
NodeEvents []*NodeEvent
Contributor

Add to the api pkg

e.ModifyIndex = index

// keep node events pruned to below 10 simultaneously
if len(nodeEvents) >= 10 {
Contributor

make this 10 a constant named something like max_retained_node_events and use it here and in the tests.

client/client.go Outdated
for {
select {
case event := <-c.triggerEmitNodeEvent:
batchEventsLock.Lock()
Contributor

This is more of a Go question: why does this need a lock if only one of these goroutines runs per client, and this goroutine is the only one that appends to batchEvents?

Contributor

+1 remove the lock

client/client.go Outdated
continue
}

c.submitNodeEvents(batchEvents)
Contributor

@preetapan commented Mar 9, 2018

This should check the returned error, and if there is no error, set batchEvents to nil so that the next batch starts over.

If there's an error, we could keep more events in the buffer, but we will need an upper bound so that it doesn't grow without limit.

Contributor

+1 to this. You should also truncate this list to < 10, since if the client is disconnected for a while it may keep emitting events, but they will fail to submit. So limit the number of events, and please use the const value.

// emit node event goroutine was already running
}
}

Contributor

If possible, consider adding a test to client_test that verifies that node events correctly propagate from the client, via this goroutine, to the server.

@@ -386,6 +388,34 @@ func (c *NodeStatusCommand) formatNode(client *api.Client, node *api.Node) int {

}

func (c *NodeStatusCommand) outputNodeStatusEvents(node *api.Node) {
c.Ui.Output(c.Colorize().Color("\n[bold]Node Events "))
c.outputNodeEvent(node.NodeEvents)
Contributor

Can you take into account whether the command is in verbose mode, and only emit details in verbose mode?

func (c *NodeStatusCommand) outputNodeEvent(events []*api.NodeEvent) {
size := len(events)
nodeEvents := make([]string, size+1)
nodeEvents[0] = "Timestamp|Subsystem|Message|Details"
Contributor

Use "Time", not "Timestamp".

client/client.go Outdated
// these kinds of events include when a driver moves from healthy to unhealthy
// (and vice versa)
func (c *Client) submitNodeEvents(e []*structs.NodeEvent) error {
node := c.Node()
Contributor

use c.NodeID()

client/client.go Outdated
c.logger.Printf("[ERR] client: emitting node events failed %v", err)
return err
}
c.logger.Printf("[INFO] client: emit node events complete")
Contributor

Remove this INFO log

Contributor Author

Should this be replaced with a different log level, given that this is logging?

client/client.go Outdated
}
var resp structs.EmitNodeEventResponse
if err := c.RPC("Node.EmitEvent", &req, &resp); err != nil {
c.logger.Printf("[ERR] client: emitting node events failed %v", err)
Contributor

Just return the error and have the logging in the watcher

}

func formatEventDetails(details map[string]string) string {
var output string
Contributor

Last detail pair will have trailing comma. Do:

output := make([]string, 0, len(details))
for k, v := range details {
  output = append(output, fmt.Sprintf("%s: %s", k, v))
}
return strings.Join(output, ", ")
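A self-contained version of the suggested helper is below. Sorting the keys is an addition not in the review: Go randomizes map iteration order, so without it the same details could print in a different order from one run to the next. The detail keys in `main` are made up for the example.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// formatEventDetails renders details as "k1: v1, k2: v2" with keys sorted so
// the output is stable across runs.
func formatEventDetails(details map[string]string) string {
	keys := make([]string, 0, len(details))
	for k := range details {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	output := make([]string, 0, len(details))
	for _, k := range keys {
		output = append(output, fmt.Sprintf("%s: %s", k, details[k]))
	}
	return strings.Join(output, ", ")
}

func main() {
	fmt.Println(formatEventDetails(map[string]string{"driver": "docker", "health": "unhealthy"}))
	// prints: driver: docker, health: unhealthy
}
```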

Contributor

Bump

nomad/fsm.go Outdated
@@ -236,6 +236,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
return n.applyACLTokenBootstrap(buf[1:], log.Index)
case structs.AutopilotRequestType:
return n.applyAutopilotUpdate(buf[1:], log.Index)
case structs.AddNodeEventType:
Contributor

Change this to AddNodeEventsType, and rename the methods to capture the plurality.

nomad/fsm.go Outdated
return err
}

for nodeID, nodeEvents := range req.NodeEvents {
Contributor

This all should be handled in the state store. Look at something like UpsertEvals.

It should be

if err := n.state.UpsertNodeEvents(index, req.NodeEvents); err != nil {
	n.logger.Printf("[ERR] nomad.fsm: failed to upsert node events: %v", err)
	return err
}
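A rough sketch of the suggested state-store method, combined with the earlier point that the commit belongs to the caller: the exported `UpsertNodeEvents` opens one write transaction, applies each node's events through a helper that never commits, and commits once at the end, so an error for any node aborts the whole batch. The `txn` and `stateStore` types are simplified, hypothetical stand-ins for go-memdb, and `upsertNodeEventsImpl` is an illustrative name.

```go
package main

import "fmt"

type NodeEvent struct{ Message string }

type Node struct {
	ID          string
	NodeEvents  []*NodeEvent
	ModifyIndex uint64
}

// stateStore holds committed nodes. txn is a hypothetical stand-in for a
// memdb write transaction: writes are staged and only published on Commit.
type stateStore struct{ nodes map[string]*Node }

type txn struct {
	s       *stateStore
	pending map[string]*Node
}

func (s *stateStore) writeTxn() *txn { return &txn{s: s, pending: map[string]*Node{}} }

// get sees this transaction's own staged writes first.
func (t *txn) get(id string) *Node {
	if n, ok := t.pending[id]; ok {
		return n
	}
	return t.s.nodes[id]
}

func (t *txn) Commit() {
	for id, n := range t.pending {
		t.s.nodes[id] = n
	}
	t.pending = nil
}

// Abort discards staged writes; calling it after Commit is a no-op.
func (t *txn) Abort() { t.pending = nil }

// upsertNodeEventsImpl updates one node inside the caller's transaction and
// never commits, so it can be composed with other writes.
func upsertNodeEventsImpl(index uint64, nodeID string, events []*NodeEvent, t *txn) error {
	existing := t.get(nodeID)
	if existing == nil {
		return fmt.Errorf("node %q not found", nodeID)
	}
	copyNode := *existing
	copyNode.NodeEvents = append(append([]*NodeEvent(nil), existing.NodeEvents...), events...)
	copyNode.ModifyIndex = index
	t.pending[nodeID] = &copyNode
	return nil
}

// UpsertNodeEvents applies events for many nodes in one transaction.
func (s *stateStore) UpsertNodeEvents(index uint64, nodeEvents map[string][]*NodeEvent) error {
	t := s.writeTxn()
	defer t.Abort()
	for nodeID, events := range nodeEvents {
		if err := upsertNodeEventsImpl(index, nodeID, events, t); err != nil {
			return err
		}
	}
	t.Commit()
	return nil
}

func main() {
	s := &stateStore{nodes: map[string]*Node{"n1": {ID: "n1"}, "n2": {ID: "n2"}}}
	err := s.UpsertNodeEvents(5, map[string][]*NodeEvent{
		"n1": {{Message: "a"}},
		"n2": {{Message: "b"}, {Message: "c"}},
	})
	fmt.Println(err, len(s.nodes["n1"].NodeEvents), len(s.nodes["n2"].NodeEvents))
	// prints: <nil> 1 2
}
```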

nomad/fsm.go Outdated
func (n *nomadFSM) applyAddNodeEventType(buf []byte, index uint64) interface{} {
var req structs.EmitNodeEventRequest
if err := structs.Decode(buf, &req); err != nil {
n.logger.Printf("[ERR] nomad.fsm: failed to decode EmitNodeEventREquest: %v", err)
Contributor

Typo: EmitNodeEventREquest should be EmitNodeEventRequest.

Timestamp: node.StatusUpdatedAt,
}

node.NodeEvents = make([]*structs.NodeEvent, 0)
Contributor

Make it at least size 1 so it doesn't resize immediately: make([]*structs.NodeEvent, 0, 1)

client/client.go Outdated
return nil
}

// emitEvent is a handler which receives node events and on a interval and
Contributor

typo? "node events and on a interval"

Contributor

or? "node events and on a interval and submits them"

Contributor

Also I think s/emitEvent/watchEmitEvents in this comment?

client/client.go Outdated
}
}

// emitEvent triggers a emit node event
Contributor

s/emitEvent/triggerNodeEvent?

@@ -396,5 +396,11 @@ func TestHTTP_NodeQuery(t *testing.T) {
if n.ID != node.ID {
t.Fatalf("bad: %#v", n)
}
if len(n.NodeEvents) != 1 {
Contributor

Should this be <1 instead of !=1?

@@ -1153,11 +1154,52 @@ type Node struct {
// updated
StatusUpdatedAt int64

// NodeEvents is the most recent set of events generated for the node
Contributor

When you add a constant per @preetapan's feedback here: https://github.com/hashicorp/nomad/pull/3945/files#r173500252

then mention that constant in this comment. (Otherwise, when reading this comment, I'm left wondering: "most recent events, like how many though?")

client/client.go Outdated
@@ -1052,6 +1061,9 @@ func (c *Client) registerAndHeartbeat() {
// Start watching changes for node changes
go c.watchNodeUpdates()

// Start watching for emitting node events
go c.watchEmitEvents()
Contributor

watchNodeEvents?

client/client.go Outdated
// these kinds of events include when a driver moves from healthy to unhealthy
// (and vice versa)
func (c *Client) submitNodeEvents(events []*structs.NodeEvent) error {
nodeID := c.Node().ID
Contributor

c.NodeID()

client/client.go Outdated
if err := c.RPC("Node.EmitEvents", &req, &resp); err != nil {
return fmt.Errorf("Emitting node event failed: %v", err)
}
c.logger.Printf("[INFO] client: emit node events complete")
Contributor

This is not a useful INFO-level log; those should be useful to operators. Can you remove it?

client/client.go Outdated

// watchEmitEvents is a handler which receives node events and on a interval and
// submits them in batch format to the server
func (c *Client) watchEmitEvents() {

@@ -386,6 +388,38 @@ func (c *NodeStatusCommand) formatNode(client *api.Client, node *api.Node) int {

}

func (c *NodeStatusCommand) outputNodeStatusEvents(node *api.Node) {
if !c.verbose {
Contributor

Sorry, I should have been clearer. The "details" column should only be output in verbose mode.
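One reading of that request, as a sketch: keep the pipe-delimited rows the command already builds, use a `Time` header, and append the `Details` column only in verbose mode. The `formatNodeEventRows` and `formatDetails` functions are hypothetical, and treating `Timestamp` as Unix seconds printed as RFC 3339 in UTC is an assumption; in the real command the rows would go to its existing table formatter.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
	"time"
)

type NodeEvent struct {
	Message   string
	Subsystem string
	Details   map[string]string
	Timestamp int64 // assumed to be Unix seconds
}

// formatDetails renders details as "k: v" pairs in sorted key order.
func formatDetails(details map[string]string) string {
	keys := make([]string, 0, len(details))
	for k := range details {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	pairs := make([]string, 0, len(keys))
	for _, k := range keys {
		pairs = append(pairs, fmt.Sprintf("%s: %s", k, details[k]))
	}
	return strings.Join(pairs, ", ")
}

// formatNodeEventRows builds pipe-delimited table rows; the Details column
// is only included when verbose is true.
func formatNodeEventRows(events []*NodeEvent, verbose bool) []string {
	header := "Time|Subsystem|Message"
	if verbose {
		header += "|Details"
	}
	rows := []string{header}
	for _, e := range events {
		ts := time.Unix(e.Timestamp, 0).UTC().Format(time.RFC3339)
		row := fmt.Sprintf("%s|%s|%s", ts, e.Subsystem, e.Message)
		if verbose {
			row += "|" + formatDetails(e.Details)
		}
		rows = append(rows, row)
	}
	return rows
}

func main() {
	events := []*NodeEvent{{Message: "Node Registered", Subsystem: "Cluster"}}
	fmt.Println(strings.Join(formatNodeEventRows(events, false), "\n"))
	// prints:
	// Time|Subsystem|Message
	// 1970-01-01T00:00:00Z|Cluster|Node Registered
}
```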

}

func formatEventDetails(details map[string]string) string {
var output string
Contributor

Bump

return err
}
defer metrics.MeasureSince([]string{"nomad", "client", "emit_event"}, time.Now())

Contributor

Return an error if there are no events

}

// Copy the existing node
copyNode := new(structs.Node)
Contributor

copyNode = node.Copy()


// keep node events pruned to below 10 simultaneously
if len(nodeEvents) >= structs.MaxRetainedNodeEvents {
delta := len(nodeEvents) - 10
Contributor

Use the constant

return nn
}

func copyNodeEvents(first *Node) []*NodeEvent {
Contributor

This isn't actually copying. This is just appending them to a new list.
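For contrast, a minimal sketch of a copy that does not share state, assuming `NodeEvent` carries a `Details map[string]string` as the CLI's `formatEventDetails` suggests: each event struct is copied and its map cloned, so edits to the copy cannot leak back into the node held by the state store. Unlike the diff's `copyNodeEvents(first *Node)`, this version takes the slice directly, and the `NodeEvent.Copy` method is hypothetical.

```go
package main

import "fmt"

type NodeEvent struct {
	Message   string
	Subsystem string
	Details   map[string]string
	Timestamp int64
}

// Copy returns a deep copy of the event, including its Details map.
func (e *NodeEvent) Copy() *NodeEvent {
	if e == nil {
		return nil
	}
	c := *e
	if e.Details != nil {
		c.Details = make(map[string]string, len(e.Details))
		for k, v := range e.Details {
			c.Details[k] = v
		}
	}
	return &c
}

// copyNodeEvents deep-copies every event into a new slice.
func copyNodeEvents(events []*NodeEvent) []*NodeEvent {
	if events == nil {
		return nil
	}
	out := make([]*NodeEvent, len(events))
	for i, e := range events {
		out[i] = e.Copy()
	}
	return out
}

func main() {
	orig := []*NodeEvent{{Message: "Node Registered", Details: map[string]string{"k": "v"}}}
	cp := copyNodeEvents(orig)
	cp[0].Message = "changed"
	cp[0].Details["k"] = "changed"
	fmt.Println(orig[0].Message, orig[0].Details["k"])
	// prints: Node Registered v
}
```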

@chelseakomlo force-pushed the f-add-node-events branch 2 times, most recently from 2295446 to 42c17f7 on March 13, 2018 23:36
@dadgar force-pushed the f-add-node-events branch from f1641b5 to 8ba5ba3 on March 14, 2018 01:08
@chelseakomlo merged commit 9cb3a02 into master Mar 14, 2018
@chelseakomlo deleted the f-add-node-events branch March 14, 2018 12:43
@github-actions

I'm going to lock this pull request because it has been closed for 120 days ⏳. This helps our maintainers find and focus on the active contributions.
If you have found a problem that seems related to this change, please open a new issue and complete the issue template so we can capture all the details necessary to investigate further.

@github-actions bot locked as resolved and limited conversation to collaborators Mar 11, 2023

5 participants