compression: check level errors ahead of time
We no longer obey gzip compression level 0, which is "no compression" and
just adds gzip headers. Instead, we convert that to gzip.DefaultCompression.

We now check level errors once, before constructing the sync.Pool, rather
than inside the sync.Pool's New function.

Also improves the compress benchmark: each codec now runs as a sub-benchmark
over a larger input.
twmb committed Mar 1, 2022
1 parent d11066f commit b7a6f5a
Showing 2 changed files with 45 additions and 42 deletions.
67 changes: 31 additions & 36 deletions pkg/kgo/compression.go
@@ -14,11 +14,6 @@ import (
 	"github.com/pierrec/lz4/v4"
 )
 
-// NOTE: level configuration was removed at some point due to it likely being
-// more configuration than necessary; we may add level options as new functions
-// down the line. The code below supports levels; zstd levels will need wiring
-// in and levels will need validating.
-
 // sliceWriter a reusable slice as an io.Writer
 type sliceWriter struct{ inner []byte }

@@ -109,47 +104,47 @@ out:
 		case 0:
 			break out
 		case 1:
-			level := codec.level
-			if _, err := gzip.NewWriterLevel(nil, int(level)); err != nil {
-				level = gzip.DefaultCompression
+			level := gzip.DefaultCompression
+			if codec.level != 0 {
+				if _, err := gzip.NewWriterLevel(nil, int(codec.level)); err != nil {
+					level = int(codec.level)
+				}
 			}
-			c.gzPool = sync.Pool{New: func() interface{} { c, _ := gzip.NewWriterLevel(nil, int(level)); return c }}
+			c.gzPool = sync.Pool{New: func() interface{} { c, _ := gzip.NewWriterLevel(nil, level); return c }}
 		case 3:
 			level := codec.level
 			if level < 0 {
-				level = 0
+				level = 0 // 0 == lz4.Fast
 			}
-			c.lz4Pool = sync.Pool{
-				New: func() interface{} {
+			fn := func() interface{} { return lz4.NewWriter(new(bytes.Buffer)) }
+			w := lz4.NewWriter(new(bytes.Buffer))
+			if err := w.Apply(lz4.CompressionLevelOption(lz4.CompressionLevel(level))); err == nil {
+				fn = func() interface{} {
 					w := lz4.NewWriter(new(bytes.Buffer))
-					if err := w.Apply(lz4.CompressionLevelOption(lz4.CompressionLevel(level))); err != nil {
-						w.Close()
-						w = lz4.NewWriter(nil)
-					}
+					w.Apply(lz4.CompressionLevelOption(lz4.CompressionLevel(level)))
 					return w
-				},
+				}
 			}
+			w.Close()
+			c.lz4Pool = sync.Pool{New: fn}
 		case 4:
-			level := zstd.EncoderLevel(codec.level)
-			c.zstdPool = sync.Pool{
-				New: func() interface{} {
-					zstdEnc, err := zstd.NewWriter(nil,
-						zstd.WithEncoderLevel(level),
-						zstd.WithWindowSize(64<<10),
-						zstd.WithEncoderConcurrency(1),
-						zstd.WithZeroFrames(true),
-					)
-					if err != nil {
-						zstdEnc, _ = zstd.NewWriter(nil,
-							zstd.WithEncoderConcurrency(1))
-					}
-					r := &zstdEncoder{zstdEnc}
-					runtime.SetFinalizer(r, func(r *zstdEncoder) {
-						r.inner.Close()
-					})
-					return r
-				},
+			opts := []zstd.EOption{
+				zstd.WithWindowSize(64 << 10),
+				zstd.WithEncoderConcurrency(1),
+				zstd.WithZeroFrames(true),
 			}
+			fn := func() interface{} {
+				zstdEnc, _ := zstd.NewWriter(nil, opts...)
+				r := &zstdEncoder{zstdEnc}
+				runtime.SetFinalizer(r, func(r *zstdEncoder) { r.inner.Close() })
+				return r
+			}
+			zstdEnc, err := zstd.NewWriter(nil, append(opts, zstd.WithEncoderLevel(zstd.EncoderLevel(codec.level)))...)
+			if err == nil {
+				zstdEnc.Close()
+				opts = append(opts, zstd.WithEncoderLevel(zstd.EncoderLevel(codec.level)))
+			}
+			c.zstdPool = sync.Pool{New: fn}
 		}
 	}
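For readers skimming the diff, the shape of the change is: probe the requested compression level once, fall back to the codec's default if it is rejected, and only then build the sync.Pool, so its New function never has to handle a level error. Below is a minimal standalone sketch of that pattern; the buildGzipPool and buildLZ4Pool helper names, and treating a level as usable when the probe returns err == nil, are illustrative rather than the library's exact code.

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"sync"

	"github.com/pierrec/lz4/v4"
)

// buildGzipPool probes the requested gzip level once; level 0 ("no
// compression") and invalid levels fall back to the default, so the pool's
// New function can ignore the construction error.
func buildGzipPool(level int) *sync.Pool {
	use := gzip.DefaultCompression
	if level != 0 {
		if _, err := gzip.NewWriterLevel(nil, level); err == nil {
			use = level
		}
	}
	return &sync.Pool{New: func() interface{} {
		w, _ := gzip.NewWriterLevel(nil, use) // level already validated above
		return w
	}}
}

// buildLZ4Pool applies the requested lz4 level to a throwaway writer once;
// pooled writers carry the level option only if that probe succeeded.
func buildLZ4Pool(level lz4.CompressionLevel) *sync.Pool {
	newWriter := func() interface{} { return lz4.NewWriter(new(bytes.Buffer)) }
	probe := lz4.NewWriter(new(bytes.Buffer))
	if err := probe.Apply(lz4.CompressionLevelOption(level)); err == nil {
		newWriter = func() interface{} {
			w := lz4.NewWriter(new(bytes.Buffer))
			w.Apply(lz4.CompressionLevelOption(level)) // cannot fail: validated above
			return w
		}
	}
	probe.Close()
	return &sync.Pool{New: newWriter}
}

func main() {
	gz := buildGzipPool(9)
	l4 := buildLZ4Pool(lz4.Level5)
	fmt.Printf("%T %T\n", gz.Get(), l4.Get())
}

The point of the design is that any error path lives outside the pool; by the time New runs, its inputs are known-good.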

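The zstd branch applies the same idea with klauspost/compress: assemble the always-valid encoder options up front, try the requested level against a throwaway encoder, and append the level option only if that construction succeeds. A rough sketch under the same caveat; buildZstdPool and the pooledZstd wrapper are made-up names, though the option set and the finalizer that closes pooled encoders mirror the diff.

package main

import (
	"fmt"
	"runtime"
	"sync"

	"github.com/klauspost/compress/zstd"
)

// pooledZstd wraps an encoder so a finalizer can close it once the pool
// drops the wrapper.
type pooledZstd struct{ inner *zstd.Encoder }

// buildZstdPool validates the requested level with a throwaway encoder and
// keeps the level option only if that construction succeeds.
func buildZstdPool(level zstd.EncoderLevel) *sync.Pool {
	opts := []zstd.EOption{
		zstd.WithWindowSize(64 << 10),
		zstd.WithEncoderConcurrency(1),
		zstd.WithZeroFrames(true),
	}
	if enc, err := zstd.NewWriter(nil, append(opts, zstd.WithEncoderLevel(level))...); err == nil {
		enc.Close()
		opts = append(opts, zstd.WithEncoderLevel(level))
	}
	return &sync.Pool{New: func() interface{} {
		enc, _ := zstd.NewWriter(nil, opts...) // options validated above
		p := &pooledZstd{enc}
		runtime.SetFinalizer(p, func(p *pooledZstd) { p.inner.Close() })
		return p
	}}
}

func main() {
	pool := buildZstdPool(zstd.SpeedBestCompression)
	enc := pool.Get().(*pooledZstd)
	out := enc.inner.EncodeAll([]byte("hello hello hello hello"), nil)
	fmt.Println(len(out))
	pool.Put(enc)
}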
20 changes: 14 additions & 6 deletions pkg/kgo/compression_test.go
@@ -3,6 +3,7 @@ package kgo
 import (
 	"bytes"
 	"encoding/base64"
+	"fmt"
 	"reflect"
 	"sync"
 	"testing"
@@ -95,12 +96,19 @@ func TestCompressDecompress(t *testing.T) {
 }
 
 func BenchmarkCompress(b *testing.B) {
-	c, _ := newCompressor(CompressionCodec{codec: 2}) // snappy
-	in := []byte("foo")
-	for i := 0; i < b.N; i++ {
-		w := sliceWriters.Get().(*sliceWriter)
-		c.compress(w, in, 0)
-		sliceWriters.Put(w)
+	in := bytes.Repeat([]byte("abcdefghijklmno pqrs tuvwxy z"), 100)
+	for _, codec := range []int8{1, 2, 3, 4} {
+		c, _ := newCompressor(CompressionCodec{codec: codec})
+		b.Run(fmt.Sprint(codec), func(b *testing.B) {
+			var afterSize int
+			for i := 0; i < b.N; i++ {
+				w := sliceWriters.Get().(*sliceWriter)
+				after, _ := c.compress(w, in, 99)
+				afterSize = len(after)
+				sliceWriters.Put(w)
+			}
+			b.Logf("%d => %d", len(in), afterSize)
+		})
 	}
 }
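With the codecs split into sub-benchmarks named by codec number, they can be run individually and the size logging inspected. Assuming you are at the repository root, an invocation along these lines should work (the exact flags are a suggestion, not part of the commit):

go test -run NONE -bench BenchmarkCompress -benchmem ./pkg/kgo

or, for a single codec such as zstd, -bench 'BenchmarkCompress/4'.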

