Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent deadlock when purging idle worker during task submission #34

Merged
merged 2 commits into from
Aug 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jobs:
name: Test
strategy:
matrix:
go-version: [1.15.x, 1.16.x, 1.17.x, 1.18.x]
go-version: [1.15.x, 1.16.x, 1.17.x, 1.18.x, 1.19.x]
os: [ubuntu-latest, macos-latest, windows-latest]
runs-on: ${{ matrix.os }}
steps:
Expand All @@ -21,7 +21,7 @@ jobs:
- name: Checkout code
uses: actions/checkout@v2
- name: Test
run: go test -race -v ./
run: make test
codecov:
name: Upload coverage report to Codecov
runs-on: ubuntu-latest
Expand All @@ -33,9 +33,9 @@ jobs:
- name: Checkout code
uses: actions/checkout@v2
- name: Test
run: go test -race -v -coverprofile=coverage.txt -covermode=atomic ./
run: make coverage
- uses: codecov/codecov-action@v2
with:
files: ./coverage.txt
files: coverage.out
fail_ci_if_error: true
verbose: true
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
test:
go test -race -v ./

coverage:
go test -race -v -coverprofile=coverage.out -covermode=atomic ./
2 changes: 1 addition & 1 deletion examples/dynamic_size/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/alitto/pond/examples/dynamic_size

go 1.18
go 1.19

require (
github.com/alitto/pond v1.7.1
Expand Down
2 changes: 1 addition & 1 deletion examples/fixed_size/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/alitto/pond/examples/fixed_size

go 1.18
go 1.19

require (
github.com/alitto/pond v1.7.1
Expand Down
2 changes: 1 addition & 1 deletion examples/group_context/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/alitto/pond/examples/group_context

go 1.18
go 1.19

require (
github.com/alitto/pond v1.7.1
Expand Down
2 changes: 1 addition & 1 deletion examples/pool_context/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/alitto/pond/examples/pool_context

go 1.18
go 1.19

require github.com/alitto/pond v1.7.1

Expand Down
2 changes: 1 addition & 1 deletion examples/prometheus/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/alitto/pond/examples/fixed_size

go 1.18
go 1.19

require (
github.com/alitto/pond v1.7.1
Expand Down
2 changes: 1 addition & 1 deletion examples/task_group/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/alitto/pond/examples/task_group

go 1.18
go 1.19

require (
github.com/alitto/pond v1.7.1
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module github.com/alitto/pond

go 1.18
go 1.19
134 changes: 66 additions & 68 deletions pond.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,9 @@ func (p *WorkerPool) stop(waitForQueuedTasksToComplete bool) {
p.tasksWaitGroup.Wait()
}

// Reset worker count
p.resetWorkerCount()

// Terminate all workers & purger goroutine
p.contextCancel()

Expand All @@ -380,42 +383,49 @@ func (p *WorkerPool) purge() {
select {
// Timed out waiting for any activity to happen, attempt to stop an idle worker
case <-idleTicker.C:
p.stopIdleWorker()
p.maybeStopIdleWorker()
// Pool context was cancelled, exit
case <-p.context.Done():
return
}
}
}

// stopIdleWorker attempts to stop an idle worker by sending it a nil task
func (p *WorkerPool) stopIdleWorker() {
if p.IdleWorkers() > 0 && p.RunningWorkers() > p.minWorkers && !p.Stopped() {
p.tasks <- nil
// maybeStopIdleWorker attempts to stop an idle worker by sending it a nil task
func (p *WorkerPool) maybeStopIdleWorker() bool {

if decremented := p.decrementWorkerCount(); !decremented {
return false
}

// Send a nil task to stop an idle worker
p.tasks <- nil

return true
}

// maybeStartWorker attempts to create a new worker goroutine to run the given task.
// If the worker pool has reached the maximum number of workers or there are idle workers,
// it will not create a new worker.
// it will not create a new one.
func (p *WorkerPool) maybeStartWorker(firstTask func()) bool {

if ok := p.incrementWorkerCount(); !ok {
if incremented := p.incrementWorkerCount(); !incremented {
return false
}

if firstTask == nil {
// Worker starts idle
atomic.AddInt32(&p.idleWorkerCount, 1)
}

// Launch worker goroutine
go worker(p.context, firstTask, p.tasks, &p.idleWorkerCount, p.decrementWorkerCount, p.executeTask)
go worker(p.context, &p.workersWaitGroup, firstTask, p.tasks, p.executeTask)

return true
}

// executeTask executes the given task and updates task-related counters
func (p *WorkerPool) executeTask(task func()) {
func (p *WorkerPool) executeTask(task func(), isFirstTask bool) {

defer func() {
if panic := recover(); panic != nil {
Expand All @@ -424,60 +434,90 @@ func (p *WorkerPool) executeTask(task func()) {

// Invoke panic handler
p.panicHandler(panic)

// Increment idle count
atomic.AddInt32(&p.idleWorkerCount, 1)
}
p.tasksWaitGroup.Done()
}()

// Decrement idle count
if !isFirstTask {
atomic.AddInt32(&p.idleWorkerCount, -1)
}

// Decrement waiting task count
atomic.AddUint64(&p.waitingTaskCount, ^uint64(0))

// Execute task
task()

// Increment successful task count
atomic.AddUint64(&p.successfulTaskCount, 1)

// Increment idle count
atomic.AddInt32(&p.idleWorkerCount, 1)
}

func (p *WorkerPool) incrementWorkerCount() (incremented bool) {
func (p *WorkerPool) incrementWorkerCount() bool {

p.mutex.Lock()
defer p.mutex.Unlock()

runningWorkerCount := p.RunningWorkers()

// Reached max workers, do not create a new one
if runningWorkerCount >= p.maxWorkers {
return
return false
}

// Idle workers available, do not create a new one
if runningWorkerCount >= p.minWorkers && runningWorkerCount > 0 && p.IdleWorkers() > 0 {
return
return false
}

// Execute the resizing strategy to determine if we should create more workers
if resize := p.strategy.Resize(runningWorkerCount, p.minWorkers, p.maxWorkers); !resize {
return false
}

// Increment worker count
atomic.AddInt32(&p.workerCount, 1)

// Increment wait group
p.workersWaitGroup.Add(1)

return true
}

func (p *WorkerPool) decrementWorkerCount() bool {

p.mutex.Lock()
defer p.mutex.Unlock()

// Execute the resizing strategy to determine if we can create more workers
incremented = p.strategy.Resize(runningWorkerCount, p.minWorkers, p.maxWorkers)
if p.IdleWorkers() <= 0 || p.RunningWorkers() <= p.minWorkers || p.Stopped() {
return false
}

if incremented {
// Increment worker count
atomic.AddInt32(&p.workerCount, 1)
// Decrement worker count
atomic.AddInt32(&p.workerCount, -1)

// Increment wait group
p.workersWaitGroup.Add(1)
}
// Decrement idle count
atomic.AddInt32(&p.idleWorkerCount, -1)

return
return true
}

func (p *WorkerPool) decrementWorkerCount() {
func (p *WorkerPool) resetWorkerCount() {

p.mutex.Lock()
defer p.mutex.Unlock()

// Decrement worker count
atomic.AddInt32(&p.workerCount, -1)
// Reset worker count
atomic.StoreInt32(&p.workerCount, 0)

// Decrement wait group
p.workersWaitGroup.Done()
// Reset idle count
atomic.StoreInt32(&p.idleWorkerCount, 0)
}

// Group creates a new task group
Expand Down Expand Up @@ -506,45 +546,3 @@ func (p *WorkerPool) GroupContext(ctx context.Context) (*TaskGroupWithContext, c
cancel: cancel,
}, ctx
}

// worker launches a worker goroutine
func worker(context context.Context, firstTask func(), tasks <-chan func(), idleWorkerCount *int32, exitHandler func(), taskExecutor func(func())) {

defer func() {
// Decrement idle count
atomic.AddInt32(idleWorkerCount, -1)

// Handle normal exit
exitHandler()
}()

// We have received a task, execute it
if firstTask != nil {
taskExecutor(firstTask)

// Increment idle count
atomic.AddInt32(idleWorkerCount, 1)
}

for {
select {
case <-context.Done():
// Pool context was cancelled, exit
return
case task, ok := <-tasks:
if task == nil || !ok {
// We have received a signal to quit
return
}

// Decrement idle count
atomic.AddInt32(idleWorkerCount, -1)

// We have received a task, execute it
taskExecutor(task)

// Increment idle count
atomic.AddInt32(idleWorkerCount, 1)
}
}
}
29 changes: 28 additions & 1 deletion pond_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,32 @@ func TestPurgeAfterPoolStopped(t *testing.T) {

// Simulate purger goroutine attempting to stop a worker after tasks channel is closed
atomic.StoreInt32(&pool.stopped, 1)
pool.stopIdleWorker()
pool.maybeStopIdleWorker()
}

// See: https://github.com/alitto/pond/issues/33
func TestPurgeDuringSubmit(t *testing.T) {

pool := New(1, 1)

var doneCount int32

// Submit a task to ensure at least 1 worker is started
pool.SubmitAndWait(func() {
atomic.AddInt32(&doneCount, 1)
})

assertEqual(t, 1, pool.IdleWorkers())

// Stop an idle worker right before submitting another task
pool.maybeStopIdleWorker()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this PR and the prompt reaction!

How does this test validate issue #33, if maybeStopIdleWorker is synchronously called by the same go routine?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test was entering a deadlock quite consistently (although not 100% of the time). maybeStopIdleWorker executes synchronously up to this point, where it sends a nil task to kill a worker goroutine asynchronously (buffered channel). In the previous version of the lib, the idle worker counter was being updated when the killed goroutine exited, so there was a small window of time during which this call would incorrectly return false, indicating there was an idle worker available.

pool.Submit(func() {
atomic.AddInt32(&doneCount, 1)
})

pool.StopAndWait()

assertEqual(t, int32(2), atomic.LoadInt32(&doneCount))
assertEqual(t, 0, pool.RunningWorkers())

}
35 changes: 35 additions & 0 deletions worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package pond

import (
"context"
"sync"
)

// worker represents a worker goroutine
func worker(context context.Context, waitGroup *sync.WaitGroup, firstTask func(), tasks <-chan func(), taskExecutor func(func(), bool)) {

// If provided, execute the first task immediately, before listening to the tasks channel
if firstTask != nil {
taskExecutor(firstTask, true)
}

defer func() {
waitGroup.Done()
}()

for {
select {
case <-context.Done():
// Pool context was cancelled, exit
return
case task, ok := <-tasks:
if task == nil || !ok {
// We have received a signal to exit
return
}

// We have received a task, execute it
taskExecutor(task, false)
}
}
}