Skip to content

Commit

Permalink
EnqueueAll inserts all evaluations before unblocking dequeue calls
Browse files Browse the repository at this point in the history
  • Loading branch information
dadgar committed May 18, 2016
1 parent 8698970 commit ab21a76
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 11 deletions.
29 changes: 18 additions & 11 deletions nomad/eval_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,21 +129,21 @@ func (b *EvalBroker) SetEnabled(enabled bool) {
}

// EnqueueAll is used to enqueue many evaluations.
// TODO: Update enqueueLocked to take a list and use heap.Fix instead of
// heap.Push in order to make the running time O(log(n+m)) instead of
// O(m*log(n)) where m is the size of the evals and n is the size of the
// existing heap.
func (b *EvalBroker) EnqueueAll(evals []*structs.Evaluation) {
for _, e := range evals {
b.Enqueue(e)
}
}

// Enqueue is used to enqueue an evaluation
func (b *EvalBroker) Enqueue(eval *structs.Evaluation) {
// The lock needs to be held until all evaluations are enqueued. This is so
// that when Dequeue operations are unblocked they will pick the highest
// priority evaluations.
b.l.Lock()
defer b.l.Unlock()
for _, eval := range evals {
b.processEnqueue(eval)
}
}

// processEnqueue deduplicates evals and either enqueue immediately
// or enforce the evals wait time. processEnqueue must be called with the lock
// held.
func (b *EvalBroker) processEnqueue(eval *structs.Evaluation) {
// Check if already enqueued
if _, ok := b.evals[eval.ID]; ok {
return
Expand All @@ -164,6 +164,13 @@ func (b *EvalBroker) Enqueue(eval *structs.Evaluation) {
b.enqueueLocked(eval, eval.Type)
}

// Enqueue is used to enqueue an evaluation
func (b *EvalBroker) Enqueue(eval *structs.Evaluation) {
b.l.Lock()
defer b.l.Unlock()
b.processEnqueue(eval)
}

// enqueueWaiting is used to enqueue a waiting evaluation
func (b *EvalBroker) enqueueWaiting(eval *structs.Evaluation) {
b.l.Lock()
Expand Down
45 changes: 45 additions & 0 deletions nomad/eval_broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -840,3 +840,48 @@ func TestEvalBroker_Wait(t *testing.T) {
t.Fatalf("bad : %#v", out)
}
}

// Ensure that priority is taken into account when enqueueing many evaluations.
func TestEvalBroker_EnqueueAll_Dequeue_Fair(t *testing.T) {
b := testBroker(t, 0)
b.SetEnabled(true)

// Start with a blocked dequeue
outCh := make(chan *structs.Evaluation, 1)
go func() {
start := time.Now()
out, _, err := b.Dequeue(defaultSched, time.Second)
end := time.Now()
outCh <- out
if err != nil {
t.Fatalf("err: %v", err)
}
if d := end.Sub(start); d < 5*time.Millisecond {
t.Fatalf("bad: %v", d)
}
}()

// Wait for a bit
time.Sleep(5 * time.Millisecond)

// Enqueue
evals := make([]*structs.Evaluation, 0, 8)
expectedPriority := 90
for i := 10; i <= expectedPriority; i += 10 {
eval := mock.Eval()
eval.Priority = i
evals = append(evals, eval)

}
b.EnqueueAll(evals)

// Ensure dequeue
select {
case out := <-outCh:
if out.Priority != expectedPriority {
t.Fatalf("bad: %v", out)
}
case <-time.After(time.Second):
t.Fatalf("timeout")
}
}

0 comments on commit ab21a76

Please sign in to comment.