Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add remote.WithJobs and use it in remote.MultiWrite #803

Merged
merged 1 commit into from
Nov 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions pkg/v1/remote/descriptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,6 @@ import (
"github.com/google/go-containerregistry/pkg/v1/v1util"
)

var defaultPlatform = v1.Platform{
Architecture: "amd64",
OS: "linux",
}

// ErrSchema1 indicates that we received a schema1 manifest from the registry.
// This library doesn't have plans to support this legacy image format:
// https://github.com/google/go-containerregistry/issues/377
Expand Down
12 changes: 4 additions & 8 deletions pkg/v1/remote/multi_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,6 @@ import (
"golang.org/x/sync/errgroup"
)

// Parallelism of blob and manifest uploads
// TODO(jasonhall): Make this an Option.
const jobs = 4

// MultiWrite writes the given Images or ImageIndexes to the given refs, as
// efficiently as possible, by deduping shared layer blobs and uploading layers
// in parallel, then uploading all manifests in parallel.
Expand Down Expand Up @@ -96,9 +92,9 @@ func MultiWrite(m map[name.Reference]Taggable, options ...Option) error {
}

// Upload individual blobs and collect any errors.
blobChan := make(chan v1.Layer, 2*jobs)
blobChan := make(chan v1.Layer, 2*o.jobs)
var g errgroup.Group
for i := 0; i < jobs; i++ {
for i := 0; i < o.jobs; i++ {
// Start N workers consuming blobs to upload.
g.Go(func() error {
for b := range blobChan {
Expand Down Expand Up @@ -126,8 +122,8 @@ func MultiWrite(m map[name.Reference]Taggable, options ...Option) error {
i Taggable
ref name.Reference
}
taskChan := make(chan task, 2*jobs)
for i := 0; i < jobs; i++ {
taskChan := make(chan task, 2*o.jobs)
for i := 0; i < o.jobs; i++ {
// Start N workers consuming tasks to upload manifests.
g.Go(func() error {
for t := range taskChan {
Expand Down
25 changes: 25 additions & 0 deletions pkg/v1/remote/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package remote

import (
"context"
"errors"
"net/http"

"github.com/google/go-containerregistry/pkg/authn"
Expand All @@ -33,14 +34,23 @@ type options struct {
transport http.RoundTripper
platform v1.Platform
context context.Context
jobs int
}

var defaultPlatform = v1.Platform{
Architecture: "amd64",
OS: "linux",
}

const defaultJobs = 4
imjasonh marked this conversation as resolved.
Show resolved Hide resolved

func makeOptions(target authn.Resource, opts ...Option) (*options, error) {
o := &options{
auth: authn.Anonymous,
transport: http.DefaultTransport,
platform: defaultPlatform,
context: context.Background(),
jobs: defaultJobs,
}

for _, option := range opts {
Expand Down Expand Up @@ -131,3 +141,18 @@ func WithContext(ctx context.Context) Option {
return nil
}
}

// WithJobs is a functional option for setting the parallelism of remote
// operations performed by a given function. Note that not all remote
// operations support parallelism.
//
// The default value is 4.
func WithJobs(jobs int) Option {
return func(o *options) error {
if jobs <= 0 {
return errors.New("jobs must be greater than zero")
}
o.jobs = jobs
return nil
}
}
2 changes: 2 additions & 0 deletions pkg/v1/remote/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func Write(ref name.Reference, img v1.Image, options ...Option) error {
uploaded[h] = true
}

// TODO(#803): Pipe through remote.WithJobs and upload these in parallel.
g.Go(func() error {
return w.uploadOne(l)
})
Expand Down Expand Up @@ -501,6 +502,7 @@ func WriteIndex(ref name.Reference, ii v1.ImageIndex, options ...Option) error {
context: o.context,
}

// TODO(#803): Pipe through remote.WithJobs and upload these in parallel.
for _, desc := range index.Manifests {
ref := ref.Context().Digest(desc.Digest.String())
exists, err := w.checkExistingManifest(desc.Digest, desc.MediaType)
Expand Down