Skip to content

Commit

Permalink
Events/msgtype cleanup (#9117)
Browse files Browse the repository at this point in the history
* use msgtype in upsert node

adds message type to signature for upsert node, update tests, remove placeholder method

* UpsertAllocs msg type test setup

* use upsertallocs with msg type in signature

update test usage of delete node

delete placeholder msgtype method

* add msgtype to upsert evals signature, update test call sites with test setup msg type

handle snapshot upsert eval outside of FSM and ignore eval event

remove placeholder upsertevalsmsgtype

handle job plan rpc and prevent event creation for plan

msgtype cleanup upsertnodeevents

updatenodedrain msgtype

msg type 0 is a node registration event, so set the default  to the ignore type

* fix named import

* fix signature ordering on upsertnode to match
  • Loading branch information
drewbailey authored Oct 19, 2020
1 parent b4034d6 commit 7ce0b50
Show file tree
Hide file tree
Showing 74 changed files with 1,298 additions and 1,383 deletions.
20 changes: 10 additions & 10 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,13 +484,13 @@ func TestClient_WatchAllocs(t *testing.T) {
alloc2.Job = job

state := s1.State()
if err := state.UpsertJob(100, job); err != nil {
if err := state.UpsertJob(structs.MsgTypeTestSetup, 100, job); err != nil {
t.Fatal(err)
}
if err := state.UpsertJobSummary(101, mock.JobSummary(alloc1.JobID)); err != nil {
t.Fatal(err)
}
err := state.UpsertAllocs(102, []*structs.Allocation{alloc1, alloc2})
err := state.UpsertAllocs(structs.MsgTypeTestSetup, 102, []*structs.Allocation{alloc1, alloc2})
if err != nil {
t.Fatalf("err: %v", err)
}
Expand All @@ -515,7 +515,7 @@ func TestClient_WatchAllocs(t *testing.T) {
// alloc runner.
alloc2_2 := alloc2.Copy()
alloc2_2.DesiredStatus = structs.AllocDesiredStatusStop
if err := state.UpsertAllocs(104, []*structs.Allocation{alloc2_2}); err != nil {
if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 104, []*structs.Allocation{alloc2_2}); err != nil {
t.Fatalf("err upserting stopped alloc: %v", err)
}

Expand Down Expand Up @@ -581,13 +581,13 @@ func TestClient_SaveRestoreState(t *testing.T) {
alloc1.ClientStatus = structs.AllocClientStatusRunning

state := s1.State()
if err := state.UpsertJob(100, job); err != nil {
if err := state.UpsertJob(structs.MsgTypeTestSetup, 100, job); err != nil {
t.Fatal(err)
}
if err := state.UpsertJobSummary(101, mock.JobSummary(alloc1.JobID)); err != nil {
t.Fatal(err)
}
if err := state.UpsertAllocs(102, []*structs.Allocation{alloc1}); err != nil {
if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 102, []*structs.Allocation{alloc1}); err != nil {
t.Fatalf("err: %v", err)
}

Expand Down Expand Up @@ -687,13 +687,13 @@ func TestClient_AddAllocError(t *testing.T) {
alloc1.TaskResources = nil

state := s1.State()
err := state.UpsertJob(100, job)
err := state.UpsertJob(structs.MsgTypeTestSetup, 100, job)
require.Nil(err)

err = state.UpsertJobSummary(101, mock.JobSummary(alloc1.JobID))
require.Nil(err)

err = state.UpsertAllocs(102, []*structs.Allocation{alloc1})
err = state.UpsertAllocs(structs.MsgTypeTestSetup, 102, []*structs.Allocation{alloc1})
require.Nil(err)

// Push this alloc update to the client
Expand Down Expand Up @@ -795,7 +795,7 @@ func TestClient_BlockedAllocations(t *testing.T) {
}

state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID))
state.UpsertAllocs(100, []*structs.Allocation{alloc})
state.UpsertAllocs(structs.MsgTypeTestSetup, 100, []*structs.Allocation{alloc})

// Wait until the client downloads and starts the allocation
testutil.WaitForResult(func() (bool, error) {
Expand All @@ -818,7 +818,7 @@ func TestClient_BlockedAllocations(t *testing.T) {
alloc2.Job = alloc.Job
alloc2.JobID = alloc.JobID
alloc2.PreviousAllocation = alloc.ID
if err := state.UpsertAllocs(200, []*structs.Allocation{alloc2}); err != nil {
if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 200, []*structs.Allocation{alloc2}); err != nil {
t.Fatalf("err: %v", err)
}

Expand All @@ -839,7 +839,7 @@ func TestClient_BlockedAllocations(t *testing.T) {
// Change the desired state of the parent alloc to stop
alloc1 := alloc.Copy()
alloc1.DesiredStatus = structs.AllocDesiredStatusStop
if err := state.UpsertAllocs(300, []*structs.Allocation{alloc1}); err != nil {
if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 300, []*structs.Allocation{alloc1}); err != nil {
t.Fatalf("err: %v", err)
}

Expand Down
4 changes: 2 additions & 2 deletions client/gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ func TestAllocGarbageCollector_MakeRoomFor_MaxAllocs(t *testing.T) {

upsertJobFn := func(server *nomad.Server, j *structs.Job) {
state := server.State()
require.NoError(state.UpsertJob(nextIndex(), j))
require.NoError(state.UpsertJob(structs.MsgTypeTestSetup, nextIndex(), j))
require.NoError(state.UpsertJobSummary(nextIndex(), mock.JobSummary(j.ID)))
}

Expand All @@ -392,7 +392,7 @@ func TestAllocGarbageCollector_MakeRoomFor_MaxAllocs(t *testing.T) {

upsertAllocFn := func(server *nomad.Server, a *structs.Allocation) {
state := server.State()
require.NoError(state.UpsertAllocs(nextIndex(), []*structs.Allocation{a}))
require.NoError(state.UpsertAllocs(structs.MsgTypeTestSetup, nextIndex(), []*structs.Allocation{a}))
}

upsertNewAllocFn := func(server *nomad.Server, j *structs.Job) *structs.Allocation {
Expand Down
15 changes: 6 additions & 9 deletions command/agent/alloc_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ func TestHTTP_AllocsList(t *testing.T) {

state.UpsertJobSummary(998, mock.JobSummary(alloc1.JobID))
state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID))
err := state.UpsertAllocs(1000,
[]*structs.Allocation{alloc1, alloc2})
err := state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc1, alloc2})
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -112,8 +111,7 @@ func TestHTTP_AllocsPrefixList(t *testing.T) {
if err := state.UpsertJobSummary(999, summary2); err != nil {
t.Fatal(err)
}
if err := state.UpsertAllocs(1000,
[]*structs.Allocation{alloc1, alloc2}); err != nil {
if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc1, alloc2}); err != nil {
t.Fatalf("err: %v", err)
}

Expand Down Expand Up @@ -167,8 +165,7 @@ func TestHTTP_AllocQuery(t *testing.T) {
if err := state.UpsertJobSummary(999, mock.JobSummary(alloc.JobID)); err != nil {
t.Fatal(err)
}
err := state.UpsertAllocs(1000,
[]*structs.Allocation{alloc})
err := state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc})
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -220,7 +217,7 @@ func TestHTTP_AllocQuery_Payload(t *testing.T) {
compressed := snappy.Encode(nil, expected)
alloc.Job.Payload = compressed

err := state.UpsertAllocs(1000, []*structs.Allocation{alloc})
err := state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc})
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -404,7 +401,7 @@ func TestHTTP_AllocStop(t *testing.T) {
require := require.New(t)
require.NoError(state.UpsertJobSummary(999, mock.JobSummary(alloc.JobID)))

require.NoError(state.UpsertAllocs(1000, []*structs.Allocation{alloc}))
require.NoError(state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc}))

// Test that the happy path works
{
Expand Down Expand Up @@ -629,7 +626,7 @@ func TestHTTP_AllocSnapshot_Atomic(t *testing.T) {
}
alloc.NodeID = s.client.NodeID()
state.UpsertJobSummary(998, mock.JobSummary(alloc.JobID))
if err := state.UpsertAllocs(1000, []*structs.Allocation{alloc.Copy()}); err != nil {
if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc.Copy()}); err != nil {
t.Fatalf("error upserting alloc: %v", err)
}

Expand Down
14 changes: 7 additions & 7 deletions command/agent/deployment_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,9 @@ func TestHTTP_DeploymentAllocations(t *testing.T) {
a2.TaskStates = make(map[string]*structs.TaskState)
a2.TaskStates["test"] = taskState2

assert.Nil(state.UpsertJob(998, j), "UpsertJob")
assert.Nil(state.UpsertJob(structs.MsgTypeTestSetup, 998, j), "UpsertJob")
assert.Nil(state.UpsertDeployment(999, d), "UpsertDeployment")
assert.Nil(state.UpsertAllocs(1000, []*structs.Allocation{a1, a2}), "UpsertAllocs")
assert.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{a1, a2}), "UpsertAllocs")

// Make the HTTP request
req, err := http.NewRequest("GET", "/v1/deployment/allocations/"+d.ID, nil)
Expand Down Expand Up @@ -174,7 +174,7 @@ func TestHTTP_DeploymentPause(t *testing.T) {
j := mock.Job()
d := mock.Deployment()
d.JobID = j.ID
assert.Nil(state.UpsertJob(999, j), "UpsertJob")
assert.Nil(state.UpsertJob(structs.MsgTypeTestSetup, 999, j), "UpsertJob")
assert.Nil(state.UpsertDeployment(1000, d), "UpsertDeployment")

// Create the pause request
Expand Down Expand Up @@ -215,7 +215,7 @@ func TestHTTP_DeploymentPromote(t *testing.T) {
j := mock.Job()
d := mock.Deployment()
d.JobID = j.ID
assert.Nil(state.UpsertJob(999, j), "UpsertJob")
assert.Nil(state.UpsertJob(structs.MsgTypeTestSetup, 999, j), "UpsertJob")
assert.Nil(state.UpsertDeployment(1000, d), "UpsertDeployment")

// Create the pause request
Expand Down Expand Up @@ -259,9 +259,9 @@ func TestHTTP_DeploymentAllocHealth(t *testing.T) {
a := mock.Alloc()
a.JobID = j.ID
a.DeploymentID = d.ID
assert.Nil(state.UpsertJob(998, j), "UpsertJob")
assert.Nil(state.UpsertJob(structs.MsgTypeTestSetup, 998, j), "UpsertJob")
assert.Nil(state.UpsertDeployment(999, d), "UpsertDeployment")
assert.Nil(state.UpsertAllocs(1000, []*structs.Allocation{a}), "UpsertAllocs")
assert.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{a}), "UpsertAllocs")

// Create the pause request
args := structs.DeploymentAllocHealthRequest{
Expand Down Expand Up @@ -301,7 +301,7 @@ func TestHTTP_DeploymentFail(t *testing.T) {
j := mock.Job()
d := mock.Deployment()
d.JobID = j.ID
assert.Nil(state.UpsertJob(998, j), "UpsertJob")
assert.Nil(state.UpsertJob(structs.MsgTypeTestSetup, 998, j), "UpsertJob")
assert.Nil(state.UpsertDeployment(999, d), "UpsertDeployment")

// Make the HTTP request
Expand Down
11 changes: 4 additions & 7 deletions command/agent/eval_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ func TestHTTP_EvalList(t *testing.T) {
state := s.Agent.server.State()
eval1 := mock.Eval()
eval2 := mock.Eval()
err := state.UpsertEvals(1000,
[]*structs.Evaluation{eval1, eval2})
err := state.UpsertEvals(structs.MsgTypeTestSetup, 1000, []*structs.Evaluation{eval1, eval2})
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -63,8 +62,7 @@ func TestHTTP_EvalPrefixList(t *testing.T) {
eval1.ID = "aaabbbbb-e8f7-fd38-c855-ab94ceb89706"
eval2 := mock.Eval()
eval2.ID = "aaabbbbb-e8f7-fd38-c855-ab94ceb89706"
err := state.UpsertEvals(1000,
[]*structs.Evaluation{eval1, eval2})
err := state.UpsertEvals(structs.MsgTypeTestSetup, 1000, []*structs.Evaluation{eval1, eval2})
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -116,8 +114,7 @@ func TestHTTP_EvalAllocations(t *testing.T) {
alloc2.EvalID = alloc1.EvalID
state.UpsertJobSummary(998, mock.JobSummary(alloc1.JobID))
state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID))
err := state.UpsertAllocs(1000,
[]*structs.Allocation{alloc1, alloc2})
err := state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc1, alloc2})
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -161,7 +158,7 @@ func TestHTTP_EvalQuery(t *testing.T) {
// Directly manipulate the state
state := s.Agent.server.State()
eval := mock.Eval()
err := state.UpsertEvals(1000, []*structs.Evaluation{eval})
err := state.UpsertEvals(structs.MsgTypeTestSetup, 1000, []*structs.Evaluation{eval})
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions command/agent/fs_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ func addAllocToClient(agent *TestAgent, alloc *structs.Allocation, wait clientAl

// Upsert the allocation
state := agent.server.State()
require.Nil(state.UpsertJob(999, alloc.Job))
require.Nil(state.UpsertAllocs(1003, []*structs.Allocation{alloc}))
require.Nil(state.UpsertJob(structs.MsgTypeTestSetup, 999, alloc.Job))
require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 1003, []*structs.Allocation{alloc}))

if wait == noWaitClientAlloc {
return
Expand Down
4 changes: 2 additions & 2 deletions command/agent/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ func TestHTTP_JobQuery_Payload(t *testing.T) {

// Directly manipulate the state
state := s.Agent.server.State()
if err := state.UpsertJob(1000, job); err != nil {
if err := state.UpsertJob(structs.MsgTypeTestSetup, 1000, job); err != nil {
t.Fatalf("Failed to upsert job: %v", err)
}

Expand Down Expand Up @@ -1007,7 +1007,7 @@ func TestHTTP_JobAllocations(t *testing.T) {
alloc1.TaskStates = make(map[string]*structs.TaskState)
alloc1.TaskStates["test"] = taskState
state := s.Agent.server.State()
err := state.UpsertAllocs(1000, []*structs.Allocation{alloc1})
err := state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc1})
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down
6 changes: 3 additions & 3 deletions command/agent/node_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func TestHTTP_NodeForceEval(t *testing.T) {
if err := state.UpsertJobSummary(999, mock.JobSummary(alloc1.JobID)); err != nil {
t.Fatal(err)
}
err := state.UpsertAllocs(1000, []*structs.Allocation{alloc1})
err := state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc1})
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -199,7 +199,7 @@ func TestHTTP_NodeAllocations(t *testing.T) {
alloc1.TaskStates = make(map[string]*structs.TaskState)
alloc1.TaskStates["test"] = taskState

err := state.UpsertAllocs(1000, []*structs.Allocation{alloc1})
err := state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc1})
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -387,7 +387,7 @@ func TestHTTP_NodePurge(t *testing.T) {
if err := state.UpsertJobSummary(999, mock.JobSummary(alloc1.JobID)); err != nil {
t.Fatal(err)
}
err := state.UpsertAllocs(1000, []*structs.Allocation{alloc1})
err := state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc1})
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down
11 changes: 5 additions & 6 deletions command/agent/search_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func createJobForTest(jobID string, s *TestAgent, t *testing.T) {
job.TaskGroups[0].Count = 1

state := s.Agent.server.State()
err := state.UpsertJob(1000, job)
err := state.UpsertJob(structs.MsgTypeTestSetup, 1000, job)
assert.Nil(err)
}

Expand Down Expand Up @@ -147,8 +147,7 @@ func TestHTTP_Search_Evaluation(t *testing.T) {
state := s.Agent.server.State()
eval1 := mock.Eval()
eval2 := mock.Eval()
err := state.UpsertEvals(9000,
[]*structs.Evaluation{eval1, eval2})
err := state.UpsertEvals(structs.MsgTypeTestSetup, 9000, []*structs.Evaluation{eval1, eval2})
assert.Nil(err)

prefix := eval1.ID[:len(eval1.ID)-2]
Expand Down Expand Up @@ -182,7 +181,7 @@ func TestHTTP_Search_Allocations(t *testing.T) {
httpTest(t, nil, func(s *TestAgent) {
state := s.Agent.server.State()
alloc := mock.Alloc()
err := state.UpsertAllocs(7000, []*structs.Allocation{alloc})
err := state.UpsertAllocs(structs.MsgTypeTestSetup, 7000, []*structs.Allocation{alloc})
assert.Nil(err)

prefix := alloc.ID[:len(alloc.ID)-2]
Expand Down Expand Up @@ -215,7 +214,7 @@ func TestHTTP_Search_Nodes(t *testing.T) {
httpTest(t, nil, func(s *TestAgent) {
state := s.Agent.server.State()
node := mock.Node()
err := state.UpsertNode(6000, node)
err := state.UpsertNode(structs.MsgTypeTestSetup, 6000, node)
assert.Nil(err)

prefix := node.ID[:len(node.ID)-2]
Expand Down Expand Up @@ -307,7 +306,7 @@ func TestHTTP_Search_AllContext(t *testing.T) {
state := s.Agent.server.State()
eval1 := mock.Eval()
eval1.ID = testJobID
err := state.UpsertEvals(8000, []*structs.Evaluation{eval1})
err := state.UpsertEvals(structs.MsgTypeTestSetup, 8000, []*structs.Evaluation{eval1})
assert.Nil(err)

data := structs.SearchRequest{Prefix: testJobPrefix, Context: structs.All}
Expand Down
2 changes: 1 addition & 1 deletion command/alloc_exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func TestAllocExecCommand_AutocompleteArgs(t *testing.T) {
// Create a fake alloc
state := srv.Agent.Server().State()
a := mock.Alloc()
assert.Nil(state.UpsertAllocs(1000, []*structs.Allocation{a}))
assert.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{a}))

prefix := a.ID[:5]
args := complete.Args{Last: prefix}
Expand Down
2 changes: 1 addition & 1 deletion command/alloc_fs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func TestFSCommand_AutocompleteArgs(t *testing.T) {
// Create a fake alloc
state := srv.Agent.Server().State()
a := mock.Alloc()
assert.Nil(state.UpsertAllocs(1000, []*structs.Allocation{a}))
assert.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{a}))

prefix := a.ID[:5]
args := complete.Args{Last: prefix}
Expand Down
2 changes: 1 addition & 1 deletion command/alloc_logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestLogsCommand_AutocompleteArgs(t *testing.T) {
// Create a fake alloc
state := srv.Agent.Server().State()
a := mock.Alloc()
assert.Nil(state.UpsertAllocs(1000, []*structs.Allocation{a}))
assert.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{a}))

prefix := a.ID[:5]
args := complete.Args{Last: prefix}
Expand Down
2 changes: 1 addition & 1 deletion command/alloc_restart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func TestAllocRestartCommand_AutocompleteArgs(t *testing.T) {
// Create a fake alloc
state := srv.Agent.Server().State()
a := mock.Alloc()
assert.Nil(state.UpsertAllocs(1000, []*structs.Allocation{a}))
assert.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{a}))

prefix := a.ID[:5]
args := complete.Args{Last: prefix}
Expand Down
2 changes: 1 addition & 1 deletion command/alloc_signal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestAllocSignalCommand_AutocompleteArgs(t *testing.T) {
// Create a fake alloc
state := srv.Agent.Server().State()
a := mock.Alloc()
assert.Nil(state.UpsertAllocs(1000, []*structs.Allocation{a}))
assert.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{a}))

prefix := a.ID[:5]
args := complete.Args{All: []string{"signal", prefix}, Last: prefix}
Expand Down
Loading

0 comments on commit 7ce0b50

Please sign in to comment.