Skip to content

Commit

Permalink
Merge pull request #1383 from hashicorp/f-job-summary
Browse files Browse the repository at this point in the history
Job Summary - Part 1
  • Loading branch information
diptanu authored Jul 13, 2016
2 parents 7494c94 + 04b24cf commit 4b214f3
Show file tree
Hide file tree
Showing 12 changed files with 575 additions and 112 deletions.
29 changes: 0 additions & 29 deletions command/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,23 +115,6 @@ func (m *monitor) update(update *evalState) {
for allocID, alloc := range update.allocs {
if existing, ok := existing.allocs[allocID]; !ok {
switch {
case alloc.desired == structs.AllocDesiredStatusFailed:
// New allocs with desired state failed indicate
// scheduling failure.
m.ui.Output(fmt.Sprintf("Scheduling error for group %q (%s)",
alloc.group, alloc.desiredDesc))

// Log the client status, if any provided
if alloc.clientDesc != "" {
m.ui.Output("Client reported status: " + alloc.clientDesc)
}

// Generate a more descriptive error for why the allocation
// failed and dump it to the screen
if alloc.full != nil {
dumpAllocStatus(m.ui, alloc.full, m.length)
}

case alloc.index < update.index:
// New alloc with create index lower than the eval
// create index indicates modification
Expand Down Expand Up @@ -275,18 +258,6 @@ func (m *monitor) monitor(evalID string, allowPrefix bool) int {
clientDesc: alloc.ClientDescription,
index: alloc.CreateIndex,
}

// If we have a scheduling error, query the full allocation
// to get the details.
if alloc.DesiredStatus == structs.AllocDesiredStatusFailed {
schedFailure = true
failed, _, err := m.client.Allocations().Info(alloc.ID, nil)
if err != nil {
m.ui.Error(fmt.Sprintf("Error querying allocation: %s", err))
return 1
}
state.allocs[alloc.ID].full = failed
}
}

// Update the state
Expand Down
62 changes: 0 additions & 62 deletions command/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,68 +133,6 @@ func TestMonitor_Update_Allocs(t *testing.T) {
}
}

// TestMonitor_Update_SchedulingFailure verifies that feeding the monitor a
// brand-new allocation whose desired status is failed logs the scheduling
// error, the client description, and the details of the attached allocation.
func TestMonitor_Update_SchedulingFailure(t *testing.T) {
	mockUI := new(cli.MockUi)
	mon := newMonitor(mockUI, nil, shortId)

	const allocID = "87654321-dcba-efab-cdef-123456789abc"

	// The full failed allocation that is attached to the alloc state.
	failedAlloc := &api.Allocation{
		ID:            allocID,
		TaskGroup:     "group2",
		ClientStatus:  structs.AllocClientStatusFailed,
		DesiredStatus: structs.AllocDesiredStatusFailed,
		Metrics: &api.AllocationMetric{
			NodesEvaluated: 3,
			NodesFiltered:  3,
			ConstraintFiltered: map[string]int{
				"$attr.kernel.name = linux": 3,
			},
		},
	}

	// A new alloc with desired status failed should produce a warning.
	failedState := &allocState{
		id:          allocID,
		group:       "group2",
		desired:     structs.AllocDesiredStatusFailed,
		desiredDesc: "something failed",
		client:      structs.AllocClientStatusFailed,
		clientDesc:  "client failed",
		index:       1,
		full:        failedAlloc,
	}
	mon.update(&evalState{
		allocs: map[string]*allocState{"alloc2": failedState},
	})

	logged := mockUI.OutputWriter.String()

	// Each entry is a substring that must appear in the output, paired with
	// the failure format used when it is absent. The first four cover the
	// scheduling failure itself; the last three cover the allocation dump.
	checks := []struct {
		want   string
		format string
	}{
		{"group2", "missing group\n\n%s"},
		{"Scheduling error", "missing failure\n\n%s"},
		{"something failed", "missing desired desc\n\n%s"},
		{"client failed", "missing client desc\n\n%s"},
		{"3/3", "missing filter stats\n\n%s"},
		{structs.AllocDesiredStatusFailed, "missing alloc status\n\n%s"},
		{"$attr.kernel.name = linux", "missing constraint\n\n%s"},
	}
	for _, c := range checks {
		if !strings.Contains(logged, c.want) {
			t.Fatalf(c.format, logged)
		}
	}
}

func TestMonitor_Update_AllocModification(t *testing.T) {
ui := new(cli.MockUi)
mon := newMonitor(ui, nil, fullId)
Expand Down
18 changes: 9 additions & 9 deletions nomad/core_sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestCoreScheduler_EvalGC(t *testing.T) {
// Insert "dead" alloc
alloc := mock.Alloc()
alloc.EvalID = eval.ID
alloc.DesiredStatus = structs.AllocDesiredStatusFailed
alloc.DesiredStatus = structs.AllocDesiredStatusStop
err = state.UpsertAllocs(1001, []*structs.Allocation{alloc})
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -98,7 +98,7 @@ func TestCoreScheduler_EvalGC_Batch(t *testing.T) {
alloc := mock.Alloc()
alloc.JobID = job.ID
alloc.EvalID = eval.ID
alloc.DesiredStatus = structs.AllocDesiredStatusFailed
alloc.DesiredStatus = structs.AllocDesiredStatusStop
err = state.UpsertAllocs(1002, []*structs.Allocation{alloc})
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -165,7 +165,7 @@ func TestCoreScheduler_EvalGC_Partial(t *testing.T) {
// Insert "dead" alloc
alloc := mock.Alloc()
alloc.EvalID = eval.ID
alloc.DesiredStatus = structs.AllocDesiredStatusFailed
alloc.DesiredStatus = structs.AllocDesiredStatusStop
err = state.UpsertAllocs(1001, []*structs.Allocation{alloc})
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -241,7 +241,7 @@ func TestCoreScheduler_EvalGC_Force(t *testing.T) {
// Insert "dead" alloc
alloc := mock.Alloc()
alloc.EvalID = eval.ID
alloc.DesiredStatus = structs.AllocDesiredStatusFailed
alloc.DesiredStatus = structs.AllocDesiredStatusStop
err = state.UpsertAllocs(1001, []*structs.Allocation{alloc})
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -467,19 +467,19 @@ func TestCoreScheduler_JobGC(t *testing.T) {
{
test: "Terminal",
evalStatus: structs.EvalStatusFailed,
allocStatus: structs.AllocDesiredStatusFailed,
allocStatus: structs.AllocDesiredStatusStop,
shouldExist: false,
},
{
test: "Has Alloc",
evalStatus: structs.EvalStatusFailed,
allocStatus: structs.AllocDesiredStatusRun,
allocStatus: structs.AllocDesiredStatusStop,
shouldExist: true,
},
{
test: "Has Eval",
evalStatus: structs.EvalStatusPending,
allocStatus: structs.AllocDesiredStatusFailed,
allocStatus: structs.AllocDesiredStatusStop,
shouldExist: true,
},
}
Expand Down Expand Up @@ -678,7 +678,7 @@ func TestCoreScheduler_JobGC_Force(t *testing.T) {
{
test: "Terminal",
evalStatus: structs.EvalStatusFailed,
allocStatus: structs.AllocDesiredStatusFailed,
allocStatus: structs.AllocDesiredStatusStop,
shouldExist: false,
},
{
Expand All @@ -690,7 +690,7 @@ func TestCoreScheduler_JobGC_Force(t *testing.T) {
{
test: "Has Eval",
evalStatus: structs.EvalStatusPending,
allocStatus: structs.AllocDesiredStatusFailed,
allocStatus: structs.AllocDesiredStatusStop,
shouldExist: true,
},
}
Expand Down
38 changes: 38 additions & 0 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const (
AllocSnapshot
TimeTableSnapshot
PeriodicLaunchSnapshot
JobSummarySnapshot
)

// nomadFSM implements a finite state machine that is used
Expand Down Expand Up @@ -539,6 +540,15 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
return err
}

case JobSummarySnapshot:
summary := new(structs.JobSummary)
if err := dec.Decode(summary); err != nil {
return err
}
if err := restore.JobSummaryRestore(summary); err != nil {
return err
}

default:
return fmt.Errorf("Unrecognized snapshot type: %v", msgType)
}
Expand Down Expand Up @@ -593,6 +603,10 @@ func (s *nomadSnapshot) Persist(sink raft.SnapshotSink) error {
sink.Cancel()
return err
}
if err := s.persistJobSummaries(sink, encoder); err != nil {
sink.Cancel()
return err
}
return nil
}

Expand Down Expand Up @@ -758,6 +772,30 @@ func (s *nomadSnapshot) persistPeriodicLaunches(sink raft.SnapshotSink,
return nil
}

// persistJobSummaries writes every job summary held by the state snapshot to
// the sink. Each record is a single JobSummarySnapshot type byte followed by
// the encoded summary.
//
// Any failure to read the summaries, write the type byte, or encode a summary
// is returned as-is; the sink is not cancelled here.
func (s *nomadSnapshot) persistJobSummaries(sink raft.SnapshotSink,
	encoder *codec.Encoder) error {

	summaries, err := s.snap.JobSummaries()
	if err != nil {
		return err
	}

	for raw := summaries.Next(); raw != nil; raw = summaries.Next() {
		jobSummary := raw.(*structs.JobSummary)

		// Write the record type marker first. A failed write must abort the
		// snapshot instead of being ignored, otherwise the summary would be
		// encoded without its type byte.
		if _, err := sink.Write([]byte{byte(JobSummarySnapshot)}); err != nil {
			return err
		}
		if err := encoder.Encode(jobSummary); err != nil {
			return err
		}
	}
	return nil
}

// Release is a no-op, as we just need to GC the pointer
// to the state store snapshot. There is nothing to explicitly
// cleanup.
Expand Down
26 changes: 26 additions & 0 deletions nomad/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -938,3 +938,29 @@ func TestFSM_SnapshotRestore_PeriodicLaunches(t *testing.T) {
t.Fatalf("bad: \n%#v\n%#v", out2, job2)
}
}

// TestFSM_SnapshotRestore_JobSummary checks that job summaries survive a
// snapshot/restore cycle: the summaries read from the restored FSM must be
// deeply equal to the ones read from the original FSM.
func TestFSM_SnapshotRestore_JobSummary(t *testing.T) {
	// Add some state
	fsm := testFSM(t)
	state := fsm.State()

	job1 := mock.Job()
	if err := state.UpsertJob(1000, job1); err != nil {
		t.Fatalf("err: %v", err)
	}
	js1, err := state.JobSummaryByID(job1.ID)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	job2 := mock.Job()
	if err := state.UpsertJob(1001, job2); err != nil {
		t.Fatalf("err: %v", err)
	}
	js2, err := state.JobSummaryByID(job2.ID)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Guard against a vacuous pass: two nil summaries are DeepEqual.
	if js1 == nil || js2 == nil {
		t.Fatalf("expected job summaries before snapshot: %#v %#v", js1, js2)
	}

	// Verify the contents
	fsm2 := testSnapshotRestore(t, fsm)
	state2 := fsm2.State()
	out1, err := state2.JobSummaryByID(job1.ID)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	out2, err := state2.JobSummaryByID(job2.ID)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Report the summaries actually being compared, not the jobs.
	if !reflect.DeepEqual(js1, out1) {
		t.Fatalf("bad: \n%#v\n%#v", js1, out1)
	}
	if !reflect.DeepEqual(js2, out2) {
		t.Fatalf("bad: \n%#v\n%#v", js2, out2)
	}
}
19 changes: 19 additions & 0 deletions nomad/state/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ func stateStoreSchema() *memdb.DBSchema {
indexTableSchema,
nodeTableSchema,
jobTableSchema,
jobSummarySchema,
periodicLaunchTableSchema,
evalTableSchema,
allocTableSchema,
Expand Down Expand Up @@ -121,6 +122,24 @@ func jobTableSchema() *memdb.TableSchema {
}
}

// jobSummarySchema returns the memdb schema for the job summary table.
// The table carries a single unique "id" index built from the lowercased
// JobID field of each stored object.
func jobSummarySchema() *memdb.TableSchema {
	idIndex := &memdb.IndexSchema{
		Name:         "id",
		AllowMissing: false,
		Unique:       true,
		Indexer: &memdb.StringFieldIndex{
			Field:     "JobID",
			Lowercase: true,
		},
	}

	return &memdb.TableSchema{
		Name:    "job_summary",
		Indexes: map[string]*memdb.IndexSchema{idIndex.Name: idIndex},
	}
}

// jobIsGCable satisfies the ConditionalIndexFunc interface and creates an index
// on whether a job is eligible for garbage collection.
func jobIsGCable(obj interface{}) (bool, error) {
Expand Down
Loading

0 comments on commit 4b214f3

Please sign in to comment.