From 122acd754685a0477669f294093ce870fc46517b Mon Sep 17 00:00:00 2001 From: Valentin Rothberg Date: Mon, 16 Mar 2020 17:21:11 +0100 Subject: [PATCH] avoid parallel pulls of the same image To suit the needs of OpenShift CI [1], we need to avoid parallel pulls. While we have an open PR against containers/image [2] to detect parallel copy operations across processes, we are still far from getting this merged; it must be split into smaller chunks and we plan to tackle in the next few months but who knows how long this might take. To support that in CRI-O, however, we enjoy the benefit of running a daemon where we can easily avoid redundantly pulling the same image in parallel as we're in the same process space and can do all synchronisation in memory. [1] https://bugzilla.redhat.com/show_bug.cgi?id=1785390 [2] https://github.com/containers/image/pull/611 Signed-off-by: Valentin Rothberg --- go.sum | 1 + server/image_pull.go | 69 +++++++++++++++++++++++++++++++++----------- server/server.go | 10 +++++++ 3 files changed, 63 insertions(+), 17 deletions(-) diff --git a/go.sum b/go.sum index 116667018456..2e613530864d 100644 --- a/go.sum +++ b/go.sum @@ -798,6 +798,7 @@ github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoH github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/syndtr/gocapability v0.0.0-20170704070218-db04d3cc01c8/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww= diff --git a/server/image_pull.go b/server/image_pull.go index 3e1069c35503..3150a8ef991a 100644 --- a/server/image_pull.go +++ b/server/image_pull.go @@ -18,15 +18,8 @@ import ( var localRegistryPrefix = libpodImage.DefaultLocalRegistry + "/" -// PullImage pulls a image with authentication config. -func (s *Server) PullImage(ctx context.Context, req *pb.PullImageRequest) (resp *pb.PullImageResponse, err error) { - // TODO: what else do we need here? (Signatures when the story isn't just pulling from docker://) - image := "" - img := req.GetImage() - if img != nil { - image = img.Image - } - +func (s *Server) pullImage(ctx context.Context, req *pb.PullImageRequest, image string) (string, error) { + var err error sourceCtx := *s.config.SystemContext // A shallow copy we can modify if req.GetAuth() != nil { username := req.GetAuth().Username @@ -34,8 +27,8 @@ func (s *Server) PullImage(ctx context.Context, req *pb.PullImageRequest) (resp if req.GetAuth().Auth != "" { username, password, err = decodeDockerAuth(req.GetAuth().Auth) if err != nil { - log.Debugf(ctx, "error decoding authentication for image %s: %v", img, err) - return nil, err + log.Debugf(ctx, "error decoding authentication for image %s: %v", image, err) + return "", err } } // Specifying a username indicates the user intends to send authentication to the registry. @@ -49,7 +42,7 @@ func (s *Server) PullImage(ctx context.Context, req *pb.PullImageRequest) (resp decryptConfig, err := getDecryptionKeys(s.config.DecryptionKeysPath) if err != nil { - return nil, err + return "", err } var ( @@ -58,7 +51,7 @@ func (s *Server) PullImage(ctx context.Context, req *pb.PullImageRequest) (resp ) images, err = s.StorageImageServer().ResolveNames(s.config.SystemContext, image) if err != nil { - return nil, err + return "", err } for _, img := range images { var tmpImg types.ImageCloser @@ -159,19 +152,61 @@ func (s *Server) PullImage(ctx context.Context, req *pb.PullImageRequest) (resp pulled = img break } - if pulled == "" && err != nil { - return nil, err + + if pulled == "" { + return "", nil } + status, err := s.StorageImageServer().ImageStatus(s.config.SystemContext, pulled) if err != nil { - return nil, err + return "", err } imageRef := status.ID if len(status.RepoDigests) > 0 { imageRef = status.RepoDigests[0] } + + return imageRef, nil +} + +// PullImage pulls a image with authentication config. +func (s *Server) PullImage(ctx context.Context, req *pb.PullImageRequest) (resp *pb.PullImageResponse, err error) { + // TODO: what else do we need here? (Signatures when the story isn't just pulling from docker://) + image := "" + img := req.GetImage() + if img != nil { + image = img.Image + } + + result := &pullResult{} + s.pullLock.Lock() + pr, pullInProcess := s.pullCache[image] + if pullInProcess { + result = pr + } else { + s.pullCache[image] = result + result.wg.Add(1) + } + s.pullLock.Unlock() + + if !pullInProcess { + result.imageRef, result.err = s.pullImage(ctx, req, image) + // Note: we must lock to quickly notify routines that already queued-in + // and to clean the cache afterwards. + s.pullLock.Lock() + delete(s.pullCache, image) + result.wg.Done() + s.pullLock.Unlock() + } else { + result.wg.Wait() + } + + if result.imageRef == "" && result.err != nil { + return nil, result.err + } + resp = &pb.PullImageResponse{ - ImageRef: imageRef, + ImageRef: result.imageRef, } return resp, nil } diff --git a/server/server.go b/server/server.go index 919fb1a8340d..2f6ced9f6b73 100644 --- a/server/server.go +++ b/server/server.go @@ -49,6 +49,12 @@ type StreamService struct { streaming.Runtime } +type pullResult struct { + imageRef string // reference of the pulled image + err error // error from the pulling goroutine + wg sync.WaitGroup // waitgroup allows for queuing requests +} + // Server implements the RuntimeService and ImageService type Server struct { config libconfig.Config @@ -61,6 +67,9 @@ type Server struct { defaultIDMappings *idtools.IDMappings updateLock sync.RWMutex + + pullLock sync.Mutex + pullCache map[string]*pullResult } type certConfigCache struct { @@ -330,6 +339,7 @@ func New( config: *config, monitorsChan: make(chan struct{}), defaultIDMappings: idMappings, + pullCache: make(map[string]*pullResult), } if err := configureMaxThreads(); err != nil {