-
-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* add loop queue * add loop queue * fix the bugs add loop queue move the worker queue to directory 按照新的接口实现 lifo 队列 添加新接口的环形队列实现 rename the slice queue 修复了 unlock 使用 queue 管理 goWorkerWithFunc 使用 dequeue 判断队列 add remainder 增加测试文件 循环队列需要一个空闲位 * remove interface{} * Refine the logic of sync.Pool * Add flowcharts of ants into READMEs * Add the installation about ants v2 * Renew the functional options in READMEs * Renew English and Chinese flowcharts * rename package name 移动 worker queue 位置 worker queue 都修改为私有接口 考虑到性能问题,把 interface{} 改回到 *goworker * 修改 releaseExpiry 和 releaseAll * remove files * fix some bug
- Loading branch information
1 parent
0efbda3
commit f0e2392
Showing
6 changed files
with
493 additions
and
39 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,137 @@ | ||
package ants | ||
|
||
import "time" | ||
|
||
type loopQueue struct { | ||
items []*goWorker | ||
expiry []*goWorker | ||
head int | ||
tail int | ||
remainder int | ||
} | ||
|
||
func newLoopQueue(size int) *loopQueue { | ||
if size <= 0 { | ||
return nil | ||
} | ||
|
||
wq := loopQueue{ | ||
items: make([]*goWorker, size+1), | ||
expiry: make([]*goWorker, 0), | ||
head: 0, | ||
tail: 0, | ||
remainder: size + 1, | ||
} | ||
|
||
return &wq | ||
} | ||
|
||
func (wq *loopQueue) len() int { | ||
if wq.remainder == 0 { | ||
return 0 | ||
} | ||
|
||
return (wq.tail - wq.head + wq.remainder) % wq.remainder | ||
} | ||
|
||
func (wq *loopQueue) cap() int { | ||
if wq.remainder == 0 { | ||
return 0 | ||
} | ||
return wq.remainder - 1 | ||
} | ||
|
||
func (wq *loopQueue) isEmpty() bool { | ||
return wq.tail == wq.head | ||
} | ||
|
||
func (wq *loopQueue) enqueue(worker *goWorker) error { | ||
if wq.remainder == 0 { | ||
return ErrQueueLengthIsZero | ||
} | ||
if (wq.tail+1)%wq.remainder == wq.head { | ||
return ErrQueueIsFull | ||
} | ||
|
||
wq.items[wq.tail] = worker | ||
wq.tail = (wq.tail + 1) % wq.remainder | ||
|
||
return nil | ||
} | ||
|
||
func (wq *loopQueue) dequeue() *goWorker { | ||
if wq.len() == 0 { | ||
return nil | ||
} | ||
|
||
w := wq.items[wq.head] | ||
wq.head = (wq.head + 1) % wq.remainder | ||
|
||
return w | ||
} | ||
|
||
func (wq *loopQueue) releaseExpiry(duration time.Duration) chan *goWorker { | ||
stream := make(chan *goWorker) | ||
|
||
if wq.len() == 0 { | ||
close(stream) | ||
return stream | ||
} | ||
|
||
wq.expiry = wq.expiry[:0] | ||
expiryTime := time.Now().Add(-duration) | ||
|
||
for wq.head != wq.tail { | ||
if expiryTime.After(wq.items[wq.head].recycleTime) { | ||
wq.expiry = append(wq.expiry, wq.items[wq.head]) | ||
wq.head = (wq.head + 1) % wq.remainder | ||
continue | ||
} | ||
break | ||
} | ||
|
||
go func() { | ||
defer close(stream) | ||
|
||
for i := 0; i < len(wq.expiry); i++ { | ||
stream <- wq.expiry[i] | ||
} | ||
}() | ||
|
||
return stream | ||
} | ||
|
||
//func (wq *LoopQueue)search(compareTime time.Time, l, r int) int { | ||
// if l == r { | ||
// if wq.items[l].recycleTime.After(compareTime) { | ||
// return -1 | ||
// } else { | ||
// return l | ||
// } | ||
// } | ||
// | ||
// c := cap(wq.items) | ||
// mid := ((r-l+c)/2 + l) % c | ||
// if mid == l { | ||
// return wq.search(compareTime, l, l) | ||
// } else if wq.items[mid].recycleTime.After(compareTime) { | ||
// return wq.search(compareTime, l, mid-1) | ||
// } else { | ||
// return wq.search(compareTime, mid+1, r) | ||
// } | ||
//} | ||
|
||
func (wq *loopQueue) releaseAll() { | ||
if wq.len() == 0 { | ||
return | ||
} | ||
|
||
for wq.head != wq.tail { | ||
wq.items[wq.head].task <- nil | ||
wq.head = (wq.head + 1) % wq.remainder | ||
} | ||
wq.items = wq.items[:0] | ||
wq.remainder = 0 | ||
wq.head = 0 | ||
wq.tail = 0 | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
package ants | ||
|
||
import ( | ||
"testing" | ||
"time" | ||
) | ||
|
||
func TestNewLoopQueue(t *testing.T) { | ||
size := 100 | ||
q := newLoopQueue(size) | ||
if q.len() != 0 { | ||
t.Fatalf("Len error") | ||
} | ||
|
||
if q.cap() != size { | ||
t.Fatalf("Cap error") | ||
} | ||
|
||
if !q.isEmpty() { | ||
t.Fatalf("IsEmpty error") | ||
} | ||
|
||
if q.dequeue() != nil { | ||
t.Fatalf("Dequeue error") | ||
} | ||
} | ||
|
||
func TestLoopQueue(t *testing.T) { | ||
size := 10 | ||
q := newLoopQueue(size) | ||
|
||
for i := 0; i < 5; i++ { | ||
err := q.enqueue(&goWorker{recycleTime: time.Now()}) | ||
if err != nil { | ||
break | ||
} | ||
} | ||
|
||
if q.len() != 5 { | ||
t.Fatalf("Len error") | ||
} | ||
|
||
v := q.dequeue() | ||
t.Log(v) | ||
|
||
if q.len() != 4 { | ||
t.Fatalf("Len error") | ||
} | ||
|
||
time.Sleep(time.Second) | ||
|
||
for i := 0; i < 6; i++ { | ||
err := q.enqueue(&goWorker{recycleTime: time.Now()}) | ||
if err != nil { | ||
break | ||
} | ||
} | ||
|
||
if q.len() != 10 { | ||
t.Fatalf("Len error") | ||
} | ||
|
||
err := q.enqueue(&goWorker{recycleTime: time.Now()}) | ||
if err == nil { | ||
t.Fatalf("Enqueue error") | ||
} | ||
|
||
q.releaseExpiry(time.Second) | ||
|
||
if q.len() != 6 { | ||
t.Fatalf("Len error: %d", q.len()) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
package ants | ||
|
||
import ( | ||
"errors" | ||
"time" | ||
) | ||
|
||
var ( | ||
ErrQueueIsFull = errors.New("the queue is full") | ||
ErrQueueLengthIsZero = errors.New("the queue length is zero") | ||
) | ||
|
||
type workerQueue interface { | ||
len() int | ||
cap() int | ||
isEmpty() bool | ||
enqueue(worker *goWorker) error | ||
dequeue() *goWorker | ||
releaseExpiry(duration time.Duration) chan *goWorker | ||
releaseAll() | ||
} | ||
|
||
type queueType int | ||
|
||
const ( | ||
stackType queueType = 1 << iota | ||
loopQueueType | ||
) | ||
|
||
func newQueue(qType queueType, size int) workerQueue { | ||
switch qType { | ||
case stackType: | ||
return newWorkerStack(size) | ||
case loopQueueType: | ||
return newLoopQueue(size) | ||
default: | ||
return newWorkerStack(size) | ||
} | ||
} |
Oops, something went wrong.