Skip to content

Commit

Permalink
Merge pull request #2508 from mtrmac/5.32-zstd
Browse files Browse the repository at this point in the history
[release-5.32] Zstd backports
  • Loading branch information
rhatdan authored Aug 9, 2024
2 parents 1bf67ef + 893740a commit 6a17c28
Show file tree
Hide file tree
Showing 34 changed files with 1,371 additions and 494 deletions.
8 changes: 4 additions & 4 deletions .codespellrc
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# See https://github.com/codespell-project/codespell#using-a-config-file
[codespell]
skip = .git,*.pdf,*.svg,.codespellrc,go.sum,system_registries_v2_test.go,Makefile,build,buildah,buildah.spec,imgtype,copy,AUTHORS,bin,vendor,CHANGELOG.md,changelog.txt,seccomp.json,.cirrus.yml,*.xz,*.gz,*.tar,*.tgz,*ico,*.png,*.1,*.5,*.orig,*.rej,*.gpg
check-hidden = true
ignore-regex = \b(isT|BU|this/doesnt:match)\b
ignore-words-list = te,pathc
skip = ./vendor,./.git,./go.sum,*.gpg

# NOTE words added to the list below need to be lowercased.
ignore-words-list = te,bu
161 changes: 98 additions & 63 deletions copy/compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/containers/image/v5/pkg/compression"
compressiontypes "github.com/containers/image/v5/pkg/compression/types"
"github.com/containers/image/v5/types"
chunkedToc "github.com/containers/storage/pkg/chunked/toc"
imgspecv1 "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/sirupsen/logrus"
)
Expand All @@ -34,10 +35,10 @@ var (

// bpDetectCompressionStepData contains data that the copy pipeline needs about the “detect compression” step.
type bpDetectCompressionStepData struct {
isCompressed bool
format compressiontypes.Algorithm // Valid if isCompressed
decompressor compressiontypes.DecompressorFunc // Valid if isCompressed
srcCompressorName string // Compressor name to possibly record in the blob info cache for the source blob.
isCompressed bool
format compressiontypes.Algorithm // Valid if isCompressed
decompressor compressiontypes.DecompressorFunc // Valid if isCompressed
srcCompressorBaseVariantName string // Compressor name to possibly record in the blob info cache for the source blob.
}

// blobPipelineDetectCompressionStep updates *stream to detect its current compression format.
Expand All @@ -51,15 +52,25 @@ func blobPipelineDetectCompressionStep(stream *sourceStream, srcInfo types.BlobI
}
stream.reader = reader

if decompressor != nil && format.Name() == compressiontypes.ZstdAlgorithmName {
tocDigest, err := chunkedToc.GetTOCDigest(srcInfo.Annotations)
if err != nil {
return bpDetectCompressionStepData{}, err
}
if tocDigest != nil {
format = compression.ZstdChunked
}

}
res := bpDetectCompressionStepData{
isCompressed: decompressor != nil,
format: format,
decompressor: decompressor,
}
if res.isCompressed {
res.srcCompressorName = format.Name()
res.srcCompressorBaseVariantName = format.BaseVariantName()
} else {
res.srcCompressorName = internalblobinfocache.Uncompressed
res.srcCompressorBaseVariantName = internalblobinfocache.Uncompressed
}

if expectedBaseFormat, known := expectedBaseCompressionFormats[stream.info.MediaType]; known && res.isCompressed && format.BaseVariantName() != expectedBaseFormat.Name() {
Expand All @@ -70,13 +81,14 @@ func blobPipelineDetectCompressionStep(stream *sourceStream, srcInfo types.BlobI

// bpCompressionStepData contains data that the copy pipeline needs about the compression step.
type bpCompressionStepData struct {
operation bpcOperation // What we are actually doing
uploadedOperation types.LayerCompression // Operation to use for updating the blob metadata (matching the end state, not necessarily what we do)
uploadedAlgorithm *compressiontypes.Algorithm // An algorithm parameter for the compressionOperation edits.
uploadedAnnotations map[string]string // Compression-related annotations that should be set on the uploaded blob. WARNING: This is only set after the srcStream.reader is fully consumed.
srcCompressorName string // Compressor name to record in the blob info cache for the source blob.
uploadedCompressorName string // Compressor name to record in the blob info cache for the uploaded blob.
closers []io.Closer // Objects to close after the upload is done, if any.
operation bpcOperation // What we are actually doing
uploadedOperation types.LayerCompression // Operation to use for updating the blob metadata (matching the end state, not necessarily what we do)
uploadedAlgorithm *compressiontypes.Algorithm // An algorithm parameter for the compressionOperation edits.
uploadedAnnotations map[string]string // Compression-related annotations that should be set on the uploaded blob. WARNING: This is only set after the srcStream.reader is fully consumed.
srcCompressorBaseVariantName string // Compressor base variant name to record in the blob info cache for the source blob.
uploadedCompressorBaseVariantName string // Compressor base variant name to record in the blob info cache for the uploaded blob.
uploadedCompressorSpecificVariantName string // Compressor specific variant name to record in the blob info cache for the uploaded blob.
closers []io.Closer // Objects to close after the upload is done, if any.
}

type bpcOperation int
Expand Down Expand Up @@ -128,11 +140,12 @@ func (ic *imageCopier) bpcPreserveEncrypted(stream *sourceStream, _ bpDetectComp
// We can’t do anything with an encrypted blob unless decrypted.
logrus.Debugf("Using original blob without modification for encrypted blob")
return &bpCompressionStepData{
operation: bpcOpPreserveOpaque,
uploadedOperation: types.PreserveOriginal,
uploadedAlgorithm: nil,
srcCompressorName: internalblobinfocache.UnknownCompression,
uploadedCompressorName: internalblobinfocache.UnknownCompression,
operation: bpcOpPreserveOpaque,
uploadedOperation: types.PreserveOriginal,
uploadedAlgorithm: nil,
srcCompressorBaseVariantName: internalblobinfocache.UnknownCompression,
uploadedCompressorBaseVariantName: internalblobinfocache.UnknownCompression,
uploadedCompressorSpecificVariantName: internalblobinfocache.UnknownCompression,
}, nil
}
return nil, nil
Expand All @@ -156,14 +169,19 @@ func (ic *imageCopier) bpcCompressUncompressed(stream *sourceStream, detected bp
Digest: "",
Size: -1,
}
specificVariantName := uploadedAlgorithm.Name()
if specificVariantName == uploadedAlgorithm.BaseVariantName() {
specificVariantName = internalblobinfocache.UnknownCompression
}
return &bpCompressionStepData{
operation: bpcOpCompressUncompressed,
uploadedOperation: types.Compress,
uploadedAlgorithm: uploadedAlgorithm,
uploadedAnnotations: annotations,
srcCompressorName: detected.srcCompressorName,
uploadedCompressorName: uploadedAlgorithm.Name(),
closers: []io.Closer{reader},
operation: bpcOpCompressUncompressed,
uploadedOperation: types.Compress,
uploadedAlgorithm: uploadedAlgorithm,
uploadedAnnotations: annotations,
srcCompressorBaseVariantName: detected.srcCompressorBaseVariantName,
uploadedCompressorBaseVariantName: uploadedAlgorithm.BaseVariantName(),
uploadedCompressorSpecificVariantName: specificVariantName,
closers: []io.Closer{reader},
}, nil
}
return nil, nil
Expand Down Expand Up @@ -196,15 +214,20 @@ func (ic *imageCopier) bpcRecompressCompressed(stream *sourceStream, detected bp
Digest: "",
Size: -1,
}
specificVariantName := ic.compressionFormat.Name()
if specificVariantName == ic.compressionFormat.BaseVariantName() {
specificVariantName = internalblobinfocache.UnknownCompression
}
succeeded = true
return &bpCompressionStepData{
operation: bpcOpRecompressCompressed,
uploadedOperation: types.PreserveOriginal,
uploadedAlgorithm: ic.compressionFormat,
uploadedAnnotations: annotations,
srcCompressorName: detected.srcCompressorName,
uploadedCompressorName: ic.compressionFormat.Name(),
closers: []io.Closer{decompressed, recompressed},
operation: bpcOpRecompressCompressed,
uploadedOperation: types.PreserveOriginal,
uploadedAlgorithm: ic.compressionFormat,
uploadedAnnotations: annotations,
srcCompressorBaseVariantName: detected.srcCompressorBaseVariantName,
uploadedCompressorBaseVariantName: ic.compressionFormat.BaseVariantName(),
uploadedCompressorSpecificVariantName: specificVariantName,
closers: []io.Closer{decompressed, recompressed},
}, nil
}
return nil, nil
Expand All @@ -225,12 +248,13 @@ func (ic *imageCopier) bpcDecompressCompressed(stream *sourceStream, detected bp
Size: -1,
}
return &bpCompressionStepData{
operation: bpcOpDecompressCompressed,
uploadedOperation: types.Decompress,
uploadedAlgorithm: nil,
srcCompressorName: detected.srcCompressorName,
uploadedCompressorName: internalblobinfocache.Uncompressed,
closers: []io.Closer{s},
operation: bpcOpDecompressCompressed,
uploadedOperation: types.Decompress,
uploadedAlgorithm: nil,
srcCompressorBaseVariantName: detected.srcCompressorBaseVariantName,
uploadedCompressorBaseVariantName: internalblobinfocache.Uncompressed,
uploadedCompressorSpecificVariantName: internalblobinfocache.UnknownCompression,
closers: []io.Closer{s},
}, nil
}
return nil, nil
Expand Down Expand Up @@ -268,11 +292,15 @@ func (ic *imageCopier) bpcPreserveOriginal(_ *sourceStream, detected bpDetectCom
algorithm = nil
}
return &bpCompressionStepData{
operation: bpcOp,
uploadedOperation: uploadedOp,
uploadedAlgorithm: algorithm,
srcCompressorName: detected.srcCompressorName,
uploadedCompressorName: detected.srcCompressorName,
operation: bpcOp,
uploadedOperation: uploadedOp,
uploadedAlgorithm: algorithm,
srcCompressorBaseVariantName: detected.srcCompressorBaseVariantName,
// We only record the base variant of the format on upload; we didn’t do anything with
// the TOC, we don’t know whether it matches the blob digest, so we don’t want to trigger
// reuse of any kind between the blob digest and the TOC digest.
uploadedCompressorBaseVariantName: detected.srcCompressorBaseVariantName,
uploadedCompressorSpecificVariantName: internalblobinfocache.UnknownCompression,
}
}

Expand Down Expand Up @@ -308,6 +336,15 @@ func (d *bpCompressionStepData) recordValidatedDigestData(c *copier, uploadedInf
// No useful information
case bpcOpCompressUncompressed:
c.blobInfoCache.RecordDigestUncompressedPair(uploadedInfo.Digest, srcInfo.Digest)
if d.uploadedAnnotations != nil {
tocDigest, err := chunkedToc.GetTOCDigest(d.uploadedAnnotations)
if err != nil {
return fmt.Errorf("parsing just-created compression annotations: %w", err)
}
if tocDigest != nil {
c.blobInfoCache.RecordTOCUncompressedPair(*tocDigest, srcInfo.Digest)
}
}
case bpcOpDecompressCompressed:
c.blobInfoCache.RecordDigestUncompressedPair(srcInfo.Digest, uploadedInfo.Digest)
case bpcOpRecompressCompressed, bpcOpPreserveCompressed:
Expand All @@ -323,29 +360,27 @@ func (d *bpCompressionStepData) recordValidatedDigestData(c *copier, uploadedInf
return fmt.Errorf("Internal error: Unexpected d.operation value %#v", d.operation)
}
}
if d.srcCompressorName == "" || d.uploadedCompressorName == "" {
return fmt.Errorf("internal error: missing compressor names (src: %q, uploaded: %q)",
d.srcCompressorName, d.uploadedCompressorName)
if d.srcCompressorBaseVariantName == "" || d.uploadedCompressorBaseVariantName == "" || d.uploadedCompressorSpecificVariantName == "" {
return fmt.Errorf("internal error: missing compressor names (src base: %q, uploaded base: %q, uploaded specific: %q)",
d.srcCompressorBaseVariantName, d.uploadedCompressorBaseVariantName, d.uploadedCompressorSpecificVariantName)
}
if d.uploadedCompressorName != internalblobinfocache.UnknownCompression {
if d.uploadedCompressorName != compressiontypes.ZstdChunkedAlgorithmName {
// HACK: Don’t record zstd:chunked algorithms.
// There is already a similar hack in internal/imagedestination/impl/helpers.CandidateMatchesTryReusingBlobOptions,
// and that one prevents reusing zstd:chunked blobs, so recording the algorithm here would be mostly harmless.
//
// We skip that here anyway to work around the inability of blobPipelineDetectCompressionStep to differentiate
// between zstd and zstd:chunked; so we could, in varying situations over time, call RecordDigestCompressorName
// with the same digest and both ZstdAlgorithmName and ZstdChunkedAlgorithmName , which causes warnings about
// inconsistent data to be logged.
c.blobInfoCache.RecordDigestCompressorName(uploadedInfo.Digest, d.uploadedCompressorName)
}
if d.uploadedCompressorBaseVariantName != internalblobinfocache.UnknownCompression {
c.blobInfoCache.RecordDigestCompressorData(uploadedInfo.Digest, internalblobinfocache.DigestCompressorData{
BaseVariantCompressor: d.uploadedCompressorBaseVariantName,
SpecificVariantCompressor: d.uploadedCompressorSpecificVariantName,
SpecificVariantAnnotations: d.uploadedAnnotations,
})
}
if srcInfo.Digest != "" && srcInfo.Digest != uploadedInfo.Digest &&
d.srcCompressorName != internalblobinfocache.UnknownCompression {
if d.srcCompressorName != compressiontypes.ZstdChunkedAlgorithmName {
// HACK: Don’t record zstd:chunked algorithms, see above.
c.blobInfoCache.RecordDigestCompressorName(srcInfo.Digest, d.srcCompressorName)
}
d.srcCompressorBaseVariantName != internalblobinfocache.UnknownCompression {
// If the source is already using some TOC-dependent variant, we either copied the
// blob as is, or perhaps decompressed it; either way we don’t trust the TOC digest,
// so record neither the variant name, nor the TOC digest.
c.blobInfoCache.RecordDigestCompressorData(srcInfo.Digest, internalblobinfocache.DigestCompressorData{
BaseVariantCompressor: d.srcCompressorBaseVariantName,
SpecificVariantCompressor: internalblobinfocache.UnknownCompression,
SpecificVariantAnnotations: nil,
})
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion copy/encryption.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (ic *imageCopier) blobPipelineDecryptionStep(stream *sourceStream, srcInfo
Annotations: stream.info.Annotations,
}
// DecryptLayer supposedly returns a digest of the decrypted stream.
// In pratice, that value is never set in the current implementation.
// In practice, that value is never set in the current implementation.
// And we shouldn’t use it anyway, because it is not trusted: encryption can be made to a public key,
// i.e. it doesn’t authenticate the origin of the metadata in any way.
reader, _, err := ocicrypt.DecryptLayer(ic.c.options.OciDecryptConfig, stream.reader, desc, false)
Expand Down
6 changes: 3 additions & 3 deletions copy/multiple_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestPrepareCopyInstancesforInstanceCopyClone(t *testing.T) {
// * Still copy gzip variants if they exist in the original
// * Not create new Zstd variants if they exist in the original.

// We crated a list of three instances `sourceInstances` and since in oci1.index.zstd-selection.json
// We created a list of three instances `sourceInstances` and since in oci1.index.zstd-selection.json
// amd64 already has a zstd instance i.e sourceInstance[1] so it should not create replication for
// `sourceInstance[0]` and `sourceInstance[1]` but should do it for `sourceInstance[2]` for `arm64`
// and still copy `sourceInstance[2]`.
Expand All @@ -93,8 +93,8 @@ func TestPrepareCopyInstancesforInstanceCopyClone(t *testing.T) {
actualResponse := convertInstanceCopyToSimplerInstanceCopy(instancesToCopy)
assert.Equal(t, expectedResponse, actualResponse)

// Test option with multiple copy request for same compression format
// above expection should stay same, if out ensureCompressionVariantsExist requests zstd twice
// Test option with multiple copy request for same compression format.
// The above expectation should stay the same, if ensureCompressionVariantsExist requests zstd twice.
ensureCompressionVariantsExist = []OptionCompressionVariant{{Algorithm: compression.Zstd}, {Algorithm: compression.Zstd}}
instancesToCopy, err = prepareInstanceCopies(list, sourceInstances, &Options{EnsureCompressionVariantsExist: ensureCompressionVariantsExist})
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion copy/progress_bars.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (c *copier) printCopyInfo(kind string, info types.BlobInfo) {
}
}

// mark100PercentComplete marks the progres bars as 100% complete;
// mark100PercentComplete marks the progress bars as 100% complete;
// it may do so by possibly advancing the current state if it is below the known total.
func (bar *progressBar) mark100PercentComplete() {
if bar.originalSize > 0 {
Expand Down
2 changes: 1 addition & 1 deletion copy/sign_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func TestCreateSignatures(t *testing.T) {
successfullySignedIdentity: "docker.io/library/busybox:latest",
},
{
name: "docker:// with overidden identity",
name: "docker:// with overridden identity",
dest: dockerDest,
identity: "myregistry.io/myrepo:mytag",
successfullySignedIdentity: "myregistry.io/myrepo:mytag",
Expand Down
Loading

0 comments on commit 6a17c28

Please sign in to comment.