Skip to content

Commit

Permalink
Prevent concurrent pulls of same imageRef
Browse files Browse the repository at this point in the history
  • Loading branch information
towe75 committed Mar 3, 2022
1 parent 1cf5dc1 commit 4db0768
Showing 1 changed file with 68 additions and 13 deletions.
81 changes: 68 additions & 13 deletions driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"runtime"
"strconv"
"strings"
"sync"
"time"

"github.com/armon/circbuf"
Expand Down Expand Up @@ -104,6 +105,11 @@ type Driver struct {
systemInfo api.Info
// Queried from systemInfo: is podman running on a cgroupv2 system?
cgroupV2 bool

// lists ongoing image pull operations. Key is a imageRef
pullInfo map[string]*sync.WaitGroup
// guards access to pullInfo[]
pullLock sync.RWMutex
}

// TaskState is the state which is encoded in the handle returned in
Expand All @@ -127,6 +133,7 @@ func NewPodmanDriver(logger hclog.Logger) drivers.DriverPlugin {
ctx: ctx,
signalShutdown: cancel,
logger: logger.Named(pluginName),
pullInfo: make(map[string]*sync.WaitGroup),
}
}

Expand Down Expand Up @@ -751,24 +758,72 @@ func (d *Driver) createImage(image string, auth *AuthConfig, forcePull bool, cfg
return imageID, nil
}

d.logger.Info("Pulling image", "image", imageName)
//nolint // ignore returned error, can't react in a good way
d.eventer.EmitEvent(&drivers.TaskEvent{
TaskID: cfg.ID,
TaskName: cfg.Name,
AllocID: cfg.AllocID,
Timestamp: time.Now(),
Message: "Pulling image " + imageName,
})
imageAuth := api.ImageAuthConfig{
Username: auth.Username,
Password: auth.Password,
}
if imageID, err = d.podman.ImagePull(d.ctx, imageName, imageAuth); err != nil {
return imageID, fmt.Errorf("failed to start task, unable to pull image %s : %w", imageName, err)

// see if a concurrent pull is in progress
d.pullLock.Lock()
waitFor, inProgress := d.pullInfo[imageName]
if !inProgress {
// no, we will have to pull it
var wg sync.WaitGroup
wg.Add(1)
d.pullInfo[imageName] = &wg
// but do not block pulls for other imageName's
d.pullLock.Unlock()

// ensure to remove our pullInfo entry when pull is done/failed
defer func() {
d.pullLock.Lock()
delete(d.pullInfo, imageName)
wg.Done()
d.pullLock.Unlock()
}()

d.logger.Info("Pulling image", "image", imageName)
//nolint // ignore returned error, can't react in a good way
d.eventer.EmitEvent(&drivers.TaskEvent{
TaskID: cfg.ID,
TaskName: cfg.Name,
AllocID: cfg.AllocID,
Timestamp: time.Now(),
Message: "Pulling image " + imageName,
})

if imageID, err = d.podman.ImagePull(d.ctx, imageName, imageAuth); err != nil {
return imageID, fmt.Errorf("failed to start task, unable to pull image %s : %w", imageName, err)
}

d.logger.Debug("Pulled image ID", "imageID", imageID)
return imageID, nil

} else {
d.pullLock.Unlock()
d.logger.Info("Waiting for other task to pull image", "image", imageName)
//nolint // ignore returned error, can't react in a good way
d.eventer.EmitEvent(&drivers.TaskEvent{
TaskID: cfg.ID,
TaskName: cfg.Name,
AllocID: cfg.AllocID,
Timestamp: time.Now(),
Message: "Waiting for image " + imageName + " to be pulled by other task",
})

// block until the concurrent pull is done
waitFor.Wait()
// did the other task succeed?
imageID, err := d.podman.ImageInspectID(d.ctx, imageName)
if err != nil {
// Something went wrong
d.logger.Warn("Image was not successfully pulled by other task", "image", imageName, "error", err)
return imageID, api.ImageNotFound
}
d.logger.Debug("Image is available now", "imageID", imageID)
return imageID, nil
}
d.logger.Debug("Pulled image ID", "imageID", imageID)
return imageID, nil

}

func parseImage(image string) (types.ImageReference, error) {
Expand Down

0 comments on commit 4db0768

Please sign in to comment.