From dfd69768333b06d95c66cec57a76a727b9ec26f3 Mon Sep 17 00:00:00 2001 From: Valentin Rothberg Date: Mon, 16 Mar 2020 17:21:11 +0100 Subject: [PATCH] avoid parallel pulls of the same image To suit the needs of OpenShift CI [1], we need to avoid parallel pulls. While we have an open PR against containers/image [2] to detect parallel copy operations across processes, we are still far from getting this merged; it must be split into smaller chunks and we plan to tackle it in the next few months, but who knows how long this might take. To support that in CRI-O, however, we enjoy the benefit of running a daemon where we can easily avoid redundantly pulling the same image in parallel as we're in the same process space and can do all synchronisation in memory. [1] https://bugzilla.redhat.com/show_bug.cgi?id=1785390 [2] https://github.com/containers/image/pull/611 Signed-off-by: Valentin Rothberg --- server/image_pull.go | 74 +++++++++++++++++++++++++++++++++++++------- server/server.go | 41 ++++++++++++++++++++---- 2 files changed, 97 insertions(+), 18 deletions(-) diff --git a/server/image_pull.go b/server/image_pull.go index 3e1069c35503..5d159fb75512 100644 --- a/server/image_pull.go +++ b/server/image_pull.go @@ -12,6 +12,7 @@ import ( "github.com/cri-o/cri-o/internal/log" "github.com/cri-o/cri-o/internal/storage" "github.com/cri-o/cri-o/server/metrics" + "github.com/pkg/errors" "golang.org/x/net/context" pb "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" ) @@ -27,38 +28,87 @@ func (s *Server) PullImage(ctx context.Context, req *pb.PullImageRequest) (resp image = img.Image } - sourceCtx := *s.config.SystemContext // A shallow copy we can modify + pullArgs := pullArguments{image: image} if req.GetAuth() != nil { username := req.GetAuth().Username password := req.GetAuth().Password if req.GetAuth().Auth != "" { username, password, err = decodeDockerAuth(req.GetAuth().Auth) if err != nil { - log.Debugf(ctx, "error decoding authentication for image %s: %v", img, err) + 
log.Debugf(ctx, "error decoding authentication for image %s: %v", image, err) return nil, err } } // Specifying a username indicates the user intends to send authentication to the registry. if username != "" { - sourceCtx.DockerAuthConfig = &types.DockerAuthConfig{ + pullArgs.credentials = types.DockerAuthConfig{ Username: username, Password: password, } } } + // We use the server's pullOperationsInProgress to record which images are + // currently being pulled. This allows for avoiding pulling the same image + // in parallel. Hence, if a given image is currently being pulled, we queue + // into the pullOperation's waitgroup and wait for the pulling goroutine to + // unblock us and re-use its results. + pullOp, pullInProcess := func(pullArgs *pullArguments) (pullOp *pullOperation, inProgress bool) { + s.pullOperationsLock.Lock() + defer s.pullOperationsLock.Unlock() + pullOp, inProgress = s.pullOperationsInProgress[*pullArgs] + if !inProgress { + pullOp = &pullOperation{} + s.pullOperationsInProgress[*pullArgs] = pullOp + pullOp.wg.Add(1) + } + return pullOp, inProgress + }(&pullArgs) + + if !pullInProcess { + pullOp.err = errors.New("pullImage was aborted by a Go panic") + defer func() { + s.pullOperationsLock.Lock() + delete(s.pullOperationsInProgress, pullArgs) + pullOp.wg.Done() + s.pullOperationsLock.Unlock() + }() + pullOp.imageRef, pullOp.err = s.pullImage(ctx, &pullArgs) + } else { + // Wait for the pull operation to finish. + pullOp.wg.Wait() + } + + if pullOp.err != nil { + return nil, pullOp.err + } + + resp = &pb.PullImageResponse{ + ImageRef: pullOp.imageRef, + } + return resp, nil +} + +// pullImage performs the actual pull operation of PullImage. Used to separate +// the pull implementation from the pullCache logic in PullImage and improve +// readability and maintainability. 
+func (s *Server) pullImage(ctx context.Context, pullArgs *pullArguments) (string, error) { + var err error + sourceCtx := *s.config.SystemContext // A shallow copy we can modify + sourceCtx.DockerAuthConfig = &pullArgs.credentials + decryptConfig, err := getDecryptionKeys(s.config.DecryptionKeysPath) if err != nil { - return nil, err + return "", err } var ( images []string pulled string ) - images, err = s.StorageImageServer().ResolveNames(s.config.SystemContext, image) + images, err = s.StorageImageServer().ResolveNames(s.config.SystemContext, pullArgs.image) if err != nil { - return nil, err + return "", err } for _, img := range images { var tmpImg types.ImageCloser @@ -159,21 +209,21 @@ func (s *Server) PullImage(ctx context.Context, req *pb.PullImageRequest) (resp pulled = img break } + if pulled == "" && err != nil { - return nil, err + return "", err } + status, err := s.StorageImageServer().ImageStatus(s.config.SystemContext, pulled) if err != nil { - return nil, err + return "", err } imageRef := status.ID if len(status.RepoDigests) > 0 { imageRef = status.RepoDigests[0] } - resp = &pb.PullImageResponse{ - ImageRef: imageRef, - } - return resp, nil + + return imageRef, nil } func decodeDockerAuth(s string) (user, password string, err error) { diff --git a/server/server.go b/server/server.go index 919fb1a8340d..8c924cced70a 100644 --- a/server/server.go +++ b/server/server.go @@ -16,6 +16,7 @@ import ( "sync" "time" + "github.com/containers/image/v5/types" "github.com/containers/storage/pkg/idtools" "github.com/cri-o/cri-o/internal/lib" "github.com/cri-o/cri-o/internal/lib/sandbox" @@ -61,6 +62,33 @@ type Server struct { defaultIDMappings *idtools.IDMappings updateLock sync.RWMutex + + // pullOperationsInProgress is used to avoid pulling the same image in parallel. Goroutines + // will block on the pullResult. + pullOperationsInProgress map[pullArguments]*pullOperation + // pullOperationsLock is used to synchronize pull operations. 
+ pullOperationsLock sync.Mutex +} + +// pullArguments are used to identify a pullOperation via an input image name and +// possibly specified credentials. +type pullArguments struct { + image string + credentials types.DockerAuthConfig +} + +// pullOperation is used to synchronize parallel pull operations via the +// server's pullCache. Goroutines can block the pullOperation's waitgroup and +// be released once the pull operation has finished. +type pullOperation struct { + // wg allows for Goroutines trying to pull the same image to wait until the + // currently running pull operation has finished. + wg sync.WaitGroup + // imageRef is the reference of the actually pulled image which will differ + // from the input if it was a short name (e.g., alpine). + imageRef string + // err is the error indicating if the pull operation has succeeded or not. + err error } type certConfigCache struct { @@ -324,12 +352,13 @@ func New( } s := &Server{ - ContainerServer: containerServer, - netPlugin: netPlugin, - hostportManager: hostportManager, - config: *config, - monitorsChan: make(chan struct{}), - defaultIDMappings: idMappings, + ContainerServer: containerServer, + netPlugin: netPlugin, + hostportManager: hostportManager, + config: *config, + monitorsChan: make(chan struct{}), + defaultIDMappings: idMappings, + pullOperationsInProgress: make(map[pullArguments]*pullOperation), } if err := configureMaxThreads(); err != nil {