Skip to content

Commit

Permalink
Merge pull request #14354 from vrothberg/fix-14351
Browse files Browse the repository at this point in the history
work queue: simplify and use a wait group
  • Loading branch information
openshift-merge-robot authored May 25, 2022
2 parents b13184d + 4a447a2 commit da26439
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 36 deletions.
14 changes: 6 additions & 8 deletions libpod/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"regexp"
"strconv"
"strings"
"sync"
"syscall"
"time"

Expand Down Expand Up @@ -87,8 +88,8 @@ type Runtime struct {
lockManager lock.Manager

// Worker
workerShutdown chan bool
workerChannel chan func()
workerChannel chan func()
workerGroup sync.WaitGroup

// syslog describes whenever logrus should log to the syslog as well.
// Note that the syslog hook will be enabled early in cmd/podman/syslog_linux.go
Expand Down Expand Up @@ -823,12 +824,9 @@ func (r *Runtime) Shutdown(force bool) error {
return define.ErrRuntimeStopped
}

if r.workerShutdown != nil {
// Signal the worker routine to shutdown. The routine will
// process all pending work items and then read from the
// channel; we're blocked until all work items have been
// processed.
r.workerShutdown <- true
if r.workerChannel != nil {
r.workerGroup.Wait()
close(r.workerChannel)
}

r.valid = false
Expand Down
33 changes: 5 additions & 28 deletions libpod/runtime_worker.go
Original file line number Diff line number Diff line change
@@ -1,40 +1,17 @@
package libpod

import (
"time"
)

func (r *Runtime) startWorker() {
if r.workerChannel == nil {
r.workerChannel = make(chan func(), 1)
r.workerShutdown = make(chan bool)
}
r.workerChannel = make(chan func(), 10)
go func() {
for {
// Make sure to read all workers before
// checking if we're about to shutdown.
for len(r.workerChannel) > 0 {
w := <-r.workerChannel
w()
}

select {
// We'll read from the shutdown channel only when all
// items above have been processed.
//
// (*Runtime).Shutdown() will block until until the
// item is read.
case <-r.workerShutdown:
return

default:
time.Sleep(100 * time.Millisecond)
}
for w := range r.workerChannel {
w()
r.workerGroup.Done()
}
}()
}

func (r *Runtime) queueWork(f func()) {
r.workerGroup.Add(1)
go func() {
r.workerChannel <- f
}()
Expand Down

0 comments on commit da26439

Please sign in to comment.