From 5e5e204e174c97956c5e3acaa2b4d74cff9dd85d Mon Sep 17 00:00:00 2001 From: Valentin Rothberg Date: Tue, 19 Mar 2019 11:52:48 +0100 Subject: [PATCH] copy: detect copy operations of other processes Detect copy operations of other processes when copying layers to the containers-storage. The blob locks used by the storage image destination are using file locks from containers/storage. Those locks do not support try-lock semantics as they are constrained by the fcntl(2) syscall. A try-lock mechanism is needed to detect if another process is currently holding the lock and is hence copying the layer in question. Once we figured that out, we can display the information on the progress bars to inform the user what is happening. The workaround of the missing try-lock semantics is to acquire the lock in a separate goroutine and wait a certain amount of time. If we still don't own the lock after the timeout, we are assuming the layer to be copied by another process. Unlocking a blob presents another problem as we should only unlock when we are certain the operation has succeeded, which is *after* the blob has been committed to the containers storage. Hence, we unlock as soon as possible during Commit() of the storage image destination. To avoid potential deadlocks during errors or panics, we also unlock all locks that are still held during (storageImageDestination).Close() which is invoked in a deferred call in copy.Image. Signed-off-by: Valentin Rothberg --- copy/copy.go | 186 ++++++++++++++++++++++++++++++++++----- storage/storage_image.go | 76 ++++++++++++---- types/types.go | 6 +- vendor.conf | 4 +- 4 files changed, 231 insertions(+), 41 deletions(-) diff --git a/copy/copy.go b/copy/copy.go index 3ed8a2b824..d2db17e84c 100644 --- a/copy/copy.go +++ b/copy/copy.go @@ -43,6 +43,11 @@ type digestingReader struct { // downloads. Let's follow Firefox by limiting it to 6. var maxParallelDownloads = 6 +// progressBarRefreshRate defines the refresh rate of the mpb progress bars. 
+// 120 ms aligns with the initial upstream defaults. Setting the rate here is +// more idiomatic than using some magic number in the code below. +var progressBarRefreshRate = 120 * time.Millisecond + // newDigestingReader returns an io.Reader implementation with contents of source, which will eventually return a non-EOF error // or set validationSucceeded/validationFailed to true if the source stream does/does not match expectedDigest. // (neither is set if EOF is never reached). @@ -467,11 +472,9 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error { copySemaphore = semaphore.NewWeighted(int64(1)) } - data := make([]copyLayerData, numLayers) - copyLayerHelper := func(index int, srcLayer types.BlobInfo, pool *mpb.Progress) { + copyLayerHelper := func(index int, srcLayer types.BlobInfo, pool *mpb.Progress, cld *copyLayerData) { defer copySemaphore.Release(1) defer copyGroup.Done() - cld := copyLayerData{} if ic.c.dest.AcceptsForeignLayerURLs() && len(srcLayer.URLs) != 0 { // DiffIDs are, currently, needed only when converting from schema1. // In which case src.LayerInfos will not have URLs because schema1 @@ -485,16 +488,26 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error { } else { cld.destInfo, cld.diffID, cld.err = ic.copyLayer(ctx, srcLayer, pool) } - data[index] = cld } + // digestToCopyData maps a digest to a corresponding copyLayerData and + // avoids to redundantly copy the same layer. 
+ digestToCopyData := make(map[digest.Digest]*copyLayerData) func() { // A scope for defer progressPool, progressCleanup := ic.c.newProgressPool(ctx) defer progressCleanup() for i, srcLayer := range srcInfos { + cld, ok := digestToCopyData[srcLayer.Digest] + if ok { + // digest is already being copied + copyGroup.Done() + continue + } copySemaphore.Acquire(ctx, 1) - go copyLayerHelper(i, srcLayer, progressPool) + cld = &copyLayerData{} + digestToCopyData[srcLayer.Digest] = cld + go copyLayerHelper(i, srcLayer, progressPool, cld) } // Wait for all layers to be copied @@ -503,7 +516,11 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error { destInfos := make([]types.BlobInfo, numLayers) diffIDs := make([]digest.Digest, numLayers) - for i, cld := range data { + for i, srcLayer := range srcInfos { + cld, ok := digestToCopyData[srcLayer.Digest] + if !ok { + return fmt.Errorf("no copy data found for layer %q", srcLayer.Digest) + } if cld.err != nil { return cld.err } @@ -577,16 +594,25 @@ func (ic *imageCopier) copyUpdatedConfigAndManifest(ctx context.Context) ([]byte // The caller must eventually call the returned cleanup function after the pool will no longer be updated. func (c *copier) newProgressPool(ctx context.Context) (*mpb.Progress, func()) { ctx, cancel := context.WithCancel(ctx) - pool := mpb.New(mpb.WithWidth(40), mpb.WithOutput(c.progressOutput), mpb.WithContext(ctx)) + pool := mpb.New(mpb.WithWidth(40), mpb.WithOutput(c.progressOutput), mpb.WithContext(ctx), mpb.WithRefreshRate(progressBarRefreshRate)) return pool, func() { cancel() pool.Wait() } } +// createReplacementProgressBar is a wrapper around createProgressBar and creates +// a progress bar to replace the `toReplace` bar. 
+func (c *copier) createReplacementProgressBar(pool *mpb.Progress, toReplace *mpb.Bar, info types.BlobInfo, kind string, filler string, barOpts ...mpb.BarOption) *mpb.Bar { + barOpts = append(barOpts, mpb.BarReplaceOnComplete(toReplace)) + bar := c.createProgressBar(pool, info, kind, filler, barOpts...) + toReplace.SetTotal(0, true) + return bar +} + // createProgressBar creates a mpb.Bar in pool. Note that if the copier's reportWriter // is ioutil.Discard, the progress bar's output will be discarded -func (c *copier) createProgressBar(pool *mpb.Progress, info types.BlobInfo, kind string, onComplete string) *mpb.Bar { +func (c *copier) createProgressBar(pool *mpb.Progress, info types.BlobInfo, kind string, filler string, barOpts ...mpb.BarOption) *mpb.Bar { // shortDigestLen is the length of the digest used for blobs. const shortDigestLen = 12 @@ -597,15 +623,35 @@ func (c *copier) createProgressBar(pool *mpb.Progress, info types.BlobInfo, kind prefix = prefix[:maxPrefixLen] } - bar := pool.AddBar(info.Size, - mpb.BarClearOnComplete(), + options := []mpb.BarOption{ mpb.PrependDecorators( decor.Name(prefix), ), - mpb.AppendDecorators( - decor.OnComplete(decor.CountersKibiByte("%.1f / %.1f"), " "+onComplete), - ), - ) + } + options = append(options, barOpts...) + + // Fillers are used for placeholder bars that are later replaced by a + // common progress bar. Hence, only append a counter decorator when + // filler == "". + if filler == "" { + options = append(options, + mpb.AppendDecorators( + decor.OnComplete(decor.CountersKibiByte("%.1f / %.1f"), " done"), + )) + options = append(options, mpb.BarClearOnComplete()) + } + + var bar *mpb.Bar + if filler != "" { + barFiller := mpb.FillerFunc( + func(w io.Writer, width int, st *decor.Statistics) { + fmt.Fprint(w, filler) + }) + bar = pool.Add(0, barFiller, options...) + } else { + bar = pool.AddBar(info.Size, options...) 
+ } + if c.progressOutput == ioutil.Discard { c.Printf("Copying %s %s\n", kind, info.Digest) } @@ -624,7 +670,7 @@ func (c *copier) copyConfig(ctx context.Context, src types.Image) error { destInfo, err := func() (types.BlobInfo, error) { // A scope for defer progressPool, progressCleanup := c.newProgressPool(ctx) defer progressCleanup() - bar := c.createProgressBar(progressPool, srcInfo, "config", "done") + bar := c.createProgressBar(progressPool, srcInfo, "config", "") destInfo, err := c.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, nil, false, true, bar) if err != nil { return types.BlobInfo{}, err @@ -649,23 +695,119 @@ type diffIDResult struct { err error } +// tryReusingBlob is a wrapper around the destination's TryReusingBlob() method. +// If the blob is present, it returns immediately. Otherwise, if the +// destination supports blob locking, it will acquire the blob lock and perform +// another call to TryReusingBlob() to see if another process has already +// copied the blob. +func (ic *imageCopier) tryReusingBlob(ctx context.Context, srcInfo types.BlobInfo, pool *mpb.Progress, bar *mpb.Bar) (bool, types.BlobInfo, error) { + reused, blobInfo, err := ic.c.dest.TryReusingBlob(ctx, srcInfo, ic.c.blobInfoCache, ic.canSubstituteBlobs) + if err != nil { + return false, types.BlobInfo{}, errors.Wrapf(err, "Error trying to reuse blob %s at destination", srcInfo.Digest) + } + if reused { + logrus.Debugf("Skipping blob %s (already present):", srcInfo.Digest) + newBar := ic.c.createReplacementProgressBar(pool, bar, srcInfo, "blob", "skipped: already exists") + newBar.SetTotal(0, true) + return true, blobInfo, nil + } + + if !ic.c.dest.SupportsBlobLocks() { + return false, types.BlobInfo{}, nil + } + + // Check if another process might be copying the layer at the moment. + // If so, wait for its completion and try to reuse the blob again. 
+ logrus.Debugf("Checking if blob %s is already being copied by another process", srcInfo.Digest) + + // LockBlob() does not support try-lock semantics which would be a + // straight-forward mechanism to detect if another process is currently + // copying a specific layer. + // + // The trick we are using here is to acquire the lock in a separate + // goroutine and use a channel for signaling when the lock has been + // acquired. To detect if this goroutine is blocking on the lock, we + // spin up a second one sending a message via another channel after a + // time out. + chanDone := make(chan bool, 1) + chanBlockedByAnotherProcess := make(chan bool, 1) + + var lockError error + // Acquire the blob lock + go func() { + lockError = ic.c.dest.LockBlob(srcInfo) + chanDone <- true + }() + // Spin up the timer + go func() { + // The initial sleep time is `progressBarRefreshRate` + // to make sure the bar is properly rendered before + // replacing it. + time.Sleep(progressBarRefreshRate) + chanBlockedByAnotherProcess <- true + }() + + copiedByAnotherProcess := false + select { + case <-chanDone: + break + case <-chanBlockedByAnotherProcess: + logrus.Debugf("Blob %s is already being copied by another process", srcInfo.Digest) + copiedByAnotherProcess = true + bar = ic.c.createReplacementProgressBar(pool, bar, srcInfo, "blob", "paused: being copied by another process", mpb.BarRemoveOnComplete()) + // Wait until we acquired the blob lock + <-chanDone + } + + if lockError != nil { + return false, types.BlobInfo{}, errors.Wrapf(lockError, "Error acquiring lock for blob %s", srcInfo.Digest) + } + + // If the blob has not been copied by another process, we need to copy + // it on our own and can return immediately. 
+ if !copiedByAnotherProcess { + logrus.Debugf("Blob %s is not being copied by another process", srcInfo.Digest) + return false, types.BlobInfo{}, nil + } + + // If the blob has been copied by another process, it + // must be present in the destination and be + // available for reuse. + reused, blobInfo, err = ic.c.dest.TryReusingBlob(ctx, srcInfo, ic.c.blobInfoCache, ic.canSubstituteBlobs) + if err != nil { + return false, types.BlobInfo{}, errors.Wrapf(err, "Error trying to reuse blob %s at destination", srcInfo.Digest) + } + + if reused { + ic.c.dest.UnlockBlob(srcInfo) + logrus.Debugf("Blob %s has been successfully copied by another process", srcInfo.Digest) + bar = ic.c.createReplacementProgressBar(pool, bar, srcInfo, "blob", "done: copied by another process") + bar.SetTotal(0, true) + return true, blobInfo, nil + } + + logrus.Debugf("Blob %s is not present: copy operation of other process must have failed", srcInfo.Digest) + return false, types.BlobInfo{}, nil +} + // copyLayer copies a layer with srcInfo (with known Digest and possibly known Size) in src to dest, perhaps compressing it if canCompress, // and returns a complete blobInfo of the copied layer, and a value for LayerDiffIDs if diffIDIsNeeded func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, pool *mpb.Progress) (types.BlobInfo, digest.Digest, error) { cachedDiffID := ic.c.blobInfoCache.UncompressedDigest(srcInfo.Digest) // May be "" diffIDIsNeeded := ic.diffIDsAreNeeded && cachedDiffID == "" + // Create the initial placeholder bar which is later being replaced by + // another one. + bar := ic.c.createProgressBar(pool, srcInfo, "blob", " ", mpb.BarRemoveOnComplete()) + // If we already have the blob, and we don't need to compute the diffID, then we don't need to read it from the source. 
if !diffIDIsNeeded { - reused, blobInfo, err := ic.c.dest.TryReusingBlob(ctx, srcInfo, ic.c.blobInfoCache, ic.canSubstituteBlobs) + reused, blobInfo, err := ic.tryReusingBlob(ctx, srcInfo, pool, bar) if err != nil { - return types.BlobInfo{}, "", errors.Wrapf(err, "Error trying to reuse blob %s at destination", srcInfo.Digest) + return types.BlobInfo{}, "", err } if reused { - logrus.Debugf("Skipping blob %s (already present):", srcInfo.Digest) - bar := ic.c.createProgressBar(pool, srcInfo, "blob", "skipped: already exists") - bar.SetTotal(0, true) - return blobInfo, cachedDiffID, nil + return blobInfo, cachedDiffID, err } } @@ -676,7 +818,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, po } defer srcStream.Close() - bar := ic.c.createProgressBar(pool, srcInfo, "blob", "done") + bar = ic.c.createReplacementProgressBar(pool, bar, srcInfo, "blob", "") blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize}, diffIDIsNeeded, bar) if err != nil { diff --git a/storage/storage_image.go b/storage/storage_image.go index dfff28c800..93967a741f 100644 --- a/storage/storage_image.go +++ b/storage/storage_image.go @@ -64,6 +64,11 @@ type storageImageDestination struct { fileSizes map[digest.Digest]int64 // Mapping from layer blobsums to their sizes filenames map[digest.Digest]string // Mapping from layer blobsums to names of files we used to hold them SignatureSizes []int `json:"signature-sizes,omitempty"` // List of sizes of each signature slice + + // data for {Lock,Unlock}Blob + lockedDigests map[digest.Digest]storage.Locker // Bookkeeping of digests that have been locked + unlockedDigests map[digest.Digest]bool // Bookkeeping of digests that have been unlocked. 
Required to unlock not-yet-unlocked digests in Close() + digestLockMutex sync.Mutex // Serializes accesses to lockedDigests } type storageImageCloser struct { @@ -323,12 +328,14 @@ func newImageDestination(imageRef storageReference) (*storageImageDestination, e return nil, errors.Wrapf(err, "error creating a temporary directory") } image := &storageImageDestination{ - imageRef: imageRef, - directory: directory, - blobDiffIDs: make(map[digest.Digest]digest.Digest), - fileSizes: make(map[digest.Digest]int64), - filenames: make(map[digest.Digest]string), - SignatureSizes: []int{}, + imageRef: imageRef, + directory: directory, + blobDiffIDs: make(map[digest.Digest]digest.Digest), + fileSizes: make(map[digest.Digest]int64), + filenames: make(map[digest.Digest]string), + lockedDigests: make(map[digest.Digest]storage.Locker), + unlockedDigests: make(map[digest.Digest]bool), + SignatureSizes: []int{}, } return image, nil } @@ -339,8 +346,14 @@ func (s *storageImageDestination) Reference() types.ImageReference { return s.imageRef } -// Close cleans up the temporary directory. +// Close cleans up the temporary directory and unlocks all digest locks that have not been unlocked yet. func (s *storageImageDestination) Close() error { + // Make sure that all locks are unlocked to avoid potential deadlocks. + for digest, unlocked := range s.unlockedDigests { + if !unlocked { + s.unlockDigest(digest) + } + } return os.RemoveAll(s.directory) } @@ -362,25 +375,56 @@ func (s *storageImageDestination) HasThreadSafePutBlob() bool { // SupportsBlobLocks indicates whether the ImageDestination supports blob // locking. -func (d *storageImageDestination) SupportsBlobLocks() bool { - return false +func (s *storageImageDestination) SupportsBlobLocks() bool { + return true } // LockBlob can be used to synchronize operations on it (e.g., copying). +// Note that any lock still held will be unlocked in Close(). 
func (s *storageImageDestination) LockBlob(b types.BlobInfo) error { - // NOOP for this type. + locker, err := func() (storage.Locker, error) { + // anonymous function defer the Unlock() and not worry about + // error paths + s.digestLockMutex.Lock() + defer s.digestLockMutex.Unlock() + if locker, ok := s.lockedDigests[b.Digest]; ok { + return locker, nil + } + locker, err := s.imageRef.transport.store.GetDigestLock(b.Digest) + if err != nil { + return nil, err + } + s.lockedDigests[b.Digest] = locker + return locker, nil + }() + if err != nil { + return err + } + locker.Lock() + s.digestLockMutex.Lock() + s.unlockedDigests[b.Digest] = false + s.digestLockMutex.Unlock() return nil } -// UnlockBlob unlocks the blob. Note that it is safe to call UnlockBlob() -// multiple times. Only the first call is unlocking the blob. This is -// required to unlock a blob in the presence of errors or panics during copy -// operations. -func (s *storageImageDestination) UnlockBlob(b types.BlobInfo) error { - // NOOP for this type. +func (s *storageImageDestination) unlockDigest(d digest.Digest) error { + s.digestLockMutex.Lock() + defer s.digestLockMutex.Unlock() + + locker, ok := s.lockedDigests[d] + if !ok { + return errors.Errorf("trying to unlock non existent lock for digest %q", d) + } + locker.Unlock() + s.unlockedDigests[d] = true return nil } +// UnlockBlob unlocks the blob. +func (s *storageImageDestination) UnlockBlob(b types.BlobInfo) error { + return s.unlockDigest(b.Digest) +} + // PutBlob writes contents of stream and returns data representing the result. // inputInfo.Digest can be optionally provided if known; it is not mandatory for the implementation to verify it. // inputInfo.Size is the expected length of stream, if known. 
diff --git a/types/types.go b/types/types.go index 1be7b87fd1..7114ae116c 100644 --- a/types/types.go +++ b/types/types.go @@ -289,7 +289,11 @@ type ImageDestination interface { Commit(ctx context.Context) error // SupportsBlobLocks indicates whether the ImageDestination supports blob locking. SupportsBlobLocks() bool - // LockBlob can be used to synchronize operations on it (e.g., copying). + // LockBlob is used to synchronize copy operations and is only + // implemented by `storage.storageImageDestination`. A locked + // BlobInfo implies that the data is currently being copied by + // the lock owner. This allows for avoiding redundantly downloading the + // same layer from a registry. LockBlob(BlobInfo) error // UnlockBlob unlocks the blob. UnlockBlob(BlobInfo) error diff --git a/vendor.conf b/vendor.conf index 89b29722b9..4557e22db7 100644 --- a/vendor.conf +++ b/vendor.conf @@ -1,7 +1,7 @@ github.com/containers/image github.com/sirupsen/logrus v1.0.0 -github.com/containers/storage v1.12.1 +github.com/containers/storage copy-locks https://github.com/vrothberg/storage github.com/davecgh/go-spew 346938d642f2ec3594ed81d874461961cd0faa76 github.com/docker/docker-credential-helpers d68f9aeca33f5fd3f08eeae5e9d175edf4e731d1 github.com/docker/distribution 5f6282db7d65e6d72ad7c2cc66310724a57be716 @@ -47,6 +47,6 @@ github.com/boltdb/bolt master github.com/klauspost/pgzip v1.2.1 github.com/klauspost/compress v1.4.1 github.com/klauspost/cpuid v1.2.0 -github.com/vbauerster/mpb v3.3.4 +github.com/vbauerster/mpb v3.4.0 github.com/mattn/go-isatty v0.0.4 github.com/VividCortex/ewma v1.1.1