Skip to content

Commit

Permalink
storage destination: commit layer in PutBlob()
Browse files Browse the repository at this point in the history
Commit a layer directly in PutBlob() to the storage. This will allow
future commits to implement blob locking without leaking and deferring
parts of the logic to callers.

Note that API of PutBlob() and TryReusingBlob() are extended with an
index corresponding to the layer's index in the image.  Currently,
only the storage destination is making use of the index.

Signed-off-by: Valentin Rothberg <[email protected]>
  • Loading branch information
vrothberg committed Apr 8, 2019
1 parent b322138 commit d8e9814
Show file tree
Hide file tree
Showing 15 changed files with 261 additions and 178 deletions.
27 changes: 16 additions & 11 deletions copy/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error {
}

data := make([]copyLayerData, numLayers)
copyLayerHelper := func(index int, srcLayer types.BlobInfo, pool *mpb.Progress) {
copyLayerHelper := func(index int, srcLayer types.BlobInfo, previousLayer digest.Digest, pool *mpb.Progress) {
defer copySemaphore.Release(1)
defer copyGroup.Done()
cld := copyLayerData{}
Expand All @@ -483,7 +483,8 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error {
logrus.Debugf("Skipping foreign layer %q copy to %s", cld.destInfo.Digest, ic.c.dest.Reference().Transport().Name())
}
} else {
cld.destInfo, cld.diffID, cld.err = ic.copyLayer(ctx, srcLayer, pool)
c := context.WithValue(ctx, "previousLayer", previousLayer)
cld.destInfo, cld.diffID, cld.err = ic.copyLayer(c, srcLayer, index, pool)
}
data[index] = cld
}
Expand All @@ -493,8 +494,12 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error {
defer progressCleanup()

for i, srcLayer := range srcInfos {
var previousLayer digest.Digest
if i >= 1 {
previousLayer = srcInfos[i-1].Digest
}
copySemaphore.Acquire(ctx, 1)
go copyLayerHelper(i, srcLayer, progressPool)
go copyLayerHelper(i, srcLayer, previousLayer, progressPool)
}

// Wait for all layers to be copied
Expand Down Expand Up @@ -625,7 +630,7 @@ func (c *copier) copyConfig(ctx context.Context, src types.Image) error {
progressPool, progressCleanup := c.newProgressPool(ctx)
defer progressCleanup()
bar := c.createProgressBar(progressPool, srcInfo, "config", "done")
destInfo, err := c.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, nil, false, true, bar)
destInfo, err := c.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, 0, nil, false, true, bar)
if err != nil {
return types.BlobInfo{}, err
}
Expand All @@ -651,13 +656,13 @@ type diffIDResult struct {

// copyLayer copies a layer with srcInfo (with known Digest and possibly known Size) in src to dest, perhaps compressing it if canCompress,
// and returns a complete blobInfo of the copied layer, and a value for LayerDiffIDs if diffIDIsNeeded
func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, pool *mpb.Progress) (types.BlobInfo, digest.Digest, error) {
func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, layerIndexInImage int, pool *mpb.Progress) (types.BlobInfo, digest.Digest, error) {
cachedDiffID := ic.c.blobInfoCache.UncompressedDigest(srcInfo.Digest) // May be ""
diffIDIsNeeded := ic.diffIDsAreNeeded && cachedDiffID == ""

// If we already have the blob, and we don't need to compute the diffID, then we don't need to read it from the source.
if !diffIDIsNeeded {
reused, blobInfo, err := ic.c.dest.TryReusingBlob(ctx, srcInfo, ic.c.blobInfoCache, ic.canSubstituteBlobs)
reused, blobInfo, err := ic.c.dest.TryReusingBlob(ctx, srcInfo, layerIndexInImage, ic.c.blobInfoCache, ic.canSubstituteBlobs)
if err != nil {
return types.BlobInfo{}, "", errors.Wrapf(err, "Error trying to reuse blob %s at destination", srcInfo.Digest)
}
Expand All @@ -678,7 +683,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, po

bar := ic.c.createProgressBar(pool, srcInfo, "blob", "done")

blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize}, diffIDIsNeeded, bar)
blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize}, layerIndexInImage, diffIDIsNeeded, bar)
if err != nil {
return types.BlobInfo{}, "", err
}
Expand Down Expand Up @@ -709,7 +714,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, po
// perhaps compressing the stream if canCompress,
// and returns a complete blobInfo of the copied blob and perhaps a <-chan diffIDResult if diffIDIsNeeded, to be read by the caller.
func (ic *imageCopier) copyLayerFromStream(ctx context.Context, srcStream io.Reader, srcInfo types.BlobInfo,
diffIDIsNeeded bool, bar *mpb.Bar) (types.BlobInfo, <-chan diffIDResult, error) {
layerIndexInImage int, diffIDIsNeeded bool, bar *mpb.Bar) (types.BlobInfo, <-chan diffIDResult, error) {
var getDiffIDRecorder func(compression.DecompressorFunc) io.Writer // = nil
var diffIDChan chan diffIDResult

Expand All @@ -733,7 +738,7 @@ func (ic *imageCopier) copyLayerFromStream(ctx context.Context, srcStream io.Rea
return pipeWriter
}
}
blobInfo, err := ic.c.copyBlobFromStream(ctx, srcStream, srcInfo, getDiffIDRecorder, ic.canModifyManifest, false, bar) // Sets err to nil on success
blobInfo, err := ic.c.copyBlobFromStream(ctx, srcStream, srcInfo, layerIndexInImage, getDiffIDRecorder, ic.canModifyManifest, false, bar) // Sets err to nil on success
return blobInfo, diffIDChan, err
// We need the defer … pipeWriter.CloseWithError() to happen HERE so that the caller can block on reading from diffIDChan
}
Expand Down Expand Up @@ -768,7 +773,7 @@ func computeDiffID(stream io.Reader, decompressor compression.DecompressorFunc)
// perhaps sending a copy to an io.Writer if getOriginalLayerCopyWriter != nil,
// perhaps compressing it if canCompress,
// and returns a complete blobInfo of the copied blob.
func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, srcInfo types.BlobInfo,
func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, srcInfo types.BlobInfo, layerIndexInImage int,
getOriginalLayerCopyWriter func(decompressor compression.DecompressorFunc) io.Writer,
canModifyBlob bool, isConfig bool, bar *mpb.Bar) (types.BlobInfo, error) {
// The copying happens through a pipeline of connected io.Readers.
Expand Down Expand Up @@ -847,7 +852,7 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr
}

// === Finally, send the layer stream to dest.
uploadedInfo, err := c.dest.PutBlob(ctx, destStream, inputInfo, c.blobInfoCache, isConfig)
uploadedInfo, err := c.dest.PutBlob(ctx, destStream, inputInfo, layerIndexInImage, c.blobInfoCache, isConfig)
if err != nil {
return types.BlobInfo{}, errors.Wrap(err, "Error writing blob")
}
Expand Down
4 changes: 2 additions & 2 deletions directory/directory_dest.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (d *dirImageDestination) HasThreadSafePutBlob() bool {
// WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available
// to any other readers for download using the supplied digest.
// If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far.
func (d *dirImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) {
func (d *dirImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) {
blobFile, err := ioutil.TempFile(d.ref.path, "dir-put-blob")
if err != nil {
return types.BlobInfo{}, err
Expand Down Expand Up @@ -182,7 +182,7 @@ func (d *dirImageDestination) PutBlob(ctx context.Context, stream io.Reader, inp
// If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size.
// If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure.
// May use and/or update cache.
func (d *dirImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) {
func (d *dirImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) {
if info.Digest == "" {
return false, types.BlobInfo{}, errors.Errorf(`"Can not check for a blob with unknown digest`)
}
Expand Down
4 changes: 2 additions & 2 deletions directory/directory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestGetPutBlob(t *testing.T) {
require.NoError(t, err)
defer dest.Close()
assert.Equal(t, types.PreserveOriginal, dest.DesiredLayerCompression())
info, err := dest.PutBlob(context.Background(), bytes.NewReader(blob), types.BlobInfo{Digest: digest.Digest("sha256:digest-test"), Size: int64(9)}, cache, false)
info, err := dest.PutBlob(context.Background(), bytes.NewReader(blob), types.BlobInfo{Digest: digest.Digest("sha256:digest-test"), Size: int64(9)}, 0, cache, false)
assert.NoError(t, err)
err = dest.Commit(context.Background())
assert.NoError(t, err)
Expand Down Expand Up @@ -123,7 +123,7 @@ func TestPutBlobDigestFailure(t *testing.T) {
dest, err := ref.NewImageDestination(context.Background(), nil)
require.NoError(t, err)
defer dest.Close()
_, err = dest.PutBlob(context.Background(), reader, types.BlobInfo{Digest: blobDigest, Size: -1}, cache, false)
_, err = dest.PutBlob(context.Background(), reader, types.BlobInfo{Digest: blobDigest, Size: -1}, 0, cache, false)
assert.Error(t, err)
assert.Contains(t, digestErrorString, err.Error())
err = dest.Commit(context.Background())
Expand Down
10 changes: 5 additions & 5 deletions docker/docker_image_dest.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"github.com/containers/image/pkg/blobinfocache/none"
"github.com/containers/image/types"
"github.com/docker/distribution/registry/api/errcode"
"github.com/docker/distribution/registry/api/v2"
v2 "github.com/docker/distribution/registry/api/v2"
"github.com/docker/distribution/registry/client"
"github.com/opencontainers/go-digest"
imgspecv1 "github.com/opencontainers/image-spec/specs-go/v1"
Expand Down Expand Up @@ -124,12 +124,12 @@ func (d *dockerImageDestination) HasThreadSafePutBlob() bool {
// WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available
// to any other readers for download using the supplied digest.
// If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far.
func (d *dockerImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) {
func (d *dockerImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) {
if inputInfo.Digest.String() != "" {
// This should not really be necessary, at least the copy code calls TryReusingBlob automatically.
// Still, we need to check, if only because the "initiate upload" endpoint does not have a documented "blob already exists" return value.
// But we do that with NoCache, so that it _only_ checks the primary destination, instead of trying all mount candidates _again_.
haveBlob, reusedInfo, err := d.TryReusingBlob(ctx, inputInfo, none.NoCache, false)
haveBlob, reusedInfo, err := d.TryReusingBlob(ctx, inputInfo, layerIndexInImage, none.NoCache, false)
if err != nil {
return types.BlobInfo{}, err
}
Expand Down Expand Up @@ -270,8 +270,8 @@ func (d *dockerImageDestination) mountBlob(ctx context.Context, srcRepo referenc
// If canSubstitute, TryReusingBlob can use an equivalent equivalent of the desired blob; in that case the returned info may not match the input.
// If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size.
// If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure.
// May use and/or update cache.
func (d *dockerImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) {
// May use and/or update cache.TryRe
func (d *dockerImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) {
if info.Digest == "" {
return false, types.BlobInfo{}, errors.Errorf(`"Can not check for a blob with unknown digest`)
}
Expand Down
6 changes: 3 additions & 3 deletions docker/tarfile/dest.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (d *Destination) HasThreadSafePutBlob() bool {
// WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available
// to any other readers for download using the supplied digest.
// If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far.
func (d *Destination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) {
func (d *Destination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) {
// Ouch, we need to stream the blob into a temporary file just to determine the size.
// When the layer is decompressed, we also have to generate the digest on uncompressed datas.
if inputInfo.Size == -1 || inputInfo.Digest.String() == "" {
Expand Down Expand Up @@ -126,7 +126,7 @@ func (d *Destination) PutBlob(ctx context.Context, stream io.Reader, inputInfo t
}

// Maybe the blob has been already sent
ok, reusedInfo, err := d.TryReusingBlob(ctx, inputInfo, cache, false)
ok, reusedInfo, err := d.TryReusingBlob(ctx, inputInfo, layerIndexInImage, cache, false)
if err != nil {
return types.BlobInfo{}, err
}
Expand Down Expand Up @@ -164,7 +164,7 @@ func (d *Destination) PutBlob(ctx context.Context, stream io.Reader, inputInfo t
// If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size.
// If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure.
// May use and/or update cache.
func (d *Destination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) {
func (d *Destination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) {
if info.Digest == "" {
return false, types.BlobInfo{}, errors.Errorf("Can not check for a blob with unknown digest")
}
Expand Down
2 changes: 1 addition & 1 deletion image/docker_schema2.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func (m *manifestSchema2) convertToManifestSchema1(ctx context.Context, dest typ
logrus.Debugf("Uploading empty layer during conversion to schema 1")
// Ideally we should update the relevant BlobInfoCache about this layer, but that would require passing it down here,
// and anyway this blob is so small that it’s easier to just copy it than to worry about figuring out another location where to get it.
info, err := dest.PutBlob(ctx, bytes.NewReader(GzippedEmptyLayer), types.BlobInfo{Digest: GzippedEmptyLayerDigest, Size: int64(len(GzippedEmptyLayer))}, none.NoCache, false)
info, err := dest.PutBlob(ctx, bytes.NewReader(GzippedEmptyLayer), types.BlobInfo{Digest: GzippedEmptyLayerDigest, Size: int64(len(GzippedEmptyLayer))}, 0, none.NoCache, false)
if err != nil {
return nil, errors.Wrap(err, "Error uploading empty layer")
}
Expand Down
4 changes: 2 additions & 2 deletions image/docker_schema2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ func (d *memoryImageDest) IgnoresEmbeddedDockerReference() bool {
func (d *memoryImageDest) HasThreadSafePutBlob() bool {
panic("Unexpected call to a mock function")
}
func (d *memoryImageDest) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) {
func (d *memoryImageDest) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) {
if d.storedBlobs == nil {
d.storedBlobs = make(map[digest.Digest][]byte)
}
Expand All @@ -415,7 +415,7 @@ func (d *memoryImageDest) PutBlob(ctx context.Context, stream io.Reader, inputIn
d.storedBlobs[inputInfo.Digest] = contents
return types.BlobInfo{Digest: inputInfo.Digest, Size: int64(len(contents))}, nil
}
func (d *memoryImageDest) TryReusingBlob(context.Context, types.BlobInfo, types.BlobInfoCache, bool) (bool, types.BlobInfo, error) {
func (d *memoryImageDest) TryReusingBlob(context.Context, types.BlobInfo, int, types.BlobInfoCache, bool) (bool, types.BlobInfo, error) {
panic("Unexpected call to a mock function")
}
func (d *memoryImageDest) PutManifest(ctx context.Context, m []byte) error {
Expand Down
8 changes: 4 additions & 4 deletions oci/archive/oci_dest.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ func (d *ociArchiveImageDestination) HasThreadSafePutBlob() bool {
// WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available
// to any other readers for download using the supplied digest.
// If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far.
func (d *ociArchiveImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) {
return d.unpackedDest.PutBlob(ctx, stream, inputInfo, cache, isConfig)
func (d *ociArchiveImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) {
return d.unpackedDest.PutBlob(ctx, stream, inputInfo, layerIndexInImage, cache, isConfig)
}

// TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination
Expand All @@ -101,8 +101,8 @@ func (d *ociArchiveImageDestination) PutBlob(ctx context.Context, stream io.Read
// If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size.
// If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure.
// May use and/or update cache.
func (d *ociArchiveImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) {
return d.unpackedDest.TryReusingBlob(ctx, info, cache, canSubstitute)
func (d *ociArchiveImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) {
return d.unpackedDest.TryReusingBlob(ctx, info, layerIndexInImage, cache, canSubstitute)
}

// PutManifest writes manifest to the destination
Expand Down
Loading

0 comments on commit d8e9814

Please sign in to comment.