Skip to content

Commit

Permalink
nonblocking worker queue
Browse files Browse the repository at this point in the history
  • Loading branch information
Equanox committed Dec 9, 2022
1 parent 2e62046 commit 3f32176
Show file tree
Hide file tree
Showing 9 changed files with 424 additions and 238 deletions.
140 changes: 54 additions & 86 deletions bob/playbook/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,106 +4,98 @@ import (
"context"
"errors"
"fmt"
"os"
"sync"

"github.com/benchkram/bob/bobtask"
"github.com/benchkram/bob/bobtask/hash"
"github.com/benchkram/bob/pkg/boblog"
"github.com/benchkram/bob/pkg/usererror"
)

// Build the playbook starting at root.
func (p *Playbook) Build(ctx context.Context) (err error) {
processingErrorsMutex := sync.Mutex{}
var processingErrors []error

var processedTasks []*bobtask.Task

p.pickTaskColors()

// Setup worker pool and queue.
workers := p.maxParallel
queue := make(chan *bobtask.Task)

boblog.Log.Info(fmt.Sprintf("Using %d workers", workers))

processing := sync.WaitGroup{}

// Start the workers which listen on task queue
for i := 0; i < workers; i++ {
go func(workerID int) {
for t := range queue {
processing.Add(1)
boblog.Log.V(5).Info(fmt.Sprintf("RUNNING task %s on worker %d ", t.Name(), workerID))
err := p.build(ctx, t)
if err != nil {
processingErrorsMutex.Lock()
processingErrors = append(processingErrors, fmt.Errorf("(worker) [task: %s], %w", t.Name(), err))
processingErrorsMutex.Unlock()

// Any error occurred during a build puts the
// playbook in a done state. This prevents
// further tasks be queued for execution.
p.Done()
}
p.pickTaskColors()

processedTasks = append(processedTasks, t)
processing.Done()
}
}(i + 1)
}
wm := p.startWorkers(ctx, workers)

// Listen for tasks from the playbook and forward them to the worker pool
// listen for idle workers
go func() {
c := p.TaskChannel()
for t := range c {
boblog.Log.V(5).Info(fmt.Sprintf("Sending task %s", t.Name()))
// A buffer for workers which have
// no workload assigned.
workerBuffer := []int{}

// blocks till a worker is available
queue <- t
for workerID := range wm.idleChan {

// initiate another playbook run,
// as there might be workers without
// assigned tasks left.
err := p.Play()
// boblog.Log.V(1).Info("Calling Next")
task, err := p.Next()
if err != nil {
if !errors.Is(err, ErrDone) {
processingErrorsMutex.Lock()
processingErrors = append(processingErrors, fmt.Errorf("(scheduler) [task: %s], %w", t.Name(), err))
processingErrorsMutex.Unlock()

if errors.Is(err, ErrDone) {
//boblog.Log.V(1).Info("Done")
wm.stopWorkers()
// exit
break
}

wm.addError(fmt.Errorf("worker-availability-queue: unexpected error comming from Next(): %w", err))
wm.stopWorkers()
break
}
}
}()

err = p.Play()
if err != nil {
return err
}
// Push workload to the worker or store the worker for later.
if task != nil {
//boblog.Log.V(1).Info(fmt.Sprintf("Sending task %s to worker", task.Name()))
// Send workload to worker
wm.workloadQueues[workerID] <- task

// There might be more workload left.
// Reqeuing a worker from the buffer.
if len(workerBuffer) > 0 {
wID := workerBuffer[len(workerBuffer)-1]
workerBuffer = workerBuffer[:len(workerBuffer)-1]

// boblog.Log.V(1).Info("Requeue Worker")
// requeue a buffered worker
wm.idleChan <- wID
}
} else {

// No task yet ready to be worked on but the playbook is not done yet.
// Therfore the worker is stored in a buffer and is requeued on
// the next change to the playbook.
workerBuffer = append(workerBuffer, workerID)
//boblog.Log.V(1).Info(fmt.Sprintf("Buffering Worker [worker_id:%d ] [%s:%d]", workerID, "buffer_size", len(workerBuffer)))

<-p.DoneChan()
processing.Wait()
}

//wm.printWorkerState()
}

// to assure even idling workers will be shutdown.
wm.closeWorkloadQueues()
}()

close(queue)
wm.workerWG.Wait()

// iterate through tasks and logs
// skipped input files.
var skippedInputs int
for _, task := range processedTasks {
for _, task := range wm.processed {
skippedInputs = logSkippedInputs(
skippedInputs,
task.ColoredName(),
task.LogSkippedInput(),
)
}

p.summary(processedTasks)
//p.summary(processedTasks)

if len(processingErrors) > 0 {
if len(wm.errors) > 0 {
// Pass only the very first processing error.
return processingErrors[0]
return wm.errors[0]
}

// sync any newly generated artifacts with the remote store
Expand All @@ -119,30 +111,6 @@ func (p *Playbook) Build(ctx context.Context) (err error) {
return nil
}

const maxSkippedInputs = 5

// logSkippedInputs until max is reached
func logSkippedInputs(count int, taskname string, skippedInputs []string) int {
if len(skippedInputs) == 0 {
return count
}
if count >= maxSkippedInputs {
return maxSkippedInputs
}

for _, f := range skippedInputs {
count = count + 1
boblog.Log.V(1).Info(fmt.Sprintf("skipped %s '%s' %s", taskname, f, os.ErrPermission))

if count >= maxSkippedInputs {
boblog.Log.V(1).Info(fmt.Sprintf("skipped %s %s", taskname, "& more..."))
break
}
}

return count
}

// inputHashes returns and array of input hashes of the playbook,
// optionally filters tasks without targets.
func (p *Playbook) inputHashes(filterTarget bool) map[string]hash.In {
Expand Down
8 changes: 4 additions & 4 deletions bob/playbook/build_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func (p *Playbook) build(ctx context.Context, task *bobtask.Task) (err error) {
var taskErr error
defer func() {
if !taskSuccessFul {
errr := p.TaskFailed(task.Name(), taskErr)
errr := p.TaskFailed(task.TaskID, taskErr)
if errr != nil {
boblog.Log.Error(errr, "Setting the task state to failed, failed.")
}
Expand All @@ -38,7 +38,7 @@ func (p *Playbook) build(ctx context.Context, task *bobtask.Task) (err error) {
case <-ctx.Done():
if errors.Is(ctx.Err(), context.Canceled) {
boblog.Log.V(1).Info(fmt.Sprintf("%-*s\t%s", p.namePad, coloredName, StateCanceled))
_ = p.TaskCanceled(task.Name())
_ = p.TaskCanceled(task.TaskID)
}
}
}()
Expand Down Expand Up @@ -102,7 +102,7 @@ func (p *Playbook) build(ctx context.Context, task *bobtask.Task) (err error) {
status := StateNoRebuildRequired
boblog.Log.V(2).Info(fmt.Sprintf("%-*s\t%s", p.namePad, coloredName, status.Short()))
taskSuccessFul = true
return p.TaskNoRebuildRequired(task.Name())
return p.TaskNoRebuildRequired(task.TaskID)
}

err = task.Clean()
Expand All @@ -123,7 +123,7 @@ func (p *Playbook) build(ctx context.Context, task *bobtask.Task) (err error) {
// flagged as failed in a defered function call.
taskSuccessFul = true

err = p.TaskCompleted(task.Name())
err = p.TaskCompleted(task.TaskID)
if err != nil {
if errors.Is(err, ErrFailed) {
return err
Expand Down
143 changes: 143 additions & 0 deletions bob/playbook/next.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package playbook

import (
"fmt"
"time"
)

func (p *Playbook) Next() (_ *Status, err error) {
if p.done {
return nil, ErrDone
}
p.oncePrepareOptimizedAccess.Do(func() {
_ = p.Tasks.walk(p.root, func(taskname string, task *Status, _ error) error {
for _, dependentTaskName := range task.DependsOn {
t := p.Tasks[dependentTaskName]
task.DependsOnIDs = append(task.DependsOnIDs, t.TaskID)
}
return nil
})
})

// Required? Yes!
p.playMutex.Lock()
//defer p.playMutex.Unlock()

if p.start.IsZero() {
p.start = time.Now()
}

// Walk the task chain and determine the next build task. Send it to the task channel.
// Returns `taskQueued` when a task has been send to the taskChannel.
// Returns `taskFailed` when a task has failed.
// Once it returns `nil` the playbook is done with it's work.
var taskQueued = fmt.Errorf("task queued")
var taskFailed = fmt.Errorf("task failed")
//var noTaskReadyToRun = fmt.Errorf("no task ready to run")

type result struct {
t *Status
state string // queued, playbook-done, failed
}
c := make(chan result, 1)

// Starting the walk function in a goroutine to be able
// to return a ready to be processed task immeadiately
// from Next().
go func(output chan result) {
didAllTaskComplete := true
_ = p.TasksOptimized.walkBottomFirst(p.rootID, func(taskID int, task *Status, err error) error {
if err != nil {
return err
}

// if taskID == p.rootID || task.Name() == "apps/build" {
// boblog.Log.V(1).Info(fmt.Sprintf("%-*s\t walking [state: %s]", p.namePad, task.Name(), task.State()))
// }

switch task.State() {
case StatePending:
didAllTaskComplete = false
// Check if all dependent tasks are completed
for _, dependentTaskID := range task.Task.DependsOnIDs {
t := p.TasksOptimized[dependentTaskID]

state := t.State()
// if taskID == p.rootID || task.Name() == "apps/build" {
// if state != StateNoRebuildRequired {
// boblog.Log.V(1).Info(fmt.Sprintf("%-*s\t walking [state: %s]", p.namePad, t.Name(), state))
// }
// }
if state != StateCompleted && state != StateNoRebuildRequired {
// A dependent task is not completed.
// So this task is not yet ready to run.
return nil
}
}
case StateFailed:
output <- result{t: task, state: "failed"}
return taskFailed
case StateCanceled:
output <- result{t: task, state: "canceled"}
return nil
case StateNoRebuildRequired:
return nil
case StateCompleted:
return nil
case StateRunning:
didAllTaskComplete = false
return nil
case StateQueued:
didAllTaskComplete = false
return nil
default:
}

//fmt.Printf("sending task %s to channel\n", task.Task.Name())
// setting the task start time before passing it to channel

// TODO: for async assure to handle send to a closed channel.

// if task.State() != StatePending {
// boblog.Log.Info(fmt.Sprintf("Queuing a task [%s] with state %s is invalid", task.Name(), task.State()))
// os.Exit(1)
// }
_ = p.setTaskState(task.TaskID, StateQueued, nil)
output <- result{t: task, state: "queued"}
return taskQueued
})

if didAllTaskComplete {
output <- result{t: nil, state: "playbook-done"}
}
close(output)

// the goroutine can outlive the Next() run.
// avoiding concurrrent runs by only unlocking at the
// end of the walk.
p.playMutex.Unlock()

}(c)

for r := range c {
switch r.state {
case "queued":
//fmt.Printf("received task %s and returning\n", r.t.Name())
return r.t, nil
case "failed":
//fmt.Println("failed")
fallthrough
case "canceled":
//fmt.Println("failed")
fallthrough
case "playbook-done":
//fmt.Println("playbook-done")
p.done = true
return nil, ErrDone
}
}

//fmt.Printf("returning without a task\n")
return nil, nil

}
Loading

0 comments on commit 3f32176

Please sign in to comment.