From b25ceeeb03a7cd4cc3703d2148386113e6b27615 Mon Sep 17 00:00:00 2001
From: Alex Dadgar
Date: Wed, 30 Mar 2016 15:17:13 -0700
Subject: [PATCH 1/2] Limit GC size

---
 nomad/core_sched.go      | 69 +++++++++++++++++++++++++++++++++-------
 nomad/core_sched_test.go | 38 ++++++++++++++++++++++
 2 files changed, 96 insertions(+), 11 deletions(-)

diff --git a/nomad/core_sched.go b/nomad/core_sched.go
index 1bff28f67c5..e2d84a58b02 100644
--- a/nomad/core_sched.go
+++ b/nomad/core_sched.go
@@ -10,6 +10,13 @@ import (
 	"github.com/hashicorp/nomad/scheduler"
 )
 
+var (
+	// maxIdsPerReap is the maximum number of evals and allocations to reap in a
+	// single Raft transaction. This is to ensure that the Raft message does not
+	// become too large.
+	maxIdsPerReap = (1024 * 512) / 36 // 0.5 MB of ids.
+)
+
 // CoreScheduler is a special "scheduler" that is registered
 // as "_core". It is used to run various administrative work
 // across the cluster.
@@ -232,22 +239,62 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64)
 // allocs.
 func (c *CoreScheduler) evalReap(evals, allocs []string) error {
 	// Call to the leader to issue the reap
-	req := structs.EvalDeleteRequest{
-		Evals:  evals,
-		Allocs: allocs,
-		WriteRequest: structs.WriteRequest{
-			Region: c.srv.config.Region,
-		},
-	}
-	var resp structs.GenericResponse
-	if err := c.srv.RPC("Eval.Reap", &req, &resp); err != nil {
-		c.srv.logger.Printf("[ERR] sched.core: eval reap failed: %v", err)
-		return err
+	for _, req := range c.partitionReap(evals, allocs) {
+		var resp structs.GenericResponse
+		if err := c.srv.RPC("Eval.Reap", req, &resp); err != nil {
+			c.srv.logger.Printf("[ERR] sched.core: eval reap failed: %v", err)
+			return err
+		}
 	}
 	return nil
 }
 
+// partitionReap returns a list of EvalDeleteRequest to make, ensuring a single
+// request does not contain too many allocations and evaluations. This is
+// necessary to ensure that the Raft transaction does not become too large.
+func (c *CoreScheduler) partitionReap(evals, allocs []string) []*structs.EvalDeleteRequest {
+	var requests []*structs.EvalDeleteRequest
+	var submittedEvals, submittedAllocs int
+	for submittedEvals != len(evals) || submittedAllocs != len(allocs) {
+		req := &structs.EvalDeleteRequest{
+			WriteRequest: structs.WriteRequest{
+				Region: c.srv.config.Region,
+			},
+		}
+		requests = append(requests, req)
+		available := maxIdsPerReap
+
+		// Add the evals first
+		if remaining := len(evals) - submittedEvals; remaining > 0 {
+			if remaining <= available {
+				req.Evals = evals[submittedEvals:]
+				available -= remaining
+				submittedEvals += remaining
+			} else {
+				req.Evals = evals[submittedEvals : submittedEvals+available]
+				submittedEvals += available
+
+				// Exhausted space so skip adding allocs
+				continue
+			}
+		}
+
+		// Add the allocs
+		if remaining := len(allocs) - submittedAllocs; remaining > 0 {
+			if remaining <= available {
+				req.Allocs = allocs[submittedAllocs:]
+				submittedAllocs += remaining
+			} else {
+				req.Allocs = allocs[submittedAllocs : submittedAllocs+available]
+				submittedAllocs += available
+			}
+		}
+	}
+
+	return requests
+}
+
 // nodeGC is used to garbage collect old nodes
 func (c *CoreScheduler) nodeGC(eval *structs.Evaluation) error {
 	// Iterate over the evaluations
diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go
index 792c306482f..b33bdac23b2 100644
--- a/nomad/core_sched_test.go
+++ b/nomad/core_sched_test.go
@@ -534,3 +534,41 @@ func TestCoreScheduler_JobGC_Force(t *testing.T) {
 		}
 	}
 }
+
+func TestCoreScheduler_PartitionReap(t *testing.T) {
+	s1 := testServer(t, nil)
+	defer s1.Shutdown()
+	testutil.WaitForLeader(t, s1.RPC)
+
+	// Create a core scheduler
+	snap, err := s1.fsm.State().Snapshot()
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	core := NewCoreScheduler(s1, snap)
+
+	// Set the max ids per reap to something lower.
+	maxIdsPerReap = 2
+
+	evals := []string{"a", "b", "c"}
+	allocs := []string{"1", "2", "3"}
+	requests := core.(*CoreScheduler).partitionReap(evals, allocs)
+	if len(requests) != 3 {
+		t.Fatalf("Expected 3 requests got: %v", requests)
+	}
+
+	first := requests[0]
+	if len(first.Evals) != 2 && len(first.Allocs) != 0 {
+		t.Fatalf("Unexpected first request: %v", first)
+	}
+
+	second := requests[1]
+	if len(second.Evals) != 1 && len(second.Allocs) != 1 {
+		t.Fatalf("Unexpected second request: %v", second)
+	}
+
+	third := requests[2]
+	if len(third.Evals) != 0 && len(third.Allocs) != 2 {
+		t.Fatalf("Unexpected third request: %v", third)
+	}
+}

From fdb619efa579df923084053483dfecdb64bc2b56 Mon Sep 17 00:00:00 2001
From: Alex Dadgar
Date: Thu, 14 Apr 2016 11:41:04 -0700
Subject: [PATCH 2/2] Address comments

---
 nomad/core_sched.go      | 30 +++++++++++++++---------------
 nomad/core_sched_test.go |  6 +++---
 2 files changed, 18 insertions(+), 18 deletions(-)

diff --git a/nomad/core_sched.go b/nomad/core_sched.go
index e2d84a58b02..c6b0829c550 100644
--- a/nomad/core_sched.go
+++ b/nomad/core_sched.go
@@ -14,7 +14,7 @@ var (
 	// maxIdsPerReap is the maximum number of evals and allocations to reap in a
 	// single Raft transaction. This is to ensure that the Raft message does not
 	// become too large.
-	maxIdsPerReap = (1024 * 512) / 36 // 0.5 MB of ids.
+	maxIdsPerReap = (1024 * 256) / 36 // 0.25 MB of ids.
 )
 
 // CoreScheduler is a special "scheduler" that is registered
@@ -255,7 +255,7 @@ func (c *CoreScheduler) evalReap(evals, allocs []string) error {
 // necessary to ensure that the Raft transaction does not become too large.
 func (c *CoreScheduler) partitionReap(evals, allocs []string) []*structs.EvalDeleteRequest {
 	var requests []*structs.EvalDeleteRequest
-	var submittedEvals, submittedAllocs int
+	submittedEvals, submittedAllocs := 0, 0
 	for submittedEvals != len(evals) || submittedAllocs != len(allocs) {
 		req := &structs.EvalDeleteRequest{
 			WriteRequest: structs.WriteRequest{
@@ -265,29 +265,29 @@ func (c *CoreScheduler) partitionReap(evals, allocs []string) []*structs.EvalDel
 		requests = append(requests, req)
 		available := maxIdsPerReap
 
-		// Add the evals first
-		if remaining := len(evals) - submittedEvals; remaining > 0 {
+		// Add the allocs first
+		if remaining := len(allocs) - submittedAllocs; remaining > 0 {
 			if remaining <= available {
-				req.Evals = evals[submittedEvals:]
+				req.Allocs = allocs[submittedAllocs:]
 				available -= remaining
-				submittedEvals += remaining
+				submittedAllocs += remaining
 			} else {
-				req.Evals = evals[submittedEvals : submittedEvals+available]
-				submittedEvals += available
+				req.Allocs = allocs[submittedAllocs : submittedAllocs+available]
+				submittedAllocs += available
 
-				// Exhausted space so skip adding allocs
+				// Exhausted space so skip adding evals
 				continue
 			}
 		}
 
-		// Add the allocs
-		if remaining := len(allocs) - submittedAllocs; remaining > 0 {
+		// Add the evals
+		if remaining := len(evals) - submittedEvals; remaining > 0 {
 			if remaining <= available {
-				req.Allocs = allocs[submittedAllocs:]
-				submittedAllocs += remaining
+				req.Evals = evals[submittedEvals:]
+				submittedEvals += remaining
 			} else {
-				req.Allocs = allocs[submittedAllocs : submittedAllocs+available]
-				submittedAllocs += available
+				req.Evals = evals[submittedEvals : submittedEvals+available]
+				submittedEvals += available
 			}
 		}
 	}
diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go
index b33bdac23b2..a60064e5290 100644
--- a/nomad/core_sched_test.go
+++ b/nomad/core_sched_test.go
@@ -558,17 +558,17 @@ func TestCoreScheduler_PartitionReap(t *testing.T) {
 	}
 
 	first := requests[0]
-	if len(first.Evals) != 2 && len(first.Allocs) != 0 {
+	if len(first.Allocs) != 2 && len(first.Evals) != 0 {
 		t.Fatalf("Unexpected first request: %v", first)
 	}
 
 	second := requests[1]
-	if len(second.Evals) != 1 && len(second.Allocs) != 1 {
+	if len(second.Allocs) != 1 && len(second.Evals) != 1 {
 		t.Fatalf("Unexpected second request: %v", second)
 	}
 
 	third := requests[2]
-	if len(third.Evals) != 0 && len(third.Allocs) != 2 {
+	if len(third.Allocs) != 0 && len(third.Evals) != 2 {
 		t.Fatalf("Unexpected third request: %v", third)
 	}
 }
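
The sketch below is an illustrative, standalone rendering of the partitioning logic in the patches above; it is not Nomad code. The deleteRequest type, maxIDsPerBatch variable, and partition function are stand-ins for structs.EvalDeleteRequest, maxIdsPerReap, and (*CoreScheduler).partitionReap, and the batch ordering follows the final state after the second commit (allocations packed before evaluations, with the cap halved to roughly 0.25 MB of IDs).

package main

import "fmt"

// deleteRequest stands in for structs.EvalDeleteRequest: one batch of IDs
// to delete in a single Raft transaction.
type deleteRequest struct {
	Evals  []string
	Allocs []string
}

// maxIDsPerBatch mirrors maxIdsPerReap. The divisor 36 matches the length of
// a UUID string, so this caps a batch at roughly 0.25 MB of IDs.
var maxIDsPerBatch = (1024 * 256) / 36

// partition splits the eval and alloc IDs into batches so that no single
// batch holds more than maxIDsPerBatch IDs. Allocations are packed first;
// any space left in a batch is then filled with evaluations.
func partition(evals, allocs []string) []*deleteRequest {
	var requests []*deleteRequest
	submittedEvals, submittedAllocs := 0, 0
	for submittedEvals != len(evals) || submittedAllocs != len(allocs) {
		req := &deleteRequest{}
		requests = append(requests, req)
		available := maxIDsPerBatch

		// Add the allocs first.
		if remaining := len(allocs) - submittedAllocs; remaining > 0 {
			if remaining <= available {
				req.Allocs = allocs[submittedAllocs:]
				available -= remaining
				submittedAllocs += remaining
			} else {
				req.Allocs = allocs[submittedAllocs : submittedAllocs+available]
				submittedAllocs += available
				// Batch is full; evals go into the next request.
				continue
			}
		}

		// Fill the remaining space with evals.
		if remaining := len(evals) - submittedEvals; remaining > 0 {
			if remaining <= available {
				req.Evals = evals[submittedEvals:]
				submittedEvals += remaining
			} else {
				req.Evals = evals[submittedEvals : submittedEvals+available]
				submittedEvals += available
			}
		}
	}
	return requests
}

func main() {
	// Mirror the unit test above: with a batch size of 2, three evals and
	// three allocs split into [2 allocs], [1 alloc + 1 eval], [2 evals].
	maxIDsPerBatch = 2
	for i, req := range partition([]string{"a", "b", "c"}, []string{"1", "2", "3"}) {
		fmt.Printf("request %d: allocs=%v evals=%v\n", i+1, req.Allocs, req.Evals)
	}
}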