diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9dabee8db87..b186a89d6de 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,8 @@ __BACKWARDS INCOMPATIBILITIES:__
 IMPROVEMENTS:
  * core: Allow upgrading/downgrading TLS via SIGHUP on both servers and clients [[GH-3492](https://github.com/hashicorp/nomad/issues/3492)]
+ * core: Node events are emitted for state changes such as node registration
+   and heartbeating [[GH-3945](https://github.com/hashicorp/nomad/issues/3945)]
  * core: A set of features (Autopilot) has been added to allow for automatic operator-friendly management of Nomad servers. For more information about Autopilot, see the [Autopilot Guide](https://www.nomadproject.io/guides/cluster/autopilot.html). [[GH-3670](https://github.com/hashicorp/nomad/pull/3670)]
  * core: Servers can now service client HTTP endpoints [[GH-3892](https://github.com/hashicorp/nomad/issues/3892)]
  * core: Servers can now retry connecting to Vault to verify tokens without requiring a SIGHUP to do so [[GH-3957](https://github.com/hashicorp/nomad/issues/3957)]
diff --git a/api/nodes.go b/api/nodes.go
index 6a61ab90b8a..db7f25ffb39 100644
--- a/api/nodes.go
+++ b/api/nodes.go
@@ -113,10 +113,27 @@ type Node struct {
 	Status            string
 	StatusDescription string
 	StatusUpdatedAt   int64
+	Events            []*NodeEvent
 	CreateIndex       uint64
 	ModifyIndex       uint64
 }
 
+const (
+	NodeEventSubsystemDrain     = "Drain"
+	NodeEventSubsystemDriver    = "Driver"
+	NodeEventSubsystemHeartbeat = "Heartbeat"
+	NodeEventSubsystemCluster   = "Cluster"
+)
+
+// NodeEvent is a single unit representing a node's state change
+type NodeEvent struct {
+	Message     string
+	Subsystem   string
+	Details     map[string]string
+	Timestamp   int64
+	CreateIndex uint64
+}
+
 // HostStats represents resource usage stats of the host running a Nomad client
 type HostStats struct {
 	Memory *HostMemoryStats
diff --git a/api/nodes_test.go b/api/nodes_test.go
index d020d84aea9..06b96074694 100644
--- a/api/nodes_test.go
+++ b/api/nodes_test.go
@@ -134,6 +134,10 @@ func TestNodes_Info(t *testing.T) {
 	if result.StatusUpdatedAt < startTime {
 		t.Fatalf("start time: %v, status updated: %v", startTime, result.StatusUpdatedAt)
 	}
+
+	if len(result.Events) < 1 {
+		t.Fatalf("Expected at minimum the node register event to be populated: %+v", result)
+	}
 }
 
 func TestNodes_ToggleDrain(t *testing.T) {
diff --git a/client/client.go b/client/client.go
index 3d73d2a258f..06fc670c402 100644
--- a/client/client.go
+++ b/client/client.go
@@ -133,6 +133,10 @@ type Client struct {
 	// update it.
 	triggerNodeUpdate chan struct{}
 
+	// triggerEmitNodeEvent is used to enqueue a node event, triggering the
+	// client to forward it to the server
+	triggerEmitNodeEvent chan *structs.NodeEvent
+
 	// discovered will be ticked whenever Consul discovery completes
 	// successfully
 	serversDiscoveredCh chan struct{}
@@ -200,20 +204,21 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
 	// Create the client
 	c := &Client{
-		config:              cfg,
-		consulCatalog:       consulCatalog,
-		consulService:       consulService,
-		start:               time.Now(),
-		connPool:            pool.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, tlsWrap),
-		tlsWrap:             tlsWrap,
-		streamingRpcs:       structs.NewStreamingRpcRegistry(),
-		logger:              logger,
-		allocs:              make(map[string]*AllocRunner),
-		allocUpdates:        make(chan *structs.Allocation, 64),
-		shutdownCh:          make(chan struct{}),
-		triggerDiscoveryCh:  make(chan struct{}),
-		triggerNodeUpdate:   make(chan struct{}, 8),
-		serversDiscoveredCh: make(chan struct{}),
+		config:               cfg,
+		consulCatalog:        consulCatalog,
+		consulService:        consulService,
+		start:                time.Now(),
+		connPool:             pool.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, tlsWrap),
+		tlsWrap:              tlsWrap,
+		streamingRpcs:        structs.NewStreamingRpcRegistry(),
+		logger:               logger,
+		allocs:               make(map[string]*AllocRunner),
+		allocUpdates:         make(chan *structs.Allocation, 64),
+		shutdownCh:           make(chan struct{}),
+		triggerDiscoveryCh:   make(chan struct{}),
+		triggerNodeUpdate:    make(chan struct{}, 8),
+		serversDiscoveredCh:  make(chan struct{}),
+		triggerEmitNodeEvent: make(chan *structs.NodeEvent, 8),
 	}
 
 	// Initialize the server manager
@@ -1052,6 +1057,9 @@ func (c *Client) registerAndHeartbeat() {
 	// Start watching changes for node changes
 	go c.watchNodeUpdates()
 
+	// Start watching for node events to emit
+	go c.watchNodeEvents()
+
 	// Setup the heartbeat timer, for the initial registration
 	// we want to do this quickly. We want to do it extra quickly
 	// in development mode.
@@ -1131,6 +1139,75 @@ func (c *Client) run() {
 	}
 }
 
+// submitNodeEvents is used to submit a client-side node event. Examples of
+// these kinds of events include when a driver moves from healthy to unhealthy
+// (and vice versa)
+func (c *Client) submitNodeEvents(events []*structs.NodeEvent) error {
+	nodeID := c.NodeID()
+	nodeEvents := map[string][]*structs.NodeEvent{
+		nodeID: events,
+	}
+	req := structs.EmitNodeEventsRequest{
+		NodeEvents:   nodeEvents,
+		WriteRequest: structs.WriteRequest{Region: c.Region()},
+	}
+	var resp structs.EmitNodeEventsResponse
+	if err := c.RPC("Node.EmitEvents", &req, &resp); err != nil {
+		return fmt.Errorf("emitting node events failed: %v", err)
+	}
+	return nil
+}
+
+// watchNodeEvents is a handler that receives node events, batches them, and
+// submits them to the server on an interval
+func (c *Client) watchNodeEvents() {
+	// batchEvents stores events that have yet to be published
+	var batchEvents []*structs.NodeEvent
+
+	// Create and drain the timer
+	timer := time.NewTimer(0)
+	timer.Stop()
+	select {
+	case <-timer.C:
+	default:
+	}
+	defer timer.Stop()
+
+	for {
+		select {
+		case event := <-c.triggerEmitNodeEvent:
+			if l := len(batchEvents); l <= structs.MaxRetainedNodeEvents {
+				batchEvents = append(batchEvents, event)
+			} else {
+				// Drop the oldest event
+				c.logger.Printf("[WARN] client: dropping node event: %v", batchEvents[0])
+				batchEvents = append(batchEvents[1:], event)
+			}
+			timer.Reset(c.retryIntv(nodeUpdateRetryIntv))
+		case <-timer.C:
+			if err := c.submitNodeEvents(batchEvents); err != nil {
+				c.logger.Printf("[ERR] client: submitting node events failed: %v", err)
+				timer.Reset(c.retryIntv(nodeUpdateRetryIntv))
+			} else {
+				// Reset the batch; the events were submitted successfully
+				batchEvents = []*structs.NodeEvent{}
+			}
+		case <-c.shutdownCh:
+			return
+		}
+	}
+}
+
+// triggerNodeEvent enqueues a node event to be emitted to the server
+func (c *Client) triggerNodeEvent(nodeEvent *structs.NodeEvent) {
+	select {
+	case c.triggerEmitNodeEvent <- nodeEvent:
+		// event was enqueued for the watchNodeEvents goroutine
+	default:
+		// the channel buffer is full; drop the event rather than block
+	}
+}
+
 // retryRegisterNode is used to register the node or update the registration and
 // retry in case of failure.
 func (c *Client) retryRegisterNode() {
diff --git a/command/agent/node_endpoint_test.go b/command/agent/node_endpoint_test.go
index 8bf56e32895..a5566adc19f 100644
--- a/command/agent/node_endpoint_test.go
+++ b/command/agent/node_endpoint_test.go
@@ -396,5 +396,11 @@ func TestHTTP_NodeQuery(t *testing.T) {
 		if n.ID != node.ID {
 			t.Fatalf("bad: %#v", n)
 		}
+		if len(n.Events) < 1 {
+			t.Fatalf("Expected node registration event to be populated: %#v", n)
+		}
+		if n.Events[0].Message != "Node Registered" {
+			t.Fatalf("Expected node registration event to be first node event: %#v", n)
+		}
 	})
 }
diff --git a/command/node_status.go b/command/node_status.go
index 804e63d347f..cbce475346a 100644
--- a/command/node_status.go
+++ b/command/node_status.go
@@ -332,6 +332,8 @@ func (c *NodeStatusCommand) formatNode(client *api.Client, node *api.Node) int {
 	}
 	c.Ui.Output(c.Colorize().Color(formatKV(basic)))
 
+	c.outputNodeStatusEvents(node)
+
 	// Get list of running allocations on the node
 	runningAllocs, err := getRunningAllocs(client, node.ID)
 	if err != nil {
@@ -386,6 +388,43 @@ func (c *NodeStatusCommand) formatNode(client *api.Client, node *api.Node) int {
 }
 
+func (c *NodeStatusCommand) outputNodeStatusEvents(node *api.Node) {
+	c.Ui.Output(c.Colorize().Color("\n[bold]Node Events "))
+	c.outputNodeEvent(node.Events)
+}
+
+func (c *NodeStatusCommand) outputNodeEvent(events []*api.NodeEvent) {
+	size := len(events)
+	nodeEvents := make([]string, size+1)
+	if c.verbose {
+		nodeEvents[0] = "Time|Subsystem|Message|Details"
+	} else {
+		nodeEvents[0] = "Time|Subsystem|Message"
+	}
+
+	// Fill the rows back to front so the most recent event is rendered first
+	for i, event := range events {
+		timestamp := formatUnixNanoTime(event.Timestamp)
+		subsystem := event.Subsystem
+		msg := event.Message
+		if c.verbose {
+			details := formatEventDetails(event.Details)
+			nodeEvents[size-i] = fmt.Sprintf("%s|%s|%s|%s", timestamp, subsystem, msg, details)
+		} else {
+			nodeEvents[size-i] = fmt.Sprintf("%s|%s|%s", timestamp, subsystem, msg)
+		}
+	}
+	c.Ui.Output(formatList(nodeEvents))
+}
+
+func formatEventDetails(details map[string]string) string {
+	output := make([]string, 0, len(details))
+	for k, v := range details {
+		output = append(output, fmt.Sprintf("%s: %s", k, v))
+	}
+	return strings.Join(output, ", ")
+}
+
 func (c *NodeStatusCommand) formatAttributes(node *api.Node) {
 	// Print the attributes
 	keys := make([]string, len(node.Attributes))
diff --git a/nomad/fsm.go b/nomad/fsm.go
index 68cc9170b06..65344e72473 100644
--- a/nomad/fsm.go
+++ b/nomad/fsm.go
@@ -236,6 +236,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
 		return n.applyACLTokenBootstrap(buf[1:], log.Index)
 	case structs.AutopilotRequestType:
 		return n.applyAutopilotUpdate(buf[1:], log.Index)
+	case structs.UpsertNodeEventsType:
+		return n.applyUpsertNodeEvent(buf[1:], log.Index)
 	}
 
 	// Check enterprise only message types.
@@ -628,6 +630,23 @@ func (n *nomadFSM) applyReconcileSummaries(buf []byte, index uint64) interface{} {
 	return n.reconcileQueuedAllocations(index)
 }
 
+// applyUpsertNodeEvent applies the given node events to the state store.
+func (n *nomadFSM) applyUpsertNodeEvent(buf []byte, index uint64) interface{} {
+	defer metrics.MeasureSince([]string{"nomad", "fsm", "upsert_node_events"}, time.Now())
+	var req structs.EmitNodeEventsRequest
+	if err := structs.Decode(buf, &req); err != nil {
+		n.logger.Printf("[ERR] nomad.fsm: failed to decode EmitNodeEventsRequest: %v", err)
+		return err
+	}
+
+	if err := n.state.UpsertNodeEvents(index, req.NodeEvents); err != nil {
+		n.logger.Printf("[ERR] nomad.fsm: failed to add node events: %v", err)
+		return err
+	}
+
+	return nil
+}
+
 // applyUpsertVaultAccessor stores the Vault accessors for a given allocation
 // and task
 func (n *nomadFSM) applyUpsertVaultAccessor(buf []byte, index uint64) interface{} {
diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go
index 98278165ec4..11e49c9d8b7 100644
--- a/nomad/fsm_test.go
+++ b/nomad/fsm_test.go
@@ -74,6 +74,50 @@ func makeLog(buf []byte) *raft.Log {
 	}
 }
 
+func TestFSM_UpsertNodeEvents(t *testing.T) {
+	t.Parallel()
+	require := require.New(t)
+	fsm := testFSM(t)
+	state := fsm.State()
+
+	node := mock.Node()
+
+	err := state.UpsertNode(1000, node)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	nodeEvent := &structs.NodeEvent{
+		Message:   "Heartbeating failed",
+		Subsystem: "Heartbeat",
+		Timestamp: time.Now().Unix(),
+	}
+
+	nodeEvents := []*structs.NodeEvent{nodeEvent}
+	allEvents := map[string][]*structs.NodeEvent{node.ID: nodeEvents}
+
+	req := structs.EmitNodeEventsRequest{
+		NodeEvents:   allEvents,
+		WriteRequest: structs.WriteRequest{Region: "global"},
+	}
+	buf, err := structs.Encode(structs.UpsertNodeEventsType, req)
+	require.Nil(err)
+
+	// a nil response from Apply indicates the request was applied successfully
+	resp := fsm.Apply(makeLog(buf))
+	require.Nil(resp)
+
+	ws := memdb.NewWatchSet()
+	out, err := state.NodeByID(ws, node.ID)
+	require.Nil(err)
+
+	require.Equal(2, len(out.Events))
+
+	emitted := out.Events[1] // Events[0] is the node registration event
+	require.Equal(uint64(1), emitted.CreateIndex)
+	require.Equal("Heartbeating failed", emitted.Message)
+}
+
 func TestFSM_UpsertNode(t *testing.T) {
 	t.Parallel()
 	fsm := testFSM(t)
diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go
index f0d28ee24ac..5c6d277375d 100644
--- a/nomad/node_endpoint.go
+++ b/nomad/node_endpoint.go
@@ -1363,3 +1363,31 @@ func (n *Node) DeriveVaultToken(args *structs.DeriveVaultTokenRequest,
 	n.srv.setQueryMeta(&reply.QueryMeta)
 	return nil
 }
+
+// EmitEvents is used to add a set of events to a node
+func (n *Node) EmitEvents(args *structs.EmitNodeEventsRequest, reply *structs.EmitNodeEventsResponse) error {
+	if done, err := n.srv.forward("Node.EmitEvents", args, args, reply); done {
+		return err
+	}
+	defer metrics.MeasureSince([]string{"nomad", "client", "emit_events"}, time.Now())
+
+	if len(args.NodeEvents) == 0 {
+		return fmt.Errorf("no node events given")
+	}
+	for nodeID, events := range args.NodeEvents {
+		if len(events) == 0 {
+			return fmt.Errorf("no node events given for node %q", nodeID)
+		}
+	}
+
+	// TODO ACLs
+
+	_, index, err := n.srv.raftApply(structs.UpsertNodeEventsType, args)
+	if err != nil {
+		n.srv.logger.Printf("[ERR] nomad.node: upserting node events failed: %v", err)
+		return err
+	}
+
+	reply.Index = index
+	return nil
+}
diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go
index 9bd9db8a7f5..fde2f4d3e23 100644
--- a/nomad/node_endpoint_test.go
+++ b/nomad/node_endpoint_test.go
@@ -78,6 +78,45 @@ func TestClientEndpoint_Register(t *testing.T) {
 	})
 }
 
+func TestClientEndpoint_EmitEvents(t *testing.T) {
+	t.Parallel()
+	require := require.New(t)
+
+	s1 := TestServer(t, nil)
+	state := s1.fsm.State()
+	defer s1.Shutdown()
+
+	codec := rpcClient(t, s1)
+	testutil.WaitForLeader(t, s1.RPC)
+
+	// Create a node to attach our event to
+	node := mock.Node()
+	err := state.UpsertNode(2, node)
+	require.Nil(err)
+
+	nodeEvent := &structs.NodeEvent{
+		Message:   "Registration failed",
+		Subsystem: "Server",
+		Timestamp: time.Now().Unix(),
+	}
+
+	nodeEvents := map[string][]*structs.NodeEvent{node.ID: {nodeEvent}}
+	req := structs.EmitNodeEventsRequest{
+		NodeEvents:   nodeEvents,
+		WriteRequest: structs.WriteRequest{Region: "global"},
+	}
+
+	var resp structs.GenericResponse
+	err = msgpackrpc.CallWithCodec(codec, "Node.EmitEvents", &req, &resp)
+	require.Nil(err)
+	require.NotEqual(uint64(0), resp.Index)
+
+	// Check that the events were recorded on the node in the FSM
+	ws := memdb.NewWatchSet()
+	out, err := state.NodeByID(ws, node.ID)
+	require.Nil(err)
+	require.True(len(out.Events) >= 2) // registration event plus the emitted event
+}
+
 func TestClientEndpoint_Register_SecretMismatch(t *testing.T) {
 	t.Parallel()
 	s1 := TestServer(t, nil)
@@ -947,10 +986,19 @@ func TestClientEndpoint_GetNode(t *testing.T) {
 	// Update the status updated at value
 	node.StatusUpdatedAt = resp2.Node.StatusUpdatedAt
 	node.SecretID = ""
+	node.Events = resp2.Node.Events
 	if !reflect.DeepEqual(node, resp2.Node) {
 		t.Fatalf("bad: %#v \n %#v", node, resp2.Node)
 	}
 
+	// assert that the node register event was set correctly
+	if len(resp2.Node.Events) != 1 {
+		t.Fatalf("Did not set node events: %#v", resp2.Node)
+	}
+	if resp2.Node.Events[0].Message != "Node Registered" {
+		t.Fatalf("Did not set node register event correctly: %#v", resp2.Node)
+	}
+
 	// Lookup non-existing node
 	get.NodeID = "12345678-abcd-efab-cdef-123456789abc"
 	if err := msgpackrpc.CallWithCodec(codec, "Node.GetNode", get, &resp2); err != nil {
diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go
index 8afb9af5561..6156a3c7502 100644
--- a/nomad/state/state_store.go
+++ b/nomad/state/state_store.go
@@ -526,7 +526,18 @@ func (s *StateStore) UpsertNode(index uint64, node *structs.Node) error {
 		node.CreateIndex = exist.CreateIndex
 		node.ModifyIndex = index
 		node.Drain = exist.Drain // Retain the drain mode
+
+		// Retain node events that have already been set on the node
+		node.Events = exist.Events
 	} else {
+		// Because this is the first time the node is being registered, we should
+		// also create a node registration event
+		nodeEvent := &structs.NodeEvent{
+			Message:   "Node Registered",
+			Subsystem: "Cluster",
+			Timestamp: node.StatusUpdatedAt,
+		}
+		node.Events = []*structs.NodeEvent{nodeEvent}
 		node.CreateIndex = index
 		node.ModifyIndex = index
 	}
@@ -620,8 +631,7 @@ func (s *StateStore) UpdateNodeDrain(index uint64, nodeID string, drain bool) error {
 	// Copy the existing node
 	existingNode := existing.(*structs.Node)
-	copyNode := new(structs.Node)
-	*copyNode = *existingNode
+	copyNode := existingNode.Copy()
 
 	// Update the drain in the copy
 	copyNode.Drain = drain
@@ -639,6 +649,62 @@ func (s *StateStore) UpdateNodeDrain(index uint64, nodeID string, drain bool) error {
 	return nil
 }
 
+// UpsertNodeEvents adds the node events to the nodes, rotating events as
+// necessary.
+func (s *StateStore) UpsertNodeEvents(index uint64, nodeEvents map[string][]*structs.NodeEvent) error {
+	txn := s.db.Txn(true)
+	defer txn.Abort()
+
+	for nodeID, events := range nodeEvents {
+		if err := s.upsertNodeEvents(index, nodeID, events, txn); err != nil {
+			return err
+		}
+	}
+
+	txn.Commit()
+	return nil
+}
+
+// upsertNodeEvents upserts node events for a given node. It also maintains
+// an upper bound on the number of events stored for a node, pruning the
+// oldest events once the bound is exceeded.
+func (s *StateStore) upsertNodeEvents(index uint64, nodeID string, events []*structs.NodeEvent, txn *memdb.Txn) error {
+	// Lookup the node
+	existing, err := txn.First("nodes", "id", nodeID)
+	if err != nil {
+		return fmt.Errorf("node lookup failed: %v", err)
+	}
+	if existing == nil {
+		return fmt.Errorf("node %q not found", nodeID)
+	}
+
+	// Copy the existing node
+	existingNode := existing.(*structs.Node)
+	copyNode := existingNode.Copy()
+
+	// Add the events, updating the indexes
+	for _, e := range events {
+		e.CreateIndex = index
+		copyNode.Events = append(copyNode.Events, e)
+	}
+
+	// Keep node events pruned to not exceed the max allowed
+	if l := len(copyNode.Events); l > structs.MaxRetainedNodeEvents {
+		delta := l - structs.MaxRetainedNodeEvents
+		copyNode.Events = copyNode.Events[delta:]
+	}
+
+	// Insert the node
+	if err := txn.Insert("nodes", copyNode); err != nil {
+		return fmt.Errorf("node update failed: %v", err)
+	}
+	if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil {
+		return fmt.Errorf("index update failed: %v", err)
+	}
+
+	return nil
+}
+
 // NodeByID is used to lookup a node by ID
 func (s *StateStore) NodeByID(ws memdb.WatchSet, nodeID string) (*structs.Node, error) {
 	txn := s.db.Txn(false)
diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go
index 0249f627b96..d176e178b9a 100644
--- a/nomad/state/state_store_test.go
+++ b/nomad/state/state_store_test.go
@@ -748,6 +748,95 @@ func TestStateStore_UpdateNodeDrain_Node(t *testing.T) {
 	}
 }
 
+func TestStateStore_AddSingleNodeEvent(t *testing.T) {
+	require := require.New(t)
+	state := testStateStore(t)
+
+	node := mock.Node()
+
+	// We create a new node event every time we register a node
+	err := state.UpsertNode(1000, node)
+	require.Nil(err)
+
+	require.Equal(1, len(node.Events))
+	require.Equal(structs.NodeEventSubsystemCluster, node.Events[0].Subsystem)
+	require.Equal("Node Registered", node.Events[0].Message)
+
+	// Create a watchset so we can test that UpsertNodeEvents fires the watch
+	ws := memdb.NewWatchSet()
+	_, err = state.NodeByID(ws, node.ID)
+	require.Nil(err)
+
+	nodeEvent := &structs.NodeEvent{
+		Message:   "failed",
+		Subsystem: "Driver",
+		Timestamp: time.Now().Unix(),
+	}
+	nodeEvents := map[string][]*structs.NodeEvent{
+		node.ID: {nodeEvent},
+	}
+	err = state.UpsertNodeEvents(uint64(1001), nodeEvents)
+	require.Nil(err)
+
+	require.True(watchFired(ws))
+
+	ws = memdb.NewWatchSet()
+	out, err := state.NodeByID(ws, node.ID)
+	require.Nil(err)
+
+	require.Equal(2, len(out.Events))
+	require.Equal(nodeEvent, out.Events[1])
+}
+
+// To prevent stale node events from accumulating, we limit the number of
+// stored node events to structs.MaxRetainedNodeEvents (currently 10).
+func TestStateStore_NodeEvents_RetentionWindow(t *testing.T) {
+	require := require.New(t)
+	state := testStateStore(t)
+
+	node := mock.Node()
+
+	err := state.UpsertNode(1000, node)
+	require.Nil(err)
+
+	require.Equal(1, len(node.Events))
+	require.Equal(structs.NodeEventSubsystemCluster, node.Events[0].Subsystem)
+	require.Equal("Node Registered", node.Events[0].Message)
+
+	var out *structs.Node
+	for i := 1; i <= 20; i++ {
+		ws := memdb.NewWatchSet()
+		out, err = state.NodeByID(ws, node.ID)
+		require.Nil(err)
+
+		nodeEvent := &structs.NodeEvent{
+			Message:   fmt.Sprintf("event %d failed", i),
+			Subsystem: "Driver",
+			Timestamp: time.Now().Unix(),
+		}
+
+		nodeEvents := map[string][]*structs.NodeEvent{
+			out.ID: {nodeEvent},
+		}
+		err := state.UpsertNodeEvents(uint64(i), nodeEvents)
+		require.Nil(err)
+
+		require.True(watchFired(ws))
+		ws = memdb.NewWatchSet()
+		out, err = state.NodeByID(ws, node.ID)
+		require.Nil(err)
+	}
+
+	ws := memdb.NewWatchSet()
+	out, err = state.NodeByID(ws, node.ID)
+	require.Nil(err)
+
+	// Only the ten most recent events are retained; the registration event
+	// and the first ten emitted events have been pruned
+	require.Equal(10, len(out.Events))
+	require.Equal(uint64(11), out.Events[0].CreateIndex)
+	require.Equal(uint64(20), out.Events[len(out.Events)-1].CreateIndex)
+}
+
 func TestStateStore_Nodes(t *testing.T) {
 	state := testStateStore(t)
 	var nodes []*structs.Node
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index 5202f7f35d1..402b5701fb9 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -74,6 +74,7 @@ const (
 	ACLTokenDeleteRequestType
 	ACLTokenBootstrapRequestType
 	AutopilotRequestType
+	UpsertNodeEventsType
 )
 
 const (
@@ -122,6 +123,10 @@ const (
 	// the fraction. So 16 == 6.25% limit of jitter. This jitter is also
 	// applied to RPCHoldTimeout.
 	JitterFraction = 16
+
+	// MaxRetainedNodeEvents is the maximum number of node events that will be
+	// retained for a single node
+	MaxRetainedNodeEvents = 10
 )
 
 // Context defines the scope in which a search for Nomad object operates, and
@@ -1050,6 +1055,55 @@ type NodeConnQueryResponse struct {
 	QueryMeta
 }
 
+// EmitNodeEventsRequest is a request to update the node events source
+// with new client-side events
+type EmitNodeEventsRequest struct {
+	// NodeEvents is a map where the key is a node ID and the value is a
+	// list of events for that node
+	NodeEvents map[string][]*NodeEvent
+
+	WriteRequest
+}
+
+// EmitNodeEventsResponse is a response to the client about the status of
+// the node event source update.
+type EmitNodeEventsResponse struct {
+	Index uint64
+	WriteMeta
+}
+
+const (
+	NodeEventSubsystemDrain     = "Drain"
+	NodeEventSubsystemDriver    = "Driver"
+	NodeEventSubsystemHeartbeat = "Heartbeat"
+	NodeEventSubsystemCluster   = "Cluster"
+)
+
+// NodeEvent is a single unit representing a node's state change
+type NodeEvent struct {
+	Message     string
+	Subsystem   string
+	Details     map[string]string
+	Timestamp   int64
+	CreateIndex uint64
+}
+
+func (ne *NodeEvent) String() string {
+	var details []string
+	for k, v := range ne.Details {
+		details = append(details, fmt.Sprintf("%s: %s", k, v))
+	}
+
+	return fmt.Sprintf("Message: %s, Subsystem: %s, Details: %s, Timestamp: %d", ne.Message, ne.Subsystem, strings.Join(details, ", "), ne.Timestamp)
+}
+
+func (ne *NodeEvent) Copy() *NodeEvent {
+	c := new(NodeEvent)
+	*c = *ne
+	c.Details = helper.CopyMapStringString(ne.Details)
+	return c
+}
+
 const (
 	NodeStatusInit  = "initializing"
 	NodeStatusReady = "ready"
@@ -1153,6 +1207,10 @@ type Node struct {
 	// updated
 	StatusUpdatedAt int64
 
+	// Events is the most recent set of events generated for the node,
+	// retaining at most MaxRetainedNodeEvents entries at a time
+	Events []*NodeEvent
+
 	// Raft Indexes
 	CreateIndex uint64
 	ModifyIndex uint64
@@ -1174,9 +1232,24 @@ func (n *Node) Copy() *Node {
 	nn.Reserved = nn.Reserved.Copy()
 	nn.Links = helper.CopyMapStringString(nn.Links)
 	nn.Meta = helper.CopyMapStringString(nn.Meta)
+	nn.Events = copyNodeEvents(n.Events)
 	return nn
 }
 
+// copyNodeEvents is a helper to deep-copy a list of NodeEvents
+func copyNodeEvents(events []*NodeEvent) []*NodeEvent {
+	l := len(events)
+	if l == 0 {
+		return nil
+	}
+
+	c := make([]*NodeEvent, l)
+	for i, event := range events {
+		c[i] = event.Copy()
+	}
+	return c
+}
+
 // TerminalStatus returns if the current status is terminal and
 // will no longer transition.
 func (n *Node) TerminalStatus() bool {
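The end-to-end flow introduced by this diff: a client subsystem hands a structs.NodeEvent to triggerNodeEvent, watchNodeEvents batches pending events and submits them through the new Node.EmitEvents RPC, the FSM applies them to the state store via UpsertNodeEvents, and `nomad node-status` renders the retained events. As a minimal usage sketch (not part of the change), a driver health check could plug into this plumbing as follows; reportDriverHealth and its wiring are hypothetical, while NodeEvent, structs.NodeEventSubsystemDriver, and triggerNodeEvent are taken from the diff:

// reportDriverHealth is a hypothetical helper in package client, sketching
// how a subsystem might emit a node event through the plumbing above.
func (c *Client) reportDriverHealth(driver string, healthy bool) {
	msg := "Driver became healthy"
	if !healthy {
		msg = "Driver became unhealthy"
	}
	// triggerNodeEvent enqueues the event; watchNodeEvents batches it and
	// submits it via the Node.EmitEvents RPC. If the buffer of 8 pending
	// events is full, the event is dropped rather than blocking the caller.
	c.triggerNodeEvent(&structs.NodeEvent{
		Message:   msg,
		Subsystem: structs.NodeEventSubsystemDriver,
		Details:   map[string]string{"driver": driver},
		Timestamp: time.Now().Unix(),
	})
}

On the server side, each applied event is stamped with the Raft apply index as its CreateIndex, and the state store prunes a node's event list to the newest MaxRetainedNodeEvents (10) entries, so `nomad node-status` shows at most the ten most recent events, newest first.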