Skip to content

Commit

Permalink
Remove running, system scheduler, and fix tg overriding eligibility
Browse files Browse the repository at this point in the history
  • Loading branch information
dadgar committed Feb 1, 2016
1 parent 3d8e7d0 commit 65b8b52
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 110 deletions.
19 changes: 8 additions & 11 deletions nomad/blocked_evals.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ const (
type BlockedEvals struct {
evalBroker *EvalBroker
enabled bool
running bool
stats *BlockedStats
l sync.RWMutex

Expand Down Expand Up @@ -92,11 +91,15 @@ func (b *BlockedEvals) Enabled() bool {
// should only be enabled on the active leader.
func (b *BlockedEvals) SetEnabled(enabled bool) {
b.l.Lock()
b.enabled = enabled
if !b.running {
b.running = true
if b.enabled == enabled {
// No-op
return
} else if enabled {
go b.watchCapacity()
} else {
close(b.stopCh)
}
b.enabled = enabled
b.l.Unlock()
if !enabled {
b.Flush()
Expand Down Expand Up @@ -181,7 +184,7 @@ func (b *BlockedEvals) unblock(computedClass string) {
defer b.l.Unlock()

// Protect against the case of a flush.
if !b.running {
if !b.enabled {
return
}

Expand Down Expand Up @@ -265,12 +268,6 @@ func (b *BlockedEvals) Flush() {
b.l.Lock()
defer b.l.Unlock()

// Kill any running goroutines
if b.running {
close(b.stopCh)
b.running = false
}

// Reset the blocked eval tracker.
b.stats.TotalEscaped = 0
b.stats.TotalBlocked = 0
Expand Down
7 changes: 6 additions & 1 deletion scheduler/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,12 @@ func (e *EvalEligibility) GetClasses() map[string]bool {
case EvalComputedClassEligible:
elig[class] = true
case EvalComputedClassIneligible:
elig[class] = false
// Only mark as ineligible if it hasn't been marked before. This
// prevents one task group marking a class as ineligible when it
// is eligible on another task group.
if _, ok := elig[class]; !ok {
elig[class] = false
}
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions scheduler/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,10 @@ func TestEvalEligibility_GetClasses(t *testing.T) {
e.SetTaskGroupEligibility(false, "bar", "v1:4")
e.SetTaskGroupEligibility(true, "bar", "v1:5")

// Mark an existing eligible class as ineligible in the TG.
e.SetTaskGroupEligibility(false, "fizz", "v1:1")
e.SetTaskGroupEligibility(false, "fizz", "v1:3")

expClasses := map[string]bool{
"v1:1": true,
"v1:2": false,
Expand Down
15 changes: 0 additions & 15 deletions scheduler/system_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ type SystemScheduler struct {

limitReached bool
nextEval *structs.Evaluation

blocked *structs.Evaluation
}

// NewSystemScheduler is a factory function to instantiate a new system
Expand Down Expand Up @@ -129,19 +127,6 @@ func (s *SystemScheduler) process() (bool, error) {
s.logger.Printf("[DEBUG] sched: %#v: rolling update limit reached, next eval '%s' created", s.eval, s.nextEval.ID)
}

// If there are failed allocations, we need to create a blocked evaluation
// to place the failed allocations when resources become available.
if len(s.plan.FailedAllocs) != 0 && s.blocked == nil {
e := s.ctx.Eligibility()
classes := e.GetClasses()
s.blocked = s.eval.BlockedEval(classes, e.HasEscaped())
if err := s.planner.CreateEval(s.blocked); err != nil {
s.logger.Printf("[ERR] sched: %#v failed to make blocked eval: %v", s.eval, err)
return false, err
}
s.logger.Printf("[DEBUG] sched: %#v: failed to place all allocations, blocked eval '%s' created", s.eval, s.blocked.ID)
}

// Submit the plan
result, newState, err := s.planner.SubmitPlan(s.plan)
if err != nil {
Expand Down
83 changes: 0 additions & 83 deletions scheduler/system_sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,89 +184,6 @@ func TestSystemSched_JobRegister_AllocFail(t *testing.T) {
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}

func TestSystemSched_JobRegister_BlockedEval(t *testing.T) {
h := NewHarness(t)

// Create a full node
node := mock.Node()
node.Reserved = node.Resources
node.ComputeClass()
noErr(t, h.State.UpsertNode(h.NextIndex(), node))

// Create an ineligible node
node2 := mock.Node()
node2.Attributes["kernel.name"] = "windows"
node2.ComputeClass()
noErr(t, h.State.UpsertNode(h.NextIndex(), node2))

// Create a jobs
job := mock.SystemJob()
noErr(t, h.State.UpsertJob(h.NextIndex(), job))

// Create a mock evaluation to register the job
eval := &structs.Evaluation{
ID: structs.GenerateUUID(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
}

// Process the evaluation
err := h.Process(NewServiceScheduler, eval)
if err != nil {
t.Fatalf("err: %v", err)
}

// Ensure a single plan
if len(h.Plans) != 1 {
t.Fatalf("bad: %#v", h.Plans)
}
plan := h.Plans[0]

// Ensure the plan has created a follow up eval.
if len(h.CreateEvals) != 1 {
t.Fatalf("bad: %#v", h.CreateEvals)
}

created := h.CreateEvals[0]
if created.Status != structs.EvalStatusBlocked {
t.Fatalf("bad: %#v", created)
}

classes := created.ClassEligibility
if len(classes) != 2 || !classes[node.ComputedClass] || classes[node2.ComputedClass] {
t.Fatalf("bad: %#v", classes)
}

if created.EscapedComputedClass {
t.Fatalf("bad: %#v", created)
}

// Ensure the plan failed to alloc
if len(plan.FailedAllocs) != 1 {
t.Fatalf("bad: %#v", plan)
}

// Lookup the allocations by JobID
out, err := h.State.AllocsByJob(job.ID)
noErr(t, err)

// Ensure all allocations placed
if len(out) != 1 {
for _, a := range out {
t.Logf("%#v", a)
}
t.Fatalf("bad: %#v", out)
}

// Check the available nodes
if count, ok := out[0].Metrics.NodesAvailable["dc1"]; !ok || count != 2 {
t.Fatalf("bad: %#v", out[0].Metrics)
}

h.AssertEvalStatus(t, structs.EvalStatusComplete)
}

func TestSystemSched_JobModify(t *testing.T) {
h := NewHarness(t)

Expand Down

0 comments on commit 65b8b52

Please sign in to comment.