diff --git a/pkg/blobcache/blobcache.go b/pkg/blobcache/blobcache.go index 31e6a428c38..81faf2bdd59 100644 --- a/pkg/blobcache/blobcache.go +++ b/pkg/blobcache/blobcache.go @@ -18,7 +18,7 @@ import ( "github.com/containers/storage/pkg/archive" "github.com/containers/storage/pkg/ioutils" digest "github.com/opencontainers/go-digest" - "github.com/opencontainers/image-spec/specs-go/v1" + v1 "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" "github.com/sirupsen/logrus" ) @@ -416,7 +416,7 @@ func (s *blobCacheDestination) HasThreadSafePutBlob() bool { return s.destination.HasThreadSafePutBlob() } -func (d *blobCacheDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { +func (d *blobCacheDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { var tempfile *os.File var err error var n int @@ -479,7 +479,7 @@ func (d *blobCacheDestination) PutBlob(ctx context.Context, stream io.Reader, in } } } - newBlobInfo, err := d.destination.PutBlob(ctx, stream, inputInfo, cache, isConfig) + newBlobInfo, err := d.destination.PutBlob(ctx, stream, inputInfo, layerIndexInImage, cache, isConfig) if err != nil { return newBlobInfo, errors.Wrapf(err, "error storing blob to image destination for cache %q", transports.ImageName(d.reference)) } @@ -491,8 +491,8 @@ func (d *blobCacheDestination) PutBlob(ctx context.Context, stream io.Reader, in return newBlobInfo, nil } -func (d *blobCacheDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { - present, reusedInfo, err := d.destination.TryReusingBlob(ctx, info, cache, canSubstitute) +func (d *blobCacheDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { + present, reusedInfo, err := d.destination.TryReusingBlob(ctx, info, layerIndexInImage, cache, canSubstitute) if err != nil || present { return present, reusedInfo, err } @@ -502,7 +502,7 @@ func (d *blobCacheDestination) TryReusingBlob(ctx context.Context, info types.Bl f, err := os.Open(filename) if err == nil { defer f.Close() - uploadedInfo, err := d.destination.PutBlob(ctx, f, info, cache, isConfig) + uploadedInfo, err := d.destination.PutBlob(ctx, f, info, layerIndexInImage, cache, isConfig) if err != nil { return false, types.BlobInfo{}, err } diff --git a/pkg/blobcache/blobcache_test.go b/pkg/blobcache/blobcache_test.go index 7000050a295..d8619db15fd 100644 --- a/pkg/blobcache/blobcache_test.go +++ b/pkg/blobcache/blobcache_test.go @@ -21,7 +21,7 @@ import ( "github.com/containers/storage/pkg/archive" digest "github.com/opencontainers/go-digest" "github.com/opencontainers/image-spec/specs-go" - "github.com/opencontainers/image-spec/specs-go/v1" + v1 "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" "github.com/sirupsen/logrus" ) @@ -128,11 +128,11 @@ func TestBlobCache(t *testing.T) { if err != nil { t.Fatalf("error opening source image for writing: %v", err) } - _, err = destImage.PutBlob(context.TODO(), bytes.NewReader(blobBytes), blobInfo, none.NoCache, false) + _, err = destImage.PutBlob(context.TODO(), 
bytes.NewReader(blobBytes), blobInfo, 0, none.NoCache, false) if err != nil { t.Fatalf("error writing layer blob to source image: %v", err) } - _, err = destImage.PutBlob(context.TODO(), bytes.NewReader(configBytes), configInfo, none.NoCache, true) + _, err = destImage.PutBlob(context.TODO(), bytes.NewReader(configBytes), configInfo, 0, none.NoCache, true) if err != nil { t.Fatalf("error writing config blob to source image: %v", err) } diff --git a/vendor.conf b/vendor.conf index a77130acb5a..9f729202a09 100644 --- a/vendor.conf +++ b/vendor.conf @@ -3,7 +3,7 @@ github.com/blang/semver v3.5.0 github.com/BurntSushi/toml v0.2.0 github.com/containerd/continuity 004b46473808b3e7a4a3049c20e4376c91eb966d github.com/containernetworking/cni v0.7.0-rc2 -github.com/containers/image f52cf78ebfa1916da406f8b6210d8f7764ec1185 +github.com/containers/image commit https://github.com/vrothberg/image github.com/vbauerster/mpb v3.3.4 github.com/mattn/go-isatty v0.0.4 github.com/VividCortex/ewma v1.1.1 diff --git a/vendor/github.com/containers/image/copy/copy.go b/vendor/github.com/containers/image/copy/copy.go index 3ed8a2b8243..cfde76714a3 100644 --- a/vendor/github.com/containers/image/copy/copy.go +++ b/vendor/github.com/containers/image/copy/copy.go @@ -454,10 +454,6 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error { err error } - // copyGroup is used to determine if all layers are copied - copyGroup := sync.WaitGroup{} - copyGroup.Add(numLayers) - // copySemaphore is used to limit the number of parallel downloads to // avoid malicious images causing troubles and to be nice to servers. var copySemaphore *semaphore.Weighted @@ -467,43 +463,60 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error { copySemaphore = semaphore.NewWeighted(int64(1)) } - data := make([]copyLayerData, numLayers) - copyLayerHelper := func(index int, srcLayer types.BlobInfo, pool *mpb.Progress) { - defer copySemaphore.Release(1) - defer copyGroup.Done() + // copyGroup is used to determine if all layers are copied + copyGroup := sync.WaitGroup{} + + cancelCtx, cancelFunc := context.WithCancel(ctx) + progressPool, progressCleanup := ic.c.newProgressPool(cancelCtx) + defer progressCleanup() + + layerIndex := 0 + digestToCopyData := make(map[digest.Digest]*copyLayerData) + for _, srcLayer := range srcInfos { + // Do not copy a layer twice + if _, ok := digestToCopyData[srcLayer.Digest]; ok { + continue + } + cld := copyLayerData{} + digestToCopyData[srcLayer.Digest] = &cld if ic.c.dest.AcceptsForeignLayerURLs() && len(srcLayer.URLs) != 0 { // DiffIDs are, currently, needed only when converting from schema1. // In which case src.LayerInfos will not have URLs because schema1 // does not support them. 
if ic.diffIDsAreNeeded { - cld.err = errors.New("getting DiffID for foreign layers is unimplemented") + cancelFunc() + return errors.New("getting DiffID for foreign layers is unimplemented") } else { + logrus.Debugf("Skipping foreign layer %q copy to %s", srcLayer.Digest, ic.c.dest.Reference().Transport().Name()) cld.destInfo = srcLayer - logrus.Debugf("Skipping foreign layer %q copy to %s", cld.destInfo.Digest, ic.c.dest.Reference().Transport().Name()) } - } else { - cld.destInfo, cld.diffID, cld.err = ic.copyLayer(ctx, srcLayer, pool) } - data[index] = cld - } - func() { // A scope for defer - progressPool, progressCleanup := ic.c.newProgressPool(ctx) - defer progressCleanup() - - for i, srcLayer := range srcInfos { - copySemaphore.Acquire(ctx, 1) - go copyLayerHelper(i, srcLayer, progressPool) - } + copySemaphore.Acquire(cancelCtx, 1) + copyGroup.Add(1) + go func(index int, srcLayer types.BlobInfo, cld *copyLayerData) { + defer copySemaphore.Release(1) + defer copyGroup.Done() + cld.destInfo, cld.diffID, cld.err = ic.copyLayer(cancelCtx, srcLayer, index, progressPool) + if cld.err != nil { + logrus.Errorf("copying layer %d failed: %v", index, cld.err) + cancelFunc() + } + }(layerIndex, srcLayer, &cld) + layerIndex++ + } - // Wait for all layers to be copied - copyGroup.Wait() - }() + // Wait for all layers to be copied + copyGroup.Wait() destInfos := make([]types.BlobInfo, numLayers) diffIDs := make([]digest.Digest, numLayers) - for i, cld := range data { + for i, srcLayer := range srcInfos { + cld, ok := digestToCopyData[srcLayer.Digest] + if !ok { + return errors.Errorf("no copy data for layer %q", srcLayer.Digest) + } if cld.err != nil { return cld.err } @@ -516,6 +529,16 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error { ic.manifestUpdates.InformationOnly.LayerDiffIDs = diffIDs } if srcInfosUpdated || layerDigestsDiffer(srcInfos, destInfos) { + logrus.Errorf("DIGESTS DIFFER!") + for _, b := range srcInfos { + logrus.Errorf("%q", b.Digest) + } + logrus.Errorf("----------------------------") + for _, b := range destInfos { + logrus.Errorf("%q", b.Digest) + } + logrus.Errorf("----------------------------") + ic.manifestUpdates.LayerInfos = destInfos } return nil @@ -625,7 +648,7 @@ func (c *copier) copyConfig(ctx context.Context, src types.Image) error { progressPool, progressCleanup := c.newProgressPool(ctx) defer progressCleanup() bar := c.createProgressBar(progressPool, srcInfo, "config", "done") - destInfo, err := c.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, nil, false, true, bar) + destInfo, err := c.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, -1, nil, false, true, bar) if err != nil { return types.BlobInfo{}, err } @@ -651,13 +674,13 @@ type diffIDResult struct { // copyLayer copies a layer with srcInfo (with known Digest and possibly known Size) in src to dest, perhaps compressing it if canCompress, // and returns a complete blobInfo of the copied layer, and a value for LayerDiffIDs if diffIDIsNeeded -func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, pool *mpb.Progress) (types.BlobInfo, digest.Digest, error) { +func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, layerIndexInImage int, pool *mpb.Progress) (types.BlobInfo, digest.Digest, error) { cachedDiffID := ic.c.blobInfoCache.UncompressedDigest(srcInfo.Digest) // May be "" diffIDIsNeeded := ic.diffIDsAreNeeded && cachedDiffID == "" // If we already have the blob, and we don't need to compute the diffID, then we don't need 
to read it from the source. if !diffIDIsNeeded { - reused, blobInfo, err := ic.c.dest.TryReusingBlob(ctx, srcInfo, ic.c.blobInfoCache, ic.canSubstituteBlobs) + reused, blobInfo, err := ic.c.dest.TryReusingBlob(ctx, srcInfo, layerIndexInImage, ic.c.blobInfoCache, ic.canSubstituteBlobs) if err != nil { return types.BlobInfo{}, "", errors.Wrapf(err, "Error trying to reuse blob %s at destination", srcInfo.Digest) } @@ -678,7 +701,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, po bar := ic.c.createProgressBar(pool, srcInfo, "blob", "done") - blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize}, diffIDIsNeeded, bar) + blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize}, layerIndexInImage, diffIDIsNeeded, bar) if err != nil { return types.BlobInfo{}, "", err } @@ -709,7 +732,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, po // perhaps compressing the stream if canCompress, // and returns a complete blobInfo of the copied blob and perhaps a <-chan diffIDResult if diffIDIsNeeded, to be read by the caller. func (ic *imageCopier) copyLayerFromStream(ctx context.Context, srcStream io.Reader, srcInfo types.BlobInfo, - diffIDIsNeeded bool, bar *mpb.Bar) (types.BlobInfo, <-chan diffIDResult, error) { + layerIndexInImage int, diffIDIsNeeded bool, bar *mpb.Bar) (types.BlobInfo, <-chan diffIDResult, error) { var getDiffIDRecorder func(compression.DecompressorFunc) io.Writer // = nil var diffIDChan chan diffIDResult @@ -733,7 +756,7 @@ func (ic *imageCopier) copyLayerFromStream(ctx context.Context, srcStream io.Rea return pipeWriter } } - blobInfo, err := ic.c.copyBlobFromStream(ctx, srcStream, srcInfo, getDiffIDRecorder, ic.canModifyManifest, false, bar) // Sets err to nil on success + blobInfo, err := ic.c.copyBlobFromStream(ctx, srcStream, srcInfo, layerIndexInImage, getDiffIDRecorder, ic.canModifyManifest, false, bar) // Sets err to nil on success return blobInfo, diffIDChan, err // We need the defer … pipeWriter.CloseWithError() to happen HERE so that the caller can block on reading from diffIDChan } @@ -768,7 +791,7 @@ func computeDiffID(stream io.Reader, decompressor compression.DecompressorFunc) // perhaps sending a copy to an io.Writer if getOriginalLayerCopyWriter != nil, // perhaps compressing it if canCompress, // and returns a complete blobInfo of the copied blob. -func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, srcInfo types.BlobInfo, +func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, srcInfo types.BlobInfo, layerIndexInImage int, getOriginalLayerCopyWriter func(decompressor compression.DecompressorFunc) io.Writer, canModifyBlob bool, isConfig bool, bar *mpb.Bar) (types.BlobInfo, error) { // The copying happens through a pipeline of connected io.Readers. @@ -847,7 +870,7 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr } // === Finally, send the layer stream to dest. 
- uploadedInfo, err := c.dest.PutBlob(ctx, destStream, inputInfo, c.blobInfoCache, isConfig) + uploadedInfo, err := c.dest.PutBlob(ctx, destStream, inputInfo, layerIndexInImage, c.blobInfoCache, isConfig) if err != nil { return types.BlobInfo{}, errors.Wrap(err, "Error writing blob") } diff --git a/vendor/github.com/containers/image/directory/directory_dest.go b/vendor/github.com/containers/image/directory/directory_dest.go index 4b2ab022e24..3efd093b114 100644 --- a/vendor/github.com/containers/image/directory/directory_dest.go +++ b/vendor/github.com/containers/image/directory/directory_dest.go @@ -136,7 +136,7 @@ func (d *dirImageDestination) HasThreadSafePutBlob() bool { // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. -func (d *dirImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { +func (d *dirImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { blobFile, err := ioutil.TempFile(d.ref.path, "dir-put-blob") if err != nil { return types.BlobInfo{}, err @@ -182,7 +182,7 @@ func (d *dirImageDestination) PutBlob(ctx context.Context, stream io.Reader, inp // If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. // If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. // May use and/or update cache. -func (d *dirImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { +func (d *dirImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { if info.Digest == "" { return false, types.BlobInfo{}, errors.Errorf(`"Can not check for a blob with unknown digest`) } diff --git a/vendor/github.com/containers/image/docker/docker_image_dest.go b/vendor/github.com/containers/image/docker/docker_image_dest.go index c116cbec32b..abe100fbe40 100644 --- a/vendor/github.com/containers/image/docker/docker_image_dest.go +++ b/vendor/github.com/containers/image/docker/docker_image_dest.go @@ -19,7 +19,7 @@ import ( "github.com/containers/image/pkg/blobinfocache/none" "github.com/containers/image/types" "github.com/docker/distribution/registry/api/errcode" - "github.com/docker/distribution/registry/api/v2" + v2 "github.com/docker/distribution/registry/api/v2" "github.com/docker/distribution/registry/client" "github.com/opencontainers/go-digest" imgspecv1 "github.com/opencontainers/image-spec/specs-go/v1" @@ -124,12 +124,12 @@ func (d *dockerImageDestination) HasThreadSafePutBlob() bool { // WARNING: The contents of stream are being verified on the fly. 
Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. -func (d *dockerImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { +func (d *dockerImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { if inputInfo.Digest.String() != "" { // This should not really be necessary, at least the copy code calls TryReusingBlob automatically. // Still, we need to check, if only because the "initiate upload" endpoint does not have a documented "blob already exists" return value. // But we do that with NoCache, so that it _only_ checks the primary destination, instead of trying all mount candidates _again_. - haveBlob, reusedInfo, err := d.TryReusingBlob(ctx, inputInfo, none.NoCache, false) + haveBlob, reusedInfo, err := d.TryReusingBlob(ctx, inputInfo, layerIndexInImage, none.NoCache, false) if err != nil { return types.BlobInfo{}, err } @@ -271,7 +271,7 @@ func (d *dockerImageDestination) mountBlob(ctx context.Context, srcRepo referenc // If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. // If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. // May use and/or update cache. -func (d *dockerImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { +func (d *dockerImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { if info.Digest == "" { return false, types.BlobInfo{}, errors.Errorf(`"Can not check for a blob with unknown digest`) } diff --git a/vendor/github.com/containers/image/docker/tarfile/dest.go b/vendor/github.com/containers/image/docker/tarfile/dest.go index 5f30eddbc72..270120ba77f 100644 --- a/vendor/github.com/containers/image/docker/tarfile/dest.go +++ b/vendor/github.com/containers/image/docker/tarfile/dest.go @@ -94,7 +94,7 @@ func (d *Destination) HasThreadSafePutBlob() bool { // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. -func (d *Destination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { +func (d *Destination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { // Ouch, we need to stream the blob into a temporary file just to determine the size. // When the layer is decompressed, we also have to generate the digest on uncompressed datas. 
if inputInfo.Size == -1 || inputInfo.Digest.String() == "" { @@ -126,7 +126,7 @@ func (d *Destination) PutBlob(ctx context.Context, stream io.Reader, inputInfo t } // Maybe the blob has been already sent - ok, reusedInfo, err := d.TryReusingBlob(ctx, inputInfo, cache, false) + ok, reusedInfo, err := d.TryReusingBlob(ctx, inputInfo, layerIndexInImage, cache, false) if err != nil { return types.BlobInfo{}, err } @@ -164,7 +164,7 @@ func (d *Destination) PutBlob(ctx context.Context, stream io.Reader, inputInfo t // If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. // If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. // May use and/or update cache. -func (d *Destination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { +func (d *Destination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { if info.Digest == "" { return false, types.BlobInfo{}, errors.Errorf("Can not check for a blob with unknown digest") } diff --git a/vendor/github.com/containers/image/image/docker_schema2.go b/vendor/github.com/containers/image/image/docker_schema2.go index 351e73ea1da..1cd4f402b68 100644 --- a/vendor/github.com/containers/image/image/docker_schema2.go +++ b/vendor/github.com/containers/image/image/docker_schema2.go @@ -252,7 +252,7 @@ func (m *manifestSchema2) convertToManifestSchema1(ctx context.Context, dest typ logrus.Debugf("Uploading empty layer during conversion to schema 1") // Ideally we should update the relevant BlobInfoCache about this layer, but that would require passing it down here, // and anyway this blob is so small that it’s easier to just copy it than to worry about figuring out another location where to get it. - info, err := dest.PutBlob(ctx, bytes.NewReader(GzippedEmptyLayer), types.BlobInfo{Digest: GzippedEmptyLayerDigest, Size: int64(len(GzippedEmptyLayer))}, none.NoCache, false) + info, err := dest.PutBlob(ctx, bytes.NewReader(GzippedEmptyLayer), types.BlobInfo{Digest: GzippedEmptyLayerDigest, Size: int64(len(GzippedEmptyLayer))}, -1, none.NoCache, false) if err != nil { return nil, errors.Wrap(err, "Error uploading empty layer") } diff --git a/vendor/github.com/containers/image/oci/archive/oci_dest.go b/vendor/github.com/containers/image/oci/archive/oci_dest.go index 9571c37e2ba..7a29ea74812 100644 --- a/vendor/github.com/containers/image/oci/archive/oci_dest.go +++ b/vendor/github.com/containers/image/oci/archive/oci_dest.go @@ -90,8 +90,8 @@ func (d *ociArchiveImageDestination) HasThreadSafePutBlob() bool { // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. 
-func (d *ociArchiveImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { - return d.unpackedDest.PutBlob(ctx, stream, inputInfo, cache, isConfig) +func (d *ociArchiveImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { + return d.unpackedDest.PutBlob(ctx, stream, inputInfo, layerIndexInImage, cache, isConfig) } // TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination @@ -101,8 +101,8 @@ func (d *ociArchiveImageDestination) PutBlob(ctx context.Context, stream io.Read // If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. // If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. // May use and/or update cache. -func (d *ociArchiveImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { - return d.unpackedDest.TryReusingBlob(ctx, info, cache, canSubstitute) +func (d *ociArchiveImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { + return d.unpackedDest.TryReusingBlob(ctx, info, layerIndexInImage, cache, canSubstitute) } // PutManifest writes manifest to the destination diff --git a/vendor/github.com/containers/image/oci/layout/oci_dest.go b/vendor/github.com/containers/image/oci/layout/oci_dest.go index db102184dbc..a0f441648e7 100644 --- a/vendor/github.com/containers/image/oci/layout/oci_dest.go +++ b/vendor/github.com/containers/image/oci/layout/oci_dest.go @@ -120,7 +120,7 @@ func (d *ociImageDestination) HasThreadSafePutBlob() bool { // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. -func (d *ociImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { +func (d *ociImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { blobFile, err := ioutil.TempFile(d.ref.dir, "oci-put-blob") if err != nil { return types.BlobInfo{}, err @@ -187,7 +187,7 @@ func (d *ociImageDestination) PutBlob(ctx context.Context, stream io.Reader, inp // If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. // If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. // May use and/or update cache. 
-func (d *ociImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { +func (d *ociImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { if info.Digest == "" { return false, types.BlobInfo{}, errors.Errorf(`"Can not check for a blob with unknown digest`) } diff --git a/vendor/github.com/containers/image/openshift/openshift.go b/vendor/github.com/containers/image/openshift/openshift.go index 814c3eea1de..2dae485a435 100644 --- a/vendor/github.com/containers/image/openshift/openshift.go +++ b/vendor/github.com/containers/image/openshift/openshift.go @@ -395,8 +395,8 @@ func (d *openshiftImageDestination) HasThreadSafePutBlob() bool { // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. -func (d *openshiftImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { - return d.docker.PutBlob(ctx, stream, inputInfo, cache, isConfig) +func (d *openshiftImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { + return d.docker.PutBlob(ctx, stream, inputInfo, layerIndexInImage, cache, isConfig) } // TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination @@ -406,8 +406,8 @@ func (d *openshiftImageDestination) PutBlob(ctx context.Context, stream io.Reade // If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. // If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. // May use and/or update cache. -func (d *openshiftImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { - return d.docker.TryReusingBlob(ctx, info, cache, canSubstitute) +func (d *openshiftImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { + return d.docker.TryReusingBlob(ctx, info, layerIndexInImage, cache, canSubstitute) } // PutManifest writes manifest to the destination. diff --git a/vendor/github.com/containers/image/ostree/ostree_dest.go b/vendor/github.com/containers/image/ostree/ostree_dest.go index d69f4fa331a..d0ea867c97b 100644 --- a/vendor/github.com/containers/image/ostree/ostree_dest.go +++ b/vendor/github.com/containers/image/ostree/ostree_dest.go @@ -145,7 +145,7 @@ func (d *ostreeImageDestination) HasThreadSafePutBlob() bool { // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. 
// If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. -func (d *ostreeImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { +func (d *ostreeImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { tmpDir, err := ioutil.TempDir(d.tmpDirPath, "blob") if err != nil { return types.BlobInfo{}, err @@ -342,7 +342,7 @@ func (d *ostreeImageDestination) importConfig(repo *otbuiltin.Repo, blob *blobTo // If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. // If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. // May use and/or update cache. -func (d *ostreeImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { +func (d *ostreeImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { if d.repo == nil { repo, err := openRepo(d.ref.repo) if err != nil { diff --git a/vendor/github.com/containers/image/pkg/blobinfocache/default.go b/vendor/github.com/containers/image/pkg/blobinfocache/default.go index 1e6e543b2fe..357333215df 100644 --- a/vendor/github.com/containers/image/pkg/blobinfocache/default.go +++ b/vendor/github.com/containers/image/pkg/blobinfocache/default.go @@ -4,6 +4,7 @@ import ( "fmt" "os" "path/filepath" + "strconv" "github.com/containers/image/pkg/blobinfocache/boltdb" "github.com/containers/image/pkg/blobinfocache/memory" @@ -47,9 +48,18 @@ func blobInfoCacheDir(sys *types.SystemContext, euid int) (string, error) { return filepath.Join(dataDir, "containers", "cache"), nil } +func getRootlessUID() int { + uidEnv := os.Getenv("_CONTAINERS_ROOTLESS_UID") + if uidEnv != "" { + u, _ := strconv.Atoi(uidEnv) + return u + } + return os.Geteuid() +} + // DefaultCache returns the default BlobInfoCache implementation appropriate for sys. 
func DefaultCache(sys *types.SystemContext) types.BlobInfoCache { - dir, err := blobInfoCacheDir(sys, os.Geteuid()) + dir, err := blobInfoCacheDir(sys, getRootlessUID()) if err != nil { logrus.Debugf("Error determining a location for %s, using a memory-only cache", blobInfoCacheFilename) return memory.New() diff --git a/vendor/github.com/containers/image/storage/storage_image.go b/vendor/github.com/containers/image/storage/storage_image.go index b39d2bcc04a..199a60bf253 100644 --- a/vendor/github.com/containers/image/storage/storage_image.go +++ b/vendor/github.com/containers/image/storage/storage_image.go @@ -54,16 +54,19 @@ type storageImageSource struct { } type storageImageDestination struct { - imageRef storageReference - directory string // Temporary directory where we store blobs until Commit() time - nextTempFileID int32 // A counter that we use for computing filenames to assign to blobs - manifest []byte // Manifest contents, temporary - signatures []byte // Signature contents, temporary - putBlobMutex sync.Mutex // Mutex to sync state for parallel PutBlob executions - blobDiffIDs map[digest.Digest]digest.Digest // Mapping from layer blobsums to their corresponding DiffIDs - fileSizes map[digest.Digest]int64 // Mapping from layer blobsums to their sizes - filenames map[digest.Digest]string // Mapping from layer blobsums to names of files we used to hold them - SignatureSizes []int `json:"signature-sizes,omitempty"` // List of sizes of each signature slice + imageRef storageReference + directory string // Temporary directory where we store blobs until Commit() time + nextTempFileID int32 // A counter that we use for computing filenames to assign to blobs + manifest []byte // Manifest contents, temporary + signatures []byte // Signature contents, temporary + putBlobMutex sync.Mutex // Mutex to sync state for parallel PutBlob executions + blobDiffIDs map[digest.Digest]digest.Digest // Mapping from layer blobsums to their corresponding DiffIDs + blobLayerIDs map[digest.Digest]string // Mapping from layer blobsums to their corresponding storage layer ID + fileSizes map[digest.Digest]int64 // Mapping from layer blobsums to their sizes + filenames map[digest.Digest]string // Mapping from layer blobsums to names of files we used to hold them + indexToStorageID map[int]string // Mapping from layer index to the layer IDs in the storage + indexToDoneChannel map[int]chan bool // Mapping from layer index to a channel to indicate the layer has been written to storage + SignatureSizes []int `json:"signature-sizes,omitempty"` // List of sizes of each signature slice } type storageImageCloser struct { @@ -323,16 +326,31 @@ func newImageDestination(imageRef storageReference) (*storageImageDestination, e return nil, errors.Wrapf(err, "error creating a temporary directory") } image := &storageImageDestination{ - imageRef: imageRef, - directory: directory, - blobDiffIDs: make(map[digest.Digest]digest.Digest), - fileSizes: make(map[digest.Digest]int64), - filenames: make(map[digest.Digest]string), - SignatureSizes: []int{}, + imageRef: imageRef, + directory: directory, + blobDiffIDs: make(map[digest.Digest]digest.Digest), + blobLayerIDs: make(map[digest.Digest]string), + fileSizes: make(map[digest.Digest]int64), + filenames: make(map[digest.Digest]string), + indexToStorageID: make(map[int]string), + indexToDoneChannel: make(map[int]chan bool), + SignatureSizes: []int{}, } return image, nil } +func (s *storageImageDestination) getChannelForLayer(layerIndexInImage int) 
chan bool { + s.putBlobMutex.Lock() + defer s.putBlobMutex.Unlock() + channel, ok := s.indexToDoneChannel[layerIndexInImage] + if !ok { + // A buffered channel to allow non-blocking sends + channel = make(chan bool, 1) + s.indexToDoneChannel[layerIndexInImage] = channel + } + return channel +} + // Reference returns the reference used to set up this destination. Note that this should directly correspond to user's intent, // e.g. it should use the public hostname instead of the result of resolving CNAMEs or following redirects. func (s *storageImageDestination) Reference() types.ImageReference { @@ -368,7 +386,64 @@ func (s *storageImageDestination) HasThreadSafePutBlob() bool { // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. -func (s *storageImageDestination) PutBlob(ctx context.Context, stream io.Reader, blobinfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { +func (s *storageImageDestination) PutBlob(ctx context.Context, stream io.Reader, blobinfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (blob types.BlobInfo, err error) { + errorBlobInfo := types.BlobInfo{ + Digest: "", + Size: -1, + } + + blob, putBlobError := s.putBlob(ctx, stream, blobinfo, layerIndexInImage, cache, isConfig) + if putBlobError != nil { + return errorBlobInfo, putBlobError + } + + // Signal potentially waiting goroutines via the channel + defer func() { + if layerIndexInImage >= 0 { + // It's a buffered channel, so we don't wait for the message to be + // received + channel := s.getChannelForLayer(layerIndexInImage) + channel <- err == nil + if err != nil { + logrus.Errorf("error while committing blob %d: %v", layerIndexInImage, err) + } + } + }() + + // First, wait for the previous layer to be committed + previousID := "" + if layerIndexInImage > 0 { + channel := s.getChannelForLayer(layerIndexInImage - 1) + if committed := <-channel; !committed { + err := fmt.Errorf("committing previous layer %d failed", layerIndexInImage-1) + logrus.Errorf(err.Error()) + return errorBlobInfo, err + } + var ok bool + s.putBlobMutex.Lock() + previousID, ok = s.indexToStorageID[layerIndexInImage-1] + s.putBlobMutex.Unlock() + if !ok { + return errorBlobInfo, fmt.Errorf("error committing blob %q: could not find parent layer ID", blob.Digest.String()) + } + } + + // Commit the blob + if layerIndexInImage >= 0 { + id, err := s.commitBlob(ctx, blob, previousID) + if err == nil { + s.putBlobMutex.Lock() + s.blobLayerIDs[blob.Digest] = id + s.indexToStorageID[layerIndexInImage] = id + s.putBlobMutex.Unlock() + } else { + return errorBlobInfo, err + } + } + return blob, nil +} + +func (s *storageImageDestination) putBlob(ctx context.Context, stream io.Reader, blobinfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { // Stores a layer or data blob in our temporary directory, checking that any information // in the blobinfo matches the incoming data. errorBlobInfo := types.BlobInfo{ @@ -425,21 +500,17 @@ func (s *storageImageDestination) PutBlob(ctx context.Context, stream io.Reader, } // This is safe because we have just computed both values ourselves. 
cache.RecordDigestUncompressedPair(blobDigest, diffID.Digest()) - return types.BlobInfo{ + blob := types.BlobInfo{ Digest: blobDigest, Size: blobSize, MediaType: blobinfo.MediaType, - }, nil + } + + return blob, nil } -// TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination -// (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree). -// info.Digest must not be empty. -// If canSubstitute, TryReusingBlob can use an equivalent equivalent of the desired blob; in that case the returned info may not match the input. -// If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. -// If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. -// May use and/or update cache. -func (s *storageImageDestination) TryReusingBlob(ctx context.Context, blobinfo types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { +// tryReusingBlob is a helper method for TryReusingBlob to wrap it +func (s *storageImageDestination) tryReusingBlob(ctx context.Context, blobinfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { // lock the entire method as it executes fairly quickly s.putBlobMutex.Lock() defer s.putBlobMutex.Unlock() @@ -467,6 +538,10 @@ func (s *storageImageDestination) TryReusingBlob(ctx context.Context, blobinfo t if len(layers) > 0 { // Save this for completeness. s.blobDiffIDs[blobinfo.Digest] = layers[0].UncompressedDigest + s.blobLayerIDs[blobinfo.Digest] = layers[0].ID + if layerIndexInImage >= 0 { + s.indexToStorageID[layerIndexInImage] = layers[0].ID + } return true, types.BlobInfo{ Digest: blobinfo.Digest, Size: layers[0].UncompressedSize, @@ -482,6 +557,10 @@ func (s *storageImageDestination) TryReusingBlob(ctx context.Context, blobinfo t if len(layers) > 0 { // Record the uncompressed value so that we can use it to calculate layer IDs. s.blobDiffIDs[blobinfo.Digest] = layers[0].UncompressedDigest + s.blobLayerIDs[blobinfo.Digest] = layers[0].ID + if layerIndexInImage >= 0 { + s.indexToStorageID[layerIndexInImage] = layers[0].ID + } return true, types.BlobInfo{ Digest: blobinfo.Digest, Size: layers[0].CompressedSize, @@ -500,6 +579,10 @@ func (s *storageImageDestination) TryReusingBlob(ctx context.Context, blobinfo t } if len(layers) > 0 { s.blobDiffIDs[uncompressedDigest] = layers[0].UncompressedDigest + s.blobLayerIDs[blobinfo.Digest] = layers[0].ID + if layerIndexInImage >= 0 { + s.indexToStorageID[layerIndexInImage] = layers[0].ID + } return true, types.BlobInfo{ Digest: uncompressedDigest, Size: layers[0].UncompressedSize, @@ -513,6 +596,26 @@ func (s *storageImageDestination) TryReusingBlob(ctx context.Context, blobinfo t return false, types.BlobInfo{}, nil } +// TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination +// (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree). +// info.Digest must not be empty. +// If canSubstitute, TryReusingBlob can use an equivalent equivalent of the desired blob; in that case the returned info may not match the input. 
+// If the blob has been successfully reused, returns (true, info, nil); info must contain at least a digest and size. +// If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. +// May use and/or update cache. +func (s *storageImageDestination) TryReusingBlob(ctx context.Context, blobinfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { + reusable, blob, err := s.tryReusingBlob(ctx, blobinfo, layerIndexInImage, cache, canSubstitute) + // If we can reuse the blob or hit an error trying to do so, we need to + // signal the result through the channel as another goroutine is potentially + // waiting for it. If we can't reuse the blob and encountered no error, we + // need to copy it and will send the signal in PutBlob(). + if (layerIndexInImage >= 0) && (err != nil || reusable) { + channel := s.getChannelForLayer(layerIndexInImage) + channel <- (err == nil) + } + return reusable, blob, err +} + // computeID computes a recommended image ID based on information we have so far. If // the manifest is not of a type that we recognize, we return an empty value, indicating // that since we don't have a recommendation, a random ID should be used if one needs @@ -572,6 +675,102 @@ func (s *storageImageDestination) getConfigBlob(info types.BlobInfo) ([]byte, er return nil, errors.New("blob not found") } +// commitBlob commits the specified blob to the storage. Note that the only +// caller of commitBlob() is PutBlob(), which waits until the parent layer has +// been committed, records the result, and sends any error through the +// corresponding channel in s.indexToDoneChannel. +func (s *storageImageDestination) commitBlob(ctx context.Context, blob types.BlobInfo, previousID string) (string, error) { + logrus.Debugf("committing blob %q", blob.Digest) + // Check if there's already a layer with the ID that we'd give to the result of applying + // this layer blob to its parent, if it has one, or the blob's hex value otherwise. + s.putBlobMutex.Lock() + diffID, haveDiffID := s.blobDiffIDs[blob.Digest] + s.putBlobMutex.Unlock() + if !haveDiffID { + return "", errors.Errorf("we have blob %q, but don't know its uncompressed digest", blob.Digest.String()) + } + + id := diffID.Hex() + if previousID != "" { + id = digest.Canonical.FromBytes([]byte(previousID + "+" + diffID.Hex())).Hex() + } + if layer, err2 := s.imageRef.transport.store.Layer(id); layer != nil && err2 == nil { + logrus.Debugf("layer for blob %q already found in storage", blob.Digest) + return layer.ID, nil + } + // Check if we previously cached a file with that blob's contents. If we didn't, + // then we need to read the desired contents from a layer. + s.putBlobMutex.Lock() + filename, ok := s.filenames[blob.Digest] + s.putBlobMutex.Unlock() + if !ok { + // Try to find the layer with contents matching that blobsum. + layer := "" + layers, err2 := s.imageRef.transport.store.LayersByUncompressedDigest(blob.Digest) + if err2 == nil && len(layers) > 0 { + layer = layers[0].ID + } else { + layers, err2 = s.imageRef.transport.store.LayersByCompressedDigest(blob.Digest) + if err2 == nil && len(layers) > 0 { + layer = layers[0].ID + } + } + if layer == "" { + return "", errors.Wrapf(err2, "error locating layer for blob %q", blob.Digest) + } + // Read the layer's contents. 
+ noCompression := archive.Uncompressed + diffOptions := &storage.DiffOptions{ + Compression: &noCompression, + } + diff, err2 := s.imageRef.transport.store.Diff("", layer, diffOptions) + if err2 != nil { + return "", errors.Wrapf(err2, "error reading layer %q for blob %q", layer, blob.Digest) + } + // Copy the layer diff to a file. Diff() takes a lock that it holds + // until the ReadCloser that it returns is closed, and PutLayer() wants + // the same lock, so the diff can't just be directly streamed from one + // to the other. + filename = s.computeNextBlobCacheFile() + file, err := os.OpenFile(filename, os.O_CREATE|os.O_TRUNC|os.O_WRONLY|os.O_EXCL, 0600) + if err != nil { + diff.Close() + return "", errors.Wrapf(err, "error creating temporary file %q", filename) + } + // Copy the data to the file. + // TODO: This can take quite some time, and should ideally be cancellable using + // ctx.Done(). + _, err = io.Copy(file, diff) + diff.Close() + file.Close() + if err != nil { + return "", errors.Wrapf(err, "error storing blob to file %q", filename) + } + // Make sure that we can find this file later, should we need the layer's + // contents again. + s.putBlobMutex.Lock() + s.filenames[blob.Digest] = filename + s.putBlobMutex.Unlock() + } + // Read the cached blob and use it as a diff. + file, err := os.Open(filename) + if err != nil { + return "", errors.Wrapf(err, "error opening file %q", filename) + } + defer file.Close() + // Build the new layer using the diff, regardless of where it came from. + // TODO: This can take quite some time, and should ideally be cancellable using ctx.Done(). + layer, _, err := s.imageRef.transport.store.PutLayer(id, previousID, nil, "", false, nil, file) + if err != nil && errors.Cause(err) != storage.ErrDuplicateID { + return "", errors.Wrapf(err, "error adding layer with blob %q", blob.Digest) + } + return layer.ID, nil +} + +// Commit marks the process of storing the image as successful and asks for the image to be persisted. +// WARNING: This does not have any transactional semantics: +// - Uploaded data MAY be visible to others before Commit() is called +// - Uploaded data MAY be removed or MAY remain around if Close() is called without Commit() (i.e. rollback is allowed but not guaranteed) func (s *storageImageDestination) Commit(ctx context.Context) error { // Find the list of layer blobs. if len(s.manifest) == 0 { @@ -581,6 +780,7 @@ func (s *storageImageDestination) Commit(ctx context.Context) error { if err != nil { return errors.Wrapf(err, "error parsing manifest") } + layerBlobs := man.LayerInfos() // Extract or find the layers. lastLayer := "" @@ -588,10 +788,7 @@ func (s *storageImageDestination) Commit(ctx context.Context) error { if blob.EmptyLayer { continue } - - // Check if there's already a layer with the ID that we'd give to the result of applying - // this layer blob to its parent, if it has one, or the blob's hex value otherwise. - diffID, haveDiffID := s.blobDiffIDs[blob.Digest] + _, haveDiffID := s.blobDiffIDs[blob.Digest] if !haveDiffID { // Check if it's elsewhere and the caller just forgot to pass it to us in a PutBlob(), // or to even check if we had it. @@ -600,90 +797,27 @@ func (s *storageImageDestination) Commit(ctx context.Context) error { // TryReusingBlob; not calling PutBlob already violates the documented API, so there’s only // so far we are going to accommodate that (if we should be doing that at all). 
logrus.Debugf("looking for diffID for blob %+v", blob.Digest) - has, _, err := s.TryReusingBlob(ctx, blob.BlobInfo, none.NoCache, false) + has, _, err := s.tryReusingBlob(ctx, blob.BlobInfo, -1, none.NoCache, false) if err != nil { return errors.Wrapf(err, "error checking for a layer based on blob %q", blob.Digest.String()) } if !has { return errors.Errorf("error determining uncompressed digest for blob %q", blob.Digest.String()) } - diffID, haveDiffID = s.blobDiffIDs[blob.Digest] + _, haveDiffID = s.blobDiffIDs[blob.Digest] if !haveDiffID { return errors.Errorf("we have blob %q, but don't know its uncompressed digest", blob.Digest.String()) } } - id := diffID.Hex() - if lastLayer != "" { - id = digest.Canonical.FromBytes([]byte(lastLayer + "+" + diffID.Hex())).Hex() - } - if layer, err2 := s.imageRef.transport.store.Layer(id); layer != nil && err2 == nil { - // There's already a layer that should have the right contents, just reuse it. - lastLayer = layer.ID - continue - } - // Check if we previously cached a file with that blob's contents. If we didn't, - // then we need to read the desired contents from a layer. - filename, ok := s.filenames[blob.Digest] - if !ok { - // Try to find the layer with contents matching that blobsum. - layer := "" - layers, err2 := s.imageRef.transport.store.LayersByUncompressedDigest(blob.Digest) - if err2 == nil && len(layers) > 0 { - layer = layers[0].ID - } else { - layers, err2 = s.imageRef.transport.store.LayersByCompressedDigest(blob.Digest) - if err2 == nil && len(layers) > 0 { - layer = layers[0].ID - } - } - if layer == "" { - return errors.Wrapf(err2, "error locating layer for blob %q", blob.Digest) - } - // Read the layer's contents. - noCompression := archive.Uncompressed - diffOptions := &storage.DiffOptions{ - Compression: &noCompression, - } - diff, err2 := s.imageRef.transport.store.Diff("", layer, diffOptions) - if err2 != nil { - return errors.Wrapf(err2, "error reading layer %q for blob %q", layer, blob.Digest) - } - // Copy the layer diff to a file. Diff() takes a lock that it holds - // until the ReadCloser that it returns is closed, and PutLayer() wants - // the same lock, so the diff can't just be directly streamed from one - // to the other. - filename = s.computeNextBlobCacheFile() - file, err := os.OpenFile(filename, os.O_CREATE|os.O_TRUNC|os.O_WRONLY|os.O_EXCL, 0600) - if err != nil { - diff.Close() - return errors.Wrapf(err, "error creating temporary file %q", filename) - } - // Copy the data to the file. - // TODO: This can take quite some time, and should ideally be cancellable using - // ctx.Done(). - _, err = io.Copy(file, diff) - diff.Close() - file.Close() - if err != nil { - return errors.Wrapf(err, "error storing blob to file %q", filename) - } - // Make sure that we can find this file later, should we need the layer's - // contents again. - s.filenames[blob.Digest] = filename - } - // Read the cached blob and use it as a diff. - file, err := os.Open(filename) + newID, err := s.commitBlob(ctx, blob.BlobInfo, lastLayer) if err != nil { - return errors.Wrapf(err, "error opening file %q", filename) - } - defer file.Close() - // Build the new layer using the diff, regardless of where it came from. - // TODO: This can take quite some time, and should ideally be cancellable using ctx.Done(). 
- layer, _, err := s.imageRef.transport.store.PutLayer(id, lastLayer, nil, "", false, nil, file) - if err != nil && errors.Cause(err) != storage.ErrDuplicateID { - return errors.Wrapf(err, "error adding layer with blob %q", blob.Digest) + return err } - lastLayer = layer.ID + lastLayer = newID + } + + if lastLayer == "" { + return fmt.Errorf("could not find top layer") } // If one of those blobs was a configuration blob, then we can try to dig out the date when the image diff --git a/vendor/github.com/containers/image/types/types.go b/vendor/github.com/containers/image/types/types.go index 9fdab2314a4..2772894ee4d 100644 --- a/vendor/github.com/containers/image/types/types.go +++ b/vendor/github.com/containers/image/types/types.go @@ -7,7 +7,7 @@ import ( "github.com/containers/image/docker/reference" "github.com/opencontainers/go-digest" - "github.com/opencontainers/image-spec/specs-go/v1" + v1 "github.com/opencontainers/image-spec/specs-go/v1" ) // ImageTransport is a top-level namespace for ways to to store/load an image. @@ -265,7 +265,7 @@ type ImageDestination interface { // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. - PutBlob(ctx context.Context, stream io.Reader, inputInfo BlobInfo, cache BlobInfoCache, isConfig bool) (BlobInfo, error) + PutBlob(ctx context.Context, stream io.Reader, inputInfo BlobInfo, layerIndexInImage int, cache BlobInfoCache, isConfig bool) (BlobInfo, error) // HasThreadSafePutBlob indicates whether PutBlob can be executed concurrently. HasThreadSafePutBlob() bool // TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination @@ -275,7 +275,7 @@ type ImageDestination interface { // If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. // If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. // May use and/or update cache. - TryReusingBlob(ctx context.Context, info BlobInfo, cache BlobInfoCache, canSubstitute bool) (bool, BlobInfo, error) + TryReusingBlob(ctx context.Context, info BlobInfo, layerIndexInImage int, cache BlobInfoCache, canSubstitute bool) (bool, BlobInfo, error) // PutManifest writes manifest to the destination. // FIXME? This should also receive a MIME type if known, to differentiate between schema versions. // If the destination is in principle available, refuses this manifest type (e.g. 
it does not recognize the schema), diff --git a/vendor/github.com/containers/image/version/version.go b/vendor/github.com/containers/image/version/version.go index 9915cb2faed..184274736df 100644 --- a/vendor/github.com/containers/image/version/version.go +++ b/vendor/github.com/containers/image/version/version.go @@ -4,11 +4,11 @@ import "fmt" const ( // VersionMajor is for an API incompatible changes - VersionMajor = 0 + VersionMajor = 1 // VersionMinor is for functionality in a backwards-compatible manner - VersionMinor = 1 + VersionMinor = 7 // VersionPatch is for backwards-compatible bug fixes - VersionPatch = 6 + VersionPatch = 0 // VersionDev indicates development branch. Releases will be empty string. VersionDev = "-dev"
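
For illustration only, not part of the patch: a minimal, self-contained sketch of the synchronization scheme the patch introduces. Layer blobs are written in parallel under a semaphore (the role of copySemaphore/copyGroup in copy.go), while each layer is committed only after its parent, coordinated through one buffered channel per layer index (the role of indexToDoneChannel/indexToStorageID in storage_image.go). All names, the simulated work, and the fake "commit" below are illustrative and do not use the actual containers/image or containers/storage APIs.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type dest struct {
	mu             sync.Mutex
	indexToDone    map[int]chan bool // signals "layer <index> committed (true) or failed (false)"
	indexToLayerID map[int]string    // committed layer ID per index
}

// doneChannel lazily creates the per-index channel, mirroring getChannelForLayer.
func (d *dest) doneChannel(index int) chan bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	ch, ok := d.indexToDone[index]
	if !ok {
		ch = make(chan bool, 1) // buffered: the sender never blocks
		d.indexToDone[index] = ch
	}
	return ch
}

// putBlob simulates streaming a layer to temporary storage (the parallel part),
// then committing it on top of its parent (the serialized part).
func (d *dest) putBlob(index int) error {
	time.Sleep(time.Duration(10*(5-index)) * time.Millisecond) // pretend to download/write

	parentID := ""
	if index > 0 {
		// Block until the previous layer has been committed (or failed).
		if ok := <-d.doneChannel(index - 1); !ok {
			return fmt.Errorf("parent layer %d failed", index-1)
		}
		d.mu.Lock()
		parentID = d.indexToLayerID[index-1]
		d.mu.Unlock()
	}

	id := fmt.Sprintf("layer-%d(parent=%q)", index, parentID) // stand-in for PutLayer
	d.mu.Lock()
	d.indexToLayerID[index] = id
	d.mu.Unlock()

	d.doneChannel(index) <- true // wake up the goroutine waiting on this layer
	fmt.Println("committed", id)
	return nil
}

func main() {
	d := &dest{indexToDone: map[int]chan bool{}, indexToLayerID: map[int]string{}}
	sem := make(chan struct{}, 2) // limit parallel copies, like copySemaphore
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		sem <- struct{}{}
		wg.Add(1)
		go func(index int) {
			defer wg.Done()
			defer func() { <-sem }()
			if err := d.putBlob(index); err != nil {
				d.doneChannel(index) <- false // still signal, so children don't hang
			}
		}(i)
	}
	wg.Wait()
}
```

Using a buffered channel of size one lets each committing goroutine report success or failure without blocking, which is why the TryReusingBlob and PutBlob changes above can send on the channel unconditionally, and why a failed layer cascades an error to its children instead of deadlocking them.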