Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[15090] Ensure no leakage of evaluations for batch jobs. #15097

Merged
merged 12 commits into from
Jan 31, 2023
3 changes: 3 additions & 0 deletions .changelog/15090.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:breaking-change
core: ensure no leakage of evaluations for batch jobs.
```
7 changes: 7 additions & 0 deletions command/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,13 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) {
}
conf.EvalGCThreshold = dur
}
if gcThreshold := agentConfig.Server.BatchEvalGCThreshold; gcThreshold != "" {
dur, err := time.ParseDuration(gcThreshold)
if err != nil {
return nil, err
}
conf.BatchEvalGCThreshold = dur
}
if gcThreshold := agentConfig.Server.DeploymentGCThreshold; gcThreshold != "" {
dur, err := time.ParseDuration(gcThreshold)
if err != nil {
Expand Down
10 changes: 9 additions & 1 deletion command/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,9 +467,14 @@ type ServerConfig struct {

// EvalGCThreshold controls how "old" an eval must be to be collected by GC.
// Age is not the only requirement for a eval to be GCed but the threshold
// can be used to filter by age.
// can be used to filter by age. Please note that batch job evaluations are
// controlled by 'BatchEvalGCThreshold' instead.
EvalGCThreshold string `hcl:"eval_gc_threshold"`

// BatchEvalGCThreshold controls how "old" an evaluation must be to be eligible
// for GC if the eval belongs to a batch job.
BatchEvalGCThreshold string `hcl:"batch_eval_gc_threshold"`
stswidwinski marked this conversation as resolved.
Show resolved Hide resolved

// DeploymentGCThreshold controls how "old" a deployment must be to be
// collected by GC. Age is not the only requirement for a deployment to be
// GCed but the threshold can be used to filter by age.
Expand Down Expand Up @@ -1827,6 +1832,9 @@ func (s *ServerConfig) Merge(b *ServerConfig) *ServerConfig {
if b.EvalGCThreshold != "" {
result.EvalGCThreshold = b.EvalGCThreshold
}
if b.BatchEvalGCThreshold != "" {
result.BatchEvalGCThreshold = b.BatchEvalGCThreshold
}
if b.DeploymentGCThreshold != "" {
result.DeploymentGCThreshold = b.DeploymentGCThreshold
}
Expand Down
2 changes: 2 additions & 0 deletions command/agent/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ func TestConfig_Merge(t *testing.T) {
RaftMultiplier: pointer.Of(5),
NumSchedulers: pointer.Of(1),
NodeGCThreshold: "1h",
BatchEvalGCThreshold: "4h",
HeartbeatGrace: 30 * time.Second,
MinHeartbeatTTL: 30 * time.Second,
MaxHeartbeatsPerSecond: 30.0,
Expand Down Expand Up @@ -339,6 +340,7 @@ func TestConfig_Merge(t *testing.T) {
NumSchedulers: pointer.Of(2),
EnabledSchedulers: []string{structs.JobTypeBatch},
NodeGCThreshold: "12h",
BatchEvalGCThreshold: "4h",
HeartbeatGrace: 2 * time.Minute,
MinHeartbeatTTL: 2 * time.Minute,
MaxHeartbeatsPerSecond: 200.0,
Expand Down
8 changes: 8 additions & 0 deletions nomad/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,15 @@ type Config struct {

// EvalGCThreshold is how "old" an evaluation must be to be eligible
// for GC. This gives users some time to debug a failed evaluation.
//
// Please note that the rules for GC of evaluations which belong to a batch
// job are separate and controlled by `BatchEvalGCThreshold`
EvalGCThreshold time.Duration

// BatchEvalGCThreshold is how "old" an evaluation must be to be eligible
// for GC if the eval belongs to a batch job.
BatchEvalGCThreshold time.Duration

// JobGCInterval is how often we dispatch a job to GC jobs that are
// available for garbage collection.
JobGCInterval time.Duration
Expand Down Expand Up @@ -444,6 +451,7 @@ func DefaultConfig() *Config {
ReconcileInterval: 60 * time.Second,
EvalGCInterval: 5 * time.Minute,
EvalGCThreshold: 1 * time.Hour,
BatchEvalGCThreshold: 24 * time.Hour,
JobGCInterval: 5 * time.Minute,
JobGCThreshold: 4 * time.Hour,
NodeGCInterval: 5 * time.Minute,
Expand Down
58 changes: 28 additions & 30 deletions nomad/core_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,15 +238,20 @@ func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error {

oldThreshold := c.getThreshold(eval, "eval",
"eval_gc_threshold", c.srv.config.EvalGCThreshold)
batchOldThreshold := c.getThreshold(eval, "eval",
"batch_eval_gc_threshold", c.srv.config.BatchEvalGCThreshold)

// Collect the allocations and evaluations to GC
var gcAlloc, gcEval []string
for raw := iter.Next(); raw != nil; raw = iter.Next() {
eval := raw.(*structs.Evaluation)

// The Evaluation GC should not handle batch jobs since those need to be
// garbage collected in one shot
gc, allocs, err := c.gcEval(eval, oldThreshold, false)
gcThreshold := oldThreshold
if eval.Type == structs.JobTypeBatch {
gcThreshold = batchOldThreshold
}

gc, allocs, err := c.gcEval(eval, gcThreshold, false)
if err != nil {
return err
}
Expand Down Expand Up @@ -297,33 +302,26 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64,
}

// If the eval is from a running "batch" job we don't want to garbage
// collect its allocations. If there is a long running batch job and its
// terminal allocations get GC'd the scheduler would re-run the
// allocations.
// collect its most current allocations. If there is a long running batch job and its
// terminal allocations get GC'd the scheduler would re-run the allocations. However,
// we do want to GC old Evals and Allocs if there are newer ones due to update.
//
// The age of the evaluation must also reach the threshold configured to be GCed so that
// one may debug old evaluations and referenced allocations.
if eval.Type == structs.JobTypeBatch {
// Check if the job is running

// Can collect if:
// Job doesn't exist
// Job is Stopped and dead
// allowBatch and the job is dead
collect := false
if job == nil {
collect = true
} else if job.Status != structs.JobStatusDead {
collect = false
} else if job.Stop {
collect = true
} else if allowBatch {
collect = true
}

// We don't want to gc anything related to a job which is not dead
// If the batch job doesn't exist we can GC it regardless of allowBatch
// Can collect if either holds:
// - Job doesn't exist
// - Job is Stopped and dead
// - allowBatch and the job is dead
//
// If we cannot collect outright, check if a partial GC may occur
collect := job == nil || job.Status == structs.JobStatusDead && (job.Stop || allowBatch)
if !collect {
// Find allocs associated with older (based on createindex) and GC them if terminal
oldAllocs := olderVersionTerminalAllocs(allocs, job)
return false, oldAllocs, nil
oldAllocs := olderVersionTerminalAllocs(allocs, job, thresholdIndex)
gcEval := (len(oldAllocs) == len(allocs))
return gcEval, oldAllocs, nil
}
stswidwinski marked this conversation as resolved.
Show resolved Hide resolved
}

Expand All @@ -344,12 +342,12 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64,
return gcEval, gcAllocIDs, nil
}

// olderVersionTerminalAllocs returns terminal allocations whose job create index
// is older than the job's create index
func olderVersionTerminalAllocs(allocs []*structs.Allocation, job *structs.Job) []string {
// olderVersionTerminalAllocs returns a list of terminal allocations that belong to the evaluation and may be
// GCed.
func olderVersionTerminalAllocs(allocs []*structs.Allocation, job *structs.Job, thresholdIndex uint64) []string {
var ret []string
for _, alloc := range allocs {
if alloc.Job != nil && alloc.Job.CreateIndex < job.CreateIndex && alloc.TerminalStatus() {
if alloc.CreateIndex < job.JobModifyIndex && alloc.ModifyIndex < thresholdIndex && alloc.TerminalStatus() {
stswidwinski marked this conversation as resolved.
Show resolved Hide resolved
ret = append(ret, alloc.ID)
}
}
Expand Down
Loading