blobinfocache: track compression types for locations
Extend the blob info cache to also cache the name of the type of
compression used on a blob in a known location.  (Caching the full MIME
type would have required additional logic to map types in cases where we
convert images during copying.)

New methods for adding known locations and reading candidate locations,
including compression information, are part of a new internal
BlobInfoCache2 interface, which the library's BlobInfoCache
implementations implement.
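
For orientation, a minimal sketch of that interface as it can be reconstructed
from the diffs below; the full definition is not shown in this commit view, so
treat the exact shape (and the string values of the two constants) as
assumptions:

package blobinfocache

import (
	"github.com/containers/image/v5/types"
	digest "github.com/opencontainers/go-digest"
)

// Compressor-name sentinels used by the cache (string values assumed; only
// the identifiers appear in the diffs below).
const (
	Uncompressed       = "uncompressed"
	UnknownCompression = "unknown"
)

// BICReplacementCandidate2 extends types.BICReplacementCandidate with the
// name of the compressor that produced the blob.
type BICReplacementCandidate2 struct {
	Digest         digest.Digest
	CompressorName string
	Location       types.BICLocationReference
}

// BlobInfoCache2 adds compression-aware variants of the V1 location methods.
// (BlobInfoCache2WithoutV1, referenced below, would be the same two methods
// without the embedded types.BlobInfoCache.)
type BlobInfoCache2 interface {
	types.BlobInfoCache
	RecordKnownLocation2(transport types.ImageTransport, scope types.BICTransportScope, digest digest.Digest, compressorName string, location types.BICLocationReference)
	CandidateLocations2(transport types.ImageTransport, scope types.BICTransportScope, digest digest.Digest, canSubstitute bool) []BICReplacementCandidate2
}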

Pass information about compression changes to PutBlob() in its input
BlobInfo, so that it can populate the blob info cache with correct
compression information when it succeeds.
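
A hedged sketch of that contract from a caller's side (pushRecompressed is
hypothetical; the real call sites are in copy/copy.go and are not shown in
full here):

package example

import (
	"context"
	"io"

	"github.com/containers/image/v5/pkg/compression"
	"github.com/containers/image/v5/types"
)

// pushRecompressed illustrates the contract: the BlobInfo handed to PutBlob
// carries the compression change, so the destination can record the right
// compressor name in the blob info cache once the upload succeeds.
func pushRecompressed(ctx context.Context, dest types.ImageDestination, zstdStream io.Reader, cache types.BlobInfoCache) (types.BlobInfo, error) {
	return dest.PutBlob(ctx, zstdStream, types.BlobInfo{
		Digest:               "", // not known up front; PutBlob computes it
		Size:                 -1, // likewise unknown until the stream is consumed
		CompressionOperation: types.Compress,    // the stream was (re)compressed during copying
		CompressionAlgorithm: &compression.Zstd, // ...with this algorithm
	}, cache, false) // false: a layer, not a config blob
}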

Make sure that when TryReusingBlob successfully uses a blob from the
blob info cache, or can detect the compression of a blob on the
filesystem, it provides compression information in the BlobInfo that it
returns, so that manifests can be updated to describe layers using the
correct MIME types.
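
On the consuming side, a hypothetical sketch (reuseLayer is illustrative, not
part of the commit; the real handling lives in the layer-copying path of
copy/copy.go):

package example

import (
	"context"

	"github.com/containers/image/v5/types"
)

// reuseLayer shows the contract from the caller's perspective: when a blob is
// reused, the returned BlobInfo may carry a compression change that has to be
// folded back into the layer description used for the manifest.
func reuseLayer(ctx context.Context, dest types.ImageDestination, layer types.BlobInfo, cache types.BlobInfoCache) (types.BlobInfo, bool, error) {
	reused, info, err := dest.TryReusingBlob(ctx, layer, cache, true)
	if err != nil || !reused {
		return layer, false, err
	}
	layer.Digest, layer.Size = info.Digest, info.Size
	if info.CompressionOperation != types.PreserveOriginal {
		// The reused blob is compressed differently than the source said;
		// the manifest must switch to the matching layer MIME type.
		layer.CompressionOperation = info.CompressionOperation
		layer.CompressionAlgorithm = info.CompressionAlgorithm
	}
	return layer, true, nil
}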

When attempting to write a manifest, if it can't be written because
layers were compressed using an algorithm that can't be expressed in
that manifest type, fall back to trying the other candidate manifest
formats.

Signed-off-by: Nalin Dahyabhai <[email protected]>
nalind committed Dec 9, 2020
1 parent d34a0a6 commit f8c84ef
Showing 23 changed files with 595 additions and 103 deletions.
10 changes: 6 additions & 4 deletions copy/copy.go
@@ -654,7 +654,9 @@ func (c *copier) copyOneImage(ctx context.Context, policyContext *signature.Poli
if err != nil {
logrus.Debugf("Writing manifest using preferred type %s failed: %v", preferredManifestMIMEType, err)
// … if it fails, _and_ the failure is because the manifest is rejected, we may have other options.
if _, isManifestRejected := errors.Cause(err).(types.ManifestTypeRejectedError); !isManifestRejected || len(otherManifestMIMETypeCandidates) == 0 {
_, isManifestRejected := errors.Cause(err).(types.ManifestTypeRejectedError)
_, isCompressionIncompatible := errors.Cause(err).(manifest.ManifestLayerCompressionIncompatibilityError)
if (!isManifestRejected && !isCompressionIncompatible) || len(otherManifestMIMETypeCandidates) == 0 {
// We don’t have other options.
// In principle the code below would handle this as well, but the resulting error message is fairly ugly.
// Don’t bother the user with MIME types if we have no choice.
@@ -896,7 +898,7 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error {
return nil
}

// layerDigestsDiffer return true iff the digests in a and b differ (ignoring sizes and possible other fields)
// layerDigestsDiffer returns true iff the digests in a and b differ (ignoring sizes and possible other fields)
func layerDigestsDiffer(a, b []types.BlobInfo) bool {
if len(a) != len(b) {
return true
@@ -951,7 +953,7 @@ func (ic *imageCopier) copyUpdatedConfigAndManifest(ctx context.Context, instanc
instanceDigest = &manifestDigest
}
if err := ic.c.dest.PutManifest(ctx, man, instanceDigest); err != nil {
return nil, "", errors.Wrap(err, "Error writing manifest")
return nil, "", errors.Wrapf(err, "Error writing manifest %q", string(man))
}
return man, manifestDigest, nil
}
@@ -1390,7 +1392,7 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr
}
}

// This is fairly horrible: the writer from getOriginalLayerCopyWriter wants to consumer
// This is fairly horrible: the writer from getOriginalLayerCopyWriter wants to consume
// all of the input (to compute DiffIDs), even if dest.PutBlob does not need it.
// So, read everything from originalLayerReader, which will cause the rest to be
// sent there if we are not already at EOF.
22 changes: 19 additions & 3 deletions directory/directory_dest.go
@@ -8,6 +8,7 @@ import (
"path/filepath"
"runtime"

"github.com/containers/image/v5/internal/blobinfocache"
"github.com/containers/image/v5/types"
"github.com/opencontainers/go-digest"
"github.com/pkg/errors"
@@ -194,23 +195,38 @@ func (d *dirImageDestination) PutBlob(ctx context.Context, stream io.Reader, inp
// (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree).
// info.Digest must not be empty.
// If canSubstitute, TryReusingBlob can use an equivalent of the desired blob; in that case the returned info may not match the input.
// If the blob has been successfully reused, returns (true, info, nil); info must contain at least a digest and size.
// If the blob has been successfully reused, returns (true, info, nil); info must contain at least a digest and size, and may
// include CompressionOperation and CompressionAlgorithm fields to indicate that a change to the compression type should be
// reflected in the manifest that will be written.
// If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure.
// May use and/or update cache.
func (d *dirImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) {
if info.Digest == "" {
return false, types.BlobInfo{}, errors.Errorf(`Can not check for a blob with unknown digest`)
}
blobPath := d.ref.layerPath(info.Digest)
finfo, err := os.Stat(blobPath)

f, err := os.Open(blobPath)
if err != nil && os.IsNotExist(err) {
return false, types.BlobInfo{}, nil
}
if err != nil {
return false, types.BlobInfo{}, err
}
return true, types.BlobInfo{Digest: info.Digest, Size: finfo.Size()}, nil
defer f.Close()

finfo, err := f.Stat()
if err != nil {
return false, types.BlobInfo{}, err
}

compressionOperation, compressionAlgorithm, _, _, err := blobinfocache.DetectCompressionFormat(f)
if err != nil {
logrus.Debugf("error detecting compression used for %q, ignoring it", blobPath)
return false, types.BlobInfo{}, nil
}

return true, types.BlobInfo{Digest: info.Digest, Size: finfo.Size(), CompressionOperation: compressionOperation, CompressionAlgorithm: compressionAlgorithm}, nil
}

// PutManifest writes manifest to the destination.
54 changes: 47 additions & 7 deletions docker/docker_image_dest.go
@@ -15,10 +15,12 @@ import (
"strings"

"github.com/containers/image/v5/docker/reference"
"github.com/containers/image/v5/internal/blobinfocache"
"github.com/containers/image/v5/internal/iolimits"
"github.com/containers/image/v5/internal/uploadreader"
"github.com/containers/image/v5/manifest"
"github.com/containers/image/v5/pkg/blobinfocache/none"
"github.com/containers/image/v5/pkg/compression"
"github.com/containers/image/v5/types"
"github.com/docker/distribution/registry/api/errcode"
v2 "github.com/docker/distribution/registry/api/v2"
@@ -162,6 +164,13 @@ func (d *dockerImageDestination) PutBlob(ctx context.Context, stream io.Reader,

digester := digest.Canonical.Digester()
sizeCounter := &sizeCounter{}

_, _, compressorName, possiblyCompressedStream, err := blobinfocache.DetectCompressionFormat(stream)
if err != nil {
return types.BlobInfo{}, errors.Wrap(err, "Error checking if input blob is compressed")
}
stream = possiblyCompressedStream

uploadLocation, err = func() (*url.URL, error) { // A scope for defer
uploadReader := uploadreader.NewUploadReader(io.TeeReader(stream, io.MultiWriter(digester.Hash(), sizeCounter)))
// This error text should never be user-visible, we terminate only after makeRequestToResolvedURL
@@ -204,7 +213,8 @@ func (d *dockerImageDestination) PutBlob(ctx context.Context, stream io.Reader,
}

logrus.Debugf("Upload of layer %s complete", computedDigest)
cache.RecordKnownLocation(d.ref.Transport(), bicTransportScope(d.ref), computedDigest, newBICLocationReference(d.ref))
bic := blobinfocache.FromBlobInfoCache(cache)
bic.RecordKnownLocation2(d.ref.Transport(), bicTransportScope(d.ref), computedDigest, compressorName, newBICLocationReference(d.ref))
return types.BlobInfo{Digest: computedDigest, Size: sizeCounter.size}, nil
}

@@ -291,25 +301,39 @@ func (d *dockerImageDestination) TryReusingBlob(ctx context.Context, info types.
if info.Digest == "" {
return false, types.BlobInfo{}, errors.Errorf(`Can not check for a blob with unknown digest`)
}
bic := blobinfocache.FromBlobInfoCache(cache)

// First, check whether the blob happens to already exist at the destination.
exists, size, err := d.blobExists(ctx, d.ref.ref, info.Digest, nil)
if err != nil {
return false, types.BlobInfo{}, err
}
if exists {
cache.RecordKnownLocation(d.ref.Transport(), bicTransportScope(d.ref), info.Digest, newBICLocationReference(d.ref))
return true, types.BlobInfo{Digest: info.Digest, Size: size}, nil
compressorName := blobinfocache.Uncompressed
if info.CompressionAlgorithm != nil {
compressorName = info.CompressionAlgorithm.Name()
}
bic.RecordKnownLocation2(d.ref.Transport(), bicTransportScope(d.ref), info.Digest, compressorName, newBICLocationReference(d.ref))
return true, types.BlobInfo{Digest: info.Digest, MediaType: info.MediaType, Size: size}, nil
}

// Then try reusing blobs from other locations.
for _, candidate := range cache.CandidateLocations(d.ref.Transport(), bicTransportScope(d.ref), info.Digest, canSubstitute) {
candidates := bic.CandidateLocations2(d.ref.Transport(), bicTransportScope(d.ref), info.Digest, canSubstitute)
for _, candidate := range candidates {
if candidate.CompressorName == blobinfocache.UnknownCompression {
continue
}

candidateRepo, err := parseBICLocationReference(candidate.Location)
if err != nil {
logrus.Debugf("Error parsing BlobInfoCache location reference: %s", err)
continue
}
logrus.Debugf("Trying to reuse cached location %s in %s", candidate.Digest.String(), candidateRepo.Name())
if candidate.CompressorName != blobinfocache.Uncompressed {
logrus.Debugf("Trying to reuse cached location %s compressed with %s in %s", candidate.Digest.String(), candidate.CompressorName, candidateRepo.Name())
} else {
logrus.Debugf("Trying to reuse cached location %s with no compression in %s", candidate.Digest.String(), candidateRepo.Name())
}

// Sanity checks:
if reference.Domain(candidateRepo) != reference.Domain(d.ref.ref) {
@@ -351,8 +375,24 @@ func (d *dockerImageDestination) TryReusingBlob(ctx context.Context, info types.
continue
}
}
cache.RecordKnownLocation(d.ref.Transport(), bicTransportScope(d.ref), candidate.Digest, newBICLocationReference(d.ref))
return true, types.BlobInfo{Digest: candidate.Digest, Size: size}, nil

bic.RecordKnownLocation2(d.ref.Transport(), bicTransportScope(d.ref), candidate.Digest, candidate.CompressorName, newBICLocationReference(d.ref))

var compressionOperation types.LayerCompression
var compressionAlgorithm *compression.Algorithm
if candidate.CompressorName != blobinfocache.Uncompressed {
algorithm, err := compression.AlgorithmByName(candidate.CompressorName)
if err != nil {
logrus.Debugf("... unknown compressor %q", candidate.CompressorName)
return false, types.BlobInfo{}, nil
}
compressionOperation = types.Compress
compressionAlgorithm = &algorithm
} else {
compressionOperation = types.Decompress
compressionAlgorithm = &compression.Algorithm{}
}
return true, types.BlobInfo{Digest: candidate.Digest, MediaType: info.MediaType, Size: size, CompressionOperation: compressionOperation, CompressionAlgorithm: compressionAlgorithm}, nil
}

return false, types.BlobInfo{}, nil
17 changes: 15 additions & 2 deletions docker/docker_image_src.go
@@ -13,6 +13,7 @@ import (
"strings"

"github.com/containers/image/v5/docker/reference"
"github.com/containers/image/v5/internal/blobinfocache"
"github.com/containers/image/v5/internal/iolimits"
"github.com/containers/image/v5/manifest"
"github.com/containers/image/v5/pkg/sysregistriesv2"
@@ -287,8 +288,20 @@ func (s *dockerImageSource) GetBlob(ctx context.Context, info types.BlobInfo, ca
if err := httpResponseToError(res, "Error fetching blob"); err != nil {
return nil, 0, err
}
cache.RecordKnownLocation(s.physicalRef.Transport(), bicTransportScope(s.physicalRef), info.Digest, newBICLocationReference(s.physicalRef))
return res.Body, getBlobSize(res), nil
_, _, compressorName, bodyReader, err := blobinfocache.DetectCompressionFormat(res.Body)
if err != nil {
return nil, 0, err
}
bic := blobinfocache.FromBlobInfoCache(cache)
bic.RecordKnownLocation2(s.physicalRef.Transport(), bicTransportScope(s.physicalRef), info.Digest, compressorName, newBICLocationReference(s.physicalRef))
rc := struct {
io.Reader
io.Closer
}{
Reader: bodyReader,
Closer: res.Body,
}
return &rc, getBlobSize(res), nil
}

// GetSignatures returns the image's signatures. It may use a remote (= slow) service.
4 changes: 3 additions & 1 deletion docker/tarfile/dest.go
@@ -86,7 +86,9 @@ func (d *Destination) PutBlob(ctx context.Context, stream io.Reader, inputInfo t
// (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree).
// info.Digest must not be empty.
// If canSubstitute, TryReusingBlob can use an equivalent of the desired blob; in that case the returned info may not match the input.
// If the blob has been successfully reused, returns (true, info, nil); info must contain at least a digest and size.
// If the blob has been successfully reused, returns (true, info, nil); info must contain at least a digest and size, and may
// include CompressionOperation and CompressionAlgorithm fields to indicate that a change to the compression type should be
// reflected in the manifest that will be written.
// If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure.
// May use and/or update cache.
func (d *Destination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) {
93 changes: 93 additions & 0 deletions internal/blobinfocache/blobinfocache.go
@@ -0,0 +1,93 @@
package blobinfocache

import (
"io"

"github.com/containers/image/v5/pkg/compression"
"github.com/containers/image/v5/types"
digest "github.com/opencontainers/go-digest"
)

// FromBlobInfoCache2 returns a BlobInfoCache2 based on a BlobInfoCache2WithoutV1, returning the
// original object if it also implements the full BlobInfoCache2 (i.e. the V1 methods for storing
// and retrieving location information), or a wrapper whose V1 methods discard compression
// information if it doesn't.
func FromBlobInfoCache2(bic BlobInfoCache2WithoutV1) BlobInfoCache2 {
if bic2, ok := bic.(BlobInfoCache2); ok {
return bic2
}
return &wrappedBlobInfoCache2{
BlobInfoCache2WithoutV1: bic,
}
}

type wrappedBlobInfoCache2 struct {
BlobInfoCache2WithoutV1
}

func (bic *wrappedBlobInfoCache2) RecordKnownLocation(transport types.ImageTransport, scope types.BICTransportScope, digest digest.Digest, location types.BICLocationReference) {
bic.RecordKnownLocation2(transport, scope, digest, UnknownCompression, location)
}

func (bic *wrappedBlobInfoCache2) CandidateLocations(transport types.ImageTransport, scope types.BICTransportScope, digest digest.Digest, canSubstitute bool) []types.BICReplacementCandidate {
newCandidates := bic.CandidateLocations2(transport, scope, digest, canSubstitute)
results := make([]types.BICReplacementCandidate, 0, len(newCandidates))
for _, c := range newCandidates {
results = append(results, types.BICReplacementCandidate{
Digest: c.Digest,
Location: c.Location,
})
}
return results
}

// FromBlobInfoCache returns a BlobInfoCache2 based on a BlobInfoCache, returning the original
// object if it implements BlobInfoCache2, or a wrapper which discards compression information
// if it only implements BlobInfoCache.
func FromBlobInfoCache(bic types.BlobInfoCache) BlobInfoCache2 {
if bic2, ok := bic.(BlobInfoCache2); ok {
return bic2
}
return &wrappedBlobInfoCache{
BlobInfoCache: bic,
}
}

type wrappedBlobInfoCache struct {
types.BlobInfoCache
}

func (bic *wrappedBlobInfoCache) RecordKnownLocation2(transport types.ImageTransport, scope types.BICTransportScope, digest digest.Digest, compressorName string, location types.BICLocationReference) {
bic.RecordKnownLocation(transport, scope, digest, location)
}

func (bic *wrappedBlobInfoCache) CandidateLocations2(transport types.ImageTransport, scope types.BICTransportScope, digest digest.Digest, canSubstitute bool) []BICReplacementCandidate2 {
oldCandidates := bic.CandidateLocations(transport, scope, digest, canSubstitute)
results := make([]BICReplacementCandidate2, 0, len(oldCandidates))
for _, c := range oldCandidates {
results = append(results, BICReplacementCandidate2{
Digest: c.Digest,
Location: c.Location,
CompressorName: UnknownCompression,
})
}
return results
}

// DetectCompressionFormat wraps pkg/compression.DetectCompressionFormat and returns:
// compressionOperation: either Compress or Decompress (PreserveOriginal only on error)
// compressionAlgorithm: the detected algorithm if Compress, or nil if Decompress
// compressorName: compressionAlgorithm.Name() if Compress, Uncompressed if Decompress
// reader: as with pkg/compression.DetectCompressionFormat
// err: set on error
func DetectCompressionFormat(input io.Reader) (compressionOperation types.LayerCompression, compressionAlgorithm *compression.Algorithm, compressorName string, reader io.Reader, err error) {
algo, _, reader, err := compression.DetectCompressionFormat(input)
if err != nil {
return types.PreserveOriginal, nil, UnknownCompression, nil, err
}
if name := algo.Name(); name != "" {
return types.Compress, &algo, name, reader, nil
} else {
return types.Decompress, nil, Uncompressed, reader, nil
}
}
51 changes: 51 additions & 0 deletions internal/blobinfocache/blobinfocache_test.go
@@ -0,0 +1,51 @@
package blobinfocache

import (
"archive/tar"
"bytes"
"testing"

"github.com/containers/image/v5/pkg/compression"
"github.com/containers/image/v5/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestDetectCompressionFormat(t *testing.T) {
cases := []struct {
compressor *compression.Algorithm
compressionOperation types.LayerCompression
compressorName string
}{
{nil, types.Decompress, Uncompressed},
{&compression.Gzip, types.Compress, compression.Gzip.Name()},
{&compression.Zstd, types.Compress, compression.Zstd.Name()},
{&compression.Xz, types.Compress, compression.Xz.Name()},
}
for i, c := range cases {
var tw *tar.Writer
closeFn := func() error { return nil }
buf := bytes.Buffer{}
if c.compressor != nil {
wc, err := compression.CompressStream(&buf, *c.compressor, nil)
require.Nilf(t, err, "%d: compression.CompressStream(%q) failed", i, c.compressor.Name())
tw = tar.NewWriter(wc)
closeFn = wc.Close
} else {
tw = tar.NewWriter(&buf)
}
err := tw.Close()
assert.Nilf(t, err, "%d: tarWriter.Close()", i)
err = closeFn()
assert.Nilf(t, err, "%d: closing compressor writer", i)
op, compressor, compressorName, _, err := DetectCompressionFormat(&buf)
assert.Nilf(t, err, "%d: DetectCompressionFormat", i)
assert.Equalf(t, c.compressionOperation, op, "%d: unexpected compression operation", i)
assert.Equalf(t, c.compressorName, compressorName, "%d: unexpected compressor name", i)
if compressor != nil {
assert.Equalf(t, c.compressorName, compressor.Name(), "%d: unexpected compressor", i)
} else {
assert.Equalf(t, c.compressorName, Uncompressed, "%d: expected Uncompressed for a nil compressor", i)
}
}
}
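
To tie the pieces together, a hypothetical usage sketch (candidatesWithKnownCompression
is illustrative, not part of the commit; it mirrors the candidate filtering visible in
the docker/docker_image_dest.go diff above, and works even when the underlying cache
only implements the V1 interface):

package example

import (
	"github.com/containers/image/v5/internal/blobinfocache"
	"github.com/containers/image/v5/types"
	digest "github.com/opencontainers/go-digest"
)

// candidatesWithKnownCompression upgrades any types.BlobInfoCache via
// FromBlobInfoCache and keeps only candidates whose compressor is known;
// a V1-only cache reports UnknownCompression for every candidate, so all
// of its candidates are skipped here.
func candidatesWithKnownCompression(cache types.BlobInfoCache, transport types.ImageTransport, scope types.BICTransportScope, d digest.Digest) []blobinfocache.BICReplacementCandidate2 {
	bic := blobinfocache.FromBlobInfoCache(cache)
	var usable []blobinfocache.BICReplacementCandidate2
	for _, c := range bic.CandidateLocations2(transport, scope, d, true) {
		if c.CompressorName == blobinfocache.UnknownCompression {
			continue // can't pick a manifest layer MIME type for this one
		}
		usable = append(usable, c)
	}
	return usable
}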