tests: don't mutate global structs in core scheduler tests
Some of the core scheduler tests need the maximum batch size for writes to be
smaller than the usual `structs.MaxUUIDsPerWriteRequest`, but they achieved this
by unsafely mutating the package-level value, which created test flakes in other
tests.

Modify the functions under test to take a batch size parameter. Production code
will pass the global while the tests can inject smaller values.
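
As a rough illustration of the pattern described above (hypothetical names, not
the actual Nomad code): the partition helper takes the batch size as an
argument, production callers pass the package-level default, and a test injects
a small value without mutating any shared state.

package main

import "fmt"

// defaultBatchSize stands in for structs.MaxUUIDsPerWriteRequest (hypothetical value).
const defaultBatchSize = 5000

// partition splits ids into batches of at most batchSize entries.
func partition(ids []string, batchSize int) [][]string {
	var batches [][]string
	for len(ids) > 0 {
		n := batchSize
		if len(ids) < n {
			n = len(ids)
		}
		batches = append(batches, ids[:n])
		ids = ids[n:]
	}
	return batches
}

func main() {
	ids := []string{"a", "b", "c"}

	// Production code passes the package-level default: one batch.
	fmt.Println(len(partition(ids, defaultBatchSize))) // 1

	// A test injects a smaller batch size instead of mutating a global: two batches.
	fmt.Println(len(partition(ids, 2))) // 2
}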
tgross committed Feb 9, 2023
1 parent 05f6fbc commit fdc9906
Showing 2 changed files with 17 additions and 25 deletions.
18 changes: 9 additions & 9 deletions nomad/core_sched.go
@@ -180,7 +180,7 @@ OUTER:
 // jobReap contacts the leader and issues a reap on the passed jobs
 func (c *CoreScheduler) jobReap(jobs []*structs.Job, leaderACL string) error {
 	// Call to the leader to issue the reap
-	for _, req := range c.partitionJobReap(jobs, leaderACL) {
+	for _, req := range c.partitionJobReap(jobs, leaderACL, structs.MaxUUIDsPerWriteRequest) {
 		var resp structs.JobBatchDeregisterResponse
 		if err := c.srv.RPC("Job.BatchDeregister", req, &resp); err != nil {
 			c.logger.Error("batch job reap failed", "error", err)
@@ -194,7 +194,7 @@ func (c *CoreScheduler) jobReap(jobs []*structs.Job, leaderACL string) error {
 // partitionJobReap returns a list of JobBatchDeregisterRequests to make,
 // ensuring a single request does not contain too many jobs. This is necessary
 // to ensure that the Raft transaction does not become too large.
-func (c *CoreScheduler) partitionJobReap(jobs []*structs.Job, leaderACL string) []*structs.JobBatchDeregisterRequest {
+func (c *CoreScheduler) partitionJobReap(jobs []*structs.Job, leaderACL string, batchSize int) []*structs.JobBatchDeregisterRequest {
 	option := &structs.JobDeregisterOptions{Purge: true}
 	var requests []*structs.JobBatchDeregisterRequest
 	submittedJobs := 0
@@ -207,7 +207,7 @@ func (c *CoreScheduler) partitionJobReap(jobs []*structs.Job, leaderACL string)
 			},
 		}
 		requests = append(requests, req)
-		available := structs.MaxUUIDsPerWriteRequest
+		available := batchSize
 
 		if remaining := len(jobs) - submittedJobs; remaining > 0 {
 			if remaining <= available {
@@ -360,7 +360,7 @@ func olderVersionTerminalAllocs(allocs []*structs.Allocation, job *structs.Job,
 // allocs.
 func (c *CoreScheduler) evalReap(evals, allocs []string) error {
 	// Call to the leader to issue the reap
-	for _, req := range c.partitionEvalReap(evals, allocs) {
+	for _, req := range c.partitionEvalReap(evals, allocs, structs.MaxUUIDsPerWriteRequest) {
 		var resp structs.GenericResponse
 		if err := c.srv.RPC("Eval.Reap", req, &resp); err != nil {
 			c.logger.Error("eval reap failed", "error", err)
@@ -374,7 +374,7 @@ func (c *CoreScheduler) evalReap(evals, allocs []string) error {
 // partitionEvalReap returns a list of EvalReapRequest to make, ensuring a single
 // request does not contain too many allocations and evaluations. This is
 // necessary to ensure that the Raft transaction does not become too large.
-func (c *CoreScheduler) partitionEvalReap(evals, allocs []string) []*structs.EvalReapRequest {
+func (c *CoreScheduler) partitionEvalReap(evals, allocs []string, batchSize int) []*structs.EvalReapRequest {
 	var requests []*structs.EvalReapRequest
 	submittedEvals, submittedAllocs := 0, 0
 	for submittedEvals != len(evals) || submittedAllocs != len(allocs) {
@@ -384,7 +384,7 @@ func (c *CoreScheduler) partitionEvalReap(evals, allocs []string) []*structs.Eva
 			},
 		}
 		requests = append(requests, req)
-		available := structs.MaxUUIDsPerWriteRequest
+		available := batchSize
 
 		// Add the allocs first
 		if remaining := len(allocs) - submittedAllocs; remaining > 0 {
@@ -572,7 +572,7 @@ OUTER:
 // deployments.
 func (c *CoreScheduler) deploymentReap(deployments []string) error {
 	// Call to the leader to issue the reap
-	for _, req := range c.partitionDeploymentReap(deployments) {
+	for _, req := range c.partitionDeploymentReap(deployments, structs.MaxUUIDsPerWriteRequest) {
 		var resp structs.GenericResponse
 		if err := c.srv.RPC("Deployment.Reap", req, &resp); err != nil {
 			c.logger.Error("deployment reap failed", "error", err)
@@ -586,7 +586,7 @@ func (c *CoreScheduler) deploymentReap(deployments []string) error {
 // partitionDeploymentReap returns a list of DeploymentDeleteRequest to make,
 // ensuring a single request does not contain too many deployments. This is
 // necessary to ensure that the Raft transaction does not become too large.
-func (c *CoreScheduler) partitionDeploymentReap(deployments []string) []*structs.DeploymentDeleteRequest {
+func (c *CoreScheduler) partitionDeploymentReap(deployments []string, batchSize int) []*structs.DeploymentDeleteRequest {
 	var requests []*structs.DeploymentDeleteRequest
 	submittedDeployments := 0
 	for submittedDeployments != len(deployments) {
@@ -596,7 +596,7 @@ func (c *CoreScheduler) partitionDeploymentReap(deployments []string) []*structs
 			},
 		}
 		requests = append(requests, req)
-		available := structs.MaxUUIDsPerWriteRequest
+		available := batchSize
 
 		if remaining := len(deployments) - submittedDeployments; remaining > 0 {
 			if remaining <= available {
24 changes: 8 additions & 16 deletions nomad/core_sched_test.go
@@ -1849,12 +1849,11 @@ func TestCoreScheduler_PartitionEvalReap(t *testing.T) {
 	}
 	core := NewCoreScheduler(s1, snap)
 
-	// Set the max ids per reap to something lower.
-	structs.MaxUUIDsPerWriteRequest = 2
-
 	evals := []string{"a", "b", "c"}
 	allocs := []string{"1", "2", "3"}
-	requests := core.(*CoreScheduler).partitionEvalReap(evals, allocs)
+
+	// Set the max ids per reap to something lower.
+	requests := core.(*CoreScheduler).partitionEvalReap(evals, allocs, 2)
 	if len(requests) != 3 {
 		t.Fatalf("Expected 3 requests got: %v", requests)
 	}
@@ -1892,11 +1891,9 @@ func TestCoreScheduler_PartitionDeploymentReap(t *testing.T) {
 	}
 	core := NewCoreScheduler(s1, snap)
 
-	// Set the max ids per reap to something lower.
-	structs.MaxUUIDsPerWriteRequest = 2
-
 	deployments := []string{"a", "b", "c"}
-	requests := core.(*CoreScheduler).partitionDeploymentReap(deployments)
+	// Set the max ids per reap to something lower.
+	requests := core.(*CoreScheduler).partitionDeploymentReap(deployments, 2)
 	if len(requests) != 2 {
 		t.Fatalf("Expected 2 requests got: %v", requests)
 	}
@@ -1913,6 +1910,7 @@ func TestCoreScheduler_PartitionDeploymentReap(t *testing.T) {
 }
 
 func TestCoreScheduler_PartitionJobReap(t *testing.T) {
+	ci.Parallel(t)
 
 	s1, cleanupS1 := TestServer(t, nil)
 	defer cleanupS1()
@@ -1924,16 +1922,10 @@ func TestCoreScheduler_PartitionJobReap(t *testing.T) {
 		t.Fatalf("err: %v", err)
 	}
 	core := NewCoreScheduler(s1, snap)
+	jobs := []*structs.Job{mock.Job(), mock.Job(), mock.Job()}
 
 	// Set the max ids per reap to something lower.
-	originalMaxUUIDsPerWriteRequest := structs.MaxUUIDsPerWriteRequest
-	structs.MaxUUIDsPerWriteRequest = 2
-	defer func() {
-		structs.MaxUUIDsPerWriteRequest = originalMaxUUIDsPerWriteRequest
-	}()
-
-	jobs := []*structs.Job{mock.Job(), mock.Job(), mock.Job()}
-	requests := core.(*CoreScheduler).partitionJobReap(jobs, "")
+	requests := core.(*CoreScheduler).partitionJobReap(jobs, "", 2)
 	require.Len(t, requests, 2)
 
 	first := requests[0]
