Support filtered I/O in chunked_parquet_reader and simplify the use of parquet_reader_options #15764

Merged
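In brief: filtered I/O (the `filter` expression in parquet_reader_options) previously applied only to the single-shot read_parquet path; this PR wires it through the chunked reader as well, and simplifies the internals by capturing the reader options once at construction instead of re-passing them to every call. A minimal usage sketch of the public API after this change (the file name, column index, and threshold are hypothetical):

    #include <cudf/ast/expressions.hpp>
    #include <cudf/io/parquet.hpp>
    #include <cudf/scalar/scalar.hpp>

    void read_filtered_in_chunks()
    {
      // Keep rows where output column 0 < 100 (hypothetical predicate).
      auto threshold = cudf::numeric_scalar<int32_t>(100);
      auto literal   = cudf::ast::literal(threshold);
      auto col0      = cudf::ast::column_reference(0);
      auto filter    = cudf::ast::operation(cudf::ast::ast_operator::LESS, col0, literal);

      auto options =
        cudf::io::parquet_reader_options::builder(cudf::io::source_info{"example.parquet"})
          .filter(filter)
          .build();

      // With this PR, the filter is honored chunk by chunk, not only by read_parquet().
      auto reader = cudf::io::chunked_parquet_reader(1024 * 1024 /*chunk_read_limit*/, options);
      while (reader.has_next()) {
        auto chunk = reader.read_chunk();  // table_with_metadata for this chunk
        // ... consume chunk.tbl ...
      }
    }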
4 changes: 1 addition & 3 deletions cpp/include/cudf/io/detail/parquet.hpp
@@ -76,11 +76,9 @@ class reader {
   /**
    * @brief Reads the dataset as per given options.
    *
-   * @param options Settings for controlling reading behavior
-   *
    * @return The set of columns along with table metadata
    */
-  table_with_metadata read(parquet_reader_options const& options);
+  table_with_metadata read();
 };

 /**
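For orientation, the detail-layer reader now receives its options once at construction and read() takes no arguments. A simplified sketch of the resulting interface, with namespaces and the impl pointer omitted (the constructor shape is inferred from the call sites below):

    class reader {
     public:
      // Options are captured here once at construction...
      reader(std::vector<std::unique_ptr<datasource>>&& sources,
             parquet_reader_options const& options,
             rmm::cuda_stream_view stream,
             rmm::device_async_resource_ref mr);

      // ...so the read call no longer needs them re-passed.
      table_with_metadata read();
    };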
2 changes: 1 addition & 1 deletion cpp/src/io/functions.cpp
@@ -548,7 +548,7 @@ table_with_metadata read_parquet(parquet_reader_options const& options,
   auto reader =
     std::make_unique<detail_parquet::reader>(std::move(datasources), options, stream, mr);

-  return reader->read(options);
+  return reader->read();
 }

 parquet_metadata read_parquet_metadata(source_info const& src_info)
12 changes: 1 addition & 11 deletions cpp/src/io/parquet/reader.cpp
@@ -32,17 +32,7 @@ reader::reader(std::vector<std::unique_ptr<datasource>>&& sources,

 reader::~reader() = default;

-table_with_metadata reader::read(parquet_reader_options const& options)
-{
-  // if the user has specified custom row bounds
-  bool const uses_custom_row_bounds =
-    options.get_num_rows().has_value() || options.get_skip_rows() != 0;
-  return _impl->read(options.get_skip_rows(),
-                     options.get_num_rows(),
-                     uses_custom_row_bounds,
-                     options.get_row_groups(),
-                     options.get_filter());
-}
+table_with_metadata reader::read() { return _impl->read(); }

 chunked_reader::chunked_reader(std::size_t chunk_read_limit,
                                std::size_t pass_read_limit,
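The reader_impl.cpp hunks below pass a read_mode value where a uses_custom_row_bounds flag used to flow. The enum and helper live in reader_impl.hpp, which this diff view does not show; the following is a plausible sketch inferred from the call sites, not the verbatim definition:

    // Assumed shape of the new read_mode enum.
    enum class read_mode { READ_ALL, CHUNKED_READ };

    // Assumed helper: custom row bounds apply when the caller set skip_rows or
    // num_rows, and always for chunked reads, since each chunk decodes a row range.
    [[nodiscard]] bool uses_custom_row_bounds(read_mode mode) const
    {
      return mode == read_mode::CHUNKED_READ || _options.skip_rows != 0 ||
             _options.num_rows.has_value();
    }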
91 changes: 41 additions & 50 deletions cpp/src/io/parquet/reader_impl.cpp
@@ -46,7 +46,7 @@ inline bool is_treat_fixed_length_as_string(thrust::optional<LogicalType> const&

 }  // namespace

-void reader::impl::decode_page_data(bool uses_custom_row_bounds, size_t skip_rows, size_t num_rows)
+void reader::impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num_rows)
 {
   auto& pass    = *_pass_itm_data;
   auto& subpass = *pass.subpass;
@@ -88,7 +88,7 @@ void reader::impl::decode_page_data(bool uses_custom_row_bounds, size_t skip_rows, size_t num_rows)
                      is_treat_fixed_length_as_string(chunk.logical_type);
                    });

-  if (!_has_page_index || uses_custom_row_bounds || has_flba) {
+  if (!_has_page_index || uses_custom_row_bounds(mode) || has_flba) {
     ComputePageStringSizes(subpass.pages,
                            pass.chunks,
                            delta_temp_buf,
@@ -419,6 +419,11 @@ reader::impl::impl(std::size_t chunk_read_limit,
                    rmm::device_async_resource_ref mr)
   : _stream{stream},
     _mr{mr},
+    _options{options.get_timestamp_type(),
+             options.get_skip_rows(),
+             options.get_num_rows(),
+             options.get_row_groups(),
+             options.get_filter()},
     _sources{std::move(sources)},
     _output_chunk_read_limit{chunk_read_limit},
     _input_pass_read_limit{pass_read_limit}
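The _options member initialized above is new, and its declaration is not part of this diff. From the initializer and the uses below (_options.timestamp_type, _options.filter), a plausible shape is the following; the exact declaration in reader_impl.hpp is an assumption:

    // Hypothetical member declaration; field order mirrors the initializer above
    // and the types mirror the corresponding parquet_reader_options getters.
    struct {
      data_type timestamp_type;
      int64_t skip_rows;
      std::optional<size_type> num_rows;
      std::vector<std::vector<size_type>> row_groups;
      std::optional<std::reference_wrapper<ast::expression const>> filter;
    } _options;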
@@ -427,11 +432,6 @@
   _metadata =
     std::make_unique<aggregate_reader_metadata>(_sources, options.is_enabled_use_arrow_schema());

-  // Override output timestamp resolution if requested
-  if (options.get_timestamp_type().id() != type_id::EMPTY) {
-    _timestamp_type = options.get_timestamp_type();
-  }
-
   // Strings may be returned as either string or categorical columns
   _strings_to_categorical = options.is_enabled_convert_strings_to_categories();
@@ -452,34 +452,28 @@
                  filter_columns_names,
                  options.is_enabled_use_pandas_metadata(),
                  _strings_to_categorical,
-                 _timestamp_type.id());
+                 _options.timestamp_type.id());

   // Save the states of the output buffers for reuse in `chunk_read()`.
   for (auto const& buff : _output_buffers) {
     _output_buffers_template.emplace_back(cudf::io::detail::inline_column_buffer::empty_like(buff));
   }
 }

-void reader::impl::prepare_data(int64_t skip_rows,
-                                std::optional<size_type> const& num_rows,
-                                bool uses_custom_row_bounds,
-                                host_span<std::vector<size_type> const> row_group_indices,
-                                std::optional<std::reference_wrapper<ast::expression const>> filter)
+void reader::impl::prepare_data(read_mode mode)
 {
   // if we have not preprocessed at the whole-file level, do that now
   if (!_file_preprocessed) {
     // setup file level information
     // - read row group information
     // - setup information on (parquet) chunks
     // - compute schedule of input passes
-    preprocess_file(skip_rows, num_rows, row_group_indices, filter);
+    preprocess_file(mode);
   }

   // handle any chunking work (ratcheting through the subpasses and chunks within
   // our current pass) if in bounds
-  if (_file_itm_data._current_input_pass < _file_itm_data.num_passes()) {
-    handle_chunking(uses_custom_row_bounds);
-  }
+  if (_file_itm_data._current_input_pass < _file_itm_data.num_passes()) { handle_chunking(mode); }
 }

 void reader::impl::populate_metadata(table_metadata& out_metadata)
@@ -498,8 +492,7 @@ void reader::impl::populate_metadata(table_metadata& out_metadata)
                                            out_metadata.per_file_user_data[0].end()};
 }

-table_with_metadata reader::impl::read_chunk_internal(
-  bool uses_custom_row_bounds, std::optional<std::reference_wrapper<ast::expression const>> filter)
+table_with_metadata reader::impl::read_chunk_internal(read_mode mode)
 {
   // If `_output_metadata` has been constructed, just copy it over.
   auto out_metadata = _output_metadata ? table_metadata{*_output_metadata} : table_metadata{};
@@ -510,17 +503,17 @@ table_with_metadata reader::impl::read_chunk_internal(
   out_columns.reserve(_output_buffers.size());

   // no work to do (this can happen on the first pass if we have no rows to read)
-  if (!has_more_work()) { return finalize_output(out_metadata, out_columns, filter); }
+  if (!has_more_work()) { return finalize_output(out_metadata, out_columns); }

   auto& pass            = *_pass_itm_data;
   auto& subpass         = *pass.subpass;
   auto const& read_info = subpass.output_chunk_read_info[subpass.current_output_chunk];

   // Allocate memory buffers for the output columns.
-  allocate_columns(read_info.skip_rows, read_info.num_rows, uses_custom_row_bounds);
+  allocate_columns(mode, read_info.skip_rows, read_info.num_rows);

   // Parse data into the output buffers.
-  decode_page_data(uses_custom_row_bounds, read_info.skip_rows, read_info.num_rows);
+  decode_page_data(mode, read_info.skip_rows, read_info.num_rows);

   // Create the final output cudf columns.
   for (size_t i = 0; i < _output_buffers.size(); ++i) {
@@ -547,13 +540,11 @@ table_with_metadata reader::impl::read_chunk_internal(
   }

   // Add empty columns if needed. Filter output columns based on filter.
-  return finalize_output(out_metadata, out_columns, filter);
+  return finalize_output(out_metadata, out_columns);
 }

-table_with_metadata reader::impl::finalize_output(
-  table_metadata& out_metadata,
-  std::vector<std::unique_ptr<column>>& out_columns,
-  std::optional<std::reference_wrapper<ast::expression const>> filter)
+table_with_metadata reader::impl::finalize_output(table_metadata& out_metadata,
+                                                  std::vector<std::unique_ptr<column>>& out_columns)
 {
   // Create empty columns as needed (this can happen if we've ended up with no actual data to read)
   for (size_t i = out_columns.size(); i < _output_buffers.size(); ++i) {
@@ -581,10 +572,10 @@ table_with_metadata reader::impl::finalize_output(
   // increment the output chunk count
   _file_itm_data._output_chunk_count++;

-  if (filter.has_value()) {
+  if (_output_filter.has_value()) {
     auto read_table = std::make_unique<table>(std::move(out_columns));
     auto predicate  = cudf::detail::compute_column(
-      *read_table, filter.value().get(), _stream, rmm::mr::get_current_device_resource());
+      *read_table, _output_filter.value().get(), _stream, rmm::mr::get_current_device_resource());
     CUDF_EXPECTS(predicate->view().type().id() == type_id::BOOL8,
                  "Predicate filter should return a boolean");
     // Exclude columns present in filter only in output
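The hunk above is truncated right after the predicate type check. Downstream, the BOOL8 predicate is presumably used to drop non-matching rows; a hedged sketch of that step, using cudf's stream-compaction primitive:

    // Assumed continuation: keep only rows where the predicate is true, then
    // return the filtered table together with the metadata.
    auto filtered = cudf::detail::apply_boolean_mask(
      read_table->view(), predicate->view(), _stream, _mr);
    return {std::move(filtered), std::move(out_metadata)};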
@@ -598,22 +589,19 @@
   return {std::make_unique<table>(std::move(out_columns)), std::move(out_metadata)};
 }

-table_with_metadata reader::impl::read(
-  int64_t skip_rows,
-  std::optional<size_type> const& num_rows,
-  bool uses_custom_row_bounds,
-  host_span<std::vector<size_type> const> row_group_indices,
-  std::optional<std::reference_wrapper<ast::expression const>> filter)
+table_with_metadata reader::impl::read()
 {
   CUDF_EXPECTS(_output_chunk_read_limit == 0,
                "Reading the whole file must not have non-zero byte_limit.");

+  // Save the output filter for use in `finalize_output()`
   table_metadata metadata;
   populate_metadata(metadata);
-  auto expr_conv     = named_to_reference_converter(filter, metadata);
-  auto output_filter = expr_conv.get_converted_expr();
+  auto expr_conv = named_to_reference_converter(_options.filter, metadata);
+  _output_filter = expr_conv.get_converted_expr();

-  prepare_data(skip_rows, num_rows, uses_custom_row_bounds, row_group_indices, output_filter);
-  return read_chunk_internal(uses_custom_row_bounds, output_filter);
+  prepare_data(read_mode::READ_ALL);
+  return read_chunk_internal(read_mode::READ_ALL);
 }
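read(), read_chunk(), and has_next() now all run the options' filter through named_to_reference_converter before storing it in _output_filter. Conceptually, the converter rewrites name-based AST nodes into positional ones using the populated output metadata; a small illustration (the column name "price" and index 2 are hypothetical):

    // Filter as a user might write it, referring to a column by name.
    auto threshold = cudf::numeric_scalar<int32_t>(100);
    auto lit       = cudf::ast::literal(threshold);
    auto name_ref  = cudf::ast::column_name_reference("price");
    auto by_name   = cudf::ast::operation(cudf::ast::ast_operator::LESS, name_ref, lit);

    // Roughly what the converter produces, assuming "price" is output column 2.
    auto col2     = cudf::ast::column_reference(2);
    auto by_index = cudf::ast::operation(cudf::ast::ast_operator::LESS, col2, lit);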

 table_with_metadata reader::impl::read_chunk()
@@ -628,22 +616,25 @@ table_with_metadata reader::impl::read_chunk()
     }
   }

-  prepare_data(0 /*skip_rows*/,
-               std::nullopt /*num_rows, `nullopt` means unlimited*/,
-               true /*uses_custom_row_bounds*/,
-               {} /*row_group_indices, empty means read all row groups*/,
-               std::nullopt /*filter*/);
+  // Save the output filter for use in `finalize_output()`
+  table_metadata metadata;
+  populate_metadata(metadata);
+  auto expr_conv = named_to_reference_converter(_options.filter, metadata);
+  _output_filter = expr_conv.get_converted_expr();

-  return read_chunk_internal(true, std::nullopt);
+  prepare_data(read_mode::CHUNKED_READ);
+  return read_chunk_internal(read_mode::CHUNKED_READ);
 }

 bool reader::impl::has_next()
 {
-  prepare_data(0 /*skip_rows*/,
-               std::nullopt /*num_rows, `nullopt` means unlimited*/,
-               true /*uses_custom_row_bounds*/,
-               {} /*row_group_indices, empty means read all row groups*/,
-               std::nullopt /*filter*/);
+  // Save the output filter for use in `finalize_output()`
+  table_metadata metadata;
+  populate_metadata(metadata);
+  auto expr_conv = named_to_reference_converter(_options.filter, metadata);
+  _output_filter = expr_conv.get_converted_expr();
+
+  prepare_data(read_mode::CHUNKED_READ);

   // current_input_pass will only be incremented to be == num_passes after
   // the last chunk in the last subpass in the last pass has been returned
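The rest of has_next() is truncated here. Per the comment above, the return presumably reports whether any input pass remains while still surfacing at least one (possibly empty) chunk; the exact expression below is an assumption:

    // Assumed tail of has_next(): more passes to go, or nothing returned yet
    // (an empty file still yields one empty chunk).
    return _file_itm_data._current_input_pass < _file_itm_data.num_passes() ||
           _file_itm_data._output_chunk_count == 0;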