Skip to content

Commit

Permalink
blobinfocache: track compression types for locations
Browse files Browse the repository at this point in the history
Extend the blob info cache to also cache the name of the type of
compression used on a blob that we've seen, or specific values that
indicate that we know the blob was not compressed, or that we don't
know whether or not it was compressed.

New methods for adding known blob-compression pairs and reading
candidate locations including compression information are part of a new
internal BlobInfoCache2 interface which the library's BlobInfoCache
implementors also implement.

When we copy a blob, try to record the state of compression for the
source blob, and if we applied any changes, the blob we produced.

Make sure that when TryReusingBlob successfully uses a blob from the
blob info cache, that it provides compression information in the
BlobInfo that it returns, so that manifests can be updated to describe
layers using the correct MIME types.

When attempting to write a manifest, if a manifest can't be written
because layers were compressed using an algorithm which can't be
expressed using that manifest type, continue on to trying other manifest
formats.

Signed-off-by: Nalin Dahyabhai <[email protected]>
  • Loading branch information
nalind committed Dec 15, 2020
1 parent d34a0a6 commit 32d0532
Show file tree
Hide file tree
Showing 21 changed files with 469 additions and 77 deletions.
36 changes: 31 additions & 5 deletions copy/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/containers/image/v5/docker/reference"
"github.com/containers/image/v5/image"
internalblobinfocache "github.com/containers/image/v5/internal/blobinfocache"
"github.com/containers/image/v5/internal/pkg/platform"
"github.com/containers/image/v5/manifest"
"github.com/containers/image/v5/pkg/blobinfocache"
Expand Down Expand Up @@ -654,7 +655,9 @@ func (c *copier) copyOneImage(ctx context.Context, policyContext *signature.Poli
if err != nil {
logrus.Debugf("Writing manifest using preferred type %s failed: %v", preferredManifestMIMEType, err)
// … if it fails, _and_ the failure is because the manifest is rejected, we may have other options.
if _, isManifestRejected := errors.Cause(err).(types.ManifestTypeRejectedError); !isManifestRejected || len(otherManifestMIMETypeCandidates) == 0 {
_, isManifestRejected := errors.Cause(err).(types.ManifestTypeRejectedError)
_, isCompressionIncompatible := errors.Cause(err).(manifest.ManifestLayerCompressionIncompatibilityError)
if (!isManifestRejected && !isCompressionIncompatible) || len(otherManifestMIMETypeCandidates) == 0 {
// We don’t have other options.
// In principle the code below would handle this as well, but the resulting error message is fairly ugly.
// Don’t bother the user with MIME types if we have no choice.
Expand Down Expand Up @@ -896,7 +899,7 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error {
return nil
}

// layerDigestsDiffer return true iff the digests in a and b differ (ignoring sizes and possible other fields)
// layerDigestsDiffer returns true iff the digests in a and b differ (ignoring sizes and possible other fields)
func layerDigestsDiffer(a, b []types.BlobInfo) bool {
if len(a) != len(b) {
return true
Expand Down Expand Up @@ -951,7 +954,7 @@ func (ic *imageCopier) copyUpdatedConfigAndManifest(ctx context.Context, instanc
instanceDigest = &manifestDigest
}
if err := ic.c.dest.PutManifest(ctx, man, instanceDigest); err != nil {
return nil, "", errors.Wrap(err, "Error writing manifest")
return nil, "", errors.Wrapf(err, "Error writing manifest %q", string(man))
}
return man, manifestDigest, nil
}
Expand Down Expand Up @@ -1390,7 +1393,7 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr
}
}

// This is fairly horrible: the writer from getOriginalLayerCopyWriter wants to consumer
// This is fairly horrible: the writer from getOriginalLayerCopyWriter wants to consume
// all of the input (to compute DiffIDs), even if dest.PutBlob does not need it.
// So, read everything from originalLayerReader, which will cause the rest to be
// sent there if we are not already at EOF.
Expand All @@ -1408,18 +1411,41 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr
if inputInfo.Digest != "" && uploadedInfo.Digest != inputInfo.Digest {
return types.BlobInfo{}, errors.Errorf("Internal error writing blob %s, blob with digest %s saved with digest %s", srcInfo.Digest, inputInfo.Digest, uploadedInfo.Digest)
}

if digestingReader.validationSucceeded {
bic := internalblobinfocache.FromBlobInfoCache(c.blobInfoCache)
// If compressionOperation != types.PreserveOriginal, we now have two reliable digest values:
// srcinfo.Digest describes the pre-compressionOperation input, verified by digestingReader
// uploadedInfo.Digest describes the post-compressionOperation output, computed by PutBlob
// (because inputInfo.Digest == "", this must have been computed afresh).
switch compressionOperation {
case types.PreserveOriginal:
break // Do nothing, we have only one digest and we might not have even verified it.
if desiredCompressionFormat.Name() != "" && isCompressed {
// we recompressed it
bic.RecordDigestCompressorName(uploadedInfo.Digest, desiredCompressionFormat.Name())
} else {
// it wasn't, and still isn't, compressed
bic.RecordDigestCompressorName(uploadedInfo.Digest, internalblobinfocache.Uncompressed)
}
// Do nothing else, we have only one digest and we might not have even verified it.
case types.Compress:
c.blobInfoCache.RecordDigestUncompressedPair(uploadedInfo.Digest, srcInfo.Digest)
if srcInfo.Digest != "" && !isCompressed {
// the original blob wasn't compressed
bic.RecordDigestCompressorName(srcInfo.Digest, internalblobinfocache.Uncompressed)
}
if desiredCompressionFormat.Name() != "" {
// we compressed the original blob
bic.RecordDigestCompressorName(uploadedInfo.Digest, desiredCompressionFormat.Name())
}
case types.Decompress:
c.blobInfoCache.RecordDigestUncompressedPair(srcInfo.Digest, uploadedInfo.Digest)
if srcInfo.Digest != "" && isCompressed {
// the original blob was compressed
bic.RecordDigestCompressorName(srcInfo.Digest, compressionFormat.Name())
}
// we decompressed the original blob
bic.RecordDigestCompressorName(uploadedInfo.Digest, internalblobinfocache.Uncompressed)
default:
return types.BlobInfo{}, errors.Errorf("Internal error: Unexpected compressionOperation value %#v", compressionOperation)
}
Expand Down
5 changes: 3 additions & 2 deletions directory/directory_dest.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,9 @@ func (d *dirImageDestination) PutBlob(ctx context.Context, stream io.Reader, inp
// (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree).
// info.Digest must not be empty.
// If canSubstitute, TryReusingBlob can use an equivalent equivalent of the desired blob; in that case the returned info may not match the input.
// If the blob has been successfully reused, returns (true, info, nil); info must contain at least a digest and size.
// If the blob has been successfully reused, returns (true, info, nil); info must contain at least a digest and size, and may
// include CompressionOperation and CompressionAlgorithm fields to indicate that a change to the compression type should be
// reflected in the manifest that will be written.
// If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure.
// May use and/or update cache.
func (d *dirImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) {
Expand All @@ -210,7 +212,6 @@ func (d *dirImageDestination) TryReusingBlob(ctx context.Context, info types.Blo
return false, types.BlobInfo{}, err
}
return true, types.BlobInfo{Digest: info.Digest, Size: finfo.Size()}, nil

}

// PutManifest writes manifest to the destination.
Expand Down
25 changes: 20 additions & 5 deletions docker/docker_image_dest.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"strings"

"github.com/containers/image/v5/docker/reference"
"github.com/containers/image/v5/internal/blobinfocache"
"github.com/containers/image/v5/internal/iolimits"
"github.com/containers/image/v5/internal/uploadreader"
"github.com/containers/image/v5/manifest"
Expand Down Expand Up @@ -162,6 +163,7 @@ func (d *dockerImageDestination) PutBlob(ctx context.Context, stream io.Reader,

digester := digest.Canonical.Digester()
sizeCounter := &sizeCounter{}

uploadLocation, err = func() (*url.URL, error) { // A scope for defer
uploadReader := uploadreader.NewUploadReader(io.TeeReader(stream, io.MultiWriter(digester.Hash(), sizeCounter)))
// This error text should never be user-visible, we terminate only after makeRequestToResolvedURL
Expand Down Expand Up @@ -299,17 +301,23 @@ func (d *dockerImageDestination) TryReusingBlob(ctx context.Context, info types.
}
if exists {
cache.RecordKnownLocation(d.ref.Transport(), bicTransportScope(d.ref), info.Digest, newBICLocationReference(d.ref))
return true, types.BlobInfo{Digest: info.Digest, Size: size}, nil
return true, types.BlobInfo{Digest: info.Digest, MediaType: info.MediaType, Size: size}, nil
}

// Then try reusing blobs from other locations.
for _, candidate := range cache.CandidateLocations(d.ref.Transport(), bicTransportScope(d.ref), info.Digest, canSubstitute) {
bic := blobinfocache.FromBlobInfoCache(cache)
candidates := bic.CandidateLocations2(d.ref.Transport(), bicTransportScope(d.ref), info.Digest, canSubstitute)
for _, candidate := range candidates {
candidateRepo, err := parseBICLocationReference(candidate.Location)
if err != nil {
logrus.Debugf("Error parsing BlobInfoCache location reference: %s", err)
continue
}
logrus.Debugf("Trying to reuse cached location %s in %s", candidate.Digest.String(), candidateRepo.Name())
if candidate.CompressorName != blobinfocache.Uncompressed {
logrus.Debugf("Trying to reuse cached location %s compressed with %s in %s", candidate.Digest.String(), candidate.CompressorName, candidateRepo.Name())
} else {
logrus.Debugf("Trying to reuse cached location %s with no compression in %s", candidate.Digest.String(), candidateRepo.Name())
}

// Sanity checks:
if reference.Domain(candidateRepo) != reference.Domain(d.ref.ref) {
Expand Down Expand Up @@ -351,8 +359,15 @@ func (d *dockerImageDestination) TryReusingBlob(ctx context.Context, info types.
continue
}
}
cache.RecordKnownLocation(d.ref.Transport(), bicTransportScope(d.ref), candidate.Digest, newBICLocationReference(d.ref))
return true, types.BlobInfo{Digest: candidate.Digest, Size: size}, nil

bic.RecordKnownLocation(d.ref.Transport(), bicTransportScope(d.ref), candidate.Digest, newBICLocationReference(d.ref))

compressionOperation, compressionAlgorithm, err := blobinfocache.OperationAndAlgorithmForCompressor(candidate.CompressorName)
if err != nil {
return false, types.BlobInfo{}, err
}

return true, types.BlobInfo{Digest: candidate.Digest, MediaType: info.MediaType, Size: size, CompressionOperation: compressionOperation, CompressionAlgorithm: compressionAlgorithm}, nil
}

return false, types.BlobInfo{}, nil
Expand Down
4 changes: 3 additions & 1 deletion docker/tarfile/dest.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ func (d *Destination) PutBlob(ctx context.Context, stream io.Reader, inputInfo t
// (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree).
// info.Digest must not be empty.
// If canSubstitute, TryReusingBlob can use an equivalent equivalent of the desired blob; in that case the returned info may not match the input.
// If the blob has been successfully reused, returns (true, info, nil); info must contain at least a digest and size.
// If the blob has been successfully reused, returns (true, info, nil); info must contain at least a digest and size, and may
// include CompressionOperation and CompressionAlgorithm fields to indicate that a change to the compression type should be
// reflected in the manifest that will be written.
// If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure.
// May use and/or update cache.
func (d *Destination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) {
Expand Down
72 changes: 72 additions & 0 deletions internal/blobinfocache/blobinfocache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package blobinfocache

import (
"github.com/containers/image/v5/pkg/compression"
"github.com/containers/image/v5/types"
digest "github.com/opencontainers/go-digest"
)

// FromBlobInfoCache returns a BlobInfoCache2 based on a BlobInfoCache, returning the original
// object if it implements BlobInfoCache2, or a wrapper which discards compression information
// if it only implements BlobInfoCache.
func FromBlobInfoCache(bic types.BlobInfoCache) BlobInfoCache2 {
if bic2, ok := bic.(BlobInfoCache2); ok {
return bic2
}
return &wrappedBlobInfoCache{
BlobInfoCache: bic,
}
}

type wrappedBlobInfoCache struct {
types.BlobInfoCache
}

func (bic *wrappedBlobInfoCache) RecordDigestCompressorName(anyDigest digest.Digest, compressorName string) {
}

func (bic *wrappedBlobInfoCache) CandidateLocations2(transport types.ImageTransport, scope types.BICTransportScope, digest digest.Digest, canSubstitute bool) []BICReplacementCandidate2 {
oldCandidates := bic.CandidateLocations(transport, scope, digest, canSubstitute)
results := make([]BICReplacementCandidate2, 0, len(oldCandidates))
for _, c := range oldCandidates {
results = append(results, BICReplacementCandidate2{
Digest: c.Digest,
Location: c.Location,
CompressorName: UnknownCompression,
})
}
return results
}

// CandidateLocationsFromV2 converts a slice of BICReplacementCandidate2 to a slice of
// types.BICReplacementCandidate, dropping compression information.
func CandidateLocationsFromV2(v2candidates []BICReplacementCandidate2) []types.BICReplacementCandidate {
candidates := make([]types.BICReplacementCandidate, 0, len(v2candidates))
for _, c := range v2candidates {
candidates = append(candidates, types.BICReplacementCandidate{
Digest: c.Digest,
Location: c.Location,
})
}
return candidates
}

// OperationAndAlgorithmForCompressor returns CompressionOperation and CompressionAlgorithm
// values suitable for inclusion in a types.BlobInfo structure, based on the name of the
// compression algorithm, or Uncompressed, or UnknownCompression. This is typically used by
// TryReusingBlob() implementations to set values in the BlobInfo structure that they return
// upon success.
func OperationAndAlgorithmForCompressor(compressorName string) (types.LayerCompression, *compression.Algorithm, error) {
switch compressorName {
case Uncompressed:
return types.Decompress, nil, nil
case UnknownCompression:
return types.PreserveOriginal, nil, nil
default:
algo, err := compression.AlgorithmByName(compressorName)
if err == nil {
return types.Compress, &algo, nil
}
return types.PreserveOriginal, nil, err
}
}
46 changes: 46 additions & 0 deletions internal/blobinfocache/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package blobinfocache

import (
"github.com/containers/image/v5/types"
digest "github.com/opencontainers/go-digest"
)

const (
// Uncompressed is the value we store in a blob info cache to indicate that we know that the blob in
// the corresponding location is not compressed.
Uncompressed = "uncompressed"
// UnknownCompression s the value we store in a blob info cache to indicate that we don't know if the
// blob in the corresponding location is compressed or not.
UnknownCompression = "unknown"
)

// BlobInfoCache2 extends BlobInfoCache by adding the ability to track information about what kind
// of compression was applied to the blobs it keeps information about.
type BlobInfoCache2 interface {
types.BlobInfoCache
blobInfoCache2WithoutV1
}

// BICReplacementCandidate2 is an item returned by BlobInfoCache2.CandidateLocations2.
type BICReplacementCandidate2 struct {
Digest digest.Digest
CompressorName string // either the Name() of a known pkg/compression.Algorithm, or Uncompressed or UnknownCompression
Location types.BICLocationReference
}

// BlobInfoCache2WithoutV1 contains extended versions of the BlobInfoCache methods which add the
// ability to track information about what kind of compression was applied to the blobs it keeps
// information about.
type blobInfoCache2WithoutV1 interface {
// RecordDigestCompressorName records a compressor for the blob with the specified digest,
// or Uncompressed or UnknownCompression.
RecordDigestCompressorName(anyDigest digest.Digest, compressorName string)
// CandidateLocations2 returns a prioritized, limited, number of blobs and their locations
// that could possibly be reused within the specified (transport scope) (if they still
// exist, which is not guaranteed).
//
// If !canSubstitute, the returned cadidates will match the submitted digest exactly; if
// canSubstitute, data from previous RecordDigestUncompressedPair calls is used to also look
// up variants of the blob which have the same uncompressed digest.
CandidateLocations2(transport types.ImageTransport, scope types.BICTransportScope, digest digest.Digest, canSubstitute bool) []BICReplacementCandidate2
}
21 changes: 17 additions & 4 deletions manifest/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

"github.com/containers/image/v5/pkg/compression"
"github.com/containers/image/v5/types"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -70,15 +69,15 @@ func compressionVariantMIMEType(variantTable []compressionMIMETypeSet, mimeType
return res, nil
}
if name != mtsUncompressed {
return "", fmt.Errorf("%s compression is not supported", name)
return "", ManifestLayerCompressionIncompatibilityError{fmt.Sprintf("%s compression is not supported for type %q", name, mt)}
}
return "", errors.New("uncompressed variant is not supported")
return "", ManifestLayerCompressionIncompatibilityError{fmt.Sprintf("uncompressed variant is not supported for type %q", mt)}
}
if name != mtsUncompressed {
return "", fmt.Errorf("unknown compression algorithm %s", name)
}
// We can't very well say “the idea of no compression is unknown”
return "", errors.New("uncompressed variant is not supported")
return "", ManifestLayerCompressionIncompatibilityError{fmt.Sprintf("uncompressed variant is not supported for type %q", mt)}
}
}
}
Expand All @@ -99,6 +98,12 @@ func updatedMIMEType(variantTable []compressionMIMETypeSet, mimeType string, upd
// {de}compressed.
switch updated.CompressionOperation {
case types.PreserveOriginal:
// Force a change to the media type if we're being told to use a particular compressor,
// since it might be different from the one associated with the media type. Otherwise,
// try to keep the original media type.
if updated.CompressionAlgorithm != nil && updated.CompressionAlgorithm.Name() != mtsUncompressed {
return compressionVariantMIMEType(variantTable, mimeType, updated.CompressionAlgorithm)
}
// Keep the original media type.
return mimeType, nil

Expand All @@ -116,3 +121,11 @@ func updatedMIMEType(variantTable []compressionMIMETypeSet, mimeType string, upd
return "", fmt.Errorf("unknown compression operation (%d)", updated.CompressionOperation)
}
}

type ManifestLayerCompressionIncompatibilityError struct {
text string
}

func (m ManifestLayerCompressionIncompatibilityError) Error() string {
return m.text
}
4 changes: 3 additions & 1 deletion oci/archive/oci_dest.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ func (d *ociArchiveImageDestination) PutBlob(ctx context.Context, stream io.Read
// (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree).
// info.Digest must not be empty.
// If canSubstitute, TryReusingBlob can use an equivalent equivalent of the desired blob; in that case the returned info may not match the input.
// If the blob has been successfully reused, returns (true, info, nil); info must contain at least a digest and size.
// If the blob has been successfully reused, returns (true, info, nil); info must contain at least a digest and size, and may
// include CompressionOperation and CompressionAlgorithm fields to indicate that a change to the compression type should be
// reflected in the manifest that will be written.
// If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure.
// May use and/or update cache.
func (d *ociArchiveImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) {
Expand Down
Loading

0 comments on commit 32d0532

Please sign in to comment.