Fix parquet predicate filtering with column projection #15113

Merged
Commits (19 total; this view shows changes from 17 commits)
be089f3
fix stats filter conversion dtypes and names
karthikeyann Feb 21, 2024
f458410
filter columns limitation fixed.
karthikeyann Mar 1, 2024
b01b2d8
address review comments, added docstring
karthikeyann Mar 1, 2024
b348db4
Merge branch 'branch-24.04' into fix-pq_filter_col_projection
karthikeyann Mar 1, 2024
4a07e3d
add docstring for filter
karthikeyann Mar 1, 2024
6ee2bcf
Merge branch 'branch-24.04' into fix-pq_filter_col_projection
karthikeyann Mar 6, 2024
acb0723
update docs with example
karthikeyann Mar 6, 2024
bff38f5
Merge branch 'fix-pq_filter_col_projection' of github.com:karthikeyan…
karthikeyann Mar 6, 2024
d643ce1
Merge branch 'branch-24.04' into fix-pq_filter_col_projection
karthikeyann Mar 6, 2024
e79552c
Merge branch 'branch-24.06' into fix-pq_filter_col_projection
karthikeyann Apr 9, 2024
e40cffc
address review comments, include cleanup, reorg code
karthikeyann Apr 24, 2024
926a75a
Merge branch 'branch-24.06' into fix-pq_filter_col_projection
karthikeyann Apr 24, 2024
a220d7d
fix col index ref on projection
karthikeyann May 10, 2024
c0e734c
Merge branch 'branch-24.06' into fix-pq_filter_col_projection
karthikeyann May 10, 2024
96ea0e8
Merge branch 'branch-24.06' into fix-pq_filter_col_projection
vuule May 14, 2024
47c5413
Merge branch 'branch-24.06' into fix-pq_filter_col_projection
mhaseeb123 May 15, 2024
9e4008e
remove caching output dtypes
karthikeyann May 16, 2024
cc3bd26
Merge branch 'branch-24.06' into fix-pq_filter_col_projection
karthikeyann May 16, 2024
f64294e
wMerge branch 'fix-pq_filter_col_projection' of github.com:karthikeya…
karthikeyann May 16, 2024
29 changes: 26 additions & 3 deletions cpp/include/cudf/io/parquet.hpp
@@ -205,6 +205,31 @@ class parquet_reader_options {
/**
* @brief Sets AST based filter for predicate pushdown.
*
* The filter can utilize cudf::ast::column_name_reference to reference a column by its name,
* even if it's not necessarily present in the requested projected columns.
* To refer to output column indices, you can use cudf::ast::column_reference.
*
* For a Parquet file with columns ["A", "B", "C", ... "X", "Y", "Z"],
* Example 1: with/without column projection
* @code
* use_columns({"A", "X", "Z"})
* .filter(operation(ast_operator::LESS, column_name_reference{"C"}, literal{100}));
* @endcode
* Column "C" need not be present in output table.
* Example 2: without column projection
* @code
* filter(operation(ast_operator::LESS, column_reference{1}, literal{100}));
* @endcode
* Here, `1` will refer to column "B" because output will contain all columns in
* order ["A", ..., "Z"].
* Example 3: with column projection
* @code
* use_columns({"A", "Z", "X"})
* .filter(operation(ast_operator::LESS, column_reference{1}, literal{100}));
* @endcode
* Here, `1` will refer to column "Z" because output will contain 3 columns in
* order ["A", "Z", "X"].
*
* @param filter AST expression to use as filter
*/
void set_filter(ast::expression const& filter) { _filter = filter; }
@@ -309,9 +334,7 @@ class parquet_reader_options_builder {
}

/**
* @brief Sets vector of individual row groups to read.
*
* @param filter Vector of row groups to read
* @copydoc parquet_reader_options::set_filter
* @return this for chaining
*/
parquet_reader_options_builder& filter(ast::expression const& filter)
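For context, here is a minimal end-to-end sketch of the behavior documented above — it is not part of the diff. The file name and column names are placeholders, and the builder calls (`columns()`, `filter()`) follow the reader-options builder pattern referenced in this header; treat exact signatures as illustrative.

```cpp
#include <cudf/ast/expressions.hpp>
#include <cudf/io/parquet.hpp>
#include <cudf/scalar/scalar.hpp>

cudf::io::table_with_metadata read_projected_and_filtered()
{
  // Predicate: C < 100, where "C" is referenced by name and is NOT in the projection.
  cudf::ast::column_name_reference col_C{"C"};
  cudf::numeric_scalar<int32_t> val{100};
  cudf::ast::literal lit_100{val};
  cudf::ast::operation pred{cudf::ast::ast_operator::LESS, col_C, lit_100};

  // Project only A, X, Z. Row groups whose statistics cannot satisfy C < 100 are
  // skipped, the remaining rows are filtered, and "C" itself is not returned.
  auto opts = cudf::io::parquet_reader_options::builder(
                cudf::io::source_info{"data.parquet"})  // placeholder path
                .columns({"A", "X", "Z"})
                .filter(pred)
                .build();
  return cudf::io::read_parquet(opts);
}
```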
175 changes: 156 additions & 19 deletions cpp/src/io/parquet/predicate_pushdown.cpp
@@ -31,10 +31,12 @@
#include <rmm/mr/device/per_device_resource.hpp>
#include <rmm/resource_ref.hpp>

#include <thrust/iterator/counting_iterator.h>

#include <algorithm>
#include <list>
#include <numeric>
#include <optional>
#include <unordered_set>

namespace cudf::io::parquet::detail {

@@ -127,7 +129,7 @@ struct stats_caster {
// Creates device columns from column statistics (min, max)
template <typename T>
std::pair<std::unique_ptr<column>, std::unique_ptr<column>> operator()(
size_t col_idx,
int schema_idx,
cudf::data_type dtype,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr) const
@@ -206,22 +208,31 @@
}; // local struct host_column
host_column min(total_row_groups);
host_column max(total_row_groups);

size_type stats_idx = 0;
for (size_t src_idx = 0; src_idx < row_group_indices.size(); ++src_idx) {
for (auto const rg_idx : row_group_indices[src_idx]) {
auto const& row_group = per_file_metadata[src_idx].row_groups[rg_idx];
auto const& colchunk = row_group.columns[col_idx];
// To support deprecated min, max fields.
auto const& min_value = colchunk.meta_data.statistics.min_value.has_value()
? colchunk.meta_data.statistics.min_value
: colchunk.meta_data.statistics.min;
auto const& max_value = colchunk.meta_data.statistics.max_value.has_value()
? colchunk.meta_data.statistics.max_value
: colchunk.meta_data.statistics.max;
// translate binary data to Type then to <T>
min.set_index(stats_idx, min_value, colchunk.meta_data.type);
max.set_index(stats_idx, max_value, colchunk.meta_data.type);
auto col = std::find_if(
row_group.columns.begin(),
row_group.columns.end(),
[schema_idx](ColumnChunk const& col) { return col.schema_idx == schema_idx; });
if (col != std::end(row_group.columns)) {
auto const& colchunk = *col;
// To support deprecated min, max fields.
auto const& min_value = colchunk.meta_data.statistics.min_value.has_value()
? colchunk.meta_data.statistics.min_value
: colchunk.meta_data.statistics.min;
auto const& max_value = colchunk.meta_data.statistics.max_value.has_value()
? colchunk.meta_data.statistics.max_value
: colchunk.meta_data.statistics.max;
// translate binary data to Type then to <T>
min.set_index(stats_idx, min_value, colchunk.meta_data.type);
max.set_index(stats_idx, max_value, colchunk.meta_data.type);
} else {
// Mark the stats as null if the column is not present in this row group
min.set_index(stats_idx, thrust::nullopt, {});
max.set_index(stats_idx, thrust::nullopt, {});
Comment on lines +233 to +234

Contributor: nit (non-blocking): Possibly worthwhile migrating set_index to std::optional.

Contributor (author): min_value and min from the Statistics struct use thrust::optional, which is what is passed here. https://github.com/rapidsai/cudf/blob/064dd7b02166cc67e882b708d66621bc3fafd70b/cpp/src/io/parquet/parquet.hpp uses thrust::optional everywhere (except at 2 places). Not sure why. @vuule

Contributor: All of the thrift data structures use thrust::optional over std::optional because some are used on device. I assume these will migrate to cuda::std::optional eventually. #15091 (comment) (See the short illustration after this hunk.)
}
stats_idx++;
}
};
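A small illustration of the point made in the review thread above — an assumption-level sketch, not code from this PR: thrust::optional is usable inside device code, whereas std::optional is generally host-only without extra compiler support.

```cpp
#include <thrust/optional.h>

// Callable from device code: thrust::optional's members are __host__ __device__.
__device__ int value_or_zero(thrust::optional<int> v)
{
  // A std::optional<int> parameter would not generally work here, since its member
  // functions are not __device__-qualified; cuda::std::optional is the portable
  // replacement the thread anticipates migrating to.
  return v.has_value() ? *v : 0;
}
```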
@@ -375,9 +386,42 @@ class stats_expression_converter : public ast::detail::expression_transformer {
};
} // namespace

void aggregate_reader_metadata::cache_output_dtypes(host_span<int const> output_schemas,
bool strings_to_categorical,
type_id timestamp_type_id)
{
// store output column types as vector
if (!_output_types.empty()) return;
std::function<cudf::data_type(int)> get_dtype = [strings_to_categorical,
timestamp_type_id,
&get_dtype,
this](int schema_idx) -> cudf::data_type {
// Returns the type of the column identified by schema_idx.
auto const& schema_elem = get_schema(schema_idx);
if (schema_elem.is_stub()) {
CUDF_EXPECTS(schema_elem.num_children == 1, "Unexpected number of children for stub");
return get_dtype(schema_elem.children_idx[0]);
}

auto const one_level_list = schema_elem.is_one_level_list(get_schema(schema_elem.parent_idx));
// if we're at the root, this is a new output column
auto const col_type = one_level_list
? type_id::LIST
: to_type_id(schema_elem, strings_to_categorical, timestamp_type_id);
auto const dtype = to_data_type(col_type, schema_elem);
// path_is_valid is skipped for nested columns here.
return dtype;
};

for (auto const& schema_idx : output_schemas) {
if (schema_idx < 0) { continue; }
_output_types.push_back(get_dtype(schema_idx));
}
}

std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::filter_row_groups(
host_span<std::vector<size_type> const> row_group_indices,
host_span<data_type const> output_dtypes,
host_span<int const> output_column_schemas,
std::reference_wrapper<ast::expression const> filter,
rmm::cuda_stream_view stream) const
{
@@ -411,8 +455,9 @@ std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::fi
// For each column, it contains #sources * #column_chunks_per_src rows.
std::vector<std::unique_ptr<column>> columns;
stats_caster stats_col{total_row_groups, per_file_metadata, input_row_group_indices};
for (size_t col_idx = 0; col_idx < output_dtypes.size(); col_idx++) {
auto const& dtype = output_dtypes[col_idx];
for (size_t col_idx = 0; col_idx < _output_types.size(); col_idx++) {
auto const schema_idx = output_column_schemas[col_idx];
auto const& dtype = _output_types[col_idx];
// Only comparable types except fixed point are supported.
if (cudf::is_compound(dtype) && dtype.id() != cudf::type_id::STRING) {
// placeholder only for unsupported types.
@@ -423,14 +468,14 @@ std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::fi
continue;
}
auto [min_col, max_col] =
cudf::type_dispatcher<dispatch_storage_type>(dtype, stats_col, col_idx, dtype, stream, mr);
cudf::type_dispatcher<dispatch_storage_type>(dtype, stats_col, schema_idx, dtype, stream, mr);
columns.push_back(std::move(min_col));
columns.push_back(std::move(max_col));
}
auto stats_table = cudf::table(std::move(columns));

// Converts AST to StatsAST with reference to min, max columns in above `stats_table`.
stats_expression_converter stats_expr{filter, static_cast<size_type>(output_dtypes.size())};
stats_expression_converter stats_expr{filter.get(), static_cast<size_type>(_output_types.size())};
auto stats_ast = stats_expr.get_stats_expr();
auto predicate_col = cudf::detail::compute_column(stats_table, stats_ast.get(), stream, mr);
auto predicate = predicate_col->view();
@@ -475,6 +520,20 @@ std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::fi
}

// convert column named expression to column index reference expression
named_to_reference_converter::named_to_reference_converter(
std::optional<std::reference_wrapper<ast::expression const>> expr, table_metadata const& metadata)
{
if (!expr.has_value()) return;
// Create a map from column name to output column index.
std::transform(metadata.schema_info.cbegin(),
metadata.schema_info.cend(),
thrust::counting_iterator<size_t>(0),
std::inserter(column_name_to_index, column_name_to_index.end()),
[](auto const& sch, auto index) { return std::make_pair(sch.name, index); });

expr.value().get().accept(*this);
}

std::reference_wrapper<ast::expression const> named_to_reference_converter::visit(
ast::literal const& expr)
{
@@ -530,4 +589,82 @@ named_to_reference_converter::
return transformed_operands;
}

/**
* @brief Collects the column names referenced in an AST expression
*
*/
class names_from_expression : public ast::detail::expression_transformer {
public:
names_from_expression(std::optional<std::reference_wrapper<ast::expression const>> expr,
std::vector<std::string> const& skip_names)
: _skip_names(skip_names.cbegin(), skip_names.cend())
{
if (!expr.has_value()) return;
expr.value().get().accept(*this);
}

/**
* @copydoc ast::detail::expression_transformer::visit(ast::literal const& )
*/
std::reference_wrapper<ast::expression const> visit(ast::literal const& expr) override
{
return expr;
}
/**
* @copydoc ast::detail::expression_transformer::visit(ast::column_reference const& )
*/
std::reference_wrapper<ast::expression const> visit(ast::column_reference const& expr) override
{
return expr;
}
/**
* @copydoc ast::detail::expression_transformer::visit(ast::column_name_reference const& )
*/
std::reference_wrapper<ast::expression const> visit(
ast::column_name_reference const& expr) override
{
// collect column names
auto col_name = expr.get_column_name();
if (_skip_names.count(col_name) == 0) { _column_names.insert(col_name); }
return expr;
}
/**
* @copydoc ast::detail::expression_transformer::visit(ast::operation const& )
*/
std::reference_wrapper<ast::expression const> visit(ast::operation const& expr) override
{
visit_operands(expr.get_operands());
return expr;
}

/**
* @brief Returns the column names referenced in the AST.
*
* @return Vector of column names
*/
[[nodiscard]] std::vector<std::string> to_vector() &&
{
return {std::make_move_iterator(_column_names.begin()),
std::make_move_iterator(_column_names.end())};
}

private:
void visit_operands(std::vector<std::reference_wrapper<ast::expression const>> operands)
{
for (auto const& operand : operands) {
operand.get().accept(*this);
}
}

std::unordered_set<std::string> _column_names;
std::unordered_set<std::string> _skip_names;
};

[[nodiscard]] std::vector<std::string> get_column_names_in_expression(
std::optional<std::reference_wrapper<ast::expression const>> expr,
std::vector<std::string> const& skip_names)
{
return names_from_expression(expr, skip_names).to_vector();
}

} // namespace cudf::io::parquet::detail
24 changes: 22 additions & 2 deletions cpp/src/io/parquet/reader_impl.cpp
@@ -26,6 +26,8 @@

#include <rmm/resource_ref.hpp>

#include <thrust/iterator/counting_iterator.h>

#include <bitset>
#include <numeric>

@@ -436,13 +438,26 @@ reader::impl::impl(std::size_t chunk_read_limit,
// Binary columns can be read as binary or strings
_reader_column_schema = options.get_column_schema();

// Select only columns required by the options
// Select only columns required by the options and filter
std::optional<std::vector<std::string>> filter_columns_names;
if (options.get_filter().has_value() and options.get_columns().has_value()) {
// list, struct, dictionary are not supported by AST filter yet.
// extract columns not present in get_columns() & keep count to remove at end.
filter_columns_names =
get_column_names_in_expression(options.get_filter(), *(options.get_columns()));
_num_filter_only_columns = filter_columns_names->size();
}
std::tie(_input_columns, _output_buffers, _output_column_schemas) =
_metadata->select_columns(options.get_columns(),
filter_columns_names,
options.is_enabled_use_pandas_metadata(),
_strings_to_categorical,
_timestamp_type.id());

// Find the dtypes of output columns (save it in _metadata).
_metadata->cache_output_dtypes(
_output_column_schemas, _strings_to_categorical, _timestamp_type.id());

// Save the states of the output buffers for reuse in `chunk_read()`.
for (auto const& buff : _output_buffers) {
_output_buffers_template.emplace_back(cudf::io::detail::inline_column_buffer::empty_like(buff));
@@ -572,7 +587,12 @@ table_with_metadata reader::impl::finalize_output(
*read_table, filter.value().get(), _stream, rmm::mr::get_current_device_resource());
CUDF_EXPECTS(predicate->view().type().id() == type_id::BOOL8,
"Predicate filter should return a boolean");
auto output_table = cudf::detail::apply_boolean_mask(*read_table, *predicate, _stream, _mr);
// Exclude filter-only columns from the output table
auto counting_it = thrust::make_counting_iterator<std::size_t>(0);
auto const output_count = read_table->num_columns() - _num_filter_only_columns;
auto only_output = read_table->select(counting_it, counting_it + output_count);
auto output_table = cudf::detail::apply_boolean_mask(only_output, *predicate, _stream, _mr);
if (_num_filter_only_columns > 0) { out_metadata.schema_info.resize(output_count); }
return {std::move(output_table), std::move(out_metadata)};
}
return {std::make_unique<table>(std::move(out_columns)), std::move(out_metadata)};
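To make the trimming logic in `finalize_output` above easier to follow, here is a standalone sketch built on the public cudf API (the diff itself uses the `detail::` overloads with explicit streams and memory resources). The function name and parameters are hypothetical stand-ins for `reader::impl` state such as `_num_filter_only_columns`.

```cpp
#include <cudf/column/column_view.hpp>
#include <cudf/stream_compaction.hpp>
#include <cudf/table/table.hpp>
#include <cudf/table/table_view.hpp>

#include <thrust/iterator/counting_iterator.h>

#include <cstddef>
#include <memory>

// Keep only the requested output columns (filter-only columns are appended at the
// end of the materialized table), then apply the predicate as a row mask.
std::unique_ptr<cudf::table> drop_filter_only_columns_and_mask(
  cudf::table const& read_table,
  cudf::column_view const& predicate,    // BOOL8 column computed from the filter AST
  std::size_t num_filter_only_columns)   // columns read solely to evaluate the filter
{
  auto const output_count =
    read_table.num_columns() - static_cast<cudf::size_type>(num_filter_only_columns);
  auto counting_it = thrust::make_counting_iterator<cudf::size_type>(0);

  // Select the leading [0, output_count) columns; this drops the filter-only columns.
  cudf::table_view only_output = read_table.select(counting_it, counting_it + output_count);

  // Apply the boolean mask to the projected columns only.
  return cudf::apply_boolean_mask(only_output, predicate);
}
```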
3 changes: 3 additions & 0 deletions cpp/src/io/parquet/reader_impl.hpp
@@ -368,6 +368,9 @@ class reader::impl {
// _output_buffers associated metadata
std::unique_ptr<table_metadata> _output_metadata;

// number of extra filter columns
std::size_t _num_filter_only_columns{0};

bool _strings_to_categorical = false;

// are there usable page indexes available
1 change: 1 addition & 0 deletions cpp/src/io/parquet/reader_impl_chunking.cu
@@ -14,6 +14,7 @@
* limitations under the License.
*/

#include "compact_protocol_reader.hpp"
#include "io/comp/nvcomp_adapter.hpp"
#include "io/utilities/config_utils.hpp"
#include "io/utilities/time_utils.cuh"