fix #1 Scheduler has deadlock on worker stop, if worker has not yet started execute
yougg committed Sep 28, 2017
1 parent b9a4328 commit e07bb70
Showing 3 changed files with 67 additions and 63 deletions.
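
The deadlock being fixed comes from the pre-change worker design shown in the worker.go diff below: stop() sends on the unbuffered end channel, but the only receiver lives inside execute(), which runs only after start(). If the scheduler stops a worker whose execute() goroutine was never launched, that send blocks forever. A minimal, hypothetical reproduction of the pattern (names simplified from the removed code):

package main

// worker mirrors the pre-fix design: an unbuffered end channel whose only
// receiver is inside execute().
type worker struct {
	end chan bool
}

func (w *worker) execute() {
	<-w.end // the only place end is ever read
}

func (w *worker) stop() {
	// Blocks until execute() receives. If execute() was never started,
	// this send can never complete and the caller hangs.
	w.end <- true
}

func main() {
	w := &worker{end: make(chan bool)}
	// execute() was never launched, so this call deadlocks; the Go runtime
	// reports "all goroutines are asleep - deadlock!".
	w.stop()
}
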
67 changes: 29 additions & 38 deletions pool.go
@@ -2,7 +2,6 @@ package pool

import (
"errors"
"time"
)

// Scheduler interface
@@ -12,15 +11,18 @@ type Scheduler interface {
Add(job Job) bool

// Add a task to the pool queue
// Returns false if the queue is full
// When the queue is at full capacity:
// bounded queue: enqueuing a new job fails immediately
// unbounded queue: enqueuing blocks until the job is accepted
Join(t Task, args ...interface{}) bool
schedule()
}
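
A minimal usage sketch of the Join contract described above, assuming an import path for this pool package and that New/NewBlocking behave as the tests later in this diff suggest (bounded array queue vs. blocking linked queue):

package main

import (
	"fmt"
	"time"

	"github.com/yougg/pool" // assumed import path for this package
)

func myTask(args ...interface{}) {
	time.Sleep(50 * time.Millisecond)
	fmt.Println("ran with", args)
}

func main() {
	// Bounded queue: Join returns false once the queue capacity is exhausted.
	bounded, err := pool.New(pool.ArrayBlockingQueue, 2, 1)
	if err != nil {
		panic(err)
	}
	for i := 0; i < 5; i++ {
		fmt.Println("accepted:", bounded.Join(myTask, i))
	}

	// Blocking (linked) queue: Join waits until the job has been enqueued.
	blocking, err := pool.NewBlocking(4)
	if err != nil {
		panic(err)
	}
	fmt.Println("accepted:", blocking.Join(myTask, "always queued"))
}
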

// Scheduler implementation: the thread pool
type pool struct {
queue Queue
workers []*worker
queue Queue
inputForWorkers Queue
workers chan worker
}

// Add a job to the pool queue
@@ -41,30 +43,11 @@ func (p *pool) Join(t Task, args ...interface{}) bool {
// Start concurrent scheduling of the pool
// The number of running workers grows and shrinks dynamically
func (p *pool) schedule() {
duration := time.Second
workingNum := 0
for ; ; time.Sleep(duration) {
switch {
case p.queue.length() > 0 && workingNum < len(p.workers):
for _, w := range p.workers {
if w.idle {
w.start()
duration = 100 * time.Millisecond
workingNum++
break
}
}
case p.queue.length() == 0 && workingNum > 0:
for _, w := range p.workers {
if w.idle {
w.stop()
duration = 10 * time.Second
workingNum--
break
}
}
default:
duration = time.Second
for {
p.inputForWorkers.add(<-p.queue.poll())
select {
case idleWorker := <-p.workers:
idleWorker.start()
}
}
}
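
The loop above replaces the old idle-polling: schedule() moves one job from the dispatcher queue into the workers' input queue, then blocks until an idle worker is available on the buffered workers channel and (re)starts it. Because the channel's capacity equals the worker count, a worker handing itself back never blocks, and the scheduler never signals a goroutine that is not running, which is what removes the deadlock. A stripped-down sketch of that handshake (hypothetical names, not this package's API):

package main

import (
	"fmt"
	"sync"
)

func main() {
	jobs := make(chan int, 8)      // stands in for inputForWorkers
	idle := make(chan struct{}, 3) // stands in for the workers channel

	// Every worker slot starts out idle; the buffer matches the worker
	// count, so handing a slot back can never block.
	for i := 0; i < cap(idle); i++ {
		idle <- struct{}{}
	}

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		jobs <- i // enqueue one job
		<-idle    // wait for an idle worker before starting one
		wg.Add(1)
		go func() {
			defer wg.Done()
			j := <-jobs // consume exactly one job
			fmt.Println("ran job", j)
			idle <- struct{}{} // report back as idle; buffered, never blocks
		}()
	}
	wg.Wait()
}
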
@@ -95,24 +78,32 @@ func New(qType QueueType, queueCap, workerNum int) (s Scheduler, err error) {
return nil, errors.New("The max queue capacity or max worker number is invalid.")
}

var q Queue
bq := make(basequeue, queueCap)
var dataQueue Queue
var dispatcherQueue Queue
// The dispatcher queue can hold more jobs than there are workers,
// i.e. when all workers are busy it acts as a buffer
dataq := make(basequeue, queueCap)
dispatchq := make(basequeue, queueCap*10)
switch qType {
case SynchronousQueue, PriorityBlockingQueue:
// TODO: not implemented yet
// q = synchronousqueue{bq}
// q = priorityqueue{bq}
dataQueue = dataq
dispatcherQueue = dispatchq
case ArrayBlockingQueue:
q = arrayqueue{bq}
dataQueue = arrayqueue{dataq}
dispatcherQueue = arrayqueue{dispatchq}
case LinkedBlockingQueue:
q = linkedqueue{bq}
dataQueue = linkedqueue{dataq}
dispatcherQueue = linkedqueue{dispatchq}
default:
q = bq
dataQueue = dataq
dispatcherQueue = dispatchq
}

workers := newWorkers(workerNum, dataQueue)
s = &pool{
queue: q,
workers: newWorkers(workerNum, q),
queue: dispatcherQueue,
workers: workers,
inputForWorkers: dataQueue,
}

go s.schedule()
24 changes: 21 additions & 3 deletions pool_test.go
@@ -11,6 +11,17 @@ func MyTask(args ...interface{}) {
time.Sleep(100 * time.Millisecond)
}

func TestDefault(t *testing.T) {
t.Log("Test default array blocking queue scheduled pool")

p := Default()

p, err := New(SynchronousQueue, 10, 10)
if nil != err || nil == p {
t.Error("Create default pool error.")
}
}

func TestNewPool(t *testing.T) {
t.Log("Test array blocking queue scheduled pool")

@@ -20,6 +31,8 @@ func TestNewPool(t *testing.T) {
return
}

p, err = New(QueueType(-1), 10, 10)

// Initialize a pool scheduler with a queue capacity of 10 and 5 concurrent workers
p, err = New(ArrayBlockingQueue, 10, 5)
if nil != err {
@@ -32,13 +45,18 @@ func TestNewPool(t *testing.T) {
result[i] = p.Join(MyTask, i)
time.Sleep(10 * time.Millisecond)
}
t.Log("Array Blocking Result:", result)
t.Log("Array Blocking Result:", len(result))
}

func TestBlockingPool(t *testing.T) {
t.Log("Test linked blocking queue scheduled pool")
p, err := NewBlocking(0)
if nil == err {
t.Error("Create new linked blocking queue failed.", err)
return
}
// Initialize a pool scheduler with 10 concurrent workers
p, err := NewBlocking(10)
p, err = NewBlocking(10)
if nil != err {
t.Error("Create new linked blocking queue failed.", err)
return
@@ -48,7 +66,7 @@ func TestBlockingPool(t *testing.T) {
for i := 101; i < 150; i++ {
result[i] = p.Join(MyTask, i)
}
t.Log("Linked Blocking Result:", result)
t.Log("Linked Blocking Result:", len(result))
}

func TestWaitAll(t *testing.T) {
39 changes: 17 additions & 22 deletions worker.go
@@ -1,50 +1,45 @@
package pool

import "time"

// Task executor
type worker struct {
idle bool // whether the worker is idle
end chan bool // signals the worker to stop
q Queue // queue the worker consumes from
jobConsumeQueue Queue // queue the worker consumes jobs from
notifyChan chan worker
}

// Start working
func (w *worker) start() {
go w.execute()
}

// Stop working
func (w *worker) stop() {
w.end <- true
}

// Execute jobs in a loop
func (w *worker) execute() {
c := time.NewTicker(time.Second).C
killNextCycle := make(chan bool, 1)
for {
select {
case job := <-w.q.poll():
w.idle = false
case job := <-w.jobConsumeQueue.poll():
if nil != job.Run {
job.Run(job.Args...)
}
case <-c:
w.idle = true
case <-w.end:
case <-killNextCycle:
w.notifyChan <- *w
return
}
// It can happen that this worker was started but another worker consumed the job,
// so do not wait forever for data: if none arrived in this cycle,
// offer to exit on the next one (select picks randomly when several cases are ready)
killNextCycle <- true
}
}

// Initialize the given number of workers and attach them to the queue
func newWorkers(num int, q Queue) (ws []*worker) {
func newWorkers(num int, q Queue) (workers chan worker) {
workers = make(chan worker, num)
for i := 0; i < num; i++ {
ws = append(ws, &worker{
idle: true,
end: make(chan bool),
q: q,
})
worker := worker{
jobConsumeQueue: q,
notifyChan: workers,
}
workers <- worker
}
return
}
