experimenting with optimizations #284

Closed
wants to merge 8 commits
18 changes: 17 additions & 1 deletion bob/bobfile/playbook.go
@@ -7,17 +7,33 @@ import (

 func (b *Bobfile) Playbook(taskName string, opts ...playbook.Option) (*playbook.Playbook, error) {

+	var idCounter int
 	pb := playbook.New(
 		taskName,
+		idCounter,
 		opts...,
 	)
+	idCounter++

 	err := b.BTasks.Walk(taskName, "", func(tn string, task bobtask.Task, err error) error {
 		if err != nil {
 			return err
 		}
+		if taskName == tn {
+			// The root task already has an id.
+			statusTask := playbook.NewStatus(&task)
+			pb.Tasks[tn] = statusTask
+			pb.TasksOptimized = append(pb.TasksOptimized, statusTask)
+			return nil
+		}

-		pb.Tasks[tn] = playbook.NewStatus(&task)
+		task.TaskID = idCounter
+		statusTask := playbook.NewStatus(&task)
+
+		pb.Tasks[tn] = statusTask
+		pb.TasksOptimized = append(pb.TasksOptimized, statusTask)
+		idCounter++

 		return nil
 	})
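The `idCounter` plumbing above suggests the point of `TasksOptimized`: with IDs handed out sequentially from zero, the playbook can look up a task's status by ID in O(1) slice indexing instead of hashing task names on every access. A minimal runnable sketch of that invariant, using simplified stand-ins for the `playbook.Status` and `Playbook` types (not the real API):

```go
package main

import "fmt"

// Status is a simplified stand-in for playbook.Status.
type Status struct {
	Name   string
	TaskID int
	State  string
}

// Playbook keeps two views on the same *Status values:
// a name-keyed map for lookups by name, and an ID-indexed
// slice for hot-path lookups without hashing.
type Playbook struct {
	Tasks          map[string]*Status
	TasksOptimized []*Status
}

func (p *Playbook) add(name string, id int) {
	s := &Status{Name: name, TaskID: id, State: "pending"}
	p.Tasks[name] = s
	// Because IDs are assigned sequentially starting at 0,
	// append keeps the invariant TasksOptimized[s.TaskID] == s.
	p.TasksOptimized = append(p.TasksOptimized, s)
}

func main() {
	pb := &Playbook{Tasks: map[string]*Status{}}

	// Mirrors the Walk in playbook.go: the root task gets id 0,
	// every child task gets the next counter value.
	idCounter := 0
	for _, name := range []string{"build", "build/compile", "build/test"} {
		pb.add(name, idCounter)
		idCounter++
	}

	// O(1) access by ID; both views share the same values.
	pb.TasksOptimized[2].State = "running"
	fmt.Println(pb.Tasks["build/test"].State) // running
}
```

As long as every status is appended in ID order, `TasksOptimized[s.TaskID] == s` holds for the root task and all walked children alike.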
4 changes: 2 additions & 2 deletions bob/bobfile/verify.go
@@ -23,8 +23,8 @@ func (b *Bobfile) VerifyAfter() error {

 func (b *Bobfile) verifyBefore() (err error) {
 	defer errz.Recover(&err)

-	err = b.BTasks.VerifyDuplicateTargets()
-	errz.Fatal(err)
+	// err = b.BTasks.VerifyDuplicateTargets()
+	// errz.Fatal(err)

 	for _, task := range b.BTasks {
 		err = task.VerifyBefore()
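For context on what is being switched off here: a duplicate-target check of this kind usually amounts to a single map pass over all declared targets. A hypothetical sketch; the real `VerifyDuplicateTargets` on `BTasks` is not shown in this PR, so the names and shapes below are assumptions:

```go
package main

import "fmt"

// verifyDuplicateTargets fails when two tasks declare the same target path.
// Hypothetical stand-in for BTasks.VerifyDuplicateTargets.
func verifyDuplicateTargets(targetsByTask map[string][]string) error {
	owner := map[string]string{} // target path -> task that declared it
	for task, targets := range targetsByTask {
		for _, target := range targets {
			if prev, ok := owner[target]; ok {
				return fmt.Errorf("target %q declared by tasks %q and %q", target, prev, task)
			}
			owner[target] = task
		}
	}
	return nil
}

func main() {
	err := verifyDuplicateTargets(map[string][]string{
		"build":   {"bin/app"},
		"package": {"bin/app"}, // duplicate target
	})
	fmt.Println(err)
}
```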
20 changes: 16 additions & 4 deletions bob/nix_builder.go
@@ -71,22 +71,34 @@ func (n *NixBuilder) BuildNixDependencies(ag *bobfile.Bobfile, buildTasksInPipel
 		return usererror.Wrap(fmt.Errorf("nix is not installed on your system. Get it from %s", nix.DownloadURl()))
 	}

+	// maps the hash of a task's nix dependencies to its nix-shell environment
+	shellEnvCache := make(map[string][]string)
+
 	// Resolve nix storePaths from dependencies
 	// and rewrite the affected tasks.
 	for _, name := range buildTasksInPipeline {
 		t := ag.BTasks[name]

+		t.SetNixpkgs(ag.Nixpkgs)
+
+		// collect the dependencies used by this task
 		var deps []nix.Dependency
 		deps = append(deps, t.Dependencies()...)
 		deps = nix.UniqueDeps(deps)

-		t.SetNixpkgs(ag.Nixpkgs)
-
-		nixShellEnv, err := n.BuildEnvironment(deps, ag.Nixpkgs)
+		hash, err := nix.HashDependencies(deps)
 		errz.Fatal(err)
-		t.SetEnv(envutil.Merge(nixShellEnv, t.Env()))
+
+		env, ok := shellEnvCache[hash]
+		if !ok {
+			// cache miss: build the environment using nix-shell
+			nixShellEnv, err := n.BuildEnvironment(deps, ag.Nixpkgs)
+			errz.Fatal(err)
+			env = envutil.Merge(nixShellEnv, t.Env())
+			shellEnvCache[hash] = env
+		}
+		// apply the environment on cache hits as well
+		t.SetEnv(env)
+
 		ag.BTasks[name] = t
 	}
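The new cache keyed by `nix.HashDependencies` memoizes the expensive `nix-shell` evaluation: tasks whose dependency sets hash to the same value reuse one environment instead of rebuilding it per task. A reduced, runnable sketch of the pattern; `hashDeps` and `buildEnvironment` below are stand-ins for the real `nix` package API, not its actual signatures:

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"sort"
	"strings"
)

// hashDeps derives a stable cache key from a dependency list;
// stand-in for nix.HashDependencies.
func hashDeps(deps []string) string {
	sorted := append([]string(nil), deps...)
	sort.Strings(sorted)
	sum := sha256.Sum256([]byte(strings.Join(sorted, "\n")))
	return hex.EncodeToString(sum[:])
}

// buildEnvironment is a placeholder for the expensive
// nix-shell invocation (n.BuildEnvironment in the diff).
func buildEnvironment(deps []string) []string {
	fmt.Println("expensive nix-shell run for", deps)
	return []string{"PATH=/nix/store/..."}
}

func main() {
	cache := map[string][]string{} // hash -> environment

	tasks := [][]string{
		{"go", "git"},
		{"git", "go"}, // same set, different order: cache hit
		{"go"},
	}

	for _, deps := range tasks {
		key := hashDeps(deps)
		env, ok := cache[key]
		if !ok {
			env = buildEnvironment(deps)
			cache[key] = env
		}
		_ = env // the real code applies env to the task via t.SetEnv
	}
}
```

One design caveat worth noting: the diff caches the environment after merging in the first task's own env (`envutil.Merge(nixShellEnv, t.Env())`), so a cache hit reuses that task's variables. If tasks can carry task-specific env entries, caching only `nixShellEnv` and merging per task would be the safer variant.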
132 changes: 102 additions & 30 deletions bob/playbook/build.go
@@ -24,69 +24,141 @@ func (p *Playbook) Build(ctx context.Context) (err error) {

 	// Setup worker pool and queue.
 	workers := p.maxParallel
-	queue := make(chan *bobtask.Task)
+	workerQueues := []chan *bobtask.Task{}
+	workerAvailabilityQueue := make(chan int, 1000)

 	boblog.Log.Info(fmt.Sprintf("Using %d workers", workers))

 	processing := sync.WaitGroup{}
+	runningWorkers := sync.WaitGroup{}
+
+	// Two separate once-guards: shutting down the workers and shutting down
+	// the availability queue must be able to fire independently.
+	var workersOnce sync.Once
+	shutdownWorkers := func() {
+		workersOnce.Do(func() {
+			// Initiate graceful shutdown of all workers
+			// by closing their queues.
+			for _, wq := range workerQueues {
+				close(wq)
+			}
+		})
+	}
+
+	var availabilityOnce sync.Once
+	var shutdown bool
+	var shutdownM sync.Mutex
+	shutdownAvailabilityQueue := func() {
+		availabilityOnce.Do(func() {
+			shutdownM.Lock()
+			defer shutdownM.Unlock()
+			shutdown = true
+			close(workerAvailabilityQueue)
+		})
+	}
+
+	// signalAvailability marks a worker as ready to receive workload,
+	// unless shutdown has begun (avoids sending on a closed channel).
+	signalAvailability := func(workerID int) {
+		shutdownM.Lock()
+		defer shutdownM.Unlock()
+		if !shutdown {
+			workerAvailabilityQueue <- workerID
+		}
+	}

 	// Start the workers which listen on task queue
 	for i := 0; i < workers; i++ {
+		// create a workload queue for this worker
+		queue := make(chan *bobtask.Task)
+		workerQueues = append(workerQueues, queue)
+
+		runningWorkers.Add(1)
 		go func(workerID int) {
+			// initially signal availability to receive workload
+			signalAvailability(workerID)
+
 			for t := range queue {
 				processing.Add(1)

 				boblog.Log.V(5).Info(fmt.Sprintf("RUNNING task %s on worker %d ", t.Name(), workerID))

 				err := p.build(ctx, t)
 				if err != nil {
 					processingErrorsMutex.Lock()
 					processingErrors = append(processingErrors, fmt.Errorf("(worker) [task: %s], %w", t.Name(), err))
 					processingErrorsMutex.Unlock()

 					// Any error occurred during a build puts the
 					// playbook in a done state. This prevents
 					// further tasks from being queued for execution.
-					p.Done()
+					shutdownAvailabilityQueue()
 				}

+				processedTasks = append(processedTasks, t)
 				processing.Done()
+
+				// done with processing, signal availability again
+				signalAvailability(workerID)
 			}
+			boblog.Log.V(5).Info(fmt.Sprintf("worker %d is shutting down", workerID))
+			runningWorkers.Done()
 		}(i + 1)
 	}

-	// Listen for tasks from the playbook and forward them to the worker pool
+	// Listen for available workers and hand them the next ready task.
 	go func() {
-		c := p.TaskChannel()
-		for t := range c {
-			boblog.Log.V(5).Info(fmt.Sprintf("Sending task %s", t.Name()))
-
-			// blocks till a worker is available
-			queue <- t
-
-			// initiate another playbook run,
-			// as there might be workers without
-			// assigned tasks left.
-			err := p.Play()
-			if err != nil {
-				if !errors.Is(err, ErrDone) {
-					processingErrorsMutex.Lock()
-					processingErrors = append(processingErrors, fmt.Errorf("(scheduler) [task: %s], %w", t.Name(), err))
-					processingErrorsMutex.Unlock()
-				}
-				break
-			}
-		}
-	}()
+		// A buffer for workers which currently have
+		// no workload assigned.
+		workerBuffer := []int{}
+
+		for workerID := range workerAvailabilityQueue {
+			shutdownM.Lock()
+			if shutdown {
+				shutdownM.Unlock()
+				break
+			}
+			shutdownM.Unlock()
+
+			task, err := p.Next()
+			if err != nil {
+				if errors.Is(err, ErrDone) {
+					// all tasks have been handed out
+					break
+				}
+
+				processingErrorsMutex.Lock()
+				processingErrors = append(
+					processingErrors,
+					fmt.Errorf("worker-availability-queue: unexpected error coming from Next(): %w", err),
+				)
+				processingErrorsMutex.Unlock()
+				break
+			}
+
+			// Push workload to the worker or store the worker for later.
+			if task != nil {
+				// Send the workload to the worker; this blocks until
+				// the worker has picked it up.
+				workerQueues[workerID-1] <- task
+
+				// There might be more workload left.
+				// Requeue a worker from the buffer.
+				if len(workerBuffer) > 0 {
+					wID := workerBuffer[len(workerBuffer)-1]
+					workerBuffer = workerBuffer[:len(workerBuffer)-1]
+					signalAvailability(wID)
+				}
+			} else {
+				// No task is ready to be worked on yet, but the playbook
+				// is not done. Therefore the worker is stored in a buffer
+				// and is requeued on the next change to the playbook.
+				workerBuffer = append(workerBuffer, workerID)
+			}
+		}
+		shutdownWorkers()
+	}()

-	err = p.Play()
-	if err != nil {
-		return err
-	}
-
-	<-p.DoneChan()
-	processing.Wait()
-
-	close(queue)
+	runningWorkers.Wait()
+	shutdownAvailabilityQueue()

 	// iterate through tasks and logs
 	// skipped input files.
@@ -99,7 +99,7 @@ func (p *Playbook) Build(ctx context.Context) (err error) {
 		)
 	}

-	p.summary(processedTasks)
+	// p.summary(processedTasks)

 	if len(processingErrors) > 0 {
 		// Pass only the very first processing error.
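The scheduler change replaces a single shared task queue with per-worker queues plus an availability channel: idle workers announce their ID, and the scheduler pairs each announcement with the next ready task, parking the worker in a buffer when nothing is ready. A stripped-down, runnable sketch of that pattern, with a plain counter standing in for `p.Next()`/`ErrDone` and dependency tracking omitted:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	const workers = 3
	tasks := []string{"a", "b", "c", "d", "e"}

	// idle workers announce their ID here
	availability := make(chan int, 64)
	// one workload queue per worker
	queues := make([]chan string, workers)

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		queues[i] = make(chan string)
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			availability <- id // initial availability
			for t := range queues[id-1] {
				fmt.Printf("worker %d ran task %s\n", id, t)
				availability <- id // ready for more work
			}
		}(i + 1)
	}

	// Scheduler: pair each idle worker with the next unassigned task;
	// stands in for the p.Next()/ErrDone loop in the diff.
	next := 0
	for id := range availability {
		if next == len(tasks) {
			break
		}
		queues[id-1] <- tasks[next]
		next++
	}

	// Graceful shutdown: closing the queues ends each worker's range loop.
	for _, q := range queues {
		close(q)
	}
	wg.Wait()
}
```

The `workerBuffer` in the real diff handles the case this sketch skips: `Next()` can return no task while dependencies are still building, so idle workers must be parked and re-announced once the playbook state changes.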