Skip to content

Commit

Permalink
Fix caching of pull future
Browse files Browse the repository at this point in the history
  • Loading branch information
dadgar committed Mar 1, 2017
1 parent 04d2be3 commit 8e6d77e
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 22 deletions.
4 changes: 4 additions & 0 deletions client/driver/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1104,6 +1104,10 @@ CREATE:
time.Sleep(1 * time.Second)
goto CREATE
}
} else if strings.Contains(strings.ToLower(createErr.Error()), "no such image") {
// There is still a very small chance this is possible even with the
// coordinator so retry.
structs.NewRecoverableError(createErr, true)
}

return nil, recoverableErrTimeouts(createErr)
Expand Down
58 changes: 46 additions & 12 deletions client/driver/docker_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,8 @@ func GetDockerCoordinator(config *dockerCoordinatorConfig) *dockerCoordinator {
// PullImage is used to pull an image. It returns the pulled imaged ID or an
// error that occured during the pull
func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConfiguration) (imageID string, err error) {
// Lock while we look up the future
d.imageLock.Lock()

// Get the future
d.imageLock.Lock()
future, ok := d.pullFutures[image]
if !ok {
// Make the future
Expand All @@ -147,9 +145,19 @@ func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConf
// We unlock while we wait since this can take a while
id, err := future.wait().result()

d.imageLock.Lock()
defer d.imageLock.Unlock()

// Delete the future since we don't need it and we don't want to cache an
// image being there if it has possibly been manually deleted (outside of
// Nomad).
if _, ok := d.pullFutures[image]; ok {
delete(d.pullFutures, image)
}

// If we are cleaning up, we increment the reference count on the image
if err == nil && d.cleanup {
d.IncrementImageReference(id, image)
d.incrementImageReferenceImpl(id, image)
}

return id, err
Expand Down Expand Up @@ -196,16 +204,22 @@ func (d *dockerCoordinator) pullImageImpl(image string, authOptions *docker.Auth
// IncrementImageReference is used to increment an image reference count
func (d *dockerCoordinator) IncrementImageReference(id, image string) {
d.imageLock.Lock()
d.imageRefCount[id] += 1
d.logger.Printf("[DEBUG] driver.docker: image %q (%v) reference count incremented: %d", image, id, d.imageRefCount[id])
defer d.imageLock.Unlock()
d.incrementImageReferenceImpl(id, image)
}

// incrementImageReferenceImpl assumes the lock is held
func (d *dockerCoordinator) incrementImageReferenceImpl(id, image string) {
// Cancel any pending delete
if cancel, ok := d.deleteFuture[id]; ok {
d.logger.Printf("[DEBUG] driver.docker: cancelling removal of image %q", image)
cancel()
delete(d.deleteFuture, id)
}
d.imageLock.Unlock()

// Increment the reference
d.imageRefCount[id] += 1
d.logger.Printf("[DEBUG] driver.docker: image %q (%v) reference count incremented: %d", image, id, d.imageRefCount[id])
}

// RemoveImage removes the given image. If there are any errors removing the
Expand All @@ -214,6 +228,10 @@ func (d *dockerCoordinator) RemoveImage(id string) {
d.imageLock.Lock()
defer d.imageLock.Unlock()

if !d.cleanup {
return
}

references, ok := d.imageRefCount[id]
if !ok {
d.logger.Printf("[WARN] driver.docker: RemoveImage on non-referenced counted image id %q", id)
Expand Down Expand Up @@ -250,11 +268,6 @@ func (d *dockerCoordinator) RemoveImage(id string) {
// delay to remove the image. If the context is cancalled before that the image
// removal will be cancelled.
func (d *dockerCoordinator) removeImageImpl(id string, ctx context.Context) {
// Sanity check
if !d.cleanup {
return
}

// Wait for the delay or a cancellation event
select {
case <-ctx.Done():
Expand All @@ -263,6 +276,20 @@ func (d *dockerCoordinator) removeImageImpl(id string, ctx context.Context) {
case <-time.After(d.removeDelay):
}

// Ensure we are suppose to delete. Do a short check while holding the lock
// so there can't be interleaving. There is still the smallest chance that
// the delete occurs after the image has been pulled but before it has been
// incremented. For handling that we just treat it as a recoverable error in
// the docker driver.
d.imageLock.Lock()
select {
case <-ctx.Done():
d.imageLock.Unlock()
return
default:
}
d.imageLock.Unlock()

for i := 0; i < 3; i++ {
err := d.client.RemoveImage(id)
if err == nil {
Expand All @@ -280,6 +307,13 @@ func (d *dockerCoordinator) removeImageImpl(id string, ctx context.Context) {

// Retry on unknown errors
d.logger.Printf("[DEBUG] driver.docker: failed to remove image %q (attempt %d): %v", id, i+1, err)

select {
case <-ctx.Done():
// We have been cancelled
return
case <-time.After(3 * time.Second):
}
}

d.logger.Printf("[DEBUG] driver.docker: cleanup removed downloaded image: %q", id)
Expand Down
34 changes: 24 additions & 10 deletions client/driver/docker_coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,31 @@ func TestDockerCoordinator_ConcurrentPulls(t *testing.T) {

id := ""
for i := 0; i < 10; i++ {
id, _ = coordinator.PullImage(image, nil)
}

if p := mock.pulled[image]; p != 1 {
t.Fatalf("Got multiple pulls %d", p)
go func() {
id, _ = coordinator.PullImage(image, nil)
}()
}

// Check the reference count
if r := coordinator.imageRefCount[id]; r != 10 {
t.Fatalf("Got reference count %d; want %d", r, 10)
}
testutil.WaitForResult(func() (bool, error) {
p := mock.pulled[image]
if p != 1 {
return false, fmt.Errorf("Wrong number of pulls: %d", p)
}

// Check the reference count
if r := coordinator.imageRefCount[id]; r != 10 {
return false, fmt.Errorf("Got reference count %d; want %d", r, 10)
}

// Ensure there is no pull future
if len(coordinator.pullFutures) != 0 {
return false, fmt.Errorf("Pull future exists after pull finished")
}

return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}

func TestDockerCoordinator_Pull_Remove(t *testing.T) {
Expand Down Expand Up @@ -146,7 +160,7 @@ func TestDockerCoordinator_Remove_Cancel(t *testing.T) {
logger: testLogger(),
cleanup: true,
client: mock,
removeDelay: 1 * time.Millisecond,
removeDelay: 100 * time.Millisecond,
}

// Create a coordinator
Expand Down

0 comments on commit 8e6d77e

Please sign in to comment.