Skip to content

Commit

Permalink
separate job update summary step
Browse files Browse the repository at this point in the history
  • Loading branch information
drewbailey committed Jan 19, 2021
1 parent 09d5cbb commit ec85173
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 65 deletions.
117 changes: 72 additions & 45 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -4412,6 +4412,7 @@ func (s *StateStore) setJobStatuses(index uint64, txn *txn,
if err := s.setJobStatus(index, txn, existing.(*structs.Job), evalDelete, forceStatus); err != nil {
return err
}

}

return nil
Expand All @@ -4427,6 +4428,7 @@ func (s *StateStore) setJobStatus(index uint64, txn *txn,

// Capture the current status so we can check if there is a change
oldStatus := job.Status
firstPass := index == job.CreateIndex
newStatus := forceStatus

// If forceStatus is not set, compute the jobs status.
Expand All @@ -4440,9 +4442,26 @@ func (s *StateStore) setJobStatus(index uint64, txn *txn,

// Fast-path if nothing has changed.
if oldStatus == newStatus {
updated := job.Copy()
updated.ModifyIndex = index
if err := txn.Insert("jobs", updated); err != nil {
return fmt.Errorf("job insert failed: %v", err)
}
if err := txn.Insert("index", &IndexEntry{"jobs", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
if err := s.setJobSummary(txn, job, index, oldStatus, newStatus, firstPass); err != nil {
return err
}
// initialize job summary
// initialize / update job summary
return nil
}

// TODO (drew)
// If the job is not inserted again with the updated modify index/status,
// the job stability test does not pass.

// Copy and update the existing job
updated := job.Copy()
updated.Status = newStatus
Expand All @@ -4457,64 +4476,72 @@ func (s *StateStore) setJobStatus(index uint64, txn *txn,
}

// Update the children summary
if updated.ParentID != "" {
// Try to update the parent job's summary
summaryRaw, err := txn.First("job_summary", "id", updated.Namespace, updated.ParentID)
if err != nil {
return fmt.Errorf("unable to retrieve summary for parent job: %v", err)
}
if err := s.setJobSummary(txn, updated, index, oldStatus, newStatus, firstPass); err != nil {
return fmt.Errorf("job summary update failed %w", err)
}
return nil
}

// Only continue if the summary exists. It may not exist if the parent
// job was removed
if summaryRaw != nil {
existing := summaryRaw.(*structs.JobSummary)
pSummary := existing.Copy()
if pSummary.Children == nil {
pSummary.Children = new(structs.JobChildrenSummary)
}
func (s *StateStore) setJobSummary(txn *txn, updated *structs.Job, index uint64, oldStatus, newStatus string, firstPass bool) error {
if updated.ParentID == "" {
return nil
}

// Determine the transition and update the correct fields
children := pSummary.Children
// Try to update the parent job's summary
summaryRaw, err := txn.First("job_summary", "id", updated.Namespace, updated.ParentID)
if err != nil {
return fmt.Errorf("unable to retrieve summary for parent job: %v", err)
}

// Decrement old status
if oldStatus != "" {
switch oldStatus {
case structs.JobStatusPending:
children.Pending--
case structs.JobStatusRunning:
children.Running--
case structs.JobStatusDead:
children.Dead--
default:
return fmt.Errorf("unknown old job status %q", oldStatus)
}
}
// Only continue if the summary exists. It may not exist if the parent
// job was removed
if summaryRaw != nil {
existing := summaryRaw.(*structs.JobSummary)
pSummary := existing.Copy()
if pSummary.Children == nil {
pSummary.Children = new(structs.JobChildrenSummary)
}

// Determine the transition and update the correct fields
children := pSummary.Children

// Increment new status
switch newStatus {
// Decrement old status
if !firstPass {
switch oldStatus {
case structs.JobStatusPending:
children.Pending++
children.Pending--
case structs.JobStatusRunning:
children.Running++
children.Running--
case structs.JobStatusDead:
children.Dead++
children.Dead--
default:
return fmt.Errorf("unknown new job status %q", newStatus)
return fmt.Errorf("unknown old job status %q", oldStatus)
}
}

// Increment new status
switch newStatus {
case structs.JobStatusPending:
children.Pending++
case structs.JobStatusRunning:
children.Running++
case structs.JobStatusDead:
children.Dead++
default:
return fmt.Errorf("unknown new job status %q", newStatus)
}

// Update the index
pSummary.ModifyIndex = index
// Update the index
pSummary.ModifyIndex = index

// Insert the summary
if err := txn.Insert("job_summary", pSummary); err != nil {
return fmt.Errorf("job summary insert failed: %v", err)
}
if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
// Insert the summary
if err := txn.Insert("job_summary", pSummary); err != nil {
return fmt.Errorf("job summary insert failed: %v", err)
}
if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
}

return nil
}

Expand Down
28 changes: 8 additions & 20 deletions nomad/state/state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6855,16 +6855,10 @@ func TestStateStore_UpdateJobStability(t *testing.T) {

// Check that the job was updated properly
ws := memdb.NewWatchSet()
jout, _ := state.JobByIDAndVersion(ws, job.Namespace, job.ID, 0)
if err != nil {
t.Fatalf("bad: %v", err)
}
if jout == nil {
t.Fatalf("bad: %#v", jout)
}
if !jout.Stable {
t.Fatalf("job not marked stable %#v", jout)
}
jout, err := state.JobByIDAndVersion(ws, job.Namespace, job.ID, 0)
require.NoError(t, err)
require.NotNil(t, jout)
require.True(t, jout.Stable, "job not marked as stable")

// Update the stability to false
err = state.UpdateJobStability(3, job.Namespace, job.ID, 0, false)
Expand All @@ -6873,16 +6867,10 @@ func TestStateStore_UpdateJobStability(t *testing.T) {
}

// Check that the job was updated properly
jout, _ = state.JobByIDAndVersion(ws, job.Namespace, job.ID, 0)
if err != nil {
t.Fatalf("bad: %v", err)
}
if jout == nil {
t.Fatalf("bad: %#v", jout)
}
if jout.Stable {
t.Fatalf("job marked stable %#v", jout)
}
jout, err = state.JobByIDAndVersion(ws, job.Namespace, job.ID, 0)
require.NoError(t, err)
require.NotNil(t, jout)
require.False(t, jout.Stable)
}

// Test that nonexistent deployment can't be promoted
Expand Down

0 comments on commit ec85173

Please sign in to comment.