From 2285432424c8f122d1a1f97481dab28246fc2ec0 Mon Sep 17 00:00:00 2001 From: stswidwinski Date: Tue, 31 Jan 2023 13:32:14 -0500 Subject: [PATCH] GC: ensure no leakage of evaluations for batch jobs. (#15097) Prior to 2409f72 the code compared the modification index of a job to itself. Afterwards, the code compared the creation index of the job to itself. Because allocations are never re-parented to a different job, in either case the comparison trivially always evaluates to false, which leads to unreclaimable memory. Prior to this change allocations and evaluations for batch jobs were never garbage collected until the batch job was explicitly stopped. The new `batch_eval_gc_threshold` server configuration controls how old they must be before they become eligible for collection. The default threshold is `24h`. --- .changelog/15097.txt | 3 + command/agent/agent.go | 7 + command/agent/config.go | 10 +- command/agent/config_test.go | 2 + nomad/config.go | 8 + nomad/core_sched.go | 58 +- nomad/core_sched_test.go | 581 +++++++++--------- website/content/docs/configuration/server.mdx | 10 +- 8 files changed, 355 insertions(+), 324 deletions(-) create mode 100644 .changelog/15097.txt diff --git a/.changelog/15097.txt b/.changelog/15097.txt new file mode 100644 index 00000000000..15a495d9aa5 --- /dev/null +++ b/.changelog/15097.txt @@ -0,0 +1,3 @@ +```release-note:breaking-change +core: Ensure no leakage of evaluations for batch jobs. Prior to this change allocations and evaluations for batch jobs were never garbage collected until the batch job was explicitly stopped. The new `batch_eval_gc_threshold` server configuration controls how old they must be before they become eligible for collection. The default threshold is `24h`. 
+``` diff --git a/command/agent/agent.go b/command/agent/agent.go index 4141c28e2ad..00082e9639c 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -390,6 +390,13 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) { } conf.EvalGCThreshold = dur } + if gcThreshold := agentConfig.Server.BatchEvalGCThreshold; gcThreshold != "" { + dur, err := time.ParseDuration(gcThreshold) + if err != nil { + return nil, err + } + conf.BatchEvalGCThreshold = dur + } if gcThreshold := agentConfig.Server.DeploymentGCThreshold; gcThreshold != "" { dur, err := time.ParseDuration(gcThreshold) if err != nil { diff --git a/command/agent/config.go b/command/agent/config.go index 7e06a28792c..2a2669f1d77 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -473,9 +473,14 @@ type ServerConfig struct { // EvalGCThreshold controls how "old" an eval must be to be collected by GC. // Age is not the only requirement for a eval to be GCed but the threshold - // can be used to filter by age. + // can be used to filter by age. Please note that batch job evaluations are + // controlled by 'BatchEvalGCThreshold' instead. EvalGCThreshold string `hcl:"eval_gc_threshold"` + // BatchEvalGCThreshold controls how "old" an evaluation must be to be eligible + // for GC if the eval belongs to a batch job. + BatchEvalGCThreshold string `hcl:"batch_eval_gc_threshold"` + // DeploymentGCThreshold controls how "old" a deployment must be to be // collected by GC. Age is not the only requirement for a deployment to be // GCed but the threshold can be used to filter by age. 
@@ -1861,6 +1866,9 @@ func (s *ServerConfig) Merge(b *ServerConfig) *ServerConfig { if b.EvalGCThreshold != "" { result.EvalGCThreshold = b.EvalGCThreshold } + if b.BatchEvalGCThreshold != "" { + result.BatchEvalGCThreshold = b.BatchEvalGCThreshold + } if b.DeploymentGCThreshold != "" { result.DeploymentGCThreshold = b.DeploymentGCThreshold } diff --git a/command/agent/config_test.go b/command/agent/config_test.go index bba3e5ea3a4..dc5d6f6468d 100644 --- a/command/agent/config_test.go +++ b/command/agent/config_test.go @@ -140,6 +140,7 @@ func TestConfig_Merge(t *testing.T) { RaftMultiplier: pointer.Of(5), NumSchedulers: pointer.Of(1), NodeGCThreshold: "1h", + BatchEvalGCThreshold: "4h", HeartbeatGrace: 30 * time.Second, MinHeartbeatTTL: 30 * time.Second, MaxHeartbeatsPerSecond: 30.0, @@ -339,6 +340,7 @@ func TestConfig_Merge(t *testing.T) { NumSchedulers: pointer.Of(2), EnabledSchedulers: []string{structs.JobTypeBatch}, NodeGCThreshold: "12h", + BatchEvalGCThreshold: "4h", HeartbeatGrace: 2 * time.Minute, MinHeartbeatTTL: 2 * time.Minute, MaxHeartbeatsPerSecond: 200.0, diff --git a/nomad/config.go b/nomad/config.go index 7df5335e04b..5442e90dbeb 100644 --- a/nomad/config.go +++ b/nomad/config.go @@ -153,8 +153,15 @@ type Config struct { // EvalGCThreshold is how "old" an evaluation must be to be eligible // for GC. This gives users some time to debug a failed evaluation. + // + // Please note that the rules for GC of evaluations which belong to a batch + // job are separate and controlled by `BatchEvalGCThreshold` EvalGCThreshold time.Duration + // BatchEvalGCThreshold is how "old" an evaluation must be to be eligible + // for GC if the eval belongs to a batch job. + BatchEvalGCThreshold time.Duration + // JobGCInterval is how often we dispatch a job to GC jobs that are // available for garbage collection. 
JobGCInterval time.Duration @@ -460,6 +467,7 @@ func DefaultConfig() *Config { ReconcileInterval: 60 * time.Second, EvalGCInterval: 5 * time.Minute, EvalGCThreshold: 1 * time.Hour, + BatchEvalGCThreshold: 24 * time.Hour, JobGCInterval: 5 * time.Minute, JobGCThreshold: 4 * time.Hour, NodeGCInterval: 5 * time.Minute, diff --git a/nomad/core_sched.go b/nomad/core_sched.go index f4e2aa48b3d..023103280f2 100644 --- a/nomad/core_sched.go +++ b/nomad/core_sched.go @@ -240,15 +240,20 @@ func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error { oldThreshold := c.getThreshold(eval, "eval", "eval_gc_threshold", c.srv.config.EvalGCThreshold) + batchOldThreshold := c.getThreshold(eval, "eval", + "batch_eval_gc_threshold", c.srv.config.BatchEvalGCThreshold) // Collect the allocations and evaluations to GC var gcAlloc, gcEval []string for raw := iter.Next(); raw != nil; raw = iter.Next() { eval := raw.(*structs.Evaluation) - // The Evaluation GC should not handle batch jobs since those need to be - // garbage collected in one shot - gc, allocs, err := c.gcEval(eval, oldThreshold, false) + gcThreshold := oldThreshold + if eval.Type == structs.JobTypeBatch { + gcThreshold = batchOldThreshold + } + + gc, allocs, err := c.gcEval(eval, gcThreshold, false) if err != nil { return err } @@ -299,33 +304,26 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64, } // If the eval is from a running "batch" job we don't want to garbage - // collect its allocations. If there is a long running batch job and its - // terminal allocations get GC'd the scheduler would re-run the - // allocations. + // collect its most current allocations. If there is a long running batch job and its + // terminal allocations get GC'd the scheduler would re-run the allocations. However, + // we do want to GC old Evals and Allocs if there are newer ones due to update. 
+ // + // The age of the evaluation must also reach the threshold configured to be GCed so that + // one may debug old evaluations and referenced allocations. if eval.Type == structs.JobTypeBatch { // Check if the job is running - // Can collect if: - // Job doesn't exist - // Job is Stopped and dead - // allowBatch and the job is dead - collect := false - if job == nil { - collect = true - } else if job.Status != structs.JobStatusDead { - collect = false - } else if job.Stop { - collect = true - } else if allowBatch { - collect = true - } - - // We don't want to gc anything related to a job which is not dead - // If the batch job doesn't exist we can GC it regardless of allowBatch + // Can collect if either holds: + // - Job doesn't exist + // - Job is Stopped and dead + // - allowBatch and the job is dead + // + // If we cannot collect outright, check if a partial GC may occur + collect := job == nil || job.Status == structs.JobStatusDead && (job.Stop || allowBatch) if !collect { - // Find allocs associated with older (based on createindex) and GC them if terminal - oldAllocs := olderVersionTerminalAllocs(allocs, job) - return false, oldAllocs, nil + oldAllocs := olderVersionTerminalAllocs(allocs, job, thresholdIndex) + gcEval := (len(oldAllocs) == len(allocs)) + return gcEval, oldAllocs, nil } } @@ -346,12 +344,12 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64, return gcEval, gcAllocIDs, nil } -// olderVersionTerminalAllocs returns terminal allocations whose job create index -// is older than the job's create index -func olderVersionTerminalAllocs(allocs []*structs.Allocation, job *structs.Job) []string { +// olderVersionTerminalAllocs returns a list of terminal allocations that belong to the evaluation and may be +// GCed. 
+func olderVersionTerminalAllocs(allocs []*structs.Allocation, job *structs.Job, thresholdIndex uint64) []string { var ret []string for _, alloc := range allocs { - if alloc.Job != nil && alloc.Job.CreateIndex < job.CreateIndex && alloc.TerminalStatus() { + if alloc.CreateIndex < job.JobModifyIndex && alloc.ModifyIndex < thresholdIndex && alloc.TerminalStatus() { ret = append(ret, alloc.ID) } } diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go index db374fdda19..59c2538e36c 100644 --- a/nomad/core_sched_test.go +++ b/nomad/core_sched_test.go @@ -14,6 +14,7 @@ import ( "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" + "github.com/shoenig/test/must" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -293,324 +294,320 @@ func TestCoreScheduler_EvalGC_StoppedJob_Reschedulable(t *testing.T) { func TestCoreScheduler_EvalGC_Batch(t *testing.T) { ci.Parallel(t) - s1, cleanupS1 := TestServer(t, nil) + s1, cleanupS1 := TestServer(t, func(c *Config) { + // Set EvalGCThreshold past BatchEvalThreshold to make sure that only + // BatchEvalThreshold affects the results. 
+ c.BatchEvalGCThreshold = time.Hour + c.EvalGCThreshold = 2 * time.Hour + }) defer cleanupS1() testutil.WaitForLeader(t, s1.RPC) // COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0 - s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10) - - // Insert a "dead" job - store := s1.fsm.State() - job := mock.Job() - job.Type = structs.JobTypeBatch - job.Status = structs.JobStatusDead - err := store.UpsertJob(structs.MsgTypeTestSetup, 1000, job) - if err != nil { - t.Fatalf("err: %v", err) - } - - // Insert "complete" eval - eval := mock.Eval() - eval.Status = structs.EvalStatusComplete - eval.Type = structs.JobTypeBatch - eval.JobID = job.ID - err = store.UpsertEvals(structs.MsgTypeTestSetup, 1001, []*structs.Evaluation{eval}) - if err != nil { - t.Fatalf("err: %v", err) - } - - // Insert "failed" alloc - alloc := mock.Alloc() - alloc.Job = job - alloc.JobID = job.ID - alloc.EvalID = eval.ID - alloc.DesiredStatus = structs.AllocDesiredStatusStop - - // Insert "lost" alloc - alloc2 := mock.Alloc() - alloc2.Job = job - alloc2.JobID = job.ID - alloc2.EvalID = eval.ID - alloc2.DesiredStatus = structs.AllocDesiredStatusRun - alloc2.ClientStatus = structs.AllocClientStatusLost - - err = store.UpsertAllocs(structs.MsgTypeTestSetup, 1002, []*structs.Allocation{alloc, alloc2}) - if err != nil { - t.Fatalf("err: %v", err) - } + s1.fsm.timetable.table = make([]TimeTableEntry, 2, 10) - // Update the time tables to make this work - tt := s1.fsm.TimeTable() - tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.EvalGCThreshold)) - - // Create a core scheduler - snap, err := store.Snapshot() - if err != nil { - t.Fatalf("err: %v", err) - } - core := NewCoreScheduler(s1, snap) + var jobModifyIdx uint64 = 1000 - // Attempt the GC - gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000) - err = core.Process(gc) - if err != nil { - t.Fatalf("err: %v", err) - } - - // Nothing should be gone - ws := memdb.NewWatchSet() - out, err := store.EvalByID(ws, eval.ID) 
- if err != nil { - t.Fatalf("err: %v", err) - } - if out == nil { - t.Fatalf("bad: %v", out) - } - - outA, err := store.AllocByID(ws, alloc.ID) - if err != nil { - t.Fatalf("err: %v", err) - } - if outA == nil { - t.Fatalf("bad: %v", outA) - } - - outA2, err := store.AllocByID(ws, alloc2.ID) - if err != nil { - t.Fatalf("err: %v", err) - } - if outA2 == nil { - t.Fatalf("bad: %v", outA2) - } - - outB, err := store.JobByID(ws, job.Namespace, job.ID) - if err != nil { - t.Fatalf("err: %v", err) - } - if outB == nil { - t.Fatalf("bad: %v", outB) - } -} - -// An EvalGC should reap allocations from jobs with an older modify index -func TestCoreScheduler_EvalGC_Batch_OldVersion(t *testing.T) { - ci.Parallel(t) - - s1, cleanupS1 := TestServer(t, nil) - defer cleanupS1() - testutil.WaitForLeader(t, s1.RPC) - - // COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0 - s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10) - - // Insert a "dead" job + // A "stopped" job containing one "complete" eval with one terminal allocation. 
store := s1.fsm.State() - job := mock.Job() - job.Type = structs.JobTypeBatch - job.Status = structs.JobStatusDead - err := store.UpsertJob(structs.MsgTypeTestSetup, 1000, job) - if err != nil { - t.Fatalf("err: %v", err) - } - - // Insert "complete" eval - eval := mock.Eval() - eval.Status = structs.EvalStatusComplete - eval.Type = structs.JobTypeBatch - eval.JobID = job.ID - err = store.UpsertEvals(structs.MsgTypeTestSetup, 1001, []*structs.Evaluation{eval}) - if err != nil { - t.Fatalf("err: %v", err) + stoppedJob := mock.Job() + stoppedJob.Type = structs.JobTypeBatch + stoppedJob.Status = structs.JobStatusDead + stoppedJob.Stop = true + stoppedJob.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{ + Attempts: 0, + Interval: 0 * time.Second, } + err := store.UpsertJob(structs.MsgTypeTestSetup, jobModifyIdx+1, stoppedJob) + must.NoError(t, err) + + stoppedJobEval := mock.Eval() + stoppedJobEval.Status = structs.EvalStatusComplete + stoppedJobEval.Type = structs.JobTypeBatch + stoppedJobEval.JobID = stoppedJob.ID + err = store.UpsertEvals(structs.MsgTypeTestSetup, jobModifyIdx+2, []*structs.Evaluation{stoppedJobEval}) + must.NoError(t, err) + + stoppedJobStoppedAlloc := mock.Alloc() + stoppedJobStoppedAlloc.Job = stoppedJob + stoppedJobStoppedAlloc.JobID = stoppedJob.ID + stoppedJobStoppedAlloc.EvalID = stoppedJobEval.ID + stoppedJobStoppedAlloc.DesiredStatus = structs.AllocDesiredStatusStop + stoppedJobStoppedAlloc.ClientStatus = structs.AllocClientStatusFailed + + stoppedJobLostAlloc := mock.Alloc() + stoppedJobLostAlloc.Job = stoppedJob + stoppedJobLostAlloc.JobID = stoppedJob.ID + stoppedJobLostAlloc.EvalID = stoppedJobEval.ID + stoppedJobLostAlloc.DesiredStatus = structs.AllocDesiredStatusRun + stoppedJobLostAlloc.ClientStatus = structs.AllocClientStatusLost + + err = store.UpsertAllocs(structs.MsgTypeTestSetup, jobModifyIdx+3, []*structs.Allocation{stoppedJobStoppedAlloc, stoppedJobLostAlloc}) + must.NoError(t, err) + + // A "dead" job containing 
one "complete" eval with: + // 1. A "stopped" alloc + // 2. A "lost" alloc + // Both allocs upserted at 1002. + deadJob := mock.Job() + deadJob.Type = structs.JobTypeBatch + deadJob.Status = structs.JobStatusDead + err = store.UpsertJob(structs.MsgTypeTestSetup, jobModifyIdx, deadJob) + must.NoError(t, err) + + deadJobEval := mock.Eval() + deadJobEval.Status = structs.EvalStatusComplete + deadJobEval.Type = structs.JobTypeBatch + deadJobEval.JobID = deadJob.ID + err = store.UpsertEvals(structs.MsgTypeTestSetup, jobModifyIdx+1, []*structs.Evaluation{deadJobEval}) + must.NoError(t, err) + + stoppedAlloc := mock.Alloc() + stoppedAlloc.Job = deadJob + stoppedAlloc.JobID = deadJob.ID + stoppedAlloc.EvalID = deadJobEval.ID + stoppedAlloc.DesiredStatus = structs.AllocDesiredStatusStop + stoppedAlloc.ClientStatus = structs.AllocClientStatusFailed + + lostAlloc := mock.Alloc() + lostAlloc.Job = deadJob + lostAlloc.JobID = deadJob.ID + lostAlloc.EvalID = deadJobEval.ID + lostAlloc.DesiredStatus = structs.AllocDesiredStatusRun + lostAlloc.ClientStatus = structs.AllocClientStatusLost + + err = store.UpsertAllocs(structs.MsgTypeTestSetup, jobModifyIdx+2, []*structs.Allocation{stoppedAlloc, lostAlloc}) + must.NoError(t, err) + + // An "alive" job #2 containing two complete evals. The first with: + // 1. A "lost" alloc + // 2. A "running" alloc + // Both allocs upserted at 999 + // + // The second with just terminal allocs: + // 1. A "completed" alloc + // All allocs upserted at 999. The eval upserted at 999 as well. 
+ activeJob := mock.Job() + activeJob.Type = structs.JobTypeBatch + activeJob.Status = structs.JobStatusDead + err = store.UpsertJob(structs.MsgTypeTestSetup, jobModifyIdx, activeJob) + must.NoError(t, err) + + activeJobEval := mock.Eval() + activeJobEval.Status = structs.EvalStatusComplete + activeJobEval.Type = structs.JobTypeBatch + activeJobEval.JobID = activeJob.ID + err = store.UpsertEvals(structs.MsgTypeTestSetup, jobModifyIdx+1, []*structs.Evaluation{activeJobEval}) + must.NoError(t, err) + + activeJobRunningAlloc := mock.Alloc() + activeJobRunningAlloc.Job = activeJob + activeJobRunningAlloc.JobID = activeJob.ID + activeJobRunningAlloc.EvalID = activeJobEval.ID + activeJobRunningAlloc.DesiredStatus = structs.AllocDesiredStatusRun + activeJobRunningAlloc.ClientStatus = structs.AllocClientStatusRunning + + activeJobLostAlloc := mock.Alloc() + activeJobLostAlloc.Job = activeJob + activeJobLostAlloc.JobID = activeJob.ID + activeJobLostAlloc.EvalID = activeJobEval.ID + activeJobLostAlloc.DesiredStatus = structs.AllocDesiredStatusRun + activeJobLostAlloc.ClientStatus = structs.AllocClientStatusLost + + err = store.UpsertAllocs(structs.MsgTypeTestSetup, jobModifyIdx-1, []*structs.Allocation{activeJobRunningAlloc, activeJobLostAlloc}) + must.NoError(t, err) + + activeJobCompleteEval := mock.Eval() + activeJobCompleteEval.Status = structs.EvalStatusComplete + activeJobCompleteEval.Type = structs.JobTypeBatch + activeJobCompleteEval.JobID = activeJob.ID + err = store.UpsertEvals(structs.MsgTypeTestSetup, jobModifyIdx-1, []*structs.Evaluation{activeJobCompleteEval}) + must.NoError(t, err) + + activeJobCompletedEvalCompletedAlloc := mock.Alloc() + activeJobCompletedEvalCompletedAlloc.Job = activeJob + activeJobCompletedEvalCompletedAlloc.JobID = activeJob.ID + activeJobCompletedEvalCompletedAlloc.EvalID = activeJobCompleteEval.ID + activeJobCompletedEvalCompletedAlloc.DesiredStatus = structs.AllocDesiredStatusStop + activeJobCompletedEvalCompletedAlloc.ClientStatus = 
structs.AllocClientStatusComplete + + err = store.UpsertAllocs(structs.MsgTypeTestSetup, jobModifyIdx-1, []*structs.Allocation{activeJobCompletedEvalCompletedAlloc}) + must.NoError(t, err) + + // A job that ran once and was then purged. + purgedJob := mock.Job() + purgedJob.Type = structs.JobTypeBatch + purgedJob.Status = structs.JobStatusDead + err = store.UpsertJob(structs.MsgTypeTestSetup, jobModifyIdx, purgedJob) + must.NoError(t, err) + + purgedJobEval := mock.Eval() + purgedJobEval.Status = structs.EvalStatusComplete + purgedJobEval.Type = structs.JobTypeBatch + purgedJobEval.JobID = purgedJob.ID + err = store.UpsertEvals(structs.MsgTypeTestSetup, jobModifyIdx+1, []*structs.Evaluation{purgedJobEval}) + must.NoError(t, err) + + purgedJobCompleteAlloc := mock.Alloc() + purgedJobCompleteAlloc.Job = purgedJob + purgedJobCompleteAlloc.JobID = purgedJob.ID + purgedJobCompleteAlloc.EvalID = purgedJobEval.ID + purgedJobCompleteAlloc.DesiredStatus = structs.AllocDesiredStatusRun + purgedJobCompleteAlloc.ClientStatus = structs.AllocClientStatusLost + + err = store.UpsertAllocs(structs.MsgTypeTestSetup, jobModifyIdx-1, []*structs.Allocation{purgedJobCompleteAlloc}) + must.NoError(t, err) + + purgedJobCompleteEval := mock.Eval() + purgedJobCompleteEval.Status = structs.EvalStatusComplete + purgedJobCompleteEval.Type = structs.JobTypeBatch + purgedJobCompleteEval.JobID = purgedJob.ID + err = store.UpsertEvals(structs.MsgTypeTestSetup, jobModifyIdx-1, []*structs.Evaluation{purgedJobCompleteEval}) + must.NoError(t, err) + + // Purge job. 
+ err = store.DeleteJob(jobModifyIdx, purgedJob.Namespace, purgedJob.ID) + must.NoError(t, err) + + // A little helper for assertions + assertCorrectJobEvalAlloc := func( + ws memdb.WatchSet, + jobsShouldExist []*structs.Job, + jobsShouldNotExist []*structs.Job, + evalsShouldExist []*structs.Evaluation, + evalsShouldNotExist []*structs.Evaluation, + allocsShouldExist []*structs.Allocation, + allocsShouldNotExist []*structs.Allocation, + ) { + t.Helper() + for _, job := range jobsShouldExist { + out, err := store.JobByID(ws, job.Namespace, job.ID) + must.NoError(t, err) + must.NotNil(t, out) + } - // Insert "failed" alloc - alloc := mock.Alloc() - alloc.Job = job - alloc.JobID = job.ID - alloc.EvalID = eval.ID - alloc.DesiredStatus = structs.AllocDesiredStatusStop + for _, job := range jobsShouldNotExist { + out, err := store.JobByID(ws, job.Namespace, job.ID) + must.NoError(t, err) + must.Nil(t, out) + } - // Insert "lost" alloc - alloc2 := mock.Alloc() - alloc2.Job = job - alloc2.JobID = job.ID - alloc2.EvalID = eval.ID - alloc2.DesiredStatus = structs.AllocDesiredStatusRun - alloc2.ClientStatus = structs.AllocClientStatusLost + for _, eval := range evalsShouldExist { + out, err := store.EvalByID(ws, eval.ID) + must.NoError(t, err) + must.NotNil(t, out) + } - // Insert alloc with older job modifyindex - alloc3 := mock.Alloc() - job2 := job.Copy() + for _, eval := range evalsShouldNotExist { + out, err := store.EvalByID(ws, eval.ID) + must.NoError(t, err) + must.Nil(t, out) + } - alloc3.Job = job2 - alloc3.JobID = job2.ID - alloc3.EvalID = eval.ID - job2.CreateIndex = 500 - alloc3.DesiredStatus = structs.AllocDesiredStatusRun - alloc3.ClientStatus = structs.AllocClientStatusLost + for _, alloc := range allocsShouldExist { + outA, err := store.AllocByID(ws, alloc.ID) + must.NoError(t, err) + must.NotNil(t, outA) + } - err = store.UpsertAllocs(structs.MsgTypeTestSetup, 1002, []*structs.Allocation{alloc, alloc2, alloc3}) - if err != nil { - t.Fatalf("err: %v", err) + 
for _, alloc := range allocsShouldNotExist { + outA, err := store.AllocByID(ws, alloc.ID) + must.NoError(t, err) + must.Nil(t, outA) + } } - // Update the time tables to make this work - tt := s1.fsm.TimeTable() - tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.EvalGCThreshold)) - // Create a core scheduler snap, err := store.Snapshot() - if err != nil { - t.Fatalf("err: %v", err) - } + must.NoError(t, err) core := NewCoreScheduler(s1, snap) - // Attempt the GC - gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000) + // Attempt the GC without moving the time at all + gc := s1.coreJobEval(structs.CoreJobEvalGC, jobModifyIdx) err = core.Process(gc) - if err != nil { - t.Fatalf("err: %v", err) - } - - // Alloc1 and 2 should be there, and alloc3 should be gone - ws := memdb.NewWatchSet() - out, err := store.EvalByID(ws, eval.ID) - if err != nil { - t.Fatalf("err: %v", err) - } - if out == nil { - t.Fatalf("bad: %v", out) - } - - outA, err := store.AllocByID(ws, alloc.ID) - if err != nil { - t.Fatalf("err: %v", err) - } - if outA == nil { - t.Fatalf("bad: %v", outA) - } - - outA2, err := store.AllocByID(ws, alloc2.ID) - if err != nil { - t.Fatalf("err: %v", err) - } - if outA2 == nil { - t.Fatalf("bad: %v", outA2) - } - - outA3, err := store.AllocByID(ws, alloc3.ID) - if err != nil { - t.Fatalf("err: %v", err) - } - if outA3 != nil { - t.Fatalf("expected alloc to be nil:%v", outA2) - } - - outB, err := store.JobByID(ws, job.Namespace, job.ID) - if err != nil { - t.Fatalf("err: %v", err) - } - if outB == nil { - t.Fatalf("bad: %v", outB) - } -} - -// An EvalGC should reap a batch job that has been stopped -func TestCoreScheduler_EvalGC_BatchStopped(t *testing.T) { - ci.Parallel(t) - - s1, cleanupS1 := TestServer(t, nil) - defer cleanupS1() - testutil.WaitForLeader(t, s1.RPC) - - // COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0 - s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10) - - // Create a "dead" job - store := 
s1.fsm.State() - job := mock.Job() - job.Type = structs.JobTypeBatch - job.Status = structs.JobStatusDead - job.Stop = true - job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{ - Attempts: 0, - Interval: 0 * time.Second, - } - err := store.UpsertJob(structs.MsgTypeTestSetup, 1001, job) - require.Nil(t, err) - - // Insert "complete" eval - eval := mock.Eval() - eval.Status = structs.EvalStatusComplete - eval.Type = structs.JobTypeBatch - eval.JobID = job.ID - err = store.UpsertEvals(structs.MsgTypeTestSetup, 1002, []*structs.Evaluation{eval}) - require.Nil(t, err) - - // Insert "failed" alloc - alloc := mock.Alloc() - alloc.JobID = job.ID - alloc.EvalID = eval.ID - alloc.TaskGroup = job.TaskGroups[0].Name - alloc.DesiredStatus = structs.AllocDesiredStatusStop - - // Insert "lost" alloc - alloc2 := mock.Alloc() - alloc2.JobID = job.ID - alloc2.EvalID = eval.ID - alloc2.DesiredStatus = structs.AllocDesiredStatusRun - alloc2.ClientStatus = structs.AllocClientStatusLost - alloc2.TaskGroup = job.TaskGroups[0].Name - - err = store.UpsertAllocs(structs.MsgTypeTestSetup, 1003, []*structs.Allocation{alloc, alloc2}) - if err != nil { - t.Fatalf("err: %v", err) - } + must.NoError(t, err) + + // Nothing is gone + assertCorrectJobEvalAlloc( + memdb.NewWatchSet(), + []*structs.Job{deadJob, activeJob, stoppedJob}, + []*structs.Job{}, + []*structs.Evaluation{ + deadJobEval, + activeJobEval, activeJobCompleteEval, + stoppedJobEval, + purgedJobEval, + }, + []*structs.Evaluation{}, + []*structs.Allocation{ + stoppedAlloc, lostAlloc, + activeJobRunningAlloc, activeJobLostAlloc, activeJobCompletedEvalCompletedAlloc, + stoppedJobStoppedAlloc, stoppedJobLostAlloc, + purgedJobCompleteAlloc, + }, + []*structs.Allocation{}, + ) - // Update the time tables to make this work + // Update the time tables by half of the BatchEvalGCThreshold which is too + // small to GC anything. 
tt := s1.fsm.TimeTable() - tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.EvalGCThreshold)) + tt.Witness(2*jobModifyIdx, time.Now().UTC().Add((-1)*s1.config.BatchEvalGCThreshold/2)) - // Create a core scheduler - snap, err := store.Snapshot() - if err != nil { - t.Fatalf("err: %v", err) - } - core := NewCoreScheduler(s1, snap) - - // Attempt the GC - gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000) + gc = s1.coreJobEval(structs.CoreJobEvalGC, jobModifyIdx*2) err = core.Process(gc) - if err != nil { - t.Fatalf("err: %v", err) - } - - // Everything should be gone - ws := memdb.NewWatchSet() - out, err := store.EvalByID(ws, eval.ID) - if err != nil { - t.Fatalf("err: %v", err) - } - if out != nil { - t.Fatalf("bad: %v", out) - } + must.NoError(t, err) + + // Nothing is gone. + assertCorrectJobEvalAlloc( + memdb.NewWatchSet(), + []*structs.Job{deadJob, activeJob, stoppedJob}, + []*structs.Job{}, + []*structs.Evaluation{ + deadJobEval, + activeJobEval, activeJobCompleteEval, + stoppedJobEval, + purgedJobEval, + }, + []*structs.Evaluation{}, + []*structs.Allocation{ + stoppedAlloc, lostAlloc, + activeJobRunningAlloc, activeJobLostAlloc, activeJobCompletedEvalCompletedAlloc, + stoppedJobStoppedAlloc, stoppedJobLostAlloc, + purgedJobCompleteAlloc, + }, + []*structs.Allocation{}, + ) - outA, err := store.AllocByID(ws, alloc.ID) - if err != nil { - t.Fatalf("err: %v", err) - } - if outA != nil { - t.Fatalf("bad: %v", outA) - } + // Update the time tables so that BatchEvalGCThreshold has elapsed. + s1.fsm.timetable.table = make([]TimeTableEntry, 2, 10) + tt = s1.fsm.TimeTable() + tt.Witness(2*jobModifyIdx, time.Now().UTC().Add(-1*s1.config.BatchEvalGCThreshold)) - outA2, err := store.AllocByID(ws, alloc2.ID) - if err != nil { - t.Fatalf("err: %v", err) - } - if outA2 != nil { - t.Fatalf("bad: %v", outA2) - } + gc = s1.coreJobEval(structs.CoreJobEvalGC, jobModifyIdx*2) + err = core.Process(gc) + must.NoError(t, err) + + // We expect the following: + // + // 1. 
The stopped job remains, but its evaluation and allocations are both removed. + // 2. The dead job remains with its evaluation and allocations intact. This is because + // for them the BatchEvalGCThreshold has not yet elapsed (their modification idx are larger + // than that of the job). + // 3. The active job remains since it is active, even though the allocations are otherwise + // eligible for GC. However, the inactive allocation is GCed for it. + // 4. The eval and allocation for the purged job are GCed. + assertCorrectJobEvalAlloc( + memdb.NewWatchSet(), + []*structs.Job{deadJob, activeJob, stoppedJob}, + []*structs.Job{}, + []*structs.Evaluation{deadJobEval, activeJobEval}, + []*structs.Evaluation{activeJobCompleteEval, stoppedJobEval, purgedJobEval}, + []*structs.Allocation{stoppedAlloc, lostAlloc, activeJobRunningAlloc}, + []*structs.Allocation{ + activeJobLostAlloc, activeJobCompletedEvalCompletedAlloc, + stoppedJobStoppedAlloc, stoppedJobLostAlloc, + purgedJobCompleteAlloc, + }) } func TestCoreScheduler_EvalGC_Partial(t *testing.T) { diff --git a/website/content/docs/configuration/server.mdx b/website/content/docs/configuration/server.mdx index 4d9e36e4505..5b87695d23f 100644 --- a/website/content/docs/configuration/server.mdx +++ b/website/content/docs/configuration/server.mdx @@ -93,7 +93,15 @@ server { - `eval_gc_threshold` `(string: "1h")` - Specifies the minimum time an evaluation must be in the terminal state before it is eligible for garbage - collection. This is specified using a label suffix like "30s" or "1h". + collection. This is specified using a label suffix like "30s" or "1h". Note + that batch job evaluations are controlled via `batch_eval_gc_threshold`. + +- `batch_eval_gc_threshold` `(string: "24h")` - Specifies the minimum time an + evaluation stemming from a batch job must be in the terminal state before it is + eligible for garbage collection. This is specified using a label suffix like + "30s" or "1h". 
Note that the threshold is a necessary but insufficient condition + for collection, and the most recent evaluation won't be garbage collected even if + it breaches the threshold. - `deployment_gc_threshold` `(string: "1h")` - Specifies the minimum time a deployment must be in the terminal state before it is eligible for garbage