Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

archive: use pigz|zstd if available #1964

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 28 additions & 2 deletions pkg/archive/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,17 @@ func DetectCompression(source []byte) Compression {
}

// DecompressStream decompresses the archive and returns a ReaderCloser with the decompressed archive.
func DecompressStream(archive io.Reader) (io.ReadCloser, error) {
func DecompressStream(archive io.Reader) (_ io.ReadCloser, Err error) {
p := pools.BufioReader32KPool
buf := p.Get(archive)
bs, err := buf.Peek(10)

defer func() {
if Err != nil {
p.Put(buf)
}
}()

if err != nil && err != io.EOF {
// Note: we'll ignore any io.EOF error because there are some odd
// cases where the layer.tar file will be empty (zero bytes) and
Expand All @@ -189,6 +196,12 @@ func DecompressStream(archive io.Reader) (io.ReadCloser, error) {
readBufWrapper := p.NewReadCloserWrapper(buf, buf)
return readBufWrapper, nil
case Gzip:
cleanup := func() {
p.Put(buf)
}
if rc, canUse := tryProcFilter([]string{"pigz", "-d"}, buf, cleanup); canUse {
return rc, nil
}
gzReader, err := gzip.NewReader(buf)
if err != nil {
return nil, err
Expand All @@ -207,16 +220,29 @@ func DecompressStream(archive io.Reader) (io.ReadCloser, error) {
readBufWrapper := p.NewReadCloserWrapper(buf, xzReader)
return readBufWrapper, nil
case Zstd:
cleanup := func() {
p.Put(buf)
}
if rc, canUse := tryProcFilter([]string{"zstd", "-d"}, buf, cleanup); canUse {
return rc, nil
}
return zstdReader(buf)
default:
return nil, fmt.Errorf("unsupported compression format %s", (&compression).Extension())
}
}

// CompressStream compresses the dest with specified compression algorithm.
func CompressStream(dest io.Writer, compression Compression) (io.WriteCloser, error) {
func CompressStream(dest io.Writer, compression Compression) (_ io.WriteCloser, Err error) {
p := pools.BufioWriter32KPool
buf := p.Get(dest)

defer func() {
if Err != nil {
p.Put(buf)
}
}()

switch compression {
case Uncompressed:
writeBufWrapper := p.NewWriteCloserWrapper(buf, buf)
Expand Down
55 changes: 55 additions & 0 deletions pkg/archive/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package archive

import (
"bytes"
"fmt"
"io"
"os/exec"
"strings"
"sync"
)

var filterPath sync.Map

func getFilterPath(name string) string {
path, ok := filterPath.Load(name)
if ok {
return path.(string)
}

path, err := exec.LookPath(name)
if err != nil {
path = ""
}

filterPath.Store(name, path)
return path.(string)
}

// tryProcFilter tries to run the command specified in args, passing input to its stdin and returning its stdout.
// cleanup() is a caller provided function that will be called when the command finishes running, regardless of
// whether it succeeds or fails.
// If the command is not found, it returns (nil, false) and the cleanup function is not called.
func tryProcFilter(args []string, input io.Reader, cleanup func()) (io.ReadCloser, bool) {
mtrmac marked this conversation as resolved.
Show resolved Hide resolved
path := getFilterPath(args[0])
if path == "" {
return nil, false
}

var stderrBuf bytes.Buffer

r, w := io.Pipe()
cmd := exec.Command(path, args[1:]...)
cmd.Stdin = input
cmd.Stdout = w
cmd.Stderr = &stderrBuf
go func() {
err := cmd.Run()
if err != nil && stderrBuf.Len() > 0 {
err = fmt.Errorf("%s: %w", strings.TrimRight(stderrBuf.String(), "\n"), err)
}
w.CloseWithError(err) // CloseWithErr(nil) == Close()
cleanup()
}()
return r, true
}
58 changes: 58 additions & 0 deletions pkg/archive/filter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package archive

import (
"bufio"
"bytes"
"io"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestTryProcFilter(t *testing.T) {
t.Run("Invalid filter path", func(t *testing.T) {
args := []string{"does-not-exist"}
input := bufio.NewReader(bytes.NewBufferString("foo"))
result, ok := tryProcFilter(args, input, func() {})
assert.Nil(t, result)
assert.False(t, ok)
})

t.Run("Valid filter path", func(t *testing.T) {
inputData := "input data"

args := []string{"cat", "-"}
input := bufio.NewReader(bytes.NewBufferString(inputData))

result, ok := tryProcFilter(args, input, func() {})
assert.NotNil(t, result)
assert.True(t, ok)

output, err := io.ReadAll(result)
require.NoError(t, err)
assert.Equal(t, inputData, string(output))
})

t.Run("Filter fails with error", func(t *testing.T) {
inputData := "input data"

var cleanedUp atomic.Bool

args := []string{"sh", "-c", "echo 'oh no' 1>&2; exit 21"}
input := bufio.NewReader(bytes.NewBufferString(inputData))

result, ok := tryProcFilter(args, input, func() { cleanedUp.Store(true) })
assert.NotNil(t, result)
assert.True(t, ok)

_, err := io.ReadAll(result)
require.Error(t, err)
assert.Contains(t, err.Error(), "oh no: exit status 21")
assert.Eventually(t, func() bool {
return cleanedUp.Load()
}, 5*time.Second, 10*time.Millisecond, "clean up function was not called")
})
}