From 2dd3b3c1be6f1cab824b48c2a24925ccb32929b9 Mon Sep 17 00:00:00 2001 From: Valentin Rothberg Date: Thu, 4 Apr 2019 16:31:38 +0200 Subject: [PATCH] storage destination: commit layer in PutBlob() Commit a layer to the storage directly in PutBlob(). This will allow future commits to implement blob locking without leaking or deferring parts of the logic to callers. Note that the APIs of PutBlob() and TryReusingBlob() are extended with an index corresponding to the layer's index in the image. Currently, only the storage destination makes use of the index. Signed-off-by: Valentin Rothberg --- copy/copy.go | 26 +-- directory/directory_dest.go | 4 +- directory/directory_test.go | 4 +- docker/docker_image_dest.go | 8 +- docker/tarfile/dest.go | 6 +- image/docker_schema2.go | 2 +- image/docker_schema2_test.go | 4 +- oci/archive/oci_dest.go | 8 +- oci/layout/oci_dest.go | 4 +- oci/layout/oci_dest_test.go | 2 +- openshift/openshift.go | 8 +- ostree/ostree_dest.go | 4 +- storage/storage_image.go | 303 ++++++++++++++++++++++++----------- storage/storage_test.go | 30 ++-- types/types.go | 6 +- 15 files changed, 269 insertions(+), 150 deletions(-) diff --git a/copy/copy.go b/copy/copy.go index 3ed8a2b824..4cf334fa82 100644 --- a/copy/copy.go +++ b/copy/copy.go @@ -468,7 +468,7 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error { } data := make([]copyLayerData, numLayers) - copyLayerHelper := func(index int, srcLayer types.BlobInfo, pool *mpb.Progress) { + copyLayerHelper := func(layerIndex int, srcLayer types.BlobInfo, pool *mpb.Progress) { defer copySemaphore.Release(1) defer copyGroup.Done() cld := copyLayerData{} @@ -483,18 +483,18 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error { logrus.Debugf("Skipping foreign layer %q copy to %s", cld.destInfo.Digest, ic.c.dest.Reference().Transport().Name()) } } else { - cld.destInfo, cld.diffID, cld.err = ic.copyLayer(ctx, srcLayer, pool) + cld.destInfo, cld.diffID, cld.err = ic.copyLayer(ctx, srcLayer, layerIndex, pool) } - data[index] = cld + data[layerIndex] = cld } func() { // A scope for defer progressPool, progressCleanup := ic.c.newProgressPool(ctx) defer progressCleanup() - for i, srcLayer := range srcInfos { + for layerIndex, srcLayer := range srcInfos { copySemaphore.Acquire(ctx, 1) - go copyLayerHelper(i, srcLayer, progressPool) + go copyLayerHelper(layerIndex, srcLayer, progressPool) } // Wait for all layers to be copied @@ -625,7 +625,7 @@ func (c *copier) copyConfig(ctx context.Context, src types.Image) error { progressPool, progressCleanup := c.newProgressPool(ctx) defer progressCleanup() bar := c.createProgressBar(progressPool, srcInfo, "config", "done") - destInfo, err := c.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, nil, false, true, bar) + destInfo, err := c.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, -1, nil, false, true, bar) if err != nil { return types.BlobInfo{}, err } @@ -651,13 +651,13 @@ type diffIDResult struct { // copyLayer copies a layer with srcInfo (with known Digest and possibly known Size) in src to dest, perhaps compressing it if canCompress, // and returns a complete blobInfo of the copied layer, and a value for LayerDiffIDs if diffIDIsNeeded -func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, pool *mpb.Progress) (types.BlobInfo, digest.Digest, error) { +func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, layerIndexInImage int, pool *mpb.Progress) (types.BlobInfo, digest.Digest, error) { cachedDiffID =
ic.c.blobInfoCache.UncompressedDigest(srcInfo.Digest) // May be "" diffIDIsNeeded := ic.diffIDsAreNeeded && cachedDiffID == "" // If we already have the blob, and we don't need to compute the diffID, then we don't need to read it from the source. if !diffIDIsNeeded { - reused, blobInfo, err := ic.c.dest.TryReusingBlob(ctx, srcInfo, ic.c.blobInfoCache, ic.canSubstituteBlobs) + reused, blobInfo, err := ic.c.dest.TryReusingBlob(ctx, srcInfo, layerIndexInImage, ic.c.blobInfoCache, ic.canSubstituteBlobs) if err != nil { return types.BlobInfo{}, "", errors.Wrapf(err, "Error trying to reuse blob %s at destination", srcInfo.Digest) } @@ -678,7 +678,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, po bar := ic.c.createProgressBar(pool, srcInfo, "blob", "done") - blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize}, diffIDIsNeeded, bar) + blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize}, layerIndexInImage, diffIDIsNeeded, bar) if err != nil { return types.BlobInfo{}, "", err } @@ -709,7 +709,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, po // perhaps compressing the stream if canCompress, // and returns a complete blobInfo of the copied blob and perhaps a <-chan diffIDResult if diffIDIsNeeded, to be read by the caller. func (ic *imageCopier) copyLayerFromStream(ctx context.Context, srcStream io.Reader, srcInfo types.BlobInfo, - diffIDIsNeeded bool, bar *mpb.Bar) (types.BlobInfo, <-chan diffIDResult, error) { + layerIndexInImage int, diffIDIsNeeded bool, bar *mpb.Bar) (types.BlobInfo, <-chan diffIDResult, error) { var getDiffIDRecorder func(compression.DecompressorFunc) io.Writer // = nil var diffIDChan chan diffIDResult @@ -733,7 +733,7 @@ func (ic *imageCopier) copyLayerFromStream(ctx context.Context, srcStream io.Rea return pipeWriter } } - blobInfo, err := ic.c.copyBlobFromStream(ctx, srcStream, srcInfo, getDiffIDRecorder, ic.canModifyManifest, false, bar) // Sets err to nil on success + blobInfo, err := ic.c.copyBlobFromStream(ctx, srcStream, srcInfo, layerIndexInImage, getDiffIDRecorder, ic.canModifyManifest, false, bar) // Sets err to nil on success return blobInfo, diffIDChan, err // We need the defer … pipeWriter.CloseWithError() to happen HERE so that the caller can block on reading from diffIDChan } @@ -768,7 +768,7 @@ func computeDiffID(stream io.Reader, decompressor compression.DecompressorFunc) // perhaps sending a copy to an io.Writer if getOriginalLayerCopyWriter != nil, // perhaps compressing it if canCompress, // and returns a complete blobInfo of the copied blob. -func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, srcInfo types.BlobInfo, +func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, srcInfo types.BlobInfo, layerIndexInImage int, getOriginalLayerCopyWriter func(decompressor compression.DecompressorFunc) io.Writer, canModifyBlob bool, isConfig bool, bar *mpb.Bar) (types.BlobInfo, error) { // The copying happens through a pipeline of connected io.Readers. @@ -847,7 +847,7 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr } // === Finally, send the layer stream to dest. 
- uploadedInfo, err := c.dest.PutBlob(ctx, destStream, inputInfo, c.blobInfoCache, isConfig) + uploadedInfo, err := c.dest.PutBlob(ctx, destStream, inputInfo, layerIndexInImage, c.blobInfoCache, isConfig) if err != nil { return types.BlobInfo{}, errors.Wrap(err, "Error writing blob") } diff --git a/directory/directory_dest.go b/directory/directory_dest.go index 4b2ab022e2..3efd093b11 100644 --- a/directory/directory_dest.go +++ b/directory/directory_dest.go @@ -136,7 +136,7 @@ func (d *dirImageDestination) HasThreadSafePutBlob() bool { // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. -func (d *dirImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { +func (d *dirImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { blobFile, err := ioutil.TempFile(d.ref.path, "dir-put-blob") if err != nil { return types.BlobInfo{}, err @@ -182,7 +182,7 @@ func (d *dirImageDestination) PutBlob(ctx context.Context, stream io.Reader, inp // If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. // If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. // May use and/or update cache. -func (d *dirImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { +func (d *dirImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { if info.Digest == "" { return false, types.BlobInfo{}, errors.Errorf(`"Can not check for a blob with unknown digest`) } diff --git a/directory/directory_test.go b/directory/directory_test.go index 121b1cf964..ecf0a82586 100644 --- a/directory/directory_test.go +++ b/directory/directory_test.go @@ -65,7 +65,7 @@ func TestGetPutBlob(t *testing.T) { require.NoError(t, err) defer dest.Close() assert.Equal(t, types.PreserveOriginal, dest.DesiredLayerCompression()) - info, err := dest.PutBlob(context.Background(), bytes.NewReader(blob), types.BlobInfo{Digest: digest.Digest("sha256:digest-test"), Size: int64(9)}, cache, false) + info, err := dest.PutBlob(context.Background(), bytes.NewReader(blob), types.BlobInfo{Digest: digest.Digest("sha256:digest-test"), Size: int64(9)}, 0, cache, false) assert.NoError(t, err) err = dest.Commit(context.Background()) assert.NoError(t, err) @@ -123,7 +123,7 @@ func TestPutBlobDigestFailure(t *testing.T) { dest, err := ref.NewImageDestination(context.Background(), nil) require.NoError(t, err) defer dest.Close() - _, err = dest.PutBlob(context.Background(), reader, types.BlobInfo{Digest: blobDigest, Size: -1}, cache, false) + _, err = dest.PutBlob(context.Background(), reader, types.BlobInfo{Digest: blobDigest, Size: -1}, 0, cache, false) assert.Error(t, err) assert.Contains(t, digestErrorString, err.Error()) err = dest.Commit(context.Background()) diff --git 
a/docker/docker_image_dest.go b/docker/docker_image_dest.go index c116cbec32..abe100fbe4 100644 --- a/docker/docker_image_dest.go +++ b/docker/docker_image_dest.go @@ -19,7 +19,7 @@ import ( "github.com/containers/image/pkg/blobinfocache/none" "github.com/containers/image/types" "github.com/docker/distribution/registry/api/errcode" - "github.com/docker/distribution/registry/api/v2" + v2 "github.com/docker/distribution/registry/api/v2" "github.com/docker/distribution/registry/client" "github.com/opencontainers/go-digest" imgspecv1 "github.com/opencontainers/image-spec/specs-go/v1" @@ -124,12 +124,12 @@ func (d *dockerImageDestination) HasThreadSafePutBlob() bool { // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. -func (d *dockerImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { +func (d *dockerImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { if inputInfo.Digest.String() != "" { // This should not really be necessary, at least the copy code calls TryReusingBlob automatically. // Still, we need to check, if only because the "initiate upload" endpoint does not have a documented "blob already exists" return value. // But we do that with NoCache, so that it _only_ checks the primary destination, instead of trying all mount candidates _again_. - haveBlob, reusedInfo, err := d.TryReusingBlob(ctx, inputInfo, none.NoCache, false) + haveBlob, reusedInfo, err := d.TryReusingBlob(ctx, inputInfo, layerIndexInImage, none.NoCache, false) if err != nil { return types.BlobInfo{}, err } @@ -271,7 +271,7 @@ func (d *dockerImageDestination) mountBlob(ctx context.Context, srcRepo referenc // If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. // If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. // May use and/or update cache. -func (d *dockerImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { +func (d *dockerImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { if info.Digest == "" { return false, types.BlobInfo{}, errors.Errorf(`"Can not check for a blob with unknown digest`) } diff --git a/docker/tarfile/dest.go b/docker/tarfile/dest.go index 5f30eddbc7..270120ba77 100644 --- a/docker/tarfile/dest.go +++ b/docker/tarfile/dest.go @@ -94,7 +94,7 @@ func (d *Destination) HasThreadSafePutBlob() bool { // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. 
// If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. -func (d *Destination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { +func (d *Destination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { // Ouch, we need to stream the blob into a temporary file just to determine the size. // When the layer is decompressed, we also have to generate the digest on uncompressed datas. if inputInfo.Size == -1 || inputInfo.Digest.String() == "" { @@ -126,7 +126,7 @@ func (d *Destination) PutBlob(ctx context.Context, stream io.Reader, inputInfo t } // Maybe the blob has been already sent - ok, reusedInfo, err := d.TryReusingBlob(ctx, inputInfo, cache, false) + ok, reusedInfo, err := d.TryReusingBlob(ctx, inputInfo, layerIndexInImage, cache, false) if err != nil { return types.BlobInfo{}, err } @@ -164,7 +164,7 @@ func (d *Destination) PutBlob(ctx context.Context, stream io.Reader, inputInfo t // If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. // If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. // May use and/or update cache. -func (d *Destination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { +func (d *Destination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { if info.Digest == "" { return false, types.BlobInfo{}, errors.Errorf("Can not check for a blob with unknown digest") } diff --git a/image/docker_schema2.go b/image/docker_schema2.go index 351e73ea1d..1cd4f402b6 100644 --- a/image/docker_schema2.go +++ b/image/docker_schema2.go @@ -252,7 +252,7 @@ func (m *manifestSchema2) convertToManifestSchema1(ctx context.Context, dest typ logrus.Debugf("Uploading empty layer during conversion to schema 1") // Ideally we should update the relevant BlobInfoCache about this layer, but that would require passing it down here, // and anyway this blob is so small that it’s easier to just copy it than to worry about figuring out another location where to get it. 
- info, err := dest.PutBlob(ctx, bytes.NewReader(GzippedEmptyLayer), types.BlobInfo{Digest: GzippedEmptyLayerDigest, Size: int64(len(GzippedEmptyLayer))}, none.NoCache, false) + info, err := dest.PutBlob(ctx, bytes.NewReader(GzippedEmptyLayer), types.BlobInfo{Digest: GzippedEmptyLayerDigest, Size: int64(len(GzippedEmptyLayer))}, -1, none.NoCache, false) if err != nil { return nil, errors.Wrap(err, "Error uploading empty layer") } diff --git a/image/docker_schema2_test.go b/image/docker_schema2_test.go index 9d3f96fc6d..333010bbce 100644 --- a/image/docker_schema2_test.go +++ b/image/docker_schema2_test.go @@ -401,7 +401,7 @@ func (d *memoryImageDest) IgnoresEmbeddedDockerReference() bool { func (d *memoryImageDest) HasThreadSafePutBlob() bool { panic("Unexpected call to a mock function") } -func (d *memoryImageDest) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { +func (d *memoryImageDest) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { if d.storedBlobs == nil { d.storedBlobs = make(map[digest.Digest][]byte) } @@ -415,7 +415,7 @@ func (d *memoryImageDest) PutBlob(ctx context.Context, stream io.Reader, inputIn d.storedBlobs[inputInfo.Digest] = contents return types.BlobInfo{Digest: inputInfo.Digest, Size: int64(len(contents))}, nil } -func (d *memoryImageDest) TryReusingBlob(context.Context, types.BlobInfo, types.BlobInfoCache, bool) (bool, types.BlobInfo, error) { +func (d *memoryImageDest) TryReusingBlob(context.Context, types.BlobInfo, int, types.BlobInfoCache, bool) (bool, types.BlobInfo, error) { panic("Unexpected call to a mock function") } func (d *memoryImageDest) PutManifest(ctx context.Context, m []byte) error { diff --git a/oci/archive/oci_dest.go b/oci/archive/oci_dest.go index 9571c37e2b..7a29ea7481 100644 --- a/oci/archive/oci_dest.go +++ b/oci/archive/oci_dest.go @@ -90,8 +90,8 @@ func (d *ociArchiveImageDestination) HasThreadSafePutBlob() bool { // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. -func (d *ociArchiveImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { - return d.unpackedDest.PutBlob(ctx, stream, inputInfo, cache, isConfig) +func (d *ociArchiveImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { + return d.unpackedDest.PutBlob(ctx, stream, inputInfo, layerIndexInImage, cache, isConfig) } // TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination @@ -101,8 +101,8 @@ func (d *ociArchiveImageDestination) PutBlob(ctx context.Context, stream io.Read // If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. // If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. // May use and/or update cache. 
-func (d *ociArchiveImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { - return d.unpackedDest.TryReusingBlob(ctx, info, cache, canSubstitute) +func (d *ociArchiveImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { + return d.unpackedDest.TryReusingBlob(ctx, info, layerIndexInImage, cache, canSubstitute) } // PutManifest writes manifest to the destination diff --git a/oci/layout/oci_dest.go b/oci/layout/oci_dest.go index db102184db..a0f441648e 100644 --- a/oci/layout/oci_dest.go +++ b/oci/layout/oci_dest.go @@ -120,7 +120,7 @@ func (d *ociImageDestination) HasThreadSafePutBlob() bool { // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. -func (d *ociImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { +func (d *ociImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { blobFile, err := ioutil.TempFile(d.ref.dir, "oci-put-blob") if err != nil { return types.BlobInfo{}, err @@ -187,7 +187,7 @@ func (d *ociImageDestination) PutBlob(ctx context.Context, stream io.Reader, inp // If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. // If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. // May use and/or update cache. -func (d *ociImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { +func (d *ociImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { if info.Digest == "" { return false, types.BlobInfo{}, errors.Errorf(`"Can not check for a blob with unknown digest`) } diff --git a/oci/layout/oci_dest_test.go b/oci/layout/oci_dest_test.go index 44316fabb4..27aaad508a 100644 --- a/oci/layout/oci_dest_test.go +++ b/oci/layout/oci_dest_test.go @@ -53,7 +53,7 @@ func TestPutBlobDigestFailure(t *testing.T) { dest, err := ref.NewImageDestination(context.Background(), nil) require.NoError(t, err) defer dest.Close() - _, err = dest.PutBlob(context.Background(), reader, types.BlobInfo{Digest: blobDigest, Size: -1}, cache, false) + _, err = dest.PutBlob(context.Background(), reader, types.BlobInfo{Digest: blobDigest, Size: -1}, 0, cache, false) assert.Error(t, err) assert.Contains(t, digestErrorString, err.Error()) err = dest.Commit(context.Background()) diff --git a/openshift/openshift.go b/openshift/openshift.go index 814c3eea1d..2dae485a43 100644 --- a/openshift/openshift.go +++ b/openshift/openshift.go @@ -395,8 +395,8 @@ func (d *openshiftImageDestination) HasThreadSafePutBlob() bool { // WARNING: The contents of stream are being verified on the fly. 
Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. -func (d *openshiftImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { - return d.docker.PutBlob(ctx, stream, inputInfo, cache, isConfig) +func (d *openshiftImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { + return d.docker.PutBlob(ctx, stream, inputInfo, layerIndexInImage, cache, isConfig) } // TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination @@ -406,8 +406,8 @@ func (d *openshiftImageDestination) PutBlob(ctx context.Context, stream io.Reade // If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. // If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. // May use and/or update cache. -func (d *openshiftImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { - return d.docker.TryReusingBlob(ctx, info, cache, canSubstitute) +func (d *openshiftImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { + return d.docker.TryReusingBlob(ctx, info, layerIndexInImage, cache, canSubstitute) } // PutManifest writes manifest to the destination. diff --git a/ostree/ostree_dest.go b/ostree/ostree_dest.go index d69f4fa331..d0ea867c97 100644 --- a/ostree/ostree_dest.go +++ b/ostree/ostree_dest.go @@ -145,7 +145,7 @@ func (d *ostreeImageDestination) HasThreadSafePutBlob() bool { // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. -func (d *ostreeImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { +func (d *ostreeImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { tmpDir, err := ioutil.TempDir(d.tmpDirPath, "blob") if err != nil { return types.BlobInfo{}, err @@ -342,7 +342,7 @@ func (d *ostreeImageDestination) importConfig(repo *otbuiltin.Repo, blob *blobTo // If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. // If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. // May use and/or update cache. 
-func (d *ostreeImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { +func (d *ostreeImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { if d.repo == nil { repo, err := openRepo(d.ref.repo) if err != nil { diff --git a/storage/storage_image.go b/storage/storage_image.go index b39d2bcc04..4404f1cee3 100644 --- a/storage/storage_image.go +++ b/storage/storage_image.go @@ -54,16 +54,19 @@ type storageImageSource struct { } type storageImageDestination struct { - imageRef storageReference - directory string // Temporary directory where we store blobs until Commit() time - nextTempFileID int32 // A counter that we use for computing filenames to assign to blobs - manifest []byte // Manifest contents, temporary - signatures []byte // Signature contents, temporary - putBlobMutex sync.Mutex // Mutex to sync state for parallel PutBlob executions - blobDiffIDs map[digest.Digest]digest.Digest // Mapping from layer blobsums to their corresponding DiffIDs - fileSizes map[digest.Digest]int64 // Mapping from layer blobsums to their sizes - filenames map[digest.Digest]string // Mapping from layer blobsums to names of files we used to hold them - SignatureSizes []int `json:"signature-sizes,omitempty"` // List of sizes of each signature slice + imageRef storageReference + directory string // Temporary directory where we store blobs until Commit() time + nextTempFileID int32 // A counter that we use for computing filenames to assign to blobs + manifest []byte // Manifest contents, temporary + signatures []byte // Signature contents, temporary + putBlobMutex sync.Mutex // Mutex to sync state for parallel PutBlob executions + blobDiffIDs map[digest.Digest]digest.Digest // Mapping from layer blobsums to their corresponding DiffIDs + blobLayerIDs map[digest.Digest]string // Mapping from layer blobsums to their corresponding storage layer ID + fileSizes map[digest.Digest]int64 // Mapping from layer blobsums to their sizes + filenames map[digest.Digest]string // Mapping from layer blobsums to names of files we used to hold them + indexToStorageID map[int]string // Mapping from layer index to the layer IDs in the storage + indexToDoneChannel map[int]chan bool // Mapping from layer index to a channel to indicate the layer has been written to storage + SignatureSizes []int `json:"signature-sizes,omitempty"` // List of sizes of each signature slice } type storageImageCloser struct { @@ -323,16 +326,31 @@ func newImageDestination(imageRef storageReference) (*storageImageDestination, e return nil, errors.Wrapf(err, "error creating a temporary directory") } image := &storageImageDestination{ - imageRef: imageRef, - directory: directory, - blobDiffIDs: make(map[digest.Digest]digest.Digest), - fileSizes: make(map[digest.Digest]int64), - filenames: make(map[digest.Digest]string), - SignatureSizes: []int{}, + imageRef: imageRef, + directory: directory, + blobDiffIDs: make(map[digest.Digest]digest.Digest), + blobLayerIDs: make(map[digest.Digest]string), + fileSizes: make(map[digest.Digest]int64), + filenames: make(map[digest.Digest]string), + indexToStorageID: make(map[int]string), + indexToDoneChannel: make(map[int]chan bool), + SignatureSizes: []int{}, } return image, nil } +func (s *storageImageDestination) getChannelForLayer(layerIndexInImage int) chan bool { + s.putBlobMutex.Lock() + defer 
s.putBlobMutex.Unlock() + channel, ok := s.indexToDoneChannel[layerIndexInImage] + if !ok { + // A buffered channel to allow non-blocking sends + channel = make(chan bool, 1) + s.indexToDoneChannel[layerIndexInImage] = channel + } + return channel +} + // Reference returns the reference used to set up this destination. Note that this should directly correspond to user's intent, // e.g. it should use the public hostname instead of the result of resolving CNAMEs or following redirects. func (s *storageImageDestination) Reference() types.ImageReference { @@ -368,7 +386,7 @@ func (s *storageImageDestination) HasThreadSafePutBlob() bool { // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. -func (s *storageImageDestination) PutBlob(ctx context.Context, stream io.Reader, blobinfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { +func (s *storageImageDestination) PutBlob(ctx context.Context, stream io.Reader, blobinfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { // Stores a layer or data blob in our temporary directory, checking that any information // in the blobinfo matches the incoming data. errorBlobInfo := types.BlobInfo{ @@ -425,21 +443,51 @@ func (s *storageImageDestination) PutBlob(ctx context.Context, stream io.Reader, } // This is safe because we have just computed both values ourselves. cache.RecordDigestUncompressedPair(blobDigest, diffID.Digest()) - return types.BlobInfo{ + blob := types.BlobInfo{ Digest: blobDigest, Size: blobSize, MediaType: blobinfo.MediaType, - }, nil + } + + // First, wait for the previous layer to be committed + previousID := "" + if layerIndexInImage > 0 { + channel := s.getChannelForLayer(layerIndexInImage - 1) + if committed := <-channel; !committed { + return errorBlobInfo, errors.Errorf("committing previous layer %d failed", layerIndexInImage-1) + } + var ok bool + s.putBlobMutex.Lock() + previousID, ok = s.indexToStorageID[layerIndexInImage-1] + s.putBlobMutex.Unlock() + if !ok { + return errorBlobInfo, fmt.Errorf("error committing blob %q: could not find parent layer ID", blob.Digest.String()) + } + } + // Commit the blob + if layerIndexInImage >= 0 { + id, commitError := s.commitBlob(ctx, blob, previousID) + if commitError == nil { + s.putBlobMutex.Lock() + s.blobLayerIDs[blob.Digest] = id + s.indexToStorageID[layerIndexInImage] = id + s.putBlobMutex.Unlock() + } + // It's a buffered channel, so we don't wait for the message to be + // received + channel := s.getChannelForLayer(layerIndexInImage) + if commitError == nil { + channel <- true + } else { + channel <- false + return errorBlobInfo, commitError + } + } + return blob, nil } -// TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination -// (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree). -// info.Digest must not be empty. -// If canSubstitute, TryReusingBlob can use an equivalent equivalent of the desired blob; in that case the returned info may not match the input.
-// If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. -// If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. -// May use and/or update cache. -func (s *storageImageDestination) TryReusingBlob(ctx context.Context, blobinfo types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { +// tryReusingBlob is a helper method for TryReusingBlob to wrap it +func (s *storageImageDestination) tryReusingBlob(ctx context.Context, blobinfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { // lock the entire method as it executes fairly quickly s.putBlobMutex.Lock() defer s.putBlobMutex.Unlock() @@ -467,6 +515,10 @@ func (s *storageImageDestination) TryReusingBlob(ctx context.Context, blobinfo t if len(layers) > 0 { // Save this for completeness. s.blobDiffIDs[blobinfo.Digest] = layers[0].UncompressedDigest + s.blobLayerIDs[blobinfo.Digest] = layers[0].ID + if layerIndexInImage >= 0 { + s.indexToStorageID[layerIndexInImage] = layers[0].ID + } return true, types.BlobInfo{ Digest: blobinfo.Digest, Size: layers[0].UncompressedSize, @@ -482,6 +534,10 @@ func (s *storageImageDestination) TryReusingBlob(ctx context.Context, blobinfo t if len(layers) > 0 { // Record the uncompressed value so that we can use it to calculate layer IDs. s.blobDiffIDs[blobinfo.Digest] = layers[0].UncompressedDigest + s.blobLayerIDs[blobinfo.Digest] = layers[0].ID + if layerIndexInImage >= 0 { + s.indexToStorageID[layerIndexInImage] = layers[0].ID + } return true, types.BlobInfo{ Digest: blobinfo.Digest, Size: layers[0].CompressedSize, @@ -500,6 +556,10 @@ func (s *storageImageDestination) TryReusingBlob(ctx context.Context, blobinfo t } if len(layers) > 0 { s.blobDiffIDs[uncompressedDigest] = layers[0].UncompressedDigest + s.blobLayerIDs[blobinfo.Digest] = layers[0].ID + if layerIndexInImage >= 0 { + s.indexToStorageID[layerIndexInImage] = layers[0].ID + } return true, types.BlobInfo{ Digest: uncompressedDigest, Size: layers[0].UncompressedSize, @@ -513,6 +573,26 @@ func (s *storageImageDestination) TryReusingBlob(ctx context.Context, blobinfo t return false, types.BlobInfo{}, nil } +// TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination +// (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree). +// info.Digest must not be empty. +// If canSubstitute, TryReusingBlob can use an equivalent equivalent of the desired blob; in that case the returned info may not match the input. +// If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. +// If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. +// May use and/or update cache. 
+func (s *storageImageDestination) TryReusingBlob(ctx context.Context, blobinfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { + reusable, blob, err := s.tryReusingBlob(ctx, blobinfo, layerIndexInImage, cache, canSubstitute) + // If we can reuse the blob or hit an error trying to do so, we need to + // signal the result through the channel as another goroutine is potentially + // waiting for it. If we can't reuse the blob and encounter no error, we + // need to copy it and will send the signal in PutBlob(). + if (layerIndexInImage >= 0) && (err != nil || reusable) { + channel := s.getChannelForLayer(layerIndexInImage) + channel <- (err == nil) + } + return reusable, blob, err +} + // computeID computes a recommended image ID based on information we have so far. If // the manifest is not of a type that we recognize, we return an empty value, indicating // that since we don't have a recommendation, a random ID should be used if one needs @@ -572,6 +652,101 @@ func (s *storageImageDestination) getConfigBlob(info types.BlobInfo) ([]byte, er return nil, errors.New("blob not found") } +// commitBlob commits the specified blob on top of previousID to the storage. Note that +// the only caller of commitBlob() is PutBlob(), which waits until the parent layer has +// been committed, records the resulting layer ID, and sends the outcome through the +// corresponding channel in s.indexToDoneChannel. +func (s *storageImageDestination) commitBlob(ctx context.Context, blob types.BlobInfo, previousID string) (string, error) { + // Check if there's already a layer with the ID that we'd give to the result of applying + // this layer blob to its parent, if it has one, or the blob's hex value otherwise. + s.putBlobMutex.Lock() + diffID, haveDiffID := s.blobDiffIDs[blob.Digest] + s.putBlobMutex.Unlock() + if !haveDiffID { + return "", errors.Errorf("we have blob %q, but don't know its uncompressed digest", blob.Digest.String()) + } + + id := diffID.Hex() + if previousID != "" { + id = digest.Canonical.FromBytes([]byte(previousID + "+" + diffID.Hex())).Hex() + } + if layer, err2 := s.imageRef.transport.store.Layer(id); layer != nil && err2 == nil { + return layer.ID, nil + } + logrus.Debugf("committing blob %q", blob.Digest) + // Check if we previously cached a file with that blob's contents. If we didn't, + // then we need to read the desired contents from a layer. + s.putBlobMutex.Lock() + filename, ok := s.filenames[blob.Digest] + s.putBlobMutex.Unlock() + if !ok { + // Try to find the layer with contents matching that blobsum. + layer := "" + layers, err2 := s.imageRef.transport.store.LayersByUncompressedDigest(blob.Digest) + if err2 == nil && len(layers) > 0 { + layer = layers[0].ID + } else { + layers, err2 = s.imageRef.transport.store.LayersByCompressedDigest(blob.Digest) + if err2 == nil && len(layers) > 0 { + layer = layers[0].ID + } + } + if layer == "" { + return "", errors.Wrapf(err2, "error locating layer for blob %q", blob.Digest) + } + // Read the layer's contents. + noCompression := archive.Uncompressed + diffOptions := &storage.DiffOptions{ + Compression: &noCompression, + } + diff, err2 := s.imageRef.transport.store.Diff("", layer, diffOptions) + if err2 != nil { + return "", errors.Wrapf(err2, "error reading layer %q for blob %q", layer, blob.Digest) + } + // Copy the layer diff to a file.
Diff() takes a lock that it holds + // until the ReadCloser that it returns is closed, and PutLayer() wants + // the same lock, so the diff can't just be directly streamed from one + // to the other. + filename = s.computeNextBlobCacheFile() + file, err := os.OpenFile(filename, os.O_CREATE|os.O_TRUNC|os.O_WRONLY|os.O_EXCL, 0600) + if err != nil { + diff.Close() + return "", errors.Wrapf(err, "error creating temporary file %q", filename) + } + // Copy the data to the file. + // TODO: This can take quite some time, and should ideally be cancellable using + // ctx.Done(). + _, err = io.Copy(file, diff) + diff.Close() + file.Close() + if err != nil { + return "", errors.Wrapf(err, "error storing blob to file %q", filename) + } + // Make sure that we can find this file later, should we need the layer's + // contents again. + s.putBlobMutex.Lock() + s.filenames[blob.Digest] = filename + s.putBlobMutex.Unlock() + } + // Read the cached blob and use it as a diff. + file, err := os.Open(filename) + if err != nil { + return "", errors.Wrapf(err, "error opening file %q", filename) + } + defer file.Close() + // Build the new layer using the diff, regardless of where it came from. + // TODO: This can take quite some time, and should ideally be cancellable using ctx.Done(). + layer, _, err := s.imageRef.transport.store.PutLayer(id, previousID, nil, "", false, nil, file) + if err != nil && errors.Cause(err) != storage.ErrDuplicateID { + return "", errors.Wrapf(err, "error adding layer with blob %q", blob.Digest) + } + return layer.ID, nil +} + +// Commit marks the process of storing the image as successful and asks for the image to be persisted. +// WARNING: This does not have any transactional semantics: +// - Uploaded data MAY be visible to others before Commit() is called +// - Uploaded data MAY be removed or MAY remain around if Close() is called without Commit() (i.e. rollback is allowed but not guaranteed) func (s *storageImageDestination) Commit(ctx context.Context) error { // Find the list of layer blobs. if len(s.manifest) == 0 { @@ -581,6 +756,7 @@ func (s *storageImageDestination) Commit(ctx context.Context) error { if err != nil { return errors.Wrapf(err, "error parsing manifest") } + layerBlobs := man.LayerInfos() // Extract or find the layers. lastLayer := "" @@ -588,9 +764,6 @@ func (s *storageImageDestination) Commit(ctx context.Context) error { if blob.EmptyLayer { continue } - - // Check if there's already a layer with the ID that we'd give to the result of applying - // this layer blob to its parent, if it has one, or the blob's hex value otherwise. diffID, haveDiffID := s.blobDiffIDs[blob.Digest] if !haveDiffID { // Check if it's elsewhere and the caller just forgot to pass it to us in a PutBlob(), @@ -600,7 +773,7 @@ func (s *storageImageDestination) Commit(ctx context.Context) error { // TryReusingBlob; not calling PutBlob already violates the documented API, so there’s only // so far we are going to accommodate that (if we should be doing that at all). logrus.Debugf("looking for diffID for blob %+v", blob.Digest) - has, _, err := s.TryReusingBlob(ctx, blob.BlobInfo, none.NoCache, false) + has, _, err := s.tryReusingBlob(ctx, blob.BlobInfo, -1, none.NoCache, false) if err != nil { return errors.Wrapf(err, "error checking for a layer based on blob %q", blob.Digest.String()) } @@ -621,69 +794,15 @@ func (s *storageImageDestination) Commit(ctx context.Context) error { lastLayer = layer.ID continue } - // Check if we previously cached a file with that blob's contents. 
If we didn't, - // then we need to read the desired contents from a layer. - filename, ok := s.filenames[blob.Digest] - if !ok { - // Try to find the layer with contents matching that blobsum. - layer := "" - layers, err2 := s.imageRef.transport.store.LayersByUncompressedDigest(blob.Digest) - if err2 == nil && len(layers) > 0 { - layer = layers[0].ID - } else { - layers, err2 = s.imageRef.transport.store.LayersByCompressedDigest(blob.Digest) - if err2 == nil && len(layers) > 0 { - layer = layers[0].ID - } - } - if layer == "" { - return errors.Wrapf(err2, "error locating layer for blob %q", blob.Digest) - } - // Read the layer's contents. - noCompression := archive.Uncompressed - diffOptions := &storage.DiffOptions{ - Compression: &noCompression, - } - diff, err2 := s.imageRef.transport.store.Diff("", layer, diffOptions) - if err2 != nil { - return errors.Wrapf(err2, "error reading layer %q for blob %q", layer, blob.Digest) - } - // Copy the layer diff to a file. Diff() takes a lock that it holds - // until the ReadCloser that it returns is closed, and PutLayer() wants - // the same lock, so the diff can't just be directly streamed from one - // to the other. - filename = s.computeNextBlobCacheFile() - file, err := os.OpenFile(filename, os.O_CREATE|os.O_TRUNC|os.O_WRONLY|os.O_EXCL, 0600) - if err != nil { - diff.Close() - return errors.Wrapf(err, "error creating temporary file %q", filename) - } - // Copy the data to the file. - // TODO: This can take quite some time, and should ideally be cancellable using - // ctx.Done(). - _, err = io.Copy(file, diff) - diff.Close() - file.Close() - if err != nil { - return errors.Wrapf(err, "error storing blob to file %q", filename) - } - // Make sure that we can find this file later, should we need the layer's - // contents again. - s.filenames[blob.Digest] = filename - } - // Read the cached blob and use it as a diff. - file, err := os.Open(filename) + newID, err := s.commitBlob(ctx, blob.BlobInfo, lastLayer) if err != nil { - return errors.Wrapf(err, "error opening file %q", filename) - } - defer file.Close() - // Build the new layer using the diff, regardless of where it came from. - // TODO: This can take quite some time, and should ideally be cancellable using ctx.Done(). 
- layer, _, err := s.imageRef.transport.store.PutLayer(id, lastLayer, nil, "", false, nil, file) - if err != nil && errors.Cause(err) != storage.ErrDuplicateID { - return errors.Wrapf(err, "error adding layer with blob %q", blob.Digest) + return err } - lastLayer = layer.ID + lastLayer = newID + } + + if lastLayer == "" { + return fmt.Errorf("could not find top layer") } // If one of those blobs was a configuration blob, then we can try to dig out the date when the image diff --git a/storage/storage_test.go b/storage/storage_test.go index d91d7aaa03..cd8c28d479 100644 --- a/storage/storage_test.go +++ b/storage/storage_test.go @@ -380,11 +380,11 @@ func TestWriteRead(t *testing.T) { if _, err := dest.PutBlob(context.Background(), bytes.NewBuffer(blob), types.BlobInfo{ Size: size, Digest: digest, - }, cache, false); err != nil { + }, 0, cache, false); err != nil { t.Fatalf("Error saving randomly-generated layer to destination: %v", err) } t.Logf("Wrote randomly-generated layer %q (%d/%d bytes) to destination", digest, size, decompressedSize) - if _, err := dest.PutBlob(context.Background(), bytes.NewBufferString(config), configInfo, cache, false); err != nil { + if _, err := dest.PutBlob(context.Background(), bytes.NewBufferString(config), configInfo, 0, cache, false); err != nil { t.Fatalf("Error saving config to destination: %v", err) } manifest := strings.Replace(manifestFmt, "%lh", digest.String(), -1) @@ -541,7 +541,7 @@ func TestDuplicateName(t *testing.T) { if _, err := dest.PutBlob(context.Background(), bytes.NewBuffer(blob), types.BlobInfo{ Size: size, Digest: digest, - }, cache, false); err != nil { + }, 0, cache, false); err != nil { t.Fatalf("Error saving randomly-generated layer to destination, first pass: %v", err) } manifest := fmt.Sprintf(` @@ -576,7 +576,7 @@ func TestDuplicateName(t *testing.T) { if _, err := dest.PutBlob(context.Background(), bytes.NewBuffer(blob), types.BlobInfo{ Size: int64(size), Digest: digest, - }, cache, false); err != nil { + }, 0, cache, false); err != nil { t.Fatalf("Error saving randomly-generated layer to destination, second pass: %v", err) } manifest = fmt.Sprintf(` @@ -628,7 +628,7 @@ func TestDuplicateID(t *testing.T) { if _, err := dest.PutBlob(context.Background(), bytes.NewBuffer(blob), types.BlobInfo{ Size: size, Digest: digest, - }, cache, false); err != nil { + }, 0, cache, false); err != nil { t.Fatalf("Error saving randomly-generated layer to destination, first pass: %v", err) } manifest := fmt.Sprintf(` @@ -663,7 +663,7 @@ func TestDuplicateID(t *testing.T) { if _, err := dest.PutBlob(context.Background(), bytes.NewBuffer(blob), types.BlobInfo{ Size: int64(size), Digest: digest, - }, cache, false); err != nil { + }, 0, cache, false); err != nil { t.Fatalf("Error saving randomly-generated layer to destination, second pass: %v", err) } manifest = fmt.Sprintf(` @@ -718,7 +718,7 @@ func TestDuplicateNameID(t *testing.T) { if _, err := dest.PutBlob(context.Background(), bytes.NewBuffer(blob), types.BlobInfo{ Size: size, Digest: digest, - }, cache, false); err != nil { + }, 0, cache, false); err != nil { t.Fatalf("Error saving randomly-generated layer to destination, first pass: %v", err) } manifest := fmt.Sprintf(` @@ -753,7 +753,7 @@ func TestDuplicateNameID(t *testing.T) { if _, err := dest.PutBlob(context.Background(), bytes.NewBuffer(blob), types.BlobInfo{ Size: int64(size), Digest: digest, - }, cache, false); err != nil { + }, 0, cache, false); err != nil { t.Fatalf("Error saving randomly-generated layer to destination, second pass: 
%v", err) } manifest = fmt.Sprintf(` @@ -850,21 +850,21 @@ func TestSize(t *testing.T) { if dest == nil { t.Fatalf("NewImageDestination(%q) returned no destination", ref.StringWithinTransport()) } - if _, err := dest.PutBlob(context.Background(), bytes.NewBufferString(config), configInfo, cache, false); err != nil { + if _, err := dest.PutBlob(context.Background(), bytes.NewBufferString(config), configInfo, 0, cache, true); err != nil { t.Fatalf("Error saving config to destination: %v", err) } digest1, usize1, size1, blob := makeLayer(t, archive.Gzip) if _, err := dest.PutBlob(context.Background(), bytes.NewBuffer(blob), types.BlobInfo{ Size: size1, Digest: digest1, - }, cache, false); err != nil { + }, 0, cache, false); err != nil { t.Fatalf("Error saving randomly-generated layer 1 to destination: %v", err) } digest2, usize2, size2, blob := makeLayer(t, archive.Gzip) if _, err := dest.PutBlob(context.Background(), bytes.NewBuffer(blob), types.BlobInfo{ Size: size2, Digest: digest2, - }, cache, false); err != nil { + }, 0, cache, false); err != nil { t.Fatalf("Error saving randomly-generated layer 2 to destination: %v", err) } manifest := fmt.Sprintf(` @@ -946,26 +946,26 @@ func TestDuplicateBlob(t *testing.T) { if _, err := dest.PutBlob(context.Background(), bytes.NewBuffer(blob1), types.BlobInfo{ Size: size1, Digest: digest1, - }, cache, false); err != nil { + }, 0, cache, false); err != nil { t.Fatalf("Error saving randomly-generated layer 1 to destination (first copy): %v", err) } digest2, _, size2, blob2 := makeLayer(t, archive.Gzip) if _, err := dest.PutBlob(context.Background(), bytes.NewBuffer(blob2), types.BlobInfo{ Size: size2, Digest: digest2, - }, cache, false); err != nil { + }, 1, cache, false); err != nil { t.Fatalf("Error saving randomly-generated layer 2 to destination (first copy): %v", err) } if _, err := dest.PutBlob(context.Background(), bytes.NewBuffer(blob1), types.BlobInfo{ Size: size1, Digest: digest1, - }, cache, false); err != nil { + }, 2, cache, false); err != nil { t.Fatalf("Error saving randomly-generated layer 1 to destination (second copy): %v", err) } if _, err := dest.PutBlob(context.Background(), bytes.NewBuffer(blob2), types.BlobInfo{ Size: size2, Digest: digest2, - }, cache, false); err != nil { + }, 3, cache, false); err != nil { t.Fatalf("Error saving randomly-generated layer 2 to destination (second copy): %v", err) } manifest := fmt.Sprintf(` diff --git a/types/types.go b/types/types.go index 9fdab2314a..2772894ee4 100644 --- a/types/types.go +++ b/types/types.go @@ -7,7 +7,7 @@ import ( "github.com/containers/image/docker/reference" "github.com/opencontainers/go-digest" - "github.com/opencontainers/image-spec/specs-go/v1" + v1 "github.com/opencontainers/image-spec/specs-go/v1" ) // ImageTransport is a top-level namespace for ways to to store/load an image. @@ -265,7 +265,7 @@ type ImageDestination interface { // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. 
- PutBlob(ctx context.Context, stream io.Reader, inputInfo BlobInfo, cache BlobInfoCache, isConfig bool) (BlobInfo, error) + PutBlob(ctx context.Context, stream io.Reader, inputInfo BlobInfo, layerIndexInImage int, cache BlobInfoCache, isConfig bool) (BlobInfo, error) // HasThreadSafePutBlob indicates whether PutBlob can be executed concurrently. HasThreadSafePutBlob() bool // TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination @@ -275,7 +275,7 @@ type ImageDestination interface { // If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. // If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. // May use and/or update cache. - TryReusingBlob(ctx context.Context, info BlobInfo, cache BlobInfoCache, canSubstitute bool) (bool, BlobInfo, error) + TryReusingBlob(ctx context.Context, info BlobInfo, layerIndexInImage int, cache BlobInfoCache, canSubstitute bool) (bool, BlobInfo, error) // PutManifest writes manifest to the destination. // FIXME? This should also receive a MIME type if known, to differentiate between schema versions. // If the destination is in principle available, refuses this manifest type (e.g. it does not recognize the schema),
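
As a rough illustration of the synchronization scheme this patch introduces (a minimal, self-contained sketch, not the real containers/image code: the types, field names, and layer-ID computation below are simplified stand-ins): each layer index gets a buffered "done" channel, PutBlob() waits for the parent layer (index-1) to be committed before committing its own blob on top of it, and then signals its own channel; TryReusingBlob() sends the same signal when it can satisfy a layer from an already-stored one. Non-layer blobs such as the config are passed with an index of -1 and skip the commit step.

package main

import (
	"fmt"
	"sync"
)

// dest models only the synchronization state of the storage destination:
// a per-index "committed" signal and a per-index committed-layer ID.
type dest struct {
	mu        sync.Mutex
	doneCh    map[int]chan bool // layer index -> committed-or-failed signal
	storageID map[int]string    // layer index -> ID of the committed layer
}

func newDest() *dest {
	return &dest{doneCh: make(map[int]chan bool), storageID: make(map[int]string)}
}

// channelFor mirrors getChannelForLayer(): it lazily creates one buffered
// channel per layer index so that senders never block.
func (d *dest) channelFor(index int) chan bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	ch, ok := d.doneCh[index]
	if !ok {
		ch = make(chan bool, 1)
		d.doneCh[index] = ch
	}
	return ch
}

// putBlob mirrors the ordering in PutBlob(): wait for the parent layer,
// commit on top of it, record the new layer ID, and signal completion.
func (d *dest) putBlob(index int, diffID string) error {
	parentID := ""
	if index > 0 {
		if committed := <-d.channelFor(index - 1); !committed {
			return fmt.Errorf("committing previous layer %d failed", index-1)
		}
		d.mu.Lock()
		parentID = d.storageID[index-1]
		d.mu.Unlock()
	}
	// Stand-in for the real chain-ID computation and PutLayer() call.
	id := fmt.Sprintf("%s+%s", parentID, diffID)
	d.mu.Lock()
	d.storageID[index] = id
	d.mu.Unlock()
	d.channelFor(index) <- true // buffered, so this never blocks
	return nil
}

func main() {
	d := newDest()
	var wg sync.WaitGroup
	for i := 2; i >= 0; i-- { // start out of order, as parallel copies may
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			if err := d.putBlob(i, fmt.Sprintf("diff%d", i)); err != nil {
				fmt.Println("error:", err)
			}
		}(i)
	}
	wg.Wait()
	fmt.Println("top layer:", d.storageID[2]) // layers were committed in order 0, 1, 2
}

The buffered channel is what lets the committing goroutine (or TryReusingBlob()) record success or failure without blocking on whether the next layer's PutBlob() is already waiting for it.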