Skip to content

Commit

Permalink
Set SubmitTime on stopped and scaled jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgemarey committed Nov 9, 2023
1 parent e45b814 commit a3f053f
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 6 deletions.
10 changes: 6 additions & 4 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,7 @@ func (n *nomadFSM) applyDeregisterJob(msgType structs.MessageType, buf []byte, i
}

err := n.state.WithWriteTransaction(msgType, index, func(tx state.Txn) error {
err := n.handleJobDeregister(index, req.JobID, req.Namespace, req.Purge, req.NoShutdownDelay, tx)
err := n.handleJobDeregister(index, req.JobID, req.Namespace, req.Purge, req.SubmitTime, req.NoShutdownDelay, tx)

if err != nil {
n.logger.Error("deregistering job failed",
Expand Down Expand Up @@ -766,7 +766,7 @@ func (n *nomadFSM) applyBatchDeregisterJob(msgType structs.MessageType, buf []by
// evals for jobs whose deregistering didn't get committed yet.
err := n.state.WithWriteTransaction(msgType, index, func(tx state.Txn) error {
for jobNS, options := range req.Jobs {
if err := n.handleJobDeregister(index, jobNS.ID, jobNS.Namespace, options.Purge, false, tx); err != nil {
if err := n.handleJobDeregister(index, jobNS.ID, jobNS.Namespace, options.Purge, req.SubmitTime, false, tx); err != nil {
n.logger.Error("deregistering job failed", "job", jobNS.ID, "error", err)
return err
}
Expand All @@ -791,7 +791,7 @@ func (n *nomadFSM) applyBatchDeregisterJob(msgType structs.MessageType, buf []by

// handleJobDeregister is used to deregister a job. Leaves error logging up to
// caller.
func (n *nomadFSM) handleJobDeregister(index uint64, jobID, namespace string, purge bool, noShutdownDelay bool, tx state.Txn) error {
func (n *nomadFSM) handleJobDeregister(index uint64, jobID, namespace string, purge bool, submitTime int64, noShutdownDelay bool, tx state.Txn) error {
// If it is periodic remove it from the dispatcher
if err := n.periodicDispatcher.Remove(namespace, jobID); err != nil {
return fmt.Errorf("periodicDispatcher.Remove failed: %w", err)
Expand Down Expand Up @@ -838,8 +838,10 @@ func (n *nomadFSM) handleJobDeregister(index uint64, jobID, namespace string, pu
}

stopped := current.Copy()
stopped.SetSubmitTime()
stopped.Stop = true
if submitTime != 0 {
stopped.SubmitTime = submitTime
}

if err := n.state.UpsertJobTxn(index, nil, stopped, tx); err != nil {
return fmt.Errorf("UpsertJob failed: %w", err)
Expand Down
4 changes: 4 additions & 0 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -899,6 +899,7 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
reply.EvalID = eval.ID
}

args.SubmitTime = now
args.Eval = eval

// Commit the job update via Raft
Expand Down Expand Up @@ -999,6 +1000,8 @@ func (j *Job) BatchDeregister(args *structs.JobBatchDeregisterRequest, reply *st
args.Evals = append(args.Evals, eval)
}

args.SubmitTime = time.Now().UnixNano()

// Commit this update via Raft
_, index, err := j.srv.raftApply(structs.JobBatchDeregisterRequestType, args)
if err != nil {
Expand Down Expand Up @@ -1122,6 +1125,7 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes

// Update group count
group.Count = int(*args.Count)
job.SubmitTime = now

// Block scaling event if there's an active deployment
deployment, err := snap.LatestDeploymentByJobID(ws, namespace, args.JobID)
Expand Down
10 changes: 8 additions & 2 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -783,6 +783,9 @@ type JobDeregisterRequest struct {
// Eval is the evaluation to create that's associated with job deregister
Eval *Evaluation

// SubmitTime is the time at which the job was requested to be stopped
SubmitTime int64

WriteRequest
}

Expand All @@ -795,6 +798,9 @@ type JobBatchDeregisterRequest struct {
// Evals is the set of evaluations to create.
Evals []*Evaluation

// SubmitTime is the time at which the job was requested to be stopped
SubmitTime int64

WriteRequest
}

Expand Down Expand Up @@ -4452,8 +4458,8 @@ type Job struct {
// on each job register.
Version uint64

// SubmitTime is the time at which the job was submitted as a UnixNano in
// UTC
// SubmitTime is the time at which the job version was submitted as
// UnixNano in UTC
SubmitTime int64

// Raft Indexes
Expand Down

0 comments on commit a3f053f

Please sign in to comment.