-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathdispatcher.go
73 lines (64 loc) · 1.51 KB
/
dispatcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package gochan
import (
"errors"
"math/rand"
"sync/atomic"
)
// Lifecycle states stored in Dispatcher.status.
const (
	// dispatcherStatusOpen (0) is the state a new Dispatcher starts in.
	dispatcherStatusOpen int32 = iota
	// dispatcherStatusClosed (1) is the state set by Close.
	dispatcherStatusClosed
)
// Dispatcher is a pool of gochans. Tasks handed to Dispatch are routed to one
// member of the pool; Close signals every member to stop.
type Dispatcher struct {
// status is dispatcherStatusOpen or dispatcherStatusClosed. It is read and
// written through the sync/atomic functions.
status int32
// gcNum is the number of gochans requested at construction; Dispatch uses
// it as the modulus when choosing a gochan.
gcNum int
// gcs holds the gochans of the pool, indexed 0..gcNum-1.
gcs []*gochan
}
// NewDispatcher creates a Dispatcher in the open state backed by gochanNum
// gochans. Each gochan is created with newGochan(bufferNum), given its slice
// index as UUID, and has run called on it before NewDispatcher returns.
//
// gochanNum is the number of gochans in the pool.
// bufferNum is the channel buffer size handed to every gochan.
func NewDispatcher(gochanNum, bufferNum int) *Dispatcher {
	logger.Infof("%d gochans starting with %d bufferNum chan buffer for each", gochanNum, bufferNum)

	// Build and launch the workers first, then wrap them in the Dispatcher.
	workers := make([]*gochan, gochanNum)
	for i := 0; i < len(workers); i++ {
		w := newGochan(bufferNum)
		w.setUUID(i)
		workers[i] = w
		w.run()
	}

	return &Dispatcher{
		status: dispatcherStatusOpen,
		gcNum:  gochanNum,
		gcs:    workers,
	}
}
// Dispatch sends task on the tasksChan of one gochan in the pool.
//
// The gochan is chosen as objID % gcNum, so calls with the same non-negative
// objID always reach the same gochan. A negative objID selects a gochan at
// random. The send blocks until the chosen channel can accept the task.
//
// Dispatch returns an error with the message "dispatcher closed" when the
// dispatcher has been closed (or when the send panics), and
// "dispatcher has no gochans" when the pool is empty. It returns nil once the
// task has been handed over.
func (d *Dispatcher) Dispatch(objID int, task TaskFunc) (err error) {
	// A panic below is reported as "dispatcher closed" instead of crashing the
	// caller. NOTE(review): presumably this covers a send on a tasksChan that
	// the gochan closed during shutdown — confirm against gochan's code.
	defer func() {
		if e := recover(); e != nil {
			err = errors.New("dispatcher closed")
		}
	}()

	// Reject new work as soon as Close has flipped the status flag.
	if atomic.LoadInt32(&d.status) == dispatcherStatusClosed {
		return errors.New("dispatcher closed")
	}

	// With an empty pool, rand.Intn and the modulo below would panic and the
	// recover above would misreport it as "dispatcher closed".
	if d.gcNum <= 0 {
		return errors.New("dispatcher has no gochans")
	}

	// Dispatch at random if objID is less than 0.
	if objID < 0 {
		objID = rand.Intn(d.gcNum)
	}

	index := objID % d.gcNum
	d.gcs[index].tasksChan <- task
	return nil
}
// Close closes the dispatcher: it marks it closed, so that Dispatch rejects
// further tasks, and then sends one signal on the dieChan of every gochan in
// the pool.
//
// Only the first call does the work; later or concurrent calls return
// immediately without signalling the gochans again.
func (d *Dispatcher) Close() {
	// Compare-and-swap makes "check open, then mark closed" a single atomic
	// step. A separate load and store would let two concurrent callers both
	// pass the check and both send on every dieChan.
	if !atomic.CompareAndSwapInt32(&d.status, dispatcherStatusOpen, dispatcherStatusClosed) {
		return
	}

	// Tell each gochan to stop.
	for _, gc := range d.gcs {
		gc.dieChan <- struct{}{}
	}
}