Skip to content

Commit

Permalink
Refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
panjf2000 committed Oct 4, 2019
1 parent 2b9f35b commit b0ec510
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 70 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ A goroutine pool for Go
<a title="Godoc for ants" target="_blank" href="https://godoc.org/github.com/panjf2000/ants"><img src="https://img.shields.io/badge/go-documentation-blue.svg?style=flat-square"></a>
<a title="Release" target="_blank" href="https://github.com/panjf2000/ants/releases"><img src="https://img.shields.io/github/release/panjf2000/ants.svg?style=flat-square"></a>
<a title="License" target="_blank" href="https://opensource.org/licenses/mit-license.php"><img src="https://img.shields.io/aur/license/pac?style=flat-square"></a>
<a title="Awesome" target="_blank" href="https://github.com/avelino/awesome-go"><img src="https://awesome.re/badge-flat2.svg"></a>
<a title="Mentioned in Awesome Go" target="_blank" href="https://github.com/avelino/awesome-go"><img src="https://awesome.re/mentioned-badge-flat.svg"></a>
</p>

# [[中文](README_ZH.md)]
Expand Down
2 changes: 1 addition & 1 deletion README_ZH.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ A goroutine pool for Go
<a title="Godoc for ants" target="_blank" href="https://godoc.org/github.com/panjf2000/ants"><img src="https://img.shields.io/badge/go-documentation-blue.svg?style=flat-square"></a>
<a title="Release" target="_blank" href="https://github.com/panjf2000/ants/releases"><img src="https://img.shields.io/github/release/panjf2000/ants.svg?style=flat-square"></a>
<a title="License" target="_blank" href="https://opensource.org/licenses/mit-license.php"><img src="https://img.shields.io/aur/license/pac?style=flat-square"></a>
<a title="Awesome" target="_blank" href="https://github.com/avelino/awesome-go"><img src="https://awesome.re/badge-flat2.svg"></a>
<a title="Mentioned in Awesome Go" target="_blank" href="https://github.com/avelino/awesome-go"><img src="https://awesome.re/mentioned-badge-flat.svg"></a>
</p>

# [[英文](README.md)]
Expand Down
18 changes: 13 additions & 5 deletions ants.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ import (
)

const (
// DEFAULT_ANTS_POOL_SIZE is the default capacity for a default goroutine pool.
DEFAULT_ANTS_POOL_SIZE = math.MaxInt32
// DefaultAntsPoolSize is the default capacity for a default goroutine pool.
DefaultAntsPoolSize = math.MaxInt32

// DEFAULT_CLEAN_INTERVAL_TIME is the interval time to clean up goroutines.
DEFAULT_CLEAN_INTERVAL_TIME = 1
// DefaultCleanIntervalTime is the interval time to clean up goroutines.
DefaultCleanIntervalTime = time.Second

// CLOSED represents that the pool is closed.
CLOSED = 1
Expand Down Expand Up @@ -77,11 +77,13 @@ var (
}()

// Init a instance pool when importing ants.
defaultAntsPool, _ = NewPool(DEFAULT_ANTS_POOL_SIZE)
defaultAntsPool, _ = NewPool(DefaultAntsPoolSize)
)

// Option represents the optional function.
type Option func(opts *Options)

// Options contains all options which will be applied when instantiating a ants pool.
type Options struct {
// ExpiryDuration set the expired time (second) of every worker.
ExpiryDuration time.Duration
Expand All @@ -103,36 +105,42 @@ type Options struct {
PanicHandler func(interface{})
}

// WithOptions accepts the whole options config.
func WithOptions(options Options) Option {
return func(opts *Options) {
*opts = options
}
}

// WithExpiryDuration sets up the interval time of cleaning up goroutines.
func WithExpiryDuration(expiryDuration time.Duration) Option {
return func(opts *Options) {
opts.ExpiryDuration = expiryDuration
}
}

// WithPreAlloc indicates whether it should malloc for workers.
func WithPreAlloc(preAlloc bool) Option {
return func(opts *Options) {
opts.PreAlloc = preAlloc
}
}

// WithMaxBlockingTasks sets up the maximum number of goroutines that are blocked when it reaches the capacity of pool.
func WithMaxBlockingTasks(maxBlockingTasks int) Option {
return func(opts *Options) {
opts.MaxBlockingTasks = maxBlockingTasks
}
}

// WithNonblocking indicates that pool will return nil when there is no available workers.
func WithNonblocking(nonblocking bool) Option {
return func(opts *Options) {
opts.Nonblocking = nonblocking
}
}

// WithPanicHandler sets up panic handler.
func WithPanicHandler(panicHandler func(interface{})) Option {
return func(opts *Options) {
opts.PanicHandler = panicHandler
Expand Down
22 changes: 11 additions & 11 deletions ants_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ import (
)

const (
_ = 1 << (10 * iota)
//KiB // 1024
_ = 1 << (10 * iota)
KiB // 1024
MiB // 1048576
//GiB // 1073741824
//TiB // 1099511627776 (超过了int32的范围)
Expand Down Expand Up @@ -143,7 +143,7 @@ func TestAntsPoolGetWorkerFromCache(t *testing.T) {
for i := 0; i < AntsSize; i++ {
_ = p.Submit(demoFunc)
}
time.Sleep(2 * ants.DEFAULT_CLEAN_INTERVAL_TIME * time.Second)
time.Sleep(2 * ants.DefaultCleanIntervalTime)
_ = p.Submit(demoFunc)
t.Logf("pool, running workers number:%d", p.Running())
mem := runtime.MemStats{}
Expand All @@ -161,7 +161,7 @@ func TestAntsPoolWithFuncGetWorkerFromCache(t *testing.T) {
for i := 0; i < AntsSize; i++ {
_ = p.Invoke(dur)
}
time.Sleep(2 * ants.DEFAULT_CLEAN_INTERVAL_TIME * time.Second)
time.Sleep(2 * ants.DefaultCleanIntervalTime)
_ = p.Invoke(dur)
t.Logf("pool with func, running workers number:%d", p.Running())
mem := runtime.MemStats{}
Expand All @@ -178,7 +178,7 @@ func TestAntsPoolWithFuncGetWorkerFromCachePreMalloc(t *testing.T) {
for i := 0; i < AntsSize; i++ {
_ = p.Invoke(dur)
}
time.Sleep(2 * ants.DEFAULT_CLEAN_INTERVAL_TIME * time.Second)
time.Sleep(2 * ants.DefaultCleanIntervalTime)
_ = p.Invoke(dur)
t.Logf("pool with func, running workers number:%d", p.Running())
mem := runtime.MemStats{}
Expand Down Expand Up @@ -369,7 +369,7 @@ func TestPurge(t *testing.T) {
}
defer p.Release()
_ = p.Submit(demoFunc)
time.Sleep(3 * ants.DEFAULT_CLEAN_INTERVAL_TIME * time.Second)
time.Sleep(3 * ants.DefaultCleanIntervalTime)
if p.Running() != 0 {
t.Error("all p should be purged")
}
Expand All @@ -379,7 +379,7 @@ func TestPurge(t *testing.T) {
}
defer p1.Release()
_ = p1.Invoke(1)
time.Sleep(3 * ants.DEFAULT_CLEAN_INTERVAL_TIME * time.Second)
time.Sleep(3 * ants.DefaultCleanIntervalTime)
if p.Running() != 0 {
t.Error("all p should be purged")
}
Expand All @@ -392,7 +392,7 @@ func TestPurgePreMalloc(t *testing.T) {
}
defer p.Release()
_ = p.Submit(demoFunc)
time.Sleep(3 * ants.DEFAULT_CLEAN_INTERVAL_TIME * time.Second)
time.Sleep(3 * ants.DefaultCleanIntervalTime)
if p.Running() != 0 {
t.Error("all p should be purged")
}
Expand All @@ -402,7 +402,7 @@ func TestPurgePreMalloc(t *testing.T) {
}
defer p1.Release()
_ = p1.Invoke(1)
time.Sleep(3 * ants.DEFAULT_CLEAN_INTERVAL_TIME * time.Second)
time.Sleep(3 * ants.DefaultCleanIntervalTime)
if p.Running() != 0 {
t.Error("all p should be purged")
}
Expand Down Expand Up @@ -608,7 +608,7 @@ func TestRestCodeCoverage(t *testing.T) {
for i := 0; i < n; i++ {
_ = p.Invoke(Param)
}
time.Sleep(ants.DEFAULT_CLEAN_INTERVAL_TIME * time.Second)
time.Sleep(ants.DefaultCleanIntervalTime)
t.Logf("pool with func, capacity:%d", p.Cap())
t.Logf("pool with func, running workers number:%d", p.Running())
t.Logf("pool with func, free workers number:%d", p.Free())
Expand All @@ -624,7 +624,7 @@ func TestRestCodeCoverage(t *testing.T) {
for i := 0; i < n; i++ {
_ = ppremWithFunc.Invoke(Param)
}
time.Sleep(ants.DEFAULT_CLEAN_INTERVAL_TIME * time.Second)
time.Sleep(ants.DefaultCleanIntervalTime)
t.Logf("pre-malloc pool with func, capacity:%d", ppremWithFunc.Cap())
t.Logf("pre-malloc pool with func, running workers number:%d", ppremWithFunc.Running())
t.Logf("pre-malloc pool with func, free workers number:%d", ppremWithFunc.Free())
Expand Down
5 changes: 3 additions & 2 deletions spinlock.go → internal/spinlock.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.

package ants
package internal

import (
"runtime"
Expand All @@ -22,6 +22,7 @@ func (sl *spinLock) Unlock() {
atomic.StoreUint32((*uint32)(sl), 0)
}

func SpinLock() sync.Locker {
// NewSpinLock instantiates a spin-lock.
func NewSpinLock() sync.Locker {
return new(spinLock)
}
40 changes: 16 additions & 24 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"sync"
"sync/atomic"
"time"

"github.com/panjf2000/ants/v2/internal"
)

// Pool accept the tasks from client, it limits the total of goroutines to a given number by recycling goroutines.
Expand Down Expand Up @@ -134,29 +136,19 @@ func NewPool(size int, options ...Option) (*Pool, error) {
if expiry := opts.ExpiryDuration; expiry < 0 {
return nil, ErrInvalidPoolExpiry
} else if expiry == 0 {
opts.ExpiryDuration = time.Duration(DEFAULT_CLEAN_INTERVAL_TIME) * time.Second
opts.ExpiryDuration = DefaultCleanIntervalTime
}

var p *Pool
p := &Pool{
capacity: int32(size),
expiryDuration: opts.ExpiryDuration,
nonblocking: opts.Nonblocking,
maxBlockingTasks: int32(opts.MaxBlockingTasks),
panicHandler: opts.PanicHandler,
lock: internal.NewSpinLock(),
}
if opts.PreAlloc {
p = &Pool{
capacity: int32(size),
expiryDuration: opts.ExpiryDuration,
workers: make([]*goWorker, 0, size),
nonblocking: opts.Nonblocking,
maxBlockingTasks: int32(opts.MaxBlockingTasks),
panicHandler: opts.PanicHandler,
lock: SpinLock(),
}
} else {
p = &Pool{
capacity: int32(size),
expiryDuration: opts.ExpiryDuration,
nonblocking: opts.Nonblocking,
maxBlockingTasks: int32(opts.MaxBlockingTasks),
panicHandler: opts.PanicHandler,
lock: SpinLock(),
}
p.workers = make([]*goWorker, 0, size)
}
p.cond = sync.NewCond(p.lock)

Expand All @@ -173,11 +165,11 @@ func (p *Pool) Submit(task func()) error {
if atomic.LoadInt32(&p.release) == CLOSED {
return ErrPoolClosed
}
if w := p.retrieveWorker(); w == nil {
var w *goWorker
if w = p.retrieveWorker(); w == nil {
return ErrPoolOverload
} else {
w.task <- task
}
w.task <- task
return nil
}

Expand All @@ -188,7 +180,7 @@ func (p *Pool) Running() int {

// Free returns the available goroutines to work.
func (p *Pool) Free() int {
return int(atomic.LoadInt32(&p.capacity) - atomic.LoadInt32(&p.running))
return p.Cap() - p.Running()
}

// Cap returns the capacity of this pool.
Expand Down
43 changes: 17 additions & 26 deletions pool_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"sync"
"sync/atomic"
"time"

"github.com/panjf2000/ants/v2/internal"
)

// PoolWithFunc accept the tasks from client, it limits the total of goroutines to a given number by recycling goroutines.
Expand Down Expand Up @@ -141,31 +143,20 @@ func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWi
if expiry := opts.ExpiryDuration; expiry < 0 {
return nil, ErrInvalidPoolExpiry
} else if expiry == 0 {
opts.ExpiryDuration = time.Duration(DEFAULT_CLEAN_INTERVAL_TIME) * time.Second
opts.ExpiryDuration = DefaultCleanIntervalTime
}

var p *PoolWithFunc
p := &PoolWithFunc{
capacity: int32(size),
expiryDuration: opts.ExpiryDuration,
poolFunc: pf,
nonblocking: opts.Nonblocking,
maxBlockingTasks: int32(opts.MaxBlockingTasks),
panicHandler: opts.PanicHandler,
lock: internal.NewSpinLock(),
}
if opts.PreAlloc {
p = &PoolWithFunc{
capacity: int32(size),
expiryDuration: opts.ExpiryDuration,
poolFunc: pf,
workers: make([]*goWorkerWithFunc, 0, size),
nonblocking: opts.Nonblocking,
maxBlockingTasks: int32(opts.MaxBlockingTasks),
panicHandler: opts.PanicHandler,
lock: SpinLock(),
}
} else {
p = &PoolWithFunc{
capacity: int32(size),
expiryDuration: opts.ExpiryDuration,
poolFunc: pf,
nonblocking: opts.Nonblocking,
maxBlockingTasks: int32(opts.MaxBlockingTasks),
panicHandler: opts.PanicHandler,
lock: SpinLock(),
}
p.workers = make([]*goWorkerWithFunc, 0, size)
}
p.cond = sync.NewCond(p.lock)

Expand All @@ -182,11 +173,11 @@ func (p *PoolWithFunc) Invoke(args interface{}) error {
if atomic.LoadInt32(&p.release) == CLOSED {
return ErrPoolClosed
}
if w := p.retrieveWorker(); w == nil {
var w *goWorkerWithFunc
if w = p.retrieveWorker(); w == nil {
return ErrPoolOverload
} else {
w.args <- args
}
w.args <- args
return nil
}

Expand All @@ -197,7 +188,7 @@ func (p *PoolWithFunc) Running() int {

// Free returns a available goroutines to work.
func (p *PoolWithFunc) Free() int {
return int(atomic.LoadInt32(&p.capacity) - atomic.LoadInt32(&p.running))
return p.Cap() - p.Running()
}

// Cap returns the capacity of this pool.
Expand Down

0 comments on commit b0ec510

Please sign in to comment.