From d2b49bcca2c46cad88b129ce545bd1fbf8f90f9a Mon Sep 17 00:00:00 2001 From: Achille Date: Wed, 14 Sep 2022 15:20:27 -0700 Subject: [PATCH] fix issue 340 (#341) --- column.go | 45 ++++++++++++++++++-------- file.go | 1 + go.mod | 2 +- go.sum | 4 +-- internal/debug/debug.go | 5 +-- testdata/rle_boolean_encoding.parquet | Bin 0 -> 168 bytes 6 files changed, 39 insertions(+), 18 deletions(-) create mode 100644 testdata/rle_boolean_encoding.parquet diff --git a/column.go b/column.go index 8bb840a..08ccfba 100644 --- a/column.go +++ b/column.go @@ -561,24 +561,39 @@ func (c *Column) decodeDataPageV2(header DataPageHeaderV2, page *buffer, dict Di var repetitionLevels *buffer var definitionLevels *buffer - if c.maxRepetitionLevel > 0 { - encoding := lookupLevelEncoding(header.RepetitionLevelEncoding(), c.maxRepetitionLevel) - length := header.RepetitionLevelsByteLength() - repetitionLevels, pageData, err = decodeLevelsV2(encoding, numValues, pageData, length) + if length := header.RepetitionLevelsByteLength(); length > 0 { + if c.maxRepetitionLevel == 0 { + // In some cases we've observed files which have a non-zero + // repetition level despite the column not being repeated + // (nor nested within a repeated column). + // + // See https://github.com/apache/parquet-testing/pull/24 + pageData, err = skipLevelsV2(pageData, length) + } else { + encoding := lookupLevelEncoding(header.RepetitionLevelEncoding(), c.maxRepetitionLevel) + repetitionLevels, pageData, err = decodeLevelsV2(encoding, numValues, pageData, length) + } if err != nil { return nil, fmt.Errorf("decoding repetition levels of data page v2: %w", io.ErrUnexpectedEOF) } - defer repetitionLevels.unref() + if repetitionLevels != nil { + defer repetitionLevels.unref() + } } - if c.maxDefinitionLevel > 0 { - encoding := lookupLevelEncoding(header.DefinitionLevelEncoding(), c.maxDefinitionLevel) - length := header.DefinitionLevelsByteLength() - definitionLevels, pageData, err = decodeLevelsV2(encoding, numValues, pageData, length) + if length := header.DefinitionLevelsByteLength(); length > 0 { + if c.maxDefinitionLevel == 0 { + pageData, err = skipLevelsV2(pageData, length) + } else { + encoding := lookupLevelEncoding(header.DefinitionLevelEncoding(), c.maxDefinitionLevel) + definitionLevels, pageData, err = decodeLevelsV2(encoding, numValues, pageData, length) + } if err != nil { return nil, fmt.Errorf("decoding definition levels of data page v2: %w", io.ErrUnexpectedEOF) } - defer definitionLevels.unref() + if definitionLevels != nil { + defer definitionLevels.unref() + } } if isCompressed(c.compression) && header.IsCompressed() { @@ -687,9 +702,6 @@ func decodeLevelsV1(enc encoding.Encoding, numValues int, data []byte) (*buffer, } func decodeLevelsV2(enc encoding.Encoding, numValues int, data []byte, length int64) (*buffer, []byte, error) { - if length > int64(len(data)) { - return nil, data, io.ErrUnexpectedEOF - } levels, err := decodeLevels(enc, numValues, data[:length]) return levels, data[length:], err } @@ -711,6 +723,13 @@ func decodeLevels(enc encoding.Encoding, numValues int, data []byte) (levels *bu return levels, err } +func skipLevelsV2(data []byte, length int64) ([]byte, error) { + if length >= int64(len(data)) { + return data, io.ErrUnexpectedEOF + } + return data[length:], nil +} + // DecodeDictionary decodes a data page from the header and compressed data // passed as arguments. func (c *Column) DecodeDictionary(header DictionaryPageHeader, page []byte) (Dictionary, error) { diff --git a/file.go b/file.go index 117a26f..846b912 100644 --- a/file.go +++ b/file.go @@ -696,6 +696,7 @@ func (f *filePages) Close() error { f.chunk = nil f.section = io.SectionReader{} f.rbuf = nil + f.rbufpool = nil f.baseOffset = 0 f.dataOffset = 0 f.dictOffset = 0 diff --git a/go.mod b/go.mod index 54551e2..9804d55 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/andybalholm/brotli v1.0.3 github.com/google/uuid v1.3.0 github.com/hexops/gotextdiff v1.0.3 - github.com/klauspost/compress v1.15.5 + github.com/klauspost/compress v1.15.9 github.com/olekukonko/tablewriter v0.0.5 github.com/pierrec/lz4/v4 v4.1.9 github.com/segmentio/encoding v0.3.5 diff --git a/go.sum b/go.sum index dd857c4..5109074 100644 --- a/go.sum +++ b/go.sum @@ -4,8 +4,8 @@ github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM= github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg= -github.com/klauspost/compress v1.15.5 h1:qyCLMz2JCrKADihKOh9FxnW3houKeNsp2h5OEz0QSEA= -github.com/klauspost/compress v1.15.5/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= +github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY= +github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0= github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= diff --git a/internal/debug/debug.go b/internal/debug/debug.go index a527ae6..bc6419c 100644 --- a/internal/debug/debug.go +++ b/internal/debug/debug.go @@ -1,6 +1,7 @@ package debug import ( + "encoding/hex" "fmt" "io" ) @@ -19,7 +20,7 @@ type ioReaderAt struct { func (d *ioReaderAt) ReadAt(b []byte, off int64) (int, error) { n, err := d.reader.ReadAt(b, off) - fmt.Printf("%s: Read(%d) @%d => %d %v \n %q\n", d.prefix, len(b), off, n, err, b[:n]) + fmt.Printf("%s: Read(%d) @%d => %d %v \n%s\n", d.prefix, len(b), off, n, err, hex.Dump(b[:n])) return n, err } @@ -38,7 +39,7 @@ type ioReader struct { func (d *ioReader) Read(b []byte) (int, error) { n, err := d.reader.Read(b) - fmt.Printf("%s: Read(%d) @%d => %d %v \n %q\n", d.prefix, len(b), d.offset, n, err, b[:n]) + fmt.Printf("%s: Read(%d) @%d => %d %v \n%s\n", d.prefix, len(b), d.offset, n, err, hex.Dump(b[:n])) d.offset += int64(n) return n, err } diff --git a/testdata/rle_boolean_encoding.parquet b/testdata/rle_boolean_encoding.parquet new file mode 100644 index 0000000000000000000000000000000000000000..9b90e01e8bb0a728ac932376a2dc24b6aa89c791 GIT binary patch literal 168 zcmWG=3^EjD6BQD*iV+nMWddS0Q5I1q1_lA<-}2oY3=oi