diff --git a/nomad/fsm.go b/nomad/fsm.go index c45ce6c3bf5..9a1e777686d 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -236,6 +236,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} { return n.applyACLTokenBootstrap(buf[1:], log.Index) case structs.AutopilotRequestType: return n.applyAutopilotUpdate(buf[1:], log.Index) + case structs.AddNodeEventType: + return n.applyAddNodeEventType(buf[1:], log.Index) } // Check enterprise only message types. @@ -628,6 +630,22 @@ func (n *nomadFSM) applyReconcileSummaries(buf []byte, index uint64) interface{} return n.reconcileQueuedAllocations(index) } +// applyAddNodeEventType applies a node event to the set of currently-available +// events. +func (n *nomadFSM) applyAddNodeEventType(buf []byte, index uint64) interface{} { + var req structs.EmitNodeEventRequest + if err := structs.Decode(buf, &req); err != nil { + n.logger.Printf("[ERR] nomad.fsm: failed to decode EmitNodeEventREquest: %v", err) + return err + } + + if err := n.state.AddNodeEvent(index, req.NodeID, req.NodeEvent); err != nil { + n.logger.Printf("[ERR] nomad.fsm: EmitNodeEventRequest failed to add node event: %v", err) + return err + } + return nil +} + // applyUpsertVaultAccessor stores the Vault accessors for a given allocation // and task func (n *nomadFSM) applyUpsertVaultAccessor(buf []byte, index uint64) interface{} { diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index d64df1d3320..a1c3d5e57ac 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -74,6 +74,47 @@ func makeLog(buf []byte) *raft.Log { } } +func TestFSM_ApplyNodeEvent(t *testing.T) { + t.Parallel() + require := require.New(t) + fsm := testFSM(t) + state := fsm.State() + + node := mock.Node() + + err := state.UpsertNode(1000, node) + if err != nil { + t.Fatalf("err: %v", err) + } + + nodeEvent := &structs.NodeEvent{ + Message: "Registration failed", + Subsystem: "Server", + Timestamp: time.Now().Unix(), + } + + req := structs.EmitNodeEventRequest{ + NodeID: node.ID, + NodeEvent: 
nodeEvent, + } + buf, err := structs.Encode(structs.AddNodeEventType, req) + require.Nil(err) + + // the response in this case should be nil; Apply only returns an error on failure + resp := fsm.Apply(makeLog(buf)) + require.Nil(resp) + + ws := memdb.NewWatchSet() + actualNode, err := state.NodeByID(ws, node.ID) + require.Nil(err) + + require.Equal(1, len(actualNode.NodeEvents)) + + first := actualNode.NodeEvents[0] + require.Equal(uint64(1), first.CreateIndex) + require.Equal("Registration failed", first.Message) +} + func TestFSM_UpsertNode(t *testing.T) { t.Parallel() fsm := testFSM(t) diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index 75e0cf9ffe0..bb45333abfc 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -52,6 +52,23 @@ type Node struct { updatesLock sync.Mutex } +func (n *Node) EmitEvent(args *structs.EmitNodeEventRequest, reply *structs.EmitNodeEventResponse) error { + if done, err := n.srv.forward("Node.EmitEvent", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "client", "emit_event"}, time.Now()) + + _, index, err := n.srv.raftApply(structs.AddNodeEventType, args) + + if err != nil { + n.srv.logger.Printf("[ERR] nomad.node AddNodeEventType failed: %+v", err) + return err + } + + reply.Index = index + return nil +} + // Register is used to upsert a client that is available for scheduling func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUpdateResponse) error { if done, err := n.srv.forward("Node.Register", args, args, reply); done { @@ -112,6 +129,16 @@ func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUp if args.Node.SecretID != originalNode.SecretID && originalNode.SecretID != "" { return fmt.Errorf("node secret ID does not match. 
Not registering node.") } + } else { + // Because this is the first time the node is being registered, we should + // also create a node registration event + nodeEvent := &structs.NodeEvent{ + Message: "Node Registered", + Subsystem: "Server", + Timestamp: time.Now().Unix(), + } + args.Node.NodeEvents = make([]*structs.NodeEvent, 0) + args.Node.NodeEvents = append(args.Node.NodeEvents, nodeEvent) } // We have a valid node connection, so add the mapping to cache the diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index 9bd9db8a7f5..f817dbc832b 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -78,6 +78,32 @@ func TestClientEndpoint_Register(t *testing.T) { }) } +func TestClientEndpoint_EmitEvent(t *testing.T) { + t.Parallel() + require := require.New(t) + + s1 := TestServer(t, nil) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + nodeEvent := &structs.NodeEvent{ + Message: "Registration failed", + Subsystem: "Server", + Timestamp: time.Now().Unix(), + } + + req := structs.EmitNodeEventRequest{ + NodeEvent: nodeEvent, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + var resp structs.GenericResponse + err := msgpackrpc.CallWithCodec(codec, "Node.EmitEvent", &req, &resp) + require.Nil(err) + require.NotEqual(0, resp.Index) +} + func TestClientEndpoint_Register_SecretMismatch(t *testing.T) { t.Parallel() s1 := TestServer(t, nil) @@ -947,6 +973,7 @@ func TestClientEndpoint_GetNode(t *testing.T) { // Update the status updated at value node.StatusUpdatedAt = resp2.Node.StatusUpdatedAt node.SecretID = "" + node.NodeEvents = resp2.Node.NodeEvents if !reflect.DeepEqual(node, resp2.Node) { t.Fatalf("bad: %#v \n %#v", node, resp2.Node) } diff --git a/nomad/state/events_state_store.go b/nomad/state/events_state_store.go new file mode 100644 index 00000000000..fa233ed27f6 --- /dev/null +++ b/nomad/state/events_state_store.go @@ -0,0 +1,47 @@ +package state + +import ( + 
"fmt" + + memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/nomad/nomad/structs" +) + +// AddNodeEvent is a function which wraps upsertNodeEvent +func (s *StateStore) AddNodeEvent(index uint64, nodeID string, event *structs.NodeEvent) error { + txn := s.db.Txn(true) + defer txn.Abort() + + return s.upsertNodeEvent(index, nodeID, event, txn) +} + +// upsertNodeEvent upserts a node event for the given node. It also ensures +// that at most 10 node events are stored at a time, dropping the oldest +// events once this bound has been reached. +func (s *StateStore) upsertNodeEvent(index uint64, nodeID string, event *structs.NodeEvent, txn *memdb.Txn) error { + + ws := memdb.NewWatchSet() + node, err := s.NodeByID(ws, nodeID) + + if err != nil { + return fmt.Errorf("unable to look up nodes by id %+v", err) + } + + if node == nil { + return fmt.Errorf("unable to look up nodes by id %s", nodeID) + } + + event.CreateIndex = index + + nodeEvents := node.NodeEvents + + if len(nodeEvents) >= 10 { + delta := len(nodeEvents) - 10 + nodeEvents = nodeEvents[delta+1:] + } + nodeEvents = append(nodeEvents, event) + node.NodeEvents = nodeEvents + + txn.Commit() + return nil +} diff --git a/nomad/state/events_state_store_test.go b/nomad/state/events_state_store_test.go new file mode 100644 index 00000000000..a44de1ed041 --- /dev/null +++ b/nomad/state/events_state_store_test.go @@ -0,0 +1,69 @@ +package state + +import ( + "testing" + "time" + + memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/stretchr/testify/require" +) + +func TestStateStore_AddSingleNodeEvent(t *testing.T) { + require := require.New(t) + state := testStateStore(t) + + node := mock.Node() + + err := state.UpsertNode(1000, node) + if err != nil { + t.Fatalf("err: %v", err) + } + + nodeEvent := &structs.NodeEvent{ + Message: "failed", + Subsystem: "Driver", + Timestamp: time.Now().Unix(), + } + err = 
state.AddNodeEvent(1001, node.ID, nodeEvent) + require.Nil(err) + + ws := memdb.NewWatchSet() + actualNode, err := state.NodeByID(ws, node.ID) + require.Nil(err) + require.Equal(1, len(actualNode.NodeEvents)) + require.Equal(nodeEvent, actualNode.NodeEvents[0]) +} + +// To prevent stale node events from accumulating, we limit the number of +// stored node events to 10. +func TestStateStore_NodeEvents_RetentionWindow(t *testing.T) { + require := require.New(t) + state := testStateStore(t) + + node := mock.Node() + + err := state.UpsertNode(1000, node) + if err != nil { + t.Fatalf("err: %v", err) + } + + for i := 1; i <= 20; i++ { + nodeEvent := &structs.NodeEvent{ + Message: "failed", + Subsystem: "Driver", + Timestamp: time.Now().Unix(), + } + err := state.AddNodeEvent(uint64(i), node.ID, nodeEvent) + require.Nil(err) + } + + ws := memdb.NewWatchSet() + actualNode, err := state.NodeByID(ws, node.ID) + require.Nil(err) + + require.Equal(10, len(actualNode.NodeEvents)) + require.Equal(uint64(11), actualNode.NodeEvents[0].CreateIndex) + require.Equal(uint64(20), actualNode.NodeEvents[len(actualNode.NodeEvents)-1].CreateIndex) +} diff --git a/nomad/structs/events.go b/nomad/structs/events.go index 769a79ee619..bc3a2553963 100644 --- a/nomad/structs/events.go +++ b/nomad/structs/events.go @@ -7,12 +7,31 @@ const ( Drain Subsystem = "Drain" Driver Subsystem = "Driver" Heartbeating Subsystem = "Heartbeating" + Server Subsystem = "Server" ) // NodeEvent is a single unit representing a node’s state change type NodeEvent struct { + NodeID string Message string Subsystem Details map[string]string Timestamp int64 + + CreateIndex uint64 +} + +// EmitNodeEventRequest is a client request to update the node events source +// with a new client-side event +type EmitNodeEventRequest struct { + NodeID string + NodeEvent *NodeEvent + WriteRequest +} + +// EmitNodeEventResponse is a server response to the client about the status of +// the node event source update. 
+type EmitNodeEventResponse struct { + Index uint64 + WriteRequest } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 52630853383..7b3dfc85f93 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -74,6 +74,7 @@ const ( ACLTokenDeleteRequestType ACLTokenBootstrapRequestType AutopilotRequestType + AddNodeEventType ) const ( @@ -1153,6 +1154,9 @@ type Node struct { // updated StatusUpdatedAt int64 + // NodeEvents is a list of the last 10 or fewer events for this node + NodeEvents []*NodeEvent + // Raft Indexes CreateIndex uint64 ModifyIndex uint64