Skip to content

Commit

Permalink
Merge pull request #5 from DataDog/add-lz4frame
Browse files Browse the repository at this point in the history
[lz4-2]Try new termination condition
  • Loading branch information
zzzzssss authored Oct 22, 2019
2 parents 4b5084d + 6269545 commit 7d1e666
Show file tree
Hide file tree
Showing 3 changed files with 124,481 additions and 12 deletions.
21 changes: 17 additions & 4 deletions lz4.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,10 +226,15 @@ func (r *reader) Read(dst []byte) (int, error) {
// C.LZ4_setStreamDecode(r.lz4Stream, nil, 0)
// we have leftover decompressed data from previous call
if r.decompOffset > 0 {
copied := copy(dst[writeOffset:], r.decompressedBuffer[r.decBufIndex][r.decompOffset:])
copied := copy(dst[writeOffset:], r.decompressedBuffer[r.decBufIndex][r.decompOffset:r.decompSize])
// justWritten := r.decompressedBuffer[r.decBufIndex][r.decompOffset:r.decompSize]
// fmt.Println("wrote leftover", len(justWritten), copied)
// if bytes.Contains(justWritten, []byte("see me heave")) {
// fmt.Println("leftover: a very palpable hit")
// }
if len(dst) == copied {
r.decompOffset += copied
if r.decompOffset == len(r.decompressedBuffer[r.decBufIndex]) {
if r.decompOffset == len(r.decompressedBuffer[r.decBufIndex][0:r.decompSize]) {
r.decompOffset = 0
r.decBufIndex = (r.decBufIndex + 1) % 2
}
Expand All @@ -244,13 +249,16 @@ func (r *reader) Read(dst []byte) (int, error) {
// Populate src
blockSize, err := r.readSize(r.underlyingReader)
if err != nil {
if err == io.EOF {
// fmt.Println("here's our EOF")
}
return writeOffset, err
}

// if the blockSize is bigger than our configured one, then something
// is wrong with the file or it was compressed with a different mechanism
if blockSize > len(r.readBuffer) {
return writeOffset, fmt.Errorf("invalid block size (Version3)%d", blockSize)
return writeOffset, fmt.Errorf("invalid block size (10/21/2019): %d", blockSize)
}

readBuffer := r.readBuffer[:blockSize]
Expand All @@ -272,7 +280,13 @@ func (r *reader) Read(dst []byte) (int, error) {
break
}

r.decompSize = written
copied := copy(dst[writeOffset:], r.decompressedBuffer[r.decBufIndex][:written])
// fmt.Println("wrote after read", written, copied)
// justWritten := r.decompressedBuffer[r.decBufIndex][:written]
// if bytes.Contains(justWritten, []byte("see me heave")) {
// fmt.Println("read: a avery palable hit")
// }

switch {
// have some leftover data from the decompressedBuffer
Expand All @@ -290,7 +304,6 @@ func (r *reader) Read(dst []byte) (int, error) {
}

}

return writeOffset, nil
}

Expand Down
16 changes: 8 additions & 8 deletions lz4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func TestSimpleCompressDecompress(t *testing.T) {
}

func TestIOCopyStreamSimpleCompressionDecompression(t *testing.T) {
filename := "1557419400MM.idb"
filename := "shakespeare.txt"
inputs, _ := ioutil.ReadFile(filename)

testIOCopy(t, inputs, filename)
Expand All @@ -252,10 +252,10 @@ func testIOCopy(t *testing.T, payload []byte, filename string) {

// check the compressed file is the same with the one uploaded to S3

if !checkfilecontentIsSame(t, fname, filename+".lz4") {
t.Fatalf("compressed file and the S3 one is not the same: %s != %s", fname, filename+".lz4")
// if !checkfilecontentIsSame(t, fname, filename+".lz4") {
// t.Fatalf("compressed file and the S3 one is not the same: %s != %s", fname, filename+".lz4")

}
// }

file.Close()

Expand All @@ -265,7 +265,7 @@ func testIOCopy(t *testing.T, payload []byte, filename string) {
defer fi.Close()

// decompress the file againg
fnameNew := "1557419400NEW.idb"
fnameNew := "shakespeare.txt.copy"

fileNew, err := os.Create(fnameNew)
failOnError(t, "Failed writing to file", err)
Expand Down Expand Up @@ -304,14 +304,14 @@ func checkfilecontentIsSame(t *testing.T, f1, f2 string) bool {
}

func TestIOCopyDecompression(t *testing.T) {
filename := "1557342000.idb.lz4"
filename := "shakespeare.txttestcom.lz4"
// read from the file
fi, err := os.Open(filename)
failOnError(t, "Failed open file", err)
defer fi.Close()

// decompress into this new file
fnameNew := "1557342000.idb"
fnameNew := "shakespeare.txt.copy"
fileNew, err := os.Create(fnameNew)
failOnError(t, "Failed writing to file", err)
defer fileNew.Close()
Expand Down Expand Up @@ -358,7 +358,7 @@ func TestContinueCompress(t *testing.T) {
}

func TestStreamSimpleCompressionDecompression(t *testing.T) {
inputs, _ := ioutil.ReadFile("sample2.txt")
inputs, _ := ioutil.ReadFile("shakespeare.txt")
var bigInput []byte
for i := 0; i < 20; i++ {
bigInput = append(bigInput, inputs...)
Expand Down
Loading

0 comments on commit 7d1e666

Please sign in to comment.