diff --git a/copy/copy.go b/copy/copy.go index 539211ad80..fdb9ffdbec 100644 --- a/copy/copy.go +++ b/copy/copy.go @@ -43,6 +43,11 @@ type digestingReader struct { // downloads. Let's follow Firefox by limiting it to 6. var maxParallelDownloads = 6 +// progressBarRefreshRate defines the refresh rate of the mpb progress bars. +// 120 ms aligns with the initial upstream defaults. Setting the rate here is +// more idiomatic than using some magic number in the code below. +var progressBarRefreshRate = 120 * time.Millisecond + // newDigestingReader returns an io.Reader implementation with contents of source, which will eventually return a non-EOF error // or set validationSucceeded/validationFailed to true if the source stream does/does not match expectedDigest. // (neither is set if EOF is never reached). @@ -318,6 +323,18 @@ func (c *copier) copyOneImage(ctx context.Context, policyContext *signature.Poli // If src.UpdatedImageNeedsLayerDiffIDs(ic.manifestUpdates) will be true, it needs to be true by the time we get here. ic.diffIDsAreNeeded = src.UpdatedImageNeedsLayerDiffIDs(*ic.manifestUpdates) + // Unblock all blobs as a precaution to avoid potential deadlocks in the + // presence of errors or even panics. Note that it is safe to unblock + // here as locks are only unlocked if we're the owner of the lock; + // only the first call to UnlockBlob() will effectively unlock the file. + if ic.c.dest.SupportsBlobLocks() { + defer func() { + srcInfos := ic.src.LayerInfos() + for _, blob := range srcInfos { + ic.c.dest.UnlockBlob(blob) + } + }() + } if err := ic.copyLayers(ctx); err != nil { return nil, err } @@ -581,7 +598,7 @@ func (ic *imageCopier) copyUpdatedConfigAndManifest(ctx context.Context) ([]byte // The caller must eventually call the returned cleanup function after the pool will no longer be updated. 
func (c *copier) newProgressPool(ctx context.Context) (*mpb.Progress, func()) { ctx, cancel := context.WithCancel(ctx) - pool := mpb.New(mpb.WithWidth(40), mpb.WithOutput(c.progressOutput), mpb.WithContext(ctx)) + pool := mpb.New(mpb.WithWidth(40), mpb.WithOutput(c.progressOutput), mpb.WithContext(ctx), mpb.WithRefreshRate(progressBarRefreshRate)) return pool, func() { cancel() pool.Wait() @@ -590,7 +607,7 @@ func (c *copier) newProgressPool(ctx context.Context) (*mpb.Progress, func()) { // createProgressBar creates a mpb.Bar in pool. Note that if the copier's reportWriter // is ioutil.Discard, the progress bar's output will be discarded -func (c *copier) createProgressBar(pool *mpb.Progress, info types.BlobInfo, kind string, onComplete string) *mpb.Bar { +func (c *copier) createProgressBar(pool *mpb.Progress, info types.BlobInfo, kind string, filler string, barOpts ...mpb.BarOption) *mpb.Bar { // shortDigestLen is the length of the digest used for blobs. const shortDigestLen = 12 @@ -601,15 +618,35 @@ func (c *copier) createProgressBar(pool *mpb.Progress, info types.BlobInfo, kind prefix = prefix[:maxPrefixLen] } - bar := pool.AddBar(info.Size, - mpb.BarClearOnComplete(), + options := []mpb.BarOption{ mpb.PrependDecorators( decor.Name(prefix), ), - mpb.AppendDecorators( - decor.OnComplete(decor.CountersKibiByte("%.1f / %.1f"), " "+onComplete), - ), - ) + } + options = append(options, barOpts...) + + // Fillers are used for placeholder bars that are later replaced by a + // common progress bar. Hence, only append a counter decorator when + // filler is empty. + if filler == "" { + options = append(options, + mpb.AppendDecorators( + decor.OnComplete(decor.CountersKibiByte("%.1f / %.1f"), " done"), + )) + options = append(options, mpb.BarClearOnComplete()) + } + + var bar *mpb.Bar + if filler != "" { + barFiller := mpb.FillerFunc( + func(w io.Writer, width int, st *decor.Statistics) { + fmt.Fprint(w, filler) + }) + bar = pool.Add(0, barFiller, options...) 
+ } else { + bar = pool.AddBar(info.Size, options...) + } + if c.progressOutput == ioutil.Discard { c.Printf("Copying %s %s\n", kind, info.Digest) } @@ -628,7 +665,7 @@ func (c *copier) copyConfig(ctx context.Context, src types.Image) error { destInfo, err := func() (types.BlobInfo, error) { // A scope for defer progressPool, progressCleanup := c.newProgressPool(ctx) defer progressCleanup() - bar := c.createProgressBar(progressPool, srcInfo, "config", "done") + bar := c.createProgressBar(progressPool, srcInfo, "config", "") destInfo, err := c.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, nil, false, true, bar) if err != nil { return types.BlobInfo{}, err @@ -659,22 +696,111 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, po cachedDiffID := ic.c.blobInfoCache.UncompressedDigest(srcInfo.Digest) // May be "" diffIDIsNeeded := ic.diffIDsAreNeeded && cachedDiffID == "" + // Create the initial placeholder bar which is later replaced by + // another one. + bar := ic.c.createProgressBar(pool, srcInfo, "blob", " ", mpb.BarRemoveOnComplete()) + // If we already have the blob, and we don't need to compute the diffID, then we don't need to read it from the source. if !diffIDIsNeeded { - reused, blobInfo, err := ic.c.dest.TryReusingBlob(ctx, srcInfo, ic.c.blobInfoCache, ic.canSubstituteBlobs) + // tryReusingBlob is an anonymous function to avoid adding + // another method with a complex parameter list and semantics. + // Its sole purpose is to simplify the control flow with an early + // return if the image destination does not support blob locking. 
+ tryReusingBlob := func() (bool, types.BlobInfo, digest.Digest, error) { + reused, blobInfo, err := ic.c.dest.TryReusingBlob(ctx, srcInfo, ic.c.blobInfoCache, ic.canSubstituteBlobs) + if err != nil { + return false, types.BlobInfo{}, "", errors.Wrapf(err, "Error trying to reuse blob %s at destination", srcInfo.Digest) + } + if reused { + logrus.Debugf("Skipping blob %s (already present):", srcInfo.Digest) + bar.SetTotal(0, true) + ic.c.createProgressBar(pool, srcInfo, "blob", "skipped: already exists", mpb.BarReplaceOnComplete(bar)) + return true, blobInfo, cachedDiffID, nil + } + + if !ic.c.dest.SupportsBlobLocks() { + return false, types.BlobInfo{}, "", nil + } + + // Check if another process might be copying the layer at the + // moment. If so, wait for its completion and try to reuse + // the blob again. + // + logrus.Debugf("Checking if blob %s is already being copied by another process", srcInfo.Digest) + + // LockBlob() does not support try-lock semantics which would + // be a straightforward mechanism to detect if another process + // is currently copying a specific layer. However, with + // LockBlob() we are constrained by the file locks in + // containers/storage which cannot support try-lock semantics + // as fcntl(2) does not support that. + // + // The trick we are using here is to acquire the lock in a + // separate goroutine, which sets a boolean signal-variable as + // soon as the lock is acquired. The main routine then + // sleeps for a certain amount of time. If the lock has not + // been acquired until then, the progress bar will be updated + // with a message indicating that the blob is being copied by + // another process, and we wait until the signal-variable is set + // to true. + acquiredBlobLock := false + go func() { + ic.c.dest.LockBlob(srcInfo) + acquiredBlobLock = true + }() + + if !acquiredBlobLock { + // The initial sleep time is `progressBarRefreshRate` + // to make sure the bar is properly rendered before + // replacing it. 
+ time.Sleep(progressBarRefreshRate) + if !acquiredBlobLock { + logrus.Debugf("Blob %s is already being copied by another process", srcInfo.Digest) + // If we still don't have the lock, replace the + // initial progress bar with a new one + // indicating that the blob is copied by + // another process. + // + // Note that we call SetTotal() after the new + // bar is created to avoid potential flickering + // if the bar removal and replacement happens + // across refresh rates. + toReplaceBar := bar + bar = ic.c.createProgressBar(pool, srcInfo, "blob", "stopped: copied by another process", mpb.BarRemoveOnComplete(), mpb.BarReplaceOnComplete(bar)) + toReplaceBar.SetTotal(0, true) + + for !acquiredBlobLock { + time.Sleep(50 * time.Millisecond) + } + } + } + + reused, blobInfo, err = ic.c.dest.TryReusingBlob(ctx, srcInfo, ic.c.blobInfoCache, ic.canSubstituteBlobs) + if err != nil { + return false, types.BlobInfo{}, "", errors.Wrapf(err, "Error trying to reuse blob %s at destination", srcInfo.Digest) + } + + if reused { + logrus.Debugf("Skipping blob %s (already present):", srcInfo.Digest) + toReplaceBar := bar + ic.c.createProgressBar(pool, srcInfo, "blob", "done: copied by another process", mpb.BarReplaceOnComplete(bar)) + toReplaceBar.SetTotal(0, true) + return true, blobInfo, cachedDiffID, nil + } + + logrus.Debugf("Blob %s is not present: copy operation of other process must have failed", srcInfo.Digest) + return false, types.BlobInfo{}, "", nil + } + + reused, blob, diff, err := tryReusingBlob() if err != nil { - return types.BlobInfo{}, "", errors.Wrapf(err, "Error trying to reuse blob %s at destination", srcInfo.Digest) + return types.BlobInfo{}, "", err } if reused { - logrus.Debugf("Skipping blob %s (already present):", srcInfo.Digest) - bar := ic.c.createProgressBar(pool, srcInfo, "blob", "skipped: already exists") - bar.SetTotal(0, true) - return blobInfo, cachedDiffID, nil + return blob, diff, err } } - // - // Fallback: copy the layer, computing the diffID 
if we need to do so srcStream, srcBlobSize, err := ic.c.rawSource.GetBlob(ctx, srcInfo, ic.c.blobInfoCache) if err != nil { @@ -682,7 +808,9 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, po } defer srcStream.Close() - bar := ic.c.createProgressBar(pool, srcInfo, "blob", "done") + toReplaceBar := bar + bar = ic.c.createProgressBar(pool, srcInfo, "blob", "", mpb.BarReplaceOnComplete(bar)) + toReplaceBar.SetTotal(0, true) blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize}, diffIDIsNeeded, bar) if err != nil { diff --git a/storage/storage_image.go b/storage/storage_image.go index 71ca993871..6769e0b1d1 100644 --- a/storage/storage_image.go +++ b/storage/storage_image.go @@ -64,6 +64,7 @@ type storageImageDestination struct { fileSizes map[digest.Digest]int64 // Mapping from layer blobsums to their sizes filenames map[digest.Digest]string // Mapping from layer blobsums to names of files we used to hold them SignatureSizes []int `json:"signature-sizes,omitempty"` // List of sizes of each signature slice + lock storage.Locker // Lockfile for the blob locking } type storageImageCloser struct { @@ -368,12 +369,12 @@ func (d *storageImageDestination) SupportsBlobLocks() bool { // NOOP for this type. func (s *storageImageDestination) LockBlob(b types.BlobInfo) error { - return nil + return s.imageRef.transport.store.LockDigest(b.Digest) } // NOOP for this type. func (s *storageImageDestination) UnlockBlob(b types.BlobInfo) error { - return nil + return s.imageRef.transport.store.UnlockDigest(b.Digest) } // PutBlob writes contents of stream and returns data representing the result. @@ -653,6 +654,7 @@ func (s *storageImageDestination) Commit(ctx context.Context) error { } } if layer == "" { + s.UnlockBlob(blob.BlobInfo) return errors.Wrapf(err2, "error locating layer for blob %q", blob.Digest) } // Read the layer's contents. 
@@ -662,6 +664,7 @@ func (s *storageImageDestination) Commit(ctx context.Context) error { } diff, err2 := s.imageRef.transport.store.Diff("", layer, diffOptions) if err2 != nil { + s.UnlockBlob(blob.BlobInfo) return errors.Wrapf(err2, "error reading layer %q for blob %q", layer, blob.Digest) } // Copy the layer diff to a file. Diff() takes a lock that it holds @@ -672,6 +675,7 @@ func (s *storageImageDestination) Commit(ctx context.Context) error { file, err := os.OpenFile(filename, os.O_CREATE|os.O_TRUNC|os.O_WRONLY|os.O_EXCL, 0600) if err != nil { diff.Close() + s.UnlockBlob(blob.BlobInfo) return errors.Wrapf(err, "error creating temporary file %q", filename) } // Copy the data to the file. @@ -681,6 +685,7 @@ func (s *storageImageDestination) Commit(ctx context.Context) error { diff.Close() file.Close() if err != nil { + s.UnlockBlob(blob.BlobInfo) return errors.Wrapf(err, "error storing blob to file %q", filename) } // Make sure that we can find this file later, should we need the layer's @@ -690,6 +695,7 @@ func (s *storageImageDestination) Commit(ctx context.Context) error { // Read the cached blob and use it as a diff. file, err := os.Open(filename) if err != nil { + s.UnlockBlob(blob.BlobInfo) return errors.Wrapf(err, "error opening file %q", filename) } defer file.Close() @@ -697,9 +703,11 @@ func (s *storageImageDestination) Commit(ctx context.Context) error { // TODO: This can take quite some time, and should ideally be cancellable using ctx.Done(). 
layer, _, err := s.imageRef.transport.store.PutLayer(id, lastLayer, nil, "", false, nil, file) if err != nil && errors.Cause(err) != storage.ErrDuplicateID { + s.UnlockBlob(blob.BlobInfo) return errors.Wrapf(err, "error adding layer with blob %q", blob.Digest) } lastLayer = layer.ID + s.UnlockBlob(blob.BlobInfo) } // If one of those blobs was a configuration blob, then we can try to dig out the date when the image diff --git a/vendor.conf b/vendor.conf index a5ab395625..4557e22db7 100644 --- a/vendor.conf +++ b/vendor.conf @@ -1,7 +1,7 @@ github.com/containers/image github.com/sirupsen/logrus v1.0.0 -github.com/containers/storage master +github.com/containers/storage copy-locks https://github.com/vrothberg/storage github.com/davecgh/go-spew 346938d642f2ec3594ed81d874461961cd0faa76 github.com/docker/docker-credential-helpers d68f9aeca33f5fd3f08eeae5e9d175edf4e731d1 github.com/docker/distribution 5f6282db7d65e6d72ad7c2cc66310724a57be716