From 4b244219f9384b883fa17f6846347f13ee8abd92 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 12 Apr 2017 15:44:30 -0700 Subject: [PATCH 01/12] Job History schema --- nomad/state/schema.go | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/nomad/state/schema.go b/nomad/state/schema.go index f61b1d109e6..535024c7c46 100644 --- a/nomad/state/schema.go +++ b/nomad/state/schema.go @@ -20,6 +20,7 @@ func stateStoreSchema() *memdb.DBSchema { nodeTableSchema, jobTableSchema, jobSummarySchema, + jobHistorySchema, periodicLaunchTableSchema, evalTableSchema, allocTableSchema, @@ -141,6 +142,37 @@ func jobSummarySchema() *memdb.TableSchema { } } +// jobHistorySchema returns the memdb schema for the job history table which +// keeps a historical view of jobs. +func jobHistorySchema() *memdb.TableSchema { + return &memdb.TableSchema{ + Name: "job_histories", + Indexes: map[string]*memdb.IndexSchema{ + "id": &memdb.IndexSchema{ + Name: "id", + AllowMissing: false, + Unique: true, + + // Use a compound index so the tuple of (JobID, Version) is + // uniquely identifying + Indexer: &memdb.CompoundIndex{ + Indexes: []memdb.Indexer{ + &memdb.StringFieldIndex{ + Field: "JobID", + Lowercase: true, + }, + + // Will need to create a new indexer + &memdb.UintFieldIndex{ + Field: "Version", + }, + }, + }, + }, + }, + } +} + // jobIsGCable satisfies the ConditionalIndexFunc interface and creates an index // on whether a job is eligible for garbage collection. func jobIsGCable(obj interface{}) (bool, error) { From 0c94d4822bce70dfdb03e6870764af41717cdbac Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Thu, 13 Apr 2017 13:54:57 -0700 Subject: [PATCH 02/12] Upsert Job Histories --- api/jobs.go | 8 +++ command/agent/job_endpoint.go | 27 +++----- nomad/mock/mock.go | 1 + nomad/state/schema.go | 2 +- nomad/state/state_store.go | 100 +++++++++++++++++++++++++++ nomad/state/state_store_test.go | 119 ++++++++++++++++++++++++++++++++ nomad/structs/structs.go | 13 ++++ 7 files changed, 253 insertions(+), 17 deletions(-) diff --git a/api/jobs.go b/api/jobs.go index 90cb116142f..91829aaca48 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -297,6 +297,8 @@ type Job struct { VaultToken *string `mapstructure:"vault_token"` Status *string StatusDescription *string + Stable *bool + Version *uint64 CreateIndex *uint64 ModifyIndex *uint64 JobModifyIndex *uint64 @@ -343,6 +345,12 @@ func (j *Job) Canonicalize() { if j.StatusDescription == nil { j.StatusDescription = helper.StringToPtr("") } + if j.Stable == nil { + j.Stable = helper.BoolToPtr(false) + } + if j.Version == nil { + j.Version = helper.Uint64ToPtr(0) + } if j.CreateIndex == nil { j.CreateIndex = helper.Uint64ToPtr(0) } diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index 5ad4bde838a..9e4086acf66 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -372,22 +372,17 @@ func ApiJobToStructJob(job *api.Job) *structs.Job { job.Canonicalize() j := &structs.Job{ - Region: *job.Region, - ID: *job.ID, - ParentID: *job.ParentID, - Name: *job.Name, - Type: *job.Type, - Priority: *job.Priority, - AllAtOnce: *job.AllAtOnce, - Datacenters: job.Datacenters, - Payload: job.Payload, - Meta: job.Meta, - VaultToken: *job.VaultToken, - Status: *job.Status, - StatusDescription: *job.StatusDescription, - CreateIndex: *job.CreateIndex, - ModifyIndex: *job.ModifyIndex, - JobModifyIndex: *job.JobModifyIndex, + Region: *job.Region, + ID: *job.ID, + ParentID: *job.ParentID, + Name: *job.Name, + Type: *job.Type, + Priority: *job.Priority, + AllAtOnce: *job.AllAtOnce, + Datacenters: job.Datacenters, + Payload: job.Payload, + Meta: job.Meta, + VaultToken: *job.VaultToken, } j.Constraints = make([]*structs.Constraint, len(job.Constraints)) diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index 8214c8ca9bc..8191b00f6e4 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -146,6 +146,7 @@ func Job() *structs.Job { "owner": "armon", }, Status: structs.JobStatusPending, + Version: 0, CreateIndex: 42, ModifyIndex: 99, JobModifyIndex: 99, diff --git a/nomad/state/schema.go b/nomad/state/schema.go index 535024c7c46..b0ef44b27e6 100644 --- a/nomad/state/schema.go +++ b/nomad/state/schema.go @@ -158,7 +158,7 @@ func jobHistorySchema() *memdb.TableSchema { Indexer: &memdb.CompoundIndex{ Indexes: []memdb.Indexer{ &memdb.StringFieldIndex{ - Field: "JobID", + Field: "ID", Lowercase: true, }, diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index a5235d3456e..a0dc8c38dda 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -4,6 +4,7 @@ import ( "fmt" "io" "log" + "sort" "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/nomad/structs" @@ -311,6 +312,7 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error { job.CreateIndex = existing.(*structs.Job).CreateIndex job.ModifyIndex = index job.JobModifyIndex = index + job.Version = existing.(*structs.Job).Version + 1 // Compute the job status var err error @@ -322,6 +324,7 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error { job.CreateIndex = index job.ModifyIndex = index job.JobModifyIndex = index + job.Version = 0 if err := s.setJobStatus(index, txn, job, false, ""); err != nil { return fmt.Errorf("setting job status for %q failed: %v", job.ID, err) @@ -341,6 +344,10 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error { return fmt.Errorf("unable to create job summary: %v", err) } + if err := s.upsertJobHistory(index, job, txn); err != nil { + return fmt.Errorf("unable to upsert job into job_histories table: %v", err) + } + // Create the EphemeralDisk if it's nil by adding up DiskMB from task resources. // COMPAT 0.4.1 -> 0.5 s.addEphemeralDiskToTaskGroups(job) @@ -437,6 +444,55 @@ func (s *StateStore) DeleteJob(index uint64, jobID string) error { return nil } +// upsertJobHistory inserts a job into its historic table and limits the number +// of historic jobs that are tracked. +func (s *StateStore) upsertJobHistory(index uint64, job *structs.Job, txn *memdb.Txn) error { + // Insert the job + if err := txn.Insert("job_histories", job); err != nil { + return fmt.Errorf("failed to insert job into job_histories table: %v", err) + } + + if err := txn.Insert("index", &IndexEntry{"job_histories", index}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } + + // Get all the historic jobs for this ID + all, err := s.jobHistoryByID(txn, nil, job.ID) + if err != nil { + return fmt.Errorf("failed to look up job history for %q: %v", job.ID, err) + } + + // If we are below the limit there is no GCing to be done + if len(all) <= structs.JobDefaultHistoricCount { + return nil + } + + // We have to delete a historic job to make room. + // Find index of the highest versioned stable job + stableIdx := -1 + for i, j := range all { + if j.Stable { + stableIdx = i + break + } + } + + // If the stable job is the oldest version, do a swap to bring it into the + // keep set. + max := structs.JobDefaultHistoricCount + if stableIdx == max { + all[max-1], all[max] = all[max], all[max-1] + } + + // Delete the job outside of the set that are being kept. + d := all[max] + if err := txn.Delete("job_histories", d); err != nil { + return fmt.Errorf("failed to delete job %v (%d) from job_histories", d.ID, d.Version) + } + + return nil +} + // JobByID is used to lookup a job by its ID func (s *StateStore) JobByID(ws memdb.WatchSet, id string) (*structs.Job, error) { txn := s.db.Txn(false) @@ -467,6 +523,50 @@ func (s *StateStore) JobsByIDPrefix(ws memdb.WatchSet, id string) (memdb.ResultI return iter, nil } +// JobHistoryByID returns all the tracked versions of a job. +func (s *StateStore) JobHistoryByID(ws memdb.WatchSet, id string) ([]*structs.Job, error) { + txn := s.db.Txn(false) + return s.jobHistoryByID(txn, &ws, id) +} + +// jobHistoryByID is the underlying implementation for retrieving all tracked +// versions of a job and is called under an existing transaction. A watch set +// can optionally be passed in to add the job histories to the watch set. +func (s *StateStore) jobHistoryByID(txn *memdb.Txn, ws *memdb.WatchSet, id string) ([]*structs.Job, error) { + // Get all the historic jobs for this ID + iter, err := txn.Get("job_histories", "id_prefix", id) + if err != nil { + return nil, err + } + + if ws != nil { + ws.Add(iter.WatchCh()) + } + + var all []*structs.Job + for { + raw := iter.Next() + if raw == nil { + break + } + + // Ensure the ID is an exact match + j := raw.(*structs.Job) + if j.ID != id { + continue + } + + all = append(all, j) + } + + // Sort with highest versions first + sort.Slice(all, func(i, j int) bool { + return all[i].Version >= all[j].Version + }) + + return all, nil +} + // Jobs returns an iterator over all the jobs func (s *StateStore) Jobs(ws memdb.WatchSet) (memdb.ResultIterator, error) { txn := s.db.Txn(false) diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index f3d96c18b2e..687263346c6 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -420,6 +420,19 @@ func TestStateStore_UpsertJob_Job(t *testing.T) { if watchFired(ws) { t.Fatalf("bad") } + + // Check the job history + allVersions, err := state.JobHistoryByID(ws, job.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if len(allVersions) != 1 { + t.Fatalf("got %d; want 1", len(allVersions)) + } + + if a := allVersions[0]; a.ID != job.ID || a.Version != 0 { + t.Fatalf("bad: %v", a) + } } func TestStateStore_UpdateUpsertJob_Job(t *testing.T) { @@ -439,6 +452,7 @@ func TestStateStore_UpdateUpsertJob_Job(t *testing.T) { job2 := mock.Job() job2.ID = job.ID + job2.AllAtOnce = true err = state.UpsertJob(1001, job2) if err != nil { t.Fatalf("err: %v", err) @@ -490,6 +504,22 @@ func TestStateStore_UpdateUpsertJob_Job(t *testing.T) { t.Fatalf("nil summary for task group") } + // Check the job history + allVersions, err := state.JobHistoryByID(ws, job.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if len(allVersions) != 2 { + t.Fatalf("got %d; want 1", len(allVersions)) + } + + if a := allVersions[0]; a.ID != job.ID || a.Version != 1 || !a.AllAtOnce { + t.Fatalf("bad: %+v", a) + } + if a := allVersions[1]; a.ID != job.ID || a.Version != 0 || a.AllAtOnce { + t.Fatalf("bad: %+v", a) + } + if watchFired(ws) { t.Fatalf("bad") } @@ -628,6 +658,95 @@ func TestStateStore_UpsertJob_ChildJob(t *testing.T) { } } +func TestStateStore_UpdateUpsertJob_JobHistory(t *testing.T) { + state := testStateStore(t) + + // Create a job and mark it as stable + job := mock.Job() + job.Stable = true + job.Priority = 0 + + // Create a watchset so we can test that upsert fires the watch + ws := memdb.NewWatchSet() + _, err := state.JobHistoryByID(ws, job.ID) + if err != nil { + t.Fatalf("bad: %v", err) + } + + if err := state.UpsertJob(1000, job); err != nil { + t.Fatalf("err: %v", err) + } + + if !watchFired(ws) { + t.Fatalf("bad") + } + + var finalJob *structs.Job + for i := 1; i < 20; i++ { + finalJob = mock.Job() + finalJob.ID = job.ID + finalJob.Priority = i + err = state.UpsertJob(uint64(1000+i), finalJob) + if err != nil { + t.Fatalf("err: %v", err) + } + } + + ws = memdb.NewWatchSet() + out, err := state.JobByID(ws, job.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + + if !reflect.DeepEqual(finalJob, out) { + t.Fatalf("bad: %#v %#v", finalJob, out) + } + + if out.CreateIndex != 1000 { + t.Fatalf("bad: %#v", out) + } + if out.ModifyIndex != 1019 { + t.Fatalf("bad: %#v", out) + } + if out.Version != 19 { + t.Fatalf("bad: %#v", out) + } + + index, err := state.Index("job_histories") + if err != nil { + t.Fatalf("err: %v", err) + } + if index != 1019 { + t.Fatalf("bad: %d", index) + } + + // Check the job history + allVersions, err := state.JobHistoryByID(ws, job.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if len(allVersions) != structs.JobDefaultHistoricCount { + t.Fatalf("got %d; want 1", len(allVersions)) + } + + if a := allVersions[0]; a.ID != job.ID || a.Version != 19 || a.Priority != 19 { + t.Fatalf("bad: %+v", a) + } + if a := allVersions[1]; a.ID != job.ID || a.Version != 18 || a.Priority != 18 { + t.Fatalf("bad: %+v", a) + } + + // Ensure we didn't delete the stable job + if a := allVersions[structs.JobDefaultHistoricCount-1]; a.ID != job.ID || + a.Version != 0 || a.Priority != 0 || !a.Stable { + t.Fatalf("bad: %+v", a) + } + + if watchFired(ws) { + t.Fatalf("bad") + } +} + func TestStateStore_DeleteJob_Job(t *testing.T) { state := testStateStore(t) job := mock.Job() diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 57376279ca1..43387329acb 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1097,6 +1097,10 @@ const ( // specified job so that it gets priority. This is important // for the system to remain healthy. CoreJobPriority = JobMaxPriority * 2 + + // JobDefaultHistoricCount is the number of historic job versions that are + // kept. + JobDefaultHistoricCount = 6 ) // Job is the scope of a scheduling request to Nomad. It is the largest @@ -1172,6 +1176,15 @@ type Job struct { // StatusDescription is meant to provide more human useful information StatusDescription string + // Stable marks a job as stable. Stability is only defined on "service" and + // "system" jobs. The stability of a job will be set automatically as part + // of a deployment and can be manually set via APIs. + Stable bool + + // Version is a monitonically increasing version number that is incremened + // on each job register. + Version uint64 + // Raft Indexes CreateIndex uint64 ModifyIndex uint64 From 5c618ab56ebda28276bc7d53c0f31d3e56bf0df9 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Thu, 13 Apr 2017 14:54:22 -0700 Subject: [PATCH 03/12] Histories -> Versions --- nomad/state/schema.go | 10 ++++----- nomad/state/state_store.go | 40 ++++++++++++++++----------------- nomad/state/state_store_test.go | 22 +++++++++--------- nomad/structs/structs.go | 4 ++-- 4 files changed, 38 insertions(+), 38 deletions(-) diff --git a/nomad/state/schema.go b/nomad/state/schema.go index b0ef44b27e6..d31af34b24e 100644 --- a/nomad/state/schema.go +++ b/nomad/state/schema.go @@ -20,7 +20,7 @@ func stateStoreSchema() *memdb.DBSchema { nodeTableSchema, jobTableSchema, jobSummarySchema, - jobHistorySchema, + jobVersionsSchema, periodicLaunchTableSchema, evalTableSchema, allocTableSchema, @@ -142,11 +142,11 @@ func jobSummarySchema() *memdb.TableSchema { } } -// jobHistorySchema returns the memdb schema for the job history table which -// keeps a historical view of jobs. -func jobHistorySchema() *memdb.TableSchema { +// jobVersionsSchema returns the memdb schema for the job version table which +// keeps a historical view of job versions. +func jobVersionsSchema() *memdb.TableSchema { return &memdb.TableSchema{ - Name: "job_histories", + Name: "job_versions", Indexes: map[string]*memdb.IndexSchema{ "id": &memdb.IndexSchema{ Name: "id", diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index a0dc8c38dda..ac0e1f426cc 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -344,8 +344,8 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error { return fmt.Errorf("unable to create job summary: %v", err) } - if err := s.upsertJobHistory(index, job, txn); err != nil { - return fmt.Errorf("unable to upsert job into job_histories table: %v", err) + if err := s.upsertJobVersion(index, job, txn); err != nil { + return fmt.Errorf("unable to upsert job into job_versions table: %v", err) } // Create the EphemeralDisk if it's nil by adding up DiskMB from task resources. @@ -444,26 +444,26 @@ func (s *StateStore) DeleteJob(index uint64, jobID string) error { return nil } -// upsertJobHistory inserts a job into its historic table and limits the number -// of historic jobs that are tracked. -func (s *StateStore) upsertJobHistory(index uint64, job *structs.Job, txn *memdb.Txn) error { +// upsertJobVersion inserts a job into its historic version table and limits the +// number of job versions that are tracked. +func (s *StateStore) upsertJobVersion(index uint64, job *structs.Job, txn *memdb.Txn) error { // Insert the job - if err := txn.Insert("job_histories", job); err != nil { - return fmt.Errorf("failed to insert job into job_histories table: %v", err) + if err := txn.Insert("job_versions", job); err != nil { + return fmt.Errorf("failed to insert job into job_versions table: %v", err) } - if err := txn.Insert("index", &IndexEntry{"job_histories", index}); err != nil { + if err := txn.Insert("index", &IndexEntry{"job_versions", index}); err != nil { return fmt.Errorf("index update failed: %v", err) } // Get all the historic jobs for this ID - all, err := s.jobHistoryByID(txn, nil, job.ID) + all, err := s.jobVersionByID(txn, nil, job.ID) if err != nil { - return fmt.Errorf("failed to look up job history for %q: %v", job.ID, err) + return fmt.Errorf("failed to look up job versions for %q: %v", job.ID, err) } // If we are below the limit there is no GCing to be done - if len(all) <= structs.JobDefaultHistoricCount { + if len(all) <= structs.JobTrackedVersions { return nil } @@ -479,15 +479,15 @@ func (s *StateStore) upsertJobHistory(index uint64, job *structs.Job, txn *memdb // If the stable job is the oldest version, do a swap to bring it into the // keep set. - max := structs.JobDefaultHistoricCount + max := structs.JobTrackedVersions if stableIdx == max { all[max-1], all[max] = all[max], all[max-1] } // Delete the job outside of the set that are being kept. d := all[max] - if err := txn.Delete("job_histories", d); err != nil { - return fmt.Errorf("failed to delete job %v (%d) from job_histories", d.ID, d.Version) + if err := txn.Delete("job_versions", d); err != nil { + return fmt.Errorf("failed to delete job %v (%d) from job_versions", d.ID, d.Version) } return nil @@ -523,18 +523,18 @@ func (s *StateStore) JobsByIDPrefix(ws memdb.WatchSet, id string) (memdb.ResultI return iter, nil } -// JobHistoryByID returns all the tracked versions of a job. -func (s *StateStore) JobHistoryByID(ws memdb.WatchSet, id string) ([]*structs.Job, error) { +// JobVersionsByID returns all the tracked versions of a job. +func (s *StateStore) JobVersionsByID(ws memdb.WatchSet, id string) ([]*structs.Job, error) { txn := s.db.Txn(false) - return s.jobHistoryByID(txn, &ws, id) + return s.jobVersionByID(txn, &ws, id) } -// jobHistoryByID is the underlying implementation for retrieving all tracked +// jobVersionByID is the underlying implementation for retrieving all tracked // versions of a job and is called under an existing transaction. A watch set // can optionally be passed in to add the job histories to the watch set. -func (s *StateStore) jobHistoryByID(txn *memdb.Txn, ws *memdb.WatchSet, id string) ([]*structs.Job, error) { +func (s *StateStore) jobVersionByID(txn *memdb.Txn, ws *memdb.WatchSet, id string) ([]*structs.Job, error) { // Get all the historic jobs for this ID - iter, err := txn.Get("job_histories", "id_prefix", id) + iter, err := txn.Get("job_versions", "id_prefix", id) if err != nil { return nil, err } diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 687263346c6..385957ed16a 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -421,8 +421,8 @@ func TestStateStore_UpsertJob_Job(t *testing.T) { t.Fatalf("bad") } - // Check the job history - allVersions, err := state.JobHistoryByID(ws, job.ID) + // Check the job versions + allVersions, err := state.JobVersionsByID(ws, job.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -504,8 +504,8 @@ func TestStateStore_UpdateUpsertJob_Job(t *testing.T) { t.Fatalf("nil summary for task group") } - // Check the job history - allVersions, err := state.JobHistoryByID(ws, job.ID) + // Check the job versions + allVersions, err := state.JobVersionsByID(ws, job.ID) if err != nil { t.Fatalf("err: %v", err) } @@ -658,7 +658,7 @@ func TestStateStore_UpsertJob_ChildJob(t *testing.T) { } } -func TestStateStore_UpdateUpsertJob_JobHistory(t *testing.T) { +func TestStateStore_UpdateUpsertJob_JobVersion(t *testing.T) { state := testStateStore(t) // Create a job and mark it as stable @@ -668,7 +668,7 @@ func TestStateStore_UpdateUpsertJob_JobHistory(t *testing.T) { // Create a watchset so we can test that upsert fires the watch ws := memdb.NewWatchSet() - _, err := state.JobHistoryByID(ws, job.ID) + _, err := state.JobVersionsByID(ws, job.ID) if err != nil { t.Fatalf("bad: %v", err) } @@ -712,7 +712,7 @@ func TestStateStore_UpdateUpsertJob_JobHistory(t *testing.T) { t.Fatalf("bad: %#v", out) } - index, err := state.Index("job_histories") + index, err := state.Index("job_versions") if err != nil { t.Fatalf("err: %v", err) } @@ -720,12 +720,12 @@ func TestStateStore_UpdateUpsertJob_JobHistory(t *testing.T) { t.Fatalf("bad: %d", index) } - // Check the job history - allVersions, err := state.JobHistoryByID(ws, job.ID) + // Check the job versions + allVersions, err := state.JobVersionsByID(ws, job.ID) if err != nil { t.Fatalf("err: %v", err) } - if len(allVersions) != structs.JobDefaultHistoricCount { + if len(allVersions) != structs.JobTrackedVersions { t.Fatalf("got %d; want 1", len(allVersions)) } @@ -737,7 +737,7 @@ func TestStateStore_UpdateUpsertJob_JobHistory(t *testing.T) { } // Ensure we didn't delete the stable job - if a := allVersions[structs.JobDefaultHistoricCount-1]; a.ID != job.ID || + if a := allVersions[structs.JobTrackedVersions-1]; a.ID != job.ID || a.Version != 0 || a.Priority != 0 || !a.Stable { t.Fatalf("bad: %+v", a) } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 43387329acb..578c336c54d 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1098,9 +1098,9 @@ const ( // for the system to remain healthy. CoreJobPriority = JobMaxPriority * 2 - // JobDefaultHistoricCount is the number of historic job versions that are + // JobTrackedVersions is the number of historic job versions that are // kept. - JobDefaultHistoricCount = 6 + JobTrackedVersions = 6 ) // Job is the scope of a scheduling request to Nomad. It is the largest From b9a813748509dd654e28c6231799f8f4b5435580 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Thu, 13 Apr 2017 15:47:59 -0700 Subject: [PATCH 04/12] GetJobVersions server endpoint --- nomad/job_endpoint.go | 39 +++++ nomad/job_endpoint_test.go | 295 +++++++++++++++++++++++++++---------- nomad/structs/structs.go | 8 +- 3 files changed, 267 insertions(+), 75 deletions(-) diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 866c7fd0071..1d51cb5d810 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -475,6 +475,45 @@ func (j *Job) GetJob(args *structs.JobSpecificRequest, return j.srv.blockingRPC(&opts) } +// GetJobVersions is used to retrieve all tracked versions of a job. +func (j *Job) GetJobVersions(args *structs.JobSpecificRequest, + reply *structs.JobVersionsResponse) error { + if done, err := j.srv.forward("Job.GetJobVersions", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "job", "get_job_versions"}, time.Now()) + + // Setup the blocking query + opts := blockingOptions{ + queryOpts: &args.QueryOptions, + queryMeta: &reply.QueryMeta, + run: func(ws memdb.WatchSet, state *state.StateStore) error { + // Look for the job + out, err := state.JobVersionsByID(ws, args.JobID) + if err != nil { + return err + } + + // Setup the output + reply.Versions = out + if len(out) != 0 { + reply.Index = out[0].ModifyIndex + } else { + // Use the last index that affected the nodes table + index, err := state.Index("job_versions") + if err != nil { + return err + } + reply.Index = index + } + + // Set the query response + j.srv.setQueryMeta(&reply.QueryMeta) + return nil + }} + return j.srv.blockingRPC(&opts) +} + // List is used to list the jobs registered in the system func (j *Job) List(args *structs.JobListRequest, reply *structs.JobListResponse) error { diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index fb92b0efa0f..f7795bdd613 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -1134,6 +1134,227 @@ func TestJobEndpoint_GetJob(t *testing.T) { } } +func TestJobEndpoint_GetJob_Blocking(t *testing.T) { + s1 := testServer(t, nil) + defer s1.Shutdown() + state := s1.fsm.State() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the jobs + job1 := mock.Job() + job2 := mock.Job() + + // Upsert a job we are not interested in first. + time.AfterFunc(100*time.Millisecond, func() { + if err := state.UpsertJob(100, job1); err != nil { + t.Fatalf("err: %v", err) + } + }) + + // Upsert another job later which should trigger the watch. + time.AfterFunc(200*time.Millisecond, func() { + if err := state.UpsertJob(200, job2); err != nil { + t.Fatalf("err: %v", err) + } + }) + + req := &structs.JobSpecificRequest{ + JobID: job2.ID, + QueryOptions: structs.QueryOptions{ + Region: "global", + MinQueryIndex: 150, + }, + } + start := time.Now() + var resp structs.SingleJobResponse + if err := msgpackrpc.CallWithCodec(codec, "Job.GetJob", req, &resp); err != nil { + t.Fatalf("err: %v", err) + } + + if elapsed := time.Since(start); elapsed < 200*time.Millisecond { + t.Fatalf("should block (returned in %s) %#v", elapsed, resp) + } + if resp.Index != 200 { + t.Fatalf("Bad index: %d %d", resp.Index, 200) + } + if resp.Job == nil || resp.Job.ID != job2.ID { + t.Fatalf("bad: %#v", resp.Job) + } + + // Job delete fires watches + time.AfterFunc(100*time.Millisecond, func() { + if err := state.DeleteJob(300, job2.ID); err != nil { + t.Fatalf("err: %v", err) + } + }) + + req.QueryOptions.MinQueryIndex = 250 + start = time.Now() + + var resp2 structs.SingleJobResponse + if err := msgpackrpc.CallWithCodec(codec, "Job.GetJob", req, &resp2); err != nil { + t.Fatalf("err: %v", err) + } + + if elapsed := time.Since(start); elapsed < 100*time.Millisecond { + t.Fatalf("should block (returned in %s) %#v", elapsed, resp2) + } + if resp2.Index != 300 { + t.Fatalf("Bad index: %d %d", resp2.Index, 300) + } + if resp2.Job != nil { + t.Fatalf("bad: %#v", resp2.Job) + } +} + +func TestJobEndpoint_GetJobVersions(t *testing.T) { + s1 := testServer(t, nil) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + job := mock.Job() + job.Priority = 88 + reg := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Fetch the response + var resp structs.JobRegisterResponse + if err := msgpackrpc.CallWithCodec(codec, "Job.Register", reg, &resp); err != nil { + t.Fatalf("err: %v", err) + } + + // Register the job again to create another version + job.Priority = 100 + if err := msgpackrpc.CallWithCodec(codec, "Job.Register", reg, &resp); err != nil { + t.Fatalf("err: %v", err) + } + + // Lookup the job + get := &structs.JobSpecificRequest{ + JobID: job.ID, + QueryOptions: structs.QueryOptions{Region: "global"}, + } + var versionsResp structs.JobVersionsResponse + if err := msgpackrpc.CallWithCodec(codec, "Job.GetJobVersions", get, &versionsResp); err != nil { + t.Fatalf("err: %v", err) + } + if versionsResp.Index != resp.JobModifyIndex { + t.Fatalf("Bad index: %d %d", versionsResp.Index, resp.Index) + } + + // Make sure there are two job versions + versions := versionsResp.Versions + if l := len(versions); l != 2 { + t.Fatalf("Got %d versions; want 2", l) + } + + if v := versions[0]; v.Priority != 100 || v.ID != job.ID || v.Version != 1 { + t.Fatalf("bad: %+v", v) + } + if v := versions[1]; v.Priority != 88 || v.ID != job.ID || v.Version != 0 { + t.Fatalf("bad: %+v", v) + } + + // Lookup non-existing job + get.JobID = "foobarbaz" + if err := msgpackrpc.CallWithCodec(codec, "Job.GetJobVersions", get, &versionsResp); err != nil { + t.Fatalf("err: %v", err) + } + if versionsResp.Index != resp.JobModifyIndex { + t.Fatalf("Bad index: %d %d", versionsResp.Index, resp.Index) + } + if l := len(versionsResp.Versions); l != 0 { + t.Fatalf("unexpected versions: %d", l) + } +} + +func TestJobEndpoint_GetJobVersions_Blocking(t *testing.T) { + s1 := testServer(t, nil) + defer s1.Shutdown() + state := s1.fsm.State() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the jobs + job1 := mock.Job() + job2 := mock.Job() + job3 := mock.Job() + job3.ID = job2.ID + job3.Priority = 1 + + // Upsert a job we are not interested in first. + time.AfterFunc(100*time.Millisecond, func() { + if err := state.UpsertJob(100, job1); err != nil { + t.Fatalf("err: %v", err) + } + }) + + // Upsert another job later which should trigger the watch. + time.AfterFunc(200*time.Millisecond, func() { + if err := state.UpsertJob(200, job2); err != nil { + t.Fatalf("err: %v", err) + } + }) + + req := &structs.JobSpecificRequest{ + JobID: job2.ID, + QueryOptions: structs.QueryOptions{ + Region: "global", + MinQueryIndex: 150, + }, + } + start := time.Now() + var resp structs.JobVersionsResponse + if err := msgpackrpc.CallWithCodec(codec, "Job.GetJobVersions", req, &resp); err != nil { + t.Fatalf("err: %v", err) + } + + if elapsed := time.Since(start); elapsed < 200*time.Millisecond { + t.Fatalf("should block (returned in %s) %#v", elapsed, resp) + } + if resp.Index != 200 { + t.Fatalf("Bad index: %d %d", resp.Index, 200) + } + if len(resp.Versions) != 1 || resp.Versions[0].ID != job2.ID { + t.Fatalf("bad: %#v", resp.Versions) + } + + // Upsert the job again which should trigger the watch. + time.AfterFunc(100*time.Millisecond, func() { + if err := state.UpsertJob(300, job3); err != nil { + t.Fatalf("err: %v", err) + } + }) + + req2 := &structs.JobSpecificRequest{ + JobID: job3.ID, + QueryOptions: structs.QueryOptions{ + Region: "global", + MinQueryIndex: 250, + }, + } + var resp2 structs.JobVersionsResponse + start = time.Now() + if err := msgpackrpc.CallWithCodec(codec, "Job.GetJobVersions", req2, &resp2); err != nil { + t.Fatalf("err: %v", err) + } + + if elapsed := time.Since(start); elapsed < 100*time.Millisecond { + t.Fatalf("should block (returned in %s) %#v", elapsed, resp) + } + if resp2.Index != 300 { + t.Fatalf("Bad index: %d %d", resp.Index, 300) + } + if len(resp2.Versions) != 2 { + t.Fatalf("bad: %#v", resp2.Versions) + } +} + func TestJobEndpoint_GetJobSummary(t *testing.T) { s1 := testServer(t, func(c *Config) { c.NumSchedulers = 0 // Prevent automatic dequeue @@ -1278,80 +1499,6 @@ func TestJobEndpoint_GetJobSummary_Blocking(t *testing.T) { } } -func TestJobEndpoint_GetJob_Blocking(t *testing.T) { - s1 := testServer(t, nil) - defer s1.Shutdown() - state := s1.fsm.State() - codec := rpcClient(t, s1) - testutil.WaitForLeader(t, s1.RPC) - - // Create the jobs - job1 := mock.Job() - job2 := mock.Job() - - // Upsert a job we are not interested in first. - time.AfterFunc(100*time.Millisecond, func() { - if err := state.UpsertJob(100, job1); err != nil { - t.Fatalf("err: %v", err) - } - }) - - // Upsert another job later which should trigger the watch. - time.AfterFunc(200*time.Millisecond, func() { - if err := state.UpsertJob(200, job2); err != nil { - t.Fatalf("err: %v", err) - } - }) - - req := &structs.JobSpecificRequest{ - JobID: job2.ID, - QueryOptions: structs.QueryOptions{ - Region: "global", - MinQueryIndex: 150, - }, - } - start := time.Now() - var resp structs.SingleJobResponse - if err := msgpackrpc.CallWithCodec(codec, "Job.GetJob", req, &resp); err != nil { - t.Fatalf("err: %v", err) - } - - if elapsed := time.Since(start); elapsed < 200*time.Millisecond { - t.Fatalf("should block (returned in %s) %#v", elapsed, resp) - } - if resp.Index != 200 { - t.Fatalf("Bad index: %d %d", resp.Index, 200) - } - if resp.Job == nil || resp.Job.ID != job2.ID { - t.Fatalf("bad: %#v", resp.Job) - } - - // Job delete fires watches - time.AfterFunc(100*time.Millisecond, func() { - if err := state.DeleteJob(300, job2.ID); err != nil { - t.Fatalf("err: %v", err) - } - }) - - req.QueryOptions.MinQueryIndex = 250 - start = time.Now() - - var resp2 structs.SingleJobResponse - if err := msgpackrpc.CallWithCodec(codec, "Job.GetJob", req, &resp2); err != nil { - t.Fatalf("err: %v", err) - } - - if elapsed := time.Since(start); elapsed < 100*time.Millisecond { - t.Fatalf("should block (returned in %s) %#v", elapsed, resp2) - } - if resp2.Index != 300 { - t.Fatalf("Bad index: %d %d", resp2.Index, 300) - } - if resp2.Job != nil { - t.Fatalf("bad: %#v", resp2.Job) - } -} - func TestJobEndpoint_ListJobs(t *testing.T) { s1 := testServer(t, nil) defer s1.Shutdown() diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 578c336c54d..5ac6d76326f 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -536,7 +536,7 @@ type SingleNodeResponse struct { QueryMeta } -// JobListResponse is used for a list request +// NodeListResponse is used for a list request type NodeListResponse struct { Nodes []*NodeListStub QueryMeta @@ -568,6 +568,12 @@ type JobListResponse struct { QueryMeta } +// JobVersionsResponse is used for a job get versions request +type JobVersionsResponse struct { + Versions []*Job + QueryMeta +} + // JobPlanResponse is used to respond to a job plan request type JobPlanResponse struct { // Annotations stores annotations explaining decisions the scheduler made. From 330800f68382c92720e6d909ee1f28a2d1feda32 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Thu, 13 Apr 2017 16:55:21 -0700 Subject: [PATCH 05/12] Agent API + api package --- api/jobs.go | 11 +++++ api/jobs_test.go | 32 +++++++++++++++ command/agent/job_endpoint.go | 25 +++++++++++ command/agent/job_endpoint_test.go | 66 ++++++++++++++++++++++++++++++ 4 files changed, 134 insertions(+) diff --git a/api/jobs.go b/api/jobs.go index 91829aaca48..a30fdd3f809 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -106,6 +106,17 @@ func (j *Jobs) Info(jobID string, q *QueryOptions) (*Job, *QueryMeta, error) { return &resp, qm, nil } +// Versions is used to retrieve all versions of a particular +// job given its unique ID. +func (j *Jobs) Versions(jobID string, q *QueryOptions) ([]*Job, *QueryMeta, error) { + var resp []*Job + qm, err := j.client.query("/v1/job/"+jobID+"/versions", &resp, q) + if err != nil { + return nil, nil, err + } + return resp, qm, nil +} + // Allocations is used to return the allocs for a given job ID. func (j *Jobs) Allocations(jobID string, allAllocs bool, q *QueryOptions) ([]*AllocationListStub, *QueryMeta, error) { var resp []*AllocationListStub diff --git a/api/jobs_test.go b/api/jobs_test.go index e1658d01250..5d81de261eb 100644 --- a/api/jobs_test.go +++ b/api/jobs_test.go @@ -506,6 +506,38 @@ func TestJobs_Info(t *testing.T) { } } +func TestJobs_Versions(t *testing.T) { + c, s := makeClient(t, nil, nil) + defer s.Stop() + jobs := c.Jobs() + + // Trying to retrieve a job by ID before it exists returns an error + _, _, err := jobs.Versions("job1", nil) + if err == nil || !strings.Contains(err.Error(), "not found") { + t.Fatalf("expected not found error, got: %#v", err) + } + + // Register the job + job := testJob() + _, wm, err := jobs.Register(job, nil) + if err != nil { + t.Fatalf("err: %s", err) + } + assertWriteMeta(t, wm) + + // Query the job again and ensure it exists + result, qm, err := jobs.Versions("job1", nil) + if err != nil { + t.Fatalf("err: %s", err) + } + assertQueryMeta(t, qm) + + // Check that the result is what we expect + if len(result) == 0 || *result[0].ID != *job.ID { + t.Fatalf("expect: %#v, got: %#v", job, result) + } +} + func TestJobs_PrefixList(t *testing.T) { c, s := makeClient(t, nil, nil) defer s.Stop() diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index 9e4086acf66..ece4f25596b 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -63,6 +63,9 @@ func (s *HTTPServer) JobSpecificRequest(resp http.ResponseWriter, req *http.Requ case strings.HasSuffix(path, "/dispatch"): jobName := strings.TrimSuffix(path, "/dispatch") return s.jobDispatchRequest(resp, req, jobName) + case strings.HasSuffix(path, "/versions"): + jobName := strings.TrimSuffix(path, "/versions") + return s.jobVersions(resp, req, jobName) default: return s.jobCRUD(resp, req, path) } @@ -322,6 +325,28 @@ func (s *HTTPServer) jobDelete(resp http.ResponseWriter, req *http.Request, return out, nil } +func (s *HTTPServer) jobVersions(resp http.ResponseWriter, req *http.Request, + jobName string) (interface{}, error) { + args := structs.JobSpecificRequest{ + JobID: jobName, + } + if s.parse(resp, req, &args.Region, &args.QueryOptions) { + return nil, nil + } + + var out structs.JobVersionsResponse + if err := s.agent.RPC("Job.GetJobVersions", &args, &out); err != nil { + return nil, err + } + + setMeta(resp, &out.QueryMeta) + if len(out.Versions) == 0 { + return nil, CodedError(404, "job versions not found") + } + + return out.Versions, nil +} + func (s *HTTPServer) jobSummaryRequest(resp http.ResponseWriter, req *http.Request, name string) (interface{}, error) { args := structs.JobSummaryRequest{ JobID: name, diff --git a/command/agent/job_endpoint_test.go b/command/agent/job_endpoint_test.go index b2bcf225412..11715a39848 100644 --- a/command/agent/job_endpoint_test.go +++ b/command/agent/job_endpoint_test.go @@ -560,6 +560,72 @@ func TestHTTP_JobAllocations(t *testing.T) { }) } +func TestHTTP_JobVersions(t *testing.T) { + httpTest(t, nil, func(s *TestServer) { + // Create the job + job := mock.Job() + args := structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp structs.JobRegisterResponse + if err := s.Agent.RPC("Job.Register", &args, &resp); err != nil { + t.Fatalf("err: %v", err) + } + + job2 := mock.Job() + job2.ID = job.ID + job2.Priority = 100 + + args2 := structs.JobRegisterRequest{ + Job: job2, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp2 structs.JobRegisterResponse + if err := s.Agent.RPC("Job.Register", &args2, &resp2); err != nil { + t.Fatalf("err: %v", err) + } + + // Make the HTTP request + req, err := http.NewRequest("GET", "/v1/job/"+job.ID+"/versions", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + respW := httptest.NewRecorder() + + // Make the request + obj, err := s.Server.JobSpecificRequest(respW, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Check the response + versions := obj.([]*structs.Job) + if len(versions) != 2 { + t.Fatalf("got %d versions; want 2", len(versions)) + } + + if v := versions[0]; v.Version != 1 || v.Priority != 100 { + t.Fatalf("bad %v", v) + } + + if v := versions[1]; v.Version != 0 { + t.Fatalf("bad %v", v) + } + + // Check for the index + if respW.HeaderMap.Get("X-Nomad-Index") == "" { + t.Fatalf("missing index") + } + if respW.HeaderMap.Get("X-Nomad-KnownLeader") != "true" { + t.Fatalf("missing known leader") + } + if respW.HeaderMap.Get("X-Nomad-LastContact") == "" { + t.Fatalf("missing last contact") + } + }) +} + func TestHTTP_PeriodicForce(t *testing.T) { httpTest(t, nil, func(s *TestServer) { // Create and register a periodic job. From 950171f094d7bb45c8313f45c50099b91f36d36f Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Fri, 14 Apr 2017 20:54:30 -0700 Subject: [PATCH 06/12] non-purge deregisters --- api/jobs.go | 13 +++- api/jobs_test.go | 27 ++++++- command/agent/job_endpoint.go | 14 ++++ command/agent/job_endpoint_test.go | 60 ++++++++++++--- command/status.go | 11 ++- command/stop.go | 9 ++- nomad/fsm.go | 41 ++++++++--- nomad/fsm_test.go | 62 +++++++++++++++- nomad/state/state_store.go | 36 +++++++++ nomad/state/state_store_test.go | 24 ++++++ nomad/structs/structs.go | 14 ++++ scheduler/generic_sched.go | 16 ++-- scheduler/generic_sched_test.go | 68 ++++++++++++++++- scheduler/system_sched.go | 16 ++-- scheduler/system_sched_test.go | 73 ++++++++++++++++++- scheduler/util.go | 2 +- website/source/docs/commands/stop.html.md.erb | 5 ++ 17 files changed, 446 insertions(+), 45 deletions(-) diff --git a/api/jobs.go b/api/jobs.go index a30fdd3f809..e2e30139600 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -149,10 +149,12 @@ func (j *Jobs) Evaluations(jobID string, q *QueryOptions) ([]*Evaluation, *Query return resp, qm, nil } -// Deregister is used to remove an existing job. -func (j *Jobs) Deregister(jobID string, q *WriteOptions) (string, *WriteMeta, error) { +// Deregister is used to remove an existing job. If purge is set to true, the job +// is deregistered and purged from the system versus still being queryable and +// eventually GC'ed from the system. Most callers should not specify purge. +func (j *Jobs) Deregister(jobID string, purge bool, q *WriteOptions) (string, *WriteMeta, error) { var resp deregisterJobResponse - wm, err := j.client.delete("/v1/job/"+jobID, &resp, q) + wm, err := j.client.delete(fmt.Sprintf("/v1/job/%v?purge=%t", jobID, purge), &resp, q) if err != nil { return "", nil, err } @@ -290,6 +292,7 @@ type ParameterizedJobConfig struct { // Job is used to serialize a job. type Job struct { + Stop *bool Region *string ID *string ParentID *string @@ -338,6 +341,9 @@ func (j *Job) Canonicalize() { if j.Priority == nil { j.Priority = helper.IntToPtr(50) } + if j.Stop == nil { + j.Stop = helper.BoolToPtr(false) + } if j.Region == nil { j.Region = helper.StringToPtr("global") } @@ -425,6 +431,7 @@ type JobListStub struct { Name string Type string Priority int + Stop bool Status string StatusDescription string JobSummary *JobSummary diff --git a/api/jobs_test.go b/api/jobs_test.go index 5d81de261eb..6a4d63ea66c 100644 --- a/api/jobs_test.go +++ b/api/jobs_test.go @@ -690,13 +690,12 @@ func TestJobs_Deregister(t *testing.T) { assertWriteMeta(t, wm) // Attempting delete on non-existing job returns an error - if _, _, err = jobs.Deregister("nope", nil); err != nil { + if _, _, err = jobs.Deregister("nope", false, nil); err != nil { t.Fatalf("unexpected error deregistering job: %v", err) - } - // Deleting an existing job works - evalID, wm3, err := jobs.Deregister("job1", nil) + // Do a soft deregister of an existing job + evalID, wm3, err := jobs.Deregister("job1", false, nil) if err != nil { t.Fatalf("err: %s", err) } @@ -705,6 +704,26 @@ func TestJobs_Deregister(t *testing.T) { t.Fatalf("missing eval ID") } + // Check that the job is still queryable + out, qm1, err := jobs.Info("job1", nil) + if err != nil { + t.Fatalf("err: %s", err) + } + assertQueryMeta(t, qm1) + if out == nil { + t.Fatalf("missing job") + } + + // Do a purge deregister of an existing job + evalID, wm4, err := jobs.Deregister("job1", true, nil) + if err != nil { + t.Fatalf("err: %s", err) + } + assertWriteMeta(t, wm4) + if evalID == "" { + t.Fatalf("missing eval ID") + } + // Check that the job is really gone result, qm, err := jobs.List(nil) if err != nil { diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index ece4f25596b..f58a88b38f2 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -1,6 +1,7 @@ package agent import ( + "fmt" "net/http" "strconv" "strings" @@ -312,8 +313,20 @@ func (s *HTTPServer) jobUpdate(resp http.ResponseWriter, req *http.Request, func (s *HTTPServer) jobDelete(resp http.ResponseWriter, req *http.Request, jobName string) (interface{}, error) { + + purgeStr := req.URL.Query().Get("purge") + var purgeBool bool + if purgeStr != "" { + var err error + purgeBool, err = strconv.ParseBool(purgeStr) + if err != nil { + return nil, fmt.Errorf("Failed to parse value of %q (%v) as a bool: %v", "purge", purgeStr, err) + } + } + args := structs.JobDeregisterRequest{ JobID: jobName, + Purge: purgeBool, } s.parseRegion(req, &args.Region) @@ -397,6 +410,7 @@ func ApiJobToStructJob(job *api.Job) *structs.Job { job.Canonicalize() j := &structs.Job{ + Stop: *job.Stop, Region: *job.Region, ID: *job.ID, ParentID: *job.ParentID, diff --git a/command/agent/job_endpoint_test.go b/command/agent/job_endpoint_test.go index 11715a39848..49abdf8de40 100644 --- a/command/agent/job_endpoint_test.go +++ b/command/agent/job_endpoint_test.go @@ -383,7 +383,7 @@ func TestHTTP_JobDelete(t *testing.T) { t.Fatalf("err: %v", err) } - // Make the HTTP request + // Make the HTTP request to do a soft delete req, err := http.NewRequest("DELETE", "/v1/job/"+job.ID, nil) if err != nil { t.Fatalf("err: %v", err) @@ -407,16 +407,56 @@ func TestHTTP_JobDelete(t *testing.T) { t.Fatalf("missing index") } + // Check the job is still queryable + getReq1 := structs.JobSpecificRequest{ + JobID: job.ID, + QueryOptions: structs.QueryOptions{Region: "global"}, + } + var getResp1 structs.SingleJobResponse + if err := s.Agent.RPC("Job.GetJob", &getReq1, &getResp1); err != nil { + t.Fatalf("err: %v", err) + } + if getResp1.Job == nil { + t.Fatalf("job doesn't exists") + } + if !getResp1.Job.Stop { + t.Fatalf("job should be marked as stop") + } + + // Make the HTTP request to do a purge delete + req2, err := http.NewRequest("DELETE", "/v1/job/"+job.ID+"?purge=true", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + respW.Flush() + + // Make the request + obj, err = s.Server.JobSpecificRequest(respW, req2) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Check the response + dereg = obj.(structs.JobDeregisterResponse) + if dereg.EvalID == "" { + t.Fatalf("bad: %v", dereg) + } + + // Check for the index + if respW.HeaderMap.Get("X-Nomad-Index") == "" { + t.Fatalf("missing index") + } + // Check the job is gone - getReq := structs.JobSpecificRequest{ + getReq2 := structs.JobSpecificRequest{ JobID: job.ID, QueryOptions: structs.QueryOptions{Region: "global"}, } - var getResp structs.SingleJobResponse - if err := s.Agent.RPC("Job.GetJob", &getReq, &getResp); err != nil { + var getResp2 structs.SingleJobResponse + if err := s.Agent.RPC("Job.GetJob", &getReq2, &getResp2); err != nil { t.Fatalf("err: %v", err) } - if getResp.Job != nil { + if getResp2.Job != nil { t.Fatalf("job still exists") } }) @@ -751,6 +791,7 @@ func TestHTTP_JobDispatch(t *testing.T) { func TestJobs_ApiJobToStructsJob(t *testing.T) { apiJob := &api.Job{ + Stop: helper.BoolToPtr(true), Region: helper.StringToPtr("global"), ID: helper.StringToPtr("foo"), ParentID: helper.StringToPtr("lol"), @@ -922,12 +963,14 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) { VaultToken: helper.StringToPtr("token"), Status: helper.StringToPtr("status"), StatusDescription: helper.StringToPtr("status_desc"), + Version: helper.Uint64ToPtr(10), CreateIndex: helper.Uint64ToPtr(1), ModifyIndex: helper.Uint64ToPtr(3), JobModifyIndex: helper.Uint64ToPtr(5), } expected := &structs.Job{ + Stop: true, Region: "global", ID: "foo", ParentID: "lol", @@ -1094,12 +1137,7 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) { }, }, - VaultToken: "token", - Status: "status", - StatusDescription: "status_desc", - CreateIndex: 1, - ModifyIndex: 3, - JobModifyIndex: 5, + VaultToken: "token", } structsJob := ApiJobToStructJob(apiJob) diff --git a/command/status.go b/command/status.go index bf56808138e..7b3e4297604 100644 --- a/command/status.go +++ b/command/status.go @@ -141,7 +141,7 @@ func (c *StatusCommand) Run(args []string) int { fmt.Sprintf("Type|%s", *job.Type), fmt.Sprintf("Priority|%d", *job.Priority), fmt.Sprintf("Datacenters|%s", strings.Join(job.Datacenters, ",")), - fmt.Sprintf("Status|%s", *job.Status), + fmt.Sprintf("Status|%s", getStatusString(*job.Status, *job.Stop)), fmt.Sprintf("Periodic|%v", periodic), fmt.Sprintf("Parameterized|%v", parameterized), } @@ -448,7 +448,14 @@ func createStatusListOutput(jobs []*api.JobListStub) string { job.ID, job.Type, job.Priority, - job.Status) + getStatusString(job.Status, job.Stop)) } return formatList(out) } + +func getStatusString(status string, stop bool) string { + if stop { + return fmt.Sprintf("%s (stopped)", status) + } + return status +} diff --git a/command/stop.go b/command/stop.go index c60957d8d17..7fa25c99a8c 100644 --- a/command/stop.go +++ b/command/stop.go @@ -31,6 +31,10 @@ Stop Options: screen, which can be used to examine the evaluation using the eval-status command. + -purge + Purge is used to stop the job and purge it from the system. If not set, the + job will still be queryable and will be purged by the garbage collector. + -yes Automatic yes to prompts. @@ -45,13 +49,14 @@ func (c *StopCommand) Synopsis() string { } func (c *StopCommand) Run(args []string) int { - var detach, verbose, autoYes bool + var detach, purge, verbose, autoYes bool flags := c.Meta.FlagSet("stop", FlagSetClient) flags.Usage = func() { c.Ui.Output(c.Help()) } flags.BoolVar(&detach, "detach", false, "") flags.BoolVar(&verbose, "verbose", false, "") flags.BoolVar(&autoYes, "yes", false, "") + flags.BoolVar(&purge, "purge", false, "") if err := flags.Parse(args); err != nil { return 1 @@ -132,7 +137,7 @@ func (c *StopCommand) Run(args []string) int { } // Invoke the stop - evalID, _, err := client.Jobs().Deregister(*job.ID, nil) + evalID, _, err := client.Jobs().Deregister(*job.ID, purge, nil) if err != nil { c.Ui.Error(fmt.Sprintf("Error deregistering job: %s", err)) return 1 diff --git a/nomad/fsm.go b/nomad/fsm.go index cb8e1f701d7..0ece2bb99b7 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -330,20 +330,43 @@ func (n *nomadFSM) applyDeregisterJob(buf []byte, index uint64) interface{} { panic(fmt.Errorf("failed to decode request: %v", err)) } - if err := n.state.DeleteJob(index, req.JobID); err != nil { - n.logger.Printf("[ERR] nomad.fsm: DeleteJob failed: %v", err) - return err - } - + // If it is periodic remove it from the dispatcher if err := n.periodicDispatcher.Remove(req.JobID); err != nil { n.logger.Printf("[ERR] nomad.fsm: periodicDispatcher.Remove failed: %v", err) return err } - // We always delete from the periodic launch table because it is possible that - // the job was updated to be non-perioidic, thus checking if it is periodic - // doesn't ensure we clean it up properly. - n.state.DeletePeriodicLaunch(index, req.JobID) + if req.Purge { + if err := n.state.DeleteJob(index, req.JobID); err != nil { + n.logger.Printf("[ERR] nomad.fsm: DeleteJob failed: %v", err) + return err + } + + // We always delete from the periodic launch table because it is possible that + // the job was updated to be non-perioidic, thus checking if it is periodic + // doesn't ensure we clean it up properly. + n.state.DeletePeriodicLaunch(index, req.JobID) + } else { + // Get the current job and mark it as stopped and re-insert it. + ws := memdb.NewWatchSet() + current, err := n.state.JobByID(ws, req.JobID) + if err != nil { + n.logger.Printf("[ERR] nomad.fsm: JobByID lookup failed: %v", err) + return err + } + + if current == nil { + return fmt.Errorf("job %q doesn't exist to be deregistered", req.JobID) + } + + stopped := current.Copy() + stopped.Stop = true + + if err := n.state.UpsertJob(index, stopped); err != nil { + n.logger.Printf("[ERR] nomad.fsm: UpsertJob failed: %v", err) + return err + } + } return nil } diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 80e48faee0e..99e8ecb430f 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -314,7 +314,7 @@ func TestFSM_RegisterJob(t *testing.T) { } } -func TestFSM_DeregisterJob(t *testing.T) { +func TestFSM_DeregisterJob_Purge(t *testing.T) { fsm := testFSM(t) job := mock.PeriodicJob() @@ -333,6 +333,7 @@ func TestFSM_DeregisterJob(t *testing.T) { req2 := structs.JobDeregisterRequest{ JobID: job.ID, + Purge: true, } buf, err = structs.Encode(structs.JobDeregisterRequestType, req2) if err != nil { @@ -369,6 +370,65 @@ func TestFSM_DeregisterJob(t *testing.T) { } } +func TestFSM_DeregisterJob_NoPurge(t *testing.T) { + fsm := testFSM(t) + + job := mock.PeriodicJob() + req := structs.JobRegisterRequest{ + Job: job, + } + buf, err := structs.Encode(structs.JobRegisterRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + req2 := structs.JobDeregisterRequest{ + JobID: job.ID, + Purge: false, + } + buf, err = structs.Encode(structs.JobDeregisterRequestType, req2) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp = fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + // Verify we are NOT registered + ws := memdb.NewWatchSet() + jobOut, err := fsm.State().JobByID(ws, req.Job.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if jobOut == nil { + t.Fatalf("job not found!") + } + if !jobOut.Stop { + t.Fatalf("job not stopped found!") + } + + // Verify it was removed from the periodic runner. + if _, ok := fsm.periodicDispatcher.tracked[job.ID]; ok { + t.Fatal("job not removed from periodic runner") + } + + // Verify it was removed from the periodic launch table. + launchOut, err := fsm.State().PeriodicLaunchByID(ws, req.Job.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if launchOut == nil { + t.Fatalf("launch not found!") + } +} + func TestFSM_UpdateEval(t *testing.T) { fsm := testFSM(t) fsm.evalBroker.SetEnabled(true) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index ac0e1f426cc..b6770d15415 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -432,6 +432,11 @@ func (s *StateStore) DeleteJob(index uint64, jobID string) error { return fmt.Errorf("index update failed: %v", err) } + // Delete the job versions + if err := s.deleteJobVersions(index, job, txn); err != nil { + return err + } + // Delete the job summary if _, err = txn.DeleteAll("job_summary", "id", jobID); err != nil { return fmt.Errorf("deleing job summary failed: %v", err) @@ -444,6 +449,37 @@ func (s *StateStore) DeleteJob(index uint64, jobID string) error { return nil } +// deleteJobVersions deletes all versions of the given job. +func (s *StateStore) deleteJobVersions(index uint64, job *structs.Job, txn *memdb.Txn) error { + iter, err := txn.Get("job_versions", "id_prefix", job.ID) + if err != nil { + return err + } + + for { + raw := iter.Next() + if raw == nil { + break + } + + // Ensure the ID is an exact match + j := raw.(*structs.Job) + if j.ID != job.ID { + continue + } + + if _, err = txn.DeleteAll("job_versions", "id", job.ID, job.Version); err != nil { + return fmt.Errorf("deleing job versions failed: %v", err) + } + } + + if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } + + return nil +} + // upsertJobVersion inserts a job into its historic version table and limits the // number of job versions that are tracked. func (s *StateStore) upsertJobVersion(index uint64, job *structs.Job, txn *memdb.Txn) error { diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 385957ed16a..f92f336e399 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -797,6 +797,30 @@ func TestStateStore_DeleteJob_Job(t *testing.T) { t.Fatalf("expected summary to be nil, but got: %v", summary) } + index, err = state.Index("job_summary") + if err != nil { + t.Fatalf("err: %v", err) + } + if index != 1001 { + t.Fatalf("bad: %d", index) + } + + versions, err := state.JobVersionsByID(ws, job.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if len(versions) != 0 { + t.Fatalf("expected no job versions") + } + + index, err = state.Index("job_summary") + if err != nil { + t.Fatalf("err: %v", err) + } + if index != 1001 { + t.Fatalf("bad: %d", index) + } + if watchFired(ws) { t.Fatalf("bad") } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 5ac6d76326f..98c559a47b8 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -239,6 +239,12 @@ type JobRegisterRequest struct { // to deregister a job as being a schedulable entity. type JobDeregisterRequest struct { JobID string + + // Purge controls whether the deregister purges the job from the system or + // whether the job is just marked as stopped and will be removed by the + // garbage collector + Purge bool + WriteRequest } @@ -1114,6 +1120,12 @@ const ( // is further composed of tasks. A task group (TG) is the unit of scheduling // however. type Job struct { + // Stop marks whether the user has stopped the job. A stopped job will + // have all created allocations stopped and acts as a way to stop a job + // without purging it from the system. This allows existing allocs to be + // queried and the job to be inspected as it is being killed. + Stop bool + // Region is the Nomad region that handles scheduling this job Region string @@ -1384,6 +1396,7 @@ func (j *Job) Stub(summary *JobSummary) *JobListStub { Name: j.Name, Type: j.Type, Priority: j.Priority, + Stop: j.Stop, Status: j.Status, StatusDescription: j.StatusDescription, CreateIndex: j.CreateIndex, @@ -1483,6 +1496,7 @@ type JobListStub struct { Name string Type string Priority int + Stop bool Status string StatusDescription string JobSummary *JobSummary diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index 5653d953710..d2df0344e1a 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -179,6 +179,12 @@ func (s *GenericScheduler) createBlockedEval(planFailure bool) error { return s.planner.CreateEval(s.blocked) } +// isStoppedJob returns if the scheduling is for a stopped job and the scheduler +// should stop all its allocations. +func (s *GenericScheduler) isStoppedJob() bool { + return s.job == nil || s.job.Stop +} + // process is wrapped in retryMax to iteratively run the handler until we have no // further work or we've made the maximum number of attempts. func (s *GenericScheduler) process() (bool, error) { @@ -191,7 +197,7 @@ func (s *GenericScheduler) process() (bool, error) { s.eval.JobID, err) } numTaskGroups := 0 - if s.job != nil { + if !s.isStoppedJob() { numTaskGroups = len(s.job.TaskGroups) } s.queuedAllocs = make(map[string]int, numTaskGroups) @@ -207,7 +213,7 @@ func (s *GenericScheduler) process() (bool, error) { // Construct the placement stack s.stack = NewGenericStack(s.batch, s.ctx) - if s.job != nil { + if !s.isStoppedJob() { s.stack.SetJob(s.job) } @@ -351,7 +357,7 @@ func (s *GenericScheduler) filterCompleteAllocs(allocs []*structs.Allocation) ([ func (s *GenericScheduler) computeJobAllocs() error { // Materialize all the task groups, job could be missing if deregistered var groups map[string]*structs.TaskGroup - if s.job != nil { + if !s.isStoppedJob() { groups = materializeTaskGroups(s.job) } @@ -398,7 +404,7 @@ func (s *GenericScheduler) computeJobAllocs() error { // Check if a rolling upgrade strategy is being used limit := len(diff.update) + len(diff.migrate) + len(diff.lost) - if s.job != nil && s.job.Update.Rolling() { + if !s.isStoppedJob() && s.job.Update.Rolling() { limit = s.job.Update.MaxParallel } @@ -414,7 +420,7 @@ func (s *GenericScheduler) computeJobAllocs() error { // Nothing remaining to do if placement is not required if len(diff.place) == 0 { - if s.job != nil { + if !s.isStoppedJob() { for _, tg := range s.job.TaskGroups { s.queuedAllocs[tg.Name] = 0 } diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index 832e17627ac..afb0e48d4cb 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -1671,7 +1671,7 @@ func TestServiceSched_JobModify_DistinctProperty(t *testing.T) { h.AssertEvalStatus(t, structs.EvalStatusComplete) } -func TestServiceSched_JobDeregister(t *testing.T) { +func TestServiceSched_JobDeregister_Purged(t *testing.T) { h := NewHarness(t) // Generate a fake job with allocations @@ -1735,6 +1735,72 @@ func TestServiceSched_JobDeregister(t *testing.T) { h.AssertEvalStatus(t, structs.EvalStatusComplete) } +func TestServiceSched_JobDeregister_Stopped(t *testing.T) { + h := NewHarness(t) + + // Generate a fake job with allocations + job := mock.Job() + job.Stop = true + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + var allocs []*structs.Allocation + for i := 0; i < 10; i++ { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + allocs = append(allocs, alloc) + } + for _, alloc := range allocs { + h.State.UpsertJobSummary(h.NextIndex(), mock.JobSummary(alloc.JobID)) + } + noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) + + // Create a mock evaluation to deregister the job + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: 50, + TriggeredBy: structs.EvalTriggerJobDeregister, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewServiceScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure a single plan + if len(h.Plans) != 1 { + t.Fatalf("bad: %#v", h.Plans) + } + plan := h.Plans[0] + + // Ensure the plan evicted all nodes + if len(plan.NodeUpdate["12345678-abcd-efab-cdef-123456789abc"]) != len(allocs) { + t.Fatalf("bad: %#v", plan) + } + + // Lookup the allocations by JobID + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.ID, false) + noErr(t, err) + + // Ensure that the job field on the allocation is still populated + for _, alloc := range out { + if alloc.Job == nil { + t.Fatalf("bad: %#v", alloc) + } + } + + // Ensure no remaining allocations + out, _ = structs.FilterTerminalAllocs(out) + if len(out) != 0 { + t.Fatalf("bad: %#v", out) + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) +} + func TestServiceSched_NodeDown(t *testing.T) { h := NewHarness(t) diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go index 755153d9c92..b95e5b5fec3 100644 --- a/scheduler/system_sched.go +++ b/scheduler/system_sched.go @@ -83,6 +83,12 @@ func (s *SystemScheduler) Process(eval *structs.Evaluation) error { s.queuedAllocs) } +// isStoppedJob returns if the scheduling is for a stopped job and the scheduler +// should stop all its allocations. +func (s *SystemScheduler) isStoppedJob() bool { + return s.job == nil || s.job.Stop +} + // process is wrapped in retryMax to iteratively run the handler until we have no // further work or we've made the maximum number of attempts. func (s *SystemScheduler) process() (bool, error) { @@ -95,13 +101,13 @@ func (s *SystemScheduler) process() (bool, error) { s.eval.JobID, err) } numTaskGroups := 0 - if s.job != nil { + if !s.isStoppedJob() { numTaskGroups = len(s.job.TaskGroups) } s.queuedAllocs = make(map[string]int, numTaskGroups) // Get the ready nodes in the required datacenters - if s.job != nil { + if !s.isStoppedJob() { s.nodes, s.nodesByDC, err = readyNodesInDCs(s.state, s.job.Datacenters) if err != nil { return false, fmt.Errorf("failed to get ready nodes: %v", err) @@ -119,7 +125,7 @@ func (s *SystemScheduler) process() (bool, error) { // Construct the placement stack s.stack = NewSystemStack(s.ctx) - if s.job != nil { + if !s.isStoppedJob() { s.stack.SetJob(s.job) } @@ -228,7 +234,7 @@ func (s *SystemScheduler) computeJobAllocs() error { // Check if a rolling upgrade strategy is being used limit := len(diff.update) - if s.job != nil && s.job.Update.Rolling() { + if !s.isStoppedJob() && s.job.Update.Rolling() { limit = s.job.Update.MaxParallel } @@ -237,7 +243,7 @@ func (s *SystemScheduler) computeJobAllocs() error { // Nothing remaining to do if placement is not required if len(diff.place) == 0 { - if s.job != nil { + if !s.isStoppedJob() { for _, tg := range s.job.TaskGroups { s.queuedAllocs[tg.Name] = 0 } diff --git a/scheduler/system_sched_test.go b/scheduler/system_sched_test.go index 313d573d393..36713a2f660 100644 --- a/scheduler/system_sched_test.go +++ b/scheduler/system_sched_test.go @@ -773,7 +773,7 @@ func TestSystemSched_JobModify_InPlace(t *testing.T) { } } -func TestSystemSched_JobDeregister(t *testing.T) { +func TestSystemSched_JobDeregister_Purged(t *testing.T) { h := NewHarness(t) // Create some nodes @@ -842,6 +842,77 @@ func TestSystemSched_JobDeregister(t *testing.T) { h.AssertEvalStatus(t, structs.EvalStatusComplete) } +func TestSystemSched_JobDeregister_Stopped(t *testing.T) { + h := NewHarness(t) + + // Create some nodes + var nodes []*structs.Node + for i := 0; i < 10; i++ { + node := mock.Node() + nodes = append(nodes, node) + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + } + + // Generate a fake job with allocations + job := mock.SystemJob() + job.Stop = true + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + var allocs []*structs.Allocation + for _, node := range nodes { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = node.ID + alloc.Name = "my-job.web[0]" + allocs = append(allocs, alloc) + } + for _, alloc := range allocs { + noErr(t, h.State.UpsertJobSummary(h.NextIndex(), mock.JobSummary(alloc.JobID))) + } + noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) + + // Create a mock evaluation to deregister the job + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: 50, + TriggeredBy: structs.EvalTriggerJobDeregister, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewSystemScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure a single plan + if len(h.Plans) != 1 { + t.Fatalf("bad: %#v", h.Plans) + } + plan := h.Plans[0] + + // Ensure the plan evicted the job from all nodes. + for _, node := range nodes { + if len(plan.NodeUpdate[node.ID]) != 1 { + t.Fatalf("bad: %#v", plan) + } + } + + // Lookup the allocations by JobID + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.ID, false) + noErr(t, err) + + // Ensure no remaining allocations + out, _ = structs.FilterTerminalAllocs(out) + if len(out) != 0 { + t.Fatalf("bad: %#v", out) + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) +} + func TestSystemSched_NodeDown(t *testing.T) { h := NewHarness(t) diff --git a/scheduler/util.go b/scheduler/util.go index 08db94f1514..01f10c8dcd7 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -21,7 +21,7 @@ type allocTuple struct { // a job requires. This is used to do the count expansion. func materializeTaskGroups(job *structs.Job) map[string]*structs.TaskGroup { out := make(map[string]*structs.TaskGroup) - if job == nil { + if job == nil || job.Stop { return out } diff --git a/website/source/docs/commands/stop.html.md.erb b/website/source/docs/commands/stop.html.md.erb index f43261012ee..9040ae1db3d 100644 --- a/website/source/docs/commands/stop.html.md.erb +++ b/website/source/docs/commands/stop.html.md.erb @@ -41,6 +41,11 @@ the request. It is safe to exit the monitor early using ctrl+c. * `-yes`: Automatic yes to prompts. +* `-purge`: Purge is used to stop the job and purge it from the system. If not +set, the job will still be queryable and will be purged by the garbage +collector. + + ## Examples Stop the job with ID "job1": From 5e1e7afc62e5be3a9de6117d76de362f1bcd22a0 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Sat, 15 Apr 2017 16:47:19 -0700 Subject: [PATCH 07/12] GC and some fixes --- command/status.go | 18 +-- nomad/core_sched.go | 18 ++- nomad/core_sched_test.go | 204 ++++++++++++++++++++++++++++++-- nomad/job_endpoint.go | 4 + nomad/job_endpoint_test.go | 80 ++++++++++++- nomad/state/schema.go | 28 ++++- nomad/state/state_store.go | 5 + nomad/state/state_store_test.go | 53 +++++++++ nomad/system_endpoint_test.go | 12 +- 9 files changed, 396 insertions(+), 26 deletions(-) diff --git a/command/status.go b/command/status.go index 7b3e4297604..e4a5292308a 100644 --- a/command/status.go +++ b/command/status.go @@ -147,13 +147,17 @@ func (c *StatusCommand) Run(args []string) int { } if periodic && !parameterized { - location, err := job.Periodic.GetLocation() - if err == nil { - now := time.Now().In(location) - next := job.Periodic.Next(now) - basic = append(basic, fmt.Sprintf("Next Periodic Launch|%s", - fmt.Sprintf("%s (%s from now)", - formatTime(next), formatTimeDifference(now, next, time.Second)))) + if *job.Stop { + basic = append(basic, fmt.Sprintf("Next Periodic Launch|none (job stopped)")) + } else { + location, err := job.Periodic.GetLocation() + if err == nil { + now := time.Now().In(location) + next := job.Periodic.Next(now) + basic = append(basic, fmt.Sprintf("Next Periodic Launch|%s", + fmt.Sprintf("%s (%s from now)", + formatTime(next), formatTimeDifference(now, next, time.Second)))) + } } } diff --git a/nomad/core_sched.go b/nomad/core_sched.go index a191effddbf..83f8bd10e1e 100644 --- a/nomad/core_sched.go +++ b/nomad/core_sched.go @@ -149,6 +149,7 @@ OUTER: for _, job := range gcJob { req := structs.JobDeregisterRequest{ JobID: job, + Purge: true, WriteRequest: structs.WriteRequest{ Region: c.srv.config.Region, }, @@ -243,9 +244,24 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64, return false, nil, err } + // Can collect if: + // Job doesn't exist + // Job is Stopped and dead + // allowBatch and the job is dead + collect := false + if job == nil { + collect = true + } else if job.Status != structs.JobStatusDead { + collect = false + } else if job.Stop { + collect = true + } else if allowBatch { + collect = true + } + // We don't want to gc anything related to a job which is not dead // If the batch job doesn't exist we can GC it regardless of allowBatch - if job != nil && (!allowBatch || job.Status != structs.JobStatusDead) { + if !collect { return false, nil, nil } } diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go index ce1e39cd352..a10a4c16d3a 100644 --- a/nomad/core_sched_test.go +++ b/nomad/core_sched_test.go @@ -1006,6 +1006,103 @@ func TestCoreScheduler_JobGC_OneShot(t *testing.T) { } } +// This test ensures that stopped jobs are GCd +func TestCoreScheduler_JobGC_Stopped(t *testing.T) { + s1 := testServer(t, nil) + defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC) + + // COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0 + s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10) + + // Insert job. + state := s1.fsm.State() + job := mock.Job() + //job.Status = structs.JobStatusDead + job.Stop = true + err := state.UpsertJob(1000, job) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Insert two complete evals + eval := mock.Eval() + eval.JobID = job.ID + eval.Status = structs.EvalStatusComplete + + eval2 := mock.Eval() + eval2.JobID = job.ID + eval2.Status = structs.EvalStatusComplete + + err = state.UpsertEvals(1001, []*structs.Evaluation{eval, eval2}) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Insert one complete alloc + alloc := mock.Alloc() + alloc.JobID = job.ID + alloc.EvalID = eval.ID + alloc.DesiredStatus = structs.AllocDesiredStatusStop + + err = state.UpsertAllocs(1002, []*structs.Allocation{alloc}) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Update the time tables to make this work + tt := s1.fsm.TimeTable() + tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.JobGCThreshold)) + + // Create a core scheduler + snap, err := state.Snapshot() + if err != nil { + t.Fatalf("err: %v", err) + } + core := NewCoreScheduler(s1, snap) + + // Attempt the GC + gc := s1.coreJobEval(structs.CoreJobJobGC, 2000) + err = core.Process(gc) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Shouldn't still exist + ws := memdb.NewWatchSet() + out, err := state.JobByID(ws, job.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if out != nil { + t.Fatalf("bad: %v", out) + } + + outE, err := state.EvalByID(ws, eval.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if outE != nil { + t.Fatalf("bad: %v", outE) + } + + outE2, err := state.EvalByID(ws, eval2.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if outE2 != nil { + t.Fatalf("bad: %v", outE2) + } + + outA, err := state.AllocByID(ws, alloc.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if outA != nil { + t.Fatalf("bad: %v", outA) + } +} + func TestCoreScheduler_JobGC_Force(t *testing.T) { s1 := testServer(t, nil) defer s1.Shutdown() @@ -1066,8 +1163,8 @@ func TestCoreScheduler_JobGC_Force(t *testing.T) { } } -// This test ensures parameterized and periodic jobs don't get GCd -func TestCoreScheduler_JobGC_NonGCable(t *testing.T) { +// This test ensures parameterized jobs only get gc'd when stopped +func TestCoreScheduler_JobGC_Parameterized(t *testing.T) { s1 := testServer(t, nil) defer s1.Shutdown() testutil.WaitForLeader(t, s1.RPC) @@ -1088,9 +1185,77 @@ func TestCoreScheduler_JobGC_NonGCable(t *testing.T) { t.Fatalf("err: %v", err) } - // Insert a periodic job. - job2 := mock.PeriodicJob() - if err := state.UpsertJob(1001, job2); err != nil { + // Create a core scheduler + snap, err := state.Snapshot() + if err != nil { + t.Fatalf("err: %v", err) + } + core := NewCoreScheduler(s1, snap) + + // Attempt the GC + gc := s1.coreJobEval(structs.CoreJobForceGC, 1002) + err = core.Process(gc) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Should still exist + ws := memdb.NewWatchSet() + out, err := state.JobByID(ws, job.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if out == nil { + t.Fatalf("bad: %v", out) + } + + // Mark the job as stopped and try again + job2 := job.Copy() + job2.Stop = true + err = state.UpsertJob(2000, job2) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Create a core scheduler + snap, err = state.Snapshot() + if err != nil { + t.Fatalf("err: %v", err) + } + core = NewCoreScheduler(s1, snap) + + // Attempt the GC + gc = s1.coreJobEval(structs.CoreJobForceGC, 2002) + err = core.Process(gc) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Should not exist + out, err = state.JobByID(ws, job.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if out != nil { + t.Fatalf("bad: %+v", out) + } +} + +// This test ensures periodic jobs don't get GCd til they are stopped +func TestCoreScheduler_JobGC_Periodic(t *testing.T) { + + s1 := testServer(t, nil) + defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC) + + // COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0 + s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10) + + // Insert a parameterized job. + state := s1.fsm.State() + job := mock.PeriodicJob() + err := state.UpsertJob(1000, job) + if err != nil { t.Fatalf("err: %v", err) } @@ -1118,12 +1283,35 @@ func TestCoreScheduler_JobGC_NonGCable(t *testing.T) { t.Fatalf("bad: %v", out) } - outE, err := state.JobByID(ws, job2.ID) + // Mark the job as stopped and try again + job2 := job.Copy() + job2.Stop = true + err = state.UpsertJob(2000, job2) if err != nil { t.Fatalf("err: %v", err) } - if outE == nil { - t.Fatalf("bad: %v", outE) + + // Create a core scheduler + snap, err = state.Snapshot() + if err != nil { + t.Fatalf("err: %v", err) + } + core = NewCoreScheduler(s1, snap) + + // Attempt the GC + gc = s1.coreJobEval(structs.CoreJobForceGC, 2002) + err = core.Process(gc) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Should not exist + out, err = state.JobByID(ws, job.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if out != nil { + t.Fatalf("bad: %+v", out) } } diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 1d51cb5d810..af99e1f4b74 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -846,6 +846,10 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa return fmt.Errorf("Specified job %q is not a parameterized job", args.JobID) } + if parameterizedJob.Stop { + return fmt.Errorf("Specified job %q is stopped", args.JobID) + } + // Validate the arguments if err := validateDispatchRequest(args, parameterizedJob); err != nil { return err diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index f7795bdd613..ebc2d5fe410 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -854,9 +854,10 @@ func TestJobEndpoint_Deregister(t *testing.T) { t.Fatalf("err: %v", err) } - // Deregister + // Deregister but don't purge dereg := &structs.JobDeregisterRequest{ JobID: job.ID, + Purge: false, WriteRequest: structs.WriteRequest{Region: "global"}, } var resp2 structs.JobDeregisterResponse @@ -867,15 +868,18 @@ func TestJobEndpoint_Deregister(t *testing.T) { t.Fatalf("bad index: %d", resp2.Index) } - // Check for the node in the FSM + // Check for the job in the FSM ws := memdb.NewWatchSet() state := s1.fsm.State() out, err := state.JobByID(ws, job.ID) if err != nil { t.Fatalf("err: %v", err) } - if out != nil { - t.Fatalf("unexpected job") + if out == nil { + t.Fatalf("job purged") + } + if !out.Stop { + t.Fatalf("job not stopped") } // Lookup the evaluation @@ -908,6 +912,60 @@ func TestJobEndpoint_Deregister(t *testing.T) { if eval.Status != structs.EvalStatusPending { t.Fatalf("bad: %#v", eval) } + + // Deregister and purge + dereg2 := &structs.JobDeregisterRequest{ + JobID: job.ID, + Purge: true, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp3 structs.JobDeregisterResponse + if err := msgpackrpc.CallWithCodec(codec, "Job.Deregister", dereg2, &resp3); err != nil { + t.Fatalf("err: %v", err) + } + if resp3.Index == 0 { + t.Fatalf("bad index: %d", resp3.Index) + } + + // Check for the job in the FSM + out, err = state.JobByID(ws, job.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if out != nil { + t.Fatalf("unexpected job") + } + + // Lookup the evaluation + eval, err = state.EvalByID(ws, resp3.EvalID) + if err != nil { + t.Fatalf("err: %v", err) + } + if eval == nil { + t.Fatalf("expected eval") + } + if eval.CreateIndex != resp3.EvalCreateIndex { + t.Fatalf("index mis-match") + } + + if eval.Priority != structs.JobDefaultPriority { + t.Fatalf("bad: %#v", eval) + } + if eval.Type != structs.JobTypeService { + t.Fatalf("bad: %#v", eval) + } + if eval.TriggeredBy != structs.EvalTriggerJobDeregister { + t.Fatalf("bad: %#v", eval) + } + if eval.JobID != job.ID { + t.Fatalf("bad: %#v", eval) + } + if eval.JobModifyIndex != resp3.JobModifyIndex { + t.Fatalf("bad: %#v", eval) + } + if eval.Status != structs.EvalStatusPending { + t.Fatalf("bad: %#v", eval) + } } func TestJobEndpoint_Deregister_NonExistent(t *testing.T) { @@ -990,6 +1048,7 @@ func TestJobEndpoint_Deregister_Periodic(t *testing.T) { // Deregister dereg := &structs.JobDeregisterRequest{ JobID: job.ID, + Purge: true, WriteRequest: structs.WriteRequest{Region: "global"}, } var resp2 structs.JobDeregisterResponse @@ -1042,6 +1101,7 @@ func TestJobEndpoint_Deregister_ParameterizedJob(t *testing.T) { // Deregister dereg := &structs.JobDeregisterRequest{ JobID: job.ID, + Purge: true, WriteRequest: structs.WriteRequest{Region: "global"}, } var resp2 structs.JobDeregisterResponse @@ -2089,6 +2149,11 @@ func TestJobEndpoint_Dispatch(t *testing.T) { d6 := mock.PeriodicJob() d6.ParameterizedJob = &structs.ParameterizedJobConfig{} + d7 := mock.Job() + d7.Type = structs.JobTypeBatch + d7.ParameterizedJob = &structs.ParameterizedJobConfig{} + d7.Stop = true + reqNoInputNoMeta := &structs.JobDispatchRequest{} reqInputDataNoMeta := &structs.JobDispatchRequest{ Payload: []byte("hello world"), @@ -2210,6 +2275,13 @@ func TestJobEndpoint_Dispatch(t *testing.T) { dispatchReq: reqNoInputNoMeta, noEval: true, }, + { + name: "periodic job stopped, ensure error", + parameterizedJob: d7, + dispatchReq: reqNoInputNoMeta, + err: true, + errStr: "stopped", + }, } for _, tc := range cases { diff --git a/nomad/state/schema.go b/nomad/state/schema.go index d31af34b24e..2ecd1e88d80 100644 --- a/nomad/state/schema.go +++ b/nomad/state/schema.go @@ -181,11 +181,31 @@ func jobIsGCable(obj interface{}) (bool, error) { return false, fmt.Errorf("Unexpected type: %v", obj) } - // The job is GCable if it is batch, it is not periodic and is not a - // parameterized job. + // If the job is periodic or parameterized it is only garbage collectable if + // it is stopped. periodic := j.Periodic != nil && j.Periodic.Enabled - gcable := j.Type == structs.JobTypeBatch && !periodic && !j.IsParameterized() - return gcable, nil + parameterized := j.IsParameterized() + if periodic || parameterized { + return j.Stop, nil + } + + // If the job isn't dead it isn't eligible + if j.Status != structs.JobStatusDead { + return false, nil + } + + // Any job that is stopped is eligible for garbage collection + if j.Stop { + return true, nil + } + + // Otherwise, only batch jobs are eligible because they complete on their + // own without a user stopping them. + if j.Type != structs.JobTypeBatch { + return false, nil + } + + return true, nil } // jobIsPeriodic satisfies the ConditionalIndexFunc interface and creates an index diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index b6770d15415..314613f3672 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -1787,6 +1787,11 @@ func (s *StateStore) getJobStatus(txn *memdb.Txn, job *structs.Job, evalDelete b // job is periodic or is a parameterized job, we mark it as running as // it will never have an allocation/evaluation against it. if job.IsPeriodic() || job.IsParameterized() { + // If the job is stopped mark it as dead + if job.Stop { + return structs.JobStatusDead, nil + } + return structs.JobStatusRunning, nil } return structs.JobStatusPending, nil diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index f92f336e399..9e0a5b51e1c 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -3760,6 +3760,59 @@ func TestStateStore_GetJobStatus_RunningAlloc(t *testing.T) { } } +func TestStateStore_GetJobStatus_PeriodicJob(t *testing.T) { + state := testStateStore(t) + job := mock.PeriodicJob() + + txn := state.db.Txn(false) + status, err := state.getJobStatus(txn, job, false) + if err != nil { + t.Fatalf("getJobStatus() failed: %v", err) + } + + if status != structs.JobStatusRunning { + t.Fatalf("getJobStatus() returned %v; expected %v", status, structs.JobStatusRunning) + } + + // Mark it as stopped + job.Stop = true + status, err = state.getJobStatus(txn, job, false) + if err != nil { + t.Fatalf("getJobStatus() failed: %v", err) + } + + if status != structs.JobStatusDead { + t.Fatalf("getJobStatus() returned %v; expected %v", status, structs.JobStatusDead) + } +} + +func TestStateStore_GetJobStatus_ParameterizedJob(t *testing.T) { + state := testStateStore(t) + job := mock.Job() + job.ParameterizedJob = &structs.ParameterizedJobConfig{} + + txn := state.db.Txn(false) + status, err := state.getJobStatus(txn, job, false) + if err != nil { + t.Fatalf("getJobStatus() failed: %v", err) + } + + if status != structs.JobStatusRunning { + t.Fatalf("getJobStatus() returned %v; expected %v", status, structs.JobStatusRunning) + } + + // Mark it as stopped + job.Stop = true + status, err = state.getJobStatus(txn, job, false) + if err != nil { + t.Fatalf("getJobStatus() failed: %v", err) + } + + if status != structs.JobStatusDead { + t.Fatalf("getJobStatus() returned %v; expected %v", status, structs.JobStatusDead) + } +} + func TestStateStore_SetJobStatus_PendingEval(t *testing.T) { state := testStateStore(t) job := mock.Job() diff --git a/nomad/system_endpoint_test.go b/nomad/system_endpoint_test.go index 14dada6808e..ffad325eb64 100644 --- a/nomad/system_endpoint_test.go +++ b/nomad/system_endpoint_test.go @@ -22,8 +22,16 @@ func TestSystemEndpoint_GarbageCollect(t *testing.T) { state := s1.fsm.State() job := mock.Job() job.Type = structs.JobTypeBatch + job.Stop = true if err := state.UpsertJob(1000, job); err != nil { - t.Fatalf("UpsertAllocs() failed: %v", err) + t.Fatalf("UpsertJob() failed: %v", err) + } + + eval := mock.Eval() + eval.Status = structs.EvalStatusComplete + eval.JobID = job.ID + if err := state.UpsertEvals(1001, []*structs.Evaluation{eval}); err != nil { + t.Fatalf("UpsertEvals() failed: %v", err) } // Make the GC request @@ -45,7 +53,7 @@ func TestSystemEndpoint_GarbageCollect(t *testing.T) { return false, err } if exist != nil { - return false, fmt.Errorf("job %q wasn't garbage collected", job.ID) + return false, fmt.Errorf("job %+v wasn't garbage collected", job) } return true, nil }, func(err error) { From 376e177836611dac20807e56176c7f4a8f604317 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Sat, 15 Apr 2017 17:05:52 -0700 Subject: [PATCH 08/12] Status shows type of job --- api/jobs.go | 2 ++ command/status.go | 16 +++++++++++++++- nomad/structs/structs.go | 4 ++++ 3 files changed, 21 insertions(+), 1 deletion(-) diff --git a/api/jobs.go b/api/jobs.go index e2e30139600..64a6377624c 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -431,6 +431,8 @@ type JobListStub struct { Name string Type string Priority int + Periodic bool + ParameterizedJob bool Stop bool Status string StatusDescription string diff --git a/command/status.go b/command/status.go index e4a5292308a..9a09cff0d16 100644 --- a/command/status.go +++ b/command/status.go @@ -450,13 +450,27 @@ func createStatusListOutput(jobs []*api.JobListStub) string { for i, job := range jobs { out[i+1] = fmt.Sprintf("%s|%s|%d|%s", job.ID, - job.Type, + getTypeString(job), job.Priority, getStatusString(job.Status, job.Stop)) } return formatList(out) } +func getTypeString(job *api.JobListStub) string { + t := job.Type + + if job.Periodic { + t += "/periodic" + } + + if job.ParameterizedJob { + t += "/parameterized" + } + + return t +} + func getStatusString(status string, stop bool) string { if stop { return fmt.Sprintf("%s (stopped)", status) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 98c559a47b8..4a86f85dbe4 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1396,6 +1396,8 @@ func (j *Job) Stub(summary *JobSummary) *JobListStub { Name: j.Name, Type: j.Type, Priority: j.Priority, + Periodic: j.IsPeriodic(), + ParameterizedJob: j.IsParameterized(), Stop: j.Stop, Status: j.Status, StatusDescription: j.StatusDescription, @@ -1496,6 +1498,8 @@ type JobListStub struct { Name string Type string Priority int + Periodic bool + ParameterizedJob bool Stop bool Status string StatusDescription string From ea15f4b05e94c8bb150ba715bf42f94f1d68e84c Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Sun, 16 Apr 2017 16:54:02 -0700 Subject: [PATCH 09/12] Diff code fixes --- command/agent/job_endpoint.go | 211 ++++++++++++++++++++-------------- nomad/eval_endpoint_test.go | 15 ++- nomad/structs/diff.go | 36 +++++- nomad/structs/diff_test.go | 12 ++ 4 files changed, 181 insertions(+), 93 deletions(-) diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index f58a88b38f2..e807ee96347 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -424,18 +424,22 @@ func ApiJobToStructJob(job *api.Job) *structs.Job { VaultToken: *job.VaultToken, } - j.Constraints = make([]*structs.Constraint, len(job.Constraints)) - for i, c := range job.Constraints { - con := &structs.Constraint{} - ApiConstraintToStructs(c, con) - j.Constraints[i] = con + if l := len(job.Constraints); l != 0 { + j.Constraints = make([]*structs.Constraint, l) + for i, c := range job.Constraints { + con := &structs.Constraint{} + ApiConstraintToStructs(c, con) + j.Constraints[i] = con + } } + if job.Update != nil { j.Update = structs.UpdateStrategy{ Stagger: job.Update.Stagger, MaxParallel: job.Update.MaxParallel, } } + if job.Periodic != nil { j.Periodic = &structs.PeriodicConfig{ Enabled: *job.Periodic.Enabled, @@ -443,10 +447,12 @@ func ApiJobToStructJob(job *api.Job) *structs.Job { ProhibitOverlap: *job.Periodic.ProhibitOverlap, TimeZone: *job.Periodic.TimeZone, } + if job.Periodic.Spec != nil { j.Periodic.Spec = *job.Periodic.Spec } } + if job.ParameterizedJob != nil { j.ParameterizedJob = &structs.ParameterizedJobConfig{ Payload: job.ParameterizedJob.Payload, @@ -455,11 +461,13 @@ func ApiJobToStructJob(job *api.Job) *structs.Job { } } - j.TaskGroups = make([]*structs.TaskGroup, len(job.TaskGroups)) - for i, taskGroup := range job.TaskGroups { - tg := &structs.TaskGroup{} - ApiTgToStructsTG(taskGroup, tg) - j.TaskGroups[i] = tg + if l := len(job.TaskGroups); l != 0 { + j.TaskGroups = make([]*structs.TaskGroup, l) + for i, taskGroup := range job.TaskGroups { + tg := &structs.TaskGroup{} + ApiTgToStructsTG(taskGroup, tg) + j.TaskGroups[i] = tg + } } return j @@ -469,29 +477,36 @@ func ApiTgToStructsTG(taskGroup *api.TaskGroup, tg *structs.TaskGroup) { tg.Name = *taskGroup.Name tg.Count = *taskGroup.Count tg.Meta = taskGroup.Meta - tg.Constraints = make([]*structs.Constraint, len(taskGroup.Constraints)) - for k, constraint := range taskGroup.Constraints { - c := &structs.Constraint{} - ApiConstraintToStructs(constraint, c) - tg.Constraints[k] = c + + if l := len(taskGroup.Constraints); l != 0 { + tg.Constraints = make([]*structs.Constraint, l) + for k, constraint := range taskGroup.Constraints { + c := &structs.Constraint{} + ApiConstraintToStructs(constraint, c) + tg.Constraints[k] = c + } } + tg.RestartPolicy = &structs.RestartPolicy{ Attempts: *taskGroup.RestartPolicy.Attempts, Interval: *taskGroup.RestartPolicy.Interval, Delay: *taskGroup.RestartPolicy.Delay, Mode: *taskGroup.RestartPolicy.Mode, } + tg.EphemeralDisk = &structs.EphemeralDisk{ Sticky: *taskGroup.EphemeralDisk.Sticky, SizeMB: *taskGroup.EphemeralDisk.SizeMB, Migrate: *taskGroup.EphemeralDisk.Migrate, } - tg.Meta = taskGroup.Meta - tg.Tasks = make([]*structs.Task, len(taskGroup.Tasks)) - for l, task := range taskGroup.Tasks { - t := &structs.Task{} - ApiTaskToStructsTask(task, t) - tg.Tasks[l] = t + + if l := len(taskGroup.Tasks); l != 0 { + tg.Tasks = make([]*structs.Task, l) + for l, task := range taskGroup.Tasks { + t := &structs.Task{} + ApiTaskToStructsTask(task, t) + tg.Tasks[l] = t + } } } @@ -501,77 +516,101 @@ func ApiTaskToStructsTask(apiTask *api.Task, structsTask *structs.Task) { structsTask.User = apiTask.User structsTask.Leader = apiTask.Leader structsTask.Config = apiTask.Config - structsTask.Constraints = make([]*structs.Constraint, len(apiTask.Constraints)) - for i, constraint := range apiTask.Constraints { - c := &structs.Constraint{} - ApiConstraintToStructs(constraint, c) - structsTask.Constraints[i] = c - } structsTask.Env = apiTask.Env - structsTask.Services = make([]*structs.Service, len(apiTask.Services)) - for i, service := range apiTask.Services { - structsTask.Services[i] = &structs.Service{ - Name: service.Name, - PortLabel: service.PortLabel, - Tags: service.Tags, + structsTask.Meta = apiTask.Meta + structsTask.KillTimeout = *apiTask.KillTimeout + + if l := len(apiTask.Constraints); l != 0 { + structsTask.Constraints = make([]*structs.Constraint, l) + for i, constraint := range apiTask.Constraints { + c := &structs.Constraint{} + ApiConstraintToStructs(constraint, c) + structsTask.Constraints[i] = c } - structsTask.Services[i].Checks = make([]*structs.ServiceCheck, len(service.Checks)) - for j, check := range service.Checks { - structsTask.Services[i].Checks[j] = &structs.ServiceCheck{ - Name: check.Name, - Type: check.Type, - Command: check.Command, - Args: check.Args, - Path: check.Path, - Protocol: check.Protocol, - PortLabel: check.PortLabel, - Interval: check.Interval, - Timeout: check.Timeout, - InitialStatus: check.InitialStatus, + } + + if l := len(apiTask.Services); l != 0 { + structsTask.Services = make([]*structs.Service, l) + for i, service := range apiTask.Services { + structsTask.Services[i] = &structs.Service{ + Name: service.Name, + PortLabel: service.PortLabel, + Tags: service.Tags, + } + + if l := len(service.Checks); l != 0 { + structsTask.Services[i].Checks = make([]*structs.ServiceCheck, l) + for j, check := range service.Checks { + structsTask.Services[i].Checks[j] = &structs.ServiceCheck{ + Name: check.Name, + Type: check.Type, + Command: check.Command, + Args: check.Args, + Path: check.Path, + Protocol: check.Protocol, + PortLabel: check.PortLabel, + Interval: check.Interval, + Timeout: check.Timeout, + InitialStatus: check.InitialStatus, + } + } } } } + structsTask.Resources = &structs.Resources{ CPU: *apiTask.Resources.CPU, MemoryMB: *apiTask.Resources.MemoryMB, IOPS: *apiTask.Resources.IOPS, } - structsTask.Resources.Networks = make([]*structs.NetworkResource, len(apiTask.Resources.Networks)) - for i, nw := range apiTask.Resources.Networks { - structsTask.Resources.Networks[i] = &structs.NetworkResource{ - CIDR: nw.CIDR, - IP: nw.IP, - MBits: *nw.MBits, - } - structsTask.Resources.Networks[i].DynamicPorts = make([]structs.Port, len(nw.DynamicPorts)) - structsTask.Resources.Networks[i].ReservedPorts = make([]structs.Port, len(nw.ReservedPorts)) - for j, dp := range nw.DynamicPorts { - structsTask.Resources.Networks[i].DynamicPorts[j] = structs.Port{ - Label: dp.Label, - Value: dp.Value, + + if l := len(apiTask.Resources.Networks); l != 0 { + structsTask.Resources.Networks = make([]*structs.NetworkResource, l) + for i, nw := range apiTask.Resources.Networks { + structsTask.Resources.Networks[i] = &structs.NetworkResource{ + CIDR: nw.CIDR, + IP: nw.IP, + MBits: *nw.MBits, } - } - for j, rp := range nw.ReservedPorts { - structsTask.Resources.Networks[i].ReservedPorts[j] = structs.Port{ - Label: rp.Label, - Value: rp.Value, + + if l := len(nw.DynamicPorts); l != 0 { + structsTask.Resources.Networks[i].DynamicPorts = make([]structs.Port, l) + for j, dp := range nw.DynamicPorts { + structsTask.Resources.Networks[i].DynamicPorts[j] = structs.Port{ + Label: dp.Label, + Value: dp.Value, + } + } + } + + if l := len(nw.ReservedPorts); l != 0 { + structsTask.Resources.Networks[i].ReservedPorts = make([]structs.Port, l) + for j, rp := range nw.ReservedPorts { + structsTask.Resources.Networks[i].ReservedPorts[j] = structs.Port{ + Label: rp.Label, + Value: rp.Value, + } + } } } } - structsTask.Meta = apiTask.Meta - structsTask.KillTimeout = *apiTask.KillTimeout + structsTask.LogConfig = &structs.LogConfig{ MaxFiles: *apiTask.LogConfig.MaxFiles, MaxFileSizeMB: *apiTask.LogConfig.MaxFileSizeMB, } - structsTask.Artifacts = make([]*structs.TaskArtifact, len(apiTask.Artifacts)) - for k, ta := range apiTask.Artifacts { - structsTask.Artifacts[k] = &structs.TaskArtifact{ - GetterSource: *ta.GetterSource, - GetterOptions: ta.GetterOptions, - RelativeDest: *ta.RelativeDest, + + if l := len(apiTask.Artifacts); l != 0 { + structsTask.Artifacts = make([]*structs.TaskArtifact, l) + for k, ta := range apiTask.Artifacts { + structsTask.Artifacts[k] = &structs.TaskArtifact{ + GetterSource: *ta.GetterSource, + GetterOptions: ta.GetterOptions, + RelativeDest: *ta.RelativeDest, + } } } + if apiTask.Vault != nil { structsTask.Vault = &structs.Vault{ Policies: apiTask.Vault.Policies, @@ -580,20 +619,24 @@ func ApiTaskToStructsTask(apiTask *api.Task, structsTask *structs.Task) { ChangeSignal: *apiTask.Vault.ChangeSignal, } } - structsTask.Templates = make([]*structs.Template, len(apiTask.Templates)) - for i, template := range apiTask.Templates { - structsTask.Templates[i] = &structs.Template{ - SourcePath: *template.SourcePath, - DestPath: *template.DestPath, - EmbeddedTmpl: *template.EmbeddedTmpl, - ChangeMode: *template.ChangeMode, - ChangeSignal: *template.ChangeSignal, - Splay: *template.Splay, - Perms: *template.Perms, - LeftDelim: *template.LeftDelim, - RightDelim: *template.RightDelim, + + if l := len(apiTask.Templates); l != 0 { + structsTask.Templates = make([]*structs.Template, l) + for i, template := range apiTask.Templates { + structsTask.Templates[i] = &structs.Template{ + SourcePath: *template.SourcePath, + DestPath: *template.DestPath, + EmbeddedTmpl: *template.EmbeddedTmpl, + ChangeMode: *template.ChangeMode, + ChangeSignal: *template.ChangeSignal, + Splay: *template.Splay, + Perms: *template.Perms, + LeftDelim: *template.LeftDelim, + RightDelim: *template.RightDelim, + } } } + if apiTask.DispatchPayload != nil { structsTask.DispatchPayload = &structs.DispatchPayloadConfig{ File: apiTask.DispatchPayload.File, diff --git a/nomad/eval_endpoint_test.go b/nomad/eval_endpoint_test.go index 37a6168f849..f29d3c6f181 100644 --- a/nomad/eval_endpoint_test.go +++ b/nomad/eval_endpoint_test.go @@ -1,6 +1,7 @@ package nomad import ( + "fmt" "reflect" "strings" "testing" @@ -272,10 +273,16 @@ func TestEvalEndpoint_Nack(t *testing.T) { } // Should get it back - out2, _, _ := s1.evalBroker.Dequeue(defaultSched, time.Second) - if out2 != out { - t.Fatalf("nack failed") - } + testutil.WaitForResult(func() (bool, error) { + out2, _, _ := s1.evalBroker.Dequeue(defaultSched, time.Second) + if out2 != out { + return false, fmt.Errorf("nack failed") + } + + return true, nil + }, func(err error) { + t.Fatal(err) + }) } func TestEvalEndpoint_Update(t *testing.T) { diff --git a/nomad/structs/diff.go b/nomad/structs/diff.go index 1d057b824ea..fba16ee667c 100644 --- a/nomad/structs/diff.go +++ b/nomad/structs/diff.go @@ -59,7 +59,8 @@ type JobDiff struct { func (j *Job) Diff(other *Job, contextual bool) (*JobDiff, error) { diff := &JobDiff{Type: DiffTypeNone} var oldPrimitiveFlat, newPrimitiveFlat map[string]string - filter := []string{"ID", "Status", "StatusDescription", "CreateIndex", "ModifyIndex", "JobModifyIndex"} + filter := []string{"ID", "Status", "StatusDescription", "Version", "Stable", "CreateIndex", + "ModifyIndex", "JobModifyIndex"} // Have to treat this special since it is a struct literal, not a pointer var jUpdate, otherUpdate *UpdateStrategy @@ -83,10 +84,6 @@ func (j *Job) Diff(other *Job, contextual bool) (*JobDiff, error) { return nil, fmt.Errorf("can not diff jobs with different IDs: %q and %q", j.ID, other.ID) } - if !reflect.DeepEqual(j, other) { - diff.Type = DiffTypeEdited - } - jUpdate = &j.Update otherUpdate = &other.Update oldPrimitiveFlat = flatmap.Flatten(j, filter, true) @@ -135,6 +132,35 @@ func (j *Job) Diff(other *Job, contextual bool) (*JobDiff, error) { diff.Objects = append(diff.Objects, cDiff) } + // Check to see if there is a diff. We don't use reflect because we are + // filtering quite a few fields that will change on each diff. + if diff.Type == DiffTypeNone { + for _, fd := range diff.Fields { + if fd.Type != DiffTypeNone { + diff.Type = DiffTypeEdited + break + } + } + } + + if diff.Type == DiffTypeNone { + for _, od := range diff.Objects { + if od.Type != DiffTypeNone { + diff.Type = DiffTypeEdited + break + } + } + } + + if diff.Type == DiffTypeNone { + for _, tg := range diff.TaskGroups { + if tg.Type != DiffTypeNone { + diff.Type = DiffTypeEdited + break + } + } + } + return diff, nil } diff --git a/nomad/structs/diff_test.go b/nomad/structs/diff_test.go index 1b8f7a94699..729f0e1a151 100644 --- a/nomad/structs/diff_test.go +++ b/nomad/structs/diff_test.go @@ -174,6 +174,12 @@ func TestJobDiff(t *testing.T) { Old: "foo", New: "", }, + { + Type: DiffTypeDeleted, + Name: "Stop", + Old: "false", + New: "", + }, { Type: DiffTypeDeleted, Name: "Type", @@ -251,6 +257,12 @@ func TestJobDiff(t *testing.T) { Old: "", New: "foo", }, + { + Type: DiffTypeAdded, + Name: "Stop", + Old: "", + New: "false", + }, { Type: DiffTypeAdded, Name: "Type", From 000f42a427e73b6448426e837f9d35447be70231 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Sun, 16 Apr 2017 16:58:48 -0700 Subject: [PATCH 10/12] API Docs --- website/source/docs/http/job.html.md | 51 ++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/website/source/docs/http/job.html.md b/website/source/docs/http/job.html.md index d7b8d0f66c6..25316d6786d 100644 --- a/website/source/docs/http/job.html.md +++ b/website/source/docs/http/job.html.md @@ -133,6 +133,7 @@ region is used; another region can be specified using the `?region=` query param }, "Status": "", "StatusDescription": "", + "Version": 3, "CreateIndex": 14, "ModifyIndex": 14 } @@ -140,6 +141,56 @@ region is used; another region can be specified using the `?region=` query param +
+
Description
+
+ Query all versions of a single job. +
+ +
Method
+
GET
+ +
URL
+
`/v1/job//versions`
+ +
Parameters
+
+ None +
+ +
Blocking Queries
+
+ [Supported](/docs/http/index.html#blocking-queries) +
+ +
Returns
+
+ + ```javascript + [ + { + "Region": "global", + "ID": "binstore-storagelocker", + "Version": 2, + ... + }, + { + "Region": "global", + "ID": "binstore-storagelocker", + "Version": 1, + ... + }, + { + "Region": "global", + "ID": "binstore-storagelocker", + "Version": 0, + ... + } + ] + ``` +
+
+
Description
From f8828a5a50f8070d7fc582025e1caac6eb0e8e08 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 17 Apr 2017 19:39:20 -0700 Subject: [PATCH 11/12] Fix some tests --- api/jobs_test.go | 14 ++++++++++++++ nomad/state/state_store_test.go | 32 +++++++++++++++++++------------- 2 files changed, 33 insertions(+), 13 deletions(-) diff --git a/api/jobs_test.go b/api/jobs_test.go index 6a4d63ea66c..f9b1d151251 100644 --- a/api/jobs_test.go +++ b/api/jobs_test.go @@ -9,6 +9,7 @@ import ( "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/testutil" + "github.com/kr/pretty" ) func TestJobs_Register(t *testing.T) { @@ -107,6 +108,9 @@ func TestJobs_Canonicalize(t *testing.T) { VaultToken: helper.StringToPtr(""), Status: helper.StringToPtr(""), StatusDescription: helper.StringToPtr(""), + Stop: helper.BoolToPtr(false), + Stable: helper.BoolToPtr(false), + Version: helper.Uint64ToPtr(0), CreateIndex: helper.Uint64ToPtr(0), ModifyIndex: helper.Uint64ToPtr(0), JobModifyIndex: helper.Uint64ToPtr(0), @@ -162,6 +166,9 @@ func TestJobs_Canonicalize(t *testing.T) { Priority: helper.IntToPtr(50), AllAtOnce: helper.BoolToPtr(false), VaultToken: helper.StringToPtr(""), + Stop: helper.BoolToPtr(false), + Stable: helper.BoolToPtr(false), + Version: helper.Uint64ToPtr(0), Status: helper.StringToPtr(""), StatusDescription: helper.StringToPtr(""), CreateIndex: helper.Uint64ToPtr(0), @@ -277,6 +284,9 @@ func TestJobs_Canonicalize(t *testing.T) { Type: helper.StringToPtr("service"), AllAtOnce: helper.BoolToPtr(false), VaultToken: helper.StringToPtr(""), + Stop: helper.BoolToPtr(false), + Stable: helper.BoolToPtr(false), + Version: helper.Uint64ToPtr(0), Status: helper.StringToPtr(""), StatusDescription: helper.StringToPtr(""), CreateIndex: helper.Uint64ToPtr(0), @@ -379,6 +389,9 @@ func TestJobs_Canonicalize(t *testing.T) { Priority: helper.IntToPtr(50), AllAtOnce: helper.BoolToPtr(false), VaultToken: helper.StringToPtr(""), + Stop: helper.BoolToPtr(false), + Stable: helper.BoolToPtr(false), + Version: helper.Uint64ToPtr(0), Status: helper.StringToPtr(""), StatusDescription: helper.StringToPtr(""), CreateIndex: helper.Uint64ToPtr(0), @@ -399,6 +412,7 @@ func TestJobs_Canonicalize(t *testing.T) { t.Run(tc.name, func(t *testing.T) { tc.input.Canonicalize() if !reflect.DeepEqual(tc.input, tc.expected) { + t.Logf("Name: %v, Diffs:\n%v", tc.name, pretty.Diff(tc.expected, tc.input)) t.Fatalf("Name: %v, expected:\n%#v\nactual:\n%#v", tc.name, tc.expected, tc.input) } }) diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 9e0a5b51e1c..0624417f811 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -1142,7 +1142,7 @@ func TestStateStore_JobsByScheduler(t *testing.T) { func TestStateStore_JobsByGC(t *testing.T) { state := testStateStore(t) - var gc, nonGc []*structs.Job + gc, nonGc := make(map[string]struct{}), make(map[string]struct{}) for i := 0; i < 20; i++ { var job *structs.Job @@ -1151,21 +1151,30 @@ func TestStateStore_JobsByGC(t *testing.T) { } else { job = mock.PeriodicJob() } - nonGc = append(nonGc, job) + nonGc[job.ID] = struct{}{} if err := state.UpsertJob(1000+uint64(i), job); err != nil { t.Fatalf("err: %v", err) } } - for i := 0; i < 10; i++ { + for i := 0; i < 20; i += 2 { job := mock.Job() job.Type = structs.JobTypeBatch - gc = append(gc, job) + gc[job.ID] = struct{}{} if err := state.UpsertJob(2000+uint64(i), job); err != nil { t.Fatalf("err: %v", err) } + + // Create an eval for it + eval := mock.Eval() + eval.JobID = job.ID + eval.Status = structs.EvalStatusComplete + if err := state.UpsertEvals(2000+uint64(i+1), []*structs.Evaluation{eval}); err != nil { + t.Fatalf("err: %v", err) + } + } ws := memdb.NewWatchSet() @@ -1174,9 +1183,10 @@ func TestStateStore_JobsByGC(t *testing.T) { t.Fatalf("err: %v", err) } - var outGc []*structs.Job + outGc := make(map[string]struct{}) for i := iter.Next(); i != nil; i = iter.Next() { - outGc = append(outGc, i.(*structs.Job)) + j := i.(*structs.Job) + outGc[j.ID] = struct{}{} } iter, err = state.JobsByGC(ws, false) @@ -1184,16 +1194,12 @@ func TestStateStore_JobsByGC(t *testing.T) { t.Fatalf("err: %v", err) } - var outNonGc []*structs.Job + outNonGc := make(map[string]struct{}) for i := iter.Next(); i != nil; i = iter.Next() { - outNonGc = append(outNonGc, i.(*structs.Job)) + j := i.(*structs.Job) + outNonGc[j.ID] = struct{}{} } - sort.Sort(JobIDSort(gc)) - sort.Sort(JobIDSort(nonGc)) - sort.Sort(JobIDSort(outGc)) - sort.Sort(JobIDSort(outNonGc)) - if !reflect.DeepEqual(gc, outGc) { t.Fatalf("bad: %#v %#v", gc, outGc) } From 019bb3ae586863f0659630fecb1b2b8df6069414 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 19 Apr 2017 10:54:03 -0700 Subject: [PATCH 12/12] Respond to review comments --- nomad/state/state_store.go | 11 +++++------ nomad/structs/structs.go | 5 +++++ scheduler/generic_sched.go | 16 +++++----------- scheduler/system_sched.go | 16 +++++----------- scheduler/util.go | 2 +- 5 files changed, 21 insertions(+), 29 deletions(-) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 314613f3672..983c20a3371 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -4,7 +4,6 @@ import ( "fmt" "io" "log" - "sort" "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/nomad/structs" @@ -469,7 +468,7 @@ func (s *StateStore) deleteJobVersions(index uint64, job *structs.Job, txn *memd } if _, err = txn.DeleteAll("job_versions", "id", job.ID, job.Version); err != nil { - return fmt.Errorf("deleing job versions failed: %v", err) + return fmt.Errorf("deleting job versions failed: %v", err) } } @@ -595,10 +594,10 @@ func (s *StateStore) jobVersionByID(txn *memdb.Txn, ws *memdb.WatchSet, id strin all = append(all, j) } - // Sort with highest versions first - sort.Slice(all, func(i, j int) bool { - return all[i].Version >= all[j].Version - }) + // Reverse so that highest versions first + for i, j := 0, len(all)-1; i < j; i, j = i+1, j-1 { + all[i], all[j] = all[j], all[i] + } return all, nil } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 4a86f85dbe4..f04a525f071 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1388,6 +1388,11 @@ func (j *Job) CombinedTaskMeta(groupName, taskName string) map[string]string { return meta } +// Stopped returns if a job is stopped. +func (j *Job) Stopped() bool { + return j == nil || j.Stop +} + // Stub is used to return a summary of the job func (j *Job) Stub(summary *JobSummary) *JobListStub { return &JobListStub{ diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index d2df0344e1a..43f38f15eeb 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -179,12 +179,6 @@ func (s *GenericScheduler) createBlockedEval(planFailure bool) error { return s.planner.CreateEval(s.blocked) } -// isStoppedJob returns if the scheduling is for a stopped job and the scheduler -// should stop all its allocations. -func (s *GenericScheduler) isStoppedJob() bool { - return s.job == nil || s.job.Stop -} - // process is wrapped in retryMax to iteratively run the handler until we have no // further work or we've made the maximum number of attempts. func (s *GenericScheduler) process() (bool, error) { @@ -197,7 +191,7 @@ func (s *GenericScheduler) process() (bool, error) { s.eval.JobID, err) } numTaskGroups := 0 - if !s.isStoppedJob() { + if !s.job.Stopped() { numTaskGroups = len(s.job.TaskGroups) } s.queuedAllocs = make(map[string]int, numTaskGroups) @@ -213,7 +207,7 @@ func (s *GenericScheduler) process() (bool, error) { // Construct the placement stack s.stack = NewGenericStack(s.batch, s.ctx) - if !s.isStoppedJob() { + if !s.job.Stopped() { s.stack.SetJob(s.job) } @@ -357,7 +351,7 @@ func (s *GenericScheduler) filterCompleteAllocs(allocs []*structs.Allocation) ([ func (s *GenericScheduler) computeJobAllocs() error { // Materialize all the task groups, job could be missing if deregistered var groups map[string]*structs.TaskGroup - if !s.isStoppedJob() { + if !s.job.Stopped() { groups = materializeTaskGroups(s.job) } @@ -404,7 +398,7 @@ func (s *GenericScheduler) computeJobAllocs() error { // Check if a rolling upgrade strategy is being used limit := len(diff.update) + len(diff.migrate) + len(diff.lost) - if !s.isStoppedJob() && s.job.Update.Rolling() { + if !s.job.Stopped() && s.job.Update.Rolling() { limit = s.job.Update.MaxParallel } @@ -420,7 +414,7 @@ func (s *GenericScheduler) computeJobAllocs() error { // Nothing remaining to do if placement is not required if len(diff.place) == 0 { - if !s.isStoppedJob() { + if !s.job.Stopped() { for _, tg := range s.job.TaskGroups { s.queuedAllocs[tg.Name] = 0 } diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go index b95e5b5fec3..a90d27dd2eb 100644 --- a/scheduler/system_sched.go +++ b/scheduler/system_sched.go @@ -83,12 +83,6 @@ func (s *SystemScheduler) Process(eval *structs.Evaluation) error { s.queuedAllocs) } -// isStoppedJob returns if the scheduling is for a stopped job and the scheduler -// should stop all its allocations. -func (s *SystemScheduler) isStoppedJob() bool { - return s.job == nil || s.job.Stop -} - // process is wrapped in retryMax to iteratively run the handler until we have no // further work or we've made the maximum number of attempts. func (s *SystemScheduler) process() (bool, error) { @@ -101,13 +95,13 @@ func (s *SystemScheduler) process() (bool, error) { s.eval.JobID, err) } numTaskGroups := 0 - if !s.isStoppedJob() { + if !s.job.Stopped() { numTaskGroups = len(s.job.TaskGroups) } s.queuedAllocs = make(map[string]int, numTaskGroups) // Get the ready nodes in the required datacenters - if !s.isStoppedJob() { + if !s.job.Stopped() { s.nodes, s.nodesByDC, err = readyNodesInDCs(s.state, s.job.Datacenters) if err != nil { return false, fmt.Errorf("failed to get ready nodes: %v", err) @@ -125,7 +119,7 @@ func (s *SystemScheduler) process() (bool, error) { // Construct the placement stack s.stack = NewSystemStack(s.ctx) - if !s.isStoppedJob() { + if !s.job.Stopped() { s.stack.SetJob(s.job) } @@ -234,7 +228,7 @@ func (s *SystemScheduler) computeJobAllocs() error { // Check if a rolling upgrade strategy is being used limit := len(diff.update) - if !s.isStoppedJob() && s.job.Update.Rolling() { + if !s.job.Stopped() && s.job.Update.Rolling() { limit = s.job.Update.MaxParallel } @@ -243,7 +237,7 @@ func (s *SystemScheduler) computeJobAllocs() error { // Nothing remaining to do if placement is not required if len(diff.place) == 0 { - if !s.isStoppedJob() { + if !s.job.Stopped() { for _, tg := range s.job.TaskGroups { s.queuedAllocs[tg.Name] = 0 } diff --git a/scheduler/util.go b/scheduler/util.go index 01f10c8dcd7..c84283b1c9f 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -21,7 +21,7 @@ type allocTuple struct { // a job requires. This is used to do the count expansion. func materializeTaskGroups(job *structs.Job) map[string]*structs.TaskGroup { out := make(map[string]*structs.TaskGroup) - if job == nil || job.Stop { + if job.Stopped() { return out }