Skip to content

Commit

Permalink
copy: move progress creation before copying the stream
Browse files Browse the repository at this point in the history
Also add a progress.Bar.ReplaceBar() method, and a
progress.DigestToCopyAction() convenience function that can
later be used in TryReusingBlob() and PutBlob().

Signed-off-by: Valentin Rothberg <[email protected]>
  • Loading branch information
vrothberg committed Apr 18, 2019
1 parent e2e1e18 commit 1e10e9f
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 33 deletions.
50 changes: 18 additions & 32 deletions copy/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,22 +606,6 @@ func (c *copier) newProgressPool(ctx context.Context) (*mpb.Progress, func()) {
}
}

// blobInfoToProgressAction creates a string based on the blobinfo's short
// digest and size and kind, for instance, "Copying blob bdf0201b3a05".
// Note that kind must either be "blob" or "config".
func blobInfoToProgressAction(blobinfo types.BlobInfo, kind string) string {
// shortDigestLen is the length of the digest used for blobs.
const shortDigestLen = 12
action := fmt.Sprintf("Copying %s %s", kind, blobinfo.Digest.Encoded())
// Truncate the string (chopping of some part of the digest) to make all
// progress bars aligned in a column.
maxLen := len("Copying blob ") + shortDigestLen
if len(action) > maxLen {
action = action[:maxLen]
}
return action
}

// copyConfig copies config.json, if any, from src to dest.
func (c *copier) copyConfig(ctx context.Context, src types.Image) error {
srcInfo := src.ConfigInfo()
Expand All @@ -636,10 +620,11 @@ func (c *copier) copyConfig(ctx context.Context, src types.Image) error {
defer pool.CleanUp()

bar := pool.AddBar(
blobInfoToProgressAction(srcInfo, "config"),
srcInfo.Size,
progress.DigestToCopyAction(srcInfo.Digest, "config"),
0,
progress.BarOptions{
OnCompletionMessage: "done",
RemoveOnCompletion: true,
StaticMessage: " ",
})

destInfo, err := c.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, -1, nil, false, true, bar)
Expand Down Expand Up @@ -671,7 +656,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, la
cachedDiffID := ic.c.blobInfoCache.UncompressedDigest(srcInfo.Digest) // May be ""
diffIDIsNeeded := ic.diffIDsAreNeeded && cachedDiffID == ""

progressBarAction := blobInfoToProgressAction(srcInfo, "blob")
progressBarAction := progress.DigestToCopyAction(srcInfo.Digest, "blob")
bar := pool.AddBar(
progressBarAction,
0,
Expand All @@ -688,12 +673,11 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, la
if reused {
logrus.Debugf("Skipping blob %s (already present):", srcInfo.Digest)

pool.AddBar(
blobInfoToProgressAction(srcInfo, "blob"),
bar.ReplaceBar(
progressBarAction,
0,
progress.BarOptions{
StaticMessage: "skipped: already exists",
ReplaceBar: bar,
})
return blobInfo, cachedDiffID, nil
}
Expand All @@ -706,14 +690,6 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, la
}
defer srcStream.Close()

bar = pool.AddBar(
blobInfoToProgressAction(srcInfo, "blob"),
srcInfo.Size,
progress.BarOptions{
OnCompletionMessage: "done",
ReplaceBar: bar,
})

blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize}, layerIndexInImage, diffIDIsNeeded, bar)
if err != nil {
return types.BlobInfo{}, "", err
Expand Down Expand Up @@ -828,7 +804,6 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr
return types.BlobInfo{}, errors.Wrapf(err, "Error reading blob %s", srcInfo.Digest)
}
isCompressed := decompressor != nil
destStream = bar.ProxyReader(destStream)

// === Send a copy of the original, uncompressed, stream, to a separate path if necessary.
var originalLayerReader io.Reader // DO NOT USE this other than to drain the input if no other consumer in the pipeline has done so.
Expand Down Expand Up @@ -881,6 +856,17 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr
}
}

kind := "blob"
if isConfig {
kind = "config"
}
bar = bar.ReplaceBar(
progress.DigestToCopyAction(srcInfo.Digest, kind),
srcInfo.Size,
progress.BarOptions{
OnCompletionMessage: "done",
})
destStream = bar.ProxyReader(destStream)
// === Finally, send the layer stream to dest.
uploadedInfo, err := c.dest.PutBlob(ctx, destStream, inputInfo, layerIndexInImage, c.blobInfoCache, isConfig, nil)
if err != nil {
Expand Down
27 changes: 26 additions & 1 deletion pkg/progress/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"io"

"github.com/opencontainers/go-digest"
"github.com/vbauerster/mpb"
"github.com/vbauerster/mpb/decor"
)
Expand All @@ -26,7 +27,8 @@ type Bar struct {

// BarOptions includes various options to control AddBar.
type BarOptions struct {
// Remove the bar on completion.
// Remove the bar on completion. This must be true if the bar will be
// replaced by another one.
RemoveOnCompletion bool
// OnCompletionMessage will be shown on completion and replace the progress bar.
OnCompletionMessage string
Expand Down Expand Up @@ -60,6 +62,21 @@ func (p *Pool) getWriter() io.Writer {
return p.writer
}

// DigestToCopyAction returns a string based on the blobinfo and kind.
// It's a convenience function for the c/image library when copying images.
func DigestToCopyAction(digest digest.Digest, kind string) string {
// shortDigestLen is the length of the digest used for blobs.
const shortDigestLen = 12
const maxLen = len("Copying blob ") + shortDigestLen
// Truncate the string (chopping of some part of the digest) to make all
// progress bars aligned in a column.
copyAction := fmt.Sprintf("Copying %s %s", kind, digest.Encoded())
if len(copyAction) > maxLen {
copyAction = copyAction[:maxLen]
}
return copyAction
}

// AddBar adds a new Bar to the Pool.
func (p *Pool) AddBar(action string, size int64, options BarOptions) *Bar {
var bar *mpb.Bar
Expand Down Expand Up @@ -101,6 +118,14 @@ func (p *Pool) AddBar(action string, size int64, options BarOptions) *Bar {
}
}

// ReplaceBar returns a bar replacing the current one. Note that the current one
// will be terminated and should have been created with
// options.RemoveOnCompletion.
func (b *Bar) ReplaceBar(action string, size int64, options BarOptions) *Bar {
options.ReplaceBar = b
return b.pool.AddBar(action, size, options)
}

// ProxyReader wraps the reader with metrics for progress tracking.
func (b *Bar) ProxyReader(reader io.Reader) io.ReadCloser {
return b.bar.ProxyReader(reader)
Expand Down

0 comments on commit 1e10e9f

Please sign in to comment.