Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

s2: Add full Snappy output support #396

Merged
merged 4 commits into from
Jun 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 78 additions & 7 deletions s2/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,30 @@ S2 is aimed for high throughput, which is why it features concurrent compression
Decoding is compatible with Snappy compressed content, but content compressed with S2 cannot be decompressed by Snappy.
This means that S2 can seamlessly replace Snappy without converting compressed content.

S2 can produce Snappy compatible output, faster and better than Snappy.
If you want full benefit of the changes you should use s2 without Snappy compatibility.

S2 is designed to have high throughput on content that cannot be compressed.
This is important, so you don't have to worry about spending CPU cycles on already compressed data.

## Benefits over Snappy

* Better compression
* Adjustable compression (3 levels)
* Concurrent stream compression
* Faster decompression
* Faster decompression, even for Snappy compatible content
* Ability to quickly skip forward in compressed stream
* Compatible with reading Snappy compressed content
* Offers alternative, more efficient, but slightly slower compression mode.
* Smaller block size overhead on incompressible blocks.
* Smaller block size overhead on incompressible blocks
* Block concatenation
* Automatic stream size padding.
* Snappy compatible block compression.
* Uncompressed stream mode
* Automatic stream size padding
* Snappy compatible block compression

## Drawbacks over Snappy

* Not optimized for 32 bit systems.
* Uses slightly more memory (4MB per core) due to larger blocks and concurrency (configurable).
* Streams use slightly more memory due to larger blocks and concurrency (configurable).

# Usage

Expand Down Expand Up @@ -150,7 +154,7 @@ To build binaries to the current folder use:
Usage: s2c [options] file1 file2
Compresses all files supplied as input separately.
Output files are written as 'filename.ext.s2'.
Output files are written as 'filename.ext.s2' or 'filename.ext.snappy'.
By default output files will be overwritten.
Use - as the only file name to read from stdin and write to stdout.
Expand All @@ -172,6 +176,8 @@ Options:
Compress faster, but with a minor compression loss
-help
Display help
-o string
Write output to another file. Single input file only
-pad string
Pad size to a multiple of this value, Examples: 500, 64K, 256K, 1M, 4M, etc (default "1")
-q Don't write any output to terminal, except errors
Expand All @@ -181,6 +187,8 @@ Options:
Do not overwrite output files
-slower
Compress more, but a lot slower
-snappy
Generate Snappy compatible output stream
-verify
Verify written files
Expand All @@ -207,6 +215,8 @@ Options:
-c Write all output to stdout. Multiple input files will be concatenated
-help
Display help
-o string
Write output to another file. Single input file only
-q Don't write any output to terminal, except errors
-rm
Delete source file(s) after successful decompression
Expand Down Expand Up @@ -595,6 +605,67 @@ Best... 10737418240 -> 4244773384 [39.53%]; 42.96s, 238.4MB/s

Decompression speed should be around the same as using the 'better' compression mode.

# Snappy Compatibility

S2 now offers full compatibility with Snappy.

This means that the efficient encoders of S2 can be used to generate fully Snappy compatible output.

## Blocks

Snappy compatible blocks can be generated with the S2 encoder.
Compression and speed is typically a bit better `MaxEncodedLen` is also smaller for smaller memory usage. Replace

| Snappy | S2 replacement |
|----------------------------|-------------------------|
| snappy.Encode(...) | s2.EncodeSnappy(...)` |
| snappy.MaxEncodedLen(...) | s2.MaxEncodedLen(...) |

`s2.EncodeSnappy` can be replaced with `s2.EncodeSnappyBetter` or `s2.EncodeSnappyBest` to get more efficiently compressed snappy compatible output.

`s2ConcatBlocks` is compatible with snappy blocks.

Comparison of [`webdevdata.org-2015-01-07-subset`](https://files.klauspost.com/compress/webdevdata.org-2015-01-07-4GB-subset.7z),
53927 files, total input size: 4,014,735,833 bytes. amd64, single goroutine used:

| Encoder | Size | MB/s |
|-----------------------|------------|--------|
| snappy.Encode | 1128706759 | 725.59 |
| s2.EncodeSnappy | 1093823291 | 899.16 |
| s2.EncodeSnappyBetter | 1001158548 | 578.49 |
| s2.EncodeSnappyBest | 944507998 | 66.00 |

## Streams

For streams, replace `enc = snappy.NewWriter(w)` with `enc = s2.NewWriter(w, s2.WriterSnappyCompat())`.
All other options are available, but note that block size limit is different for snappy.

Comparison of different streams, AMD Ryzen 3950x, 16 cores. Size and throughput:

| File | snappy.NewWriter | S2 Snappy | S2 Snappy, Better | S2 Snappy, Best |
|-----------------------------|--------------------------|---------------------------|--------------------------|-------------------------|
| nyc-taxi-data-10M.csv | 1316042016 - 517.54MB/s | 1307003093 - 8406.29MB/s | 1174534014 - 4984.35MB/s | 1115904679 - 177.81MB/s |
| enwik10 | 5088294643 - 433.45MB/s | 5175840939 - 8454.52MB/s | 4560784526 - 4403.10MB/s | 4340299103 - 159.71MB/s |
| 10gb.tar | 6056946612 - 703.25MB/s | 6208571995 - 9035.75MB/s | 5741646126 - 2402.08MB/s | 5548973895 - 171.17MB/s |
| github-june-2days-2019.json | 1525176492 - 908.11MB/s | 1476519054 - 12625.93MB/s | 1400547532 - 6163.61MB/s | 1321887137 - 200.71MB/s |
| consensus.db.10gb | 5412897703 - 1054.38MB/s | 5354073487 - 12634.82MB/s | 5335069899 - 2472.23MB/s | 5201000954 - 166.32MB/s |

# Decompression

All decompression functions map directly to equivalent s2 functions.

| Snappy | S2 replacement |
|------------------------|--------------------|
| snappy.Decode(...) | s2.Decode(...) |
| snappy.DecodedLen(...) | s2.DecodedLen(...) |
| snappy.NewReader(...) | s2.NewReader(...) |

Features like [quick forward skipping without decompression](https://pkg.go.dev/github.com/klauspost/compress/s2#Reader.Skip)
are also available for Snappy streams.

If you know you are only decompressing snappy streams, setting [`ReaderMaxBlockSize(64<<10)`](https://pkg.go.dev/github.com/klauspost/compress/s2#ReaderMaxBlockSize)
on your Reader will reduce memory consumption.

# Concatenating blocks and streams.

Concatenating streams will concatenate the output of both without recompressing them.
Expand Down
15 changes: 12 additions & 3 deletions s2/_generate/gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,17 @@ func main() {
o.snappy = true
o.outputMargin = 9
o.genEncodeBlockAsm("encodeSnappyBlockAsm", 14, 6, 6, limit14B)
o.genEncodeBlockAsm("encodeSnappyBlockAsm64K", 14, 6, 6, 64<<10-1)
o.genEncodeBlockAsm("encodeSnappyBlockAsm12B", 12, 5, 5, limit12B)
o.genEncodeBlockAsm("encodeSnappyBlockAsm10B", 10, 5, 4, limit10B)
o.genEncodeBlockAsm("encodeSnappyBlockAsm8B", 8, 4, 4, limit8B)

o.genEncodeBetterBlockAsm("encodeSnappyBetterBlockAsm", 16, 7, 7, limit14B)
o.genEncodeBetterBlockAsm("encodeSnappyBetterBlockAsm64K", 16, 7, 7, 64<<10-1)
o.genEncodeBetterBlockAsm("encodeSnappyBetterBlockAsm12B", 14, 6, 6, limit12B)
o.genEncodeBetterBlockAsm("encodeSnappyBetterBlockAsm10B", 12, 5, 6, limit10B)
o.genEncodeBetterBlockAsm("encodeSnappyBetterBlockAsm8B", 10, 4, 6, limit8B)

o.snappy = false
o.outputMargin = 0
o.maxLen = math.MaxUint32
Expand Down Expand Up @@ -1215,8 +1222,10 @@ func (o options) genEncodeBetterBlockAsm(name string, lTableBits, skipLog, lHash
MOVL(s, offset32)
SUBL(candidate, offset32)
Comment("Check if repeat")
CMPL(repeatL, offset32)
JEQ(LabelRef("match_is_repeat_" + name))
if !o.snappy {
CMPL(repeatL, offset32)
JEQ(LabelRef("match_is_repeat_" + name))
}

// NOT REPEAT
{
Expand Down Expand Up @@ -1246,7 +1255,7 @@ func (o options) genEncodeBetterBlockAsm(name string, lTableBits, skipLog, lHash
// Jumps at end
}
// REPEAT
{
if !o.snappy {
Label("match_is_repeat_" + name)
// Emit....
o.emitLiteralsDstP(nextEmitL, base, src, dst, "match_emit_repeat_"+name)
Expand Down
39 changes: 34 additions & 5 deletions s2/cmd/s2c/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@ import (
var (
faster = flag.Bool("faster", false, "Compress faster, but with a minor compression loss")
slower = flag.Bool("slower", false, "Compress more, but a lot slower")
snappy = flag.Bool("snappy", false, "Generate Snappy compatible output stream")
cpu = flag.Int("cpu", runtime.GOMAXPROCS(0), "Compress using this amount of threads")
blockSize = flag.String("blocksize", "4M", "Max block size. Examples: 64K, 256K, 1M, 4M. Must be power of two and <= 4MB")
safe = flag.Bool("safe", false, "Do not overwrite output files")
padding = flag.String("pad", "1", "Pad size to a multiple of this value, Examples: 500, 64K, 256K, 1M, 4M, etc")
stdout = flag.Bool("c", false, "Write all output to stdout. Multiple input files will be concatenated")
out = flag.String("o", "", "Write output to another file. Single input file only")
remove = flag.Bool("rm", false, "Delete source file(s) after successful compression")
quiet = flag.Bool("q", false, "Don't write any output to terminal, except errors")
bench = flag.Int("bench", 0, "Run benchmark n times. No output will be written")
Expand Down Expand Up @@ -63,11 +65,11 @@ func main() {
if len(args) == 0 || *help || (*slower && *faster) {
_, _ = fmt.Fprintf(os.Stderr, "s2 compress v%v, built at %v.\n\n", version, date)
_, _ = fmt.Fprintf(os.Stderr, "Copyright (c) 2011 The Snappy-Go Authors. All rights reserved.\n"+
"Copyright (c) 2019 Klaus Post. All rights reserved.\n\n")
"Copyright (c) 2019+ Klaus Post. All rights reserved.\n\n")
_, _ = fmt.Fprintln(os.Stderr, `Usage: s2c [options] file1 file2
Compresses all files supplied as input separately.
Output files are written as 'filename.ext.s2'.
Output files are written as 'filename.ext.s2' or 'filename.ext.snappy'.
By default output files will be overwritten.
Use - as the only file name to read from stdin and write to stdout.
Expand All @@ -88,14 +90,32 @@ Options:`)
if *slower {
opts = append(opts, s2.WriterBestCompression())
}
if *snappy {
opts = append(opts, s2.WriterSnappyCompat())
}
wr := s2.NewWriter(nil, opts...)

// No args, use stdin/stdout
if len(args) == 1 && args[0] == "-" {
// Catch interrupt, so we don't exit at once.
// os.Stdin will return EOF, so we should be able to get everything.
signal.Notify(make(chan os.Signal, 1), os.Interrupt)
wr.Reset(os.Stdout)
if len(*out) == 0 {
wr.Reset(os.Stdout)
} else {
if *safe {
_, err := os.Stat(*out)
if !os.IsNotExist(err) {
exitErr(errors.New("destination file exists"))
}
}
dstFile, err := os.OpenFile(*out, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, os.ModePerm)
exitErr(err)
defer dstFile.Close()
bw := bufio.NewWriterSize(dstFile, int(sz)*2)
defer bw.Flush()
wr.Reset(bw)
}
_, err = wr.ReadFrom(os.Stdin)
printErr(err)
printErr(wr.Close())
Expand Down Expand Up @@ -216,11 +236,20 @@ Options:`)
}
os.Exit(0)
}

ext := ".s2"
if *snappy {
ext = ".snappy"
}
if *out != "" && len(files) > 1 {
exitErr(errors.New("-out parameter can only be used with one input"))
}
for _, filename := range files {
func() {
var closeOnce sync.Once
dstFilename := cleanFileName(fmt.Sprintf("%s%s", filename, ".s2"))
dstFilename := cleanFileName(fmt.Sprintf("%s%s", filename, ext))
if *out != "" {
dstFilename = *out
}
if !*quiet {
fmt.Print("Compressing ", filename, " -> ", dstFilename)
}
Expand Down
36 changes: 30 additions & 6 deletions s2/cmd/s2d/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ var (
quiet = flag.Bool("q", false, "Don't write any output to terminal, except errors")
bench = flag.Int("bench", 0, "Run benchmark n times. No output will be written")
help = flag.Bool("help", false, "Display help")
out = flag.String("o", "", "Write output to another file. Single input file only")

version = "(dev)"
date = "(unknown)"
Expand All @@ -42,7 +43,7 @@ func main() {
if len(args) == 0 || *help {
_, _ = fmt.Fprintf(os.Stderr, "s2 decompress v%v, built at %v.\n\n", version, date)
_, _ = fmt.Fprintf(os.Stderr, "Copyright (c) 2011 The Snappy-Go Authors. All rights reserved.\n"+
"Copyright (c) 2019 Klaus Post. All rights reserved.\n\n")
"Copyright (c) 2019+ Klaus Post. All rights reserved.\n\n")
_, _ = fmt.Fprintln(os.Stderr, `Usage: s2d [options] file1 file2
Decompresses all files supplied as input. Input files must end with '.s2' or '.snappy'.
Expand All @@ -61,13 +62,30 @@ Options:`)
}
if len(args) == 1 && args[0] == "-" {
r.Reset(os.Stdin)
if !*verify {
_, err := io.Copy(os.Stdout, r)
exitErr(err)
} else {
if *verify {
_, err := io.Copy(ioutil.Discard, r)
exitErr(err)
return
}
if *out == "" {
_, err := io.Copy(os.Stdout, r)
exitErr(err)
return
}
dstFilename := *out
if *safe {
_, err := os.Stat(dstFilename)
if !os.IsNotExist(err) {
exitErr(errors.New("destination files exists"))
}
}
dstFile, err := os.OpenFile(dstFilename, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, os.ModePerm)
exitErr(err)
defer dstFile.Close()
bw := bufio.NewWriterSize(dstFile, 4<<20)
defer bw.Flush()
_, err = io.Copy(bw, r)
exitErr(err)
return
}
var files []string
Expand Down Expand Up @@ -134,9 +152,15 @@ Options:`)
os.Exit(0)
}

if *out != "" && len(files) > 1 {
exitErr(errors.New("-out parameter can only be used with one input"))
}

for _, filename := range files {
dstFilename := cleanFileName(filename)
switch {
case *out != "":
dstFilename = *out
case strings.HasSuffix(filename, ".s2"):
dstFilename = strings.TrimSuffix(dstFilename, ".s2")
case strings.HasSuffix(filename, ".snappy"):
Expand Down Expand Up @@ -176,7 +200,7 @@ Options:`)
case *stdout:
out = os.Stdout
default:
dstFile, err := os.OpenFile(dstFilename, os.O_CREATE|os.O_WRONLY, mode)
dstFile, err := os.OpenFile(dstFilename, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, mode)
exitErr(err)
defer dstFile.Close()
bw := bufio.NewWriterSize(dstFile, 4<<20)
Expand Down
Loading