Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Blocked evaluation fixes #4867

Merged
merged 6 commits into from
Nov 13, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 91 additions & 26 deletions nomad/blocked_evals.go
Original file line number Diff line number Diff line change
@@ -6,6 +6,8 @@ import (

"github.com/armon/go-metrics"
"github.com/hashicorp/consul/lib"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
)

@@ -29,6 +31,9 @@ const (
// allocations. It is unblocked when the capacity of a node that could run the
// failed allocation becomes available.
type BlockedEvals struct {
// logger is the logger to use by the blocked eval tracker.
logger log.Logger

evalBroker *EvalBroker
enabled bool
stats *BlockedStats
@@ -47,7 +52,7 @@ type BlockedEvals struct {

// jobs is the map of blocked job and is used to ensure that only one
// blocked eval exists for each job. The value is the blocked evaluation ID.
jobs map[string]string
jobs map[structs.NamespacedID]string

// unblockIndexes maps computed node classes or quota name to the index in
// which they were unblocked. This is used to check if an evaluation could
@@ -102,12 +107,13 @@ type BlockedStats struct {

// NewBlockedEvals creates a new blocked eval tracker that will enqueue
// unblocked evals into the passed broker.
func NewBlockedEvals(evalBroker *EvalBroker) *BlockedEvals {
func NewBlockedEvals(evalBroker *EvalBroker, logger log.Logger) *BlockedEvals {
return &BlockedEvals{
logger: logger.Named("blocked_evals"),
evalBroker: evalBroker,
captured: make(map[string]wrappedEval),
escaped: make(map[string]wrappedEval),
jobs: make(map[string]string),
jobs: make(map[structs.NamespacedID]string),
unblockIndexes: make(map[string]uint64),
capacityChangeCh: make(chan *capacityUpdate, unblockBuffer),
duplicateCh: make(chan struct{}, 1),
@@ -176,19 +182,11 @@ func (b *BlockedEvals) processBlock(eval *structs.Evaluation, token string) {
return
}

// Check if the job already has a blocked evaluation. If it does add it to
// the list of duplicates. We only ever want one blocked evaluation per job,
// otherwise we would create unnecessary work for the scheduler as multiple
// evals for the same job would be run, all producing the same outcome.
if _, existing := b.jobs[eval.JobID]; existing {
b.duplicates = append(b.duplicates, eval)

// Unblock any waiter.
select {
case b.duplicateCh <- struct{}{}:
default:
}

// Handle the new evaluation being for a job we are already tracking.
if b.processBlockJobDuplicate(eval) {
// If process block job duplicate returns true, the new evaluation has
// been marked as a duplicate and we have nothing to do, so return
// early.
return
}

@@ -205,7 +203,7 @@ func (b *BlockedEvals) processBlock(eval *structs.Evaluation, token string) {
}

// Mark the job as tracked.
b.jobs[eval.JobID] = eval.ID
b.jobs[structs.NewNamespacedID(eval.JobID, eval.Namespace)] = eval.ID
b.stats.TotalBlocked++

// Track that the evaluation is being added due to reaching the quota limit
@@ -234,6 +232,71 @@ func (b *BlockedEvals) processBlock(eval *structs.Evaluation, token string) {
b.captured[eval.ID] = wrapped
}

// processBlockJobDuplicate handles the case where the new eval is for a job
// that we are already tracking. If the eval is a duplicate, we add the older
// evaluation by Raft index to the list of duplicates such that it can be
// cancelled. We only ever want one blocked evaluation per job, otherwise we
// would create unnecessary work for the scheduler as multiple evals for the
// same job would be run, all producing the same outcome. It is critical to
// prefer the newer evaluation, since it will contain the most up to date set of
// class eligibility. The return value is set to true, if the passed evaluation
// is cancelled. This should be called with the lock held.
func (b *BlockedEvals) processBlockJobDuplicate(eval *structs.Evaluation) (newCancelled bool) {
existingID, hasExisting := b.jobs[structs.NewNamespacedID(eval.JobID, eval.Namespace)]
if !hasExisting {
return
}

var dup *structs.Evaluation
existingW, ok := b.captured[existingID]
if ok {
if latestEvalIndex(existingW.eval) <= latestEvalIndex(eval) {
delete(b.captured, existingID)
b.stats.TotalBlocked--
dup = existingW.eval
} else {
dup = eval
newCancelled = true
}
} else {
existingW, ok = b.escaped[existingID]
if !ok {
// This is a programming error
b.logger.Error("existing blocked evaluation is neither tracked as captured or escaped", "existing_id", existingID)
delete(b.jobs, structs.NewNamespacedID(eval.JobID, eval.Namespace))
return
}

if latestEvalIndex(existingW.eval) <= latestEvalIndex(eval) {
delete(b.escaped, existingID)
b.stats.TotalEscaped--
dup = existingW.eval
} else {
dup = eval
newCancelled = true
}
}

b.duplicates = append(b.duplicates, dup)

// Unblock any waiter.
select {
case b.duplicateCh <- struct{}{}:
default:
}

return
}

// latestEvalIndex returns the max of the evaluations create and snapshot index
func latestEvalIndex(eval *structs.Evaluation) uint64 {
if eval == nil {
return 0
}

return helper.Uint64Max(eval.CreateIndex, eval.SnapshotIndex)
}

// missedUnblock returns whether an evaluation missed an unblock while it was in
// the scheduler. Since the scheduler can operate at an index in the past, the
// evaluation may have been processed missing data that would allow it to
@@ -291,7 +354,7 @@ func (b *BlockedEvals) missedUnblock(eval *structs.Evaluation) bool {
// Untrack causes any blocked evaluation for the passed job to be no longer
// tracked. Untrack is called when there is a successful evaluation for the job
// and a blocked evaluation is no longer needed.
func (b *BlockedEvals) Untrack(jobID string) {
func (b *BlockedEvals) Untrack(jobID, namespace string) {
b.l.Lock()
defer b.l.Unlock()

@@ -300,16 +363,18 @@ func (b *BlockedEvals) Untrack(jobID string) {
return
}

nsID := structs.NewNamespacedID(jobID, namespace)

// Get the evaluation ID to cancel
evalID, ok := b.jobs[jobID]
evalID, ok := b.jobs[nsID]
if !ok {
// No blocked evaluation so exit
return
}

// Attempt to delete the evaluation
if w, ok := b.captured[evalID]; ok {
delete(b.jobs, w.eval.JobID)
delete(b.jobs, nsID)
delete(b.captured, evalID)
b.stats.TotalBlocked--
if w.eval.QuotaLimitReached != "" {
@@ -318,7 +383,7 @@ func (b *BlockedEvals) Untrack(jobID string) {
}

if w, ok := b.escaped[evalID]; ok {
delete(b.jobs, w.eval.JobID)
delete(b.jobs, nsID)
delete(b.escaped, evalID)
b.stats.TotalEscaped--
b.stats.TotalBlocked--
@@ -440,7 +505,7 @@ func (b *BlockedEvals) unblock(computedClass, quota string, index uint64) {
for id, wrapped := range b.escaped {
unblocked[wrapped.eval] = wrapped.token
delete(b.escaped, id)
delete(b.jobs, wrapped.eval.JobID)
delete(b.jobs, structs.NewNamespacedID(wrapped.eval.JobID, wrapped.eval.Namespace))

if wrapped.eval.QuotaLimitReached != "" {
numQuotaLimit++
@@ -467,7 +532,7 @@ func (b *BlockedEvals) unblock(computedClass, quota string, index uint64) {
// is eligible based on the computed node class, or never seen the
// computed node class.
unblocked[wrapped.eval] = wrapped.token
delete(b.jobs, wrapped.eval.JobID)
delete(b.jobs, structs.NewNamespacedID(wrapped.eval.JobID, wrapped.eval.Namespace))
delete(b.captured, id)
if wrapped.eval.QuotaLimitReached != "" {
numQuotaLimit++
@@ -502,7 +567,7 @@ func (b *BlockedEvals) UnblockFailed() {
if wrapped.eval.TriggeredBy == structs.EvalTriggerMaxPlans {
unblocked[wrapped.eval] = wrapped.token
delete(b.captured, id)
delete(b.jobs, wrapped.eval.JobID)
delete(b.jobs, structs.NewNamespacedID(wrapped.eval.JobID, wrapped.eval.Namespace))
if wrapped.eval.QuotaLimitReached != "" {
quotaLimit++
}
@@ -513,7 +578,7 @@ func (b *BlockedEvals) UnblockFailed() {
if wrapped.eval.TriggeredBy == structs.EvalTriggerMaxPlans {
unblocked[wrapped.eval] = wrapped.token
delete(b.escaped, id)
delete(b.jobs, wrapped.eval.JobID)
delete(b.jobs, structs.NewNamespacedID(wrapped.eval.JobID, wrapped.eval.Namespace))
b.stats.TotalEscaped -= 1
if wrapped.eval.QuotaLimitReached != "" {
quotaLimit++
@@ -571,7 +636,7 @@ func (b *BlockedEvals) Flush() {
b.stats.TotalQuotaLimit = 0
b.captured = make(map[string]wrappedEval)
b.escaped = make(map[string]wrappedEval)
b.jobs = make(map[string]string)
b.jobs = make(map[structs.NamespacedID]string)
b.unblockIndexes = make(map[string]uint64)
b.timetable = nil
b.duplicates = nil
40 changes: 33 additions & 7 deletions nomad/blocked_evals_test.go
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@ import (
"testing"
"time"

"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
@@ -14,7 +15,7 @@ import (
func testBlockedEvals(t *testing.T) (*BlockedEvals, *EvalBroker) {
broker := testBroker(t, 0)
broker.SetEnabled(true)
blocked := NewBlockedEvals(broker)
blocked := NewBlockedEvals(broker, testlog.HCLogger(t))
blocked.SetEnabled(true)
return blocked, broker
}
@@ -99,23 +100,29 @@ func TestBlockedEvals_GetDuplicates(t *testing.T) {

// Create duplicate blocked evals and add them to the blocked tracker.
e := mock.Eval()
e.CreateIndex = 100
e2 := mock.Eval()
e2.JobID = e.JobID
e2.CreateIndex = 101
e3 := mock.Eval()
e3.JobID = e.JobID
e3.CreateIndex = 102
e4 := mock.Eval()
e4.JobID = e.JobID
e4.CreateIndex = 100
blocked.Block(e)
blocked.Block(e2)

// Verify block did track both
// Verify stats such that we are only tracking one
bStats := blocked.Stats()
if bStats.TotalBlocked != 1 || bStats.TotalEscaped != 0 {
t.Fatalf("bad: %#v", bStats)
}

// Get the duplicates.
out := blocked.GetDuplicates(0)
if len(out) != 1 || !reflect.DeepEqual(out[0], e2) {
t.Fatalf("bad: %#v %#v", out, e2)
if len(out) != 1 || !reflect.DeepEqual(out[0], e) {
t.Fatalf("bad: %#v %#v", out, e)
}

// Call block again after a small sleep.
@@ -126,9 +133,28 @@ func TestBlockedEvals_GetDuplicates(t *testing.T) {

// Get the duplicates.
out = blocked.GetDuplicates(1 * time.Second)
if len(out) != 1 || !reflect.DeepEqual(out[0], e3) {
if len(out) != 1 || !reflect.DeepEqual(out[0], e2) {
t.Fatalf("bad: %#v %#v", out, e2)
}

// Verify stats such that we are only tracking one
bStats = blocked.Stats()
if bStats.TotalBlocked != 1 || bStats.TotalEscaped != 0 {
t.Fatalf("bad: %#v", bStats)
}

// Add an older evaluation and assert it gets cancelled
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also add a test case for the other way around (add an eval thats older than the current one so that its marked as the duplicate)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

e1-e3 are that, e4 tests that we can cancel the incoming

blocked.Block(e4)
out = blocked.GetDuplicates(0)
if len(out) != 1 || !reflect.DeepEqual(out[0], e4) {
t.Fatalf("bad: %#v %#v", out, e4)
}

// Verify stats such that we are only tracking one
bStats = blocked.Stats()
if bStats.TotalBlocked != 1 || bStats.TotalEscaped != 0 {
t.Fatalf("bad: %#v", bStats)
}
}

func TestBlockedEvals_UnblockEscaped(t *testing.T) {
@@ -647,7 +673,7 @@ func TestBlockedEvals_Untrack(t *testing.T) {
}

// Untrack and verify
blocked.Untrack(e.JobID)
blocked.Untrack(e.JobID, e.Namespace)
bStats = blocked.Stats()
if bStats.TotalBlocked != 0 || bStats.TotalEscaped != 0 {
t.Fatalf("bad: %#v", bStats)
@@ -672,7 +698,7 @@ func TestBlockedEvals_Untrack_Quota(t *testing.T) {
}

// Untrack and verify
blocked.Untrack(e.JobID)
blocked.Untrack(e.JobID, e.Namespace)
bs = blocked.Stats()
if bs.TotalBlocked != 0 || bs.TotalEscaped != 0 || bs.TotalQuotaLimit != 0 {
t.Fatalf("bad: %#v", bs)
5 changes: 2 additions & 3 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
@@ -7,10 +7,9 @@ import (
"sync"
"time"

"github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"

"github.com/armon/go-metrics"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
@@ -609,7 +608,7 @@ func (n *nomadFSM) handleUpsertedEval(eval *structs.Evaluation) {
len(eval.FailedTGAllocs) == 0 {
// If we have a successful evaluation for a node, untrack any
// blocked evaluation
n.blockedEvals.Untrack(eval.JobID)
n.blockedEvals.Untrack(eval.JobID, eval.Namespace)
}
}

5 changes: 3 additions & 2 deletions nomad/fsm_test.go
Original file line number Diff line number Diff line change
@@ -48,11 +48,12 @@ func testStateStore(t *testing.T) *state.StateStore {
func testFSM(t *testing.T) *nomadFSM {
broker := testBroker(t, 0)
dispatcher, _ := testPeriodicDispatcher(t)
logger := testlog.HCLogger(t)
fsmConfig := &FSMConfig{
EvalBroker: broker,
Periodic: dispatcher,
Blocked: NewBlockedEvals(broker),
Logger: testlog.HCLogger(t),
Blocked: NewBlockedEvals(broker, logger),
Logger: logger,
Region: "global",
}
fsm, err := NewFSM(fsmConfig)
4 changes: 3 additions & 1 deletion nomad/leader_test.go
Original file line number Diff line number Diff line change
@@ -627,16 +627,18 @@ func TestLeader_ReapDuplicateEval(t *testing.T) {

// Create a duplicate blocked eval
eval := mock.Eval()
eval.CreateIndex = 100
eval2 := mock.Eval()
eval2.JobID = eval.JobID
eval2.CreateIndex = 102
s1.blockedEvals.Block(eval)
s1.blockedEvals.Block(eval2)

// Wait for the evaluation to marked as cancelled
state := s1.fsm.State()
testutil.WaitForResult(func() (bool, error) {
ws := memdb.NewWatchSet()
out, err := state.EvalByID(ws, eval2.ID)
out, err := state.EvalByID(ws, eval.ID)
if err != nil {
return false, err
}
Loading