Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not merge - Copy detection #607

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 164 additions & 22 deletions copy/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ type digestingReader struct {
// downloads. Let's follow Firefox by limiting it to 6.
var maxParallelDownloads = 6

// progressBarRefreshRate defines the refresh rate of the mpb progress bars.
// 120 ms aligns with the initial upstream defaults. Setting the rate here is
// more idiomatic than using some magic number in the code below.
//
// tryReusingBlob also uses this value as the time it waits on the blob lock
// before treating the blob as being copied by another process, so that the
// placeholder bar is rendered before it gets replaced.
var progressBarRefreshRate = 120 * time.Millisecond

// newDigestingReader returns an io.Reader implementation with contents of source, which will eventually return a non-EOF error
// or set validationSucceeded/validationFailed to true if the source stream does/does not match expectedDigest.
// (neither is set if EOF is never reached).
Expand Down Expand Up @@ -467,11 +472,9 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error {
copySemaphore = semaphore.NewWeighted(int64(1))
}

data := make([]copyLayerData, numLayers)
copyLayerHelper := func(index int, srcLayer types.BlobInfo, pool *mpb.Progress) {
copyLayerHelper := func(index int, srcLayer types.BlobInfo, pool *mpb.Progress, cld *copyLayerData) {
defer copySemaphore.Release(1)
defer copyGroup.Done()
cld := copyLayerData{}
if ic.c.dest.AcceptsForeignLayerURLs() && len(srcLayer.URLs) != 0 {
// DiffIDs are, currently, needed only when converting from schema1.
// In which case src.LayerInfos will not have URLs because schema1
Expand All @@ -485,16 +488,26 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error {
} else {
cld.destInfo, cld.diffID, cld.err = ic.copyLayer(ctx, srcLayer, pool)
}
data[index] = cld
}

// digestToCopyData maps a digest to its corresponding copyLayerData and
// avoids redundantly copying the same layer.
digestToCopyData := make(map[digest.Digest]*copyLayerData)
func() { // A scope for defer
progressPool, progressCleanup := ic.c.newProgressPool(ctx)
defer progressCleanup()

for i, srcLayer := range srcInfos {
cld, ok := digestToCopyData[srcLayer.Digest]
if ok {
// digest is already being copied
copyGroup.Done()
continue
}
copySemaphore.Acquire(ctx, 1)
go copyLayerHelper(i, srcLayer, progressPool)
cld = &copyLayerData{}
digestToCopyData[srcLayer.Digest] = cld
go copyLayerHelper(i, srcLayer, progressPool, cld)
}

// Wait for all layers to be copied
Expand All @@ -503,7 +516,11 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error {

destInfos := make([]types.BlobInfo, numLayers)
diffIDs := make([]digest.Digest, numLayers)
for i, cld := range data {
for i, srcLayer := range srcInfos {
cld, ok := digestToCopyData[srcLayer.Digest]
if !ok {
return fmt.Errorf("no copy data found for layer %q", srcLayer.Digest)
}
if cld.err != nil {
return cld.err
}
Expand Down Expand Up @@ -577,16 +594,25 @@ func (ic *imageCopier) copyUpdatedConfigAndManifest(ctx context.Context) ([]byte
// The caller must eventually call the returned cleanup function after the pool will no longer be updated.
func (c *copier) newProgressPool(ctx context.Context) (*mpb.Progress, func()) {
// Derive a cancellable context so that the returned cleanup function can
// cancel the context handed to the pool.
ctx, cancel := context.WithCancel(ctx)
// NOTE(review): the two `pool :=` lines below look like the removed and the
// added side of the same diff line; only the second one passes
// mpb.WithRefreshRate(progressBarRefreshRate) — confirm which one applies.
pool := mpb.New(mpb.WithWidth(40), mpb.WithOutput(c.progressOutput), mpb.WithContext(ctx))
pool := mpb.New(mpb.WithWidth(40), mpb.WithOutput(c.progressOutput), mpb.WithContext(ctx), mpb.WithRefreshRate(progressBarRefreshRate))
return pool, func() {
// Cancel the pool's context first, then block in pool.Wait().
cancel()
pool.Wait()
}
}

// createReplacementProgressBar is a wrapper around createProgressBar. It
// creates a new bar in pool that is registered (via mpb.BarReplaceOnComplete)
// as the replacement for `toReplace`, then completes `toReplace` by setting
// its total to 0 with the final flag set. The new bar is returned.
func (c *copier) createReplacementProgressBar(pool *mpb.Progress, toReplace *mpb.Bar, info types.BlobInfo, kind string, filler string, barOpts ...mpb.BarOption) *mpb.Bar {
	// The replacement option goes last, after any caller-supplied options.
	barOpts = append(barOpts, mpb.BarReplaceOnComplete(toReplace))
	replacement := c.createProgressBar(pool, info, kind, filler, barOpts...)

	// Only complete the old bar once its replacement exists.
	toReplace.SetTotal(0, true)

	return replacement
}

// createProgressBar creates a mpb.Bar in pool. Note that if the copier's reportWriter
// is ioutil.Discard, the progress bar's output will be discarded
func (c *copier) createProgressBar(pool *mpb.Progress, info types.BlobInfo, kind string, onComplete string) *mpb.Bar {
func (c *copier) createProgressBar(pool *mpb.Progress, info types.BlobInfo, kind string, filler string, barOpts ...mpb.BarOption) *mpb.Bar {
// shortDigestLen is the length of the digest used for blobs.
const shortDigestLen = 12

Expand All @@ -597,15 +623,35 @@ func (c *copier) createProgressBar(pool *mpb.Progress, info types.BlobInfo, kind
prefix = prefix[:maxPrefixLen]
}

bar := pool.AddBar(info.Size,
mpb.BarClearOnComplete(),
options := []mpb.BarOption{
mpb.PrependDecorators(
decor.Name(prefix),
),
mpb.AppendDecorators(
decor.OnComplete(decor.CountersKibiByte("%.1f / %.1f"), " "+onComplete),
),
)
}
options = append(options, barOpts...)

// Fillers are used for placeholder bars that are later replaced by a
// common progress bar. Hence, only append a counter decorator when
// filler == "".
if filler == "" {
options = append(options,
mpb.AppendDecorators(
decor.OnComplete(decor.CountersKibiByte("%.1f / %.1f"), " done"),
))
options = append(options, mpb.BarClearOnComplete())
}

var bar *mpb.Bar
if filler != "" {
barFiller := mpb.FillerFunc(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(BTW, if I understand the idea correctly, we don’t need to replace the bars only to change the message; the FillerFunc could just fmt.Fprint(*someVariable) and we could update the variable (+ locking in both the reader and writer).

That does not look like an improvement right now, but if the logic moved into storageImageDestination.TryReuseBlob, it could make the API to update the progress bar with status messages notably easier.)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's possible to do that but this would not work when replacing a "placeholder" with the real progress bar, which does not specify a filler. I prefer to keep it as is (now with a dedicated method to create a replacement bar).

Copy link
Collaborator

@mtrmac mtrmac Apr 11, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, the real progress bar can be a different animal.

The fmt.Fprintf(*someVariable) approach seems attractive in that it would give GetBlob (for status about mirrors) and TryReusingBlob (for all of this code, if we manage to move it into the transport) a pretty nice way to report progress: we could give them a

type UserProgressNotifier interface {
   Update(string message)
}

func(w io.Writer, width int, st *decor.Statistics) {
fmt.Fprint(w, filler)
})
bar = pool.Add(0, barFiller, options...)
} else {
bar = pool.AddBar(info.Size, options...)
}

if c.progressOutput == ioutil.Discard {
c.Printf("Copying %s %s\n", kind, info.Digest)
}
Expand All @@ -624,7 +670,7 @@ func (c *copier) copyConfig(ctx context.Context, src types.Image) error {
destInfo, err := func() (types.BlobInfo, error) { // A scope for defer
progressPool, progressCleanup := c.newProgressPool(ctx)
defer progressCleanup()
bar := c.createProgressBar(progressPool, srcInfo, "config", "done")
bar := c.createProgressBar(progressPool, srcInfo, "config", "")
destInfo, err := c.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, nil, false, true, bar)
if err != nil {
return types.BlobInfo{}, err
Expand All @@ -649,23 +695,119 @@ type diffIDResult struct {
err error
}

// tryReusingBlob is a wrapper around the destination's TryReusingBlob() method.
// If the blob is present, it returns immediately. Otherwise, if the
// destination supports blob locking, it acquires the blob lock. If the lock
// could not be acquired within progressBarRefreshRate, another process is
// assumed to be copying the blob, and TryReusingBlob() is called a second time
// once the lock has been acquired.
//
// pool and bar are the progress pool and the placeholder bar of the blob; the
// placeholder is replaced by a bar describing the outcome ("skipped", "paused",
// "done: copied by another process").
//
// It returns true and the reused blob's info if the blob does not need to be
// copied, or false if the caller has to copy it. When it returns false with a
// nil error for a destination that supports blob locks, the lock acquired here
// is NOT released by this function.
// NOTE(review): presumably the caller unlocks the blob after copying it, and
// the lock is also left held when the second TryReusingBlob() fails — confirm.
func (ic *imageCopier) tryReusingBlob(ctx context.Context, srcInfo types.BlobInfo, pool *mpb.Progress, bar *mpb.Bar) (bool, types.BlobInfo, error) {
	reused, blobInfo, err := ic.c.dest.TryReusingBlob(ctx, srcInfo, ic.c.blobInfoCache, ic.canSubstituteBlobs)
	if err != nil {
		return false, types.BlobInfo{}, errors.Wrapf(err, "Error trying to reuse blob %s at destination", srcInfo.Digest)
	}
	if reused {
		logrus.Debugf("Skipping blob %s (already present):", srcInfo.Digest)
		newBar := ic.c.createReplacementProgressBar(pool, bar, srcInfo, "blob", "skipped: already exists")
		newBar.SetTotal(0, true)
		return true, blobInfo, nil
	}

	if !ic.c.dest.SupportsBlobLocks() {
		return false, types.BlobInfo{}, nil
	}

	// Check if another process might be copying the layer at the moment.
	// If so, wait for its completion and try to reuse the blob again.
	logrus.Debugf("Checking if blob %s is already being copied by another process", srcInfo.Digest)

	// LockBlob() does not support try-lock semantics, which would be a
	// straightforward mechanism to detect if another process is currently
	// copying a specific layer.
	//
	// The trick we are using here is to acquire the lock in a separate
	// goroutine which reports the result of LockBlob() via a channel. To
	// detect if this goroutine is blocking on the lock, we race the channel
	// against a timer. The channel is buffered so that the goroutine can
	// always deliver its result and terminate.
	lockResult := make(chan error, 1)
	go func() {
		lockResult <- ic.c.dest.LockBlob(srcInfo)
	}()

	var lockError error
	copiedByAnotherProcess := false
	select {
	case lockError = <-lockResult:
		// The lock has been acquired (or failed) without noticeable delay.
	case <-time.After(progressBarRefreshRate):
		// The timeout is `progressBarRefreshRate` to make sure the bar is
		// properly rendered before replacing it.
		logrus.Debugf("Blob %s is already being copied by another process", srcInfo.Digest)
		copiedByAnotherProcess = true
		bar = ic.c.createReplacementProgressBar(pool, bar, srcInfo, "blob", "paused: being copied by another process", mpb.BarRemoveOnComplete())
		// Wait until we acquired the blob lock.
		lockError = <-lockResult
	}

	if lockError != nil {
		// Wrap lockError, not err: err is nil at this point and wrapping a
		// nil error yields nil, which would silently drop the lock failure.
		return false, types.BlobInfo{}, errors.Wrapf(lockError, "Error acquiring lock for blob %s", srcInfo.Digest)
	}

	// If the blob has not been copied by another process, we need to copy
	// it on our own and can return immediately.
	if !copiedByAnotherProcess {
		logrus.Debugf("Blob %s is not being copied by another process", srcInfo.Digest)
		return false, types.BlobInfo{}, nil
	}

	// If the blob has been copied by another process, it must be present
	// in the destination and be available for reuse.
	reused, blobInfo, err = ic.c.dest.TryReusingBlob(ctx, srcInfo, ic.c.blobInfoCache, ic.canSubstituteBlobs)
	if err != nil {
		return false, types.BlobInfo{}, errors.Wrapf(err, "Error trying to reuse blob %s at destination", srcInfo.Digest)
	}

	if reused {
		ic.c.dest.UnlockBlob(srcInfo)
		logrus.Debugf("Blob %s has been successfully copied by another process", srcInfo.Digest)
		bar = ic.c.createReplacementProgressBar(pool, bar, srcInfo, "blob", "done: copied by another process")
		bar.SetTotal(0, true)
		return true, blobInfo, nil
	}

	logrus.Debugf("Blob %s is not present: copy operation of other process must have failed", srcInfo.Digest)
	return false, types.BlobInfo{}, nil
}

// copyLayer copies a layer with srcInfo (with known Digest and possibly known Size) in src to dest, perhaps compressing it if canCompress,
// and returns a complete blobInfo of the copied layer, and a value for LayerDiffIDs if diffIDIsNeeded
func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, pool *mpb.Progress) (types.BlobInfo, digest.Digest, error) {
cachedDiffID := ic.c.blobInfoCache.UncompressedDigest(srcInfo.Digest) // May be ""
diffIDIsNeeded := ic.diffIDsAreNeeded && cachedDiffID == ""

// Create the initial placeholder bar, which is later replaced by
// another one.
bar := ic.c.createProgressBar(pool, srcInfo, "blob", " ", mpb.BarRemoveOnComplete())

// If we already have the blob, and we don't need to compute the diffID, then we don't need to read it from the source.
if !diffIDIsNeeded {
reused, blobInfo, err := ic.c.dest.TryReusingBlob(ctx, srcInfo, ic.c.blobInfoCache, ic.canSubstituteBlobs)
reused, blobInfo, err := ic.tryReusingBlob(ctx, srcInfo, pool, bar)
if err != nil {
return types.BlobInfo{}, "", errors.Wrapf(err, "Error trying to reuse blob %s at destination", srcInfo.Digest)
return types.BlobInfo{}, "", err
}
if reused {
logrus.Debugf("Skipping blob %s (already present):", srcInfo.Digest)
bar := ic.c.createProgressBar(pool, srcInfo, "blob", "skipped: already exists")
bar.SetTotal(0, true)
return blobInfo, cachedDiffID, nil
return blobInfo, cachedDiffID, err
}
}

Expand All @@ -676,7 +818,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, po
}
defer srcStream.Close()

bar := ic.c.createProgressBar(pool, srcInfo, "blob", "done")
bar = ic.c.createReplacementProgressBar(pool, bar, srcInfo, "blob", "")

blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize}, diffIDIsNeeded, bar)
if err != nil {
Expand Down
21 changes: 21 additions & 0 deletions directory/directory_dest.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,27 @@ func (d *dirImageDestination) HasThreadSafePutBlob() bool {
return false
}

// SupportsBlobLocks indicates whether the ImageDestination supports blob
// locking. This destination always returns false.
func (d *dirImageDestination) SupportsBlobLocks() bool {
return false
}

// LockBlob can be used to synchronize operations on the blob b (e.g., copying).
// For this destination the call does nothing and always returns nil.
func (d *dirImageDestination) LockBlob(b types.BlobInfo) error {
// NOOP for this type.
return nil
}

// UnlockBlob unlocks the blob b. Note that it is safe to call UnlockBlob()
// multiple times. Only the first call unlocks the blob. This is
// required to unlock a blob in the presence of errors or panics during copy
// operations. For this destination the call does nothing and always returns
// nil.
func (d *dirImageDestination) UnlockBlob(b types.BlobInfo) error {
// NOOP for this type.
return nil
}

// PutBlob writes contents of stream and returns data representing the result (with all data filled in).
// inputInfo.Digest can be optionally provided if known; it is not mandatory for the implementation to verify it.
// inputInfo.Size is the expected length of stream, if known.
Expand Down
21 changes: 21 additions & 0 deletions docker/docker_image_dest.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,27 @@ func (d *dockerImageDestination) HasThreadSafePutBlob() bool {
return true
}

// SupportsBlobLocks indicates whether the ImageDestination supports blob
// locking. This destination always returns false.
func (d *dockerImageDestination) SupportsBlobLocks() bool {
return false
}

// LockBlob can be used to synchronize operations on the blob b (e.g., copying).
// For this destination the call does nothing and always returns nil.
func (d *dockerImageDestination) LockBlob(b types.BlobInfo) error {
// NOOP for this type.
return nil
}

// UnlockBlob unlocks the blob b. Note that it is safe to call UnlockBlob()
// multiple times. Only the first call unlocks the blob. This is
// required to unlock a blob in the presence of errors or panics during copy
// operations. For this destination the call does nothing and always returns
// nil.
func (d *dockerImageDestination) UnlockBlob(b types.BlobInfo) error {
// NOOP for this type.
return nil
}

// PutBlob writes contents of stream and returns data representing the result (with all data filled in).
// inputInfo.Digest can be optionally provided if known; it is not mandatory for the implementation to verify it.
// inputInfo.Size is the expected length of stream, if known.
Expand Down
21 changes: 21 additions & 0 deletions docker/tarfile/dest.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,27 @@ func (d *Destination) HasThreadSafePutBlob() bool {
return false
}

// SupportsBlobLocks indicates whether the ImageDestination supports blob
// locking. This destination always returns false.
func (d *Destination) SupportsBlobLocks() bool {
return false
}

// LockBlob can be used to synchronize operations on the blob b (e.g., copying).
// For this destination the call does nothing and always returns nil.
func (d *Destination) LockBlob(b types.BlobInfo) error {
// NOOP for this type.
return nil
}

// UnlockBlob unlocks the blob b. Note that it is safe to call UnlockBlob()
// multiple times. Only the first call unlocks the blob. This is
// required to unlock a blob in the presence of errors or panics during copy
// operations. For this destination the call does nothing and always returns
// nil.
func (d *Destination) UnlockBlob(b types.BlobInfo) error {
// NOOP for this type.
return nil
}

// PutBlob writes contents of stream and returns data representing the result (with all data filled in).
// inputInfo.Digest can be optionally provided if known; it is not mandatory for the implementation to verify it.
// inputInfo.Size is the expected length of stream, if known.
Expand Down
9 changes: 9 additions & 0 deletions image/docker_schema2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,15 @@ func (d *memoryImageDest) IgnoresEmbeddedDockerReference() bool {
// HasThreadSafePutBlob is a mock method that panics when called.
func (d *memoryImageDest) HasThreadSafePutBlob() bool {
panic("Unexpected call to a mock function")
}
// SupportsBlobLocks is a mock method that panics when called.
func (d *memoryImageDest) SupportsBlobLocks() bool {
panic("Unexpected call to a mock function")
}
// LockBlob is a mock method that panics when called.
func (d *memoryImageDest) LockBlob(b types.BlobInfo) error {
panic("Unexpected call to a mock function")
}
// UnlockBlob is a mock method that panics when called.
func (d *memoryImageDest) UnlockBlob(b types.BlobInfo) error {
panic("Unexpected call to a mock function")
}
func (d *memoryImageDest) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) {
if d.storedBlobs == nil {
d.storedBlobs = make(map[digest.Digest][]byte)
Expand Down
Loading