Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(registry/remote): implement Mount #500

Merged
merged 1 commit into from
May 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 91 additions & 6 deletions registry/remote/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,19 @@ func (r *Repository) Push(ctx context.Context, expected ocispec.Descriptor, cont
return r.blobStore(expected).Push(ctx, expected, content)
}

// Mount makes the blob with the given digest in fromRepo
// available in the repository signified by the receiver.
//
// This avoids the need to pull content down from fromRepo only to push it to r.
//
// If the registry does not implement mounting, getContent will be used to get the
// content to push. If getContent is nil, the content will be pulled from the source
// repository. If getContent returns an error, it will be wrapped inside the error
// returned from Mount.
func (r *Repository) Mount(ctx context.Context, desc ocispec.Descriptor, fromRepo string, getContent func() (io.ReadCloser, error)) error {
return r.Blobs().(registry.Mounter).Mount(ctx, desc, fromRepo, getContent)
}

// Exists returns true if the described content exists.
func (r *Repository) Exists(ctx context.Context, target ocispec.Descriptor) (bool, error) {
return r.blobStore(target).Exists(ctx, target)
Expand Down Expand Up @@ -660,6 +673,73 @@ func (s *blobStore) Fetch(ctx context.Context, target ocispec.Descriptor) (rc io
}
}

// Mount mounts the given descriptor from fromRepo into s.
func (s *blobStore) Mount(ctx context.Context, desc ocispec.Descriptor, fromRepo string, getContent func() (io.ReadCloser, error)) error {
// pushing usually requires both pull and push actions.
// Reference: https://github.com/distribution/distribution/blob/v2.7.1/registry/handlers/app.go#L921-L930
ctx = registryutil.WithScopeHint(ctx, s.repo.Reference, auth.ActionPull, auth.ActionPush)

// We also need pull access to the source repo.
fromRef := s.repo.Reference
fromRef.Repository = fromRepo
ctx = registryutil.WithScopeHint(ctx, fromRef, auth.ActionPull)

url := buildRepositoryBlobMountURL(s.repo.PlainHTTP, s.repo.Reference, desc.Digest, fromRepo)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, nil)
if err != nil {
return err
}
resp, err := s.repo.client().Do(req)
if err != nil {
return err
}
if resp.StatusCode == http.StatusCreated {
defer resp.Body.Close()
// Check the server seems to be behaving.
return verifyContentDigest(resp, desc.Digest)
}
if resp.StatusCode != http.StatusAccepted {
defer resp.Body.Close()
return errutil.ParseErrorResponse(resp)
}
resp.Body.Close()
// From the [spec]:
//
// "If a registry does not support cross-repository mounting
// or is unable to mount the requested blob,
// it SHOULD return a 202.
// This indicates that the upload session has begun
// and that the client MAY proceed with the upload."
//
rogpeppe marked this conversation as resolved.
Show resolved Hide resolved
// So we need to get the content from somewhere in order to
// push it. If the caller has provided a getContent function, we
// can use that, otherwise pull the content from the source repository.
//
// [spec]: https://github.com/opencontainers/distribution-spec/blob/main/spec.md#mounting-a-blob-from-another-repository

var r io.ReadCloser
if getContent != nil {
r, err = getContent()
} else {
r, err = s.sibling(fromRepo).Fetch(ctx, desc)
}
if err != nil {
return fmt.Errorf("cannot read source blob: %w", err)
}
defer r.Close()
return s.completePushAfterInitialPost(ctx, req, resp, desc, r)
}

// sibling returns a blob store for another repository in the same
// registry.
func (s *blobStore) sibling(otherRepoName string) *blobStore {
otherRepo := *s.repo
otherRepo.Reference.Repository = otherRepoName
return &blobStore{
repo: &otherRepo,
}
}

// Push pushes the content, matching the expected descriptor.
// Existing content is not checked by Push() to minimize the number of out-going
// requests.
Expand All @@ -680,11 +760,8 @@ func (s *blobStore) Push(ctx context.Context, expected ocispec.Descriptor, conte
if err != nil {
return err
}
reqHostname := req.URL.Hostname()
reqPort := req.URL.Port()

client := s.repo.client()
resp, err := client.Do(req)
resp, err := s.repo.client().Do(req)
if err != nil {
return err
}
Expand All @@ -694,7 +771,15 @@ func (s *blobStore) Push(ctx context.Context, expected ocispec.Descriptor, conte
return errutil.ParseErrorResponse(resp)
}
resp.Body.Close()
return s.completePushAfterInitialPost(ctx, req, resp, expected, content)
}

// completePushAfterInitialPost implements step 2 of the push protocol. This can be invoked either by
// Push or by Mount when the receiving repository does not implement the
// mount endpoint.
func (s *blobStore) completePushAfterInitialPost(ctx context.Context, req *http.Request, resp *http.Response, expected ocispec.Descriptor, content io.Reader) error {
reqHostname := req.URL.Hostname()
reqPort := req.URL.Port()
// monolithic upload
location, err := resp.Location()
if err != nil {
Expand All @@ -711,7 +796,7 @@ func (s *blobStore) Push(ctx context.Context, expected ocispec.Descriptor, conte
if reqPort == "443" && locationHostname == reqHostname && locationPort == "" {
location.Host = locationHostname + ":" + reqPort
}
url = location.String()
url := location.String()
req, err = http.NewRequestWithContext(ctx, http.MethodPut, url, content)
if err != nil {
return err
Expand All @@ -731,7 +816,7 @@ func (s *blobStore) Push(ctx context.Context, expected ocispec.Descriptor, conte
if auth := resp.Request.Header.Get("Authorization"); auth != "" {
req.Header.Set("Authorization", auth)
}
resp, err = client.Do(req)
resp, err = s.repo.client().Do(req)
if err != nil {
return err
}
Expand Down
Loading