diff --git a/copy/copy.go b/copy/copy.go index 3ed8a2b824..1308b6f032 100644 --- a/copy/copy.go +++ b/copy/copy.go @@ -468,7 +468,7 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error { } data := make([]copyLayerData, numLayers) - copyLayerHelper := func(index int, srcLayer types.BlobInfo, pool *mpb.Progress) { + copyLayerHelper := func(index int, srcLayer types.BlobInfo, previousLayer digest.Digest, pool *mpb.Progress) { defer copySemaphore.Release(1) defer copyGroup.Done() cld := copyLayerData{} @@ -483,7 +483,8 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error { logrus.Debugf("Skipping foreign layer %q copy to %s", cld.destInfo.Digest, ic.c.dest.Reference().Transport().Name()) } } else { - cld.destInfo, cld.diffID, cld.err = ic.copyLayer(ctx, srcLayer, pool) + c := context.WithValue(ctx, "previousLayer", previousLayer) + cld.destInfo, cld.diffID, cld.err = ic.copyLayer(c, srcLayer, index, pool) } data[index] = cld } @@ -493,8 +494,12 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error { defer progressCleanup() for i, srcLayer := range srcInfos { + var previousLayer digest.Digest + if i >= 1 { + previousLayer = srcInfos[i-1].Digest + } copySemaphore.Acquire(ctx, 1) - go copyLayerHelper(i, srcLayer, progressPool) + go copyLayerHelper(i, srcLayer, previousLayer, progressPool) } // Wait for all layers to be copied @@ -625,7 +630,7 @@ func (c *copier) copyConfig(ctx context.Context, src types.Image) error { progressPool, progressCleanup := c.newProgressPool(ctx) defer progressCleanup() bar := c.createProgressBar(progressPool, srcInfo, "config", "done") - destInfo, err := c.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, nil, false, true, bar) + destInfo, err := c.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, 0, nil, false, true, bar) if err != nil { return types.BlobInfo{}, err } @@ -651,13 +656,13 @@ type diffIDResult struct { // copyLayer copies a layer with srcInfo (with known Digest and possibly 
known Size) in src to dest, perhaps compressing it if canCompress, // and returns a complete blobInfo of the copied layer, and a value for LayerDiffIDs if diffIDIsNeeded -func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, pool *mpb.Progress) (types.BlobInfo, digest.Digest, error) { +func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, layerIndexInImage int, pool *mpb.Progress) (types.BlobInfo, digest.Digest, error) { cachedDiffID := ic.c.blobInfoCache.UncompressedDigest(srcInfo.Digest) // May be "" diffIDIsNeeded := ic.diffIDsAreNeeded && cachedDiffID == "" // If we already have the blob, and we don't need to compute the diffID, then we don't need to read it from the source. if !diffIDIsNeeded { - reused, blobInfo, err := ic.c.dest.TryReusingBlob(ctx, srcInfo, ic.c.blobInfoCache, ic.canSubstituteBlobs) + reused, blobInfo, err := ic.c.dest.TryReusingBlob(ctx, srcInfo, layerIndexInImage, ic.c.blobInfoCache, ic.canSubstituteBlobs) if err != nil { return types.BlobInfo{}, "", errors.Wrapf(err, "Error trying to reuse blob %s at destination", srcInfo.Digest) } @@ -678,7 +683,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, po bar := ic.c.createProgressBar(pool, srcInfo, "blob", "done") - blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize}, diffIDIsNeeded, bar) + blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize}, layerIndexInImage, diffIDIsNeeded, bar) if err != nil { return types.BlobInfo{}, "", err } @@ -709,7 +714,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, po // perhaps compressing the stream if canCompress, // and returns a complete blobInfo of the copied blob and perhaps a <-chan diffIDResult if diffIDIsNeeded, to be read by the caller. 
func (ic *imageCopier) copyLayerFromStream(ctx context.Context, srcStream io.Reader, srcInfo types.BlobInfo, - diffIDIsNeeded bool, bar *mpb.Bar) (types.BlobInfo, <-chan diffIDResult, error) { + layerIndexInImage int, diffIDIsNeeded bool, bar *mpb.Bar) (types.BlobInfo, <-chan diffIDResult, error) { var getDiffIDRecorder func(compression.DecompressorFunc) io.Writer // = nil var diffIDChan chan diffIDResult @@ -733,7 +738,7 @@ func (ic *imageCopier) copyLayerFromStream(ctx context.Context, srcStream io.Rea return pipeWriter } } - blobInfo, err := ic.c.copyBlobFromStream(ctx, srcStream, srcInfo, getDiffIDRecorder, ic.canModifyManifest, false, bar) // Sets err to nil on success + blobInfo, err := ic.c.copyBlobFromStream(ctx, srcStream, srcInfo, layerIndexInImage, getDiffIDRecorder, ic.canModifyManifest, false, bar) // Sets err to nil on success return blobInfo, diffIDChan, err // We need the defer … pipeWriter.CloseWithError() to happen HERE so that the caller can block on reading from diffIDChan } @@ -768,7 +773,7 @@ func computeDiffID(stream io.Reader, decompressor compression.DecompressorFunc) // perhaps sending a copy to an io.Writer if getOriginalLayerCopyWriter != nil, // perhaps compressing it if canCompress, // and returns a complete blobInfo of the copied blob. -func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, srcInfo types.BlobInfo, +func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, srcInfo types.BlobInfo, layerIndexInImage int, getOriginalLayerCopyWriter func(decompressor compression.DecompressorFunc) io.Writer, canModifyBlob bool, isConfig bool, bar *mpb.Bar) (types.BlobInfo, error) { // The copying happens through a pipeline of connected io.Readers. @@ -847,7 +852,7 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr } // === Finally, send the layer stream to dest. 
- uploadedInfo, err := c.dest.PutBlob(ctx, destStream, inputInfo, c.blobInfoCache, isConfig) + uploadedInfo, err := c.dest.PutBlob(ctx, destStream, inputInfo, layerIndexInImage, c.blobInfoCache, isConfig) if err != nil { return types.BlobInfo{}, errors.Wrap(err, "Error writing blob") } diff --git a/directory/directory_dest.go b/directory/directory_dest.go index 4b2ab022e2..3efd093b11 100644 --- a/directory/directory_dest.go +++ b/directory/directory_dest.go @@ -136,7 +136,7 @@ func (d *dirImageDestination) HasThreadSafePutBlob() bool { // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. -func (d *dirImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { +func (d *dirImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { blobFile, err := ioutil.TempFile(d.ref.path, "dir-put-blob") if err != nil { return types.BlobInfo{}, err @@ -182,7 +182,7 @@ func (d *dirImageDestination) PutBlob(ctx context.Context, stream io.Reader, inp // If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. // If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. // May use and/or update cache. 
-func (d *dirImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { +func (d *dirImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { if info.Digest == "" { return false, types.BlobInfo{}, errors.Errorf(`"Can not check for a blob with unknown digest`) } diff --git a/directory/directory_test.go b/directory/directory_test.go index 121b1cf964..ecf0a82586 100644 --- a/directory/directory_test.go +++ b/directory/directory_test.go @@ -65,7 +65,7 @@ func TestGetPutBlob(t *testing.T) { require.NoError(t, err) defer dest.Close() assert.Equal(t, types.PreserveOriginal, dest.DesiredLayerCompression()) - info, err := dest.PutBlob(context.Background(), bytes.NewReader(blob), types.BlobInfo{Digest: digest.Digest("sha256:digest-test"), Size: int64(9)}, cache, false) + info, err := dest.PutBlob(context.Background(), bytes.NewReader(blob), types.BlobInfo{Digest: digest.Digest("sha256:digest-test"), Size: int64(9)}, 0, cache, false) assert.NoError(t, err) err = dest.Commit(context.Background()) assert.NoError(t, err) @@ -123,7 +123,7 @@ func TestPutBlobDigestFailure(t *testing.T) { dest, err := ref.NewImageDestination(context.Background(), nil) require.NoError(t, err) defer dest.Close() - _, err = dest.PutBlob(context.Background(), reader, types.BlobInfo{Digest: blobDigest, Size: -1}, cache, false) + _, err = dest.PutBlob(context.Background(), reader, types.BlobInfo{Digest: blobDigest, Size: -1}, 0, cache, false) assert.Error(t, err) assert.Contains(t, digestErrorString, err.Error()) err = dest.Commit(context.Background()) diff --git a/docker/docker_image_dest.go b/docker/docker_image_dest.go index c116cbec32..e78254c957 100644 --- a/docker/docker_image_dest.go +++ b/docker/docker_image_dest.go @@ -19,7 +19,7 @@ import ( 
"github.com/containers/image/pkg/blobinfocache/none" "github.com/containers/image/types" "github.com/docker/distribution/registry/api/errcode" - "github.com/docker/distribution/registry/api/v2" + v2 "github.com/docker/distribution/registry/api/v2" "github.com/docker/distribution/registry/client" "github.com/opencontainers/go-digest" imgspecv1 "github.com/opencontainers/image-spec/specs-go/v1" @@ -124,12 +124,12 @@ func (d *dockerImageDestination) HasThreadSafePutBlob() bool { // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. -func (d *dockerImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { +func (d *dockerImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { if inputInfo.Digest.String() != "" { // This should not really be necessary, at least the copy code calls TryReusingBlob automatically. // Still, we need to check, if only because the "initiate upload" endpoint does not have a documented "blob already exists" return value. // But we do that with NoCache, so that it _only_ checks the primary destination, instead of trying all mount candidates _again_. 
- haveBlob, reusedInfo, err := d.TryReusingBlob(ctx, inputInfo, none.NoCache, false) + haveBlob, reusedInfo, err := d.TryReusingBlob(ctx, inputInfo, layerIndexInImage, none.NoCache, false) if err != nil { return types.BlobInfo{}, err } @@ -270,8 +270,8 @@ func (d *dockerImageDestination) mountBlob(ctx context.Context, srcRepo referenc // If canSubstitute, TryReusingBlob can use an equivalent equivalent of the desired blob; in that case the returned info may not match the input. // If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. // If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. -// May use and/or update cache. -func (d *dockerImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { +// May use and/or update cache. +func (d *dockerImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { if info.Digest == "" { return false, types.BlobInfo{}, errors.Errorf(`"Can not check for a blob with unknown digest`) } diff --git a/docker/tarfile/dest.go b/docker/tarfile/dest.go index 5f30eddbc7..270120ba77 100644 --- a/docker/tarfile/dest.go +++ b/docker/tarfile/dest.go @@ -94,7 +94,7 @@ func (d *Destination) HasThreadSafePutBlob() bool { // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. 
-func (d *Destination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { +func (d *Destination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { // Ouch, we need to stream the blob into a temporary file just to determine the size. // When the layer is decompressed, we also have to generate the digest on uncompressed datas. if inputInfo.Size == -1 || inputInfo.Digest.String() == "" { @@ -126,7 +126,7 @@ func (d *Destination) PutBlob(ctx context.Context, stream io.Reader, inputInfo t } // Maybe the blob has been already sent - ok, reusedInfo, err := d.TryReusingBlob(ctx, inputInfo, cache, false) + ok, reusedInfo, err := d.TryReusingBlob(ctx, inputInfo, layerIndexInImage, cache, false) if err != nil { return types.BlobInfo{}, err } @@ -164,7 +164,7 @@ func (d *Destination) PutBlob(ctx context.Context, stream io.Reader, inputInfo t // If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. // If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. // May use and/or update cache. 
-func (d *Destination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { +func (d *Destination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { if info.Digest == "" { return false, types.BlobInfo{}, errors.Errorf("Can not check for a blob with unknown digest") } diff --git a/image/docker_schema2.go b/image/docker_schema2.go index 351e73ea1d..055f162aca 100644 --- a/image/docker_schema2.go +++ b/image/docker_schema2.go @@ -252,7 +252,7 @@ func (m *manifestSchema2) convertToManifestSchema1(ctx context.Context, dest typ logrus.Debugf("Uploading empty layer during conversion to schema 1") // Ideally we should update the relevant BlobInfoCache about this layer, but that would require passing it down here, // and anyway this blob is so small that it’s easier to just copy it than to worry about figuring out another location where to get it. 
- info, err := dest.PutBlob(ctx, bytes.NewReader(GzippedEmptyLayer), types.BlobInfo{Digest: GzippedEmptyLayerDigest, Size: int64(len(GzippedEmptyLayer))}, none.NoCache, false) + info, err := dest.PutBlob(ctx, bytes.NewReader(GzippedEmptyLayer), types.BlobInfo{Digest: GzippedEmptyLayerDigest, Size: int64(len(GzippedEmptyLayer))}, 0, none.NoCache, false) if err != nil { return nil, errors.Wrap(err, "Error uploading empty layer") } diff --git a/image/docker_schema2_test.go b/image/docker_schema2_test.go index 9d3f96fc6d..333010bbce 100644 --- a/image/docker_schema2_test.go +++ b/image/docker_schema2_test.go @@ -401,7 +401,7 @@ func (d *memoryImageDest) IgnoresEmbeddedDockerReference() bool { func (d *memoryImageDest) HasThreadSafePutBlob() bool { panic("Unexpected call to a mock function") } -func (d *memoryImageDest) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { +func (d *memoryImageDest) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { if d.storedBlobs == nil { d.storedBlobs = make(map[digest.Digest][]byte) } @@ -415,7 +415,7 @@ func (d *memoryImageDest) PutBlob(ctx context.Context, stream io.Reader, inputIn d.storedBlobs[inputInfo.Digest] = contents return types.BlobInfo{Digest: inputInfo.Digest, Size: int64(len(contents))}, nil } -func (d *memoryImageDest) TryReusingBlob(context.Context, types.BlobInfo, types.BlobInfoCache, bool) (bool, types.BlobInfo, error) { +func (d *memoryImageDest) TryReusingBlob(context.Context, types.BlobInfo, int, types.BlobInfoCache, bool) (bool, types.BlobInfo, error) { panic("Unexpected call to a mock function") } func (d *memoryImageDest) PutManifest(ctx context.Context, m []byte) error { diff --git a/oci/archive/oci_dest.go b/oci/archive/oci_dest.go index 9571c37e2b..7a29ea7481 100644 --- a/oci/archive/oci_dest.go +++ 
b/oci/archive/oci_dest.go @@ -90,8 +90,8 @@ func (d *ociArchiveImageDestination) HasThreadSafePutBlob() bool { // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. -func (d *ociArchiveImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { - return d.unpackedDest.PutBlob(ctx, stream, inputInfo, cache, isConfig) +func (d *ociArchiveImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { + return d.unpackedDest.PutBlob(ctx, stream, inputInfo, layerIndexInImage, cache, isConfig) } // TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination @@ -101,8 +101,8 @@ func (d *ociArchiveImageDestination) PutBlob(ctx context.Context, stream io.Read // If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. // If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. // May use and/or update cache. 
-func (d *ociArchiveImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { - return d.unpackedDest.TryReusingBlob(ctx, info, cache, canSubstitute) +func (d *ociArchiveImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { + return d.unpackedDest.TryReusingBlob(ctx, info, layerIndexInImage, cache, canSubstitute) } // PutManifest writes manifest to the destination diff --git a/oci/layout/oci_dest.go b/oci/layout/oci_dest.go index db102184db..a0f441648e 100644 --- a/oci/layout/oci_dest.go +++ b/oci/layout/oci_dest.go @@ -120,7 +120,7 @@ func (d *ociImageDestination) HasThreadSafePutBlob() bool { // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. -func (d *ociImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { +func (d *ociImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { blobFile, err := ioutil.TempFile(d.ref.dir, "oci-put-blob") if err != nil { return types.BlobInfo{}, err @@ -187,7 +187,7 @@ func (d *ociImageDestination) PutBlob(ctx context.Context, stream io.Reader, inp // If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. 
// If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. // May use and/or update cache. -func (d *ociImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { +func (d *ociImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { if info.Digest == "" { return false, types.BlobInfo{}, errors.Errorf(`"Can not check for a blob with unknown digest`) } diff --git a/oci/layout/oci_dest_test.go b/oci/layout/oci_dest_test.go index 44316fabb4..27aaad508a 100644 --- a/oci/layout/oci_dest_test.go +++ b/oci/layout/oci_dest_test.go @@ -53,7 +53,7 @@ func TestPutBlobDigestFailure(t *testing.T) { dest, err := ref.NewImageDestination(context.Background(), nil) require.NoError(t, err) defer dest.Close() - _, err = dest.PutBlob(context.Background(), reader, types.BlobInfo{Digest: blobDigest, Size: -1}, cache, false) + _, err = dest.PutBlob(context.Background(), reader, types.BlobInfo{Digest: blobDigest, Size: -1}, 0, cache, false) assert.Error(t, err) assert.Contains(t, digestErrorString, err.Error()) err = dest.Commit(context.Background()) diff --git a/openshift/openshift.go b/openshift/openshift.go index 814c3eea1d..2dae485a43 100644 --- a/openshift/openshift.go +++ b/openshift/openshift.go @@ -395,8 +395,8 @@ func (d *openshiftImageDestination) HasThreadSafePutBlob() bool { // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. 
-func (d *openshiftImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { - return d.docker.PutBlob(ctx, stream, inputInfo, cache, isConfig) +func (d *openshiftImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { + return d.docker.PutBlob(ctx, stream, inputInfo, layerIndexInImage, cache, isConfig) } // TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination @@ -406,8 +406,8 @@ func (d *openshiftImageDestination) PutBlob(ctx context.Context, stream io.Reade // If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. // If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. // May use and/or update cache. -func (d *openshiftImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { - return d.docker.TryReusingBlob(ctx, info, cache, canSubstitute) +func (d *openshiftImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { + return d.docker.TryReusingBlob(ctx, info, layerIndexInImage, cache, canSubstitute) } // PutManifest writes manifest to the destination. diff --git a/ostree/ostree_dest.go b/ostree/ostree_dest.go index d69f4fa331..d0ea867c97 100644 --- a/ostree/ostree_dest.go +++ b/ostree/ostree_dest.go @@ -145,7 +145,7 @@ func (d *ostreeImageDestination) HasThreadSafePutBlob() bool { // WARNING: The contents of stream are being verified on the fly. 
Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. -func (d *ostreeImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { +func (d *ostreeImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { tmpDir, err := ioutil.TempDir(d.tmpDirPath, "blob") if err != nil { return types.BlobInfo{}, err @@ -342,7 +342,7 @@ func (d *ostreeImageDestination) importConfig(repo *otbuiltin.Repo, blob *blobTo // If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. // If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. // May use and/or update cache. 
-func (d *ostreeImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { +func (d *ostreeImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { if d.repo == nil { repo, err := openRepo(d.ref.repo) if err != nil { diff --git a/storage/storage_image.go b/storage/storage_image.go index b39d2bcc04..6207057503 100644 --- a/storage/storage_image.go +++ b/storage/storage_image.go @@ -18,7 +18,6 @@ import ( "github.com/containers/image/image" "github.com/containers/image/internal/tmpdir" "github.com/containers/image/manifest" - "github.com/containers/image/pkg/blobinfocache/none" "github.com/containers/image/types" "github.com/containers/storage" "github.com/containers/storage/pkg/archive" @@ -64,6 +63,10 @@ type storageImageDestination struct { fileSizes map[digest.Digest]int64 // Mapping from layer blobsums to their sizes filenames map[digest.Digest]string // Mapping from layer blobsums to names of files we used to hold them SignatureSizes []int `json:"signature-sizes,omitempty"` // List of sizes of each signature slice + + // The fields below are used to serialize writing the layers to the storage in their given order + indexToStorageID map[int]string // Mapping from layer index to the layer IDs in the storage + indexToErrorChannel map[int]chan error // Mapping from layer index to the error of TryReusingBlob() or PutBlob() } type storageImageCloser struct { @@ -323,16 +326,30 @@ func newImageDestination(imageRef storageReference) (*storageImageDestination, e return nil, errors.Wrapf(err, "error creating a temporary directory") } image := &storageImageDestination{ - imageRef: imageRef, - directory: directory, - blobDiffIDs: make(map[digest.Digest]digest.Digest), - fileSizes: 
make(map[digest.Digest]int64), - filenames: make(map[digest.Digest]string), - SignatureSizes: []int{}, + imageRef: imageRef, + directory: directory, + blobDiffIDs: make(map[digest.Digest]digest.Digest), + fileSizes: make(map[digest.Digest]int64), + filenames: make(map[digest.Digest]string), + indexToErrorChannel: make(map[int]chan error), + indexToStorageID: make(map[int]string), + SignatureSizes: []int{}, } return image, nil } +func (s *storageImageDestination) getChannelForLayer(layerIndexInImage int) chan error { + s.putBlobMutex.Lock() + channel, ok := s.indexToErrorChannel[layerIndexInImage] + if !ok { + // A buffered channel to allow non-blocking sends + channel = make(chan error, 1) + s.indexToErrorChannel[layerIndexInImage] = channel + } + s.putBlobMutex.Unlock() + return channel +} + // Reference returns the reference used to set up this destination. Note that this should directly correspond to user's intent, // e.g. it should use the public hostname instead of the result of resolving CNAMEs or following redirects. func (s *storageImageDestination) Reference() types.ImageReference { @@ -368,7 +385,7 @@ func (s *storageImageDestination) HasThreadSafePutBlob() bool { // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. 
-func (s *storageImageDestination) PutBlob(ctx context.Context, stream io.Reader, blobinfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { +func (s *storageImageDestination) PutBlob(ctx context.Context, stream io.Reader, blobinfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { // Stores a layer or data blob in our temporary directory, checking that any information // in the blobinfo matches the incoming data. errorBlobInfo := types.BlobInfo{ @@ -425,29 +442,42 @@ func (s *storageImageDestination) PutBlob(ctx context.Context, stream io.Reader, } // This is safe because we have just computed both values ourselves. cache.RecordDigestUncompressedPair(blobDigest, diffID.Digest()) - return types.BlobInfo{ + blob := types.BlobInfo{ Digest: blobDigest, Size: blobSize, MediaType: blobinfo.MediaType, - }, nil + } + + if !isConfig { + channel := s.getChannelForLayer(layerIndexInImage) + id, err := s.commitBlob(ctx, blob, layerIndexInImage) + s.putBlobMutex.Lock() + if err == nil { + // Set the id prior to sending the error through the channel to + // circumvent a race condition writing/reading the specific index + s.indexToStorageID[layerIndexInImage] = id + } + // It's a buffered channel, so we don't wait for the message to be + // received + channel <- err + if err != nil { + return errorBlobInfo, err + } + s.putBlobMutex.Unlock() + } + return blob, nil } -// TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination -// (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree). -// info.Digest must not be empty. -// If canSubstitute, TryReusingBlob can use an equivalent equivalent of the desired blob; in that case the returned info may not match the input. 
-// If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. -// If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. -// May use and/or update cache. -func (s *storageImageDestination) TryReusingBlob(ctx context.Context, blobinfo types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { +// tryReusingBlob is a helper method for TryReusingBlob to wrap it +func (s *storageImageDestination) tryReusingBlob(ctx context.Context, blobinfo types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, *storage.Layer, error) { // lock the entire method as it executes fairly quickly s.putBlobMutex.Lock() defer s.putBlobMutex.Unlock() if blobinfo.Digest == "" { - return false, types.BlobInfo{}, errors.Errorf(`Can not check for a blob with unknown digest`) + return false, types.BlobInfo{}, nil, errors.Errorf(`Can not check for a blob with unknown digest`) } if err := blobinfo.Digest.Validate(); err != nil { - return false, types.BlobInfo{}, errors.Wrapf(err, `Can not check for a blob with invalid digest`) + return false, types.BlobInfo{}, nil, errors.Wrapf(err, `Can not check for a blob with invalid digest`) } // Check if we've already cached it in a file. @@ -456,13 +486,13 @@ func (s *storageImageDestination) TryReusingBlob(ctx context.Context, blobinfo t Digest: blobinfo.Digest, Size: size, MediaType: blobinfo.MediaType, - }, nil + }, nil, nil } // Check if we have a wasn't-compressed layer in storage that's based on that blob. 
layers, err := s.imageRef.transport.store.LayersByUncompressedDigest(blobinfo.Digest) if err != nil && errors.Cause(err) != storage.ErrLayerUnknown { - return false, types.BlobInfo{}, errors.Wrapf(err, `Error looking for layers with digest %q`, blobinfo.Digest) + return false, types.BlobInfo{}, nil, errors.Wrapf(err, `Error looking for layers with digest %q`, blobinfo.Digest) } if len(layers) > 0 { // Save this for completeness. @@ -471,13 +501,13 @@ func (s *storageImageDestination) TryReusingBlob(ctx context.Context, blobinfo t Digest: blobinfo.Digest, Size: layers[0].UncompressedSize, MediaType: blobinfo.MediaType, - }, nil + }, &layers[0], nil } // Check if we have a was-compressed layer in storage that's based on that blob. layers, err = s.imageRef.transport.store.LayersByCompressedDigest(blobinfo.Digest) if err != nil && errors.Cause(err) != storage.ErrLayerUnknown { - return false, types.BlobInfo{}, errors.Wrapf(err, `Error looking for compressed layers with digest %q`, blobinfo.Digest) + return false, types.BlobInfo{}, nil, errors.Wrapf(err, `Error looking for compressed layers with digest %q`, blobinfo.Digest) } if len(layers) > 0 { // Record the uncompressed value so that we can use it to calculate layer IDs. @@ -486,7 +516,7 @@ func (s *storageImageDestination) TryReusingBlob(ctx context.Context, blobinfo t Digest: blobinfo.Digest, Size: layers[0].CompressedSize, MediaType: blobinfo.MediaType, - }, nil + }, &layers[0], nil } // Does the blob correspond to a known DiffID which we already have available? 
@@ -496,7 +526,7 @@ func (s *storageImageDestination) TryReusingBlob(ctx context.Context, blobinfo t if uncompressedDigest := cache.UncompressedDigest(blobinfo.Digest); uncompressedDigest != "" && uncompressedDigest != blobinfo.Digest { layers, err := s.imageRef.transport.store.LayersByUncompressedDigest(uncompressedDigest) if err != nil && errors.Cause(err) != storage.ErrLayerUnknown { - return false, types.BlobInfo{}, errors.Wrapf(err, `Error looking for layers with digest %q`, uncompressedDigest) + return false, types.BlobInfo{}, nil, errors.Wrapf(err, `Error looking for layers with digest %q`, uncompressedDigest) } if len(layers) > 0 { s.blobDiffIDs[uncompressedDigest] = layers[0].UncompressedDigest @@ -504,13 +534,36 @@ func (s *storageImageDestination) TryReusingBlob(ctx context.Context, blobinfo t Digest: uncompressedDigest, Size: layers[0].UncompressedSize, MediaType: blobinfo.MediaType, - }, nil + }, &layers[0], nil } } } // Nope, we don't have it. - return false, types.BlobInfo{}, nil + return false, types.BlobInfo{}, nil, nil +} + +// TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination +// (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree). +// info.Digest must not be empty. +// If canSubstitute, TryReusingBlob can use an equivalent of the desired blob; in that case the returned info may not match the input. +// If the blob has been successfully reused, returns (true, info, nil); info must contain at least a digest and size. +// If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. +// May use and/or update cache. 
+func (s *storageImageDestination) TryReusingBlob(ctx context.Context, blobinfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { + reusable, blob, layer, err := s.tryReusingBlob(ctx, blobinfo, cache, canSubstitute) + if err == nil && layer != nil { + s.indexToStorageID[layerIndexInImage] = layer.ID + } + // If we can reuse the blob or hit an error trying to do so, we need to signal + // the result through the channel. If we can't reuse the blob and + // encountered no error, we need to copy it and will send the signal in + // PutBlob(). + if err != nil || reusable { + channel := s.getChannelForLayer(layerIndexInImage) + channel <- err + } + return reusable, blob, err } // computeID computes a recommended image ID based on information we have so far. If @@ -572,6 +625,114 @@ func (s *storageImageDestination) getConfigBlob(info types.BlobInfo) ([]byte, er return nil, errors.New("blob not found") } +// commitBlob commits the specified blob to the storage. If not already done, +// it will block until the parent layer is committed to the storage. Note that +// the only caller of commitBlob() is PutBlob(), which records the results +// and sends the error through the corresponding channel in s.previousLayerResult. 
+func (s *storageImageDestination) commitBlob(ctx context.Context, blob types.BlobInfo, layerIndexInImage int) (string, error) { + previousID := "" + if layerIndexInImage > 0 { + channel := s.getChannelForLayer(layerIndexInImage - 1) + if err := <-channel; err != nil { + return "", errors.Wrapf(err, "error committing previous layer") + } + var ok bool + s.putBlobMutex.Lock() + previousID, ok = s.indexToStorageID[layerIndexInImage-1] + s.putBlobMutex.Unlock() + if !ok { + return "", fmt.Errorf("error committing blob %q: could not find storage ID or parent", blob.Digest.String()) + } + } + + // Check if there's already a layer with the ID that we'd give to the result of applying + // this layer blob to its parent, if it has one, or the blob's hex value otherwise. + s.putBlobMutex.Lock() + diffID, haveDiffID := s.blobDiffIDs[blob.Digest] + s.putBlobMutex.Unlock() + if !haveDiffID { + return "", errors.Errorf("we have blob %q, but don't know its uncompressed digest", blob.Digest.String()) + } + id := diffID.Hex() + if previousID != "" { + id = digest.Canonical.FromBytes([]byte(previousID + "+" + diffID.Hex())).Hex() + } + if layer, err2 := s.imageRef.transport.store.Layer(id); layer != nil && err2 == nil { + return id, nil + } + // Check if we previously cached a file with that blob's contents. If we didn't, + // then we need to read the desired contents from a layer. + s.putBlobMutex.Lock() + filename, ok := s.filenames[blob.Digest] + s.putBlobMutex.Unlock() + if !ok { + // Try to find the layer with contents matching that blobsum. 
+ layer := "" + layers, err2 := s.imageRef.transport.store.LayersByUncompressedDigest(blob.Digest) + if err2 == nil && len(layers) > 0 { + layer = layers[0].ID + } else { + layers, err2 = s.imageRef.transport.store.LayersByCompressedDigest(blob.Digest) + if err2 == nil && len(layers) > 0 { + layer = layers[0].ID + } + } + if layer == "" { + return "", errors.Wrapf(err2, "error locating layer for blob %q", blob.Digest) + } + // Read the layer's contents. + noCompression := archive.Uncompressed + diffOptions := &storage.DiffOptions{ + Compression: &noCompression, + } + diff, err2 := s.imageRef.transport.store.Diff("", layer, diffOptions) + if err2 != nil { + return "", errors.Wrapf(err2, "error reading layer %q for blob %q", layer, blob.Digest) + } + // Copy the layer diff to a file. Diff() takes a lock that it holds + // until the ReadCloser that it returns is closed, and PutLayer() wants + // the same lock, so the diff can't just be directly streamed from one + // to the other. + filename = s.computeNextBlobCacheFile() + file, err := os.OpenFile(filename, os.O_CREATE|os.O_TRUNC|os.O_WRONLY|os.O_EXCL, 0600) + if err != nil { + diff.Close() + return "", errors.Wrapf(err, "error creating temporary file %q", filename) + } + // Copy the data to the file. + // TODO: This can take quite some time, and should ideally be cancellable using + // ctx.Done(). + _, err = io.Copy(file, diff) + diff.Close() + file.Close() + if err != nil { + return "", errors.Wrapf(err, "error storing blob to file %q", filename) + } + // Make sure that we can find this file later, should we need the layer's + // contents again. + s.putBlobMutex.Lock() + s.filenames[blob.Digest] = filename + s.putBlobMutex.Unlock() + } + // Read the cached blob and use it as a diff. + file, err := os.Open(filename) + if err != nil { + return "", errors.Wrapf(err, "error opening file %q", filename) + } + defer file.Close() + // Build the new layer using the diff, regardless of where it came from. 
+ // TODO: This can take quite some time, and should ideally be cancellable using ctx.Done(). + layer, _, err := s.imageRef.transport.store.PutLayer(id, previousID, nil, "", false, nil, file) + if err != nil && errors.Cause(err) != storage.ErrDuplicateID { + return "", errors.Wrapf(err, "error adding layer with blob %q", blob.Digest) + } + return layer.ID, nil +} + +// Commit marks the process of storing the image as successful and asks for the image to be persisted. +// WARNING: This does not have any transactional semantics: +// - Uploaded data MAY be visible to others before Commit() is called +// - Uploaded data MAY be removed or MAY remain around if Close() is called without Commit() (i.e. rollback is allowed but not guaranteed) func (s *storageImageDestination) Commit(ctx context.Context) error { // Find the list of layer blobs. if len(s.manifest) == 0 { @@ -583,107 +744,9 @@ func (s *storageImageDestination) Commit(ctx context.Context) error { } layerBlobs := man.LayerInfos() // Extract or find the layers. - lastLayer := "" - for _, blob := range layerBlobs { - if blob.EmptyLayer { - continue - } - - // Check if there's already a layer with the ID that we'd give to the result of applying - // this layer blob to its parent, if it has one, or the blob's hex value otherwise. - diffID, haveDiffID := s.blobDiffIDs[blob.Digest] - if !haveDiffID { - // Check if it's elsewhere and the caller just forgot to pass it to us in a PutBlob(), - // or to even check if we had it. - // Use none.NoCache to avoid a repeated DiffID lookup in the BlobInfoCache; a caller - // that relies on using a blob digest that has never been seeen by the store had better call - // TryReusingBlob; not calling PutBlob already violates the documented API, so there’s only - // so far we are going to accommodate that (if we should be doing that at all). 
- logrus.Debugf("looking for diffID for blob %+v", blob.Digest) - has, _, err := s.TryReusingBlob(ctx, blob.BlobInfo, none.NoCache, false) - if err != nil { - return errors.Wrapf(err, "error checking for a layer based on blob %q", blob.Digest.String()) - } - if !has { - return errors.Errorf("error determining uncompressed digest for blob %q", blob.Digest.String()) - } - diffID, haveDiffID = s.blobDiffIDs[blob.Digest] - if !haveDiffID { - return errors.Errorf("we have blob %q, but don't know its uncompressed digest", blob.Digest.String()) - } - } - id := diffID.Hex() - if lastLayer != "" { - id = digest.Canonical.FromBytes([]byte(lastLayer + "+" + diffID.Hex())).Hex() - } - if layer, err2 := s.imageRef.transport.store.Layer(id); layer != nil && err2 == nil { - // There's already a layer that should have the right contents, just reuse it. - lastLayer = layer.ID - continue - } - // Check if we previously cached a file with that blob's contents. If we didn't, - // then we need to read the desired contents from a layer. - filename, ok := s.filenames[blob.Digest] - if !ok { - // Try to find the layer with contents matching that blobsum. - layer := "" - layers, err2 := s.imageRef.transport.store.LayersByUncompressedDigest(blob.Digest) - if err2 == nil && len(layers) > 0 { - layer = layers[0].ID - } else { - layers, err2 = s.imageRef.transport.store.LayersByCompressedDigest(blob.Digest) - if err2 == nil && len(layers) > 0 { - layer = layers[0].ID - } - } - if layer == "" { - return errors.Wrapf(err2, "error locating layer for blob %q", blob.Digest) - } - // Read the layer's contents. - noCompression := archive.Uncompressed - diffOptions := &storage.DiffOptions{ - Compression: &noCompression, - } - diff, err2 := s.imageRef.transport.store.Diff("", layer, diffOptions) - if err2 != nil { - return errors.Wrapf(err2, "error reading layer %q for blob %q", layer, blob.Digest) - } - // Copy the layer diff to a file. 
Diff() takes a lock that it holds - // until the ReadCloser that it returns is closed, and PutLayer() wants - // the same lock, so the diff can't just be directly streamed from one - // to the other. - filename = s.computeNextBlobCacheFile() - file, err := os.OpenFile(filename, os.O_CREATE|os.O_TRUNC|os.O_WRONLY|os.O_EXCL, 0600) - if err != nil { - diff.Close() - return errors.Wrapf(err, "error creating temporary file %q", filename) - } - // Copy the data to the file. - // TODO: This can take quite some time, and should ideally be cancellable using - // ctx.Done(). - _, err = io.Copy(file, diff) - diff.Close() - file.Close() - if err != nil { - return errors.Wrapf(err, "error storing blob to file %q", filename) - } - // Make sure that we can find this file later, should we need the layer's - // contents again. - s.filenames[blob.Digest] = filename - } - // Read the cached blob and use it as a diff. - file, err := os.Open(filename) - if err != nil { - return errors.Wrapf(err, "error opening file %q", filename) - } - defer file.Close() - // Build the new layer using the diff, regardless of where it came from. - // TODO: This can take quite some time, and should ideally be cancellable using ctx.Done(). 
- layer, _, err := s.imageRef.transport.store.PutLayer(id, lastLayer, nil, "", false, nil, file) - if err != nil && errors.Cause(err) != storage.ErrDuplicateID { - return errors.Wrapf(err, "error adding layer with blob %q", blob.Digest) - } - lastLayer = layer.ID + lastLayer, ok := s.indexToStorageID[len(layerBlobs)-1] + if !ok { + return fmt.Errorf("could not find top layer") } // If one of those blobs was a configuration blob, then we can try to dig out the date when the image @@ -724,6 +787,7 @@ func (s *storageImageDestination) Commit(ctx context.Context) error { for blob := range s.filenames { dataBlobs[blob] = struct{}{} } + for _, layerBlob := range layerBlobs { delete(dataBlobs, layerBlob.Digest) } diff --git a/storage/storage_test.go b/storage/storage_test.go index d91d7aaa03..287863a972 100644 --- a/storage/storage_test.go +++ b/storage/storage_test.go @@ -380,11 +380,11 @@ func TestWriteRead(t *testing.T) { if _, err := dest.PutBlob(context.Background(), bytes.NewBuffer(blob), types.BlobInfo{ Size: size, Digest: digest, - }, cache, false); err != nil { + }, 0, cache, false); err != nil { t.Fatalf("Error saving randomly-generated layer to destination: %v", err) } t.Logf("Wrote randomly-generated layer %q (%d/%d bytes) to destination", digest, size, decompressedSize) - if _, err := dest.PutBlob(context.Background(), bytes.NewBufferString(config), configInfo, cache, false); err != nil { + if _, err := dest.PutBlob(context.Background(), bytes.NewBufferString(config), configInfo, 0, cache, false); err != nil { t.Fatalf("Error saving config to destination: %v", err) } manifest := strings.Replace(manifestFmt, "%lh", digest.String(), -1) @@ -541,7 +541,7 @@ func TestDuplicateName(t *testing.T) { if _, err := dest.PutBlob(context.Background(), bytes.NewBuffer(blob), types.BlobInfo{ Size: size, Digest: digest, - }, cache, false); err != nil { + }, 0, cache, false); err != nil { t.Fatalf("Error saving randomly-generated layer to destination, first pass: %v", err) } 
manifest := fmt.Sprintf(` @@ -576,7 +576,7 @@ func TestDuplicateName(t *testing.T) { if _, err := dest.PutBlob(context.Background(), bytes.NewBuffer(blob), types.BlobInfo{ Size: int64(size), Digest: digest, - }, cache, false); err != nil { + }, 0, cache, false); err != nil { t.Fatalf("Error saving randomly-generated layer to destination, second pass: %v", err) } manifest = fmt.Sprintf(` @@ -628,7 +628,7 @@ func TestDuplicateID(t *testing.T) { if _, err := dest.PutBlob(context.Background(), bytes.NewBuffer(blob), types.BlobInfo{ Size: size, Digest: digest, - }, cache, false); err != nil { + }, 0, cache, false); err != nil { t.Fatalf("Error saving randomly-generated layer to destination, first pass: %v", err) } manifest := fmt.Sprintf(` @@ -663,7 +663,7 @@ func TestDuplicateID(t *testing.T) { if _, err := dest.PutBlob(context.Background(), bytes.NewBuffer(blob), types.BlobInfo{ Size: int64(size), Digest: digest, - }, cache, false); err != nil { + }, 0, cache, false); err != nil { t.Fatalf("Error saving randomly-generated layer to destination, second pass: %v", err) } manifest = fmt.Sprintf(` @@ -718,7 +718,7 @@ func TestDuplicateNameID(t *testing.T) { if _, err := dest.PutBlob(context.Background(), bytes.NewBuffer(blob), types.BlobInfo{ Size: size, Digest: digest, - }, cache, false); err != nil { + }, 0, cache, false); err != nil { t.Fatalf("Error saving randomly-generated layer to destination, first pass: %v", err) } manifest := fmt.Sprintf(` @@ -753,7 +753,7 @@ func TestDuplicateNameID(t *testing.T) { if _, err := dest.PutBlob(context.Background(), bytes.NewBuffer(blob), types.BlobInfo{ Size: int64(size), Digest: digest, - }, cache, false); err != nil { + }, 0, cache, false); err != nil { t.Fatalf("Error saving randomly-generated layer to destination, second pass: %v", err) } manifest = fmt.Sprintf(` @@ -850,21 +850,21 @@ func TestSize(t *testing.T) { if dest == nil { t.Fatalf("NewImageDestination(%q) returned no destination", ref.StringWithinTransport()) } - if _, 
err := dest.PutBlob(context.Background(), bytes.NewBufferString(config), configInfo, cache, false); err != nil { + if _, err := dest.PutBlob(context.Background(), bytes.NewBufferString(config), configInfo, 0, cache, false); err != nil { t.Fatalf("Error saving config to destination: %v", err) } digest1, usize1, size1, blob := makeLayer(t, archive.Gzip) if _, err := dest.PutBlob(context.Background(), bytes.NewBuffer(blob), types.BlobInfo{ Size: size1, Digest: digest1, - }, cache, false); err != nil { + }, 0, cache, false); err != nil { t.Fatalf("Error saving randomly-generated layer 1 to destination: %v", err) } digest2, usize2, size2, blob := makeLayer(t, archive.Gzip) if _, err := dest.PutBlob(context.Background(), bytes.NewBuffer(blob), types.BlobInfo{ Size: size2, Digest: digest2, - }, cache, false); err != nil { + }, 0, cache, false); err != nil { t.Fatalf("Error saving randomly-generated layer 2 to destination: %v", err) } manifest := fmt.Sprintf(` @@ -946,26 +946,26 @@ func TestDuplicateBlob(t *testing.T) { if _, err := dest.PutBlob(context.Background(), bytes.NewBuffer(blob1), types.BlobInfo{ Size: size1, Digest: digest1, - }, cache, false); err != nil { + }, 0, cache, false); err != nil { t.Fatalf("Error saving randomly-generated layer 1 to destination (first copy): %v", err) } digest2, _, size2, blob2 := makeLayer(t, archive.Gzip) if _, err := dest.PutBlob(context.Background(), bytes.NewBuffer(blob2), types.BlobInfo{ Size: size2, Digest: digest2, - }, cache, false); err != nil { + }, 0, cache, false); err != nil { t.Fatalf("Error saving randomly-generated layer 2 to destination (first copy): %v", err) } if _, err := dest.PutBlob(context.Background(), bytes.NewBuffer(blob1), types.BlobInfo{ Size: size1, Digest: digest1, - }, cache, false); err != nil { + }, 0, cache, false); err != nil { t.Fatalf("Error saving randomly-generated layer 1 to destination (second copy): %v", err) } if _, err := dest.PutBlob(context.Background(), bytes.NewBuffer(blob2), 
types.BlobInfo{ Size: size2, Digest: digest2, - }, cache, false); err != nil { + }, 0, cache, false); err != nil { t.Fatalf("Error saving randomly-generated layer 2 to destination (second copy): %v", err) } manifest := fmt.Sprintf(` diff --git a/types/types.go b/types/types.go index 9fdab2314a..2772894ee4 100644 --- a/types/types.go +++ b/types/types.go @@ -7,7 +7,7 @@ import ( "github.com/containers/image/docker/reference" "github.com/opencontainers/go-digest" - "github.com/opencontainers/image-spec/specs-go/v1" + v1 "github.com/opencontainers/image-spec/specs-go/v1" ) // ImageTransport is a top-level namespace for ways to to store/load an image. @@ -265,7 +265,7 @@ type ImageDestination interface { // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. - PutBlob(ctx context.Context, stream io.Reader, inputInfo BlobInfo, cache BlobInfoCache, isConfig bool) (BlobInfo, error) + PutBlob(ctx context.Context, stream io.Reader, inputInfo BlobInfo, layerIndexInImage int, cache BlobInfoCache, isConfig bool) (BlobInfo, error) // HasThreadSafePutBlob indicates whether PutBlob can be executed concurrently. HasThreadSafePutBlob() bool // TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination @@ -275,7 +275,7 @@ type ImageDestination interface { // If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. // If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. // May use and/or update cache. 
- TryReusingBlob(ctx context.Context, info BlobInfo, cache BlobInfoCache, canSubstitute bool) (bool, BlobInfo, error) + TryReusingBlob(ctx context.Context, info BlobInfo, layerIndexInImage int, cache BlobInfoCache, canSubstitute bool) (bool, BlobInfo, error) // PutManifest writes manifest to the destination. // FIXME? This should also receive a MIME type if known, to differentiate between schema versions. // If the destination is in principle available, refuses this manifest type (e.g. it does not recognize the schema),