Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework progress bar termination #1332

Merged
merged 4 commits into from
Aug 5, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 88 additions & 71 deletions copy/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -957,12 +957,11 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error {
}

if err := func() error { // A scope for defer
progressPool, progressCleanup := ic.c.newProgressPool(ctx)
defer func() {
// Wait for all layers to be copied. progressCleanup() must not be called while any of the copyLayerHelpers interact with the progressPool.
copyGroup.Wait()
progressCleanup()
}()
progressPool := ic.c.newProgressPool()
defer progressPool.Wait()

// Ensure we wait for all layers to be copied. progressPool.Wait() must not be called while any of the copyLayerHelpers interact with the progressPool.
defer copyGroup.Wait()

for i, srcLayer := range srcInfos {
err = copySemaphore.Acquire(ctx, 1)
Expand Down Expand Up @@ -1061,15 +1060,13 @@ func (ic *imageCopier) copyUpdatedConfigAndManifest(ctx context.Context, instanc
return man, manifestDigest, nil
}

// newProgressPool creates a *mpb.Progress and a cleanup function.
// The caller must eventually call the returned cleanup function after the pool will no longer be updated.
func (c *copier) newProgressPool(ctx context.Context) (*mpb.Progress, func()) {
ctx, cancel := context.WithCancel(ctx)
pool := mpb.NewWithContext(ctx, mpb.WithWidth(40), mpb.WithOutput(c.progressOutput))
return pool, func() {
cancel()
pool.Wait()
}
// newProgressPool creates a *mpb.Progress.
// The caller must eventually call pool.Wait() after the pool will no longer be updated.
// NOTE: Every progress bar created within the progress pool must either successfully
// complete or be aborted, or pool.Wait() will hang. That is typically done
// using "defer bar.Abort(false)", which must be called BEFORE pool.Wait() is called.
func (c *copier) newProgressPool() *mpb.Progress {
return mpb.New(mpb.WithWidth(40), mpb.WithOutput(c.progressOutput))
}

// customPartialBlobCounter provides a decorator function for the partial blobs retrieval progress bar
Expand All @@ -1090,6 +1087,9 @@ func customPartialBlobCounter(filler interface{}, wcc ...decor.WC) decor.Decorat

// createProgressBar creates a mpb.Bar in pool. Note that if the copier's reportWriter
// is ioutil.Discard, the progress bar's output will be discarded
// NOTE: Every progress bar created within a progress pool must either successfully
// complete or be aborted, or pool.Wait() will hang. That is typically done
// using "defer bar.Abort(false)", which must happen BEFORE pool.Wait() is called.
func (c *copier) createProgressBar(pool *mpb.Progress, partial bool, info types.BlobInfo, kind string, onComplete string) *mpb.Bar {
// shortDigestLen is the length of the digest used for blobs.
const shortDigestLen = 12
Expand Down Expand Up @@ -1155,9 +1155,11 @@ func (c *copier) copyConfig(ctx context.Context, src types.Image) error {
}

destInfo, err := func() (types.BlobInfo, error) { // A scope for defer
progressPool, progressCleanup := c.newProgressPool(ctx)
defer progressCleanup()
progressPool := c.newProgressPool()
defer progressPool.Wait()
bar := c.createProgressBar(progressPool, false, srcInfo, "config", "done")
defer bar.Abort(false)

destInfo, err := c.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, nil, false, true, false, bar, -1, false)
if err != nil {
return types.BlobInfo{}, err
Expand Down Expand Up @@ -1245,8 +1247,11 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to
}
if reused {
logrus.Debugf("Skipping blob %s (already present):", srcInfo.Digest)
bar := ic.c.createProgressBar(pool, false, srcInfo, "blob", "skipped: already exists")
bar.SetTotal(0, true)
func() { // A scope for defer
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not call Abort() after SetTotal() instead of the extra scope?

Copy link
Collaborator Author

@mtrmac mtrmac Aug 4, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same thing, reinforcing the general pattern. (And protecting against a panic in bar.SetTotal causing a hang in the deferred pool.Wait.)


Alternatively, would it be cleaner to farm this out into a

func (c *copier) runWithProgressBar(op func() (delete bool, error)) error {
   bar :=delete := false
   defer func() { bar.Abort(delete) }
   delete, err := op()
   return err
}

which would be a stronger enforcement of the “a bar must .Abort on every path” design? OTOH forcing the inner function to only return an err would require more caller infrastructure to return other data like BlobInfo values.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And protecting against a panic in bar.SetTotal causing a hang in the deferred pool.Wait

Very good point, thanks!

bar := ic.c.createProgressBar(pool, false, srcInfo, "blob", "skipped: already exists")
defer bar.Abort(false)
bar.SetTotal(0, true)
}()

// Throw an event that the layer has been skipped
if ic.c.progress != nil && ic.c.progressInterval > 0 {
Expand Down Expand Up @@ -1279,40 +1284,49 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to
imgSource, okSource := ic.c.rawSource.(internalTypes.ImageSourceSeekable)
imgDest, okDest := ic.c.dest.(internalTypes.ImageDestinationPartial)
if okSource && okDest && !diffIDIsNeeded {
bar := ic.c.createProgressBar(pool, true, srcInfo, "blob", "done")

progress := make(chan int64)
terminate := make(chan interface{})

defer close(terminate)
defer close(progress)

proxy := imageSourceSeekableProxy{
source: imgSource,
progress: progress,
}
go func() {
for {
select {
case written := <-progress:
bar.IncrInt64(written)
case <-terminate:
return
if reused, blobInfo := func() (bool, types.BlobInfo) { // A scope for defer
bar := ic.c.createProgressBar(pool, true, srcInfo, "blob", "done")
hideProgressBar := true
defer func() { // Note that this is not the same as defer bar.Abort(hideProgressBar); we need hideProgressBar to be evaluated lazily.
bar.Abort(hideProgressBar)
}()

progress := make(chan int64)
terminate := make(chan interface{})

defer close(terminate)
defer close(progress)

proxy := imageSourceSeekableProxy{
source: imgSource,
progress: progress,
}
go func() {
for {
select {
case written := <-progress:
bar.IncrInt64(written)
case <-terminate:
return
}
}
}()

bar.SetTotal(srcInfo.Size, false)
info, err := imgDest.PutBlobPartial(ctx, proxy, srcInfo, ic.c.blobInfoCache)
if err == nil {
bar.SetRefill(srcInfo.Size - bar.Current())
bar.SetCurrent(srcInfo.Size)
bar.SetTotal(srcInfo.Size, true)
hideProgressBar = false
logrus.Debugf("Retrieved partial blob %v", srcInfo.Digest)
return true, info
}
}()

bar.SetTotal(srcInfo.Size, false)
info, err := imgDest.PutBlobPartial(ctx, proxy, srcInfo, ic.c.blobInfoCache)
if err == nil {
bar.SetRefill(srcInfo.Size - bar.Current())
bar.SetCurrent(srcInfo.Size)
bar.SetTotal(srcInfo.Size, true)
logrus.Debugf("Retrieved partial blob %v", srcInfo.Digest)
return info, cachedDiffID, nil
logrus.Debugf("Failed to retrieve partial blob: %v", err)
return false, types.BlobInfo{}
}(); reused {
return blobInfo, cachedDiffID, nil
}
bar.Abort(true)
logrus.Debugf("Failed to retrieve partial blob: %v", err)
}

// Fallback: copy the layer, computing the diffID if we need to do so
Expand All @@ -1322,32 +1336,35 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to
}
defer srcStream.Close()

bar := ic.c.createProgressBar(pool, false, srcInfo, "blob", "done")
return func() (types.BlobInfo, digest.Digest, error) { // A scope for defer
Copy link
Member

@vrothberg vrothberg Aug 4, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need the extra scope? It looks like the defer will be executed at the same time as before just with the overhead of creating and running the anonymous function.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just reinforcing the general pattern:

func() {
   bar := ic.c.createProgressBar() 
   defer bar.Abort(false)
   // use bar
   // definite defer point, not “sometime in the future”
}

In this case, where we return just after we stop using the bar, it indeed makes no difference to the actual sequence of calls.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am OK to keep it as is if you find it more readable, but it comes at a (very) low cost.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don’t find the func() boilerplate more readable at all, but we’d need a language with generics and trailing closure syntax to improve on that.

My primary concern is reliability, and maintainability vs. future copy&pasted code; a small readability hit to get reliability is acceptable.

WRT performance, in this code that is limited by network bandwidth and latency, I don’t worry about a few once-per-layer function calls one bit (and a compiler could inline them, in principle).


Actually I would find it quite persuasive to argue that many of the functions in this file grew too long, and that progress bar scope is also a good candidate for splitting into a separate function. But the copy code has no tests, so I’d prefer to work on that first (and I did start writing some tests, though I’m very far from done) before large-scale aesthetic reorganization.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I concur on the size issues of functions in this file. Starting with tests seems like a very wise decision. I think it will also feel like a open-heart surgery after but the tests would be a huge help.

bar := ic.c.createProgressBar(pool, false, srcInfo, "blob", "done")
defer bar.Abort(false)

blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize, MediaType: srcInfo.MediaType, Annotations: srcInfo.Annotations}, diffIDIsNeeded, toEncrypt, bar, layerIndex, emptyLayer)
if err != nil {
return types.BlobInfo{}, "", err
}
blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize, MediaType: srcInfo.MediaType, Annotations: srcInfo.Annotations}, diffIDIsNeeded, toEncrypt, bar, layerIndex, emptyLayer)
if err != nil {
return types.BlobInfo{}, "", err
}

diffID := cachedDiffID
if diffIDIsNeeded {
select {
case <-ctx.Done():
return types.BlobInfo{}, "", ctx.Err()
case diffIDResult := <-diffIDChan:
if diffIDResult.err != nil {
return types.BlobInfo{}, "", errors.Wrap(diffIDResult.err, "computing layer DiffID")
diffID := cachedDiffID
if diffIDIsNeeded {
select {
case <-ctx.Done():
return types.BlobInfo{}, "", ctx.Err()
case diffIDResult := <-diffIDChan:
if diffIDResult.err != nil {
return types.BlobInfo{}, "", errors.Wrap(diffIDResult.err, "computing layer DiffID")
}
logrus.Debugf("Computed DiffID %s for layer %s", diffIDResult.digest, srcInfo.Digest)
// This is safe because we have just computed diffIDResult.Digest ourselves, and in the process
// we have read all of the input blob, so srcInfo.Digest must have been validated by digestingReader.
ic.c.blobInfoCache.RecordDigestUncompressedPair(srcInfo.Digest, diffIDResult.digest)
diffID = diffIDResult.digest
}
logrus.Debugf("Computed DiffID %s for layer %s", diffIDResult.digest, srcInfo.Digest)
// This is safe because we have just computed diffIDResult.Digest ourselves, and in the process
// we have read all of the input blob, so srcInfo.Digest must have been validated by digestingReader.
ic.c.blobInfoCache.RecordDigestUncompressedPair(srcInfo.Digest, diffIDResult.digest)
diffID = diffIDResult.digest
}
}

bar.SetTotal(srcInfo.Size, true)
return blobInfo, diffID, nil
bar.SetTotal(srcInfo.Size, true)
return blobInfo, diffID, nil
}()
}

// copyLayerFromStream is an implementation detail of copyLayer; mostly providing a separate “defer” scope.
Expand Down