Skip to content

Commit

Permalink
Merge pull request #9 from DataDog/fix-reader-out-of-bounds
Browse files Browse the repository at this point in the history
Fix reader out of bounds
  • Loading branch information
j-vizcaino authored Jan 21, 2020
2 parents 4bea358 + e6fec73 commit bdbee0b
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 14 deletions.
61 changes: 50 additions & 11 deletions lz4.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
)

const (
streamingBlockSize = 1024 * 64
boudedStreamingBlockSize = streamingBlockSize + streamingBlockSize/255 + 16
streamingBlockSize = 1024 * 64
boundedStreamingBlockSize = streamingBlockSize + streamingBlockSize/255 + 16
)

// p gets a char pointer to the first byte of a []byte slice
Expand All @@ -31,6 +31,13 @@ func clen(s []byte) C.int {
return C.int(len(s))
}

// min returns the smaller of the two given integers.
func min(a, b int) int {
	if b < a {
		return b
	}
	return a
}

// Uncompress with a known output size. len(out) should be equal to
// the length of the uncompressed out.
func Uncompress(out, in []byte) (outSize int, err error) {
Expand Down Expand Up @@ -87,7 +94,7 @@ func (w *Writer) Write(src []byte) (int, error) {

inpPtr := w.compressionBuffer[w.inpBufIndex]

var compressedBuf [boudedStreamingBlockSize]byte
var compressedBuf [boundedStreamingBlockSize]byte
copy(inpPtr[:], src)

written := int(C.LZ4_compress_fast_continue(
Expand Down Expand Up @@ -133,6 +140,7 @@ func (w *Writer) Close() error {
// reader is an io.ReadCloser that decompresses when read from.
type reader struct {
lz4Stream *C.LZ4_streamDecode_t
pending []byte
left unsafe.Pointer
right unsafe.Pointer
underlyingReader io.Reader
Expand All @@ -148,10 +156,16 @@ func NewReader(r io.Reader) io.ReadCloser {
lz4Stream: C.LZ4_createStreamDecode(),
underlyingReader: r,
isLeft: true,
// As per lz4 docs:
//
// *_continue() :
// These decoding functions allow decompression of multiple blocks in "streaming" mode.
// Previously decoded blocks must still be available at the memory position where they were decoded.
//
// double buffer needs to use C.malloc to make sure the same memory address
// allocate buffers in go memory will fail randomly since GC may move the memory
left: C.malloc(boudedStreamingBlockSize),
right: C.malloc(boudedStreamingBlockSize),
left: C.malloc(boundedStreamingBlockSize),
right: C.malloc(boundedStreamingBlockSize),
}
}

Expand All @@ -170,13 +184,18 @@ func (r *reader) Close() error {

// Read decompresses `compressionBuffer` into `dst`.
func (r *reader) Read(dst []byte) (int, error) {
// Write data read from a previous call
if r.pending != nil {
return r.readFromPending(dst)
}

blockSize, err := r.readSize(r.underlyingReader)
if err != nil {
return 0, err
}

// read blockSize from r.underlyingReader --> readBuffer
var uncompressedBuf [boudedStreamingBlockSize]byte
var uncompressedBuf [boundedStreamingBlockSize]byte
_, err = io.ReadFull(r.underlyingReader, uncompressedBuf[:blockSize])
if err != nil {
return 0, err
Expand All @@ -191,20 +210,28 @@ func (r *reader) Read(dst []byte) (int, error) {
r.isLeft = true
}

written := int(C.LZ4_decompress_safe_continue(
decompressed := int(C.LZ4_decompress_safe_continue(
r.lz4Stream,
(*C.char)(unsafe.Pointer(&uncompressedBuf[0])),
(*C.char)(ptr),
C.int(blockSize),
C.int(streamingBlockSize),
))

if written < 0 {
return written, errors.New("error decompressing")
if decompressed < 0 {
return decompressed, errors.New("error decompressing")
}
// fmt.Println(hex.EncodeToString(ptr[:]))
mySlice := C.GoBytes(ptr, C.int(written))
copied := copy(dst[:written], mySlice)
mySlice := C.GoBytes(ptr, C.int(decompressed))
copySize := min(decompressed, len(dst))

copied := copy(dst, mySlice[:copySize])

if decompressed > len(dst) {
// Save data for future reads
r.pending = mySlice[copied:]
}

return copied, nil
}

Expand All @@ -218,3 +245,15 @@ func (r *reader) readSize(rdr io.Reader) (int, error) {

return int(binary.LittleEndian.Uint32(temp[:])), nil
}

// readFromPending drains previously decompressed data into dst.
// It copies as many bytes as fit, advances (or clears) r.pending,
// and reports the number of bytes written.
func (r *reader) readFromPending(dst []byte) (int, error) {
	// copy already stops at the shorter of the two slices.
	n := copy(dst, r.pending)
	if n < len(r.pending) {
		// Partial drain: keep the rest for the next Read call.
		r.pending = r.pending[n:]
	} else {
		// Fully drained: drop the buffer so Read resumes decompressing.
		r.pending = nil
	}
	return n, nil
}
11 changes: 8 additions & 3 deletions lz4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package lz4
import (
"bytes"
"crypto/rand"
"fmt"
"io"
"io/ioutil"
"os"
Expand Down Expand Up @@ -210,19 +211,23 @@ func TestFuzz(t *testing.T) {
}

func TestSimpleCompressDecompress(t *testing.T) {
data := []byte("this\nis\njust\na\ntestttttttttt.")
data := bytes.NewBuffer(nil)
// NOTE: make the buffer bigger than 65k to cover all use cases
for i := 0; i < 2000; i++ {
data.WriteString(fmt.Sprintf("%04d-abcdefghijklmnopqrstuvwxyz ", i))
}
w := bytes.NewBuffer(nil)
wc := NewWriter(w)
defer wc.Close()
_, err := wc.Write(data)
_, err := wc.Write(data.Bytes())

// Decompress
bufOut := bytes.NewBuffer(nil)
r := NewReader(w)
_, err = io.Copy(bufOut, r)
failOnError(t, "Failed writing to file", err)

if bufOut.String() != string(data) {
if bufOut.String() != data.String() {
t.Fatalf("Decompressed output != input: %q != %q", bufOut.String(), data)
}
}
Expand Down

0 comments on commit bdbee0b

Please sign in to comment.