blob-copy detection #611

Closed
39 commits
977d86d
storage destination: commit layer in PutBlob()
vrothberg Apr 4, 2019
bbb1db4
add pkg/progress for progress bar management
vrothberg Apr 17, 2019
8146913
Extend TryReusingBlob and PutBlob with a progress.Bar
vrothberg Apr 17, 2019
39588b3
copy: create replaceable placeholder bar
vrothberg Apr 18, 2019
0d67737
copy: move progress creation before copying the stream
vrothberg Apr 18, 2019
da746eb
copy: let storage update the progress.Bar in {Put,TryReusing}Blob()
vrothberg Apr 18, 2019
c85c1de
storage destination: blob-copy detection
vrothberg Apr 18, 2019
4f9f19c
copy doc comments to all implementations of {Put,TryReusing}Blob
vrothberg Apr 23, 2019
fdaf13a
pkg/progress: improve comments
vrothberg Apr 23, 2019
f05e672
copy: don't show progress when copying a config
vrothberg Apr 23, 2019
edac09b
fix gofmt issues
vrothberg Apr 23, 2019
f3cd7f2
copy: remove leftover debug output
vrothberg Apr 26, 2019
263938f
copy: set ProxyReader on decompressed destStream
vrothberg Apr 26, 2019
73569e9
storage: image: remove logrus.Errorf()
vrothberg Apr 26, 2019
f5f95ad
storage: remove Name() func
vrothberg Apr 26, 2019
02c3576
copy: remove unused newProgressPool
vrothberg Apr 29, 2019
c930452
pkg/progress: remove unused getWriter
vrothberg Apr 29, 2019
95226e3
storage_test: correct arguments for PutBlob()
vrothberg Apr 29, 2019
d2f5233
update docs for {TryReusing,Put}Blob
vrothberg Apr 29, 2019
2055854
storageImageDestination: update docs of internal members
vrothberg Apr 29, 2019
affe61d
storageImageDestination: remove unused blobLayerIDs
vrothberg Apr 29, 2019
b377743
storage: TryReusingBlob: defer writing to done channel
vrothberg Apr 29, 2019
74e2319
storage: PutBlob: make index checks as early as possible
vrothberg Apr 29, 2019
21c9254
storage: TryReusingBlob: s/reusable/reused/
vrothberg Apr 29, 2019
c4ae4e9
storage: Commit: no layers: change error string
vrothberg Apr 29, 2019
621d10b
storageImageDestination: commitBlob: add TODO comment
vrothberg Apr 29, 2019
83a1dab
storageImageDestination: cleanup docs of commitBlob()
vrothberg Apr 30, 2019
5cc5f9e
copy: set index of empty layers to -1
vrothberg Apr 30, 2019
7e5a85b
copy: use "Internal error: " prefix in copyLayers
vrothberg Apr 30, 2019
116a38a
copy: don't skip already copied layer
vrothberg May 7, 2019
fbd2a0c
copy: return non-context.Canceled error
vrothberg May 7, 2019
90d77b2
storage: PutBlob: fix commit condition
vrothberg May 7, 2019
e816243
storage: refactor commit logic
vrothberg May 8, 2019
a0f6365
storage: PutBlob: return correct BlobInfo
vrothberg May 13, 2019
9cc1fd7
storage: PutBlob: do not commit config
vrothberg May 15, 2019
88210a1
copy: don't use the manifest's LayerInfos for copying
vrothberg May 15, 2019
1fde497
storage: digest lock: use RecursiveLock()
vrothberg May 16, 2019
31890b5
storage: TryReusingBlob: create layer for reoccurring blob
vrothberg May 16, 2019
1326dce
storage: PutBlob: don't change the blob's
vrothberg May 17, 2019
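
Taken together, these commits thread a layer index and a progress-bar handle through the copy pipeline so that the storage destination can detect blob copies and drive its own progress reporting. As a reading aid, here is a rough sketch of the extended method signatures, inferred only from the call sites in the diff below; the actual interface changes in this PR may differ in detail.

// Sketch only: inferred from the PutBlob()/TryReusingBlob() call sites in copy/copy.go below.
package sketch

import (
	"context"
	"io"

	"github.com/containers/image/pkg/progress"
	"github.com/containers/image/types"
)

// blobDestination lists only the two methods this PR extends. Both gain a
// layerIndexInImage (-1 for blobs that are not ordered layers, such as empty
// layers and the config) and a *progress.Bar that the destination may update
// or replace.
type blobDestination interface {
	PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo,
		layerIndexInImage int, cache types.BlobInfoCache, isConfig bool,
		bar *progress.Bar) (types.BlobInfo, error)
	TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int,
		cache types.BlobInfoCache, canSubstitute bool,
		bar *progress.Bar) (bool, types.BlobInfo, error)
}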
215 changes: 125 additions & 90 deletions copy/copy.go
@@ -18,15 +18,15 @@ import (
"github.com/containers/image/manifest"
"github.com/containers/image/pkg/blobinfocache"
"github.com/containers/image/pkg/compression"
"github.com/containers/image/pkg/progress"
"github.com/containers/image/signature"
"github.com/containers/image/storage"
"github.com/containers/image/transports"
"github.com/containers/image/types"
"github.com/klauspost/pgzip"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/vbauerster/mpb"
"github.com/vbauerster/mpb/decor"
"golang.org/x/crypto/ssh/terminal"
"golang.org/x/sync/semaphore"
)
@@ -160,7 +160,6 @@ func Image(ctx context.Context, policyContext *signature.PolicyContext, destRef,

// If reportWriter is not a TTY (e.g., when piping to a file), do not
// print the progress bars to avoid long and hard to parse output.
// createProgressBar() will print a single line instead.
progressOutput := reportWriter
if !isTTY(reportWriter) {
progressOutput = ioutil.Discard
@@ -448,16 +447,26 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error {
srcInfosUpdated = true
}

// Create a map from the manifest to check if a srcLayer may be empty.
mblob, mtype, err := ic.src.Manifest(ctx)
if err != nil {
return err
}
man, err := manifest.FromBlob(mblob, mtype)
if err != nil {
return err
}
emptyLayerMap := make(map[digest.Digest]bool)
for _, info := range man.LayerInfos() {
emptyLayerMap[info.Digest] = info.EmptyLayer
}

type copyLayerData struct {
destInfo types.BlobInfo
diffID digest.Digest
err error
}

// copyGroup is used to determine if all layers are copied
copyGroup := sync.WaitGroup{}
copyGroup.Add(numLayers)

// copySemaphore is used to limit the number of parallel downloads to
// avoid malicious images causing troubles and to be nice to servers.
var copySemaphore *semaphore.Weighted
@@ -467,44 +476,72 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error {
copySemaphore = semaphore.NewWeighted(int64(1))
}

data := make([]copyLayerData, numLayers)
copyLayerHelper := func(index int, srcLayer types.BlobInfo, pool *mpb.Progress) {
defer copySemaphore.Release(1)
defer copyGroup.Done()
cld := copyLayerData{}
// copyGroup is used to determine if all layers are copied
copyGroup := sync.WaitGroup{}

// progressPool for creating progress bars
progressPool := progress.NewPool(ctx, ic.c.progressOutput)
defer progressPool.CleanUp()

// A context.WithCancel is needed when encountering an error while
// copying/downloading layers in parallel.
cancelCtx, cancelCopyLayer := context.WithCancel(ctx)
defer cancelCopyLayer()

copyData := make([]copyLayerData, numLayers)
layerIndex := 0 // some layers might be skipped, so we need a dedicated counter
for i, srcLayer := range srcInfos {
if ic.c.dest.AcceptsForeignLayerURLs() && len(srcLayer.URLs) != 0 {
// DiffIDs are, currently, needed only when converting from schema1.
// In which case src.LayerInfos will not have URLs because schema1
// does not support them.
if ic.diffIDsAreNeeded {
cld.err = errors.New("getting DiffID for foreign layers is unimplemented")
} else {
cld.destInfo = srcLayer
logrus.Debugf("Skipping foreign layer %q copy to %s", cld.destInfo.Digest, ic.c.dest.Reference().Transport().Name())
return errors.New("getting DiffID for foreign layers is unimplemented")
}
} else {
cld.destInfo, cld.diffID, cld.err = ic.copyLayer(ctx, srcLayer, pool)
}
data[index] = cld
}

func() { // A scope for defer
progressPool, progressCleanup := ic.c.newProgressPool(ctx)
defer progressCleanup()
logrus.Debugf("Skipping foreign layer %q copy to %s", srcLayer.Digest, ic.c.dest.Reference().Transport().Name())
copyData[i].destInfo = srcLayer
continue // skip copying
Collaborator:

This case should, in principle, correctly increment (or not) layerIndex.

Member Author:

If we incremented here (and did not call PutBlob()), we would run into a deadlock: the next layer would wait indefinitely for the foreign layer to be committed.

I agree that, in principle, the layer index would then not correspond to the one in the image but to the locally committed ones. Should we document that in some way, or is that too far into a corner case to worry about?

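For illustration only (this is not code from the PR), a minimal, self-contained sketch of the ordering constraint discussed above: the destination commits layer N only after layer N-1 has been committed, so an index handed out without a matching PutBlob()/TryReusingBlob() call would stall every later layer.

package main

import "fmt"

func main() {
	const numLayers = 3

	// committed[i] is closed once layer i has been committed.
	committed := make([]chan struct{}, numLayers)
	for i := range committed {
		committed[i] = make(chan struct{})
	}

	// putBlob stands in for an indexed PutBlob(): it waits for the previous
	// layer's commit before committing its own layer.
	putBlob := func(index int, done chan<- int) {
		if index > 0 {
			<-committed[index-1] // blocks forever if index-1 is never committed
		}
		close(committed[index]) // "commit" this layer
		done <- index
	}

	done := make(chan int, numLayers)
	for i := 0; i < numLayers; i++ {
		go putBlob(i, done)
	}
	for i := 0; i < numLayers; i++ {
		fmt.Println("committed layer", <-done)
	}
	// If putBlob() were never called for some index, every later call would
	// block on the missing commit -- the deadlock described above.
}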
}

copySemaphore.Acquire(cancelCtx, 1) // limits parallel copy operations
copyGroup.Add(1) // allows the main routine to wait for all copy operations to finish

index := layerIndex
emptyLayer := false
if ok, empty := emptyLayerMap[srcLayer.Digest]; ok && empty {
emptyLayer = true
index = -1
}
// Copy the layer.
go func(dataIndex, layerIndex int, srcLayer types.BlobInfo) {
defer copySemaphore.Release(1)
defer copyGroup.Done()
cld := copyLayerData{}
cld.destInfo, cld.diffID, cld.err = ic.copyLayer(cancelCtx, srcLayer, layerIndex, progressPool)
if cld.err != nil {
// Stop all other running goroutines as we can't recover from an error
cancelCopyLayer()
}
copyData[dataIndex] = cld
}(i, index, srcLayer)

for i, srcLayer := range srcInfos {
copySemaphore.Acquire(ctx, 1)
go copyLayerHelper(i, srcLayer, progressPool)
if !emptyLayer {
layerIndex++
}
}

// Wait for all layers to be copied
copyGroup.Wait()
}()
// Wait for all layer-copy operations to finish
copyGroup.Wait()

destInfos := make([]types.BlobInfo, numLayers)
diffIDs := make([]digest.Digest, numLayers)
for i, cld := range data {
for i := range copyData {
cld := copyData[i]
if cld.err != nil {
// Skip context.Canceled errors to determine the real error.
if cld.err == context.Canceled {
continue
}
return cld.err
}
destInfos[i] = cld.destInfo
@@ -573,45 +610,6 @@ func (ic *imageCopier) copyUpdatedConfigAndManifest(ctx context.Context) ([]byte
return manifest, nil
}

// newProgressPool creates a *mpb.Progress and a cleanup function.
// The caller must eventually call the returned cleanup function after the pool will no longer be updated.
func (c *copier) newProgressPool(ctx context.Context) (*mpb.Progress, func()) {
ctx, cancel := context.WithCancel(ctx)
pool := mpb.New(mpb.WithWidth(40), mpb.WithOutput(c.progressOutput), mpb.WithContext(ctx))
return pool, func() {
cancel()
pool.Wait()
}
}

// createProgressBar creates a mpb.Bar in pool. Note that if the copier's reportWriter
// is ioutil.Discard, the progress bar's output will be discarded
func (c *copier) createProgressBar(pool *mpb.Progress, info types.BlobInfo, kind string, onComplete string) *mpb.Bar {
// shortDigestLen is the length of the digest used for blobs.
const shortDigestLen = 12

prefix := fmt.Sprintf("Copying %s %s", kind, info.Digest.Encoded())
// Truncate the prefix (chopping of some part of the digest) to make all progress bars aligned in a column.
maxPrefixLen := len("Copying blob ") + shortDigestLen
if len(prefix) > maxPrefixLen {
prefix = prefix[:maxPrefixLen]
}

bar := pool.AddBar(info.Size,
mpb.BarClearOnComplete(),
mpb.PrependDecorators(
decor.Name(prefix),
),
mpb.AppendDecorators(
decor.OnComplete(decor.CountersKibiByte("%.1f / %.1f"), " "+onComplete),
),
)
if c.progressOutput == ioutil.Discard {
c.Printf("Copying %s %s\n", kind, info.Digest)
}
return bar
}

// copyConfig copies config.json, if any, from src to dest.
func (c *copier) copyConfig(ctx context.Context, src types.Image) error {
srcInfo := src.ConfigInfo()
@@ -622,14 +620,21 @@ func (c *copier) copyConfig(ctx context.Context, src types.Image) error {
}

destInfo, err := func() (types.BlobInfo, error) { // A scope for defer
progressPool, progressCleanup := c.newProgressPool(ctx)
defer progressCleanup()
bar := c.createProgressBar(progressPool, srcInfo, "config", "done")
destInfo, err := c.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, nil, false, true, bar)
pool := progress.NewPool(ctx, c.progressOutput)
defer pool.CleanUp()

bar := pool.AddBar(
progress.DigestToCopyAction(srcInfo.Digest, "config"),
0,
progress.BarOptions{
RemoveOnCompletion: true,
StaticMessage: " ",
})

destInfo, err := c.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, -1, nil, false, true, bar)
if err != nil {
return types.BlobInfo{}, err
}
bar.SetTotal(int64(len(configBlob)), true)
return destInfo, nil
}()
if err != nil {
@@ -651,20 +656,37 @@ type diffIDResult struct {

// copyLayer copies a layer with srcInfo (with known Digest and possibly known Size) in src to dest, perhaps compressing it if canCompress,
// and returns a complete blobInfo of the copied layer, and a value for LayerDiffIDs if diffIDIsNeeded
func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, pool *mpb.Progress) (types.BlobInfo, digest.Digest, error) {
func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, layerIndexInImage int, pool *progress.Pool) (types.BlobInfo, digest.Digest, error) {
cachedDiffID := ic.c.blobInfoCache.UncompressedDigest(srcInfo.Digest) // May be ""
diffIDIsNeeded := ic.diffIDsAreNeeded && cachedDiffID == ""

progressBarAction := progress.DigestToCopyAction(srcInfo.Digest, "blob")
bar := pool.AddBar(
progressBarAction,
0,
progress.BarOptions{
RemoveOnCompletion: true,
StaticMessage: " ",
})
// If we already have the blob, and we don't need to compute the diffID, then we don't need to read it from the source.
if !diffIDIsNeeded {
reused, blobInfo, err := ic.c.dest.TryReusingBlob(ctx, srcInfo, ic.c.blobInfoCache, ic.canSubstituteBlobs)
reused, blobInfo, err := ic.c.dest.TryReusingBlob(ctx, srcInfo, layerIndexInImage, ic.c.blobInfoCache, ic.canSubstituteBlobs, bar)
if err != nil {
return types.BlobInfo{}, "", errors.Wrapf(err, "Error trying to reuse blob %s at destination", srcInfo.Digest)
}
if reused {
logrus.Debugf("Skipping blob %s (already present):", srcInfo.Digest)
bar := ic.c.createProgressBar(pool, srcInfo, "blob", "skipped: already exists")
bar.SetTotal(0, true)
// Instead of adding boilerplate code to ALL TryReusingBlob()s, just
// special case the storage transport, which is the only transport
// where we need control over the progress.Bar.
if ic.c.dest.Reference().Transport().Name() != storage.Transport.Name() {
bar.ReplaceBar(
progressBarAction,
0,
progress.BarOptions{
StaticMessage: "skipped: already exists",
})
}
return blobInfo, cachedDiffID, nil
}
}
@@ -676,9 +698,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, po
}
defer srcStream.Close()

bar := ic.c.createProgressBar(pool, srcInfo, "blob", "done")

blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize}, diffIDIsNeeded, bar)
blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize}, layerIndexInImage, diffIDIsNeeded, bar)
if err != nil {
return types.BlobInfo{}, "", err
}
@@ -700,7 +720,6 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, po
}
}

bar.SetTotal(srcInfo.Size, true)
return blobInfo, diffID, nil
}

@@ -709,7 +728,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, po
// perhaps compressing the stream if canCompress,
// and returns a complete blobInfo of the copied blob and perhaps a <-chan diffIDResult if diffIDIsNeeded, to be read by the caller.
func (ic *imageCopier) copyLayerFromStream(ctx context.Context, srcStream io.Reader, srcInfo types.BlobInfo,
diffIDIsNeeded bool, bar *mpb.Bar) (types.BlobInfo, <-chan diffIDResult, error) {
layerIndexInImage int, diffIDIsNeeded bool, bar *progress.Bar) (types.BlobInfo, <-chan diffIDResult, error) {
var getDiffIDRecorder func(compression.DecompressorFunc) io.Writer // = nil
var diffIDChan chan diffIDResult

@@ -733,7 +752,7 @@ func (ic *imageCopier) copyLayerFromStream(ctx context.Context, srcStream io.Rea
return pipeWriter
}
}
blobInfo, err := ic.c.copyBlobFromStream(ctx, srcStream, srcInfo, getDiffIDRecorder, ic.canModifyManifest, false, bar) // Sets err to nil on success
blobInfo, err := ic.c.copyBlobFromStream(ctx, srcStream, srcInfo, layerIndexInImage, getDiffIDRecorder, ic.canModifyManifest, false, bar) // Sets err to nil on success
return blobInfo, diffIDChan, err
// We need the defer … pipeWriter.CloseWithError() to happen HERE so that the caller can block on reading from diffIDChan
}
@@ -768,9 +787,9 @@ func computeDiffID(stream io.Reader, decompressor compression.DecompressorFunc)
// perhaps sending a copy to an io.Writer if getOriginalLayerCopyWriter != nil,
// perhaps compressing it if canCompress,
// and returns a complete blobInfo of the copied blob.
func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, srcInfo types.BlobInfo,
func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, srcInfo types.BlobInfo, layerIndexInImage int,
getOriginalLayerCopyWriter func(decompressor compression.DecompressorFunc) io.Writer,
canModifyBlob bool, isConfig bool, bar *mpb.Bar) (types.BlobInfo, error) {
canModifyBlob bool, isConfig bool, bar *progress.Bar) (types.BlobInfo, error) {
// The copying happens through a pipeline of connected io.Readers.
// === Input: srcStream

@@ -793,7 +812,23 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr
return types.BlobInfo{}, errors.Wrapf(err, "Error reading blob %s", srcInfo.Digest)
}
isCompressed := decompressor != nil
destStream = bar.ProxyReader(destStream)

// Instead of adding boilerplate code to ALL PutBlob()s, just special case
// the storage transport, which is the only transport where we need control
// over the progress.Bar.
if c.dest.Reference().Transport().Name() != storage.Transport.Name() {
kind := "blob"
if isConfig {
kind = "config"
}
bar = bar.ReplaceBar(
progress.DigestToCopyAction(srcInfo.Digest, kind),
srcInfo.Size,
progress.BarOptions{
OnCompletionMessage: "done",
})
destStream = bar.ProxyReader(destStream)
}

// === Send a copy of the original, uncompressed, stream, to a separate path if necessary.
var originalLayerReader io.Reader // DO NOT USE this other than to drain the input if no other consumer in the pipeline has done so.
@@ -847,7 +882,7 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr
}

// === Finally, send the layer stream to dest.
uploadedInfo, err := c.dest.PutBlob(ctx, destStream, inputInfo, c.blobInfoCache, isConfig)
uploadedInfo, err := c.dest.PutBlob(ctx, destStream, inputInfo, layerIndexInImage, c.blobInfoCache, isConfig, bar)
if err != nil {
return types.BlobInfo{}, errors.Wrap(err, "Error writing blob")
}