-
Notifications
You must be signed in to change notification settings - Fork 2k
/
Copy pathcoordinator.go
405 lines (335 loc) · 11.9 KB
/
coordinator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
package docker
import (
"context"
"fmt"
"regexp"
"sync"
"time"
docker "github.com/fsouza/go-dockerclient"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/nomad/structs"
)
// imageNotFoundMatcher recognizes the "image not found" error text that Docker
// returns for a failed pull. It is compiled once, at package initialization.
var imageNotFoundMatcher = regexp.MustCompile(`Error: image .+ not found`)
// pullFuture is a one-shot, shareable result slot for an image pull. Any
// number of goroutines may block in wait; all of them are released when set
// closes waitCh, after which result reports the stored image ID and error.
type pullFuture struct {
	waitCh  chan struct{}
	err     error
	imageID string
}

// newPullFuture allocates a pullFuture that has not been resolved yet.
func newPullFuture() *pullFuture {
	f := new(pullFuture)
	f.waitCh = make(chan struct{})
	return f
}

// wait blocks until set has been called and then hands back the receiver so
// the call can be chained with result.
func (f *pullFuture) wait() *pullFuture {
	<-f.waitCh
	return f
}

// result reports the image ID and error stored by set. It must only be called
// once wait has returned; before that the fields are not yet published.
func (f *pullFuture) result() (imageID string, err error) {
	imageID, err = f.imageID, f.err
	return imageID, err
}

// set stores the outcome of the pull and releases every waiter. It may only be
// called once: a second call would close the already-closed channel and panic.
func (f *pullFuture) set(imageID string, err error) {
	f.imageID, f.err = imageID, err
	close(f.waitCh)
}
// DockerImageClient is the subset of the Docker client API that the image
// coordinator relies on: pulling, inspecting and removing images. Keeping it
// as an interface lets the coordinator be handed any implementation of these
// three calls.
type DockerImageClient interface {
// PullImage pulls the image described by opts using the given registry
// credentials.
PullImage(opts docker.PullImageOptions, auth docker.AuthConfiguration) error
// InspectImage looks up an image; the coordinator passes the image name and
// reads the ID field of the result.
InspectImage(id string) (*docker.Image, error)
// RemoveImage deletes the image with the given ID.
RemoveImage(id string) error
}
// LogEventFn is the callback signature through which a driver receives task
// events: a human readable message plus free-form string annotations.
type LogEventFn func(message string, annotations map[string]string)

// noopLogEventFn has the LogEventFn signature and deliberately discards both
// arguments; it is a stand-in for callers that do not want events.
func noopLogEventFn(_ string, _ map[string]string) {}
// dockerCoordinatorConfig is used to configure the Docker coordinator. It is
// embedded by pointer in dockerCoordinator, so its fields are read directly
// off the coordinator.
type dockerCoordinatorConfig struct {
// ctx is the parent context of every delayed image removal scheduled by
// RemoveImage; cancelling it cancels all pending removals.
ctx context.Context
// logger is the logger the coordinator should use
logger hclog.Logger
// cleanup marks whether images should be deleted when the reference count
// is zero. When false, reference counting and removal are skipped entirely.
cleanup bool
// client is the Docker client to use for communicating with Docker. It must
// be non-nil; newDockerCoordinator returns nil otherwise.
client DockerImageClient
// removeDelay is the delay between an image's reference count going to
// zero and the image actually being deleted.
removeDelay time.Duration
}
// dockerCoordinator is used to coordinate actions against images to prevent
// racy deletions. It can be thought of as a reference counter on images: it
// de-duplicates concurrent pulls of the same image, tracks which callers hold
// each image, and schedules a delayed removal once nobody does.
type dockerCoordinator struct {
*dockerCoordinatorConfig
// imageLock is used to lock access to all images. It guards pullFutures,
// imageRefCount and deleteFuture.
imageLock sync.Mutex
// pullFutures is used to allow multiple callers to pull the same image but
// only have one request be sent to Docker. It is keyed by image name.
pullFutures map[string]*pullFuture
// pullLoggers is used to track the LogEventFn for each alloc pulling an image.
// If multiple alloc's are attempting to pull the same image, each will need
// to register its own LogEventFn with the coordinator. It is keyed by image
// name and cleared when the pull for that image finishes.
pullLoggers map[string][]LogEventFn
// pullLoggerLock is used to sync access to the pullLoggers map
pullLoggerLock sync.RWMutex
// imageRefCount maps an image ID to the set of caller IDs currently holding
// a reference to it; the reference count is the size of that set.
imageRefCount map[string]map[string]struct{}
// deleteFuture is indexed by image ID and holds the cancel function of the
// pending delayed removal for that image, if any.
deleteFuture map[string]context.CancelFunc
}
// newDockerCoordinator returns a new Docker coordinator built from config, with
// all of its bookkeeping maps initialized and empty.
//
// It returns nil when the configuration is unusable, i.e. when config itself
// is nil or carries no Docker client; callers must check for a nil result.
func newDockerCoordinator(config *dockerCoordinatorConfig) *dockerCoordinator {
	// A nil config is treated the same as a config without a client instead
	// of dereferencing a nil pointer.
	if config == nil || config.client == nil {
		return nil
	}

	return &dockerCoordinator{
		dockerCoordinatorConfig: config,
		pullFutures:             make(map[string]*pullFuture),
		pullLoggers:             make(map[string][]LogEventFn),
		imageRefCount:           make(map[string]map[string]struct{}),
		deleteFuture:            make(map[string]context.CancelFunc),
	}
}
// PullImage is used to pull an image. It returns the pulled image ID or an
// error that occurred during the pull.
//
// Concurrent calls for the same image share a single pull: the first caller
// starts it and every caller blocks on the same future. emitFn is registered
// so the caller receives progress events while the pull is running. When the
// pull succeeds and cleanup is enabled, a reference to the image is recorded
// on behalf of callerID.
func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConfiguration, callerID string,
	emitFn LogEventFn, pullTimeout, pullActivityTimeout time.Duration) (imageID string, err error) {
	// Get the future, starting the pull if nobody else has.
	d.imageLock.Lock()
	future, ok := d.pullFutures[image]
	d.registerPullLogger(image, emitFn)
	if !ok {
		future = newPullFuture()
		d.pullFutures[image] = future
		go d.pullImageImpl(image, authOptions, pullTimeout, pullActivityTimeout, future)
	}
	d.imageLock.Unlock()

	// We unlock while we wait since this can take a while
	imageID, err = future.wait().result()

	d.imageLock.Lock()
	defer d.imageLock.Unlock()

	// Delete the future since we don't need it and we don't want to cache an
	// image being there if it has possibly been manually deleted (outside of
	// Nomad). Only remove the entry if it is still the future we waited on:
	// another waiter may already have removed it and a later caller may have
	// registered a fresh future for a new pull, which must not be evicted or
	// a duplicate pull of the same image would be started.
	if d.pullFutures[image] == future {
		delete(d.pullFutures, image)
	}

	// If we are cleaning up, we increment the reference count on the image
	if err == nil && d.cleanup {
		d.incrementImageReferenceImpl(imageID, image, callerID)
	}

	return imageID, err
}
// pullImageImpl performs the actual pull of image and resolves future exactly
// once, with either the ID of the pulled image or an error. It is run in its
// own goroutine by PullImage.
//
// The pull is bounded by pullTimeout. pullActivityTimeout, the context's
// cancel function and the coordinator's progress handlers are handed to the
// image progress manager (presumably so it can abort a stalled pull - confirm
// in newImageProgressManager).
func (d *dockerCoordinator) pullImageImpl(image string, authOptions *docker.AuthConfiguration,
	pullTimeout, pullActivityTimeout time.Duration, future *pullFuture) {

	// Registered first so it runs last: the loggers for this image are dropped
	// only after the progress manager has been stopped.
	defer d.clearPullLogger(image)

	repo, tag := parseDockerImage(image)

	ctx, cancel := context.WithTimeout(context.Background(), pullTimeout)
	defer cancel()

	progress := newImageProgressManager(image, cancel, pullActivityTimeout, d.handlePullInactivity,
		d.handlePullProgressReport, d.handleSlowPullProgressReport)
	defer progress.stop()

	// Absent credentials are sent as the zero AuthConfiguration.
	var auth docker.AuthConfiguration
	if authOptions != nil {
		auth = *authOptions
	}

	pullErr := d.client.PullImage(docker.PullImageOptions{
		Repository:    repo,
		Tag:           tag,
		OutputStream:  progress,
		RawJSONStream: true,
		Context:       ctx,
	}, auth)

	// A timeout takes precedence over whatever error the client returned.
	switch ctxErr := ctx.Err(); {
	case ctxErr == context.DeadlineExceeded:
		d.logger.Error("timeout pulling container", "image_ref", dockerImageRef(repo, tag))
		future.set("", recoverablePullError(ctxErr, image))
		return
	case pullErr != nil:
		d.logger.Error("failed pulling container", "image_ref", dockerImageRef(repo, tag),
			"error", pullErr)
		future.set("", recoverablePullError(pullErr, image))
		return
	}

	d.logger.Debug("docker pull succeeded", "image_ref", dockerImageRef(repo, tag))

	// The pull itself does not report an ID, so look the image up by name.
	inspected, err := d.client.InspectImage(image)
	if err != nil {
		d.logger.Error("failed getting image id", "image_name", image, "error", err)
		future.set("", recoverableErrTimeouts(err))
		return
	}

	future.set(inspected.ID, nil)
}
// IncrementImageReference records that callerID holds a reference to the image
// identified by imageID. It is a no-op when image cleanup is disabled. The
// image lock is taken for the duration of the call.
func (d *dockerCoordinator) IncrementImageReference(imageID, imageName, callerID string) {
	d.imageLock.Lock()
	defer d.imageLock.Unlock()

	if !d.cleanup {
		return
	}
	d.incrementImageReferenceImpl(imageID, imageName, callerID)
}
// incrementImageReferenceImpl adds callerID to the set of reference holders of
// imageID, cancelling any removal that is pending for that image. The caller
// must hold imageLock. Registering the same callerID twice has no additional
// effect.
func (d *dockerCoordinator) incrementImageReferenceImpl(imageID, imageName, callerID string) {
	// The image is wanted again, so abort a scheduled delete.
	if cancelDelete, pending := d.deleteFuture[imageID]; pending {
		d.logger.Debug("cancelling removal of container image", "image_name", imageName)
		cancelDelete()
		delete(d.deleteFuture, imageID)
	}

	// Lazily create the holder set for an image seen for the first time.
	holders, tracked := d.imageRefCount[imageID]
	if !tracked {
		holders = make(map[string]struct{})
		d.imageRefCount[imageID] = holders
	}

	if _, already := holders[callerID]; already {
		return
	}
	holders[callerID] = struct{}{}
	d.logger.Debug("image reference count incremented", "image_name", imageName, "image_id", imageID, "references", len(holders))
}
// RemoveImage drops callerID's reference to the image identified by imageID.
// When that was the last reference, the image is scheduled for deletion after
// the configured remove delay; errors while deleting are retried internally by
// the removal goroutine. Nothing happens when cleanup is disabled or the image
// is not being tracked.
func (d *dockerCoordinator) RemoveImage(imageID, callerID string) {
	d.imageLock.Lock()
	defer d.imageLock.Unlock()

	if !d.cleanup {
		return
	}

	holders, tracked := d.imageRefCount[imageID]
	if !tracked {
		d.logger.Warn("RemoveImage on non-referenced counted image id", "image_id", imageID)
		return
	}

	// Release this caller's reference.
	delete(holders, callerID)
	remaining := len(holders)
	d.logger.Debug("image id reference count decremented", "image_id", imageID, "references", remaining)

	// Other callers still need the image.
	if remaining > 0 {
		return
	}

	// A delete future should not exist while references were held; cancel it
	// anyway so its context is not leaked when the entry is overwritten below.
	if stale, found := d.deleteFuture[imageID]; found {
		d.logger.Error("image id has lingering delete future", "image_id", imageID)
		stale()
	}

	// Schedule the delayed removal; the stored cancel function lets a later
	// reference increment abort it.
	removeCtx, cancelRemove := context.WithCancel(d.ctx)
	d.deleteFuture[imageID] = cancelRemove
	go d.removeImageImpl(imageID, removeCtx)

	// The image no longer has any holders, so stop tracking it.
	delete(d.imageRefCount, imageID)
}
// removeImageImpl is used to remove an image. It will wait the specified remove
// delay before removing the image. If the context is cancelled before that,
// the image removal is cancelled.
//
// Removal is attempted up to three times, pausing three seconds between
// attempts. A missing image or an image that is still in use (HTTP 409) ends
// the attempt immediately without retrying. On every exit path taken after the
// removal has started, the delete future registered for id is released, unless
// the context was cancelled in the meantime (see below).
func (d *dockerCoordinator) removeImageImpl(id string, ctx context.Context) {
	// Wait for the delay or a cancellation event
	select {
	case <-ctx.Done():
		// We have been cancelled
		return
	case <-time.After(d.removeDelay):
	}

	// Ensure we are supposed to delete. Do a short check while holding the
	// lock so there can't be interleaving. There is still the smallest chance
	// that the delete occurs after the image has been pulled but before it has
	// been incremented. For handling that we just treat it as a recoverable
	// error in the docker driver.
	d.imageLock.Lock()
	cancelled := ctx.Err() != nil
	d.imageLock.Unlock()
	if cancelled {
		return
	}

	// Cleanup the future from the map and free the context by cancelling it,
	// whichever way this function returns. Every code path that replaces or
	// removes our map entry cancels our context first (under imageLock), so a
	// cancelled context means the entry now in the map - if any - belongs to
	// someone else and must be left alone.
	defer func() {
		d.imageLock.Lock()
		defer d.imageLock.Unlock()

		if ctx.Err() != nil {
			return
		}
		if cancel, ok := d.deleteFuture[id]; ok {
			delete(d.deleteFuture, id)
			cancel()
		}
	}()

	const maxAttempts = 3

	removed := false
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		err := d.client.RemoveImage(id)
		if err == nil {
			removed = true
			break
		}

		if err == docker.ErrNoSuchImage {
			d.logger.Debug("unable to cleanup image, does not exist", "image_id", id)
			return
		}
		if derr, ok := err.(*docker.Error); ok && derr.Status == 409 {
			d.logger.Debug("unable to cleanup image, still in use", "image_id", id)
			return
		}

		// Retry on unknown errors
		d.logger.Debug("failed to remove image", "image_id", id, "attempt", attempt, "error", err)

		// No point in pausing when there is no further attempt to pause for.
		if attempt == maxAttempts {
			break
		}

		select {
		case <-ctx.Done():
			// We have been cancelled
			return
		case <-time.After(3 * time.Second):
		}
	}

	// Only claim success when an attempt actually succeeded.
	if !removed {
		d.logger.Warn("giving up removing image after repeated failures", "image_id", id, "attempts", maxAttempts)
		return
	}

	d.logger.Debug("cleanup removed downloaded image", "image_id", id)
}
// registerPullLogger adds logger to the callbacks that receive pull progress
// events for image. A nil logger is ignored: registered callbacks are later
// invoked unconditionally, each in its own goroutine, and calling a nil
// function there would crash the process.
func (d *dockerCoordinator) registerPullLogger(image string, logger LogEventFn) {
	if logger == nil {
		return
	}

	d.pullLoggerLock.Lock()
	defer d.pullLoggerLock.Unlock()

	// append works on the nil slice returned for an unknown image, so no
	// explicit initialization of the entry is needed.
	d.pullLoggers[image] = append(d.pullLoggers[image], logger)
}
// clearPullLogger forgets every event callback registered for image. It takes
// the pull logger write lock for the duration of the removal.
func (d *dockerCoordinator) clearPullLogger(image string) {
	d.pullLoggerLock.Lock()
	delete(d.pullLoggers, image)
	d.pullLoggerLock.Unlock()
}
// emitEvent delivers message and annotations to every callback registered for
// image. Each callback runs in its own goroutine, so emitEvent does not wait
// for any of them; the registry is only read-locked while the goroutines are
// being started.
func (d *dockerCoordinator) emitEvent(image, message string, annotations map[string]string) {
	d.pullLoggerLock.RLock()
	defer d.pullLoggerLock.RUnlock()

	for _, emit := range d.pullLoggers[image] {
		// The function value is evaluated here, in the calling goroutine,
		// before the new goroutine starts.
		go emit(message, annotations)
	}
}
// handlePullInactivity logs, at error level, that the pull of image was aborted
// due to inactivity, together with the last event message and its timestamp.
// It is one of the handlers given to the image progress manager in
// pullImageImpl.
func (d *dockerCoordinator) handlePullInactivity(image, msg string, timestamp time.Time) {
	lastSeen := timestamp.String()
	d.logger.Error("image pull aborted due to inactivity",
		"image_name", image,
		"last_event_timestamp", lastSeen,
		"last_event", msg,
	)
}
// handlePullProgressReport writes a pull progress message for image to the
// coordinator's logger at debug level. The timestamp argument is unused. It is
// one of the handlers given to the image progress manager in pullImageImpl.
func (d *dockerCoordinator) handlePullProgressReport(image, msg string, _ time.Time) {
	d.logger.Debug(
		"image pull progress",
		"image_name", image,
		"message", msg,
	)
}
// handleSlowPullProgressReport forwards a pull progress message for image to
// every registered event callback, annotated with the image name. The
// timestamp argument is unused. It is one of the handlers given to the image
// progress manager in pullImageImpl.
func (d *dockerCoordinator) handleSlowPullProgressReport(image, msg string, _ time.Time) {
	message := fmt.Sprintf("Docker image pull progress: %s", msg)
	annotations := map[string]string{
		"image": image,
	}
	d.emitEvent(image, message, annotations)
}
// recoverablePullError wraps an error encountered while pulling image into a
// recoverable error. Every pull failure is marked recoverable except Docker's
// "image not found" error, which retrying cannot fix. err must be non-nil.
func recoverablePullError(err error, image string) error {
	recoverable := !imageNotFoundMatcher.MatchString(err.Error())
	wrapped := fmt.Errorf("Failed to pull `%s`: %s", image, err)
	return structs.NewRecoverableError(wrapped, recoverable)
}