diff --git a/nomad/drainer/drainer.go b/nomad/drainer/drainer.go index 841445cb3ba..586b5e5ab84 100644 --- a/nomad/drainer/drainer.go +++ b/nomad/drainer/drainer.go @@ -30,13 +30,21 @@ const ( // NodeDeadlineCoalesceWindow is the duration in which deadlining nodes will // be coalesced together NodeDeadlineCoalesceWindow = 5 * time.Second + + // NodeDrainEventComplete is used to indicate that the node drain is + // finished. + NodeDrainEventComplete = "Node drain complete" + + // NodeDrainEventDetailDeadlined is the key to use when the drain is + // complete because a deadline. The acceptable values are "true" and "false" + NodeDrainEventDetailDeadlined = "deadline_reached" ) // RaftApplier contains methods for applying the raft requests required by the // NodeDrainer. type RaftApplier interface { AllocUpdateDesiredTransition(allocs map[string]*structs.DesiredTransition, evals []*structs.Evaluation) (uint64, error) - NodesDrainComplete(nodes []string) (uint64, error) + NodesDrainComplete(nodes []string, event *structs.NodeEvent) (uint64, error) } // NodeTracker is the interface to notify an object that is tracking draining @@ -254,10 +262,16 @@ func (n *NodeDrainer) handleDeadlinedNodes(nodes []string) { n.l.RUnlock() n.batchDrainAllocs(forceStop) + // Create the node event + event := structs.NewNodeEvent(). + SetSubsystem(structs.NodeEventSubsystemDrain). + SetMessage(NodeDrainEventComplete). + AddDetail(NodeDrainEventDetailDeadlined, "true") + // Submit the node transitions in a sharded form to ensure a reasonable // Raft transaction size. for _, nodes := range partitionIds(defaultMaxIdsPerTxn, nodes) { - if _, err := n.raft.NodesDrainComplete(nodes); err != nil { + if _, err := n.raft.NodesDrainComplete(nodes, event); err != nil { n.logger.Printf("[ERR] nomad.drain: failed to unset drain for nodes: %v", err) } } @@ -324,10 +338,15 @@ func (n *NodeDrainer) handleMigratedAllocs(allocs []*structs.Allocation) { } } + // Create the node event + event := structs.NewNodeEvent(). + SetSubsystem(structs.NodeEventSubsystemDrain). + SetMessage(NodeDrainEventComplete) + // Submit the node transitions in a sharded form to ensure a reasonable // Raft transaction size. for _, nodes := range partitionIds(defaultMaxIdsPerTxn, done) { - if _, err := n.raft.NodesDrainComplete(nodes); err != nil { + if _, err := n.raft.NodesDrainComplete(nodes, event); err != nil { n.logger.Printf("[ERR] nomad.drain: failed to unset drain for nodes: %v", err) } } diff --git a/nomad/drainer/watch_nodes.go b/nomad/drainer/watch_nodes.go index 8862abd2394..ac23b16680b 100644 --- a/nomad/drainer/watch_nodes.go +++ b/nomad/drainer/watch_nodes.go @@ -102,7 +102,12 @@ func (n *NodeDrainer) Update(node *structs.Node) { } } - index, err := n.raft.NodesDrainComplete([]string{node.ID}) + // Create the node event + event := structs.NewNodeEvent(). + SetSubsystem(structs.NodeEventSubsystemDrain). + SetMessage(NodeDrainEventComplete) + + index, err := n.raft.NodesDrainComplete([]string{node.ID}, event) if err != nil { n.logger.Printf("[ERR] nomad.drain: failed to unset drain for node %q: %v", node.ID, err) } else { diff --git a/nomad/drainer/watch_nodes_test.go b/nomad/drainer/watch_nodes_test.go index 476c7a39bb5..74a62e85253 100644 --- a/nomad/drainer/watch_nodes_test.go +++ b/nomad/drainer/watch_nodes_test.go @@ -97,7 +97,7 @@ func TestNodeDrainWatcher_Remove(t *testing.T) { require.Equal(n, tracked[n.ID]) // Change the node to be not draining and wait for it to be untracked - require.Nil(state.UpdateNodeDrain(101, n.ID, nil, false)) + require.Nil(state.UpdateNodeDrain(101, n.ID, nil, false, nil)) testutil.WaitForResult(func() (bool, error) { return len(m.Events) == 2, nil }, func(err error) { @@ -175,7 +175,7 @@ func TestNodeDrainWatcher_Update(t *testing.T) { // Change the node to have a new spec s2 := n.DrainStrategy.Copy() s2.Deadline += time.Hour - require.Nil(state.UpdateNodeDrain(101, n.ID, s2, false)) + require.Nil(state.UpdateNodeDrain(101, n.ID, s2, false, nil)) // Wait for it to be updated testutil.WaitForResult(func() (bool, error) { diff --git a/nomad/drainer_int_test.go b/nomad/drainer_int_test.go index 81d8625ca84..379aee34047 100644 --- a/nomad/drainer_int_test.go +++ b/nomad/drainer_int_test.go @@ -12,6 +12,7 @@ import ( msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/uuid" + "github.com/hashicorp/nomad/nomad/drainer" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" @@ -212,6 +213,12 @@ func TestDrainer_Simple_ServiceOnly(t *testing.T) { }, func(err error) { t.Fatalf("err: %v", err) }) + + // Check we got the right events + node, err := state.NodeByID(nil, n1.ID) + require.NoError(err) + require.Len(node.Events, 3) + require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message) } func TestDrainer_Simple_ServiceOnly_Deadline(t *testing.T) { @@ -300,6 +307,13 @@ func TestDrainer_Simple_ServiceOnly_Deadline(t *testing.T) { }, func(err error) { t.Fatalf("err: %v", err) }) + + // Check we got the right events + node, err := state.NodeByID(nil, n1.ID) + require.NoError(err) + require.Len(node.Events, 3) + require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message) + require.Contains(node.Events[2].Details, drainer.NodeDrainEventDetailDeadlined) } func TestDrainer_DrainEmptyNode(t *testing.T) { @@ -343,6 +357,12 @@ func TestDrainer_DrainEmptyNode(t *testing.T) { }, func(err error) { t.Fatalf("err: %v", err) }) + + // Check we got the right events + node, err := state.NodeByID(nil, n1.ID) + require.NoError(err) + require.Len(node.Events, 3) + require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message) } func TestDrainer_AllTypes_Deadline(t *testing.T) { @@ -500,6 +520,13 @@ func TestDrainer_AllTypes_Deadline(t *testing.T) { } } require.True(serviceMax < batchMax) + + // Check we got the right events + node, err := state.NodeByID(nil, n1.ID) + require.NoError(err) + require.Len(node.Events, 3) + require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message) + require.Contains(node.Events[2].Details, drainer.NodeDrainEventDetailDeadlined) } // Test that drain is unset when batch jobs naturally finish @@ -659,6 +686,12 @@ func TestDrainer_AllTypes_NoDeadline(t *testing.T) { }, func(err error) { t.Fatalf("err: %v", err) }) + + // Check we got the right events + node, err := state.NodeByID(nil, n1.ID) + require.NoError(err) + require.Len(node.Events, 3) + require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message) } func TestDrainer_AllTypes_Deadline_GarbageCollectedNode(t *testing.T) { @@ -824,6 +857,13 @@ func TestDrainer_AllTypes_Deadline_GarbageCollectedNode(t *testing.T) { }, func(err error) { t.Fatalf("err: %v", err) }) + + // Check we got the right events + node, err := state.NodeByID(nil, n1.ID) + require.NoError(err) + require.Len(node.Events, 3) + require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message) + require.Contains(node.Events[2].Details, drainer.NodeDrainEventDetailDeadlined) } // Test that transitions to force drain work. @@ -962,6 +1002,13 @@ func TestDrainer_Batch_TransitionToForce(t *testing.T) { }, func(err error) { t.Fatalf("err: %v", err) }) + + // Check we got the right events + node, err := state.NodeByID(nil, n1.ID) + require.NoError(err) + require.Len(node.Events, 4) + require.Equal(drainer.NodeDrainEventComplete, node.Events[3].Message) + require.Contains(node.Events[3].Details, drainer.NodeDrainEventDetailDeadlined) }) } } diff --git a/nomad/drainer_shims.go b/nomad/drainer_shims.go index 0eb8c43a27b..c9795d5ac4e 100644 --- a/nomad/drainer_shims.go +++ b/nomad/drainer_shims.go @@ -8,15 +8,19 @@ type drainerShim struct { s *Server } -func (d drainerShim) NodesDrainComplete(nodes []string) (uint64, error) { +func (d drainerShim) NodesDrainComplete(nodes []string, event *structs.NodeEvent) (uint64, error) { args := &structs.BatchNodeUpdateDrainRequest{ Updates: make(map[string]*structs.DrainUpdate, len(nodes)), + NodeEvents: make(map[string]*structs.NodeEvent, len(nodes)), WriteRequest: structs.WriteRequest{Region: d.s.config.Region}, } update := &structs.DrainUpdate{} for _, node := range nodes { args.Updates[node] = update + if event != nil { + args.NodeEvents[node] = event + } } resp, index, err := d.s.raftApply(structs.BatchNodeUpdateDrainRequestType, args) diff --git a/nomad/fsm.go b/nomad/fsm.go index cdbe9207c18..9453a6d5a56 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -349,7 +349,7 @@ func (n *nomadFSM) applyDrainUpdate(buf []byte, index uint64) interface{} { } } - if err := n.state.UpdateNodeDrain(index, req.NodeID, req.DrainStrategy, req.MarkEligible); err != nil { + if err := n.state.UpdateNodeDrain(index, req.NodeID, req.DrainStrategy, req.MarkEligible, req.NodeEvent); err != nil { n.logger.Printf("[ERR] nomad.fsm: UpdateNodeDrain failed: %v", err) return err } @@ -363,7 +363,7 @@ func (n *nomadFSM) applyBatchDrainUpdate(buf []byte, index uint64) interface{} { panic(fmt.Errorf("failed to decode request: %v", err)) } - if err := n.state.BatchUpdateNodeDrain(index, req.Updates); err != nil { + if err := n.state.BatchUpdateNodeDrain(index, req.Updates, req.NodeEvents); err != nil { n.logger.Printf("[ERR] nomad.fsm: BatchUpdateNodeDrain failed: %v", err) return err } diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 5cace9d794d..d2b65b6b020 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -327,12 +327,20 @@ func TestFSM_BatchUpdateNodeDrain(t *testing.T) { Deadline: 10 * time.Second, }, } + event := &structs.NodeEvent{ + Message: "Drain strategy enabled", + Subsystem: structs.NodeEventSubsystemDrain, + Timestamp: time.Now(), + } req2 := structs.BatchNodeUpdateDrainRequest{ Updates: map[string]*structs.DrainUpdate{ node.ID: { DrainStrategy: strategy, }, }, + NodeEvents: map[string]*structs.NodeEvent{ + node.ID: event, + }, } buf, err = structs.Encode(structs.BatchNodeUpdateDrainRequestType, req2) require.Nil(err) @@ -346,6 +354,7 @@ func TestFSM_BatchUpdateNodeDrain(t *testing.T) { require.Nil(err) require.True(node.Drain) require.Equal(node.DrainStrategy, strategy) + require.Len(node.Events, 2) } func TestFSM_UpdateNodeDrain(t *testing.T) { @@ -371,6 +380,11 @@ func TestFSM_UpdateNodeDrain(t *testing.T) { req2 := structs.NodeUpdateDrainRequest{ NodeID: node.ID, DrainStrategy: strategy, + NodeEvent: &structs.NodeEvent{ + Message: "Drain strategy enabled", + Subsystem: structs.NodeEventSubsystemDrain, + Timestamp: time.Now(), + }, } buf, err = structs.Encode(structs.NodeUpdateDrainRequestType, req2) require.Nil(err) @@ -384,6 +398,7 @@ func TestFSM_UpdateNodeDrain(t *testing.T) { require.Nil(err) require.True(node.Drain) require.Equal(node.DrainStrategy, strategy) + require.Len(node.Events, 2) } func TestFSM_UpdateNodeDrain_Pre08_Compatibility(t *testing.T) { diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index 6bd56499844..675b7116e61 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -27,6 +27,11 @@ const ( // maxParallelRequestsPerDerive is the maximum number of parallel Vault // create token requests that may be outstanding per derive request maxParallelRequestsPerDerive = 16 + + // NodeDrainEvents are the various drain messages + NodeDrainEventDrainSet = "Node drain strategy set" + NodeDrainEventDrainDisabled = "Node drain disabled" + NodeDrainEventDrainUpdated = "Node drain stategy updated" ) // Node endpoint is used for client interactions @@ -439,6 +444,9 @@ func (n *Node) UpdateDrain(args *structs.NodeUpdateDrainRequest, if args.NodeID == "" { return fmt.Errorf("missing node ID for drain update") } + if args.NodeEvent != nil { + return fmt.Errorf("node event must not be set") + } // Look for the node snap, err := n.srv.fsm.State().Snapshot() @@ -468,6 +476,18 @@ func (n *Node) UpdateDrain(args *structs.NodeUpdateDrainRequest, args.DrainStrategy.ForceDeadline = time.Now().Add(args.DrainStrategy.Deadline) } + // Construct the node event + args.NodeEvent = structs.NewNodeEvent().SetSubsystem(structs.NodeEventSubsystemDrain) + if node.DrainStrategy == nil && args.DrainStrategy == nil { + return nil // Nothing to do + } else if node.DrainStrategy == nil && args.DrainStrategy != nil { + args.NodeEvent.SetMessage(NodeDrainEventDrainSet) + } else if node.DrainStrategy != nil && args.DrainStrategy != nil { + args.NodeEvent.SetMessage(NodeDrainEventDrainUpdated) + } else if node.DrainStrategy != nil && args.DrainStrategy == nil { + args.NodeEvent.SetMessage(NodeDrainEventDrainDisabled) + } + // Commit this update via Raft _, index, err := n.srv.raftApply(structs.NodeUpdateDrainRequestType, args) if err != nil { diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index fdec0164ee5..073110f7ca2 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -845,6 +845,8 @@ func TestClientEndpoint_UpdateDrain(t *testing.T) { require.Nil(err) require.True(out.Drain) require.Equal(strategy.Deadline, out.DrainStrategy.Deadline) + require.Len(out.Events, 2) + require.Equal(NodeDrainEventDrainSet, out.Events[1].Message) // before+deadline should be before the forced deadline require.True(beforeUpdate.Add(strategy.Deadline).Before(out.DrainStrategy.ForceDeadline)) @@ -2587,7 +2589,7 @@ func TestClientEndpoint_ListNodes_Blocking(t *testing.T) { Deadline: 10 * time.Second, }, } - errCh <- state.UpdateNodeDrain(3, node.ID, s, false) + errCh <- state.UpdateNodeDrain(3, node.ID, s, false, nil) }) req.MinQueryIndex = 2 diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 568fcbed616..cb257b56e7f 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -619,11 +619,11 @@ func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string) error } // BatchUpdateNodeDrain is used to update the drain of a node set of nodes -func (s *StateStore) BatchUpdateNodeDrain(index uint64, updates map[string]*structs.DrainUpdate) error { +func (s *StateStore) BatchUpdateNodeDrain(index uint64, updates map[string]*structs.DrainUpdate, events map[string]*structs.NodeEvent) error { txn := s.db.Txn(true) defer txn.Abort() for node, update := range updates { - if err := s.updateNodeDrainImpl(txn, index, node, update.DrainStrategy, update.MarkEligible); err != nil { + if err := s.updateNodeDrainImpl(txn, index, node, update.DrainStrategy, update.MarkEligible, events[node]); err != nil { return err } } @@ -633,11 +633,11 @@ func (s *StateStore) BatchUpdateNodeDrain(index uint64, updates map[string]*stru // UpdateNodeDrain is used to update the drain of a node func (s *StateStore) UpdateNodeDrain(index uint64, nodeID string, - drain *structs.DrainStrategy, markEligible bool) error { + drain *structs.DrainStrategy, markEligible bool, event *structs.NodeEvent) error { txn := s.db.Txn(true) defer txn.Abort() - if err := s.updateNodeDrainImpl(txn, index, nodeID, drain, markEligible); err != nil { + if err := s.updateNodeDrainImpl(txn, index, nodeID, drain, markEligible, event); err != nil { return err } txn.Commit() @@ -645,7 +645,7 @@ func (s *StateStore) UpdateNodeDrain(index uint64, nodeID string, } func (s *StateStore) updateNodeDrainImpl(txn *memdb.Txn, index uint64, nodeID string, - drain *structs.DrainStrategy, markEligible bool) error { + drain *structs.DrainStrategy, markEligible bool, event *structs.NodeEvent) error { // Lookup the node existing, err := txn.First("nodes", "id", nodeID) @@ -660,6 +660,11 @@ func (s *StateStore) updateNodeDrainImpl(txn *memdb.Txn, index uint64, nodeID st existingNode := existing.(*structs.Node) copyNode := existingNode.Copy() + // Add the event if given + if event != nil { + appendNodeEvents(index, copyNode, []*structs.NodeEvent{event}) + } + // Update the drain in the copy copyNode.Drain = drain != nil // COMPAT: Remove in Nomad 0.9 copyNode.DrainStrategy = drain @@ -754,18 +759,7 @@ func (s *StateStore) upsertNodeEvents(index uint64, nodeID string, events []*str // Copy the existing node existingNode := existing.(*structs.Node) copyNode := existingNode.Copy() - - // Add the events, updating the indexes - for _, e := range events { - e.CreateIndex = index - copyNode.Events = append(copyNode.Events, e) - } - - // Keep node events pruned to not exceed the max allowed - if l := len(copyNode.Events); l > structs.MaxRetainedNodeEvents { - delta := l - structs.MaxRetainedNodeEvents - copyNode.Events = copyNode.Events[delta:] - } + appendNodeEvents(index, copyNode, events) // Insert the node if err := txn.Insert("nodes", copyNode); err != nil { @@ -778,6 +772,22 @@ func (s *StateStore) upsertNodeEvents(index uint64, nodeID string, events []*str return nil } +// appendNodeEvents is a helper that takes a node and new events and appends +// them, pruning older events as needed. +func appendNodeEvents(index uint64, node *structs.Node, events []*structs.NodeEvent) { + // Add the events, updating the indexes + for _, e := range events { + e.CreateIndex = index + node.Events = append(node.Events, e) + } + + // Keep node events pruned to not exceed the max allowed + if l := len(node.Events); l > structs.MaxRetainedNodeEvents { + delta := l - structs.MaxRetainedNodeEvents + node.Events = node.Events[delta:] + } +} + // NodeByID is used to lookup a node by ID func (s *StateStore) NodeByID(ws memdb.WatchSet, nodeID string) (*structs.Node, error) { txn := s.db.Txn(false) diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 2c9f42ff7fd..1a6c66de90b 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -726,7 +726,17 @@ func TestStateStore_BatchUpdateNodeDrain(t *testing.T) { }, } - require.Nil(state.BatchUpdateNodeDrain(1002, update)) + event := &structs.NodeEvent{ + Message: "Drain strategy enabled", + Subsystem: structs.NodeEventSubsystemDrain, + Timestamp: time.Now(), + } + events := map[string]*structs.NodeEvent{ + n1.ID: event, + n2.ID: event, + } + + require.Nil(state.BatchUpdateNodeDrain(1002, update, events)) require.True(watchFired(ws)) ws = memdb.NewWatchSet() @@ -736,6 +746,7 @@ func TestStateStore_BatchUpdateNodeDrain(t *testing.T) { require.True(out.Drain) require.NotNil(out.DrainStrategy) require.Equal(out.DrainStrategy, expectedDrain) + require.Len(out.Events, 2) require.EqualValues(1002, out.ModifyIndex) } @@ -763,7 +774,12 @@ func TestStateStore_UpdateNodeDrain_Node(t *testing.T) { }, } - require.Nil(state.UpdateNodeDrain(1001, node.ID, expectedDrain, false)) + event := &structs.NodeEvent{ + Message: "Drain strategy enabled", + Subsystem: structs.NodeEventSubsystemDrain, + Timestamp: time.Now(), + } + require.Nil(state.UpdateNodeDrain(1001, node.ID, expectedDrain, false, event)) require.True(watchFired(ws)) ws = memdb.NewWatchSet() @@ -772,6 +788,7 @@ func TestStateStore_UpdateNodeDrain_Node(t *testing.T) { require.True(out.Drain) require.NotNil(out.DrainStrategy) require.Equal(out.DrainStrategy, expectedDrain) + require.Len(out.Events, 2) require.EqualValues(1001, out.ModifyIndex) index, err := state.Index("nodes") @@ -886,11 +903,21 @@ func TestStateStore_UpdateNodeDrain_ResetEligiblity(t *testing.T) { }, } - require.Nil(state.UpdateNodeDrain(1001, node.ID, drain, false)) + event1 := &structs.NodeEvent{ + Message: "Drain strategy enabled", + Subsystem: structs.NodeEventSubsystemDrain, + Timestamp: time.Now(), + } + require.Nil(state.UpdateNodeDrain(1001, node.ID, drain, false, event1)) require.True(watchFired(ws)) // Remove the drain - require.Nil(state.UpdateNodeDrain(1002, node.ID, nil, true)) + event2 := &structs.NodeEvent{ + Message: "Drain strategy disabled", + Subsystem: structs.NodeEventSubsystemDrain, + Timestamp: time.Now(), + } + require.Nil(state.UpdateNodeDrain(1002, node.ID, nil, true, event2)) ws = memdb.NewWatchSet() out, err := state.NodeByID(ws, node.ID) @@ -898,6 +925,7 @@ func TestStateStore_UpdateNodeDrain_ResetEligiblity(t *testing.T) { require.False(out.Drain) require.Nil(out.DrainStrategy) require.Equal(out.SchedulingEligibility, structs.NodeSchedulingEligible) + require.Len(out.Events, 3) require.EqualValues(1002, out.ModifyIndex) index, err := state.Index("nodes") @@ -944,7 +972,7 @@ func TestStateStore_UpdateNodeEligibility(t *testing.T) { Deadline: -1 * time.Second, }, } - require.Nil(state.UpdateNodeDrain(1002, node.ID, expectedDrain, false)) + require.Nil(state.UpdateNodeDrain(1002, node.ID, expectedDrain, false, nil)) // Try to set the node to eligible err = state.UpdateNodeEligibility(1003, node.ID, structs.NodeSchedulingEligible) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 615921f5bf0..03af89ae37c 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -344,6 +344,10 @@ type NodeUpdateDrainRequest struct { // MarkEligible marks the node as eligible if removing the drain strategy. MarkEligible bool + + // NodeEvent is the event added to the node + NodeEvent *NodeEvent + WriteRequest } @@ -352,6 +356,10 @@ type NodeUpdateDrainRequest struct { type BatchNodeUpdateDrainRequest struct { // Updates is a mapping of nodes to their updated drain strategy Updates map[string]*DrainUpdate + + // NodeEvents is a mapping of the node to the event to add to the node + NodeEvents map[string]*NodeEvent + WriteRequest } @@ -1221,6 +1229,33 @@ func (ne *NodeEvent) Copy() *NodeEvent { return c } +// NewNodeEvent generates a new node event storing the current time as the +// timestamp +func NewNodeEvent() *NodeEvent { + return &NodeEvent{Timestamp: time.Now()} +} + +// SetMessage is used to set the message on the node event +func (ne *NodeEvent) SetMessage(msg string) *NodeEvent { + ne.Message = msg + return ne +} + +// SetSubsystem is used to set the subsystem on the node event +func (ne *NodeEvent) SetSubsystem(sys string) *NodeEvent { + ne.Subsystem = sys + return ne +} + +// AddDetail is used to add a detail to the node event +func (ne *NodeEvent) AddDetail(k, v string) *NodeEvent { + if ne.Details == nil { + ne.Details = make(map[string]string, 1) + } + ne.Details[k] = v + return ne +} + const ( NodeStatusInit = "initializing" NodeStatusReady = "ready"