From 93715c752c702b2fe96c9a3d2478ae133267559f Mon Sep 17 00:00:00 2001 From: Valentin Rothberg Date: Mon, 16 Mar 2020 17:21:11 +0100 Subject: [PATCH] avoid parallel pulls of the same image To suit the needs of OpenShift CI [1], we need to avoid parallel pulls of the same image. While we have an open PR against containers/image [2] to detect parallel copy operations across processes, we are still far from getting this merged; it must be split into smaller chunks and we plan to tackle it in the next few months, but it is unclear how long this might take. In CRI-O, however, we enjoy the benefit of running a daemon where we can easily avoid redundantly pulling the same image in parallel as we're in the same process space and can do all synchronisation in memory. [1] https://bugzilla.redhat.com/show_bug.cgi?id=1785390 [2] https://github.com/containers/image/pull/611 Signed-off-by: Valentin Rothberg --- server/image_pull.go | 62 +++++++++++++++++++++++++++++++++++++------- server/server.go | 20 ++++++++++++++ 2 files changed, 72 insertions(+), 10 deletions(-) diff --git a/server/image_pull.go b/server/image_pull.go index 3e1069c35503..8928fd494bdf 100644 --- a/server/image_pull.go +++ b/server/image_pull.go @@ -27,6 +27,48 @@ func (s *Server) PullImage(ctx context.Context, req *pb.PullImageRequest) (resp image = img.Image } + // We use the server's pullCache to record which images are currently being + // pulled. This allows for avoiding pulling the same image in parallel. + // Hence, if a given image is currently being pulled, we queue into the + // pullOperation's waitgroup and wait for the pulling goroutine to unblock + // us and re-use its results. 
+ pullOp := &pullOperation{} + s.pullCacheLock.Lock() + pr, pullInProcess := s.pullCache[image] + if pullInProcess { + pullOp = pr + } else { + s.pullCache[image] = pullOp + pullOp.wg.Add(1) + } + s.pullCacheLock.Unlock() + + if !pullInProcess { + pullOp.imageRef, pullOp.err = s.pullImage(ctx, req, image) + // Unblock waiting goroutines and delete the entry from the cache. + s.pullCacheLock.Lock() + delete(s.pullCache, image) + pullOp.wg.Done() + s.pullCacheLock.Unlock() + } else { + pullOp.wg.Wait() + } + + if pullOp.imageRef == "" && pullOp.err != nil { + return nil, pullOp.err + } + + resp = &pb.PullImageResponse{ + ImageRef: pullOp.imageRef, + } + return resp, nil +} + +// pullImage performs the actual pull operation of PullImage. Used to separate +// the pull implementation from the pullCache logic in PullImage and improve +// readability and maintainability. +func (s *Server) pullImage(ctx context.Context, req *pb.PullImageRequest, image string) (string, error) { + var err error sourceCtx := *s.config.SystemContext // A shallow copy we can modify if req.GetAuth() != nil { username := req.GetAuth().Username @@ -34,8 +76,8 @@ func (s *Server) PullImage(ctx context.Context, req *pb.PullImageRequest) (resp if req.GetAuth().Auth != "" { username, password, err = decodeDockerAuth(req.GetAuth().Auth) if err != nil { - log.Debugf(ctx, "error decoding authentication for image %s: %v", img, err) - return nil, err + log.Debugf(ctx, "error decoding authentication for image %s: %v", image, err) + return "", err } } // Specifying a username indicates the user intends to send authentication to the registry. 
@@ -49,7 +91,7 @@ func (s *Server) PullImage(ctx context.Context, req *pb.PullImageRequest) (resp decryptConfig, err := getDecryptionKeys(s.config.DecryptionKeysPath) if err != nil { - return nil, err + return "", err } var ( @@ -58,7 +100,7 @@ func (s *Server) PullImage(ctx context.Context, req *pb.PullImageRequest) (resp ) images, err = s.StorageImageServer().ResolveNames(s.config.SystemContext, image) if err != nil { - return nil, err + return "", err } for _, img := range images { var tmpImg types.ImageCloser @@ -159,21 +201,21 @@ func (s *Server) PullImage(ctx context.Context, req *pb.PullImageRequest) (resp pulled = img break } + if pulled == "" && err != nil { - return nil, err + return "", err } + status, err := s.StorageImageServer().ImageStatus(s.config.SystemContext, pulled) if err != nil { - return nil, err + return "", err } imageRef := status.ID if len(status.RepoDigests) > 0 { imageRef = status.RepoDigests[0] } - resp = &pb.PullImageResponse{ - ImageRef: imageRef, - } - return resp, nil + + return imageRef, nil } func decodeDockerAuth(s string) (user, password string, err error) { diff --git a/server/server.go b/server/server.go index 919fb1a8340d..ba0f40ab421b 100644 --- a/server/server.go +++ b/server/server.go @@ -61,6 +61,25 @@ type Server struct { defaultIDMappings *idtools.IDMappings updateLock sync.RWMutex + + // pullCache is used to avoid pulling the same image in parallel. Goroutines + // will block on the pullOperation. + pullCache map[string]*pullOperation + // pullCacheLock is used to synchronize the pullCache. + pullCacheLock sync.Mutex +} + +// pullOperation is used to synchronize parallel pull operations via the +// server's pullCache. Goroutines can block on the pullOperation's waitgroup and +// be released once the pull operation has finished. +type pullOperation struct { + // imageRef is the reference of the pulled image. + imageRef string + // err is the error indicating if the pull operation has succeeded or not. 
+ err error + // wg allows for Goroutines trying to pull the same image to wait until the + // currently running pull operation has finished. + wg sync.WaitGroup } type certConfigCache struct { @@ -330,6 +349,7 @@ func New( config: *config, monitorsChan: make(chan struct{}), defaultIDMappings: idMappings, + pullCache: make(map[string]*pullOperation), } if err := configureMaxThreads(); err != nil {