From ec8517361cfc1b66410f94f8c335686224be0fe2 Mon Sep 17 00:00:00 2001 From: Drew Bailey Date: Tue, 12 Jan 2021 10:26:12 -0500 Subject: [PATCH] separate job update summary step --- nomad/state/state_store.go | 117 ++++++++++++++++++++------------ nomad/state/state_store_test.go | 28 +++----- 2 files changed, 80 insertions(+), 65 deletions(-) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 51271cc31ca..8416b0bb811 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -4412,6 +4412,7 @@ func (s *StateStore) setJobStatuses(index uint64, txn *txn, if err := s.setJobStatus(index, txn, existing.(*structs.Job), evalDelete, forceStatus); err != nil { return err } + } return nil @@ -4427,6 +4428,7 @@ func (s *StateStore) setJobStatus(index uint64, txn *txn, // Capture the current status so we can check if there is a change oldStatus := job.Status + firstPass := index == job.CreateIndex newStatus := forceStatus // If forceStatus is not set, compute the jobs status. @@ -4440,9 +4442,26 @@ func (s *StateStore) setJobStatus(index uint64, txn *txn, // Fast-path if nothing has changed. if oldStatus == newStatus { + updated := job.Copy() + updated.ModifyIndex = index + if err := txn.Insert("jobs", updated); err != nil { + return fmt.Errorf("job insert failed: %v", err) + } + if err := txn.Insert("index", &IndexEntry{"jobs", index}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } + if err := s.setJobSummary(txn, job, index, oldStatus, newStatus, firstPass); err != nil { + return err + } + // initialize job summary + // initialize / update job summary return nil } + // TODO (drew) + // not inserting the job again with modify index/status + // prevents job stability test pass + // Copy and update the existing job updated := job.Copy() updated.Status = newStatus @@ -4457,64 +4476,72 @@ func (s *StateStore) setJobStatus(index uint64, txn *txn, } // Update the children summary - if updated.ParentID != "" { - // Try to update the summary of the parent job summary - summaryRaw, err := txn.First("job_summary", "id", updated.Namespace, updated.ParentID) - if err != nil { - return fmt.Errorf("unable to retrieve summary for parent job: %v", err) - } + if err := s.setJobSummary(txn, updated, index, oldStatus, newStatus, firstPass); err != nil { + return fmt.Errorf("job summary update failed %w", err) + } + return nil +} - // Only continue if the summary exists. It could not exist if the parent - // job was removed - if summaryRaw != nil { - existing := summaryRaw.(*structs.JobSummary) - pSummary := existing.Copy() - if pSummary.Children == nil { - pSummary.Children = new(structs.JobChildrenSummary) - } +func (s *StateStore) setJobSummary(txn *txn, updated *structs.Job, index uint64, oldStatus, newStatus string, firstPass bool) error { + if updated.ParentID == "" { + return nil + } - // Determine the transition and update the correct fields - children := pSummary.Children + // Try to update the summary of the parent job summary + summaryRaw, err := txn.First("job_summary", "id", updated.Namespace, updated.ParentID) + if err != nil { + return fmt.Errorf("unable to retrieve summary for parent job: %v", err) + } - // Decrement old status - if oldStatus != "" { - switch oldStatus { - case structs.JobStatusPending: - children.Pending-- - case structs.JobStatusRunning: - children.Running-- - case structs.JobStatusDead: - children.Dead-- - default: - return fmt.Errorf("unknown old job status %q", oldStatus) - } - } + // Only continue if the summary exists. It could not exist if the parent + // job was removed + if summaryRaw != nil { + existing := summaryRaw.(*structs.JobSummary) + pSummary := existing.Copy() + if pSummary.Children == nil { + pSummary.Children = new(structs.JobChildrenSummary) + } + + // Determine the transition and update the correct fields + children := pSummary.Children - // Increment new status - switch newStatus { + // Decrement old status + if !firstPass { + switch oldStatus { case structs.JobStatusPending: - children.Pending++ + children.Pending-- case structs.JobStatusRunning: - children.Running++ + children.Running-- case structs.JobStatusDead: - children.Dead++ + children.Dead-- default: - return fmt.Errorf("unknown new job status %q", newStatus) + return fmt.Errorf("unknown old job status %q", oldStatus) } + } + + // Increment new status + switch newStatus { + case structs.JobStatusPending: + children.Pending++ + case structs.JobStatusRunning: + children.Running++ + case structs.JobStatusDead: + children.Dead++ + default: + return fmt.Errorf("unknown new job status %q", newStatus) + } - // Update the index - pSummary.ModifyIndex = index + // Update the index + pSummary.ModifyIndex = index - // Insert the summary - if err := txn.Insert("job_summary", pSummary); err != nil { - return fmt.Errorf("job summary insert failed: %v", err) - } - if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil { - return fmt.Errorf("index update failed: %v", err) - } + // Insert the summary + if err := txn.Insert("job_summary", pSummary); err != nil { + return fmt.Errorf("job summary insert failed: %v", err) + } + if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil { + return fmt.Errorf("index update failed: %v", err) } } - return nil } diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 7bff0f1ca71..10f43ac6ad5 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -6855,16 +6855,10 @@ func TestStateStore_UpdateJobStability(t *testing.T) { // Check that the job was updated properly ws := memdb.NewWatchSet() - jout, _ := state.JobByIDAndVersion(ws, job.Namespace, job.ID, 0) - if err != nil { - t.Fatalf("bad: %v", err) - } - if jout == nil { - t.Fatalf("bad: %#v", jout) - } - if !jout.Stable { - t.Fatalf("job not marked stable %#v", jout) - } + jout, err := state.JobByIDAndVersion(ws, job.Namespace, job.ID, 0) + require.NoError(t, err) + require.NotNil(t, jout) + require.True(t, jout.Stable, "job not marked as stable") // Update the stability to false err = state.UpdateJobStability(3, job.Namespace, job.ID, 0, false) @@ -6873,16 +6867,10 @@ func TestStateStore_UpdateJobStability(t *testing.T) { } // Check that the job was updated properly - jout, _ = state.JobByIDAndVersion(ws, job.Namespace, job.ID, 0) - if err != nil { - t.Fatalf("bad: %v", err) - } - if jout == nil { - t.Fatalf("bad: %#v", jout) - } - if jout.Stable { - t.Fatalf("job marked stable %#v", jout) - } + jout, err = state.JobByIDAndVersion(ws, job.Namespace, job.ID, 0) + require.NoError(t, err) + require.NotNil(t, jout) + require.False(t, jout.Stable) } // Test that nonexistent deployment can't be promoted