-
Notifications
You must be signed in to change notification settings - Fork 2
/
pool_test.go
executable file
·143 lines (128 loc) · 3.19 KB
/
pool_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package pool
import (
"sync"
"testing"
"time"
)
func MyTask(args ...interface{}) {
// fmt.Print(args, ` `)
time.Sleep(100 * time.Millisecond)
}
func TestDefault(t *testing.T) {
t.Log("Test default array blocking queue scheduled pool")
p := Default()
p, err := New(SynchronousQueue, 10, 10)
if nil != err || nil == p {
t.Error("Create default pool error.")
}
}
func TestNewPool(t *testing.T) {
t.Log("Test array blocking queue scheduled pool")
p, err := New(ArrayBlockingQueue, -1, 5)
if nil == err {
t.Error("Create new array blocking queue failed.", err)
return
}
p, err = New(QueueType(-1), 10, 10)
// 初始化一个队列容量10,并发数量5的线程池队列调度器
p, err = New(ArrayBlockingQueue, 10, 5)
if nil != err {
t.Error("Create new array blocking queue failed.", err)
return
}
result := make(map[int]bool)
// 一旦队列容量满, 后续加入的任务会直接失败返回
for i := 1; i < 50; i++ {
result[i] = p.Join(MyTask, i)
time.Sleep(10 * time.Millisecond)
}
t.Log("Array Blocking Result:", len(result))
}
func TestBlockingPool(t *testing.T) {
t.Log("Test linked blocking queue scheduled pool")
p, err := NewBlocking(0)
if nil == err {
t.Error("Create new linked blocking queue failed.", err)
return
}
// 初始化一个并发数量10的线程池队列调度器
p, err = NewBlocking(10)
if nil != err {
t.Error("Create new linked blocking queue failed.", err)
return
}
result := make(map[int]bool)
// 所有的任务都会在添加成功后再返回, 注意: 添加任务成功不等于任务执行成功
for i := 101; i < 150; i++ {
result[i] = p.Join(MyTask, i)
}
t.Log("Linked Blocking Result:", len(result))
}
func TestWaitAll(t *testing.T) {
p, err := New(ArrayBlockingQueue, 10, 5)
if nil != err {
t.Error("Create new array blocking queue failed.", err)
return
}
// 创建10个并行任务, 等待所有任务执行完成后再退出
wg := &sync.WaitGroup{}
wg.Add(10)
for i := 0; i < 10; i++ {
p.Add(Job{
Run: func(args ...interface{}) {
if x, ok := args[0].(int); ok {
time.Sleep(time.Duration(x*100) * time.Millisecond)
}
if wg, ok := args[1].(*sync.WaitGroup); ok {
wg.Done()
}
},
Args: []interface{}{i, wg},
})
}
wg.Wait()
t.Log("Wait 10 jobs finish.")
}
func TestGetAllResult(t *testing.T) {
t.Log("Sum 1 to 100 use 10 tasks")
p, err := NewBlocking(10)
if nil != err {
t.Error("Create new linked blocking queue failed.", err)
return
}
// 使用10个并行任务计算1到100的和,最后统计总和
ch := make(chan int)
for i := 1; i < 100; i += 10 {
p.Join(func(i int) Task {
return func(args ...interface{}) {
s := 0
for j := i; j < (i + 10); j++ {
s += j
}
ch <- s
}
}(i))
}
sum, count := 0, 0
for s := range ch {
sum += s
if count++; count >= 10 {
break
}
}
close(ch)
t.Log("Sum 1 to 100 result:", sum)
}
func BenchmarkNewPool(b *testing.B) {
// 初始化一个队列容量10,并发数量5的线程池队列调度器
p, _ := New(ArrayBlockingQueue, 10, 5)
p.Join(func(n int) Task {
return func(args ...interface{}) {
sum := 0
for j := 0; j <= n; j++ {
sum += j
}
b.Logf("sum 0 to %d :\t%d\n", n, sum)
}
}(b.N))
}