-
Notifications
You must be signed in to change notification settings - Fork 1
/
runner.go
142 lines (114 loc) · 2.52 KB
/
runner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package eventbus
import (
"sync"
"sync/atomic"
"time"
)
type PorcessFn struct {
currentState State
chFn chan FnCallback
currentNrFnWorkers int32
semaphore *Semaphore
limitTicker *time.Ticker
ticker *time.Ticker
group *sync.WaitGroup
mtx *sync.Mutex
maxTime time.Duration
isStoped bool
}
func NewRunnerFn(maxTime time.Duration, limit int) *PorcessFn {
return &PorcessFn{
ticker: time.NewTicker(time.Nanosecond * 1),
limitTicker: time.NewTicker(maxTime),
maxTime: maxTime,
mtx: &sync.Mutex{},
group: &sync.WaitGroup{},
chFn: make(chan FnCallback),
semaphore: NewSemaphore(limit),
}
}
func (pf *PorcessFn) Stop() WaitCallback {
pf.isStoped = true
return func() error {
pf.group.Wait()
return nil
}
}
func (pf *PorcessFn) acquire() {
pf.group.Add(1)
pf.semaphore.Acquire()
atomic.AddInt32(&pf.currentNrFnWorkers, 1)
}
func (pf *PorcessFn) release() {
atomic.AddInt32(&pf.currentNrFnWorkers, -1)
pf.semaphore.Release()
pf.group.Done()
}
func (pf *PorcessFn) Next(fn FnCallback) error {
if fn == nil {
return ErrInvalidFnCallback
}
pf.mtx.Lock()
defer func() {
pf.limitTicker.Stop()
pf.ticker.Stop()
pf.mtx.Unlock()
}()
//Don't accept any new func to PorcessFn if one is already running
if pf.currentState.IsStilRunning() {
return ErrStillRunning
}
//Reset tickers
pf.limitTicker.Reset(pf.maxTime)
pf.ticker.Reset(time.Microsecond * 10)
//Set new state
pf.currentState = RUNNING
isDone := false
if atomic.LoadInt32(&pf.currentNrFnWorkers) == 0 || pf.currentState.IsOverMaxTime() {
//Block any new FnCallack if limit is reached
pf.acquire()
go func() {
var (
fn FnCallback
ticker = time.NewTicker(time.Millisecond * 10)
)
defer func() {
ticker.Stop()
pf.release()
}()
for {
select {
case fn = <-pf.chFn:
isDone = false
fn()
isDone = true
if atomic.LoadInt32(&pf.currentNrFnWorkers) == 1 {
//Set current state done when is only one worker to be able to accept new FnCallback
pf.currentState = DONE
} else if atomic.LoadInt32(&pf.currentNrFnWorkers) > 1 {
return
}
break
case <-ticker.C:
if pf.isStoped || atomic.LoadInt32(&pf.currentNrFnWorkers) > 1 {
return
}
}
}
}()
}
pf.chFn <- fn
//Check for done/overTime
for {
select {
case <-pf.limitTicker.C:
pf.currentState = OVERMAXTIME
return nil
case <-pf.ticker.C:
if isDone {
pf.currentState = DONE
return nil
}
}
}
}