Add one-level list encoding support in parquet reader (#9848)
Closes #9240

This PR adds [one-level list encoding](https://github.com/apache/parquet-cpp/blob/master/src/parquet/schema.h#L43-L77) support to the Parquet reader. It also includes cleanups such as removing an unused stream argument and fixing typos in docs and comments.
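For background, Parquet has accumulated three list encodings over time; the legacy one-level form stores a list as a bare repeated field with no inner group. Below is a minimal sketch, using a simplified stand-in rather than cuDF's real `SchemaElement`, of how the schema shapes differ and how predicates like the ones this PR adds can tell a one-level list from the structural stub of the three-level form:

```cpp
#include <iostream>

// Simplified stand-in for a Parquet schema node (cuDF's real type is
// SchemaElement in cpp/src/io/parquet/parquet.hpp).
enum class Repetition { REQUIRED, OPTIONAL, REPEATED };

struct Node {
  Repetition repetition;
  int num_children;  // 0 for leaf (primitive) nodes
};

// Three-level (modern) encoding:
//   optional group my_list (LIST) {
//     repeated group list {
//       optional int32 element;
//     }
//   }
//
// One-level (legacy) encoding -- the list and its cells are both required,
// and there is no inner group at all:
//   repeated int32 my_list;

// A repeated *group* with one child is a structural stub (the middle "list"
// level of the three-level form); a repeated *leaf* is a one-level list.
bool is_stub(Node const& n)
{
  return n.repetition == Repetition::REPEATED && n.num_children == 1;
}
bool is_one_level_list(Node const& n)
{
  return n.repetition == Repetition::REPEATED && n.num_children == 0;
}

int main()
{
  Node three_level_stub{Repetition::REPEATED, 1};  // the inner "list" group
  Node one_level_leaf{Repetition::REPEATED, 0};    // repeated int32 my_list
  std::cout << is_stub(three_level_stub) << ' '
            << is_one_level_list(one_level_leaf) << '\n';  // prints: 1 1
}
```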

Authors:
  - Yunsong Wang (https://github.com/PointKernel)

Approvers:
  - Devavret Makkar (https://github.com/devavret)
  - Vukasin Milovanovic (https://github.com/vuule)
  - GALI PREM SAGAR (https://github.com/galipremsagar)

URL: #9848
PointKernel authored Dec 10, 2021
1 parent d7ce106 commit 9435945
Showing 8 changed files with 60 additions and 15 deletions.
2 changes: 0 additions & 2 deletions cpp/include/cudf/io/detail/parquet.hpp
@@ -54,12 +54,10 @@ class reader {
*
* @param sources Input `datasource` objects to read the dataset from
* @param options Settings for controlling reading behavior
- * @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource to use for device memory allocation
*/
explicit reader(std::vector<std::unique_ptr<cudf::io::datasource>>&& sources,
parquet_reader_options const& options,
- rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

/**
2 changes: 1 addition & 1 deletion cpp/include/cudf/io/parquet.hpp
@@ -46,7 +46,7 @@ constexpr size_type default_row_group_size_rows = 1000000;
class parquet_reader_options_builder;

/**
- * @brief Settings or `read_parquet()`.
+ * @brief Settings for `read_parquet()`.
*/
class parquet_reader_options {
source_info _source;
3 changes: 1 addition & 2 deletions cpp/src/io/functions.cpp
@@ -402,8 +402,7 @@ table_with_metadata read_parquet(parquet_reader_options const& options,
CUDF_FUNC_RANGE();

auto datasources = make_datasources(options.get_source());
- auto reader = std::make_unique<detail_parquet::reader>(
-   std::move(datasources), options, rmm::cuda_stream_default, mr);
+ auto reader = std::make_unique<detail_parquet::reader>(std::move(datasources), options, mr);

return reader->read(options);
}
8 changes: 4 additions & 4 deletions cpp/src/io/parquet/page_data.cu
@@ -1557,10 +1557,10 @@ extern "C" __global__ void __launch_bounds__(block_size)

bool has_repetition = s->col.max_level[level_type::REPETITION] > 0;

- // optimization : it might be useful to have a version of gpuDecodeStream that could go
- // wider than 1 warp. Currently it only only uses 1 warp so that it can overlap work
- // with the value decoding step when in the actual value decoding kernel. however during
- // this preprocess step we have no such limits - we could go as wide as block_size
+ // optimization : it might be useful to have a version of gpuDecodeStream that could go wider than
+ // 1 warp. Currently it only uses 1 warp so that it can overlap work with the value decoding step
+ // when in the actual value decoding kernel. However, during this preprocess step we have no such
+ // limits - we could go as wide as block_size
if (t < 32) {
constexpr int batch_size = 32;
int target_input_count = batch_size;
6 changes: 6 additions & 0 deletions cpp/src/io/parquet/parquet.hpp
@@ -198,6 +198,12 @@ struct SchemaElement {
// };
// }
bool is_stub() const { return repetition_type == REPEATED && num_children == 1; }

+ // https://github.com/apache/parquet-cpp/blob/642da05/src/parquet/schema.h#L49-L50
+ // One-level LIST encoding: Only allows required lists with required cells:
+ //   repeated value_type name
+ bool is_one_level_list() const { return repetition_type == REPEATED and num_children == 0; }

// in parquet terms, a group is a level of nesting in the schema. a group
// can be a struct or a list
bool is_struct() const
45 changes: 39 additions & 6 deletions cpp/src/io/parquet/reader_impl.cu
@@ -185,6 +185,16 @@ type_id to_type_id(SchemaElement const& schema,
return type_id::EMPTY;
}

+ /**
+  * @brief Converts cuDF type enum to column logical type
+  */
+ data_type to_data_type(type_id t_id, SchemaElement const& schema)
+ {
+   return t_id == type_id::DECIMAL32 || t_id == type_id::DECIMAL64 || t_id == type_id::DECIMAL128
+            ? data_type{t_id, numeric::scale_type{-schema.decimal_scale}}
+            : data_type{t_id};
+ }
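The new `to_data_type` helper factors out decimal-scale handling that was previously inlined at its call site (visible in the hunk further down). A hedged, self-contained sketch of the mapping it performs, with a stub in place of the real `SchemaElement`:

```cpp
#include <cudf/fixed_point/fixed_point.hpp>
#include <cudf/types.hpp>

// Stub holding the only SchemaElement field the helper reads.
struct SchemaElementStub {
  int decimal_scale = 0;
};

// Mirrors the helper above: decimal type ids carry the schema's (negated)
// scale, all other ids map to a plain data_type.
cudf::data_type to_data_type_sketch(cudf::type_id t_id, SchemaElementStub const& schema)
{
  using cudf::type_id;
  return t_id == type_id::DECIMAL32 || t_id == type_id::DECIMAL64 || t_id == type_id::DECIMAL128
           ? cudf::data_type{t_id, numeric::scale_type{-schema.decimal_scale}}
           : cudf::data_type{t_id};
}

// Usage: for a hypothetical DECIMAL(9,2) column, decimal_scale is 2, so
// to_data_type_sketch(type_id::DECIMAL64, {2}) yields a data_type with scale
// -2, while to_data_type_sketch(type_id::INT32, {2}) ignores the scale.
```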

/**
* @brief Function that returns the number of bits required to store a value
*/
@@ -414,6 +424,9 @@ class aggregate_metadata {
// walk upwards, skipping repeated fields
while (schema_index > 0) {
if (!pfm.schema[schema_index].is_stub()) { depth++; }
+ // the schema of a one-level-encoded list doesn't contain nesting information, so we need to
+ // add an extra nesting level manually
+ if (pfm.schema[schema_index].is_one_level_list()) { depth++; }
schema_index = pfm.schema[schema_index].parent_idx;
}
return depth;
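Why the extra `depth++`: the one-level form has no inner group node to count when walking leaf-to-root, so the implicit list level must be added by hand. A hedged re-statement of the walk with a flattened toy schema (node layout hypothetical):

```cpp
#include <cassert>
#include <vector>

enum class Repetition { REQUIRED, OPTIONAL, REPEATED };
struct Node {
  Repetition rep;
  int num_children;
  int parent;  // index of the parent node; 0 is the root
};

// Count output nesting depth from a leaf up to (but excluding) the root:
// every non-stub node adds a level, and a repeated leaf (one-level list)
// adds one extra for the list level its schema never spells out.
int nesting_depth(std::vector<Node> const& schema, int leaf)
{
  int depth = 0;
  for (int i = leaf; i > 0; i = schema[i].parent) {
    bool const is_stub = schema[i].rep == Repetition::REPEATED && schema[i].num_children == 1;
    bool const is_one_level = schema[i].rep == Repetition::REPEATED && schema[i].num_children == 0;
    if (!is_stub) { depth++; }
    if (is_one_level) { depth++; }  // the implicit list level
  }
  return depth;
}

int main()
{
  // Three-level list: root -> my_list (optional group) -> list (repeated stub) -> element.
  std::vector<Node> three{{Repetition::REQUIRED, 1, 0},
                          {Repetition::OPTIONAL, 1, 0},
                          {Repetition::REPEATED, 1, 1},
                          {Repetition::OPTIONAL, 0, 2}};
  // One-level list: root -> my_list (repeated int32 leaf).
  std::vector<Node> one{{Repetition::REQUIRED, 1, 0}, {Repetition::REPEATED, 0, 0}};

  assert(nesting_depth(three, 3) == 2);  // LIST + element
  assert(nesting_depth(one, 1) == 2);    // same depth despite the flat schema
}
```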
@@ -596,11 +609,11 @@
}

// if we're at the root, this is a new output column
- auto const col_type = to_type_id(schema_elem, strings_to_categorical, timestamp_type_id);
- auto const dtype    = col_type == type_id::DECIMAL32 || col_type == type_id::DECIMAL64 ||
-                           col_type == type_id::DECIMAL128
-                         ? data_type{col_type, numeric::scale_type{-schema_elem.decimal_scale}}
-                         : data_type{col_type};
+ auto const col_type = schema_elem.is_one_level_list()
+                         ? type_id::LIST
+                         : to_type_id(schema_elem, strings_to_categorical, timestamp_type_id);
+ auto const dtype    = to_data_type(col_type, schema_elem);

column_buffer output_col(dtype, schema_elem.repetition_type == OPTIONAL);
// store the index of this element if inserted in out_col_array
@@ -630,6 +643,23 @@
if (schema_elem.num_children == 0) {
input_column_info& input_col =
input_columns.emplace_back(input_column_info{schema_idx, schema_elem.name});

+ // set up child output column for one-level encoding list
+ if (schema_elem.is_one_level_list()) {
+   // determine the element data type
+   auto const element_type  = to_type_id(schema_elem, strings_to_categorical, timestamp_type_id);
+   auto const element_dtype = to_data_type(element_type, schema_elem);
+
+   column_buffer element_col(element_dtype, schema_elem.repetition_type == OPTIONAL);
+   // store the index of this element
+   nesting.push_back(static_cast<int>(output_col.children.size()));
+   // TODO: not sure if we should assign a name or leave it blank
+   element_col.name = "element";
+
+   output_col.children.push_back(std::move(element_col));
+ }

std::copy(nesting.cbegin(), nesting.cend(), std::back_inserter(input_col.nesting));
path_is_valid = true; // If we're able to reach leaf then path is valid
}
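The net effect of this branch is that a one-level-encoded column yields the same parent/child buffer shape as a three-level one, so downstream column assembly needs no special casing. A hedged sketch of the synthesized hierarchy, using a simplified buffer type in place of cuDF's internal `column_buffer` (column name hypothetical; the child name "element" is the one assigned in the hunk above):

```cpp
#include <string>
#include <vector>

// Simplified stand-in for cudf::io::detail::column_buffer.
struct BufferStub {
  std::string type;  // e.g. "LIST", "INT32"
  std::string name;
  std::vector<BufferStub> children;
};

// For `repeated int32 prices` (one-level encoding), the reader now builds a
// LIST parent plus one synthesized child carrying the element type.
BufferStub make_one_level_list_buffer(std::string const& element_type)
{
  BufferStub parent{"LIST", "prices", {}};
  parent.children.push_back({element_type, "element", {}});
  return parent;
}

// make_one_level_list_buffer("INT32") has the same two-level shape that a
// three-level `optional group prices (LIST) { ... }` column produces.
```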
@@ -850,6 +880,10 @@ void generate_depth_remappings(std::map<int, std::pair<std::vector<int>, std::ve
// if this is a repeated field, map it one level deeper
shallowest = cur_schema.is_stub() ? cur_depth + 1 : cur_depth;
}
+ // if it's a one-level-encoded list
+ else if (cur_schema.is_one_level_list()) {
+   shallowest = cur_depth - 1;
+ }
if (!cur_schema.is_stub()) { cur_depth--; }
schema_idx = cur_schema.parent_idx;
}
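For intuition about the level remapping, the one-level encoding keeps the level numbers small: the single REPEATED node contributes one repetition and one definition level. A hedged worked example of decoding the level streams such a column produces (column name and data hypothetical):

```cpp
#include <cstdio>
#include <vector>

// Decode the (rep, def) level streams of `repeated int32 vals`, where
// max rep = max def = 1: rep 0 starts a new row, rep 1 continues the current
// list, and def 0 marks an empty list. Null lists cannot occur because the
// one-level encoding makes both the list and its cells required.
int main()
{
  std::vector<int> rep = {0, 1, 0, 0};
  std::vector<int> def = {1, 1, 0, 1};
  std::vector<int> val = {7, 8, 9};  // only def == 1 slots consume a value

  std::size_t v = 0;
  for (std::size_t i = 0; i < rep.size(); ++i) {
    if (rep[i] == 0 && i > 0) { std::printf("]\n"); }
    if (rep[i] == 0) { std::printf("["); }
    if (def[i] == 1) { std::printf(" %d", val[v++]); }
  }
  std::printf("]\n");  // prints rows: [ 7 8] / [] / [ 9]
}
```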
@@ -1770,7 +1804,6 @@ table_with_metadata reader::impl::read(size_type skip_rows,
// Forward to implementation
reader::reader(std::vector<std::unique_ptr<cudf::io::datasource>>&& sources,
parquet_reader_options const& options,
- rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
: _impl(std::make_unique<impl>(std::move(sources), options, mr))
{
Binary file not shown.
9 changes: 9 additions & 0 deletions python/cudf/cudf/tests/test_parquet.py
@@ -2269,6 +2269,15 @@ def test_parquet_reader_brotli(datadir):
assert_eq(expect, got)


+ def test_parquet_reader_one_level_list(datadir):
+     fname = datadir / "one_level_list.parquet"
+
+     expect = pd.read_parquet(fname)
+     got = cudf.read_parquet(fname).to_pandas(nullable=True)
+
+     assert_eq(expect, got)


@pytest.mark.parametrize("size_bytes", [4_000_000, 1_000_000, 600_000])
@pytest.mark.parametrize("size_rows", [1_000_000, 100_000, 10_000])
def test_parquet_writer_row_group_size(
