Fix parquet predicate filtering with column projection (#15113)
Fixes #15051

Predicate filtering in the Parquet reader did not work when column projection was used. This PR removes that limitation.

With this change, the filter can reference columns either by name or by index:
- Column name reference: the filter may refer to any column by name, even one that is not present in the column projection.
- Column index reference: indices refer to the output columns, in the requested order.

This works by extracting the column names used in the filter and adding them to the output buffers; after predicate filtering is done, these filter-only columns are removed and only the requested columns are returned.
The change also reads statistics data for only the output columns instead of all root columns.

Summary of changes:
- `get_column_names_in_expression` extracts the column names used in the filter.
- Columns referenced only by the filter are added to the output buffers during reader initialization
  - `cpp/src/io/parquet/reader_impl_helpers.cpp`, `cpp/src/io/parquet/reader_impl.cpp`
- Statistics data is extracted only for the output columns (including the columns in the filter) instead of all root columns
  - `cpp/src/io/parquet/predicate_pushdown.cpp`
  - To enable this, the output column schemas and their dtypes are cached.
  - The statistics extraction code now matches column chunks by `schema_idx` in the row group metadata.
  - The filter no longer needs to be converted again for all root columns; the passed output-column reference filter is reused.
  - The rest of the code is unchanged.
- After the output filter predicate is computed, the filter-only columns are removed.
- Moved the `named_to_reference_converter` constructor to the `.cpp` file and removed the unused constructor.
- Small `#include <>` cleanup.

Authors:
  - Karthikeyan (https://github.com/karthikeyann)
  - Vukasin Milovanovic (https://github.com/vuule)
  - Muhammad Haseeb (https://github.com/mhaseeb123)

Approvers:
  - Lawrence Mitchell (https://github.com/wence-)
  - Vukasin Milovanovic (https://github.com/vuule)
  - Muhammad Haseeb (https://github.com/mhaseeb123)

URL: #15113
karthikeyann authored May 16, 2024
1 parent c7fe7fe commit 47ed345
Showing 9 changed files with 276 additions and 65 deletions.
29 changes: 26 additions & 3 deletions cpp/include/cudf/io/parquet.hpp
@@ -205,6 +205,31 @@ class parquet_reader_options {
/**
* @brief Sets AST based filter for predicate pushdown.
*
* The filter can utilize cudf::ast::column_name_reference to reference a column by its name,
* even if it's not necessarily present in the requested projected columns.
* To refer to output column indices, you can use cudf::ast::column_reference.
*
* For a parquet with columns ["A", "B", "C", ... "X", "Y", "Z"],
* Example 1: with/without column projection
* @code
* use_columns({"A", "X", "Z"})
* .filter(operation(ast_operator::LESS, column_name_reference{"C"}, literal{100}));
* @endcode
 * Column "C" need not be present in the output table.
* Example 2: without column projection
* @code
* filter(operation(ast_operator::LESS, column_reference{1}, literal{100}));
* @endcode
* Here, `1` will refer to column "B" because output will contain all columns in
* order ["A", ..., "Z"].
* Example 3: with column projection
* @code
* use_columns({"A", "Z", "X"})
* .filter(operation(ast_operator::LESS, column_reference{1}, literal{100}));
* @endcode
* Here, `1` will refer to column "Z" because output will contain 3 columns in
* order ["A", "Z", "X"].
*
* @param filter AST expression to use as filter
*/
void set_filter(ast::expression const& filter) { _filter = filter; }
@@ -309,9 +334,7 @@ class parquet_reader_options_builder {
}

/**
* @brief Sets vector of individual row groups to read.
*
* @param filter Vector of row groups to read
* @copydoc parquet_reader_options::set_filter
* @return this for chaining
*/
parquet_reader_options_builder& filter(ast::expression const& filter)
139 changes: 122 additions & 17 deletions cpp/src/io/parquet/predicate_pushdown.cpp
@@ -31,10 +31,12 @@
#include <rmm/mr/device/per_device_resource.hpp>
#include <rmm/resource_ref.hpp>

#include <thrust/iterator/counting_iterator.h>

#include <algorithm>
#include <list>
#include <numeric>
#include <optional>
#include <unordered_set>

namespace cudf::io::parquet::detail {

@@ -127,7 +129,7 @@ struct stats_caster {
// Creates device columns from column statistics (min, max)
template <typename T>
std::pair<std::unique_ptr<column>, std::unique_ptr<column>> operator()(
size_t col_idx,
int schema_idx,
cudf::data_type dtype,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr) const
@@ -206,22 +208,31 @@
}; // local struct host_column
host_column min(total_row_groups);
host_column max(total_row_groups);

size_type stats_idx = 0;
for (size_t src_idx = 0; src_idx < row_group_indices.size(); ++src_idx) {
for (auto const rg_idx : row_group_indices[src_idx]) {
auto const& row_group = per_file_metadata[src_idx].row_groups[rg_idx];
auto const& colchunk = row_group.columns[col_idx];
// To support deprecated min, max fields.
auto const& min_value = colchunk.meta_data.statistics.min_value.has_value()
? colchunk.meta_data.statistics.min_value
: colchunk.meta_data.statistics.min;
auto const& max_value = colchunk.meta_data.statistics.max_value.has_value()
? colchunk.meta_data.statistics.max_value
: colchunk.meta_data.statistics.max;
// translate binary data to Type then to <T>
min.set_index(stats_idx, min_value, colchunk.meta_data.type);
max.set_index(stats_idx, max_value, colchunk.meta_data.type);
auto col = std::find_if(
row_group.columns.begin(),
row_group.columns.end(),
[schema_idx](ColumnChunk const& col) { return col.schema_idx == schema_idx; });
if (col != std::end(row_group.columns)) {
auto const& colchunk = *col;
// To support deprecated min, max fields.
auto const& min_value = colchunk.meta_data.statistics.min_value.has_value()
? colchunk.meta_data.statistics.min_value
: colchunk.meta_data.statistics.min;
auto const& max_value = colchunk.meta_data.statistics.max_value.has_value()
? colchunk.meta_data.statistics.max_value
: colchunk.meta_data.statistics.max;
// translate binary data to Type then to <T>
min.set_index(stats_idx, min_value, colchunk.meta_data.type);
max.set_index(stats_idx, max_value, colchunk.meta_data.type);
} else {
        // Mark the entry null if the column is not present in this row group
min.set_index(stats_idx, thrust::nullopt, {});
max.set_index(stats_idx, thrust::nullopt, {});
}
stats_idx++;
}
};
@@ -378,6 +389,7 @@ class stats_expression_converter : public ast::detail::expression_transformer {
std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::filter_row_groups(
host_span<std::vector<size_type> const> row_group_indices,
host_span<data_type const> output_dtypes,
host_span<int const> output_column_schemas,
std::reference_wrapper<ast::expression const> filter,
rmm::cuda_stream_view stream) const
{
@@ -412,7 +424,8 @@ std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::fi
std::vector<std::unique_ptr<column>> columns;
stats_caster stats_col{total_row_groups, per_file_metadata, input_row_group_indices};
for (size_t col_idx = 0; col_idx < output_dtypes.size(); col_idx++) {
auto const& dtype = output_dtypes[col_idx];
auto const schema_idx = output_column_schemas[col_idx];
auto const& dtype = output_dtypes[col_idx];
// Only comparable types except fixed point are supported.
if (cudf::is_compound(dtype) && dtype.id() != cudf::type_id::STRING) {
// placeholder only for unsupported types.
@@ -423,14 +436,14 @@ std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::fi
continue;
}
auto [min_col, max_col] =
cudf::type_dispatcher<dispatch_storage_type>(dtype, stats_col, col_idx, dtype, stream, mr);
cudf::type_dispatcher<dispatch_storage_type>(dtype, stats_col, schema_idx, dtype, stream, mr);
columns.push_back(std::move(min_col));
columns.push_back(std::move(max_col));
}
auto stats_table = cudf::table(std::move(columns));

// Converts AST to StatsAST with reference to min, max columns in above `stats_table`.
stats_expression_converter stats_expr{filter, static_cast<size_type>(output_dtypes.size())};
stats_expression_converter stats_expr{filter.get(), static_cast<size_type>(output_dtypes.size())};
auto stats_ast = stats_expr.get_stats_expr();
auto predicate_col = cudf::detail::compute_column(stats_table, stats_ast.get(), stream, mr);
auto predicate = predicate_col->view();
@@ -475,6 +488,20 @@ std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::fi
}

// convert column named expression to column index reference expression
named_to_reference_converter::named_to_reference_converter(
std::optional<std::reference_wrapper<ast::expression const>> expr, table_metadata const& metadata)
{
if (!expr.has_value()) return;
// create map for column name.
std::transform(metadata.schema_info.cbegin(),
metadata.schema_info.cend(),
thrust::counting_iterator<size_t>(0),
std::inserter(column_name_to_index, column_name_to_index.end()),
[](auto const& sch, auto index) { return std::make_pair(sch.name, index); });

expr.value().get().accept(*this);
}

std::reference_wrapper<ast::expression const> named_to_reference_converter::visit(
ast::literal const& expr)
{
@@ -530,4 +557,82 @@ named_to_reference_converter::visit_operands(
return transformed_operands;
}

/**
 * @brief Collects column names referenced in an AST expression
*
*/
class names_from_expression : public ast::detail::expression_transformer {
public:
names_from_expression(std::optional<std::reference_wrapper<ast::expression const>> expr,
std::vector<std::string> const& skip_names)
: _skip_names(skip_names.cbegin(), skip_names.cend())
{
if (!expr.has_value()) return;
expr.value().get().accept(*this);
}

/**
* @copydoc ast::detail::expression_transformer::visit(ast::literal const& )
*/
std::reference_wrapper<ast::expression const> visit(ast::literal const& expr) override
{
return expr;
}
/**
* @copydoc ast::detail::expression_transformer::visit(ast::column_reference const& )
*/
std::reference_wrapper<ast::expression const> visit(ast::column_reference const& expr) override
{
return expr;
}
/**
* @copydoc ast::detail::expression_transformer::visit(ast::column_name_reference const& )
*/
std::reference_wrapper<ast::expression const> visit(
ast::column_name_reference const& expr) override
{
// collect column names
auto col_name = expr.get_column_name();
if (_skip_names.count(col_name) == 0) { _column_names.insert(col_name); }
return expr;
}
/**
* @copydoc ast::detail::expression_transformer::visit(ast::operation const& )
*/
std::reference_wrapper<ast::expression const> visit(ast::operation const& expr) override
{
visit_operands(expr.get_operands());
return expr;
}

/**
* @brief Returns the column names in AST.
*
 * @return Vector of column names referenced in the AST
*/
[[nodiscard]] std::vector<std::string> to_vector() &&
{
return {std::make_move_iterator(_column_names.begin()),
std::make_move_iterator(_column_names.end())};
}

private:
void visit_operands(std::vector<std::reference_wrapper<ast::expression const>> operands)
{
for (auto const& operand : operands) {
operand.get().accept(*this);
}
}

std::unordered_set<std::string> _column_names;
std::unordered_set<std::string> _skip_names;
};

[[nodiscard]] std::vector<std::string> get_column_names_in_expression(
std::optional<std::reference_wrapper<ast::expression const>> expr,
std::vector<std::string> const& skip_names)
{
return names_from_expression(expr, skip_names).to_vector();
}

} // namespace cudf::io::parquet::detail
20 changes: 18 additions & 2 deletions cpp/src/io/parquet/reader_impl.cpp
@@ -26,6 +26,8 @@

#include <rmm/resource_ref.hpp>

#include <thrust/iterator/counting_iterator.h>

#include <bitset>
#include <numeric>

@@ -436,9 +438,18 @@ reader::impl::impl(std::size_t chunk_read_limit,
// Binary columns can be read as binary or strings
_reader_column_schema = options.get_column_schema();

// Select only columns required by the options
// Select only columns required by the options and filter
std::optional<std::vector<std::string>> filter_columns_names;
if (options.get_filter().has_value() and options.get_columns().has_value()) {
// list, struct, dictionary are not supported by AST filter yet.
// extract columns not present in get_columns() & keep count to remove at end.
filter_columns_names =
get_column_names_in_expression(options.get_filter(), *(options.get_columns()));
_num_filter_only_columns = filter_columns_names->size();
}
std::tie(_input_columns, _output_buffers, _output_column_schemas) =
_metadata->select_columns(options.get_columns(),
filter_columns_names,
options.is_enabled_use_pandas_metadata(),
_strings_to_categorical,
_timestamp_type.id());
@@ -572,7 +583,12 @@ table_with_metadata reader::impl::finalize_output(
*read_table, filter.value().get(), _stream, rmm::mr::get_current_device_resource());
CUDF_EXPECTS(predicate->view().type().id() == type_id::BOOL8,
"Predicate filter should return a boolean");
auto output_table = cudf::detail::apply_boolean_mask(*read_table, *predicate, _stream, _mr);
// Exclude columns present in filter only in output
auto counting_it = thrust::make_counting_iterator<std::size_t>(0);
auto const output_count = read_table->num_columns() - _num_filter_only_columns;
auto only_output = read_table->select(counting_it, counting_it + output_count);
auto output_table = cudf::detail::apply_boolean_mask(only_output, *predicate, _stream, _mr);
if (_num_filter_only_columns > 0) { out_metadata.schema_info.resize(output_count); }
return {std::move(output_table), std::move(out_metadata)};
}
return {std::make_unique<table>(std::move(out_columns)), std::move(out_metadata)};
3 changes: 3 additions & 0 deletions cpp/src/io/parquet/reader_impl.hpp
@@ -368,6 +368,9 @@ class reader::impl {
// _output_buffers associated metadata
std::unique_ptr<table_metadata> _output_metadata;

// number of extra filter columns
std::size_t _num_filter_only_columns{0};

bool _strings_to_categorical = false;

// are there usable page indexes available
1 change: 1 addition & 0 deletions cpp/src/io/parquet/reader_impl_chunking.cu
@@ -14,6 +14,7 @@
* limitations under the License.
*/

#include "compact_protocol_reader.hpp"
#include "io/comp/nvcomp_adapter.hpp"
#include "io/utilities/config_utils.hpp"
#include "io/utilities/time_utils.cuh"
37 changes: 24 additions & 13 deletions cpp/src/io/parquet/reader_impl_helpers.cpp
@@ -16,6 +16,7 @@

#include "reader_impl_helpers.hpp"

#include "compact_protocol_reader.hpp"
#include "io/parquet/parquet.hpp"
#include "io/utilities/base64_utilities.hpp"
#include "io/utilities/row_selection.hpp"
@@ -25,6 +26,7 @@
#include <thrust/iterator/counting_iterator.h>
#include <thrust/iterator/zip_iterator.h>

#include <functional>
#include <numeric>
#include <regex>

@@ -954,13 +956,15 @@ aggregate_reader_metadata::select_row_groups(
int64_t skip_rows_opt,
std::optional<size_type> const& num_rows_opt,
host_span<data_type const> output_dtypes,
host_span<int const> output_column_schemas,
std::optional<std::reference_wrapper<ast::expression const>> filter,
rmm::cuda_stream_view stream) const
{
std::optional<std::vector<std::vector<size_type>>> filtered_row_group_indices;
// if filter is not empty, then gather row groups to read after predicate pushdown
if (filter.has_value()) {
filtered_row_group_indices =
filter_row_groups(row_group_indices, output_dtypes, filter.value(), stream);
filtered_row_group_indices = filter_row_groups(
row_group_indices, output_dtypes, output_column_schemas, filter.value(), stream);
if (filtered_row_group_indices.has_value()) {
row_group_indices =
host_span<std::vector<size_type> const>(filtered_row_group_indices.value());
@@ -1017,10 +1021,12 @@ aggregate_reader_metadata::select_row_groups(
std::tuple<std::vector<input_column_info>,
std::vector<cudf::io::detail::inline_column_buffer>,
std::vector<size_type>>
aggregate_reader_metadata::select_columns(std::optional<std::vector<std::string>> const& use_names,
bool include_index,
bool strings_to_categorical,
type_id timestamp_type_id) const
aggregate_reader_metadata::select_columns(
std::optional<std::vector<std::string>> const& use_names,
std::optional<std::vector<std::string>> const& filter_columns_names,
bool include_index,
bool strings_to_categorical,
type_id timestamp_type_id) const
{
auto find_schema_child = [&](SchemaElement const& schema_elem, std::string const& name) {
auto const& col_schema_idx =
@@ -1184,13 +1190,18 @@ aggregate_reader_metadata::select_columns(std::optional<std::vector<std::string>

// Find which of the selected paths are valid and get their schema index
std::vector<path_info> valid_selected_paths;
for (auto const& selected_path : *use_names) {
auto found_path =
std::find_if(all_paths.begin(), all_paths.end(), [&](path_info& valid_path) {
return valid_path.full_path == selected_path;
});
if (found_path != all_paths.end()) {
valid_selected_paths.push_back({selected_path, found_path->schema_idx});
  // Iterate over the requested column names and, if provided, the filter-only column names.
std::vector<std::reference_wrapper<std::vector<std::string> const>> column_names{
*use_names, *filter_columns_names};
for (auto const& used_column_names : column_names) {
for (auto const& selected_path : used_column_names.get()) {
auto found_path =
std::find_if(all_paths.begin(), all_paths.end(), [&](path_info& valid_path) {
return valid_path.full_path == selected_path;
});
if (found_path != all_paths.end()) {
valid_selected_paths.push_back({selected_path, found_path->schema_idx});
}
}
}

