Merge pull request #3 from DataDog/abueno/merge-lz4
lz4 streaming compression support
zzzzssss authored Nov 15, 2019
2 parents d634a92 + 54af119 commit 2626d10
Showing 9 changed files with 4,303 additions and 1 deletion.
7 changes: 7 additions & 0 deletions README.md
@@ -10,3 +10,10 @@ Forked from `github.com/cloudflare/golz4` but with significant differences:
* input/output arg order has been swapped to follow Go convention, i.e. `Compress(in, out)` -> `Compress(out, in)` (see the sketch below)
* lz4 r131 is used, which fixes [several segfaults](https://github.com/cloudflare/golz4/pull/7)

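For example, a one-shot call under the new argument order (a minimal sketch; the import path is assumed, and the output buffer uses the usual lz4 worst-case bound):

```
package main

import (
	"fmt"

	lz4 "github.com/DataDog/golz4" // assumed import path
)

func main() {
	in := []byte("hello hello hello hello hello")
	// worst-case compressed size: len(in) + len(in)/255 + 16
	out := make([]byte, len(in)+len(in)/255+16)

	n, err := lz4.Compress(out, in) // out first, then in
	if err != nil {
		panic(err)
	}
	fmt.Printf("compressed %d -> %d bytes\n", len(in), n)
}
```
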
Benchmark
```
BenchmarkCompress-8 5000000 234 ns/op 183.73 MB/s 0 B/op 0 allocs/op
BenchmarkCompressUncompress-8 20000000 62.4 ns/op 688.60 MB/s 0 B/op 0 allocs/op
BenchmarkStreamCompress-8 50000 32842 ns/op 2003.41 MB/s 278537 B/op 4 allocs/op
BenchmarkStreamUncompress-8 500000 2867 ns/op 22855.34 MB/s 52 B/op 2 allocs/op
```
166 changes: 166 additions & 0 deletions lz4.go
@@ -6,10 +6,18 @@ package lz4
import "C"

import (
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"unsafe"
)

const (
	streamingBlockSize = 1024 * 64
	// worst-case compressed size of one block, mirroring lz4's LZ4_compressBound()
	boundedStreamingBlockSize = streamingBlockSize + streamingBlockSize/255 + 16
)

// p gets a char pointer to the first byte of a []byte slice
func p(in []byte) *C.char {
	if len(in) == 0 {
@@ -52,3 +60,161 @@ func Compress(out, in []byte) (outSize int, err error) {
	}
	return
}

// Writer is an io.WriteCloser that lz4-compresses its input.
type Writer struct {
	compressionBuffer      [2][streamingBlockSize]byte
	lz4Stream              *C.LZ4_stream_t
	underlyingWriter       io.Writer
	inpBufIndex            int
	totalCompressedWritten int
}

// NewWriter creates a new Writer. Writes to the writer will be written
// in compressed form to w. It is the caller's responsibility to call
// Close when done, so the underlying lz4 stream can be freed.
func NewWriter(w io.Writer) *Writer {
	return &Writer{
		lz4Stream:        C.LZ4_createStream(),
		underlyingWriter: w,
	}
}

// Write writes a compressed form of src to the underlying io.Writer.
// Each call emits one framed block: a 4-byte little-endian length
// followed by the compressed payload.
func (w *Writer) Write(src []byte) (int, error) {
	if len(src) > streamingBlockSize {
		return 0, fmt.Errorf("block is too large: %d > %d", len(src), streamingBlockSize)
	}

	// Take a pointer to the current input buffer: the previous block must
	// stay at a stable address because lz4 reuses it as the dictionary for
	// the next LZ4_compress_fast_continue call.
	inpPtr := &w.compressionBuffer[w.inpBufIndex]

	var compressedBuf [boundedStreamingBlockSize]byte
	copy(inpPtr[:], src)

	written := int(C.LZ4_compress_fast_continue(
		w.lz4Stream,
		(*C.char)(unsafe.Pointer(&inpPtr[0])),
		(*C.char)(unsafe.Pointer(&compressedBuf[0])),
		C.int(len(src)),
		C.int(len(compressedBuf)),
		1)) // acceleration = 1
	if written <= 0 {
		return 0, errors.New("error compressing")
	}

	// Write the 4-byte block-size header used by the decompressor.
	var header [4]byte
	binary.LittleEndian.PutUint32(header[:], uint32(written))
	_, err := w.underlyingWriter.Write(header[:])
	if err != nil {
		return 0, err
	}

	// Write the compressed block itself.
	_, err = w.underlyingWriter.Write(compressedBuf[:written])
	if err != nil {
		return 0, err
	}

	w.inpBufIndex = (w.inpBufIndex + 1) % 2
	w.totalCompressedWritten += written + 4
	return len(src), nil
}

// Close releases all the resources occupied by Writer.
// w cannot be used after the release.
func (w *Writer) Close() error {
	if w.lz4Stream != nil {
		C.LZ4_freeStream(w.lz4Stream)
		w.lz4Stream = nil
	}
	return nil
}
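
// A minimal usage sketch (illustrative only; `dst` and `chunk` are
// placeholders, not part of this file):
//
//	w := NewWriter(dst)
//	defer w.Close()
//	if _, err := w.Write(chunk); err != nil {
//		// handle the error
//	}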

// reader is an io.ReadCloser that decompresses when read from.
type reader struct {
	lz4Stream        *C.LZ4_streamDecode_t
	left             unsafe.Pointer
	right            unsafe.Pointer
	underlyingReader io.Reader
	isLeft           bool
}

// NewReader creates a new io.ReadCloser. Reads from the returned ReadCloser
// read and decompress data from r. It is the caller's responsibility to call
// Close on the ReadCloser when done. If this is not done, underlying objects
// in the lz4 library will not be freed.
func NewReader(r io.Reader) io.ReadCloser {
	return &reader{
		lz4Stream:        C.LZ4_createStreamDecode(),
		underlyingReader: r,
		isLeft:           true,
		// The double buffer must be allocated with C.malloc so each block
		// keeps a stable address: buffers allocated in Go memory may be
		// moved by the GC, which would break the streaming dictionary.
		left:  C.malloc(boundedStreamingBlockSize),
		right: C.malloc(boundedStreamingBlockSize),
	}
}

// Close releases all the resources occupied by r.
// r cannot be used after the release.
func (r *reader) Close() error {
	if r.lz4Stream != nil {
		C.LZ4_freeStreamDecode(r.lz4Stream)
		r.lz4Stream = nil
	}

	C.free(r.left)
	C.free(r.right)
	return nil
}

// Read reads one framed block from the underlying io.Reader, decompresses
// it, and copies the result into dst. dst must be large enough to hold a
// whole decompressed block (up to streamingBlockSize bytes).
func (r *reader) Read(dst []byte) (int, error) {
	blockSize, err := r.readSize(r.underlyingReader)
	if err != nil {
		return 0, err
	}

	// Read blockSize compressed bytes from r.underlyingReader.
	var compressedBuf [boundedStreamingBlockSize]byte
	_, err = io.ReadFull(r.underlyingReader, compressedBuf[:blockSize])
	if err != nil {
		return 0, err
	}

	// Alternate between the two C-allocated buffers; the block decoded on
	// the previous call serves as the dictionary for this one.
	var ptr unsafe.Pointer
	if r.isLeft {
		ptr = r.left
		r.isLeft = false
	} else {
		ptr = r.right
		r.isLeft = true
	}

	written := int(C.LZ4_decompress_safe_continue(
		r.lz4Stream,
		(*C.char)(unsafe.Pointer(&compressedBuf[0])),
		(*C.char)(ptr),
		C.int(blockSize),
		C.int(streamingBlockSize),
	))
	if written < 0 {
		return written, errors.New("error decompressing")
	}

	decompressed := C.GoBytes(ptr, C.int(written))
	copied := copy(dst[:written], decompressed)
	return copied, nil
}

// readSize reads the 4-byte little-endian size from the head of each
// stream-compressed block.
func (r *reader) readSize(rdr io.Reader) (int, error) {
	var temp [4]byte
	_, err := io.ReadFull(rdr, temp[:])
	if err != nil {
		return 0, err
	}

	return int(binary.LittleEndian.Uint32(temp[:])), nil
}
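
Putting the two halves together, a minimal round-trip sketch (the `lz4` import path is assumed, not taken from this diff):

```
package main

import (
	"bytes"
	"fmt"

	lz4 "github.com/DataDog/golz4" // assumed import path
)

func main() {
	var buf bytes.Buffer

	// Compress one block; Write frames it as [4-byte LE size][payload].
	w := lz4.NewWriter(&buf)
	if _, err := w.Write([]byte("hello streaming lz4")); err != nil {
		panic(err)
	}
	w.Close()

	// Decompress it back; each Read consumes exactly one framed block.
	r := lz4.NewReader(&buf)
	defer r.Close()

	out := make([]byte, 64*1024) // large enough for a whole block
	n, err := r.Read(out)
	if err != nil {
		panic(err)
	}
	fmt.Printf("decompressed: %q\n", out[:n])
}
```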