From 449cff62a6dfcc71c10f49569b459a77a3e720ac Mon Sep 17 00:00:00 2001 From: Piotr Kazmierczak <470696+pkazmierczak@users.noreply.github.com> Date: Thu, 21 Nov 2024 08:07:23 +0000 Subject: [PATCH] backport of commit 6ccfcc37a30b34f5f76fece890af1c42a510ea9c --- .changelog/24456.txt | 3 + nomad/core_sched.go | 123 +++++++++++++++++----------------- nomad/core_sched_test.go | 8 +-- nomad/system_endpoint_test.go | 8 +-- 4 files changed, 70 insertions(+), 72 deletions(-) create mode 100644 .changelog/24456.txt diff --git a/.changelog/24456.txt b/.changelog/24456.txt new file mode 100644 index 00000000000..914db39ef7d --- /dev/null +++ b/.changelog/24456.txt @@ -0,0 +1,3 @@ +```release-note:bug +scheduler: Fix bug where forced garbage collection does not ignore GC thresholds +``` diff --git a/nomad/core_sched.go b/nomad/core_sched.go index e71e2859356..ec3d4f0207a 100644 --- a/nomad/core_sched.go +++ b/nomad/core_sched.go @@ -13,6 +13,7 @@ import ( log "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" version "github.com/hashicorp/go-version" + "github.com/hashicorp/nomad/helper/pointer" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" @@ -28,25 +29,20 @@ type CoreScheduler struct { snap *state.StateSnapshot logger log.Logger - // custom GC Threshold values can be used by unit tests to simulate time - // manipulation - customJobGCThreshold time.Duration - customEvalGCThreshold time.Duration - customBatchEvalGCThreshold time.Duration - customNodeGCThreshold time.Duration - customDeploymentGCThreshold time.Duration - customCSIVolumeClaimGCThreshold time.Duration - customCSIPluginGCThreshold time.Duration - customACLTokenExpirationGCThreshold time.Duration - customRootKeyGCThreshold time.Duration + // customThresholdForObject is used by unit tests that want to manipulate GC + // threshold settings. Users can pass the string that matches the object to GC + // (e.g., structs.CoreJobEvalGC) and time.Duration that will be used as GC + // threshold value. + customThresholdForObject map[string]*time.Duration } // NewCoreScheduler is used to return a new system scheduler instance func NewCoreScheduler(srv *Server, snap *state.StateSnapshot) scheduler.Scheduler { s := &CoreScheduler{ - srv: srv, - snap: snap, - logger: srv.logger.ResetNamed("core.sched"), + srv: srv, + snap: snap, + logger: srv.logger.ResetNamed("core.sched"), + customThresholdForObject: make(map[string]*time.Duration), } return s } @@ -54,25 +50,29 @@ func NewCoreScheduler(srv *Server, snap *state.StateSnapshot) scheduler.Schedule // Process is used to implement the scheduler.Scheduler interface func (c *CoreScheduler) Process(eval *structs.Evaluation) error { job := strings.Split(eval.JobID, ":") // extra data can be smuggled in w/ JobID + + // check if there are any custom threshold values set + customThreshold := c.customThresholdForObject[job[0]] + switch job[0] { case structs.CoreJobEvalGC: - return c.evalGC() + return c.evalGC(customThreshold) case structs.CoreJobNodeGC: - return c.nodeGC(eval) + return c.nodeGC(eval, customThreshold) case structs.CoreJobJobGC: - return c.jobGC(eval) + return c.jobGC(eval, customThreshold) case structs.CoreJobDeploymentGC: - return c.deploymentGC() + return c.deploymentGC(customThreshold) case structs.CoreJobCSIVolumeClaimGC: - return c.csiVolumeClaimGC(eval) + return c.csiVolumeClaimGC(eval, customThreshold) case structs.CoreJobCSIPluginGC: - return c.csiPluginGC(eval) + return c.csiPluginGC(eval, customThreshold) case structs.CoreJobOneTimeTokenGC: return c.expiredOneTimeTokenGC(eval) case structs.CoreJobLocalTokenExpiredGC: - return c.expiredACLTokenGC(eval, false) + return c.expiredACLTokenGC(eval, false, customThreshold) case structs.CoreJobGlobalTokenExpiredGC: - return c.expiredACLTokenGC(eval, true) + return c.expiredACLTokenGC(eval, true, customThreshold) case structs.CoreJobRootKeyRotateOrGC: return c.rootKeyRotateOrGC(eval) case structs.CoreJobVariablesRekey: @@ -86,40 +86,44 @@ func (c *CoreScheduler) Process(eval *structs.Evaluation) error { // forceGC is used to garbage collect all eligible objects. func (c *CoreScheduler) forceGC(eval *structs.Evaluation) error { - if err := c.jobGC(eval); err != nil { + // set a minimal threshold for all objects to make force GC possible + force := pointer.Of(time.Millisecond) + + if err := c.jobGC(eval, force); err != nil { return err } - if err := c.evalGC(); err != nil { + if err := c.evalGC(force); err != nil { return err } - if err := c.deploymentGC(); err != nil { + if err := c.deploymentGC(force); err != nil { return err } - if err := c.csiPluginGC(eval); err != nil { + if err := c.csiPluginGC(eval, force); err != nil { return err } - if err := c.csiVolumeClaimGC(eval); err != nil { + if err := c.csiVolumeClaimGC(eval, force); err != nil { return err } if err := c.expiredOneTimeTokenGC(eval); err != nil { return err } - if err := c.expiredACLTokenGC(eval, false); err != nil { + if err := c.expiredACLTokenGC(eval, false, force); err != nil { return err } - if err := c.expiredACLTokenGC(eval, true); err != nil { + if err := c.expiredACLTokenGC(eval, true, force); err != nil { return err } if err := c.rootKeyGC(eval, time.Now()); err != nil { return err } + // Node GC must occur after the others to ensure the allocations are // cleared. - return c.nodeGC(eval) + return c.nodeGC(eval, force) } // jobGC is used to garbage collect eligible jobs. -func (c *CoreScheduler) jobGC(eval *structs.Evaluation) error { +func (c *CoreScheduler) jobGC(eval *structs.Evaluation, customThreshold *time.Duration) error { // Get all the jobs eligible for garbage collection. ws := memdb.NewWatchSet() iter, err := c.snap.JobsByGC(ws, true) @@ -131,8 +135,8 @@ func (c *CoreScheduler) jobGC(eval *structs.Evaluation) error { threshold = c.srv.config.JobGCThreshold // custom threshold override - if c.customJobGCThreshold != 0 { - threshold = c.customJobGCThreshold + if customThreshold != nil { + threshold = *customThreshold } cutoffTime := c.getCutoffTime(threshold) @@ -263,7 +267,7 @@ func (c *CoreScheduler) partitionJobReap(jobs []*structs.Job, leaderACL string, } // evalGC is used to garbage collect old evaluations -func (c *CoreScheduler) evalGC() error { +func (c *CoreScheduler) evalGC(customThreshold *time.Duration) error { // Iterate over the evaluations ws := memdb.NewWatchSet() iter, err := c.snap.Evals(ws, false) @@ -276,11 +280,9 @@ func (c *CoreScheduler) evalGC() error { batchThreshold = c.srv.config.BatchEvalGCThreshold // custom threshold override - if c.customEvalGCThreshold != 0 { - threshold = c.customEvalGCThreshold - } - if c.customBatchEvalGCThreshold != 0 { - batchThreshold = c.customBatchEvalGCThreshold + if customThreshold != nil { + threshold = *customThreshold + batchThreshold = *customThreshold } cutoffTime := c.getCutoffTime(threshold) @@ -376,8 +378,7 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, cutoffTime time.Time, a var gcAllocIDs []string for _, alloc := range allocs { if !allocGCEligible(alloc, job, time.Now().UTC(), cutoffTime) { - // Can't GC the evaluation since not all of the allocations are - // terminal + // Can't GC the evaluation since not all the allocations are terminal gcEval = false } else { // The allocation is eligible to be GC'd @@ -462,7 +463,7 @@ func (c *CoreScheduler) partitionEvalReap(evals, allocs []string, batchSize int) } // nodeGC is used to garbage collect old nodes -func (c *CoreScheduler) nodeGC(eval *structs.Evaluation) error { +func (c *CoreScheduler) nodeGC(eval *structs.Evaluation, customThreshold *time.Duration) error { // Iterate over the evaluations ws := memdb.NewWatchSet() iter, err := c.snap.Nodes(ws) @@ -474,8 +475,8 @@ func (c *CoreScheduler) nodeGC(eval *structs.Evaluation) error { threshold = c.srv.config.NodeGCThreshold // custom threshold override - if c.customNodeGCThreshold != 0 { - threshold = c.customNodeGCThreshold + if customThreshold != nil { + threshold = *customThreshold } cutoffTime := c.getCutoffTime(threshold) @@ -566,7 +567,7 @@ func (c *CoreScheduler) nodeReap(eval *structs.Evaluation, nodeIDs []string) err } // deploymentGC is used to garbage collect old deployments -func (c *CoreScheduler) deploymentGC() error { +func (c *CoreScheduler) deploymentGC(customThreshold *time.Duration) error { // Iterate over the deployments ws := memdb.NewWatchSet() iter, err := c.snap.Deployments(ws, state.SortDefault) @@ -578,8 +579,8 @@ func (c *CoreScheduler) deploymentGC() error { threshold = c.srv.config.DeploymentGCThreshold // custom threshold override - if c.customDeploymentGCThreshold != 0 { - threshold = c.customDeploymentGCThreshold + if customThreshold != nil { + threshold = *customThreshold } cutoffTime := c.getCutoffTime(threshold) @@ -739,7 +740,7 @@ func allocGCEligible(a *structs.Allocation, job *structs.Job, gcTime, cutoffTime } // csiVolumeClaimGC is used to garbage collect CSI volume claims -func (c *CoreScheduler) csiVolumeClaimGC(eval *structs.Evaluation) error { +func (c *CoreScheduler) csiVolumeClaimGC(eval *structs.Evaluation, customThreshold *time.Duration) error { gcClaims := func(ns, volID string) error { req := &structs.CSIVolumeClaimRequest{ @@ -778,8 +779,8 @@ func (c *CoreScheduler) csiVolumeClaimGC(eval *structs.Evaluation) error { threshold = c.srv.config.CSIVolumeClaimGCThreshold // custom threshold override - if c.customCSIVolumeClaimGCThreshold != 0 { - threshold = c.customCSIVolumeClaimGCThreshold + if customThreshold != nil { + threshold = *customThreshold } cutoffTime := c.getCutoffTime(threshold) @@ -812,7 +813,7 @@ func (c *CoreScheduler) csiVolumeClaimGC(eval *structs.Evaluation) error { } // csiPluginGC is used to garbage collect unused plugins -func (c *CoreScheduler) csiPluginGC(eval *structs.Evaluation) error { +func (c *CoreScheduler) csiPluginGC(eval *structs.Evaluation, customThreshold *time.Duration) error { ws := memdb.NewWatchSet() @@ -825,8 +826,8 @@ func (c *CoreScheduler) csiPluginGC(eval *structs.Evaluation) error { threshold = c.srv.config.CSIPluginGCThreshold // custom threshold override - if c.customCSIPluginGCThreshold != 0 { - threshold = c.customCSIPluginGCThreshold + if customThreshold != nil { + threshold = *customThreshold } cutoffTime := c.getCutoffTime(threshold) @@ -870,7 +871,7 @@ func (c *CoreScheduler) expiredOneTimeTokenGC(eval *structs.Evaluation) error { // tokens. It can be used for both local and global tokens and includes // behaviour to account for periodic and user actioned garbage collection // invocations. -func (c *CoreScheduler) expiredACLTokenGC(eval *structs.Evaluation, global bool) error { +func (c *CoreScheduler) expiredACLTokenGC(eval *structs.Evaluation, global bool, customThreshold *time.Duration) error { // If ACLs are not enabled, we do not need to continue and should exit // early. This is not an error condition as callers can blindly call this @@ -893,8 +894,8 @@ func (c *CoreScheduler) expiredACLTokenGC(eval *structs.Evaluation, global bool) threshold = c.srv.config.ACLTokenExpirationGCThreshold // custom threshold override - if c.customACLTokenExpirationGCThreshold != 0 { - threshold = c.customACLTokenExpirationGCThreshold + if customThreshold != nil { + threshold = *customThreshold } cutoffTime := c.getCutoffTime(threshold) @@ -1003,13 +1004,9 @@ func (c *CoreScheduler) rootKeyGC(eval *structs.Evaluation, now time.Time) error return err } - var threshold time.Duration - threshold = c.srv.config.RootKeyGCThreshold - - // custom threshold override - if c.customRootKeyGCThreshold != 0 { - threshold = c.customRootKeyGCThreshold - } + // we don't do custom overrides for root keys because they are never subject to + // force GC + threshold := c.srv.config.RootKeyGCThreshold // the threshold is longer than we can support with the time table, and we // never want to force-GC keys because that will orphan signed Workload diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go index d29aa2b9d8e..fe700090f35 100644 --- a/nomad/core_sched_test.go +++ b/nomad/core_sched_test.go @@ -527,9 +527,7 @@ func TestCoreScheduler_EvalGC_Batch(t *testing.T) { // set a shorter GC threshold this time gc = s1.coreJobEval(structs.CoreJobEvalGC, jobModifyIdx*2) - core.(*CoreScheduler).customBatchEvalGCThreshold = time.Minute - //core.(*CoreScheduler).customEvalGCThreshold = time.Minute - //core.(*CoreScheduler).customJobGCThreshold = time.Minute + core.(*CoreScheduler).customThresholdForObject[structs.CoreJobEvalGC] = pointer.Of(time.Minute) must.NoError(t, core.Process(gc)) // We expect the following: @@ -2513,7 +2511,7 @@ func TestCoreScheduler_CSIVolumeClaimGC(t *testing.T) { index++ gc := srv.coreJobEval(structs.CoreJobForceGC, index) c := core.(*CoreScheduler) - require.NoError(t, c.csiVolumeClaimGC(gc)) + require.NoError(t, c.csiVolumeClaimGC(gc, nil)) // the only remaining claim is for a deleted alloc with no path to // the non-existent node, so volumewatcher will release the @@ -2551,7 +2549,7 @@ func TestCoreScheduler_CSIBadState_ClaimGC(t *testing.T) { index++ gc := srv.coreJobEval(structs.CoreJobForceGC, index) c := core.(*CoreScheduler) - must.NoError(t, c.csiVolumeClaimGC(gc)) + must.NoError(t, c.csiVolumeClaimGC(gc, nil)) vol, err := srv.State().CSIVolumeByID(nil, structs.DefaultNamespace, "csi-volume-nfs0") must.NoError(t, err) diff --git a/nomad/system_endpoint_test.go b/nomad/system_endpoint_test.go index 5d879b83481..209e875492b 100644 --- a/nomad/system_endpoint_test.go +++ b/nomad/system_endpoint_test.go @@ -34,15 +34,15 @@ func TestSystemEndpoint_GarbageCollect(t *testing.T) { job := mock.Job() job.Type = structs.JobTypeBatch job.Stop = true - // submit time must be older than default job GC - job.SubmitTime = time.Now().Add(-6 * time.Hour).UnixNano() + // set submit time older than now but still newer than default GC threshold + job.SubmitTime = time.Now().Add(-10 * time.Millisecond).UnixNano() must.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, 1000, nil, job)) eval := mock.Eval() eval.Status = structs.EvalStatusComplete eval.JobID = job.ID - // modify time must be older than default eval GC - eval.ModifyTime = time.Now().Add(-5 * time.Hour).UnixNano() + // set modify time older than now but still newer than default GC threshold + eval.ModifyTime = time.Now().Add(-10 * time.Millisecond).UnixNano() must.NoError(t, state.UpsertEvals(structs.MsgTypeTestSetup, 1001, []*structs.Evaluation{eval})) // Make the GC request