From 17bc13bfe5fa9a0ca7eadd94ac7bc692b98d5cc5 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 14 Dec 2015 19:20:57 -0800 Subject: [PATCH 1/8] Add garbage collection to jobs --- api/jobs.go | 7 + jobspec/parse.go | 44 ++++++ jobspec/parse_test.go | 16 +++ jobspec/test-fixtures/gc.hcl | 5 + nomad/config.go | 5 + nomad/core_sched.go | 159 +++++++++++++++++----- nomad/core_sched_test.go | 108 +++++++++++++++ nomad/job_endpoint.go | 5 +- nomad/leader.go | 4 + nomad/state/schema.go | 24 ++++ nomad/state/state_store.go | 12 ++ nomad/state/state_store_test.go | 60 ++++++++ nomad/structs/funcs_test.go | 38 +++++- nomad/structs/structs.go | 73 +++++++++- scheduler/context_test.go | 6 + scheduler/rank_test.go | 14 +- website/source/docs/jobspec/index.html.md | 23 ++++ 17 files changed, 561 insertions(+), 42 deletions(-) create mode 100644 jobspec/test-fixtures/gc.hcl diff --git a/api/jobs.go b/api/jobs.go index 17e75daff5b..f50826ad39d 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -107,6 +107,12 @@ type UpdateStrategy struct { MaxParallel int } +// JobGCConfig configures the garbage collection policy of a job. +type JobGCConfig struct { + Enabled bool + Threshold time.Duration +} + // Job is used to serialize a job. type Job struct { Region string @@ -119,6 +125,7 @@ type Job struct { Constraints []*Constraint TaskGroups []*TaskGroup Update *UpdateStrategy + GC *JobGCConfig Meta map[string]string Status string StatusDescription string diff --git a/jobspec/parse.go b/jobspec/parse.go index 765f58b3ada..723fac6a121 100644 --- a/jobspec/parse.go +++ b/jobspec/parse.go @@ -91,6 +91,7 @@ func parseJob(result *structs.Job, list *ast.ObjectList) error { delete(m, "meta") delete(m, "update") delete(m, "periodic") + delete(m, "gc") // Set the ID and name to the object key result.ID = obj.Keys[0].Token.Value().(string) @@ -135,6 +136,13 @@ func parseJob(result *structs.Job, list *ast.ObjectList) error { } } + // If we have a gc config, then parse that + if o := listVal.Filter("gc"); len(o.Items) > 0 { + if err := parseGC(&result.GC, o); err != nil { + return err + } + } + // Parse out meta fields. These are in HCL as a list so we need // to iterate over them and merge them. if metaO := listVal.Filter("meta"); len(metaO.Items) > 0 { @@ -714,3 +722,39 @@ func parsePeriodic(result **structs.PeriodicConfig, list *ast.ObjectList) error *result = &p return nil } + +func parseGC(result **structs.JobGCConfig, list *ast.ObjectList) error { + list = list.Elem() + if len(list.Items) > 1 { + return fmt.Errorf("only one 'gc' block allowed per job") + } + + // Get our resource object + o := list.Items[0] + + var m map[string]interface{} + if err := hcl.DecodeObject(&m, o.Val); err != nil { + return err + } + + // Enabled by default if the gc block exists. 
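+	// For example, an empty `gc {}` block decodes to Enabled=true, while a
+	// threshold string such as "2h" is converted to a time.Duration by the
+	// decode hook configured below.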
+ if value, ok := m["enabled"]; !ok { + m["Enabled"] = true + } else { + enabled, err := parseBool(value) + if err != nil { + return fmt.Errorf("gc.enabled should be set to true or false; %v", err) + } + m["Enabled"] = enabled + } + + dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ + DecodeHook: mapstructure.StringToTimeDurationHookFunc(), + WeaklyTypedInput: true, + Result: result, + }) + if err != nil { + return err + } + return dec.Decode(m) +} diff --git a/jobspec/parse_test.go b/jobspec/parse_test.go index 4497348eba5..768fa9988da 100644 --- a/jobspec/parse_test.go +++ b/jobspec/parse_test.go @@ -246,6 +246,22 @@ func TestParse(t *testing.T) { false, }, + { + "gc.hcl", + &structs.Job{ + ID: "foo", + Name: "foo", + Priority: 50, + Region: "global", + Type: "service", + GC: &structs.JobGCConfig{ + Enabled: true, + Threshold: 2 * time.Hour, + }, + }, + false, + }, + { "specify-job.hcl", &structs.Job{ diff --git a/jobspec/test-fixtures/gc.hcl b/jobspec/test-fixtures/gc.hcl new file mode 100644 index 00000000000..cef4bcabd24 --- /dev/null +++ b/jobspec/test-fixtures/gc.hcl @@ -0,0 +1,5 @@ +job "foo" { + gc { + threshold = "2h" + } +} diff --git a/nomad/config.go b/nomad/config.go index 91986644fcf..850aa58c29c 100644 --- a/nomad/config.go +++ b/nomad/config.go @@ -131,6 +131,10 @@ type Config struct { // for GC. This gives users some time to debug a failed evaluation. EvalGCThreshold time.Duration + // JobGCInterval is how often we dispatch a job to GC jobs that are + // available for garbage collection. + JobGCInterval time.Duration + // NodeGCInterval is how often we dispatch a job to GC failed nodes. NodeGCInterval time.Duration @@ -202,6 +206,7 @@ func DefaultConfig() *Config { ReconcileInterval: 60 * time.Second, EvalGCInterval: 5 * time.Minute, EvalGCThreshold: 1 * time.Hour, + JobGCInterval: 5 * time.Minute, NodeGCInterval: 5 * time.Minute, NodeGCThreshold: 24 * time.Hour, EvalNackTimeout: 60 * time.Second, diff --git a/nomad/core_sched.go b/nomad/core_sched.go index b5ed092f917..27ca5cfcf7f 100644 --- a/nomad/core_sched.go +++ b/nomad/core_sched.go @@ -33,11 +33,88 @@ func (s *CoreScheduler) Process(eval *structs.Evaluation) error { return s.evalGC(eval) case structs.CoreJobNodeGC: return s.nodeGC(eval) + case structs.CoreJobJobGC: + return s.jobGC(eval) default: return fmt.Errorf("core scheduler cannot handle job '%s'", eval.JobID) } } +// jobGC is used to garbage collect eligible jobs. +func (c *CoreScheduler) jobGC(eval *structs.Evaluation) error { + // Get all the jobs eligible for garbage collection. + jIter, err := c.snap.JobsByGC(true) + if err != nil { + return err + } + + // Get the time table to calculate GC cutoffs. + tt := c.srv.fsm.TimeTable() + + // Collect the allocations and evaluations to GC + var gcAlloc, gcEval, gcJob []string + +OUTER: + for i := jIter.Next(); i != nil; i = jIter.Next() { + job := i.(*structs.Job) + cutoff := time.Now().UTC().Add(-1 * job.GC.Threshold) + oldThreshold := tt.NearestIndex(cutoff) + + // Ignore new jobs. + if job.CreateIndex > oldThreshold { + continue OUTER + } + + evals, err := c.snap.EvalsByJob(job.ID) + if err != nil { + c.srv.logger.Printf("[ERR] sched.core: failed to get evals for job %s: %v", job.ID, err) + continue + } + + for _, eval := range evals { + gc, allocs, err := c.gcEval(eval, oldThreshold) + if err != nil || !gc { + continue OUTER + } + + gcEval = append(gcEval, eval.ID) + gcAlloc = append(gcAlloc, allocs...) 
+ } + + // Job is eligible for garbage collection + gcJob = append(gcJob, job.ID) + } + + // Fast-path the nothing case + if len(gcEval) == 0 && len(gcAlloc) == 0 && len(gcJob) == 0 { + return nil + } + c.srv.logger.Printf("[DEBUG] sched.core: job GC: %d jobs, %d evaluations, %d allocs eligible", + len(gcJob), len(gcEval), len(gcAlloc)) + + // Reap the evals and allocs + if err := c.evalReap(gcEval, gcAlloc); err != nil { + return err + } + + // Call to the leader to deregister the jobs. + for _, job := range gcJob { + req := structs.JobDeregisterRequest{ + JobID: job, + WriteRequest: structs.WriteRequest{ + Region: c.srv.config.Region, + }, + } + var resp structs.JobDeregisterResponse + if err := c.srv.RPC("Job.Deregister", &req, &resp); err != nil { + c.srv.logger.Printf("[ERR] sched.core: job deregister failed: %v", err) + return err + } + } + + return nil +} + // evalGC is used to garbage collect old evaluations func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error { // Iterate over the evaluations @@ -57,39 +134,16 @@ func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error { // Collect the allocations and evaluations to GC var gcAlloc, gcEval []string - -OUTER: - for { - raw := iter.Next() - if raw == nil { - break - } + for raw := iter.Next(); raw != nil; raw = iter.Next() { eval := raw.(*structs.Evaluation) - - // Ignore non-terminal and new evaluations - if !eval.TerminalStatus() || eval.ModifyIndex > oldThreshold { - continue - } - - // Get the allocations by eval - allocs, err := c.snap.AllocsByEval(eval.ID) + gc, allocs, err := c.gcEval(eval, oldThreshold) if err != nil { - c.srv.logger.Printf("[ERR] sched.core: failed to get allocs for eval %s: %v", - eval.ID, err) - continue - } - - // Scan the allocations to ensure they are terminal and old - for _, alloc := range allocs { - if !alloc.TerminalStatus() || alloc.ModifyIndex > oldThreshold { - continue OUTER - } + return err } - // Evaluation is eligible for garbage collection - gcEval = append(gcEval, eval.ID) - for _, alloc := range allocs { - gcAlloc = append(gcAlloc, alloc.ID) + if gc { + gcEval = append(gcEval, eval.ID) + gcAlloc = append(gcAlloc, allocs...) } } @@ -100,10 +154,52 @@ OUTER: c.srv.logger.Printf("[DEBUG] sched.core: eval GC: %d evaluations, %d allocs eligible", len(gcEval), len(gcAlloc)) + return c.evalReap(gcEval, gcAlloc) +} + +// gcEval returns whether the eval should be garbage collected given a raft +// threshold index. The eval disqualifies for garbage collection if it or its +// allocs are not older than the threshold. 
If the eval should be garbage +// collected, the associated alloc ids that should also be removed are also +// returned +func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64) ( + bool, []string, error) { + // Ignore non-terminal and new evaluations + if !eval.TerminalStatus() || eval.ModifyIndex > thresholdIndex { + return false, nil, nil + } + + // Get the allocations by eval + allocs, err := c.snap.AllocsByEval(eval.ID) + if err != nil { + c.srv.logger.Printf("[ERR] sched.core: failed to get allocs for eval %s: %v", + eval.ID, err) + return false, nil, err + } + + // Scan the allocations to ensure they are terminal and old + for _, alloc := range allocs { + if !alloc.TerminalStatus() || alloc.ModifyIndex > thresholdIndex { + return false, nil, nil + } + } + + allocIds := make([]string, len(allocs)) + for i, alloc := range allocs { + allocIds[i] = alloc.ID + } + + // Evaluation is eligible for garbage collection + return true, allocIds, nil +} + +// evalReap contacts the leader and issues a reap on the passed evals and +// allocs. +func (c *CoreScheduler) evalReap(evals, allocs []string) error { // Call to the leader to issue the reap req := structs.EvalDeleteRequest{ - Evals: gcEval, - Allocs: gcAlloc, + Evals: evals, + Allocs: allocs, WriteRequest: structs.WriteRequest{ Region: c.srv.config.Region, }, @@ -113,6 +209,7 @@ OUTER: c.srv.logger.Printf("[ERR] sched.core: eval reap failed: %v", err) return err } + return nil } diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go index b8dfae9613f..2d4057d50b7 100644 --- a/nomad/core_sched_test.go +++ b/nomad/core_sched_test.go @@ -111,3 +111,111 @@ func TestCoreScheduler_NodeGC(t *testing.T) { t.Fatalf("bad: %v", out) } } + +func TestCoreScheduler_JobGC(t *testing.T) { + tests := []struct { + test, evalStatus, allocStatus string + shouldExist bool + }{ + { + test: "Terminal", + evalStatus: structs.EvalStatusFailed, + allocStatus: structs.AllocDesiredStatusFailed, + shouldExist: false, + }, + { + test: "Has Alloc", + evalStatus: structs.EvalStatusFailed, + allocStatus: structs.AllocDesiredStatusRun, + shouldExist: true, + }, + { + test: "Has Eval", + evalStatus: structs.EvalStatusPending, + allocStatus: structs.AllocDesiredStatusFailed, + shouldExist: true, + }, + } + + for _, test := range tests { + s1 := testServer(t, nil) + defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC) + + // Insert job. 
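+		// (GC is enabled with a one-hour threshold; the time table is
+		// advanced past that age further down, so the job is always old
+		// enough to collect.)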
+		state := s1.fsm.State()
+		job := mock.Job()
+		threshold := 1 * time.Hour
+		job.GC = &structs.JobGCConfig{
+			Enabled:   true,
+			Threshold: threshold,
+		}
+		err := state.UpsertJob(1000, job)
+		if err != nil {
+			t.Fatalf("test(%s) err: %v", test.test, err)
+		}
+
+		// Insert eval
+		eval := mock.Eval()
+		eval.JobID = job.ID
+		eval.Status = test.evalStatus
+		err = state.UpsertEvals(1001, []*structs.Evaluation{eval})
+		if err != nil {
+			t.Fatalf("test(%s) err: %v", test.test, err)
+		}
+
+		// Insert alloc
+		alloc := mock.Alloc()
+		alloc.JobID = job.ID
+		alloc.EvalID = eval.ID
+		alloc.DesiredStatus = test.allocStatus
+		err = state.UpsertAllocs(1002, []*structs.Allocation{alloc})
+		if err != nil {
+			t.Fatalf("test(%s) err: %v", test.test, err)
+		}
+
+		// Update the time tables to make this work
+		tt := s1.fsm.TimeTable()
+		tt.Witness(2000, time.Now().UTC().Add(-1*threshold))
+
+		// Create a core scheduler
+		snap, err := state.Snapshot()
+		if err != nil {
+			t.Fatalf("test(%s) err: %v", test.test, err)
+		}
+		core := NewCoreScheduler(s1, snap)
+
+		// Attempt the GC
+		gc := s1.coreJobEval(structs.CoreJobJobGC)
+		gc.ModifyIndex = 2000
+		err = core.Process(gc)
+		if err != nil {
+			t.Fatalf("test(%s) err: %v", test.test, err)
+		}
+
+		// Check whether the job, its eval and its alloc still exist
+		out, err := state.JobByID(job.ID)
+		if err != nil {
+			t.Fatalf("test(%s) err: %v", test.test, err)
+		}
+		if (test.shouldExist && out == nil) || (!test.shouldExist && out != nil) {
+			t.Fatalf("test(%s) bad: %v", test.test, out)
+		}
+
+		outE, err := state.EvalByID(eval.ID)
+		if err != nil {
+			t.Fatalf("test(%s) err: %v", test.test, err)
+		}
+		if (test.shouldExist && outE == nil) || (!test.shouldExist && outE != nil) {
+			t.Fatalf("test(%s) bad: %v", test.test, outE)
+		}
+
+		outA, err := state.AllocByID(alloc.ID)
+		if err != nil {
+			t.Fatalf("test(%s) err: %v", test.test, err)
+		}
+		if (test.shouldExist && outA == nil) || (!test.shouldExist && outA != nil) {
+			t.Fatalf("test(%s) bad: %v", test.test, outA)
+		}
+	}
+}
diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go
index 4d8cc128df5..f693af9c40d 100644
--- a/nomad/job_endpoint.go
+++ b/nomad/job_endpoint.go
@@ -29,8 +29,9 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
 		return err
 	}
 
-	// Initialize all the fields of services
-	args.Job.InitAllServiceFields()
+	// Initialize the job fields (sets defaults and any other necessary init
+	// work).
+	args.Job.InitFields()
 
 	if args.Job.Type == structs.JobTypeCore {
 		return fmt.Errorf("job type cannot be core")
diff --git a/nomad/leader.go b/nomad/leader.go
index 8e0d6be7db7..0e267a7dc83 100644
--- a/nomad/leader.go
+++ b/nomad/leader.go
@@ -173,6 +173,8 @@ func (s *Server) schedulePeriodic(stopCh chan struct{}) {
 	defer evalGC.Stop()
 	nodeGC := time.NewTicker(s.config.NodeGCInterval)
 	defer nodeGC.Stop()
+	jobGC := time.NewTicker(s.config.JobGCInterval)
+	defer jobGC.Stop()
 
 	for {
 		select {
@@ -180,6 +182,8 @@
 		case <-evalGC.C:
 			s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobEvalGC))
 		case <-nodeGC.C:
 			s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobNodeGC))
+		case <-jobGC.C:
+			s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobJobGC))
 		case <-stopCh:
 			return
 		}
diff --git a/nomad/state/schema.go b/nomad/state/schema.go
index dfd663aba07..f795d1ff01c 100644
--- a/nomad/state/schema.go
+++ b/nomad/state/schema.go
@@ -4,6 +4,7 @@ import (
 	"fmt"
 
 	"github.com/hashicorp/go-memdb"
+	"github.com/hashicorp/nomad/nomad/structs"
 )
 
 // stateStoreSchema is used to return the schema for the state store
@@ -100,10 +101,33 @@ func jobTableSchema() *memdb.TableSchema {
 					Lowercase: false,
 				},
 			},
+			"gc": &memdb.IndexSchema{
+				Name:         "gc",
+				AllowMissing: false,
+				Unique:       false,
+				Indexer: &memdb.ConditionalIndex{
+					Conditional: jobIsGCable,
+				},
+			},
 		},
 	}
 }
 
+// jobIsGCable satisfies the ConditionalIndexFunc interface and creates an index
+// on whether a job is eligible for garbage collection.
+func jobIsGCable(obj interface{}) (bool, error) {
+	j, ok := obj.(*structs.Job)
+	if !ok {
+		return false, fmt.Errorf("Unexpected type: %v", obj)
+	}
+
+	if j.GC == nil || !j.GC.Enabled {
+		return false, nil
+	}
+
+	return true, nil
+}
+
 // evalTableSchema returns the MemDB schema for the eval table.
 // This table is used to store all the evaluations that are pending
 // or recently completed.
diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go
index 30ee8725982..c6236183a86 100644
--- a/nomad/state/state_store.go
+++ b/nomad/state/state_store.go
@@ -372,6 +372,18 @@ func (s *StateStore) JobsByScheduler(schedulerType string) (memdb.ResultIterator
 	return iter, nil
 }
 
+// JobsByGC returns an iterator over all jobs that are either eligible or
+// ineligible for garbage collection, depending on the gc argument.
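+//
+// A typical scan over the GC-eligible jobs (a sketch mirroring its use in
+// core_sched.go):
+//
+//	iter, err := snap.JobsByGC(true)
+//	if err != nil {
+//		return err
+//	}
+//	for raw := iter.Next(); raw != nil; raw = iter.Next() {
+//		job := raw.(*structs.Job)
+//		// decide whether the job, its evals and allocs can be reaped
+//	}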
+func (s *StateStore) JobsByGC(gc bool) (memdb.ResultIterator, error) { + txn := s.db.Txn(false) + + iter, err := txn.Get("jobs", "gc", gc) + if err != nil { + return nil, err + } + return iter, nil +} + // UpsertEvaluation is used to upsert an evaluation func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) error { txn := s.db.Txn(true) diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 5e1021e557c..aa497cd4a00 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -471,6 +471,66 @@ func TestStateStore_JobsByScheduler(t *testing.T) { } } +func TestStateStore_JobsByGC(t *testing.T) { + state := testStateStore(t) + var gc, nonGc []*structs.Job + + for i := 0; i < 10; i++ { + job := mock.Job() + nonGc = append(nonGc, job) + + if err := state.UpsertJob(1000+uint64(i), job); err != nil { + t.Fatalf("err: %v", err) + } + } + + for i := 0; i < 10; i++ { + job := mock.Job() + job.GC = &structs.JobGCConfig{ + Enabled: true, + Threshold: structs.DefaultJobGCThreshold, + } + gc = append(gc, job) + + if err := state.UpsertJob(2000+uint64(i), job); err != nil { + t.Fatalf("err: %v", err) + } + } + + iter, err := state.JobsByGC(true) + if err != nil { + t.Fatalf("err: %v", err) + } + + var outGc []*structs.Job + for i := iter.Next(); i != nil; i = iter.Next() { + outGc = append(outGc, i.(*structs.Job)) + } + + iter, err = state.JobsByGC(false) + if err != nil { + t.Fatalf("err: %v", err) + } + + var outNonGc []*structs.Job + for i := iter.Next(); i != nil; i = iter.Next() { + outNonGc = append(outNonGc, i.(*structs.Job)) + } + + sort.Sort(JobIDSort(gc)) + sort.Sort(JobIDSort(nonGc)) + sort.Sort(JobIDSort(outGc)) + sort.Sort(JobIDSort(outNonGc)) + + if !reflect.DeepEqual(gc, outGc) { + t.Fatalf("bad: %#v %#v", gc, outGc) + } + + if !reflect.DeepEqual(nonGc, outNonGc) { + t.Fatalf("bad: %#v %#v", nonGc, outNonGc) + } +} + func TestStateStore_RestoreJob(t *testing.T) { state := testStateStore(t) job := mock.Job() diff --git a/nomad/structs/funcs_test.go b/nomad/structs/funcs_test.go index d156394dccd..a3bf238858b 100644 --- a/nomad/structs/funcs_test.go +++ b/nomad/structs/funcs_test.go @@ -22,19 +22,47 @@ func TestRemoveAllocs(t *testing.T) { } } -func TestFilterTerminalALlocs(t *testing.T) { +func TestFilterTerminalAllocs(t *testing.T) { l := []*Allocation{ - &Allocation{ID: "foo", DesiredStatus: AllocDesiredStatusRun}, &Allocation{ID: "bar", DesiredStatus: AllocDesiredStatusEvict}, &Allocation{ID: "baz", DesiredStatus: AllocDesiredStatusStop}, - &Allocation{ID: "zip", DesiredStatus: AllocDesiredStatusRun}, + &Allocation{ + ID: "zip", + DesiredStatus: AllocDesiredStatusRun, + TaskStates: map[string]*TaskState{ + "a": &TaskState{State: TaskStatePending}, + }, + }, + &Allocation{ + ID: "foo", + DesiredStatus: AllocDesiredStatusRun, + TaskStates: map[string]*TaskState{ + "a": &TaskState{State: TaskStatePending}, + }, + }, + &Allocation{ + ID: "bam", + DesiredStatus: AllocDesiredStatusRun, + TaskStates: map[string]*TaskState{ + "a": &TaskState{State: TaskStatePending}, + "b": &TaskState{State: TaskStateDead}, + }, + }, + &Allocation{ + ID: "fizz", + DesiredStatus: AllocDesiredStatusRun, + TaskStates: map[string]*TaskState{ + "a": &TaskState{State: TaskStateDead}, + "b": &TaskState{State: TaskStateDead}, + }, + }, } out := FilterTerminalAllocs(l) - if len(out) != 2 { + if len(out) != 3 { t.Fatalf("bad: %#v", out) } - if out[0].ID != "foo" && out[1].ID != "zip" { + if out[0].ID != "zip" && out[1].ID != "foo" && out[2].ID != 
"bam" { t.Fatalf("bad: %#v", out) } } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index a86fae84334..6d384623e08 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -764,6 +764,10 @@ type Job struct { // Periodic is used to define the interval the job is run at. Periodic *PeriodicConfig + // GC is used to mark the job as available for garbage collection after it + // has no outstanding evaluations or allocations. + GC *JobGCConfig + // Meta is used to associate arbitrary metadata with this // job. This is opaque to Nomad. Meta map[string]string @@ -779,6 +783,18 @@ type Job struct { ModifyIndex uint64 } +// InitFields is used to initialize fields in the Job. This should be called +// when registering a Job. +func (j *Job) InitFields() { + // Initialize the service block. + j.InitAllServiceFields() + + // Initalize the GC policy + if j.GC != nil { + j.GC.Init() + } +} + // InitAllServiceFields traverses all Task Groups and makes them // interpolate Job, Task group and Task names in all Service names. // It also generates the check names if they are not set. This method also @@ -854,6 +870,13 @@ func (j *Job) Validate() error { fmt.Errorf("Periodic can only be used with %q scheduler", JobTypeBatch)) } + // Validate the GC config. + if j.GC != nil { + if err := j.GC.Validate(); err != nil { + mErr.Errors = append(mErr.Errors, err) + } + } + return mErr.ErrorOrNil() } @@ -899,6 +922,37 @@ type JobListStub struct { ModifyIndex uint64 } +const ( + // DefaultJobGCThreshold is the default threshold for garbage collecting + // eligible jobs. + DefaultJobGCThreshold = 4 * time.Hour +) + +// JobGCConfig configures the garbage collection policy of a job. +type JobGCConfig struct { + // Enabled determines whether the job is eligible for garbage collection. + Enabled bool + + // Threshold is how old a job must be before it eligible for GC. This gives + // the user time to inspect the job. + Threshold time.Duration +} + +// Init sets the Threshold time to its default value if it is un-specified but +// garbage collection is enabled. +func (gc *JobGCConfig) Init() { + if gc.Enabled && gc.Threshold == 0 { + gc.Threshold = DefaultJobGCThreshold + } +} + +func (gc *JobGCConfig) Validate() error { + if gc.Threshold < 0 { + return fmt.Errorf("job GC threshold must be positive: %v", gc.Threshold) + } + return nil +} + // UpdateStrategy is used to modify how updates are done type UpdateStrategy struct { // Stagger is the amount of time between the updates @@ -1470,14 +1524,21 @@ type Allocation struct { ModifyIndex uint64 } -// TerminalStatus returns if the desired status is terminal and -// will no longer transition. This is not based on the current client status. +// TerminalStatus returns if the desired or actual status is terminal and +// will no longer transition. func (a *Allocation) TerminalStatus() bool { switch a.DesiredStatus { case AllocDesiredStatusStop, AllocDesiredStatusEvict, AllocDesiredStatusFailed: return true default: - return false + // If all tasks are dead, the alloc is terminal. + for _, state := range a.TaskStates { + if state.State != TaskStateDead { + return false + } + } + + return true } } @@ -1656,6 +1717,12 @@ const ( // We periodically scan nodes in a terminal state, and if they have no // corresponding allocations we delete these out of the system. CoreJobNodeGC = "node-gc" + + // CoreJobJobGC is used for the garbage collection of eligible jobs. 
We + // periodically scan garbage collectible jobs and check if both their + // evaluations and allocations are terminal. If so, we delete these out of + // the system. + CoreJobJobGC = "job-gc" ) // Evaluation is used anytime we need to apply business logic as a result diff --git a/scheduler/context_test.go b/scheduler/context_test.go index 914b54b060b..1f573028620 100644 --- a/scheduler/context_test.go +++ b/scheduler/context_test.go @@ -61,6 +61,9 @@ func TestEvalContext_ProposedAlloc(t *testing.T) { MemoryMB: 2048, }, DesiredStatus: structs.AllocDesiredStatusRun, + TaskStates: map[string]*structs.TaskState{ + "foo": &structs.TaskState{State: structs.TaskStatePending}, + }, } alloc2 := &structs.Allocation{ ID: structs.GenerateUUID(), @@ -72,6 +75,9 @@ func TestEvalContext_ProposedAlloc(t *testing.T) { MemoryMB: 1024, }, DesiredStatus: structs.AllocDesiredStatusRun, + TaskStates: map[string]*structs.TaskState{ + "foo": &structs.TaskState{State: structs.TaskStatePending}, + }, } noErr(t, state.UpsertAllocs(1000, []*structs.Allocation{alloc1, alloc2})) diff --git a/scheduler/rank_test.go b/scheduler/rank_test.go index 605902ed4e2..68716ea7609 100644 --- a/scheduler/rank_test.go +++ b/scheduler/rank_test.go @@ -203,6 +203,9 @@ func TestBinPackIterator_ExistingAlloc(t *testing.T) { MemoryMB: 2048, }, DesiredStatus: structs.AllocDesiredStatusRun, + TaskStates: map[string]*structs.TaskState{ + "foo": &structs.TaskState{State: structs.TaskStatePending}, + }, } alloc2 := &structs.Allocation{ ID: structs.GenerateUUID(), @@ -214,6 +217,9 @@ func TestBinPackIterator_ExistingAlloc(t *testing.T) { MemoryMB: 1024, }, DesiredStatus: structs.AllocDesiredStatusRun, + TaskStates: map[string]*structs.TaskState{ + "foo": &structs.TaskState{State: structs.TaskStatePending}, + }, } noErr(t, state.UpsertAllocs(1000, []*structs.Allocation{alloc1, alloc2})) @@ -277,6 +283,9 @@ func TestBinPackIterator_ExistingAlloc_PlannedEvict(t *testing.T) { MemoryMB: 2048, }, DesiredStatus: structs.AllocDesiredStatusRun, + TaskStates: map[string]*structs.TaskState{ + "foo": &structs.TaskState{State: structs.TaskStatePending}, + }, } alloc2 := &structs.Allocation{ ID: structs.GenerateUUID(), @@ -288,6 +297,9 @@ func TestBinPackIterator_ExistingAlloc_PlannedEvict(t *testing.T) { MemoryMB: 1024, }, DesiredStatus: structs.AllocDesiredStatusRun, + TaskStates: map[string]*structs.TaskState{ + "foo": &structs.TaskState{State: structs.TaskStatePending}, + }, } noErr(t, state.UpsertAllocs(1000, []*structs.Allocation{alloc1, alloc2})) @@ -317,7 +329,7 @@ func TestBinPackIterator_ExistingAlloc_PlannedEvict(t *testing.T) { t.Fatalf("Bad: %v", out[0]) } if out[1].Score != 18 { - t.Fatalf("Bad: %v", out[0]) + t.Fatalf("Bad: %v", out[1]) } } diff --git a/website/source/docs/jobspec/index.html.md b/website/source/docs/jobspec/index.html.md index 99aefe2b497..8b8d0ef1c9f 100644 --- a/website/source/docs/jobspec/index.html.md +++ b/website/source/docs/jobspec/index.html.md @@ -156,6 +156,29 @@ The `job` object supports the following keys: and "h" suffix can be used, such as "30s". Both values default to zero, which disables rolling updates. +* `gc` - Specifies the job's garbage collection configuration. This allows jobs + to be garbage collected when all their evaluations and allocations are + terminal. The `gc` block has the following format: + + ``` + "gc" { + // Enabled is set to true by default if the "gc" block is included. 
+ enabled = true + + // Threshold is a duration that configures how old a job must be + // before it is garbage collected. + threshold = "4h" + } + ``` + + * `enabled`: Toggles the eligibility of a job for garbage collection. + + * `threshold`: `threshold` is a string that should be parseable as a + [time.Duration](https://golang.org/pkg/time/#ParseDuration). A job will + only be garbage collected after the job, its evaluations and allocations + are all older than the threshold. + + ### Task Group The `group` object supports the following keys: From 1375d255e9309c25e114911cce5d08f27298ef36 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 15 Dec 2015 13:07:25 -0800 Subject: [PATCH 2/8] Remove user-specifiable gc threshold --- api/jobs.go | 8 +--- jobspec/parse.go | 20 ++++------ jobspec/parse_test.go | 5 +-- jobspec/test-fixtures/gc.hcl | 2 +- nomad/config.go | 5 +++ nomad/core_sched.go | 6 ++- nomad/core_sched_test.go | 6 +-- nomad/state/schema.go | 6 +-- nomad/state/state_store_test.go | 5 +-- nomad/structs/structs.go | 45 +---------------------- website/source/docs/jobspec/index.html.md | 14 ++----- 11 files changed, 27 insertions(+), 95 deletions(-) diff --git a/api/jobs.go b/api/jobs.go index f50826ad39d..912bcb97286 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -107,12 +107,6 @@ type UpdateStrategy struct { MaxParallel int } -// JobGCConfig configures the garbage collection policy of a job. -type JobGCConfig struct { - Enabled bool - Threshold time.Duration -} - // Job is used to serialize a job. type Job struct { Region string @@ -125,7 +119,7 @@ type Job struct { Constraints []*Constraint TaskGroups []*TaskGroup Update *UpdateStrategy - GC *JobGCConfig + GC bool Meta map[string]string Status string StatusDescription string diff --git a/jobspec/parse.go b/jobspec/parse.go index 723fac6a121..2573341bdc5 100644 --- a/jobspec/parse.go +++ b/jobspec/parse.go @@ -723,7 +723,7 @@ func parsePeriodic(result **structs.PeriodicConfig, list *ast.ObjectList) error return nil } -func parseGC(result **structs.JobGCConfig, list *ast.ObjectList) error { +func parseGC(result *bool, list *ast.ObjectList) error { list = list.Elem() if len(list.Items) > 1 { return fmt.Errorf("only one 'gc' block allowed per job") @@ -738,23 +738,17 @@ func parseGC(result **structs.JobGCConfig, list *ast.ObjectList) error { } // Enabled by default if the gc block exists. 
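+	// With the threshold removed, the block reduces to a boolean: `gc {}`
+	// and `gc { enabled = true }` both parse to true.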
+ enabled := false if value, ok := m["enabled"]; !ok { - m["Enabled"] = true + enabled = true } else { - enabled, err := parseBool(value) + var err error + enabled, err = parseBool(value) if err != nil { return fmt.Errorf("gc.enabled should be set to true or false; %v", err) } - m["Enabled"] = enabled } - dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ - DecodeHook: mapstructure.StringToTimeDurationHookFunc(), - WeaklyTypedInput: true, - Result: result, - }) - if err != nil { - return err - } - return dec.Decode(m) + *result = enabled + return nil } diff --git a/jobspec/parse_test.go b/jobspec/parse_test.go index 768fa9988da..7a89595f87b 100644 --- a/jobspec/parse_test.go +++ b/jobspec/parse_test.go @@ -254,10 +254,7 @@ func TestParse(t *testing.T) { Priority: 50, Region: "global", Type: "service", - GC: &structs.JobGCConfig{ - Enabled: true, - Threshold: 2 * time.Hour, - }, + GC: true, }, false, }, diff --git a/jobspec/test-fixtures/gc.hcl b/jobspec/test-fixtures/gc.hcl index cef4bcabd24..5af5de1e13d 100644 --- a/jobspec/test-fixtures/gc.hcl +++ b/jobspec/test-fixtures/gc.hcl @@ -1,5 +1,5 @@ job "foo" { gc { - threshold = "2h" + enabled = true } } diff --git a/nomad/config.go b/nomad/config.go index 850aa58c29c..1773921984f 100644 --- a/nomad/config.go +++ b/nomad/config.go @@ -135,6 +135,10 @@ type Config struct { // available for garbage collection. JobGCInterval time.Duration + // JobGCThreshold is how old a job must be before it eligible for GC. This gives + // the user time to inspect the job. + JobGCThreshold time.Duration + // NodeGCInterval is how often we dispatch a job to GC failed nodes. NodeGCInterval time.Duration @@ -207,6 +211,7 @@ func DefaultConfig() *Config { EvalGCInterval: 5 * time.Minute, EvalGCThreshold: 1 * time.Hour, JobGCInterval: 5 * time.Minute, + JobGCThreshold: 4 * time.Hour, NodeGCInterval: 5 * time.Minute, NodeGCThreshold: 24 * time.Hour, EvalNackTimeout: 60 * time.Second, diff --git a/nomad/core_sched.go b/nomad/core_sched.go index 27ca5cfcf7f..ed096a53f3a 100644 --- a/nomad/core_sched.go +++ b/nomad/core_sched.go @@ -50,6 +50,10 @@ func (c *CoreScheduler) jobGC(eval *structs.Evaluation) error { // Get the time table to calculate GC cutoffs. tt := c.srv.fsm.TimeTable() + cutoff := time.Now().UTC().Add(-1 * c.srv.config.JobGCThreshold) + oldThreshold := tt.NearestIndex(cutoff) + c.srv.logger.Printf("[DEBUG] sched.core: job GC: scanning before index %d (%v)", + oldThreshold, c.srv.config.JobGCThreshold) // Collect the allocations and evaluations to GC var gcAlloc, gcEval, gcJob []string @@ -57,8 +61,6 @@ func (c *CoreScheduler) jobGC(eval *structs.Evaluation) error { OUTER: for i := jIter.Next(); i != nil; i = jIter.Next() { job := i.(*structs.Job) - cutoff := time.Now().UTC().Add(-1 * job.GC.Threshold) - oldThreshold := tt.NearestIndex(cutoff) // Ignore new jobs. if job.CreateIndex > oldThreshold { diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go index 2d4057d50b7..28c0580f10e 100644 --- a/nomad/core_sched_test.go +++ b/nomad/core_sched_test.go @@ -145,11 +145,7 @@ func TestCoreScheduler_JobGC(t *testing.T) { // Insert job. 
state := s1.fsm.State() job := mock.Job() - threshold := 1 * time.Hour - job.GC = &structs.JobGCConfig{ - Enabled: true, - Threshold: threshold, - } + job.GC = true err := state.UpsertJob(1000, job) if err != nil { t.Fatalf("test(%s) err: %v", test.test, err) diff --git a/nomad/state/schema.go b/nomad/state/schema.go index f795d1ff01c..961cb67a7e2 100644 --- a/nomad/state/schema.go +++ b/nomad/state/schema.go @@ -121,11 +121,7 @@ func jobIsGCable(obj interface{}) (bool, error) { return false, fmt.Errorf("Unexpected type: %v", obj) } - if j.GC == nil || !j.GC.Enabled { - return false, nil - } - - return true, nil + return j.GC, nil } // evalTableSchema returns the MemDB schema for the eval table. diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index aa497cd4a00..0609f30480a 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -486,10 +486,7 @@ func TestStateStore_JobsByGC(t *testing.T) { for i := 0; i < 10; i++ { job := mock.Job() - job.GC = &structs.JobGCConfig{ - Enabled: true, - Threshold: structs.DefaultJobGCThreshold, - } + job.GC = true gc = append(gc, job) if err := state.UpsertJob(2000+uint64(i), job); err != nil { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 6d384623e08..1f45f8842da 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -766,7 +766,7 @@ type Job struct { // GC is used to mark the job as available for garbage collection after it // has no outstanding evaluations or allocations. - GC *JobGCConfig + GC bool // Meta is used to associate arbitrary metadata with this // job. This is opaque to Nomad. @@ -788,11 +788,6 @@ type Job struct { func (j *Job) InitFields() { // Initialize the service block. j.InitAllServiceFields() - - // Initalize the GC policy - if j.GC != nil { - j.GC.Init() - } } // InitAllServiceFields traverses all Task Groups and makes them @@ -870,13 +865,6 @@ func (j *Job) Validate() error { fmt.Errorf("Periodic can only be used with %q scheduler", JobTypeBatch)) } - // Validate the GC config. - if j.GC != nil { - if err := j.GC.Validate(); err != nil { - mErr.Errors = append(mErr.Errors, err) - } - } - return mErr.ErrorOrNil() } @@ -922,37 +910,6 @@ type JobListStub struct { ModifyIndex uint64 } -const ( - // DefaultJobGCThreshold is the default threshold for garbage collecting - // eligible jobs. - DefaultJobGCThreshold = 4 * time.Hour -) - -// JobGCConfig configures the garbage collection policy of a job. -type JobGCConfig struct { - // Enabled determines whether the job is eligible for garbage collection. - Enabled bool - - // Threshold is how old a job must be before it eligible for GC. This gives - // the user time to inspect the job. - Threshold time.Duration -} - -// Init sets the Threshold time to its default value if it is un-specified but -// garbage collection is enabled. 
-func (gc *JobGCConfig) Init() { - if gc.Enabled && gc.Threshold == 0 { - gc.Threshold = DefaultJobGCThreshold - } -} - -func (gc *JobGCConfig) Validate() error { - if gc.Threshold < 0 { - return fmt.Errorf("job GC threshold must be positive: %v", gc.Threshold) - } - return nil -} - // UpdateStrategy is used to modify how updates are done type UpdateStrategy struct { // Stagger is the amount of time between the updates diff --git a/website/source/docs/jobspec/index.html.md b/website/source/docs/jobspec/index.html.md index 8b8d0ef1c9f..d3f4ee65066 100644 --- a/website/source/docs/jobspec/index.html.md +++ b/website/source/docs/jobspec/index.html.md @@ -156,7 +156,7 @@ The `job` object supports the following keys: and "h" suffix can be used, such as "30s". Both values default to zero, which disables rolling updates. -* `gc` - Specifies the job's garbage collection configuration. This allows jobs +* `gc` - Toggles the job's eligibility for garbage collection. This allows jobs to be garbage collected when all their evaluations and allocations are terminal. The `gc` block has the following format: @@ -164,20 +164,14 @@ The `job` object supports the following keys: "gc" { // Enabled is set to true by default if the "gc" block is included. enabled = true - - // Threshold is a duration that configures how old a job must be - // before it is garbage collected. - threshold = "4h" } + + // Equivalent configuration. + "gc" {} ``` * `enabled`: Toggles the eligibility of a job for garbage collection. - * `threshold`: `threshold` is a string that should be parseable as a - [time.Duration](https://golang.org/pkg/time/#ParseDuration). A job will - only be garbage collected after the job, its evaluations and allocations - are all older than the threshold. - ### Task Group From 4cc880249a057ff238e3726af8268755016131eb Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 15 Dec 2015 13:32:31 -0800 Subject: [PATCH 3/8] Fix test --- nomad/core_sched_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go index 28c0580f10e..54c87605126 100644 --- a/nomad/core_sched_test.go +++ b/nomad/core_sched_test.go @@ -172,7 +172,7 @@ func TestCoreScheduler_JobGC(t *testing.T) { // Update the time tables to make this work tt := s1.fsm.TimeTable() - tt.Witness(2000, time.Now().UTC().Add(-1*threshold)) + tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.JobGCThreshold)) // Create a core scheduler snap, err := state.Snapshot() From c496c761b82a2428f9d6ef20556621a94f84afb7 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 15 Dec 2015 17:20:07 -0800 Subject: [PATCH 4/8] Remove from parser --- api/jobs.go | 1 - jobspec/parse.go | 38 ----------------------- jobspec/parse_test.go | 13 -------- jobspec/test-fixtures/gc.hcl | 5 --- website/source/docs/jobspec/index.html.md | 17 ---------- 5 files changed, 74 deletions(-) delete mode 100644 jobspec/test-fixtures/gc.hcl diff --git a/api/jobs.go b/api/jobs.go index 912bcb97286..17e75daff5b 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -119,7 +119,6 @@ type Job struct { Constraints []*Constraint TaskGroups []*TaskGroup Update *UpdateStrategy - GC bool Meta map[string]string Status string StatusDescription string diff --git a/jobspec/parse.go b/jobspec/parse.go index 2573341bdc5..765f58b3ada 100644 --- a/jobspec/parse.go +++ b/jobspec/parse.go @@ -91,7 +91,6 @@ func parseJob(result *structs.Job, list *ast.ObjectList) error { delete(m, "meta") delete(m, "update") delete(m, "periodic") - delete(m, "gc") // Set the 
ID and name to the object key result.ID = obj.Keys[0].Token.Value().(string) @@ -136,13 +135,6 @@ func parseJob(result *structs.Job, list *ast.ObjectList) error { } } - // If we have a gc config, then parse that - if o := listVal.Filter("gc"); len(o.Items) > 0 { - if err := parseGC(&result.GC, o); err != nil { - return err - } - } - // Parse out meta fields. These are in HCL as a list so we need // to iterate over them and merge them. if metaO := listVal.Filter("meta"); len(metaO.Items) > 0 { @@ -722,33 +714,3 @@ func parsePeriodic(result **structs.PeriodicConfig, list *ast.ObjectList) error *result = &p return nil } - -func parseGC(result *bool, list *ast.ObjectList) error { - list = list.Elem() - if len(list.Items) > 1 { - return fmt.Errorf("only one 'gc' block allowed per job") - } - - // Get our resource object - o := list.Items[0] - - var m map[string]interface{} - if err := hcl.DecodeObject(&m, o.Val); err != nil { - return err - } - - // Enabled by default if the gc block exists. - enabled := false - if value, ok := m["enabled"]; !ok { - enabled = true - } else { - var err error - enabled, err = parseBool(value) - if err != nil { - return fmt.Errorf("gc.enabled should be set to true or false; %v", err) - } - } - - *result = enabled - return nil -} diff --git a/jobspec/parse_test.go b/jobspec/parse_test.go index 7a89595f87b..4497348eba5 100644 --- a/jobspec/parse_test.go +++ b/jobspec/parse_test.go @@ -246,19 +246,6 @@ func TestParse(t *testing.T) { false, }, - { - "gc.hcl", - &structs.Job{ - ID: "foo", - Name: "foo", - Priority: 50, - Region: "global", - Type: "service", - GC: true, - }, - false, - }, - { "specify-job.hcl", &structs.Job{ diff --git a/jobspec/test-fixtures/gc.hcl b/jobspec/test-fixtures/gc.hcl deleted file mode 100644 index 5af5de1e13d..00000000000 --- a/jobspec/test-fixtures/gc.hcl +++ /dev/null @@ -1,5 +0,0 @@ -job "foo" { - gc { - enabled = true - } -} diff --git a/website/source/docs/jobspec/index.html.md b/website/source/docs/jobspec/index.html.md index d3f4ee65066..99aefe2b497 100644 --- a/website/source/docs/jobspec/index.html.md +++ b/website/source/docs/jobspec/index.html.md @@ -156,23 +156,6 @@ The `job` object supports the following keys: and "h" suffix can be used, such as "30s". Both values default to zero, which disables rolling updates. -* `gc` - Toggles the job's eligibility for garbage collection. This allows jobs - to be garbage collected when all their evaluations and allocations are - terminal. The `gc` block has the following format: - - ``` - "gc" { - // Enabled is set to true by default if the "gc" block is included. - enabled = true - } - - // Equivalent configuration. - "gc" {} - ``` - - * `enabled`: Toggles the eligibility of a job for garbage collection. 
- - ### Task Group The `group` object supports the following keys: From b09440a018ab5014a2d2d1f32081fe7dcf32db61 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 15 Dec 2015 17:30:50 -0800 Subject: [PATCH 5/8] Don't allow users to set gc and make batch gc --- nomad/job_endpoint.go | 21 +++++++++++-- nomad/job_endpoint_test.go | 62 ++++++++++++++++++++++++++++++++++++++ nomad/structs/structs.go | 5 +++ 3 files changed, 85 insertions(+), 3 deletions(-) diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index f693af9c40d..18da75268ab 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -1,6 +1,7 @@ package nomad import ( + "errors" "fmt" "time" @@ -25,14 +26,18 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis if args.Job == nil { return fmt.Errorf("missing job for registration") } - if err := args.Job.Validate(); err != nil { + + if err := j.checkBlacklist(args.Job); err != nil { return err } - // Initialize the job fields (sets defaults and any other necessary init - // work). + // Initialize the job fields (sets defaults and any necessary init work). args.Job.InitFields() + if err := args.Job.Validate(); err != nil { + return err + } + if args.Job.Type == structs.JobTypeCore { return fmt.Errorf("job type cannot be core") } @@ -76,6 +81,16 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis return nil } +// checkBlacklist returns an error if the user has set any blacklisted field in +// the job. +func (j *Job) checkBlacklist(job *structs.Job) error { + if job.GC { + return errors.New("GC field of a job is used only internally and should not be set by user") + } + + return nil +} + // Evaluate is used to force a job for re-evaluation func (j *Job) Evaluate(args *structs.JobEvaluateRequest, reply *structs.JobRegisterResponse) error { if done, err := j.srv.forward("Job.Evaluate", args, args, reply); done { diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index 986ebc102c0..5bc3bb95297 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -171,6 +171,68 @@ func TestJobEndpoint_Register_Existing(t *testing.T) { } } +func TestJobEndpoint_Register_Batch(t *testing.T) { + s1 := testServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + job := mock.Job() + job.Type = structs.JobTypeBatch + req := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Fetch the response + var resp structs.JobRegisterResponse + if err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp); err != nil { + t.Fatalf("err: %v", err) + } + if resp.Index == 0 { + t.Fatalf("bad index: %d", resp.Index) + } + + // Check for the node in the FSM + state := s1.fsm.State() + out, err := state.JobByID(job.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if out == nil { + t.Fatalf("expected job") + } + if !out.GC { + t.Fatal("expect batch job to be made garbage collectible") + } +} + +func TestJobEndpoint_Register_GC_Set(t *testing.T) { + s1 := testServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + job := mock.Job() + job.GC = true + req := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{Region: 
"global"}, + } + + // Fetch the response + var resp structs.JobRegisterResponse + if err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp); err == nil { + t.Fatalf("expect err") + } +} + func TestJobEndpoint_Evaluate(t *testing.T) { s1 := testServer(t, func(c *Config) { c.NumSchedulers = 0 // Prevent automatic dequeue diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 1f45f8842da..14912dd0ce8 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -788,6 +788,11 @@ type Job struct { func (j *Job) InitFields() { // Initialize the service block. j.InitAllServiceFields() + + // If the job is batch then make it GC. + if j.Type == JobTypeBatch { + j.GC = true + } } // InitAllServiceFields traverses all Task Groups and makes them From 6cdd01366013cb2a7a8d5c064cc46e4bdd27f9d7 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 16 Dec 2015 14:27:40 -0800 Subject: [PATCH 6/8] Small cleanup --- nomad/core_sched.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/nomad/core_sched.go b/nomad/core_sched.go index ed096a53f3a..f557c9285c1 100644 --- a/nomad/core_sched.go +++ b/nomad/core_sched.go @@ -43,7 +43,7 @@ func (s *CoreScheduler) Process(eval *structs.Evaluation) error { // jobGC is used to garbage collect eligible jobs. func (c *CoreScheduler) jobGC(eval *structs.Evaluation) error { // Get all the jobs eligible for garbage collection. - jIter, err := c.snap.JobsByGC(true) + iter, err := c.snap.JobsByGC(true) if err != nil { return err } @@ -55,16 +55,16 @@ func (c *CoreScheduler) jobGC(eval *structs.Evaluation) error { c.srv.logger.Printf("[DEBUG] sched.core: job GC: scanning before index %d (%v)", oldThreshold, c.srv.config.JobGCThreshold) - // Collect the allocations and evaluations to GC + // Collect the allocations, evaluations and jobs to GC var gcAlloc, gcEval, gcJob []string OUTER: - for i := jIter.Next(); i != nil; i = jIter.Next() { + for i := iter.Next(); i != nil; i = iter.Next() { job := i.(*structs.Job) // Ignore new jobs. if job.CreateIndex > oldThreshold { - continue OUTER + continue } evals, err := c.snap.EvalsByJob(job.ID) From 2752a951a254f06cf5e11eb448e6ff336d91d5a4 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 16 Dec 2015 14:34:17 -0800 Subject: [PATCH 7/8] Use client state --- nomad/structs/structs.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 14912dd0ce8..ec4f089869a 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1489,18 +1489,19 @@ type Allocation struct { // TerminalStatus returns if the desired or actual status is terminal and // will no longer transition. func (a *Allocation) TerminalStatus() bool { + // First check the desired state and if that isn't terminal, check client + // state. switch a.DesiredStatus { case AllocDesiredStatusStop, AllocDesiredStatusEvict, AllocDesiredStatusFailed: return true default: - // If all tasks are dead, the alloc is terminal. 
- for _, state := range a.TaskStates { - if state.State != TaskStateDead { - return false - } - } + } + switch a.ClientStatus { + case AllocClientStatusDead, AllocClientStatusFailed: return true + default: + return false } } From bc13dcaf48bf66b24ecaea20209848cea1e622b5 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 16 Dec 2015 15:01:15 -0800 Subject: [PATCH 8/8] merge --- nomad/mock/mock.go | 6 ------ nomad/structs/funcs_test.go | 28 ++++------------------------ scheduler/context_test.go | 8 ++------ scheduler/rank_test.go | 16 ++++------------ 4 files changed, 10 insertions(+), 48 deletions(-) diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index 012677f18b8..2e0057c8e2b 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -231,12 +231,6 @@ func Alloc() *structs.Allocation { }, }, }, - Services: map[string]string{"web-frontend": "nomad-registered-task-1234"}, - TaskStates: map[string]*structs.TaskState{ - "web": &structs.TaskState{ - State: structs.TaskStatePending, - }, - }, Job: Job(), DesiredStatus: structs.AllocDesiredStatusRun, ClientStatus: structs.AllocClientStatusPending, diff --git a/nomad/structs/funcs_test.go b/nomad/structs/funcs_test.go index a3bf238858b..93ce5cb308b 100644 --- a/nomad/structs/funcs_test.go +++ b/nomad/structs/funcs_test.go @@ -26,43 +26,23 @@ func TestFilterTerminalAllocs(t *testing.T) { l := []*Allocation{ &Allocation{ID: "bar", DesiredStatus: AllocDesiredStatusEvict}, &Allocation{ID: "baz", DesiredStatus: AllocDesiredStatusStop}, - &Allocation{ - ID: "zip", - DesiredStatus: AllocDesiredStatusRun, - TaskStates: map[string]*TaskState{ - "a": &TaskState{State: TaskStatePending}, - }, - }, &Allocation{ ID: "foo", DesiredStatus: AllocDesiredStatusRun, - TaskStates: map[string]*TaskState{ - "a": &TaskState{State: TaskStatePending}, - }, + ClientStatus: AllocClientStatusPending, }, &Allocation{ ID: "bam", DesiredStatus: AllocDesiredStatusRun, - TaskStates: map[string]*TaskState{ - "a": &TaskState{State: TaskStatePending}, - "b": &TaskState{State: TaskStateDead}, - }, - }, - &Allocation{ - ID: "fizz", - DesiredStatus: AllocDesiredStatusRun, - TaskStates: map[string]*TaskState{ - "a": &TaskState{State: TaskStateDead}, - "b": &TaskState{State: TaskStateDead}, - }, + ClientStatus: AllocClientStatusDead, }, } out := FilterTerminalAllocs(l) - if len(out) != 3 { + if len(out) != 1 { t.Fatalf("bad: %#v", out) } - if out[0].ID != "zip" && out[1].ID != "foo" && out[2].ID != "bam" { + if out[0].ID != "foo" { t.Fatalf("bad: %#v", out) } } diff --git a/scheduler/context_test.go b/scheduler/context_test.go index 1f573028620..006e1ae994c 100644 --- a/scheduler/context_test.go +++ b/scheduler/context_test.go @@ -61,9 +61,7 @@ func TestEvalContext_ProposedAlloc(t *testing.T) { MemoryMB: 2048, }, DesiredStatus: structs.AllocDesiredStatusRun, - TaskStates: map[string]*structs.TaskState{ - "foo": &structs.TaskState{State: structs.TaskStatePending}, - }, + ClientStatus: structs.AllocClientStatusPending, } alloc2 := &structs.Allocation{ ID: structs.GenerateUUID(), @@ -75,9 +73,7 @@ func TestEvalContext_ProposedAlloc(t *testing.T) { MemoryMB: 1024, }, DesiredStatus: structs.AllocDesiredStatusRun, - TaskStates: map[string]*structs.TaskState{ - "foo": &structs.TaskState{State: structs.TaskStatePending}, - }, + ClientStatus: structs.AllocClientStatusPending, } noErr(t, state.UpsertAllocs(1000, []*structs.Allocation{alloc1, alloc2})) diff --git a/scheduler/rank_test.go b/scheduler/rank_test.go index 68716ea7609..d53aa996f57 100644 --- a/scheduler/rank_test.go +++ 
b/scheduler/rank_test.go @@ -203,9 +203,7 @@ func TestBinPackIterator_ExistingAlloc(t *testing.T) { MemoryMB: 2048, }, DesiredStatus: structs.AllocDesiredStatusRun, - TaskStates: map[string]*structs.TaskState{ - "foo": &structs.TaskState{State: structs.TaskStatePending}, - }, + ClientStatus: structs.AllocClientStatusPending, } alloc2 := &structs.Allocation{ ID: structs.GenerateUUID(), @@ -217,9 +215,7 @@ func TestBinPackIterator_ExistingAlloc(t *testing.T) { MemoryMB: 1024, }, DesiredStatus: structs.AllocDesiredStatusRun, - TaskStates: map[string]*structs.TaskState{ - "foo": &structs.TaskState{State: structs.TaskStatePending}, - }, + ClientStatus: structs.AllocClientStatusPending, } noErr(t, state.UpsertAllocs(1000, []*structs.Allocation{alloc1, alloc2})) @@ -283,9 +279,7 @@ func TestBinPackIterator_ExistingAlloc_PlannedEvict(t *testing.T) { MemoryMB: 2048, }, DesiredStatus: structs.AllocDesiredStatusRun, - TaskStates: map[string]*structs.TaskState{ - "foo": &structs.TaskState{State: structs.TaskStatePending}, - }, + ClientStatus: structs.AllocClientStatusPending, } alloc2 := &structs.Allocation{ ID: structs.GenerateUUID(), @@ -297,9 +291,7 @@ func TestBinPackIterator_ExistingAlloc_PlannedEvict(t *testing.T) { MemoryMB: 1024, }, DesiredStatus: structs.AllocDesiredStatusRun, - TaskStates: map[string]*structs.TaskState{ - "foo": &structs.TaskState{State: structs.TaskStatePending}, - }, + ClientStatus: structs.AllocClientStatusPending, } noErr(t, state.UpsertAllocs(1000, []*structs.Allocation{alloc1, alloc2}))
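
Note: taken together, patches 7 and 8 make allocation terminality a function of the desired and client statuses rather than of per-task state. The following is a minimal, self-contained sketch of that end state; the constant string values and the FilterTerminalAllocs body are assumptions inferred from the tests, and the canonical definitions live in nomad/structs.

```go
package main

import "fmt"

// Status constants mirroring the ones used in the series (string values are
// assumptions; see nomad/structs/structs.go for the real definitions).
const (
	AllocDesiredStatusRun    = "run"
	AllocDesiredStatusStop   = "stop"
	AllocDesiredStatusEvict  = "evict"
	AllocDesiredStatusFailed = "failed"

	AllocClientStatusPending = "pending"
	AllocClientStatusDead    = "dead"
	AllocClientStatusFailed  = "failed"
)

// Allocation carries only the two fields TerminalStatus needs after patch 7.
type Allocation struct {
	ID            string
	DesiredStatus string
	ClientStatus  string
}

// TerminalStatus follows the final shape from patch 7: the desired status is
// checked first and, if it is not terminal, the client status decides.
func (a *Allocation) TerminalStatus() bool {
	switch a.DesiredStatus {
	case AllocDesiredStatusStop, AllocDesiredStatusEvict, AllocDesiredStatusFailed:
		return true
	}
	switch a.ClientStatus {
	case AllocClientStatusDead, AllocClientStatusFailed:
		return true
	}
	return false
}

// FilterTerminalAllocs drops every terminal allocation, matching the behavior
// exercised by TestFilterTerminalAllocs (the body itself is an assumption).
func FilterTerminalAllocs(allocs []*Allocation) []*Allocation {
	var out []*Allocation
	for _, a := range allocs {
		if !a.TerminalStatus() {
			out = append(out, a)
		}
	}
	return out
}

func main() {
	allocs := []*Allocation{
		{ID: "bar", DesiredStatus: AllocDesiredStatusEvict},
		{ID: "foo", DesiredStatus: AllocDesiredStatusRun, ClientStatus: AllocClientStatusPending},
		{ID: "bam", DesiredStatus: AllocDesiredStatusRun, ClientStatus: AllocClientStatusDead},
	}
	for _, a := range FilterTerminalAllocs(allocs) {
		fmt.Println(a.ID) // prints "foo" only
	}
}
```

Under this model a mock allocation only needs a ClientStatus, which is why patch 8 drops the per-task TaskStates maps from the scheduler test fixtures.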