Skip to content

Commit

Permalink
copy: detect copy operations of other processes
Browse files Browse the repository at this point in the history
Detect copy operations of other processes when copying layers to the
containers-storage.  The blob locks used by the storage image
destination are using file locks from containers/storage.  Those locks
do not support try-lock semantics as they are constrained by the
fcntl(2) syscall.  A try-lock mechanism is needed to detect if another
process is currently holding the lock and is hence copying the layer
in question.  Once we figured that out, we can display the information
on the progress bars to inform the user what is happening.  The
workaround of the missing try-lock semantics is to acquire the lock in a
sepearate goroutine and wait a certain amount of time.  If we still
don't own the lock after the timeout, we are assuming the layer to be
copied by another process.

Unlocking a blob presents another problem as we should only unlock when
we are certain the operation has succeeded, which is *after* the blob
has been committed to the containers storage.  Hence, we unlock as soon
as possible during Commit() of the storage image destination.  To avoid
potential deadlocks during errors or panics, we also unlock all layers
after copyOneImage() as a deferred anonymous function.

Signed-off-by: Valentin Rothberg <[email protected]>
  • Loading branch information
vrothberg committed Mar 20, 2019
1 parent bd70b4e commit cb0b5b5
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 21 deletions.
164 changes: 146 additions & 18 deletions copy/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ type digestingReader struct {
// downloads. Let's follow Firefox by limiting it to 6.
var maxParallelDownloads = 6

// progressBarRefreshRate defines the refresh rate of the mpb progress bars.
// 120 ms aligns with the initial upstream defaults. Setting the rate here is
// more idiomatic than using some magic number in the code below.
var progressBarRefreshRate = 120 * time.Millisecond

// newDigestingReader returns an io.Reader implementation with contents of source, which will eventually return a non-EOF error
// or set validationSucceeded/validationFailed to true if the source stream does/does not match expectedDigest.
// (neither is set if EOF is never reached).
Expand Down Expand Up @@ -318,6 +323,18 @@ func (c *copier) copyOneImage(ctx context.Context, policyContext *signature.Poli
// If src.UpdatedImageNeedsLayerDiffIDs(ic.manifestUpdates) will be true, it needs to be true by the time we get here.
ic.diffIDsAreNeeded = src.UpdatedImageNeedsLayerDiffIDs(*ic.manifestUpdates)

// Unblock all blobs as a precation to avoid potential deadlocks in the
// presence of errors or even panics. Note that it is safe to unblock
// here as locks are only unblocked if we're the owner of the lock;
// only the first call to UnlockBlob() will effectively unlock the file.
if ic.c.dest.SupportsBlobLocks() {
defer func() {
srcInfos := ic.src.LayerInfos()
for _, blob := range srcInfos {
ic.c.dest.UnlockBlob(blob)
}
}()
}
if err := ic.copyLayers(ctx); err != nil {
return nil, err
}
Expand Down Expand Up @@ -581,7 +598,7 @@ func (ic *imageCopier) copyUpdatedConfigAndManifest(ctx context.Context) ([]byte
// The caller must eventually call the returned cleanup function after the pool will no longer be updated.
func (c *copier) newProgressPool(ctx context.Context) (*mpb.Progress, func()) {
ctx, cancel := context.WithCancel(ctx)
pool := mpb.New(mpb.WithWidth(40), mpb.WithOutput(c.progressOutput), mpb.WithContext(ctx))
pool := mpb.New(mpb.WithWidth(40), mpb.WithOutput(c.progressOutput), mpb.WithContext(ctx), mpb.WithRefreshRate(progressBarRefreshRate))
return pool, func() {
cancel()
pool.Wait()
Expand All @@ -590,7 +607,7 @@ func (c *copier) newProgressPool(ctx context.Context) (*mpb.Progress, func()) {

// createProgressBar creates a mpb.Bar in pool. Note that if the copier's reportWriter
// is ioutil.Discard, the progress bar's output will be discarded
func (c *copier) createProgressBar(pool *mpb.Progress, info types.BlobInfo, kind string, onComplete string) *mpb.Bar {
func (c *copier) createProgressBar(pool *mpb.Progress, info types.BlobInfo, kind string, filler string, barOpts ...mpb.BarOption) *mpb.Bar {
// shortDigestLen is the length of the digest used for blobs.
const shortDigestLen = 12

Expand All @@ -601,15 +618,35 @@ func (c *copier) createProgressBar(pool *mpb.Progress, info types.BlobInfo, kind
prefix = prefix[:maxPrefixLen]
}

bar := pool.AddBar(info.Size,
mpb.BarClearOnComplete(),
options := []mpb.BarOption{
mpb.PrependDecorators(
decor.Name(prefix),
),
mpb.AppendDecorators(
decor.OnComplete(decor.CountersKibiByte("%.1f / %.1f"), " "+onComplete),
),
)
}
options = append(options, barOpts...)

// Fillers are used for placeholder bars that are later replaced by a
// common progress bar. Hence, only append a counter decorator when
// filler == nil.
if filler == "" {
options = append(options,
mpb.AppendDecorators(
decor.OnComplete(decor.CountersKibiByte("%.1f / %.1f"), " done"),
))
options = append(options, mpb.BarClearOnComplete())
}

var bar *mpb.Bar
if filler != "" {
barFiller := mpb.FillerFunc(
func(w io.Writer, width int, st *decor.Statistics) {
fmt.Fprint(w, filler)
})
bar = pool.Add(0, barFiller, options...)
} else {
bar = pool.AddBar(info.Size, options...)
}

if c.progressOutput == ioutil.Discard {
c.Printf("Copying %s %s\n", kind, info.Digest)
}
Expand All @@ -628,7 +665,7 @@ func (c *copier) copyConfig(ctx context.Context, src types.Image) error {
destInfo, err := func() (types.BlobInfo, error) { // A scope for defer
progressPool, progressCleanup := c.newProgressPool(ctx)
defer progressCleanup()
bar := c.createProgressBar(progressPool, srcInfo, "config", "done")
bar := c.createProgressBar(progressPool, srcInfo, "config", "")
destInfo, err := c.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, nil, false, true, bar)
if err != nil {
return types.BlobInfo{}, err
Expand Down Expand Up @@ -659,30 +696,121 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, po
cachedDiffID := ic.c.blobInfoCache.UncompressedDigest(srcInfo.Digest) // May be ""
diffIDIsNeeded := ic.diffIDsAreNeeded && cachedDiffID == ""

// Create the initial placeholder bar which is later being replaced by
// another one.
bar := ic.c.createProgressBar(pool, srcInfo, "blob", " ", mpb.BarRemoveOnComplete())

// If we already have the blob, and we don't need to compute the diffID, then we don't need to read it from the source.
if !diffIDIsNeeded {
reused, blobInfo, err := ic.c.dest.TryReusingBlob(ctx, srcInfo, ic.c.blobInfoCache, ic.canSubstituteBlobs)
// tryReusingBlob is an anonymous function to avoid adding
// another method with a complex parameter list and semantics.
// It's sole purpose is to simplify the control with an early
// return if the image destination does not blob locking.
tryReusingBlob := func() (bool, types.BlobInfo, digest.Digest, error) {
reused, blobInfo, err := ic.c.dest.TryReusingBlob(ctx, srcInfo, ic.c.blobInfoCache, ic.canSubstituteBlobs)
if err != nil {
return false, types.BlobInfo{}, "", errors.Wrapf(err, "Error trying to reuse blob %s at destination", srcInfo.Digest)
}
if reused {
logrus.Debugf("Skipping blob %s (already present):", srcInfo.Digest)
bar.SetTotal(0, true)
ic.c.createProgressBar(pool, srcInfo, "blob", "skipped: already exists", mpb.BarReplaceOnComplete(bar))
return true, blobInfo, cachedDiffID, nil
}

if !ic.c.dest.SupportsBlobLocks() {
return false, types.BlobInfo{}, "", nil
}

// Check if another process might be copying the layer at the
// moment. If so, wait for it's completion and try to reuse
// the blob again.
//
logrus.Debugf("Checking if blob %s is already being copied by another process", srcInfo.Digest)

// LockBlob() does not support try-lock semantics which would
// be a straight-forward mechanism to detect if another process
// is currently copying a specific layer. However, with
// LockBlob() we are constrained by the file locks in
// containers/storage which cannot support try-lock semantics
// as fcntl(2) does not support that.
//
// The trick we are using here is to acquire the lock in a
// separate goroutine, which sets a boolean signal-variable as
// soon as the lock is acquired. The main-routine is then
// sleeping for a certain amount of time. If the lock has not
// been acquired until then, the progress bar will be updated
// with a message indicating that the blob is being copied by
// another process and wait until the signal-variable is set
// true.
acquiredBlobLock := false
go func() {
ic.c.dest.LockBlob(srcInfo)
acquiredBlobLock = true
}()

if !acquiredBlobLock {
// The initial sleep time is `progressBarRefreshRate`
// to make sure the bar is properly rendered before
// replacing it.
time.Sleep(progressBarRefreshRate)
if !acquiredBlobLock {
logrus.Debugf("Blob %s is already being copied by another process", srcInfo.Digest)
// If we still don't have the lock, replace the
// initial progress bar with a new one
// indicating that the blob is copied by
// another process.
//
// Note that we call SetTotal() after the new
// bar is created to avoid potential flickering
// if the bar removal and replacement happens
// across refresh rates.
toReplaceBar := bar
bar = ic.c.createProgressBar(pool, srcInfo, "blob", "stopped: copied by another process", mpb.BarRemoveOnComplete(), mpb.BarReplaceOnComplete(bar))
toReplaceBar.SetTotal(0, true)

for !acquiredBlobLock {
time.Sleep(50 * time.Millisecond)
}
}
}

reused, blobInfo, err = ic.c.dest.TryReusingBlob(ctx, srcInfo, ic.c.blobInfoCache, ic.canSubstituteBlobs)
if err != nil {
return false, types.BlobInfo{}, "", errors.Wrapf(err, "Error trying to reuse blob %s at destination", srcInfo.Digest)
}

if reused {
logrus.Debugf("Skipping blob %s (already present):", srcInfo.Digest)
toReplaceBar := bar
ic.c.createProgressBar(pool, srcInfo, "blob", "done: copied by another process", mpb.BarReplaceOnComplete(bar))
toReplaceBar.SetTotal(0, true)
return true, blobInfo, cachedDiffID, nil
}

logrus.Debugf("Blob %s is not present: copy operation of other process must have failed", srcInfo.Digest)
return false, types.BlobInfo{}, "", nil
}

reused, blob, diff, err := tryReusingBlob()
if err != nil {
return types.BlobInfo{}, "", errors.Wrapf(err, "Error trying to reuse blob %s at destination", srcInfo.Digest)
return types.BlobInfo{}, "", err
}
if reused {
logrus.Debugf("Skipping blob %s (already present):", srcInfo.Digest)
bar := ic.c.createProgressBar(pool, srcInfo, "blob", "skipped: already exists")
bar.SetTotal(0, true)
return blobInfo, cachedDiffID, nil
return blob, diff, err
}
}

//

// Fallback: copy the layer, computing the diffID if we need to do so
srcStream, srcBlobSize, err := ic.c.rawSource.GetBlob(ctx, srcInfo, ic.c.blobInfoCache)
if err != nil {
return types.BlobInfo{}, "", errors.Wrapf(err, "Error reading blob %s", srcInfo.Digest)
}
defer srcStream.Close()

bar := ic.c.createProgressBar(pool, srcInfo, "blob", "done")
toReplaceBar := bar
bar = ic.c.createProgressBar(pool, srcInfo, "blob", "", mpb.BarReplaceOnComplete(bar))
toReplaceBar.SetTotal(0, true)

blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize}, diffIDIsNeeded, bar)
if err != nil {
Expand Down
12 changes: 10 additions & 2 deletions storage/storage_image.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type storageImageDestination struct {
fileSizes map[digest.Digest]int64 // Mapping from layer blobsums to their sizes
filenames map[digest.Digest]string // Mapping from layer blobsums to names of files we used to hold them
SignatureSizes []int `json:"signature-sizes,omitempty"` // List of sizes of each signature slice
lock storage.Locker // Lockfile for the blob locking
}

type storageImageCloser struct {
Expand Down Expand Up @@ -368,12 +369,12 @@ func (d *storageImageDestination) SupportsBlobLocks() bool {

// NOOP for this type.
func (s *storageImageDestination) LockBlob(b types.BlobInfo) error {
return nil
return s.imageRef.transport.store.LockDigest(b.Digest)
}

// NOOP for this type.
func (s *storageImageDestination) UnlockBlob(b types.BlobInfo) error {
return nil
return s.imageRef.transport.store.UnlockDigest(b.Digest)
}

// PutBlob writes contents of stream and returns data representing the result.
Expand Down Expand Up @@ -653,6 +654,7 @@ func (s *storageImageDestination) Commit(ctx context.Context) error {
}
}
if layer == "" {
s.UnlockBlob(blob.BlobInfo)
return errors.Wrapf(err2, "error locating layer for blob %q", blob.Digest)
}
// Read the layer's contents.
Expand All @@ -662,6 +664,7 @@ func (s *storageImageDestination) Commit(ctx context.Context) error {
}
diff, err2 := s.imageRef.transport.store.Diff("", layer, diffOptions)
if err2 != nil {
s.UnlockBlob(blob.BlobInfo)
return errors.Wrapf(err2, "error reading layer %q for blob %q", layer, blob.Digest)
}
// Copy the layer diff to a file. Diff() takes a lock that it holds
Expand All @@ -672,6 +675,7 @@ func (s *storageImageDestination) Commit(ctx context.Context) error {
file, err := os.OpenFile(filename, os.O_CREATE|os.O_TRUNC|os.O_WRONLY|os.O_EXCL, 0600)
if err != nil {
diff.Close()
s.UnlockBlob(blob.BlobInfo)
return errors.Wrapf(err, "error creating temporary file %q", filename)
}
// Copy the data to the file.
Expand All @@ -681,6 +685,7 @@ func (s *storageImageDestination) Commit(ctx context.Context) error {
diff.Close()
file.Close()
if err != nil {
s.UnlockBlob(blob.BlobInfo)
return errors.Wrapf(err, "error storing blob to file %q", filename)
}
// Make sure that we can find this file later, should we need the layer's
Expand All @@ -690,16 +695,19 @@ func (s *storageImageDestination) Commit(ctx context.Context) error {
// Read the cached blob and use it as a diff.
file, err := os.Open(filename)
if err != nil {
s.UnlockBlob(blob.BlobInfo)
return errors.Wrapf(err, "error opening file %q", filename)
}
defer file.Close()
// Build the new layer using the diff, regardless of where it came from.
// TODO: This can take quite some time, and should ideally be cancellable using ctx.Done().
layer, _, err := s.imageRef.transport.store.PutLayer(id, lastLayer, nil, "", false, nil, file)
if err != nil && errors.Cause(err) != storage.ErrDuplicateID {
s.UnlockBlob(blob.BlobInfo)
return errors.Wrapf(err, "error adding layer with blob %q", blob.Digest)
}
lastLayer = layer.ID
s.UnlockBlob(blob.BlobInfo)
}

// If one of those blobs was a configuration blob, then we can try to dig out the date when the image
Expand Down
2 changes: 1 addition & 1 deletion vendor.conf
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
github.com/containers/image

github.com/sirupsen/logrus v1.0.0
github.com/containers/storage master
github.com/containers/storage copy-locks https://github.com/vrothberg/storage
github.com/davecgh/go-spew 346938d642f2ec3594ed81d874461961cd0faa76
github.com/docker/docker-credential-helpers d68f9aeca33f5fd3f08eeae5e9d175edf4e731d1
github.com/docker/distribution 5f6282db7d65e6d72ad7c2cc66310724a57be716
Expand Down

0 comments on commit cb0b5b5

Please sign in to comment.