From 7e43cbb7a2e9324d6c25c61237fccdb0215d6d48 Mon Sep 17 00:00:00 2001 From: Jason Hall Date: Mon, 2 Nov 2020 09:30:38 -0500 Subject: [PATCH] Add remote.WithJobs and use it in remote.MultiWrite - default parallel is 4 - an error is returned if WithParallel is called with a value <= 0 We can probably reuse this to fetch and push blobs elsewhere in remote. MultiWrite'ing 123 distroless images with WithJobs(100) took 4s compared to 11s for the default. --- pkg/v1/remote/descriptor.go | 5 ----- pkg/v1/remote/multi_write.go | 12 ++++-------- pkg/v1/remote/options.go | 25 +++++++++++++++++++++++++ pkg/v1/remote/write.go | 2 ++ 4 files changed, 31 insertions(+), 13 deletions(-) diff --git a/pkg/v1/remote/descriptor.go b/pkg/v1/remote/descriptor.go index d8dbeed65..7214c8934 100644 --- a/pkg/v1/remote/descriptor.go +++ b/pkg/v1/remote/descriptor.go @@ -34,11 +34,6 @@ import ( "github.com/google/go-containerregistry/pkg/v1/v1util" ) -var defaultPlatform = v1.Platform{ - Architecture: "amd64", - OS: "linux", -} - // ErrSchema1 indicates that we received a schema1 manifest from the registry. // This library doesn't have plans to support this legacy image format: // https://github.com/google/go-containerregistry/issues/377 diff --git a/pkg/v1/remote/multi_write.go b/pkg/v1/remote/multi_write.go index 769fee8cb..259348d23 100644 --- a/pkg/v1/remote/multi_write.go +++ b/pkg/v1/remote/multi_write.go @@ -26,10 +26,6 @@ import ( "golang.org/x/sync/errgroup" ) -// Parallelism of blob and manifest uploads -// TODO(jasonhall): Make this an Option. -const jobs = 4 - // MultiWrite writes the given Images or ImageIndexes to the given refs, as // efficiently as possible, by deduping shared layer blobs and uploading layers // in parallel, then uploading all manifests in parallel. @@ -96,9 +92,9 @@ func MultiWrite(m map[name.Reference]Taggable, options ...Option) error { } // Upload individual blobs and collect any errors. - blobChan := make(chan v1.Layer, 2*jobs) + blobChan := make(chan v1.Layer, 2*o.jobs) var g errgroup.Group - for i := 0; i < jobs; i++ { + for i := 0; i < o.jobs; i++ { // Start N workers consuming blobs to upload. g.Go(func() error { for b := range blobChan { @@ -126,8 +122,8 @@ func MultiWrite(m map[name.Reference]Taggable, options ...Option) error { i Taggable ref name.Reference } - taskChan := make(chan task, 2*jobs) - for i := 0; i < jobs; i++ { + taskChan := make(chan task, 2*o.jobs) + for i := 0; i < o.jobs; i++ { // Start N workers consuming tasks to upload manifests. g.Go(func() error { for t := range taskChan { diff --git a/pkg/v1/remote/options.go b/pkg/v1/remote/options.go index 22b08a727..7c008d763 100644 --- a/pkg/v1/remote/options.go +++ b/pkg/v1/remote/options.go @@ -16,6 +16,7 @@ package remote import ( "context" + "errors" "net/http" "github.com/google/go-containerregistry/pkg/authn" @@ -33,14 +34,23 @@ type options struct { transport http.RoundTripper platform v1.Platform context context.Context + jobs int } +var defaultPlatform = v1.Platform{ + Architecture: "amd64", + OS: "linux", +} + +const defaultJobs = 4 + func makeOptions(target authn.Resource, opts ...Option) (*options, error) { o := &options{ auth: authn.Anonymous, transport: http.DefaultTransport, platform: defaultPlatform, context: context.Background(), + jobs: defaultJobs, } for _, option := range opts { @@ -131,3 +141,18 @@ func WithContext(ctx context.Context) Option { return nil } } + +// WithJobs is a functional option for setting the parallelism of remote +// operations performed by a given function. Note that not all remote +// operations support parallelism. +// +// The default value is 4. +func WithJobs(jobs int) Option { + return func(o *options) error { + if jobs <= 0 { + return errors.New("jobs must be greater than zero") + } + o.jobs = jobs + return nil + } +} diff --git a/pkg/v1/remote/write.go b/pkg/v1/remote/write.go index 43006fe20..3115e5504 100644 --- a/pkg/v1/remote/write.go +++ b/pkg/v1/remote/write.go @@ -92,6 +92,7 @@ func Write(ref name.Reference, img v1.Image, options ...Option) error { uploaded[h] = true } + // TODO(#803): Pipe through remote.WithJobs and upload these in parallel. g.Go(func() error { return w.uploadOne(l) }) @@ -501,6 +502,7 @@ func WriteIndex(ref name.Reference, ii v1.ImageIndex, options ...Option) error { context: o.context, } + // TODO(#803): Pipe through remote.WithJobs and upload these in parallel. for _, desc := range index.Manifests { ref := ref.Context().Digest(desc.Digest.String()) exists, err := w.checkExistingManifest(desc.Digest, desc.MediaType)