From 3d4c1859eaadb280b340f1a9fc189a1b8277ac5b Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Mon, 25 Jul 2016 17:26:10 -0700 Subject: [PATCH] Reconciling the queued allocations during restore --- nomad/fsm.go | 67 ++++++++++++++++++++++++++++++++- nomad/fsm_test.go | 26 +++++++++++++ nomad/state/state_store.go | 35 +++++++++++++---- nomad/state/state_store_test.go | 26 ++++++++++++- 4 files changed, 144 insertions(+), 10 deletions(-) diff --git a/nomad/fsm.go b/nomad/fsm.go index 71d02886882..e1d98ebfb7e 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -9,6 +9,7 @@ import ( "github.com/armon/go-metrics" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/scheduler" "github.com/hashicorp/raft" "github.com/ugorji/go/codec" ) @@ -579,11 +580,73 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { // Create Job Summaries // COMPAT 0.4 -> 0.4.1 - if err := restore.CreateJobSummaries(); err != nil { + jobs, err := restore.JobsWithoutSummary() + if err != nil { + fmt.Errorf("error retreiving jobs during restore: %v", err) + } + if err := restore.CreateJobSummaries(jobs); err != nil { return fmt.Errorf("error creating job summaries: %v", err) } - // Commit the state restore + restore.Commit() + return n.reconcileSummaries(jobs) +} + +// reconcileSummaries re-calculates the queued allocations for every job that we +// created a Job Summary during the snap shot restore +func (n *nomadFSM) reconcileSummaries(jobs []*structs.Job) error { + // Start the state restore + restore, err := n.state.Restore() + if err != nil { + return err + } + defer restore.Abort() + snap, err := n.state.Snapshot() + if err != nil { + return fmt.Errorf("unable to create snapshot: %v", err) + } + for _, job := range jobs { + planner := &scheduler.Harness{ + State: &snap.StateStore, + } + // Create an eval and mark it as requiring annotations and insert that as well + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: job.Priority, + Type: job.Type, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + JobModifyIndex: job.JobModifyIndex + 1, + Status: structs.EvalStatusPending, + AnnotatePlan: true, + } + + // Create the scheduler and run it + sched, err := scheduler.NewScheduler(eval.Type, n.logger, snap, planner) + if err != nil { + return err + } + + if err := sched.Process(eval); err != nil { + return err + } + summary, err := snap.JobSummaryByID(job.ID) + if err != nil { + return err + } + for tg, queued := range planner.Evals[0].QueuedAllocations { + tgSummary, ok := summary.Summary[tg] + if !ok { + return fmt.Errorf("task group %q not found while updating queued count", tg) + } + tgSummary.Queued = queued + summary.Summary[tg] = tgSummary + } + + if err := restore.JobSummaryRestore(summary); err != nil { + return err + } + } restore.Commit() return nil } diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 6acebeb7ac3..e26f83a13a9 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -972,3 +972,29 @@ func TestFSM_SnapshotRestore_JobSummary(t *testing.T) { t.Fatalf("bad: \n%#v\n%#v", js2, out2) } } + +func TestFSM_SnapshotRestore_AddMissingSummary(t *testing.T) { + // Add some state + fsm := testFSM(t) + state := fsm.State() + + job1 := mock.Job() + state.UpsertJob(1000, job1) + state.DeleteJobSummary(1010, job1.ID) + + fsm2 := testSnapshotRestore(t, fsm) + state2 := fsm2.State() + out1, _ := state2.JobSummaryByID(job1.ID) + expected := structs.JobSummary{ + JobID: job1.ID, + Summary: map[string]structs.TaskGroupSummary{ + "web": structs.TaskGroupSummary{ + Queued: 10, + }, + }, + } + + if !reflect.DeepEqual(&expected, out1) { + t.Fatalf("expected: %#v, actual: %#v", expected, out1) + } +} diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 854f96060a8..81586675b11 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -98,6 +98,20 @@ func (s *StateStore) UpsertJobSummary(index uint64, jobSummary *structs.JobSumma return nil } +// DeleteJobSummary deletes the job summary with the given ID. This is for +// testing purposes only. +func (s *StateStore) DeleteJobSummary(index uint64, id string) error { + txn := s.db.Txn(true) + defer txn.Abort() + + // Delete the job summary + if _, err := txn.DeleteAll("job_summary", "id", id); err != nil { + return fmt.Errorf("deleting job summary failed: %v", err) + } + txn.Commit() + return nil +} + // UpsertNode is used to register a node or update a node definition // This is assumed to be triggered by the client, so we retain the value // of drain which is set by the scheduler. @@ -1501,13 +1515,13 @@ func (r *StateRestore) JobSummaryRestore(jobSummary *structs.JobSummary) error { return nil } -// CreateJobSummaries computes the job summaries for all the jobs -func (r *StateRestore) CreateJobSummaries() error { +// JobsWithoutSummary returns the list of jobs which don't have any summary +func (r *StateRestore) JobsWithoutSummary() ([]*structs.Job, error) { // Get all the jobs var jobs []*structs.Job iter, err := r.txn.Get("jobs", "id") if err != nil { - return fmt.Errorf("couldn't retrieve jobs: %v", err) + return nil, fmt.Errorf("couldn't retrieve jobs: %v", err) } for { raw := iter.Next() @@ -1517,9 +1531,9 @@ func (r *StateRestore) CreateJobSummaries() error { // Filter the jobs which have summaries job := raw.(*structs.Job) - jobSummary, err := r.txn.Get("job_summary", "id", job.ID) + jobSummary, err := r.txn.First("job_summary", "id", job.ID) if err != nil { - return fmt.Errorf("unable to get job summary: %v", err) + return nil, fmt.Errorf("unable to get job summary: %v", err) } if jobSummary != nil { continue @@ -1527,11 +1541,14 @@ func (r *StateRestore) CreateJobSummaries() error { jobs = append(jobs, job) } + return jobs, nil +} +// CreateJobSummaries computes the job summaries for all the jobs +func (r *StateRestore) CreateJobSummaries(jobs []*structs.Job) error { for _, job := range jobs { - // Get all the allocations for the job - iter, err = r.txn.Get("allocs", "job", job.ID) + iter, err := r.txn.Get("allocs", "job", job.ID) if err != nil { return fmt.Errorf("couldn't retrieve allocations for job %v: %v", job.ID, err) } @@ -1549,6 +1566,9 @@ func (r *StateRestore) CreateJobSummaries() error { JobID: job.ID, Summary: make(map[string]structs.TaskGroupSummary), } + for _, tg := range job.TaskGroups { + summary.Summary[tg.Name] = structs.TaskGroupSummary{} + } // Calculate the summary for the job for _, alloc := range allocs { if _, ok := summary.Summary[alloc.TaskGroup]; !ok { @@ -1570,6 +1590,7 @@ func (r *StateRestore) CreateJobSummaries() error { summary.Summary[alloc.TaskGroup] = tg } // Insert the job summary + if err := r.txn.Insert("job_summary", summary); err != nil { return fmt.Errorf("error inserting job summary: %v", err) } diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 03639bcfcb5..24721547602 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -1132,7 +1132,7 @@ func TestStateStore_CreateJobSummaries(t *testing.T) { } // Create the job summaries - if err := restore.CreateJobSummaries(); err != nil { + if err := restore.CreateJobSummaries([]*structs.Job{job}); err != nil { t.Fatalf("err: %v", err) } restore.Commit() @@ -1155,6 +1155,30 @@ func TestStateStore_CreateJobSummaries(t *testing.T) { } } +func TestStateRestore_JobsWithoutSummaries(t *testing.T) { + state := testStateStore(t) + restore, err := state.Restore() + if err != nil { + t.Fatalf("err: %v", err) + } + // Restore a job + job := mock.Job() + if err := restore.JobRestore(job); err != nil { + t.Fatalf("err: %v", err) + } + + jobs, err := restore.JobsWithoutSummary() + if err != nil { + t.Fatalf("err: %v", err) + } + if len(jobs) != 1 { + t.Fatalf("expected: %v, actual: %v", 1, len(jobs)) + } + if !reflect.DeepEqual(job, jobs[0]) { + t.Fatalf("Bad: %#v %#v", job, jobs[0]) + } +} + func TestStateStore_Indexes(t *testing.T) { state := testStateStore(t) node := mock.Node()