Skip to content

Commit

Permalink
fix issue 340 (#341)
Browse files Browse the repository at this point in the history
  • Loading branch information
Achille authored Sep 14, 2022
1 parent f80ef85 commit d2b49bc
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 18 deletions.
45 changes: 32 additions & 13 deletions column.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,24 +561,39 @@ func (c *Column) decodeDataPageV2(header DataPageHeaderV2, page *buffer, dict Di
var repetitionLevels *buffer
var definitionLevels *buffer

if c.maxRepetitionLevel > 0 {
encoding := lookupLevelEncoding(header.RepetitionLevelEncoding(), c.maxRepetitionLevel)
length := header.RepetitionLevelsByteLength()
repetitionLevels, pageData, err = decodeLevelsV2(encoding, numValues, pageData, length)
if length := header.RepetitionLevelsByteLength(); length > 0 {
if c.maxRepetitionLevel == 0 {
// In some cases we've observed files which have a non-zero
// repetition level despite the column not being repeated
// (nor nested within a repeated column).
//
// See https://github.com/apache/parquet-testing/pull/24
pageData, err = skipLevelsV2(pageData, length)
} else {
encoding := lookupLevelEncoding(header.RepetitionLevelEncoding(), c.maxRepetitionLevel)
repetitionLevels, pageData, err = decodeLevelsV2(encoding, numValues, pageData, length)
}
if err != nil {
return nil, fmt.Errorf("decoding repetition levels of data page v2: %w", io.ErrUnexpectedEOF)
}
defer repetitionLevels.unref()
if repetitionLevels != nil {
defer repetitionLevels.unref()
}
}

if c.maxDefinitionLevel > 0 {
encoding := lookupLevelEncoding(header.DefinitionLevelEncoding(), c.maxDefinitionLevel)
length := header.DefinitionLevelsByteLength()
definitionLevels, pageData, err = decodeLevelsV2(encoding, numValues, pageData, length)
if length := header.DefinitionLevelsByteLength(); length > 0 {
if c.maxDefinitionLevel == 0 {
pageData, err = skipLevelsV2(pageData, length)
} else {
encoding := lookupLevelEncoding(header.DefinitionLevelEncoding(), c.maxDefinitionLevel)
definitionLevels, pageData, err = decodeLevelsV2(encoding, numValues, pageData, length)
}
if err != nil {
return nil, fmt.Errorf("decoding definition levels of data page v2: %w", io.ErrUnexpectedEOF)
}
defer definitionLevels.unref()
if definitionLevels != nil {
defer definitionLevels.unref()
}
}

if isCompressed(c.compression) && header.IsCompressed() {
Expand Down Expand Up @@ -687,9 +702,6 @@ func decodeLevelsV1(enc encoding.Encoding, numValues int, data []byte) (*buffer,
}

// decodeLevelsV2 decodes the first length bytes of data as levels by calling
// decodeLevels, and returns the decoded levels together with the bytes of
// data that follow the level section.
//
// When length is greater than len(data) it returns io.ErrUnexpectedEOF along
// with a nil buffer and data unchanged. Otherwise the error is whatever
// decodeLevels reported.
func decodeLevelsV2(enc encoding.Encoding, numValues int, data []byte, length int64) (*buffer, []byte, error) {
// Reject a declared length that runs past the end of the available data
// before slicing with it.
if length > int64(len(data)) {
return nil, data, io.ErrUnexpectedEOF
}
levels, err := decodeLevels(enc, numValues, data[:length])
// The remainder is returned even when decodeLevels failed.
return levels, data[length:], err
}
Expand All @@ -711,6 +723,13 @@ func decodeLevels(enc encoding.Encoding, numValues int, data []byte) (levels *bu
return levels, err
}

// skipLevelsV2 discards the first length bytes of data, which hold a level
// section the caller does not need to decode, and returns the bytes that
// follow it.
//
// It returns io.ErrUnexpectedEOF, along with data unchanged, when length is
// negative or greater than len(data). A length equal to len(data) is valid:
// the level section fills the input and the returned remainder is empty.
func skipLevelsV2(data []byte, length int64) ([]byte, error) {
	// length comes from a page header read from the file, so it cannot be
	// trusted: a negative or oversized value must not reach the slice
	// expression below, where it would panic.
	if length < 0 || length > int64(len(data)) {
		return data, io.ErrUnexpectedEOF
	}
	return data[length:], nil
}

// DecodeDictionary decodes a data page from the header and compressed data
// passed as arguments.
func (c *Column) DecodeDictionary(header DictionaryPageHeader, page []byte) (Dictionary, error) {
Expand Down
1 change: 1 addition & 0 deletions file.go
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,7 @@ func (f *filePages) Close() error {
f.chunk = nil
f.section = io.SectionReader{}
f.rbuf = nil
f.rbufpool = nil
f.baseOffset = 0
f.dataOffset = 0
f.dictOffset = 0
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/andybalholm/brotli v1.0.3
github.com/google/uuid v1.3.0
github.com/hexops/gotextdiff v1.0.3
github.com/klauspost/compress v1.15.5
github.com/klauspost/compress v1.15.9
github.com/olekukonko/tablewriter v0.0.5
github.com/pierrec/lz4/v4 v4.1.9
github.com/segmentio/encoding v0.3.5
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM=
github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg=
github.com/klauspost/compress v1.15.5 h1:qyCLMz2JCrKADihKOh9FxnW3houKeNsp2h5OEz0QSEA=
github.com/klauspost/compress v1.15.5/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY=
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0=
github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=
Expand Down
5 changes: 3 additions & 2 deletions internal/debug/debug.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package debug

import (
"encoding/hex"
"fmt"
"io"
)
Expand All @@ -19,7 +20,7 @@ type ioReaderAt struct {

// ReadAt forwards the call to the wrapped reader's ReadAt and prints a trace
// of it to standard output: the prefix, the size of b, the offset, the number
// of bytes read, the error, and the bytes that were read. The wrapped
// reader's results are returned unchanged.
func (d *ioReaderAt) ReadAt(b []byte, off int64) (int, error) {
n, err := d.reader.ReadAt(b, off)
fmt.Printf("%s: Read(%d) @%d => %d %v \n %q\n", d.prefix, len(b), off, n, err, b[:n])
fmt.Printf("%s: Read(%d) @%d => %d %v \n%s\n", d.prefix, len(b), off, n, err, hex.Dump(b[:n]))
return n, err
}

Expand All @@ -38,7 +39,7 @@ type ioReader struct {

// Read forwards the call to the wrapped reader's Read and prints a trace of
// it to standard output: the prefix, the size of b, the offset tracked so far,
// the number of bytes read, the error, and the bytes that were read. The
// wrapped reader's results are returned unchanged.
func (d *ioReader) Read(b []byte) (int, error) {
n, err := d.reader.Read(b)
fmt.Printf("%s: Read(%d) @%d => %d %v \n %q\n", d.prefix, len(b), d.offset, n, err, b[:n])
fmt.Printf("%s: Read(%d) @%d => %d %v \n%s\n", d.prefix, len(b), d.offset, n, err, hex.Dump(b[:n]))
// Advance the tracked offset after printing, so the trace shows where this
// read started.
d.offset += int64(n)
return n, err
}
Expand Down
Binary file added testdata/rle_boolean_encoding.parquet
Binary file not shown.

0 comments on commit d2b49bc

Please sign in to comment.