Skip to content

Commit

Permalink
archive: use pigz if available
Browse files Browse the repository at this point in the history
Use the pigz command-line tool when it is available, as it is much faster
at decompressing a gzip stream.

On my machine I've seen a 50% pull time reduction when pulling some
big images.

Signed-off-by: Giuseppe Scrivano <[email protected]>
  • Loading branch information
giuseppe committed Jun 11, 2024
1 parent 17cd45f commit b3319f0
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 0 deletions.
6 changes: 6 additions & 0 deletions pkg/archive/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,12 @@ func DecompressStream(archive io.Reader) (_ io.ReadCloser, Err error) {
readBufWrapper := p.NewReadCloserWrapper(buf, buf)
return readBufWrapper, nil
case Gzip:
cleanup := func() {
p.Put(buf)
}
if rc, canUse := tryProcFilter([]string{"pigz", "-d"}, buf, cleanup); canUse {
return rc, nil
}
gzReader, err := gzip.NewReader(buf)
if err != nil {
return nil, err
Expand Down
55 changes: 55 additions & 0 deletions pkg/archive/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package archive

import (
"bytes"
"fmt"
"io"
"os/exec"
"strings"
"sync"
)

// filterPath caches the outcome of looking up a filter executable, keyed by
// the command name. A stored empty string records that the lookup failed.
var filterPath sync.Map

// getFilterPath returns the path of the executable called name as resolved by
// exec.LookPath, or "" when the lookup reports an error. The outcome of the
// first lookup for a given name is stored in filterPath and returned for all
// later calls with that name.
func getFilterPath(name string) string {
	if cached, found := filterPath.Load(name); found {
		return cached.(string)
	}

	resolved, err := exec.LookPath(name)
	if err != nil {
		// Any lookup error is recorded as "no usable filter".
		resolved = ""
	}

	filterPath.Store(name, resolved)
	return resolved
}

// errorRecordingReader wraps an io.Reader and remembers the first non-EOF
// error returned by Read, so that the error can be reported after the
// consumer of the reader has finished.
type errorRecordingReader struct {
	r   io.Reader
	err error
}

// Read forwards to the wrapped reader and records the first error other than
// io.EOF.
func (e *errorRecordingReader) Read(p []byte) (int, error) {
	n, err := e.r.Read(p)
	if err != nil && err != io.EOF && e.err == nil {
		e.err = err
	}
	return n, err
}

// tryProcFilter tries to run the command specified in args, passing input to its stdin and returning its stdout.
// cleanup() is a caller provided function that will be called when the command finishes running, regardless of
// whether it succeeds or fails.
// If args is empty, the command is not found, or the command cannot be started, it returns (nil, false), the
// cleanup function is not called and nothing has been read from input, so the caller can fall back to another
// implementation.
//
// Errors are delivered through the returned reader: a failure while reading input takes precedence; otherwise a
// failing command yields its exit error, prefixed with whatever it wrote to stderr.
func tryProcFilter(args []string, input io.Reader, cleanup func()) (io.ReadCloser, bool) {
	// Guard against an empty command line instead of panicking on args[0].
	if len(args) == 0 {
		return nil, false
	}
	path := getFilterPath(args[0])
	if path == "" {
		return nil, false
	}

	var stderrBuf bytes.Buffer
	var recorder *errorRecordingReader

	r, w := io.Pipe()
	cmd := exec.Command(path, args[1:]...)
	// Leave Stdin nil for a nil input (as before); only wrap a real reader.
	if input != nil {
		recorder = &errorRecordingReader{r: input}
		cmd.Stdin = recorder
	}
	cmd.Stdout = w
	cmd.Stderr = &stderrBuf

	// Start synchronously: if the process cannot be launched (for example the
	// cached path is stale), report "cannot use" rather than handing back a
	// reader that is guaranteed to fail.
	if err := cmd.Start(); err != nil {
		return nil, false
	}

	go func() {
		// Wait also waits for the goroutines copying stdin/stdout/stderr, so
		// recorder.err and stderrBuf are safe to read afterwards.
		err := cmd.Wait()
		switch {
		case recorder != nil && recorder.err != nil:
			// A broken input is the root cause; the filter's complaint about
			// truncated data would only obscure it.
			err = recorder.err
		case err != nil && stderrBuf.Len() > 0:
			err = fmt.Errorf("%s: %w", strings.TrimRight(stderrBuf.String(), "\n"), err)
		}
		w.CloseWithError(err) // CloseWithErr(nil) == Close()
		cleanup()
	}()
	return r, true
}
58 changes: 58 additions & 0 deletions pkg/archive/filter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package archive

import (
"bufio"
"bytes"
"io"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestTryProcFilter covers the three outcomes of tryProcFilter: the command
// cannot be found, the command runs and echoes its input, and the command
// exits with a failure after writing to stderr.
func TestTryProcFilter(t *testing.T) {
	// noCleanup is passed where the cleanup callback is irrelevant.
	noCleanup := func() {}

	t.Run("Invalid filter path", func(t *testing.T) {
		stdin := bufio.NewReader(bytes.NewBufferString("foo"))

		rc, usable := tryProcFilter([]string{"does-not-exist"}, stdin, noCleanup)

		assert.Nil(t, rc)
		assert.False(t, usable)
	})

	t.Run("Valid filter path", func(t *testing.T) {
		const payload = "input data"
		stdin := bufio.NewReader(bytes.NewBufferString(payload))

		// "cat -" copies stdin to stdout unchanged.
		rc, usable := tryProcFilter([]string{"cat", "-"}, stdin, noCleanup)
		assert.NotNil(t, rc)
		assert.True(t, usable)

		got, err := io.ReadAll(rc)
		require.NoError(t, err)
		assert.Equal(t, payload, string(got))
	})

	t.Run("Filter fails with error", func(t *testing.T) {
		const payload = "input data"
		stdin := bufio.NewReader(bytes.NewBufferString(payload))

		// Set from the filter goroutine, polled from the test goroutine.
		var cleanupRan atomic.Bool
		markCleaned := func() { cleanupRan.Store(true) }

		rc, usable := tryProcFilter([]string{"sh", "-c", "echo 'oh no' 1>&2; exit 21"}, stdin, markCleaned)
		assert.NotNil(t, rc)
		assert.True(t, usable)

		// The stderr text and exit status must both surface in the read error.
		_, err := io.ReadAll(rc)
		require.Error(t, err)
		assert.Contains(t, err.Error(), "oh no: exit status 21")

		// cleanup runs after the pipe is closed, so wait for it.
		assert.Eventually(t, cleanupRan.Load, 5*time.Second, 10*time.Millisecond, "clean up function was not called")
	})
}

0 comments on commit b3319f0

Please sign in to comment.