Skip to content

Commit

Permalink
storage destination: commit layer in PutBlob()
Browse files Browse the repository at this point in the history
Commit a layer directly in PutBlob() to the storage. This will allow
future commits to implement blob locking without leaking and deferring
parts of the logic to callers.

Note that API of PutBlob() and TryReusingBlob() are extended with an
index corresponding to the layer's index in the image.  Currently,
only the storage destination is making use of the index.

Signed-off-by: Valentin Rothberg <[email protected]>
  • Loading branch information
vrothberg committed Apr 17, 2019
1 parent e9c3d17 commit 26fba6a
Show file tree
Hide file tree
Showing 15 changed files with 355 additions and 183 deletions.
90 changes: 56 additions & 34 deletions copy/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,10 +454,6 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error {
err error
}

// copyGroup is used to determine if all layers are copied
copyGroup := sync.WaitGroup{}
copyGroup.Add(numLayers)

// copySemaphore is used to limit the number of parallel downloads to
// avoid malicious images causing troubles and to be nice to servers.
var copySemaphore *semaphore.Weighted
Expand All @@ -467,43 +463,69 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error {
copySemaphore = semaphore.NewWeighted(int64(1))
}

data := make([]copyLayerData, numLayers)
copyLayerHelper := func(index int, srcLayer types.BlobInfo, pool *mpb.Progress) {
defer copySemaphore.Release(1)
defer copyGroup.Done()
cld := copyLayerData{}
// copyGroup is used to determine if all layers are copied
copyGroup := sync.WaitGroup{}

// A context.WithCancel is needed when encountering an error while
// copying/downloading layers in parallel.
cancelCtx, cancelCopyLayer := context.WithCancel(ctx)
defer cancelCopyLayer()
progressPool, progressCleanup := ic.c.newProgressPool(cancelCtx)
defer progressCleanup()

layerIndex := 0 // some layers might be skipped, so we need a dedicated counter
digestToCopyData := make(map[digest.Digest]*copyLayerData)
for _, srcLayer := range srcInfos {
// Check if we'are already copying the layer
if _, ok := digestToCopyData[srcLayer.Digest]; ok {
continue
}

cld := &copyLayerData{}
digestToCopyData[srcLayer.Digest] = cld
if ic.c.dest.AcceptsForeignLayerURLs() && len(srcLayer.URLs) != 0 {
// DiffIDs are, currently, needed only when converting from schema1.
// In which case src.LayerInfos will not have URLs because schema1
// does not support them.
if ic.diffIDsAreNeeded {
cld.err = errors.New("getting DiffID for foreign layers is unimplemented")
cancelCopyLayer()
return errors.New("getting DiffID for foreign layers is unimplemented")
} else {
logrus.Debugf("Skipping foreign layer %q copy to %s", srcLayer.Digest, ic.c.dest.Reference().Transport().Name())
cld.destInfo = srcLayer
logrus.Debugf("Skipping foreign layer %q copy to %s", cld.destInfo.Digest, ic.c.dest.Reference().Transport().Name())
continue
}
} else {
cld.destInfo, cld.diffID, cld.err = ic.copyLayer(ctx, srcLayer, pool)
}
data[index] = cld
}

func() { // A scope for defer
progressPool, progressCleanup := ic.c.newProgressPool(ctx)
defer progressCleanup()
copySemaphore.Acquire(cancelCtx, 1) // limits parallel copy operations
copyGroup.Add(1) // allows the main routine to wait for all copy operations to finish

for i, srcLayer := range srcInfos {
copySemaphore.Acquire(ctx, 1)
go copyLayerHelper(i, srcLayer, progressPool)
}
// Copy the layer. Note that cld is a pointer; changes to it are
// propagated implicitly.
go func(index int, srcLayer types.BlobInfo, cld *copyLayerData) {
defer copySemaphore.Release(1)
defer copyGroup.Done()
cld.destInfo, cld.diffID, cld.err = ic.copyLayer(cancelCtx, srcLayer, index, progressPool)
if cld.err != nil {
// Note that the error will be caught below.
logrus.Errorf("copying layer %d failed: %v", index, cld.err)
cancelCopyLayer()
}
}(layerIndex, srcLayer, cld)

// Wait for all layers to be copied
copyGroup.Wait()
}()
layerIndex++
}

// Wait for all layer-copy operations to finish
copyGroup.Wait()

destInfos := make([]types.BlobInfo, numLayers)
diffIDs := make([]digest.Digest, numLayers)
for i, cld := range data {
for i, srcLayer := range srcInfos {
cld, ok := digestToCopyData[srcLayer.Digest]
if !ok {
return errors.Errorf("no copy data for layer %q", srcLayer.Digest)
}
if cld.err != nil {
return cld.err
}
Expand Down Expand Up @@ -625,7 +647,7 @@ func (c *copier) copyConfig(ctx context.Context, src types.Image) error {
progressPool, progressCleanup := c.newProgressPool(ctx)
defer progressCleanup()
bar := c.createProgressBar(progressPool, srcInfo, "config", "done")
destInfo, err := c.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, nil, false, true, bar)
destInfo, err := c.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, -1, nil, false, true, bar)
if err != nil {
return types.BlobInfo{}, err
}
Expand All @@ -651,13 +673,13 @@ type diffIDResult struct {

// copyLayer copies a layer with srcInfo (with known Digest and possibly known Size) in src to dest, perhaps compressing it if canCompress,
// and returns a complete blobInfo of the copied layer, and a value for LayerDiffIDs if diffIDIsNeeded
func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, pool *mpb.Progress) (types.BlobInfo, digest.Digest, error) {
func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, layerIndexInImage int, pool *mpb.Progress) (types.BlobInfo, digest.Digest, error) {
cachedDiffID := ic.c.blobInfoCache.UncompressedDigest(srcInfo.Digest) // May be ""
diffIDIsNeeded := ic.diffIDsAreNeeded && cachedDiffID == ""

// If we already have the blob, and we don't need to compute the diffID, then we don't need to read it from the source.
if !diffIDIsNeeded {
reused, blobInfo, err := ic.c.dest.TryReusingBlob(ctx, srcInfo, ic.c.blobInfoCache, ic.canSubstituteBlobs)
reused, blobInfo, err := ic.c.dest.TryReusingBlob(ctx, srcInfo, layerIndexInImage, ic.c.blobInfoCache, ic.canSubstituteBlobs)
if err != nil {
return types.BlobInfo{}, "", errors.Wrapf(err, "Error trying to reuse blob %s at destination", srcInfo.Digest)
}
Expand All @@ -678,7 +700,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, po

bar := ic.c.createProgressBar(pool, srcInfo, "blob", "done")

blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize}, diffIDIsNeeded, bar)
blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize}, layerIndexInImage, diffIDIsNeeded, bar)
if err != nil {
return types.BlobInfo{}, "", err
}
Expand Down Expand Up @@ -709,7 +731,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, po
// perhaps compressing the stream if canCompress,
// and returns a complete blobInfo of the copied blob and perhaps a <-chan diffIDResult if diffIDIsNeeded, to be read by the caller.
func (ic *imageCopier) copyLayerFromStream(ctx context.Context, srcStream io.Reader, srcInfo types.BlobInfo,
diffIDIsNeeded bool, bar *mpb.Bar) (types.BlobInfo, <-chan diffIDResult, error) {
layerIndexInImage int, diffIDIsNeeded bool, bar *mpb.Bar) (types.BlobInfo, <-chan diffIDResult, error) {
var getDiffIDRecorder func(compression.DecompressorFunc) io.Writer // = nil
var diffIDChan chan diffIDResult

Expand All @@ -733,7 +755,7 @@ func (ic *imageCopier) copyLayerFromStream(ctx context.Context, srcStream io.Rea
return pipeWriter
}
}
blobInfo, err := ic.c.copyBlobFromStream(ctx, srcStream, srcInfo, getDiffIDRecorder, ic.canModifyManifest, false, bar) // Sets err to nil on success
blobInfo, err := ic.c.copyBlobFromStream(ctx, srcStream, srcInfo, layerIndexInImage, getDiffIDRecorder, ic.canModifyManifest, false, bar) // Sets err to nil on success
return blobInfo, diffIDChan, err
// We need the defer … pipeWriter.CloseWithError() to happen HERE so that the caller can block on reading from diffIDChan
}
Expand Down Expand Up @@ -768,7 +790,7 @@ func computeDiffID(stream io.Reader, decompressor compression.DecompressorFunc)
// perhaps sending a copy to an io.Writer if getOriginalLayerCopyWriter != nil,
// perhaps compressing it if canCompress,
// and returns a complete blobInfo of the copied blob.
func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, srcInfo types.BlobInfo,
func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, srcInfo types.BlobInfo, layerIndexInImage int,
getOriginalLayerCopyWriter func(decompressor compression.DecompressorFunc) io.Writer,
canModifyBlob bool, isConfig bool, bar *mpb.Bar) (types.BlobInfo, error) {
// The copying happens through a pipeline of connected io.Readers.
Expand Down Expand Up @@ -847,7 +869,7 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr
}

// === Finally, send the layer stream to dest.
uploadedInfo, err := c.dest.PutBlob(ctx, destStream, inputInfo, c.blobInfoCache, isConfig)
uploadedInfo, err := c.dest.PutBlob(ctx, destStream, inputInfo, layerIndexInImage, c.blobInfoCache, isConfig)
if err != nil {
return types.BlobInfo{}, errors.Wrap(err, "Error writing blob")
}
Expand Down
4 changes: 2 additions & 2 deletions directory/directory_dest.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (d *dirImageDestination) HasThreadSafePutBlob() bool {
// WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available
// to any other readers for download using the supplied digest.
// If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far.
func (d *dirImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) {
func (d *dirImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) {
blobFile, err := ioutil.TempFile(d.ref.path, "dir-put-blob")
if err != nil {
return types.BlobInfo{}, err
Expand Down Expand Up @@ -182,7 +182,7 @@ func (d *dirImageDestination) PutBlob(ctx context.Context, stream io.Reader, inp
// If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size.
// If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure.
// May use and/or update cache.
func (d *dirImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) {
func (d *dirImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) {
if info.Digest == "" {
return false, types.BlobInfo{}, errors.Errorf(`"Can not check for a blob with unknown digest`)
}
Expand Down
4 changes: 2 additions & 2 deletions directory/directory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestGetPutBlob(t *testing.T) {
require.NoError(t, err)
defer dest.Close()
assert.Equal(t, types.PreserveOriginal, dest.DesiredLayerCompression())
info, err := dest.PutBlob(context.Background(), bytes.NewReader(blob), types.BlobInfo{Digest: digest.Digest("sha256:digest-test"), Size: int64(9)}, cache, false)
info, err := dest.PutBlob(context.Background(), bytes.NewReader(blob), types.BlobInfo{Digest: digest.Digest("sha256:digest-test"), Size: int64(9)}, 0, cache, false)
assert.NoError(t, err)
err = dest.Commit(context.Background())
assert.NoError(t, err)
Expand Down Expand Up @@ -123,7 +123,7 @@ func TestPutBlobDigestFailure(t *testing.T) {
dest, err := ref.NewImageDestination(context.Background(), nil)
require.NoError(t, err)
defer dest.Close()
_, err = dest.PutBlob(context.Background(), reader, types.BlobInfo{Digest: blobDigest, Size: -1}, cache, false)
_, err = dest.PutBlob(context.Background(), reader, types.BlobInfo{Digest: blobDigest, Size: -1}, 0, cache, false)
assert.Error(t, err)
assert.Contains(t, digestErrorString, err.Error())
err = dest.Commit(context.Background())
Expand Down
8 changes: 4 additions & 4 deletions docker/docker_image_dest.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"github.com/containers/image/pkg/blobinfocache/none"
"github.com/containers/image/types"
"github.com/docker/distribution/registry/api/errcode"
"github.com/docker/distribution/registry/api/v2"
v2 "github.com/docker/distribution/registry/api/v2"
"github.com/docker/distribution/registry/client"
"github.com/opencontainers/go-digest"
imgspecv1 "github.com/opencontainers/image-spec/specs-go/v1"
Expand Down Expand Up @@ -124,12 +124,12 @@ func (d *dockerImageDestination) HasThreadSafePutBlob() bool {
// WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available
// to any other readers for download using the supplied digest.
// If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far.
func (d *dockerImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) {
func (d *dockerImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) {
if inputInfo.Digest.String() != "" {
// This should not really be necessary, at least the copy code calls TryReusingBlob automatically.
// Still, we need to check, if only because the "initiate upload" endpoint does not have a documented "blob already exists" return value.
// But we do that with NoCache, so that it _only_ checks the primary destination, instead of trying all mount candidates _again_.
haveBlob, reusedInfo, err := d.TryReusingBlob(ctx, inputInfo, none.NoCache, false)
haveBlob, reusedInfo, err := d.TryReusingBlob(ctx, inputInfo, layerIndexInImage, none.NoCache, false)
if err != nil {
return types.BlobInfo{}, err
}
Expand Down Expand Up @@ -271,7 +271,7 @@ func (d *dockerImageDestination) mountBlob(ctx context.Context, srcRepo referenc
// If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size.
// If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure.
// May use and/or update cache.
func (d *dockerImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) {
func (d *dockerImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) {
if info.Digest == "" {
return false, types.BlobInfo{}, errors.Errorf(`"Can not check for a blob with unknown digest`)
}
Expand Down
6 changes: 3 additions & 3 deletions docker/tarfile/dest.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (d *Destination) HasThreadSafePutBlob() bool {
// WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available
// to any other readers for download using the supplied digest.
// If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far.
func (d *Destination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) {
func (d *Destination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) {
// Ouch, we need to stream the blob into a temporary file just to determine the size.
// When the layer is decompressed, we also have to generate the digest on uncompressed datas.
if inputInfo.Size == -1 || inputInfo.Digest.String() == "" {
Expand Down Expand Up @@ -126,7 +126,7 @@ func (d *Destination) PutBlob(ctx context.Context, stream io.Reader, inputInfo t
}

// Maybe the blob has been already sent
ok, reusedInfo, err := d.TryReusingBlob(ctx, inputInfo, cache, false)
ok, reusedInfo, err := d.TryReusingBlob(ctx, inputInfo, layerIndexInImage, cache, false)
if err != nil {
return types.BlobInfo{}, err
}
Expand Down Expand Up @@ -164,7 +164,7 @@ func (d *Destination) PutBlob(ctx context.Context, stream io.Reader, inputInfo t
// If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size.
// If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure.
// May use and/or update cache.
func (d *Destination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) {
func (d *Destination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) {
if info.Digest == "" {
return false, types.BlobInfo{}, errors.Errorf("Can not check for a blob with unknown digest")
}
Expand Down
2 changes: 1 addition & 1 deletion image/docker_schema2.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func (m *manifestSchema2) convertToManifestSchema1(ctx context.Context, dest typ
logrus.Debugf("Uploading empty layer during conversion to schema 1")
// Ideally we should update the relevant BlobInfoCache about this layer, but that would require passing it down here,
// and anyway this blob is so small that it’s easier to just copy it than to worry about figuring out another location where to get it.
info, err := dest.PutBlob(ctx, bytes.NewReader(GzippedEmptyLayer), types.BlobInfo{Digest: GzippedEmptyLayerDigest, Size: int64(len(GzippedEmptyLayer))}, none.NoCache, false)
info, err := dest.PutBlob(ctx, bytes.NewReader(GzippedEmptyLayer), types.BlobInfo{Digest: GzippedEmptyLayerDigest, Size: int64(len(GzippedEmptyLayer))}, -1, none.NoCache, false)
if err != nil {
return nil, errors.Wrap(err, "Error uploading empty layer")
}
Expand Down
4 changes: 2 additions & 2 deletions image/docker_schema2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ func (d *memoryImageDest) IgnoresEmbeddedDockerReference() bool {
func (d *memoryImageDest) HasThreadSafePutBlob() bool {
panic("Unexpected call to a mock function")
}
func (d *memoryImageDest) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) {
func (d *memoryImageDest) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) {
if d.storedBlobs == nil {
d.storedBlobs = make(map[digest.Digest][]byte)
}
Expand All @@ -415,7 +415,7 @@ func (d *memoryImageDest) PutBlob(ctx context.Context, stream io.Reader, inputIn
d.storedBlobs[inputInfo.Digest] = contents
return types.BlobInfo{Digest: inputInfo.Digest, Size: int64(len(contents))}, nil
}
func (d *memoryImageDest) TryReusingBlob(context.Context, types.BlobInfo, types.BlobInfoCache, bool) (bool, types.BlobInfo, error) {
func (d *memoryImageDest) TryReusingBlob(context.Context, types.BlobInfo, int, types.BlobInfoCache, bool) (bool, types.BlobInfo, error) {
panic("Unexpected call to a mock function")
}
func (d *memoryImageDest) PutManifest(ctx context.Context, m []byte) error {
Expand Down
Loading

0 comments on commit 26fba6a

Please sign in to comment.