From eebf6833d2e223ef13c857978c3bd9c2dfadfc42 Mon Sep 17 00:00:00 2001 From: Valentin Rothberg Date: Fri, 5 Apr 2019 13:24:05 +0200 Subject: [PATCH] update containers/image ImageDestination.{PutBlob,TryReusingBlob} are extended to receive the index of the layer in the corresponding image. Some transports, containers storage in particular, require the layers to be comitted in sequence. The new sequencing allows containers storage to commit the layers in PutBlob. Signed-off-by: Valentin Rothberg --- cmd/skopeo/layers.go | 4 +- vendor.conf | 4 +- .../github.com/containers/buildah/vendor.conf | 5 +- .../github.com/containers/image/copy/copy.go | 90 +++-- .../image/directory/directory_dest.go | 4 +- .../image/docker/docker_image_dest.go | 8 +- .../containers/image/docker/tarfile/dest.go | 6 +- .../containers/image/image/docker_schema2.go | 2 +- .../containers/image/oci/archive/oci_dest.go | 8 +- .../containers/image/oci/layout/oci_dest.go | 4 +- .../containers/image/openshift/openshift.go | 8 +- .../containers/image/ostree/ostree_dest.go | 4 +- .../containers/image/storage/storage_image.go | 349 ++++++++++++------ .../containers/image/types/types.go | 15 +- 14 files changed, 342 insertions(+), 169 deletions(-) diff --git a/cmd/skopeo/layers.go b/cmd/skopeo/layers.go index 3a09d8bd3e..8db441b1d2 100644 --- a/cmd/skopeo/layers.go +++ b/cmd/skopeo/layers.go @@ -121,12 +121,12 @@ func (opts *layersOptions) run(args []string, stdout io.Writer) (retErr error) { } }() - for _, bd := range blobDigests { + for index, bd := range blobDigests { r, blobSize, err := rawSource.GetBlob(ctx, types.BlobInfo{Digest: bd.digest, Size: -1}, cache) if err != nil { return err } - if _, err := dest.PutBlob(ctx, r, types.BlobInfo{Digest: bd.digest, Size: blobSize}, cache, bd.isConfig); err != nil { + if _, err := dest.PutBlob(ctx, r, types.BlobInfo{Digest: bd.digest, Size: blobSize}, index, cache, bd.isConfig); err != nil { if closeErr := r.Close(); closeErr != nil { return errors.Wrapf(err, " (close error: %v)", closeErr) } diff --git a/vendor.conf b/vendor.conf index 307267f0d0..8e44edcba8 100644 --- a/vendor.conf +++ b/vendor.conf @@ -2,8 +2,8 @@ github.com/urfave/cli v1.20.0 github.com/kr/pretty v0.1.0 github.com/kr/text v0.1.0 -github.com/containers/image e9c3d17ddb8cb5d48450bc0588525b17b181c049 -github.com/containers/buildah 25b7c1164a262934a6526ded1115dfacfc51f8ec +github.com/containers/image commit https://github.com/vrothberg/image +github.com/containers/buildah image-commit https://github.com/vrothberg/buildah github.com/vbauerster/mpb v3.3.4 github.com/mattn/go-isatty v0.0.4 github.com/VividCortex/ewma v1.1.1 diff --git a/vendor/github.com/containers/buildah/vendor.conf b/vendor/github.com/containers/buildah/vendor.conf index 327de39b21..9f729202a0 100644 --- a/vendor/github.com/containers/buildah/vendor.conf +++ b/vendor/github.com/containers/buildah/vendor.conf @@ -3,12 +3,12 @@ github.com/blang/semver v3.5.0 github.com/BurntSushi/toml v0.2.0 github.com/containerd/continuity 004b46473808b3e7a4a3049c20e4376c91eb966d github.com/containernetworking/cni v0.7.0-rc2 -github.com/containers/image f52cf78ebfa1916da406f8b6210d8f7764ec1185 +github.com/containers/image commit https://github.com/vrothberg/image github.com/vbauerster/mpb v3.3.4 github.com/mattn/go-isatty v0.0.4 github.com/VividCortex/ewma v1.1.1 github.com/boltdb/bolt v1.3.1 -github.com/containers/storage v1.12.1 +github.com/containers/storage v1.12.2 github.com/docker/distribution 5f6282db7d65e6d72ad7c2cc66310724a57be716 github.com/docker/docker 54dddadc7d5d89fe0be88f76979f6f6ab0dede83 github.com/docker/docker-credential-helpers v0.6.1 @@ -65,3 +65,4 @@ github.com/klauspost/cpuid v1.2.0 github.com/onsi/gomega v1.4.3 github.com/spf13/cobra v0.0.3 github.com/spf13/pflag v1.0.3 +github.com/ishidawataru/sctp 07191f837fedd2f13d1ec7b5f885f0f3ec54b1cb diff --git a/vendor/github.com/containers/image/copy/copy.go b/vendor/github.com/containers/image/copy/copy.go index 3ed8a2b824..b1103062f0 100644 --- a/vendor/github.com/containers/image/copy/copy.go +++ b/vendor/github.com/containers/image/copy/copy.go @@ -454,10 +454,6 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error { err error } - // copyGroup is used to determine if all layers are copied - copyGroup := sync.WaitGroup{} - copyGroup.Add(numLayers) - // copySemaphore is used to limit the number of parallel downloads to // avoid malicious images causing troubles and to be nice to servers. var copySemaphore *semaphore.Weighted @@ -467,43 +463,69 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error { copySemaphore = semaphore.NewWeighted(int64(1)) } - data := make([]copyLayerData, numLayers) - copyLayerHelper := func(index int, srcLayer types.BlobInfo, pool *mpb.Progress) { - defer copySemaphore.Release(1) - defer copyGroup.Done() - cld := copyLayerData{} + // copyGroup is used to determine if all layers are copied + copyGroup := sync.WaitGroup{} + + // A context.WithCancel is needed when encountering an error while + // copying/downloading layers in parallel. + cancelCtx, cancelCopyLayer := context.WithCancel(ctx) + defer cancelCopyLayer() + progressPool, progressCleanup := ic.c.newProgressPool(cancelCtx) + defer progressCleanup() + + layerIndex := 0 // some layers might be skipped, so we need a dedicated counter + digestToCopyData := make(map[digest.Digest]*copyLayerData) + for _, srcLayer := range srcInfos { + // Check if we'are already copying the layer + if _, ok := digestToCopyData[srcLayer.Digest]; ok { + continue + } + + cld := ©LayerData{} + digestToCopyData[srcLayer.Digest] = cld if ic.c.dest.AcceptsForeignLayerURLs() && len(srcLayer.URLs) != 0 { // DiffIDs are, currently, needed only when converting from schema1. // In which case src.LayerInfos will not have URLs because schema1 // does not support them. if ic.diffIDsAreNeeded { - cld.err = errors.New("getting DiffID for foreign layers is unimplemented") + cancelCopyLayer() + return errors.New("getting DiffID for foreign layers is unimplemented") } else { + logrus.Debugf("Skipping foreign layer %q copy to %s", srcLayer.Digest, ic.c.dest.Reference().Transport().Name()) cld.destInfo = srcLayer - logrus.Debugf("Skipping foreign layer %q copy to %s", cld.destInfo.Digest, ic.c.dest.Reference().Transport().Name()) + continue } - } else { - cld.destInfo, cld.diffID, cld.err = ic.copyLayer(ctx, srcLayer, pool) } - data[index] = cld - } - func() { // A scope for defer - progressPool, progressCleanup := ic.c.newProgressPool(ctx) - defer progressCleanup() + copySemaphore.Acquire(cancelCtx, 1) // limits parallel copy operations + copyGroup.Add(1) // allows the main routine to wait for all copy operations to finish - for i, srcLayer := range srcInfos { - copySemaphore.Acquire(ctx, 1) - go copyLayerHelper(i, srcLayer, progressPool) - } + // Copy the layer. Note that cld is a pointer; changes to it are + // propagated implicitly. + go func(index int, srcLayer types.BlobInfo, cld *copyLayerData) { + defer copySemaphore.Release(1) + defer copyGroup.Done() + cld.destInfo, cld.diffID, cld.err = ic.copyLayer(cancelCtx, srcLayer, index, progressPool) + if cld.err != nil { + // Note that the error will be caught below. + logrus.Errorf("copying layer %d failed: %v", index, cld.err) + cancelCopyLayer() + } + }(layerIndex, srcLayer, cld) - // Wait for all layers to be copied - copyGroup.Wait() - }() + layerIndex++ + } + + // Wait for all layer-copy operations to finish + copyGroup.Wait() destInfos := make([]types.BlobInfo, numLayers) diffIDs := make([]digest.Digest, numLayers) - for i, cld := range data { + for i, srcLayer := range srcInfos { + cld, ok := digestToCopyData[srcLayer.Digest] + if !ok { + return errors.Errorf("no copy data for layer %q", srcLayer.Digest) + } if cld.err != nil { return cld.err } @@ -625,7 +647,7 @@ func (c *copier) copyConfig(ctx context.Context, src types.Image) error { progressPool, progressCleanup := c.newProgressPool(ctx) defer progressCleanup() bar := c.createProgressBar(progressPool, srcInfo, "config", "done") - destInfo, err := c.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, nil, false, true, bar) + destInfo, err := c.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, -1, nil, false, true, bar) if err != nil { return types.BlobInfo{}, err } @@ -651,13 +673,13 @@ type diffIDResult struct { // copyLayer copies a layer with srcInfo (with known Digest and possibly known Size) in src to dest, perhaps compressing it if canCompress, // and returns a complete blobInfo of the copied layer, and a value for LayerDiffIDs if diffIDIsNeeded -func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, pool *mpb.Progress) (types.BlobInfo, digest.Digest, error) { +func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, layerIndexInImage int, pool *mpb.Progress) (types.BlobInfo, digest.Digest, error) { cachedDiffID := ic.c.blobInfoCache.UncompressedDigest(srcInfo.Digest) // May be "" diffIDIsNeeded := ic.diffIDsAreNeeded && cachedDiffID == "" // If we already have the blob, and we don't need to compute the diffID, then we don't need to read it from the source. if !diffIDIsNeeded { - reused, blobInfo, err := ic.c.dest.TryReusingBlob(ctx, srcInfo, ic.c.blobInfoCache, ic.canSubstituteBlobs) + reused, blobInfo, err := ic.c.dest.TryReusingBlob(ctx, srcInfo, layerIndexInImage, ic.c.blobInfoCache, ic.canSubstituteBlobs) if err != nil { return types.BlobInfo{}, "", errors.Wrapf(err, "Error trying to reuse blob %s at destination", srcInfo.Digest) } @@ -678,7 +700,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, po bar := ic.c.createProgressBar(pool, srcInfo, "blob", "done") - blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize}, diffIDIsNeeded, bar) + blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize}, layerIndexInImage, diffIDIsNeeded, bar) if err != nil { return types.BlobInfo{}, "", err } @@ -709,7 +731,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, po // perhaps compressing the stream if canCompress, // and returns a complete blobInfo of the copied blob and perhaps a <-chan diffIDResult if diffIDIsNeeded, to be read by the caller. func (ic *imageCopier) copyLayerFromStream(ctx context.Context, srcStream io.Reader, srcInfo types.BlobInfo, - diffIDIsNeeded bool, bar *mpb.Bar) (types.BlobInfo, <-chan diffIDResult, error) { + layerIndexInImage int, diffIDIsNeeded bool, bar *mpb.Bar) (types.BlobInfo, <-chan diffIDResult, error) { var getDiffIDRecorder func(compression.DecompressorFunc) io.Writer // = nil var diffIDChan chan diffIDResult @@ -733,7 +755,7 @@ func (ic *imageCopier) copyLayerFromStream(ctx context.Context, srcStream io.Rea return pipeWriter } } - blobInfo, err := ic.c.copyBlobFromStream(ctx, srcStream, srcInfo, getDiffIDRecorder, ic.canModifyManifest, false, bar) // Sets err to nil on success + blobInfo, err := ic.c.copyBlobFromStream(ctx, srcStream, srcInfo, layerIndexInImage, getDiffIDRecorder, ic.canModifyManifest, false, bar) // Sets err to nil on success return blobInfo, diffIDChan, err // We need the defer … pipeWriter.CloseWithError() to happen HERE so that the caller can block on reading from diffIDChan } @@ -768,7 +790,7 @@ func computeDiffID(stream io.Reader, decompressor compression.DecompressorFunc) // perhaps sending a copy to an io.Writer if getOriginalLayerCopyWriter != nil, // perhaps compressing it if canCompress, // and returns a complete blobInfo of the copied blob. -func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, srcInfo types.BlobInfo, +func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, srcInfo types.BlobInfo, layerIndexInImage int, getOriginalLayerCopyWriter func(decompressor compression.DecompressorFunc) io.Writer, canModifyBlob bool, isConfig bool, bar *mpb.Bar) (types.BlobInfo, error) { // The copying happens through a pipeline of connected io.Readers. @@ -847,7 +869,7 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr } // === Finally, send the layer stream to dest. - uploadedInfo, err := c.dest.PutBlob(ctx, destStream, inputInfo, c.blobInfoCache, isConfig) + uploadedInfo, err := c.dest.PutBlob(ctx, destStream, inputInfo, layerIndexInImage, c.blobInfoCache, isConfig) if err != nil { return types.BlobInfo{}, errors.Wrap(err, "Error writing blob") } diff --git a/vendor/github.com/containers/image/directory/directory_dest.go b/vendor/github.com/containers/image/directory/directory_dest.go index 4b2ab022e2..3efd093b11 100644 --- a/vendor/github.com/containers/image/directory/directory_dest.go +++ b/vendor/github.com/containers/image/directory/directory_dest.go @@ -136,7 +136,7 @@ func (d *dirImageDestination) HasThreadSafePutBlob() bool { // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. -func (d *dirImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { +func (d *dirImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { blobFile, err := ioutil.TempFile(d.ref.path, "dir-put-blob") if err != nil { return types.BlobInfo{}, err @@ -182,7 +182,7 @@ func (d *dirImageDestination) PutBlob(ctx context.Context, stream io.Reader, inp // If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. // If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. // May use and/or update cache. -func (d *dirImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { +func (d *dirImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { if info.Digest == "" { return false, types.BlobInfo{}, errors.Errorf(`"Can not check for a blob with unknown digest`) } diff --git a/vendor/github.com/containers/image/docker/docker_image_dest.go b/vendor/github.com/containers/image/docker/docker_image_dest.go index c116cbec32..abe100fbe4 100644 --- a/vendor/github.com/containers/image/docker/docker_image_dest.go +++ b/vendor/github.com/containers/image/docker/docker_image_dest.go @@ -19,7 +19,7 @@ import ( "github.com/containers/image/pkg/blobinfocache/none" "github.com/containers/image/types" "github.com/docker/distribution/registry/api/errcode" - "github.com/docker/distribution/registry/api/v2" + v2 "github.com/docker/distribution/registry/api/v2" "github.com/docker/distribution/registry/client" "github.com/opencontainers/go-digest" imgspecv1 "github.com/opencontainers/image-spec/specs-go/v1" @@ -124,12 +124,12 @@ func (d *dockerImageDestination) HasThreadSafePutBlob() bool { // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. -func (d *dockerImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { +func (d *dockerImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { if inputInfo.Digest.String() != "" { // This should not really be necessary, at least the copy code calls TryReusingBlob automatically. // Still, we need to check, if only because the "initiate upload" endpoint does not have a documented "blob already exists" return value. // But we do that with NoCache, so that it _only_ checks the primary destination, instead of trying all mount candidates _again_. - haveBlob, reusedInfo, err := d.TryReusingBlob(ctx, inputInfo, none.NoCache, false) + haveBlob, reusedInfo, err := d.TryReusingBlob(ctx, inputInfo, layerIndexInImage, none.NoCache, false) if err != nil { return types.BlobInfo{}, err } @@ -271,7 +271,7 @@ func (d *dockerImageDestination) mountBlob(ctx context.Context, srcRepo referenc // If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. // If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. // May use and/or update cache. -func (d *dockerImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { +func (d *dockerImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { if info.Digest == "" { return false, types.BlobInfo{}, errors.Errorf(`"Can not check for a blob with unknown digest`) } diff --git a/vendor/github.com/containers/image/docker/tarfile/dest.go b/vendor/github.com/containers/image/docker/tarfile/dest.go index 5f30eddbc7..270120ba77 100644 --- a/vendor/github.com/containers/image/docker/tarfile/dest.go +++ b/vendor/github.com/containers/image/docker/tarfile/dest.go @@ -94,7 +94,7 @@ func (d *Destination) HasThreadSafePutBlob() bool { // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. -func (d *Destination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { +func (d *Destination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { // Ouch, we need to stream the blob into a temporary file just to determine the size. // When the layer is decompressed, we also have to generate the digest on uncompressed datas. if inputInfo.Size == -1 || inputInfo.Digest.String() == "" { @@ -126,7 +126,7 @@ func (d *Destination) PutBlob(ctx context.Context, stream io.Reader, inputInfo t } // Maybe the blob has been already sent - ok, reusedInfo, err := d.TryReusingBlob(ctx, inputInfo, cache, false) + ok, reusedInfo, err := d.TryReusingBlob(ctx, inputInfo, layerIndexInImage, cache, false) if err != nil { return types.BlobInfo{}, err } @@ -164,7 +164,7 @@ func (d *Destination) PutBlob(ctx context.Context, stream io.Reader, inputInfo t // If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. // If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. // May use and/or update cache. -func (d *Destination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { +func (d *Destination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { if info.Digest == "" { return false, types.BlobInfo{}, errors.Errorf("Can not check for a blob with unknown digest") } diff --git a/vendor/github.com/containers/image/image/docker_schema2.go b/vendor/github.com/containers/image/image/docker_schema2.go index 351e73ea1d..1cd4f402b6 100644 --- a/vendor/github.com/containers/image/image/docker_schema2.go +++ b/vendor/github.com/containers/image/image/docker_schema2.go @@ -252,7 +252,7 @@ func (m *manifestSchema2) convertToManifestSchema1(ctx context.Context, dest typ logrus.Debugf("Uploading empty layer during conversion to schema 1") // Ideally we should update the relevant BlobInfoCache about this layer, but that would require passing it down here, // and anyway this blob is so small that it’s easier to just copy it than to worry about figuring out another location where to get it. - info, err := dest.PutBlob(ctx, bytes.NewReader(GzippedEmptyLayer), types.BlobInfo{Digest: GzippedEmptyLayerDigest, Size: int64(len(GzippedEmptyLayer))}, none.NoCache, false) + info, err := dest.PutBlob(ctx, bytes.NewReader(GzippedEmptyLayer), types.BlobInfo{Digest: GzippedEmptyLayerDigest, Size: int64(len(GzippedEmptyLayer))}, -1, none.NoCache, false) if err != nil { return nil, errors.Wrap(err, "Error uploading empty layer") } diff --git a/vendor/github.com/containers/image/oci/archive/oci_dest.go b/vendor/github.com/containers/image/oci/archive/oci_dest.go index 9571c37e2b..7a29ea7481 100644 --- a/vendor/github.com/containers/image/oci/archive/oci_dest.go +++ b/vendor/github.com/containers/image/oci/archive/oci_dest.go @@ -90,8 +90,8 @@ func (d *ociArchiveImageDestination) HasThreadSafePutBlob() bool { // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. -func (d *ociArchiveImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { - return d.unpackedDest.PutBlob(ctx, stream, inputInfo, cache, isConfig) +func (d *ociArchiveImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { + return d.unpackedDest.PutBlob(ctx, stream, inputInfo, layerIndexInImage, cache, isConfig) } // TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination @@ -101,8 +101,8 @@ func (d *ociArchiveImageDestination) PutBlob(ctx context.Context, stream io.Read // If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. // If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. // May use and/or update cache. -func (d *ociArchiveImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { - return d.unpackedDest.TryReusingBlob(ctx, info, cache, canSubstitute) +func (d *ociArchiveImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { + return d.unpackedDest.TryReusingBlob(ctx, info, layerIndexInImage, cache, canSubstitute) } // PutManifest writes manifest to the destination diff --git a/vendor/github.com/containers/image/oci/layout/oci_dest.go b/vendor/github.com/containers/image/oci/layout/oci_dest.go index db102184db..a0f441648e 100644 --- a/vendor/github.com/containers/image/oci/layout/oci_dest.go +++ b/vendor/github.com/containers/image/oci/layout/oci_dest.go @@ -120,7 +120,7 @@ func (d *ociImageDestination) HasThreadSafePutBlob() bool { // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. -func (d *ociImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { +func (d *ociImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { blobFile, err := ioutil.TempFile(d.ref.dir, "oci-put-blob") if err != nil { return types.BlobInfo{}, err @@ -187,7 +187,7 @@ func (d *ociImageDestination) PutBlob(ctx context.Context, stream io.Reader, inp // If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. // If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. // May use and/or update cache. -func (d *ociImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { +func (d *ociImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { if info.Digest == "" { return false, types.BlobInfo{}, errors.Errorf(`"Can not check for a blob with unknown digest`) } diff --git a/vendor/github.com/containers/image/openshift/openshift.go b/vendor/github.com/containers/image/openshift/openshift.go index 814c3eea1d..2dae485a43 100644 --- a/vendor/github.com/containers/image/openshift/openshift.go +++ b/vendor/github.com/containers/image/openshift/openshift.go @@ -395,8 +395,8 @@ func (d *openshiftImageDestination) HasThreadSafePutBlob() bool { // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. -func (d *openshiftImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { - return d.docker.PutBlob(ctx, stream, inputInfo, cache, isConfig) +func (d *openshiftImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { + return d.docker.PutBlob(ctx, stream, inputInfo, layerIndexInImage, cache, isConfig) } // TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination @@ -406,8 +406,8 @@ func (d *openshiftImageDestination) PutBlob(ctx context.Context, stream io.Reade // If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. // If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. // May use and/or update cache. -func (d *openshiftImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { - return d.docker.TryReusingBlob(ctx, info, cache, canSubstitute) +func (d *openshiftImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { + return d.docker.TryReusingBlob(ctx, info, layerIndexInImage, cache, canSubstitute) } // PutManifest writes manifest to the destination. diff --git a/vendor/github.com/containers/image/ostree/ostree_dest.go b/vendor/github.com/containers/image/ostree/ostree_dest.go index d69f4fa331..d0ea867c97 100644 --- a/vendor/github.com/containers/image/ostree/ostree_dest.go +++ b/vendor/github.com/containers/image/ostree/ostree_dest.go @@ -145,7 +145,7 @@ func (d *ostreeImageDestination) HasThreadSafePutBlob() bool { // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. -func (d *ostreeImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { +func (d *ostreeImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { tmpDir, err := ioutil.TempDir(d.tmpDirPath, "blob") if err != nil { return types.BlobInfo{}, err @@ -342,7 +342,7 @@ func (d *ostreeImageDestination) importConfig(repo *otbuiltin.Repo, blob *blobTo // If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. // If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. // May use and/or update cache. -func (d *ostreeImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { +func (d *ostreeImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { if d.repo == nil { repo, err := openRepo(d.ref.repo) if err != nil { diff --git a/vendor/github.com/containers/image/storage/storage_image.go b/vendor/github.com/containers/image/storage/storage_image.go index b39d2bcc04..0e2d17845e 100644 --- a/vendor/github.com/containers/image/storage/storage_image.go +++ b/vendor/github.com/containers/image/storage/storage_image.go @@ -54,16 +54,19 @@ type storageImageSource struct { } type storageImageDestination struct { - imageRef storageReference - directory string // Temporary directory where we store blobs until Commit() time - nextTempFileID int32 // A counter that we use for computing filenames to assign to blobs - manifest []byte // Manifest contents, temporary - signatures []byte // Signature contents, temporary - putBlobMutex sync.Mutex // Mutex to sync state for parallel PutBlob executions - blobDiffIDs map[digest.Digest]digest.Digest // Mapping from layer blobsums to their corresponding DiffIDs - fileSizes map[digest.Digest]int64 // Mapping from layer blobsums to their sizes - filenames map[digest.Digest]string // Mapping from layer blobsums to names of files we used to hold them - SignatureSizes []int `json:"signature-sizes,omitempty"` // List of sizes of each signature slice + imageRef storageReference + directory string // Temporary directory where we store blobs until Commit() time + nextTempFileID int32 // A counter that we use for computing filenames to assign to blobs + manifest []byte // Manifest contents, temporary + signatures []byte // Signature contents, temporary + putBlobMutex sync.Mutex // Mutex to sync state for parallel PutBlob executions + blobDiffIDs map[digest.Digest]digest.Digest // Mapping from layer blobsums to their corresponding DiffIDs + blobLayerIDs map[digest.Digest]string // Mapping from layer blobsums to their corresponding storage layer ID + fileSizes map[digest.Digest]int64 // Mapping from layer blobsums to their sizes + filenames map[digest.Digest]string // Mapping from layer blobsums to names of files we used to hold them + indexToStorageID map[int]string // Mapping from layer index to the layer IDs in the storage + indexToDoneChannel map[int]chan bool // Mapping from layer index to a channel to indicate the layer has been written to storage + SignatureSizes []int `json:"signature-sizes,omitempty"` // List of sizes of each signature slice } type storageImageCloser struct { @@ -323,16 +326,31 @@ func newImageDestination(imageRef storageReference) (*storageImageDestination, e return nil, errors.Wrapf(err, "error creating a temporary directory") } image := &storageImageDestination{ - imageRef: imageRef, - directory: directory, - blobDiffIDs: make(map[digest.Digest]digest.Digest), - fileSizes: make(map[digest.Digest]int64), - filenames: make(map[digest.Digest]string), - SignatureSizes: []int{}, + imageRef: imageRef, + directory: directory, + blobDiffIDs: make(map[digest.Digest]digest.Digest), + blobLayerIDs: make(map[digest.Digest]string), + fileSizes: make(map[digest.Digest]int64), + filenames: make(map[digest.Digest]string), + indexToStorageID: make(map[int]string), + indexToDoneChannel: make(map[int]chan bool), + SignatureSizes: []int{}, } return image, nil } +func (s *storageImageDestination) getChannelForLayer(layerIndexInImage int) chan bool { + s.putBlobMutex.Lock() + defer s.putBlobMutex.Unlock() + channel, ok := s.indexToDoneChannel[layerIndexInImage] + if !ok { + // A buffered channel to allow non-blocking sends + channel = make(chan bool, 1) + s.indexToDoneChannel[layerIndexInImage] = channel + } + return channel +} + // Reference returns the reference used to set up this destination. Note that this should directly correspond to user's intent, // e.g. it should use the public hostname instead of the result of resolving CNAMEs or following redirects. func (s *storageImageDestination) Reference() types.ImageReference { @@ -365,10 +383,72 @@ func (s *storageImageDestination) HasThreadSafePutBlob() bool { // inputInfo.Size is the expected length of stream, if known. // inputInfo.MediaType describes the blob format, if known. // May update cache. +// layerIndexInImage must be properly set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of +// PutBlob() and TryReusingBlob() where the layers must be written to the backend storage in sequential order. A value >= indicates that the blob a layer. +// Note that only the containers-storage destination is sensitive to the layerIndexInImage parameter. Other transport destinations ignore it. // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. -func (s *storageImageDestination) PutBlob(ctx context.Context, stream io.Reader, blobinfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { +func (s *storageImageDestination) PutBlob(ctx context.Context, stream io.Reader, blobinfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (blob types.BlobInfo, err error) { + errorBlobInfo := types.BlobInfo{ + Digest: "", + Size: -1, + } + + blob, putBlobError := s.putBlob(ctx, stream, blobinfo, layerIndexInImage, cache, isConfig) + if putBlobError != nil { + return errorBlobInfo, putBlobError + } + + // Deferred call to an anonymous func to signal potentially waiting + // goroutines via the index-specific channel. + defer func() { + // No need to wait + if layerIndexInImage >= 0 { + // It's a buffered channel, so we don't wait for the message to be + // received + channel := s.getChannelForLayer(layerIndexInImage) + channel <- err == nil + if err != nil { + logrus.Errorf("error while committing blob %d: %v", layerIndexInImage, err) + } + } + }() + + // First, wait for the previous layer to be committed + previousID := "" + if layerIndexInImage > 0 { + channel := s.getChannelForLayer(layerIndexInImage - 1) + if committed := <-channel; !committed { + err := fmt.Errorf("committing previous layer %d failed", layerIndexInImage-1) + logrus.Errorf(err.Error()) + return errorBlobInfo, err + } + var ok bool + s.putBlobMutex.Lock() + previousID, ok = s.indexToStorageID[layerIndexInImage-1] + s.putBlobMutex.Unlock() + if !ok { + return errorBlobInfo, fmt.Errorf("error committing blob %q: could not find parent layer ID", blob.Digest.String()) + } + } + + // Commit the blob + if layerIndexInImage >= 0 { + id, err := s.commitBlob(ctx, blob, previousID) + if err == nil { + s.putBlobMutex.Lock() + s.blobLayerIDs[blob.Digest] = id + s.indexToStorageID[layerIndexInImage] = id + s.putBlobMutex.Unlock() + } else { + return errorBlobInfo, err + } + } + return blob, nil +} + +func (s *storageImageDestination) putBlob(ctx context.Context, stream io.Reader, blobinfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { // Stores a layer or data blob in our temporary directory, checking that any information // in the blobinfo matches the incoming data. errorBlobInfo := types.BlobInfo{ @@ -425,21 +505,17 @@ func (s *storageImageDestination) PutBlob(ctx context.Context, stream io.Reader, } // This is safe because we have just computed both values ourselves. cache.RecordDigestUncompressedPair(blobDigest, diffID.Digest()) - return types.BlobInfo{ + blob := types.BlobInfo{ Digest: blobDigest, Size: blobSize, MediaType: blobinfo.MediaType, - }, nil + } + + return blob, nil } -// TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination -// (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree). -// info.Digest must not be empty. -// If canSubstitute, TryReusingBlob can use an equivalent equivalent of the desired blob; in that case the returned info may not match the input. -// If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. -// If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. -// May use and/or update cache. -func (s *storageImageDestination) TryReusingBlob(ctx context.Context, blobinfo types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { +// tryReusingBlob is a helper method for TryReusingBlob to wrap it +func (s *storageImageDestination) tryReusingBlob(ctx context.Context, blobinfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { // lock the entire method as it executes fairly quickly s.putBlobMutex.Lock() defer s.putBlobMutex.Unlock() @@ -467,6 +543,10 @@ func (s *storageImageDestination) TryReusingBlob(ctx context.Context, blobinfo t if len(layers) > 0 { // Save this for completeness. s.blobDiffIDs[blobinfo.Digest] = layers[0].UncompressedDigest + s.blobLayerIDs[blobinfo.Digest] = layers[0].ID + if layerIndexInImage >= 0 { + s.indexToStorageID[layerIndexInImage] = layers[0].ID + } return true, types.BlobInfo{ Digest: blobinfo.Digest, Size: layers[0].UncompressedSize, @@ -482,6 +562,10 @@ func (s *storageImageDestination) TryReusingBlob(ctx context.Context, blobinfo t if len(layers) > 0 { // Record the uncompressed value so that we can use it to calculate layer IDs. s.blobDiffIDs[blobinfo.Digest] = layers[0].UncompressedDigest + s.blobLayerIDs[blobinfo.Digest] = layers[0].ID + if layerIndexInImage >= 0 { + s.indexToStorageID[layerIndexInImage] = layers[0].ID + } return true, types.BlobInfo{ Digest: blobinfo.Digest, Size: layers[0].CompressedSize, @@ -500,6 +584,10 @@ func (s *storageImageDestination) TryReusingBlob(ctx context.Context, blobinfo t } if len(layers) > 0 { s.blobDiffIDs[uncompressedDigest] = layers[0].UncompressedDigest + s.blobLayerIDs[blobinfo.Digest] = layers[0].ID + if layerIndexInImage >= 0 { + s.indexToStorageID[layerIndexInImage] = layers[0].ID + } return true, types.BlobInfo{ Digest: uncompressedDigest, Size: layers[0].UncompressedSize, @@ -513,6 +601,30 @@ func (s *storageImageDestination) TryReusingBlob(ctx context.Context, blobinfo t return false, types.BlobInfo{}, nil } +// TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination +// (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree). +// info.Digest must not be empty. +// layerIndexInImage must be properly set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of +// PutBlob() and TryReusingBlob() where the layers must be written to the backend storage in sequential order. A value >= indicates that the blob a layer. +// Non-layer blobs (e.g., configs or throwaway layers) must have a value < 0. +// Note that only the containers-storage destination is sensitive to the layerIndexInImage parameter. Other transport destinations ignore it. +// If canSubstitute, TryReusingBlob can use an equivalent equivalent of the desired blob; in that case the returned info may not match the input. +// If the blob has been successfully reused, returns (true, info, nil); info must contain at least a digest and size. +// If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. +// May use and/or update cache. +func (s *storageImageDestination) TryReusingBlob(ctx context.Context, blobinfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { + reusable, blob, err := s.tryReusingBlob(ctx, blobinfo, layerIndexInImage, cache, canSubstitute) + // If we can reuse the blob or hit an error trying to do so, we need to + // signal the result through the channel as another Goroutine is potentially + // waiting for it. If we can't resuse the blob and encountered no error, we + // need to copy it and will send the signal in PutBlob(). + if (layerIndexInImage >= 0) && (err != nil || reusable) { + channel := s.getChannelForLayer(layerIndexInImage) + channel <- (err == nil) + } + return reusable, blob, err +} + // computeID computes a recommended image ID based on information we have so far. If // the manifest is not of a type that we recognize, we return an empty value, indicating // that since we don't have a recommendation, a random ID should be used if one needs @@ -572,6 +684,102 @@ func (s *storageImageDestination) getConfigBlob(info types.BlobInfo) ([]byte, er return nil, errors.New("blob not found") } +// commitBlobs commits the specified blob to the storage. If not already done, +// it will block until the parent layer is committed to the storage. Note that +// the only caller of commitBlob() is PutBlob(), which is recording the results +// and send the error through the corresponding channel in s.previousLayerResult. +func (s *storageImageDestination) commitBlob(ctx context.Context, blob types.BlobInfo, previousID string) (string, error) { + logrus.Debugf("committing blob %q", blob.Digest) + // Check if there's already a layer with the ID that we'd give to the result of applying + // this layer blob to its parent, if it has one, or the blob's hex value otherwise. + s.putBlobMutex.Lock() + diffID, haveDiffID := s.blobDiffIDs[blob.Digest] + s.putBlobMutex.Unlock() + if !haveDiffID { + return "", errors.Errorf("we have blob %q, but don't know its uncompressed digest", blob.Digest.String()) + } + + id := diffID.Hex() + if previousID != "" { + id = digest.Canonical.FromBytes([]byte(previousID + "+" + diffID.Hex())).Hex() + } + if layer, err2 := s.imageRef.transport.store.Layer(id); layer != nil && err2 == nil { + logrus.Debugf("layer for blob %q already found in storage", blob.Digest) + return layer.ID, nil + } + // Check if we previously cached a file with that blob's contents. If we didn't, + // then we need to read the desired contents from a layer. + s.putBlobMutex.Lock() + filename, ok := s.filenames[blob.Digest] + s.putBlobMutex.Unlock() + if !ok { + // Try to find the layer with contents matching that blobsum. + layer := "" + layers, err2 := s.imageRef.transport.store.LayersByUncompressedDigest(blob.Digest) + if err2 == nil && len(layers) > 0 { + layer = layers[0].ID + } else { + layers, err2 = s.imageRef.transport.store.LayersByCompressedDigest(blob.Digest) + if err2 == nil && len(layers) > 0 { + layer = layers[0].ID + } + } + if layer == "" { + return "", errors.Wrapf(err2, "error locating layer for blob %q", blob.Digest) + } + // Read the layer's contents. + noCompression := archive.Uncompressed + diffOptions := &storage.DiffOptions{ + Compression: &noCompression, + } + diff, err2 := s.imageRef.transport.store.Diff("", layer, diffOptions) + if err2 != nil { + return "", errors.Wrapf(err2, "error reading layer %q for blob %q", layer, blob.Digest) + } + // Copy the layer diff to a file. Diff() takes a lock that it holds + // until the ReadCloser that it returns is closed, and PutLayer() wants + // the same lock, so the diff can't just be directly streamed from one + // to the other. + filename = s.computeNextBlobCacheFile() + file, err := os.OpenFile(filename, os.O_CREATE|os.O_TRUNC|os.O_WRONLY|os.O_EXCL, 0600) + if err != nil { + diff.Close() + return "", errors.Wrapf(err, "error creating temporary file %q", filename) + } + // Copy the data to the file. + // TODO: This can take quite some time, and should ideally be cancellable using + // ctx.Done(). + _, err = io.Copy(file, diff) + diff.Close() + file.Close() + if err != nil { + return "", errors.Wrapf(err, "error storing blob to file %q", filename) + } + // Make sure that we can find this file later, should we need the layer's + // contents again. + s.putBlobMutex.Lock() + s.filenames[blob.Digest] = filename + s.putBlobMutex.Unlock() + } + // Read the cached blob and use it as a diff. + file, err := os.Open(filename) + if err != nil { + return "", errors.Wrapf(err, "error opening file %q", filename) + } + defer file.Close() + // Build the new layer using the diff, regardless of where it came from. + // TODO: This can take quite some time, and should ideally be cancellable using ctx.Done(). + layer, _, err := s.imageRef.transport.store.PutLayer(id, previousID, nil, "", false, nil, file) + if err != nil && errors.Cause(err) != storage.ErrDuplicateID { + return "", errors.Wrapf(err, "error adding layer with blob %q", blob.Digest) + } + return layer.ID, nil +} + +// Commit marks the process of storing the image as successful and asks for the image to be persisted. +// WARNING: This does not have any transactional semantics: +// - Uploaded data MAY be visible to others before Commit() is called +// - Uploaded data MAY be removed or MAY remain around if Close() is called without Commit() (i.e. rollback is allowed but not guaranteed) func (s *storageImageDestination) Commit(ctx context.Context) error { // Find the list of layer blobs. if len(s.manifest) == 0 { @@ -581,6 +789,7 @@ func (s *storageImageDestination) Commit(ctx context.Context) error { if err != nil { return errors.Wrapf(err, "error parsing manifest") } + layerBlobs := man.LayerInfos() // Extract or find the layers. lastLayer := "" @@ -588,10 +797,7 @@ func (s *storageImageDestination) Commit(ctx context.Context) error { if blob.EmptyLayer { continue } - - // Check if there's already a layer with the ID that we'd give to the result of applying - // this layer blob to its parent, if it has one, or the blob's hex value otherwise. - diffID, haveDiffID := s.blobDiffIDs[blob.Digest] + _, haveDiffID := s.blobDiffIDs[blob.Digest] if !haveDiffID { // Check if it's elsewhere and the caller just forgot to pass it to us in a PutBlob(), // or to even check if we had it. @@ -600,90 +806,27 @@ func (s *storageImageDestination) Commit(ctx context.Context) error { // TryReusingBlob; not calling PutBlob already violates the documented API, so there’s only // so far we are going to accommodate that (if we should be doing that at all). logrus.Debugf("looking for diffID for blob %+v", blob.Digest) - has, _, err := s.TryReusingBlob(ctx, blob.BlobInfo, none.NoCache, false) + has, _, err := s.tryReusingBlob(ctx, blob.BlobInfo, -1, none.NoCache, false) if err != nil { return errors.Wrapf(err, "error checking for a layer based on blob %q", blob.Digest.String()) } if !has { return errors.Errorf("error determining uncompressed digest for blob %q", blob.Digest.String()) } - diffID, haveDiffID = s.blobDiffIDs[blob.Digest] + _, haveDiffID = s.blobDiffIDs[blob.Digest] if !haveDiffID { return errors.Errorf("we have blob %q, but don't know its uncompressed digest", blob.Digest.String()) } } - id := diffID.Hex() - if lastLayer != "" { - id = digest.Canonical.FromBytes([]byte(lastLayer + "+" + diffID.Hex())).Hex() - } - if layer, err2 := s.imageRef.transport.store.Layer(id); layer != nil && err2 == nil { - // There's already a layer that should have the right contents, just reuse it. - lastLayer = layer.ID - continue - } - // Check if we previously cached a file with that blob's contents. If we didn't, - // then we need to read the desired contents from a layer. - filename, ok := s.filenames[blob.Digest] - if !ok { - // Try to find the layer with contents matching that blobsum. - layer := "" - layers, err2 := s.imageRef.transport.store.LayersByUncompressedDigest(blob.Digest) - if err2 == nil && len(layers) > 0 { - layer = layers[0].ID - } else { - layers, err2 = s.imageRef.transport.store.LayersByCompressedDigest(blob.Digest) - if err2 == nil && len(layers) > 0 { - layer = layers[0].ID - } - } - if layer == "" { - return errors.Wrapf(err2, "error locating layer for blob %q", blob.Digest) - } - // Read the layer's contents. - noCompression := archive.Uncompressed - diffOptions := &storage.DiffOptions{ - Compression: &noCompression, - } - diff, err2 := s.imageRef.transport.store.Diff("", layer, diffOptions) - if err2 != nil { - return errors.Wrapf(err2, "error reading layer %q for blob %q", layer, blob.Digest) - } - // Copy the layer diff to a file. Diff() takes a lock that it holds - // until the ReadCloser that it returns is closed, and PutLayer() wants - // the same lock, so the diff can't just be directly streamed from one - // to the other. - filename = s.computeNextBlobCacheFile() - file, err := os.OpenFile(filename, os.O_CREATE|os.O_TRUNC|os.O_WRONLY|os.O_EXCL, 0600) - if err != nil { - diff.Close() - return errors.Wrapf(err, "error creating temporary file %q", filename) - } - // Copy the data to the file. - // TODO: This can take quite some time, and should ideally be cancellable using - // ctx.Done(). - _, err = io.Copy(file, diff) - diff.Close() - file.Close() - if err != nil { - return errors.Wrapf(err, "error storing blob to file %q", filename) - } - // Make sure that we can find this file later, should we need the layer's - // contents again. - s.filenames[blob.Digest] = filename - } - // Read the cached blob and use it as a diff. - file, err := os.Open(filename) + newID, err := s.commitBlob(ctx, blob.BlobInfo, lastLayer) if err != nil { - return errors.Wrapf(err, "error opening file %q", filename) - } - defer file.Close() - // Build the new layer using the diff, regardless of where it came from. - // TODO: This can take quite some time, and should ideally be cancellable using ctx.Done(). - layer, _, err := s.imageRef.transport.store.PutLayer(id, lastLayer, nil, "", false, nil, file) - if err != nil && errors.Cause(err) != storage.ErrDuplicateID { - return errors.Wrapf(err, "error adding layer with blob %q", blob.Digest) + return err } - lastLayer = layer.ID + lastLayer = newID + } + + if lastLayer == "" { + return fmt.Errorf("could not find top layer") } // If one of those blobs was a configuration blob, then we can try to dig out the date when the image diff --git a/vendor/github.com/containers/image/types/types.go b/vendor/github.com/containers/image/types/types.go index 9fdab2314a..62662c6598 100644 --- a/vendor/github.com/containers/image/types/types.go +++ b/vendor/github.com/containers/image/types/types.go @@ -7,7 +7,7 @@ import ( "github.com/containers/image/docker/reference" "github.com/opencontainers/go-digest" - "github.com/opencontainers/image-spec/specs-go/v1" + v1 "github.com/opencontainers/image-spec/specs-go/v1" ) // ImageTransport is a top-level namespace for ways to to store/load an image. @@ -262,20 +262,27 @@ type ImageDestination interface { // inputInfo.Size is the expected length of stream, if known. // inputInfo.MediaType describes the blob format, if known. // May update cache. + // layerIndexInImage must be properly set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of + // PutBlob() and TryReusingBlob() where the layers must be written to the backend storage in sequential order. A value >= indicates that the blob a layer. + // Non-layer blobs (e.g., configs or throwaway layers) must have a value < 0. + // Note that only the containers-storage destination is sensitive to the layerIndexInImage parameter. Other transport destinations ignore it. // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. - PutBlob(ctx context.Context, stream io.Reader, inputInfo BlobInfo, cache BlobInfoCache, isConfig bool) (BlobInfo, error) + PutBlob(ctx context.Context, stream io.Reader, inputInfo BlobInfo, layerIndexInImage int, cache BlobInfoCache, isConfig bool) (BlobInfo, error) // HasThreadSafePutBlob indicates whether PutBlob can be executed concurrently. HasThreadSafePutBlob() bool // TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination // (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree). // info.Digest must not be empty. + // layerIndexInImage must be properly set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of + // PutBlob() and TryReusingBlob() where the layers must be written to the backend storage in sequential order. A value >= indicates that the blob a layer. + // Note that only the containers-storage destination is sensitive to the layerIndexInImage parameter. Other transport destinations ignore it. // If canSubstitute, TryReusingBlob can use an equivalent equivalent of the desired blob; in that case the returned info may not match the input. - // If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. + // If the blob has been successfully reused, returns (true, info, nil); info must contain at least a digest and size. // If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. // May use and/or update cache. - TryReusingBlob(ctx context.Context, info BlobInfo, cache BlobInfoCache, canSubstitute bool) (bool, BlobInfo, error) + TryReusingBlob(ctx context.Context, info BlobInfo, layerIndexInImage int, cache BlobInfoCache, canSubstitute bool) (bool, BlobInfo, error) // PutManifest writes manifest to the destination. // FIXME? This should also receive a MIME type if known, to differentiate between schema versions. // If the destination is in principle available, refuses this manifest type (e.g. it does not recognize the schema),