From 3deb98ac5b530a7dbba21d6f15b106907f059ac5 Mon Sep 17 00:00:00 2001 From: Piotr Kazmierczak <470696+pkazmierczak@users.noreply.github.com> Date: Wed, 13 Nov 2024 18:42:01 +0100 Subject: [PATCH 1/6] scheduler: fix a bug where force GC wasn't respected --- nomad/core_sched.go | 85 ++++++++++++++++++++++++++--------- nomad/system_endpoint_test.go | 8 ++-- 2 files changed, 68 insertions(+), 25 deletions(-) diff --git a/nomad/core_sched.go b/nomad/core_sched.go index e71e2859356..1b5067b41bc 100644 --- a/nomad/core_sched.go +++ b/nomad/core_sched.go @@ -12,6 +12,7 @@ import ( log "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/go-multierror" version "github.com/hashicorp/go-version" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/state" @@ -86,36 +87,79 @@ func (c *CoreScheduler) Process(eval *structs.Evaluation) error { // forceGC is used to garbage collect all eligible objects. func (c *CoreScheduler) forceGC(eval *structs.Evaluation) error { - if err := c.jobGC(eval); err != nil { - return err + var mErr *multierror.Error + + c.customJobGCThreshold = time.Millisecond + c.customEvalGCThreshold = time.Millisecond + c.customBatchEvalGCThreshold = time.Millisecond + c.customDeploymentGCThreshold = time.Millisecond + c.customCSIPluginGCThreshold = time.Millisecond + c.customCSIVolumeClaimGCThreshold = time.Millisecond + c.customACLTokenExpirationGCThreshold = time.Millisecond + c.customNodeGCThreshold = time.Millisecond + + err := c.jobGC(eval) + if err != nil { + mErr = multierror.Append(mErr, err) } - if err := c.evalGC(); err != nil { - return err + + err = c.evalGC() + if err != nil { + mErr = multierror.Append(mErr, err) } - if err := c.deploymentGC(); err != nil { - return err + + err = c.deploymentGC() + if err != nil { + mErr = multierror.Append(mErr, err) } - if err := c.csiPluginGC(eval); err != nil { - return err + + err = c.csiPluginGC(eval) + if err != nil { + mErr = multierror.Append(mErr, err) } - if err := c.csiVolumeClaimGC(eval); err != nil { - return err + + err = c.csiVolumeClaimGC(eval) + if err != nil { + mErr = multierror.Append(mErr, err) } - if err := c.expiredOneTimeTokenGC(eval); err != nil { - return err + + err = c.expiredOneTimeTokenGC(eval) + if err != nil { + mErr = multierror.Append(mErr, err) } - if err := c.expiredACLTokenGC(eval, false); err != nil { - return err + + err = c.expiredACLTokenGC(eval, false) + if err != nil { + mErr = multierror.Append(mErr, err) } - if err := c.expiredACLTokenGC(eval, true); err != nil { - return err + + err = c.expiredACLTokenGC(eval, true) + if err != nil { + mErr = multierror.Append(mErr, err) } - if err := c.rootKeyGC(eval, time.Now()); err != nil { - return err + + err = c.rootKeyGC(eval, time.Now()) + if err != nil { + mErr = multierror.Append(mErr, err) } + // Node GC must occur after the others to ensure the allocations are // cleared. - return c.nodeGC(eval) + err = c.nodeGC(eval) + if err != nil { + mErr = multierror.Append(mErr, err) + } + + c.customJobGCThreshold = 0 + c.customEvalGCThreshold = 0 + c.customBatchEvalGCThreshold = 0 + c.customDeploymentGCThreshold = 0 + c.customCSIPluginGCThreshold = 0 + c.customCSIVolumeClaimGCThreshold = 0 + c.customACLTokenExpirationGCThreshold = 0 + c.customNodeGCThreshold = 0 + + return mErr.ErrorOrNil() } // jobGC is used to garbage collect eligible jobs. @@ -376,8 +420,7 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, cutoffTime time.Time, a var gcAllocIDs []string for _, alloc := range allocs { if !allocGCEligible(alloc, job, time.Now().UTC(), cutoffTime) { - // Can't GC the evaluation since not all of the allocations are - // terminal + // Can't GC the evaluation since not all the allocations are terminal gcEval = false } else { // The allocation is eligible to be GC'd diff --git a/nomad/system_endpoint_test.go b/nomad/system_endpoint_test.go index 5d879b83481..209e875492b 100644 --- a/nomad/system_endpoint_test.go +++ b/nomad/system_endpoint_test.go @@ -34,15 +34,15 @@ func TestSystemEndpoint_GarbageCollect(t *testing.T) { job := mock.Job() job.Type = structs.JobTypeBatch job.Stop = true - // submit time must be older than default job GC - job.SubmitTime = time.Now().Add(-6 * time.Hour).UnixNano() + // set submit time older than now but still newer than default GC threshold + job.SubmitTime = time.Now().Add(-10 * time.Millisecond).UnixNano() must.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, 1000, nil, job)) eval := mock.Eval() eval.Status = structs.EvalStatusComplete eval.JobID = job.ID - // modify time must be older than default eval GC - eval.ModifyTime = time.Now().Add(-5 * time.Hour).UnixNano() + // set modify time older than now but still newer than default GC threshold + eval.ModifyTime = time.Now().Add(-10 * time.Millisecond).UnixNano() must.NoError(t, state.UpsertEvals(structs.MsgTypeTestSetup, 1001, []*structs.Evaluation{eval})) // Make the GC request From 99be6d6395c7dff2df1b0999bae3216745d99ed5 Mon Sep 17 00:00:00 2001 From: Piotr Kazmierczak <470696+pkazmierczak@users.noreply.github.com> Date: Wed, 13 Nov 2024 18:52:59 +0100 Subject: [PATCH 2/6] cl --- .changelog/24456.txt | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 .changelog/24456.txt diff --git a/.changelog/24456.txt b/.changelog/24456.txt new file mode 100644 index 00000000000..914db39ef7d --- /dev/null +++ b/.changelog/24456.txt @@ -0,0 +1,3 @@ +```release-note:bug +scheduler: Fix bug where forced garbage collection does not ignore GC thresholds +``` From 25d46f821c663c9061a39e005ccd6c6f9087e77b Mon Sep 17 00:00:00 2001 From: Piotr Kazmierczak <470696+pkazmierczak@users.noreply.github.com> Date: Wed, 13 Nov 2024 19:29:07 +0100 Subject: [PATCH 3/6] simpler? --- nomad/core_sched.go | 120 +++++++++++++++++++++------------------ nomad/core_sched_test.go | 4 +- 2 files changed, 67 insertions(+), 57 deletions(-) diff --git a/nomad/core_sched.go b/nomad/core_sched.go index 1b5067b41bc..9fdf5bd5477 100644 --- a/nomad/core_sched.go +++ b/nomad/core_sched.go @@ -29,29 +29,42 @@ type CoreScheduler struct { snap *state.StateSnapshot logger log.Logger - // custom GC Threshold values can be used by unit tests to simulate time - // manipulation - customJobGCThreshold time.Duration - customEvalGCThreshold time.Duration - customBatchEvalGCThreshold time.Duration - customNodeGCThreshold time.Duration - customDeploymentGCThreshold time.Duration - customCSIVolumeClaimGCThreshold time.Duration - customCSIPluginGCThreshold time.Duration - customACLTokenExpirationGCThreshold time.Duration - customRootKeyGCThreshold time.Duration + // custom GC Threshold is a map of object names to threshold values which can be + // used by unit tests to simulate time manipulation, and is used by forceGC to + // set minimal threshold so that virtually all eligible objects will get GCd + customGCThreshold map[string]time.Duration } // NewCoreScheduler is used to return a new system scheduler instance func NewCoreScheduler(srv *Server, snap *state.StateSnapshot) scheduler.Scheduler { s := &CoreScheduler{ - srv: srv, - snap: snap, - logger: srv.logger.ResetNamed("core.sched"), + srv: srv, + snap: snap, + logger: srv.logger.ResetNamed("core.sched"), + customGCThreshold: make(map[string]time.Duration), } return s } +func (c *CoreScheduler) setCustomThresholdForObject(objectName string, threshold time.Duration) { + c.customGCThreshold[objectName] = threshold +} + +func (c *CoreScheduler) setCustomThresholdForAllObjects(threshold time.Duration) { + for _, objectName := range []string{ + "job", + "eval", + "batchEval", + "deployment", + "csiPlugin", + "csiVolume", + "token", + "node", + } { + c.setCustomThresholdForObject(objectName, threshold) + } +} + // Process is used to implement the scheduler.Scheduler interface func (c *CoreScheduler) Process(eval *structs.Evaluation) error { job := strings.Split(eval.JobID, ":") // extra data can be smuggled in w/ JobID @@ -89,14 +102,10 @@ func (c *CoreScheduler) Process(eval *structs.Evaluation) error { func (c *CoreScheduler) forceGC(eval *structs.Evaluation) error { var mErr *multierror.Error - c.customJobGCThreshold = time.Millisecond - c.customEvalGCThreshold = time.Millisecond - c.customBatchEvalGCThreshold = time.Millisecond - c.customDeploymentGCThreshold = time.Millisecond - c.customCSIPluginGCThreshold = time.Millisecond - c.customCSIVolumeClaimGCThreshold = time.Millisecond - c.customACLTokenExpirationGCThreshold = time.Millisecond - c.customNodeGCThreshold = time.Millisecond + // set a minimal threshold for all objects to make force GC possible, and + // remember to reset it when we're done + c.setCustomThresholdForAllObjects(time.Millisecond) + defer c.setCustomThresholdForAllObjects(0) err := c.jobGC(eval) if err != nil { @@ -150,15 +159,6 @@ func (c *CoreScheduler) forceGC(eval *structs.Evaluation) error { mErr = multierror.Append(mErr, err) } - c.customJobGCThreshold = 0 - c.customEvalGCThreshold = 0 - c.customBatchEvalGCThreshold = 0 - c.customDeploymentGCThreshold = 0 - c.customCSIPluginGCThreshold = 0 - c.customCSIVolumeClaimGCThreshold = 0 - c.customACLTokenExpirationGCThreshold = 0 - c.customNodeGCThreshold = 0 - return mErr.ErrorOrNil() } @@ -175,8 +175,10 @@ func (c *CoreScheduler) jobGC(eval *structs.Evaluation) error { threshold = c.srv.config.JobGCThreshold // custom threshold override - if c.customJobGCThreshold != 0 { - threshold = c.customJobGCThreshold + if val, ok := c.customGCThreshold["job"]; ok { + if val != 0 { + threshold = val + } } cutoffTime := c.getCutoffTime(threshold) @@ -320,11 +322,15 @@ func (c *CoreScheduler) evalGC() error { batchThreshold = c.srv.config.BatchEvalGCThreshold // custom threshold override - if c.customEvalGCThreshold != 0 { - threshold = c.customEvalGCThreshold + if val, ok := c.customGCThreshold["eval"]; ok { + if val != 0 { + threshold = val + } } - if c.customBatchEvalGCThreshold != 0 { - batchThreshold = c.customBatchEvalGCThreshold + if val, ok := c.customGCThreshold["batchEval"]; ok { + if val != 0 { + batchThreshold = val + } } cutoffTime := c.getCutoffTime(threshold) @@ -517,8 +523,10 @@ func (c *CoreScheduler) nodeGC(eval *structs.Evaluation) error { threshold = c.srv.config.NodeGCThreshold // custom threshold override - if c.customNodeGCThreshold != 0 { - threshold = c.customNodeGCThreshold + if val, ok := c.customGCThreshold["node"]; ok { + if val != 0 { + threshold = val + } } cutoffTime := c.getCutoffTime(threshold) @@ -621,8 +629,10 @@ func (c *CoreScheduler) deploymentGC() error { threshold = c.srv.config.DeploymentGCThreshold // custom threshold override - if c.customDeploymentGCThreshold != 0 { - threshold = c.customDeploymentGCThreshold + if val, ok := c.customGCThreshold["deployment"]; ok { + if val != 0 { + threshold = val + } } cutoffTime := c.getCutoffTime(threshold) @@ -821,8 +831,10 @@ func (c *CoreScheduler) csiVolumeClaimGC(eval *structs.Evaluation) error { threshold = c.srv.config.CSIVolumeClaimGCThreshold // custom threshold override - if c.customCSIVolumeClaimGCThreshold != 0 { - threshold = c.customCSIVolumeClaimGCThreshold + if val, ok := c.customGCThreshold["csiVolume"]; ok { + if val != 0 { + threshold = val + } } cutoffTime := c.getCutoffTime(threshold) @@ -868,8 +880,10 @@ func (c *CoreScheduler) csiPluginGC(eval *structs.Evaluation) error { threshold = c.srv.config.CSIPluginGCThreshold // custom threshold override - if c.customCSIPluginGCThreshold != 0 { - threshold = c.customCSIPluginGCThreshold + if val, ok := c.customGCThreshold["csiPlugin"]; ok { + if val != 0 { + threshold = val + } } cutoffTime := c.getCutoffTime(threshold) @@ -936,8 +950,10 @@ func (c *CoreScheduler) expiredACLTokenGC(eval *structs.Evaluation, global bool) threshold = c.srv.config.ACLTokenExpirationGCThreshold // custom threshold override - if c.customACLTokenExpirationGCThreshold != 0 { - threshold = c.customACLTokenExpirationGCThreshold + if val, ok := c.customGCThreshold["token"]; ok { + if val != 0 { + threshold = val + } } cutoffTime := c.getCutoffTime(threshold) @@ -1046,13 +1062,9 @@ func (c *CoreScheduler) rootKeyGC(eval *structs.Evaluation, now time.Time) error return err } - var threshold time.Duration - threshold = c.srv.config.RootKeyGCThreshold - - // custom threshold override - if c.customRootKeyGCThreshold != 0 { - threshold = c.customRootKeyGCThreshold - } + // we don't do custom overrides for root keys because they are never subject to + // force GC + threshold := c.srv.config.RootKeyGCThreshold // the threshold is longer than we can support with the time table, and we // never want to force-GC keys because that will orphan signed Workload diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go index d29aa2b9d8e..21af18cb59b 100644 --- a/nomad/core_sched_test.go +++ b/nomad/core_sched_test.go @@ -527,9 +527,7 @@ func TestCoreScheduler_EvalGC_Batch(t *testing.T) { // set a shorter GC threshold this time gc = s1.coreJobEval(structs.CoreJobEvalGC, jobModifyIdx*2) - core.(*CoreScheduler).customBatchEvalGCThreshold = time.Minute - //core.(*CoreScheduler).customEvalGCThreshold = time.Minute - //core.(*CoreScheduler).customJobGCThreshold = time.Minute + core.(*CoreScheduler).setCustomThresholdForObject("batchEval", time.Minute) must.NoError(t, core.Process(gc)) // We expect the following: From 750d8f65a27879ae922da56c30472acfa1e06d1d Mon Sep 17 00:00:00 2001 From: Piotr Kazmierczak <470696+pkazmierczak@users.noreply.github.com> Date: Wed, 13 Nov 2024 19:43:23 +0100 Subject: [PATCH 4/6] can't multierror on forceGC --- nomad/core_sched.go | 55 +++++++++++++++++---------------------------- 1 file changed, 21 insertions(+), 34 deletions(-) diff --git a/nomad/core_sched.go b/nomad/core_sched.go index 9fdf5bd5477..0e01e99117f 100644 --- a/nomad/core_sched.go +++ b/nomad/core_sched.go @@ -12,7 +12,6 @@ import ( log "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" - "github.com/hashicorp/go-multierror" version "github.com/hashicorp/go-version" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/state" @@ -100,66 +99,54 @@ func (c *CoreScheduler) Process(eval *structs.Evaluation) error { // forceGC is used to garbage collect all eligible objects. func (c *CoreScheduler) forceGC(eval *structs.Evaluation) error { - var mErr *multierror.Error - // set a minimal threshold for all objects to make force GC possible, and // remember to reset it when we're done c.setCustomThresholdForAllObjects(time.Millisecond) defer c.setCustomThresholdForAllObjects(0) - err := c.jobGC(eval) - if err != nil { - mErr = multierror.Append(mErr, err) + if err := c.jobGC(eval); err != nil { + return err } - err = c.evalGC() - if err != nil { - mErr = multierror.Append(mErr, err) + if err := c.evalGC(); err != nil { + return err } - err = c.deploymentGC() - if err != nil { - mErr = multierror.Append(mErr, err) + if err := c.deploymentGC(); err != nil { + return err } - err = c.csiPluginGC(eval) - if err != nil { - mErr = multierror.Append(mErr, err) + if err := c.csiPluginGC(eval); err != nil { + return err } - err = c.csiVolumeClaimGC(eval) - if err != nil { - mErr = multierror.Append(mErr, err) + if err := c.csiVolumeClaimGC(eval); err != nil { + return err } - err = c.expiredOneTimeTokenGC(eval) - if err != nil { - mErr = multierror.Append(mErr, err) + if err := c.expiredOneTimeTokenGC(eval); err != nil { + return err } - err = c.expiredACLTokenGC(eval, false) - if err != nil { - mErr = multierror.Append(mErr, err) + if err := c.expiredACLTokenGC(eval, false); err != nil { + return err } - err = c.expiredACLTokenGC(eval, true) - if err != nil { - mErr = multierror.Append(mErr, err) + if err := c.expiredACLTokenGC(eval, true); err != nil { + return err } - err = c.rootKeyGC(eval, time.Now()) - if err != nil { - mErr = multierror.Append(mErr, err) + if err := c.rootKeyGC(eval, time.Now()); err != nil { + return err } // Node GC must occur after the others to ensure the allocations are // cleared. - err = c.nodeGC(eval) - if err != nil { - mErr = multierror.Append(mErr, err) + if err := c.nodeGC(eval); err != nil { + return err } - return mErr.ErrorOrNil() + return nil } // jobGC is used to garbage collect eligible jobs. From e98c95d19a9614726b24f5a37a5c8c7e788162ae Mon Sep 17 00:00:00 2001 From: Piotr Kazmierczak <470696+pkazmierczak@users.noreply.github.com> Date: Wed, 13 Nov 2024 19:44:31 +0100 Subject: [PATCH 5/6] reduce whitespace diff --- nomad/core_sched.go | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/nomad/core_sched.go b/nomad/core_sched.go index 0e01e99117f..86827b6b389 100644 --- a/nomad/core_sched.go +++ b/nomad/core_sched.go @@ -107,46 +107,34 @@ func (c *CoreScheduler) forceGC(eval *structs.Evaluation) error { if err := c.jobGC(eval); err != nil { return err } - if err := c.evalGC(); err != nil { return err } - if err := c.deploymentGC(); err != nil { return err } - if err := c.csiPluginGC(eval); err != nil { return err } - if err := c.csiVolumeClaimGC(eval); err != nil { return err } - if err := c.expiredOneTimeTokenGC(eval); err != nil { return err } - if err := c.expiredACLTokenGC(eval, false); err != nil { return err } - if err := c.expiredACLTokenGC(eval, true); err != nil { return err } - if err := c.rootKeyGC(eval, time.Now()); err != nil { return err } // Node GC must occur after the others to ensure the allocations are // cleared. - if err := c.nodeGC(eval); err != nil { - return err - } - - return nil + return c.nodeGC(eval) } // jobGC is used to garbage collect eligible jobs. From 1d149a007216f6a615bd6e859ce7dc2539c1b1df Mon Sep 17 00:00:00 2001 From: Piotr Kazmierczak <470696+pkazmierczak@users.noreply.github.com> Date: Wed, 20 Nov 2024 17:26:14 +0100 Subject: [PATCH 6/6] simplified --- nomad/core_sched.go | 141 +++++++++++++++------------------------ nomad/core_sched_test.go | 6 +- 2 files changed, 57 insertions(+), 90 deletions(-) diff --git a/nomad/core_sched.go b/nomad/core_sched.go index 86827b6b389..ec3d4f0207a 100644 --- a/nomad/core_sched.go +++ b/nomad/core_sched.go @@ -13,6 +13,7 @@ import ( log "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" version "github.com/hashicorp/go-version" + "github.com/hashicorp/nomad/helper/pointer" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" @@ -28,64 +29,50 @@ type CoreScheduler struct { snap *state.StateSnapshot logger log.Logger - // custom GC Threshold is a map of object names to threshold values which can be - // used by unit tests to simulate time manipulation, and is used by forceGC to - // set minimal threshold so that virtually all eligible objects will get GCd - customGCThreshold map[string]time.Duration + // customThresholdForObject is used by unit tests that want to manipulate GC + // threshold settings. Users can pass the string that matches the object to GC + // (e.g., structs.CoreJobEvalGC) and time.Duration that will be used as GC + // threshold value. + customThresholdForObject map[string]*time.Duration } // NewCoreScheduler is used to return a new system scheduler instance func NewCoreScheduler(srv *Server, snap *state.StateSnapshot) scheduler.Scheduler { s := &CoreScheduler{ - srv: srv, - snap: snap, - logger: srv.logger.ResetNamed("core.sched"), - customGCThreshold: make(map[string]time.Duration), + srv: srv, + snap: snap, + logger: srv.logger.ResetNamed("core.sched"), + customThresholdForObject: make(map[string]*time.Duration), } return s } -func (c *CoreScheduler) setCustomThresholdForObject(objectName string, threshold time.Duration) { - c.customGCThreshold[objectName] = threshold -} - -func (c *CoreScheduler) setCustomThresholdForAllObjects(threshold time.Duration) { - for _, objectName := range []string{ - "job", - "eval", - "batchEval", - "deployment", - "csiPlugin", - "csiVolume", - "token", - "node", - } { - c.setCustomThresholdForObject(objectName, threshold) - } -} - // Process is used to implement the scheduler.Scheduler interface func (c *CoreScheduler) Process(eval *structs.Evaluation) error { job := strings.Split(eval.JobID, ":") // extra data can be smuggled in w/ JobID + + // check if there are any custom threshold values set + customThreshold := c.customThresholdForObject[job[0]] + switch job[0] { case structs.CoreJobEvalGC: - return c.evalGC() + return c.evalGC(customThreshold) case structs.CoreJobNodeGC: - return c.nodeGC(eval) + return c.nodeGC(eval, customThreshold) case structs.CoreJobJobGC: - return c.jobGC(eval) + return c.jobGC(eval, customThreshold) case structs.CoreJobDeploymentGC: - return c.deploymentGC() + return c.deploymentGC(customThreshold) case structs.CoreJobCSIVolumeClaimGC: - return c.csiVolumeClaimGC(eval) + return c.csiVolumeClaimGC(eval, customThreshold) case structs.CoreJobCSIPluginGC: - return c.csiPluginGC(eval) + return c.csiPluginGC(eval, customThreshold) case structs.CoreJobOneTimeTokenGC: return c.expiredOneTimeTokenGC(eval) case structs.CoreJobLocalTokenExpiredGC: - return c.expiredACLTokenGC(eval, false) + return c.expiredACLTokenGC(eval, false, customThreshold) case structs.CoreJobGlobalTokenExpiredGC: - return c.expiredACLTokenGC(eval, true) + return c.expiredACLTokenGC(eval, true, customThreshold) case structs.CoreJobRootKeyRotateOrGC: return c.rootKeyRotateOrGC(eval) case structs.CoreJobVariablesRekey: @@ -99,33 +86,31 @@ func (c *CoreScheduler) Process(eval *structs.Evaluation) error { // forceGC is used to garbage collect all eligible objects. func (c *CoreScheduler) forceGC(eval *structs.Evaluation) error { - // set a minimal threshold for all objects to make force GC possible, and - // remember to reset it when we're done - c.setCustomThresholdForAllObjects(time.Millisecond) - defer c.setCustomThresholdForAllObjects(0) + // set a minimal threshold for all objects to make force GC possible + force := pointer.Of(time.Millisecond) - if err := c.jobGC(eval); err != nil { + if err := c.jobGC(eval, force); err != nil { return err } - if err := c.evalGC(); err != nil { + if err := c.evalGC(force); err != nil { return err } - if err := c.deploymentGC(); err != nil { + if err := c.deploymentGC(force); err != nil { return err } - if err := c.csiPluginGC(eval); err != nil { + if err := c.csiPluginGC(eval, force); err != nil { return err } - if err := c.csiVolumeClaimGC(eval); err != nil { + if err := c.csiVolumeClaimGC(eval, force); err != nil { return err } if err := c.expiredOneTimeTokenGC(eval); err != nil { return err } - if err := c.expiredACLTokenGC(eval, false); err != nil { + if err := c.expiredACLTokenGC(eval, false, force); err != nil { return err } - if err := c.expiredACLTokenGC(eval, true); err != nil { + if err := c.expiredACLTokenGC(eval, true, force); err != nil { return err } if err := c.rootKeyGC(eval, time.Now()); err != nil { @@ -134,11 +119,11 @@ func (c *CoreScheduler) forceGC(eval *structs.Evaluation) error { // Node GC must occur after the others to ensure the allocations are // cleared. - return c.nodeGC(eval) + return c.nodeGC(eval, force) } // jobGC is used to garbage collect eligible jobs. -func (c *CoreScheduler) jobGC(eval *structs.Evaluation) error { +func (c *CoreScheduler) jobGC(eval *structs.Evaluation, customThreshold *time.Duration) error { // Get all the jobs eligible for garbage collection. ws := memdb.NewWatchSet() iter, err := c.snap.JobsByGC(ws, true) @@ -150,10 +135,8 @@ func (c *CoreScheduler) jobGC(eval *structs.Evaluation) error { threshold = c.srv.config.JobGCThreshold // custom threshold override - if val, ok := c.customGCThreshold["job"]; ok { - if val != 0 { - threshold = val - } + if customThreshold != nil { + threshold = *customThreshold } cutoffTime := c.getCutoffTime(threshold) @@ -284,7 +267,7 @@ func (c *CoreScheduler) partitionJobReap(jobs []*structs.Job, leaderACL string, } // evalGC is used to garbage collect old evaluations -func (c *CoreScheduler) evalGC() error { +func (c *CoreScheduler) evalGC(customThreshold *time.Duration) error { // Iterate over the evaluations ws := memdb.NewWatchSet() iter, err := c.snap.Evals(ws, false) @@ -297,15 +280,9 @@ func (c *CoreScheduler) evalGC() error { batchThreshold = c.srv.config.BatchEvalGCThreshold // custom threshold override - if val, ok := c.customGCThreshold["eval"]; ok { - if val != 0 { - threshold = val - } - } - if val, ok := c.customGCThreshold["batchEval"]; ok { - if val != 0 { - batchThreshold = val - } + if customThreshold != nil { + threshold = *customThreshold + batchThreshold = *customThreshold } cutoffTime := c.getCutoffTime(threshold) @@ -486,7 +463,7 @@ func (c *CoreScheduler) partitionEvalReap(evals, allocs []string, batchSize int) } // nodeGC is used to garbage collect old nodes -func (c *CoreScheduler) nodeGC(eval *structs.Evaluation) error { +func (c *CoreScheduler) nodeGC(eval *structs.Evaluation, customThreshold *time.Duration) error { // Iterate over the evaluations ws := memdb.NewWatchSet() iter, err := c.snap.Nodes(ws) @@ -498,10 +475,8 @@ func (c *CoreScheduler) nodeGC(eval *structs.Evaluation) error { threshold = c.srv.config.NodeGCThreshold // custom threshold override - if val, ok := c.customGCThreshold["node"]; ok { - if val != 0 { - threshold = val - } + if customThreshold != nil { + threshold = *customThreshold } cutoffTime := c.getCutoffTime(threshold) @@ -592,7 +567,7 @@ func (c *CoreScheduler) nodeReap(eval *structs.Evaluation, nodeIDs []string) err } // deploymentGC is used to garbage collect old deployments -func (c *CoreScheduler) deploymentGC() error { +func (c *CoreScheduler) deploymentGC(customThreshold *time.Duration) error { // Iterate over the deployments ws := memdb.NewWatchSet() iter, err := c.snap.Deployments(ws, state.SortDefault) @@ -604,10 +579,8 @@ func (c *CoreScheduler) deploymentGC() error { threshold = c.srv.config.DeploymentGCThreshold // custom threshold override - if val, ok := c.customGCThreshold["deployment"]; ok { - if val != 0 { - threshold = val - } + if customThreshold != nil { + threshold = *customThreshold } cutoffTime := c.getCutoffTime(threshold) @@ -767,7 +740,7 @@ func allocGCEligible(a *structs.Allocation, job *structs.Job, gcTime, cutoffTime } // csiVolumeClaimGC is used to garbage collect CSI volume claims -func (c *CoreScheduler) csiVolumeClaimGC(eval *structs.Evaluation) error { +func (c *CoreScheduler) csiVolumeClaimGC(eval *structs.Evaluation, customThreshold *time.Duration) error { gcClaims := func(ns, volID string) error { req := &structs.CSIVolumeClaimRequest{ @@ -806,10 +779,8 @@ func (c *CoreScheduler) csiVolumeClaimGC(eval *structs.Evaluation) error { threshold = c.srv.config.CSIVolumeClaimGCThreshold // custom threshold override - if val, ok := c.customGCThreshold["csiVolume"]; ok { - if val != 0 { - threshold = val - } + if customThreshold != nil { + threshold = *customThreshold } cutoffTime := c.getCutoffTime(threshold) @@ -842,7 +813,7 @@ func (c *CoreScheduler) csiVolumeClaimGC(eval *structs.Evaluation) error { } // csiPluginGC is used to garbage collect unused plugins -func (c *CoreScheduler) csiPluginGC(eval *structs.Evaluation) error { +func (c *CoreScheduler) csiPluginGC(eval *structs.Evaluation, customThreshold *time.Duration) error { ws := memdb.NewWatchSet() @@ -855,10 +826,8 @@ func (c *CoreScheduler) csiPluginGC(eval *structs.Evaluation) error { threshold = c.srv.config.CSIPluginGCThreshold // custom threshold override - if val, ok := c.customGCThreshold["csiPlugin"]; ok { - if val != 0 { - threshold = val - } + if customThreshold != nil { + threshold = *customThreshold } cutoffTime := c.getCutoffTime(threshold) @@ -902,7 +871,7 @@ func (c *CoreScheduler) expiredOneTimeTokenGC(eval *structs.Evaluation) error { // tokens. It can be used for both local and global tokens and includes // behaviour to account for periodic and user actioned garbage collection // invocations. -func (c *CoreScheduler) expiredACLTokenGC(eval *structs.Evaluation, global bool) error { +func (c *CoreScheduler) expiredACLTokenGC(eval *structs.Evaluation, global bool, customThreshold *time.Duration) error { // If ACLs are not enabled, we do not need to continue and should exit // early. This is not an error condition as callers can blindly call this @@ -925,10 +894,8 @@ func (c *CoreScheduler) expiredACLTokenGC(eval *structs.Evaluation, global bool) threshold = c.srv.config.ACLTokenExpirationGCThreshold // custom threshold override - if val, ok := c.customGCThreshold["token"]; ok { - if val != 0 { - threshold = val - } + if customThreshold != nil { + threshold = *customThreshold } cutoffTime := c.getCutoffTime(threshold) diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go index 21af18cb59b..fe700090f35 100644 --- a/nomad/core_sched_test.go +++ b/nomad/core_sched_test.go @@ -527,7 +527,7 @@ func TestCoreScheduler_EvalGC_Batch(t *testing.T) { // set a shorter GC threshold this time gc = s1.coreJobEval(structs.CoreJobEvalGC, jobModifyIdx*2) - core.(*CoreScheduler).setCustomThresholdForObject("batchEval", time.Minute) + core.(*CoreScheduler).customThresholdForObject[structs.CoreJobEvalGC] = pointer.Of(time.Minute) must.NoError(t, core.Process(gc)) // We expect the following: @@ -2511,7 +2511,7 @@ func TestCoreScheduler_CSIVolumeClaimGC(t *testing.T) { index++ gc := srv.coreJobEval(structs.CoreJobForceGC, index) c := core.(*CoreScheduler) - require.NoError(t, c.csiVolumeClaimGC(gc)) + require.NoError(t, c.csiVolumeClaimGC(gc, nil)) // the only remaining claim is for a deleted alloc with no path to // the non-existent node, so volumewatcher will release the @@ -2549,7 +2549,7 @@ func TestCoreScheduler_CSIBadState_ClaimGC(t *testing.T) { index++ gc := srv.coreJobEval(structs.CoreJobForceGC, index) c := core.(*CoreScheduler) - must.NoError(t, c.csiVolumeClaimGC(gc)) + must.NoError(t, c.csiVolumeClaimGC(gc, nil)) vol, err := srv.State().CSIVolumeByID(nil, structs.DefaultNamespace, "csi-volume-nfs0") must.NoError(t, err)