Skip to content

Commit

Permalink
Merge pull request #1964 from giuseppe/use-decompressor-filters-if-available
Browse files Browse the repository at this point in the history

archive: use pigz|zstd if available
  • Loading branch information
openshift-merge-bot[bot] authored Jun 11, 2024
2 parents e87d2b4 + ae8836f commit 9fc521d
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 2 deletions.
30 changes: 28 additions & 2 deletions pkg/archive/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,17 @@ func DetectCompression(source []byte) Compression {
}

// DecompressStream decompresses the archive and returns a ReaderCloser with the decompressed archive.
func DecompressStream(archive io.Reader) (io.ReadCloser, error) {
func DecompressStream(archive io.Reader) (_ io.ReadCloser, Err error) {
p := pools.BufioReader32KPool
buf := p.Get(archive)
bs, err := buf.Peek(10)

defer func() {
if Err != nil {
p.Put(buf)
}
}()

if err != nil && err != io.EOF {
// Note: we'll ignore any io.EOF error because there are some odd
// cases where the layer.tar file will be empty (zero bytes) and
Expand All @@ -189,6 +196,12 @@ func DecompressStream(archive io.Reader) (io.ReadCloser, error) {
readBufWrapper := p.NewReadCloserWrapper(buf, buf)
return readBufWrapper, nil
case Gzip:
cleanup := func() {
p.Put(buf)
}
if rc, canUse := tryProcFilter([]string{"pigz", "-d"}, buf, cleanup); canUse {
return rc, nil
}
gzReader, err := gzip.NewReader(buf)
if err != nil {
return nil, err
Expand All @@ -207,16 +220,29 @@ func DecompressStream(archive io.Reader) (io.ReadCloser, error) {
readBufWrapper := p.NewReadCloserWrapper(buf, xzReader)
return readBufWrapper, nil
case Zstd:
cleanup := func() {
p.Put(buf)
}
if rc, canUse := tryProcFilter([]string{"zstd", "-d"}, buf, cleanup); canUse {
return rc, nil
}
return zstdReader(buf)
default:
return nil, fmt.Errorf("unsupported compression format %s", (&compression).Extension())
}
}

// CompressStream compresses the dest with specified compression algorithm.
func CompressStream(dest io.Writer, compression Compression) (io.WriteCloser, error) {
func CompressStream(dest io.Writer, compression Compression) (_ io.WriteCloser, Err error) {
p := pools.BufioWriter32KPool
buf := p.Get(dest)

defer func() {
if Err != nil {
p.Put(buf)
}
}()

switch compression {
case Uncompressed:
writeBufWrapper := p.NewWriteCloserWrapper(buf, buf)
Expand Down
55 changes: 55 additions & 0 deletions pkg/archive/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package archive

import (
"bytes"
"fmt"
"io"
"os/exec"
"strings"
"sync"
)

// filterPath memoizes exec.LookPath results per filter name; a cached
// empty string means the binary was not found on PATH.
var filterPath sync.Map

// getFilterPath resolves the absolute path of the named filter binary,
// caching the result (including a negative "" result) for later calls.
func getFilterPath(name string) string {
	if cached, ok := filterPath.Load(name); ok {
		return cached.(string)
	}

	// Not cached yet: resolve via PATH. A lookup failure is recorded as
	// "" so we never retry a missing binary.
	resolved, err := exec.LookPath(name)
	if err != nil {
		resolved = ""
	}

	filterPath.Store(name, resolved)
	return resolved
}

// tryProcFilter attempts to spawn the external command described by args,
// feeding it input on stdin and exposing its stdout as the returned reader.
// cleanup() is invoked by the caller-visible contract once the command has
// finished, whether it succeeded or failed. When the binary cannot be found
// on PATH, tryProcFilter returns (nil, false) and never calls cleanup().
func tryProcFilter(args []string, input io.Reader, cleanup func()) (io.ReadCloser, bool) {
	binary := getFilterPath(args[0])
	if binary == "" {
		return nil, false
	}

	pr, pw := io.Pipe()

	var errBuf bytes.Buffer
	cmd := exec.Command(binary, args[1:]...)
	cmd.Stdin = input
	cmd.Stdout = pw
	cmd.Stderr = &errBuf

	go func() {
		runErr := cmd.Run()
		// Surface any stderr output as context on the failure.
		if runErr != nil && errBuf.Len() > 0 {
			runErr = fmt.Errorf("%s: %w", strings.TrimRight(errBuf.String(), "\n"), runErr)
		}
		pw.CloseWithError(runErr) // CloseWithErr(nil) == Close()
		cleanup()
	}()

	return pr, true
}
58 changes: 58 additions & 0 deletions pkg/archive/filter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package archive

import (
"bufio"
"bytes"
"io"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestTryProcFilter covers the three externally observable outcomes of
// tryProcFilter: a missing binary, a successful filter run, and a filter
// that exits non-zero with stderr output.
func TestTryProcFilter(t *testing.T) {
	t.Run("Invalid filter path", func(t *testing.T) {
		in := bufio.NewReader(bytes.NewBufferString("foo"))
		rc, ok := tryProcFilter([]string{"does-not-exist"}, in, func() {})
		assert.False(t, ok)
		assert.Nil(t, rc)
	})

	t.Run("Valid filter path", func(t *testing.T) {
		const payload = "input data"
		in := bufio.NewReader(bytes.NewBufferString(payload))

		rc, ok := tryProcFilter([]string{"cat", "-"}, in, func() {})
		assert.True(t, ok)
		assert.NotNil(t, rc)

		// cat should echo the input back unmodified.
		data, err := io.ReadAll(rc)
		require.NoError(t, err)
		assert.Equal(t, payload, string(data))
	})

	t.Run("Filter fails with error", func(t *testing.T) {
		var cleanedUp atomic.Bool
		in := bufio.NewReader(bytes.NewBufferString("input data"))

		rc, ok := tryProcFilter([]string{"sh", "-c", "echo 'oh no' 1>&2; exit 21"}, in, func() { cleanedUp.Store(true) })
		assert.True(t, ok)
		assert.NotNil(t, rc)

		// The exit status and captured stderr must surface through the pipe.
		_, err := io.ReadAll(rc)
		require.Error(t, err)
		assert.Contains(t, err.Error(), "oh no: exit status 21")
		assert.Eventually(t, func() bool {
			return cleanedUp.Load()
		}, 5*time.Second, 10*time.Millisecond, "clean up function was not called")
	})
}

0 comments on commit 9fc521d

Please sign in to comment.