diff --git a/server/image_pull.go b/server/image_pull.go index cc586868cb1..b5b8e4709e3 100644 --- a/server/image_pull.go +++ b/server/image_pull.go @@ -13,6 +13,7 @@ import ( "github.com/cri-o/cri-o/internal/pkg/log" "github.com/cri-o/cri-o/internal/pkg/storage" "github.com/cri-o/cri-o/server/metrics" + "github.com/pkg/errors" "golang.org/x/net/context" pb "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" ) @@ -26,31 +27,82 @@ func (s *Server) PullImage(ctx context.Context, req *pb.PullImageRequest) (resp image = img.Image } - sourceCtx := *s.systemContext // A shallow copy we can modify + pullArgs := pullArguments{image: image} if req.GetAuth() != nil { username := req.GetAuth().Username password := req.GetAuth().Password if req.GetAuth().Auth != "" { username, password, err = decodeDockerAuth(req.GetAuth().Auth) if err != nil { - log.Debugf(ctx, "error decoding authentication for image %s: %v", img, err) + log.Debugf(ctx, "error decoding authentication for image %s: %v", image, err) return nil, err } } // Specifying a username indicates the user intends to send authentication to the registry. if username != "" { - sourceCtx.DockerAuthConfig = &types.DockerAuthConfig{ + pullArgs.credentials = types.DockerAuthConfig{ Username: username, Password: password, } } } + // We use the server's pullOperationsInProgress to record which images are + // currently being pulled. This allows for avoiding pulling the same image + // in parallel. Hence, if a given image is currently being pulled, we queue + // into the pullOperation's waitgroup and wait for the pulling goroutine to + // unblock us and re-use its results. + pullOp, pullInProcess := func() (pullOp *pullOperation, inProgress bool) { + s.pullOperationsLock.Lock() + defer s.pullOperationsLock.Unlock() + pullOp, inProgress = s.pullOperationsInProgress[pullArgs] + if !inProgress { + pullOp = &pullOperation{} + s.pullOperationsInProgress[pullArgs] = pullOp + pullOp.wg.Add(1) + } + return pullOp, inProgress + }() + + if !pullInProcess { + pullOp.err = errors.New("pullImage was aborted by a Go panic") + defer func() { + s.pullOperationsLock.Lock() + delete(s.pullOperationsInProgress, pullArgs) + pullOp.wg.Done() + s.pullOperationsLock.Unlock() + }() + pullOp.imageRef, pullOp.err = s.pullImage(ctx, &pullArgs) + } else { + // Wait for the pull operation to finish. + pullOp.wg.Wait() + } + + if pullOp.err != nil { + return nil, pullOp.err + } + + resp = &pb.PullImageResponse{ + ImageRef: pullOp.imageRef, + } + return resp, nil +} + +// pullImage performs the actual pull operation of PullImage. Used to separate +// the pull implementation from the pullCache logic in PullImage and improve +// readability and maintainability. +func (s *Server) pullImage(ctx context.Context, pullArgs *pullArguments) (string, error) { + var err error + sourceCtx := *s.systemContext // A shallow copy we can modify + if pullArgs.credentials.Username != "" { + sourceCtx.DockerAuthConfig = &pullArgs.credentials + } + var dcc *encconfig.DecryptConfig if _, err := os.Stat(s.decryptionKeysPath); err == nil { cc, err := getDecryptionKeys(s.decryptionKeysPath) if err != nil { - return nil, err + return "", err } dcc = cc.DecryptConfig } @@ -59,9 +111,9 @@ func (s *Server) PullImage(ctx context.Context, req *pb.PullImageRequest) (resp images []string pulled string ) - images, err = s.StorageImageServer().ResolveNames(s.systemContext, image) + images, err = s.StorageImageServer().ResolveNames(s.systemContext, pullArgs.image) if err != nil { - return nil, err + return "", err } for _, img := range images { var tmpImg types.ImageCloser @@ -150,21 +202,20 @@ func (s *Server) PullImage(ctx context.Context, req *pb.PullImageRequest) (resp pulled = img break } + if pulled == "" && err != nil { - return nil, err + return "", err } status, err := s.StorageImageServer().ImageStatus(s.systemContext, pulled) if err != nil { - return nil, err + return "", err } imageRef := status.ID if len(status.RepoDigests) > 0 { imageRef = status.RepoDigests[0] } - resp = &pb.PullImageResponse{ - ImageRef: imageRef, - } - return resp, nil + + return imageRef, nil } func decodeDockerAuth(s string) (user, password string, err error) { diff --git a/server/server.go b/server/server.go index 8884f4fd735..ed9ac3de80b 100644 --- a/server/server.go +++ b/server/server.go @@ -77,6 +77,32 @@ type Server struct { seccompEnabled bool appArmorEnabled bool + // pullOperationsInProgress is used to avoid pulling the same image in parallel. Goroutines + // will block on the pullResult. + pullOperationsInProgress map[pullArguments]*pullOperation + // pullOperationsLock is used to synchronize pull operations. + pullOperationsLock sync.Mutex +} + +// pullArguments are used to identify a pullOperation via an input image name and +// possibly specified credentials. +type pullArguments struct { + image string + credentials types.DockerAuthConfig +} + +// pullOperation is used to synchronize parallel pull operations via the +// server's pullCache. Goroutines can block the pullOperation's waitgroup and +// be released once the pull operation has finished. +type pullOperation struct { + // wg allows for Goroutines trying to pull the same image to wait until the + // currently running pull operation has finished. + wg sync.WaitGroup + // imageRef is the reference of the actually pulled image which will differ + // from the input if it was a short name (e.g., alpine). + imageRef string + // err is the error indicating if the pull operation has succeeded or not. + err error } type certConfigCache struct { @@ -350,16 +376,17 @@ func New( } s := &Server{ - ContainerServer: containerServer, - netPlugin: netPlugin, - hostportManager: hostportManager, - config: *config, - seccompEnabled: seccomp.IsEnabled(), - appArmorEnabled: apparmor.IsEnabled(), - appArmorProfile: config.ApparmorProfile, - monitorsChan: make(chan struct{}), - defaultIDMappings: idMappings, - systemContext: systemContext, + ContainerServer: containerServer, + netPlugin: netPlugin, + hostportManager: hostportManager, + config: *config, + seccompEnabled: seccomp.IsEnabled(), + appArmorEnabled: apparmor.IsEnabled(), + appArmorProfile: config.ApparmorProfile, + monitorsChan: make(chan struct{}), + defaultIDMappings: idMappings, + systemContext: systemContext, + pullOperationsInProgress: make(map[pullArguments]*pullOperation), } s.decryptionKeysPath = config.DecryptionKeysPath