Skip to content

Commit

Permalink
backport of commit 6ccfcc3
Browse files Browse the repository at this point in the history
  • Loading branch information
pkazmierczak authored Nov 21, 2024
1 parent 824ce0c commit 449cff6
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 72 deletions.
3 changes: 3 additions & 0 deletions .changelog/24456.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
scheduler: Fix bug where forced garbage collection does not ignore GC thresholds
```
123 changes: 60 additions & 63 deletions nomad/core_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
version "github.com/hashicorp/go-version"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
Expand All @@ -28,51 +29,50 @@ type CoreScheduler struct {
snap *state.StateSnapshot
logger log.Logger

// custom GC Threshold values can be used by unit tests to simulate time
// manipulation
customJobGCThreshold time.Duration
customEvalGCThreshold time.Duration
customBatchEvalGCThreshold time.Duration
customNodeGCThreshold time.Duration
customDeploymentGCThreshold time.Duration
customCSIVolumeClaimGCThreshold time.Duration
customCSIPluginGCThreshold time.Duration
customACLTokenExpirationGCThreshold time.Duration
customRootKeyGCThreshold time.Duration
// customThresholdForObject is used by unit tests that want to manipulate GC
// threshold settings. Users can pass the string that matches the object to GC
// (e.g., structs.CoreJobEvalGC) and a time.Duration that will be used as the
// GC threshold value.
customThresholdForObject map[string]*time.Duration
}

// NewCoreScheduler is used to return a new system scheduler instance
func NewCoreScheduler(srv *Server, snap *state.StateSnapshot) scheduler.Scheduler {
s := &CoreScheduler{
srv: srv,
snap: snap,
logger: srv.logger.ResetNamed("core.sched"),
srv: srv,
snap: snap,
logger: srv.logger.ResetNamed("core.sched"),
customThresholdForObject: make(map[string]*time.Duration),
}
return s
}

// Process is used to implement the scheduler.Scheduler interface
func (c *CoreScheduler) Process(eval *structs.Evaluation) error {
job := strings.Split(eval.JobID, ":") // extra data can be smuggled in w/ JobID

// check if there are any custom threshold values set
customThreshold := c.customThresholdForObject[job[0]]

switch job[0] {
case structs.CoreJobEvalGC:
return c.evalGC()
return c.evalGC(customThreshold)
case structs.CoreJobNodeGC:
return c.nodeGC(eval)
return c.nodeGC(eval, customThreshold)
case structs.CoreJobJobGC:
return c.jobGC(eval)
return c.jobGC(eval, customThreshold)
case structs.CoreJobDeploymentGC:
return c.deploymentGC()
return c.deploymentGC(customThreshold)
case structs.CoreJobCSIVolumeClaimGC:
return c.csiVolumeClaimGC(eval)
return c.csiVolumeClaimGC(eval, customThreshold)
case structs.CoreJobCSIPluginGC:
return c.csiPluginGC(eval)
return c.csiPluginGC(eval, customThreshold)
case structs.CoreJobOneTimeTokenGC:
return c.expiredOneTimeTokenGC(eval)
case structs.CoreJobLocalTokenExpiredGC:
return c.expiredACLTokenGC(eval, false)
return c.expiredACLTokenGC(eval, false, customThreshold)
case structs.CoreJobGlobalTokenExpiredGC:
return c.expiredACLTokenGC(eval, true)
return c.expiredACLTokenGC(eval, true, customThreshold)
case structs.CoreJobRootKeyRotateOrGC:
return c.rootKeyRotateOrGC(eval)
case structs.CoreJobVariablesRekey:
Expand All @@ -86,40 +86,44 @@ func (c *CoreScheduler) Process(eval *structs.Evaluation) error {

// forceGC is used to garbage collect all eligible objects.
func (c *CoreScheduler) forceGC(eval *structs.Evaluation) error {
if err := c.jobGC(eval); err != nil {
// set a minimal threshold for all objects to make force GC possible
force := pointer.Of(time.Millisecond)

if err := c.jobGC(eval, force); err != nil {
return err
}
if err := c.evalGC(); err != nil {
if err := c.evalGC(force); err != nil {
return err
}
if err := c.deploymentGC(); err != nil {
if err := c.deploymentGC(force); err != nil {
return err
}
if err := c.csiPluginGC(eval); err != nil {
if err := c.csiPluginGC(eval, force); err != nil {
return err
}
if err := c.csiVolumeClaimGC(eval); err != nil {
if err := c.csiVolumeClaimGC(eval, force); err != nil {
return err
}
if err := c.expiredOneTimeTokenGC(eval); err != nil {
return err
}
if err := c.expiredACLTokenGC(eval, false); err != nil {
if err := c.expiredACLTokenGC(eval, false, force); err != nil {
return err
}
if err := c.expiredACLTokenGC(eval, true); err != nil {
if err := c.expiredACLTokenGC(eval, true, force); err != nil {
return err
}
if err := c.rootKeyGC(eval, time.Now()); err != nil {
return err
}

// Node GC must occur after the others to ensure the allocations are
// cleared.
return c.nodeGC(eval)
return c.nodeGC(eval, force)
}

// jobGC is used to garbage collect eligible jobs.
func (c *CoreScheduler) jobGC(eval *structs.Evaluation) error {
func (c *CoreScheduler) jobGC(eval *structs.Evaluation, customThreshold *time.Duration) error {
// Get all the jobs eligible for garbage collection.
ws := memdb.NewWatchSet()
iter, err := c.snap.JobsByGC(ws, true)
Expand All @@ -131,8 +135,8 @@ func (c *CoreScheduler) jobGC(eval *structs.Evaluation) error {
threshold = c.srv.config.JobGCThreshold

// custom threshold override
if c.customJobGCThreshold != 0 {
threshold = c.customJobGCThreshold
if customThreshold != nil {
threshold = *customThreshold
}

cutoffTime := c.getCutoffTime(threshold)
Expand Down Expand Up @@ -263,7 +267,7 @@ func (c *CoreScheduler) partitionJobReap(jobs []*structs.Job, leaderACL string,
}

// evalGC is used to garbage collect old evaluations
func (c *CoreScheduler) evalGC() error {
func (c *CoreScheduler) evalGC(customThreshold *time.Duration) error {
// Iterate over the evaluations
ws := memdb.NewWatchSet()
iter, err := c.snap.Evals(ws, false)
Expand All @@ -276,11 +280,9 @@ func (c *CoreScheduler) evalGC() error {
batchThreshold = c.srv.config.BatchEvalGCThreshold

// custom threshold override
if c.customEvalGCThreshold != 0 {
threshold = c.customEvalGCThreshold
}
if c.customBatchEvalGCThreshold != 0 {
batchThreshold = c.customBatchEvalGCThreshold
if customThreshold != nil {
threshold = *customThreshold
batchThreshold = *customThreshold
}

cutoffTime := c.getCutoffTime(threshold)
Expand Down Expand Up @@ -376,8 +378,7 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, cutoffTime time.Time, a
var gcAllocIDs []string
for _, alloc := range allocs {
if !allocGCEligible(alloc, job, time.Now().UTC(), cutoffTime) {
// Can't GC the evaluation since not all of the allocations are
// terminal
// Can't GC the evaluation since not all the allocations are terminal
gcEval = false
} else {
// The allocation is eligible to be GC'd
Expand Down Expand Up @@ -462,7 +463,7 @@ func (c *CoreScheduler) partitionEvalReap(evals, allocs []string, batchSize int)
}

// nodeGC is used to garbage collect old nodes
func (c *CoreScheduler) nodeGC(eval *structs.Evaluation) error {
func (c *CoreScheduler) nodeGC(eval *structs.Evaluation, customThreshold *time.Duration) error {
// Iterate over the evaluations
ws := memdb.NewWatchSet()
iter, err := c.snap.Nodes(ws)
Expand All @@ -474,8 +475,8 @@ func (c *CoreScheduler) nodeGC(eval *structs.Evaluation) error {
threshold = c.srv.config.NodeGCThreshold

// custom threshold override
if c.customNodeGCThreshold != 0 {
threshold = c.customNodeGCThreshold
if customThreshold != nil {
threshold = *customThreshold
}
cutoffTime := c.getCutoffTime(threshold)

Expand Down Expand Up @@ -566,7 +567,7 @@ func (c *CoreScheduler) nodeReap(eval *structs.Evaluation, nodeIDs []string) err
}

// deploymentGC is used to garbage collect old deployments
func (c *CoreScheduler) deploymentGC() error {
func (c *CoreScheduler) deploymentGC(customThreshold *time.Duration) error {
// Iterate over the deployments
ws := memdb.NewWatchSet()
iter, err := c.snap.Deployments(ws, state.SortDefault)
Expand All @@ -578,8 +579,8 @@ func (c *CoreScheduler) deploymentGC() error {
threshold = c.srv.config.DeploymentGCThreshold

// custom threshold override
if c.customDeploymentGCThreshold != 0 {
threshold = c.customDeploymentGCThreshold
if customThreshold != nil {
threshold = *customThreshold
}
cutoffTime := c.getCutoffTime(threshold)

Expand Down Expand Up @@ -739,7 +740,7 @@ func allocGCEligible(a *structs.Allocation, job *structs.Job, gcTime, cutoffTime
}

// csiVolumeClaimGC is used to garbage collect CSI volume claims
func (c *CoreScheduler) csiVolumeClaimGC(eval *structs.Evaluation) error {
func (c *CoreScheduler) csiVolumeClaimGC(eval *structs.Evaluation, customThreshold *time.Duration) error {

gcClaims := func(ns, volID string) error {
req := &structs.CSIVolumeClaimRequest{
Expand Down Expand Up @@ -778,8 +779,8 @@ func (c *CoreScheduler) csiVolumeClaimGC(eval *structs.Evaluation) error {
threshold = c.srv.config.CSIVolumeClaimGCThreshold

// custom threshold override
if c.customCSIVolumeClaimGCThreshold != 0 {
threshold = c.customCSIVolumeClaimGCThreshold
if customThreshold != nil {
threshold = *customThreshold
}
cutoffTime := c.getCutoffTime(threshold)

Expand Down Expand Up @@ -812,7 +813,7 @@ func (c *CoreScheduler) csiVolumeClaimGC(eval *structs.Evaluation) error {
}

// csiPluginGC is used to garbage collect unused plugins
func (c *CoreScheduler) csiPluginGC(eval *structs.Evaluation) error {
func (c *CoreScheduler) csiPluginGC(eval *structs.Evaluation, customThreshold *time.Duration) error {

ws := memdb.NewWatchSet()

Expand All @@ -825,8 +826,8 @@ func (c *CoreScheduler) csiPluginGC(eval *structs.Evaluation) error {
threshold = c.srv.config.CSIPluginGCThreshold

// custom threshold override
if c.customCSIPluginGCThreshold != 0 {
threshold = c.customCSIPluginGCThreshold
if customThreshold != nil {
threshold = *customThreshold
}
cutoffTime := c.getCutoffTime(threshold)

Expand Down Expand Up @@ -870,7 +871,7 @@ func (c *CoreScheduler) expiredOneTimeTokenGC(eval *structs.Evaluation) error {
// tokens. It can be used for both local and global tokens and includes
// behaviour to account for periodic and user actioned garbage collection
// invocations.
func (c *CoreScheduler) expiredACLTokenGC(eval *structs.Evaluation, global bool) error {
func (c *CoreScheduler) expiredACLTokenGC(eval *structs.Evaluation, global bool, customThreshold *time.Duration) error {

// If ACLs are not enabled, we do not need to continue and should exit
// early. This is not an error condition as callers can blindly call this
Expand All @@ -893,8 +894,8 @@ func (c *CoreScheduler) expiredACLTokenGC(eval *structs.Evaluation, global bool)
threshold = c.srv.config.ACLTokenExpirationGCThreshold

// custom threshold override
if c.customACLTokenExpirationGCThreshold != 0 {
threshold = c.customACLTokenExpirationGCThreshold
if customThreshold != nil {
threshold = *customThreshold
}
cutoffTime := c.getCutoffTime(threshold)

Expand Down Expand Up @@ -1003,13 +1004,9 @@ func (c *CoreScheduler) rootKeyGC(eval *structs.Evaluation, now time.Time) error
return err
}

var threshold time.Duration
threshold = c.srv.config.RootKeyGCThreshold

// custom threshold override
if c.customRootKeyGCThreshold != 0 {
threshold = c.customRootKeyGCThreshold
}
// we don't do custom overrides for root keys because they are never subject to
// force GC
threshold := c.srv.config.RootKeyGCThreshold

// the threshold is longer than we can support with the time table, and we
// never want to force-GC keys because that will orphan signed Workload
Expand Down
8 changes: 3 additions & 5 deletions nomad/core_sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,9 +527,7 @@ func TestCoreScheduler_EvalGC_Batch(t *testing.T) {

// set a shorter GC threshold this time
gc = s1.coreJobEval(structs.CoreJobEvalGC, jobModifyIdx*2)
core.(*CoreScheduler).customBatchEvalGCThreshold = time.Minute
//core.(*CoreScheduler).customEvalGCThreshold = time.Minute
//core.(*CoreScheduler).customJobGCThreshold = time.Minute
core.(*CoreScheduler).customThresholdForObject[structs.CoreJobEvalGC] = pointer.Of(time.Minute)
must.NoError(t, core.Process(gc))

// We expect the following:
Expand Down Expand Up @@ -2513,7 +2511,7 @@ func TestCoreScheduler_CSIVolumeClaimGC(t *testing.T) {
index++
gc := srv.coreJobEval(structs.CoreJobForceGC, index)
c := core.(*CoreScheduler)
require.NoError(t, c.csiVolumeClaimGC(gc))
require.NoError(t, c.csiVolumeClaimGC(gc, nil))

// the only remaining claim is for a deleted alloc with no path to
// the non-existent node, so volumewatcher will release the
Expand Down Expand Up @@ -2551,7 +2549,7 @@ func TestCoreScheduler_CSIBadState_ClaimGC(t *testing.T) {
index++
gc := srv.coreJobEval(structs.CoreJobForceGC, index)
c := core.(*CoreScheduler)
must.NoError(t, c.csiVolumeClaimGC(gc))
must.NoError(t, c.csiVolumeClaimGC(gc, nil))

vol, err := srv.State().CSIVolumeByID(nil, structs.DefaultNamespace, "csi-volume-nfs0")
must.NoError(t, err)
Expand Down
8 changes: 4 additions & 4 deletions nomad/system_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ func TestSystemEndpoint_GarbageCollect(t *testing.T) {
job := mock.Job()
job.Type = structs.JobTypeBatch
job.Stop = true
// submit time must be older than default job GC
job.SubmitTime = time.Now().Add(-6 * time.Hour).UnixNano()
// set submit time older than now but still newer than default GC threshold
job.SubmitTime = time.Now().Add(-10 * time.Millisecond).UnixNano()
must.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, 1000, nil, job))

eval := mock.Eval()
eval.Status = structs.EvalStatusComplete
eval.JobID = job.ID
// modify time must be older than default eval GC
eval.ModifyTime = time.Now().Add(-5 * time.Hour).UnixNano()
// set modify time older than now but still newer than default GC threshold
eval.ModifyTime = time.Now().Add(-10 * time.Millisecond).UnixNano()
must.NoError(t, state.UpsertEvals(structs.MsgTypeTestSetup, 1001, []*structs.Evaluation{eval}))

// Make the GC request
Expand Down

0 comments on commit 449cff6

Please sign in to comment.