Skip to content

Commit

Permalink
Only attempt to flush queue if the underlying worker pool is not fini…
Browse files Browse the repository at this point in the history
…shed (#18593)

* Only attempt to flush queue if the underlying worker pool is not finished

There is a possible race whereby a worker pool could be cancelled but yet the
underlying queue is not empty. This will lead to flush-all cycling because it
cannot empty the pool.

Signed-off-by: Andrew Thornton <[email protected]>

* Apply suggestions from code review

Co-authored-by: Gusted <[email protected]>

Co-authored-by: Gusted <[email protected]>
  • Loading branch information
zeripath and Gusted authored Feb 5, 2022
1 parent a51d211 commit 7ba1b71
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 0 deletions.
11 changes: 11 additions & 0 deletions modules/queue/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ type ManagedPool interface {
BoostWorkers() int
// SetPoolSettings sets the user updatable settings for the pool
SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)
// Done returns a channel that will be closed when the Pool's baseCtx is closed
Done() <-chan struct{}
}

// ManagedQueueList implements the sort.Interface
Expand Down Expand Up @@ -211,6 +213,15 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error
continue
}
}
if pool, ok := mq.Managed.(ManagedPool); ok {
// No point into flushing pools when their base's ctx is already done.
select {
case <-pool.Done():
wg.Done()
continue
default:
}
}

allEmpty = false
if flushable, ok := mq.Managed.(Flushable); ok {
Expand Down
5 changes: 5 additions & 0 deletions modules/queue/workerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPo
return pool
}

// Done returns when this worker pool's base context has been cancelled
func (p *WorkerPool) Done() <-chan struct{} {
return p.baseCtx.Done()
}

// Push pushes the data to the internal channel
func (p *WorkerPool) Push(data Data) {
atomic.AddInt64(&p.numInQueue, 1)
Expand Down

0 comments on commit 7ba1b71

Please sign in to comment.