Skip to content

Commit

Permalink
batch submitting node events
Browse files Browse the repository at this point in the history
  • Loading branch information
chelseakomlo committed Mar 9, 2018
1 parent ee70aef commit 2676fc9
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 34 deletions.
25 changes: 14 additions & 11 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,21 +639,24 @@ func (n *nomadFSM) applyAddNodeEventType(buf []byte, index uint64) interface{} {
return err
}

ws := memdb.NewWatchSet()
node, err := n.state.NodeByID(ws, req.NodeID)
for nodeID, nodeEvents := range req.NodeEvents {
ws := memdb.NewWatchSet()
node, err := n.state.NodeByID(ws, nodeID)

if err != nil {
return fmt.Errorf("encountered error when looking up nodes by id to insert node event: %v", err)
}
if err != nil {
return fmt.Errorf("encountered error when looking up nodes by id to insert node event: %v", err)
}

if node == nil {
return fmt.Errorf("unable to look up node by id %s to insert node event", req.NodeID)
}
if node == nil {
return fmt.Errorf("unable to look up node by id %s to insert node event", nodeID)
}

if err := n.state.AddNodeEvent(index, node, req.NodeEvent); err != nil {
n.logger.Printf("[ERR] nomad.fsm: EmitNodeEventRequest failed to add node event: %v", err)
return err
if err := n.state.AddNodeEvent(index, node, nodeEvents); err != nil {
n.logger.Printf("[ERR] nomad.fsm: EmitNodeEventRequest failed to add node event: %v", err)
return err
}
}

return nil
}

Expand Down
12 changes: 7 additions & 5 deletions nomad/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,11 @@ func TestFSM_ApplyNodeEvent(t *testing.T) {
Timestamp: time.Now().Unix(),
}

nodeEvents := []*structs.NodeEvent{nodeEvent}
allEvents := map[string][]*structs.NodeEvent{node.ID: nodeEvents}

req := structs.EmitNodeEventRequest{
NodeID: node.ID,
NodeEvent: nodeEvent,
NodeEvents: allEvents,
WriteRequest: structs.WriteRequest{Region: "global"},
}
buf, err := structs.Encode(structs.AddNodeEventType, req)
Expand All @@ -106,12 +108,12 @@ func TestFSM_ApplyNodeEvent(t *testing.T) {
require.Nil(resp)

ws := memdb.NewWatchSet()
actualNode, err := state.NodeByID(ws, node.ID)
out, err := state.NodeByID(ws, node.ID)
require.Nil(err)

require.Equal(2, len(actualNode.NodeEvents))
require.Equal(2, len(out.NodeEvents))

first := node.NodeEvents[1]
first := out.NodeEvents[1]
require.Equal(uint64(1), first.CreateIndex)
require.Equal("Heartbeating failed", first.Message)
}
Expand Down
4 changes: 2 additions & 2 deletions nomad/node_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,9 @@ func TestClientEndpoint_EmitEvent(t *testing.T) {
Timestamp: time.Now().Unix(),
}

nodeEvents := map[string][]*structs.NodeEvent{node.ID: []*structs.NodeEvent{nodeEvent}}
req := structs.EmitNodeEventRequest{
NodeID: node.ID,
NodeEvent: nodeEvent,
NodeEvents: nodeEvents,
WriteRequest: structs.WriteRequest{Region: "global"},
}

Expand Down
26 changes: 14 additions & 12 deletions nomad/state/events_state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,36 +8,38 @@ import (
)

// addNodeEvent is a function which wraps upsertNodeEvent
func (s *StateStore) AddNodeEvent(index uint64, node *structs.Node, event *structs.NodeEvent) error {
func (s *StateStore) AddNodeEvent(index uint64, node *structs.Node, events []*structs.NodeEvent) error {
txn := s.db.Txn(true)
defer txn.Abort()

err := s.upsertNodeEvent(index, node, event, txn)
err := s.upsertNodeEvents(index, node, events, txn)
txn.Commit()
return err
}

// upsertNodeEvent upserts a node event for a respective node. It also maintains
// that only 10 node events are ever stored simultaneously, deleting older
// events once this bound has been reached.
func (s *StateStore) upsertNodeEvent(index uint64, node *structs.Node, event *structs.NodeEvent, txn *memdb.Txn) error {

event.CreateIndex = index
event.ModifyIndex = index
func (s *StateStore) upsertNodeEvents(index uint64, node *structs.Node, events []*structs.NodeEvent, txn *memdb.Txn) error {

// Copy the existing node
copyNode := new(structs.Node)
*copyNode = *node

nodeEvents := node.NodeEvents

// keep node events pruned to below 10 simultaneously
if len(nodeEvents) >= 10 {
delta := len(nodeEvents) - 10
nodeEvents = nodeEvents[delta+1:]
for _, e := range events {
e.CreateIndex = index
e.ModifyIndex = index

// keep node events pruned to below 10 simultaneously
if len(nodeEvents) >= 10 {
delta := len(nodeEvents) - 10
nodeEvents = nodeEvents[delta+1:]
}
nodeEvents = append(nodeEvents, e)
copyNode.NodeEvents = nodeEvents
}
nodeEvents = append(nodeEvents, event)
copyNode.NodeEvents = nodeEvents

// Insert the node
if err := txn.Insert("nodes", copyNode); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions nomad/state/events_state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestStateStore_AddSingleNodeEvent(t *testing.T) {
Subsystem: "Driver",
Timestamp: time.Now().Unix(),
}
err = state.AddNodeEvent(1001, node, nodeEvent)
err = state.AddNodeEvent(1001, node, []*structs.NodeEvent{nodeEvent})
require.Nil(err)

require.True(watchFired(ws))
Expand Down Expand Up @@ -75,7 +75,7 @@ func TestStateStore_NodeEvents_RetentionWindow(t *testing.T) {
Subsystem: "Driver",
Timestamp: time.Now().Unix(),
}
err := state.AddNodeEvent(uint64(i), out, nodeEvent)
err := state.AddNodeEvent(uint64(i), out, []*structs.NodeEvent{nodeEvent})
require.Nil(err)

require.True(watchFired(ws))
Expand Down
6 changes: 4 additions & 2 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1186,8 +1186,10 @@ type NodeEvent struct {
// EmitNodeEventRequest is a request to update the node events source
// with a new client-side event
type EmitNodeEventRequest struct {
NodeID string
NodeEvent *NodeEvent
// NodeEvents are a map where the key is a node id, and value is a list of
// events for that node
NodeEvents map[string][]*NodeEvent

WriteRequest
}

Expand Down

0 comments on commit 2676fc9

Please sign in to comment.