From a3f053f0a7d26da23962385205133b53ef0dd332 Mon Sep 17 00:00:00 2001 From: Jorge Marey Date: Thu, 9 Nov 2023 21:18:37 +0100 Subject: [PATCH] Set SubmitTime on stopped and scaled jobs --- nomad/fsm.go | 10 ++++++---- nomad/job_endpoint.go | 4 ++++ nomad/structs/structs.go | 10 ++++++++-- 3 files changed, 18 insertions(+), 6 deletions(-) diff --git a/nomad/fsm.go b/nomad/fsm.go index 061086e3176..35ab1390507 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -726,7 +726,7 @@ func (n *nomadFSM) applyDeregisterJob(msgType structs.MessageType, buf []byte, i } err := n.state.WithWriteTransaction(msgType, index, func(tx state.Txn) error { - err := n.handleJobDeregister(index, req.JobID, req.Namespace, req.Purge, req.NoShutdownDelay, tx) + err := n.handleJobDeregister(index, req.JobID, req.Namespace, req.Purge, req.SubmitTime, req.NoShutdownDelay, tx) if err != nil { n.logger.Error("deregistering job failed", @@ -766,7 +766,7 @@ func (n *nomadFSM) applyBatchDeregisterJob(msgType structs.MessageType, buf []by // evals for jobs whose deregistering didn't get committed yet. err := n.state.WithWriteTransaction(msgType, index, func(tx state.Txn) error { for jobNS, options := range req.Jobs { - if err := n.handleJobDeregister(index, jobNS.ID, jobNS.Namespace, options.Purge, false, tx); err != nil { + if err := n.handleJobDeregister(index, jobNS.ID, jobNS.Namespace, options.Purge, req.SubmitTime, false, tx); err != nil { n.logger.Error("deregistering job failed", "job", jobNS.ID, "error", err) return err } @@ -791,7 +791,7 @@ func (n *nomadFSM) applyBatchDeregisterJob(msgType structs.MessageType, buf []by // handleJobDeregister is used to deregister a job. Leaves error logging up to // caller. -func (n *nomadFSM) handleJobDeregister(index uint64, jobID, namespace string, purge bool, noShutdownDelay bool, tx state.Txn) error { +func (n *nomadFSM) handleJobDeregister(index uint64, jobID, namespace string, purge bool, submitTime int64, noShutdownDelay bool, tx state.Txn) error { // If it is periodic remove it from the dispatcher if err := n.periodicDispatcher.Remove(namespace, jobID); err != nil { return fmt.Errorf("periodicDispatcher.Remove failed: %w", err) @@ -838,8 +838,10 @@ func (n *nomadFSM) handleJobDeregister(index uint64, jobID, namespace string, pu } stopped := current.Copy() - stopped.SetSubmitTime() stopped.Stop = true + if submitTime != 0 { + stopped.SubmitTime = submitTime + } if err := n.state.UpsertJobTxn(index, nil, stopped, tx); err != nil { return fmt.Errorf("UpsertJob failed: %w", err) diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index edeaa04d4b6..59f0ad6569a 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -899,6 +899,7 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD reply.EvalID = eval.ID } + args.SubmitTime = now args.Eval = eval // Commit the job update via Raft @@ -999,6 +1000,8 @@ func (j *Job) BatchDeregister(args *structs.JobBatchDeregisterRequest, reply *st args.Evals = append(args.Evals, eval) } + args.SubmitTime = time.Now().UnixNano() + // Commit this update via Raft _, index, err := j.srv.raftApply(structs.JobBatchDeregisterRequestType, args) if err != nil { @@ -1122,6 +1125,7 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes // Update group count group.Count = int(*args.Count) + job.SubmitTime = now // Block scaling event if there's an active deployment deployment, err := snap.LatestDeploymentByJobID(ws, namespace, args.JobID) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 7b296ccb765..05b53f2ff3f 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -783,6 +783,9 @@ type JobDeregisterRequest struct { // Eval is the evaluation to create that's associated with job deregister Eval *Evaluation + // SubmitTime is the time at which the job was requested to be stopped + SubmitTime int64 + WriteRequest } @@ -795,6 +798,9 @@ type JobBatchDeregisterRequest struct { // Evals is the set of evaluations to create. Evals []*Evaluation + // SubmitTime is the time at which the job was requested to be stopped + SubmitTime int64 + WriteRequest } @@ -4452,8 +4458,8 @@ type Job struct { // on each job register. Version uint64 - // SubmitTime is the time at which the job was submitted as a UnixNano in - // UTC + // SubmitTime is the time at which the job version was submitted as + // UnixNano in UTC SubmitTime int64 // Raft Indexes