From 1e10e9f1bf952fb74cb3531adf2680a8a793e79b Mon Sep 17 00:00:00 2001 From: Valentin Rothberg Date: Thu, 18 Apr 2019 08:34:19 +0200 Subject: [PATCH] copy: move progress creation before copying the stream Also add a progress.Bar.ReplaceBar() method, and a progress.DigestToCopyAction() convenience function that can later be used in TryReusingBlob() and PutBlob(). Signed-off-by: Valentin Rothberg --- copy/copy.go | 50 +++++++++++++++------------------------- pkg/progress/progress.go | 27 +++++++++++++++++++++- 2 files changed, 44 insertions(+), 33 deletions(-) diff --git a/copy/copy.go b/copy/copy.go index b4b3454c02..240b6de046 100644 --- a/copy/copy.go +++ b/copy/copy.go @@ -606,22 +606,6 @@ func (c *copier) newProgressPool(ctx context.Context) (*mpb.Progress, func()) { } } -// blobInfoToProgressAction creates a string based on the blobinfo's short -// digest and size and kind, for instance, "Copying blob bdf0201b3a05". -// Note that kind must either be "blob" or "config". -func blobInfoToProgressAction(blobinfo types.BlobInfo, kind string) string { - // shortDigestLen is the length of the digest used for blobs. - const shortDigestLen = 12 - action := fmt.Sprintf("Copying %s %s", kind, blobinfo.Digest.Encoded()) - // Truncate the string (chopping of some part of the digest) to make all - // progress bars aligned in a column. - maxLen := len("Copying blob ") + shortDigestLen - if len(action) > maxLen { - action = action[:maxLen] - } - return action -} - // copyConfig copies config.json, if any, from src to dest. func (c *copier) copyConfig(ctx context.Context, src types.Image) error { srcInfo := src.ConfigInfo() @@ -636,10 +620,11 @@ func (c *copier) copyConfig(ctx context.Context, src types.Image) error { defer pool.CleanUp() bar := pool.AddBar( - blobInfoToProgressAction(srcInfo, "config"), - srcInfo.Size, + progress.DigestToCopyAction(srcInfo.Digest, "config"), + 0, progress.BarOptions{ - OnCompletionMessage: "done", + RemoveOnCompletion: true, + StaticMessage: " ", }) destInfo, err := c.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, -1, nil, false, true, bar) @@ -671,7 +656,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, la cachedDiffID := ic.c.blobInfoCache.UncompressedDigest(srcInfo.Digest) // May be "" diffIDIsNeeded := ic.diffIDsAreNeeded && cachedDiffID == "" - progressBarAction := blobInfoToProgressAction(srcInfo, "blob") + progressBarAction := progress.DigestToCopyAction(srcInfo.Digest, "blob") bar := pool.AddBar( progressBarAction, 0, @@ -688,12 +673,11 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, la if reused { logrus.Debugf("Skipping blob %s (already present):", srcInfo.Digest) - pool.AddBar( - blobInfoToProgressAction(srcInfo, "blob"), + bar.ReplaceBar( + progressBarAction, 0, progress.BarOptions{ StaticMessage: "skipped: already exists", - ReplaceBar: bar, }) return blobInfo, cachedDiffID, nil } @@ -706,14 +690,6 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, la } defer srcStream.Close() - bar = pool.AddBar( - blobInfoToProgressAction(srcInfo, "blob"), - srcInfo.Size, - progress.BarOptions{ - OnCompletionMessage: "done", - ReplaceBar: bar, - }) - blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize}, layerIndexInImage, diffIDIsNeeded, bar) if err != nil { return types.BlobInfo{}, "", err @@ -828,7 +804,6 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr return types.BlobInfo{}, errors.Wrapf(err, "Error reading blob %s", srcInfo.Digest) } isCompressed := decompressor != nil - destStream = bar.ProxyReader(destStream) // === Send a copy of the original, uncompressed, stream, to a separate path if necessary. var originalLayerReader io.Reader // DO NOT USE this other than to drain the input if no other consumer in the pipeline has done so. @@ -881,6 +856,17 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr } } + kind := "blob" + if isConfig { + kind = "config" + } + bar = bar.ReplaceBar( + progress.DigestToCopyAction(srcInfo.Digest, kind), + srcInfo.Size, + progress.BarOptions{ + OnCompletionMessage: "done", + }) + destStream = bar.ProxyReader(destStream) // === Finally, send the layer stream to dest. uploadedInfo, err := c.dest.PutBlob(ctx, destStream, inputInfo, layerIndexInImage, c.blobInfoCache, isConfig, nil) if err != nil { diff --git a/pkg/progress/progress.go b/pkg/progress/progress.go index 408128a844..22374943ad 100644 --- a/pkg/progress/progress.go +++ b/pkg/progress/progress.go @@ -7,6 +7,7 @@ import ( "fmt" "io" + "github.com/opencontainers/go-digest" "github.com/vbauerster/mpb" "github.com/vbauerster/mpb/decor" ) @@ -26,7 +27,8 @@ type Bar struct { // BarOptions includes various options to control AddBar. type BarOptions struct { - // Remove the bar on completion. + // Remove the bar on completion. This must be true if the bar will be + // replaced by another one. RemoveOnCompletion bool // OnCompletionMessage will be shown on completion and replace the progress bar. OnCompletionMessage string @@ -60,6 +62,21 @@ func (p *Pool) getWriter() io.Writer { return p.writer } +// DigestToCopyAction returns a string based on the blobinfo and kind. +// It's a convenience function for the c/image library when copying images. +func DigestToCopyAction(digest digest.Digest, kind string) string { + // shortDigestLen is the length of the digest used for blobs. + const shortDigestLen = 12 + const maxLen = len("Copying blob ") + shortDigestLen + // Truncate the string (chopping of some part of the digest) to make all + // progress bars aligned in a column. + copyAction := fmt.Sprintf("Copying %s %s", kind, digest.Encoded()) + if len(copyAction) > maxLen { + copyAction = copyAction[:maxLen] + } + return copyAction +} + // AddBar adds a new Bar to the Pool. func (p *Pool) AddBar(action string, size int64, options BarOptions) *Bar { var bar *mpb.Bar @@ -101,6 +118,14 @@ func (p *Pool) AddBar(action string, size int64, options BarOptions) *Bar { } } +// ReplaceBar returns a bar replacing the current one. Note that the current one +// will be terminated and should have been created with +// options.RemoveOnCompletion. +func (b *Bar) ReplaceBar(action string, size int64, options BarOptions) *Bar { + options.ReplaceBar = b + return b.pool.AddBar(action, size, options) +} + // ProxyReader wraps the reader with metrics for progress tracking. func (b *Bar) ProxyReader(reader io.Reader) io.ReadCloser { return b.bar.ProxyReader(reader)