Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

s2: Add concurrent stream decompression #602

Merged
merged 5 commits into from
May 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions s2/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ This is important, so you don't have to worry about spending CPU cycles on alrea
* Adjustable compression (3 levels)
* Concurrent stream compression
* Faster decompression, even for Snappy compatible content
* Concurrent Snappy/S2 stream decompression
* Ability to quickly skip forward in compressed stream
* Random seeking with indexes
* Compatible with reading Snappy compressed content
Expand Down Expand Up @@ -415,6 +416,25 @@ Without assembly decompression is also very fast; single goroutine decompression

Even though S2 typically compresses better than Snappy, decompression speed is always better.

### Concurrent Stream Decompression

For full stream decompression S2 offers a [DecodeConcurrent](https://pkg.go.dev/github.com/klauspost/compress/s2#Reader.DecodeConcurrent)
that will decode a full stream using multiple goroutines.

Example scaling, AMD Ryzen 3950X, 16 cores, decompression using `s2d -bench=3 <input>`, best of 3:

| Input | `-cpu=1` | `-cpu=2` | `-cpu=4` | `-cpu=8` | `-cpu=16` |
|-------------------------------------------|------------|------------|------------|------------|-------------|
| enwik10.snappy | 1098.6MB/s | 1819.8MB/s | 3625.6MB/s | 6910.6MB/s | 10818.2MB/s |
| enwik10.s2 | 1303.5MB/s | 2606.1MB/s | 4847.9MB/s | 8878.4MB/s | 9592.1MB/s |
| sofia-air-quality-dataset.tar.snappy | 1302.0MB/s | 2165.0MB/s | 4244.5MB/s | 8241.0MB/s | 12920.5MB/s |
| sofia-air-quality-dataset.tar.s2 | 1399.2MB/s | 2463.2MB/s | 5196.5MB/s | 9639.8MB/s | 11439.5MB/s |
| sofia-air-quality-dataset.tar.s2 (no asm) | 837.5MB/s | 1652.6MB/s | 3183.6MB/s | 5945.0MB/s | 9620.7MB/s |

Scaling can be expected to be pretty linear until memory bandwidth is saturated.

For now the DecodeConcurrent can only be used for full streams without seeking or combining with regular reads.

## Block compression


Expand Down
16 changes: 14 additions & 2 deletions s2/cmd/s2d/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"io/ioutil"
"net/http"
"os"
"runtime"
"runtime/debug"
"strconv"
"strings"
Expand All @@ -34,6 +35,7 @@ var (
help = flag.Bool("help", false, "Display help")
out = flag.String("o", "", "Write output to another file. Single input file only")
block = flag.Bool("block", false, "Decompress as a single block. Will load content into memory.")
cpu = flag.Int("cpu", runtime.NumCPU(), "Decompress streams using this amount of threads")

version = "(dev)"
date = "(unknown)"
Expand Down Expand Up @@ -160,7 +162,11 @@ Options:`)
output = int64(len(dec))
} else {
r.Reset(bytes.NewBuffer(b))
output, err = io.Copy(ioutil.Discard, r)
if *cpu > 1 {
output, err = r.DecodeConcurrent(ioutil.Discard, *cpu)
} else {
output, err = io.Copy(ioutil.Discard, r)
}
exitErr(err)
}
if !*quiet {
Expand Down Expand Up @@ -286,7 +292,13 @@ Options:`)
}
decoded = r
}
output, err := io.Copy(out, decoded)
var err error
var output int64
if dec, ok := decoded.(*s2.Reader); ok && tailBytes == 0 && offset == 0 {
output, err = dec.DecodeConcurrent(out, *cpu)
} else {
output, err = io.Copy(out, decoded)
}
exitErr(err)
if !*quiet {
elapsed := time.Since(start)
Expand Down
262 changes: 259 additions & 3 deletions s2/decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"fmt"
"io"
"io/ioutil"
"runtime"
"sync"
)

var (
Expand Down Expand Up @@ -196,13 +198,13 @@ type Reader struct {
// ensureBufferSize will ensure that the buffer can take at least n bytes.
// If false is returned the buffer exceeds maximum allowed size.
func (r *Reader) ensureBufferSize(n int) bool {
if len(r.buf) >= n {
return true
}
if n > r.maxBufSize {
r.err = ErrCorrupt
return false
}
if cap(r.buf) >= n {
return true
}
// Realloc buffer.
r.buf = make([]byte, n)
return true
Expand All @@ -220,6 +222,7 @@ func (r *Reader) Reset(reader io.Reader) {
r.err = nil
r.i = 0
r.j = 0
r.blockStart = 0
r.readHeader = r.ignoreStreamID
}

Expand Down Expand Up @@ -435,6 +438,259 @@ func (r *Reader) Read(p []byte) (int, error) {
}
}

// DecodeConcurrent will decode the full stream to w.
// This function should not be combined with reading, seeking or other operations.
// Up to 'concurrent' goroutines will be used.
// If <= 0, runtime.NumCPU will be used.
// On success the number of bytes decompressed nil and is returned.
// This is mainly intended for bigger streams.
func (r *Reader) DecodeConcurrent(w io.Writer, concurrent int) (written int64, err error) {
if r.i > 0 || r.j > 0 || r.blockStart > 0 {
return 0, errors.New("DecodeConcurrent called after ")
}
if concurrent <= 0 {
concurrent = runtime.NumCPU()
}

// Write to output
var errMu sync.Mutex
var aErr error
setErr := func(e error) (ok bool) {
errMu.Lock()
defer errMu.Unlock()
if e == nil {
return aErr == nil
}
if aErr == nil {
aErr = e
}
return false
}
hasErr := func() (ok bool) {
errMu.Lock()
v := aErr != nil
errMu.Unlock()
return v
}

var aWritten int64
toRead := make(chan []byte, concurrent)
writtenBlocks := make(chan []byte, concurrent)
queue := make(chan chan []byte, concurrent)
reUse := make(chan chan []byte, concurrent)
for i := 0; i < concurrent; i++ {
toRead <- make([]byte, 0, r.maxBufSize)
writtenBlocks <- make([]byte, 0, r.maxBufSize)
reUse <- make(chan []byte, 1)
}
// Writer
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for toWrite := range queue {
entry := <-toWrite
reUse <- toWrite
if hasErr() {
writtenBlocks <- entry
continue
}
n, err := w.Write(entry)
want := len(entry)
writtenBlocks <- entry
if err != nil {
setErr(err)
continue
}
if n != want {
setErr(io.ErrShortWrite)
continue
}
aWritten += int64(n)
}
}()

// Reader
defer func() {
close(queue)
if r.err != nil {
err = r.err
setErr(r.err)
}
wg.Wait()
if err == nil {
err = aErr
}
written = aWritten
}()

for !hasErr() {
if !r.readFull(r.buf[:4], true) {
if r.err == io.EOF {
r.err = nil
}
return 0, r.err
}
chunkType := r.buf[0]
if !r.readHeader {
if chunkType != chunkTypeStreamIdentifier {
r.err = ErrCorrupt
return 0, r.err
}
r.readHeader = true
}
chunkLen := int(r.buf[1]) | int(r.buf[2])<<8 | int(r.buf[3])<<16

// The chunk types are specified at
// https://github.com/google/snappy/blob/master/framing_format.txt
switch chunkType {
case chunkTypeCompressedData:
r.blockStart += int64(r.j)
// Section 4.2. Compressed data (chunk type 0x00).
if chunkLen < checksumSize {
r.err = ErrCorrupt
return 0, r.err
}
if chunkLen > r.maxBufSize {
r.err = ErrCorrupt
return 0, r.err
}
orgBuf := <-toRead
buf := orgBuf[:chunkLen]

if !r.readFull(buf, false) {
return 0, r.err
}

checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
buf = buf[checksumSize:]

n, err := DecodedLen(buf)
if err != nil {
r.err = err
return 0, r.err
}
if r.snappyFrame && n > maxSnappyBlockSize {
r.err = ErrCorrupt
return 0, r.err
}

if n > r.maxBlock {
r.err = ErrCorrupt
return 0, r.err
}
wg.Add(1)

decoded := <-writtenBlocks
entry := <-reUse
queue <- entry
go func() {
defer wg.Done()
decoded = decoded[:n]
_, err := Decode(decoded, buf)
toRead <- orgBuf
if err != nil {
writtenBlocks <- decoded
setErr(err)
return
}
if crc(decoded) != checksum {
writtenBlocks <- decoded
setErr(ErrCRC)
return
}
entry <- decoded
}()
continue

case chunkTypeUncompressedData:

// Section 4.3. Uncompressed data (chunk type 0x01).
if chunkLen < checksumSize {
r.err = ErrCorrupt
return 0, r.err
}
if chunkLen > r.maxBufSize {
r.err = ErrCorrupt
return 0, r.err
}
// Grab write buffer
orgBuf := <-writtenBlocks
buf := orgBuf[:checksumSize]
if !r.readFull(buf, false) {
return 0, r.err
}
checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
// Read content.
n := chunkLen - checksumSize

if r.snappyFrame && n > maxSnappyBlockSize {
r.err = ErrCorrupt
return 0, r.err
}
if n > r.maxBlock {
r.err = ErrCorrupt
return 0, r.err
}
// Read uncompressed
buf = orgBuf[:n]
if !r.readFull(buf, false) {
return 0, r.err
}

if crc(buf) != checksum {
r.err = ErrCRC
return 0, r.err
}
entry := <-reUse
queue <- entry
entry <- buf
continue

case chunkTypeStreamIdentifier:
// Section 4.1. Stream identifier (chunk type 0xff).
if chunkLen != len(magicBody) {
r.err = ErrCorrupt
return 0, r.err
}
if !r.readFull(r.buf[:len(magicBody)], false) {
return 0, r.err
}
if string(r.buf[:len(magicBody)]) != magicBody {
if string(r.buf[:len(magicBody)]) != magicBodySnappy {
r.err = ErrCorrupt
return 0, r.err
} else {
r.snappyFrame = true
}
} else {
r.snappyFrame = false
}
continue
}

if chunkType <= 0x7f {
// Section 4.5. Reserved unskippable chunks (chunk types 0x02-0x7f).
// fmt.Printf("ERR chunktype: 0x%x\n", chunkType)
r.err = ErrUnsupported
return 0, r.err
}
// Section 4.4 Padding (chunk type 0xfe).
// Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd).
if chunkLen > maxChunkSize {
// fmt.Printf("ERR chunkLen: 0x%x\n", chunkLen)
r.err = ErrUnsupported
return 0, r.err
}

// fmt.Printf("skippable: ID: 0x%x, len: 0x%x\n", chunkType, chunkLen)
if !r.skippable(r.buf, chunkLen, false, chunkType) {
return 0, r.err
}
}
return 0, r.err
}

// Skip will skip n bytes forward in the decompressed output.
// For larger skips this consumes less CPU and is faster than reading output and discarding it.
// CRC is not checked on skipped blocks.
Expand Down
Loading