Skip to content

Commit

Permalink
Merge pull request #1012 from hashicorp/f-partition-gc
Browse files Browse the repository at this point in the history
core: Limit GC size
  • Loading branch information
dadgar committed Apr 14, 2016
2 parents 6b94f6b + fdb619e commit fb6f794
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 11 deletions.
69 changes: 58 additions & 11 deletions nomad/core_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@ import (
"github.com/hashicorp/nomad/scheduler"
)

// maxIdsPerReap caps how many evaluation and allocation ids are reaped in one
// Raft transaction so that the Raft message stays small. The value is a
// 0.25 MB budget divided by 36, presumably the size in bytes of a single id.
// It is a variable rather than a constant so that tests can lower it.
var maxIdsPerReap = (256 * 1024) / 36

// CoreScheduler is a special "scheduler" that is registered
// as "_core". It is used to run various administrative work
// across the cluster.
Expand Down Expand Up @@ -253,22 +260,62 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64)
// allocs.
func (c *CoreScheduler) evalReap(evals, allocs []string) error {
// NOTE(review): this listing looks like a rendered diff without +/- markers.
// The single-request code directly below appears to be the removed
// implementation, and the loop over partitionReap further down its
// replacement; as shown the braces do not balance. Confirm against the
// actual file before editing.

// Call to the leader to issue the reap
// (presumably removed) Build one request carrying every eval and alloc id.
req := structs.EvalDeleteRequest{
Evals: evals,
Allocs: allocs,
WriteRequest: structs.WriteRequest{
Region: c.srv.config.Region,
},
}
var resp structs.GenericResponse
if err := c.srv.RPC("Eval.Reap", &req, &resp); err != nil {
c.srv.logger.Printf("[ERR] sched.core: eval reap failed: %v", err)
return err
// (presumably added) Issue one Eval.Reap RPC per request returned by
// partitionReap, logging and returning the first error encountered.
for _, req := range c.partitionReap(evals, allocs) {
var resp structs.GenericResponse
if err := c.srv.RPC("Eval.Reap", req, &resp); err != nil {
c.srv.logger.Printf("[ERR] sched.core: eval reap failed: %v", err)
return err
}
}

return nil
}

// partitionReap splits the given evaluation and allocation ids into a series
// of EvalDeleteRequests, each holding at most maxIdsPerReap ids in total, so
// that no single Raft transaction grows too large. Allocations are packed
// first; evaluations fill whatever room is left in a request and then spill
// into further requests. Every request is stamped with the server's region.
func (c *CoreScheduler) partitionReap(evals, allocs []string) []*structs.EvalDeleteRequest {
	var batches []*structs.EvalDeleteRequest
	allocIdx, evalIdx := 0, 0

	for allocIdx != len(allocs) || evalIdx != len(evals) {
		batch := &structs.EvalDeleteRequest{
			WriteRequest: structs.WriteRequest{
				Region: c.srv.config.Region,
			},
		}
		batches = append(batches, batch)

		// budget is the number of ids this request can still accept.
		budget := maxIdsPerReap

		// Allocations take priority over evaluations.
		if pending := len(allocs) - allocIdx; pending > 0 {
			if pending > budget {
				// The allocations alone fill this request; evaluations
				// must wait for a later one.
				batch.Allocs = allocs[allocIdx : allocIdx+budget]
				allocIdx += budget
				continue
			}

			batch.Allocs = allocs[allocIdx:]
			allocIdx += pending
			budget -= pending
		}

		// Use the leftover room for evaluations.
		if pending := len(evals) - evalIdx; pending > 0 {
			take := pending
			if take > budget {
				take = budget
			}
			batch.Evals = evals[evalIdx : evalIdx+take]
			evalIdx += take
		}
	}

	return batches
}

// nodeGC is used to garbage collect old nodes
func (c *CoreScheduler) nodeGC(eval *structs.Evaluation) error {
// Iterate over the evaluations
Expand Down
38 changes: 38 additions & 0 deletions nomad/core_sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,3 +574,41 @@ func TestCoreScheduler_JobGC_Force(t *testing.T) {
}
}
}

// TestCoreScheduler_PartitionReap checks that partitionReap splits ids into
// requests of at most maxIdsPerReap entries, placing allocations before
// evaluations.
func TestCoreScheduler_PartitionReap(t *testing.T) {
	s1 := testServer(t, nil)
	defer s1.Shutdown()
	testutil.WaitForLeader(t, s1.RPC)

	// Create a core scheduler
	snap, err := s1.fsm.State().Snapshot()
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	core := NewCoreScheduler(s1, snap)

	// Set the max ids per reap to something lower, and restore the original
	// value afterwards so other tests in the package are not affected.
	origMax := maxIdsPerReap
	defer func() { maxIdsPerReap = origMax }()
	maxIdsPerReap = 2

	evals := []string{"a", "b", "c"}
	allocs := []string{"1", "2", "3"}
	requests := core.(*CoreScheduler).partitionReap(evals, allocs)
	if len(requests) != 3 {
		t.Fatalf("Expected 3 requests got: %v", requests)
	}

	// Each check must fail if EITHER count is wrong, hence || rather than &&.

	// The first request is filled entirely by allocations.
	first := requests[0]
	if len(first.Allocs) != 2 || len(first.Evals) != 0 {
		t.Fatalf("Unexpected first request: %v", first)
	}

	// The second request holds the last allocation plus one evaluation.
	second := requests[1]
	if len(second.Allocs) != 1 || len(second.Evals) != 1 {
		t.Fatalf("Unexpected second request: %v", second)
	}

	// The third request holds the remaining evaluations.
	third := requests[2]
	if len(third.Allocs) != 0 || len(third.Evals) != 2 {
		t.Fatalf("Unexpected third request: %v", third)
	}
}

0 comments on commit fb6f794

Please sign in to comment.