Skip to content

Commit

Permalink
copy: detect copy operations of other processes
Browse files Browse the repository at this point in the history
Detect copy operations of other processes when copying layers to the
containers-storage.  The blob locks used by the storage image
destination are using file locks from containers/storage.  Those locks
do not support try-lock semantics as they are constrained by the
fcntl(2) syscall.  A try-lock mechanism is needed to detect if another
process is currently holding the lock and is hence copying the layer
in question.  Once we figured that out, we can display the information
on the progress bars to inform the user what is happening.  The
workaround of the missing try-lock semantics is to acquire the lock in a
separate goroutine and wait a certain amount of time.  If we still
don't own the lock after the timeout, we are assuming the layer to be
copied by another process.

Unlocking a blob presents another problem as we should only unlock when
we are certain the operation has succeeded, which is *after* the blob
has been committed to the containers storage.  Hence, we unlock as soon
as possible during Commit() of the storage image destination.  To avoid
potential deadlocks during errors or panics, we also release all locks that
are still held during (storageImageDestination).Close() which is invoked in a
deferred call in copy.Image.

Signed-off-by: Valentin Rothberg <[email protected]>
  • Loading branch information
vrothberg committed Mar 22, 2019
1 parent 5d59830 commit 15a682b
Show file tree
Hide file tree
Showing 3 changed files with 211 additions and 38 deletions.
164 changes: 148 additions & 16 deletions copy/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ var maxParallelDownloads = int64(6)
// layers trying to attack us.
var maxGoroutines = int64(24)

// progressBarRefreshRate defines the refresh rate of the mpb progress bars.
// 120 ms aligns with the initial upstream defaults. Setting the rate here is
// more idiomatic than using some magic number in the code below.
var progressBarRefreshRate = 120 * time.Millisecond

// newDigestingReader returns an io.Reader implementation with contents of source, which will eventually return a non-EOF error
// or set validationSucceeded/validationFailed to true if the source stream does/does not match expectedDigest.
// (neither is set if EOF is never reached).
Expand Down Expand Up @@ -586,7 +591,7 @@ func (ic *imageCopier) copyUpdatedConfigAndManifest(ctx context.Context) ([]byte
// The caller must eventually call the returned cleanup function after the pool will no longer be updated.
func (c *copier) newProgressPool(ctx context.Context) (*mpb.Progress, func()) {
ctx, cancel := context.WithCancel(ctx)
pool := mpb.New(mpb.WithWidth(40), mpb.WithOutput(c.progressOutput), mpb.WithContext(ctx))
pool := mpb.New(mpb.WithWidth(40), mpb.WithOutput(c.progressOutput), mpb.WithContext(ctx), mpb.WithRefreshRate(progressBarRefreshRate))
return pool, func() {
cancel()
pool.Wait()
Expand All @@ -595,7 +600,7 @@ func (c *copier) newProgressPool(ctx context.Context) (*mpb.Progress, func()) {

// createProgressBar creates a mpb.Bar in pool. Note that if the copier's reportWriter
// is ioutil.Discard, the progress bar's output will be discarded
func (c *copier) createProgressBar(pool *mpb.Progress, info types.BlobInfo, kind string, onComplete string) *mpb.Bar {
func (c *copier) createProgressBar(pool *mpb.Progress, info types.BlobInfo, kind string, filler string, barOpts ...mpb.BarOption) *mpb.Bar {
// shortDigestLen is the length of the digest used for blobs.
const shortDigestLen = 12

Expand All @@ -606,15 +611,35 @@ func (c *copier) createProgressBar(pool *mpb.Progress, info types.BlobInfo, kind
prefix = prefix[:maxPrefixLen]
}

bar := pool.AddBar(info.Size,
mpb.BarClearOnComplete(),
options := []mpb.BarOption{
mpb.PrependDecorators(
decor.Name(prefix),
),
mpb.AppendDecorators(
decor.OnComplete(decor.CountersKibiByte("%.1f / %.1f"), " "+onComplete),
),
)
}
options = append(options, barOpts...)

// Fillers are used for placeholder bars that are later replaced by a
// common progress bar. Hence, only append a counter decorator when
// filler == "" (i.e., no filler text was passed).
if filler == "" {
options = append(options,
mpb.AppendDecorators(
decor.OnComplete(decor.CountersKibiByte("%.1f / %.1f"), " done"),
))
options = append(options, mpb.BarClearOnComplete())
}

var bar *mpb.Bar
if filler != "" {
barFiller := mpb.FillerFunc(
func(w io.Writer, width int, st *decor.Statistics) {
fmt.Fprint(w, filler)
})
bar = pool.Add(0, barFiller, options...)
} else {
bar = pool.AddBar(info.Size, options...)
}

if c.progressOutput == ioutil.Discard {
c.Printf("Copying %s %s\n", kind, info.Digest)
}
Expand All @@ -633,7 +658,7 @@ func (c *copier) copyConfig(ctx context.Context, src types.Image) error {
destInfo, err := func() (types.BlobInfo, error) { // A scope for defer
progressPool, progressCleanup := c.newProgressPool(ctx)
defer progressCleanup()
bar := c.createProgressBar(progressPool, srcInfo, "config", "done")
bar := c.createProgressBar(progressPool, srcInfo, "config", "")
destInfo, err := c.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, nil, false, true, bar)
if err != nil {
return types.BlobInfo{}, err
Expand Down Expand Up @@ -664,17 +689,122 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, po
cachedDiffID := ic.c.blobInfoCache.UncompressedDigest(srcInfo.Digest) // May be ""
diffIDIsNeeded := ic.diffIDsAreNeeded && cachedDiffID == ""

// Create the initial placeholder bar which is later being replaced by
// another one.
bar := ic.c.createProgressBar(pool, srcInfo, "blob", " ", mpb.BarRemoveOnComplete())

// If we already have the blob, and we don't need to compute the diffID, then we don't need to read it from the source.
if !diffIDIsNeeded {
reused, blobInfo, err := ic.c.dest.TryReusingBlob(ctx, srcInfo, ic.c.blobInfoCache, ic.canSubstituteBlobs)
// tryReusingBlob is an anonymous function to avoid adding another method
// with a complex parameter list and semantics. Its sole purpose is to
// simplify the control flow with an early return if the image destination
// does not support blob locking.
//
// It returns whether the blob can be reused and, if so, the blob's info and
// the cached diffID (which may be "").
tryReusingBlob := func() (bool, types.BlobInfo, digest.Digest, error) {
	reused, blobInfo, err := ic.c.dest.TryReusingBlob(ctx, srcInfo, ic.c.blobInfoCache, ic.canSubstituteBlobs)
	if err != nil {
		return false, types.BlobInfo{}, "", errors.Wrapf(err, "Error trying to reuse blob %s at destination", srcInfo.Digest)
	}
	if reused {
		logrus.Debugf("Skipping blob %s (already present):", srcInfo.Digest)
		bar.SetTotal(0, true)
		ic.c.createProgressBar(pool, srcInfo, "blob", "skipped: already exists", mpb.BarReplaceOnComplete(bar))
		return true, blobInfo, cachedDiffID, nil
	}

	if !ic.c.dest.SupportsBlobLocks() {
		return false, types.BlobInfo{}, "", nil
	}

	// Check if another process might be copying the layer at the moment.
	// If so, wait for its completion and try to reuse the blob again.
	logrus.Debugf("Checking if blob %s is already being copied by another process", srcInfo.Digest)

	// LockBlob() does not support try-lock semantics which would be a
	// straight-forward mechanism to detect if another process is currently
	// copying a specific layer. However, with LockBlob() we are constrained
	// by the file locks in containers/storage which cannot support try-lock
	// semantics as fcntl(2) does not support that.
	//
	// The trick we are using here is to acquire the lock in a separate
	// goroutine. The main routine first sleeps for the
	// progressBarRefreshRate amount of time and will be signaled via a
	// semaphore if and when the blob lock has been acquired.
	blobLockSem := semaphore.NewWeighted(1)
	// The semaphore must be held before the goroutine starts, otherwise its
	// Release() below would release more than is held.
	if err := blobLockSem.Acquire(ctx, 1); err != nil {
		return false, types.BlobInfo{}, "", errors.Wrapf(err, "Error acquiring lock for blob %s", srcInfo.Digest)
	}
	// lockError is written by the goroutine before Release() and must only
	// be read after the semaphore has been re-acquired.
	var lockError error
	go func() {
		lockError = ic.c.dest.LockBlob(srcInfo)
		blobLockSem.Release(1)
	}()

	copiedByAnotherProcess := false
	if !blobLockSem.TryAcquire(1) {
		// The initial sleep time is `progressBarRefreshRate` to make sure
		// the bar is properly rendered before replacing it.
		time.Sleep(progressBarRefreshRate)
		if !blobLockSem.TryAcquire(1) {
			logrus.Debugf("Blob %s is already being copied by another process", srcInfo.Digest)
			copiedByAnotherProcess = true
			// If we still don't have the lock, replace the initial
			// progress bar with a new one indicating that the blob is
			// copied by another process.
			//
			// Note that we call SetTotal() after the new bar is created
			// to avoid potential flickering if the bar removal and
			// replacement happens across refresh rates.
			toReplaceBar := bar
			bar = ic.c.createProgressBar(pool, srcInfo, "blob", "paused: being copied by another process", mpb.BarRemoveOnComplete(), mpb.BarReplaceOnComplete(bar))
			toReplaceBar.SetTotal(0, true)
			// Block until we acquired the blob lock. If the context is
			// done first, bail out: the goroutine may still be blocked in
			// LockBlob(), so lockError must not be read from here on.
			if err := blobLockSem.Acquire(ctx, 1); err != nil {
				return false, types.BlobInfo{}, "", errors.Wrapf(err, "Error acquiring lock for blob %s", srcInfo.Digest)
			}
		}
	}

	// Wrap lockError (not err, which is always nil at this point), so that a
	// failed LockBlob() is actually reported to the caller.
	if lockError != nil {
		return false, types.BlobInfo{}, "", errors.Wrapf(lockError, "Error acquiring lock for blob %s", srcInfo.Digest)
	}

	// If the blob has not been copied by another process return immediately.
	if !copiedByAnotherProcess {
		logrus.Debugf("Blob %s is not being copied by another process", srcInfo.Digest)
		return false, types.BlobInfo{}, "", nil
	}

	// If the blob has been copied by another process, it must be present in
	// the destination and be available for reuse.
	reused, blobInfo, err = ic.c.dest.TryReusingBlob(ctx, srcInfo, ic.c.blobInfoCache, ic.canSubstituteBlobs)
	if err != nil {
		return false, types.BlobInfo{}, "", errors.Wrapf(err, "Error trying to reuse blob %s at destination", srcInfo.Digest)
	}

	if reused {
		ic.c.dest.UnlockBlob(srcInfo)
		logrus.Debugf("Blob %s has been successfully copied by another process", srcInfo.Digest)
		toReplaceBar := bar
		ic.c.createProgressBar(pool, srcInfo, "blob", "done: copied by another process", mpb.BarReplaceOnComplete(bar))
		toReplaceBar.SetTotal(0, true)
		return true, blobInfo, cachedDiffID, nil
	}

	logrus.Debugf("Blob %s is not present: copy operation of other process must have failed", srcInfo.Digest)
	return false, types.BlobInfo{}, "", nil
}

reused, blob, diff, err := tryReusingBlob()
if err != nil {
return types.BlobInfo{}, "", errors.Wrapf(err, "Error trying to reuse blob %s at destination", srcInfo.Digest)
return types.BlobInfo{}, "", err
}
if reused {
logrus.Debugf("Skipping blob %s (already present):", srcInfo.Digest)
bar := ic.c.createProgressBar(pool, srcInfo, "blob", "skipped: already exists")
bar.SetTotal(0, true)
return blobInfo, cachedDiffID, nil
return blob, diff, err
}
}

Expand All @@ -685,7 +815,9 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, po
}
defer srcStream.Close()

bar := ic.c.createProgressBar(pool, srcInfo, "blob", "done")
toReplaceBar := bar
bar = ic.c.createProgressBar(pool, srcInfo, "blob", "", mpb.BarReplaceOnComplete(bar))
toReplaceBar.SetTotal(0, true)

blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize}, diffIDIsNeeded, bar)
if err != nil {
Expand Down
81 changes: 61 additions & 20 deletions storage/storage_image.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,19 @@ type storageImageSource struct {
}

type storageImageDestination struct {
imageRef storageReference
directory string // Temporary directory where we store blobs until Commit() time
nextTempFileID int32 // A counter that we use for computing filenames to assign to blobs
manifest []byte // Manifest contents, temporary
signatures []byte // Signature contents, temporary
putBlobMutex sync.Mutex // Mutex to sync state for parallel PutBlob executions
blobDiffIDs map[digest.Digest]digest.Digest // Mapping from layer blobsums to their corresponding DiffIDs
fileSizes map[digest.Digest]int64 // Mapping from layer blobsums to their sizes
filenames map[digest.Digest]string // Mapping from layer blobsums to names of files we used to hold them
SignatureSizes []int `json:"signature-sizes,omitempty"` // List of sizes of each signature slice
imageRef storageReference
directory string // Temporary directory where we store blobs until Commit() time
nextTempFileID int32 // A counter that we use for computing filenames to assign to blobs
manifest []byte // Manifest contents, temporary
signatures []byte // Signature contents, temporary
putBlobMutex sync.Mutex // Mutex to sync state for parallel PutBlob executions
blobDiffIDs map[digest.Digest]digest.Digest // Mapping from layer blobsums to their corresponding DiffIDs
fileSizes map[digest.Digest]int64 // Mapping from layer blobsums to their sizes
filenames map[digest.Digest]string // Mapping from layer blobsums to names of files we used to hold them
SignatureSizes []int `json:"signature-sizes,omitempty"` // List of sizes of each signature slice
lock storage.Locker // Lockfile for the blob locking
lockedDigests map[digest.Digest]storage.Locker // Bookkeeping of locked digests that have not yet been unlocked
digestLockMutex sync.Mutex // Serializes accesses to lockedDigests
}

type storageImageCloser struct {
Expand Down Expand Up @@ -328,6 +331,7 @@ func newImageDestination(imageRef storageReference) (*storageImageDestination, e
blobDiffIDs: make(map[digest.Digest]digest.Digest),
fileSizes: make(map[digest.Digest]int64),
filenames: make(map[digest.Digest]string),
lockedDigests: make(map[digest.Digest]storage.Locker),
SignatureSizes: []int{},
}
return image, nil
Expand All @@ -339,8 +343,12 @@ func (s *storageImageDestination) Reference() types.ImageReference {
return s.imageRef
}

// Close cleans up the temporary directory.
// Close releases every digest lock that is still recorded in lockedDigests
// and then removes the temporary directory.
func (s *storageImageDestination) Close() error {
	// unlockDigest removes the entry it unlocks; deleting map entries while
	// ranging over the map is permitted. Its return value is discarded.
	for d := range s.lockedDigests {
		s.unlockDigest(d)
	}
	return os.RemoveAll(s.directory)
}

Expand All @@ -362,25 +370,55 @@ func (s *storageImageDestination) HasThreadSafePutBlob() bool {

// SupportsBlobLocks indicates whether the ImageDestination supports blob
// locking.
func (d *storageImageDestination) SupportsBlobLocks() bool {
return false
func (s *storageImageDestination) SupportsBlobLocks() bool {
return true
}

// LockBlob locks the digest lock of b.Digest; it can be used to synchronize
// operations on the blob (e.g., copying).
// The locker is recorded in s.lockedDigests before Lock() is called. Any
// locker still recorded there is unlocked in Close().
// It returns an error only if the digest lock cannot be obtained from the
// store.
func (s *storageImageDestination) LockBlob(b types.BlobInfo) error {
// Anonymous function so that the deferred mutex unlock covers all error
// paths and panics while looking up or registering the locker.
getLocker := func() (storage.Locker, error) {
s.digestLockMutex.Lock()
defer s.digestLockMutex.Unlock()
// Reuse the locker if one is already recorded for this digest.
if locker, ok := s.lockedDigests[b.Digest]; ok {
return locker, nil
}
locker, err := s.imageRef.transport.store.GetDigestLock(b.Digest)
if err != nil {
return locker, err
}
s.lockedDigests[b.Digest] = locker
return locker, nil
}

locker, err := getLocker()
if err != nil {
return err
}
// Lock() is called without holding digestLockMutex.
// NOTE(review): presumably this blocks until the lock is available — confirm
// against containers/storage.
locker.Lock()

return nil
}

// UnlockBlob unlocks the blob. Note that it is safe to call UnlockBlob()
// multiple times. Only the first call is unlocking the blob. This is
// required to unlock a blob in the presence of errors or panics during copy
// operations.
func (s *storageImageDestination) UnlockBlob(b types.BlobInfo) error {
// NOOP for this type.
// unlockDigest unlocks the digest lock recorded for d and removes it from
// lockedDigests. It returns an error if no lock is recorded for d.
func (s *storageImageDestination) unlockDigest(d digest.Digest) error {
	s.digestLockMutex.Lock()
	defer s.digestLockMutex.Unlock()

	if held, found := s.lockedDigests[d]; found {
		// Deferred so the bookkeeping entry is dropped on the way out of
		// this function, before the mutex is released.
		defer delete(s.lockedDigests, d)
		held.Unlock()
		return nil
	}
	return errors.Errorf("trying to unlock non existent lock for digest %q", d)
}

// UnlockBlob unlocks the blob by releasing the digest lock recorded for
// b.Digest. It returns an error if no lock is recorded for that digest, so a
// second call for the same blob without an intervening LockBlob() fails.
func (s *storageImageDestination) UnlockBlob(b types.BlobInfo) error {
return s.unlockDigest(b.Digest)
}

// PutBlob writes contents of stream and returns data representing the result.
// inputInfo.Digest can be optionally provided if known; it is not mandatory for the implementation to verify it.
// inputInfo.Size is the expected length of stream, if known.
Expand Down Expand Up @@ -705,6 +743,9 @@ func (s *storageImageDestination) Commit(ctx context.Context) error {
return errors.Wrapf(err, "error adding layer with blob %q", blob.Digest)
}
lastLayer = layer.ID
if err := s.UnlockBlob(blob.BlobInfo); err != nil {
return err
}
}

// If one of those blobs was a configuration blob, then we can try to dig out the date when the image
Expand Down
4 changes: 2 additions & 2 deletions vendor.conf
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
github.com/containers/image

github.com/sirupsen/logrus v1.0.0
github.com/containers/storage v1.12.1
github.com/containers/storage copy-locks https://github.com/vrothberg/storage
github.com/davecgh/go-spew 346938d642f2ec3594ed81d874461961cd0faa76
github.com/docker/docker-credential-helpers d68f9aeca33f5fd3f08eeae5e9d175edf4e731d1
github.com/docker/distribution 5f6282db7d65e6d72ad7c2cc66310724a57be716
Expand Down Expand Up @@ -47,6 +47,6 @@ github.com/boltdb/bolt master
github.com/klauspost/pgzip v1.2.1
github.com/klauspost/compress v1.4.1
github.com/klauspost/cpuid v1.2.0
github.com/vbauerster/mpb v3.3.4
github.com/vbauerster/mpb v3.4.0
github.com/mattn/go-isatty v0.0.4
github.com/VividCortex/ewma v1.1.1

0 comments on commit 15a682b

Please sign in to comment.