Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

compression: introduce compression.Type interface #3136

Merged
merged 3 commits into from
Oct 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 4 additions & 65 deletions cache/blobs.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,15 @@
package cache

import (
"compress/gzip"
"context"
"fmt"
"io"
"os"
"strconv"

"github.com/containerd/containerd/content"
"github.com/containerd/containerd/diff"
"github.com/containerd/containerd/diff/walking"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/mount"
"github.com/klauspost/compress/zstd"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/util/compression"
"github.com/moby/buildkit/util/flightcontrol"
Expand Down Expand Up @@ -57,8 +53,6 @@ func (sr *immutableRef) computeBlobChain(ctx context.Context, createIfNeeded boo
return computeBlobChain(ctx, sr, createIfNeeded, comp, s, filter)
}

type compressor func(dest io.Writer, requiredMediaType string) (io.WriteCloser, error)

func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool, comp compression.Config, s session.Group, filter map[string]struct{}) error {
eg, ctx := errgroup.WithContext(ctx)
switch sr.kind() {
Expand Down Expand Up @@ -92,28 +86,8 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
return nil, errors.WithStack(ErrNoBlobs)
}

var mediaType string
var compressorFunc compressor
var finalize func(context.Context, content.Store) (map[string]string, error)
switch comp.Type {
case compression.Uncompressed:
mediaType = ocispecs.MediaTypeImageLayer
case compression.Gzip:
compressorFunc = func(dest io.Writer, _ string) (io.WriteCloser, error) {
return gzipWriter(comp)(dest)
}
mediaType = ocispecs.MediaTypeImageLayerGzip
case compression.EStargz:
compressorFunc, finalize = compressEStargz(comp)
mediaType = ocispecs.MediaTypeImageLayerGzip
case compression.Zstd:
compressorFunc = func(dest io.Writer, _ string) (io.WriteCloser, error) {
return zstdWriter(comp)(dest)
}
mediaType = ocispecs.MediaTypeImageLayer + "+zstd"
default:
return nil, errors.Errorf("unknown layer compression type: %q", comp.Type)
}
compressorFunc, finalize := comp.Type.Compress(comp)
mediaType := comp.Type.MediaType()

var lowerRef *immutableRef
switch sr.kind() {
Expand Down Expand Up @@ -206,7 +180,7 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
}
}

if desc.Digest == "" && !isTypeWindows(sr) && (comp.Type == compression.Zstd || comp.Type == compression.EStargz) {
if desc.Digest == "" && !isTypeWindows(sr) && comp.Type.NeedsComputeDiffBySelf() {
// These compression types aren't supported by containerd differ. So try to compute diff on buildkit side.
// This case can be happen on containerd worker + non-overlayfs snapshotter (e.g. native).
// See also: https://github.com/containerd/containerd/issues/4263
Expand Down Expand Up @@ -433,7 +407,7 @@ func isTypeWindows(sr *immutableRef) bool {

// ensureCompression ensures the specified ref has the blob of the specified compression Type.
func ensureCompression(ctx context.Context, ref *immutableRef, comp compression.Config, s session.Group) error {
_, err := g.Do(ctx, fmt.Sprintf("%s-%d", ref.ID(), comp.Type), func(ctx context.Context) (interface{}, error) {
_, err := g.Do(ctx, fmt.Sprintf("%s-%s", ref.ID(), comp.Type), func(ctx context.Context) (interface{}, error) {
desc, err := ref.ociDesc(ctx, ref.descHandlers, true)
if err != nil {
return nil, err
Expand Down Expand Up @@ -480,38 +454,3 @@ func ensureCompression(ctx context.Context, ref *immutableRef, comp compression.
})
return err
}

func gzipWriter(comp compression.Config) func(io.Writer) (io.WriteCloser, error) {
return func(dest io.Writer) (io.WriteCloser, error) {
level := gzip.DefaultCompression
if comp.Level != nil {
level = *comp.Level
}
return gzip.NewWriterLevel(dest, level)
}
}

func zstdWriter(comp compression.Config) func(io.Writer) (io.WriteCloser, error) {
return func(dest io.Writer) (io.WriteCloser, error) {
level := zstd.SpeedDefault
if comp.Level != nil {
level = toZstdEncoderLevel(*comp.Level)
}
return zstd.NewWriter(dest, zstd.WithEncoderLevel(level))
}
}

func toZstdEncoderLevel(level int) zstd.EncoderLevel {
// map zstd compression levels to go-zstd levels
// once we also have c based implementation move this to helper pkg
if level < 0 {
return zstd.SpeedDefault
} else if level < 3 {
return zstd.SpeedFastest
} else if level < 7 {
return zstd.SpeedDefault
} else if level < 9 {
return zstd.SpeedBetterCompression
}
return zstd.SpeedBestCompression
}
3 changes: 2 additions & 1 deletion cache/blobs_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/mount"
"github.com/moby/buildkit/util/bklog"
"github.com/moby/buildkit/util/compression"
"github.com/moby/buildkit/util/overlay"
digest "github.com/opencontainers/go-digest"
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
Expand All @@ -24,7 +25,7 @@ var emptyDesc = ocispecs.Descriptor{}
// diff between lower and upper snapshot. If the passed mounts cannot
// be computed (e.g. because the mounts aren't overlayfs), it returns
// an error.
func (sr *immutableRef) tryComputeOverlayBlob(ctx context.Context, lower, upper []mount.Mount, mediaType string, ref string, compressorFunc compressor) (_ ocispecs.Descriptor, ok bool, err error) {
func (sr *immutableRef) tryComputeOverlayBlob(ctx context.Context, lower, upper []mount.Mount, mediaType string, ref string, compressorFunc compression.Compressor) (_ ocispecs.Descriptor, ok bool, err error) {
// Get upperdir location if mounts are overlayfs that can be processed by this differ.
upperdir, err := overlay.GetUpperdir(lower, upper)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion cache/blobs_nolinux.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ package cache
import (
"context"

"github.com/moby/buildkit/util/compression"
"github.com/containerd/containerd/mount"
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
)

func (sr *immutableRef) tryComputeOverlayBlob(ctx context.Context, lower, upper []mount.Mount, mediaType string, ref string, compressorFunc compressor) (_ ocispecs.Descriptor, ok bool, err error) {
func (sr *immutableRef) tryComputeOverlayBlob(ctx context.Context, lower, upper []mount.Mount, mediaType string, ref string, compressorFunc compression.Compressor) (_ ocispecs.Descriptor, ok bool, err error) {
return ocispecs.Descriptor{}, true, errors.Errorf("overlayfs-based diff computing is unsupported")
}
148 changes: 19 additions & 129 deletions cache/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,120 +7,46 @@ import (
"io"
"sync"

cdcompression "github.com/containerd/containerd/archive/compression"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/images/converter"
"github.com/containerd/containerd/labels"
"github.com/moby/buildkit/identity"
"github.com/moby/buildkit/util/bklog"
"github.com/moby/buildkit/util/compression"
"github.com/moby/buildkit/util/iohelper"
digest "github.com/opencontainers/go-digest"
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
)

// needsConversion indicates whether a conversion is needed for the specified descriptor to
// be the compressionType.
func needsConversion(ctx context.Context, cs content.Store, desc ocispecs.Descriptor, compressionType compression.Type) (bool, error) {
mediaType := desc.MediaType
switch compressionType {
case compression.Uncompressed:
if !images.IsLayerType(mediaType) || compression.FromMediaType(mediaType) == compression.Uncompressed {
return false, nil
}
case compression.Gzip:
esgz, err := isEStargz(ctx, cs, desc.Digest)
if err != nil {
return false, err
}
if (!images.IsLayerType(mediaType) || compression.FromMediaType(mediaType) == compression.Gzip) && !esgz {
return false, nil
}
case compression.Zstd:
if !images.IsLayerType(mediaType) || compression.FromMediaType(mediaType) == compression.Zstd {
return false, nil
}
case compression.EStargz:
esgz, err := isEStargz(ctx, cs, desc.Digest)
if err != nil {
return false, err
}
if !images.IsLayerType(mediaType) || esgz {
return false, nil
}
default:
return false, fmt.Errorf("unknown compression type during conversion: %q", compressionType)
}
return true, nil
}

// getConverter returns converter function according to the specified compression type.
// If no conversion is needed, this returns nil without error.
func getConverter(ctx context.Context, cs content.Store, desc ocispecs.Descriptor, comp compression.Config) (converter.ConvertFunc, error) {
if needs, err := needsConversion(ctx, cs, desc, comp.Type); err != nil {
if needs, err := comp.Type.NeedsConversion(ctx, cs, desc); err != nil {
return nil, errors.Wrapf(err, "failed to determine conversion needs")
} else if !needs {
// No conversion. No need to return an error here.
return nil, nil
}

c := conversion{target: comp}

from := compression.FromMediaType(desc.MediaType)
switch from {
case compression.Uncompressed:
case compression.Gzip, compression.Zstd:
c.decompress = func(ctx context.Context, desc ocispecs.Descriptor) (r io.ReadCloser, err error) {
ra, err := cs.ReaderAt(ctx, desc)
if err != nil {
return nil, err
}
esgz, err := isEStargz(ctx, cs, desc.Digest)
if err != nil {
return nil, err
} else if esgz {
r, err = decompressEStargz(io.NewSectionReader(ra, 0, ra.Size()))
if err != nil {
return nil, err
}
} else {
r, err = cdcompression.DecompressStream(io.NewSectionReader(ra, 0, ra.Size()))
if err != nil {
return nil, err
}
}
return &readCloser{r, ra.Close}, nil
}
default:
return nil, errors.Errorf("unsupported source compression type %q from mediatype %q", from, desc.MediaType)
from, err := compression.FromMediaType(desc.MediaType)
if err != nil {
return nil, err
}

switch comp.Type {
case compression.Uncompressed:
case compression.Gzip:
c.compress = gzipWriter(comp)
case compression.Zstd:
c.compress = zstdWriter(comp)
case compression.EStargz:
compressorFunc, finalize := compressEStargz(comp)
c.compress = func(w io.Writer) (io.WriteCloser, error) {
return compressorFunc(w, ocispecs.MediaTypeImageLayerGzip)
}
c.finalize = finalize
default:
return nil, errors.Errorf("unknown target compression type during conversion: %q", comp.Type)
}
c := conversion{target: comp}
c.compress, c.finalize = comp.Type.Compress(comp)
c.decompress = from.Decompress

return (&c).convert, nil
}

type conversion struct {
target compression.Config
decompress func(context.Context, ocispecs.Descriptor) (io.ReadCloser, error)
compress func(w io.Writer) (io.WriteCloser, error)
finalize func(context.Context, content.Store) (map[string]string, error)
decompress compression.Decompressor
compress compression.Compressor
finalize compression.Finalizer
}

var bufioPool = sync.Pool{
Expand Down Expand Up @@ -151,34 +77,20 @@ func (c *conversion) convert(ctx context.Context, cs content.Store, desc ocispec
bufW = bufio.NewWriterSize(w, 128*1024)
}
defer bufioPool.Put(bufW)
var zw io.WriteCloser = &nopWriteCloser{bufW}
if c.compress != nil {
zw, err = c.compress(zw)
if err != nil {
return nil, err
}
zw, err := c.compress(&iohelper.NopWriteCloser{Writer: bufW}, c.target.Type.MediaType())
if err != nil {
return nil, err
}
zw = &onceWriteCloser{WriteCloser: zw}
defer zw.Close()

// convert this layer
diffID := digest.Canonical.Digester()
var rdr io.Reader
if c.decompress == nil {
ra, err := cs.ReaderAt(ctx, desc)
if err != nil {
return nil, err
}
defer ra.Close()
rdr = io.NewSectionReader(ra, 0, ra.Size())
} else {
rc, err := c.decompress(ctx, desc)
if err != nil {
return nil, err
}
defer rc.Close()
rdr = rc
rdr, err := c.decompress(ctx, cs, desc)
if err != nil {
return nil, err
}
defer rdr.Close()
if _, err := io.Copy(zw, io.TeeReader(rdr, diffID.Hash())); err != nil {
return nil, err
}
Expand All @@ -201,7 +113,7 @@ func (c *conversion) convert(ctx context.Context, cs content.Store, desc ocispec
}

newDesc := desc
newDesc.MediaType = c.target.Type.DefaultMediaType()
newDesc.MediaType = c.target.Type.MediaType()
newDesc.Digest = info.Digest
newDesc.Size = info.Size
newDesc.Annotations = map[string]string{labels.LabelUncompressed: diffID.Digest().String()}
Expand All @@ -217,28 +129,6 @@ func (c *conversion) convert(ctx context.Context, cs content.Store, desc ocispec
return &newDesc, nil
}

type readCloser struct {
io.ReadCloser
closeFunc func() error
}

func (rc *readCloser) Close() error {
err1 := rc.ReadCloser.Close()
err2 := rc.closeFunc()
if err1 != nil {
return errors.Wrapf(err1, "failed to close: %v", err2)
}
return err2
}

type nopWriteCloser struct {
io.Writer
}

func (w *nopWriteCloser) Close() error {
return nil
}

type onceWriteCloser struct {
io.WriteCloser
closeOnce sync.Once
Expand Down
Loading