Skip to content

Commit

Permalink
Handle new eval being the duplicate properly
Browse files Browse the repository at this point in the history
  • Loading branch information
dadgar committed Nov 13, 2018
1 parent 1e3a20f commit 8460726
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 4 deletions.
16 changes: 13 additions & 3 deletions nomad/blocked_evals.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,12 @@ func (b *BlockedEvals) processBlock(eval *structs.Evaluation, token string) {
}

// Handle the new evaluation being for a job we are already tracking.
b.processBlockJobDuplicate(eval)
if b.processBlockJobDuplicate(eval) {
// If process block job duplicate returns true, the new evaluation has
// been marked as a duplicate and we have nothing to do, so return
// early.
return
}

// Check if the eval missed an unblock while it was in the scheduler at an
// older index. The scheduler could have been invoked with a snapshot of
Expand Down Expand Up @@ -234,8 +239,9 @@ func (b *BlockedEvals) processBlock(eval *structs.Evaluation, token string) {
// would create unnecessary work for the scheduler as multiple evals for the
// same job would be run, all producing the same outcome. It is critical to
// prefer the newer evaluation, since it will contain the most up to date set of
// class eligibility. This should be called with the lock held.
func (b *BlockedEvals) processBlockJobDuplicate(eval *structs.Evaluation) {
// class eligibility. The return value is set to true, if the passed evaluation
// is cancelled. This should be called with the lock held.
func (b *BlockedEvals) processBlockJobDuplicate(eval *structs.Evaluation) (newCancelled bool) {
existingID, hasExisting := b.jobs[structs.NewNamespacedID(eval.JobID, eval.Namespace)]
if !hasExisting {
return
Expand All @@ -250,6 +256,7 @@ func (b *BlockedEvals) processBlockJobDuplicate(eval *structs.Evaluation) {
dup = existingW.eval
} else {
dup = eval
newCancelled = true
}
} else {
existingW, ok = b.escaped[existingID]
Expand All @@ -266,6 +273,7 @@ func (b *BlockedEvals) processBlockJobDuplicate(eval *structs.Evaluation) {
dup = existingW.eval
} else {
dup = eval
newCancelled = true
}
}

Expand All @@ -276,6 +284,8 @@ func (b *BlockedEvals) processBlockJobDuplicate(eval *structs.Evaluation) {
case b.duplicateCh <- struct{}{}:
default:
}

return
}

// latestEvalIndex returns the max of the evaluations create and snapshot index
Expand Down
14 changes: 13 additions & 1 deletion nomad/blocked_evals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func TestBlockedEvals_GetDuplicates(t *testing.T) {
blocked.Block(e)
blocked.Block(e2)

// Verify block did track both
// Verify stats such that we are only tracking one
bStats := blocked.Stats()
if bStats.TotalBlocked != 1 || bStats.TotalEscaped != 0 {
t.Fatalf("bad: %#v", bStats)
Expand All @@ -137,12 +137,24 @@ func TestBlockedEvals_GetDuplicates(t *testing.T) {
t.Fatalf("bad: %#v %#v", out, e2)
}

// Verify stats such that we are only tracking one
bStats = blocked.Stats()
if bStats.TotalBlocked != 1 || bStats.TotalEscaped != 0 {
t.Fatalf("bad: %#v", bStats)
}

// Add an older evaluation and assert it gets cancelled
blocked.Block(e4)
out = blocked.GetDuplicates(0)
if len(out) != 1 || !reflect.DeepEqual(out[0], e4) {
t.Fatalf("bad: %#v %#v", out, e4)
}

// Verify stats such that we are only tracking one
bStats = blocked.Stats()
if bStats.TotalBlocked != 1 || bStats.TotalEscaped != 0 {
t.Fatalf("bad: %#v", bStats)
}
}

func TestBlockedEvals_UnblockEscaped(t *testing.T) {
Expand Down

0 comments on commit 8460726

Please sign in to comment.