Merge pull request #558 from Altinity/backports/24.8.8/parquet-page-header-v2-native-reader

24.8.8 Backport of ClickHouse#70807 parquet page header v2 native reader
Enmk authored Dec 19, 2024
2 parents c73183a + 01b508b commit 4bd65c0
Showing 4 changed files with 175 additions and 43 deletions.
181 changes: 138 additions & 43 deletions src/Processors/Formats/Impl/Parquet/ParquetLeafColReader.cpp
@@ -370,6 +370,7 @@ template <typename TColumn>
void ParquetLeafColReader<TColumn>::readPage()
{
// refer to: ColumnReaderImplBase::ReadNewPage in column_reader.cc
+// this is where decompression happens
auto cur_page = parquet_page_reader->NextPage();
switch (cur_page->type())
{
@@ -409,46 +410,13 @@ void ParquetLeafColReader<TColumn>::readPage()
}

template <typename TColumn>
-void ParquetLeafColReader<TColumn>::readPageV1(const parquet::DataPageV1 & page)
+void ParquetLeafColReader<TColumn>::initDataReader(
+parquet::Encoding::type enconding_type,
+const uint8_t * buffer,
+std::size_t max_size,
+std::unique_ptr<RleValuesReader> && def_level_reader)
{
-static parquet::LevelDecoder repetition_level_decoder;
-
-cur_page_values = page.num_values();
-
-// refer to: VectorizedColumnReader::readPageV1 in Spark and LevelDecoder::SetData in column_reader.cc
-if (page.definition_level_encoding() != parquet::Encoding::RLE && col_descriptor.max_definition_level() != 0)
-{
-throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Unsupported encoding: {}", page.definition_level_encoding());
-}
-const auto * buffer = page.data();
-auto max_size = page.size();
-
-if (col_descriptor.max_repetition_level() > 0)
-{
-auto rep_levels_bytes = repetition_level_decoder.SetData(
-page.repetition_level_encoding(), col_descriptor.max_repetition_level(), 0, buffer, max_size);
-buffer += rep_levels_bytes;
-max_size -= rep_levels_bytes;
-}
-
-assert(col_descriptor.max_definition_level() >= 0);
-std::unique_ptr<RleValuesReader> def_level_reader;
-if (col_descriptor.max_definition_level() > 0)
-{
-auto bit_width = arrow::bit_util::Log2(col_descriptor.max_definition_level() + 1);
-auto num_bytes = ::arrow::util::SafeLoadAs<int32_t>(buffer);
-auto bit_reader = std::make_unique<arrow::bit_util::BitReader>(buffer + 4, num_bytes);
-num_bytes += 4;
-buffer += num_bytes;
-max_size -= num_bytes;
-def_level_reader = std::make_unique<RleValuesReader>(std::move(bit_reader), bit_width);
-}
-else
-{
-def_level_reader = std::make_unique<RleValuesReader>(page.num_values());
-}
-
-switch (page.encoding())
+switch (enconding_type)
{
case parquet::Encoding::PLAIN:
{
@@ -502,17 +470,144 @@ void ParquetLeafColReader<TColumn>::readPageV1(const parquet::DataPageV1 & page)
case parquet::Encoding::DELTA_BINARY_PACKED:
case parquet::Encoding::DELTA_LENGTH_BYTE_ARRAY:
case parquet::Encoding::DELTA_BYTE_ARRAY:
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Unsupported encoding: {}", page.encoding());
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Unsupported encoding: {}", enconding_type);

default:
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Unknown encoding type: {}", page.encoding());
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Unknown encoding type: {}", enconding_type);
}
}

template <typename TColumn>
void ParquetLeafColReader<TColumn>::readPageV1(const parquet::DataPageV1 & page)
{
cur_page_values = page.num_values();

// refer to: VectorizedColumnReader::readPageV1 in Spark and LevelDecoder::SetData in column_reader.cc
if (page.definition_level_encoding() != parquet::Encoding::RLE && col_descriptor.max_definition_level() != 0)
{
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Unsupported encoding: {}", page.definition_level_encoding());
}

const auto * buffer = page.data();
auto max_size = static_cast<std::size_t>(page.size());

if (col_descriptor.max_repetition_level() > 0)
{
if (max_size < sizeof(int32_t))
{
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Not enough bytes in parquet page buffer, corrupt?");
}

auto num_bytes = ::arrow::util::SafeLoadAs<int32_t>(buffer);

if (num_bytes < 0)
{
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Number of bytes for dl is negative, corrupt?");
}

if (num_bytes + 4u > max_size)
{
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Not enough bytes in parquet page buffer, corrupt?");
}

// not constructing the repetition level reader because we are not using it at the moment
num_bytes += 4;
buffer += num_bytes;
max_size -= num_bytes;
}

assert(col_descriptor.max_definition_level() >= 0);
std::unique_ptr<RleValuesReader> def_level_reader;
if (col_descriptor.max_definition_level() > 0)
{
auto bit_width = arrow::bit_util::Log2(col_descriptor.max_definition_level() + 1);

if (max_size < sizeof(int32_t))
{
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Not enough bytes in parquet page buffer, corrupt?");
}

auto num_bytes = ::arrow::util::SafeLoadAs<int32_t>(buffer);

if (num_bytes < 0)
{
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Number of bytes for dl is negative, corrupt?");
}

if (num_bytes + 4u > max_size)
{
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Not enough bytes in parquet page buffer, corrupt?");
}

auto bit_reader = std::make_unique<arrow::bit_util::BitReader>(buffer + 4, num_bytes);
num_bytes += 4;
buffer += num_bytes;
max_size -= num_bytes;
def_level_reader = std::make_unique<RleValuesReader>(std::move(bit_reader), bit_width);
}
else
{
def_level_reader = std::make_unique<RleValuesReader>(page.num_values());
}

initDataReader(page.encoding(), buffer, max_size, std::move(def_level_reader));
}

/*
* As far as I understand, the difference between data page v1 and data page v2 lies primarily in the following:
* 1. repetition and definition levels are not compressed;
* 2. the sizes of the repetition and definition level sections are present in the header;
* 3. the level encoding is always RLE.
*
* Therefore, this method leverages the existing `parquet::LevelDecoder::SetDataV2` method to build the repetition level decoder.
* The data buffer is offset by the repetition-level byte length, and then the definition-level decoder is built using the RLE decoder. Since the definition-level byte length is present in the header,
* there is no need to read it and apply an offset as in page v1.
* */
template <typename TColumn>
-void ParquetLeafColReader<TColumn>::readPageV2(const parquet::DataPageV2 & /*page*/)
+void ParquetLeafColReader<TColumn>::readPageV2(const parquet::DataPageV2 & page)
{
-throw Exception(ErrorCodes::NOT_IMPLEMENTED, "read page V2 is not implemented yet");
cur_page_values = page.num_values();

const auto * buffer = page.data();

if (page.repetition_levels_byte_length() < 0 || page.definition_levels_byte_length() < 0)
{
throw Exception(
ErrorCodes::PARQUET_EXCEPTION, "Either RL or DL is negative, this should not happen. Most likely corrupt file or parsing issue");
}

const int64_t total_levels_length =
static_cast<int64_t>(page.repetition_levels_byte_length()) +
page.definition_levels_byte_length();

if (total_levels_length > page.size())
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS, "Data page too small for levels (corrupt header?)");
}

// ARROW-17453: Even if max_rep_level_ is 0, there may still be
// repetition level bytes written and/or reported in the header by
// some writers (e.g. Athena)
buffer += page.repetition_levels_byte_length();

assert(col_descriptor.max_definition_level() >= 0);
std::unique_ptr<RleValuesReader> def_level_reader;
if (col_descriptor.max_definition_level() > 0)
{
auto bit_width = arrow::bit_util::Log2(col_descriptor.max_definition_level() + 1);
auto num_bytes = page.definition_levels_byte_length();
auto bit_reader = std::make_unique<arrow::bit_util::BitReader>(buffer, num_bytes);
def_level_reader = std::make_unique<RleValuesReader>(std::move(bit_reader), bit_width);
}
else
{
def_level_reader = std::make_unique<RleValuesReader>(page.num_values());
}

buffer += page.definition_levels_byte_length();

initDataReader(page.encoding(), buffer, page.size() - total_levels_length, std::move(def_level_reader));
}

template <typename TColumn>
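
Editorial note on the new readPageV2: the offset arithmetic above can be restated without the parquet-cpp types. The sketch below is illustrative only — PageV2Header is a hypothetical stand-in for the parquet::DataPageV2 accessors, not a real library type — and mirrors the bounds checks this commit adds, plus the contrast with v1's length-prefixed level sections.

#include <cstdint>
#include <stdexcept>

// Hypothetical stand-in for the parquet::DataPageV2 accessors used above.
struct PageV2Header
{
    int32_t repetition_levels_byte_length;
    int32_t definition_levels_byte_length;
    int32_t page_size; // page.size(): levels + encoded values
};

// v2 stores level-section lengths in the page header and leaves the levels
// uncompressed, so the encoded values begin right after rl_bytes + dl_bytes.
// In v1 each level section instead starts with its own 4-byte length, which
// is why readPageV1 reads an int32 and advances by num_bytes + 4.
inline const uint8_t * locateValuesSection(const PageV2Header & h, const uint8_t * page_data)
{
    if (h.repetition_levels_byte_length < 0 || h.definition_levels_byte_length < 0)
        throw std::runtime_error("negative level-section length, corrupt header?");

    const int64_t total_levels_length =
        static_cast<int64_t>(h.repetition_levels_byte_length)
        + h.definition_levels_byte_length;

    if (total_levels_length > h.page_size)
        throw std::runtime_error("data page too small for levels");

    // Skip repetition levels even when max_rep_level is 0: some writers
    // (e.g. Athena, see ARROW-17453) still emit or report them.
    return page_data + total_levels_length;
}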
4 changes: 4 additions & 0 deletions src/Processors/Formats/Impl/Parquet/ParquetLeafColReader.h
@@ -54,6 +54,10 @@ class ParquetLeafColReader : public ParquetColumnReader
void readPage();
void readPageV1(const parquet::DataPageV1 & page);
void readPageV2(const parquet::DataPageV2 & page);
+void initDataReader(parquet::Encoding::type enconding_type,
+const uint8_t * buffer,
+std::size_t max_size,
+std::unique_ptr<RleValuesReader> && def_level_reader);

std::unique_ptr<ParquetDataValuesReader> createDictReader(
std::unique_ptr<RleValuesReader> def_level_reader, std::unique_ptr<RleValuesReader> rle_data_reader);
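
Editorial note: both page paths size the definition-level decoder with arrow::bit_util::Log2(col_descriptor.max_definition_level() + 1). Levels take values in [0, max_definition_level], so this is simply the smallest bit width whose range covers the maximum. A minimal equivalent, assuming only that Arrow's Log2(x) returns ceil(log2(x)) for positive x:

// Smallest width w such that 2^w > max_level, i.e. enough bits to encode
// every definition level in [0, max_level].
inline int bitWidthForLevel(int max_level)
{
    int width = 0;
    while ((1 << width) <= max_level)
        ++width;
    return width;
}

// bitWidthForLevel(0) == 0   (column is required: no definition levels)
// bitWidthForLevel(1) == 1   (one nullable level, as for a plain Nullable column)
// bitWidthForLevel(3) == 2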
10 changes: 10 additions & 0 deletions tests/queries/0_stateless/03251_parquet_page_v2_native_reader.reference
@@ -0,0 +1,10 @@
abc 2
abc 2
abc 3
abc 4
\N 5
abc 2
abc 2
abc 3
abc 4
\N 5
23 changes: 23 additions & 0 deletions tests/queries/0_stateless/03251_parquet_page_v2_native_reader.sh
@@ -0,0 +1,23 @@
#!/usr/bin/env bash
# Tags: no-ubsan, no-fasttest

CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh

USER_FILES_PATH=$($CLICKHOUSE_CLIENT_BINARY --query "select _path,_file from file('nonexist.txt', 'CSV', 'val1 char')" 2>&1 | grep Exception | awk '{gsub("/nonexist.txt","",$9); print $9}')

WORKING_DIR="${USER_FILES_PATH}/${CLICKHOUSE_TEST_UNIQUE_NAME}"

mkdir -p "${WORKING_DIR}"

DATA_FILE="${CUR_DIR}/data_parquet/datapage_v2.snappy.parquet"

DATA_FILE_USER_PATH="${WORKING_DIR}/datapage_v2.snappy.parquet"

cp "${DATA_FILE}" "${DATA_FILE_USER_PATH}"

# Not reading all columns because some data types and encodings are not supported by native reader yet
# TODO read all columns once implemented
${CLICKHOUSE_CLIENT} --query="select a, c from file('${DATA_FILE_USER_PATH}', Parquet) order by c SETTINGS input_format_parquet_use_native_reader=false;"
${CLICKHOUSE_CLIENT} --query="select a, c from file('${DATA_FILE_USER_PATH}', Parquet) order by c SETTINGS input_format_parquet_use_native_reader=true;"
