Skip to content

Commit

Permalink
keep state store functions in one file
Browse files Browse the repository at this point in the history
  • Loading branch information
chelseakomlo committed Mar 12, 2018
1 parent abf12f6 commit d68d90e
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 147 deletions.
53 changes: 0 additions & 53 deletions nomad/state/events_state_store.go

This file was deleted.

94 changes: 0 additions & 94 deletions nomad/state/events_state_store_test.go

This file was deleted.

45 changes: 45 additions & 0 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3693,3 +3693,48 @@ func (r *StateRestore) addEphemeralDiskToTaskGroups(job *structs.Job) {
}
}
}

// addNodeEvent is a function which wraps upsertNodeEvent
func (s *StateStore) AddNodeEvent(index uint64, node *structs.Node, events []*structs.NodeEvent) error {
txn := s.db.Txn(true)
defer txn.Abort()

err := s.upsertNodeEvents(index, node, events, txn)
txn.Commit()
return err
}

// upsertNodeEvent upserts a node event for a respective node. It also maintains
// that only 10 node events are ever stored simultaneously, deleting older
// events once this bound has been reached.
func (s *StateStore) upsertNodeEvents(index uint64, node *structs.Node, events []*structs.NodeEvent, txn *memdb.Txn) error {

// Copy the existing node
copyNode := new(structs.Node)
*copyNode = *node

nodeEvents := node.NodeEvents

for _, e := range events {
e.CreateIndex = index
e.ModifyIndex = index

// keep node events pruned to below 10 simultaneously
if len(nodeEvents) >= 10 {
delta := len(nodeEvents) - 10
nodeEvents = nodeEvents[delta+1:]
}
nodeEvents = append(nodeEvents, e)
copyNode.NodeEvents = nodeEvents
}

// Insert the node
if err := txn.Insert("nodes", copyNode); err != nil {
return fmt.Errorf("node update failed: %v", err)
}
if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}

return nil
}
82 changes: 82 additions & 0 deletions nomad/state/state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6469,6 +6469,88 @@ func TestStateStore_Abandon(t *testing.T) {
}
}

func TestStateStore_AddSingleNodeEvent(t *testing.T) {
require := require.New(t)
state := testStateStore(t)

node := mock.Node()

// We create a new node event every time we register a node
err := state.UpsertNode(1000, node)
require.Nil(err)

require.Equal(1, len(node.NodeEvents))
require.Equal(structs.Subsystem("Cluster"), node.NodeEvents[0].Subsystem)
require.Equal("Node Registered", node.NodeEvents[0].Message)

// Create a watchset so we can test that AddNodeEvent fires the watch
ws := memdb.NewWatchSet()
_, err = state.NodeByID(ws, node.ID)
require.Nil(err)

nodeEvent := &structs.NodeEvent{
Message: "failed",
Subsystem: "Driver",
Timestamp: time.Now().Unix(),
}
err = state.AddNodeEvent(1001, node, []*structs.NodeEvent{nodeEvent})
require.Nil(err)

require.True(watchFired(ws))

ws = memdb.NewWatchSet()
out, err := state.NodeByID(ws, node.ID)
require.Nil(err)

require.Equal(2, len(out.NodeEvents))
require.Equal(nodeEvent, out.NodeEvents[1])
}

// To prevent stale node events from accumulating, we limit the number of
// stored node events to 10.
func TestStateStore_NodeEvents_RetentionWindow(t *testing.T) {
require := require.New(t)
state := testStateStore(t)

node := mock.Node()

err := state.UpsertNode(1000, node)
if err != nil {
t.Fatalf("err: %v", err)
}
require.Equal(1, len(node.NodeEvents))
require.Equal(structs.Subsystem("Cluster"), node.NodeEvents[0].Subsystem)
require.Equal("Node Registered", node.NodeEvents[0].Message)

var out *structs.Node
for i := 1; i <= 20; i++ {
ws := memdb.NewWatchSet()
out, err = state.NodeByID(ws, node.ID)
require.Nil(err)

nodeEvent := &structs.NodeEvent{
Message: fmt.Sprintf("%dith failed", i),
Subsystem: "Driver",
Timestamp: time.Now().Unix(),
}
err := state.AddNodeEvent(uint64(i), out, []*structs.NodeEvent{nodeEvent})
require.Nil(err)

require.True(watchFired(ws))
ws = memdb.NewWatchSet()
out, err = state.NodeByID(ws, node.ID)
require.Nil(err)
}

ws := memdb.NewWatchSet()
out, err = state.NodeByID(ws, node.ID)
require.Nil(err)

require.Equal(10, len(out.NodeEvents))
require.Equal(uint64(11), out.NodeEvents[0].CreateIndex)
require.Equal(uint64(20), out.NodeEvents[len(out.NodeEvents)-1].CreateIndex)
}

// watchFired is a helper for unit tests that returns if the given watch set
// fired (it doesn't care which watch actually fired). This uses a fixed
// timeout since we already expect the event happened before calling this and
Expand Down

0 comments on commit d68d90e

Please sign in to comment.