
Merge pull request #6 from DataDog/sy/file-chunk-test
[lz4Compressiong] add tests
zzzzssss authored Oct 30, 2019
2 parents 7d1e666 + 3be4906 commit db4177e
Showing 2 changed files with 124 additions and 237 deletions.
223 changes: 74 additions & 149 deletions lz4.go
@@ -38,7 +38,10 @@ const (
// that they were decompressed to. This limits us to using a decompression
// buffer at least this size, so we might as well actually use this as
// the block size.
-	streamingBlockSize = 1024 * 96
+	// lower than 63 does not work
+	streamingBlockSize = 1024 * 64
+
+	boudedStreamingBlockSize = streamingBlockSize + streamingBlockSize/255 + 16
)

var errShortRead = errors.New("short read")
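The new boudedStreamingBlockSize is the classic LZ4 worst-case bound: compressing n bytes can emit at most n + n/255 + 16 bytes (the LZ4_COMPRESSBOUND macro), so one padded buffer is guaranteed to hold the compressed form of a single 64 KiB block. A minimal sketch of that relationship; the helper name is illustrative, not part of this commit:

// compressBound mirrors LZ4_COMPRESSBOUND(isize): the largest possible
// compressed size for isize bytes of incompressible input.
func compressBound(isize int) int {
	return isize + isize/255 + 16
}

// With streamingBlockSize = 64 * 1024, compressBound(streamingBlockSize) is
// 65536 + 257 + 16 = 65809 bytes, which is what boudedStreamingBlockSize evaluates to.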
@@ -98,72 +101,61 @@ func Compress(out, in []byte) (outSize int, err error) {

// Writer is an io.WriteCloser that lz4 compress its input.
type Writer struct {
-	lz4Stream        *C.LZ4_stream_t
-	dstBuffer        []byte
-	underlyingWriter io.Writer
-	count            int
+	compressionBuffer      [2][streamingBlockSize]byte
+	lz4Stream              *C.LZ4_stream_t
+	underlyingWriter       io.Writer
+	inpBufIndex            int
+	totalCompressedWritten int
}

// NewWriter creates a new Writer. Writes to
// the writer will be written in compressed form to w.
func NewWriter(w io.Writer) *Writer {
return &Writer{
lz4Stream: C.LZ4_createStream(),
-		dstBuffer:        make([]byte, CompressBoundInt(streamingBlockSize)),
underlyingWriter: w,
}

}

// Write writes a compressed form of src to the underlying io.Writer.
func (w *Writer) Write(src []byte) (int, error) {
-	w.count += 1
	if len(src) == 0 {
		return 0, nil
	}
+	if len(src) > streamingBlockSize+4 {
+		return 0, fmt.Errorf("block is too large: %d > %d", len(src), streamingBlockSize+4)
+	}

-	var (
-		totalCompressedLen, cum int
-	)
+	inpPtr := w.compressionBuffer[w.inpBufIndex]

+	var compressedBuf [boudedStreamingBlockSize]byte
+	copy(inpPtr[:], src)

+	written := int(C.LZ4_compress_fast_continue(
+		w.lz4Stream,
+		(*C.char)(unsafe.Pointer(&inpPtr[0])),
+		(*C.char)(unsafe.Pointer(&compressedBuf[0])),
+		C.int(len(src)),
+		C.int(len(compressedBuf)),
+		1))
+	if written <= 0 {
+		return 0, errors.New("error compressing")
+	}

-	b := batch(len(src), streamingBlockSize)
-	lenBuf := make([]byte, 4)
-
-	for b.Next() {
-
-		inputBuf := src[b.Start:b.End]
-		inputBytes := len(inputBuf)
-		cum += inputBytes
-
-		compressedLen := C.LZ4_compress_fast_continue(
-			w.lz4Stream,
-			(*C.char)(unsafe.Pointer(&inputBuf[0])),
-			(*C.char)(unsafe.Pointer(&w.dstBuffer[0])),
-			C.int(inputBytes),
-			C.int(len(w.dstBuffer)),
-			1)
-
-		if compressedLen <= 0 {
-			break
-		}
-
-		// Write "header" to the buffer for decompression
-		binary.LittleEndian.PutUint32(lenBuf, uint32(compressedLen))
-		_, err := w.underlyingWriter.Write(lenBuf)
-		if err != nil {
-			return 0, err
-		}
-
-		// Write to underlying buffer
-		_, err = w.underlyingWriter.Write(w.dstBuffer[:compressedLen])
-		if err != nil {
-			return 0, err
-		}
-
-		totalCompressedLen += int(compressedLen)
-	}
-
-	return totalCompressedLen, nil
+	//Write "header" to the buffer for decompression
+	var header [4]byte
+	binary.LittleEndian.PutUint32(header[:], uint32(written))
+	_, err := w.underlyingWriter.Write(header[:])
+	if err != nil {
+		return 0, err
+	}
+
+	// Write to underlying buffer
+	_, err = w.underlyingWriter.Write(compressedBuf[:written])
+	if err != nil {
+		return 0, err
+	}
+
+	w.inpBufIndex = (w.inpBufIndex + 1) % 2
+	w.totalCompressedWritten += written + 4
+	return len(src), nil
}
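With this rewrite, each Write call frames exactly one block on the underlying writer: a 4-byte little-endian length header followed by the compressed payload, and the input must not exceed a single streamingBlockSize block. A hedged usage sketch, assumed to live in the same package as this file; the chunking helper is illustrative, not part of the commit:

// compressTo feeds data to the Writer one block at a time, since Write
// rejects inputs larger than a single streaming block.
func compressTo(dst io.Writer, data []byte) error {
	w := NewWriter(dst)
	defer w.Close() // frees the C-side LZ4 stream even on early return

	for len(data) > 0 {
		n := len(data)
		if n > streamingBlockSize {
			n = streamingBlockSize
		}
		if _, err := w.Write(data[:n]); err != nil {
			return err
		}
		data = data[n:]
	}
	return nil
}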

// Close releases all the resources occupied by Writer.
@@ -179,29 +171,19 @@ func (w *Writer) Close() error {
// reader is an io.ReadCloser that decompresses when read from.
type reader struct {
lz4Stream *C.LZ4_streamDecode_t
-	readBuffer         []byte
-	sizeBuf            []byte
-	decompressedBuffer [2][]byte
-	decompOffset       int
-	decompSize         int
-	decBufIndex        int
+	decompressedBuffer [2][boudedStreamingBlockSize]byte
	underlyingReader io.Reader
+	decBufIndex      int
}

// NewReader creates a new io.ReadCloser. Reads from the returned ReadCloser
// read and decompress data from r. It is the caller's responsibility to call
// Close on the ReadCloser when done. If this is not done, underlying objects
// in the lz4 library will not be freed.
func NewReader(r io.Reader) io.ReadCloser {
-	var decompressedBuffer2D [2][]byte
-	decompressedBuffer2D[0] = make([]byte, streamingBlockSize)
-	decompressedBuffer2D[1] = make([]byte, streamingBlockSize)
	return &reader{
-		lz4Stream:          C.LZ4_createStreamDecode(),
-		readBuffer:         make([]byte, CompressBoundInt(streamingBlockSize)),
-		sizeBuf:            make([]byte, 4),
-		decompressedBuffer: decompressedBuffer2D,
-		underlyingReader:   r,
+		lz4Stream:        C.LZ4_createStreamDecode(),
+		underlyingReader: r,
}
}

Expand All @@ -217,102 +199,45 @@ func (r *reader) Close() error {

// Read decompresses `compressionBuffer` into `dst`.
func (r *reader) Read(dst []byte) (int, error) {
-	if len(dst) == 0 || dst == nil {
-		return 0, nil
-	}
-	writeOffset := 0
+	blockSize, err := r.readSize(r.underlyingReader)
+	if err != nil {
+		return 0, err
+	}

-	// XXXX: we don't need to call LZ4_setStreamDecode, when the previous data is still available in memory
-	// C.LZ4_setStreamDecode(r.lz4Stream, nil, 0)
-	// we have leftover decompressed data from previous call
-	if r.decompOffset > 0 {
-		copied := copy(dst[writeOffset:], r.decompressedBuffer[r.decBufIndex][r.decompOffset:r.decompSize])
-		// justWritten := r.decompressedBuffer[r.decBufIndex][r.decompOffset:r.decompSize]
-		// fmt.Println("wrote leftover", len(justWritten), copied)
-		// if bytes.Contains(justWritten, []byte("see me heave")) {
-		// 	fmt.Println("leftover: a very palpable hit")
-		// }
-		if len(dst) == copied {
-			r.decompOffset += copied
-			if r.decompOffset == len(r.decompressedBuffer[r.decBufIndex][0:r.decompSize]) {
-				r.decompOffset = 0
-				r.decBufIndex = (r.decBufIndex + 1) % 2
-			}
-			return len(dst), nil
-		}
-		r.decompOffset = 0
-		r.decBufIndex = (r.decBufIndex + 1) % 2
-		writeOffset += copied
-	}

+	// read blockSize from r.underlyingReader --> readBuffer
+	var uncompressedBuf [boudedStreamingBlockSize]byte
+	_, err = io.ReadFull(r.underlyingReader, uncompressedBuf[:blockSize])
+	if err != nil {
+		return 0, err
+	}

-	for {
-		// Populate src
-		blockSize, err := r.readSize(r.underlyingReader)
-		if err != nil {
-			if err == io.EOF {
-				// fmt.Println("here's our EOF")
-			}
-			return writeOffset, err
-		}
-
-		// if the blockSize is bigger than our configured one, then something
-		// is wrong with the file or it was compressed with a different mechanism
-		if blockSize > len(r.readBuffer) {
-			return writeOffset, fmt.Errorf("invalid block size (10/21/2019): %d", blockSize)
-		}
-
-		readBuffer := r.readBuffer[:blockSize]
-		// read blockSize from r.underlyingReader --> readBuffer
-		_, err = io.ReadFull(r.underlyingReader, readBuffer)
-		if err != nil {
-			return 0, err
-		}
-
-		written := int(C.LZ4_decompress_safe_continue(
-			r.lz4Stream,
-			(*C.char)(unsafe.Pointer(&readBuffer[0])),
-			(*C.char)(unsafe.Pointer(&r.decompressedBuffer[r.decBufIndex][0])),
-			C.int(len(readBuffer)),
-			C.int(len(r.decompressedBuffer[r.decBufIndex])),
-		))
-
-		if written <= 0 {
-			break
-		}
-
-		r.decompSize = written
-		copied := copy(dst[writeOffset:], r.decompressedBuffer[r.decBufIndex][:written])
-		// fmt.Println("wrote after read", written, copied)
-		// justWritten := r.decompressedBuffer[r.decBufIndex][:written]
-		// if bytes.Contains(justWritten, []byte("see me heave")) {
-		// 	fmt.Println("read: a avery palable hit")
-		// }
-
-		switch {
-		// have some leftover data from the decompressedBuffer
-		case copied+r.decompOffset < len(r.decompressedBuffer[r.decBufIndex][:written]):
-			r.decompOffset += copied
-			return len(dst), nil
-		// have copied all from the decompressedBuffer
-		case copied+r.decompOffset == len(r.decompressedBuffer[r.decBufIndex][:written]):
-			r.decompOffset = 0
-			r.decBufIndex = (r.decBufIndex + 1) % 2
-		}
-		writeOffset += copied
-		if writeOffset == len(dst) {
-			return writeOffset, nil
-		}
-	}
+	ptr := r.decompressedBuffer[r.decBufIndex]

+	written := int(C.LZ4_decompress_safe_continue(
+		r.lz4Stream,
+		(*C.char)(unsafe.Pointer(&uncompressedBuf[0])),
+		(*C.char)(unsafe.Pointer(&ptr[0])),
+		C.int(blockSize),
+		C.int(streamingBlockSize)),
+	)
+	if written <= 0 {
+		return written, errors.New("error decompressing")
+	}
-	return writeOffset, nil
+	copied := copy(dst[:written], ptr[:written])

+	r.decBufIndex = (r.decBufIndex + 1) % 2
+	return copied, nil
}
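The rewritten Read returns one decompressed block per call and copies it into dst[:written], so the caller has to supply a buffer that can hold a full block (up to streamingBlockSize bytes) and loop until io.EOF. A round-trip sketch under those assumptions, again assumed to sit in the same package; the helper is illustrative only:

// roundTrip compresses a payload that fits in one block, then drains the
// stream one block per Read call until the size header reports io.EOF.
func roundTrip(payload []byte) ([]byte, error) {
	var compressed bytes.Buffer
	w := NewWriter(&compressed)
	if _, err := w.Write(payload); err != nil {
		return nil, err
	}
	w.Close()

	r := NewReader(&compressed)
	defer r.Close()

	out := make([]byte, 0, len(payload))
	block := make([]byte, streamingBlockSize) // large enough for any single block
	for {
		n, err := r.Read(block)
		if n > 0 {
			out = append(out, block[:n]...)
		}
		if err == io.EOF {
			return out, nil
		}
		if err != nil {
			return nil, err
		}
	}
}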

// read the 4-byte little endian size from the head of each stream compressed block
func (r *reader) readSize(rdr io.Reader) (int, error) {
-	_, err := rdr.Read(r.sizeBuf)
+	var temp [4]byte
+	read, err := rdr.Read(temp[:])
if err != nil {
return 0, err
}

-	return int(binary.LittleEndian.Uint32(r.sizeBuf)), nil
+	if read != 4 {
+		panic("didn't read 4 bytes")
+	}
+	return int(binary.LittleEndian.Uint32(temp[:])), nil
}
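One caveat in the added readSize: a single rdr.Read call is allowed to return fewer than 4 bytes even when the stream is healthy, and this code panics in that case. A more defensive variant is sketched below as an assumption about how one might harden it, not what the commit ships; io.ReadFull turns a truncated header into io.ErrUnexpectedEOF and still returns plain io.EOF at a clean block boundary.

// readSizeFull reads the 4-byte little endian block length, tolerating short reads.
func readSizeFull(rdr io.Reader) (int, error) {
	var temp [4]byte
	if _, err := io.ReadFull(rdr, temp[:]); err != nil {
		return 0, err
	}
	return int(binary.LittleEndian.Uint32(temp[:])), nil
}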
