Skip to content

Commit

Permalink
blobinfocache: track compression types for locations
Browse files Browse the repository at this point in the history
Extend the blob info cache to also cache the name of the type of
compression used on a blob in a known location.  (Caching the full MIME
type would have required additional logic to map types in cases where we
convert images during copying.)

New methods for adding known locations and reading candidate locations
including compression information are part of a new BlobInfoCache2
interface which BlobInfoCache implementors may implement.

Pass information about compression changes to PutBlob() in its input
BlobInfo, so that it can populate the blob info cache with correct
compression information when it succeeds.

Make sure that when TryReusingBlob successfully uses a blob from the
blob info cache or can detect the compression of a blob on the
filesystem, that it provides compression information in the BlobInfo
that it returns, so that manifests can be updated to describe layers
using the correct MIME types.

When attempting to write a manifest, if a manifest can't be written
because layers were compressed using an algorithm which can't be
expressed using that manifest type, continue on to trying other manifest
formats.

Signed-off-by: Nalin Dahyabhai <[email protected]>
  • Loading branch information
nalind committed Dec 3, 2020
1 parent f7f3725 commit 1f26e4b
Show file tree
Hide file tree
Showing 19 changed files with 440 additions and 78 deletions.
10 changes: 6 additions & 4 deletions copy/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,9 @@ func (c *copier) copyOneImage(ctx context.Context, policyContext *signature.Poli
if err != nil {
logrus.Debugf("Writing manifest using preferred type %s failed: %v", preferredManifestMIMEType, err)
// … if it fails, _and_ the failure is because the manifest is rejected, we may have other options.
if _, isManifestRejected := errors.Cause(err).(types.ManifestTypeRejectedError); !isManifestRejected || len(otherManifestMIMETypeCandidates) == 0 {
_, isManifestRejected := errors.Cause(err).(types.ManifestTypeRejectedError)
_, isCompressionIncompatible := errors.Cause(err).(manifest.ManifestLayerCompressionIncompatibilityError)
if (!isManifestRejected && !isCompressionIncompatible) || len(otherManifestMIMETypeCandidates) == 0 {
// We don’t have other options.
// In principle the code below would handle this as well, but the resulting error message is fairly ugly.
// Don’t bother the user with MIME types if we have no choice.
Expand Down Expand Up @@ -896,7 +898,7 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error {
return nil
}

// layerDigestsDiffer return true iff the digests in a and b differ (ignoring sizes and possible other fields)
// layerDigestsDiffer returns true iff the digests in a and b differ (ignoring sizes and possible other fields)
func layerDigestsDiffer(a, b []types.BlobInfo) bool {
if len(a) != len(b) {
return true
Expand Down Expand Up @@ -951,7 +953,7 @@ func (ic *imageCopier) copyUpdatedConfigAndManifest(ctx context.Context, instanc
instanceDigest = &manifestDigest
}
if err := ic.c.dest.PutManifest(ctx, man, instanceDigest); err != nil {
return nil, "", errors.Wrap(err, "Error writing manifest")
return nil, "", errors.Wrapf(err, "Error writing manifest %q", string(man))
}
return man, manifestDigest, nil
}
Expand Down Expand Up @@ -1390,7 +1392,7 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr
}
}

// This is fairly horrible: the writer from getOriginalLayerCopyWriter wants to consumer
// This is fairly horrible: the writer from getOriginalLayerCopyWriter wants to consume
// all of the input (to compute DiffIDs), even if dest.PutBlob does not need it.
// So, read everything from originalLayerReader, which will cause the rest to be
// sent there if we are not already at EOF.
Expand Down
32 changes: 29 additions & 3 deletions directory/directory_dest.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"path/filepath"
"runtime"

"github.com/containers/image/v5/pkg/compression"
"github.com/containers/image/v5/types"
"github.com/opencontainers/go-digest"
"github.com/pkg/errors"
Expand Down Expand Up @@ -194,23 +195,48 @@ func (d *dirImageDestination) PutBlob(ctx context.Context, stream io.Reader, inp
// (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree).
// info.Digest must not be empty.
// If canSubstitute, TryReusingBlob can use an equivalent of the desired blob; in that case the returned info may not match the input.
// If the blob has been successfully reused, returns (true, info, nil); info must contain at least a digest and size.
// If the blob has been successfully reused, returns (true, info, nil); info must contain at least a digest and size, and may
// include CompressionOperation and CompressionAlgorithm fields to indicate that a change to the compression type should be
// reflected in the manifest that will be written.
// If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure.
// May use and/or update cache.
func (d *dirImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) {
if info.Digest == "" {
return false, types.BlobInfo{}, errors.Errorf(`"Can not check for a blob with unknown digest`)
}
blobPath := d.ref.layerPath(info.Digest)
finfo, err := os.Stat(blobPath)

f, err := os.Open(blobPath)
if err != nil && os.IsNotExist(err) {
return false, types.BlobInfo{}, nil
}
if err != nil {
return false, types.BlobInfo{}, err
}
return true, types.BlobInfo{Digest: info.Digest, Size: finfo.Size()}, nil
defer f.Close()

finfo, err := f.Stat()
if err != nil {
return false, types.BlobInfo{}, err
}

algo, _, _, err := compression.DetectCompressionFormat(f)
if err != nil {
logrus.Debugf("error detecting compression used for %q, ignoring it", blobPath)
return false, types.BlobInfo{}, nil
}

var compressionOperation types.LayerCompression
var compressionAlgorithm *compression.Algorithm
if algo.Name() != "" {
compressionOperation = types.Compress
compressionAlgorithm = &algo
} else {
compressionOperation = types.Decompress
compressionAlgorithm = &compression.Algorithm{}
}

return true, types.BlobInfo{Digest: info.Digest, Size: finfo.Size(), CompressionOperation: compressionOperation, CompressionAlgorithm: compressionAlgorithm}, nil
}

// PutManifest writes manifest to the destination.
Expand Down
71 changes: 63 additions & 8 deletions docker/docker_image_dest.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/containers/image/v5/internal/uploadreader"
"github.com/containers/image/v5/manifest"
"github.com/containers/image/v5/pkg/blobinfocache/none"
"github.com/containers/image/v5/pkg/compression"
"github.com/containers/image/v5/types"
"github.com/docker/distribution/registry/api/errcode"
v2 "github.com/docker/distribution/registry/api/v2"
Expand Down Expand Up @@ -162,8 +163,14 @@ func (d *dockerImageDestination) PutBlob(ctx context.Context, stream io.Reader,

digester := digest.Canonical.Digester()
sizeCounter := &sizeCounter{}

compressionAlgorithm, _, compressedHeader, err := compression.DetectCompressionFormat(stream)
if err != nil {
return types.BlobInfo{}, errors.Wrap(err, "Error checking if input blob is compressed")
}

uploadLocation, err = func() (*url.URL, error) { // A scope for defer
uploadReader := uploadreader.NewUploadReader(io.TeeReader(stream, io.MultiWriter(digester.Hash(), sizeCounter)))
uploadReader := uploadreader.NewUploadReader(io.TeeReader(io.MultiReader(compressedHeader, stream), io.MultiWriter(digester.Hash(), sizeCounter)))
// This error text should never be user-visible, we terminate only after makeRequestToResolvedURL
// returns, so there isn’t a way for the error text to be provided to any of our callers.
defer uploadReader.Terminate(errors.New("Reading data from an already terminated upload"))
Expand Down Expand Up @@ -204,7 +211,11 @@ func (d *dockerImageDestination) PutBlob(ctx context.Context, stream io.Reader,
}

logrus.Debugf("Upload of layer %s complete", computedDigest)
cache.RecordKnownLocation(d.ref.Transport(), bicTransportScope(d.ref), computedDigest, newBICLocationReference(d.ref))
if cache2, ok := cache.(types.BlobInfoCache2); ok {
cache2.RecordKnownLocation2(d.ref.Transport(), bicTransportScope(d.ref), computedDigest, compressionAlgorithm.Name(), newBICLocationReference(d.ref))
} else {
cache.RecordKnownLocation(d.ref.Transport(), bicTransportScope(d.ref), computedDigest, newBICLocationReference(d.ref))
}
return types.BlobInfo{Digest: computedDigest, Size: sizeCounter.size}, nil
}

Expand Down Expand Up @@ -298,18 +309,43 @@ func (d *dockerImageDestination) TryReusingBlob(ctx context.Context, info types.
return false, types.BlobInfo{}, err
}
if exists {
cache.RecordKnownLocation(d.ref.Transport(), bicTransportScope(d.ref), info.Digest, newBICLocationReference(d.ref))
return true, types.BlobInfo{Digest: info.Digest, Size: size}, nil
var compressorName string
if info.CompressionAlgorithm != nil {
compressorName = info.CompressionAlgorithm.Name()
}
if cache2, ok := cache.(types.BlobInfoCache2); ok {
cache2.RecordKnownLocation2(d.ref.Transport(), bicTransportScope(d.ref), info.Digest, compressorName, newBICLocationReference(d.ref))
} else {
cache.RecordKnownLocation(d.ref.Transport(), bicTransportScope(d.ref), info.Digest, newBICLocationReference(d.ref))
}
return true, types.BlobInfo{Digest: info.Digest, MediaType: info.MediaType, Size: size}, nil
}

// Then try reusing blobs from other locations.
for _, candidate := range cache.CandidateLocations(d.ref.Transport(), bicTransportScope(d.ref), info.Digest, canSubstitute) {
var candidates []types.BICReplacementCandidate2
if cache2, ok := cache.(types.BlobInfoCache2); ok {
candidates = cache2.CandidateLocations2(d.ref.Transport(), bicTransportScope(d.ref), info.Digest, canSubstitute)
} else {
candidateList := cache.CandidateLocations(d.ref.Transport(), bicTransportScope(d.ref), info.Digest, canSubstitute)
candidates = make([]types.BICReplacementCandidate2, 0, len(candidateList))
for _, c := range candidateList {
candidates = append(candidates, types.BICReplacementCandidate2{
Digest: c.Digest,
Location: c.Location,
})
}
}
for _, candidate := range candidates {
candidateRepo, err := parseBICLocationReference(candidate.Location)
if err != nil {
logrus.Debugf("Error parsing BlobInfoCache location reference: %s", err)
continue
}
logrus.Debugf("Trying to reuse cached location %s in %s", candidate.Digest.String(), candidateRepo.Name())
if candidate.CompressorName != "" {
logrus.Debugf("Trying to reuse cached location %s compressed with %s in %s", candidate.Digest.String(), candidate.CompressorName, candidateRepo.Name())
} else {
logrus.Debugf("Trying to reuse cached location %s with no compression in %s", candidate.Digest.String(), candidateRepo.Name())
}

// Sanity checks:
if reference.Domain(candidateRepo) != reference.Domain(d.ref.ref) {
Expand Down Expand Up @@ -351,8 +387,27 @@ func (d *dockerImageDestination) TryReusingBlob(ctx context.Context, info types.
continue
}
}
cache.RecordKnownLocation(d.ref.Transport(), bicTransportScope(d.ref), candidate.Digest, newBICLocationReference(d.ref))
return true, types.BlobInfo{Digest: candidate.Digest, Size: size}, nil
if cache2, ok := cache.(types.BlobInfoCache2); ok {
cache2.RecordKnownLocation2(d.ref.Transport(), bicTransportScope(d.ref), candidate.Digest, candidate.CompressorName, newBICLocationReference(d.ref))
} else {
cache.RecordKnownLocation(d.ref.Transport(), bicTransportScope(d.ref), candidate.Digest, newBICLocationReference(d.ref))
}

var compressionOperation types.LayerCompression
var compressionAlgorithm *compression.Algorithm
if candidate.CompressorName != "" {
algorithm, err := compression.AlgorithmByName(candidate.CompressorName)
if err != nil {
logrus.Debugf("... unknown compressor %q", candidate.CompressorName)
return false, types.BlobInfo{}, nil
}
compressionOperation = types.Compress
compressionAlgorithm = &algorithm
} else {
compressionOperation = types.Decompress
compressionAlgorithm = &compression.Algorithm{}
}
return true, types.BlobInfo{Digest: candidate.Digest, MediaType: info.MediaType, Size: size, CompressionOperation: compressionOperation, CompressionAlgorithm: compressionAlgorithm}, nil
}

return false, types.BlobInfo{}, nil
Expand Down
22 changes: 20 additions & 2 deletions docker/docker_image_src.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/containers/image/v5/docker/reference"
"github.com/containers/image/v5/internal/iolimits"
"github.com/containers/image/v5/manifest"
"github.com/containers/image/v5/pkg/compression"
"github.com/containers/image/v5/pkg/sysregistriesv2"
"github.com/containers/image/v5/types"
digest "github.com/opencontainers/go-digest"
Expand Down Expand Up @@ -287,8 +288,25 @@ func (s *dockerImageSource) GetBlob(ctx context.Context, info types.BlobInfo, ca
if err := httpResponseToError(res, "Error fetching blob"); err != nil {
return nil, 0, err
}
cache.RecordKnownLocation(s.physicalRef.Transport(), bicTransportScope(s.physicalRef), info.Digest, newBICLocationReference(s.physicalRef))
return res.Body, getBlobSize(res), nil
bodyHeader := io.Reader(strings.NewReader(""))
if cache2, ok := cache.(types.BlobInfoCache2); ok {
algo, _, header, err := compression.DetectCompressionFormat(res.Body)
if err != nil {
return nil, 0, err
}
bodyHeader = header
cache2.RecordKnownLocation2(s.physicalRef.Transport(), bicTransportScope(s.physicalRef), info.Digest, algo.Name(), newBICLocationReference(s.physicalRef))
} else {
cache.RecordKnownLocation(s.physicalRef.Transport(), bicTransportScope(s.physicalRef), info.Digest, newBICLocationReference(s.physicalRef))
}
rc := struct {
io.Reader
io.Closer
}{
Reader: io.MultiReader(bodyHeader, res.Body),
Closer: res.Body,
}
return &rc, getBlobSize(res), nil
}

// GetSignatures returns the image's signatures. It may use a remote (= slow) service.
Expand Down
4 changes: 3 additions & 1 deletion docker/tarfile/dest.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ func (d *Destination) PutBlob(ctx context.Context, stream io.Reader, inputInfo t
// (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree).
// info.Digest must not be empty.
// If canSubstitute, TryReusingBlob can use an equivalent of the desired blob; in that case the returned info may not match the input.
// If the blob has been successfully reused, returns (true, info, nil); info must contain at least a digest and size.
// If the blob has been successfully reused, returns (true, info, nil); info must contain at least a digest and size, and may
// include CompressionOperation and CompressionAlgorithm fields to indicate that a change to the compression type should be
// reflected in the manifest that will be written.
// If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure.
// May use and/or update cache.
func (d *Destination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) {
Expand Down
21 changes: 17 additions & 4 deletions manifest/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

"github.com/containers/image/v5/pkg/compression"
"github.com/containers/image/v5/types"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -70,15 +69,15 @@ func compressionVariantMIMEType(variantTable []compressionMIMETypeSet, mimeType
return res, nil
}
if name != mtsUncompressed {
return "", fmt.Errorf("%s compression is not supported", name)
return "", ManifestLayerCompressionIncompatibilityError{fmt.Sprintf("%s compression is not supported for type %q", name, mt)}
}
return "", errors.New("uncompressed variant is not supported")
return "", ManifestLayerCompressionIncompatibilityError{fmt.Sprintf("uncompressed variant is not supported for type %q", mt)}
}
if name != mtsUncompressed {
return "", fmt.Errorf("unknown compression algorithm %s", name)
}
// We can't very well say “the idea of no compression is unknown”
return "", errors.New("uncompressed variant is not supported")
return "", ManifestLayerCompressionIncompatibilityError{fmt.Sprintf("uncompressed variant is not supported for type %q", mt)}
}
}
}
Expand All @@ -99,6 +98,12 @@ func updatedMIMEType(variantTable []compressionMIMETypeSet, mimeType string, upd
// {de}compressed.
switch updated.CompressionOperation {
case types.PreserveOriginal:
// Force a change to the media type if we're being told to use a particular compressor,
// since it might be different from the one associated with the media type. Otherwise,
// try to keep the original media type.
if updated.CompressionAlgorithm != nil && updated.CompressionAlgorithm.Name() != mtsUncompressed {
return compressionVariantMIMEType(variantTable, mimeType, updated.CompressionAlgorithm)
}
// Keep the original media type.
return mimeType, nil

Expand All @@ -116,3 +121,11 @@ func updatedMIMEType(variantTable []compressionMIMETypeSet, mimeType string, upd
return "", fmt.Errorf("unknown compression operation (%d)", updated.CompressionOperation)
}
}

// ManifestLayerCompressionIncompatibilityError indicates that a manifest format
// cannot express the compression algorithm that was used on one of the image's
// layers, so a manifest of that type cannot be written without either changing
// the layer's compression or selecting a different manifest format.
type ManifestLayerCompressionIncompatibilityError struct {
	text string // human-readable description of the incompatibility
}

// Error implements the error interface.
func (m ManifestLayerCompressionIncompatibilityError) Error() string {
	return m.text
}
4 changes: 3 additions & 1 deletion oci/archive/oci_dest.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ func (d *ociArchiveImageDestination) PutBlob(ctx context.Context, stream io.Read
// (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree).
// info.Digest must not be empty.
// If canSubstitute, TryReusingBlob can use an equivalent of the desired blob; in that case the returned info may not match the input.
// If the blob has been successfully reused, returns (true, info, nil); info must contain at least a digest and size.
// If the blob has been successfully reused, returns (true, info, nil); info must contain at least a digest and size, and may
// include CompressionOperation and CompressionAlgorithm fields to indicate that a change to the compression type should be
// reflected in the manifest that will be written.
// If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure.
// May use and/or update cache.
func (d *ociArchiveImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) {
Expand Down
31 changes: 28 additions & 3 deletions oci/layout/oci_dest.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"runtime"

"github.com/containers/image/v5/manifest"
"github.com/containers/image/v5/pkg/compression"
"github.com/containers/image/v5/types"
digest "github.com/opencontainers/go-digest"
imgspec "github.com/opencontainers/image-spec/specs-go"
Expand Down Expand Up @@ -186,7 +187,9 @@ func (d *ociImageDestination) PutBlob(ctx context.Context, stream io.Reader, inp
// (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree).
// info.Digest must not be empty.
// If canSubstitute, TryReusingBlob can use an equivalent of the desired blob; in that case the returned info may not match the input.
// If the blob has been successfully reused, returns (true, info, nil); info must contain at least a digest and size.
// If the blob has been successfully reused, returns (true, info, nil); info must contain at least a digest and size, and may
// include CompressionOperation and CompressionAlgorithm fields to indicate that a change to the compression type should be
// reflected in the manifest that will be written.
// If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure.
// May use and/or update cache.
func (d *ociImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) {
Expand All @@ -197,14 +200,36 @@ func (d *ociImageDestination) TryReusingBlob(ctx context.Context, info types.Blo
if err != nil {
return false, types.BlobInfo{}, err
}
finfo, err := os.Stat(blobPath)
f, err := os.Open(blobPath)
if err != nil && os.IsNotExist(err) {
return false, types.BlobInfo{}, nil
}
if err != nil {
return false, types.BlobInfo{}, err
}
return true, types.BlobInfo{Digest: info.Digest, Size: finfo.Size()}, nil
defer f.Close()

finfo, err := f.Stat()
if err != nil {
return false, types.BlobInfo{}, err
}

algo, _, _, err := compression.DetectCompressionFormat(f)
if err != nil {
return false, types.BlobInfo{}, nil
}

var compressionOperation types.LayerCompression
var compressionAlgorithm *compression.Algorithm
if algo.Name() != "" {
compressionOperation = types.Compress
compressionAlgorithm = &algo
} else {
compressionOperation = types.Decompress
compressionAlgorithm = &compression.Algorithm{}
}

return true, types.BlobInfo{Digest: info.Digest, Size: finfo.Size(), CompressionOperation: compressionOperation, CompressionAlgorithm: compressionAlgorithm}, nil
}

// PutManifest writes a manifest to the destination. Per our list of supported manifest MIME types,
Expand Down
Loading

0 comments on commit 1f26e4b

Please sign in to comment.