Track multiple job versions and introduce a stop state for jobs #2566
Looks great!
nomad/state/state_store.go
} | ||
|
||
if _, err = txn.DeleteAll("job_versions", "id", job.ID, job.Version); err != nil { | ||
return fmt.Errorf("deleing job versions failed: %v", err) |
deleing -> deleting
scheduler/util.go
@@ -21,7 +21,7 @@ type allocTuple struct {
 // a job requires. This is used to do the count expansion.
 func materializeTaskGroups(job *structs.Job) map[string]*structs.TaskGroup {
 	out := make(map[string]*structs.TaskGroup)
-	if job == nil {
+	if job == nil || job.Stop {
You do this often enough that a Job.Stopped method might be appropriate:
func (j *Job) Stopped() bool { return j == nil || j.Stop }
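A minimal sketch of the suggested helper, using a stripped-down, hypothetical stand-in for `structs.Job` (only `Stop` and a simplified `TaskGroups` list of names are modeled, and `materializeTaskGroups` here skips count expansion). Because Go allows a pointer-receiver method to be called on a nil pointer, `Stopped` can absorb the nil check that each call site would otherwise repeat:

```go
package main

import "fmt"

// Job is a simplified, hypothetical stand-in for structs.Job; only the
// fields needed for this example are modeled.
type Job struct {
	ID         string
	Stop       bool
	TaskGroups []string // hypothetical: group names only, no counts
}

// Stopped reports whether the job is missing (nil) or marked as stopped.
// A pointer-receiver method may be called on a nil *Job, so the nil check
// lives here instead of at every call site.
func (j *Job) Stopped() bool {
	return j == nil || j.Stop
}

// materializeTaskGroups returns the task groups a scheduler should place.
// A nil or stopped job yields an empty map, so every existing allocation
// becomes a candidate for stopping.
func materializeTaskGroups(job *Job) map[string]bool {
	out := make(map[string]bool)
	if job.Stopped() {
		return out
	}
	for _, tg := range job.TaskGroups {
		out[tg] = true
	}
	return out
}

func main() {
	var missing *Job
	running := &Job{ID: "web", TaskGroups: []string{"frontend", "cache"}}
	stopped := &Job{ID: "web", Stop: true, TaskGroups: []string{"frontend"}}

	fmt.Println(missing.Stopped(), running.Stopped(), stopped.Stopped())                  // true false true
	fmt.Println(len(materializeTaskGroups(running)), len(materializeTaskGroups(stopped))) // 2 0
}
```

Defining the check once on the type also covers the later suggestion to drop the per-scheduler `isStoppedJob` helpers.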
LGTM!
continue | ||
} | ||
|
||
if _, err = txn.DeleteAll("job_versions", "id", job.ID, job.Version); err != nil { |
Why not just call DeleteAll with the id_prefix and eliminate the loop?
Because we are doing a prefix match to get all the versions for the ID, a DeleteAll on the prefix may also delete another job whose ID starts with the same prefix.
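To illustrate the point, here is a hypothetical in-memory model of the `job_versions` table (a plain slice stands in for go-memdb; `versionRow` and both helpers are made-up names). A prefix scan for `web` also returns `web-api`, so deleting everything the scan yields would wipe another job's history, while skipping non-exact matches in a loop does not:

```go
package main

import (
	"fmt"
	"strings"
)

// versionRow is a hypothetical row in a job_versions table keyed by (ID, Version).
type versionRow struct {
	ID      string
	Version uint64
}

// deleteVersionsByPrefix models calling DeleteAll on an id_prefix index:
// every row whose ID starts with prefix is removed.
func deleteVersionsByPrefix(rows []versionRow, prefix string) []versionRow {
	var kept []versionRow
	for _, r := range rows {
		if !strings.HasPrefix(r.ID, prefix) {
			kept = append(kept, r)
		}
	}
	return kept
}

// deleteVersionsExact walks the same prefix matches but continues past rows
// that belong to a different job, deleting only exact ID matches.
func deleteVersionsExact(rows []versionRow, id string) []versionRow {
	var kept []versionRow
	for _, r := range rows {
		if !strings.HasPrefix(r.ID, id) {
			kept = append(kept, r) // never returned by the prefix scan
			continue
		}
		if r.ID != id {
			kept = append(kept, r) // prefix hit from another job, e.g. "web-api" for "web"
			continue
		}
		// Exact match: this version belongs to the job being deleted, so drop it.
	}
	return kept
}

func main() {
	rows := []versionRow{{"web", 0}, {"web", 1}, {"web-api", 0}, {"db", 0}}
	fmt.Println(len(deleteVersionsByPrefix(rows, "web"))) // 1: only "db" survives
	fmt.Println(len(deleteVersionsExact(rows, "web")))    // 2: "web-api" and "db" survive
}
```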
	return fmt.Errorf("failed to insert job into job_versions table: %v", err)
}

if err := txn.Insert("index", &IndexEntry{"job_versions", index}); err != nil {
Do we use IndexEntry anymore? Or are we still waiting to move to having MemDB manage blocking queries?
Yeah, we still need this. It is used to return the index to wait on when the blocking query is against a nonexistent object. Consul does the same but with slightly different semantics (getter methods on the state store return the index rather than the endpoints looking it up).
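A rough sketch of why the table-level index matters, with hypothetical names throughout (`store`, `upsertJob`, and `queryJob` are illustrations, not Nomad's API; `IndexEntry` mirrors the `{table, index}` pair used above). When the requested job does not exist, there is no object `ModifyIndex` to return, so the query falls back to the last index at which the `jobs` table changed and the client blocks until the index moves past it:

```go
package main

import "fmt"

// IndexEntry records the last index at which a table was modified.
type IndexEntry struct {
	Key   string
	Value uint64
}

// Job is a hypothetical, minimal job record.
type Job struct {
	ID          string
	ModifyIndex uint64
}

// store is a toy state store with one jobs table and an index table.
type store struct {
	jobs  map[string]*Job
	index map[string]IndexEntry
}

func newStore() *store {
	return &store{jobs: map[string]*Job{}, index: map[string]IndexEntry{}}
}

// upsertJob writes the job and bumps the "jobs" entry in the index table.
func (s *store) upsertJob(idx uint64, j *Job) {
	j.ModifyIndex = idx
	s.jobs[j.ID] = j
	s.index["jobs"] = IndexEntry{Key: "jobs", Value: idx}
}

// queryJob returns the job (or nil) and the index a blocking query should wait on.
func (s *store) queryJob(id string) (*Job, uint64) {
	if j, ok := s.jobs[id]; ok {
		return j, j.ModifyIndex
	}
	// The object does not exist, so fall back to the table-level index.
	return nil, s.index["jobs"].Value
}

func main() {
	s := newStore()
	s.upsertJob(10, &Job{ID: "web"})
	s.upsertJob(15, &Job{ID: "db"})

	_, idx := s.queryJob("web")
	fmt.Println(idx) // 10
	missing, idx := s.queryJob("cache")
	fmt.Println(missing == nil, idx) // true 15
}
```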
for i, j := range all {
	if j.Stable {
		stableIdx = i
		break
Won't the break cause the lowest versioned stable job to be retained?
Not in this case. The versions are sorted highest first.
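Because `all` is ordered highest version first, the first stable entry the loop reaches is the newest stable version, so the `break` keeps the right one; without it, the loop would end on the lowest stable version. A minimal sketch of the retention step, assuming a hypothetical limit and a hypothetical rule that the newest stable version is swapped into the kept set when it would otherwise fall off the end (neither is taken from this PR):

```go
package main

import "fmt"

// jobVersion is a hypothetical, minimal view of one tracked job version.
type jobVersion struct {
	Version uint64
	Stable  bool
}

// pruneVersions takes versions sorted highest first and returns the ones to keep.
// Assumption: if the newest stable version lies beyond the limit, it replaces the
// oldest entry in the kept set.
func pruneVersions(all []jobVersion, limit int) []jobVersion {
	if len(all) <= limit {
		return all
	}
	stableIdx := -1
	for i, j := range all {
		if j.Stable {
			stableIdx = i // first hit is the highest stable version
			break
		}
	}
	keep := make([]jobVersion, limit)
	copy(keep, all[:limit])
	if stableIdx >= limit {
		keep[limit-1] = all[stableIdx]
	}
	return keep
}

func main() {
	all := []jobVersion{{5, false}, {4, false}, {3, false}, {2, true}, {1, true}}
	var kept []uint64
	for _, v := range pruneVersions(all, 3) {
		kept = append(kept, v.Version)
	}
	fmt.Println(kept) // [5 4 2]
}
```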
nomad/state/state_store.go
} | ||
|
||
// Sort with highest versions first | ||
sort.Slice(all, func(i, j int) bool { |
Since we are walking the index, isn't it already sorted? If it's low-to-high, a simple reverse might be sufficient.
Will update.
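If the index iterator already yields versions in ascending order (the reviewer's premise, not verified here), an in-place reverse produces highest-first order in O(n) without a comparison sort. A minimal sketch on a plain slice of version numbers; `reverseInPlace` is a hypothetical helper name:

```go
package main

import "fmt"

// reverseInPlace flips a slice so an ascending sequence becomes descending.
func reverseInPlace(vs []uint64) {
	for i, j := 0, len(vs)-1; i < j; i, j = i+1, j-1 {
		vs[i], vs[j] = vs[j], vs[i]
	}
}

func main() {
	versions := []uint64{0, 1, 2, 3} // as read from an ascending index
	reverseInPlace(versions)
	fmt.Println(versions) // [3 2 1 0]
}
```

On Go 1.21 and later, `slices.Reverse` from the standard library does the same thing.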
scheduler/system_sched.go
@@ -83,6 +83,12 @@ func (s *SystemScheduler) Process(eval *structs.Evaluation) error {
s.queuedAllocs)
}

// isStoppedJob returns if the scheduling is for a stopped job and the scheduler
// should stop all its allocations.
func (s *SystemScheduler) isStoppedJob() bool {
You could define this on the Job to avoid defining it twice, once in each scheduler (e.g. just check for a nil receiver pointer).
@@ -322,6 +324,7 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error {
 job.CreateIndex = index
 job.ModifyIndex = index
 job.JobModifyIndex = index
+job.Version = 0
Should we also set stable to false?
Yeah. I probably shouldn't have added the Stable field in this PR, since I didn't implement it.
@@ -311,6 +312,7 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error {
 job.CreateIndex = existing.(*structs.Job).CreateIndex
 job.ModifyIndex = index
 job.JobModifyIndex = index
+job.Version = existing.(*structs.Job).Version + 1
How do we update a job to mark it stable without causing the version to change?
That is going to be a separate fsm/state_store method.
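One way to read the plan: `UpsertJob` bumps `Version` on every write (a stop is just another write, so it creates a new version), while a separate method flips `Stable` in place without touching `Version`. The toy store below models that split; `setJobStability` is a hypothetical name for the follow-up method, only the latest version is stored, and clearing `Stable` on every upsert is an assumption extrapolated from the review thread:

```go
package main

import (
	"errors"
	"fmt"
)

// Job is a hypothetical, minimal job record.
type Job struct {
	ID      string
	Version uint64
	Stable  bool
	Stop    bool
}

// stateStore is a toy store that keeps only the latest version of each job.
type stateStore struct {
	jobs map[string]*Job
}

// upsertJob stores a copy of job. A new job starts at Version 0; every later
// upsert of the same ID increments the version. Assumption: a fresh write is
// never considered stable, so Stable is cleared.
func (s *stateStore) upsertJob(job Job) {
	if existing, ok := s.jobs[job.ID]; ok {
		job.Version = existing.Version + 1
	} else {
		job.Version = 0
	}
	job.Stable = false
	s.jobs[job.ID] = &job
}

// setJobStability (hypothetical) marks a specific version stable without
// creating a new version.
func (s *stateStore) setJobStability(id string, version uint64, stable bool) error {
	j, ok := s.jobs[id]
	if !ok || j.Version != version {
		return errors.New("job version not found")
	}
	j.Stable = stable
	return nil
}

func main() {
	s := &stateStore{jobs: map[string]*Job{}}
	s.upsertJob(Job{ID: "web"})
	_ = s.setJobStability("web", 0, true)
	fmt.Println(s.jobs["web"].Version, s.jobs["web"].Stable) // 0 true

	stopped := *s.jobs["web"] // copy, similar in spirit to current.Copy()
	stopped.Stop = true
	s.upsertJob(stopped)
	fmt.Println(s.jobs["web"].Version, s.jobs["web"].Stop) // 1 true
}
```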
nomad/state/state_store.go
} | ||
} | ||
|
||
if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil { |
Do we need this?
stopped := current.Copy()
stopped.Stop = true

if err := n.state.UpsertJob(index, stopped); err != nil {
This should not increment the version; UpsertJob may need a new mode.
Or should it increment? I guess you want it for this case, but not for the case of marking stable.
Yeah, I will cover the stable case in a follow-up PR.
// upsertJobVersion inserts a job into its historic version table and limits the
// number of job versions that are tracked.
func (s *StateStore) upsertJobVersion(index uint64, job *structs.Job, txn *memdb.Txn) error {
Should we guard this on the type? For example, batch jobs should probably retain only 1 version for performance, since they are one-shot anyway.
I say we loop back on this a bit later. I'm not sure it makes sense to actually enforce the one-shot nature. You may want to change the logging config, service config, etc. while the job is running, which is an in-place update. In that case, having versioning is nice as well.
I think the one-shot behavior we need to enforce is not re-running a dead batch job and not allowing destructive updates.
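For reference, the reviewer's type-based guard could be as small as the sketch below; both limits and the `trackedVersions` helper are hypothetical, and the author's counterpoint is that batch jobs still receive in-place updates for which a version history is useful:

```go
package main

import "fmt"

// Hypothetical retention limits; neither value comes from this PR.
const (
	defaultTrackedVersions = 6
	batchTrackedVersions   = 1
)

// trackedVersions returns how many historic versions to retain for a job type,
// following the reviewer's suggestion that one-shot batch jobs keep only one.
func trackedVersions(jobType string) int {
	if jobType == "batch" {
		return batchTrackedVersions
	}
	return defaultTrackedVersions
}

func main() {
	for _, t := range []string{"service", "system", "batch"} {
		fmt.Println(t, trackedVersions(t))
	}
}
```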
I'm going to lock this pull request because it has been closed for 120 days ⏳. This helps our maintainers find and focus on the active contributions.
This PR introduces tracking of multiple versions of a job and adds a stop state to jobs.
This allows workflows such as:
Previously, there was no way to check that status.