diff --git a/cpp/include/cudf/io/data_sink.hpp b/cpp/include/cudf/io/data_sink.hpp index 42421aed716..2c1966ee6ba 100644 --- a/cpp/include/cudf/io/data_sink.hpp +++ b/cpp/include/cudf/io/data_sink.hpp @@ -69,6 +69,22 @@ class data_sink { */ static std::unique_ptr create(cudf::io::data_sink* const user_sink); + /** + * @brief Creates a vector of data sinks, one per element in the input vector. + * + * @param[in] args vector of parameters + */ + template + static std::vector> create(std::vector const& args) + { + std::vector> sinks; + sinks.reserve(args.size()); + std::transform(args.cbegin(), args.cend(), std::back_inserter(sinks), [](auto const& arg) { + return data_sink::create(arg); + }); + return sinks; + } + /** * @brief Base class destructor */ diff --git a/cpp/include/cudf/io/detail/parquet.hpp b/cpp/include/cudf/io/detail/parquet.hpp index a18bd450640..9af2e3f278d 100644 --- a/cpp/include/cudf/io/detail/parquet.hpp +++ b/cpp/include/cudf/io/detail/parquet.hpp @@ -89,13 +89,13 @@ class writer { /** * @brief Constructor for output to a file. * - * @param sink The data sink to write the data to + * @param sinks The data sinks to write the data to * @param options Settings for controlling writing behavior * @param mode Option to write at once or in chunks * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource to use for device memory allocation */ - explicit writer(std::unique_ptr sink, + explicit writer(std::vector> sinks, parquet_writer_options const& options, SingleWriteMode mode, rmm::cuda_stream_view stream, @@ -104,7 +104,7 @@ class writer { /** * @brief Constructor for writer to handle chunked parquet options. * - * @param sink The data sink to write the data to + * @param sinks The data sinks to write the data to * @param options Settings for controlling writing behavior for chunked writer * @param mode Option to write at once or in chunks * @param stream CUDA stream used for device memory operations and kernel launches @@ -112,7 +112,7 @@ class writer { * * @return A parquet-compatible blob that contains the data for all rowgroups in the list */ - explicit writer(std::unique_ptr sink, + explicit writer(std::vector> sinks, chunked_parquet_writer_options const& options, SingleWriteMode mode, rmm::cuda_stream_view stream, @@ -127,8 +127,10 @@ class writer { * @brief Writes a single subtable as part of a larger parquet file/table write. * * @param[in] table The table information to be written + * @param[in] partitions Optional partitions to divide the table into. If specified, must be same + * size as number of sinks. */ - void write(table_view const& table); + void write(table_view const& table, std::vector const& partitions = {}); /** * @brief Finishes the chunked/streamed write process. @@ -138,7 +140,8 @@ class writer { * @return A parquet-compatible blob that contains the data for all rowgroups in the list only if * `column_chunks_file_path` is provided, else null. 
*/ - std::unique_ptr> close(std::string const& column_chunks_file_path = ""); + std::unique_ptr> close( + std::vector const& column_chunks_file_path = {}); /** * @brief Merges multiple metadata blobs returned by write_all into a single metadata blob diff --git a/cpp/include/cudf/io/orc.hpp b/cpp/include/cudf/io/orc.hpp index 16588185f3d..b3a2f6bcbbb 100644 --- a/cpp/include/cudf/io/orc.hpp +++ b/cpp/include/cudf/io/orc.hpp @@ -454,6 +454,8 @@ class orc_writer_options { table_view _table; // Optional associated metadata const table_input_metadata* _metadata = nullptr; + // Optional footer key_value_metadata + std::map _user_data; friend orc_writer_options_builder; @@ -530,6 +532,11 @@ class orc_writer_options { */ table_input_metadata const* get_metadata() const { return _metadata; } + /** + * @brief Returns Key-Value footer metadata information. + */ + std::map const& get_key_value_metadata() const { return _user_data; } + // Setters /** @@ -591,6 +598,16 @@ class orc_writer_options { * @param meta Associated metadata. */ void set_metadata(table_input_metadata const* meta) { _metadata = meta; } + + /** + * @brief Sets metadata. + * + * @param metadata Key-Value footer metadata + */ + void set_key_value_metadata(std::map metadata) + { + _user_data = std::move(metadata); + } }; class orc_writer_options_builder { @@ -698,6 +715,18 @@ class orc_writer_options_builder { return *this; } + /** + * @brief Sets Key-Value footer metadata. + * + * @param metadata Key-Value footer metadata + * @return this for chaining. + */ + orc_writer_options_builder& key_value_metadata(std::map metadata) + { + options._user_data = std::move(metadata); + return *this; + } + /** * @brief move orc_writer_options member once it's built. */ @@ -753,6 +782,8 @@ class chunked_orc_writer_options { size_type _row_index_stride = default_row_index_stride; // Optional associated metadata const table_input_metadata* _metadata = nullptr; + // Optional footer key_value_metadata + std::map _user_data; friend chunked_orc_writer_options_builder; @@ -819,6 +850,11 @@ class chunked_orc_writer_options { */ table_input_metadata const* get_metadata() const { return _metadata; } + /** + * @brief Returns Key-Value footer metadata information. + */ + std::map const& get_key_value_metadata() const { return _user_data; } + // Setters /** @@ -873,6 +909,16 @@ class chunked_orc_writer_options { * @param meta Associated metadata. */ void metadata(table_input_metadata const* meta) { _metadata = meta; } + + /** + * @brief Sets Key-Value footer metadata. + * + * @param metadata Key-Value footer metadata + */ + void set_key_value_metadata(std::map metadata) + { + _user_data = std::move(metadata); + } }; class chunked_orc_writer_options_builder { @@ -965,6 +1011,19 @@ class chunked_orc_writer_options_builder { return *this; } + /** + * @brief Sets Key-Value footer metadata. + * + * @param metadata Key-Value footer metadata + * @return this for chaining. + */ + chunked_orc_writer_options_builder& key_value_metadata( + std::map metadata) + { + options._user_data = std::move(metadata); + return *this; + } + /** * @brief move chunked_orc_writer_options member once it's built. 
*/ diff --git a/cpp/include/cudf/io/parquet.hpp b/cpp/include/cudf/io/parquet.hpp index 2215f24b550..740f7a8b2db 100644 --- a/cpp/include/cudf/io/parquet.hpp +++ b/cpp/include/cudf/io/parquet.hpp @@ -364,13 +364,17 @@ class parquet_writer_options { statistics_freq _stats_level = statistics_freq::STATISTICS_ROWGROUP; // Sets of columns to output table_view _table; + // Partitions described as {start_row, num_rows} pairs + std::vector _partitions; // Optional associated metadata table_input_metadata const* _metadata = nullptr; + // Optional footer key_value_metadata + std::vector> _user_data; // Parquet writer can write INT96 or TIMESTAMP_MICROS. Defaults to TIMESTAMP_MICROS. // If true then overrides any per-column setting in _metadata. bool _write_timestamps_as_int96 = false; - // Column chunks file path to be set in the raw output metadata - std::string _column_chunks_file_path; + // Column chunks file paths to be set in the raw output metadata. One per output file + std::vector _column_chunks_file_paths; // Maximum size of each row group (unless smaller than a single page) size_t _row_group_size_bytes = default_row_group_size_bytes; // Maximum number of rows in row group (unless smaller than a single page) @@ -434,20 +438,36 @@ class parquet_writer_options { */ table_view get_table() const { return _table; } + /** + * @brief Returns partitions. + */ + std::vector const& get_partitions() const { return _partitions; } + /** * @brief Returns associated metadata. */ table_input_metadata const* get_metadata() const { return _metadata; } + /** + * @brief Returns Key-Value footer metadata information. + */ + std::vector> const& get_key_value_metadata() const + { + return _user_data; + } + /** * @brief Returns `true` if timestamps will be written as INT96 */ bool is_enabled_int96_timestamps() const { return _write_timestamps_as_int96; } /** - * @brief Returns Column chunks file path to be set in the raw output metadata. + * @brief Returns Column chunks file paths to be set in the raw output metadata. */ - std::string get_column_chunks_file_path() const { return _column_chunks_file_path; } + std::vector const& get_column_chunks_file_paths() const + { + return _column_chunks_file_paths; + } /** * @brief Returns maximum row group size, in bytes. @@ -459,6 +479,19 @@ class parquet_writer_options { */ auto get_row_group_size_rows() const { return _row_group_size_rows; } + /** + * @brief Sets partitions. + * + * @param partitions Partitions of input table in {start_row, num_rows} pairs. If specified, must + * be same size as number of sinks in sink_info + */ + void set_partitions(std::vector partitions) + { + CUDF_EXPECTS(partitions.size() == _sink.num_sinks(), + "Mismatch between number of sinks and number of partitions"); + _partitions = std::move(partitions); + } + /** * @brief Sets metadata. * @@ -466,6 +499,18 @@ class parquet_writer_options { */ void set_metadata(table_input_metadata const* metadata) { _metadata = metadata; } + /** + * @brief Sets metadata. + * + * @param metadata Key-Value footer metadata + */ + void set_key_value_metadata(std::vector> metadata) + { + CUDF_EXPECTS(metadata.size() == _sink.num_sinks(), + "Mismatch between number of sinks and number of metadata maps"); + _user_data = std::move(metadata); + } + /** * @brief Sets the level of statistics. * @@ -491,11 +536,14 @@ class parquet_writer_options { /** * @brief Sets column chunks file path to be set in the raw output metadata. * - * @param file_path String which indicates file path. 
+   * @param file_paths Vector of strings which indicate the file paths. Must be the same size as
+   * the number of data sinks in the sink info
    */
-  void set_column_chunks_file_path(std::string file_path)
+  void set_column_chunks_file_paths(std::vector<std::string> file_paths)
   {
-    _column_chunks_file_path.assign(file_path);
+    CUDF_EXPECTS(file_paths.size() == _sink.num_sinks(),
+                 "Mismatch between number of sinks and number of chunk paths to set");
+    _column_chunks_file_paths = std::move(file_paths);
   }
 
   /**
@@ -543,6 +591,21 @@ class parquet_writer_options_builder {
   {
   }
 
+  /**
+   * @brief Sets partitions in parquet_writer_options.
+   *
+   * @param partitions Partitions of input table in {start_row, num_rows} pairs. If specified, must
+   * be same size as number of sinks in sink_info
+   * @return this for chaining.
+   */
+  parquet_writer_options_builder& partitions(std::vector<partition_info> partitions)
+  {
+    CUDF_EXPECTS(partitions.size() == options._sink.num_sinks(),
+                 "Mismatch between number of sinks and number of partitions");
+    options.set_partitions(std::move(partitions));
+    return *this;
+  }
+
   /**
    * @brief Sets metadata in parquet_writer_options.
    *
@@ -555,6 +618,21 @@ class parquet_writer_options_builder {
     return *this;
   }
 
+  /**
+   * @brief Sets Key-Value footer metadata in parquet_writer_options.
+   *
+   * @param metadata Key-Value footer metadata
+   * @return this for chaining.
+   */
+  parquet_writer_options_builder& key_value_metadata(
+    std::vector<std::map<std::string, std::string>> metadata)
+  {
+    CUDF_EXPECTS(metadata.size() == options._sink.num_sinks(),
+                 "Mismatch between number of sinks and number of metadata maps");
+    options._user_data = std::move(metadata);
+    return *this;
+  }
+
   /**
    * @brief Sets the level of statistics in parquet_writer_options.
    *
@@ -582,12 +660,15 @@ class parquet_writer_options_builder {
   /**
    * @brief Sets column chunks file path to be set in the raw output metadata.
   *
-   * @param file_path String which indicates file path.
+   * @param file_paths Vector of strings which indicate the file paths. Must be the same size as
+   * the number of data sinks
+   * @return this for chaining.
    */
-  parquet_writer_options_builder& column_chunks_file_path(std::string file_path)
+  parquet_writer_options_builder& column_chunks_file_paths(std::vector<std::string> file_paths)
   {
-    options._column_chunks_file_path.assign(file_path);
+    CUDF_EXPECTS(file_paths.size() == options._sink.num_sinks(),
+                 "Mismatch between number of sinks and number of chunk paths to set");
+    options.set_column_chunks_file_paths(std::move(file_paths));
     return *this;
   }
 
@@ -690,6 +771,8 @@ class chunked_parquet_writer_options {
   statistics_freq _stats_level = statistics_freq::STATISTICS_ROWGROUP;
   // Optional associated metadata.
   table_input_metadata const* _metadata = nullptr;
+  // Optional footer key_value_metadata
+  std::vector<std::map<std::string, std::string>> _user_data;
   // Parquet writer can write INT96 or TIMESTAMP_MICROS. Defaults to TIMESTAMP_MICROS.
   // If true then overrides any per-column setting in _metadata.
   bool _write_timestamps_as_int96 = false;
@@ -735,6 +818,14 @@ class chunked_parquet_writer_options {
    */
   table_input_metadata const* get_metadata() const { return _metadata; }
 
+  /**
+   * @brief Returns Key-Value footer metadata information.
+   */
+  std::vector<std::map<std::string, std::string>> const& get_key_value_metadata() const
+  {
+    return _user_data;
+  }
+
   /**
    * @brief Returns `true` if timestamps will be written as INT96
   */
@@ -757,6 +848,18 @@
    */
   void set_metadata(table_input_metadata const* metadata) { _metadata = metadata; }
 
+  /**
+   * @brief Sets Key-Value footer metadata.
+   *
+   * @param metadata Key-Value footer metadata
+   */
+  void set_key_value_metadata(std::vector<std::map<std::string, std::string>> metadata)
+  {
+    CUDF_EXPECTS(metadata.size() == _sink.num_sinks(),
+                 "Mismatch between number of sinks and number of metadata maps");
+    _user_data = std::move(metadata);
+  }
+
   /**
    * @brief Sets the level of statistics in parquet_writer_options.
    *
@@ -841,6 +944,21 @@ class chunked_parquet_writer_options_builder {
     return *this;
   }
 
+  /**
+   * @brief Sets Key-Value footer metadata in chunked_parquet_writer_options.
+   *
+   * @param metadata Key-Value footer metadata
+   * @return this for chaining.
+   */
+  chunked_parquet_writer_options_builder& key_value_metadata(
+    std::vector<std::map<std::string, std::string>> metadata)
+  {
+    CUDF_EXPECTS(metadata.size() == options._sink.num_sinks(),
+                 "Mismatch between number of sinks and number of metadata maps");
+    options.set_key_value_metadata(std::move(metadata));
+    return *this;
+  }
+
   /**
    * @brief Sets Sets the level of statistics in chunked_parquet_writer_options.
    *
@@ -958,18 +1076,25 @@ class parquet_chunked_writer {
    * @brief Writes table to output.
    *
    * @param[in] table Table that needs to be written
+   * @param[in] partitions Optional partitions to divide the table into. If specified, must be same
+   * size as number of sinks.
+   *
+   * @throws cudf::logic_error If the number of partitions is not the same as the number of sinks
    * @return returns reference of the class object
    */
-  parquet_chunked_writer& write(table_view const& table);
+  parquet_chunked_writer& write(table_view const& table,
+                                std::vector<partition_info> const& partitions = {});
 
   /**
    * @brief Finishes the chunked/streamed write process.
    *
-   * @param[in] column_chunks_file_path Column chunks file path to be set in the raw output metadata
+   * @param[in] column_chunks_file_paths Column chunks file paths to be set in the raw output
+   * metadata
    * @return A parquet-compatible blob that contains the data for all rowgroups in the list only if
-   * `column_chunks_file_path` is provided, else null.
+   * `column_chunks_file_paths` is provided, else null.
*/ - std::unique_ptr> close(std::string const& column_chunks_file_path = ""); + std::unique_ptr> close( + std::vector const& column_chunks_file_paths = {}); // Unique pointer to impl writer class std::unique_ptr writer; diff --git a/cpp/include/cudf/io/types.hpp b/cpp/include/cudf/io/types.hpp index cf6be8a20af..512a90b3249 100644 --- a/cpp/include/cudf/io/types.hpp +++ b/cpp/include/cudf/io/types.hpp @@ -151,61 +151,93 @@ struct host_buffer { * @brief Source information for read interfaces */ struct source_info { - io_type type = io_type::FILEPATH; - std::vector filepaths; - std::vector buffers; - std::vector> files; - std::vector user_sources; + std::vector> _files; source_info() = default; explicit source_info(std::vector const& file_paths) - : type(io_type::FILEPATH), filepaths(file_paths) + : _type(io_type::FILEPATH), _filepaths(file_paths) { } explicit source_info(std::string const& file_path) - : type(io_type::FILEPATH), filepaths({file_path}) + : _type(io_type::FILEPATH), _filepaths({file_path}) { } explicit source_info(std::vector const& host_buffers) - : type(io_type::HOST_BUFFER), buffers(host_buffers) + : _type(io_type::HOST_BUFFER), _buffers(host_buffers) { } explicit source_info(const char* host_data, size_t size) - : type(io_type::HOST_BUFFER), buffers({{host_data, size}}) + : _type(io_type::HOST_BUFFER), _buffers({{host_data, size}}) { } explicit source_info(std::vector const& sources) - : type(io_type::USER_IMPLEMENTED), user_sources(sources) + : _type(io_type::USER_IMPLEMENTED), _user_sources(sources) { } explicit source_info(cudf::io::datasource* source) - : type(io_type::USER_IMPLEMENTED), user_sources({source}) + : _type(io_type::USER_IMPLEMENTED), _user_sources({source}) { } + + auto type() const { return _type; } + auto const& filepaths() const { return _filepaths; } + auto const& buffers() const { return _buffers; } + auto const& files() const { return _files; } + auto const& user_sources() const { return _user_sources; } + + private: + io_type _type = io_type::FILEPATH; + std::vector _filepaths; + std::vector _buffers; + std::vector _user_sources; }; /** * @brief Destination information for write interfaces */ struct sink_info { - io_type type = io_type::VOID; - std::string filepath; - std::vector* buffer = nullptr; - cudf::io::data_sink* user_sink = nullptr; - sink_info() = default; + sink_info(size_t num_sinks) : _type(io_type::VOID), _num_sinks(num_sinks) {} - explicit sink_info(const std::string& file_path) : type(io_type::FILEPATH), filepath(file_path) {} + explicit sink_info(std::vector const& file_paths) + : _type(io_type::FILEPATH), _num_sinks(file_paths.size()), _filepaths(file_paths) + { + } + explicit sink_info(std::string const& file_path) + : _type(io_type::FILEPATH), _filepaths({file_path}) + { + } - explicit sink_info(std::vector* buffer) : type(io_type::HOST_BUFFER), buffer(buffer) {} + explicit sink_info(std::vector*> const& buffers) + : _type(io_type::HOST_BUFFER), _num_sinks(buffers.size()), _buffers(buffers) + { + } + explicit sink_info(std::vector* buffer) : _type(io_type::HOST_BUFFER), _buffers({buffer}) {} - explicit sink_info(class cudf::io::data_sink* user_sink_) - : type(io_type::USER_IMPLEMENTED), user_sink(user_sink_) + explicit sink_info(std::vector const& user_sinks) + : _type(io_type::USER_IMPLEMENTED), _num_sinks(user_sinks.size()), _user_sinks(user_sinks) { } + explicit sink_info(class cudf::io::data_sink* user_sink) + : _type(io_type::USER_IMPLEMENTED), _user_sinks({user_sink}) + { + } + + auto type() const { return _type; } + auto 
num_sinks() const { return _num_sinks; } + auto const& filepaths() const { return _filepaths; } + auto const& buffers() const { return _buffers; } + auto const& user_sinks() const { return _user_sinks; } + + private: + io_type _type = io_type::VOID; + size_t _num_sinks = 1; + std::vector _filepaths; + std::vector*> _buffers; + std::vector _user_sinks; }; class table_input_metadata; @@ -369,12 +401,21 @@ class table_input_metadata { * The constructed table_input_metadata has the same structure as the passed table_view * * @param table The table_view to construct metadata for - * @param user_data Optional Additional metadata to encode, as key-value pairs */ - table_input_metadata(table_view const& table, std::map user_data = {}); + table_input_metadata(table_view const& table); std::vector column_metadata; - std::map user_data; //!< Format-dependent metadata as key-values pairs +}; + +/** + * @brief Information used while writing partitioned datasets + * + * This information defines the slice of an input table to write to file. In partitioned dataset + * writing, one partition_info struct defines one partition and corresponds to one output file + */ +struct partition_info { + size_type start_row; + size_type num_rows; }; } // namespace io diff --git a/cpp/src/io/functions.cpp b/cpp/src/io/functions.cpp index 768d6b25690..04638d3eca9 100644 --- a/cpp/src/io/functions.cpp +++ b/cpp/src/io/functions.cpp @@ -111,27 +111,33 @@ std::vector> make_datasources(source_info size_t range_offset = 0, size_t range_size = 0) { - switch (info.type) { + switch (info.type()) { case io_type::FILEPATH: { auto sources = std::vector>(); - for (auto const& filepath : info.filepaths) { + for (auto const& filepath : info.filepaths()) { sources.emplace_back(cudf::io::datasource::create(filepath, range_offset, range_size)); } return sources; } - case io_type::HOST_BUFFER: return cudf::io::datasource::create(info.buffers); - case io_type::USER_IMPLEMENTED: return cudf::io::datasource::create(info.user_sources); + case io_type::HOST_BUFFER: return cudf::io::datasource::create(info.buffers()); + case io_type::USER_IMPLEMENTED: return cudf::io::datasource::create(info.user_sources()); default: CUDF_FAIL("Unsupported source type"); } } -std::unique_ptr make_datasink(sink_info const& info) +std::vector> make_datasinks(sink_info const& info) { - switch (info.type) { - case io_type::FILEPATH: return cudf::io::data_sink::create(info.filepath); - case io_type::HOST_BUFFER: return cudf::io::data_sink::create(info.buffer); - case io_type::VOID: return cudf::io::data_sink::create(); - case io_type::USER_IMPLEMENTED: return cudf::io::data_sink::create(info.user_sink); + switch (info.type()) { + case io_type::FILEPATH: return cudf::io::data_sink::create(info.filepaths()); + case io_type::HOST_BUFFER: return cudf::io::data_sink::create(info.buffers()); + case io_type::VOID: { + std::vector> sinks; + for (size_t i = 0; i < info.num_sinks(); ++i) { + sinks.push_back(cudf::io::data_sink::create()); + } + return sinks; + } + case io_type::USER_IMPLEMENTED: return cudf::io::data_sink::create(info.user_sinks()); default: CUDF_FAIL("Unsupported sink type"); } } @@ -156,9 +162,9 @@ compression_type infer_compression_type(compression_type compression, source_inf { if (compression != compression_type::AUTO) { return compression; } - if (info.type != io_type::FILEPATH) { return compression_type::NONE; } + if (info.type() != io_type::FILEPATH) { return compression_type::NONE; } - auto filepath = info.filepaths[0]; + auto filepath = 
info.filepaths()[0]; // Attempt to infer from the file extension const auto pos = filepath.find_last_of('.'); @@ -218,10 +224,11 @@ void write_csv(csv_writer_options const& options, rmm::mr::device_memory_resourc { using namespace cudf::io::detail; - auto sink = make_datasink(options.get_sink()); + auto sinks = make_datasinks(options.get_sink()); + CUDF_EXPECTS(sinks.size() == 1, "Multiple sinks not supported for CSV writing"); return csv::write_csv( // - sink.get(), + sinks[0].get(), options.get_table(), options.get_metadata(), options, @@ -235,15 +242,16 @@ raw_orc_statistics read_raw_orc_statistics(source_info const& src_info) { // Get source to read statistics from std::unique_ptr source; - if (src_info.type == io_type::FILEPATH) { - CUDF_EXPECTS(src_info.filepaths.size() == 1, "Only a single source is currently supported."); - source = cudf::io::datasource::create(src_info.filepaths[0]); - } else if (src_info.type == io_type::HOST_BUFFER) { - CUDF_EXPECTS(src_info.buffers.size() == 1, "Only a single source is currently supported."); - source = cudf::io::datasource::create(src_info.buffers[0]); - } else if (src_info.type == io_type::USER_IMPLEMENTED) { - CUDF_EXPECTS(src_info.user_sources.size() == 1, "Only a single source is currently supported."); - source = cudf::io::datasource::create(src_info.user_sources[0]); + if (src_info.type() == io_type::FILEPATH) { + CUDF_EXPECTS(src_info.filepaths().size() == 1, "Only a single source is currently supported."); + source = cudf::io::datasource::create(src_info.filepaths()[0]); + } else if (src_info.type() == io_type::HOST_BUFFER) { + CUDF_EXPECTS(src_info.buffers().size() == 1, "Only a single source is currently supported."); + source = cudf::io::datasource::create(src_info.buffers()[0]); + } else if (src_info.type() == io_type::USER_IMPLEMENTED) { + CUDF_EXPECTS(src_info.user_sources().size() == 1, + "Only a single source is currently supported."); + source = cudf::io::datasource::create(src_info.user_sources()[0]); } else { CUDF_FAIL("Unsupported source type"); } @@ -350,9 +358,11 @@ void write_orc(orc_writer_options const& options, rmm::mr::device_memory_resourc CUDF_FUNC_RANGE(); - auto sink = make_datasink(options.get_sink()); + auto sinks = make_datasinks(options.get_sink()); + CUDF_EXPECTS(sinks.size() == 1, "Multiple sinks not supported for ORC writing"); + auto writer = std::make_unique( - std::move(sink), options, io_detail::SingleWriteMode::YES, rmm::cuda_stream_default, mr); + std::move(sinks[0]), options, io_detail::SingleWriteMode::YES, rmm::cuda_stream_default, mr); writer->write(options.get_table()); } @@ -365,10 +375,11 @@ orc_chunked_writer::orc_chunked_writer(chunked_orc_writer_options const& options { namespace io_detail = cudf::io::detail; - auto sink = make_datasink(options.get_sink()); + auto sinks = make_datasinks(options.get_sink()); + CUDF_EXPECTS(sinks.size() == 1, "Multiple sinks not supported for ORC writing"); writer = std::make_unique( - std::move(sink), options, io_detail::SingleWriteMode::NO, rmm::cuda_stream_default, mr); + std::move(sinks[0]), options, io_detail::SingleWriteMode::NO, rmm::cuda_stream_default, mr); } /** @@ -417,9 +428,7 @@ std::unique_ptr> merge_row_group_metadata( return detail_parquet::writer::merge_row_group_metadata(metadata_list); } -table_input_metadata::table_input_metadata(table_view const& table, - std::map user_data) - : user_data{std::move(user_data)} +table_input_metadata::table_input_metadata(table_view const& table) { // Create a metadata hierarchy using `table` std::function 
get_children = [&](column_view const& col) { @@ -443,13 +452,13 @@ std::unique_ptr> write_parquet(parquet_writer_options const CUDF_FUNC_RANGE(); - auto sink = make_datasink(options.get_sink()); + auto sinks = make_datasinks(options.get_sink()); auto writer = std::make_unique( - std::move(sink), options, io_detail::SingleWriteMode::YES, rmm::cuda_stream_default, mr); + std::move(sinks), options, io_detail::SingleWriteMode::YES, rmm::cuda_stream_default, mr); - writer->write(options.get_table()); + writer->write(options.get_table(), options.get_partitions()); - return writer->close(options.get_column_chunks_file_path()); + return writer->close(options.get_column_chunks_file_paths()); } /** @@ -460,20 +469,21 @@ parquet_chunked_writer::parquet_chunked_writer(chunked_parquet_writer_options co { namespace io_detail = cudf::io::detail; - auto sink = make_datasink(options.get_sink()); + auto sinks = make_datasinks(options.get_sink()); writer = std::make_unique( - std::move(sink), options, io_detail::SingleWriteMode::NO, rmm::cuda_stream_default, mr); + std::move(sinks), options, io_detail::SingleWriteMode::NO, rmm::cuda_stream_default, mr); } /** * @copydoc cudf::io::parquet_chunked_writer::write */ -parquet_chunked_writer& parquet_chunked_writer::write(table_view const& table) +parquet_chunked_writer& parquet_chunked_writer::write(table_view const& table, + std::vector const& partitions) { CUDF_FUNC_RANGE(); - writer->write(table); + writer->write(table, partitions); return *this; } @@ -482,7 +492,7 @@ parquet_chunked_writer& parquet_chunked_writer::write(table_view const& table) * @copydoc cudf::io::parquet_chunked_writer::close */ std::unique_ptr> parquet_chunked_writer::close( - std::string const& column_chunks_file_path) + std::vector const& column_chunks_file_path) { CUDF_FUNC_RANGE(); return writer->close(column_chunks_file_path); diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index a7a767585e6..e8fca1b4503 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -1312,6 +1312,7 @@ writer::impl::impl(std::unique_ptr sink, compression_kind_(to_orc_compression(options.get_compression())), enable_statistics_(options.is_enabled_statistics()), single_write_mode(mode == SingleWriteMode::YES), + kv_meta(options.get_key_value_metadata()), out_sink_(std::move(sink)) { if (options.get_metadata()) { @@ -1332,6 +1333,7 @@ writer::impl::impl(std::unique_ptr sink, compression_kind_(to_orc_compression(options.get_compression())), enable_statistics_(options.is_enabled_statistics()), single_write_mode(mode == SingleWriteMode::YES), + kv_meta(options.get_key_value_metadata()), out_sink_(std::move(sink)) { if (options.get_metadata()) { @@ -2068,12 +2070,10 @@ void writer::impl::close() PostScript ps; ff.contentLength = out_sink_->bytes_written(); - std::transform(table_meta->user_data.begin(), - table_meta->user_data.end(), - std::back_inserter(ff.metadata), - [&](auto const& udata) { - return UserMetadataItem{udata.first, udata.second}; - }); + std::transform( + kv_meta.begin(), kv_meta.end(), std::back_inserter(ff.metadata), [&](auto const& udata) { + return UserMetadataItem{udata.first, udata.second}; + }); // Write statistics metadata if (md.stripeStats.size() != 0) { diff --git a/cpp/src/io/orc/writer_impl.hpp b/cpp/src/io/orc/writer_impl.hpp index 68622d17b28..80c22b09927 100644 --- a/cpp/src/io/orc/writer_impl.hpp +++ b/cpp/src/io/orc/writer_impl.hpp @@ -369,6 +369,8 @@ class writer::impl { bool const single_write_mode; // optional user metadata 
std::unique_ptr table_meta; + // optional user metadata + std::map kv_meta; // to track if the output has been written to sink bool closed = false; diff --git a/cpp/src/io/parquet/chunk_dict.cu b/cpp/src/io/parquet/chunk_dict.cu index 42d27dadd1a..5589f87e57e 100644 --- a/cpp/src/io/parquet/chunk_dict.cu +++ b/cpp/src/io/parquet/chunk_dict.cu @@ -95,69 +95,41 @@ struct map_find_fn { template __global__ void __launch_bounds__(block_size, 1) populate_chunk_hash_maps_kernel(cudf::detail::device_2dspan chunks, - size_type num_rows) + cudf::detail::device_2dspan frags) { auto col_idx = blockIdx.y; auto block_x = blockIdx.x; auto t = threadIdx.x; + auto frag = frags[col_idx][block_x]; + auto chunk = frag.chunk; + auto col = chunk->col_desc; - auto start_row = - block_x * - max_page_fragment_size; // This is fragment size. all chunks are multiple of these many rows. - size_type end_row = min(start_row + max_page_fragment_size, num_rows); + size_type start_row = frag.start_row; + size_type end_row = frag.start_row + frag.num_rows; - __shared__ EncColumnChunk* s_chunk; - __shared__ parquet_column_device_view s_col; __shared__ size_type s_start_value_idx; __shared__ size_type s_num_values; - if (t == 0) { - // Find the chunk this block is a part of - size_type num_rowgroups = chunks.size().first; - size_type rg_idx = 0; - while (rg_idx < num_rowgroups) { - if (auto ck = chunks[rg_idx][col_idx]; - start_row >= ck.start_row and start_row < ck.start_row + ck.num_rows) { - break; - } - ++rg_idx; - } - s_chunk = &chunks[rg_idx][col_idx]; - s_col = *(s_chunk->col_desc); - } - __syncthreads(); - if (not s_chunk->use_dictionary) { return; } + + if (not chunk->use_dictionary) { return; } if (t == 0) { // Find the bounds of values in leaf column to be inserted into the map for current chunk - auto col = *(s_col.parent_column); - auto start_value_idx = start_row; - auto end_value_idx = end_row; - while (col.type().id() == type_id::LIST or col.type().id() == type_id::STRUCT) { - if (col.type().id() == type_id::STRUCT) { - start_value_idx += col.offset(); - end_value_idx += col.offset(); - col = col.child(0); - } else { - auto offset_col = col.child(lists_column_view::offsets_column_index); - start_value_idx = offset_col.element(start_value_idx + col.offset()); - end_value_idx = offset_col.element(end_value_idx + col.offset()); - col = col.child(lists_column_view::child_column_index); - } - } - s_start_value_idx = start_value_idx; - s_num_values = end_value_idx - start_value_idx; + auto cudf_col = *(col->parent_column); + s_start_value_idx = row_to_value_idx(start_row, cudf_col); + auto end_value_idx = row_to_value_idx(end_row, cudf_col); + s_num_values = end_value_idx - s_start_value_idx; } __syncthreads(); - column_device_view const& data_col = *s_col.leaf_column; + column_device_view const& data_col = *col->leaf_column; using block_reduce = cub::BlockReduce; __shared__ typename block_reduce::TempStorage reduce_storage; // Make a view of the hash map auto hash_map_mutable = map_type::device_mutable_view( - s_chunk->dict_map_slots, s_chunk->dict_map_size, KEY_SENTINEL, VALUE_SENTINEL); + chunk->dict_map_slots, chunk->dict_map_size, KEY_SENTINEL, VALUE_SENTINEL); auto hash_map = map_type::device_view( - s_chunk->dict_map_slots, s_chunk->dict_map_size, KEY_SENTINEL, VALUE_SENTINEL); + chunk->dict_map_slots, chunk->dict_map_size, KEY_SENTINEL, VALUE_SENTINEL); __shared__ int total_num_dict_entries; for (size_type i = 0; i < s_num_values; i += block_size) { @@ -176,7 +148,7 @@ __global__ void 
__launch_bounds__(block_size, 1) type_dispatcher(data_col.type(), map_insert_fn{hash_map_mutable}, data_col, val_idx); uniq_elem_size = [&]() -> size_type { if (not is_unique) { return 0; } - switch (s_col.physical_type) { + switch (col->physical_type) { case Type::INT32: return 4; case Type::INT64: return 8; case Type::INT96: return 12; @@ -200,9 +172,9 @@ __global__ void __launch_bounds__(block_size, 1) __syncthreads(); auto uniq_data_size = block_reduce(reduce_storage).Sum(uniq_elem_size); if (t == 0) { - total_num_dict_entries = atomicAdd(&s_chunk->num_dict_entries, num_unique); + total_num_dict_entries = atomicAdd(&chunk->num_dict_entries, num_unique); total_num_dict_entries += num_unique; - atomicAdd(&s_chunk->uniq_data_size, uniq_data_size); + atomicAdd(&chunk->uniq_data_size, uniq_data_size); } __syncthreads(); @@ -246,67 +218,38 @@ __global__ void __launch_bounds__(block_size, 1) template __global__ void __launch_bounds__(block_size, 1) get_dictionary_indices_kernel(cudf::detail::device_2dspan chunks, - size_type num_rows) + cudf::detail::device_2dspan frags) { auto col_idx = blockIdx.y; auto block_x = blockIdx.x; auto t = threadIdx.x; + auto frag = frags[col_idx][block_x]; + auto chunk = frag.chunk; + auto col = chunk->col_desc; - size_type start_row = block_x * max_page_fragment_size; - size_type end_row = min(start_row + max_page_fragment_size, num_rows); + size_type start_row = frag.start_row; + size_type end_row = frag.start_row + frag.num_rows; - __shared__ EncColumnChunk s_chunk; - __shared__ parquet_column_device_view s_col; __shared__ size_type s_start_value_idx; __shared__ size_type s_ck_start_val_idx; __shared__ size_type s_num_values; if (t == 0) { - // Find the chunk this block is a part of - size_type num_rowgroups = chunks.size().first; - size_type rg_idx = 0; - while (rg_idx < num_rowgroups) { - if (auto ck = chunks[rg_idx][col_idx]; - start_row >= ck.start_row and start_row < ck.start_row + ck.num_rows) { - break; - } - ++rg_idx; - } - s_chunk = chunks[rg_idx][col_idx]; - s_col = *(s_chunk.col_desc); - - // Find the bounds of values in leaf column to be inserted into the map for current chunk - - auto col = *(s_col.parent_column); - auto start_value_idx = start_row; - auto end_value_idx = end_row; - auto chunk_start_val_idx = s_chunk.start_row; - while (col.type().id() == type_id::LIST or col.type().id() == type_id::STRUCT) { - if (col.type().id() == type_id::STRUCT) { - start_value_idx += col.offset(); - chunk_start_val_idx += col.offset(); - end_value_idx += col.offset(); - col = col.child(0); - } else { - auto offset_col = col.child(lists_column_view::offsets_column_index); - start_value_idx = offset_col.element(start_value_idx + col.offset()); - chunk_start_val_idx = offset_col.element(chunk_start_val_idx + col.offset()); - end_value_idx = offset_col.element(end_value_idx + col.offset()); - col = col.child(lists_column_view::child_column_index); - } - } - s_start_value_idx = start_value_idx; - s_ck_start_val_idx = chunk_start_val_idx; - s_num_values = end_value_idx - start_value_idx; + // Find the bounds of values in leaf column to be searched in the map for current chunk + auto cudf_col = *(col->parent_column); + s_start_value_idx = row_to_value_idx(start_row, cudf_col); + s_ck_start_val_idx = row_to_value_idx(chunk->start_row, cudf_col); + auto end_value_idx = row_to_value_idx(end_row, cudf_col); + s_num_values = end_value_idx - s_start_value_idx; } __syncthreads(); - if (not s_chunk.use_dictionary) { return; } + if (not chunk->use_dictionary) { return; } - 
column_device_view const& data_col = *s_col.leaf_column; + column_device_view const& data_col = *col->leaf_column; auto map = map_type::device_view( - s_chunk.dict_map_slots, s_chunk.dict_map_size, KEY_SENTINEL, VALUE_SENTINEL); + chunk->dict_map_slots, chunk->dict_map_size, KEY_SENTINEL, VALUE_SENTINEL); for (size_t i = 0; i < s_num_values; i += block_size) { if (t + i < s_num_values) { @@ -321,7 +264,7 @@ __global__ void __launch_bounds__(block_size, 1) if (found_slot != map.end()) { // No need for atomic as this is not going to be modified by any other thread auto* val_ptr = reinterpret_cast(&found_slot->second); - s_chunk.dict_index[val_idx - s_ck_start_val_idx] = *val_ptr; + chunk->dict_index[val_idx - s_ck_start_val_idx] = *val_ptr; } } } @@ -336,16 +279,14 @@ void initialize_chunk_hash_maps(device_span chunks, rmm::cuda_st } void populate_chunk_hash_maps(cudf::detail::device_2dspan chunks, - size_type num_rows, + cudf::detail::device_2dspan frags, rmm::cuda_stream_view stream) { constexpr int block_size = 256; - auto const grid_x = cudf::detail::grid_1d(num_rows, max_page_fragment_size); - auto const num_columns = chunks.size().second; - dim3 const dim_grid(grid_x.num_blocks, num_columns); + dim3 const dim_grid(frags.size().second, frags.size().first); populate_chunk_hash_maps_kernel - <<>>(chunks, num_rows); + <<>>(chunks, frags); } void collect_map_entries(device_span chunks, rmm::cuda_stream_view stream) @@ -355,16 +296,14 @@ void collect_map_entries(device_span chunks, rmm::cuda_stream_vi } void get_dictionary_indices(cudf::detail::device_2dspan chunks, - size_type num_rows, + cudf::detail::device_2dspan frags, rmm::cuda_stream_view stream) { constexpr int block_size = 256; - auto const grid_x = cudf::detail::grid_1d(num_rows, max_page_fragment_size); - auto const num_columns = chunks.size().second; - dim3 const dim_grid(grid_x.num_blocks, num_columns); + dim3 const dim_grid(frags.size().second, frags.size().first); get_dictionary_indices_kernel - <<>>(chunks, num_rows); + <<>>(chunks, frags); } } // namespace gpu } // namespace parquet diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu index 33647ff626c..ec6b24b3b4e 100644 --- a/cpp/src/io/parquet/page_enc.cu +++ b/cpp/src/io/parquet/page_enc.cu @@ -27,6 +27,7 @@ #include #include +#include #include #include @@ -49,7 +50,6 @@ constexpr uint32_t rle_buffer_size = (1 << 9); struct frag_init_state_s { parquet_column_device_view col; PageFragment frag; - size_type start_value_idx; }; struct page_enc_state_s { @@ -114,24 +114,14 @@ inline __device__ uint32_t uint64_init_hash(uint64_t v) return uint32_init_hash(static_cast(v + (v >> 32))); } -/** - * @brief Initializes encoder page fragments - * - * Based on the number of rows in each fragment, populates the value count, the size of data in the - * fragment, the number of unique values, and the data size of unique values. 
- * - * @param[in] frag Fragment array [fragment_id][column_id] - * @param[in] col_desc Column description array [column_id] - * @param[in] num_fragments Number of fragments per column - * @param[in] num_columns Number of columns - */ // blockDim {512,1,1} template __global__ void __launch_bounds__(block_size) gpuInitPageFragments(device_2dspan frag, device_span col_desc, - uint32_t fragment_size, - uint32_t max_num_rows) + device_span partitions, + device_span part_frag_offset, + uint32_t fragment_size) { __shared__ __align__(16) frag_init_state_s state_g; @@ -140,53 +130,36 @@ __global__ void __launch_bounds__(block_size) frag_init_state_s* const s = &state_g; uint32_t t = threadIdx.x; + int frag_y = blockIdx.y; if (t == 0) s->col = col_desc[blockIdx.x]; __syncthreads(); - uint32_t const start_row = blockIdx.y * fragment_size; if (!t) { - // frag.num_rows = fragment_size except for the last page fragment which can be smaller. + // Find which partition this fragment came from + auto it = + thrust::upper_bound(thrust::seq, part_frag_offset.begin(), part_frag_offset.end(), frag_y); + int p = it - part_frag_offset.begin() - 1; + int part_end_row = partitions[p].start_row + partitions[p].num_rows; + s->frag.start_row = (frag_y - part_frag_offset[p]) * fragment_size + partitions[p].start_row; + + // frag.num_rows = fragment_size except for the last fragment in partition which can be smaller. // num_rows is fixed but fragment size could be larger if the data is strings or nested. - s->frag.num_rows = min(fragment_size, max_num_rows - min(start_row, max_num_rows)); + s->frag.num_rows = min(fragment_size, part_end_row - s->frag.start_row); s->frag.num_dict_vals = 0; s->frag.fragment_data_size = 0; s->frag.dict_data_size = 0; - // To use num_vals instead of num_rows, we need to calculate num_vals on the fly. - // For list>, values between i and i+50 can be calculated by - // off_11 = off[i], off_12 = off[i+50] - // off_21 = child.off[off_11], off_22 = child.off[off_12] - // etc... - size_type end_value_idx = start_row + s->frag.num_rows; - if (s->col.parent_column == nullptr) { - s->start_value_idx = start_row; - } else { - auto col = *(s->col.parent_column); - auto current_start_value_idx = start_row; - while (col.type().id() == type_id::LIST or col.type().id() == type_id::STRUCT) { - if (col.type().id() == type_id::STRUCT) { - current_start_value_idx += col.offset(); - end_value_idx += col.offset(); - col = col.child(0); - } else { - auto offset_col = col.child(lists_column_view::offsets_column_index); - current_start_value_idx = - offset_col.element(current_start_value_idx + col.offset()); - end_value_idx = offset_col.element(end_value_idx + col.offset()); - col = col.child(lists_column_view::child_column_index); - } - } - s->start_value_idx = current_start_value_idx; - } - s->frag.start_value_idx = s->start_value_idx; - s->frag.num_leaf_values = end_value_idx - s->start_value_idx; + auto col = *(s->col.parent_column); + s->frag.start_value_idx = row_to_value_idx(s->frag.start_row, col); + size_type end_value_idx = row_to_value_idx(s->frag.start_row + s->frag.num_rows, col); + s->frag.num_leaf_values = end_value_idx - s->frag.start_value_idx; if (s->col.level_offsets != nullptr) { // For nested schemas, the number of values in a fragment is not directly related to the // number of encoded data elements or the number of rows. It is simply the number of // repetition/definition values which together encode validity and nesting information. 
- size_type first_level_val_idx = s->col.level_offsets[start_row]; - size_type last_level_val_idx = s->col.level_offsets[start_row + s->frag.num_rows]; + size_type first_level_val_idx = s->col.level_offsets[s->frag.start_row]; + size_type last_level_val_idx = s->col.level_offsets[s->frag.start_row + s->frag.num_rows]; s->frag.num_values = last_level_val_idx - first_level_val_idx; } else { s->frag.num_values = s->frag.num_rows; @@ -197,7 +170,7 @@ __global__ void __launch_bounds__(block_size) __syncthreads(); size_type nvals = s->frag.num_leaf_values; - size_type start_value_idx = s->start_value_idx; + size_type start_value_idx = s->frag.start_value_idx; for (uint32_t i = 0; i < nvals; i += block_size) { uint32_t val_idx = start_value_idx + i + t; @@ -912,28 +885,9 @@ __global__ void __launch_bounds__(128, 8) dst[0] = dict_bits; s->rle_out = dst + 1; } - s->page_start_val = s->page.start_row; // Dictionary page's start row is chunk's start row - auto chunk_start_val = s->ck.start_row; - if (s->col.parent_column != nullptr) { // TODO: remove this check. parent is now never nullptr - auto col = *(s->col.parent_column); - auto current_page_start_val = s->page_start_val; - // TODO: We do this so much. Add a global function that converts row idx to val idx - while (col.type().id() == type_id::LIST or col.type().id() == type_id::STRUCT) { - if (col.type().id() == type_id::STRUCT) { - current_page_start_val += col.offset(); - chunk_start_val += col.offset(); - col = col.child(0); - } else { - auto offset_col = col.child(lists_column_view::offsets_column_index); - current_page_start_val = - offset_col.element(current_page_start_val + col.offset()); - chunk_start_val = offset_col.element(chunk_start_val + col.offset()); - col = col.child(lists_column_view::child_column_index); - } - } - s->page_start_val = current_page_start_val; - s->chunk_start_val = chunk_start_val; - } + auto col = *(s->col.parent_column); + s->page_start_val = row_to_value_idx(s->page.start_row, col); + s->chunk_start_val = row_to_value_idx(s->ck.start_row, col); } __syncthreads(); for (uint32_t cur_val_idx = 0; cur_val_idx < s->page.num_leaf_values;) { @@ -1944,36 +1898,20 @@ dremel_data get_dremel_data(column_view h_col, std::move(new_offsets), std::move(rep_level), std::move(def_level), leaf_data_size}; } -/** - * @brief Launches kernel for initializing encoder page fragments - * - * @param[in,out] frag Fragment array [column_id][fragment_id] - * @param[in] col_desc Column description array [column_id] - * @param[in] num_fragments Number of fragments per column - * @param[in] num_columns Number of columns - * @param[in] stream CUDA stream to use, default 0 - */ void InitPageFragments(device_2dspan frag, device_span col_desc, + device_span partitions, + device_span part_frag_offset, uint32_t fragment_size, - uint32_t num_rows, rmm::cuda_stream_view stream) { auto num_columns = frag.size().first; auto num_fragments_per_column = frag.size().second; dim3 dim_grid(num_columns, num_fragments_per_column); // 1 threadblock per fragment - gpuInitPageFragments<512> - <<>>(frag, col_desc, fragment_size, num_rows); + gpuInitPageFragments<512><<>>( + frag, col_desc, partitions, part_frag_offset, fragment_size); } -/** - * @brief Launches kernel for initializing fragment statistics groups - * - * @param[out] groups Statistics groups [num_columns x num_fragments] - * @param[in] fragments Page fragments [num_columns x num_fragments] - * @param[in] col_desc Column description [num_columns] - * @param[in] stream CUDA stream to use, default 0 
- */ void InitFragmentStatistics(device_2dspan groups, device_2dspan fragments, device_span col_desc, @@ -1986,19 +1924,6 @@ void InitFragmentStatistics(device_2dspan groups, gpuInitFragmentStats<<>>(groups, fragments, col_desc); } -/** - * @brief Launches kernel for initializing encoder data pages - * - * @param[in,out] chunks Column chunks [rowgroup][column] - * @param[out] pages Encode page array (null if just counting pages) - * @param[in] col_desc Column description array [column_id] - * @param[in] num_rowgroups Number of fragments per column - * @param[in] num_columns Number of columns - * @param[out] page_grstats Setup for page-level stats - * @param[out] chunk_grstats Setup for chunk-level stats - * @param[in] max_page_comp_data_size Calculated maximum compressed data size of pages - * @param[in] stream CUDA stream to use, default 0 - */ void InitEncoderPages(device_2dspan chunks, device_span pages, device_span col_desc, @@ -2014,14 +1939,6 @@ void InitEncoderPages(device_2dspan chunks, chunks, pages, col_desc, page_grstats, chunk_grstats, max_page_comp_data_size, num_columns); } -/** - * @brief Launches kernel for packing column data into parquet pages - * - * @param[in,out] pages Device array of EncPages (unordered) - * @param[out] comp_in Optionally initializes compressor input params - * @param[out] comp_stat Optionally initializes compressor status - * @param[in] stream CUDA stream to use, default 0 - */ void EncodePages(device_span pages, device_span comp_in, device_span comp_stat, @@ -2033,26 +1950,11 @@ void EncodePages(device_span pages, gpuEncodePages<128><<>>(pages, comp_in, comp_stat); } -/** - * @brief Launches kernel to make the compressed vs uncompressed chunk-level decision - * - * @param[in,out] chunks Column chunks - * @param[in] stream CUDA stream to use, default 0 - */ void DecideCompression(device_span chunks, rmm::cuda_stream_view stream) { gpuDecideCompression<<>>(chunks); } -/** - * @brief Launches kernel to encode page headers - * - * @param[in,out] pages Device array of EncPages - * @param[in] comp_stat Compressor status or nullptr if no compression - * @param[in] page_stats Optional page-level statistics to be included in page header - * @param[in] chunk_stats Optional chunk-level statistics to be encoded - * @param[in] stream CUDA stream to use, default 0 - */ void EncodePageHeaders(device_span pages, device_span comp_stat, device_span page_stats, @@ -2065,13 +1967,6 @@ void EncodePageHeaders(device_span pages, pages, comp_stat, page_stats, chunk_stats); } -/** - * @brief Launches kernel to gather pages to a single contiguous block per chunk - * - * @param[in,out] chunks Column chunks - * @param[in] pages Device array of EncPages - * @param[in] stream CUDA stream to use, default 0 - */ void GatherPages(device_span chunks, device_span pages, rmm::cuda_stream_view stream) diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index a0cbc28bc8d..53bb11c8b70 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -252,6 +252,8 @@ struct parquet_column_device_view : stats_column_desc { constexpr int max_page_fragment_size = 5000; //!< Max number of rows in a page fragment +struct EncColumnChunk; + /** * @brief Struct describing an encoder page fragment */ @@ -262,8 +264,10 @@ struct PageFragment { uint32_t start_value_idx; uint32_t num_leaf_values; //!< Number of leaf values in fragment. 
Does not include nulls at //!< non-leaf level + size_type start_row; //!< First row in fragment uint16_t num_rows; //!< Number of rows in fragment uint16_t num_dict_vals; //!< Number of unique dictionary entries + EncColumnChunk* chunk; //!< The chunk that this fragment belongs to }; /// Size of hash used for building dictionaries @@ -284,6 +288,27 @@ inline uint32_t __device__ int32_logical_len(type_id id) } } +/** + * @brief Translate the row index of a parent column_device_view into the index of the first value + * in the leaf child. + * Only works in the context of parquet writer where struct columns are previously modified s.t. + * they only have one immediate child. + */ +inline size_type __device__ row_to_value_idx(size_type idx, column_device_view col) +{ + while (col.type().id() == type_id::LIST or col.type().id() == type_id::STRUCT) { + if (col.type().id() == type_id::STRUCT) { + idx += col.offset(); + col = col.child(0); + } else { + auto offset_col = col.child(lists_column_view::offsets_column_index); + idx = offset_col.element(idx + col.offset()); + col = col.child(lists_column_view::child_column_index); + } + } + return idx; +} + /** * @brief Return worst-case compressed size of compressed data given the uncompressed size */ @@ -309,7 +334,7 @@ struct EncColumnChunk { uint32_t compressed_size; //!< Compressed buffer size uint32_t max_page_data_size; //!< Max data size (excluding header) of any page in this chunk uint32_t page_headers_size; //!< Sum of size of all page headers - uint32_t start_row; //!< First row of chunk + size_type start_row; //!< First row of chunk uint32_t num_rows; //!< Number of rows in chunk size_type num_values; //!< Number of values in chunk. Different from num_rows for nested types uint32_t first_fragment; //!< First fragment of chunk @@ -459,18 +484,21 @@ dremel_data get_dremel_data(column_view h_col, /** * @brief Launches kernel for initializing encoder page fragments * + * Based on the number of rows in each fragment, populates the value count, the size of data in the + * fragment, the number of unique values, and the data size of unique values. 
+ * * @param[out] frag Fragment array [column_id][fragment_id] * @param[in] col_desc Column description array [column_id] - * @param[in] num_fragments Number of fragments per column - * @param[in] num_columns Number of columns + * @param[in] partitions Information about partitioning of table + * @param[in] first_frag_in_part A Partition's offset into fragment array * @param[in] fragment_size Number of rows per fragment - * @param[in] num_rows Number of rows per column * @param[in] stream CUDA stream to use */ void InitPageFragments(cudf::detail::device_2dspan frag, device_span col_desc, + device_span partitions, + device_span first_frag_in_part, uint32_t fragment_size, - uint32_t num_rows, rmm::cuda_stream_view stream); /** @@ -498,11 +526,11 @@ void initialize_chunk_hash_maps(device_span chunks, rmm::cuda_st * @brief Insert chunk values into their respective hash maps * * @param chunks Column chunks [rowgroup][column] - * @param num_rows Number of rows per column + * @param frags Column fragments * @param stream CUDA stream to use */ void populate_chunk_hash_maps(cudf::detail::device_2dspan chunks, - size_type num_rows, + cudf::detail::device_2dspan frags, rmm::cuda_stream_view stream); /** @@ -523,11 +551,11 @@ void collect_map_entries(device_span chunks, rmm::cuda_stream_vi * col[row] == col[dict_data[dict_index[row - chunk.start_row]]] * * @param chunks Column chunks [rowgroup][column] - * @param num_rows Number of rows per column + * @param frags Column fragments * @param stream CUDA stream to use */ void get_dictionary_indices(cudf::detail::device_2dspan chunks, - size_type num_rows, + cudf::detail::device_2dspan frags, rmm::cuda_stream_view stream); /** diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index e04c8371df8..aceb3bfbec1 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -27,6 +27,7 @@ #include #include +#include #include #include #include @@ -40,6 +41,8 @@ #include +#include + #include #include #include @@ -76,6 +79,113 @@ parquet::Compression to_parquet_compression(compression_type compression) } // namespace +struct aggregate_metadata { + aggregate_metadata(std::vector const& partitions, + size_type num_columns, + std::vector schema, + statistics_freq stats_granularity, + std::vector> const& kv_md) + : version(1), schema(std::move(schema)), files(partitions.size()) + { + for (size_t i = 0; i < partitions.size(); ++i) { + this->files[i].num_rows = partitions[i].num_rows; + } + this->column_order_listsize = + (stats_granularity != statistics_freq::STATISTICS_NONE) ? 
num_columns : 0; + + for (size_t p = 0; p < kv_md.size(); ++p) { + std::transform(kv_md[p].begin(), + kv_md[p].end(), + std::back_inserter(this->files[p].key_value_metadata), + [](auto const& kv) { + return KeyValue{kv.first, kv.second}; + }); + } + } + + void update_files(std::vector const& partitions) + { + CUDF_EXPECTS(partitions.size() == this->files.size(), + "New partitions must be same size as previously passed number of partitions"); + for (size_t i = 0; i < partitions.size(); ++i) { + this->files[i].num_rows += partitions[i].num_rows; + } + } + + FileMetaData get_metadata(size_t part) + { + CUDF_EXPECTS(part < files.size(), "Invalid part index queried"); + FileMetaData meta{}; + meta.version = this->version; + meta.schema = this->schema; + meta.num_rows = this->files[part].num_rows; + meta.row_groups = this->files[part].row_groups; + meta.key_value_metadata = this->files[part].key_value_metadata; + meta.created_by = this->created_by; + meta.column_order_listsize = this->column_order_listsize; + return meta; + } + + void set_file_paths(std::vector const& column_chunks_file_path) + { + for (size_t p = 0; p < this->files.size(); ++p) { + auto& file = this->files[p]; + auto const& file_path = column_chunks_file_path[p]; + for (auto& rowgroup : file.row_groups) { + for (auto& col : rowgroup.columns) { + col.file_path = file_path; + } + } + } + } + + FileMetaData get_merged_metadata() + { + FileMetaData merged_md; + for (size_t p = 0; p < this->files.size(); ++p) { + auto& file = this->files[p]; + if (p == 0) { + merged_md = this->get_metadata(0); + } else { + merged_md.row_groups.insert(merged_md.row_groups.end(), + std::make_move_iterator(file.row_groups.begin()), + std::make_move_iterator(file.row_groups.end())); + merged_md.num_rows += file.num_rows; + } + } + return merged_md; + } + + std::vector num_row_groups_per_file() + { + std::vector global_rowgroup_base; + std::transform(this->files.begin(), + this->files.end(), + std::back_inserter(global_rowgroup_base), + [](auto const& part) { return part.row_groups.size(); }); + return global_rowgroup_base; + } + + bool schema_matches(std::vector const& schema) const + { + return this->schema == schema; + } + auto& file(size_t p) { return files[p]; } + size_t num_files() const { return files.size(); } + + private: + int32_t version = 0; + std::vector schema; + struct per_file_metadata { + int64_t num_rows = 0; + std::vector row_groups; + std::vector key_value_metadata; + }; + std::vector files; + std::string created_by = ""; + uint32_t column_order_listsize = 0; +}; + struct linked_column_view; using LinkedColPtr = std::shared_ptr; @@ -736,10 +846,12 @@ gpu::parquet_column_device_view parquet_column_view::get_device_view( void writer::impl::init_page_fragments(cudf::detail::hostdevice_2dvector& frag, device_span col_desc, - uint32_t num_rows, + host_span partitions, + device_span part_frag_offset, uint32_t fragment_size) { - gpu::InitPageFragments(frag, col_desc, fragment_size, num_rows, stream); + auto d_partitions = cudf::detail::make_device_uvector_async(partitions, stream); + gpu::InitPageFragments(frag, col_desc, d_partitions, part_frag_offset, fragment_size, stream); frag.device_to_host(stream, true); } @@ -771,7 +883,7 @@ void writer::impl::init_page_sizes(hostdevice_2dvector& chu auto build_chunk_dictionaries(hostdevice_2dvector& chunks, host_span col_desc, - uint32_t num_rows, + device_2dspan frags, rmm::cuda_stream_view stream) { // At this point, we know all chunks and their sizes. 
We want to allocate dictionaries for each @@ -801,7 +913,7 @@ auto build_chunk_dictionaries(hostdevice_2dvector& chunks, chunks.host_to_device(stream); gpu::initialize_chunk_hash_maps(chunks.device_view().flat_view(), stream); - gpu::populate_chunk_hash_maps(chunks, num_rows, stream); + gpu::populate_chunk_hash_maps(chunks, frags, stream); chunks.device_to_host(stream, true); @@ -850,7 +962,7 @@ auto build_chunk_dictionaries(hostdevice_2dvector& chunks, } chunks.host_to_device(stream); gpu::collect_map_entries(chunks.device_view().flat_view(), stream); - gpu::get_dictionary_indices(chunks.device_view(), num_rows, stream); + gpu::get_dictionary_indices(chunks.device_view(), frags, stream); return std::make_pair(std::move(dict_data), std::move(dict_index)); } @@ -1016,7 +1128,7 @@ void writer::impl::encode_pages(hostdevice_2dvector& chunks stream.synchronize(); } -writer::impl::impl(std::unique_ptr sink, +writer::impl::impl(std::vector> sinks, parquet_writer_options const& options, SingleWriteMode mode, rmm::cuda_stream_view stream, @@ -1028,8 +1140,9 @@ writer::impl::impl(std::unique_ptr sink, compression_(to_parquet_compression(options.get_compression())), stats_granularity_(options.get_stats_level()), int96_timestamps(options.is_enabled_int96_timestamps()), - out_sink_(std::move(sink)), - single_write_mode(mode == SingleWriteMode::YES) + kv_md(options.get_key_value_metadata()), + single_write_mode(mode == SingleWriteMode::YES), + out_sink_(std::move(sinks)) { if (options.get_metadata()) { table_meta = std::make_unique(*options.get_metadata()); @@ -1037,7 +1150,7 @@ writer::impl::impl(std::unique_ptr sink, init_state(); } -writer::impl::impl(std::unique_ptr sink, +writer::impl::impl(std::vector> sinks, chunked_parquet_writer_options const& options, SingleWriteMode mode, rmm::cuda_stream_view stream, @@ -1049,8 +1162,9 @@ writer::impl::impl(std::unique_ptr sink, compression_(to_parquet_compression(options.get_compression())), stats_granularity_(options.get_stats_level()), int96_timestamps(options.is_enabled_int96_timestamps()), + kv_md(options.get_key_value_metadata()), single_write_mode(mode == SingleWriteMode::YES), - out_sink_(std::move(sink)) + out_sink_(std::move(sinks)) { if (options.get_metadata()) { table_meta = std::make_unique(*options.get_metadata()); @@ -1062,19 +1176,21 @@ writer::impl::~impl() { close(); } void writer::impl::init_state() { + current_chunk_offset.resize(out_sink_.size()); // Write file header file_header_s fhdr; fhdr.magic = parquet_magic; - out_sink_->host_write(&fhdr, sizeof(fhdr)); - current_chunk_offset = sizeof(file_header_s); + for (auto& sink : out_sink_) { + sink->host_write(&fhdr, sizeof(fhdr)); + } + std::fill_n(current_chunk_offset.begin(), current_chunk_offset.size(), sizeof(file_header_s)); } -void writer::impl::write(table_view const& table) +void writer::impl::write(table_view const& table, std::vector const& partitions) { + last_write_successful = false; CUDF_EXPECTS(not closed, "Data has already been flushed to out and closed"); - size_type num_rows = table.num_rows(); - if (not table_meta) { table_meta = std::make_unique(table); } // Fill unnamed columns' names in table_meta @@ -1109,25 +1225,15 @@ void writer::impl::write(table_view const& table) std::vector this_table_schema(schema_tree.begin(), schema_tree.end()); - if (md.version == 0) { - md.version = 1; - md.num_rows = num_rows; - md.column_order_listsize = - (stats_granularity_ != statistics_freq::STATISTICS_NONE) ? 
num_columns : 0; - std::transform(table_meta->user_data.begin(), - table_meta->user_data.end(), - std::back_inserter(md.key_value_metadata), - [](auto const& kv) { - return KeyValue{kv.first, kv.second}; - }); - md.schema = this_table_schema; + if (!md) { + md = std::make_unique( + partitions, num_columns, std::move(this_table_schema), stats_granularity_, kv_md); } else { // verify the user isn't passing mismatched tables - CUDF_EXPECTS(md.schema == this_table_schema, + CUDF_EXPECTS(md->schema_matches(this_table_schema), "Mismatch in schema between multiple calls to write_chunk"); - // increment num rows - md.num_rows += num_rows; + md->update_files(partitions); } // Create table_device_view so that corresponding column_device_view data // can be written into col_desc members @@ -1149,7 +1255,22 @@ void writer::impl::write(table_view const& table) // compression/decompression performance). using cudf::io::parquet::gpu::max_page_fragment_size; - size_type const num_fragments = (num_rows + max_page_fragment_size - 1) / max_page_fragment_size; + std::vector num_frag_in_part; + std::transform(partitions.begin(), + partitions.end(), + std::back_inserter(num_frag_in_part), + [](auto const& part) { + return util::div_rounding_up_unsafe(part.num_rows, max_page_fragment_size); + }); + + size_type num_fragments = std::reduce(num_frag_in_part.begin(), num_frag_in_part.end()); + + std::vector part_frag_offset; // Store the idx of the first fragment in each partition + std::exclusive_scan( + num_frag_in_part.begin(), num_frag_in_part.end(), std::back_inserter(part_frag_offset), 0); + part_frag_offset.push_back(part_frag_offset.back() + num_frag_in_part.back()); + + auto d_part_frag_offset = cudf::detail::make_device_uvector_async(part_frag_offset, stream); cudf::detail::hostdevice_2dvector fragments( num_columns, num_fragments, stream); @@ -1159,36 +1280,50 @@ void writer::impl::write(table_view const& table) leaf_column_views = create_leaf_column_device_views( col_desc, *parent_column_table_device_view, stream); - init_page_fragments(fragments, col_desc, num_rows, max_page_fragment_size); + init_page_fragments( + fragments, col_desc, partitions, d_part_frag_offset, max_page_fragment_size); } - auto const global_rowgroup_base = static_cast(md.row_groups.size()); + std::vector const global_rowgroup_base = md->num_row_groups_per_file(); // Decide row group boundaries based on uncompressed data size - auto rowgroup_size = 0ul; - auto num_rowgroups = 0; - for (auto f = 0, global_r = global_rowgroup_base, rowgroup_start = 0; f < num_fragments; f++) { - auto fragment_data_size = 0ul; - // Replace with STL algorithm to transform and sum - for (auto i = 0; i < num_columns; i++) { - fragment_data_size += fragments[i][f].fragment_data_size; - } - if (f > rowgroup_start && - (rowgroup_size + fragment_data_size > max_row_group_size || - (f + 1 - rowgroup_start) * max_page_fragment_size > max_row_group_rows)) { - // update schema - md.row_groups.resize(md.row_groups.size() + 1); - md.row_groups[global_r++].num_rows = (f - rowgroup_start) * max_page_fragment_size; - num_rowgroups++; - rowgroup_start = f; - rowgroup_size = 0; - } - rowgroup_size += fragment_data_size; - if (f + 1 == num_fragments) { - // update schema - md.row_groups.resize(md.row_groups.size() + 1); - md.row_groups[global_r++].num_rows = num_rows - rowgroup_start * max_page_fragment_size; - num_rowgroups++; + int num_rowgroups = 0; + + std::vector num_rg_in_part(partitions.size()); + for (size_t p = 0; p < partitions.size(); ++p) { + size_type 
curr_rg_num_rows = 0; + size_t curr_rg_data_size = 0; + int first_frag_in_rg = part_frag_offset[p]; + int last_frag_in_part = part_frag_offset[p + 1] - 1; + for (auto f = first_frag_in_rg; f <= last_frag_in_part; ++f) { + size_t fragment_data_size = 0; + for (auto c = 0; c < num_columns; c++) { + fragment_data_size += fragments[c][f].fragment_data_size; + } + size_type fragment_num_rows = fragments[0][f].num_rows; + + // If the fragment size gets larger than rg limit then break off a rg + if (f > first_frag_in_rg && // There has to be at least one fragment in row group + (curr_rg_data_size + fragment_data_size > max_row_group_size || + curr_rg_num_rows + fragment_num_rows > max_row_group_rows)) { + auto& rg = md->file(p).row_groups.emplace_back(); + rg.num_rows = curr_rg_num_rows; + num_rowgroups++; + num_rg_in_part[p]++; + curr_rg_num_rows = 0; + curr_rg_data_size = 0; + first_frag_in_rg = f; + } + curr_rg_num_rows += fragment_num_rows; + curr_rg_data_size += fragment_data_size; + + // TODO: (wishful) refactor to consolidate with above if block + if (f == last_frag_in_part) { + auto& rg = md->file(p).row_groups.emplace_back(); + rg.num_rows = curr_rg_num_rows; + num_rowgroups++; + num_rg_in_part[p]++; + } } } @@ -1196,58 +1331,79 @@ void writer::impl::write(table_view const& table) rmm::device_uvector frag_stats(0, stream); if (stats_granularity_ != statistics_freq::STATISTICS_NONE) { frag_stats.resize(num_fragments * num_columns, stream); - if (frag_stats.size() != 0) { + if (not frag_stats.is_empty()) { auto frag_stats_2dview = device_2dspan(frag_stats.data(), num_columns, num_fragments); gather_fragment_statistics(frag_stats_2dview, fragments, col_desc, num_fragments); } } + + std::vector first_rg_in_part; + std::exclusive_scan( + num_rg_in_part.begin(), num_rg_in_part.end(), std::back_inserter(first_rg_in_part), 0); + // Initialize row groups and column chunks auto const num_chunks = num_rowgroups * num_columns; hostdevice_2dvector chunks(num_rowgroups, num_columns, stream); - for (auto r = 0, global_r = global_rowgroup_base, f = 0, start_row = 0; r < num_rowgroups; - r++, global_r++) { - size_type const fragments_in_chunk = - (md.row_groups[global_r].num_rows + max_page_fragment_size - 1) / max_page_fragment_size; - md.row_groups[global_r].total_byte_size = 0; - md.row_groups[global_r].columns.resize(num_columns); - for (auto i = 0; i < num_columns; i++) { - gpu::EncColumnChunk* ck = &chunks[r][i]; - - *ck = {}; - ck->col_desc = col_desc.device_ptr() + i; - ck->col_desc_id = i; - ck->fragments = &fragments.device_view()[i][f]; - ck->stats = (frag_stats.size() != 0) ? 
frag_stats.data() + i * num_fragments + f : nullptr; - ck->start_row = start_row; - ck->num_rows = (uint32_t)md.row_groups[global_r].num_rows; - ck->first_fragment = i * num_fragments + f; - auto chunk_fragments = fragments[i].subspan(f, fragments_in_chunk); - ck->num_values = - std::accumulate(chunk_fragments.begin(), chunk_fragments.end(), 0, [](uint32_t l, auto r) { - return l + r.num_values; - }); - ck->plain_data_size = std::accumulate( - chunk_fragments.begin(), chunk_fragments.end(), 0, [](int sum, gpu::PageFragment frag) { - return sum + frag.fragment_data_size; - }); - md.row_groups[global_r].columns[i].meta_data.type = parquet_columns[i].physical_type(); - md.row_groups[global_r].columns[i].meta_data.encodings = {Encoding::PLAIN, Encoding::RLE}; - md.row_groups[global_r].columns[i].meta_data.path_in_schema = - parquet_columns[i].get_path_in_schema(); - md.row_groups[global_r].columns[i].meta_data.codec = UNCOMPRESSED; - md.row_groups[global_r].columns[i].meta_data.num_values = ck->num_values; + + for (size_t p = 0; p < partitions.size(); ++p) { + int f = part_frag_offset[p]; + size_type start_row = partitions[p].start_row; + for (int r = 0; r < num_rg_in_part[p]; r++) { + size_t global_r = global_rowgroup_base[p] + r; // Number of rowgroups already in file/part + auto& row_group = md->file(p).row_groups[global_r]; + uint32_t fragments_in_chunk = + util::div_rounding_up_unsafe(row_group.num_rows, max_page_fragment_size); + row_group.total_byte_size = 0; + row_group.columns.resize(num_columns); + for (int c = 0; c < num_columns; c++) { + gpu::EncColumnChunk& ck = chunks[r + first_rg_in_part[p]][c]; + + ck = {}; + ck.col_desc = col_desc.device_ptr() + c; + ck.col_desc_id = c; + ck.fragments = &fragments.device_view()[c][f]; + ck.stats = + (not frag_stats.is_empty()) ? frag_stats.data() + c * num_fragments + f : nullptr; + ck.start_row = start_row; + ck.num_rows = (uint32_t)row_group.num_rows; + ck.first_fragment = c * num_fragments + f; + auto chunk_fragments = fragments[c].subspan(f, fragments_in_chunk); + // In fragment struct, add a pointer to the chunk it belongs to + // In each fragment in chunk_fragments, update the chunk pointer here. 
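+        // Each fragment records a device pointer to its owning column chunk; once the
+        // later fragments.host_to_device(stream) copy runs, the dictionary kernels
+        // (populate_chunk_hash_maps, get_dictionary_indices) can walk fragments per
+        // chunk instead of relying on a single global row count.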
+ for (auto& frag : chunk_fragments) { + frag.chunk = &chunks.device_view()[r + first_rg_in_part[p]][c]; + } + ck.num_values = std::accumulate( + chunk_fragments.begin(), chunk_fragments.end(), 0, [](uint32_t l, auto r) { + return l + r.num_values; + }); + ck.plain_data_size = std::accumulate( + chunk_fragments.begin(), chunk_fragments.end(), 0, [](int sum, gpu::PageFragment frag) { + return sum + frag.fragment_data_size; + }); + auto& column_chunk_meta = row_group.columns[c].meta_data; + column_chunk_meta.type = parquet_columns[c].physical_type(); + column_chunk_meta.encodings = {Encoding::PLAIN, Encoding::RLE}; + column_chunk_meta.path_in_schema = parquet_columns[c].get_path_in_schema(); + column_chunk_meta.codec = UNCOMPRESSED; + column_chunk_meta.num_values = ck.num_values; + } + f += fragments_in_chunk; + start_row += (uint32_t)row_group.num_rows; } - f += fragments_in_chunk; - start_row += (uint32_t)md.row_groups[global_r].num_rows; } - auto dict_info_owner = build_chunk_dictionaries(chunks, col_desc, num_rows, stream); - for (auto rg = 0, global_rg = global_rowgroup_base; rg < num_rowgroups; rg++, global_rg++) { - for (auto col = 0; col < num_columns; col++) { - if (chunks.host_view()[rg][col].use_dictionary) { - md.row_groups[global_rg].columns[col].meta_data.encodings.push_back( - Encoding::PLAIN_DICTIONARY); + fragments.host_to_device(stream); + auto dict_info_owner = build_chunk_dictionaries(chunks, col_desc, fragments, stream); + for (size_t p = 0; p < partitions.size(); p++) { + for (int rg = 0; rg < num_rg_in_part[p]; rg++) { + size_t global_rg = global_rowgroup_base[p] + rg; + for (int col = 0; col < num_columns; col++) { + if (chunks.host_view()[rg][col].use_dictionary) { + md->file(p).row_groups[global_rg].columns[col].meta_data.encodings.push_back( + Encoding::PLAIN_DICTIONARY); + } } } } @@ -1272,6 +1428,12 @@ void writer::impl::write(table_view const& table) "Error in getting compressed size from nvcomp"); } + // Find which partition a rg belongs to + std::vector rg_to_part; + for (size_t p = 0; p < num_rg_in_part.size(); ++p) { + std::fill_n(std::back_inserter(rg_to_part), num_rg_in_part[p], p); + } + // Initialize batches of rowgroups to encode (mainly to limit peak memory usage) std::vector batch_list; size_type num_pages = 0; @@ -1335,11 +1497,11 @@ void writer::impl::write(table_view const& table) auto bfr_c = static_cast(comp_bfr.data()); for (auto j = 0; j < batch_list[b]; j++, r++) { for (auto i = 0; i < num_columns; i++) { - gpu::EncColumnChunk* ck = &chunks[r][i]; - ck->uncompressed_bfr = bfr; - ck->compressed_bfr = bfr_c; - bfr += ck->bfr_size; - bfr_c += ck->compressed_size; + gpu::EncColumnChunk& ck = chunks[r][i]; + ck.uncompressed_bfr = bfr; + ck.compressed_bfr = bfr_c; + bfr += ck.bfr_size; + bfr_c += ck.compressed_size; } } } @@ -1359,9 +1521,7 @@ void writer::impl::write(table_view const& table) pinned_buffer host_bfr{nullptr, cudaFreeHost}; // Encode row groups in batches - for (auto b = 0, r = 0, global_r = global_rowgroup_base; - b < static_cast(batch_list.size()); - b++) { + for (auto b = 0, r = 0; b < static_cast(batch_list.size()); b++) { // Count pages in this batch auto const rnext = r + batch_list[b]; auto const first_page_in_batch = chunks[r][0].first_page; @@ -1381,30 +1541,33 @@ void writer::impl::write(table_view const& table) (stats_granularity_ != statistics_freq::STATISTICS_NONE) ? 
page_stats.data() + num_pages : nullptr); std::vector> write_tasks; - for (; r < rnext; r++, global_r++) { + for (; r < rnext; r++) { + int p = rg_to_part[r]; + int global_r = global_rowgroup_base[p] + r - first_rg_in_part[p]; + auto& row_group = md->file(p).row_groups[global_r]; for (auto i = 0; i < num_columns; i++) { - gpu::EncColumnChunk* ck = &chunks[r][i]; + gpu::EncColumnChunk& ck = chunks[r][i]; + auto& column_chunk_meta = row_group.columns[i].meta_data; uint8_t* dev_bfr; - if (ck->is_compressed) { - md.row_groups[global_r].columns[i].meta_data.codec = compression_; - dev_bfr = ck->compressed_bfr; + if (ck.is_compressed) { + column_chunk_meta.codec = compression_; + dev_bfr = ck.compressed_bfr; } else { - dev_bfr = ck->uncompressed_bfr; + dev_bfr = ck.uncompressed_bfr; } - if (out_sink_->is_device_write_preferred(ck->compressed_size)) { + if (out_sink_[p]->is_device_write_preferred(ck.compressed_size)) { // let the writer do what it wants to retrieve the data from the gpu. - write_tasks.push_back( - out_sink_->device_write_async(dev_bfr + ck->ck_stat_size, ck->compressed_size, stream)); + write_tasks.push_back(out_sink_[p]->device_write_async( + dev_bfr + ck.ck_stat_size, ck.compressed_size, stream)); // we still need to do a (much smaller) memcpy for the statistics. - if (ck->ck_stat_size != 0) { - md.row_groups[global_r].columns[i].meta_data.statistics_blob.resize(ck->ck_stat_size); - CUDA_TRY( - cudaMemcpyAsync(md.row_groups[global_r].columns[i].meta_data.statistics_blob.data(), - dev_bfr, - ck->ck_stat_size, - cudaMemcpyDeviceToHost, - stream.value())); + if (ck.ck_stat_size != 0) { + column_chunk_meta.statistics_blob.resize(ck.ck_stat_size); + CUDA_TRY(cudaMemcpyAsync(column_chunk_meta.statistics_blob.data(), + dev_bfr, + ck.ck_stat_size, + cudaMemcpyDeviceToHost, + stream.value())); stream.synchronize(); } } else { @@ -1419,86 +1582,91 @@ void writer::impl::write(table_view const& table) // copy the full data CUDA_TRY(cudaMemcpyAsync(host_bfr.get(), dev_bfr, - ck->ck_stat_size + ck->compressed_size, + ck.ck_stat_size + ck.compressed_size, cudaMemcpyDeviceToHost, stream.value())); stream.synchronize(); - out_sink_->host_write(host_bfr.get() + ck->ck_stat_size, ck->compressed_size); - if (ck->ck_stat_size != 0) { - md.row_groups[global_r].columns[i].meta_data.statistics_blob.resize(ck->ck_stat_size); - memcpy(md.row_groups[global_r].columns[i].meta_data.statistics_blob.data(), - host_bfr.get(), - ck->ck_stat_size); + out_sink_[p]->host_write(host_bfr.get() + ck.ck_stat_size, ck.compressed_size); + if (ck.ck_stat_size != 0) { + column_chunk_meta.statistics_blob.resize(ck.ck_stat_size); + memcpy(column_chunk_meta.statistics_blob.data(), host_bfr.get(), ck.ck_stat_size); } } - md.row_groups[global_r].total_byte_size += ck->compressed_size; - md.row_groups[global_r].columns[i].meta_data.data_page_offset = - current_chunk_offset + ((ck->use_dictionary) ? ck->dictionary_size : 0); - md.row_groups[global_r].columns[i].meta_data.dictionary_page_offset = - (ck->use_dictionary) ? current_chunk_offset : 0; - md.row_groups[global_r].columns[i].meta_data.total_uncompressed_size = ck->bfr_size; - md.row_groups[global_r].columns[i].meta_data.total_compressed_size = ck->compressed_size; - current_chunk_offset += ck->compressed_size; + row_group.total_byte_size += ck.compressed_size; + column_chunk_meta.data_page_offset = + current_chunk_offset[p] + ((ck.use_dictionary) ? ck.dictionary_size : 0); + column_chunk_meta.dictionary_page_offset = + (ck.use_dictionary) ? 
current_chunk_offset[p] : 0; + column_chunk_meta.total_uncompressed_size = ck.bfr_size; + column_chunk_meta.total_compressed_size = ck.compressed_size; + current_chunk_offset[p] += ck.compressed_size; } } for (auto const& task : write_tasks) { task.wait(); } } + last_write_successful = true; } std::unique_ptr> writer::impl::close( - std::string const& column_chunks_file_path) + std::vector const& column_chunks_file_path) { if (closed) { return nullptr; } closed = true; - CompactProtocolWriter cpw(&buffer_); - file_ender_s fendr; - buffer_.resize(0); - fendr.footer_len = static_cast(cpw.write(md)); - fendr.magic = parquet_magic; - out_sink_->host_write(buffer_.data(), buffer_.size()); - out_sink_->host_write(&fendr, sizeof(fendr)); - out_sink_->flush(); + if (not last_write_successful) { return nullptr; } + for (size_t p = 0; p < out_sink_.size(); p++) { + std::vector buffer; + CompactProtocolWriter cpw(&buffer); + file_ender_s fendr; + buffer.resize(0); + fendr.footer_len = static_cast(cpw.write(md->get_metadata(p))); + fendr.magic = parquet_magic; + out_sink_[p]->host_write(buffer.data(), buffer.size()); + out_sink_[p]->host_write(&fendr, sizeof(fendr)); + out_sink_[p]->flush(); + } // Optionally output raw file metadata with the specified column chunk file path - if (column_chunks_file_path.length() > 0) { + if (column_chunks_file_path.size() > 0) { + CUDF_EXPECTS(column_chunks_file_path.size() == md->num_files(), + "Expected one column chunk path per output file"); + md->set_file_paths(column_chunks_file_path); file_header_s fhdr = {parquet_magic}; - buffer_.resize(0); - buffer_.insert(buffer_.end(), - reinterpret_cast(&fhdr), - reinterpret_cast(&fhdr) + sizeof(fhdr)); - for (auto& rowgroup : md.row_groups) { - for (auto& col : rowgroup.columns) { - col.file_path = column_chunks_file_path; - } - } - fendr.footer_len = static_cast(cpw.write(md)); - buffer_.insert(buffer_.end(), - reinterpret_cast(&fendr), - reinterpret_cast(&fendr) + sizeof(fendr)); - return std::make_unique>(std::move(buffer_)); + std::vector buffer; + CompactProtocolWriter cpw(&buffer); + buffer.insert(buffer.end(), + reinterpret_cast(&fhdr), + reinterpret_cast(&fhdr) + sizeof(fhdr)); + file_ender_s fendr; + fendr.magic = parquet_magic; + fendr.footer_len = static_cast(cpw.write(md->get_merged_metadata())); + buffer.insert(buffer.end(), + reinterpret_cast(&fendr), + reinterpret_cast(&fendr) + sizeof(fendr)); + return std::make_unique>(std::move(buffer)); } else { return {nullptr}; } + return nullptr; } // Forward to implementation -writer::writer(std::unique_ptr sink, +writer::writer(std::vector> sinks, parquet_writer_options const& options, SingleWriteMode mode, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) - : _impl(std::make_unique(std::move(sink), options, mode, stream, mr)) + : _impl(std::make_unique(std::move(sinks), options, mode, stream, mr)) { } -writer::writer(std::unique_ptr sink, +writer::writer(std::vector> sinks, chunked_parquet_writer_options const& options, SingleWriteMode mode, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) - : _impl(std::make_unique(std::move(sink), options, mode, stream, mr)) + : _impl(std::make_unique(std::move(sinks), options, mode, stream, mr)) { } @@ -1506,16 +1674,21 @@ writer::writer(std::unique_ptr sink, writer::~writer() = default; // Forward to implementation -void writer::write(table_view const& table) { _impl->write(table); } +void writer::write(table_view const& table, std::vector const& partitions) +{ + _impl->write( + table, 
partitions.empty() ? std::vector{{0, table.num_rows()}} : partitions); +} // Forward to implementation -std::unique_ptr> writer::close(std::string const& column_chunks_file_path) +std::unique_ptr> writer::close( + std::vector const& column_chunks_file_path) { return _impl->close(column_chunks_file_path); } std::unique_ptr> writer::merge_row_group_metadata( - const std::vector>>& metadata_list) + std::vector>> const& metadata_list) { std::vector output; CompactProtocolWriter cpw(&output); diff --git a/cpp/src/io/parquet/writer_impl.hpp b/cpp/src/io/parquet/writer_impl.hpp index 9188218f607..1cefb91c904 100644 --- a/cpp/src/io/parquet/writer_impl.hpp +++ b/cpp/src/io/parquet/writer_impl.hpp @@ -45,6 +45,7 @@ namespace detail { namespace parquet { // Forward internal classes struct parquet_column_view; +struct aggregate_metadata; using namespace cudf::io::parquet; using namespace cudf::io; @@ -60,13 +61,13 @@ class writer::impl { /** * @brief Constructor with writer options. * - * @param sink data_sink for storing dataset + * @param sink data_sink's for storing dataset * @param options Settings for controlling behavior * @param mode Option to write at once or in chunks * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource to use for device memory allocation */ - explicit impl(std::unique_ptr sink, + explicit impl(std::vector> sinks, parquet_writer_options const& options, SingleWriteMode mode, rmm::cuda_stream_view stream, @@ -75,13 +76,13 @@ class writer::impl { /** * @brief Constructor with chunked writer options. * - * @param sink data_sink for storing dataset + * @param sink data_sink's for storing dataset * @param options Settings for controlling behavior * @param mode Option to write at once or in chunks * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource to use for device memory allocation */ - explicit impl(std::unique_ptr sink, + explicit impl(std::vector> sinks, chunked_parquet_writer_options const& options, SingleWriteMode mode, rmm::cuda_stream_view stream, @@ -102,8 +103,10 @@ class writer::impl { * normally used for chunked writing. * * @param[in] table The table information to be written + * @param[in] partitions Optional partitions to divide the table into. If specified, must be same + * size as number of sinks. */ - void write(table_view const& table); + void write(table_view const& table, std::vector const& partitions); /** * @brief Finishes the chunked/streamed write process. @@ -112,7 +115,8 @@ class writer::impl { * @return A parquet-compatible blob that contains the data for all rowgroups in the list only if * `column_chunks_file_path` is provided, else null. 
*/ - std::unique_ptr> close(std::string const& column_chunks_file_path = ""); + std::unique_ptr> close( + std::vector const& column_chunks_file_path = {}); private: /** @@ -120,12 +124,14 @@ class writer::impl { * * @param frag Destination page fragments * @param col_desc column description array - * @param num_rows Total number of rows + * @param[in] partitions Information about partitioning of table + * @param[in] part_frag_offset A Partition's offset into fragment array * @param fragment_size Number of rows per fragment */ void init_page_fragments(hostdevice_2dvector& frag, device_span col_desc, - uint32_t num_rows, + host_span partitions, + device_span part_frag_offset, uint32_t fragment_size); /** @@ -208,19 +214,22 @@ class writer::impl { statistics_freq stats_granularity_ = statistics_freq::STATISTICS_NONE; bool int96_timestamps = false; // Overall file metadata. Filled in during the process and written during write_chunked_end() - cudf::io::parquet::FileMetaData md; + std::unique_ptr md; + // File footer key-value metadata. Written during write_chunked_end() + std::vector> kv_md; // optional user metadata std::unique_ptr table_meta; // to track if the output has been written to sink bool closed = false; + // To track if the last write(table) call completed successfully + bool last_write_successful = false; // current write position for rowgroups/chunks - std::size_t current_chunk_offset; + std::vector current_chunk_offset; // special parameter only used by detail::write() to indicate that we are guaranteeing // a single table write. this enables some internal optimizations. bool const single_write_mode = true; - std::vector buffer_; - std::unique_ptr out_sink_; + std::vector> out_sink_; }; } // namespace parquet diff --git a/cpp/tests/io/parquet_test.cpp b/cpp/tests/io/parquet_test.cpp index c376accd1ff..75ff39cbe70 100644 --- a/cpp/tests/io/parquet_test.cpp +++ b/cpp/tests/io/parquet_test.cpp @@ -838,13 +838,13 @@ TEST_F(ParquetWriterTest, MultiIndex) expected_metadata.column_metadata[2].set_name("int32s"); expected_metadata.column_metadata[3].set_name("floats"); expected_metadata.column_metadata[4].set_name("doubles"); - expected_metadata.user_data.insert( - {"pandas", "\"index_columns\": [\"int8s\", \"int16s\"], \"column1\": [\"int32s\"]"}); auto filepath = temp_env->get_temp_filepath("MultiIndex.parquet"); cudf_io::parquet_writer_options out_opts = cudf_io::parquet_writer_options::builder(cudf_io::sink_info{filepath}, expected->view()) - .metadata(&expected_metadata); + .metadata(&expected_metadata) + .key_value_metadata( + {{{"pandas", "\"index_columns\": [\"int8s\", \"int16s\"], \"column1\": [\"int32s\"]"}}}); cudf_io::write_parquet(out_opts); cudf_io::parquet_reader_options in_opts = @@ -1174,6 +1174,100 @@ TEST_F(ParquetWriterTest, DeviceWriteLargeishFile) auto custom_tbl = cudf_io::read_parquet(custom_args); CUDF_TEST_EXPECT_TABLES_EQUAL(custom_tbl.tbl->view(), expected->view()); } + +TEST_F(ParquetWriterTest, PartitionedWrite) +{ + auto source = create_compressible_fixed_table(16, 4 * 1024 * 1024, 1000, false); + + auto filepath1 = temp_env->get_temp_filepath("PartitionedWrite1.parquet"); + auto filepath2 = temp_env->get_temp_filepath("PartitionedWrite2.parquet"); + + auto partition1 = cudf::io::partition_info{10, 1024 * 1024}; + auto partition2 = cudf::io::partition_info{20 * 1024 + 7, 3 * 1024 * 1024}; + + auto expected1 = + cudf::slice(*source, {partition1.start_row, partition1.start_row + partition1.num_rows}); + auto expected2 = + cudf::slice(*source, 
{partition2.start_row, partition2.start_row + partition2.num_rows}); + + cudf_io::parquet_writer_options args = + cudf_io::parquet_writer_options::builder( + cudf_io::sink_info(std::vector{filepath1, filepath2}), *source) + .partitions({partition1, partition2}) + .compression(cudf_io::compression_type::NONE); + cudf_io::write_parquet(args); + + auto result1 = cudf_io::read_parquet( + cudf_io::parquet_reader_options::builder(cudf_io::source_info(filepath1))); + CUDF_TEST_EXPECT_TABLES_EQUAL(expected1, result1.tbl->view()); + + auto result2 = cudf_io::read_parquet( + cudf_io::parquet_reader_options::builder(cudf_io::source_info(filepath2))); + CUDF_TEST_EXPECT_TABLES_EQUAL(expected2, result2.tbl->view()); +} + +TEST_F(ParquetWriterTest, PartitionedWriteEmptyPartitions) +{ + auto source = create_random_fixed_table(4, 4, false); + + auto filepath1 = temp_env->get_temp_filepath("PartitionedWrite1.parquet"); + auto filepath2 = temp_env->get_temp_filepath("PartitionedWrite2.parquet"); + + auto partition1 = cudf::io::partition_info{1, 0}; + auto partition2 = cudf::io::partition_info{1, 0}; + + auto expected1 = + cudf::slice(*source, {partition1.start_row, partition1.start_row + partition1.num_rows}); + auto expected2 = + cudf::slice(*source, {partition2.start_row, partition2.start_row + partition2.num_rows}); + + cudf_io::parquet_writer_options args = + cudf_io::parquet_writer_options::builder( + cudf_io::sink_info(std::vector{filepath1, filepath2}), *source) + .partitions({partition1, partition2}) + .compression(cudf_io::compression_type::NONE); + cudf_io::write_parquet(args); + + auto result1 = cudf_io::read_parquet( + cudf_io::parquet_reader_options::builder(cudf_io::source_info(filepath1))); + CUDF_TEST_EXPECT_TABLES_EQUAL(expected1, result1.tbl->view()); + + auto result2 = cudf_io::read_parquet( + cudf_io::parquet_reader_options::builder(cudf_io::source_info(filepath2))); + CUDF_TEST_EXPECT_TABLES_EQUAL(expected2, result2.tbl->view()); +} + +TEST_F(ParquetWriterTest, PartitionedWriteEmptyColumns) +{ + auto source = create_random_fixed_table(0, 4, false); + + auto filepath1 = temp_env->get_temp_filepath("PartitionedWrite1.parquet"); + auto filepath2 = temp_env->get_temp_filepath("PartitionedWrite2.parquet"); + + auto partition1 = cudf::io::partition_info{1, 0}; + auto partition2 = cudf::io::partition_info{1, 0}; + + auto expected1 = + cudf::slice(*source, {partition1.start_row, partition1.start_row + partition1.num_rows}); + auto expected2 = + cudf::slice(*source, {partition2.start_row, partition2.start_row + partition2.num_rows}); + + cudf_io::parquet_writer_options args = + cudf_io::parquet_writer_options::builder( + cudf_io::sink_info(std::vector{filepath1, filepath2}), *source) + .partitions({partition1, partition2}) + .compression(cudf_io::compression_type::NONE); + cudf_io::write_parquet(args); + + auto result1 = cudf_io::read_parquet( + cudf_io::parquet_reader_options::builder(cudf_io::source_info(filepath1))); + CUDF_TEST_EXPECT_TABLES_EQUAL(expected1, result1.tbl->view()); + + auto result2 = cudf_io::read_parquet( + cudf_io::parquet_reader_options::builder(cudf_io::source_info(filepath2))); + CUDF_TEST_EXPECT_TABLES_EQUAL(expected2, result2.tbl->view()); +} + template std::string create_parquet_file(int num_cols) { @@ -1305,7 +1399,7 @@ TEST_F(ParquetChunkedWriterTest, ManyTables) std::for_each(table_views.begin(), table_views.end(), [&writer](table_view const& tbl) { writer.write(tbl); }); - auto md = writer.close("dummy/path"); + auto md = writer.close({"dummy/path"}); 
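+  // close() takes one column-chunk file path per output sink; when paths are supplied
+  // it returns the raw footer metadata of the written file(s), otherwise a null pointer.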
CUDF_EXPECTS(md, "The returned metadata should not be null."); cudf_io::parquet_reader_options read_opts = diff --git a/python/cudf/cudf/_lib/cpp/io/orc.pxd b/python/cudf/cudf/_lib/cpp/io/orc.pxd index 2fc71f64df1..e5a8bb926c1 100644 --- a/python/cudf/cudf/_lib/cpp/io/orc.pxd +++ b/python/cudf/cudf/_lib/cpp/io/orc.pxd @@ -2,6 +2,7 @@ from libc.stdint cimport uint8_t from libcpp cimport bool +from libcpp.map cimport map from libcpp.memory cimport shared_ptr, unique_ptr from libcpp.string cimport string from libcpp.vector cimport vector @@ -85,6 +86,7 @@ cdef extern from "cudf/io/orc.hpp" \ void set_row_index_stride(size_type val) except+ void set_table(cudf_table_view.table_view tbl) except+ void set_metadata(cudf_io_types.table_input_metadata* meta) except+ + void set_key_value_metadata(map[string, string] kvm) except + @staticmethod orc_writer_options_builder builder( @@ -107,6 +109,9 @@ cdef extern from "cudf/io/orc.hpp" \ orc_writer_options_builder& metadata( cudf_io_types.table_input_metadata *meta ) except+ + orc_writer_options_builder& key_value_metadata( + map[string, string] kvm + ) except+ orc_writer_options build() except+ @@ -134,6 +139,7 @@ cdef extern from "cudf/io/orc.hpp" \ void set_metadata( cudf_io_types.table_input_metadata* meta ) except+ + void set_key_value_metadata(map[string, string] kvm) except + @staticmethod chunked_orc_writer_options_builder builder( @@ -155,6 +161,9 @@ cdef extern from "cudf/io/orc.hpp" \ chunked_orc_writer_options_builder& metadata( cudf_io_types.table_input_metadata *meta ) except+ + chunked_orc_writer_options_builder& key_value_metadata( + map[string, string] kvm + ) except+ chunked_orc_writer_options build() except+ diff --git a/python/cudf/cudf/_lib/cpp/io/parquet.pxd b/python/cudf/cudf/_lib/cpp/io/parquet.pxd index 9d95dce83bc..60be608d997 100644 --- a/python/cudf/cudf/_lib/cpp/io/parquet.pxd +++ b/python/cudf/cudf/_lib/cpp/io/parquet.pxd @@ -73,21 +73,24 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: cudf_io_types.statistics_freq get_stats_level() except + cudf_table_view.table_view get_table() except + const cudf_io_types.table_input_metadata get_metadata() except + - string get_column_chunks_file_path() except+ + string get_column_chunks_file_paths() except+ size_t get_row_group_size_bytes() except+ size_type get_row_group_size_rows() except+ void set_metadata( cudf_io_types.table_input_metadata *m ) except + + void set_key_value_metadata( + vector[map[string, string]] kvm + ) except + void set_stats_level( cudf_io_types.statistics_freq sf ) except + void set_compression( cudf_io_types.compression_type compression ) except + - void set_column_chunks_file_path( - string column_chunks_file_path + void set_column_chunks_file_paths( + vector[string] column_chunks_file_paths ) except + void set_row_group_size_bytes(size_t val) except+ void set_row_group_size_rows(size_type val) except+ @@ -108,14 +111,17 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: parquet_writer_options_builder& metadata( cudf_io_types.table_input_metadata *m ) except + + parquet_writer_options_builder& key_value_metadata( + vector[map[string, string]] kvm + ) except + parquet_writer_options_builder& stats_level( cudf_io_types.statistics_freq sf ) except + parquet_writer_options_builder& compression( cudf_io_types.compression_type compression ) except + - parquet_writer_options_builder& column_chunks_file_path( - string column_chunks_file_path + parquet_writer_options_builder& column_chunks_file_paths( + vector[string] 
column_chunks_file_paths ) except + parquet_writer_options_builder& int96_timestamps( bool enabled @@ -146,6 +152,9 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: void set_metadata( cudf_io_types.table_input_metadata *m ) except + + void set_key_value_metadata( + vector[map[string, string]] kvm + ) except + void set_stats_level( cudf_io_types.statistics_freq sf ) except + @@ -168,6 +177,9 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: chunked_parquet_writer_options_builder& metadata( cudf_io_types.table_input_metadata *m ) except + + chunked_parquet_writer_options_builder& key_value_metadata( + vector[map[string, string]] kvm + ) except + chunked_parquet_writer_options_builder& stats_level( cudf_io_types.statistics_freq sf ) except + @@ -190,7 +202,7 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: cudf_table_view.table_view table_, ) except+ unique_ptr[vector[uint8_t]] close( - string column_chunks_file_path, + vector[string] column_chunks_file_paths, ) except+ cdef unique_ptr[vector[uint8_t]] merge_row_group_metadata( diff --git a/python/cudf/cudf/_lib/cpp/io/types.pxd b/python/cudf/cudf/_lib/cpp/io/types.pxd index 6b68902d22f..40a056b46e0 100644 --- a/python/cudf/cudf/_lib/cpp/io/types.pxd +++ b/python/cudf/cudf/_lib/cpp/io/types.pxd @@ -70,13 +70,8 @@ cdef extern from "cudf/io/types.hpp" \ cdef cppclass table_input_metadata: table_input_metadata() except + table_input_metadata(const cudf_table_view.table_view& table) except + - table_input_metadata( - const cudf_table_view.table_view& table, - map[string, string] user_data - ) except + vector[column_in_metadata] column_metadata - map[string, string] user_data cdef cppclass host_buffer: const char* data @@ -87,8 +82,8 @@ cdef extern from "cudf/io/types.hpp" \ cdef cppclass source_info: io_type type - vector[string] filepaths - vector[host_buffer] buffers + const vector[string]& filepaths() except + + const vector[host_buffer]& buffers() except + vector[shared_ptr[CRandomAccessFile]] files source_info() except + @@ -98,9 +93,9 @@ cdef extern from "cudf/io/types.hpp" \ cdef cppclass sink_info: io_type type - string filepath - vector[char] * buffer - data_sink * user_sink + const vector[string]& filepaths() + const vector[vector[char] *]& buffers() + const vector[data_sink *]& user_sinks() sink_info() except + sink_info(string file_path) except + diff --git a/python/cudf/cudf/_lib/orc.pyx b/python/cudf/cudf/_lib/orc.pyx index 9a4bd8652da..bf761c30bc8 100644 --- a/python/cudf/cudf/_lib/orc.pyx +++ b/python/cudf/cudf/_lib/orc.pyx @@ -3,6 +3,7 @@ import cudf from libcpp cimport bool, int +from libcpp.map cimport map from libcpp.memory cimport make_unique, unique_ptr from libcpp.string cimport string from libcpp.utility cimport move @@ -311,10 +312,9 @@ cdef class ORCWriter: chunked_orc_writer_options anb creates a writer""" cdef table_view tv - # Set the table_metadata num_index_cols_meta = 0 self.tbl_meta = make_unique[table_input_metadata]( - table_view_from_table(table, ignore_index=True) + table_view_from_table(table, ignore_index=True), ) if self.index is not False: if isinstance(table._index, cudf.core.multiindex.MultiIndex): @@ -340,15 +340,16 @@ cdef class ORCWriter: table[name]._column, self.tbl_meta.get().column_metadata[i] ) + cdef map[string, string] user_data pandas_metadata = generate_pandas_metadata(table, self.index) - self.tbl_meta.get().user_data[str.encode("pandas")] = \ - str.encode(pandas_metadata) + user_data[str.encode("pandas")] = 
str.encode(pandas_metadata) cdef chunked_orc_writer_options args with nogil: args = move( chunked_orc_writer_options.builder(self.sink) .metadata(self.tbl_meta.get()) + .key_value_metadata(move(user_data)) .compression(self.comp_type) .enable_statistics(self.enable_stats) .build() diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index d17184685fa..955324778fd 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -296,7 +296,7 @@ cpdef write_parquet( # Create the write options cdef unique_ptr[table_input_metadata] tbl_meta - cdef map[string, string] user_data + cdef vector[map[string, string]] user_data cdef table_view tv cdef unique_ptr[cudf_io_types.data_sink] _data_sink cdef cudf_io_types.sink_info sink = make_sink_info(path, _data_sink) @@ -328,30 +328,29 @@ cpdef write_parquet( ) pandas_metadata = generate_pandas_metadata(table, index) - user_data[str.encode("pandas")] = str.encode(pandas_metadata) - - # Set the table_metadata - tbl_meta.get().user_data = user_data + user_data.resize(1) + user_data.back()[str.encode("pandas")] = str.encode(pandas_metadata) cdef cudf_io_types.compression_type comp_type = _get_comp_type(compression) cdef cudf_io_types.statistics_freq stat_freq = _get_stat_freq(statistics) cdef unique_ptr[vector[uint8_t]] out_metadata_c - cdef string c_column_chunks_file_path + cdef vector[string] c_column_chunks_file_paths cdef bool _int96_timestamps = int96_timestamps - if metadata_file_path is not None: - c_column_chunks_file_path = str.encode(metadata_file_path) # Perform write cdef parquet_writer_options args = move( parquet_writer_options.builder(sink, tv) .metadata(tbl_meta.get()) + .key_value_metadata(move(user_data)) .compression(comp_type) .stats_level(stat_freq) - .column_chunks_file_path(c_column_chunks_file_path) .int96_timestamps(_int96_timestamps) .build() ) + if metadata_file_path is not None: + c_column_chunks_file_paths.push_back(str.encode(metadata_file_path)) + args.set_column_chunks_file_paths(move(c_column_chunks_file_paths)) if row_group_size_bytes is not None: args.set_row_group_size_bytes(row_group_size_bytes) if row_group_size_rows is not None: @@ -413,18 +412,18 @@ cdef class ParquetWriter: def close(self, object metadata_file_path=None): cdef unique_ptr[vector[uint8_t]] out_metadata_c - cdef string column_chunks_file_path + cdef vector[string] column_chunks_file_paths if not self.initialized: return None # Update metadata-collection options if metadata_file_path is not None: - column_chunks_file_path = str.encode(metadata_file_path) + column_chunks_file_paths.push_back(str.encode(metadata_file_path)) with nogil: out_metadata_c = move( - self.writer.get()[0].close(column_chunks_file_path) + self.writer.get()[0].close(column_chunks_file_paths) ) if metadata_file_path is not None: @@ -471,14 +470,16 @@ cdef class ParquetWriter: ) pandas_metadata = generate_pandas_metadata(table, self.index) - self.tbl_meta.get().user_data[str.encode("pandas")] = \ - str.encode(pandas_metadata) + cdef vector[map[string, string]] user_data + user_data.resize(1) + user_data.back()[str.encode("pandas")] = str.encode(pandas_metadata) cdef chunked_parquet_writer_options args with nogil: args = move( chunked_parquet_writer_options.builder(self.sink) .metadata(self.tbl_meta.get()) + .key_value_metadata(move(user_data)) .compression(self.comp_type) .stats_level(self.stat_freq) .build()
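Taken together, the changes above let a single write call fan one table out across several sinks, with per-sink row ranges, per-sink key-value footer metadata, and per-sink column-chunk file paths. The sketch below shows roughly how the pieces fit together at the libcudf C++ level, following the PartitionedWrite and MultiIndex tests earlier in this change; the file names, the even row split, and the key-value pairs are illustrative placeholders rather than values taken from the change itself.

#include <cudf/io/parquet.hpp>
#include <cudf/table/table_view.hpp>

#include <map>
#include <string>
#include <vector>

// Write one table to two Parquet files, giving each sink its own row range and its
// own key-value footer metadata. Paths and the 50/50 row split are made up for the
// example.
void write_two_partitions(cudf::table_view const& table)
{
  namespace io = cudf::io;

  std::vector<std::string> const paths{"part0.parquet", "part1.parquet"};

  // partition_info{start_row, num_rows}; one entry per sink, in the same order as paths.
  auto const half = table.num_rows() / 2;
  std::vector<io::partition_info> const parts{{0, half}, {half, table.num_rows() - half}};

  auto opts = io::parquet_writer_options::builder(io::sink_info(paths), table)
                .partitions(parts)
                // One map of footer key-value metadata per sink.
                .key_value_metadata({{{"origin", "part0"}}, {{"origin", "part1"}}})
                // Ask for the raw footer metadata back; one file path per sink.
                .column_chunks_file_paths(paths)
                .build();

  // Non-null only because column_chunks_file_paths was set above.
  [[maybe_unused]] auto raw_metadata = io::write_parquet(opts);
}

The chunked writer follows the same shape: each write(table, partitions) call passes one partition_info per sink, and close() then accepts one metadata file path per sink.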