Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Events/msgtype cleanup #9117

Merged
merged 6 commits into from
Oct 19, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,13 +484,13 @@ func TestClient_WatchAllocs(t *testing.T) {
alloc2.Job = job

state := s1.State()
if err := state.UpsertJob(100, job); err != nil {
if err := state.UpsertJob(structs.MsgTypeTestSetup, 100, job); err != nil {
t.Fatal(err)
}
if err := state.UpsertJobSummary(101, mock.JobSummary(alloc1.JobID)); err != nil {
t.Fatal(err)
}
err := state.UpsertAllocs(102, []*structs.Allocation{alloc1, alloc2})
err := state.UpsertAllocs(structs.MsgTypeTestSetup, 102, []*structs.Allocation{alloc1, alloc2})
if err != nil {
t.Fatalf("err: %v", err)
}
Expand All @@ -515,7 +515,7 @@ func TestClient_WatchAllocs(t *testing.T) {
// alloc runner.
alloc2_2 := alloc2.Copy()
alloc2_2.DesiredStatus = structs.AllocDesiredStatusStop
if err := state.UpsertAllocs(104, []*structs.Allocation{alloc2_2}); err != nil {
if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 104, []*structs.Allocation{alloc2_2}); err != nil {
t.Fatalf("err upserting stopped alloc: %v", err)
}

Expand Down Expand Up @@ -581,13 +581,13 @@ func TestClient_SaveRestoreState(t *testing.T) {
alloc1.ClientStatus = structs.AllocClientStatusRunning

state := s1.State()
if err := state.UpsertJob(100, job); err != nil {
if err := state.UpsertJob(structs.MsgTypeTestSetup, 100, job); err != nil {
t.Fatal(err)
}
if err := state.UpsertJobSummary(101, mock.JobSummary(alloc1.JobID)); err != nil {
t.Fatal(err)
}
if err := state.UpsertAllocs(102, []*structs.Allocation{alloc1}); err != nil {
if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 102, []*structs.Allocation{alloc1}); err != nil {
t.Fatalf("err: %v", err)
}

Expand Down Expand Up @@ -687,13 +687,13 @@ func TestClient_AddAllocError(t *testing.T) {
alloc1.TaskResources = nil

state := s1.State()
err := state.UpsertJob(100, job)
err := state.UpsertJob(structs.MsgTypeTestSetup, 100, job)
require.Nil(err)

err = state.UpsertJobSummary(101, mock.JobSummary(alloc1.JobID))
require.Nil(err)

err = state.UpsertAllocs(102, []*structs.Allocation{alloc1})
err = state.UpsertAllocs(structs.MsgTypeTestSetup, 102, []*structs.Allocation{alloc1})
require.Nil(err)

// Push this alloc update to the client
Expand Down Expand Up @@ -795,7 +795,7 @@ func TestClient_BlockedAllocations(t *testing.T) {
}

state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID))
state.UpsertAllocs(100, []*structs.Allocation{alloc})
state.UpsertAllocs(structs.MsgTypeTestSetup, 100, []*structs.Allocation{alloc})

// Wait until the client downloads and starts the allocation
testutil.WaitForResult(func() (bool, error) {
Expand All @@ -818,7 +818,7 @@ func TestClient_BlockedAllocations(t *testing.T) {
alloc2.Job = alloc.Job
alloc2.JobID = alloc.JobID
alloc2.PreviousAllocation = alloc.ID
if err := state.UpsertAllocs(200, []*structs.Allocation{alloc2}); err != nil {
if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 200, []*structs.Allocation{alloc2}); err != nil {
t.Fatalf("err: %v", err)
}

Expand All @@ -839,7 +839,7 @@ func TestClient_BlockedAllocations(t *testing.T) {
// Change the desired state of the parent alloc to stop
alloc1 := alloc.Copy()
alloc1.DesiredStatus = structs.AllocDesiredStatusStop
if err := state.UpsertAllocs(300, []*structs.Allocation{alloc1}); err != nil {
if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 300, []*structs.Allocation{alloc1}); err != nil {
t.Fatalf("err: %v", err)
}

Expand Down
4 changes: 2 additions & 2 deletions client/gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ func TestAllocGarbageCollector_MakeRoomFor_MaxAllocs(t *testing.T) {

upsertJobFn := func(server *nomad.Server, j *structs.Job) {
state := server.State()
require.NoError(state.UpsertJob(nextIndex(), j))
require.NoError(state.UpsertJob(structs.MsgTypeTestSetup, nextIndex(), j))
require.NoError(state.UpsertJobSummary(nextIndex(), mock.JobSummary(j.ID)))
}

Expand All @@ -392,7 +392,7 @@ func TestAllocGarbageCollector_MakeRoomFor_MaxAllocs(t *testing.T) {

upsertAllocFn := func(server *nomad.Server, a *structs.Allocation) {
state := server.State()
require.NoError(state.UpsertAllocs(nextIndex(), []*structs.Allocation{a}))
require.NoError(state.UpsertAllocs(structs.MsgTypeTestSetup, nextIndex(), []*structs.Allocation{a}))
}

upsertNewAllocFn := func(server *nomad.Server, j *structs.Job) *structs.Allocation {
Expand Down
15 changes: 6 additions & 9 deletions command/agent/alloc_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ func TestHTTP_AllocsList(t *testing.T) {

state.UpsertJobSummary(998, mock.JobSummary(alloc1.JobID))
state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID))
err := state.UpsertAllocs(1000,
[]*structs.Allocation{alloc1, alloc2})
err := state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc1, alloc2})
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -112,8 +111,7 @@ func TestHTTP_AllocsPrefixList(t *testing.T) {
if err := state.UpsertJobSummary(999, summary2); err != nil {
t.Fatal(err)
}
if err := state.UpsertAllocs(1000,
[]*structs.Allocation{alloc1, alloc2}); err != nil {
if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc1, alloc2}); err != nil {
t.Fatalf("err: %v", err)
}

Expand Down Expand Up @@ -167,8 +165,7 @@ func TestHTTP_AllocQuery(t *testing.T) {
if err := state.UpsertJobSummary(999, mock.JobSummary(alloc.JobID)); err != nil {
t.Fatal(err)
}
err := state.UpsertAllocs(1000,
[]*structs.Allocation{alloc})
err := state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc})
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -220,7 +217,7 @@ func TestHTTP_AllocQuery_Payload(t *testing.T) {
compressed := snappy.Encode(nil, expected)
alloc.Job.Payload = compressed

err := state.UpsertAllocs(1000, []*structs.Allocation{alloc})
err := state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc})
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -404,7 +401,7 @@ func TestHTTP_AllocStop(t *testing.T) {
require := require.New(t)
require.NoError(state.UpsertJobSummary(999, mock.JobSummary(alloc.JobID)))

require.NoError(state.UpsertAllocs(1000, []*structs.Allocation{alloc}))
require.NoError(state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc}))

// Test that the happy path works
{
Expand Down Expand Up @@ -629,7 +626,7 @@ func TestHTTP_AllocSnapshot_Atomic(t *testing.T) {
}
alloc.NodeID = s.client.NodeID()
state.UpsertJobSummary(998, mock.JobSummary(alloc.JobID))
if err := state.UpsertAllocs(1000, []*structs.Allocation{alloc.Copy()}); err != nil {
if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc.Copy()}); err != nil {
t.Fatalf("error upserting alloc: %v", err)
}

Expand Down
14 changes: 7 additions & 7 deletions command/agent/deployment_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,9 @@ func TestHTTP_DeploymentAllocations(t *testing.T) {
a2.TaskStates = make(map[string]*structs.TaskState)
a2.TaskStates["test"] = taskState2

assert.Nil(state.UpsertJob(998, j), "UpsertJob")
assert.Nil(state.UpsertJob(structs.MsgTypeTestSetup, 998, j), "UpsertJob")
assert.Nil(state.UpsertDeployment(999, d), "UpsertDeployment")
assert.Nil(state.UpsertAllocs(1000, []*structs.Allocation{a1, a2}), "UpsertAllocs")
assert.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{a1, a2}), "UpsertAllocs")

// Make the HTTP request
req, err := http.NewRequest("GET", "/v1/deployment/allocations/"+d.ID, nil)
Expand Down Expand Up @@ -174,7 +174,7 @@ func TestHTTP_DeploymentPause(t *testing.T) {
j := mock.Job()
d := mock.Deployment()
d.JobID = j.ID
assert.Nil(state.UpsertJob(999, j), "UpsertJob")
assert.Nil(state.UpsertJob(structs.MsgTypeTestSetup, 999, j), "UpsertJob")
assert.Nil(state.UpsertDeployment(1000, d), "UpsertDeployment")

// Create the pause request
Expand Down Expand Up @@ -215,7 +215,7 @@ func TestHTTP_DeploymentPromote(t *testing.T) {
j := mock.Job()
d := mock.Deployment()
d.JobID = j.ID
assert.Nil(state.UpsertJob(999, j), "UpsertJob")
assert.Nil(state.UpsertJob(structs.MsgTypeTestSetup, 999, j), "UpsertJob")
assert.Nil(state.UpsertDeployment(1000, d), "UpsertDeployment")

// Create the pause request
Expand Down Expand Up @@ -259,9 +259,9 @@ func TestHTTP_DeploymentAllocHealth(t *testing.T) {
a := mock.Alloc()
a.JobID = j.ID
a.DeploymentID = d.ID
assert.Nil(state.UpsertJob(998, j), "UpsertJob")
assert.Nil(state.UpsertJob(structs.MsgTypeTestSetup, 998, j), "UpsertJob")
assert.Nil(state.UpsertDeployment(999, d), "UpsertDeployment")
assert.Nil(state.UpsertAllocs(1000, []*structs.Allocation{a}), "UpsertAllocs")
assert.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{a}), "UpsertAllocs")

// Create the pause request
args := structs.DeploymentAllocHealthRequest{
Expand Down Expand Up @@ -301,7 +301,7 @@ func TestHTTP_DeploymentFail(t *testing.T) {
j := mock.Job()
d := mock.Deployment()
d.JobID = j.ID
assert.Nil(state.UpsertJob(998, j), "UpsertJob")
assert.Nil(state.UpsertJob(structs.MsgTypeTestSetup, 998, j), "UpsertJob")
assert.Nil(state.UpsertDeployment(999, d), "UpsertDeployment")

// Make the HTTP request
Expand Down
11 changes: 4 additions & 7 deletions command/agent/eval_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ func TestHTTP_EvalList(t *testing.T) {
state := s.Agent.server.State()
eval1 := mock.Eval()
eval2 := mock.Eval()
err := state.UpsertEvals(1000,
[]*structs.Evaluation{eval1, eval2})
err := state.UpsertEvals(structs.MsgTypeTestSetup, 1000, []*structs.Evaluation{eval1, eval2})
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -63,8 +62,7 @@ func TestHTTP_EvalPrefixList(t *testing.T) {
eval1.ID = "aaabbbbb-e8f7-fd38-c855-ab94ceb89706"
eval2 := mock.Eval()
eval2.ID = "aaabbbbb-e8f7-fd38-c855-ab94ceb89706"
err := state.UpsertEvals(1000,
[]*structs.Evaluation{eval1, eval2})
err := state.UpsertEvals(structs.MsgTypeTestSetup, 1000, []*structs.Evaluation{eval1, eval2})
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -116,8 +114,7 @@ func TestHTTP_EvalAllocations(t *testing.T) {
alloc2.EvalID = alloc1.EvalID
state.UpsertJobSummary(998, mock.JobSummary(alloc1.JobID))
state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID))
err := state.UpsertAllocs(1000,
[]*structs.Allocation{alloc1, alloc2})
err := state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc1, alloc2})
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -161,7 +158,7 @@ func TestHTTP_EvalQuery(t *testing.T) {
// Directly manipulate the state
state := s.Agent.server.State()
eval := mock.Eval()
err := state.UpsertEvals(1000, []*structs.Evaluation{eval})
err := state.UpsertEvals(structs.MsgTypeTestSetup, 1000, []*structs.Evaluation{eval})
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions command/agent/fs_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ func addAllocToClient(agent *TestAgent, alloc *structs.Allocation, wait clientAl

// Upsert the allocation
state := agent.server.State()
require.Nil(state.UpsertJob(999, alloc.Job))
require.Nil(state.UpsertAllocs(1003, []*structs.Allocation{alloc}))
require.Nil(state.UpsertJob(structs.MsgTypeTestSetup, 999, alloc.Job))
require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 1003, []*structs.Allocation{alloc}))

if wait == noWaitClientAlloc {
return
Expand Down
4 changes: 2 additions & 2 deletions command/agent/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ func TestHTTP_JobQuery_Payload(t *testing.T) {

// Directly manipulate the state
state := s.Agent.server.State()
if err := state.UpsertJob(1000, job); err != nil {
if err := state.UpsertJob(structs.MsgTypeTestSetup, 1000, job); err != nil {
t.Fatalf("Failed to upsert job: %v", err)
}

Expand Down Expand Up @@ -1007,7 +1007,7 @@ func TestHTTP_JobAllocations(t *testing.T) {
alloc1.TaskStates = make(map[string]*structs.TaskState)
alloc1.TaskStates["test"] = taskState
state := s.Agent.server.State()
err := state.UpsertAllocs(1000, []*structs.Allocation{alloc1})
err := state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc1})
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down
6 changes: 3 additions & 3 deletions command/agent/node_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func TestHTTP_NodeForceEval(t *testing.T) {
if err := state.UpsertJobSummary(999, mock.JobSummary(alloc1.JobID)); err != nil {
t.Fatal(err)
}
err := state.UpsertAllocs(1000, []*structs.Allocation{alloc1})
err := state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc1})
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -199,7 +199,7 @@ func TestHTTP_NodeAllocations(t *testing.T) {
alloc1.TaskStates = make(map[string]*structs.TaskState)
alloc1.TaskStates["test"] = taskState

err := state.UpsertAllocs(1000, []*structs.Allocation{alloc1})
err := state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc1})
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -387,7 +387,7 @@ func TestHTTP_NodePurge(t *testing.T) {
if err := state.UpsertJobSummary(999, mock.JobSummary(alloc1.JobID)); err != nil {
t.Fatal(err)
}
err := state.UpsertAllocs(1000, []*structs.Allocation{alloc1})
err := state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc1})
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down
11 changes: 5 additions & 6 deletions command/agent/search_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func createJobForTest(jobID string, s *TestAgent, t *testing.T) {
job.TaskGroups[0].Count = 1

state := s.Agent.server.State()
err := state.UpsertJob(1000, job)
err := state.UpsertJob(structs.MsgTypeTestSetup, 1000, job)
assert.Nil(err)
}

Expand Down Expand Up @@ -147,8 +147,7 @@ func TestHTTP_Search_Evaluation(t *testing.T) {
state := s.Agent.server.State()
eval1 := mock.Eval()
eval2 := mock.Eval()
err := state.UpsertEvals(9000,
[]*structs.Evaluation{eval1, eval2})
err := state.UpsertEvals(structs.MsgTypeTestSetup, 9000, []*structs.Evaluation{eval1, eval2})
assert.Nil(err)

prefix := eval1.ID[:len(eval1.ID)-2]
Expand Down Expand Up @@ -182,7 +181,7 @@ func TestHTTP_Search_Allocations(t *testing.T) {
httpTest(t, nil, func(s *TestAgent) {
state := s.Agent.server.State()
alloc := mock.Alloc()
err := state.UpsertAllocs(7000, []*structs.Allocation{alloc})
err := state.UpsertAllocs(structs.MsgTypeTestSetup, 7000, []*structs.Allocation{alloc})
assert.Nil(err)

prefix := alloc.ID[:len(alloc.ID)-2]
Expand Down Expand Up @@ -215,7 +214,7 @@ func TestHTTP_Search_Nodes(t *testing.T) {
httpTest(t, nil, func(s *TestAgent) {
state := s.Agent.server.State()
node := mock.Node()
err := state.UpsertNode(6000, node)
err := state.UpsertNode(structs.MsgTypeTestSetup, node, 6000)
assert.Nil(err)

prefix := node.ID[:len(node.ID)-2]
Expand Down Expand Up @@ -307,7 +306,7 @@ func TestHTTP_Search_AllContext(t *testing.T) {
state := s.Agent.server.State()
eval1 := mock.Eval()
eval1.ID = testJobID
err := state.UpsertEvals(8000, []*structs.Evaluation{eval1})
err := state.UpsertEvals(structs.MsgTypeTestSetup, 8000, []*structs.Evaluation{eval1})
assert.Nil(err)

data := structs.SearchRequest{Prefix: testJobPrefix, Context: structs.All}
Expand Down
2 changes: 1 addition & 1 deletion command/alloc_exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func TestAllocExecCommand_AutocompleteArgs(t *testing.T) {
// Create a fake alloc
state := srv.Agent.Server().State()
a := mock.Alloc()
assert.Nil(state.UpsertAllocs(1000, []*structs.Allocation{a}))
assert.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{a}))

prefix := a.ID[:5]
args := complete.Args{Last: prefix}
Expand Down
2 changes: 1 addition & 1 deletion command/alloc_fs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func TestFSCommand_AutocompleteArgs(t *testing.T) {
// Create a fake alloc
state := srv.Agent.Server().State()
a := mock.Alloc()
assert.Nil(state.UpsertAllocs(1000, []*structs.Allocation{a}))
assert.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{a}))

prefix := a.ID[:5]
args := complete.Args{Last: prefix}
Expand Down
2 changes: 1 addition & 1 deletion command/alloc_logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestLogsCommand_AutocompleteArgs(t *testing.T) {
// Create a fake alloc
state := srv.Agent.Server().State()
a := mock.Alloc()
assert.Nil(state.UpsertAllocs(1000, []*structs.Allocation{a}))
assert.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{a}))

prefix := a.ID[:5]
args := complete.Args{Last: prefix}
Expand Down
2 changes: 1 addition & 1 deletion command/alloc_restart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func TestAllocRestartCommand_AutocompleteArgs(t *testing.T) {
// Create a fake alloc
state := srv.Agent.Server().State()
a := mock.Alloc()
assert.Nil(state.UpsertAllocs(1000, []*structs.Allocation{a}))
assert.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{a}))

prefix := a.ID[:5]
args := complete.Args{Last: prefix}
Expand Down
2 changes: 1 addition & 1 deletion command/alloc_signal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestAllocSignalCommand_AutocompleteArgs(t *testing.T) {
// Create a fake alloc
state := srv.Agent.Server().State()
a := mock.Alloc()
assert.Nil(state.UpsertAllocs(1000, []*structs.Allocation{a}))
assert.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{a}))

prefix := a.ID[:5]
args := complete.Args{All: []string{"signal", prefix}, Last: prefix}
Expand Down
Loading