copy: detect copy operations of other processes
Detect copy operations of other processes when copying layers to the
containers-storage.  The blob locks used by the storage image
destination use file locks from containers/storage.  Those locks
do not support try-lock semantics because they are constrained by the
fcntl(2) syscall.  A try-lock mechanism is needed to detect whether
another process is currently holding the lock and is hence copying the
layer in question.  Once we detect this, we can display the information
on the progress bars to inform the user about what is happening.  The
workaround for the missing try-lock semantics is to acquire the lock in
a separate goroutine and wait a certain amount of time.  If we still
don't own the lock after the timeout, we assume that another process is
copying the layer.
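
A minimal sketch of that workaround in isolation; the blockingLocker
interface and function names below are illustrative, not the actual
containers/storage API:

package trylocksketch

import "time"

// blockingLocker stands in for a containers/storage file lock, which only
// offers a blocking Lock().
type blockingLocker interface {
	Lock()
}

// lockOrDetectCopier always ends up holding the lock, but additionally
// reports whether acquiring it took longer than timeout, which we take as a
// sign that another process is currently copying the layer.
func lockOrDetectCopier(l blockingLocker, timeout time.Duration) bool {
	done := make(chan struct{})
	go func() {
		l.Lock() // blocks while another process holds the file lock
		close(done)
	}()
	select {
	case <-done:
		return false // acquired quickly: nobody else is copying
	case <-time.After(timeout):
		// Still blocked after the timeout: assume another process is
		// copying the layer, then keep waiting until we own the lock.
		<-done
		return true
	}
}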

Unlocking a blob presents another problem, as we should only unlock when
we are certain the operation has succeeded, which is *after* the blob
has been committed to the containers storage.  Hence, we unlock as soon
as possible during Commit() of the storage image destination.  To avoid
potential deadlocks during errors or panics, we also release any locks
that have not yet been unlocked during (storageImageDestination).Close(),
which is invoked in a deferred call in copy.Image.
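
The bookkeeping for that can be pictured with the following sketch; the
struct and field names are made up for illustration and do not match
storageImageDestination exactly:

package unlocksketch

import "sync"

// blobDest tracks which digest locks were acquired and which of them have
// already been released.
type blobDest struct {
	mu       sync.Mutex
	locks    map[string]sync.Locker // digest -> acquired lock
	unlocked map[string]bool        // digest -> already released?
}

// unlock releases the lock for digest exactly once; later calls are no-ops,
// so it is safe to call it from both Commit() and Close().
func (d *blobDest) unlock(digest string) {
	d.mu.Lock()
	defer d.mu.Unlock()
	if l, ok := d.locks[digest]; ok && !d.unlocked[digest] {
		l.Unlock()
		d.unlocked[digest] = true
	}
}

// Close releases every lock that was never unlocked, e.g. after an error or
// panic on the copy path, so a failed copy cannot leave a stale lock behind.
func (d *blobDest) Close() {
	d.mu.Lock()
	digests := make([]string, 0, len(d.locks))
	for digest := range d.locks {
		digests = append(digests, digest)
	}
	d.mu.Unlock()
	for _, digest := range digests {
		d.unlock(digest)
	}
}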

Signed-off-by: Valentin Rothberg <[email protected]>
vrothberg committed Apr 2, 2019
1 parent 67e4133 commit 5e5e204
Showing 4 changed files with 231 additions and 41 deletions.
186 changes: 164 additions & 22 deletions copy/copy.go
@@ -43,6 +43,11 @@ type digestingReader struct {
// downloads. Let's follow Firefox by limiting it to 6.
var maxParallelDownloads = 6

// progressBarRefreshRate defines the refresh rate of the mpb progress bars.
// 120 ms aligns with the initial upstream defaults. Setting the rate here is
// more idiomatic than using some magic number in the code below.
var progressBarRefreshRate = 120 * time.Millisecond

// newDigestingReader returns an io.Reader implementation with contents of source, which will eventually return a non-EOF error
// or set validationSucceeded/validationFailed to true if the source stream does/does not match expectedDigest.
// (neither is set if EOF is never reached).
@@ -467,11 +472,9 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error {
copySemaphore = semaphore.NewWeighted(int64(1))
}

data := make([]copyLayerData, numLayers)
copyLayerHelper := func(index int, srcLayer types.BlobInfo, pool *mpb.Progress) {
copyLayerHelper := func(index int, srcLayer types.BlobInfo, pool *mpb.Progress, cld *copyLayerData) {
defer copySemaphore.Release(1)
defer copyGroup.Done()
cld := copyLayerData{}
if ic.c.dest.AcceptsForeignLayerURLs() && len(srcLayer.URLs) != 0 {
// DiffIDs are, currently, needed only when converting from schema1.
// In which case src.LayerInfos will not have URLs because schema1
@@ -485,16 +488,26 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error {
} else {
cld.destInfo, cld.diffID, cld.err = ic.copyLayer(ctx, srcLayer, pool)
}
data[index] = cld
}

// digestToCopyData maps a digest to a corresponding copyLayerData and
// avoids redundantly copying the same layer.
digestToCopyData := make(map[digest.Digest]*copyLayerData)
func() { // A scope for defer
progressPool, progressCleanup := ic.c.newProgressPool(ctx)
defer progressCleanup()

for i, srcLayer := range srcInfos {
cld, ok := digestToCopyData[srcLayer.Digest]
if ok {
// digest is already being copied
copyGroup.Done()
continue
}
copySemaphore.Acquire(ctx, 1)
go copyLayerHelper(i, srcLayer, progressPool)
cld = &copyLayerData{}
digestToCopyData[srcLayer.Digest] = cld
go copyLayerHelper(i, srcLayer, progressPool, cld)
}

// Wait for all layers to be copied
@@ -503,7 +516,11 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error {

destInfos := make([]types.BlobInfo, numLayers)
diffIDs := make([]digest.Digest, numLayers)
for i, cld := range data {
for i, srcLayer := range srcInfos {
cld, ok := digestToCopyData[srcLayer.Digest]
if !ok {
return fmt.Errorf("no copy data found for layer %q", srcLayer.Digest)
}
if cld.err != nil {
return cld.err
}
@@ -577,16 +594,25 @@ func (ic *imageCopier) copyUpdatedConfigAndManifest(ctx context.Context) ([]byte
// The caller must eventually call the returned cleanup function after the pool will no longer be updated.
func (c *copier) newProgressPool(ctx context.Context) (*mpb.Progress, func()) {
ctx, cancel := context.WithCancel(ctx)
pool := mpb.New(mpb.WithWidth(40), mpb.WithOutput(c.progressOutput), mpb.WithContext(ctx))
pool := mpb.New(mpb.WithWidth(40), mpb.WithOutput(c.progressOutput), mpb.WithContext(ctx), mpb.WithRefreshRate(progressBarRefreshRate))
return pool, func() {
cancel()
pool.Wait()
}
}

// createReplacementProgressBar is a wrapper around createProgressBar; it creates
// a progress bar that replaces the `toReplace` bar.
func (c *copier) createReplacementProgressBar(pool *mpb.Progress, toReplace *mpb.Bar, info types.BlobInfo, kind string, filler string, barOpts ...mpb.BarOption) *mpb.Bar {
barOpts = append(barOpts, mpb.BarReplaceOnComplete(toReplace))
bar := c.createProgressBar(pool, info, kind, filler, barOpts...)
toReplace.SetTotal(0, true)
return bar
}

// createProgressBar creates a mpb.Bar in pool. Note that if the copier's reportWriter
// is ioutil.Discard, the progress bar's output will be discarded
func (c *copier) createProgressBar(pool *mpb.Progress, info types.BlobInfo, kind string, onComplete string) *mpb.Bar {
func (c *copier) createProgressBar(pool *mpb.Progress, info types.BlobInfo, kind string, filler string, barOpts ...mpb.BarOption) *mpb.Bar {
// shortDigestLen is the length of the digest used for blobs.
const shortDigestLen = 12

@@ -597,15 +623,35 @@ func (c *copier) createProgressBar(pool *mpb.Progress, info types.BlobInfo, kind
prefix = prefix[:maxPrefixLen]
}

bar := pool.AddBar(info.Size,
mpb.BarClearOnComplete(),
options := []mpb.BarOption{
mpb.PrependDecorators(
decor.Name(prefix),
),
mpb.AppendDecorators(
decor.OnComplete(decor.CountersKibiByte("%.1f / %.1f"), " "+onComplete),
),
)
}
options = append(options, barOpts...)

// Fillers are used for placeholder bars that are later replaced by a
// common progress bar. Hence, only append a counter decorator when
// filler == "".
if filler == "" {
options = append(options,
mpb.AppendDecorators(
decor.OnComplete(decor.CountersKibiByte("%.1f / %.1f"), " done"),
))
options = append(options, mpb.BarClearOnComplete())
}

var bar *mpb.Bar
if filler != "" {
barFiller := mpb.FillerFunc(
func(w io.Writer, width int, st *decor.Statistics) {
fmt.Fprint(w, filler)
})
bar = pool.Add(0, barFiller, options...)
} else {
bar = pool.AddBar(info.Size, options...)
}

if c.progressOutput == ioutil.Discard {
c.Printf("Copying %s %s\n", kind, info.Digest)
}
@@ -624,7 +670,7 @@ func (c *copier) copyConfig(ctx context.Context, src types.Image) error {
destInfo, err := func() (types.BlobInfo, error) { // A scope for defer
progressPool, progressCleanup := c.newProgressPool(ctx)
defer progressCleanup()
bar := c.createProgressBar(progressPool, srcInfo, "config", "done")
bar := c.createProgressBar(progressPool, srcInfo, "config", "")
destInfo, err := c.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, nil, false, true, bar)
if err != nil {
return types.BlobInfo{}, err
@@ -649,23 +695,119 @@ type diffIDResult struct {
err error
}

// tryReusingBlob is a wrapper around the destination's TryReusingBlob() method.
// If the blob is present, it returns immediately. Otherwise, if the
// destination supports blob locking, it will acquire the blob lock and perform
// another call to TryReusingBlob() to see if another process has already
// copied the blob.
func (ic *imageCopier) tryReusingBlob(ctx context.Context, srcInfo types.BlobInfo, pool *mpb.Progress, bar *mpb.Bar) (bool, types.BlobInfo, error) {
reused, blobInfo, err := ic.c.dest.TryReusingBlob(ctx, srcInfo, ic.c.blobInfoCache, ic.canSubstituteBlobs)
if err != nil {
return false, types.BlobInfo{}, errors.Wrapf(err, "Error trying to reuse blob %s at destination", srcInfo.Digest)
}
if reused {
logrus.Debugf("Skipping blob %s (already present):", srcInfo.Digest)
newBar := ic.c.createReplacementProgressBar(pool, bar, srcInfo, "blob", "skipped: already exists")
newBar.SetTotal(0, true)
return true, blobInfo, nil
}

if !ic.c.dest.SupportsBlobLocks() {
return false, types.BlobInfo{}, nil
}

// Check if another process might be copying the layer at the moment.
// If so, wait for its completion and try to reuse the blob again.
logrus.Debugf("Checking if blob %s is already being copied by another process", srcInfo.Digest)

// LockBlob() does not support try-lock semantics which would be a
// straightforward mechanism to detect if another process is currently
// copying a specific layer.
//
// The trick we are using here is to acquire the lock in a separate
// goroutine and use a channel for signaling when the lock has been
// acquired. To detect if this goroutine is blocking on the lock, we
// spin up a second one sending a message via another channel after a
// timeout.
chanDone := make(chan bool, 1)
chanBlockedByAnotherProcess := make(chan bool, 1)

var lockError error
// Acquire the blob lock
go func() {
lockError = ic.c.dest.LockBlob(srcInfo)
chanDone <- true
}()
// Spin up the timer
go func() {
// The initial sleep time is `progressBarRefreshRate`
// to make sure the bar is properly rendered before
// replacing it.
time.Sleep(progressBarRefreshRate)
chanBlockedByAnotherProcess <- true
}()

copiedByAnotherProcess := false
select {
case <-chanDone:
break
case <-chanBlockedByAnotherProcess:
logrus.Debugf("Blob %s is already being copied by another process", srcInfo.Digest)
copiedByAnotherProcess = true
bar = ic.c.createReplacementProgressBar(pool, bar, srcInfo, "blob", "paused: being copied by another process", mpb.BarRemoveOnComplete())
// Wait until we acquired the blob lock
<-chanDone
}

if lockError != nil {
return false, types.BlobInfo{}, errors.Wrapf(lockError, "Error acquiring lock for blob %s", srcInfo.Digest)
}

// If the blob has not been copied by another process, we need to copy
// it on our own and can return immediately.
if !copiedByAnotherProcess {
logrus.Debugf("Blob %s is not being copied by another process", srcInfo.Digest)
return false, types.BlobInfo{}, nil
}

// If the blob has been copied by another process, it
// must be present in the destination and be
// available for reuse.
reused, blobInfo, err = ic.c.dest.TryReusingBlob(ctx, srcInfo, ic.c.blobInfoCache, ic.canSubstituteBlobs)
if err != nil {
return false, types.BlobInfo{}, errors.Wrapf(err, "Error trying to reuse blob %s at destination", srcInfo.Digest)
}

if reused {
ic.c.dest.UnlockBlob(srcInfo)
logrus.Debugf("Blob %s has been successfully copied by another process", srcInfo.Digest)
bar = ic.c.createReplacementProgressBar(pool, bar, srcInfo, "blob", "done: copied by another process")
bar.SetTotal(0, true)
return true, blobInfo, nil
}

logrus.Debugf("Blob %s is not present: copy operation of other process must have failed", srcInfo.Digest)
return false, types.BlobInfo{}, nil
}

// copyLayer copies a layer with srcInfo (with known Digest and possibly known Size) in src to dest, perhaps compressing it if canCompress,
// and returns a complete blobInfo of the copied layer, and a value for LayerDiffIDs if diffIDIsNeeded
func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, pool *mpb.Progress) (types.BlobInfo, digest.Digest, error) {
cachedDiffID := ic.c.blobInfoCache.UncompressedDigest(srcInfo.Digest) // May be ""
diffIDIsNeeded := ic.diffIDsAreNeeded && cachedDiffID == ""

// Create the initial placeholder bar, which is later replaced by
// another one.
bar := ic.c.createProgressBar(pool, srcInfo, "blob", " ", mpb.BarRemoveOnComplete())

// If we already have the blob, and we don't need to compute the diffID, then we don't need to read it from the source.
if !diffIDIsNeeded {
reused, blobInfo, err := ic.c.dest.TryReusingBlob(ctx, srcInfo, ic.c.blobInfoCache, ic.canSubstituteBlobs)
reused, blobInfo, err := ic.tryReusingBlob(ctx, srcInfo, pool, bar)
if err != nil {
return types.BlobInfo{}, "", errors.Wrapf(err, "Error trying to reuse blob %s at destination", srcInfo.Digest)
return types.BlobInfo{}, "", err
}
if reused {
logrus.Debugf("Skipping blob %s (already present):", srcInfo.Digest)
bar := ic.c.createProgressBar(pool, srcInfo, "blob", "skipped: already exists")
bar.SetTotal(0, true)
return blobInfo, cachedDiffID, nil
return blobInfo, cachedDiffID, err
}
}

@@ -676,7 +818,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, po
}
defer srcStream.Close()

bar := ic.c.createProgressBar(pool, srcInfo, "blob", "done")
bar = ic.c.createReplacementProgressBar(pool, bar, srcInfo, "blob", "")

blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize}, diffIDIsNeeded, bar)
if err != nil {
76 changes: 60 additions & 16 deletions storage/storage_image.go
@@ -64,6 +64,11 @@ type storageImageDestination struct {
fileSizes map[digest.Digest]int64 // Mapping from layer blobsums to their sizes
filenames map[digest.Digest]string // Mapping from layer blobsums to names of files we used to hold them
SignatureSizes []int `json:"signature-sizes,omitempty"` // List of sizes of each signature slice

// data for {Lock,Unlock}Blob
lockedDigests map[digest.Digest]storage.Locker // Bookkeeping of digests that have been locked
unlockedDigests map[digest.Digest]bool // Bookkeeping of digests that have been unlocked. Required so Close() can release digests that have not yet been unlocked
digestLockMutex sync.Mutex // Serializes accesses to lockedDigests and unlockedDigests
}

type storageImageCloser struct {
@@ -323,12 +328,14 @@ func newImageDestination(imageRef storageReference) (*storageImageDestination, e
return nil, errors.Wrapf(err, "error creating a temporary directory")
}
image := &storageImageDestination{
imageRef: imageRef,
directory: directory,
blobDiffIDs: make(map[digest.Digest]digest.Digest),
fileSizes: make(map[digest.Digest]int64),
filenames: make(map[digest.Digest]string),
SignatureSizes: []int{},
imageRef: imageRef,
directory: directory,
blobDiffIDs: make(map[digest.Digest]digest.Digest),
fileSizes: make(map[digest.Digest]int64),
filenames: make(map[digest.Digest]string),
lockedDigests: make(map[digest.Digest]storage.Locker),
unlockedDigests: make(map[digest.Digest]bool),
SignatureSizes: []int{},
}
return image, nil
}
@@ -339,8 +346,14 @@ func (s *storageImageDestination) Reference() types.ImageReference {
return s.imageRef
}

// Close cleans up the temporary directory.
// Close cleans up the temporary directory and releases any digest locks in `lockedDigests` that have not yet been unlocked.
func (s *storageImageDestination) Close() error {
// Make sure that all locks are unlocked to avoid potential deadlocks.
for digest, unlocked := range s.unlockedDigests {
if !unlocked {
s.unlockDigest(digest)
}
}
return os.RemoveAll(s.directory)
}

@@ -362,25 +375,56 @@ func (s *storageImageDestination) HasThreadSafePutBlob() bool {

// SupportsBlobLocks indicates whether the ImageDestination supports blob
// locking.
func (d *storageImageDestination) SupportsBlobLocks() bool {
return false
func (s *storageImageDestination) SupportsBlobLocks() bool {
return true
}

// LockBlob can be used to synchronize operations on the blob (e.g., copying).
// Note that any lock that has not been unlocked will be released in Close().
func (s *storageImageDestination) LockBlob(b types.BlobInfo) error {
// NOOP for this type.
locker, err := func() (storage.Locker, error) {
// Use an anonymous function so we can defer digestLockMutex.Unlock()
// without worrying about error paths.
s.digestLockMutex.Lock()
defer s.digestLockMutex.Unlock()
if locker, ok := s.lockedDigests[b.Digest]; ok {
return locker, nil
}
locker, err := s.imageRef.transport.store.GetDigestLock(b.Digest)
if err != nil {
return nil, err
}
s.lockedDigests[b.Digest] = locker
return locker, nil
}()
if err != nil {
return err
}
locker.Lock()
s.digestLockMutex.Lock()
s.unlockedDigests[b.Digest] = false
s.digestLockMutex.Unlock()
return nil
}

// UnlockBlob unlocks the blob. Note that it is safe to call UnlockBlob()
// multiple times. Only the first call actually unlocks the blob. This is
// required to unlock a blob in the presence of errors or panics during copy
// operations.
func (s *storageImageDestination) UnlockBlob(b types.BlobInfo) error {
// NOOP for this type.
func (s *storageImageDestination) unlockDigest(d digest.Digest) error {
s.digestLockMutex.Lock()
defer s.digestLockMutex.Unlock()

locker, ok := s.lockedDigests[d]
if !ok {
return errors.Errorf("trying to unlock non-existent lock for digest %q", d)
}
locker.Unlock()
s.unlockedDigests[d] = true
return nil
}

// UnlockBlob unlocks the blob.
func (s *storageImageDestination) UnlockBlob(b types.BlobInfo) error {
return s.unlockDigest(b.Digest)
}

// PutBlob writes contents of stream and returns data representing the result.
// inputInfo.Digest can be optionally provided if known; it is not mandatory for the implementation to verify it.
// inputInfo.Size is the expected length of stream, if known.