From bcb1237a7fcf195effd13f10d2f50ed02ce87372 Mon Sep 17 00:00:00 2001 From: nvdbaranec <56695930+nvdbaranec@users.noreply.github.com> Date: Wed, 26 May 2021 20:10:42 -0500 Subject: [PATCH] Handle nested column types properly for empty parquet files. (#8350) Fixes: https://github.com/rapidsai/cudf/issues/8323 Also fixes a recently introduced bug in the test column equality checker. The checker was previously relying on accesses to device memory being transparently handled by `thrust::device_vector`. Authors: - https://github.com/nvdbaranec Approvers: - Mike Wilson (https://github.com/hyperbolic2346) - Devavret Makkar (https://github.com/devavret) - Nghia Truong (https://github.com/ttnghia) URL: https://github.com/rapidsai/cudf/pull/8350 --- cpp/src/io/parquet/reader_impl.cu | 3 +- cpp/src/io/utilities/column_buffer.cpp | 76 ++++++++++++++++++++++--- cpp/src/io/utilities/column_buffer.hpp | 31 ++++++++++ cpp/tests/io/parquet_test.cpp | 36 ++++++++++++ cpp/tests/utilities/column_utilities.cu | 9 ++- 5 files changed, 140 insertions(+), 15 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu index 94a5a5d603b..0863bca7b03 100644 --- a/cpp/src/io/parquet/reader_impl.cu +++ b/cpp/src/io/parquet/reader_impl.cu @@ -1572,7 +1572,8 @@ table_with_metadata reader::impl::read(size_type skip_rows, // Create empty columns as needed (this can happen if we've ended up with no actual data to read) for (size_t i = out_columns.size(); i < _output_columns.size(); ++i) { out_metadata.schema_info.push_back(column_name_info{""}); - out_columns.emplace_back(make_empty_column(_output_columns[i].type)); + out_columns.emplace_back(cudf::io::detail::empty_like( + _output_columns[i], &out_metadata.schema_info.back(), stream, _mr)); } // Return column names (must match order of returned columns) diff --git a/cpp/src/io/utilities/column_buffer.cpp b/cpp/src/io/utilities/column_buffer.cpp index 9170a9016c4..d60c7e4fad4 100644 --- 
a/cpp/src/io/utilities/column_buffer.cpp +++ b/cpp/src/io/utilities/column_buffer.cpp @@ -21,6 +21,7 @@ #include "column_buffer.hpp" #include +#include namespace cudf { namespace io { @@ -54,15 +55,7 @@ void column_buffer::create(size_type _size, } /** - * @brief Creates a column from an existing set of device memory buffers. - * - * @throws std::bad_alloc if device memory allocation fails - * - * @param buffer Column buffer descriptors - * @param stream CUDA stream used for device memory operations and kernel launches. - * @param mr Device memory resource used to allocate the returned column's device memory - * - * @return `std::unique_ptr` Column from the existing device data + * @copydoc cudf::io::detail::make_column */ std::unique_ptr make_column(column_buffer& buffer, column_name_info* schema_info, @@ -139,6 +132,71 @@ std::unique_ptr make_column(column_buffer& buffer, } } +/** + * @copydoc cudf::io::detail::empty_like + */ +std::unique_ptr empty_like(column_buffer& buffer, + column_name_info* schema_info, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) +{ + if (schema_info != nullptr) { schema_info->name = buffer.name; } + + switch (buffer.type.id()) { + case type_id::LIST: { + // make offsets column + auto offsets = cudf::make_empty_column(data_type{type_id::INT32}); + + column_name_info* child_info = nullptr; + if (schema_info != nullptr) { + schema_info->children.push_back(column_name_info{"offsets"}); + schema_info->children.push_back(column_name_info{""}); + child_info = &schema_info->children.back(); + } + + // make child column + CUDF_EXPECTS(buffer.children.size() > 0, "Encountered malformed column_buffer"); + auto child = empty_like(buffer.children[0], child_info, stream, mr); + + // make the final list column + return make_lists_column(0, + std::move(offsets), + std::move(child), + buffer._null_count, + std::move(buffer._null_mask), + stream, + mr); + } break; + + case type_id::STRUCT: { + std::vector> output_children; + 
output_children.reserve(buffer.children.size()); + std::transform(buffer.children.begin(), + buffer.children.end(), + std::back_inserter(output_children), + [&](column_buffer& col) { + column_name_info* child_info = nullptr; + if (schema_info != nullptr) { + schema_info->children.push_back(column_name_info{""}); + child_info = &schema_info->children.back(); + } + return empty_like(col, child_info, stream, mr); + }); + + return make_structs_column(0, + std::move(output_children), + buffer._null_count, + std::move(buffer._null_mask), + stream, + mr); + } break; + + case type_id::STRING: return cudf::strings::detail::make_empty_strings_column(stream, mr); + + default: return cudf::make_empty_column(buffer.type); + } +} + } // namespace detail } // namespace io } // namespace cudf diff --git a/cpp/src/io/utilities/column_buffer.hpp b/cpp/src/io/utilities/column_buffer.hpp index 16b3756e2ec..ab387616b24 100644 --- a/cpp/src/io/utilities/column_buffer.hpp +++ b/cpp/src/io/utilities/column_buffer.hpp @@ -123,12 +123,43 @@ struct column_buffer { std::string name; }; +/** + * @brief Creates a column from an existing set of device memory buffers. + * + * @throws std::bad_alloc if device memory allocation fails + * + * @param buffer Column buffer descriptors + * @param stream CUDA stream used for device memory operations and kernel launches. + * @param mr Device memory resource used to allocate the returned column's device memory + * + * @return `std::unique_ptr` Column from the existing device data + */ std::unique_ptr make_column( column_buffer& buffer, column_name_info* schema_info = nullptr, rmm::cuda_stream_view stream = rmm::cuda_stream_default, rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); +/** + * @brief Creates an equivalent empty column from an existing set of device memory buffers. + * + * This function preserves nested column type information by producing complete/identical + * column hierarchies. 
+ * + * @throws std::bad_alloc if device memory allocation fails + * + * @param buffer Column buffer descriptors + * @param stream CUDA stream used for device memory operations and kernel launches. + * @param mr Device memory resource used to allocate the returned column's device memory + * + * @return `std::unique_ptr` Empty (zero-row) column with the same nested type hierarchy as `buffer` + */ +std::unique_ptr empty_like( + column_buffer& buffer, + column_name_info* schema_info = nullptr, + rmm::cuda_stream_view stream = rmm::cuda_stream_default, + rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); + } // namespace detail } // namespace io } // namespace cudf diff --git a/cpp/tests/io/parquet_test.cpp b/cpp/tests/io/parquet_test.cpp index 880f11aaeb2..e59a4accf66 100644 --- a/cpp/tests/io/parquet_test.cpp +++ b/cpp/tests/io/parquet_test.cpp @@ -2861,4 +2861,40 @@ TEST_F(ParquetReaderTest, DecimalRead) } } +TEST_F(ParquetReaderTest, EmptyOutput) +{ + cudf::test::fixed_width_column_wrapper c0; + cudf::test::strings_column_wrapper c1; + cudf::test::fixed_point_column_wrapper c2({}, numeric::scale_type{2}); + cudf::test::lists_column_wrapper _c3{{{1, 2}, {3, 4}}, {{5, 6}, {7, 8}}}; + auto c3 = cudf::empty_like(_c3); + + cudf::test::fixed_width_column_wrapper sc0; + cudf::test::strings_column_wrapper sc1; + cudf::test::lists_column_wrapper _sc2{{1, 2}}; + std::vector> struct_children; + struct_children.push_back(sc0.release()); + struct_children.push_back(sc1.release()); + struct_children.push_back(cudf::empty_like(_sc2)); + cudf::test::structs_column_wrapper c4(std::move(struct_children)); + + table_view expected({c0, c1, c2, *c3, c4}); + + // set precision on the decimal column + cudf_io::table_input_metadata expected_metadata(expected); + expected_metadata.column_metadata[2].set_decimal_precision(1); + + auto filepath = temp_env->get_temp_filepath("EmptyOutput.parquet"); + cudf_io::parquet_writer_options out_args = + 
cudf_io::parquet_writer_options::builder(cudf_io::sink_info{filepath}, expected); + out_args.set_metadata(&expected_metadata); + cudf_io::write_parquet(out_args); + + cudf_io::parquet_reader_options read_args = + cudf::io::parquet_reader_options::builder(cudf_io::source_info{filepath}); + auto result = cudf_io::read_parquet(read_args); + + CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view()); +} + CUDF_TEST_PROGRAM_MAIN() diff --git a/cpp/tests/utilities/column_utilities.cu b/cpp/tests/utilities/column_utilities.cu index 8aac7370b13..eabb307f77b 100644 --- a/cpp/tests/utilities/column_utilities.cu +++ b/cpp/tests/utilities/column_utilities.cu @@ -180,14 +180,13 @@ std::string stringify_column_differences(cudf::device_span difference { CUDF_EXPECTS(not differences.empty(), "Shouldn't enter this function if `differences` is empty"); std::string const depth_str = depth > 0 ? "depth " + std::to_string(depth) + '\n' : ""; + // move the differences to the host. + auto h_differences = cudf::detail::make_host_vector_sync(differences); if (print_all_differences) { std::ostringstream buffer; buffer << depth_str << "differences:" << std::endl; - // thrust may crash if a device vector is passed to fixed_width_column_wrapper, - // thus we construct fixed_width_column_wrapper from a host_vector instead - auto h_differences = cudf::detail::make_host_vector_sync(differences); - auto source_table = cudf::table_view({lhs, rhs}); + auto source_table = cudf::table_view({lhs, rhs}); auto diff_column = fixed_width_column_wrapper(h_differences.begin(), h_differences.end()); auto diff_table = cudf::gather(source_table, diff_column); @@ -200,7 +199,7 @@ std::string stringify_column_differences(cudf::device_span difference << h_differences[i] << "] = " << h_right_strings[i] << std::endl; return buffer.str(); } else { - int index = differences[0]; // only stringify first difference + int index = h_differences[0]; // only stringify first difference auto diff_lhs = 
cudf::detail::slice(lhs, index, index + 1); auto diff_rhs = cudf::detail::slice(rhs, index, index + 1); return depth_str + "first difference: " + "lhs[" + std::to_string(index) +