-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgroup.go
298 lines (256 loc) · 8.62 KB
/
group.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
package goservices
import (
"context"
"fmt"
"sync"
)
var _ Service = (*Group)(nil)
// Group is a group of services to start and stop in parallel.
// It implements the Service interface itself.
type Group struct {
	// name is the optional human readable name of the group,
	// used by String and in error messages.
	name string
	// services are the services started and stopped together.
	services []Service
	// hooks are called around each service start, stop and crash.
	hooks Hooks
	// startStopMutex serializes calls to Start and Stop.
	startStopMutex sync.Mutex
	// state is the lifecycle state of the group, protected by
	// stateMutex since the interceptRunError goroutine accesses
	// it concurrently with Start and Stop.
	state      State
	stateMutex sync.RWMutex
	// fanIn merges the run error channels of all services into
	// a single serviceError channel.
	fanIn *errorsFanIn
	// runningServices tracks currently running services,
	// keyed by their String() representation.
	runningServices map[string]struct{}
	// interceptStop signals the interceptRunError goroutine to
	// return; interceptDone is closed when that goroutine exits.
	interceptStop chan struct{}
	interceptDone chan struct{}
}
// NewGroup creates a new group of services given the settings,
// and returns an error if any setting is not valid.
func NewGroup(settings GroupSettings) (group *Group, err error) {
	settings.setDefaults()
	if err = settings.validate(); err != nil {
		return nil, fmt.Errorf("validating settings: %w", err)
	}

	// Defensive copy: later mutations of settings.Services by the
	// caller must not affect the group.
	servicesCopy := make([]Service, len(settings.Services))
	copy(servicesCopy, settings.Services)

	group = &Group{
		name:            settings.Name,
		services:        servicesCopy,
		hooks:           settings.Hooks,
		state:           StateStopped,
		runningServices: make(map[string]struct{}),
	}
	return group, nil
}
// String returns "group" or, when a name was configured,
// "group <name>".
func (g *Group) String() string {
	const base = "group"
	if g.name != "" {
		return base + " " + g.name
	}
	return base
}
// Start starts services specified in parallel.
//
// If a service fails to start, the `startErr` is returned
// and all other running services are stopped.
//
// If a service fails after `Start` returns without error,
// all other running services are stopped and the error is
// sent in the `runError` channel which is then closed.
// A caller should listen on `runError` until the `Stop` method
// call fully completes, since a run error can theoretically happen
// at the same time the caller calls `Stop` on the group.
//
// If the group is already running, the `ErrAlreadyStarted` error
// is returned.
//
// If the context is canceled, all the starting operations are canceled,
// all already running services are stopped and the context error is wrapped
// in the `startErr` returned.
func (g *Group) Start(ctx context.Context) (runError <-chan error, startErr error) {
	g.startStopMutex.Lock()
	defer g.startStopMutex.Unlock()

	// Lock the state in case the group is already running.
	g.stateMutex.RLock()
	state := g.state
	// no need to keep a lock on the state since the `startStopMutex`
	// prevents concurrent calls to `Start` and `Stop`.
	g.stateMutex.RUnlock()
	if state == StateRunning {
		return nil, fmt.Errorf("%s: %w", g, ErrAlreadyStarted)
	}

	// Safe to write without stateMutex here: the intercept goroutine
	// from any previous run has exited, and startStopMutex excludes
	// concurrent Start/Stop calls.
	g.state = StateStarting

	var fanInErrorCh <-chan serviceError
	g.fanIn, fanInErrorCh = newErrorsFanIn()

	runErrorChannels := make(map[string]<-chan error, len(g.services))
	startErrorCh := make(chan *serviceError)
	runErrorMapMutex := new(sync.Mutex)
	for _, service := range g.services {
		serviceString := service.String()
		go startGroupedServiceAsync(ctx, service, serviceString,
			g.hooks, startErrorCh, runErrorChannels, runErrorMapMutex)
		// assume all the services are going to be running
		g.runningServices[serviceString] = struct{}{}
	}

	// Collect eventual start error and wait for all services
	// to be started or failed to start.
	for range g.services {
		serviceErr := <-startErrorCh
		if serviceErr == nil {
			continue
		}
		delete(g.runningServices, serviceErr.serviceName)
		// Only keep the first start error encountered.
		if startErr == nil {
			startErr = addCtxErrorIfNeeded(serviceErr, ctx.Err())
		}
	}

	if startErr != nil {
		// Stop whichever services did start successfully.
		_ = g.stop()
		// Reset the state to stopped: without this, the state would
		// remain StateStarting and a subsequent Stop call would hit
		// its "unreachable" StateStarting case and panic, instead of
		// returning ErrAlreadyStopped.
		g.state = StateStopped
		return nil, startErr
	}

	// Register every service run error channel with the fan-in.
	for serviceString, runError := range runErrorChannels {
		g.fanIn.add(serviceString, runError)
	}

	// Hold the state mutex until the intercept run error goroutine is ready
	// and we change the state to running.
	// This is as such because the intercept goroutine may catch a service run error
	// as soon as it starts, and try to set the group state as crashed.
	// With this lock, the goroutine must wait for the mutex unlock below before
	// changing the state to crashed.
	g.stateMutex.Lock()
	runErrorCh := make(chan error)
	interceptReady := make(chan struct{})
	g.interceptStop = make(chan struct{})
	g.interceptDone = make(chan struct{})
	go g.interceptRunError(interceptReady, fanInErrorCh, runErrorCh)
	<-interceptReady
	g.state = StateRunning
	g.stateMutex.Unlock()

	return runErrorCh, nil
}
// startGroupedServiceAsync starts a single service of the group,
// calling the OnStart and OnStarted hooks around the start call.
// On start failure it sends a start serviceError on startErrorCh;
// on success it registers the service run error channel in
// runErrorChannels (guarded by mutex) and sends nil on startErrorCh.
func startGroupedServiceAsync(ctx context.Context, service Starter,
	serviceString string, hooks Hooks, startErrorCh chan<- *serviceError,
	runErrorChannels map[string]<-chan error, mutex *sync.Mutex) {
	hooks.OnStart(serviceString)
	runErrorCh, startErr := service.Start(ctx)
	hooks.OnStarted(serviceString, startErr)

	if startErr != nil {
		startErrorCh <- &serviceError{
			format:      errorFormatStart,
			serviceName: serviceString,
			err:         startErr,
		}
		return
	}

	// The map is shared between all starting goroutines,
	// so the write must be guarded by the mutex.
	mutex.Lock()
	runErrorChannels[serviceString] = runErrorCh
	mutex.Unlock()

	// Signal the successful start to the collecting loop.
	startErrorCh <- nil
}
// interceptRunError, if it catches an error from the input
// channel, registers the crashed service of the group,
// stops other running services and forwards the error
// to the output channel and finally closes this channel.
// If the stop channel triggers, the function returns.
func (g *Group) interceptRunError(ready chan<- struct{},
	input <-chan serviceError, output chan<- error) {
	// Closed on return so Stop (and Start's failure path) can wait
	// for this goroutine to be fully terminated.
	defer close(g.interceptDone)
	// Signal Start that this goroutine is running; Start holds the
	// state mutex until after this, so any crash handling below
	// blocks until the state is set to running.
	close(ready)
	select {
	case <-g.interceptStop:
		// Stop was called: exit without touching state or output.
	case serviceErr := <-input:
		// Lock the state mutex in case we are stopping
		// or trying to stop the group at the same time.
		g.stateMutex.Lock()
		if g.state == StateStopping {
			// Discard the eventual single service run error
			// fanned-in if we are stopping the group.
			g.stateMutex.Unlock()
			return
		}
		// The first and only service fanned-in run error was
		// caught and we are not currently stopping the group.
		g.state = StateCrashed
		delete(g.runningServices, serviceErr.serviceName)
		g.stateMutex.Unlock()
		g.hooks.OnCrash(serviceErr.serviceName, serviceErr.err)
		// Stop the remaining running services, then forward the
		// crash error to the caller and close the output channel.
		_ = g.stop()
		output <- &serviceErr
		close(output)
	}
}
// Stop stops running services of the group in parallel.
// If an error occurs for any of the service stop,
// the other running services will still be stopped.
// Only the first non nil service stop error encountered
// is returned, but the hooks can be used to process each
// error returned.
// If the group is already stopped, the `ErrAlreadyStopped` error
// is returned.
func (g *Group) Stop() (err error) {
	g.startStopMutex.Lock()
	defer g.startStopMutex.Unlock()

	g.stateMutex.Lock()
	switch g.state {
	case StateRunning: // continue stopping the group
	case StateCrashed:
		g.stateMutex.Unlock()
		// group is already stopped or stopping from
		// the intercept goroutine, so just wait for the
		// intercept goroutine to finish.
		<-g.interceptDone
		return nil
	case StateStopped:
		g.stateMutex.Unlock()
		return fmt.Errorf("%s: %w", g, ErrAlreadyStopped)
	case StateStarting, StateStopping:
		g.stateMutex.Unlock()
		// startStopMutex excludes concurrent Start/Stop calls, so
		// these transient states should never be observed here.
		panic("bad group implementation code: this code path should be unreachable")
	}
	// Set stopping before unlocking so the intercept goroutine
	// discards any run error fanned-in while we stop services.
	g.state = StateStopping
	g.stateMutex.Unlock()

	err = g.stop()

	// Stop the intercept error goroutine after we stop
	// all the group services. This means the fan in might
	// send one error to the intercept goroutine, but it will
	// discard it since we are in the stopping state.
	// The error fan in takes care of reading and discarding
	// errors from other services once it caught the first error.
	close(g.interceptStop)
	<-g.interceptDone

	// No stateMutex needed: the intercept goroutine has exited and
	// startStopMutex excludes concurrent Start/Stop calls.
	g.state = StateStopped
	return err
}
// stop stops all running services in the group of services.
// If a service fails to stop in the group, its error
// is returned but the other services are still stopped.
// All service stop errors are wrapped together in the format
// stopping <name_1>: %w; stopping <name_2>: %w; ...
// and can be checked individually with errors.Is(err, ErrDefined).
// Hooks can be used to access each stopping and stop result.
func (g *Group) stop() (err error) {
	results := make(chan serviceError)

	var launched uint
	for _, service := range g.services {
		name := service.String()
		if _, isRunning := g.runningServices[name]; !isRunning {
			continue
		}
		launched++
		// Pass service and name explicitly to avoid sharing the
		// loop variables with the goroutine.
		go func(stopper Stopper, name string) {
			g.hooks.OnStop(name)
			stopErr := stopper.Stop()
			g.hooks.OnStopped(name, stopErr)
			results <- serviceError{
				format:      errorFormatStop,
				serviceName: name,
				err:         stopErr,
			}
		}(service, name)
	}

	// Collect one result per launched goroutine, accumulating
	// stop errors and unregistering each stopped service.
	for ; launched > 0; launched-- {
		result := <-results
		err = addStopError(err, result.serviceName, result.err)
		delete(g.runningServices, result.serviceName)
	}

	// Only stop the fan in after stopping all services
	// so it can read and discard any eventual run errors
	// from these whilst we stop them.
	g.fanIn.stop()

	return err
}