Skip to content

Commit

Permalink
Add remote.WithJobs and use it in remote.MultiWrite
Browse files Browse the repository at this point in the history
- default parallel is 4
- an error is returned if WithParallel is called with a value <= 0

We can probably reuse this to fetch and push blobs elsewhere in remote.

MultiWrite'ing 123 distroless images with WithJobs(100) took 4s
compared to 11s for the default.
  • Loading branch information
imjasonh committed Nov 2, 2020
1 parent ab3252b commit 2265d79
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 8 deletions.
12 changes: 4 additions & 8 deletions pkg/v1/remote/multi_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,6 @@ import (
"golang.org/x/sync/errgroup"
)

// Parallelism of blob and manifest uploads
// TODO(jasonhall): Make this an Option.
const jobs = 4

// MultiWrite writes the given Images or ImageIndexes to the given refs, as
// efficiently as possible, by deduping shared layer blobs and uploading layers
// in parallel, then uploading all manifests in parallel.
Expand Down Expand Up @@ -96,9 +92,9 @@ func MultiWrite(m map[name.Reference]Taggable, options ...Option) error {
}

// Upload individual blobs and collect any errors.
blobChan := make(chan v1.Layer, 2*jobs)
blobChan := make(chan v1.Layer, 2*o.jobs)
var g errgroup.Group
for i := 0; i < jobs; i++ {
for i := 0; i < o.jobs; i++ {
// Start N workers consuming blobs to upload.
g.Go(func() error {
for b := range blobChan {
Expand Down Expand Up @@ -126,8 +122,8 @@ func MultiWrite(m map[name.Reference]Taggable, options ...Option) error {
i Taggable
ref name.Reference
}
taskChan := make(chan task, 2*jobs)
for i := 0; i < jobs; i++ {
taskChan := make(chan task, 2*o.jobs)
for i := 0; i < o.jobs; i++ {
// Start N workers consuming tasks to upload manifests.
g.Go(func() error {
for t := range taskChan {
Expand Down
20 changes: 20 additions & 0 deletions pkg/v1/remote/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package remote

import (
"context"
"errors"
"net/http"

"github.com/google/go-containerregistry/pkg/authn"
Expand All @@ -33,14 +34,18 @@ type options struct {
transport http.RoundTripper
platform v1.Platform
context context.Context
jobs int
}

const defaultJobs = 4

func makeOptions(target authn.Resource, opts ...Option) (*options, error) {
o := &options{
auth: authn.Anonymous,
transport: http.DefaultTransport,
platform: defaultPlatform,
context: context.Background(),
jobs: defaultJobs,
}

for _, option := range opts {
Expand Down Expand Up @@ -131,3 +136,18 @@ func WithContext(ctx context.Context) Option {
return nil
}
}

// WithJobs is a functional option for setting the parallelism of remote
// operations performed by a given function. Note that not all remote
// operations support parallelism.
//
// The default value is 4.
func WithJobs(jobs int) Option {
return func(o *options) error {
if jobs <= 0 {
return errors.New("jobs must be greater than zero")
}
o.jobs = jobs
return nil
}
}

0 comments on commit 2265d79

Please sign in to comment.