Skip to content

Commit

Permalink
avoid parallel pulls of the same image
Browse files Browse the repository at this point in the history
To suit the needs of OpenShift CI [1], we need to avoid parallel pulls.
While we have an open PR against containers/image [2] to detect parallel
copy operations across processes, we are still far from getting this
merged; it must be split into smaller chunks and we plan to tackle in
the next few months but who knows how long this might take.

To support that in CRI-O, however, we enjoy the benefit of running a
daemon where we can easily avoid redundantly pulling the same image in
parallel as we're in the same process space and can do all
synchronisation in memory.

[1] https://bugzilla.redhat.com/show_bug.cgi?id=1785390
[2] containers/image#611

Signed-off-by: Valentin Rothberg <[email protected]>
  • Loading branch information
vrothberg committed Mar 17, 2020
1 parent f7dc46e commit aac8910
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 18 deletions.
61 changes: 49 additions & 12 deletions server/image_pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,38 +27,75 @@ func (s *Server) PullImage(ctx context.Context, req *pb.PullImageRequest) (resp
image = img.Image
}

sourceCtx := *s.config.SystemContext // A shallow copy we can modify
pullArgs := pullArguments{image: image}
if req.GetAuth() != nil {
username := req.GetAuth().Username
password := req.GetAuth().Password
if req.GetAuth().Auth != "" {
username, password, err = decodeDockerAuth(req.GetAuth().Auth)
if err != nil {
log.Debugf(ctx, "error decoding authentication for image %s: %v", img, err)
log.Debugf(ctx, "error decoding authentication for image %s: %v", image, err)
return nil, err
}
}
// Specifying a username indicates the user intends to send authentication to the registry.
if username != "" {
sourceCtx.DockerAuthConfig = &types.DockerAuthConfig{
pullArgs.credentials = types.DockerAuthConfig{
Username: username,
Password: password,
}
}
}

// We use the server's pullCache to record which images are currently being
// pulled. This allows for avoiding pulling the same image in parallel.
// Hence, if a given image is currently being pulled, we queue into the
// pullOperation's waitgroup and wait for the pulling goroutine to unblock
// us and re-use its results.
pullOp, pullInProcess := s.pullOperation(&pullArgs)
if !pullInProcess {
go func() {
pullOp.imageRef, pullOp.err = s.pullImage(ctx, req, &pullArgs)
// Unblock waiting goroutines and delete the entry from the cache.
s.pullOperationsLock.Lock()
delete(s.pullOperationsInProgress, pullArgs)
pullOp.wg.Done()
s.pullOperationsLock.Unlock()
}()
}
// Wait for the pull operation to finish.
pullOp.wg.Wait()

if pullOp.err != nil {
return nil, pullOp.err
}

resp = &pb.PullImageResponse{
ImageRef: pullOp.imageRef,
}
return resp, nil
}

// pullImage performs the actual pull operation of PullImage. Used to separate
// the pull implementation from the pullCache logic in PullImage and improve
// readability and maintainability.
func (s *Server) pullImage(ctx context.Context, req *pb.PullImageRequest, pullArgs *pullArguments) (string, error) {
var err error
sourceCtx := *s.config.SystemContext // A shallow copy we can modify
sourceCtx.DockerAuthConfig = &pullArgs.credentials

decryptConfig, err := getDecryptionKeys(s.config.DecryptionKeysPath)
if err != nil {
return nil, err
return "", err
}

var (
images []string
pulled string
)
images, err = s.StorageImageServer().ResolveNames(s.config.SystemContext, image)
images, err = s.StorageImageServer().ResolveNames(s.config.SystemContext, pullArgs.image)
if err != nil {
return nil, err
return "", err
}
for _, img := range images {
var tmpImg types.ImageCloser
Expand Down Expand Up @@ -159,21 +196,21 @@ func (s *Server) PullImage(ctx context.Context, req *pb.PullImageRequest) (resp
pulled = img
break
}

if pulled == "" && err != nil {
return nil, err
return "", err
}

status, err := s.StorageImageServer().ImageStatus(s.config.SystemContext, pulled)
if err != nil {
return nil, err
return "", err
}
imageRef := status.ID
if len(status.RepoDigests) > 0 {
imageRef = status.RepoDigests[0]
}
resp = &pb.PullImageResponse{
ImageRef: imageRef,
}
return resp, nil

return imageRef, nil
}

func decodeDockerAuth(s string) (user, password string, err error) {
Expand Down
55 changes: 49 additions & 6 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"sync"
"time"

"github.com/containers/image/v5/types"
"github.com/containers/storage/pkg/idtools"
"github.com/cri-o/cri-o/internal/lib"
"github.com/cri-o/cri-o/internal/lib/sandbox"
Expand Down Expand Up @@ -61,6 +62,47 @@ type Server struct {
defaultIDMappings *idtools.IDMappings

updateLock sync.RWMutex

// pullOperationsInProgress is used to avoid pulling the same image in parallel. Goroutines
// will block on the pullResult.
pullOperationsInProgress map[pullArguments]*pullOperation
// pullOperationsLock is used to synchronize pull operations.
pullOperationsLock sync.Mutex
}

// pullArguments are used to identify a pullOperation via an input image name and
// possibly specified credentials.
type pullArguments struct {
image string
credentials types.DockerAuthConfig
}

// pullOperation is used to synchronize parallel pull operations via the
// server's pullCache. Goroutines can block the pullOperation's waitgroup and
// be released once the pull operation has finished.
type pullOperation struct {
// wg allows for Goroutines trying to pull the same image to wait until the
// currently running pull operation has finished.
wg sync.WaitGroup
// imageRef is the reference of the actually pulled image which will differ
// from the input if it was a short name (e.g., alpine).
imageRef string
// err is the error indicating if the pull operation has succeeded or not.
err error
}

// pullOperation returns a pullOperation and a bool indicating if the
// pullOperation is already in progress.
func (s *Server) pullOperation(pullArgs *pullArguments) (pullOp *pullOperation, inProgress bool) {
s.pullOperationsLock.Lock()
defer s.pullOperationsLock.Unlock()
pullOp, inProgress = s.pullOperationsInProgress[*pullArgs]
if !inProgress {
pullOp = &pullOperation{}
s.pullOperationsInProgress[*pullArgs] = pullOp
pullOp.wg.Add(1)
}
return pullOp, inProgress
}

type certConfigCache struct {
Expand Down Expand Up @@ -324,12 +366,13 @@ func New(
}

s := &Server{
ContainerServer: containerServer,
netPlugin: netPlugin,
hostportManager: hostportManager,
config: *config,
monitorsChan: make(chan struct{}),
defaultIDMappings: idMappings,
ContainerServer: containerServer,
netPlugin: netPlugin,
hostportManager: hostportManager,
config: *config,
monitorsChan: make(chan struct{}),
defaultIDMappings: idMappings,
pullOperationsInProgress: make(map[pullArguments]*pullOperation),
}

if err := configureMaxThreads(); err != nil {
Expand Down

0 comments on commit aac8910

Please sign in to comment.