Skip to content

Commit

Permalink
Support WithJobs in remote.Write (#958)
Browse files Browse the repository at this point in the history
  • Loading branch information
imjasonh authored Mar 11, 2021
1 parent 71a6fe9 commit 2f4c6f0
Showing 1 changed file with 48 additions and 27 deletions.
75 changes: 48 additions & 27 deletions pkg/v1/remote/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,39 +64,60 @@ func Write(ref name.Reference, img v1.Image, options ...Option) error {
context: o.context,
}

// Upload individual blobs and collect any errors.
blobChan := make(chan v1.Layer, 2*o.jobs)
g, ctx := errgroup.WithContext(o.context)
for i := 0; i < o.jobs; i++ {
// Start N workers consuming blobs to upload.
g.Go(func() error {
for b := range blobChan {
if err := w.uploadOne(b); err != nil {
return err
}
}
return nil
})
}

// Upload individual layers in goroutines and collect any errors.
// If we can dedupe by the layer digest, try to do so. If we can't determine
// the digest for whatever reason, we can't dedupe and might re-upload.
var g errgroup.Group
uploaded := map[v1.Hash]bool{}
for _, l := range ls {
l := l

// Handle foreign layers.
mt, err := l.MediaType()
if err != nil {
return err
}
if !mt.IsDistributable() && !o.allowNondistributableArtifacts {
continue
}
g.Go(func() error {
defer close(blobChan)
uploaded := map[v1.Hash]bool{}
for _, l := range ls {
l := l

// Handle foreign layers.
mt, err := l.MediaType()
if err != nil {
return err
}
if !mt.IsDistributable() && !o.allowNondistributableArtifacts {
continue
}

// Streaming layers calculate their digests while uploading them. Assume
// an error here indicates we need to upload the layer.
h, err := l.Digest()
if err == nil {
// If we can determine the layer's digest ahead of
// time, use it to dedupe uploads.
if uploaded[h] {
continue // Already uploading.
// Streaming layers calculate their digests while uploading them. Assume
// an error here indicates we need to upload the layer.
h, err := l.Digest()
if err == nil {
// If we can determine the layer's digest ahead of
// time, use it to dedupe uploads.
if uploaded[h] {
continue // Already uploading.
}
uploaded[h] = true
}
select {
case blobChan <- l:
case <-ctx.Done():
return ctx.Err()
}
uploaded[h] = true
}

// TODO(#803): Pipe through remote.WithJobs and upload these in parallel.
g.Go(func() error {
return w.uploadOne(l)
})
return nil
})
if err := g.Wait(); err != nil {
return err
}

if l, err := partial.ConfigLayer(img); err != nil {
Expand Down

0 comments on commit 2f4c6f0

Please sign in to comment.