Skip to content

Commit

Permalink
Support unlimited pool
Browse files Browse the repository at this point in the history
Fixes #90
  • Loading branch information
panjf2000 committed May 8, 2020
1 parent 88b5a85 commit 1c53485
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 17 deletions.
14 changes: 14 additions & 0 deletions ants_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,20 @@ func TestRebootNewPool(t *testing.T) {
wg.Wait()
}

func TestInfinitePool(t *testing.T) {
c := make(chan struct{})
p, _ := NewPool(-1)
_ = p.Submit(func() {
_ = p.Submit(func() {
<-c
})
})
c <- struct{}{}
if n := p.Running(); n != 2 {
t.Errorf("expect 2 workers running, but got %d", n)
}
}

func TestRestCodeCoverage(t *testing.T) {
_, err := NewPool(-1, WithExpiryDuration(-1))
t.Log(err)
Expand Down
28 changes: 17 additions & 11 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ type Pool struct {
// blockingNum is the number of the goroutines already been blocked on pool.Submit, protected by pool.lock
blockingNum int

// infinite indicates whether the pool capacity is limitless, an infinite pool is used to avoid
// potential the endless blocking caused by nested usage of a pool: submitting a task to pool
// which submits a new task to the same pool.
infinite bool

options *Options
}

Expand Down Expand Up @@ -92,10 +97,6 @@ func (p *Pool) periodicallyPurge() {

// NewPool generates an instance of ants pool.
func NewPool(size int, options ...Option) (*Pool, error) {
if size <= 0 {
return nil, ErrInvalidPoolSize
}

opts := loadOptions(options...)

if expiry := opts.ExpiryDuration; expiry < 0 {
Expand All @@ -119,6 +120,9 @@ func NewPool(size int, options ...Option) (*Pool, error) {
task: make(chan func(), workerChanCap),
}
}
if size <= 0 {
p.infinite = true
}
if p.options.PreAlloc {
p.workers = newWorkerArray(loopQueueType, size)
} else {
Expand Down Expand Up @@ -199,8 +203,7 @@ func (p *Pool) decRunning() {
}

// retrieveWorker returns a available worker to run the tasks.
func (p *Pool) retrieveWorker() *goWorker {
var w *goWorker
func (p *Pool) retrieveWorker() (w *goWorker) {
spawnWorker := func() {
w = p.workerCache.Get().(*goWorker)
w.run()
Expand All @@ -211,26 +214,29 @@ func (p *Pool) retrieveWorker() *goWorker {
w = p.workers.detach()
if w != nil {
p.lock.Unlock()
} else if p.infinite {
p.lock.Unlock()
spawnWorker()
} else if p.Running() < p.Cap() {
p.lock.Unlock()
spawnWorker()
} else {
if p.options.Nonblocking {
p.lock.Unlock()
return nil
return
}
Reentry:
if p.options.MaxBlockingTasks != 0 && p.blockingNum >= p.options.MaxBlockingTasks {
p.lock.Unlock()
return nil
return
}
p.blockingNum++
p.cond.Wait()
p.blockingNum--
if p.Running() == 0 {
p.lock.Unlock()
spawnWorker()
return w
return
}

w = p.workers.detach()
Expand All @@ -240,12 +246,12 @@ func (p *Pool) retrieveWorker() *goWorker {

p.lock.Unlock()
}
return w
return
}

// revertWorker puts a worker back into free pool, recycling the goroutines.
func (p *Pool) revertWorker(worker *goWorker) bool {
if atomic.LoadInt32(&p.state) == CLOSED || p.Running() > p.Cap() {
if atomic.LoadInt32(&p.state) == CLOSED || (!p.infinite && p.Running() > p.Cap()) {
return false
}
worker.recycleTime = time.Now()
Expand Down
11 changes: 5 additions & 6 deletions pool_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,7 @@ func (p *PoolWithFunc) decRunning() {
}

// retrieveWorker returns a available worker to run the tasks.
func (p *PoolWithFunc) retrieveWorker() *goWorkerWithFunc {
var w *goWorkerWithFunc
func (p *PoolWithFunc) retrieveWorker() (w *goWorkerWithFunc) {
spawnWorker := func() {
w = p.workerCache.Get().(*goWorkerWithFunc)
w.run()
Expand All @@ -244,20 +243,20 @@ func (p *PoolWithFunc) retrieveWorker() *goWorkerWithFunc {
} else {
if p.options.Nonblocking {
p.lock.Unlock()
return nil
return
}
Reentry:
if p.options.MaxBlockingTasks != 0 && p.blockingNum >= p.options.MaxBlockingTasks {
p.lock.Unlock()
return nil
return
}
p.blockingNum++
p.cond.Wait()
p.blockingNum--
if p.Running() == 0 {
p.lock.Unlock()
spawnWorker()
return w
return
}
l := len(p.workers) - 1
if l < 0 {
Expand All @@ -268,7 +267,7 @@ func (p *PoolWithFunc) retrieveWorker() *goWorkerWithFunc {
p.workers = p.workers[:l]
p.lock.Unlock()
}
return w
return
}

// revertWorker puts a worker back into free pool, recycling the goroutines.
Expand Down

0 comments on commit 1c53485

Please sign in to comment.