Skip to content

Commit

Permalink
separate job update summary step
Browse files Browse the repository at this point in the history
  • Loading branch information
drewbailey committed Jan 12, 2021
1 parent 29563c1 commit 578c02a
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 65 deletions.
111 changes: 66 additions & 45 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"
"time"

"github.com/davecgh/go-spew/spew"
log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
multierror "github.com/hashicorp/go-multierror"
Expand Down Expand Up @@ -1488,6 +1489,7 @@ func (s *StateStore) upsertJobImpl(index uint64, job *structs.Job, keepVersion b

// Setup the indexes correctly
if existing != nil {
spew.Dump("EXISTING!")
job.CreateIndex = existing.(*structs.Job).CreateIndex
job.ModifyIndex = index

Expand Down Expand Up @@ -3942,6 +3944,7 @@ func (s *StateStore) updateJobStabilityImpl(index uint64, namespace, jobID strin

copy := job.Copy()
copy.Stable = stable
spew.Dump("COPY STABLEEE", copy.Stable)
return s.upsertJobImpl(index, copy, true, txn)
}

Expand Down Expand Up @@ -4427,6 +4430,10 @@ func (s *StateStore) setJobStatus(index uint64, txn *txn,

// Capture the current status so we can check if there is a change
oldStatus := job.Status
// if index == job.CreateIndex {
// oldStatus = ""
// }
firstPass := index == job.CreateIndex
newStatus := forceStatus

// If forceStatus is not set, compute the jobs status.
Expand All @@ -4440,6 +4447,12 @@ func (s *StateStore) setJobStatus(index uint64, txn *txn,

// Fast-path if nothing has changed.
if oldStatus == newStatus {
spew.Dump("DOING THIS THING")
if err := s.setJobSummary(txn, job, index, oldStatus, newStatus, firstPass); err != nil {
return err
}
// initialize job summary
// initialize / update job summary
return nil
}

Expand All @@ -4457,64 +4470,72 @@ func (s *StateStore) setJobStatus(index uint64, txn *txn,
}

// Update the children summary
if updated.ParentID != "" {
// Try to update the summary of the parent job summary
summaryRaw, err := txn.First("job_summary", "id", updated.Namespace, updated.ParentID)
if err != nil {
return fmt.Errorf("unable to retrieve summary for parent job: %v", err)
}
if err := s.setJobSummary(txn, updated, index, oldStatus, newStatus, firstPass); err != nil {
return fmt.Errorf("job summary update failed %w", err)
}
return nil
}

// Only continue if the summary exists. It could not exist if the parent
// job was removed
if summaryRaw != nil {
existing := summaryRaw.(*structs.JobSummary)
pSummary := existing.Copy()
if pSummary.Children == nil {
pSummary.Children = new(structs.JobChildrenSummary)
}
func (s *StateStore) setJobSummary(txn *txn, updated *structs.Job, index uint64, oldStatus, newStatus string, firstPass bool) error {
if updated.ParentID == "" {
return nil
}

// Determine the transition and update the correct fields
children := pSummary.Children
// Try to update the summary of the parent job summary
summaryRaw, err := txn.First("job_summary", "id", updated.Namespace, updated.ParentID)
if err != nil {
return fmt.Errorf("unable to retrieve summary for parent job: %v", err)
}

// Decrement old status
if oldStatus != "" {
switch oldStatus {
case structs.JobStatusPending:
children.Pending--
case structs.JobStatusRunning:
children.Running--
case structs.JobStatusDead:
children.Dead--
default:
return fmt.Errorf("unknown old job status %q", oldStatus)
}
}
// Only continue if the summary exists. It could not exist if the parent
// job was removed
if summaryRaw != nil {
existing := summaryRaw.(*structs.JobSummary)
pSummary := existing.Copy()
if pSummary.Children == nil {
pSummary.Children = new(structs.JobChildrenSummary)
}

// Determine the transition and update the correct fields
children := pSummary.Children

// Increment new status
switch newStatus {
// Decrement old status
if !firstPass {
switch oldStatus {
case structs.JobStatusPending:
children.Pending++
children.Pending--
case structs.JobStatusRunning:
children.Running++
children.Running--
case structs.JobStatusDead:
children.Dead++
children.Dead--
default:
return fmt.Errorf("unknown new job status %q", newStatus)
return fmt.Errorf("unknown old job status %q", oldStatus)
}
}

// Increment new status
switch newStatus {
case structs.JobStatusPending:
children.Pending++
case structs.JobStatusRunning:
children.Running++
case structs.JobStatusDead:
children.Dead++
default:
return fmt.Errorf("unknown new job status %q", newStatus)
}

// Update the index
pSummary.ModifyIndex = index
// Update the index
pSummary.ModifyIndex = index

// Insert the summary
if err := txn.Insert("job_summary", pSummary); err != nil {
return fmt.Errorf("job summary insert failed: %v", err)
}
if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
// Insert the summary
if err := txn.Insert("job_summary", pSummary); err != nil {
return fmt.Errorf("job summary insert failed: %v", err)
}
if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
}

return nil
}

Expand Down
28 changes: 8 additions & 20 deletions nomad/state/state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6855,16 +6855,10 @@ func TestStateStore_UpdateJobStability(t *testing.T) {

// Check that the job was updated properly
ws := memdb.NewWatchSet()
jout, _ := state.JobByIDAndVersion(ws, job.Namespace, job.ID, 0)
if err != nil {
t.Fatalf("bad: %v", err)
}
if jout == nil {
t.Fatalf("bad: %#v", jout)
}
if !jout.Stable {
t.Fatalf("job not marked stable %#v", jout)
}
jout, err := state.JobByIDAndVersion(ws, job.Namespace, job.ID, 0)
require.NoError(t, err)
require.NotNil(t, jout)
require.True(t, jout.Stable, "job not marked as stable")

// Update the stability to false
err = state.UpdateJobStability(3, job.Namespace, job.ID, 0, false)
Expand All @@ -6873,16 +6867,10 @@ func TestStateStore_UpdateJobStability(t *testing.T) {
}

// Check that the job was updated properly
jout, _ = state.JobByIDAndVersion(ws, job.Namespace, job.ID, 0)
if err != nil {
t.Fatalf("bad: %v", err)
}
if jout == nil {
t.Fatalf("bad: %#v", jout)
}
if jout.Stable {
t.Fatalf("job marked stable %#v", jout)
}
jout, err = state.JobByIDAndVersion(ws, job.Namespace, job.ID, 0)
require.NoError(t, err)
require.NotNil(t, jout)
require.False(t, jout.Stable)
}

// Test that nonexistent deployment can't be promoted
Expand Down

0 comments on commit 578c02a

Please sign in to comment.