From 3be4906b1714becdfadc2eb77b901059786e6b78 Mon Sep 17 00:00:00 2001
From: Hippolyte Barraud
Date: Wed, 30 Oct 2019 19:30:12 -0400
Subject: [PATCH] fix lz4 writer/reader; can now be used with standard Go io facilities

---
 lz4.go      | 222 +++++++++++++++++-----------------------------------
 lz4_test.go | 159 ++++++++++---------------------------
 2 files changed, 115 insertions(+), 266 deletions(-)

diff --git a/lz4.go b/lz4.go
index ba1f90d4b..b4bd67d0c 100644
--- a/lz4.go
+++ b/lz4.go
@@ -39,7 +39,9 @@ const (
 	// buffer at least this size, so we might as well actually use this as
 	// the block size.
 	// lower than 63 does not work
-	streamingBlockSize = 1024 * 96
+	streamingBlockSize = 1024 * 64
+
+	// Worst-case compressed size of one block, per LZ4_COMPRESSBOUND.
+	boundedStreamingBlockSize = streamingBlockSize + streamingBlockSize/255 + 16
 )
 
 var errShortRead = errors.New("short read")
@@ -99,10 +101,11 @@ func Compress(out, in []byte) (outSize int, err error) {
 
 // Writer is an io.WriteCloser that lz4-compresses its input.
 type Writer struct {
-	lz4Stream        *C.LZ4_stream_t
-	dstBuffer        []byte
-	underlyingWriter io.Writer
-	count            int
+	compressionBuffer      [2][streamingBlockSize]byte
+	lz4Stream              *C.LZ4_stream_t
+	underlyingWriter       io.Writer
+	inpBufIndex            int
+	totalCompressedWritten int
 }
 
 // NewWriter creates a new Writer. Writes to
@@ -110,61 +113,49 @@
 func NewWriter(w io.Writer) *Writer {
 	return &Writer{
 		lz4Stream:        C.LZ4_createStream(),
-		dstBuffer:        make([]byte, CompressBoundInt(streamingBlockSize)),
 		underlyingWriter: w,
 	}
-
 }
 
 // Write writes a compressed form of src to the underlying io.Writer.
+// src must not exceed streamingBlockSize bytes; each call emits one frame:
+// a 4-byte little-endian length header followed by the compressed block.
 func (w *Writer) Write(src []byte) (int, error) {
-	w.count += 1
-	if len(src) == 0 {
-		return 0, nil
+	if len(src) > streamingBlockSize {
+		return 0, fmt.Errorf("block is too large: %d > %d", len(src), streamingBlockSize)
 	}
-	var (
-		totalCompressedLen, cum int
-	)
+
+	// Stage the input in the Writer's own buffer. LZ4 keeps a pointer to
+	// the previous block as its dictionary, so the data must stay valid
+	// across calls; slicing (rather than copying the array) makes sure we
+	// fill the persistent buffer itself.
+	inpBuf := w.compressionBuffer[w.inpBufIndex][:]
+	copy(inpBuf, src)
+
+	var compressedBuf [boundedStreamingBlockSize]byte
+	written := int(C.LZ4_compress_fast_continue(
+		w.lz4Stream,
+		(*C.char)(unsafe.Pointer(&inpBuf[0])),
+		(*C.char)(unsafe.Pointer(&compressedBuf[0])),
+		C.int(len(src)),
+		C.int(len(compressedBuf)),
+		1))
+	if written <= 0 {
+		return 0, errors.New("error compressing")
+	}
 
-	b := batch(len(src), streamingBlockSize)
-	lenBuf := make([]byte, 4)
-
-	for b.Next() {
-
-		inputBuf := src[b.Start:b.End]
-		inputBytes := len(inputBuf)
-		cum += inputBytes
-
-		compressedLen := C.LZ4_compress_fast_continue(
-			w.lz4Stream,
-			(*C.char)(unsafe.Pointer(&inputBuf[0])),
-			(*C.char)(unsafe.Pointer(&w.dstBuffer[0])),
-			C.int(inputBytes),
-			C.int(len(w.dstBuffer)),
-			1)
-
-		if compressedLen <= 0 {
-			break
-		}
-
-		// Write "header" to the buffer for decompression
-		binary.LittleEndian.PutUint32(lenBuf, uint32(compressedLen))
-		_, err := w.underlyingWriter.Write(lenBuf)
-		if err != nil {
-			return 0, err
-		}
-
-		// Write to underlying buffer
-		_, err = w.underlyingWriter.Write(w.dstBuffer[:compressedLen])
-		if err != nil {
-			return 0, err
-		}
-
-		totalCompressedLen += int(compressedLen)
+	// Write the length "header" the decompressor uses to frame the block.
+	var header [4]byte
+	binary.LittleEndian.PutUint32(header[:], uint32(written))
+	_, err := w.underlyingWriter.Write(header[:])
+	if err != nil {
+		return 0, err
+	}
+
+	// Write the compressed block to the underlying writer.
+	_, err = w.underlyingWriter.Write(compressedBuf[:written])
+	if err != nil {
+		return 0, err
 	}
 
-	return totalCompressedLen, nil
+	// Alternate staging buffers so the block just compressed survives as
+	// the dictionary for the next call.
+	w.inpBufIndex = (w.inpBufIndex + 1) % 2
+	w.totalCompressedWritten += written + 4
+	return len(src), nil
 }
 
 // Close releases all the resources occupied by Writer.
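With Write now emitting one self-contained frame per call, the Writer composes directly with the standard io machinery: io.Copy hands it chunks of at most 32 KiB, comfortably under the streamingBlockSize limit. A minimal usage sketch, assuming a hypothetical compressFile helper in the same package (not part of this patch):

func compressFile(srcPath, dstPath string) error {
	src, err := os.Open(srcPath)
	if err != nil {
		return err
	}
	defer src.Close()

	dst, err := os.Create(dstPath)
	if err != nil {
		return err
	}
	defer dst.Close()

	w := NewWriter(dst)
	// io.Copy feeds the writer <= 32 KiB at a time, so the
	// streamingBlockSize limit enforced by Write is never hit.
	if _, err := io.Copy(w, src); err != nil {
		return err
	}
	// Each Write already flushed a complete frame; Close only releases
	// the C stream state, but skipping it leaks memory.
	return w.Close()
}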
@@ -180,13 +171,9 @@ func (w *Writer) Close() error {
 
 // reader is an io.ReadCloser that decompresses when read from.
 type reader struct {
 	lz4Stream          *C.LZ4_streamDecode_t
-	readBuffer         []byte
-	sizeBuf            []byte
-	decompressedBuffer [2][]byte
-	decompOffset       int
-	decompSize         int
-	decBufIndex        int
+	decompressedBuffer [2][boundedStreamingBlockSize]byte
 	underlyingReader   io.Reader
+	decBufIndex        int
 }
 
 // NewReader creates a new io.ReadCloser.  Reads from the returned ReadCloser
@@ -194,15 +181,9 @@ type reader struct {
 // Close on the ReadCloser when done.  If this is not done, underlying objects
 // in the lz4 library will not be freed.
 func NewReader(r io.Reader) io.ReadCloser {
-	var decompressedBuffer2D [2][]byte
-	decompressedBuffer2D[0] = make([]byte, streamingBlockSize)
-	decompressedBuffer2D[1] = make([]byte, streamingBlockSize)
 	return &reader{
-		lz4Stream:          C.LZ4_createStreamDecode(),
-		readBuffer:         make([]byte, CompressBoundInt(streamingBlockSize)),
-		sizeBuf:            make([]byte, 4),
-		decompressedBuffer: decompressedBuffer2D,
-		underlyingReader:   r,
+		lz4Stream:        C.LZ4_createStreamDecode(),
+		underlyingReader: r,
 	}
 }
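The reader side should not be driven through plain io.Copy: each Read call consumes exactly one compressed block and yields up to streamingBlockSize bytes, while io.Copy only supplies a 32 KiB buffer, so the tail of every block would be dropped. io.CopyBuffer with a full-size buffer avoids that. A sketch under the same assumptions (decompressFile is hypothetical):

func decompressFile(srcPath, dstPath string) error {
	src, err := os.Open(srcPath)
	if err != nil {
		return err
	}
	defer src.Close()

	dst, err := os.Create(dstPath)
	if err != nil {
		return err
	}
	defer dst.Close()

	r := NewReader(src)
	defer r.Close()

	// One full decompressed block must fit in the copy buffer.
	buf := make([]byte, streamingBlockSize)
	_, err = io.CopyBuffer(dst, r, buf)
	return err
}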
@@ -218,102 +199,45 @@ func (r *reader) Close() error {
 
 // Read decompresses the next block of the stream into `dst`.
 func (r *reader) Read(dst []byte) (int, error) {
-	if len(dst) == 0 || dst == nil {
-		return 0, nil
+	blockSize, err := r.readSize(r.underlyingReader)
+	if err != nil {
+		return 0, err
 	}
-	writeOffset := 0
-
-	// XXXX: we don't need to call LZ4_setStreamDecode, when the previous data is still available in memory
-	// C.LZ4_setStreamDecode(r.lz4Stream, nil, 0)
-	// we have leftover decompressed data from previous call
-	if r.decompOffset > 0 {
-		copied := copy(dst[writeOffset:], r.decompressedBuffer[r.decBufIndex][r.decompOffset:r.decompSize])
-		// justWritten := r.decompressedBuffer[r.decBufIndex][r.decompOffset:r.decompSize]
-		// fmt.Println("wrote leftover", len(justWritten), copied)
-		// if bytes.Contains(justWritten, []byte("see me heave")) {
-		// 	fmt.Println("leftover: a very palpable hit")
-		// }
-		if len(dst) == copied {
-			r.decompOffset += copied
-			if r.decompOffset == len(r.decompressedBuffer[r.decBufIndex][0:r.decompSize]) {
-				r.decompOffset = 0
-				r.decBufIndex = (r.decBufIndex + 1) % 2
-			}
-			return len(dst), nil
-		}
-		r.decompOffset = 0
-		r.decBufIndex = (r.decBufIndex + 1) % 2
-		writeOffset += copied
+
+	// A corrupt or foreign stream can carry a bogus length; reject it
+	// before it can overflow the stack buffer below.
+	if blockSize > boundedStreamingBlockSize {
+		return 0, fmt.Errorf("invalid block size: %d", blockSize)
+	}
+
+	// Read blockSize bytes of compressed data from r.underlyingReader.
+	var compressedBuf [boundedStreamingBlockSize]byte
+	_, err = io.ReadFull(r.underlyingReader, compressedBuf[:blockSize])
+	if err != nil {
+		return 0, err
 	}
 
-	for {
-		// Populate src
-		blockSize, err := r.readSize(r.underlyingReader)
-		if err != nil {
-			if err == io.EOF {
-				// fmt.Println("here's our EOF")
-			}
-			return writeOffset, err
-		}
-
-		// if the blockSize is bigger than our configured one, then something
-		// is wrong with the file or it was compressed with a different mechanism
-		if blockSize > len(r.readBuffer) {
-			return writeOffset, fmt.Errorf("invalid block size (10/21/2019): %d", blockSize)
-		}
-
-		readBuffer := r.readBuffer[:blockSize]
-		// read blockSize from r.underlyingReader --> readBuffer
-		_, err = io.ReadFull(r.underlyingReader, readBuffer)
-		if err != nil {
-			return 0, err
-		}
-
-		written := int(C.LZ4_decompress_safe_continue(
-			r.lz4Stream,
-			(*C.char)(unsafe.Pointer(&readBuffer[0])),
-			(*C.char)(unsafe.Pointer(&r.decompressedBuffer[r.decBufIndex][0])),
-			C.int(len(readBuffer)),
-			C.int(len(r.decompressedBuffer[r.decBufIndex])),
-		))
-
-		if written <= 0 {
-			break
-		}
-
-		r.decompSize = written
-		copied := copy(dst[writeOffset:], r.decompressedBuffer[r.decBufIndex][:written])
-		// fmt.Println("wrote after read", written, copied)
-		// justWritten := r.decompressedBuffer[r.decBufIndex][:written]
-		// if bytes.Contains(justWritten, []byte("see me heave")) {
-		// 	fmt.Println("read: a avery palable hit")
-		// }
-
-		switch {
-		// have some leftover data from the decompressedBuffer
-		case copied+r.decompOffset < len(r.decompressedBuffer[r.decBufIndex][:written]):
-			r.decompOffset += copied
-			return len(dst), nil
-		// have copied all from the decompressedBuffer
-		case copied+r.decompOffset == len(r.decompressedBuffer[r.decBufIndex][:written]):
-			r.decompOffset = 0
-			r.decBufIndex = (r.decBufIndex + 1) % 2
-		}
-		writeOffset += copied
-		if writeOffset == len(dst) {
-			return writeOffset, nil
-		}
+	// Decompress into the current half of the double buffer; the other
+	// half still holds the previous block, which LZ4 needs as dictionary.
+	// Slicing (not copying the array) keeps that dictionary in place.
+	decBuf := r.decompressedBuffer[r.decBufIndex][:]
+	written := int(C.LZ4_decompress_safe_continue(
+		r.lz4Stream,
+		(*C.char)(unsafe.Pointer(&compressedBuf[0])),
+		(*C.char)(unsafe.Pointer(&decBuf[0])),
+		C.int(blockSize),
+		C.int(streamingBlockSize),
+	))
+	if written <= 0 {
+		return written, errors.New("error decompressing")
 	}
 
-	return writeOffset, nil
+	// Each call yields exactly one block, so dst should be at least
+	// streamingBlockSize bytes long; anything beyond len(dst) is dropped.
+	copied := copy(dst, decBuf[:written])
+
+	r.decBufIndex = (r.decBufIndex + 1) % 2
+	return copied, nil
 }
 
 // read the 4-byte little endian size from the head of each stream compressed block
 func (r *reader) readSize(rdr io.Reader) (int, error) {
-	_, err := rdr.Read(r.sizeBuf)
+	var temp [4]byte
+	// io.ReadFull is required here: a bare Read may legally return fewer
+	// than 4 bytes without reporting an error.
+	_, err := io.ReadFull(rdr, temp[:])
 	if err != nil {
 		return 0, err
 	}
-
-	return int(binary.LittleEndian.Uint32(r.sizeBuf)), nil
+	return int(binary.LittleEndian.Uint32(temp[:])), nil
 }
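For reference, the wire format the two halves agree on is a bare sequence of frames, each a 4-byte little-endian length followed by that many bytes of LZ4 data. A pure-Go sketch that walks such a stream without decompressing it; countBlocks is a hypothetical helper for illustration (imports: encoding/binary, io, io/ioutil):

func countBlocks(r io.Reader) (int, error) {
	var header [4]byte
	blocks := 0
	for {
		_, err := io.ReadFull(r, header[:])
		if err == io.EOF {
			return blocks, nil // clean end of stream
		}
		if err != nil {
			return blocks, err
		}
		// Frame length, exactly as Writer wrote it.
		compressedLen := int64(binary.LittleEndian.Uint32(header[:]))
		// Skip the payload without decompressing it.
		if _, err := io.CopyN(ioutil.Discard, r, compressedLen); err != nil {
			return blocks, err
		}
		blocks++
	}
}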
diff --git a/lz4_test.go b/lz4_test.go
index 6564a1d42..7eb40b79f 100644
--- a/lz4_test.go
+++ b/lz4_test.go
@@ -7,9 +7,9 @@ package lz4
 
 import (
 	"bytes"
-	"fmt"
 	"io"
 	"io/ioutil"
+	"math/rand"
 	"os"
 	"runtime/debug"
 	"testing"
@@ -244,56 +244,18 @@ func testIOCopy(t *testing.T, src io.Reader, filename string) {
 
 	writer := NewWriter(file)
 
-	// try using io.Copy
-	// copied, err := io.Copy(writer, src)
-	// fmt.Println("// copied:", copied)
-	// failOnError(t, "Failed copied", err)
-	/////////
-
-	// try read file by chunks
-	BufferSize := 1024 * 96
-	buffer := make([]byte, BufferSize)
-	nCum := 0
-	fileCum := 0
-	n := 0
-	var err2 error
-	for {
-		bytesread, err := src.Read(buffer)
-		if bytesread != BufferSize {
-			fmt.Println("let's look at this:", bytesread)
-		}
-		fileCum += bytesread
-		n, err2 = writer.Write(buffer[:bytesread])
-		nCum += n
-		failOnError(t, "Failed writing to compress object", err2)
-
-		if err != nil {
-			if err != io.EOF {
-				fmt.Println(err)
-			}
-			fmt.Println("///EOF: ", bytesread)
-
-			break
-		}
-
-	}
+	_, err = io.Copy(writer, src)
+	failOnError(t, "Failed writing to file", err)
 
 	failOnError(t, "Failed to close compress object", writer.Close())
 	stat, err := os.Stat(fname)
 	filenameSize, err := os.Stat(filename)
 	failOnError(t, "Cannot open file", err)
 
-	fmt.Println("fileCum", fileCum, "nCum: ", nCum)
 	// t.Logf("Compressed %v -> %v bytes", len(src), stat.Size())
 	t.Logf("Compressed %v -> %v bytes", filenameSize.Size(), stat.Size())
 
-	// check the compressed file is the same with the one uploaded to S3
-
-	// if !checkfilecontentIsSame(t, fname, filename+".lz4") {
-	// 	t.Fatalf("compressed file and the S3 one is not the same: %s != %s", fname, filename+".lz4")
-
-	// }
-
 	file.Close()
 
 	// read from the file
@@ -394,49 +356,6 @@ func TestContinueCompress(t *testing.T) {
 }
 
-func TestStreamSimpleCompressionDecompression(t *testing.T) {
-	inputs, _ := ioutil.ReadFile("shakespeare.txt")
-	var bigInput []byte
-	for i := 0; i < 20; i++ {
-		bigInput = append(bigInput, inputs...)
-	}
-	testCompressionDecompression(t, bigInput)
-}
-
-func testCompressionDecompression(t *testing.T, payload []byte) {
-	var w bytes.Buffer
-	writer := NewWriter(&w)
-	n, err := writer.Write(payload)
-	failOnError(t, "Failed writing to compress object", err)
-	failOnError(t, "Failed to close compress object", writer.Close())
-	out := w.Bytes()
-	t.Logf("Compressed %v -> %v bytes", len(payload), len(out))
-	failOnError(t, "Failed compressing", err)
-
-	// Decompress
-	r := NewReader(&w)
-	dst := make([]byte, len(payload))
-	n, err = r.Read(dst)
-	if err != nil {
-		t.Fatal("error reading", err)
-	}
-
-	dst = dst[:n]
-	if string(payload) != string(dst) { // Only print if we can print
-		if len(payload) < 100 && len(dst) < 100 {
-			t.Fatalf("Cannot compress and decompress: %s != %s", payload, dst)
-		} else {
-			t.Fatalf("Cannot compress and decompress (lengths: %v bytes & %v bytes)", len(payload), len(dst))
-		}
-	}
-	// Check EOF
-	n, err = r.Read(dst)
-	if err != io.EOF && len(dst) > 0 { // If we want 0 bytes, that should work
-		t.Fatalf("Error should have been EOF, was %s instead: (%v bytes read: %s)", err, n, dst[:n])
-	}
-	failOnError(t, "Failed to close decompress object", r.Close())
-}
-
 func TestStreamingFuzz(t *testing.T) {
 	f := func(input []byte) bool {
 		var w bytes.Buffer
@@ -491,68 +410,74 @@ func BenchmarkCompress(b *testing.B) {
 }
 
 func BenchmarkStreamCompress(b *testing.B) {
+	var buffer bytes.Buffer
+	localBuffer := make([]byte, streamingBlockSize)
+	rand.Read(localBuffer)
+
 	b.ReportAllocs()
-	var intermediate bytes.Buffer
-	w := NewWriter(&intermediate)
-	defer w.Close()
-	b.SetBytes(int64(len(plaintext0)))
 	b.ResetTimer()
+
 	for i := 0; i < b.N; i++ {
-		_, err := w.Write(plaintext0)
+		w := NewWriter(&buffer)
+		_, err := w.Write(localBuffer)
 		if err != nil {
 			b.Fatalf("Failed writing to compress object: %s", err)
 		}
+		b.SetBytes(int64(w.totalCompressedWritten))
-		intermediate.Reset()
+		// Prevent unbounded buffer growth.
+		buffer.Reset()
+		w.Close()
 	}
-
 }
 
-func BenchmarkCompressUncompress(b *testing.B) {
+func BenchmarkStreamUncompress(b *testing.B) {
 	b.ReportAllocs()
-	compressed := make([]byte, CompressBound(plaintext0))
-	n, err := Compress(compressed, plaintext0)
+
+	var buffer bytes.Buffer
+	localBuffer := make([]byte, streamingBlockSize)
+	rand.Read(localBuffer)
+
+	w := NewWriter(&buffer)
+	_, err := w.Write(localBuffer)
 	if err != nil {
-		b.Errorf("Compress error: %v", err)
+		b.Fatalf("Failed writing to compress object: %s", err)
 	}
-	compressed = compressed[:n]
+	w.Close()
+	// Snapshot the compressed bytes once: reading drains a bytes.Buffer,
+	// so every iteration needs a fresh reader over the same data.
+	compressed := buffer.Bytes()
 
-	dst := make([]byte, len(plaintext0))
-	b.SetBytes(int64(len(plaintext0)))
+	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
-		_, err := Uncompress(dst, compressed)
-		if err != nil {
-			b.Errorf("Uncompress error: %v", err)
+		r := NewReader(bytes.NewReader(compressed))
+		for {
+			read, err := r.Read(localBuffer)
+			if err == io.EOF {
+				break
+			}
+			if err != nil {
+				b.Fatalf("Failed to decompress: %s", err)
+			}
+			b.SetBytes(int64(read))
 		}
+		r.Close()
 	}
 }
 
-func BenchmarkStreamUncompress(b *testing.B) {
+func BenchmarkCompressUncompress(b *testing.B) {
 	b.ReportAllocs()
-	var err error
-	var n int
 	compressed := make([]byte, CompressBound(plaintext0))
-	n, err = Compress(compressed, plaintext0)
+	n, err := Compress(compressed, plaintext0)
 	if err != nil {
 		b.Errorf("Compress error: %v", err)
 	}
 	compressed = compressed[:n]
 
 	dst := make([]byte, len(plaintext0))
-	b.SetBytes(int64(len(plaintext0)))
-
-	b.ResetTimer()
-	b.SetBytes(int64(len(plaintext0)))
+	b.SetBytes(int64(len(plaintext0)))
 	for i := 0; i < b.N; i++ {
-		rr := bytes.NewReader(compressed)
-		r := NewReader(rr)
-		r.Read(dst)
+		_, err := Uncompress(dst, compressed)
 		if err != nil {
-			b.Fatalf("Failed to decompress: %s", err)
+			b.Errorf("Uncompress error: %v", err)
 		}
-
 	}
-
 }
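The patch drops testCompressionDecompression without a direct replacement. A round-trip check written against the new one-block-per-call contract could look like the sketch below (roundTrip is hypothetical, not part of the patch):

func roundTrip(payload []byte) error {
	var buf bytes.Buffer
	w := NewWriter(&buf)
	// Feed the payload one block at a time: Write rejects inputs larger
	// than streamingBlockSize.
	for off := 0; off < len(payload); off += streamingBlockSize {
		end := off + streamingBlockSize
		if end > len(payload) {
			end = len(payload)
		}
		if _, err := w.Write(payload[off:end]); err != nil {
			return err
		}
	}
	if err := w.Close(); err != nil {
		return err
	}

	r := NewReader(&buf)
	defer r.Close()
	chunk := make([]byte, streamingBlockSize)
	var got []byte
	for {
		n, err := r.Read(chunk)
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}
		got = append(got, chunk[:n]...)
	}
	if !bytes.Equal(got, payload) {
		return errors.New("round trip mismatch")
	}
	return nil
}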