Skip to content

Commit

Permalink
avoid parallel pulls of the same image
Browse files Browse the repository at this point in the history
Backport of commit 38ba094.

To suit the needs of OpenShift CI [1], we need to avoid parallel pulls.
While we have an open PR against containers/image [2] to detect parallel
copy operations across processes, we are still far from getting this
merged; it must be split into smaller chunks and we plan to tackle in
the next few months but who knows how long this might take.

To support that in CRI-O, however, we enjoy the benefit of running a
daemon where we can easily avoid redundantly pulling the same image in
parallel as we're in the same process space and can do all
synchronisation in memory.

[1] https://bugzilla.redhat.com/show_bug.cgi?id=1785390
[2] containers/image#611

Signed-off-by: Valentin Rothberg <[email protected]>
  • Loading branch information
vrothberg committed Mar 23, 2020
1 parent ba0d0e8 commit 0877879
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 22 deletions.
75 changes: 63 additions & 12 deletions server/image_pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/cri-o/cri-o/internal/pkg/log"
"github.com/cri-o/cri-o/internal/pkg/storage"
"github.com/cri-o/cri-o/server/metrics"
"github.com/pkg/errors"
"golang.org/x/net/context"
pb "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
)
Expand All @@ -26,31 +27,82 @@ func (s *Server) PullImage(ctx context.Context, req *pb.PullImageRequest) (resp
image = img.Image
}

sourceCtx := *s.systemContext // A shallow copy we can modify
pullArgs := pullArguments{image: image}
if req.GetAuth() != nil {
username := req.GetAuth().Username
password := req.GetAuth().Password
if req.GetAuth().Auth != "" {
username, password, err = decodeDockerAuth(req.GetAuth().Auth)
if err != nil {
log.Debugf(ctx, "error decoding authentication for image %s: %v", img, err)
log.Debugf(ctx, "error decoding authentication for image %s: %v", image, err)
return nil, err
}
}
// Specifying a username indicates the user intends to send authentication to the registry.
if username != "" {
sourceCtx.DockerAuthConfig = &types.DockerAuthConfig{
pullArgs.credentials = types.DockerAuthConfig{
Username: username,
Password: password,
}
}
}

// We use the server's pullOperationsInProgress to record which images are
// currently being pulled. This allows for avoiding pulling the same image
// in parallel. Hence, if a given image is currently being pulled, we queue
// into the pullOperation's waitgroup and wait for the pulling goroutine to
// unblock us and re-use its results.
pullOp, pullInProcess := func() (pullOp *pullOperation, inProgress bool) {
s.pullOperationsLock.Lock()
defer s.pullOperationsLock.Unlock()
pullOp, inProgress = s.pullOperationsInProgress[pullArgs]
if !inProgress {
pullOp = &pullOperation{}
s.pullOperationsInProgress[pullArgs] = pullOp
pullOp.wg.Add(1)
}
return pullOp, inProgress
}()

if !pullInProcess {
pullOp.err = errors.New("pullImage was aborted by a Go panic")
defer func() {
s.pullOperationsLock.Lock()
delete(s.pullOperationsInProgress, pullArgs)
pullOp.wg.Done()
s.pullOperationsLock.Unlock()
}()
pullOp.imageRef, pullOp.err = s.pullImage(ctx, &pullArgs)
} else {
// Wait for the pull operation to finish.
pullOp.wg.Wait()
}

if pullOp.err != nil {
return nil, pullOp.err
}

resp = &pb.PullImageResponse{
ImageRef: pullOp.imageRef,
}
return resp, nil
}

// pullImage performs the actual pull operation of PullImage. Used to separate
// the pull implementation from the pullCache logic in PullImage and improve
// readability and maintainability.
func (s *Server) pullImage(ctx context.Context, pullArgs *pullArguments) (string, error) {
var err error
sourceCtx := *s.systemContext // A shallow copy we can modify
if pullArgs.credentials.Username != "" {
sourceCtx.DockerAuthConfig = &pullArgs.credentials
}

var dcc *encconfig.DecryptConfig
if _, err := os.Stat(s.decryptionKeysPath); err == nil {
cc, err := getDecryptionKeys(s.decryptionKeysPath)
if err != nil {
return nil, err
return "", err
}
dcc = cc.DecryptConfig
}
Expand All @@ -59,9 +111,9 @@ func (s *Server) PullImage(ctx context.Context, req *pb.PullImageRequest) (resp
images []string
pulled string
)
images, err = s.StorageImageServer().ResolveNames(s.systemContext, image)
images, err = s.StorageImageServer().ResolveNames(s.systemContext, pullArgs.image)
if err != nil {
return nil, err
return "", err
}
for _, img := range images {
var tmpImg types.ImageCloser
Expand Down Expand Up @@ -150,21 +202,20 @@ func (s *Server) PullImage(ctx context.Context, req *pb.PullImageRequest) (resp
pulled = img
break
}

if pulled == "" && err != nil {
return nil, err
return "", err
}
status, err := s.StorageImageServer().ImageStatus(s.systemContext, pulled)
if err != nil {
return nil, err
return "", err
}
imageRef := status.ID
if len(status.RepoDigests) > 0 {
imageRef = status.RepoDigests[0]
}
resp = &pb.PullImageResponse{
ImageRef: imageRef,
}
return resp, nil

return imageRef, nil
}

func decodeDockerAuth(s string) (user, password string, err error) {
Expand Down
47 changes: 37 additions & 10 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,32 @@ type Server struct {

seccompEnabled bool
appArmorEnabled bool
// pullOperationsInProgress is used to avoid pulling the same image in parallel. Goroutines
// will block on the pullResult.
pullOperationsInProgress map[pullArguments]*pullOperation
// pullOperationsLock is used to synchronize pull operations.
pullOperationsLock sync.Mutex
}

// pullArguments are used to identify a pullOperation via an input image name and
// possibly specified credentials.
type pullArguments struct {
image string
credentials types.DockerAuthConfig
}

// pullOperation is used to synchronize parallel pull operations via the
// server's pullCache. Goroutines can block the pullOperation's waitgroup and
// be released once the pull operation has finished.
type pullOperation struct {
// wg allows for Goroutines trying to pull the same image to wait until the
// currently running pull operation has finished.
wg sync.WaitGroup
// imageRef is the reference of the actually pulled image which will differ
// from the input if it was a short name (e.g., alpine).
imageRef string
// err is the error indicating if the pull operation has succeeded or not.
err error
}

type certConfigCache struct {
Expand Down Expand Up @@ -350,16 +376,17 @@ func New(
}

s := &Server{
ContainerServer: containerServer,
netPlugin: netPlugin,
hostportManager: hostportManager,
config: *config,
seccompEnabled: seccomp.IsEnabled(),
appArmorEnabled: apparmor.IsEnabled(),
appArmorProfile: config.ApparmorProfile,
monitorsChan: make(chan struct{}),
defaultIDMappings: idMappings,
systemContext: systemContext,
ContainerServer: containerServer,
netPlugin: netPlugin,
hostportManager: hostportManager,
config: *config,
seccompEnabled: seccomp.IsEnabled(),
appArmorEnabled: apparmor.IsEnabled(),
appArmorProfile: config.ApparmorProfile,
monitorsChan: make(chan struct{}),
defaultIDMappings: idMappings,
systemContext: systemContext,
pullOperationsInProgress: make(map[pullArguments]*pullOperation),
}

s.decryptionKeysPath = config.DecryptionKeysPath
Expand Down

0 comments on commit 0877879

Please sign in to comment.