-
Notifications
You must be signed in to change notification settings - Fork 25
/
gopool_test.go
142 lines (119 loc) · 3.26 KB
/
gopool_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package gopool_test
import (
"errors"
"sync"
"sync/atomic"
"time"
"github.com/daniel-hutao/spinlock"
"github.com/devchat-ai/gopool"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
var _ = Describe("Gopool", func() {
Describe("With Mutex", func() {
It("should work correctly", func() {
pool := gopool.NewGoPool(100, gopool.WithLock(new(sync.Mutex)))
defer pool.Release()
for i := 0; i < 1000; i++ {
pool.AddTask(func() (interface{}, error) {
time.Sleep(10 * time.Millisecond)
return nil, nil
})
}
pool.Wait()
})
})
Describe("With SpinLock", func() {
It("should work correctly", func() {
pool := gopool.NewGoPool(100, gopool.WithLock(new(spinlock.SpinLock)))
defer pool.Release()
for i := 0; i < 1000; i++ {
pool.AddTask(func() (interface{}, error) {
time.Sleep(10 * time.Millisecond)
return nil, nil
})
}
pool.Wait()
})
})
// Checks that the error callback is handed the exact error value returned
// by failing tasks.
Describe("With Error", func() {
	It("should work correctly", func() {
		var errTaskError = errors.New("task error")
		// Counts callback invocations so the spec cannot pass without the
		// callback ever having been called.
		var callbackCalls int32
		pool := gopool.NewGoPool(100, gopool.WithErrorCallback(func(err error) {
			// The callback is presumably invoked on a pool goroutine rather than
			// the spec goroutine; GinkgoRecover lets a failed assertion here be
			// reported as a spec failure instead of an unrecovered panic.
			defer GinkgoRecover()
			atomic.AddInt32(&callbackCalls, 1)
			Expect(err).To(Equal(errTaskError))
		}))
		defer pool.Release()
		for i := 0; i < 1000; i++ {
			pool.AddTask(func() (interface{}, error) {
				return nil, errTaskError
			})
		}
		pool.Wait()
		// Only "at least once" is asserted; the exact number of callback
		// invocations is a pool implementation detail not visible from here.
		Expect(atomic.LoadInt32(&callbackCalls)).To(BeNumerically(">", 0))
	})
})
// Checks that the result callback is handed the value returned by
// successful tasks.
Describe("With Result", func() {
	It("should work correctly", func() {
		var expectedResult = "task result"
		// Counts callback invocations so the spec cannot pass without the
		// callback ever having been called.
		var callbackCalls int32
		pool := gopool.NewGoPool(100, gopool.WithResultCallback(func(result interface{}) {
			// The callback is presumably invoked on a pool goroutine rather than
			// the spec goroutine; GinkgoRecover lets a failed assertion here be
			// reported as a spec failure instead of an unrecovered panic.
			defer GinkgoRecover()
			atomic.AddInt32(&callbackCalls, 1)
			Expect(result).To(Equal(expectedResult))
		}))
		defer pool.Release()
		for i := 0; i < 1000; i++ {
			pool.AddTask(func() (interface{}, error) {
				return expectedResult, nil
			})
		}
		pool.Wait()
		// Only "at least once" is asserted; the exact number of callback
		// invocations is a pool implementation detail not visible from here.
		Expect(atomic.LoadInt32(&callbackCalls)).To(BeNumerically(">", 0))
	})
})
// Checks that a task which fails retryCount times and then succeeds is
// executed retryCount+1 times in total.
Describe("With Retry", func() {
	It("should work correctly", func() {
		var retryCount = int32(3)
		var taskError = errors.New("task error")
		var taskRunCount int32
		pool := gopool.NewGoPool(100, gopool.WithRetryCount(int(retryCount)))
		defer pool.Release()
		pool.AddTask(func() (interface{}, error) {
			// Branch on the value returned by the atomic increment rather than
			// re-reading taskRunCount with a plain load, which would mix atomic
			// and non-atomic access to the same variable.
			run := atomic.AddInt32(&taskRunCount, 1)
			if run <= retryCount {
				return nil, taskError
			}
			return nil, nil
		})
		pool.Wait()
		Expect(atomic.LoadInt32(&taskRunCount)).To(Equal(retryCount + 1))
	})
})
// Checks that a task sleeping for 200ms in a pool configured with a 100ms
// timeout causes the error callback to fire with "task timed out".
Describe("With Timeout", func() {
	It("should work correctly", func() {
		// Set to 1 by the error callback; read after Wait to prove it ran.
		var taskRun int32
		pool := gopool.NewGoPool(100, gopool.WithTimeout(100*time.Millisecond), gopool.WithErrorCallback(func(err error) {
			// The callback is presumably invoked on a pool goroutine rather than
			// the spec goroutine; GinkgoRecover lets a failed assertion here be
			// reported as a spec failure instead of an unrecovered panic.
			defer GinkgoRecover()
			Expect(err.Error()).To(Equal("task timed out"))
			atomic.StoreInt32(&taskRun, 1)
		}))
		defer pool.Release()
		pool.AddTask(func() (interface{}, error) {
			// Sleep longer than the configured timeout.
			time.Sleep(200 * time.Millisecond)
			return nil, nil
		})
		pool.Wait()
		Expect(atomic.LoadInt32(&taskRun)).To(Equal(int32(1)))
	})
})
// Checks that a freshly created pool reports the configured minimum
// worker count.
Describe("With MinWorkers", func() {
	It("should work correctly", func() {
		const (
			maxWorkers     = 100
			wantWorkers int = 50
		)

		p := gopool.NewGoPool(maxWorkers, gopool.WithMinWorkers(wantWorkers))
		defer p.Release()

		got := p.GetWorkerCount()
		Expect(got).To(Equal(wantWorkers))
	})
})
Describe("With TaskQueueSize", func() {
It("should work correctly", func() {
size := 5000
pool := gopool.NewGoPool(100, gopool.WithTaskQueueSize(size))
defer pool.Release()
Expect(pool.GetTaskQueueSize()).To(Equal(size))
})
})
})