Skip to content

Commit

Permalink
avoid parallel pulls of the same image
Browse files Browse the repository at this point in the history
To suit the needs of OpenShift CI [1], we need to avoid parallel pulls.
While we have an open PR against containers/image [2] to detect parallel
copy operations across processes, we are still far from getting this
merged; it must be split into smaller chunks and we plan to tackle in
the next few months but who knows how long this might take.

To support that in CRI-O, however, we enjoy the benefit of running a
daemon where we can easily avoid redundantly pulling the same image in
parallel as we're in the same process space and can do all
synchronisation in memory.

[1] https://bugzilla.redhat.com/show_bug.cgi?id=1785390
[2] containers/image#611

Signed-off-by: Valentin Rothberg <[email protected]>
  • Loading branch information
vrothberg committed Mar 17, 2020
1 parent f7dc46e commit 93715c7
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 10 deletions.
62 changes: 52 additions & 10 deletions server/image_pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,57 @@ func (s *Server) PullImage(ctx context.Context, req *pb.PullImageRequest) (resp
image = img.Image
}

// We use the server's pullCache to record which images are currently being
// pulled. This allows for avoiding pulling the same image in parallel.
// Hence, if a given image is currently being pulled, we queue into the
// pullOperation's waitgroup and wait for the pulling goroutine to unblock
// us and re-use its results.
pullOp := &pullOperation{}
s.pullCacheLock.Lock()
pr, pullInProcess := s.pullCache[image]
if pullInProcess {
pullOp = pr
} else {
s.pullCache[image] = pullOp
pullOp.wg.Add(1)
}
s.pullCacheLock.Unlock()

if !pullInProcess {
pullOp.imageRef, pullOp.err = s.pullImage(ctx, req, image)
// Unblock waiting goroutines and delete the entry from the cache.
s.pullCacheLock.Lock()
delete(s.pullCache, image)
pullOp.wg.Done()
s.pullCacheLock.Unlock()
} else {
pullOp.wg.Wait()
}

if pullOp.imageRef == "" && pullOp.err != nil {
return nil, pullOp.err
}

resp = &pb.PullImageResponse{
ImageRef: pullOp.imageRef,
}
return resp, nil
}

// pullImage performs the actual pull operation of PullImage. Used to separate
// the pull implementation from the pullCache logic in PullImage and improve
// readability and maintainability.
func (s *Server) pullImage(ctx context.Context, req *pb.PullImageRequest, image string) (string, error) {
var err error
sourceCtx := *s.config.SystemContext // A shallow copy we can modify
if req.GetAuth() != nil {
username := req.GetAuth().Username
password := req.GetAuth().Password
if req.GetAuth().Auth != "" {
username, password, err = decodeDockerAuth(req.GetAuth().Auth)
if err != nil {
log.Debugf(ctx, "error decoding authentication for image %s: %v", img, err)
return nil, err
log.Debugf(ctx, "error decoding authentication for image %s: %v", image, err)
return "", err
}
}
// Specifying a username indicates the user intends to send authentication to the registry.
Expand All @@ -49,7 +91,7 @@ func (s *Server) PullImage(ctx context.Context, req *pb.PullImageRequest) (resp

decryptConfig, err := getDecryptionKeys(s.config.DecryptionKeysPath)
if err != nil {
return nil, err
return "", err
}

var (
Expand All @@ -58,7 +100,7 @@ func (s *Server) PullImage(ctx context.Context, req *pb.PullImageRequest) (resp
)
images, err = s.StorageImageServer().ResolveNames(s.config.SystemContext, image)
if err != nil {
return nil, err
return "", err
}
for _, img := range images {
var tmpImg types.ImageCloser
Expand Down Expand Up @@ -159,21 +201,21 @@ func (s *Server) PullImage(ctx context.Context, req *pb.PullImageRequest) (resp
pulled = img
break
}

if pulled == "" && err != nil {
return nil, err
return "", err
}

status, err := s.StorageImageServer().ImageStatus(s.config.SystemContext, pulled)
if err != nil {
return nil, err
return "", err
}
imageRef := status.ID
if len(status.RepoDigests) > 0 {
imageRef = status.RepoDigests[0]
}
resp = &pb.PullImageResponse{
ImageRef: imageRef,
}
return resp, nil

return imageRef, nil
}

func decodeDockerAuth(s string) (user, password string, err error) {
Expand Down
20 changes: 20 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,25 @@ type Server struct {
defaultIDMappings *idtools.IDMappings

updateLock sync.RWMutex

// pullCache is used to avoid pulling the same image in parallel. Goroutines
// will block on the pullResult.
pullCache map[string]*pullOperation
// pullCacheLock is used to synchronize the pullCache.
pullCacheLock sync.Mutex
}

// pullOperation is used to synchronize parallel pull operations via the
// server's pullCache. Goroutines can block the pullOperation's waitgroup and
// be released once the pull operation has finished.
type pullOperation struct {
// imageRef is the reference of the pulled image.
imageRef string
// err is the error indicating if the pull operation has succeeded or not.
err error
// wg allows for Goroutines trying to pull the same image to wait until the
// currently running pull operation has finished.
wg sync.WaitGroup
}

type certConfigCache struct {
Expand Down Expand Up @@ -330,6 +349,7 @@ func New(
config: *config,
monitorsChan: make(chan struct{}),
defaultIDMappings: idMappings,
pullCache: make(map[string]*pullOperation),
}

if err := configureMaxThreads(); err != nil {
Expand Down

0 comments on commit 93715c7

Please sign in to comment.