Skip to content

Commit

Permalink
Merge pull request #481 from mtrmac/docker-archive-auto-compression
Browse files Browse the repository at this point in the history
Docker archive auto compression
  • Loading branch information
mtrmac authored Jul 17, 2018
2 parents 66857a7 + 6353fa5 commit c6e0eee
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 66 deletions.
5 changes: 4 additions & 1 deletion copy/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,7 @@ func computeDiffID(stream io.Reader, decompressor compression.DecompressorFunc)
if err != nil {
return "", err
}
defer s.Close()
stream = s
}

Expand Down Expand Up @@ -673,10 +674,12 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr
inputInfo.Size = -1
} else if canModifyBlob && c.dest.DesiredLayerCompression() == types.Decompress && isCompressed {
logrus.Debugf("Blob will be decompressed")
destStream, err = decompressor(destStream)
s, err := decompressor(destStream)
if err != nil {
return types.BlobInfo{}, err
}
defer s.Close()
destStream = s
inputInfo.Digest = ""
inputInfo.Size = -1
} else {
Expand Down
102 changes: 71 additions & 31 deletions docker/tarfile/src.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package tarfile
import (
"archive/tar"
"bytes"
"compress/gzip"
"context"
"encoding/json"
"io"
Expand Down Expand Up @@ -43,28 +42,32 @@ type layerInfo struct {
// the source of an image.
// To do for both the NewSourceFromFile and NewSourceFromStream functions

// NewSourceFromFile returns a tarfile.Source for the specified path.
// Both compressed and uncompressed input files are supported.
func NewSourceFromFile(path string) (*Source, error) {
	file, err := os.Open(path)
	if err != nil {
		return nil, errors.Wrapf(err, "error opening file %q", path)
	}
	defer file.Close()

	// If the file is not compressed we can just refer to the file itself by
	// path as the source. Otherwise we pass the decompressed stream to
	// NewSourceFromStream, which copies it to a temporary file.
	stream, isCompressed, err := compression.AutoDecompress(file)
	if err != nil {
		return nil, errors.Wrapf(err, "Error detecting compression for file %q", path)
	}
	defer stream.Close()
	if !isCompressed {
		return &Source{
			tarPath: path,
		}, nil
	}
	return NewSourceFromStream(stream)
}

// NewSourceFromStream returns a tarfile.Source for the specified inputStream, which must be uncompressed.
// The caller can close the inputStream immediately after NewSourceFromFile returns.
// NewSourceFromStream returns a tarfile.Source for the specified inputStream,
// which can be either compressed or uncompressed. The caller can close the
// inputStream immediately after NewSourceFromStream returns.
func NewSourceFromStream(inputStream io.Reader) (*Source, error) {
// FIXME: use SystemContext here.
// Save inputStream to a temporary file
Expand All @@ -81,8 +84,20 @@ func NewSourceFromStream(inputStream io.Reader) (*Source, error) {
}
}()

// TODO: This can take quite some time, and should ideally be cancellable using a context.Context.
if _, err := io.Copy(tarCopyFile, inputStream); err != nil {
// In order to be compatible with docker-load, we need to support
// auto-decompression (it's also a nice quality-of-life thing to avoid
// giving users really confusing "invalid tar header" errors).
uncompressedStream, _, err := compression.AutoDecompress(inputStream)
if err != nil {
return nil, errors.Wrap(err, "Error auto-decompressing input")
}
defer uncompressedStream.Close()

// Copy the plain archive to the temporary file.
//
// TODO: This can take quite some time, and should ideally be cancellable
// using a context.Context.
if _, err := io.Copy(tarCopyFile, uncompressedStream); err != nil {
return nil, errors.Wrapf(err, "error copying contents to temporary file %q", tarCopyFile.Name())
}
succeeded = true
Expand Down Expand Up @@ -292,7 +307,25 @@ func (s *Source) prepareLayerData(tarManifest *ManifestItem, parsedConfig *manif
return nil, err
}
if li, ok := unknownLayerSizes[h.Name]; ok {
li.size = h.Size
// Since GetBlob will decompress layers that are compressed we need
// to do the decompression here as well, otherwise we will
// incorrectly report the size. Pretty critical, since tools like
// umoci always compress layer blobs. Obviously we only bother with
// the slower method of checking if it's compressed.
uncompressedStream, isCompressed, err := compression.AutoDecompress(t)
if err != nil {
return nil, errors.Wrapf(err, "Error auto-decompressing %s to determine its size", h.Name)
}
defer uncompressedStream.Close()

uncompressedSize := h.Size
if isCompressed {
uncompressedSize, err = io.Copy(ioutil.Discard, uncompressedStream)
if err != nil {
return nil, errors.Wrapf(err, "Error reading %s to find its size", h.Name)
}
}
li.size = uncompressedSize
delete(unknownLayerSizes, h.Name)
}
}
Expand Down Expand Up @@ -346,16 +379,22 @@ func (s *Source) GetManifest(ctx context.Context, instanceDigest *digest.Digest)
return s.generatedManifest, manifest.DockerV2Schema2MediaType, nil
}

// uncompressedReadCloser is an io.ReadCloser that closes both the
// uncompressed stream and the underlying input it was created from.
type uncompressedReadCloser struct {
	io.Reader
	// underlyingCloser closes the original (possibly compressed) input.
	underlyingCloser func() error
	// uncompressedCloser closes the decompression wrapper.
	uncompressedCloser func() error
}

// Close closes the uncompressed stream first, then the underlying input.
// Both closers always run; if both fail, the uncompressed stream's error
// takes precedence.
func (r uncompressedReadCloser) Close() error {
	var res error
	if err := r.uncompressedCloser(); err != nil {
		res = err
	}
	if err := r.underlyingCloser(); err != nil && res == nil {
		res = err
	}
	return res
}

// GetBlob returns a stream for the specified blob, and the blob’s size (or -1 if unknown).
Expand All @@ -369,10 +408,16 @@ func (s *Source) GetBlob(ctx context.Context, info types.BlobInfo) (io.ReadClose
}

if li, ok := s.knownLayers[info.Digest]; ok { // diffID is a digest of the uncompressed tarball,
stream, err := s.openTarComponent(li.path)
underlyingStream, err := s.openTarComponent(li.path)
if err != nil {
return nil, 0, err
}
closeUnderlyingStream := true
defer func() {
if closeUnderlyingStream {
underlyingStream.Close()
}
}()

// In order to handle the fact that digests != diffIDs (and thus that a
// caller which is trying to verify the blob will run into problems),
Expand All @@ -386,22 +431,17 @@ func (s *Source) GetBlob(ctx context.Context, info types.BlobInfo) (io.ReadClose
// be verifying a "digest" which is not the actual layer's digest (but
// is instead the DiffID).

decompressFunc, reader, err := compression.DetectCompression(stream)
uncompressedStream, _, err := compression.AutoDecompress(underlyingStream)
if err != nil {
return nil, 0, errors.Wrapf(err, "Detecting compression in blob %s", info.Digest)
}

if decompressFunc != nil {
reader, err = decompressFunc(reader)
if err != nil {
return nil, 0, errors.Wrapf(err, "Decompressing blob %s stream", info.Digest)
}
return nil, 0, errors.Wrapf(err, "Error auto-decompressing blob %s", info.Digest)
}

newStream := readCloseWrapper{
Reader: reader,
closeFunc: stream.Close,
newStream := uncompressedReadCloser{
Reader: uncompressedStream,
underlyingCloser: underlyingStream.Close,
uncompressedCloser: uncompressedStream.Close,
}
closeUnderlyingStream = false

return newStream, li.size, nil
}
Expand Down
41 changes: 34 additions & 7 deletions pkg/compression/compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,34 @@ import (
"compress/bzip2"
"compress/gzip"
"io"
"io/ioutil"

"github.com/pkg/errors"

"github.com/sirupsen/logrus"
"github.com/ulikunitz/xz"
)

// DecompressorFunc returns the decompressed stream, given a compressed stream.
type DecompressorFunc func(io.Reader) (io.Reader, error)
// The caller must call Close() on the decompressed stream (even if the compressed input stream does not need closing!).
type DecompressorFunc func(io.Reader) (io.ReadCloser, error)

// GzipDecompressor is a DecompressorFunc for the gzip compression algorithm.
func GzipDecompressor(r io.Reader) (io.Reader, error) {
func GzipDecompressor(r io.Reader) (io.ReadCloser, error) {
return gzip.NewReader(r)
}

// Bzip2Decompressor is a DecompressorFunc for the bzip2 compression algorithm.
func Bzip2Decompressor(r io.Reader) (io.Reader, error) {
return bzip2.NewReader(r), nil
func Bzip2Decompressor(r io.Reader) (io.ReadCloser, error) {
return ioutil.NopCloser(bzip2.NewReader(r)), nil
}

// XzDecompressor is a DecompressorFunc for the xz compression algorithm.
func XzDecompressor(r io.Reader) (io.Reader, error) {
return nil, errors.New("Decompressing xz streams is not supported")
func XzDecompressor(r io.Reader) (io.ReadCloser, error) {
r, err := xz.NewReader(r)
if err != nil {
return nil, err
}
return ioutil.NopCloser(r), nil
}

// compressionAlgos is an internal implementation detail of DetectCompression
Expand Down Expand Up @@ -65,3 +71,24 @@ func DetectCompression(input io.Reader) (DecompressorFunc, io.Reader, error) {

return decompressor, io.MultiReader(bytes.NewReader(buffer[:n]), input), nil
}

// AutoDecompress takes a stream and returns an uncompressed version of the
// same stream, together with a bool reporting whether the input was detected
// as compressed.
// The caller must call Close() on the returned stream (even if the input does not need,
// or does not even support, closing!).
func AutoDecompress(stream io.Reader) (io.ReadCloser, bool, error) {
	// DetectCompression consumes a prefix of the input, so we must use the
	// multi-reader it returns from here on, not the original stream.
	decompressor, stream, err := DetectCompression(stream)
	if err != nil {
		return nil, false, errors.Wrap(err, "Error detecting compression")
	}
	var res io.ReadCloser
	if decompressor != nil {
		res, err = decompressor(stream)
		if err != nil {
			return nil, false, errors.Wrap(err, "Error initializing decompression")
		}
	} else {
		// Wrap uncompressed input so that the caller can uniformly Close()
		// the result regardless of whether decompression happened.
		res = ioutil.NopCloser(stream)
	}
	return res, decompressor != nil, nil
}
95 changes: 68 additions & 27 deletions pkg/compression/compression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,59 +14,52 @@ import (
)

func TestDetectCompression(t *testing.T) {
cases := []struct {
filename string
unimplemented bool
}{
{"fixtures/Hello.uncompressed", false},
{"fixtures/Hello.gz", false},
{"fixtures/Hello.bz2", false},
{"fixtures/Hello.xz", true},
cases := []string{
"fixtures/Hello.uncompressed",
"fixtures/Hello.gz",
"fixtures/Hello.bz2",
"fixtures/Hello.xz",
}

// The original stream is preserved.
for _, c := range cases {
originalContents, err := ioutil.ReadFile(c.filename)
require.NoError(t, err, c.filename)
originalContents, err := ioutil.ReadFile(c)
require.NoError(t, err, c)

stream, err := os.Open(c.filename)
require.NoError(t, err, c.filename)
stream, err := os.Open(c)
require.NoError(t, err, c)
defer stream.Close()

_, updatedStream, err := DetectCompression(stream)
require.NoError(t, err, c.filename)
require.NoError(t, err, c)

updatedContents, err := ioutil.ReadAll(updatedStream)
require.NoError(t, err, c.filename)
assert.Equal(t, originalContents, updatedContents, c.filename)
require.NoError(t, err, c)
assert.Equal(t, originalContents, updatedContents, c)
}

// The correct decompressor is chosen, and the result is as expected.
for _, c := range cases {
stream, err := os.Open(c.filename)
require.NoError(t, err, c.filename)
stream, err := os.Open(c)
require.NoError(t, err, c)
defer stream.Close()

decompressor, updatedStream, err := DetectCompression(stream)
require.NoError(t, err, c.filename)
require.NoError(t, err, c)

var uncompressedStream io.Reader
switch {
case decompressor == nil:
if decompressor == nil {
uncompressedStream = updatedStream
case c.unimplemented:
_, err := decompressor(updatedStream)
assert.Error(t, err)
continue
default:
} else {
s, err := decompressor(updatedStream)
require.NoError(t, err)
defer s.Close()
uncompressedStream = s
}

uncompressedContents, err := ioutil.ReadAll(uncompressedStream)
require.NoError(t, err, c.filename)
assert.Equal(t, []byte("Hello"), uncompressedContents, c.filename)
require.NoError(t, err, c)
assert.Equal(t, []byte("Hello"), uncompressedContents, c)
}

// Empty input is handled reasonably.
Expand All @@ -84,3 +77,51 @@ func TestDetectCompression(t *testing.T) {
_, _, err = DetectCompression(reader)
assert.Error(t, err)
}

func TestAutoDecompress(t *testing.T) {
	cases := []struct {
		filename     string
		isCompressed bool
	}{
		{"fixtures/Hello.uncompressed", false},
		{"fixtures/Hello.gz", true},
		{"fixtures/Hello.bz2", true},
		{"fixtures/Hello.xz", true},
	}

	// The correct decompressor is chosen, and the result is as expected.
	for _, c := range cases {
		stream, err := os.Open(c.filename)
		require.NoError(t, err, c.filename)
		defer stream.Close()

		uncompressedStream, isCompressed, err := AutoDecompress(stream)
		require.NoError(t, err, c.filename)
		defer uncompressedStream.Close()

		assert.Equal(t, c.isCompressed, isCompressed, c.filename)

		uncompressedContents, err := ioutil.ReadAll(uncompressedStream)
		require.NoError(t, err, c.filename)
		assert.Equal(t, []byte("Hello"), uncompressedContents, c.filename)
	}

	// Empty input is handled reasonably.
	uncompressedStream, isCompressed, err := AutoDecompress(bytes.NewReader([]byte{}))
	require.NoError(t, err)
	assert.False(t, isCompressed)
	uncompressedContents, err := ioutil.ReadAll(uncompressedStream)
	require.NoError(t, err)
	assert.Equal(t, []byte{}, uncompressedContents)
	uncompressedStream.Close()

	// Error initializing a decompressor (for a detected format): the stream
	// and bool results are not meaningful on error, so discard them instead
	// of clobbering the variables above (staticcheck SA4006).
	_, _, err = AutoDecompress(bytes.NewReader([]byte{0xFD, 0x37, 0x7A, 0x58, 0x5A, 0x00}))
	assert.Error(t, err)

	// Error reading input
	reader, writer := io.Pipe()
	defer reader.Close()
	writer.CloseWithError(errors.New("Expected error reading input in AutoDecompress"))
	_, _, err = AutoDecompress(reader)
	assert.Error(t, err)
}
Loading

0 comments on commit c6e0eee

Please sign in to comment.