forked from roadrunner-server/jobs
-
Notifications
You must be signed in to change notification settings - Fork 0
/
worker_pool.go
111 lines (96 loc) · 2.65 KB
/
worker_pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package jobs
import (
"context"
"sync"
"time"
jobsApi "github.com/roadrunner-server/api/v4/plugins/v3/jobs"
"go.uber.org/zap"
)
type processor struct {
wg sync.WaitGroup
mu sync.Mutex
consumers *sync.Map
runners *map[string]struct{}
log *zap.Logger
queueCh chan *pjob
maxWorkers int
errs []error
}
type pjob struct {
jc jobsApi.Constructor
pipe jobsApi.Pipeline
queue jobsApi.Queue
cmdCh chan<- jobsApi.Commander
configKey string
timeout int
}
func newPipesProc(log *zap.Logger, consumers *sync.Map, runners *map[string]struct{}, maxWorkers int) *processor {
p := &processor{
log: log,
queueCh: make(chan *pjob, 100),
maxWorkers: maxWorkers,
consumers: consumers,
runners: runners,
wg: sync.WaitGroup{},
mu: sync.Mutex{},
errs: make([]error, 0, 1),
}
// start the processor
p.run()
return p
}
func (p *processor) run() {
for i := 0; i < p.maxWorkers; i++ {
go func() {
for job := range p.queueCh {
p.log.Debug("initializing driver", zap.String("pipeline", job.pipe.Name()), zap.String("driver", job.pipe.Driver()))
t := time.Now().UTC()
initializedDriver, err := job.jc.DriverFromConfig(job.configKey, job.queue, job.pipe, job.cmdCh)
if err != nil {
p.mu.Lock()
p.errs = append(p.errs, err)
p.mu.Unlock()
p.wg.Done()
p.log.Error("failed to initialize driver",
zap.String("pipeline", job.pipe.Name()),
zap.String("driver", job.pipe.Driver()),
zap.Error(err))
continue
}
// add a driver to the set of the consumers (name - pipeline name, value - associated driver)
p.consumers.Store(job.pipe.Name(), initializedDriver)
p.log.Debug("driver ready", zap.String("pipeline", job.pipe.Name()), zap.String("driver", job.pipe.Driver()), zap.Time("start", t), zap.Int64("elapsed", time.Since(t).Milliseconds()))
// if a pipeline initialized to be consumed, call Run on it
if _, ok := (*p.runners)[job.pipe.Name()]; ok {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(job.timeout))
err = initializedDriver.Run(ctx, job.pipe)
if err != nil {
p.mu.Lock()
p.errs = append(p.errs, err)
p.mu.Unlock()
}
cancel()
}
p.wg.Done()
}
p.log.Debug("exited from jobs pipeline processor")
}()
}
}
func (p *processor) add(pjob *pjob) {
p.wg.Add(1)
p.queueCh <- pjob
}
func (p *processor) errors() []error {
p.mu.Lock()
defer p.mu.Unlock()
errs := make([]error, len(p.errs))
copy(errs, p.errs)
return errs
}
func (p *processor) wait() {
p.wg.Wait()
}
func (p *processor) stop() {
close(p.queueCh)
}