From 365d47c5f2cce34dba4deb79b7bd741ae29d827f Mon Sep 17 00:00:00 2001 From: Chelsea Holland Komlo Date: Wed, 28 Feb 2018 15:07:07 -0500 Subject: [PATCH 01/19] NodeEvents struct --- nomad/structs/events.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100644 nomad/structs/events.go diff --git a/nomad/structs/events.go b/nomad/structs/events.go new file mode 100644 index 00000000000..769a79ee619 --- /dev/null +++ b/nomad/structs/events.go @@ -0,0 +1,18 @@ +package structs + +// Subsystem denotes the subsystem where a node event took place. +type Subsystem string + +const ( + Drain Subsystem = "Drain" + Driver Subsystem = "Driver" + Heartbeating Subsystem = "Heartbeating" +) + +// NodeEvent is a single unit representing a node’s state change +type NodeEvent struct { + Message string + Subsystem + Details map[string]string + Timestamp int64 +} From f051d43a5feb0d055ebfb1303c3466d371039cff Mon Sep 17 00:00:00 2001 From: Chelsea Holland Komlo Date: Thu, 1 Mar 2018 17:17:33 -0500 Subject: [PATCH 02/19] RPC, FSM, state store for Node.EmitEvent add node event when registering a node for the first time --- nomad/fsm.go | 18 +++++++ nomad/fsm_test.go | 41 +++++++++++++++ nomad/node_endpoint.go | 27 ++++++++++ nomad/node_endpoint_test.go | 27 ++++++++++ nomad/state/events_state_store.go | 47 ++++++++++++++++++ nomad/state/events_state_store_test.go | 69 ++++++++++++++++++++++++++ nomad/structs/events.go | 19 +++++++ nomad/structs/structs.go | 4 ++ 8 files changed, 252 insertions(+) create mode 100644 nomad/state/events_state_store.go create mode 100644 nomad/state/events_state_store_test.go diff --git a/nomad/fsm.go b/nomad/fsm.go index 68cc9170b06..7f9edaa6139 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -236,6 +236,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} { return n.applyACLTokenBootstrap(buf[1:], log.Index) case structs.AutopilotRequestType: return n.applyAutopilotUpdate(buf[1:], log.Index) + case structs.AddNodeEventType: + return n.applyAddNodeEventType(buf[1:], log.Index) } // Check enterprise only message types. @@ -628,6 +630,22 @@ func (n *nomadFSM) applyReconcileSummaries(buf []byte, index uint64) interface{} return n.reconcileQueuedAllocations(index) } +// applyAddNodeEventType applies a node event to the set of currently-available +// events. +func (n *nomadFSM) applyAddNodeEventType(buf []byte, index uint64) interface{} { + var req structs.EmitNodeEventRequest + if err := structs.Decode(buf, &req); err != nil { + n.logger.Printf("[ERR] nomad.fsm: failed to decode EmitNodeEventREquest: %v", err) + return err + } + + if err := n.state.AddNodeEvent(index, req.NodeID, req.NodeEvent); err != nil { + n.logger.Printf("[ERR] nomad.fsm: EmitNodeEventRequest failed to add node event: %v", err) + return err + } + return nil +} + // applyUpsertVaultAccessor stores the Vault accessors for a given allocation // and task func (n *nomadFSM) applyUpsertVaultAccessor(buf []byte, index uint64) interface{} { diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 98278165ec4..16d482c4a14 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -74,6 +74,47 @@ func makeLog(buf []byte) *raft.Log { } } +func TestFSM_ApplyNodeEvent(t *testing.T) { + t.Parallel() + require := require.New(t) + fsm := testFSM(t) + state := fsm.State() + + node := mock.Node() + + err := state.UpsertNode(1000, node) + if err != nil { + t.Fatalf("err: %v", err) + } + + nodeEvent := &structs.NodeEvent{ + Message: "Registration failed", + Subsystem: "Server", + Timestamp: time.Now().Unix(), + } + + req := structs.EmitNodeEventRequest{ + NodeID: node.ID, + NodeEvent: nodeEvent, + } + buf, err := structs.Encode(structs.AddNodeEventType, req) + require.Nil(err) + + // the response in this case will be an error + resp := fsm.Apply(makeLog(buf)) + require.Nil(resp) + + ws := memdb.NewWatchSet() + actualNode, err := state.NodeByID(ws, node.ID) + require.Nil(err) + + require.Equal(1, len(actualNode.NodeEvents)) + + first := actualNode.NodeEvents[0] + require.Equal(uint64(1), first.CreateIndex) + require.Equal("Registration failed", first.Message) +} + func TestFSM_UpsertNode(t *testing.T) { t.Parallel() fsm := testFSM(t) diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index 41ff01d1f93..8c5d4743a48 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -52,6 +52,23 @@ type Node struct { updatesLock sync.Mutex } +func (n *Node) EmitEvent(args *structs.EmitNodeEventRequest, reply *structs.EmitNodeEventResponse) error { + if done, err := n.srv.forward("Node.EmitEvent", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "client", "emit_event"}, time.Now()) + + _, index, err := n.srv.raftApply(structs.AddNodeEventType, args) + + if err != nil { + n.srv.logger.Printf("[ERR] nomad.node AddNodeEventType failed: %+v", err) + return err + } + + reply.Index = index + return nil +} + // Register is used to upsert a client that is available for scheduling func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUpdateResponse) error { if done, err := n.srv.forward("Node.Register", args, args, reply); done { @@ -112,6 +129,16 @@ func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUp if args.Node.SecretID != originalNode.SecretID && originalNode.SecretID != "" { return fmt.Errorf("node secret ID does not match. Not registering node.") } + } else { + // Because this is the first time the node is being registered, we should + // also create a node registration event + nodeEvent := &structs.NodeEvent{ + Message: "Node Registered", + Subsystem: "Server", + Timestamp: time.Now().Unix(), + } + args.Node.NodeEvents = make([]*structs.NodeEvent, 0) + args.Node.NodeEvents = append(args.Node.NodeEvents, nodeEvent) } // We have a valid node connection, so add the mapping to cache the diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index 9bd9db8a7f5..f817dbc832b 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -78,6 +78,32 @@ func TestClientEndpoint_Register(t *testing.T) { }) } +func TestClientEndpoint_EmitEvent(t *testing.T) { + t.Parallel() + require := require.New(t) + + s1 := TestServer(t, nil) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + nodeEvent := &structs.NodeEvent{ + Message: "Registration failed", + Subsystem: "Server", + Timestamp: time.Now().Unix(), + } + + req := structs.EmitNodeEventRequest{ + NodeEvent: nodeEvent, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + var resp structs.GenericResponse + err := msgpackrpc.CallWithCodec(codec, "Node.EmitEvent", &req, &resp) + require.Nil(err) + require.NotEqual(0, resp.Index) +} + func TestClientEndpoint_Register_SecretMismatch(t *testing.T) { t.Parallel() s1 := TestServer(t, nil) @@ -947,6 +973,7 @@ func TestClientEndpoint_GetNode(t *testing.T) { // Update the status updated at value node.StatusUpdatedAt = resp2.Node.StatusUpdatedAt node.SecretID = "" + node.NodeEvents = resp2.Node.NodeEvents if !reflect.DeepEqual(node, resp2.Node) { t.Fatalf("bad: %#v \n %#v", node, resp2.Node) } diff --git a/nomad/state/events_state_store.go b/nomad/state/events_state_store.go new file mode 100644 index 00000000000..fa233ed27f6 --- /dev/null +++ b/nomad/state/events_state_store.go @@ -0,0 +1,47 @@ +package state + +import ( + "fmt" + + memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/nomad/nomad/structs" +) + +// addNodeEvent is a function which wraps upsertNodeEvent +func (s *StateStore) AddNodeEvent(index uint64, nodeID string, event *structs.NodeEvent) error { + txn := s.db.Txn(true) + defer txn.Abort() + + return s.upsertNodeEvent(index, nodeID, event, txn) +} + +// upsertNodeEvent upserts a node event for a respective node. It also maintains +// that only 10 node events are ever stored simultaneously, deleting older +// events once this bound has been reached. +func (s *StateStore) upsertNodeEvent(index uint64, nodeID string, event *structs.NodeEvent, txn *memdb.Txn) error { + + ws := memdb.NewWatchSet() + node, err := s.NodeByID(ws, nodeID) + + if err != nil { + return fmt.Errorf("unable to look up nodes by id %+v", err) + } + + if node == nil { + return fmt.Errorf("unable to look up nodes by id %s", nodeID) + } + + event.CreateIndex = index + + nodeEvents := node.NodeEvents + + if len(nodeEvents) >= 10 { + delta := len(nodeEvents) - 10 + nodeEvents = nodeEvents[delta+1:] + } + nodeEvents = append(nodeEvents, event) + node.NodeEvents = nodeEvents + + txn.Commit() + return nil +} diff --git a/nomad/state/events_state_store_test.go b/nomad/state/events_state_store_test.go new file mode 100644 index 00000000000..a44de1ed041 --- /dev/null +++ b/nomad/state/events_state_store_test.go @@ -0,0 +1,69 @@ +package state + +import ( + "testing" + "time" + + memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/stretchr/testify/require" +) + +func TestStateStore_AddSingleNodeEvent(t *testing.T) { + require := require.New(t) + state := testStateStore(t) + + node := mock.Node() + + err := state.UpsertNode(1000, node) + if err != nil { + t.Fatalf("err: %v", err) + } + + nodeEvent := &structs.NodeEvent{ + Message: "failed", + Subsystem: "Driver", + Timestamp: time.Now().Unix(), + } + err = state.AddNodeEvent(1001, node.ID, nodeEvent) + require.Nil(err) + + ws := memdb.NewWatchSet() + actualNode, err := state.NodeByID(ws, node.ID) + require.Nil(err) + require.Equal(1, len(actualNode.NodeEvents)) + require.Equal(nodeEvent, actualNode.NodeEvents[0]) +} + +// To prevent stale node events from accumulating, we limit the number of +// stored node events to 10. +func TestStateStore_NodeEvents_RetentionWindow(t *testing.T) { + require := require.New(t) + state := testStateStore(t) + + node := mock.Node() + + err := state.UpsertNode(1000, node) + if err != nil { + t.Fatalf("err: %v", err) + } + + for i := 1; i <= 20; i++ { + nodeEvent := &structs.NodeEvent{ + Message: "failed", + Subsystem: "Driver", + Timestamp: time.Now().Unix(), + } + err := state.AddNodeEvent(uint64(i), node.ID, nodeEvent) + require.Nil(err) + } + + ws := memdb.NewWatchSet() + actualNode, err := state.NodeByID(ws, node.ID) + require.Nil(err) + + require.Equal(10, len(actualNode.NodeEvents)) + require.Equal(uint64(11), actualNode.NodeEvents[0].CreateIndex) + require.Equal(uint64(20), actualNode.NodeEvents[len(actualNode.NodeEvents)-1].CreateIndex) +} diff --git a/nomad/structs/events.go b/nomad/structs/events.go index 769a79ee619..bc3a2553963 100644 --- a/nomad/structs/events.go +++ b/nomad/structs/events.go @@ -7,12 +7,31 @@ const ( Drain Subsystem = "Drain" Driver Subsystem = "Driver" Heartbeating Subsystem = "Heartbeating" + Server Subsystem = "Server" ) // NodeEvent is a single unit representing a node’s state change type NodeEvent struct { + NodeID string Message string Subsystem Details map[string]string Timestamp int64 + + CreateIndex uint64 +} + +// EmitNodeEventRequest is a client request to update the node events source +// with a new client-side event +type EmitNodeEventRequest struct { + NodeID string + NodeEvent *NodeEvent + WriteRequest +} + +// EmitNodeEventResponse is a server response to the client about the status of +// the node event source update. +type EmitNodeEventResponse struct { + Index uint64 + WriteRequest } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 5d868e4b7f1..d320408f90b 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -74,6 +74,7 @@ const ( ACLTokenDeleteRequestType ACLTokenBootstrapRequestType AutopilotRequestType + AddNodeEventType ) const ( @@ -1153,6 +1154,9 @@ type Node struct { // updated StatusUpdatedAt int64 + // NodeEvents is a list of the last 10 or lest events for this node + NodeEvents []*NodeEvent + // Raft Indexes CreateIndex uint64 ModifyIndex uint64 From 55e0c7d06d2308bc4510312864e0dfd9f4485493 Mon Sep 17 00:00:00 2001 From: Chelsea Holland Komlo Date: Tue, 6 Mar 2018 10:50:55 -0500 Subject: [PATCH 03/19] fix up error logging --- nomad/node_endpoint.go | 2 +- nomad/state/events_state_store.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index 8c5d4743a48..176d79dd245 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -61,7 +61,7 @@ func (n *Node) EmitEvent(args *structs.EmitNodeEventRequest, reply *structs.Emit _, index, err := n.srv.raftApply(structs.AddNodeEventType, args) if err != nil { - n.srv.logger.Printf("[ERR] nomad.node AddNodeEventType failed: %+v", err) + n.srv.logger.Printf("[ERR] nomad.node AddNodeEventType failed: %v", err) return err } diff --git a/nomad/state/events_state_store.go b/nomad/state/events_state_store.go index fa233ed27f6..58cbdcb07c9 100644 --- a/nomad/state/events_state_store.go +++ b/nomad/state/events_state_store.go @@ -24,11 +24,11 @@ func (s *StateStore) upsertNodeEvent(index uint64, nodeID string, event *structs node, err := s.NodeByID(ws, nodeID) if err != nil { - return fmt.Errorf("unable to look up nodes by id %+v", err) + return fmt.Errorf("encountered error when looking up nodes by id to insert node event: %v", err) } if node == nil { - return fmt.Errorf("unable to look up nodes by id %s", nodeID) + return fmt.Errorf("unable to look up node by id %s to insert node event", nodeID) } event.CreateIndex = index From de6199ac4521c213cb0f68dc102a472876a74fcd Mon Sep 17 00:00:00 2001 From: Chelsea Holland Komlo Date: Wed, 7 Mar 2018 18:54:52 -0500 Subject: [PATCH 04/19] move adding node registration event to the state store --- nomad/node_endpoint.go | 10 ---------- nomad/state/events_state_store_test.go | 11 +++++++++-- nomad/state/state_store.go | 11 +++++++++++ 3 files changed, 20 insertions(+), 12 deletions(-) diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index 176d79dd245..3cefa026e2c 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -129,16 +129,6 @@ func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUp if args.Node.SecretID != originalNode.SecretID && originalNode.SecretID != "" { return fmt.Errorf("node secret ID does not match. Not registering node.") } - } else { - // Because this is the first time the node is being registered, we should - // also create a node registration event - nodeEvent := &structs.NodeEvent{ - Message: "Node Registered", - Subsystem: "Server", - Timestamp: time.Now().Unix(), - } - args.Node.NodeEvents = make([]*structs.NodeEvent, 0) - args.Node.NodeEvents = append(args.Node.NodeEvents, nodeEvent) } // We have a valid node connection, so add the mapping to cache the diff --git a/nomad/state/events_state_store_test.go b/nomad/state/events_state_store_test.go index a44de1ed041..f9fbd764de5 100644 --- a/nomad/state/events_state_store_test.go +++ b/nomad/state/events_state_store_test.go @@ -16,10 +16,14 @@ func TestStateStore_AddSingleNodeEvent(t *testing.T) { node := mock.Node() + // We create a new node event every time we register a node err := state.UpsertNode(1000, node) if err != nil { t.Fatalf("err: %v", err) } + require.Equal(1, len(node.NodeEvents)) + require.Equal(structs.Subsystem("Server"), node.NodeEvents[0].Subsystem) + require.Equal("Node Registered", node.NodeEvents[0].Message) nodeEvent := &structs.NodeEvent{ Message: "failed", @@ -32,8 +36,8 @@ func TestStateStore_AddSingleNodeEvent(t *testing.T) { ws := memdb.NewWatchSet() actualNode, err := state.NodeByID(ws, node.ID) require.Nil(err) - require.Equal(1, len(actualNode.NodeEvents)) - require.Equal(nodeEvent, actualNode.NodeEvents[0]) + require.Equal(2, len(actualNode.NodeEvents)) + require.Equal(nodeEvent, actualNode.NodeEvents[1]) } // To prevent stale node events from accumulating, we limit the number of @@ -48,6 +52,9 @@ func TestStateStore_NodeEvents_RetentionWindow(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } + require.Equal(1, len(node.NodeEvents)) + require.Equal(structs.Subsystem("Server"), node.NodeEvents[0].Subsystem) + require.Equal("Node Registered", node.NodeEvents[0].Message) for i := 1; i <= 20; i++ { nodeEvent := &structs.NodeEvent{ diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 8afb9af5561..41523f7495b 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -527,6 +527,17 @@ func (s *StateStore) UpsertNode(index uint64, node *structs.Node) error { node.ModifyIndex = index node.Drain = exist.Drain // Retain the drain mode } else { + // Because this is the first time the node is being registered, we should + // also create a node registration event + nodeEvent := &structs.NodeEvent{ + Message: "Node Registered", + Subsystem: "Server", + Timestamp: node.StatusUpdatedAt, + } + + node.NodeEvents = make([]*structs.NodeEvent, 0) + node.NodeEvents = append(node.NodeEvents, nodeEvent) + node.CreateIndex = index node.ModifyIndex = index } From 551e96feff4456acc9b15cbfed36b75639ad7293 Mon Sep 17 00:00:00 2001 From: Chelsea Holland Komlo Date: Thu, 8 Mar 2018 05:46:35 -0500 Subject: [PATCH 05/19] code review feedback --- nomad/fsm_test.go | 8 ++++---- nomad/node_endpoint_test.go | 8 +++++++- nomad/structs/events.go | 16 ++++++++-------- 3 files changed, 19 insertions(+), 13 deletions(-) diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 16d482c4a14..b0737f1876f 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -88,8 +88,8 @@ func TestFSM_ApplyNodeEvent(t *testing.T) { } nodeEvent := &structs.NodeEvent{ - Message: "Registration failed", - Subsystem: "Server", + Message: "Heartbeating failed", + Subsystem: "Heartbeat", Timestamp: time.Now().Unix(), } @@ -108,9 +108,9 @@ func TestFSM_ApplyNodeEvent(t *testing.T) { actualNode, err := state.NodeByID(ws, node.ID) require.Nil(err) - require.Equal(1, len(actualNode.NodeEvents)) + require.Equal(2, len(actualNode.NodeEvents)) - first := actualNode.NodeEvents[0] + first := node.NodeEvents[1] require.Equal(uint64(1), first.CreateIndex) require.Equal("Registration failed", first.Message) } diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index f817dbc832b..37d4d05cc3f 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -83,10 +83,16 @@ func TestClientEndpoint_EmitEvent(t *testing.T) { require := require.New(t) s1 := TestServer(t, nil) + state := s1.fsm.State() defer s1.Shutdown() codec := rpcClient(t, s1) testutil.WaitForLeader(t, s1.RPC) + // create a node that we can register our event to + node := mock.Node() + err := state.UpsertNode(2, node) + require.Nil(err) + nodeEvent := &structs.NodeEvent{ Message: "Registration failed", Subsystem: "Server", @@ -99,7 +105,7 @@ func TestClientEndpoint_EmitEvent(t *testing.T) { } var resp structs.GenericResponse - err := msgpackrpc.CallWithCodec(codec, "Node.EmitEvent", &req, &resp) + err = msgpackrpc.CallWithCodec(codec, "Node.EmitEvent", &req, &resp) require.Nil(err) require.NotEqual(0, resp.Index) } diff --git a/nomad/structs/events.go b/nomad/structs/events.go index bc3a2553963..3457f41028b 100644 --- a/nomad/structs/events.go +++ b/nomad/structs/events.go @@ -4,17 +4,17 @@ package structs type Subsystem string const ( - Drain Subsystem = "Drain" - Driver Subsystem = "Driver" - Heartbeating Subsystem = "Heartbeating" - Server Subsystem = "Server" + Drain Subsystem = "Drain" + Driver Subsystem = "Driver" + Heartbeat Subsystem = "Heartbeat" + Server Subsystem = "Server" + Cluster Subsystem = "Cluster" ) // NodeEvent is a single unit representing a node’s state change type NodeEvent struct { - NodeID string - Message string - Subsystem + Message string + Subsystem Subsystem Details map[string]string Timestamp int64 @@ -33,5 +33,5 @@ type EmitNodeEventRequest struct { // the node event source update. type EmitNodeEventResponse struct { Index uint64 - WriteRequest + WriteMeta } From fc819754a4d334ada336593d27630407b09ea43b Mon Sep 17 00:00:00 2001 From: Chelsea Holland Komlo Date: Thu, 8 Mar 2018 09:34:08 -0500 Subject: [PATCH 06/19] Ensure node updates don't strip node events Add node events to CLI --- api/nodes.go | 22 +++++++++++++++++++++ api/nodes_test.go | 4 ++++ command/agent/node_endpoint_test.go | 6 ++++++ command/node_status.go | 30 +++++++++++++++++++++++++++++ nomad/fsm_test.go | 2 +- nomad/node_endpoint_test.go | 8 ++++++++ nomad/state/state_store.go | 3 +++ nomad/structs/structs.go | 9 +++++++++ 8 files changed, 83 insertions(+), 1 deletion(-) diff --git a/api/nodes.go b/api/nodes.go index 6a61ab90b8a..beb35070e2a 100644 --- a/api/nodes.go +++ b/api/nodes.go @@ -113,10 +113,32 @@ type Node struct { Status string StatusDescription string StatusUpdatedAt int64 + NodeEvents []*NodeEvent CreateIndex uint64 ModifyIndex uint64 } +// Subsystem denotes the subsystem where a node event took place. +type Subsystem string + +const ( + Drain Subsystem = "Drain" + Driver Subsystem = "Driver" + Heartbeat Subsystem = "Heartbeat" + Server Subsystem = "Server" + Cluster Subsystem = "Cluster" +) + +// NodeEvent is a single unit representing a node’s state change +type NodeEvent struct { + Message string + Subsystem Subsystem + Details map[string]string + Timestamp int64 + + CreateIndex uint64 +} + // HostStats represents resource usage stats of the host running a Nomad client type HostStats struct { Memory *HostMemoryStats diff --git a/api/nodes_test.go b/api/nodes_test.go index d020d84aea9..7c9a8be9767 100644 --- a/api/nodes_test.go +++ b/api/nodes_test.go @@ -134,6 +134,10 @@ func TestNodes_Info(t *testing.T) { if result.StatusUpdatedAt < startTime { t.Fatalf("start time: %v, status updated: %v", startTime, result.StatusUpdatedAt) } + + if len(result.NodeEvents) < 1 { + t.Fatalf("Expected at minimum the node register event to be populated: %+v", result) + } } func TestNodes_ToggleDrain(t *testing.T) { diff --git a/command/agent/node_endpoint_test.go b/command/agent/node_endpoint_test.go index 8bf56e32895..113fa5de877 100644 --- a/command/agent/node_endpoint_test.go +++ b/command/agent/node_endpoint_test.go @@ -396,5 +396,11 @@ func TestHTTP_NodeQuery(t *testing.T) { if n.ID != node.ID { t.Fatalf("bad: %#v", n) } + if len(n.NodeEvents) != 1 { + t.Fatalf("Expected node registration event to be populated: %#v", n) + } + if n.NodeEvents[0].Message != "Node Registered" { + t.Fatalf("Expected node registration event to be first node event: %#v", n) + } }) } diff --git a/command/node_status.go b/command/node_status.go index 804e63d347f..9b35ae758fd 100644 --- a/command/node_status.go +++ b/command/node_status.go @@ -332,6 +332,8 @@ func (c *NodeStatusCommand) formatNode(client *api.Client, node *api.Node) int { } c.Ui.Output(c.Colorize().Color(formatKV(basic))) + c.outputNodeStatusEvents(node) + // Get list of running allocations on the node runningAllocs, err := getRunningAllocs(client, node.ID) if err != nil { @@ -386,6 +388,34 @@ func (c *NodeStatusCommand) formatNode(client *api.Client, node *api.Node) int { } +func (c *NodeStatusCommand) outputNodeStatusEvents(node *api.Node) { + c.Ui.Output(c.Colorize().Color("\n[bold]Node Events ")) + c.outputNodeEvent(node.NodeEvents) +} + +func (c *NodeStatusCommand) outputNodeEvent(events []*api.NodeEvent) { + size := len(events) + nodeEvents := make([]string, size+1) + nodeEvents[0] = "Timestamp|Subsystem|Message|Details" + + for i, event := range events { + timestamp := formatUnixNanoTime(event.Timestamp) + subsystem := event.Subsystem + msg := event.Message + details := formatEventDetails(event.Details) + nodeEvents[size-i] = fmt.Sprintf("%s|%s|%s|%s", timestamp, subsystem, msg, details) + } + c.Ui.Output(formatList(nodeEvents)) +} + +func formatEventDetails(details map[string]string) string { + var output string + for k, v := range details { + output += fmt.Sprintf("%s: %s, ", k, v) + } + return output +} + func (c *NodeStatusCommand) formatAttributes(node *api.Node) { // Print the attributes keys := make([]string, len(node.Attributes)) diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index b0737f1876f..14b630fe528 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -112,7 +112,7 @@ func TestFSM_ApplyNodeEvent(t *testing.T) { first := node.NodeEvents[1] require.Equal(uint64(1), first.CreateIndex) - require.Equal("Registration failed", first.Message) + require.Equal("Heartbeating failed", first.Message) } func TestFSM_UpsertNode(t *testing.T) { diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index 37d4d05cc3f..af8e7b075df 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -984,6 +984,14 @@ func TestClientEndpoint_GetNode(t *testing.T) { t.Fatalf("bad: %#v \n %#v", node, resp2.Node) } + // assert that the node register event was set correctly + if len(resp2.Node.NodeEvents) != 1 { + t.Fatalf("Did not set node events: %#v", resp2.Node) + } + if resp2.Node.NodeEvents[0].Message != "Node Registered" { + t.Fatalf("Did not set node register event correctly: %#v", resp2.Node) + } + // Lookup non-existing node get.NodeID = "12345678-abcd-efab-cdef-123456789abc" if err := msgpackrpc.CallWithCodec(codec, "Node.GetNode", get, &resp2); err != nil { diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 41523f7495b..e2649cf784f 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -526,6 +526,9 @@ func (s *StateStore) UpsertNode(index uint64, node *structs.Node) error { node.CreateIndex = exist.CreateIndex node.ModifyIndex = index node.Drain = exist.Drain // Retain the drain mode + + // retain node events that have already been set on the node + node.NodeEvents = exist.NodeEvents } else { // Because this is the first time the node is being registered, we should // also create a node registration event diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index d320408f90b..8ae65516e83 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1178,9 +1178,18 @@ func (n *Node) Copy() *Node { nn.Reserved = nn.Reserved.Copy() nn.Links = helper.CopyMapStringString(nn.Links) nn.Meta = helper.CopyMapStringString(nn.Meta) + nn.NodeEvents = copyNodeEvents(n) return nn } +func copyNodeEvents(first *Node) []*NodeEvent { + nodeEvents := make([]*NodeEvent, 0) + for _, e := range first.NodeEvents { + nodeEvents = append(nodeEvents, e) + } + return nodeEvents +} + // TerminalStatus returns if the current status is terminal and // will no longer transition. func (n *Node) TerminalStatus() bool { From 56f5e66d25c5f2ba522e45aeb96e33311d66c523 Mon Sep 17 00:00:00 2001 From: Chelsea Holland Komlo Date: Thu, 8 Mar 2018 14:23:34 -0500 Subject: [PATCH 07/19] add node id to tests; check for node events in state store --- nomad/fsm_test.go | 5 +++-- nomad/node_endpoint_test.go | 11 +++++++++++ 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 14b630fe528..5c1ac9fb775 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -94,8 +94,9 @@ func TestFSM_ApplyNodeEvent(t *testing.T) { } req := structs.EmitNodeEventRequest{ - NodeID: node.ID, - NodeEvent: nodeEvent, + NodeID: node.ID, + NodeEvent: nodeEvent, + WriteRequest: structs.WriteRequest{Region: "global"}, } buf, err := structs.Encode(structs.AddNodeEventType, req) require.Nil(err) diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index af8e7b075df..527fe3b0781 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -100,6 +100,7 @@ func TestClientEndpoint_EmitEvent(t *testing.T) { } req := structs.EmitNodeEventRequest{ + NodeID: node.ID, NodeEvent: nodeEvent, WriteRequest: structs.WriteRequest{Region: "global"}, } @@ -108,6 +109,16 @@ func TestClientEndpoint_EmitEvent(t *testing.T) { err = msgpackrpc.CallWithCodec(codec, "Node.EmitEvent", &req, &resp) require.Nil(err) require.NotEqual(0, resp.Index) + + // Check for the node in the FSM + ws := memdb.NewWatchSet() + out, err := state.NodeByID(ws, node.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if len(out.NodeEvents) < 2 { + t.Fatalf("expected node to have a register event") + } } func TestClientEndpoint_Register_SecretMismatch(t *testing.T) { From cbbb60af61d0a9878476e0652b23de45c74eaa7e Mon Sep 17 00:00:00 2001 From: Chelsea Holland Komlo Date: Thu, 8 Mar 2018 14:25:16 -0500 Subject: [PATCH 08/19] move all structs to structs file --- nomad/structs/events.go | 37 ------------------------------------- nomad/structs/structs.go | 36 ++++++++++++++++++++++++++++++++++++ 2 files changed, 36 insertions(+), 37 deletions(-) delete mode 100644 nomad/structs/events.go diff --git a/nomad/structs/events.go b/nomad/structs/events.go deleted file mode 100644 index 3457f41028b..00000000000 --- a/nomad/structs/events.go +++ /dev/null @@ -1,37 +0,0 @@ -package structs - -// Subsystem denotes the subsystem where a node event took place. -type Subsystem string - -const ( - Drain Subsystem = "Drain" - Driver Subsystem = "Driver" - Heartbeat Subsystem = "Heartbeat" - Server Subsystem = "Server" - Cluster Subsystem = "Cluster" -) - -// NodeEvent is a single unit representing a node’s state change -type NodeEvent struct { - Message string - Subsystem Subsystem - Details map[string]string - Timestamp int64 - - CreateIndex uint64 -} - -// EmitNodeEventRequest is a client request to update the node events source -// with a new client-side event -type EmitNodeEventRequest struct { - NodeID string - NodeEvent *NodeEvent - WriteRequest -} - -// EmitNodeEventResponse is a server response to the client about the status of -// the node event source update. -type EmitNodeEventResponse struct { - Index uint64 - WriteMeta -} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 8ae65516e83..3afa3b5b2b6 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1162,6 +1162,42 @@ type Node struct { ModifyIndex uint64 } +// Subsystem denotes the subsystem where a node event took place. +type Subsystem string + +const ( + Drain Subsystem = "Drain" + Driver Subsystem = "Driver" + Heartbeat Subsystem = "Heartbeat" + Server Subsystem = "Server" + Cluster Subsystem = "Cluster" +) + +// NodeEvent is a single unit representing a node’s state change +type NodeEvent struct { + Message string + Subsystem Subsystem + Details map[string]string + Timestamp int64 + + CreateIndex uint64 +} + +// EmitNodeEventRequest is a client request to update the node events source +// with a new client-side event +type EmitNodeEventRequest struct { + NodeID string + NodeEvent *NodeEvent + WriteRequest +} + +// EmitNodeEventResponse is a server response to the client about the status of +// the node event source update. +type EmitNodeEventResponse struct { + Index uint64 + WriteMeta +} + // Ready returns if the node is ready for running allocations func (n *Node) Ready() bool { return n.Status == NodeStatusReady && !n.Drain From 705ff980ad747482fee1342a86ef70d209415282 Mon Sep 17 00:00:00 2001 From: Chelsea Holland Komlo Date: Thu, 8 Mar 2018 17:25:10 -0500 Subject: [PATCH 09/19] code review feedback --- api/nodes.go | 2 +- nomad/fsm.go | 13 ++++++- nomad/node_endpoint_test.go | 8 ++--- nomad/state/events_state_store.go | 36 ++++++++++--------- nomad/state/events_state_store_test.go | 50 +++++++++++++++++--------- nomad/state/state_store.go | 2 +- nomad/structs/structs.go | 10 +++--- 7 files changed, 75 insertions(+), 46 deletions(-) diff --git a/api/nodes.go b/api/nodes.go index beb35070e2a..d4f089695e4 100644 --- a/api/nodes.go +++ b/api/nodes.go @@ -125,7 +125,6 @@ const ( Drain Subsystem = "Drain" Driver Subsystem = "Driver" Heartbeat Subsystem = "Heartbeat" - Server Subsystem = "Server" Cluster Subsystem = "Cluster" ) @@ -137,6 +136,7 @@ type NodeEvent struct { Timestamp int64 CreateIndex uint64 + ModifyIndex uint64 } // HostStats represents resource usage stats of the host running a Nomad client diff --git a/nomad/fsm.go b/nomad/fsm.go index 7f9edaa6139..f5fdc73fa87 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -639,7 +639,18 @@ func (n *nomadFSM) applyAddNodeEventType(buf []byte, index uint64) interface{} { return err } - if err := n.state.AddNodeEvent(index, req.NodeID, req.NodeEvent); err != nil { + ws := memdb.NewWatchSet() + node, err := n.state.NodeByID(ws, req.NodeID) + + if err != nil { + return fmt.Errorf("encountered error when looking up nodes by id to insert node event: %v", err) + } + + if node == nil { + return fmt.Errorf("unable to look up node by id %s to insert node event", req.NodeID) + } + + if err := n.state.AddNodeEvent(index, node, req.NodeEvent); err != nil { n.logger.Printf("[ERR] nomad.fsm: EmitNodeEventRequest failed to add node event: %v", err) return err } diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index 527fe3b0781..c7d9f79efb8 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -113,12 +113,8 @@ func TestClientEndpoint_EmitEvent(t *testing.T) { // Check for the node in the FSM ws := memdb.NewWatchSet() out, err := state.NodeByID(ws, node.ID) - if err != nil { - t.Fatalf("err: %v", err) - } - if len(out.NodeEvents) < 2 { - t.Fatalf("expected node to have a register event") - } + require.Nil(err) + require.False(len(out.NodeEvents) < 2) } func TestClientEndpoint_Register_SecretMismatch(t *testing.T) { diff --git a/nomad/state/events_state_store.go b/nomad/state/events_state_store.go index 58cbdcb07c9..6e3f34d38c4 100644 --- a/nomad/state/events_state_store.go +++ b/nomad/state/events_state_store.go @@ -8,40 +8,44 @@ import ( ) // addNodeEvent is a function which wraps upsertNodeEvent -func (s *StateStore) AddNodeEvent(index uint64, nodeID string, event *structs.NodeEvent) error { +func (s *StateStore) AddNodeEvent(index uint64, node *structs.Node, event *structs.NodeEvent) error { txn := s.db.Txn(true) defer txn.Abort() - return s.upsertNodeEvent(index, nodeID, event, txn) + err := s.upsertNodeEvent(index, node, event, txn) + txn.Commit() + return err } // upsertNodeEvent upserts a node event for a respective node. It also maintains // that only 10 node events are ever stored simultaneously, deleting older // events once this bound has been reached. -func (s *StateStore) upsertNodeEvent(index uint64, nodeID string, event *structs.NodeEvent, txn *memdb.Txn) error { - - ws := memdb.NewWatchSet() - node, err := s.NodeByID(ws, nodeID) - - if err != nil { - return fmt.Errorf("encountered error when looking up nodes by id to insert node event: %v", err) - } - - if node == nil { - return fmt.Errorf("unable to look up node by id %s to insert node event", nodeID) - } +func (s *StateStore) upsertNodeEvent(index uint64, node *structs.Node, event *structs.NodeEvent, txn *memdb.Txn) error { event.CreateIndex = index + event.ModifyIndex = index + + // Copy the existing node + copyNode := new(structs.Node) + *copyNode = *node nodeEvents := node.NodeEvents + // keep node events pruned to below 10 simultaneously if len(nodeEvents) >= 10 { delta := len(nodeEvents) - 10 nodeEvents = nodeEvents[delta+1:] } nodeEvents = append(nodeEvents, event) - node.NodeEvents = nodeEvents + copyNode.NodeEvents = nodeEvents + + // Insert the node + if err := txn.Insert("nodes", copyNode); err != nil { + return fmt.Errorf("node update failed: %v", err) + } + if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } - txn.Commit() return nil } diff --git a/nomad/state/events_state_store_test.go b/nomad/state/events_state_store_test.go index f9fbd764de5..81ec918343d 100644 --- a/nomad/state/events_state_store_test.go +++ b/nomad/state/events_state_store_test.go @@ -1,6 +1,7 @@ package state import ( + "fmt" "testing" "time" @@ -18,26 +19,33 @@ func TestStateStore_AddSingleNodeEvent(t *testing.T) { // We create a new node event every time we register a node err := state.UpsertNode(1000, node) - if err != nil { - t.Fatalf("err: %v", err) - } + require.Nil(err) + require.Equal(1, len(node.NodeEvents)) - require.Equal(structs.Subsystem("Server"), node.NodeEvents[0].Subsystem) + require.Equal(structs.Subsystem("Cluster"), node.NodeEvents[0].Subsystem) require.Equal("Node Registered", node.NodeEvents[0].Message) + // Create a watchset so we can test that AddNodeEvent fires the watch + ws := memdb.NewWatchSet() + _, err = state.NodeByID(ws, node.ID) + require.Nil(err) + nodeEvent := &structs.NodeEvent{ Message: "failed", Subsystem: "Driver", Timestamp: time.Now().Unix(), } - err = state.AddNodeEvent(1001, node.ID, nodeEvent) + err = state.AddNodeEvent(1001, node, nodeEvent) require.Nil(err) - ws := memdb.NewWatchSet() - actualNode, err := state.NodeByID(ws, node.ID) + require.True(watchFired(ws)) + + ws = memdb.NewWatchSet() + out, err := state.NodeByID(ws, node.ID) require.Nil(err) - require.Equal(2, len(actualNode.NodeEvents)) - require.Equal(nodeEvent, actualNode.NodeEvents[1]) + + require.Equal(2, len(out.NodeEvents)) + require.Equal(nodeEvent, out.NodeEvents[1]) } // To prevent stale node events from accumulating, we limit the number of @@ -53,24 +61,34 @@ func TestStateStore_NodeEvents_RetentionWindow(t *testing.T) { t.Fatalf("err: %v", err) } require.Equal(1, len(node.NodeEvents)) - require.Equal(structs.Subsystem("Server"), node.NodeEvents[0].Subsystem) + require.Equal(structs.Subsystem("Cluster"), node.NodeEvents[0].Subsystem) require.Equal("Node Registered", node.NodeEvents[0].Message) + var out *structs.Node for i := 1; i <= 20; i++ { + ws := memdb.NewWatchSet() + out, err = state.NodeByID(ws, node.ID) + require.Nil(err) + nodeEvent := &structs.NodeEvent{ - Message: "failed", + Message: fmt.Sprintf("%dith failed", i), Subsystem: "Driver", Timestamp: time.Now().Unix(), } - err := state.AddNodeEvent(uint64(i), node.ID, nodeEvent) + err := state.AddNodeEvent(uint64(i), out, nodeEvent) + require.Nil(err) + + require.True(watchFired(ws)) + ws = memdb.NewWatchSet() + out, err = state.NodeByID(ws, node.ID) require.Nil(err) } ws := memdb.NewWatchSet() - actualNode, err := state.NodeByID(ws, node.ID) + out, err = state.NodeByID(ws, node.ID) require.Nil(err) - require.Equal(10, len(actualNode.NodeEvents)) - require.Equal(uint64(11), actualNode.NodeEvents[0].CreateIndex) - require.Equal(uint64(20), actualNode.NodeEvents[len(actualNode.NodeEvents)-1].CreateIndex) + require.Equal(10, len(out.NodeEvents)) + require.Equal(uint64(11), out.NodeEvents[0].CreateIndex) + require.Equal(uint64(20), out.NodeEvents[len(out.NodeEvents)-1].CreateIndex) } diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index e2649cf784f..56cba6c06f0 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -534,7 +534,7 @@ func (s *StateStore) UpsertNode(index uint64, node *structs.Node) error { // also create a node registration event nodeEvent := &structs.NodeEvent{ Message: "Node Registered", - Subsystem: "Server", + Subsystem: "Cluster", Timestamp: node.StatusUpdatedAt, } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 3afa3b5b2b6..b08157d12f1 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1154,7 +1154,7 @@ type Node struct { // updated StatusUpdatedAt int64 - // NodeEvents is a list of the last 10 or lest events for this node + // NodeEvents is the most recent set of events generated for the node NodeEvents []*NodeEvent // Raft Indexes @@ -1169,8 +1169,7 @@ const ( Drain Subsystem = "Drain" Driver Subsystem = "Driver" Heartbeat Subsystem = "Heartbeat" - Server Subsystem = "Server" - Cluster Subsystem = "Cluster" + Cluster Subsystem = "CLuster" ) // NodeEvent is a single unit representing a node’s state change @@ -1181,9 +1180,10 @@ type NodeEvent struct { Timestamp int64 CreateIndex uint64 + ModifyIndex uint64 } -// EmitNodeEventRequest is a client request to update the node events source +// EmitNodeEventRequest is a request to update the node events source // with a new client-side event type EmitNodeEventRequest struct { NodeID string @@ -1191,7 +1191,7 @@ type EmitNodeEventRequest struct { WriteRequest } -// EmitNodeEventResponse is a server response to the client about the status of +// EmitNodeEventResponse is a response to the client about the status of // the node event source update. type EmitNodeEventResponse struct { Index uint64 From 55670c80b0d0abd5ffe41e98a6d14122b50f829d Mon Sep 17 00:00:00 2001 From: Chelsea Holland Komlo Date: Thu, 8 Mar 2018 19:30:49 -0500 Subject: [PATCH 10/19] batch submitting node events --- nomad/fsm.go | 25 ++++++++++++++----------- nomad/fsm_test.go | 12 +++++++----- nomad/node_endpoint_test.go | 4 ++-- nomad/state/events_state_store.go | 26 ++++++++++++++------------ nomad/state/events_state_store_test.go | 4 ++-- nomad/structs/structs.go | 6 ++++-- 6 files changed, 43 insertions(+), 34 deletions(-) diff --git a/nomad/fsm.go b/nomad/fsm.go index f5fdc73fa87..0956b428f44 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -639,21 +639,24 @@ func (n *nomadFSM) applyAddNodeEventType(buf []byte, index uint64) interface{} { return err } - ws := memdb.NewWatchSet() - node, err := n.state.NodeByID(ws, req.NodeID) + for nodeID, nodeEvents := range req.NodeEvents { + ws := memdb.NewWatchSet() + node, err := n.state.NodeByID(ws, nodeID) - if err != nil { - return fmt.Errorf("encountered error when looking up nodes by id to insert node event: %v", err) - } + if err != nil { + return fmt.Errorf("encountered error when looking up nodes by id to insert node event: %v", err) + } - if node == nil { - return fmt.Errorf("unable to look up node by id %s to insert node event", req.NodeID) - } + if node == nil { + return fmt.Errorf("unable to look up node by id %s to insert node event", nodeID) + } - if err := n.state.AddNodeEvent(index, node, req.NodeEvent); err != nil { - n.logger.Printf("[ERR] nomad.fsm: EmitNodeEventRequest failed to add node event: %v", err) - return err + if err := n.state.AddNodeEvent(index, node, nodeEvents); err != nil { + n.logger.Printf("[ERR] nomad.fsm: EmitNodeEventRequest failed to add node event: %v", err) + return err + } } + return nil } diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 5c1ac9fb775..52c441402d2 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -93,9 +93,11 @@ func TestFSM_ApplyNodeEvent(t *testing.T) { Timestamp: time.Now().Unix(), } + nodeEvents := []*structs.NodeEvent{nodeEvent} + allEvents := map[string][]*structs.NodeEvent{node.ID: nodeEvents} + req := structs.EmitNodeEventRequest{ - NodeID: node.ID, - NodeEvent: nodeEvent, + NodeEvents: allEvents, WriteRequest: structs.WriteRequest{Region: "global"}, } buf, err := structs.Encode(structs.AddNodeEventType, req) @@ -106,12 +108,12 @@ func TestFSM_ApplyNodeEvent(t *testing.T) { require.Nil(resp) ws := memdb.NewWatchSet() - actualNode, err := state.NodeByID(ws, node.ID) + out, err := state.NodeByID(ws, node.ID) require.Nil(err) - require.Equal(2, len(actualNode.NodeEvents)) + require.Equal(2, len(out.NodeEvents)) - first := node.NodeEvents[1] + first := out.NodeEvents[1] require.Equal(uint64(1), first.CreateIndex) require.Equal("Heartbeating failed", first.Message) } diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index c7d9f79efb8..7af63be0c81 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -99,9 +99,9 @@ func TestClientEndpoint_EmitEvent(t *testing.T) { Timestamp: time.Now().Unix(), } + nodeEvents := map[string][]*structs.NodeEvent{node.ID: []*structs.NodeEvent{nodeEvent}} req := structs.EmitNodeEventRequest{ - NodeID: node.ID, - NodeEvent: nodeEvent, + NodeEvents: nodeEvents, WriteRequest: structs.WriteRequest{Region: "global"}, } diff --git a/nomad/state/events_state_store.go b/nomad/state/events_state_store.go index 6e3f34d38c4..7728c849602 100644 --- a/nomad/state/events_state_store.go +++ b/nomad/state/events_state_store.go @@ -8,11 +8,11 @@ import ( ) // addNodeEvent is a function which wraps upsertNodeEvent -func (s *StateStore) AddNodeEvent(index uint64, node *structs.Node, event *structs.NodeEvent) error { +func (s *StateStore) AddNodeEvent(index uint64, node *structs.Node, events []*structs.NodeEvent) error { txn := s.db.Txn(true) defer txn.Abort() - err := s.upsertNodeEvent(index, node, event, txn) + err := s.upsertNodeEvents(index, node, events, txn) txn.Commit() return err } @@ -20,10 +20,7 @@ func (s *StateStore) AddNodeEvent(index uint64, node *structs.Node, event *struc // upsertNodeEvent upserts a node event for a respective node. It also maintains // that only 10 node events are ever stored simultaneously, deleting older // events once this bound has been reached. -func (s *StateStore) upsertNodeEvent(index uint64, node *structs.Node, event *structs.NodeEvent, txn *memdb.Txn) error { - - event.CreateIndex = index - event.ModifyIndex = index +func (s *StateStore) upsertNodeEvents(index uint64, node *structs.Node, events []*structs.NodeEvent, txn *memdb.Txn) error { // Copy the existing node copyNode := new(structs.Node) @@ -31,13 +28,18 @@ func (s *StateStore) upsertNodeEvent(index uint64, node *structs.Node, event *st nodeEvents := node.NodeEvents - // keep node events pruned to below 10 simultaneously - if len(nodeEvents) >= 10 { - delta := len(nodeEvents) - 10 - nodeEvents = nodeEvents[delta+1:] + for _, e := range events { + e.CreateIndex = index + e.ModifyIndex = index + + // keep node events pruned to below 10 simultaneously + if len(nodeEvents) >= 10 { + delta := len(nodeEvents) - 10 + nodeEvents = nodeEvents[delta+1:] + } + nodeEvents = append(nodeEvents, e) + copyNode.NodeEvents = nodeEvents } - nodeEvents = append(nodeEvents, event) - copyNode.NodeEvents = nodeEvents // Insert the node if err := txn.Insert("nodes", copyNode); err != nil { diff --git a/nomad/state/events_state_store_test.go b/nomad/state/events_state_store_test.go index 81ec918343d..bb2c3a65ded 100644 --- a/nomad/state/events_state_store_test.go +++ b/nomad/state/events_state_store_test.go @@ -35,7 +35,7 @@ func TestStateStore_AddSingleNodeEvent(t *testing.T) { Subsystem: "Driver", Timestamp: time.Now().Unix(), } - err = state.AddNodeEvent(1001, node, nodeEvent) + err = state.AddNodeEvent(1001, node, []*structs.NodeEvent{nodeEvent}) require.Nil(err) require.True(watchFired(ws)) @@ -75,7 +75,7 @@ func TestStateStore_NodeEvents_RetentionWindow(t *testing.T) { Subsystem: "Driver", Timestamp: time.Now().Unix(), } - err := state.AddNodeEvent(uint64(i), out, nodeEvent) + err := state.AddNodeEvent(uint64(i), out, []*structs.NodeEvent{nodeEvent}) require.Nil(err) require.True(watchFired(ws)) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index b08157d12f1..14a6ec2f80b 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1186,8 +1186,10 @@ type NodeEvent struct { // EmitNodeEventRequest is a request to update the node events source // with a new client-side event type EmitNodeEventRequest struct { - NodeID string - NodeEvent *NodeEvent + // NodeEvents are a map where the key is a node id, and value is a list of + // events for that node + NodeEvents map[string][]*NodeEvent + WriteRequest } From a72dcbfcdda6e7b440dc27a0f7fc89a945d907d0 Mon Sep 17 00:00:00 2001 From: Chelsea Holland Komlo Date: Fri, 9 Mar 2018 07:05:39 -0500 Subject: [PATCH 11/19] add client side emitting of node events Changelog --- CHANGELOG.md | 2 + client/client.go | 116 +++++++++++++++++++++++++++++++++++++++++------ 2 files changed, 104 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9e06a431bcc..64ce527dec8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,8 @@ __BACKWARDS INCOMPATIBILITIES:__ IMPROVEMENTS: * core: Allow upgrading/downgrading TLS via SIGHUP on both servers and clients [[GH-3492](https://github.com/hashicorp/nomad/issues/3492)] + * core: Node events are emitted for events such as node registration and + heartbeating [[GH-3945](https://github.com/hashicorp/nomad/issues/3945)] * core: A set of features (Autopilot) has been added to allow for automatic operator-friendly management of Nomad servers. For more information about Autopilot, see the [Autopilot Guide](https://www.nomadproject.io/guides/cluster/autopilot.html). [[GH-3670](https://github.com/hashicorp/nomad/pull/3670)] * core: Servers can now service client HTTP endpoints [[GH-3892](https://github.com/hashicorp/nomad/issues/3892)] * core: Servers can now retry connecting to Vault to verify tokens without requiring a SIGHUP to do so [[GH-3957](https://github.com/hashicorp/nomad/issues/3957)] diff --git a/client/client.go b/client/client.go index 14f6cb04306..fc687725849 100644 --- a/client/client.go +++ b/client/client.go @@ -81,6 +81,10 @@ const ( // allocSyncRetryIntv is the interval on which we retry updating // the status of the allocation allocSyncRetryIntv = 5 * time.Second + + // nodeEventsEmitIntv is the interval at which node events are synced with + // the server + nodeEventsEmitIntv = 3 * time.Second ) // ClientStatsReporter exposes all the APIs related to resource usage of a Nomad @@ -133,6 +137,10 @@ type Client struct { // update it. triggerNodeUpdate chan struct{} + // triggerEmitNodeEvent sends an event and triggers the client to update the + // server for the node event + triggerEmitNodeEvent chan *structs.NodeEvent + // discovered will be ticked whenever Consul discovery completes // successfully serversDiscoveredCh chan struct{} @@ -200,20 +208,21 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic // Create the client c := &Client{ - config: cfg, - consulCatalog: consulCatalog, - consulService: consulService, - start: time.Now(), - connPool: pool.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, tlsWrap), - tlsWrap: tlsWrap, - streamingRpcs: structs.NewStreamingRpcRegistry(), - logger: logger, - allocs: make(map[string]*AllocRunner), - allocUpdates: make(chan *structs.Allocation, 64), - shutdownCh: make(chan struct{}), - triggerDiscoveryCh: make(chan struct{}), - triggerNodeUpdate: make(chan struct{}, 8), - serversDiscoveredCh: make(chan struct{}), + config: cfg, + consulCatalog: consulCatalog, + consulService: consulService, + start: time.Now(), + connPool: pool.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, tlsWrap), + tlsWrap: tlsWrap, + streamingRpcs: structs.NewStreamingRpcRegistry(), + logger: logger, + allocs: make(map[string]*AllocRunner), + allocUpdates: make(chan *structs.Allocation, 64), + shutdownCh: make(chan struct{}), + triggerDiscoveryCh: make(chan struct{}), + triggerNodeUpdate: make(chan struct{}, 8), + serversDiscoveredCh: make(chan struct{}), + triggerEmitNodeEvent: make(chan *structs.NodeEvent, 8), } // Initialize the server manager @@ -1052,6 +1061,9 @@ func (c *Client) registerAndHeartbeat() { // Start watching changes for node changes go c.watchNodeUpdates() + // Start watching for emitting node events + go c.watchEmitEvents() + // Setup the heartbeat timer, for the initial registration // we want to do this quickly. We want to do it extra quickly // in development mode. @@ -1085,6 +1097,14 @@ func (c *Client) registerAndHeartbeat() { // if heartbeating fails, trigger Consul discovery c.triggerDiscovery() + + // trigger a node event to register that the heartbeat failed + nodeEvent := &structs.NodeEvent{ + Message: fmt.Sprintf("Client heartbeat failed at %s", intv), + Subsystem: "Heartbeat", + Timestamp: time.Now().Unix(), + } + c.triggerNodeEvent(nodeEvent) } } else { c.heartbeatLock.Lock() @@ -1131,6 +1151,74 @@ func (c *Client) run() { } } +// submitNodeEvents is used to submit a client-side node event. Examples of +// these kinds of events include when a driver moves from healthy to unhealhty +// (and vice versa) +func (c *Client) submitNodeEvents(e []*structs.NodeEvent) error { + node := c.Node() + nodeEvents := map[string][]*structs.NodeEvent{ + node.ID: e, + } + req := structs.EmitNodeEventRequest{ + NodeEvents: nodeEvents, + WriteRequest: structs.WriteRequest{Region: c.Region()}, + } + var resp structs.EmitNodeEventResponse + if err := c.RPC("Node.EmitEvent", &req, &resp); err != nil { + c.logger.Printf("[ERR] client: emitting node events failed %v", err) + return err + } + c.logger.Printf("[INFO] client: emit node events complete") + return nil +} + +// emitEvent is a handler which receives node events and on a interval and +// submits them in batch format to the server +func (c *Client) watchEmitEvents() { + batchEventsLock := sync.Mutex{} + batchEvents := make([]*structs.NodeEvent, 0) + + timer := time.NewTimer(c.retryIntv(nodeEventsEmitIntv)) + defer timer.Stop() + + for { + select { + case event := <-c.triggerEmitNodeEvent: + batchEventsLock.Lock() + batchEvents = append(batchEvents, event) + batchEventsLock.Unlock() + + case <-timer.C: + timer.Reset(c.retryIntv(nodeUpdateRetryIntv)) + + batchEventsLock.Lock() + if len(batchEvents) == 0 { + // if we haven't received any events to emit, continue until the next + // time interval + batchEventsLock.Unlock() + continue + } + + c.submitNodeEvents(batchEvents) + batchEventsLock.Unlock() + + case <-c.shutdownCh: + return + default: + } + } +} + +// emitEvent triggers a emit node event +func (c *Client) triggerNodeEvent(nodeEvent *structs.NodeEvent) { + select { + case c.triggerEmitNodeEvent <- nodeEvent: + // emit node event goroutine was released to execute + default: + // emit node event goroutine was already running + } +} + // retryRegisterNode is used to register the node or update the registration and // retry in case of failure. func (c *Client) retryRegisterNode() { From 9ef6ad031a5e6f54e8f022a3af55ccdd12442a73 Mon Sep 17 00:00:00 2001 From: Chelsea Holland Komlo Date: Fri, 9 Mar 2018 12:43:20 -0500 Subject: [PATCH 12/19] fix up go check warnings --- client/client.go | 2 +- nomad/node_endpoint_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/client/client.go b/client/client.go index fc687725849..893e60c0aca 100644 --- a/client/client.go +++ b/client/client.go @@ -1152,7 +1152,7 @@ func (c *Client) run() { } // submitNodeEvents is used to submit a client-side node event. Examples of -// these kinds of events include when a driver moves from healthy to unhealhty +// these kinds of events include when a driver moves from healthy to unhealthy // (and vice versa) func (c *Client) submitNodeEvents(e []*structs.NodeEvent) error { node := c.Node() diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index 7af63be0c81..c140b0eb696 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -99,7 +99,7 @@ func TestClientEndpoint_EmitEvent(t *testing.T) { Timestamp: time.Now().Unix(), } - nodeEvents := map[string][]*structs.NodeEvent{node.ID: []*structs.NodeEvent{nodeEvent}} + nodeEvents := map[string][]*structs.NodeEvent{node.ID: {nodeEvent}} req := structs.EmitNodeEventRequest{ NodeEvents: nodeEvents, WriteRequest: structs.WriteRequest{Region: "global"}, From 3f561c3870c876d1cbd9e3860544a67ffd8a7fd6 Mon Sep 17 00:00:00 2001 From: Chelsea Holland Komlo Date: Sun, 11 Mar 2018 18:16:58 -0400 Subject: [PATCH 13/19] keep state store functions in one file --- nomad/state/events_state_store.go | 53 --------------- nomad/state/events_state_store_test.go | 94 -------------------------- nomad/state/state_store.go | 45 ++++++++++++ nomad/state/state_store_test.go | 82 ++++++++++++++++++++++ 4 files changed, 127 insertions(+), 147 deletions(-) delete mode 100644 nomad/state/events_state_store.go delete mode 100644 nomad/state/events_state_store_test.go diff --git a/nomad/state/events_state_store.go b/nomad/state/events_state_store.go deleted file mode 100644 index 7728c849602..00000000000 --- a/nomad/state/events_state_store.go +++ /dev/null @@ -1,53 +0,0 @@ -package state - -import ( - "fmt" - - memdb "github.com/hashicorp/go-memdb" - "github.com/hashicorp/nomad/nomad/structs" -) - -// addNodeEvent is a function which wraps upsertNodeEvent -func (s *StateStore) AddNodeEvent(index uint64, node *structs.Node, events []*structs.NodeEvent) error { - txn := s.db.Txn(true) - defer txn.Abort() - - err := s.upsertNodeEvents(index, node, events, txn) - txn.Commit() - return err -} - -// upsertNodeEvent upserts a node event for a respective node. It also maintains -// that only 10 node events are ever stored simultaneously, deleting older -// events once this bound has been reached. -func (s *StateStore) upsertNodeEvents(index uint64, node *structs.Node, events []*structs.NodeEvent, txn *memdb.Txn) error { - - // Copy the existing node - copyNode := new(structs.Node) - *copyNode = *node - - nodeEvents := node.NodeEvents - - for _, e := range events { - e.CreateIndex = index - e.ModifyIndex = index - - // keep node events pruned to below 10 simultaneously - if len(nodeEvents) >= 10 { - delta := len(nodeEvents) - 10 - nodeEvents = nodeEvents[delta+1:] - } - nodeEvents = append(nodeEvents, e) - copyNode.NodeEvents = nodeEvents - } - - // Insert the node - if err := txn.Insert("nodes", copyNode); err != nil { - return fmt.Errorf("node update failed: %v", err) - } - if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil { - return fmt.Errorf("index update failed: %v", err) - } - - return nil -} diff --git a/nomad/state/events_state_store_test.go b/nomad/state/events_state_store_test.go deleted file mode 100644 index bb2c3a65ded..00000000000 --- a/nomad/state/events_state_store_test.go +++ /dev/null @@ -1,94 +0,0 @@ -package state - -import ( - "fmt" - "testing" - "time" - - memdb "github.com/hashicorp/go-memdb" - "github.com/hashicorp/nomad/nomad/mock" - "github.com/hashicorp/nomad/nomad/structs" - "github.com/stretchr/testify/require" -) - -func TestStateStore_AddSingleNodeEvent(t *testing.T) { - require := require.New(t) - state := testStateStore(t) - - node := mock.Node() - - // We create a new node event every time we register a node - err := state.UpsertNode(1000, node) - require.Nil(err) - - require.Equal(1, len(node.NodeEvents)) - require.Equal(structs.Subsystem("Cluster"), node.NodeEvents[0].Subsystem) - require.Equal("Node Registered", node.NodeEvents[0].Message) - - // Create a watchset so we can test that AddNodeEvent fires the watch - ws := memdb.NewWatchSet() - _, err = state.NodeByID(ws, node.ID) - require.Nil(err) - - nodeEvent := &structs.NodeEvent{ - Message: "failed", - Subsystem: "Driver", - Timestamp: time.Now().Unix(), - } - err = state.AddNodeEvent(1001, node, []*structs.NodeEvent{nodeEvent}) - require.Nil(err) - - require.True(watchFired(ws)) - - ws = memdb.NewWatchSet() - out, err := state.NodeByID(ws, node.ID) - require.Nil(err) - - require.Equal(2, len(out.NodeEvents)) - require.Equal(nodeEvent, out.NodeEvents[1]) -} - -// To prevent stale node events from accumulating, we limit the number of -// stored node events to 10. -func TestStateStore_NodeEvents_RetentionWindow(t *testing.T) { - require := require.New(t) - state := testStateStore(t) - - node := mock.Node() - - err := state.UpsertNode(1000, node) - if err != nil { - t.Fatalf("err: %v", err) - } - require.Equal(1, len(node.NodeEvents)) - require.Equal(structs.Subsystem("Cluster"), node.NodeEvents[0].Subsystem) - require.Equal("Node Registered", node.NodeEvents[0].Message) - - var out *structs.Node - for i := 1; i <= 20; i++ { - ws := memdb.NewWatchSet() - out, err = state.NodeByID(ws, node.ID) - require.Nil(err) - - nodeEvent := &structs.NodeEvent{ - Message: fmt.Sprintf("%dith failed", i), - Subsystem: "Driver", - Timestamp: time.Now().Unix(), - } - err := state.AddNodeEvent(uint64(i), out, []*structs.NodeEvent{nodeEvent}) - require.Nil(err) - - require.True(watchFired(ws)) - ws = memdb.NewWatchSet() - out, err = state.NodeByID(ws, node.ID) - require.Nil(err) - } - - ws := memdb.NewWatchSet() - out, err = state.NodeByID(ws, node.ID) - require.Nil(err) - - require.Equal(10, len(out.NodeEvents)) - require.Equal(uint64(11), out.NodeEvents[0].CreateIndex) - require.Equal(uint64(20), out.NodeEvents[len(out.NodeEvents)-1].CreateIndex) -} diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 56cba6c06f0..a9175688e1c 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -3693,3 +3693,48 @@ func (r *StateRestore) addEphemeralDiskToTaskGroups(job *structs.Job) { } } } + +// addNodeEvent is a function which wraps upsertNodeEvent +func (s *StateStore) AddNodeEvent(index uint64, node *structs.Node, events []*structs.NodeEvent) error { + txn := s.db.Txn(true) + defer txn.Abort() + + err := s.upsertNodeEvents(index, node, events, txn) + txn.Commit() + return err +} + +// upsertNodeEvent upserts a node event for a respective node. It also maintains +// that only 10 node events are ever stored simultaneously, deleting older +// events once this bound has been reached. +func (s *StateStore) upsertNodeEvents(index uint64, node *structs.Node, events []*structs.NodeEvent, txn *memdb.Txn) error { + + // Copy the existing node + copyNode := new(structs.Node) + *copyNode = *node + + nodeEvents := node.NodeEvents + + for _, e := range events { + e.CreateIndex = index + e.ModifyIndex = index + + // keep node events pruned to below 10 simultaneously + if len(nodeEvents) >= 10 { + delta := len(nodeEvents) - 10 + nodeEvents = nodeEvents[delta+1:] + } + nodeEvents = append(nodeEvents, e) + copyNode.NodeEvents = nodeEvents + } + + // Insert the node + if err := txn.Insert("nodes", copyNode); err != nil { + return fmt.Errorf("node update failed: %v", err) + } + if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } + + return nil +} diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 0249f627b96..c4b96d56ff4 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -6469,6 +6469,88 @@ func TestStateStore_Abandon(t *testing.T) { } } +func TestStateStore_AddSingleNodeEvent(t *testing.T) { + require := require.New(t) + state := testStateStore(t) + + node := mock.Node() + + // We create a new node event every time we register a node + err := state.UpsertNode(1000, node) + require.Nil(err) + + require.Equal(1, len(node.NodeEvents)) + require.Equal(structs.Subsystem("Cluster"), node.NodeEvents[0].Subsystem) + require.Equal("Node Registered", node.NodeEvents[0].Message) + + // Create a watchset so we can test that AddNodeEvent fires the watch + ws := memdb.NewWatchSet() + _, err = state.NodeByID(ws, node.ID) + require.Nil(err) + + nodeEvent := &structs.NodeEvent{ + Message: "failed", + Subsystem: "Driver", + Timestamp: time.Now().Unix(), + } + err = state.AddNodeEvent(1001, node, []*structs.NodeEvent{nodeEvent}) + require.Nil(err) + + require.True(watchFired(ws)) + + ws = memdb.NewWatchSet() + out, err := state.NodeByID(ws, node.ID) + require.Nil(err) + + require.Equal(2, len(out.NodeEvents)) + require.Equal(nodeEvent, out.NodeEvents[1]) +} + +// To prevent stale node events from accumulating, we limit the number of +// stored node events to 10. +func TestStateStore_NodeEvents_RetentionWindow(t *testing.T) { + require := require.New(t) + state := testStateStore(t) + + node := mock.Node() + + err := state.UpsertNode(1000, node) + if err != nil { + t.Fatalf("err: %v", err) + } + require.Equal(1, len(node.NodeEvents)) + require.Equal(structs.Subsystem("Cluster"), node.NodeEvents[0].Subsystem) + require.Equal("Node Registered", node.NodeEvents[0].Message) + + var out *structs.Node + for i := 1; i <= 20; i++ { + ws := memdb.NewWatchSet() + out, err = state.NodeByID(ws, node.ID) + require.Nil(err) + + nodeEvent := &structs.NodeEvent{ + Message: fmt.Sprintf("%dith failed", i), + Subsystem: "Driver", + Timestamp: time.Now().Unix(), + } + err := state.AddNodeEvent(uint64(i), out, []*structs.NodeEvent{nodeEvent}) + require.Nil(err) + + require.True(watchFired(ws)) + ws = memdb.NewWatchSet() + out, err = state.NodeByID(ws, node.ID) + require.Nil(err) + } + + ws := memdb.NewWatchSet() + out, err = state.NodeByID(ws, node.ID) + require.Nil(err) + + require.Equal(10, len(out.NodeEvents)) + require.Equal(uint64(11), out.NodeEvents[0].CreateIndex) + require.Equal(uint64(20), out.NodeEvents[len(out.NodeEvents)-1].CreateIndex) +} + // watchFired is a helper for unit tests that returns if the given watch set // fired (it doesn't care which watch actually fired). This uses a fixed // timeout since we already expect the event happened before calling this and From 6eb9911caf4e3c711e6898dd00f9cb0603cebe8d Mon Sep 17 00:00:00 2001 From: Chelsea Holland Komlo Date: Sun, 11 Mar 2018 21:00:13 -0400 Subject: [PATCH 14/19] code review feedback --- client/client.go | 42 ++++++++------------ command/agent/node_endpoint_test.go | 2 +- command/node_status.go | 6 ++- nomad/fsm.go | 25 +++--------- nomad/fsm_test.go | 4 +- nomad/node_endpoint.go | 8 ++-- nomad/node_endpoint_test.go | 6 +-- nomad/state/state_store.go | 61 +++++++++++++++++------------ nomad/state/state_store_test.go | 11 +++++- nomad/structs/structs.go | 17 +++++--- 10 files changed, 95 insertions(+), 87 deletions(-) diff --git a/client/client.go b/client/client.go index 893e60c0aca..05f5afc93cd 100644 --- a/client/client.go +++ b/client/client.go @@ -1097,14 +1097,6 @@ func (c *Client) registerAndHeartbeat() { // if heartbeating fails, trigger Consul discovery c.triggerDiscovery() - - // trigger a node event to register that the heartbeat failed - nodeEvent := &structs.NodeEvent{ - Message: fmt.Sprintf("Client heartbeat failed at %s", intv), - Subsystem: "Heartbeat", - Timestamp: time.Now().Unix(), - } - c.triggerNodeEvent(nodeEvent) } } else { c.heartbeatLock.Lock() @@ -1154,28 +1146,26 @@ func (c *Client) run() { // submitNodeEvents is used to submit a client-side node event. Examples of // these kinds of events include when a driver moves from healthy to unhealthy // (and vice versa) -func (c *Client) submitNodeEvents(e []*structs.NodeEvent) error { - node := c.Node() +func (c *Client) submitNodeEvents(events []*structs.NodeEvent) error { + nodeID := c.Node().ID nodeEvents := map[string][]*structs.NodeEvent{ - node.ID: e, + nodeID: events, } - req := structs.EmitNodeEventRequest{ + req := structs.EmitNodeEventsRequest{ NodeEvents: nodeEvents, WriteRequest: structs.WriteRequest{Region: c.Region()}, } - var resp structs.EmitNodeEventResponse - if err := c.RPC("Node.EmitEvent", &req, &resp); err != nil { - c.logger.Printf("[ERR] client: emitting node events failed %v", err) - return err + var resp structs.EmitNodeEventsResponse + if err := c.RPC("Node.EmitEvents", &req, &resp); err != nil { + return fmt.Errorf("Emitting node event failed: %v", err) } c.logger.Printf("[INFO] client: emit node events complete") return nil } -// emitEvent is a handler which receives node events and on a interval and +// watchEmitEvents is a handler which receives node events and on a interval and // submits them in batch format to the server func (c *Client) watchEmitEvents() { - batchEventsLock := sync.Mutex{} batchEvents := make([]*structs.NodeEvent, 0) timer := time.NewTimer(c.retryIntv(nodeEventsEmitIntv)) @@ -1184,23 +1174,25 @@ func (c *Client) watchEmitEvents() { for { select { case event := <-c.triggerEmitNodeEvent: - batchEventsLock.Lock() batchEvents = append(batchEvents, event) - batchEventsLock.Unlock() case <-timer.C: timer.Reset(c.retryIntv(nodeUpdateRetryIntv)) - batchEventsLock.Lock() if len(batchEvents) == 0 { // if we haven't received any events to emit, continue until the next // time interval - batchEventsLock.Unlock() continue } - c.submitNodeEvents(batchEvents) - batchEventsLock.Unlock() + err := c.submitNodeEvents(batchEvents) + if err != nil { + batchEvents = make([]*structs.NodeEvent, 0) + c.logger.Printf("[ERR] client: Failure in thie process of trying to submit node events: %v", err) + } else if len(batchEvents) >= structs.MaxRetainedNodeEvents { + // Truncate list to under 10 + batchEvents = make([]*structs.NodeEvent, 0) + } case <-c.shutdownCh: return @@ -1209,7 +1201,7 @@ func (c *Client) watchEmitEvents() { } } -// emitEvent triggers a emit node event +// triggerNodeEvent triggers a emit node event func (c *Client) triggerNodeEvent(nodeEvent *structs.NodeEvent) { select { case c.triggerEmitNodeEvent <- nodeEvent: diff --git a/command/agent/node_endpoint_test.go b/command/agent/node_endpoint_test.go index 113fa5de877..16ba008de6f 100644 --- a/command/agent/node_endpoint_test.go +++ b/command/agent/node_endpoint_test.go @@ -396,7 +396,7 @@ func TestHTTP_NodeQuery(t *testing.T) { if n.ID != node.ID { t.Fatalf("bad: %#v", n) } - if len(n.NodeEvents) != 1 { + if len(n.NodeEvents) < 1 { t.Fatalf("Expected node registration event to be populated: %#v", n) } if n.NodeEvents[0].Message != "Node Registered" { diff --git a/command/node_status.go b/command/node_status.go index 9b35ae758fd..bf5fef683e3 100644 --- a/command/node_status.go +++ b/command/node_status.go @@ -389,6 +389,10 @@ func (c *NodeStatusCommand) formatNode(client *api.Client, node *api.Node) int { } func (c *NodeStatusCommand) outputNodeStatusEvents(node *api.Node) { + if !c.verbose { + return + } + c.Ui.Output(c.Colorize().Color("\n[bold]Node Events ")) c.outputNodeEvent(node.NodeEvents) } @@ -396,7 +400,7 @@ func (c *NodeStatusCommand) outputNodeStatusEvents(node *api.Node) { func (c *NodeStatusCommand) outputNodeEvent(events []*api.NodeEvent) { size := len(events) nodeEvents := make([]string, size+1) - nodeEvents[0] = "Timestamp|Subsystem|Message|Details" + nodeEvents[0] = "Time|Subsystem|Message|Details" for i, event := range events { timestamp := formatUnixNanoTime(event.Timestamp) diff --git a/nomad/fsm.go b/nomad/fsm.go index 0956b428f44..2b632911974 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -236,7 +236,7 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} { return n.applyACLTokenBootstrap(buf[1:], log.Index) case structs.AutopilotRequestType: return n.applyAutopilotUpdate(buf[1:], log.Index) - case structs.AddNodeEventType: + case structs.AddNodeEventsType: return n.applyAddNodeEventType(buf[1:], log.Index) } @@ -633,28 +633,15 @@ func (n *nomadFSM) applyReconcileSummaries(buf []byte, index uint64) interface{} // applyAddNodeEventType applies a node event to the set of currently-available // events. func (n *nomadFSM) applyAddNodeEventType(buf []byte, index uint64) interface{} { - var req structs.EmitNodeEventRequest + var req structs.EmitNodeEventsRequest if err := structs.Decode(buf, &req); err != nil { - n.logger.Printf("[ERR] nomad.fsm: failed to decode EmitNodeEventREquest: %v", err) + n.logger.Printf("[ERR] nomad.fsm: failed to decode EmitNodeEventRequest: %v", err) return err } - for nodeID, nodeEvents := range req.NodeEvents { - ws := memdb.NewWatchSet() - node, err := n.state.NodeByID(ws, nodeID) - - if err != nil { - return fmt.Errorf("encountered error when looking up nodes by id to insert node event: %v", err) - } - - if node == nil { - return fmt.Errorf("unable to look up node by id %s to insert node event", nodeID) - } - - if err := n.state.AddNodeEvent(index, node, nodeEvents); err != nil { - n.logger.Printf("[ERR] nomad.fsm: EmitNodeEventRequest failed to add node event: %v", err) - return err - } + if err := n.state.AddNodeEvent(index, req.NodeEvents); err != nil { + n.logger.Printf("[ERR] nomad.fsm: EmitNodeEventRequest failed to add node event: %v", err) + return err } return nil diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 52c441402d2..5d316cb66ac 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -96,11 +96,11 @@ func TestFSM_ApplyNodeEvent(t *testing.T) { nodeEvents := []*structs.NodeEvent{nodeEvent} allEvents := map[string][]*structs.NodeEvent{node.ID: nodeEvents} - req := structs.EmitNodeEventRequest{ + req := structs.EmitNodeEventsRequest{ NodeEvents: allEvents, WriteRequest: structs.WriteRequest{Region: "global"}, } - buf, err := structs.Encode(structs.AddNodeEventType, req) + buf, err := structs.Encode(structs.AddNodeEventsType, req) require.Nil(err) // the response in this case will be an error diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index 3cefa026e2c..916c7ee08e3 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -52,16 +52,16 @@ type Node struct { updatesLock sync.Mutex } -func (n *Node) EmitEvent(args *structs.EmitNodeEventRequest, reply *structs.EmitNodeEventResponse) error { - if done, err := n.srv.forward("Node.EmitEvent", args, args, reply); done { +func (n *Node) EmitEvents(args *structs.EmitNodeEventsRequest, reply *structs.EmitNodeEventsResponse) error { + if done, err := n.srv.forward("Node.EmitEvents", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"nomad", "client", "emit_event"}, time.Now()) - _, index, err := n.srv.raftApply(structs.AddNodeEventType, args) + _, index, err := n.srv.raftApply(structs.AddNodeEventsType, args) if err != nil { - n.srv.logger.Printf("[ERR] nomad.node AddNodeEventType failed: %v", err) + n.srv.logger.Printf("[ERR] nomad.node AddNodeEventsType failed: %v", err) return err } diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index c140b0eb696..70c27d1c1c3 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -78,7 +78,7 @@ func TestClientEndpoint_Register(t *testing.T) { }) } -func TestClientEndpoint_EmitEvent(t *testing.T) { +func TestClientEndpoint_EmitEvents(t *testing.T) { t.Parallel() require := require.New(t) @@ -100,13 +100,13 @@ func TestClientEndpoint_EmitEvent(t *testing.T) { } nodeEvents := map[string][]*structs.NodeEvent{node.ID: {nodeEvent}} - req := structs.EmitNodeEventRequest{ + req := structs.EmitNodeEventsRequest{ NodeEvents: nodeEvents, WriteRequest: structs.WriteRequest{Region: "global"}, } var resp structs.GenericResponse - err = msgpackrpc.CallWithCodec(codec, "Node.EmitEvent", &req, &resp) + err = msgpackrpc.CallWithCodec(codec, "Node.EmitEvents", &req, &resp) require.Nil(err) require.NotEqual(0, resp.Index) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index a9175688e1c..5a774c22b53 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -538,7 +538,7 @@ func (s *StateStore) UpsertNode(index uint64, node *structs.Node) error { Timestamp: node.StatusUpdatedAt, } - node.NodeEvents = make([]*structs.NodeEvent, 0) + node.NodeEvents = make([]*structs.NodeEvent, 0, 1) node.NodeEvents = append(node.NodeEvents, nodeEvent) node.CreateIndex = index @@ -3695,11 +3695,11 @@ func (r *StateRestore) addEphemeralDiskToTaskGroups(job *structs.Job) { } // addNodeEvent is a function which wraps upsertNodeEvent -func (s *StateStore) AddNodeEvent(index uint64, node *structs.Node, events []*structs.NodeEvent) error { +func (s *StateStore) AddNodeEvent(index uint64, events map[string][]*structs.NodeEvent) error { txn := s.db.Txn(true) defer txn.Abort() - err := s.upsertNodeEvents(index, node, events, txn) + err := s.upsertNodeEvents(index, events, txn) txn.Commit() return err } @@ -3707,33 +3707,46 @@ func (s *StateStore) AddNodeEvent(index uint64, node *structs.Node, events []*st // upsertNodeEvent upserts a node event for a respective node. It also maintains // that only 10 node events are ever stored simultaneously, deleting older // events once this bound has been reached. -func (s *StateStore) upsertNodeEvents(index uint64, node *structs.Node, events []*structs.NodeEvent, txn *memdb.Txn) error { +func (s *StateStore) upsertNodeEvents(index uint64, nodeEvents map[string][]*structs.NodeEvent, txn *memdb.Txn) error { - // Copy the existing node - copyNode := new(structs.Node) - *copyNode = *node + for nodeID, events := range nodeEvents { + ws := memdb.NewWatchSet() + node, err := s.NodeByID(ws, nodeID) + + if err != nil { + return fmt.Errorf("encountered error when looking up nodes by id to insert node event: %v", err) + } - nodeEvents := node.NodeEvents + if node == nil { + return fmt.Errorf("unable to look up node by id %s to insert node event", nodeID) + } + + // Copy the existing node + copyNode := new(structs.Node) + *copyNode = *node + + nodeEvents := node.NodeEvents - for _, e := range events { - e.CreateIndex = index - e.ModifyIndex = index + for _, e := range events { + e.CreateIndex = index + e.ModifyIndex = index - // keep node events pruned to below 10 simultaneously - if len(nodeEvents) >= 10 { - delta := len(nodeEvents) - 10 - nodeEvents = nodeEvents[delta+1:] + // keep node events pruned to below 10 simultaneously + if len(nodeEvents) >= structs.MaxRetainedNodeEvents { + delta := len(nodeEvents) - 10 + nodeEvents = nodeEvents[delta+1:] + } + nodeEvents = append(nodeEvents, e) + copyNode.NodeEvents = nodeEvents } - nodeEvents = append(nodeEvents, e) - copyNode.NodeEvents = nodeEvents - } - // Insert the node - if err := txn.Insert("nodes", copyNode); err != nil { - return fmt.Errorf("node update failed: %v", err) - } - if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil { - return fmt.Errorf("index update failed: %v", err) + // Insert the node + if err := txn.Insert("nodes", copyNode); err != nil { + return fmt.Errorf("node update failed: %v", err) + } + if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } } return nil diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index c4b96d56ff4..6bff39d6ead 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -6493,7 +6493,10 @@ func TestStateStore_AddSingleNodeEvent(t *testing.T) { Subsystem: "Driver", Timestamp: time.Now().Unix(), } - err = state.AddNodeEvent(1001, node, []*structs.NodeEvent{nodeEvent}) + nodeEvents := map[string][]*structs.NodeEvent{ + node.ID: []*structs.NodeEvent{nodeEvent}, + } + err = state.AddNodeEvent(uint64(1001), nodeEvents) require.Nil(err) require.True(watchFired(ws)) @@ -6533,7 +6536,11 @@ func TestStateStore_NodeEvents_RetentionWindow(t *testing.T) { Subsystem: "Driver", Timestamp: time.Now().Unix(), } - err := state.AddNodeEvent(uint64(i), out, []*structs.NodeEvent{nodeEvent}) + + nodeEvents := map[string][]*structs.NodeEvent{ + out.ID: []*structs.NodeEvent{nodeEvent}, + } + err := state.AddNodeEvent(uint64(i), nodeEvents) require.Nil(err) require.True(watchFired(ws)) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 14a6ec2f80b..e9d2e7d496b 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -74,7 +74,7 @@ const ( ACLTokenDeleteRequestType ACLTokenBootstrapRequestType AutopilotRequestType - AddNodeEventType + AddNodeEventsType ) const ( @@ -123,6 +123,10 @@ const ( // the fraction. So 16 == 6.25% limit of jitter. This jitter is also // applied to RPCHoldTimeout. JitterFraction = 16 + + // MaxRetainedNodeEvents is the maximum number of node events that will be + // retained for a single node + MaxRetainedNodeEvents = 10 ) // Context defines the scope in which a search for Nomad object operates, and @@ -1154,7 +1158,8 @@ type Node struct { // updated StatusUpdatedAt int64 - // NodeEvents is the most recent set of events generated for the node + // NodeEvents is the most recent set of events generated for the node, + // retaining only MaxRetainedNodeEvents number at a time NodeEvents []*NodeEvent // Raft Indexes @@ -1183,9 +1188,9 @@ type NodeEvent struct { ModifyIndex uint64 } -// EmitNodeEventRequest is a request to update the node events source +// EmitNodeEventsRequest is a request to update the node events source // with a new client-side event -type EmitNodeEventRequest struct { +type EmitNodeEventsRequest struct { // NodeEvents are a map where the key is a node id, and value is a list of // events for that node NodeEvents map[string][]*NodeEvent @@ -1193,9 +1198,9 @@ type EmitNodeEventRequest struct { WriteRequest } -// EmitNodeEventResponse is a response to the client about the status of +// EmitNodeEventsResponse is a response to the client about the status of // the node event source update. -type EmitNodeEventResponse struct { +type EmitNodeEventsResponse struct { Index uint64 WriteMeta } From 3a1043def921e9e786a94719aa22c72b2c02f668 Mon Sep 17 00:00:00 2001 From: Chelsea Holland Komlo Date: Mon, 12 Mar 2018 14:32:44 -0400 Subject: [PATCH 15/19] make check fixes --- nomad/state/state_store_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 6bff39d6ead..b6553654f2e 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -6494,7 +6494,7 @@ func TestStateStore_AddSingleNodeEvent(t *testing.T) { Timestamp: time.Now().Unix(), } nodeEvents := map[string][]*structs.NodeEvent{ - node.ID: []*structs.NodeEvent{nodeEvent}, + node.ID: {nodeEvent}, } err = state.AddNodeEvent(uint64(1001), nodeEvents) require.Nil(err) @@ -6538,7 +6538,7 @@ func TestStateStore_NodeEvents_RetentionWindow(t *testing.T) { } nodeEvents := map[string][]*structs.NodeEvent{ - out.ID: []*structs.NodeEvent{nodeEvent}, + out.ID: {nodeEvent}, } err := state.AddNodeEvent(uint64(i), nodeEvents) require.Nil(err) From 319f80907cf1df94d3c624a5cd13121ff915c3e7 Mon Sep 17 00:00:00 2001 From: Chelsea Holland Komlo Date: Tue, 13 Mar 2018 09:33:53 -0400 Subject: [PATCH 16/19] code review feedback --- client/client.go | 56 +++++++++++++++++--------------------- command/node_status.go | 17 ++++++++---- nomad/node_endpoint.go | 6 ++++ nomad/state/state_store.go | 5 ++-- nomad/structs/structs.go | 21 +++++++++----- 5 files changed, 58 insertions(+), 47 deletions(-) diff --git a/client/client.go b/client/client.go index 05f5afc93cd..8a17b414b59 100644 --- a/client/client.go +++ b/client/client.go @@ -81,10 +81,6 @@ const ( // allocSyncRetryIntv is the interval on which we retry updating // the status of the allocation allocSyncRetryIntv = 5 * time.Second - - // nodeEventsEmitIntv is the interval at which node events are synced with - // the server - nodeEventsEmitIntv = 3 * time.Second ) // ClientStatsReporter exposes all the APIs related to resource usage of a Nomad @@ -1062,7 +1058,7 @@ func (c *Client) registerAndHeartbeat() { go c.watchNodeUpdates() // Start watching for emitting node events - go c.watchEmitEvents() + go c.watchNodeEvents() // Setup the heartbeat timer, for the initial registration // we want to do this quickly. We want to do it extra quickly @@ -1147,7 +1143,7 @@ func (c *Client) run() { // these kinds of events include when a driver moves from healthy to unhealthy // (and vice versa) func (c *Client) submitNodeEvents(events []*structs.NodeEvent) error { - nodeID := c.Node().ID + nodeID := c.NodeID() nodeEvents := map[string][]*structs.NodeEvent{ nodeID: events, } @@ -1159,44 +1155,42 @@ func (c *Client) submitNodeEvents(events []*structs.NodeEvent) error { if err := c.RPC("Node.EmitEvents", &req, &resp); err != nil { return fmt.Errorf("Emitting node event failed: %v", err) } - c.logger.Printf("[INFO] client: emit node events complete") return nil } -// watchEmitEvents is a handler which receives node events and on a interval and -// submits them in batch format to the server -func (c *Client) watchEmitEvents() { - batchEvents := make([]*structs.NodeEvent, 0) +// watchNodeEvents is a handler which receives node events and on a interval +// and submits them in batch format to the server +func (c *Client) watchNodeEvents() { + // batchEvents stores events that have yet to be published + var batchEvents []*structs.NodeEvent - timer := time.NewTimer(c.retryIntv(nodeEventsEmitIntv)) + // Create and drain the timer + timer := time.NewTimer(0) + timer.Stop() + select { + case <-timer.C: + default: + } defer timer.Stop() for { select { case event := <-c.triggerEmitNodeEvent: - batchEvents = append(batchEvents, event) - - case <-timer.C: - timer.Reset(c.retryIntv(nodeUpdateRetryIntv)) - - if len(batchEvents) == 0 { - // if we haven't received any events to emit, continue until the next - // time interval - continue + if l := len(batchEvents); l <= structs.MaxRetainedNodeEvents { + batchEvents = append(batchEvents, event) + } else { + // Drop the oldest event + c.logger.Printf("[WARN] client: dropping node event: %v", batchEvents[0]) + batchEvents = append(batchEvents[1:], event) } - - err := c.submitNodeEvents(batchEvents) - if err != nil { - batchEvents = make([]*structs.NodeEvent, 0) - c.logger.Printf("[ERR] client: Failure in thie process of trying to submit node events: %v", err) - } else if len(batchEvents) >= structs.MaxRetainedNodeEvents { - // Truncate list to under 10 - batchEvents = make([]*structs.NodeEvent, 0) + timer.Reset(c.retryIntv(nodeUpdateRetryIntv)) + case <-timer.C: + if err := c.submitNodeEvents(batchEvents); err != nil { + c.logger.Printf("[ERR] client: submitting node events failed: %v", err) + timer.Reset(c.retryIntv(nodeUpdateRetryIntv)) } - case <-c.shutdownCh: return - default: } } } diff --git a/command/node_status.go b/command/node_status.go index bf5fef683e3..60fd5c358f0 100644 --- a/command/node_status.go +++ b/command/node_status.go @@ -389,9 +389,6 @@ func (c *NodeStatusCommand) formatNode(client *api.Client, node *api.Node) int { } func (c *NodeStatusCommand) outputNodeStatusEvents(node *api.Node) { - if !c.verbose { - return - } c.Ui.Output(c.Colorize().Color("\n[bold]Node Events ")) c.outputNodeEvent(node.NodeEvents) @@ -400,14 +397,22 @@ func (c *NodeStatusCommand) outputNodeStatusEvents(node *api.Node) { func (c *NodeStatusCommand) outputNodeEvent(events []*api.NodeEvent) { size := len(events) nodeEvents := make([]string, size+1) - nodeEvents[0] = "Time|Subsystem|Message|Details" + if c.verbose { + nodeEvents[0] = "Time|Subsystem|Message|Details" + } else { + nodeEvents[0] = "Time|Subsystem|Message" + } for i, event := range events { timestamp := formatUnixNanoTime(event.Timestamp) subsystem := event.Subsystem msg := event.Message - details := formatEventDetails(event.Details) - nodeEvents[size-i] = fmt.Sprintf("%s|%s|%s|%s", timestamp, subsystem, msg, details) + if c.verbose { + details := formatEventDetails(event.Details) + nodeEvents[size-i] = fmt.Sprintf("%s|%s|%s|%s", timestamp, subsystem, msg, details) + } else { + nodeEvents[size-i] = fmt.Sprintf("%s|%s|%s", timestamp, subsystem, msg) + } } c.Ui.Output(formatList(nodeEvents)) } diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index 916c7ee08e3..ca7c0cb80f8 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -58,6 +58,12 @@ func (n *Node) EmitEvents(args *structs.EmitNodeEventsRequest, reply *structs.Em } defer metrics.MeasureSince([]string{"nomad", "client", "emit_event"}, time.Now()) + if args.NodeEvents == nil { + err := fmt.Errorf("No event to add; node event map is nil") + n.srv.logger.Printf("[ERR] nomad.node AddNodeEventsType failed: %v", err) + return err + } + _, index, err := n.srv.raftApply(structs.AddNodeEventsType, args) if err != nil { diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 5a774c22b53..58399e56b4e 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -3722,8 +3722,7 @@ func (s *StateStore) upsertNodeEvents(index uint64, nodeEvents map[string][]*str } // Copy the existing node - copyNode := new(structs.Node) - *copyNode = *node + copyNode := node.Copy() nodeEvents := node.NodeEvents @@ -3733,7 +3732,7 @@ func (s *StateStore) upsertNodeEvents(index uint64, nodeEvents map[string][]*str // keep node events pruned to below 10 simultaneously if len(nodeEvents) >= structs.MaxRetainedNodeEvents { - delta := len(nodeEvents) - 10 + delta := len(nodeEvents) - structs.MaxRetainedNodeEvents nodeEvents = nodeEvents[delta+1:] } nodeEvents = append(nodeEvents, e) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index e9d2e7d496b..78b50702572 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1188,6 +1188,15 @@ type NodeEvent struct { ModifyIndex uint64 } +func (ne *NodeEvent) String() string { + var details string + for k, v := range ne.Details { + details = fmt.Sprintf("%s: %s", k, v) + } + + return fmt.Sprintf("Message: %s, Subsystem: %s, Details: %s, Timestamp: %d", ne.Message, string(ne.Subsystem), details, ne.Timestamp) +} + // EmitNodeEventsRequest is a request to update the node events source // with a new client-side event type EmitNodeEventsRequest struct { @@ -1221,16 +1230,14 @@ func (n *Node) Copy() *Node { nn.Reserved = nn.Reserved.Copy() nn.Links = helper.CopyMapStringString(nn.Links) nn.Meta = helper.CopyMapStringString(nn.Meta) - nn.NodeEvents = copyNodeEvents(n) + nn.NodeEvents = copyNodeEvents(n.NodeEvents) return nn } -func copyNodeEvents(first *Node) []*NodeEvent { - nodeEvents := make([]*NodeEvent, 0) - for _, e := range first.NodeEvents { - nodeEvents = append(nodeEvents, e) - } - return nodeEvents +func copyNodeEvents(first []*NodeEvent) []*NodeEvent { + second := make([]*NodeEvent, len(first)) + copy(second, first) + return second } // TerminalStatus returns if the current status is terminal and From 57ddd511ccf2f191e1f816d75a1bc521ae6a2be2 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 13 Mar 2018 17:52:12 -0700 Subject: [PATCH 17/19] fixes --- command/node_status.go | 7 +- nomad/fsm.go | 16 +-- nomad/fsm_test.go | 4 +- nomad/node_endpoint.go | 50 ++++----- nomad/state/state_store.go | 122 +++++++++++----------- nomad/state/state_store_test.go | 178 ++++++++++++++++---------------- nomad/structs/structs.go | 119 +++++++++++---------- 7 files changed, 255 insertions(+), 241 deletions(-) diff --git a/command/node_status.go b/command/node_status.go index 60fd5c358f0..85b25bb76e6 100644 --- a/command/node_status.go +++ b/command/node_status.go @@ -389,7 +389,6 @@ func (c *NodeStatusCommand) formatNode(client *api.Client, node *api.Node) int { } func (c *NodeStatusCommand) outputNodeStatusEvents(node *api.Node) { - c.Ui.Output(c.Colorize().Color("\n[bold]Node Events ")) c.outputNodeEvent(node.NodeEvents) } @@ -418,11 +417,11 @@ func (c *NodeStatusCommand) outputNodeEvent(events []*api.NodeEvent) { } func formatEventDetails(details map[string]string) string { - var output string + output := make([]string, 0, len(details)) for k, v := range details { - output += fmt.Sprintf("%s: %s, ", k, v) + output = append(output, fmt.Sprintf("%s: %s, ", k, v)) } - return output + return strings.Join(output, ", ") } func (c *NodeStatusCommand) formatAttributes(node *api.Node) { diff --git a/nomad/fsm.go b/nomad/fsm.go index 2b632911974..de51dfed6f3 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -236,8 +236,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} { return n.applyACLTokenBootstrap(buf[1:], log.Index) case structs.AutopilotRequestType: return n.applyAutopilotUpdate(buf[1:], log.Index) - case structs.AddNodeEventsType: - return n.applyAddNodeEventType(buf[1:], log.Index) + case structs.UpsertNodeEventsType: + return n.applyUpsertNodeEventType(buf[1:], log.Index) } // Check enterprise only message types. @@ -630,17 +630,17 @@ func (n *nomadFSM) applyReconcileSummaries(buf []byte, index uint64) interface{} return n.reconcileQueuedAllocations(index) } -// applyAddNodeEventType applies a node event to the set of currently-available -// events. -func (n *nomadFSM) applyAddNodeEventType(buf []byte, index uint64) interface{} { +// applyUpsertNodeEventType tracks the given node events. +func (n *nomadFSM) applyUpsertNodeEventType(buf []byte, index uint64) interface{} { + defer metrics.MeasureSince([]string{"nomad", "fsm", "upsert_node_events"}, time.Now()) var req structs.EmitNodeEventsRequest if err := structs.Decode(buf, &req); err != nil { - n.logger.Printf("[ERR] nomad.fsm: failed to decode EmitNodeEventRequest: %v", err) + n.logger.Printf("[ERR] nomad.fsm: failed to decode EmitNodeEventsRequest: %v", err) return err } - if err := n.state.AddNodeEvent(index, req.NodeEvents); err != nil { - n.logger.Printf("[ERR] nomad.fsm: EmitNodeEventRequest failed to add node event: %v", err) + if err := n.state.UpsertNodeEvents(index, req.NodeEvents); err != nil { + n.logger.Printf("[ERR] nomad.fsm: failed to add node events: %v", err) return err } diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 5d316cb66ac..6205c703830 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -74,7 +74,7 @@ func makeLog(buf []byte) *raft.Log { } } -func TestFSM_ApplyNodeEvent(t *testing.T) { +func TestFSM_UpsertNodeEvents(t *testing.T) { t.Parallel() require := require.New(t) fsm := testFSM(t) @@ -100,7 +100,7 @@ func TestFSM_ApplyNodeEvent(t *testing.T) { NodeEvents: allEvents, WriteRequest: structs.WriteRequest{Region: "global"}, } - buf, err := structs.Encode(structs.AddNodeEventsType, req) + buf, err := structs.Encode(structs.UpsertNodeEventsType, req) require.Nil(err) // the response in this case will be an error diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index ca7c0cb80f8..5b78190c7fd 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -52,29 +52,6 @@ type Node struct { updatesLock sync.Mutex } -func (n *Node) EmitEvents(args *structs.EmitNodeEventsRequest, reply *structs.EmitNodeEventsResponse) error { - if done, err := n.srv.forward("Node.EmitEvents", args, args, reply); done { - return err - } - defer metrics.MeasureSince([]string{"nomad", "client", "emit_event"}, time.Now()) - - if args.NodeEvents == nil { - err := fmt.Errorf("No event to add; node event map is nil") - n.srv.logger.Printf("[ERR] nomad.node AddNodeEventsType failed: %v", err) - return err - } - - _, index, err := n.srv.raftApply(structs.AddNodeEventsType, args) - - if err != nil { - n.srv.logger.Printf("[ERR] nomad.node AddNodeEventsType failed: %v", err) - return err - } - - reply.Index = index - return nil -} - // Register is used to upsert a client that is available for scheduling func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUpdateResponse) error { if done, err := n.srv.forward("Node.Register", args, args, reply); done { @@ -1380,3 +1357,30 @@ func (n *Node) DeriveVaultToken(args *structs.DeriveVaultTokenRequest, n.srv.setQueryMeta(&reply.QueryMeta) return nil } + +func (n *Node) EmitEvents(args *structs.EmitNodeEventsRequest, reply *structs.EmitNodeEventsResponse) error { + if done, err := n.srv.forward("Node.EmitEvents", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "client", "emit_events"}, time.Now()) + + if len(args.NodeEvents) == 0 { + return fmt.Errorf("no node events given") + } + for nodeID, events := range args.NodeEvents { + if len(events) == 0 { + return fmt.Errorf("no node events given for node %q", nodeID) + } + } + + // TODO ACLs + + _, index, err := n.srv.raftApply(structs.UpsertNodeEventsType, args) + if err != nil { + n.srv.logger.Printf("[ERR] nomad.node upserting node events failed: %v", err) + return err + } + + reply.Index = index + return nil +} diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 58399e56b4e..8a6f4ce5081 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -537,10 +537,7 @@ func (s *StateStore) UpsertNode(index uint64, node *structs.Node) error { Subsystem: "Cluster", Timestamp: node.StatusUpdatedAt, } - - node.NodeEvents = make([]*structs.NodeEvent, 0, 1) - node.NodeEvents = append(node.NodeEvents, nodeEvent) - + node.NodeEvents = []*structs.NodeEvent{nodeEvent} node.CreateIndex = index node.ModifyIndex = index } @@ -634,8 +631,7 @@ func (s *StateStore) UpdateNodeDrain(index uint64, nodeID string, drain bool) er // Copy the existing node existingNode := existing.(*structs.Node) - copyNode := new(structs.Node) - *copyNode = *existingNode + copyNode := existingNode.Copy() // Update the drain in the copy copyNode.Drain = drain @@ -653,6 +649,63 @@ func (s *StateStore) UpdateNodeDrain(index uint64, nodeID string, drain bool) er return nil } +// UpsertNodeEvents adds the node events to the nodes, rotating events as +// necessary. +func (s *StateStore) UpsertNodeEvents(index uint64, nodeEvents map[string][]*structs.NodeEvent) error { + txn := s.db.Txn(true) + defer txn.Abort() + + for nodeID, events := range nodeEvents { + if err := s.upsertNodeEvents(index, nodeID, events, txn); err != nil { + return err + } + } + + txn.Commit() + return nil +} + +// upsertNodeEvent upserts a node event for a respective node. It also maintains +// that a fixed number of node events are ever stored simultaneously, deleting +// older events once this bound has been reached. +func (s *StateStore) upsertNodeEvents(index uint64, nodeID string, events []*structs.NodeEvent, txn *memdb.Txn) error { + // Lookup the node + existing, err := txn.First("nodes", "id", nodeID) + if err != nil { + return fmt.Errorf("node lookup failed: %v", err) + } + if existing == nil { + return fmt.Errorf("node not found") + } + + // Copy the existing node + existingNode := existing.(*structs.Node) + copyNode := existingNode.Copy() + + // Add the events, updating the indexes + for _, e := range events { + e.CreateIndex = index + e.ModifyIndex = index + copyNode.NodeEvents = append(copyNode.NodeEvents, e) + } + + // Keep node events pruned to not exceed the max allowed + if l := len(copyNode.NodeEvents); l > structs.MaxRetainedNodeEvents { + delta := l - structs.MaxRetainedNodeEvents + copyNode.NodeEvents = copyNode.NodeEvents[delta:] + } + + // Insert the node + if err := txn.Insert("nodes", copyNode); err != nil { + return fmt.Errorf("node update failed: %v", err) + } + if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } + + return nil +} + // NodeByID is used to lookup a node by ID func (s *StateStore) NodeByID(ws memdb.WatchSet, nodeID string) (*structs.Node, error) { txn := s.db.Txn(false) @@ -3693,60 +3746,3 @@ func (r *StateRestore) addEphemeralDiskToTaskGroups(job *structs.Job) { } } } - -// addNodeEvent is a function which wraps upsertNodeEvent -func (s *StateStore) AddNodeEvent(index uint64, events map[string][]*structs.NodeEvent) error { - txn := s.db.Txn(true) - defer txn.Abort() - - err := s.upsertNodeEvents(index, events, txn) - txn.Commit() - return err -} - -// upsertNodeEvent upserts a node event for a respective node. It also maintains -// that only 10 node events are ever stored simultaneously, deleting older -// events once this bound has been reached. -func (s *StateStore) upsertNodeEvents(index uint64, nodeEvents map[string][]*structs.NodeEvent, txn *memdb.Txn) error { - - for nodeID, events := range nodeEvents { - ws := memdb.NewWatchSet() - node, err := s.NodeByID(ws, nodeID) - - if err != nil { - return fmt.Errorf("encountered error when looking up nodes by id to insert node event: %v", err) - } - - if node == nil { - return fmt.Errorf("unable to look up node by id %s to insert node event", nodeID) - } - - // Copy the existing node - copyNode := node.Copy() - - nodeEvents := node.NodeEvents - - for _, e := range events { - e.CreateIndex = index - e.ModifyIndex = index - - // keep node events pruned to below 10 simultaneously - if len(nodeEvents) >= structs.MaxRetainedNodeEvents { - delta := len(nodeEvents) - structs.MaxRetainedNodeEvents - nodeEvents = nodeEvents[delta+1:] - } - nodeEvents = append(nodeEvents, e) - copyNode.NodeEvents = nodeEvents - } - - // Insert the node - if err := txn.Insert("nodes", copyNode); err != nil { - return fmt.Errorf("node update failed: %v", err) - } - if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil { - return fmt.Errorf("index update failed: %v", err) - } - } - - return nil -} diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index b6553654f2e..acba103c446 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -748,6 +748,95 @@ func TestStateStore_UpdateNodeDrain_Node(t *testing.T) { } } +func TestStateStore_AddSingleNodeEvent(t *testing.T) { + require := require.New(t) + state := testStateStore(t) + + node := mock.Node() + + // We create a new node event every time we register a node + err := state.UpsertNode(1000, node) + require.Nil(err) + + require.Equal(1, len(node.NodeEvents)) + require.Equal(structs.Subsystem("Cluster"), node.NodeEvents[0].Subsystem) + require.Equal("Node Registered", node.NodeEvents[0].Message) + + // Create a watchset so we can test that AddNodeEvent fires the watch + ws := memdb.NewWatchSet() + _, err = state.NodeByID(ws, node.ID) + require.Nil(err) + + nodeEvent := &structs.NodeEvent{ + Message: "failed", + Subsystem: "Driver", + Timestamp: time.Now().Unix(), + } + nodeEvents := map[string][]*structs.NodeEvent{ + node.ID: {nodeEvent}, + } + err = state.UpsertNodeEvents(uint64(1001), nodeEvents) + require.Nil(err) + + require.True(watchFired(ws)) + + ws = memdb.NewWatchSet() + out, err := state.NodeByID(ws, node.ID) + require.Nil(err) + + require.Equal(2, len(out.NodeEvents)) + require.Equal(nodeEvent, out.NodeEvents[1]) +} + +// To prevent stale node events from accumulating, we limit the number of +// stored node events to 10. +func TestStateStore_NodeEvents_RetentionWindow(t *testing.T) { + require := require.New(t) + state := testStateStore(t) + + node := mock.Node() + + err := state.UpsertNode(1000, node) + if err != nil { + t.Fatalf("err: %v", err) + } + require.Equal(1, len(node.NodeEvents)) + require.Equal(structs.Subsystem("Cluster"), node.NodeEvents[0].Subsystem) + require.Equal("Node Registered", node.NodeEvents[0].Message) + + var out *structs.Node + for i := 1; i <= 20; i++ { + ws := memdb.NewWatchSet() + out, err = state.NodeByID(ws, node.ID) + require.Nil(err) + + nodeEvent := &structs.NodeEvent{ + Message: fmt.Sprintf("%dith failed", i), + Subsystem: "Driver", + Timestamp: time.Now().Unix(), + } + + nodeEvents := map[string][]*structs.NodeEvent{ + out.ID: {nodeEvent}, + } + err := state.UpsertNodeEvents(uint64(i), nodeEvents) + require.Nil(err) + + require.True(watchFired(ws)) + ws = memdb.NewWatchSet() + out, err = state.NodeByID(ws, node.ID) + require.Nil(err) + } + + ws := memdb.NewWatchSet() + out, err = state.NodeByID(ws, node.ID) + require.Nil(err) + + require.Equal(10, len(out.NodeEvents)) + require.Equal(uint64(11), out.NodeEvents[0].CreateIndex) + require.Equal(uint64(20), out.NodeEvents[len(out.NodeEvents)-1].CreateIndex) +} + func TestStateStore_Nodes(t *testing.T) { state := testStateStore(t) var nodes []*structs.Node @@ -6469,95 +6558,6 @@ func TestStateStore_Abandon(t *testing.T) { } } -func TestStateStore_AddSingleNodeEvent(t *testing.T) { - require := require.New(t) - state := testStateStore(t) - - node := mock.Node() - - // We create a new node event every time we register a node - err := state.UpsertNode(1000, node) - require.Nil(err) - - require.Equal(1, len(node.NodeEvents)) - require.Equal(structs.Subsystem("Cluster"), node.NodeEvents[0].Subsystem) - require.Equal("Node Registered", node.NodeEvents[0].Message) - - // Create a watchset so we can test that AddNodeEvent fires the watch - ws := memdb.NewWatchSet() - _, err = state.NodeByID(ws, node.ID) - require.Nil(err) - - nodeEvent := &structs.NodeEvent{ - Message: "failed", - Subsystem: "Driver", - Timestamp: time.Now().Unix(), - } - nodeEvents := map[string][]*structs.NodeEvent{ - node.ID: {nodeEvent}, - } - err = state.AddNodeEvent(uint64(1001), nodeEvents) - require.Nil(err) - - require.True(watchFired(ws)) - - ws = memdb.NewWatchSet() - out, err := state.NodeByID(ws, node.ID) - require.Nil(err) - - require.Equal(2, len(out.NodeEvents)) - require.Equal(nodeEvent, out.NodeEvents[1]) -} - -// To prevent stale node events from accumulating, we limit the number of -// stored node events to 10. -func TestStateStore_NodeEvents_RetentionWindow(t *testing.T) { - require := require.New(t) - state := testStateStore(t) - - node := mock.Node() - - err := state.UpsertNode(1000, node) - if err != nil { - t.Fatalf("err: %v", err) - } - require.Equal(1, len(node.NodeEvents)) - require.Equal(structs.Subsystem("Cluster"), node.NodeEvents[0].Subsystem) - require.Equal("Node Registered", node.NodeEvents[0].Message) - - var out *structs.Node - for i := 1; i <= 20; i++ { - ws := memdb.NewWatchSet() - out, err = state.NodeByID(ws, node.ID) - require.Nil(err) - - nodeEvent := &structs.NodeEvent{ - Message: fmt.Sprintf("%dith failed", i), - Subsystem: "Driver", - Timestamp: time.Now().Unix(), - } - - nodeEvents := map[string][]*structs.NodeEvent{ - out.ID: {nodeEvent}, - } - err := state.AddNodeEvent(uint64(i), nodeEvents) - require.Nil(err) - - require.True(watchFired(ws)) - ws = memdb.NewWatchSet() - out, err = state.NodeByID(ws, node.ID) - require.Nil(err) - } - - ws := memdb.NewWatchSet() - out, err = state.NodeByID(ws, node.ID) - require.Nil(err) - - require.Equal(10, len(out.NodeEvents)) - require.Equal(uint64(11), out.NodeEvents[0].CreateIndex) - require.Equal(uint64(20), out.NodeEvents[len(out.NodeEvents)-1].CreateIndex) -} - // watchFired is a helper for unit tests that returns if the given watch set // fired (it doesn't care which watch actually fired). This uses a fixed // timeout since we already expect the event happened before calling this and diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 78b50702572..e3fbe9cf704 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -74,7 +74,7 @@ const ( ACLTokenDeleteRequestType ACLTokenBootstrapRequestType AutopilotRequestType - AddNodeEventsType + UpsertNodeEventsType ) const ( @@ -1055,6 +1055,60 @@ type NodeConnQueryResponse struct { QueryMeta } +// EmitNodeEventsRequest is a request to update the node events source +// with a new client-side event +type EmitNodeEventsRequest struct { + // NodeEvents are a map where the key is a node id, and value is a list of + // events for that node + NodeEvents map[string][]*NodeEvent + + WriteRequest +} + +// EmitNodeEventsResponse is a response to the client about the status of +// the node event source update. +type EmitNodeEventsResponse struct { + Index uint64 + WriteMeta +} + +// TODO needs to be a more specific name +// Subsystem denotes the subsystem where a node event took place. +type Subsystem string + +const ( + Drain Subsystem = "Drain" + Driver Subsystem = "Driver" + Heartbeat Subsystem = "Heartbeat" + Cluster Subsystem = "Cluster" +) + +// NodeEvent is a single unit representing a node’s state change +type NodeEvent struct { + Message string + Subsystem Subsystem + Details map[string]string + Timestamp int64 + CreateIndex uint64 + ModifyIndex uint64 +} + +func (ne *NodeEvent) String() string { + var details []string + for k, v := range ne.Details { + details = append(details, fmt.Sprintf("%s: %s", k, v)) + } + + return fmt.Sprintf("Message: %s, Subsystem: %s, Details: %s, Timestamp: %d", ne.Message, string(ne.Subsystem), strings.Join(details, ","), ne.Timestamp) +} + +func (ne *NodeEvent) Copy() *NodeEvent { + c := new(NodeEvent) + *c = *ne + c.Details = helper.CopyMapStringString(ne.Details) + return c +} + const ( NodeStatusInit = "initializing" NodeStatusReady = "ready" @@ -1167,53 +1221,6 @@ type Node struct { ModifyIndex uint64 } -// Subsystem denotes the subsystem where a node event took place. -type Subsystem string - -const ( - Drain Subsystem = "Drain" - Driver Subsystem = "Driver" - Heartbeat Subsystem = "Heartbeat" - Cluster Subsystem = "CLuster" -) - -// NodeEvent is a single unit representing a node’s state change -type NodeEvent struct { - Message string - Subsystem Subsystem - Details map[string]string - Timestamp int64 - - CreateIndex uint64 - ModifyIndex uint64 -} - -func (ne *NodeEvent) String() string { - var details string - for k, v := range ne.Details { - details = fmt.Sprintf("%s: %s", k, v) - } - - return fmt.Sprintf("Message: %s, Subsystem: %s, Details: %s, Timestamp: %d", ne.Message, string(ne.Subsystem), details, ne.Timestamp) -} - -// EmitNodeEventsRequest is a request to update the node events source -// with a new client-side event -type EmitNodeEventsRequest struct { - // NodeEvents are a map where the key is a node id, and value is a list of - // events for that node - NodeEvents map[string][]*NodeEvent - - WriteRequest -} - -// EmitNodeEventsResponse is a response to the client about the status of -// the node event source update. -type EmitNodeEventsResponse struct { - Index uint64 - WriteMeta -} - // Ready returns if the node is ready for running allocations func (n *Node) Ready() bool { return n.Status == NodeStatusReady && !n.Drain @@ -1234,10 +1241,18 @@ func (n *Node) Copy() *Node { return nn } -func copyNodeEvents(first []*NodeEvent) []*NodeEvent { - second := make([]*NodeEvent, len(first)) - copy(second, first) - return second +// copyNodeEvents is a helper to copy a list of NodeEvent's +func copyNodeEvents(events []*NodeEvent) []*NodeEvent { + l := len(events) + if l == 0 { + return nil + } + + c := make([]*NodeEvent, l) + for i, event := range events { + c[i] = event.Copy() + } + return c } // TerminalStatus returns if the current status is terminal and From eceabb66f8da293650af6350bfd3da30a49f29be Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 13 Mar 2018 17:59:37 -0700 Subject: [PATCH 18/19] nodeevents -> events --- api/nodes.go | 15 ++++++--------- api/nodes_test.go | 2 +- command/agent/node_endpoint_test.go | 4 ++-- command/node_status.go | 2 +- nomad/fsm_test.go | 4 ++-- nomad/node_endpoint_test.go | 8 ++++---- nomad/state/state_store.go | 10 +++++----- nomad/state/state_store_test.go | 22 +++++++++++----------- nomad/structs/structs.go | 22 +++++++++------------- 9 files changed, 41 insertions(+), 48 deletions(-) diff --git a/api/nodes.go b/api/nodes.go index d4f089695e4..3daf2f7a58c 100644 --- a/api/nodes.go +++ b/api/nodes.go @@ -113,25 +113,22 @@ type Node struct { Status string StatusDescription string StatusUpdatedAt int64 - NodeEvents []*NodeEvent + Events []*NodeEvent CreateIndex uint64 ModifyIndex uint64 } -// Subsystem denotes the subsystem where a node event took place. -type Subsystem string - const ( - Drain Subsystem = "Drain" - Driver Subsystem = "Driver" - Heartbeat Subsystem = "Heartbeat" - Cluster Subsystem = "Cluster" + NodeEventSubsystemDrain = "Drain" + NodeEventSubsystemDriver = "Driver" + NodeEventSubsystemHeartbeat = "Heartbeat" + NodeEventSubsystemCluster = "Cluster" ) // NodeEvent is a single unit representing a node’s state change type NodeEvent struct { Message string - Subsystem Subsystem + Subsystem string Details map[string]string Timestamp int64 diff --git a/api/nodes_test.go b/api/nodes_test.go index 7c9a8be9767..06b96074694 100644 --- a/api/nodes_test.go +++ b/api/nodes_test.go @@ -135,7 +135,7 @@ func TestNodes_Info(t *testing.T) { t.Fatalf("start time: %v, status updated: %v", startTime, result.StatusUpdatedAt) } - if len(result.NodeEvents) < 1 { + if len(result.Events) < 1 { t.Fatalf("Expected at minimum the node register event to be populated: %+v", result) } } diff --git a/command/agent/node_endpoint_test.go b/command/agent/node_endpoint_test.go index 16ba008de6f..a5566adc19f 100644 --- a/command/agent/node_endpoint_test.go +++ b/command/agent/node_endpoint_test.go @@ -396,10 +396,10 @@ func TestHTTP_NodeQuery(t *testing.T) { if n.ID != node.ID { t.Fatalf("bad: %#v", n) } - if len(n.NodeEvents) < 1 { + if len(n.Events) < 1 { t.Fatalf("Expected node registration event to be populated: %#v", n) } - if n.NodeEvents[0].Message != "Node Registered" { + if n.Events[0].Message != "Node Registered" { t.Fatalf("Expected node registration event to be first node event: %#v", n) } }) diff --git a/command/node_status.go b/command/node_status.go index 85b25bb76e6..cbce475346a 100644 --- a/command/node_status.go +++ b/command/node_status.go @@ -390,7 +390,7 @@ func (c *NodeStatusCommand) formatNode(client *api.Client, node *api.Node) int { func (c *NodeStatusCommand) outputNodeStatusEvents(node *api.Node) { c.Ui.Output(c.Colorize().Color("\n[bold]Node Events ")) - c.outputNodeEvent(node.NodeEvents) + c.outputNodeEvent(node.Events) } func (c *NodeStatusCommand) outputNodeEvent(events []*api.NodeEvent) { diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 6205c703830..11e49c9d8b7 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -111,9 +111,9 @@ func TestFSM_UpsertNodeEvents(t *testing.T) { out, err := state.NodeByID(ws, node.ID) require.Nil(err) - require.Equal(2, len(out.NodeEvents)) + require.Equal(2, len(out.Events)) - first := out.NodeEvents[1] + first := out.Events[1] require.Equal(uint64(1), first.CreateIndex) require.Equal("Heartbeating failed", first.Message) } diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index 70c27d1c1c3..fde2f4d3e23 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -114,7 +114,7 @@ func TestClientEndpoint_EmitEvents(t *testing.T) { ws := memdb.NewWatchSet() out, err := state.NodeByID(ws, node.ID) require.Nil(err) - require.False(len(out.NodeEvents) < 2) + require.False(len(out.Events) < 2) } func TestClientEndpoint_Register_SecretMismatch(t *testing.T) { @@ -986,16 +986,16 @@ func TestClientEndpoint_GetNode(t *testing.T) { // Update the status updated at value node.StatusUpdatedAt = resp2.Node.StatusUpdatedAt node.SecretID = "" - node.NodeEvents = resp2.Node.NodeEvents + node.Events = resp2.Node.Events if !reflect.DeepEqual(node, resp2.Node) { t.Fatalf("bad: %#v \n %#v", node, resp2.Node) } // assert that the node register event was set correctly - if len(resp2.Node.NodeEvents) != 1 { + if len(resp2.Node.Events) != 1 { t.Fatalf("Did not set node events: %#v", resp2.Node) } - if resp2.Node.NodeEvents[0].Message != "Node Registered" { + if resp2.Node.Events[0].Message != "Node Registered" { t.Fatalf("Did not set node register event correctly: %#v", resp2.Node) } diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 8a6f4ce5081..1aedcf63af5 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -528,7 +528,7 @@ func (s *StateStore) UpsertNode(index uint64, node *structs.Node) error { node.Drain = exist.Drain // Retain the drain mode // retain node events that have already been set on the node - node.NodeEvents = exist.NodeEvents + node.Events = exist.Events } else { // Because this is the first time the node is being registered, we should // also create a node registration event @@ -537,7 +537,7 @@ func (s *StateStore) UpsertNode(index uint64, node *structs.Node) error { Subsystem: "Cluster", Timestamp: node.StatusUpdatedAt, } - node.NodeEvents = []*structs.NodeEvent{nodeEvent} + node.Events = []*structs.NodeEvent{nodeEvent} node.CreateIndex = index node.ModifyIndex = index } @@ -686,13 +686,13 @@ func (s *StateStore) upsertNodeEvents(index uint64, nodeID string, events []*str for _, e := range events { e.CreateIndex = index e.ModifyIndex = index - copyNode.NodeEvents = append(copyNode.NodeEvents, e) + copyNode.Events = append(copyNode.Events, e) } // Keep node events pruned to not exceed the max allowed - if l := len(copyNode.NodeEvents); l > structs.MaxRetainedNodeEvents { + if l := len(copyNode.Events); l > structs.MaxRetainedNodeEvents { delta := l - structs.MaxRetainedNodeEvents - copyNode.NodeEvents = copyNode.NodeEvents[delta:] + copyNode.Events = copyNode.Events[delta:] } // Insert the node diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index acba103c446..d176e178b9a 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -758,9 +758,9 @@ func TestStateStore_AddSingleNodeEvent(t *testing.T) { err := state.UpsertNode(1000, node) require.Nil(err) - require.Equal(1, len(node.NodeEvents)) - require.Equal(structs.Subsystem("Cluster"), node.NodeEvents[0].Subsystem) - require.Equal("Node Registered", node.NodeEvents[0].Message) + require.Equal(1, len(node.Events)) + require.Equal(structs.NodeEventSubsystemCluster, node.Events[0].Subsystem) + require.Equal("Node Registered", node.Events[0].Message) // Create a watchset so we can test that AddNodeEvent fires the watch ws := memdb.NewWatchSet() @@ -784,8 +784,8 @@ func TestStateStore_AddSingleNodeEvent(t *testing.T) { out, err := state.NodeByID(ws, node.ID) require.Nil(err) - require.Equal(2, len(out.NodeEvents)) - require.Equal(nodeEvent, out.NodeEvents[1]) + require.Equal(2, len(out.Events)) + require.Equal(nodeEvent, out.Events[1]) } // To prevent stale node events from accumulating, we limit the number of @@ -800,9 +800,9 @@ func TestStateStore_NodeEvents_RetentionWindow(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - require.Equal(1, len(node.NodeEvents)) - require.Equal(structs.Subsystem("Cluster"), node.NodeEvents[0].Subsystem) - require.Equal("Node Registered", node.NodeEvents[0].Message) + require.Equal(1, len(node.Events)) + require.Equal(structs.NodeEventSubsystemCluster, node.Events[0].Subsystem) + require.Equal("Node Registered", node.Events[0].Message) var out *structs.Node for i := 1; i <= 20; i++ { @@ -832,9 +832,9 @@ func TestStateStore_NodeEvents_RetentionWindow(t *testing.T) { out, err = state.NodeByID(ws, node.ID) require.Nil(err) - require.Equal(10, len(out.NodeEvents)) - require.Equal(uint64(11), out.NodeEvents[0].CreateIndex) - require.Equal(uint64(20), out.NodeEvents[len(out.NodeEvents)-1].CreateIndex) + require.Equal(10, len(out.Events)) + require.Equal(uint64(11), out.Events[0].CreateIndex) + require.Equal(uint64(20), out.Events[len(out.Events)-1].CreateIndex) } func TestStateStore_Nodes(t *testing.T) { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index e3fbe9cf704..5564f94037a 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1072,21 +1072,17 @@ type EmitNodeEventsResponse struct { WriteMeta } -// TODO needs to be a more specific name -// Subsystem denotes the subsystem where a node event took place. -type Subsystem string - const ( - Drain Subsystem = "Drain" - Driver Subsystem = "Driver" - Heartbeat Subsystem = "Heartbeat" - Cluster Subsystem = "Cluster" + NodeEventSubsystemDrain = "Drain" + NodeEventSubsystemDriver = "Driver" + NodeEventSubsystemHeartbeat = "Heartbeat" + NodeEventSubsystemCluster = "Cluster" ) // NodeEvent is a single unit representing a node’s state change type NodeEvent struct { Message string - Subsystem Subsystem + Subsystem string Details map[string]string Timestamp int64 CreateIndex uint64 @@ -1099,7 +1095,7 @@ func (ne *NodeEvent) String() string { details = append(details, fmt.Sprintf("%s: %s", k, v)) } - return fmt.Sprintf("Message: %s, Subsystem: %s, Details: %s, Timestamp: %d", ne.Message, string(ne.Subsystem), strings.Join(details, ","), ne.Timestamp) + return fmt.Sprintf("Message: %s, Subsystem: %s, Details: %s, Timestamp: %d", ne.Message, ne.Subsystem, strings.Join(details, ","), ne.Timestamp) } func (ne *NodeEvent) Copy() *NodeEvent { @@ -1212,9 +1208,9 @@ type Node struct { // updated StatusUpdatedAt int64 - // NodeEvents is the most recent set of events generated for the node, + // Events is the most recent set of events generated for the node, // retaining only MaxRetainedNodeEvents number at a time - NodeEvents []*NodeEvent + Events []*NodeEvent // Raft Indexes CreateIndex uint64 @@ -1237,7 +1233,7 @@ func (n *Node) Copy() *Node { nn.Reserved = nn.Reserved.Copy() nn.Links = helper.CopyMapStringString(nn.Links) nn.Meta = helper.CopyMapStringString(nn.Meta) - nn.NodeEvents = copyNodeEvents(n.NodeEvents) + nn.Events = copyNodeEvents(n.Events) return nn } From 8ba5ba33de8a67f82ea219f5bcd817463b2f13ce Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 13 Mar 2018 18:04:55 -0700 Subject: [PATCH 19/19] small cleanup --- api/nodes.go | 10 ++++------ client/client.go | 2 +- nomad/fsm.go | 6 +++--- nomad/state/state_store.go | 3 +-- nomad/structs/structs.go | 1 - 5 files changed, 9 insertions(+), 13 deletions(-) diff --git a/api/nodes.go b/api/nodes.go index 3daf2f7a58c..db7f25ffb39 100644 --- a/api/nodes.go +++ b/api/nodes.go @@ -127,13 +127,11 @@ const ( // NodeEvent is a single unit representing a node’s state change type NodeEvent struct { - Message string - Subsystem string - Details map[string]string - Timestamp int64 - + Message string + Subsystem string + Details map[string]string + Timestamp int64 CreateIndex uint64 - ModifyIndex uint64 } // HostStats represents resource usage stats of the host running a Nomad client diff --git a/client/client.go b/client/client.go index 8a17b414b59..b43f57fca04 100644 --- a/client/client.go +++ b/client/client.go @@ -1153,7 +1153,7 @@ func (c *Client) submitNodeEvents(events []*structs.NodeEvent) error { } var resp structs.EmitNodeEventsResponse if err := c.RPC("Node.EmitEvents", &req, &resp); err != nil { - return fmt.Errorf("Emitting node event failed: %v", err) + return fmt.Errorf("Emitting node events failed: %v", err) } return nil } diff --git a/nomad/fsm.go b/nomad/fsm.go index de51dfed6f3..65344e72473 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -237,7 +237,7 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} { case structs.AutopilotRequestType: return n.applyAutopilotUpdate(buf[1:], log.Index) case structs.UpsertNodeEventsType: - return n.applyUpsertNodeEventType(buf[1:], log.Index) + return n.applyUpsertNodeEvent(buf[1:], log.Index) } // Check enterprise only message types. @@ -630,8 +630,8 @@ func (n *nomadFSM) applyReconcileSummaries(buf []byte, index uint64) interface{} return n.reconcileQueuedAllocations(index) } -// applyUpsertNodeEventType tracks the given node events. -func (n *nomadFSM) applyUpsertNodeEventType(buf []byte, index uint64) interface{} { +// applyUpsertNodeEvent tracks the given node events. +func (n *nomadFSM) applyUpsertNodeEvent(buf []byte, index uint64) interface{} { defer metrics.MeasureSince([]string{"nomad", "fsm", "upsert_node_events"}, time.Now()) var req structs.EmitNodeEventsRequest if err := structs.Decode(buf, &req); err != nil { diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 1aedcf63af5..6156a3c7502 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -527,7 +527,7 @@ func (s *StateStore) UpsertNode(index uint64, node *structs.Node) error { node.ModifyIndex = index node.Drain = exist.Drain // Retain the drain mode - // retain node events that have already been set on the node + // Retain node events that have already been set on the node node.Events = exist.Events } else { // Because this is the first time the node is being registered, we should @@ -685,7 +685,6 @@ func (s *StateStore) upsertNodeEvents(index uint64, nodeID string, events []*str // Add the events, updating the indexes for _, e := range events { e.CreateIndex = index - e.ModifyIndex = index copyNode.Events = append(copyNode.Events, e) } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 5564f94037a..66413e3d6cb 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1086,7 +1086,6 @@ type NodeEvent struct { Details map[string]string Timestamp int64 CreateIndex uint64 - ModifyIndex uint64 } func (ne *NodeEvent) String() string {