Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add WithMount method to support cross repository blob mounting #632

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,66 @@
FindSuccessors func(ctx context.Context, fetcher content.Fetcher, desc ocispec.Descriptor) ([]ocispec.Descriptor, error)
}

// WithMount enabled cross repository blob mounting.
// sourceReference is the repository to use for mounting (the mount point).
// mounter is the destination for the mount (a well-known implementation of this is *registry.Repository representing the target).
// onMounted is called (if provided) when the blob is mounted.
// The original PreCopy hook is called only on copy, and therefore not when the blob is mounted.
func (opts *CopyGraphOptions) WithMount(sourceRepository string, mounter registry.Mounter, onMounted func(context.Context, ocispec.Descriptor) error) {
preCopy := opts.PreCopy
opts.PreCopy = func(ctx context.Context, desc ocispec.Descriptor) error {
// Only care to mount blobs
if descriptor.IsManifest(desc) {
// still want to call PreCopy if it is a manifest
if preCopy != nil {
return preCopy(ctx, desc)
}
return nil

Check warning on line 131 in copy.go

View check run for this annotation

Codecov / codecov/patch

copy.go#L131

Added line #L131 was not covered by tests
}

var mountFailed bool
getContent := func() (io.ReadCloser, error) {
// call the original PreCopy function if it exists
if preCopy != nil {
if err := preCopy(ctx, desc); err != nil {
return nil, err
}

Check warning on line 140 in copy.go

View check run for this annotation

Codecov / codecov/patch

copy.go#L139-L140

Added lines #L139 - L140 were not covered by tests
}
// the invocation of getContent indicates that mounting has failed
mountFailed = true

// To avoid needing a content.Fetcher as an input argument we simply fall back to the default behavior
// as if getContent was nil
return nil, registry.UseSourceRepository
}

// Mount or copy
if err := mounter.Mount(ctx, desc, sourceRepository, getContent); err != nil {
return err
}

Check warning on line 153 in copy.go

View check run for this annotation

Codecov / codecov/patch

copy.go#L152-L153

Added lines #L152 - L153 were not covered by tests

if !mountFailed {
// mounted
if onMounted != nil {
if err := onMounted(ctx, desc); err != nil {
return err
}

Check warning on line 160 in copy.go

View check run for this annotation

Codecov / codecov/patch

copy.go#L159-L160

Added lines #L159 - L160 were not covered by tests
}
// signal that the descriptor now exists
return SkipNode
}

// we copied it
if opts.PostCopy != nil {
if err := opts.PostCopy(ctx, desc); err != nil {
return err
}

Check warning on line 170 in copy.go

View check run for this annotation

Codecov / codecov/patch

copy.go#L169-L170

Added lines #L169 - L170 were not covered by tests
}
// signal that the descriptor now exists
return SkipNode
}
}

// Copy copies a rooted directed acyclic graph (DAG) with the tagged root node
// in the source Target to the destination Target.
// The destination reference will be the same as the source reference if the
Expand Down
145 changes: 144 additions & 1 deletion copy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"oras.land/oras-go/v2/internal/cas"
"oras.land/oras-go/v2/internal/docker"
"oras.land/oras-go/v2/internal/spec"
"oras.land/oras-go/v2/registry"
)

// storageTracker tracks storage API counts.
Expand Down Expand Up @@ -1471,11 +1472,143 @@ func TestCopyGraph_WithOptions(t *testing.T) {
t.Errorf("count(Push()) = %d, want %d", got, expected)
}
})

t.Run("WithMount_Mounted", func(t *testing.T) {
root = descs[6]
dst := &countingStorage{storage: cas.NewMemory()}
var numOnMounted atomic.Int64
m := mounter(func(ctx context.Context,
desc ocispec.Descriptor,
fromRepo string,
getContent func() (io.ReadCloser, error),
) error {
if expected := "source"; fromRepo != expected {
t.Fatalf("fromRepo = %v, want %v", fromRepo, expected)
}
rc, err := src.Fetch(ctx, desc)
if err != nil {
t.Fatalf("Failed to fetch content: %v", err)
}
defer rc.Close()
err = dst.storage.Push(ctx, desc, rc) // bypass the counters
if err != nil {
t.Fatalf("Failed to push content: %v", err)
}
return nil
})
opts = oras.CopyGraphOptions{}
var numPreCopy, numPostCopy atomic.Int64
opts.PreCopy = func(ctx context.Context, desc ocispec.Descriptor) error {
numPreCopy.Add(1)
return nil
}
opts.PostCopy = func(ctx context.Context, desc ocispec.Descriptor) error {
numPostCopy.Add(1)
return nil
}
opts.WithMount("source", m, func(ctx context.Context, d ocispec.Descriptor) error {
numOnMounted.Add(1)
return nil
})
if err := oras.CopyGraph(ctx, src, dst, root, opts); err != nil {
t.Fatalf("CopyGraph() error = %v, wantErr %v", err, errdef.ErrSizeExceedsLimit)
}

if got, expected := dst.numExists.Load(), int64(7); got != expected {
t.Errorf("count(Exists()) = %d, want %d", got, expected)
}
if got, expected := dst.numFetch.Load(), int64(0); got != expected {
t.Errorf("count(Fetch()) = %d, want %d", got, expected)
}
// 7 (exists) - 1 (skipped) = 6 pushes expected
if got, expected := dst.numPush.Load(), int64(3); got != expected {
// If we get >=7 then ErrSkipDesc did not short circuit the push like it is supposed to do.
t.Errorf("count(Push()) = %d, want %d", got, expected)
}
if got, expected := numOnMounted.Load(), int64(4); got != expected {
t.Errorf("count(onMounted()) = %d, want %d", got, expected)
}
if got, expected := numPreCopy.Load(), int64(3); got != expected {
t.Errorf("count(PreCopy()) = %d, want %d", got, expected)
}
if got, expected := numPostCopy.Load(), int64(3); got != expected {
t.Errorf("count(PostCopy()) = %d, want %d", got, expected)
}
})

t.Run("WithMount_Copied", func(t *testing.T) {
root = descs[6]
dst := &countingStorage{storage: cas.NewMemory()}
var numOnMounted atomic.Int64
m := mounter(func(ctx context.Context,
desc ocispec.Descriptor,
fromRepo string,
getContent func() (io.ReadCloser, error),
) error {
if expected := "source"; fromRepo != expected {
t.Fatalf("fromRepo = %v, want %v", fromRepo, expected)
}

_, err := getContent()
if !errors.Is(err, registry.UseSourceRepository) {
t.Fatalf("Expected error %v", registry.UseSourceRepository)
}
rc, err := src.Fetch(ctx, desc)
if err != nil {
t.Fatalf("Failed to fetch content: %v", err)
}
defer rc.Close()
err = dst.storage.Push(ctx, desc, rc) // bypass the counters
if err != nil {
t.Fatalf("Failed to push content: %v", err)
}
return nil
})
opts = oras.CopyGraphOptions{}
var numPreCopy, numPostCopy atomic.Int64
opts.PreCopy = func(ctx context.Context, desc ocispec.Descriptor) error {
numPreCopy.Add(1)
return nil
}
opts.PostCopy = func(ctx context.Context, desc ocispec.Descriptor) error {
numPostCopy.Add(1)
return nil
}
opts.WithMount("source", m, func(ctx context.Context, d ocispec.Descriptor) error {
numOnMounted.Add(1)
return nil
})
if err := oras.CopyGraph(ctx, src, dst, root, opts); err != nil {
t.Fatalf("CopyGraph() error = %v, wantErr %v", err, errdef.ErrSizeExceedsLimit)
}

if got, expected := dst.numExists.Load(), int64(7); got != expected {
t.Errorf("count(Exists()) = %d, want %d", got, expected)
}
if got, expected := dst.numFetch.Load(), int64(0); got != expected {
t.Errorf("count(Fetch()) = %d, want %d", got, expected)
}
// 7 (exists) - 1 (skipped) = 6 pushes expected
if got, expected := dst.numPush.Load(), int64(3); got != expected {
// If we get >=7 then ErrSkipDesc did not short circuit the push like it is supposed to do.
t.Errorf("count(Push()) = %d, want %d", got, expected)
}
if got, expected := numOnMounted.Load(), int64(0); got != expected {
t.Errorf("count(onMounted()) = %d, want %d", got, expected)
}
if got, expected := numPreCopy.Load(), int64(7); got != expected {
t.Errorf("count(PreCopy()) = %d, want %d", got, expected)
}
if got, expected := numPostCopy.Load(), int64(7); got != expected {
t.Errorf("count(PostCopy()) = %d, want %d", got, expected)
}
})
}

// countingStorage counts the calls to its content.Storage methods
type countingStorage struct {
storage content.Storage
storage content.Storage

numExists, numFetch, numPush atomic.Int64
}

Expand All @@ -1494,6 +1627,16 @@ func (cs *countingStorage) Push(ctx context.Context, target ocispec.Descriptor,
return cs.storage.Push(ctx, target, r)
}

type mounter func(context.Context, ocispec.Descriptor, string, func() (io.ReadCloser, error)) error

func (m mounter) Mount(ctx context.Context,
desc ocispec.Descriptor,
fromRepo string,
getContent func() (io.ReadCloser, error),
) error {
return m(ctx, desc, fromRepo, getContent)
}

func TestCopyGraph_WithConcurrencyLimit(t *testing.T) {
src := cas.NewMemory()
// generate test content
Expand Down
32 changes: 32 additions & 0 deletions example_copy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"oras.land/oras-go/v2/content/memory"
"oras.land/oras-go/v2/content/oci"
"oras.land/oras-go/v2/internal/spec"
"oras.land/oras-go/v2/registry"
"oras.land/oras-go/v2/registry/remote"
)

Expand Down Expand Up @@ -215,6 +216,37 @@ func ExampleCopy_remoteToRemote() {
// sha256:7cbb44b44e8ede5a89cf193db3f5f2fd019d89697e6b87e8ed2589e60649b0d1
}

func ExampleCopy_remoteToRemoteWithMount() {
reg, err := remote.NewRegistry(remoteHost)
if err != nil {
panic(err) // Handle error
}
ctx := context.Background()
src, err := reg.Repository(ctx, "source")
if err != nil {
panic(err) // Handle error
}
dst, err := reg.Repository(ctx, "target")
if err != nil {
panic(err) // Handle error
}

tagName := "latest"

opts := oras.CopyOptions{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if someone reuse oras.CopyOptions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The CopyOptions after a WithMount would only be usable when the same source and destination are not changed. That is not ideal. A shallow copy of the oras.CopyOptions before calling WithMount() solves this problem. That is how I use this in my code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be ideal if we would reuse the CopyOptions. Currently, only PackOptions is not re-usable.

Copy link
Contributor Author

@ktarplee ktarplee Dec 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could change the signature of the WithMount() to return a modified copy of the receiver. It would not modify the receiver so opts below could be reused. It would then be used like so:

desc, err := oras.Copy(ctx, src, tagName, dst, tagName, 
                       opts.WithMount("source", dst.(registry.Mounter), nil))

I think that would reduce the misuse and make it more obvious what it does.

// Enable cross-repository blob mounting
opts.WithMount("source", dst.(registry.Mounter), nil)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What will happen if other target instead of dst is passed to WithMount?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's say the true destination target is dst but instead they pass in dst2. The blobs would be mounted or copied to dst2 but the manifest would be copied to dst and fail due to missing blobs.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we avoid that situation in the first place with a better API design?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What will happen if someone call WithMount twice?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a fun one. I think it would work and not produce an error (but I have not tried it). Let's call the preCopy functions preCopy1, preCopy2, preCopy3. The last one is the one created by the last call to WithMount. The first thing that would happen is the call to preCopy3 which would attempt a mount. Let's assume the mount failed, then getContent would get called (defined by preCopy3) and it would call preCopy2. preCopy2 was created by the first call to WithMount so it will try to mount the blob with that source repo. Let's assume it also fails, then getContent would call preCopy1. Then getContent (from preCopy2) would return errdef.ErrUnsupported (to be renamed) and the blob would be copied. Then getContent (from preCopy3) would return errdef.ErrUnsupported (to be renamed) and the blob would be copied again. So it seems like the blob would be copied twice with this design. Not fatal but not ideal either.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could add a field to CopyOptions, something like tryingMount bool. In WithMount() we can check that tryingMount is false (else panic) and then set it to true. Then if the programmer makes the error to call WithMount twice they get a reasonable error.


desc, err := oras.Copy(ctx, src, tagName, dst, tagName, opts)
if err != nil {
panic(err) // Handle error
}
fmt.Println("Final", desc.Digest)

// Output:
// Final sha256:7cbb44b44e8ede5a89cf193db3f5f2fd019d89697e6b87e8ed2589e60649b0d1
}

func ExampleCopy_remoteToLocal() {
reg, err := remote.NewRegistry(remoteHost)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions registry/remote/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -806,6 +806,10 @@ func (s *blobStore) Mount(ctx context.Context, desc ocispec.Descriptor, fromRepo
var r io.ReadCloser
if getContent != nil {
r, err = getContent()
if errors.Is(err, registry.UseSourceRepository) {
// getContent can return a ErrUnsupported to fallback to the default copy operation
r, err = s.sibling(fromRepo).Fetch(ctx, desc)
}
} else {
r, err = s.sibling(fromRepo).Fetch(ctx, desc)
}
Expand Down
41 changes: 31 additions & 10 deletions registry/remote/repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,16 +421,37 @@ func TestRepository_Mount_Fallback(t *testing.T) {
repo.PlainHTTP = true
ctx := context.Background()

err = repo.Mount(ctx, blobDesc, "test", nil)
if err != nil {
t.Fatalf("Repository.Push() error = %v", err)
}
if !bytes.Equal(gotBlob, blob) {
t.Errorf("Repository.Mount() = %v, want %v", gotBlob, blob)
}
if got, want := sequence, "post get put "; got != want {
t.Errorf("unexpected request sequence; got %q want %q", got, want)
}
t.Run("getContent is nil", func(t *testing.T) {
sequence = ""

err = repo.Mount(ctx, blobDesc, "test", nil)
if err != nil {
t.Fatalf("Repository.Push() error = %v", err)
}
if !bytes.Equal(gotBlob, blob) {
t.Errorf("Repository.Mount() = %v, want %v", gotBlob, blob)
}
if got, want := sequence, "post get put "; got != want {
t.Errorf("unexpected request sequence; got %q want %q", got, want)
}
})

t.Run("getContent is UseSourceRepository", func(t *testing.T) {
sequence = ""

err = repo.Mount(ctx, blobDesc, "test", func() (io.ReadCloser, error) {
return nil, registry.UseSourceRepository
})
if err != nil {
t.Fatalf("Repository.Push() error = %v", err)
}
if !bytes.Equal(gotBlob, blob) {
t.Errorf("Repository.Mount() = %v, want %v", gotBlob, blob)
}
if got, want := sequence, "post get put "; got != want {
t.Errorf("unexpected request sequence; got %q want %q", got, want)
}
})
}

func TestRepository_Mount_Error(t *testing.T) {
Expand Down
5 changes: 5 additions & 0 deletions registry/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package registry
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"

Expand Down Expand Up @@ -128,6 +129,10 @@ type Mounter interface {
) error
}

// UseSourceRepository signals to Mount implementations that the content from the source repository should be used. This may be returned from the getContent function of the Mounter.Mount method.
// This is useful to avoid having to know that source repository in the definition of getContent.
var UseSourceRepository = errors.New("use source repository")

// Tags lists the tags available in the repository.
func Tags(ctx context.Context, repo TagLister) ([]string, error) {
var res []string
Expand Down
Loading