diff --git a/cpp/include/cudf/io/parquet.hpp b/cpp/include/cudf/io/parquet.hpp index 7f034668e43..b2f949cdcee 100644 --- a/cpp/include/cudf/io/parquet.hpp +++ b/cpp/include/cudf/io/parquet.hpp @@ -205,6 +205,31 @@ class parquet_reader_options { /** * @brief Sets AST based filter for predicate pushdown. * + * The filter can utilize cudf::ast::column_name_reference to reference a column by its name, + * even if it's not necessarily present in the requested projected columns. + * To refer to output column indices, you can use cudf::ast::column_reference. + * + * For a parquet with columns ["A", "B", "C", ... "X", "Y", "Z"], + * Example 1: with/without column projection + * @code + * use_columns({"A", "X", "Z"}) + * .filter(operation(ast_operator::LESS, column_name_reference{"C"}, literal{100})); + * @endcode + * Column "C" need not be present in output table. + * Example 2: without column projection + * @code + * filter(operation(ast_operator::LESS, column_reference{1}, literal{100})); + * @endcode + * Here, `1` will refer to column "B" because output will contain all columns in + * order ["A", ..., "Z"]. + * Example 3: with column projection + * @code + * use_columns({"A", "Z", "X"}) + * .filter(operation(ast_operator::LESS, column_reference{1}, literal{100})); + * @endcode + * Here, `1` will refer to column "Z" because output will contain 3 columns in + * order ["A", "Z", "X"]. + * * @param filter AST expression to use as filter */ void set_filter(ast::expression const& filter) { _filter = filter; } @@ -309,9 +334,7 @@ class parquet_reader_options_builder { } /** - * @brief Sets vector of individual row groups to read. - * - * @param filter Vector of row groups to read + * @copydoc parquet_reader_options::set_filter * @return this for chaining */ parquet_reader_options_builder& filter(ast::expression const& filter) diff --git a/cpp/src/io/parquet/predicate_pushdown.cpp b/cpp/src/io/parquet/predicate_pushdown.cpp index 9869dafadfb..0109be661a7 100644 --- a/cpp/src/io/parquet/predicate_pushdown.cpp +++ b/cpp/src/io/parquet/predicate_pushdown.cpp @@ -31,10 +31,12 @@ #include #include +#include + #include -#include #include #include +#include namespace cudf::io::parquet::detail { @@ -127,7 +129,7 @@ struct stats_caster { // Creates device columns from column statistics (min, max) template std::pair, std::unique_ptr> operator()( - size_t col_idx, + int schema_idx, cudf::data_type dtype, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) const @@ -206,22 +208,31 @@ struct stats_caster { }; // local struct host_column host_column min(total_row_groups); host_column max(total_row_groups); - size_type stats_idx = 0; for (size_t src_idx = 0; src_idx < row_group_indices.size(); ++src_idx) { for (auto const rg_idx : row_group_indices[src_idx]) { auto const& row_group = per_file_metadata[src_idx].row_groups[rg_idx]; - auto const& colchunk = row_group.columns[col_idx]; - // To support deprecated min, max fields. - auto const& min_value = colchunk.meta_data.statistics.min_value.has_value() - ? colchunk.meta_data.statistics.min_value - : colchunk.meta_data.statistics.min; - auto const& max_value = colchunk.meta_data.statistics.max_value.has_value() - ? 
colchunk.meta_data.statistics.max_value
-                                  : colchunk.meta_data.statistics.max;
-        // translate binary data to Type then to
-        min.set_index(stats_idx, min_value, colchunk.meta_data.type);
-        max.set_index(stats_idx, max_value, colchunk.meta_data.type);
+        auto col = std::find_if(
+          row_group.columns.begin(),
+          row_group.columns.end(),
+          [schema_idx](ColumnChunk const& col) { return col.schema_idx == schema_idx; });
+        if (col != std::end(row_group.columns)) {
+          auto const& colchunk = *col;
+          // To support deprecated min, max fields.
+          auto const& min_value = colchunk.meta_data.statistics.min_value.has_value()
+                                    ? colchunk.meta_data.statistics.min_value
+                                    : colchunk.meta_data.statistics.min;
+          auto const& max_value = colchunk.meta_data.statistics.max_value.has_value()
+                                    ? colchunk.meta_data.statistics.max_value
+                                    : colchunk.meta_data.statistics.max;
+          // translate binary data to Type then to
+          min.set_index(stats_idx, min_value, colchunk.meta_data.type);
+          max.set_index(stats_idx, max_value, colchunk.meta_data.type);
+        } else {
+          // Mark the entry null if this column is not present in the row group
+          min.set_index(stats_idx, thrust::nullopt, {});
+          max.set_index(stats_idx, thrust::nullopt, {});
+        }
         stats_idx++;
       }
     };
@@ -378,6 +389,7 @@ class stats_expression_converter : public ast::detail::expression_transformer {
 std::optional>> aggregate_reader_metadata::filter_row_groups(
   host_span const> row_group_indices,
   host_span output_dtypes,
+  host_span output_column_schemas,
   std::reference_wrapper filter,
   rmm::cuda_stream_view stream) const
 {
@@ -412,7 +424,8 @@ std::optional>> aggregate_reader_metadata::fi
   std::vector> columns;
   stats_caster stats_col{total_row_groups, per_file_metadata, input_row_group_indices};
   for (size_t col_idx = 0; col_idx < output_dtypes.size(); col_idx++) {
-    auto const& dtype = output_dtypes[col_idx];
+    auto const schema_idx = output_column_schemas[col_idx];
+    auto const& dtype = output_dtypes[col_idx];
     // Only comparable types except fixed point are supported.
     if (cudf::is_compound(dtype) && dtype.id() != cudf::type_id::STRING) {
       // placeholder only for unsupported types.
@@ -423,14 +436,14 @@ std::optional>> aggregate_reader_metadata::fi
       continue;
     }
     auto [min_col, max_col] =
-      cudf::type_dispatcher(dtype, stats_col, col_idx, dtype, stream, mr);
+      cudf::type_dispatcher(dtype, stats_col, schema_idx, dtype, stream, mr);
     columns.push_back(std::move(min_col));
     columns.push_back(std::move(max_col));
   }
   auto stats_table = cudf::table(std::move(columns));
   // Converts AST to StatsAST with reference to min, max columns in above `stats_table`.
-  stats_expression_converter stats_expr{filter, static_cast(output_dtypes.size())};
+  stats_expression_converter stats_expr{filter.get(), static_cast(output_dtypes.size())};
   auto stats_ast = stats_expr.get_stats_expr();
   auto predicate_col = cudf::detail::compute_column(stats_table, stats_ast.get(), stream, mr);
   auto predicate = predicate_col->view();
@@ -475,6 +488,20 @@ std::optional>> aggregate_reader_metadata::fi
 }
 // convert column named expression to column index reference expression
+named_to_reference_converter::named_to_reference_converter(
+  std::optional> expr, table_metadata const& metadata)
+{
+  if (!expr.has_value()) return;
+  // create a map from column name to column index.
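As context for the reader-side changes above, here is how the feature is exercised from the public API once the patch is in place: a standalone sketch (not part of the patch) of a projected read that filters on a column referenced only by name, mirroring Example 1 from the parquet.hpp documentation. The file name and the helper function name are hypothetical; the column names come from the doc example.

#include <cudf/ast/expressions.hpp>
#include <cudf/io/parquet.hpp>
#include <cudf/scalar/scalar.hpp>

cudf::io::table_with_metadata read_projected_and_filtered()
{
  // Filter on "C" by name even though only "A" and "X" are projected into the output.
  auto val    = cudf::numeric_scalar<int32_t>{100};
  auto lit    = cudf::ast::literal{val};
  auto c_ref  = cudf::ast::column_name_reference{"C"};
  auto filter = cudf::ast::operation(cudf::ast::ast_operator::LESS, c_ref, lit);

  auto opts = cudf::io::parquet_reader_options::builder(cudf::io::source_info{"example.parquet"})
                .columns({"A", "X"})
                .filter(filter)
                .build();
  // The returned table holds only "A" and "X"; "C" is read internally to evaluate the predicate.
  return cudf::io::read_parquet(opts);
}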
+ std::transform(metadata.schema_info.cbegin(), + metadata.schema_info.cend(), + thrust::counting_iterator(0), + std::inserter(column_name_to_index, column_name_to_index.end()), + [](auto const& sch, auto index) { return std::make_pair(sch.name, index); }); + + expr.value().get().accept(*this); +} + std::reference_wrapper named_to_reference_converter::visit( ast::literal const& expr) { @@ -530,4 +557,82 @@ named_to_reference_converter::visit_operands( return transformed_operands; } +/** + * @brief Converts named columns to index reference columns + * + */ +class names_from_expression : public ast::detail::expression_transformer { + public: + names_from_expression(std::optional> expr, + std::vector const& skip_names) + : _skip_names(skip_names.cbegin(), skip_names.cend()) + { + if (!expr.has_value()) return; + expr.value().get().accept(*this); + } + + /** + * @copydoc ast::detail::expression_transformer::visit(ast::literal const& ) + */ + std::reference_wrapper visit(ast::literal const& expr) override + { + return expr; + } + /** + * @copydoc ast::detail::expression_transformer::visit(ast::column_reference const& ) + */ + std::reference_wrapper visit(ast::column_reference const& expr) override + { + return expr; + } + /** + * @copydoc ast::detail::expression_transformer::visit(ast::column_name_reference const& ) + */ + std::reference_wrapper visit( + ast::column_name_reference const& expr) override + { + // collect column names + auto col_name = expr.get_column_name(); + if (_skip_names.count(col_name) == 0) { _column_names.insert(col_name); } + return expr; + } + /** + * @copydoc ast::detail::expression_transformer::visit(ast::operation const& ) + */ + std::reference_wrapper visit(ast::operation const& expr) override + { + visit_operands(expr.get_operands()); + return expr; + } + + /** + * @brief Returns the column names in AST. + * + * @return AST operation expression + */ + [[nodiscard]] std::vector to_vector() && + { + return {std::make_move_iterator(_column_names.begin()), + std::make_move_iterator(_column_names.end())}; + } + + private: + void visit_operands(std::vector> operands) + { + for (auto const& operand : operands) { + operand.get().accept(*this); + } + } + + std::unordered_set _column_names; + std::unordered_set _skip_names; +}; + +[[nodiscard]] std::vector get_column_names_in_expression( + std::optional> expr, + std::vector const& skip_names) +{ + return names_from_expression(expr, skip_names).to_vector(); +} + } // namespace cudf::io::parquet::detail diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index 5b7c180195b..b0d19ad00f3 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -26,6 +26,8 @@ #include +#include + #include #include @@ -436,9 +438,18 @@ reader::impl::impl(std::size_t chunk_read_limit, // Binary columns can be read as binary or strings _reader_column_schema = options.get_column_schema(); - // Select only columns required by the options + // Select only columns required by the options and filter + std::optional> filter_columns_names; + if (options.get_filter().has_value() and options.get_columns().has_value()) { + // list, struct, dictionary are not supported by AST filter yet. + // extract columns not present in get_columns() & keep count to remove at end. 
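The call on the next line uses the new get_column_names_in_expression helper declared in reader_impl_helpers.hpp. A minimal sketch of what it computes, assuming hypothetical columns "A" and "B" and assuming the internal reader_impl_helpers.hpp header is reachable (it is not an installed header):

#include "reader_impl_helpers.hpp"  // internal libcudf header

#include <cudf/ast/expressions.hpp>
#include <cudf/scalar/scalar.hpp>

#include <string>
#include <vector>

// The filter references "A" and "B"; with "A" already projected, only "B" is reported
// as a filter-only column that must be materialized and later dropped.
std::vector<std::string> filter_only_names()
{
  auto val  = cudf::numeric_scalar<double>{1.0};
  auto lit  = cudf::ast::literal{val};
  auto a    = cudf::ast::column_name_reference{"A"};
  auto b    = cudf::ast::column_name_reference{"B"};
  auto lhs  = cudf::ast::operation(cudf::ast::ast_operator::LESS, a, lit);
  auto rhs  = cudf::ast::operation(cudf::ast::ast_operator::GREATER, b, lit);
  auto expr = cudf::ast::operation(cudf::ast::ast_operator::LOGICAL_AND, lhs, rhs);

  return cudf::io::parquet::detail::get_column_names_in_expression(expr, {"A"});
}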
+ filter_columns_names = + get_column_names_in_expression(options.get_filter(), *(options.get_columns())); + _num_filter_only_columns = filter_columns_names->size(); + } std::tie(_input_columns, _output_buffers, _output_column_schemas) = _metadata->select_columns(options.get_columns(), + filter_columns_names, options.is_enabled_use_pandas_metadata(), _strings_to_categorical, _timestamp_type.id()); @@ -572,7 +583,12 @@ table_with_metadata reader::impl::finalize_output( *read_table, filter.value().get(), _stream, rmm::mr::get_current_device_resource()); CUDF_EXPECTS(predicate->view().type().id() == type_id::BOOL8, "Predicate filter should return a boolean"); - auto output_table = cudf::detail::apply_boolean_mask(*read_table, *predicate, _stream, _mr); + // Exclude columns present in filter only in output + auto counting_it = thrust::make_counting_iterator(0); + auto const output_count = read_table->num_columns() - _num_filter_only_columns; + auto only_output = read_table->select(counting_it, counting_it + output_count); + auto output_table = cudf::detail::apply_boolean_mask(only_output, *predicate, _stream, _mr); + if (_num_filter_only_columns > 0) { out_metadata.schema_info.resize(output_count); } return {std::move(output_table), std::move(out_metadata)}; } return {std::make_unique(std::move(out_columns)), std::move(out_metadata)}; diff --git a/cpp/src/io/parquet/reader_impl.hpp b/cpp/src/io/parquet/reader_impl.hpp index 6c6cedf4e76..b67d2e312d7 100644 --- a/cpp/src/io/parquet/reader_impl.hpp +++ b/cpp/src/io/parquet/reader_impl.hpp @@ -368,6 +368,9 @@ class reader::impl { // _output_buffers associated metadata std::unique_ptr _output_metadata; + // number of extra filter columns + std::size_t _num_filter_only_columns{0}; + bool _strings_to_categorical = false; // are there usable page indexes available diff --git a/cpp/src/io/parquet/reader_impl_chunking.cu b/cpp/src/io/parquet/reader_impl_chunking.cu index f4fb6bc57e6..6824d72cf04 100644 --- a/cpp/src/io/parquet/reader_impl_chunking.cu +++ b/cpp/src/io/parquet/reader_impl_chunking.cu @@ -14,6 +14,7 @@ * limitations under the License. 
*/ +#include "compact_protocol_reader.hpp" #include "io/comp/nvcomp_adapter.hpp" #include "io/utilities/config_utils.hpp" #include "io/utilities/time_utils.cuh" diff --git a/cpp/src/io/parquet/reader_impl_helpers.cpp b/cpp/src/io/parquet/reader_impl_helpers.cpp index dfbc8c565ad..eb653c6b9ac 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.cpp +++ b/cpp/src/io/parquet/reader_impl_helpers.cpp @@ -16,6 +16,7 @@ #include "reader_impl_helpers.hpp" +#include "compact_protocol_reader.hpp" #include "io/parquet/parquet.hpp" #include "io/utilities/base64_utilities.hpp" #include "io/utilities/row_selection.hpp" @@ -25,6 +26,7 @@ #include #include +#include #include #include @@ -954,13 +956,15 @@ aggregate_reader_metadata::select_row_groups( int64_t skip_rows_opt, std::optional const& num_rows_opt, host_span output_dtypes, + host_span output_column_schemas, std::optional> filter, rmm::cuda_stream_view stream) const { std::optional>> filtered_row_group_indices; + // if filter is not empty, then gather row groups to read after predicate pushdown if (filter.has_value()) { - filtered_row_group_indices = - filter_row_groups(row_group_indices, output_dtypes, filter.value(), stream); + filtered_row_group_indices = filter_row_groups( + row_group_indices, output_dtypes, output_column_schemas, filter.value(), stream); if (filtered_row_group_indices.has_value()) { row_group_indices = host_span const>(filtered_row_group_indices.value()); @@ -1017,10 +1021,12 @@ aggregate_reader_metadata::select_row_groups( std::tuple, std::vector, std::vector> -aggregate_reader_metadata::select_columns(std::optional> const& use_names, - bool include_index, - bool strings_to_categorical, - type_id timestamp_type_id) const +aggregate_reader_metadata::select_columns( + std::optional> const& use_names, + std::optional> const& filter_columns_names, + bool include_index, + bool strings_to_categorical, + type_id timestamp_type_id) const { auto find_schema_child = [&](SchemaElement const& schema_elem, std::string const& name) { auto const& col_schema_idx = @@ -1184,13 +1190,18 @@ aggregate_reader_metadata::select_columns(std::optional // Find which of the selected paths are valid and get their schema index std::vector valid_selected_paths; - for (auto const& selected_path : *use_names) { - auto found_path = - std::find_if(all_paths.begin(), all_paths.end(), [&](path_info& valid_path) { - return valid_path.full_path == selected_path; - }); - if (found_path != all_paths.end()) { - valid_selected_paths.push_back({selected_path, found_path->schema_idx}); + // vector reference pushback (*use_names). If filter names passed. 
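The statement that follows iterates the projected names and the filter-only names as one sequence. A minimal standalone sketch of that std::reference_wrapper pattern, reusing the column names from the new test further below:

#include <functional>
#include <iostream>
#include <string>
#include <vector>

int main()
{
  std::vector<std::string> const projected{"col_double"};    // user projection (use_names)
  std::vector<std::string> const filter_only{"col_uint32"};  // names found only in the filter

  // One pass over both lists without copying either vector.
  std::vector<std::reference_wrapper<std::vector<std::string> const>> const name_lists{
    projected, filter_only};

  for (auto const& list : name_lists) {
    for (auto const& name : list.get()) {
      std::cout << name << '\n';  // the reader resolves each path to a schema index here
    }
  }
  return 0;
}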
+ std::vector const>> column_names{ + *use_names, *filter_columns_names}; + for (auto const& used_column_names : column_names) { + for (auto const& selected_path : used_column_names.get()) { + auto found_path = + std::find_if(all_paths.begin(), all_paths.end(), [&](path_info& valid_path) { + return valid_path.full_path == selected_path; + }); + if (found_path != all_paths.end()) { + valid_selected_paths.push_back({selected_path, found_path->schema_idx}); + } } } diff --git a/cpp/src/io/parquet/reader_impl_helpers.hpp b/cpp/src/io/parquet/reader_impl_helpers.hpp index 398812945e2..9aeb19a7723 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.hpp +++ b/cpp/src/io/parquet/reader_impl_helpers.hpp @@ -16,7 +16,6 @@ #pragma once -#include "compact_protocol_reader.hpp" #include "parquet_gpu.hpp" #include @@ -25,9 +24,6 @@ #include #include -#include -#include - #include #include #include @@ -257,7 +253,8 @@ class aggregate_reader_metadata { * @brief Filters the row groups based on predicate filter * * @param row_group_indices Lists of row groups to read, one per source - * @param output_dtypes List of output column datatypes + * @param output_dtypes Datatypes of of output columns + * @param output_column_schemas schema indices of output columns * @param filter AST expression to filter row groups based on Column chunk statistics * @param stream CUDA stream used for device memory operations and kernel launches * @return Filtered row group indices, if any is filtered. @@ -265,6 +262,7 @@ class aggregate_reader_metadata { [[nodiscard]] std::optional>> filter_row_groups( host_span const> row_group_indices, host_span output_dtypes, + host_span output_column_schemas, std::reference_wrapper filter, rmm::cuda_stream_view stream) const; @@ -277,7 +275,8 @@ class aggregate_reader_metadata { * @param row_group_indices Lists of row groups to read, one per source * @param row_start Starting row of the selection * @param row_count Total number of rows selected - * @param output_dtypes List of output column datatypes + * @param output_dtypes Datatypes of of output columns + * @param output_column_schemas schema indices of output columns * @param filter Optional AST expression to filter row groups based on Column chunk statistics * @param stream CUDA stream used for device memory operations and kernel launches * @return A tuple of corrected row_start, row_count and list of row group indexes and its @@ -288,6 +287,7 @@ class aggregate_reader_metadata { int64_t row_start, std::optional const& row_count, host_span output_dtypes, + host_span output_column_schemas, std::optional> filter, rmm::cuda_stream_view stream) const; @@ -296,6 +296,7 @@ class aggregate_reader_metadata { * * @param use_names List of paths of column names to select; `nullopt` if user did not select * columns to read + * @param filter_columns_names List of paths of column names that are present only in filter * @param include_index Whether to always include the PANDAS index column(s) * @param strings_to_categorical Type conversion parameter * @param timestamp_type_id Type conversion parameter @@ -307,6 +308,7 @@ class aggregate_reader_metadata { std::vector, std::vector> select_columns(std::optional> const& use_names, + std::optional> const& filter_columns_names, bool include_index, bool strings_to_categorical, type_id timestamp_type_id) const; @@ -319,23 +321,7 @@ class aggregate_reader_metadata { class named_to_reference_converter : public ast::detail::expression_transformer { public: named_to_reference_converter(std::optional> expr, - 
table_metadata const& metadata) - : metadata(metadata) - { - if (!expr.has_value()) return; - // create map for column name. - std::transform( - thrust::make_zip_iterator(metadata.schema_info.cbegin(), - thrust::counting_iterator(0)), - thrust::make_zip_iterator(metadata.schema_info.cend(), - thrust::counting_iterator(metadata.schema_info.size())), - std::inserter(column_name_to_index, column_name_to_index.end()), - [](auto const& name_index) { - return std::make_pair(thrust::get<0>(name_index).name, thrust::get<1>(name_index)); - }); - - expr.value().get().accept(*this); - } + table_metadata const& metadata); /** * @copydoc ast::detail::expression_transformer::visit(ast::literal const& ) @@ -370,7 +356,6 @@ class named_to_reference_converter : public ast::detail::expression_transformer std::vector> visit_operands( std::vector> operands); - table_metadata const& metadata; std::unordered_map column_name_to_index; std::optional> _stats_expr; // Using std::list or std::deque to avoid reference invalidation @@ -378,4 +363,15 @@ class named_to_reference_converter : public ast::detail::expression_transformer std::list _operators; }; +/** + * @brief Get the column names in expression object + * + * @param expr The optional expression object to get the column names from + * @param skip_names The names of column names to skip in returned column names + * @return The column names present in expression object except the skip_names + */ +[[nodiscard]] std::vector get_column_names_in_expression( + std::optional> expr, + std::vector const& skip_names); + } // namespace cudf::io::parquet::detail diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu index a5cd7d06536..084f82a2ca0 100644 --- a/cpp/src/io/parquet/reader_impl_preprocess.cu +++ b/cpp/src/io/parquet/reader_impl_preprocess.cu @@ -1230,17 +1230,23 @@ void reader::impl::preprocess_file( CUDF_EXPECTS(!_file_preprocessed, "Attempted to preprocess file more than once"); // if filter is not empty, then create output types as vector and pass for filtering. 
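As a reminder of where the filter-only columns end up: select_columns() appends them after the projected columns, and finalize_output() (shown earlier in reader_impl.cpp) trims them off again before applying the boolean mask. A minimal sketch of that trim step, assuming the materialized table and the count of filter-only columns; the function name is hypothetical:

#include <cudf/table/table.hpp>
#include <cudf/table/table_view.hpp>
#include <cudf/types.hpp>

#include <thrust/iterator/counting_iterator.h>

cudf::table_view drop_filter_only_columns(cudf::table const& read_table,
                                          cudf::size_type num_filter_only_columns)
{
  auto counting_it        = thrust::make_counting_iterator(0);
  auto const output_count = read_table.num_columns() - num_filter_only_columns;
  // Keep only the leading columns the caller asked for; the filter-only columns
  // sit at the end of the selection.
  return read_table.select(counting_it, counting_it + output_count);
}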
- std::vector output_types; + std::vector output_dtypes; if (filter.has_value()) { - std::transform(_output_buffers.cbegin(), - _output_buffers.cend(), - std::back_inserter(output_types), + std::transform(_output_buffers_template.cbegin(), + _output_buffers_template.cend(), + std::back_inserter(output_dtypes), [](auto const& col) { return col.type; }); } + std::tie( _file_itm_data.global_skip_rows, _file_itm_data.global_num_rows, _file_itm_data.row_groups) = - _metadata->select_row_groups( - row_group_indices, skip_rows, num_rows, output_types, filter, _stream); + _metadata->select_row_groups(row_group_indices, + skip_rows, + num_rows, + output_dtypes, + _output_column_schemas, + filter, + _stream); // check for page indexes _has_page_index = std::all_of(_file_itm_data.row_groups.begin(), diff --git a/cpp/tests/io/parquet_reader_test.cpp b/cpp/tests/io/parquet_reader_test.cpp index 85ada9b38fc..aa9172b0608 100644 --- a/cpp/tests/io/parquet_reader_test.cpp +++ b/cpp/tests/io/parquet_reader_test.cpp @@ -1406,6 +1406,56 @@ TEST_F(ParquetReaderTest, FilterIdentity) CUDF_TEST_EXPECT_TABLES_EQUAL(*result.tbl, *result2.tbl); } +TEST_F(ParquetReaderTest, FilterWithColumnProjection) +{ + // col_uint32, col_int64, col_double + auto [src, filepath] = create_parquet_with_stats("FilterWithColumnProjection.parquet"); + auto val = cudf::numeric_scalar{10}; + auto lit = cudf::ast::literal{val}; + auto col_ref = cudf::ast::column_name_reference{"col_uint32"}; + auto col_index = cudf::ast::column_reference{0}; + auto filter_expr = cudf::ast::operation(cudf::ast::ast_operator::LESS, col_index, lit); + + auto predicate = cudf::compute_column(src, filter_expr); + + { // column_name_reference in parquet filter (not present in column projection) + auto read_expr = cudf::ast::operation(cudf::ast::ast_operator::LESS, col_ref, lit); + auto projected_table = cudf::table_view{{src.get_column(2)}}; + auto expected = cudf::apply_boolean_mask(projected_table, *predicate); + + auto read_opts = cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}) + .columns({"col_double"}) + .filter(read_expr); + auto result = cudf::io::read_parquet(read_opts); + CUDF_TEST_EXPECT_TABLES_EQUAL(*result.tbl, *expected); + } + + { // column_reference in parquet filter (indices as per order of column projection) + auto col_index2 = cudf::ast::column_reference{1}; + auto read_ref_expr = cudf::ast::operation(cudf::ast::ast_operator::LESS, col_index2, lit); + + auto projected_table = cudf::table_view{{src.get_column(2), src.get_column(0)}}; + auto expected = cudf::apply_boolean_mask(projected_table, *predicate); + auto read_opts = cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}) + .columns({"col_double", "col_uint32"}) + .filter(read_ref_expr); + auto result = cudf::io::read_parquet(read_opts); + CUDF_TEST_EXPECT_TABLES_EQUAL(*result.tbl, *expected); + } + + // Error cases + { // column_reference is not same type as literal, column_reference index is out of bounds + for (auto const index : {0, 2}) { + auto col_index2 = cudf::ast::column_reference{index}; + auto read_ref_expr = cudf::ast::operation(cudf::ast::ast_operator::LESS, col_index2, lit); + auto read_opts = cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}) + .columns({"col_double", "col_uint32"}) + .filter(read_ref_expr); + EXPECT_THROW(cudf::io::read_parquet(read_opts), cudf::logic_error); + } + } +} + TEST_F(ParquetReaderTest, FilterReferenceExpression) { auto [src, filepath] = 
create_parquet_with_stats("FilterReferenceExpression.parquet");
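For completeness, this is roughly how the row-group pruning driven by filter_row_groups() works: for output column i, the stats table assembled by stats_caster holds per-row-group minima at column 2*i and maxima at column 2*i + 1, and stats_expression_converter rewrites the user predicate against those columns (keeping intermediate nodes alive in a std::list to avoid reference invalidation, as the converter's own comment notes). A conceptual sketch for a less-than predicate; it is not the converter's actual code, and the helper name is made up:

#include <cudf/ast/expressions.hpp>
#include <cudf/types.hpp>

#include <list>

// (col_i < lit) can hold for some row in a row group only if (min_i < lit);
// row groups failing that test can be skipped without being read.
cudf::ast::operation make_rowgroup_prune_expr(cudf::size_type output_col_index,
                                              cudf::ast::literal const& lit,
                                              std::list<cudf::ast::column_reference>& refs)
{
  auto const& min_ref = refs.emplace_back(2 * output_col_index);  // min column of stats table
  return cudf::ast::operation(cudf::ast::ast_operator::LESS, min_ref, lit);
}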