diff --git a/src/Processors/Formats/Impl/Parquet/ParquetLeafColReader.cpp b/src/Processors/Formats/Impl/Parquet/ParquetLeafColReader.cpp
index 2dcd01737ed3..c3ced4a47b0f 100644
--- a/src/Processors/Formats/Impl/Parquet/ParquetLeafColReader.cpp
+++ b/src/Processors/Formats/Impl/Parquet/ParquetLeafColReader.cpp
@@ -370,6 +370,7 @@ template <typename TColumn>
 void ParquetLeafColReader<TColumn>::readPage()
 {
     // refer to: ColumnReaderImplBase::ReadNewPage in column_reader.cc
+    // this is where decompression happens
     auto cur_page = parquet_page_reader->NextPage();
     switch (cur_page->type())
     {
@@ -409,46 +410,13 @@ void ParquetLeafColReader<TColumn>::readPage()
 }
 
 template <typename TColumn>
-void ParquetLeafColReader<TColumn>::readPageV1(const parquet::DataPageV1 & page)
+void ParquetLeafColReader<TColumn>::initDataReader(
+    parquet::Encoding::type encoding_type,
+    const uint8_t * buffer,
+    std::size_t max_size,
+    std::unique_ptr<RleValuesReader> && def_level_reader)
 {
-    static parquet::LevelDecoder repetition_level_decoder;
-
-    cur_page_values = page.num_values();
-
-    // refer to: VectorizedColumnReader::readPageV1 in Spark and LevelDecoder::SetData in column_reader.cc
-    if (page.definition_level_encoding() != parquet::Encoding::RLE && col_descriptor.max_definition_level() != 0)
-    {
-        throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Unsupported encoding: {}", page.definition_level_encoding());
-    }
-    const auto * buffer = page.data();
-    auto max_size = page.size();
-
-    if (col_descriptor.max_repetition_level() > 0)
-    {
-        auto rep_levels_bytes = repetition_level_decoder.SetData(
-            page.repetition_level_encoding(), col_descriptor.max_repetition_level(), 0, buffer, max_size);
-        buffer += rep_levels_bytes;
-        max_size -= rep_levels_bytes;
-    }
-
-    assert(col_descriptor.max_definition_level() >= 0);
-    std::unique_ptr<RleValuesReader> def_level_reader;
-    if (col_descriptor.max_definition_level() > 0)
-    {
-        auto bit_width = arrow::bit_util::Log2(col_descriptor.max_definition_level() + 1);
-        auto num_bytes = ::arrow::util::SafeLoadAs<int32_t>(buffer);
-        auto bit_reader = std::make_unique<arrow::bit_util::BitReader>(buffer + 4, num_bytes);
-        num_bytes += 4;
-        buffer += num_bytes;
-        max_size -= num_bytes;
-        def_level_reader = std::make_unique<RleValuesReader>(std::move(bit_reader), bit_width);
-    }
-    else
-    {
-        def_level_reader = std::make_unique<RleValuesReader>(page.num_values());
-    }
-
-    switch (page.encoding())
+    switch (encoding_type)
     {
         case parquet::Encoding::PLAIN:
         {
@@ -502,17 +470,144 @@ void ParquetLeafColReader<TColumn>::readPageV1(const parquet::DataPageV1 & page)
         case parquet::Encoding::DELTA_BINARY_PACKED:
         case parquet::Encoding::DELTA_LENGTH_BYTE_ARRAY:
         case parquet::Encoding::DELTA_BYTE_ARRAY:
-            throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Unsupported encoding: {}", page.encoding());
+            throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Unsupported encoding: {}", encoding_type);
 
         default:
-            throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Unknown encoding type: {}", page.encoding());
+            throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Unknown encoding type: {}", encoding_type);
+    }
+}
+
+template <typename TColumn>
+void ParquetLeafColReader<TColumn>::readPageV1(const parquet::DataPageV1 & page)
+{
+    cur_page_values = page.num_values();
+
+    // refer to: VectorizedColumnReader::readPageV1 in Spark and LevelDecoder::SetData in column_reader.cc
+    if (page.definition_level_encoding() != parquet::Encoding::RLE && col_descriptor.max_definition_level() != 0)
+    {
+        throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Unsupported encoding: {}", page.definition_level_encoding());
+    }
+
+    const auto * buffer = page.data();
+    auto max_size = static_cast<std::size_t>(page.size());
+
+    if (col_descriptor.max_repetition_level() > 0)
+    {
+        if (max_size < sizeof(int32_t))
+        {
+            throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Not enough bytes in parquet page buffer, corrupt?");
+        }
+
+        auto num_bytes = ::arrow::util::SafeLoadAs<int32_t>(buffer);
+
+        if (num_bytes < 0)
+        {
+            throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Number of bytes for repetition levels is negative, corrupt?");
+        }
+
+        if (num_bytes + 4u > max_size)
+        {
+            throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Not enough bytes in parquet page buffer, corrupt?");
+        }
+
+        // not constructing a repetition level reader because it is not used at the moment
+        num_bytes += 4;
+        buffer += num_bytes;
+        max_size -= num_bytes;
+    }
+
+    assert(col_descriptor.max_definition_level() >= 0);
+    std::unique_ptr<RleValuesReader> def_level_reader;
+    if (col_descriptor.max_definition_level() > 0)
+    {
+        auto bit_width = arrow::bit_util::Log2(col_descriptor.max_definition_level() + 1);
+
+        if (max_size < sizeof(int32_t))
+        {
+            throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Not enough bytes in parquet page buffer, corrupt?");
+        }
+
+        auto num_bytes = ::arrow::util::SafeLoadAs<int32_t>(buffer);
+
+        if (num_bytes < 0)
+        {
+            throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Number of bytes for definition levels is negative, corrupt?");
+        }
+
+        if (num_bytes + 4u > max_size)
+        {
+            throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Not enough bytes in parquet page buffer, corrupt?");
+        }
+
+        auto bit_reader = std::make_unique<arrow::bit_util::BitReader>(buffer + 4, num_bytes);
+        num_bytes += 4;
+        buffer += num_bytes;
+        max_size -= num_bytes;
+        def_level_reader = std::make_unique<RleValuesReader>(std::move(bit_reader), bit_width);
     }
+    else
+    {
+        def_level_reader = std::make_unique<RleValuesReader>(page.num_values());
+    }
+
+    initDataReader(page.encoding(), buffer, max_size, std::move(def_level_reader));
 }
 
+/*
+ * As far as I understand, data page v2 differs from v1 primarily in the following ways:
+ * 1. repetition and definition levels are not compressed;
+ * 2. the byte lengths of the repetition and definition levels are stored in the page header;
+ * 3. the levels are always RLE encoded.
+ *
+ * Therefore, this method can simply skip the repetition levels by offsetting the data buffer by their byte length,
+ * and build the definition level decoder directly as an RLE decoder. Since the definition levels byte length is
+ * already in the header, there is no need to read a length prefix and apply an offset as in page v1.
+ * */
 template <typename TColumn>
-void ParquetLeafColReader<TColumn>::readPageV2(const parquet::DataPageV2 & /*page*/)
+void ParquetLeafColReader<TColumn>::readPageV2(const parquet::DataPageV2 & page)
 {
-    throw Exception(ErrorCodes::NOT_IMPLEMENTED, "read page V2 is not implemented yet");
+    cur_page_values = page.num_values();
+
+    const auto * buffer = page.data();
+
+    if (page.repetition_levels_byte_length() < 0 || page.definition_levels_byte_length() < 0)
+    {
+        throw Exception(
+            ErrorCodes::PARQUET_EXCEPTION, "Repetition or definition levels byte length is negative; most likely a corrupt file or a parsing issue");
+    }
+
+    const int64_t total_levels_length =
+        static_cast<int64_t>(page.repetition_levels_byte_length()) +
+        page.definition_levels_byte_length();
+
+    if (total_levels_length > page.size())
+    {
+        throw Exception(
+            ErrorCodes::BAD_ARGUMENTS, "Data page too small for levels (corrupt header?)");
+    }
+
+    // ARROW-17453: Even if max_rep_level_ is 0, there may still be
+    // repetition level bytes written and/or reported in the header by
+    // some writers (e.g. Athena)
+    buffer += page.repetition_levels_byte_length();
+
+    assert(col_descriptor.max_definition_level() >= 0);
+    std::unique_ptr<RleValuesReader> def_level_reader;
+    if (col_descriptor.max_definition_level() > 0)
+    {
+        auto bit_width = arrow::bit_util::Log2(col_descriptor.max_definition_level() + 1);
+        auto num_bytes = page.definition_levels_byte_length();
+        auto bit_reader = std::make_unique<arrow::bit_util::BitReader>(buffer, num_bytes);
+        def_level_reader = std::make_unique<RleValuesReader>(std::move(bit_reader), bit_width);
+    }
+    else
+    {
+        def_level_reader = std::make_unique<RleValuesReader>(page.num_values());
+    }
+
+    buffer += page.definition_levels_byte_length();
+
+    initDataReader(page.encoding(), buffer, page.size() - total_levels_length, std::move(def_level_reader));
 }
 
 template <typename TColumn>
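To make the offset arithmetic above easier to follow: in a v2 data page the repetition levels, definition levels, and values are laid out back to back, with the levels stored uncompressed at the front and both level lengths recorded in the page header. Below is a minimal standalone sketch of the same validation and slicing that `readPageV2` performs; the `PageV2Header` struct and `locateValues` helper are hypothetical illustrations whose field names mirror the `parquet::DataPageV2` accessors used in the diff.

```cpp
#include <cstddef>
#include <cstdint>
#include <stdexcept>

// Hypothetical mirror of the parquet::DataPageV2 header fields used above.
struct PageV2Header
{
    int32_t repetition_levels_byte_length;
    int32_t definition_levels_byte_length;
    int32_t page_size; // uncompressed size of the page payload (levels + values)
};

struct Slice
{
    std::size_t offset;
    std::size_t size;
};

// The values section starts right after rl_len + dl_len bytes; these are the
// same sanity checks readPageV2 applies before touching the buffer.
Slice locateValues(const PageV2Header & h)
{
    if (h.repetition_levels_byte_length < 0 || h.definition_levels_byte_length < 0)
        throw std::invalid_argument("negative level byte length, corrupt header");

    const int64_t levels_length = static_cast<int64_t>(h.repetition_levels_byte_length)
        + h.definition_levels_byte_length;

    if (levels_length > h.page_size)
        throw std::invalid_argument("data page too small for levels");

    return {static_cast<std::size_t>(levels_length), static_cast<std::size_t>(h.page_size - levels_length)};
}
```

For example, a page with 8 bytes of repetition levels, 4 bytes of definition levels, and a 100-byte payload yields a values section at offset 12 with 88 bytes, which is exactly what gets handed to `initDataReader`.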
diff --git a/src/Processors/Formats/Impl/Parquet/ParquetLeafColReader.h b/src/Processors/Formats/Impl/Parquet/ParquetLeafColReader.h
index c5b14132f176..e1eb7702def1 100644
--- a/src/Processors/Formats/Impl/Parquet/ParquetLeafColReader.h
+++ b/src/Processors/Formats/Impl/Parquet/ParquetLeafColReader.h
@@ -54,6 +54,10 @@ class ParquetLeafColReader : public ParquetColumnReader
     void readPage();
     void readPageV1(const parquet::DataPageV1 & page);
     void readPageV2(const parquet::DataPageV2 & page);
+    void initDataReader(parquet::Encoding::type encoding_type,
+        const uint8_t * buffer,
+        std::size_t max_size,
+        std::unique_ptr<RleValuesReader> && def_level_reader);
     std::unique_ptr<ParquetDataValuesReader> createDictReader(
         std::unique_ptr<RleValuesReader> def_level_reader,
         std::unique_ptr<RleValuesReader> rle_data_reader);
diff --git a/tests/queries/0_stateless/03251_parquet_page_v2_native_reader.reference b/tests/queries/0_stateless/03251_parquet_page_v2_native_reader.reference
new file mode 100644
index 000000000000..3ae545099d6a
--- /dev/null
+++ b/tests/queries/0_stateless/03251_parquet_page_v2_native_reader.reference
@@ -0,0 +1,10 @@
+abc 2
+abc 2
+abc 3
+abc 4
+\N 5
+abc 2
+abc 2
+abc 3
+abc 4
+\N 5
diff --git a/tests/queries/0_stateless/03251_parquet_page_v2_native_reader.sh b/tests/queries/0_stateless/03251_parquet_page_v2_native_reader.sh
new file mode 100755
index 000000000000..217491534e99
--- /dev/null
+++ b/tests/queries/0_stateless/03251_parquet_page_v2_native_reader.sh
@@ -0,0 +1,23 @@
+#!/usr/bin/env bash
+# Tags: no-ubsan, no-fasttest
+
+CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+# shellcheck source=../shell_config.sh
+. "$CUR_DIR"/../shell_config.sh
+
+USER_FILES_PATH=$($CLICKHOUSE_CLIENT_BINARY --query "select _path,_file from file('nonexist.txt', 'CSV', 'val1 char')" 2>&1 | grep Exception | awk '{gsub("/nonexist.txt","",$9); print $9}')
+
+WORKING_DIR="${USER_FILES_PATH}/${CLICKHOUSE_TEST_UNIQUE_NAME}"
+
+mkdir -p "${WORKING_DIR}"
+
+DATA_FILE="${CUR_DIR}/data_parquet/datapage_v2.snappy.parquet"
+
+DATA_FILE_USER_PATH="${WORKING_DIR}/datapage_v2.snappy.parquet"
+
+cp "${DATA_FILE}" "${DATA_FILE_USER_PATH}"
+
+# Not reading all columns because some data types and encodings are not supported by the native reader yet
+# TODO: read all columns once implemented
+${CLICKHOUSE_CLIENT} --query="select a, c from file('${DATA_FILE_USER_PATH}', Parquet) order by c SETTINGS input_format_parquet_use_native_reader=false;"
+${CLICKHOUSE_CLIENT} --query="select a, c from file('${DATA_FILE_USER_PATH}', Parquet) order by c SETTINGS input_format_parquet_use_native_reader=true;"
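For contrast, the v1 path has to discover the level lengths from the page data itself: each RLE level run is preceded by a 4-byte little-endian length, which is what the new bounds checks in `readPageV1` defend against. Below is a minimal sketch of that validation pattern, assuming a little-endian host load comparable to arrow's `SafeLoadAs`; the `skipLevelRun` helper itself is hypothetical.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <stdexcept>

// Skips one length-prefixed level run, mirroring the v1 checks in the diff:
// the 4-byte prefix must fit in the buffer, be non-negative, and the run it
// announces must not extend past the end of the page.
void skipLevelRun(const uint8_t *& buffer, std::size_t & max_size)
{
    if (max_size < sizeof(int32_t))
        throw std::runtime_error("not enough bytes for the length prefix");

    int32_t num_bytes;
    std::memcpy(&num_bytes, buffer, sizeof(num_bytes)); // little-endian host assumed

    if (num_bytes < 0)
        throw std::runtime_error("negative level run length, corrupt page");

    const std::size_t total = sizeof(int32_t) + static_cast<std::size_t>(num_bytes);
    if (total > max_size)
        throw std::runtime_error("level run exceeds the page buffer");

    buffer += total;
    max_size -= total;
}
```

Factoring the encoding dispatch out into `initDataReader` is what keeps the two page paths this small: once `buffer` points at the values section, v1 and v2 converge on the same switch over the value encoding.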
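A note on the test: `datapage_v2.snappy.parquet` appears to be the sample file from the apache/parquet-testing data set (Snappy-compressed v2 data pages, including a nullable string column), which is why the reference output contains a `\N` row. Running the same query with `input_format_parquet_use_native_reader` set to `false` and then `true` checks the native reader's output against the existing Arrow-based reader on identical input.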