From 3b1e3272ce6297230bfb61212f10eca3ec9e5c3b Mon Sep 17 00:00:00 2001 From: seidl Date: Wed, 22 May 2024 23:13:09 +0000 Subject: [PATCH 1/8] refactor parquet writer options --- cpp/include/cudf/io/parquet.hpp | 906 +++++++------------------------- cpp/src/io/functions.cpp | 271 ++++++---- 2 files changed, 352 insertions(+), 825 deletions(-) diff --git a/cpp/include/cudf/io/parquet.hpp b/cpp/include/cudf/io/parquet.hpp index b2f949cdcee..51eeed5b721 100644 --- a/cpp/include/cudf/io/parquet.hpp +++ b/cpp/include/cudf/io/parquet.hpp @@ -29,6 +29,7 @@ #include #include #include +#include #include namespace cudf::io { @@ -576,22 +577,16 @@ struct sorting_column { bool is_nulls_first{true}; //!< true if nulls come before non-null values }; -class parquet_writer_options_builder; - /** - * @brief Settings for `write_parquet()`. + * @brief Base settings for `write_parquet()` and `parquet_chunked_writer`. */ -class parquet_writer_options { +class parquet_writer_options_base { // Specify the sink to use for writer output sink_info _sink; // Specify the compression format to use compression_type _compression = compression_type::SNAPPY; // Specify the level of statistics in the output file statistics_freq _stats_level = statistics_freq::STATISTICS_ROWGROUP; - // Sets of columns to output - table_view _table; - // Partitions described as {start_row, num_rows} pairs - std::vector _partitions; // Optional associated metadata std::optional _metadata; // Optional footer key_value_metadata @@ -602,8 +597,6 @@ class parquet_writer_options { // Parquet writer can write timestamps as UTC // Defaults to true because libcudf timestamps are implicitly UTC bool _write_timestamps_as_UTC = true; - // Column chunks file paths to be set in the raw output metadata. One per output file - std::vector _column_chunks_file_paths; // Maximum size of each row group (unless smaller than a single page) size_t _row_group_size_bytes = default_row_group_size_bytes; // Maximum number of rows in row group (unless smaller than a single page) @@ -627,18 +620,13 @@ class parquet_writer_options { // Which columns in _table are used for sorting std::optional> _sorting_columns; + protected: /** - * @brief Constructor from sink and table. + * @brief Constructor from sink. * * @param sink The sink used for writer output - * @param table Table to be written to output */ - explicit parquet_writer_options(sink_info const& sink, table_view const& table) - : _sink(sink), _table(table) - { - } - - friend parquet_writer_options_builder; + explicit parquet_writer_options_base(sink_info const& sink) : _sink(sink) {} public: /** @@ -646,24 +634,7 @@ class parquet_writer_options { * * This has been added since Cython requires a default constructor to create objects on stack. */ - parquet_writer_options() = default; - - /** - * @brief Create builder to create `parquet_writer_options`. - * - * @param sink The sink used for writer output - * @param table Table to be written to output - * - * @return Builder to build parquet_writer_options - */ - static parquet_writer_options_builder builder(sink_info const& sink, table_view const& table); - - /** - * @brief Create builder to create `parquet_writer_options`. - * - * @return parquet_writer_options_builder - */ - static parquet_writer_options_builder builder(); + parquet_writer_options_base() = default; /** * @brief Returns sink info. @@ -686,20 +657,6 @@ class parquet_writer_options { */ [[nodiscard]] statistics_freq get_stats_level() const { return _stats_level; } - /** - * @brief Returns table_view. - * - * @return Table view - */ - [[nodiscard]] table_view get_table() const { return _table; } - - /** - * @brief Returns partitions. - * - * @return Partitions - */ - [[nodiscard]] std::vector const& get_partitions() const { return _partitions; } - /** * @brief Returns associated metadata. * @@ -712,7 +669,8 @@ class parquet_writer_options { * * @return Key-Value footer metadata information */ - std::vector> const& get_key_value_metadata() const + [[nodiscard]] std::vector> const& get_key_value_metadata() + const { return _user_data; } @@ -722,7 +680,7 @@ class parquet_writer_options { * * @return `true` if timestamps will be written as INT96 */ - bool is_enabled_int96_timestamps() const { return _write_timestamps_as_int96; } + [[nodiscard]] bool is_enabled_int96_timestamps() const { return _write_timestamps_as_int96; } /** * @brief Returns `true` if timestamps will be written as UTC @@ -731,29 +689,19 @@ class parquet_writer_options { */ [[nodiscard]] auto is_enabled_utc_timestamps() const { return _write_timestamps_as_UTC; } - /** - * @brief Returns Column chunks file paths to be set in the raw output metadata. - * - * @return Column chunks file paths to be set in the raw output metadata - */ - std::vector const& get_column_chunks_file_paths() const - { - return _column_chunks_file_paths; - } - /** * @brief Returns maximum row group size, in bytes. * * @return Maximum row group size, in bytes */ - auto get_row_group_size_bytes() const { return _row_group_size_bytes; } + [[nodiscard]] auto get_row_group_size_bytes() const { return _row_group_size_bytes; } /** * @brief Returns maximum row group size, in rows. * * @return Maximum row group size, in rows */ - auto get_row_group_size_rows() const { return _row_group_size_rows; } + [[nodiscard]] auto get_row_group_size_rows() const { return _row_group_size_rows; } /** * @brief Returns the maximum uncompressed page size, in bytes. @@ -762,7 +710,7 @@ class parquet_writer_options { * * @return Maximum uncompressed page size, in bytes */ - auto get_max_page_size_bytes() const + [[nodiscard]] auto get_max_page_size_bytes() const { return std::min(_max_page_size_bytes, get_row_group_size_bytes()); } @@ -774,7 +722,7 @@ class parquet_writer_options { * * @return Maximum page size, in rows */ - auto get_max_page_size_rows() const + [[nodiscard]] auto get_max_page_size_rows() const { return std::min(_max_page_size_rows, get_row_group_size_rows()); } @@ -784,7 +732,10 @@ class parquet_writer_options { * * @return length min/max will be truncated to */ - auto get_column_index_truncate_length() const { return _column_index_truncate_length; } + [[nodiscard]] auto get_column_index_truncate_length() const + { + return _column_index_truncate_length; + } /** * @brief Returns policy for dictionary use. @@ -831,20 +782,12 @@ class parquet_writer_options { */ [[nodiscard]] auto const& get_sorting_columns() const { return _sorting_columns; } - /** - * @brief Sets partitions. - * - * @param partitions Partitions of input table in {start_row, num_rows} pairs. If specified, must - * be same size as number of sinks in sink_info - */ - void set_partitions(std::vector partitions); - /** * @brief Sets metadata. * * @param metadata Associated metadata */ - void set_metadata(table_input_metadata metadata) { _metadata = std::move(metadata); } + void set_metadata(table_input_metadata metadata); /** * @brief Sets metadata. @@ -858,14 +801,13 @@ class parquet_writer_options { * * @param sf Level of statistics requested in the output file */ - void set_stats_level(statistics_freq sf) { _stats_level = sf; } - + void set_stats_level(statistics_freq sf); /** * @brief Sets compression type. * * @param compression The compression type to use */ - void set_compression(compression_type compression) { _compression = compression; } + void set_compression(compression_type compression); /** * @brief Sets timestamp writing preferences. INT96 timestamps will be written @@ -873,22 +815,14 @@ class parquet_writer_options { * * @param req Boolean value to enable/disable writing of INT96 timestamps */ - void enable_int96_timestamps(bool req) { _write_timestamps_as_int96 = req; } + void enable_int96_timestamps(bool req); /** * @brief Sets preference for writing timestamps as UTC. Write timestamps as UTC if set to `true`. * * @param val Boolean value to enable/disable writing of timestamps as UTC. */ - void enable_utc_timestamps(bool val) { _write_timestamps_as_UTC = val; } - - /** - * @brief Sets column chunks file path to be set in the raw output metadata. - * - * @param file_paths Vector of Strings which indicates file path. Must be same size as number of - * data sinks in sink info - */ - void set_column_chunks_file_paths(std::vector file_paths); + void enable_utc_timestamps(bool val); /** * @brief Sets the maximum row group size, in bytes. @@ -951,116 +885,84 @@ class parquet_writer_options { * * @param comp_stats Pointer to compression statistics to be updated after writing */ - void set_compression_statistics(std::shared_ptr comp_stats) - { - _compression_stats = std::move(comp_stats); - } + void set_compression_statistics(std::shared_ptr comp_stats); /** * @brief Sets preference for V2 page headers. Write V2 page headers if set to `true`. * * @param val Boolean value to enable/disable writing of V2 page headers. */ - void enable_write_v2_headers(bool val) { _v2_page_headers = val; } + void enable_write_v2_headers(bool val); /** * @brief Sets sorting columns. * * @param sorting_columns Column sort order metadata */ - void set_sorting_columns(std::vector sorting_columns) - { - _sorting_columns = std::move(sorting_columns); - } + void set_sorting_columns(std::vector sorting_columns); }; /** - * @brief Class to build `parquet_writer_options`. + * @brief Base class for Parquet options builders. */ -class parquet_writer_options_builder { - parquet_writer_options options; +template +class parquet_writer_options_builder_base { + OptionsT _options; - public: + protected: /** - * @brief Default constructor. + * @brief Return reference to the options object being built * - * This has been added since Cython requires a default constructor to create objects on stack. + * @return the options object */ - explicit parquet_writer_options_builder() = default; + inline OptionsT& get_options() { return _options; } /** - * @brief Constructor from sink and table. + * @brief Constructor from options. * - * @param sink The sink used for writer output - * @param table Table to be written to output + * @param options Options object to build */ - explicit parquet_writer_options_builder(sink_info const& sink, table_view const& table) - : options(sink, table) - { - } + explicit parquet_writer_options_builder_base(OptionsT options); + public: /** - * @brief Sets partitions in parquet_writer_options. + * @brief Default constructor. * - * @param partitions Partitions of input table in {start_row, num_rows} pairs. If specified, must - * be same size as number of sinks in sink_info - * @return this for chaining + * This has been added since Cython requires a default constructor to create objects on stack. */ - parquet_writer_options_builder& partitions(std::vector partitions); + explicit parquet_writer_options_builder_base() = default; /** - * @brief Sets metadata in parquet_writer_options. + * @brief Sets metadata. * * @param metadata Associated metadata * @return this for chaining */ - parquet_writer_options_builder& metadata(table_input_metadata metadata) - { - options._metadata = std::move(metadata); - return *this; - } + BuilderT& metadata(table_input_metadata metadata); /** - * @brief Sets Key-Value footer metadata in parquet_writer_options. + * @brief Sets Key-Value footer metadata. * * @param metadata Key-Value footer metadata * @return this for chaining */ - parquet_writer_options_builder& key_value_metadata( - std::vector> metadata); + BuilderT& key_value_metadata(std::vector> metadata); /** - * @brief Sets the level of statistics in parquet_writer_options. + * @brief Sets the level of statistics. * * @param sf Level of statistics requested in the output file * @return this for chaining */ - parquet_writer_options_builder& stats_level(statistics_freq sf) - { - options._stats_level = sf; - return *this; - } + BuilderT& stats_level(statistics_freq sf); /** - * @brief Sets compression type in parquet_writer_options. + * @brief Sets compression type. * * @param compression The compression type to use * @return this for chaining */ - parquet_writer_options_builder& compression(compression_type compression) - { - options._compression = compression; - return *this; - } - - /** - * @brief Sets column chunks file path to be set in the raw output metadata. - * - * @param file_paths Vector of Strings which indicates file path. Must be same size as number of - * data sinks - * @return this for chaining - */ - parquet_writer_options_builder& column_chunks_file_paths(std::vector file_paths); + BuilderT& compression(compression_type compression); /** * @brief Sets the maximum row group size, in bytes. @@ -1068,11 +970,7 @@ class parquet_writer_options_builder { * @param val maximum row group size * @return this for chaining */ - parquet_writer_options_builder& row_group_size_bytes(size_t val) - { - options.set_row_group_size_bytes(val); - return *this; - } + BuilderT& row_group_size_bytes(size_t val); /** * @brief Sets the maximum number of rows in output row groups. @@ -1080,11 +978,7 @@ class parquet_writer_options_builder { * @param val maximum number or rows * @return this for chaining */ - parquet_writer_options_builder& row_group_size_rows(size_type val) - { - options.set_row_group_size_rows(val); - return *this; - } + BuilderT& row_group_size_rows(size_type val); /** * @brief Sets the maximum uncompressed page size, in bytes. @@ -1096,11 +990,7 @@ class parquet_writer_options_builder { * @param val maximum page size * @return this for chaining */ - parquet_writer_options_builder& max_page_size_bytes(size_t val) - { - options.set_max_page_size_bytes(val); - return *this; - } + BuilderT& max_page_size_bytes(size_t val); /** * @brief Sets the maximum page size, in rows. Counts only top-level rows, ignoring any nesting. @@ -1109,11 +999,7 @@ class parquet_writer_options_builder { * @param val maximum rows per page * @return this for chaining */ - parquet_writer_options_builder& max_page_size_rows(size_type val) - { - options.set_max_page_size_rows(val); - return *this; - } + BuilderT& max_page_size_rows(size_type val); /** * @brief Sets the desired maximum size in bytes for min and max values in the column index. @@ -1128,11 +1014,7 @@ class parquet_writer_options_builder { * @param val length min/max will be truncated to, with 0 indicating no truncation * @return this for chaining */ - parquet_writer_options_builder& column_index_truncate_length(int32_t val) - { - options.set_column_index_truncate_length(val); - return *this; - } + BuilderT& column_index_truncate_length(int32_t val); /** * @brief Sets the policy for dictionary use. @@ -1151,7 +1033,7 @@ class parquet_writer_options_builder { * @param val policy for dictionary use * @return this for chaining */ - parquet_writer_options_builder& dictionary_policy(enum dictionary_policy val); + BuilderT& dictionary_policy(enum dictionary_policy val); /** * @brief Sets the maximum dictionary size, in bytes. @@ -1164,7 +1046,7 @@ class parquet_writer_options_builder { * @param val maximum dictionary size * @return this for chaining */ - parquet_writer_options_builder& max_dictionary_size(size_t val); + BuilderT& max_dictionary_size(size_t val); /** * @brief Sets the maximum page fragment size, in rows. @@ -1176,7 +1058,7 @@ class parquet_writer_options_builder { * @param val maximum page fragment size * @return this for chaining */ - parquet_writer_options_builder& max_page_fragment_size(size_type val); + BuilderT& max_page_fragment_size(size_type val); /** * @brief Sets the pointer to the output compression statistics. @@ -1184,24 +1066,16 @@ class parquet_writer_options_builder { * @param comp_stats Pointer to compression statistics to be filled once writer is done * @return this for chaining */ - parquet_writer_options_builder& compression_statistics( - std::shared_ptr const& comp_stats) - { - options._compression_stats = comp_stats; - return *this; - } + BuilderT& compression_statistics( + std::shared_ptr const& comp_stats); /** - * @brief Sets whether int96 timestamps are written or not in parquet_writer_options. + * @brief Sets whether int96 timestamps are written or not. * * @param enabled Boolean value to enable/disable int96 timestamps * @return this for chaining */ - parquet_writer_options_builder& int96_timestamps(bool enabled) - { - options._write_timestamps_as_int96 = enabled; - return *this; - } + BuilderT& int96_timestamps(bool enabled); /** * @brief Set to true if timestamps are to be written as UTC. @@ -1209,126 +1083,60 @@ class parquet_writer_options_builder { * @param enabled Boolean value to enable/disable writing of timestamps as UTC. * @return this for chaining */ - parquet_writer_options_builder& utc_timestamps(bool enabled) - { - options._write_timestamps_as_UTC = enabled; - return *this; - } - + BuilderT& utc_timestamps(bool enabled); /** * @brief Set to true if V2 page headers are to be written. * * @param enabled Boolean value to enable/disable writing of V2 page headers. * @return this for chaining */ - parquet_writer_options_builder& write_v2_headers(bool enabled); + BuilderT& write_v2_headers(bool enabled); /** - * @brief Sets column sorting metadata to chunked_parquet_writer_options. + * @brief Sets column sorting metadata. * * @param sorting_columns Column sort order metadata * @return this for chaining */ - parquet_writer_options_builder& sorting_columns(std::vector sorting_columns); + BuilderT& sorting_columns(std::vector sorting_columns); /** - * @brief move parquet_writer_options member once it's built. + * @brief move options member once it's built. */ - operator parquet_writer_options&&() { return std::move(options); } + operator OptionsT&&(); /** - * @brief move parquet_writer_options member once it's built. + * @brief move options member once it's built. * * This has been added since Cython does not support overloading of conversion operators. * * @return Built `parquet_writer_options` object's r-value reference */ - parquet_writer_options&& build() { return std::move(options); } + OptionsT&& build(); }; -/** - * @brief Writes a set of columns to parquet format. - * - * The following code snippet demonstrates how to write columns to a file: - * @code - * auto destination = cudf::io::sink_info("dataset.parquet"); - * auto options = cudf::io::parquet_writer_options::builder(destination, table->view()); - * cudf::io::write_parquet(options); - * @endcode - * - * @param options Settings for controlling writing behavior - * @param stream CUDA stream used for device memory operations and kernel launches - * @return A blob that contains the file metadata (parquet FileMetadata thrift message) if - * requested in parquet_writer_options (empty blob otherwise). - */ - -std::unique_ptr> write_parquet( - parquet_writer_options const& options, rmm::cuda_stream_view stream = cudf::get_default_stream()); +class parquet_writer_options_builder; /** - * @brief Merges multiple raw metadata blobs that were previously created by write_parquet - * into a single metadata blob. - * - * @ingroup io_writers - * - * @param[in] metadata_list List of input file metadata - * @return A parquet-compatible blob that contains the data for all row groups in the list + * @brief Settings for `write_parquet()`. */ -std::unique_ptr> merge_row_group_metadata( - std::vector>> const& metadata_list); - -class chunked_parquet_writer_options_builder; +class parquet_writer_options : public parquet_writer_options_base { + // Sets of columns to output + table_view _table; + // Partitions described as {start_row, num_rows} pairs + std::vector _partitions; + // Column chunks file paths to be set in the raw output metadata. One per output file + std::vector _column_chunks_file_paths; -/** - * @brief Settings for `write_parquet_chunked()`. - */ -class chunked_parquet_writer_options { - // Specify the sink to use for writer output - sink_info _sink; - // Specify the compression format to use - compression_type _compression = compression_type::AUTO; - // Specify the level of statistics in the output file - statistics_freq _stats_level = statistics_freq::STATISTICS_ROWGROUP; - // Optional associated metadata. - std::optional _metadata; - // Optional footer key_value_metadata - std::vector> _user_data; - // Parquet writer can write INT96 or TIMESTAMP_MICROS. Defaults to TIMESTAMP_MICROS. - // If true then overrides any per-column setting in _metadata. - bool _write_timestamps_as_int96 = false; - // Parquet writer can write timestamps as UTC. Defaults to true. - bool _write_timestamps_as_UTC = true; - // Maximum size of each row group (unless smaller than a single page) - size_t _row_group_size_bytes = default_row_group_size_bytes; - // Maximum number of rows in row group (unless smaller than a single page) - size_type _row_group_size_rows = default_row_group_size_rows; - // Maximum size of each page (uncompressed) - size_t _max_page_size_bytes = default_max_page_size_bytes; - // Maximum number of rows in a page - size_type _max_page_size_rows = default_max_page_size_rows; - // Maximum size of min or max values in column index - int32_t _column_index_truncate_length = default_column_index_truncate_length; - // When to use dictionary encoding for data - dictionary_policy _dictionary_policy = dictionary_policy::ADAPTIVE; - // Maximum size of column chunk dictionary (in bytes) - size_t _max_dictionary_size = default_max_dictionary_size; - // Maximum number of rows in a page fragment - std::optional _max_page_fragment_size; - // Optional compression statistics - std::shared_ptr _compression_stats; - // write V2 page headers? - bool _v2_page_headers = false; - // Which columns in _table are used for sorting - std::optional> _sorting_columns; + friend parquet_writer_options_builder; /** - * @brief Constructor from sink. + * @brief Constructor from sink and table. * - * @param sink Sink used for writer output + * @param sink The sink used for writer output + * @param table Table to be written to output */ - explicit chunked_parquet_writer_options(sink_info const& sink) : _sink(sink) {} - - friend chunked_parquet_writer_options_builder; + explicit parquet_writer_options(sink_info const& sink, table_view const& table); public: /** @@ -1336,277 +1144,160 @@ class chunked_parquet_writer_options { * * This has been added since Cython requires a default constructor to create objects on stack. */ - chunked_parquet_writer_options() = default; + parquet_writer_options() = default; /** - * @brief Returns sink info. + * @brief Create builder to create `parquet_writer_options`. * - * @return Sink info + * @param sink The sink used for writer output + * @param table Table to be written to output + * + * @return Builder to build parquet_writer_options */ - [[nodiscard]] sink_info const& get_sink() const { return _sink; } + static parquet_writer_options_builder builder(sink_info const& sink, table_view const& table); /** - * @brief Returns compression format used. + * @brief Create builder to create `parquet_writer_options`. * - * @return Compression format + * @return parquet_writer_options_builder */ - [[nodiscard]] compression_type get_compression() const { return _compression; } + static parquet_writer_options_builder builder(); /** - * @brief Returns level of statistics requested in output file. + * @brief Returns table_view. * - * @return Level of statistics requested in output file + * @return Table view */ - [[nodiscard]] statistics_freq get_stats_level() const { return _stats_level; } + [[nodiscard]] table_view get_table() const { return _table; } /** - * @brief Returns metadata information. + * @brief Returns partitions. * - * @return Metadata information + * @return Partitions */ - [[nodiscard]] auto const& get_metadata() const { return _metadata; } + [[nodiscard]] std::vector const& get_partitions() const { return _partitions; } /** - * @brief Returns Key-Value footer metadata information. + * @brief Returns Column chunks file paths to be set in the raw output metadata. * - * @return Key-Value footer metadata information + * @return Column chunks file paths to be set in the raw output metadata */ - std::vector> const& get_key_value_metadata() const + [[nodiscard]] std::vector const& get_column_chunks_file_paths() const { - return _user_data; - } - - /** - * @brief Returns `true` if timestamps will be written as INT96 - * - * @return `true` if timestamps will be written as INT96 - */ - bool is_enabled_int96_timestamps() const { return _write_timestamps_as_int96; } - - /** - * @brief Returns `true` if timestamps will be written as UTC - * - * @return `true` if timestamps will be written as UTC - */ - [[nodiscard]] auto is_enabled_utc_timestamps() const { return _write_timestamps_as_UTC; } - - /** - * @brief Returns maximum row group size, in bytes. - * - * @return Maximum row group size, in bytes - */ - auto get_row_group_size_bytes() const { return _row_group_size_bytes; } - - /** - * @brief Returns maximum row group size, in rows. - * - * @return Maximum row group size, in rows - */ - auto get_row_group_size_rows() const { return _row_group_size_rows; } - - /** - * @brief Returns maximum uncompressed page size, in bytes. - * - * If set larger than the row group size, then this will return the - * row group size. - * - * @return Maximum uncompressed page size, in bytes - */ - auto get_max_page_size_bytes() const - { - return std::min(_max_page_size_bytes, get_row_group_size_bytes()); - } - - /** - * @brief Returns maximum page size, in rows. - * - * If set larger than the row group size, then this will return the row group size. - * - * @return Maximum page size, in rows - */ - auto get_max_page_size_rows() const - { - return std::min(_max_page_size_rows, get_row_group_size_rows()); - } - - /** - * @brief Returns maximum length of min or max values in column index, in bytes. - * - * @return length min/max will be truncated to - */ - auto get_column_index_truncate_length() const { return _column_index_truncate_length; } - - /** - * @brief Returns policy for dictionary use. - * - * @return policy for dictionary use - */ - [[nodiscard]] dictionary_policy get_dictionary_policy() const { return _dictionary_policy; } - - /** - * @brief Returns maximum dictionary size, in bytes. - * - * @return Maximum dictionary size, in bytes. - */ - [[nodiscard]] auto get_max_dictionary_size() const { return _max_dictionary_size; } - - /** - * @brief Returns maximum page fragment size, in rows. - * - * @return Maximum page fragment size, in rows. - */ - [[nodiscard]] auto get_max_page_fragment_size() const { return _max_page_fragment_size; } - - /** - * @brief Returns a shared pointer to the user-provided compression statistics. - * - * @return Compression statistics - */ - [[nodiscard]] std::shared_ptr get_compression_statistics() const - { - return _compression_stats; + return _column_chunks_file_paths; } /** - * @brief Returns `true` if V2 page headers should be written. - * - * @return `true` if V2 page headers should be written. - */ - [[nodiscard]] auto is_enabled_write_v2_headers() const { return _v2_page_headers; } - - /** - * @brief Returns the sorting_columns. - * - * @return Column sort order metadata - */ - [[nodiscard]] auto const& get_sorting_columns() const { return _sorting_columns; } - - /** - * @brief Sets metadata. - * - * @param metadata Associated metadata - */ - void set_metadata(table_input_metadata metadata) { _metadata = std::move(metadata); } - - /** - * @brief Sets Key-Value footer metadata. - * - * @param metadata Key-Value footer metadata - */ - void set_key_value_metadata(std::vector> metadata); - - /** - * @brief Sets the level of statistics in parquet_writer_options. - * - * @param sf Level of statistics requested in the output file - */ - void set_stats_level(statistics_freq sf) { _stats_level = sf; } - - /** - * @brief Sets compression type. - * - * @param compression The compression type to use - */ - void set_compression(compression_type compression) { _compression = compression; } - - /** - * @brief Sets timestamp writing preferences. - * - * INT96 timestamps will be written if `true` and TIMESTAMP_MICROS will be written if `false`. + * @brief Sets partitions. * - * @param req Boolean value to enable/disable writing of INT96 timestamps + * @param partitions Partitions of input table in {start_row, num_rows} pairs. If specified, must + * be same size as number of sinks in sink_info */ - void enable_int96_timestamps(bool req) { _write_timestamps_as_int96 = req; } + void set_partitions(std::vector partitions); /** - * @brief Sets preference for writing timestamps as UTC. Write timestamps as UTC if set to `true`. + * @brief Sets column chunks file path to be set in the raw output metadata. * - * @param val Boolean value to enable/disable writing of timestamps as UTC. + * @param file_paths Vector of Strings which indicates file path. Must be same size as number of + * data sinks in sink info */ - void enable_utc_timestamps(bool val) { _write_timestamps_as_UTC = val; } + void set_column_chunks_file_paths(std::vector file_paths); +}; +/** + * @brief Class to build `parquet_writer_options`. + */ +class parquet_writer_options_builder + : public parquet_writer_options_builder_base { + public: /** - * @brief Sets the maximum row group size, in bytes. + * @brief Default constructor. * - * @param size_bytes Maximum row group size, in bytes to set + * This has been added since Cython requires a default constructor to create objects on stack. */ - void set_row_group_size_bytes(size_t size_bytes); + explicit parquet_writer_options_builder() = default; /** - * @brief Sets the maximum row group size, in rows. + * @brief Constructor from sink and table. * - * @param size_rows The maximum row group size, in rows to set + * @param sink The sink used for writer output + * @param table Table to be written to output */ - void set_row_group_size_rows(size_type size_rows); + explicit parquet_writer_options_builder(sink_info const& sink, table_view const& table); /** - * @brief Sets the maximum uncompressed page size, in bytes. + * @brief Sets partitions in parquet_writer_options. * - * @param size_bytes Maximum uncompressed page size, in bytes to set + * @param partitions Partitions of input table in {start_row, num_rows} pairs. If specified, must + * be same size as number of sinks in sink_info + * @return this for chaining */ - void set_max_page_size_bytes(size_t size_bytes); + parquet_writer_options_builder& partitions(std::vector partitions); /** - * @brief Sets the maximum page size, in rows. + * @brief Sets column chunks file path to be set in the raw output metadata. * - * @param size_rows The maximum page size, in rows to set + * @param file_paths Vector of Strings which indicates file path. Must be same size as number of + * data sinks + * @return this for chaining */ - void set_max_page_size_rows(size_type size_rows); + parquet_writer_options_builder& column_chunks_file_paths(std::vector file_paths); +}; - /** - * @brief Sets the maximum length of min or max values in column index, in bytes. - * - * @param size_bytes length min/max will be truncated to - */ - void set_column_index_truncate_length(int32_t size_bytes); +/** + * @brief Writes a set of columns to parquet format. + * + * The following code snippet demonstrates how to write columns to a file: + * @code + * auto destination = cudf::io::sink_info("dataset.parquet"); + * auto options = cudf::io::parquet_writer_options::builder(destination, table->view()); + * cudf::io::write_parquet(options); + * @endcode + * + * @param options Settings for controlling writing behavior + * @param stream CUDA stream used for device memory operations and kernel launches + * @return A blob that contains the file metadata (parquet FileMetadata thrift message) if + * requested in parquet_writer_options (empty blob otherwise). + */ - /** - * @brief Sets the policy for dictionary use. - * - * @param policy Policy for dictionary use - */ - void set_dictionary_policy(dictionary_policy policy); +std::unique_ptr> write_parquet( + parquet_writer_options const& options, rmm::cuda_stream_view stream = cudf::get_default_stream()); - /** - * @brief Sets the maximum dictionary size, in bytes. - * - * @param size_bytes Maximum dictionary size, in bytes - */ - void set_max_dictionary_size(size_t size_bytes); +/** + * @brief Merges multiple raw metadata blobs that were previously created by write_parquet + * into a single metadata blob. + * + * @ingroup io_writers + * + * @param[in] metadata_list List of input file metadata + * @return A parquet-compatible blob that contains the data for all row groups in the list + */ +std::unique_ptr> merge_row_group_metadata( + std::vector>> const& metadata_list); - /** - * @brief Sets the maximum page fragment size, in rows. - * - * @param size_rows Maximum page fragment size, in rows. - */ - void set_max_page_fragment_size(size_type size_rows); +class chunked_parquet_writer_options_builder; +/** + * @brief Settings for `parquet_chunked_writer`. + */ +class chunked_parquet_writer_options : public parquet_writer_options_base { /** - * @brief Sets the pointer to the output compression statistics. + * @brief Constructor from sink. * - * @param comp_stats Pointer to compression statistics to be updated after writing + * @param sink Sink used for writer output */ - void set_compression_statistics(std::shared_ptr comp_stats) - { - _compression_stats = std::move(comp_stats); - } + explicit chunked_parquet_writer_options(sink_info const& sink); - /** - * @brief Sets preference for V2 page headers. Write V2 page headers if set to `true`. - * - * @param val Boolean value to enable/disable writing of V2 page headers. - */ - void enable_write_v2_headers(bool val) { _v2_page_headers = val; } + friend chunked_parquet_writer_options_builder; + public: /** - * @brief Sets sorting columns. + * @brief Default constructor. * - * @param sorting_columns Column sort order metadata + * This has been added since Cython requires a default constructor to create objects on stack. */ - void set_sorting_columns(std::vector sorting_columns) - { - _sorting_columns = std::move(sorting_columns); - } + chunked_parquet_writer_options() = default; /** * @brief creates builder to build chunked_parquet_writer_options. @@ -1619,11 +1310,11 @@ class chunked_parquet_writer_options { }; /** - * @brief Builds options for chunked_parquet_writer_options. + * @brief Class to build `chunked_parquet_writer_options`. */ -class chunked_parquet_writer_options_builder { - chunked_parquet_writer_options options; - +class chunked_parquet_writer_options_builder + : public parquet_writer_options_builder_base { public: /** * @brief Default constructor. @@ -1637,238 +1328,7 @@ class chunked_parquet_writer_options_builder { * * @param sink The sink used for writer output */ - chunked_parquet_writer_options_builder(sink_info const& sink) : options(sink){}; - - /** - * @brief Sets metadata to chunked_parquet_writer_options. - * - * @param metadata Associated metadata - * @return this for chaining - */ - chunked_parquet_writer_options_builder& metadata(table_input_metadata metadata) - { - options._metadata = std::move(metadata); - return *this; - } - - /** - * @brief Sets Key-Value footer metadata in parquet_writer_options. - * - * @param metadata Key-Value footer metadata - * @return this for chaining - */ - chunked_parquet_writer_options_builder& key_value_metadata( - std::vector> metadata); - - /** - * @brief Sets the level of statistics in chunked_parquet_writer_options. - * - * @param sf Level of statistics requested in the output file - * @return this for chaining - */ - chunked_parquet_writer_options_builder& stats_level(statistics_freq sf) - { - options._stats_level = sf; - return *this; - } - - /** - * @brief Sets compression type to chunked_parquet_writer_options. - * - * @param compression The compression type to use - * @return this for chaining - */ - chunked_parquet_writer_options_builder& compression(compression_type compression) - { - options._compression = compression; - return *this; - } - - /** - * @brief Set to true if timestamps should be written as - * int96 types instead of int64 types. Even though int96 is deprecated and is - * not an internal type for cudf, it needs to be written for backwards - * compatibility reasons. - * - * @param enabled Boolean value to enable/disable int96 timestamps - * @return this for chaining - */ - chunked_parquet_writer_options_builder& int96_timestamps(bool enabled) - { - options._write_timestamps_as_int96 = enabled; - return *this; - } - - /** - * @brief Set to true if timestamps are to be written as UTC. - * - * @param enabled Boolean value to enable/disable writing of timestamps as UTC. - * @return this for chaining - */ - chunked_parquet_writer_options_builder& utc_timestamps(bool enabled) - { - options._write_timestamps_as_UTC = enabled; - return *this; - } - - /** - * @brief Set to true if V2 page headers are to be written. - * - * @param enabled Boolean value to enable/disable writing of V2 page headers. - * @return this for chaining - */ - chunked_parquet_writer_options_builder& write_v2_headers(bool enabled); - - /** - * @brief Sets the maximum row group size, in bytes. - * - * @param val maximum row group size - * @return this for chaining - */ - chunked_parquet_writer_options_builder& row_group_size_bytes(size_t val) - { - options.set_row_group_size_bytes(val); - return *this; - } - - /** - * @brief Sets the maximum number of rows in output row groups. - * - * @param val maximum number or rows - * @return this for chaining - */ - chunked_parquet_writer_options_builder& row_group_size_rows(size_type val) - { - options.set_row_group_size_rows(val); - return *this; - } - - /** - * @brief Sets the maximum uncompressed page size, in bytes. - * - * Serves as a hint to the writer, and can be exceeded under certain circumstances. Cannot be - * larger than the row group size in bytes, and will be adjusted to match if it is. - * - * @param val maximum page size - * @return this for chaining - */ - chunked_parquet_writer_options_builder& max_page_size_bytes(size_t val) - { - options.set_max_page_size_bytes(val); - return *this; - } - - /** - * @brief Sets the maximum page size, in rows. Counts only top-level rows, ignoring any nesting. - * Cannot be larger than the row group size in rows, and will be adjusted to match if it is. - * - * @param val maximum rows per page - * @return this for chaining - */ - chunked_parquet_writer_options_builder& max_page_size_rows(size_type val) - { - options.set_max_page_size_rows(val); - return *this; - } - - /** - * @brief Sets the desired maximum size in bytes for min and max values in the column index. - * - * Values exceeding this limit will be truncated, but modified such that they will still - * be valid lower and upper bounds. This only applies to variable length types, such as string. - * Maximum values will not be truncated if there is no suitable truncation that results in - * a valid upper bound. - * - * Default value is 64. - * - * @param val length min/max will be truncated to, with 0 indicating no truncation - * @return this for chaining - */ - chunked_parquet_writer_options_builder& column_index_truncate_length(int32_t val) - { - options.set_column_index_truncate_length(val); - return *this; - } - - /** - * @brief Sets the policy for dictionary use. - * - * Certain compression algorithms (e.g Zstandard) have limits on how large of a buffer can - * be compressed. In some circumstances, the dictionary can grow beyond this limit, which - * will prevent the column from being compressed. This setting controls how the writer - * should act in these circumstances. A setting of dictionary_policy::ADAPTIVE will disable - * dictionary encoding for columns where the dictionary exceeds the limit. A setting of - * dictionary_policy::NEVER will disable the use of dictionary encoding globally. A setting of - * dictionary_policy::ALWAYS will allow the use of dictionary encoding even if it will result in - * the disabling of compression for columns that would otherwise be compressed. - * - * The default value is dictionary_policy::ADAPTIVE. - * - * @param val policy for dictionary use - * @return this for chaining - */ - chunked_parquet_writer_options_builder& dictionary_policy(enum dictionary_policy val); - - /** - * @brief Sets the maximum dictionary size, in bytes. - * - * Disables dictionary encoding for any column chunk where the dictionary will - * exceed this limit. Only used when the dictionary_policy is set to 'ADAPTIVE'. - * - * Default value is 1048576 (1MiB). - * - * @param val maximum dictionary size - * @return this for chaining - */ - chunked_parquet_writer_options_builder& max_dictionary_size(size_t val); - - /** - * @brief Sets the maximum page fragment size, in rows. - * - * Files with nested schemas or very long strings may need a page fragment size - * smaller than the default value of 5000 to ensure a single fragment will not - * exceed the desired maximum page size in bytes. - * - * @param val maximum page fragment size - * @return this for chaining - */ - chunked_parquet_writer_options_builder& max_page_fragment_size(size_type val); - - /** - * @brief Sets the pointer to the output compression statistics. - * - * @param comp_stats Pointer to compression statistics to be filled once writer is done - * @return this for chaining - */ - chunked_parquet_writer_options_builder& compression_statistics( - std::shared_ptr const& comp_stats) - { - options._compression_stats = comp_stats; - return *this; - } - - /** - * @brief Sets column sorting metadata to chunked_parquet_writer_options. - * - * @param sorting_columns Column sort order metadata - * @return this for chaining - */ - chunked_parquet_writer_options_builder& sorting_columns( - std::vector sorting_columns); - - /** - * @brief move chunked_parquet_writer_options member once it's built. - */ - operator chunked_parquet_writer_options&&() { return std::move(options); } - - /** - * @brief move chunked_parquet_writer_options member once it's is built. - * - * This has been added since Cython does not support overloading of conversion operators. - * - * @return Built `chunked_parquet_writer_options` object's r-value reference - */ - chunked_parquet_writer_options&& build() { return std::move(options); } + chunked_parquet_writer_options_builder(sink_info const& sink); }; /** diff --git a/cpp/src/io/functions.cpp b/cpp/src/io/functions.cpp index 3ba2facf276..1ed8ee5ce06 100644 --- a/cpp/src/io/functions.cpp +++ b/cpp/src/io/functions.cpp @@ -115,7 +115,7 @@ parquet_writer_options_builder parquet_writer_options::builder() chunked_parquet_writer_options_builder chunked_parquet_writer_options::builder( sink_info const& sink) { - return chunked_parquet_writer_options_builder(sink); + return chunked_parquet_writer_options_builder{sink}; } namespace { @@ -740,29 +740,37 @@ void parquet_reader_options::set_num_rows(size_type val) _num_rows = val; } -void parquet_writer_options::set_partitions(std::vector partitions) +void parquet_writer_options_base::set_metadata(table_input_metadata metadata) { - CUDF_EXPECTS(partitions.size() == _sink.num_sinks(), - "Mismatch between number of sinks and number of partitions"); - _partitions = std::move(partitions); + _metadata = std::move(metadata); } -void parquet_writer_options::set_key_value_metadata( +void parquet_writer_options_base::set_key_value_metadata( std::vector> metadata) { - CUDF_EXPECTS(metadata.size() == _sink.num_sinks(), + CUDF_EXPECTS(metadata.size() == get_sink().num_sinks(), "Mismatch between number of sinks and number of metadata maps"); _user_data = std::move(metadata); } -void parquet_writer_options::set_column_chunks_file_paths(std::vector file_paths) +void parquet_writer_options_base::set_stats_level(statistics_freq sf) { _stats_level = sf; } + +void parquet_writer_options_base::set_compression(compression_type compression) { - CUDF_EXPECTS(file_paths.size() == _sink.num_sinks(), - "Mismatch between number of sinks and number of chunk paths to set"); - _column_chunks_file_paths = std::move(file_paths); + _compression = compression; +} + +void parquet_writer_options_base::enable_int96_timestamps(bool req) +{ + _write_timestamps_as_int96 = req; +} + +void parquet_writer_options_base::enable_utc_timestamps(bool val) +{ + _write_timestamps_as_UTC = val; } -void parquet_writer_options::set_row_group_size_bytes(size_t size_bytes) +void parquet_writer_options_base::set_row_group_size_bytes(size_t size_bytes) { CUDF_EXPECTS( size_bytes >= 1024, @@ -770,13 +778,13 @@ void parquet_writer_options::set_row_group_size_bytes(size_t size_bytes) _row_group_size_bytes = size_bytes; } -void parquet_writer_options::set_row_group_size_rows(size_type size_rows) +void parquet_writer_options_base::set_row_group_size_rows(size_type size_rows) { CUDF_EXPECTS(size_rows > 0, "The maximum row group row count must be a positive integer."); _row_group_size_rows = size_rows; } -void parquet_writer_options::set_max_page_size_bytes(size_t size_bytes) +void parquet_writer_options_base::set_max_page_size_bytes(size_t size_bytes) { CUDF_EXPECTS(size_bytes >= 1024, "The maximum page size cannot be smaller than 1KB."); CUDF_EXPECTS(size_bytes <= static_cast(std::numeric_limits::max()), @@ -784,190 +792,249 @@ void parquet_writer_options::set_max_page_size_bytes(size_t size_bytes) _max_page_size_bytes = size_bytes; } -void parquet_writer_options::set_max_page_size_rows(size_type size_rows) +void parquet_writer_options_base::set_max_page_size_rows(size_type size_rows) { CUDF_EXPECTS(size_rows > 0, "The maximum page row count must be a positive integer."); _max_page_size_rows = size_rows; } -void parquet_writer_options::set_column_index_truncate_length(int32_t size_bytes) +void parquet_writer_options_base::set_column_index_truncate_length(int32_t size_bytes) { CUDF_EXPECTS(size_bytes >= 0, "Column index truncate length cannot be negative."); _column_index_truncate_length = size_bytes; } -void parquet_writer_options::set_dictionary_policy(dictionary_policy policy) +void parquet_writer_options_base::set_dictionary_policy(dictionary_policy policy) { _dictionary_policy = policy; } -void parquet_writer_options::set_max_dictionary_size(size_t size_bytes) +void parquet_writer_options_base::set_max_dictionary_size(size_t size_bytes) { CUDF_EXPECTS(size_bytes <= static_cast(std::numeric_limits::max()), "The maximum dictionary size cannot exceed 2GB."); _max_dictionary_size = size_bytes; } -void parquet_writer_options::set_max_page_fragment_size(size_type size_rows) +void parquet_writer_options_base::set_max_page_fragment_size(size_type size_rows) { CUDF_EXPECTS(size_rows > 0, "Page fragment size must be a positive integer."); _max_page_fragment_size = size_rows; } -parquet_writer_options_builder& parquet_writer_options_builder::partitions( - std::vector partitions) +void parquet_writer_options_base::set_compression_statistics( + std::shared_ptr comp_stats) { - options.set_partitions(std::move(partitions)); - return *this; + _compression_stats = std::move(comp_stats); +} + +void parquet_writer_options_base::enable_write_v2_headers(bool val) { _v2_page_headers = val; } + +void parquet_writer_options_base::set_sorting_columns(std::vector sorting_columns) +{ + _sorting_columns = std::move(sorting_columns); +} + +parquet_writer_options::parquet_writer_options(sink_info const& sink, table_view const& table) + : parquet_writer_options_base(sink), _table(table) +{ +} + +void parquet_writer_options::set_partitions(std::vector partitions) +{ + CUDF_EXPECTS(partitions.size() == get_sink().num_sinks(), + "Mismatch between number of sinks and number of partitions"); + _partitions = std::move(partitions); +} + +void parquet_writer_options::set_column_chunks_file_paths(std::vector file_paths) +{ + CUDF_EXPECTS(file_paths.size() == get_sink().num_sinks(), + "Mismatch between number of sinks and number of chunk paths to set"); + _column_chunks_file_paths = std::move(file_paths); +} + +template +parquet_writer_options_builder_base::parquet_writer_options_builder_base( + OptionsT options) + : _options(std::move(options)) +{ +} + +template +BuilderT& parquet_writer_options_builder_base::metadata( + table_input_metadata metadata) +{ + _options.set_metadata(std::move(metadata)); + return static_cast(*this); } -parquet_writer_options_builder& parquet_writer_options_builder::key_value_metadata( +template +BuilderT& parquet_writer_options_builder_base::key_value_metadata( std::vector> metadata) { - options.set_key_value_metadata(std::move(metadata)); - return *this; + _options.set_key_value_metadata(std::move(metadata)); + return static_cast(*this); } -parquet_writer_options_builder& parquet_writer_options_builder::column_chunks_file_paths( - std::vector file_paths) +template +BuilderT& parquet_writer_options_builder_base::stats_level(statistics_freq sf) { - options.set_column_chunks_file_paths(std::move(file_paths)); - return *this; + _options.set_stats_level(sf); + return static_cast(*this); } -parquet_writer_options_builder& parquet_writer_options_builder::dictionary_policy( - enum dictionary_policy val) +template +BuilderT& parquet_writer_options_builder_base::compression( + compression_type compression) { - options.set_dictionary_policy(val); - return *this; + _options.set_compression(compression); + return static_cast(*this); } -parquet_writer_options_builder& parquet_writer_options_builder::max_dictionary_size(size_t val) +template +BuilderT& parquet_writer_options_builder_base::row_group_size_bytes(size_t val) { - options.set_max_dictionary_size(val); - return *this; + _options.set_row_group_size_bytes(val); + return static_cast(*this); } -parquet_writer_options_builder& parquet_writer_options_builder::max_page_fragment_size( +template +BuilderT& parquet_writer_options_builder_base::row_group_size_rows( size_type val) { - options.set_max_page_fragment_size(val); - return *this; + _options.set_row_group_size_rows(val); + return static_cast(*this); } -parquet_writer_options_builder& parquet_writer_options_builder::write_v2_headers(bool enabled) +template +BuilderT& parquet_writer_options_builder_base::max_page_size_bytes(size_t val) { - options.enable_write_v2_headers(enabled); - return *this; + _options.set_max_page_size_bytes(val); + return static_cast(*this); } -parquet_writer_options_builder& parquet_writer_options_builder::sorting_columns( - std::vector sorting_columns) +template +BuilderT& parquet_writer_options_builder_base::max_page_size_rows(size_type val) { - options._sorting_columns = std::move(sorting_columns); - return *this; + _options.set_max_page_size_rows(val); + return static_cast(*this); } -void chunked_parquet_writer_options::set_key_value_metadata( - std::vector> metadata) +template +BuilderT& parquet_writer_options_builder_base::column_index_truncate_length( + int32_t val) { - CUDF_EXPECTS(metadata.size() == _sink.num_sinks(), - "Mismatch between number of sinks and number of metadata maps"); - _user_data = std::move(metadata); + _options.set_column_index_truncate_length(val); + return static_cast(*this); } -void chunked_parquet_writer_options::set_row_group_size_bytes(size_t size_bytes) +template +BuilderT& parquet_writer_options_builder_base::dictionary_policy( + enum dictionary_policy val) { - CUDF_EXPECTS( - size_bytes >= 1024, - "The maximum row group size cannot be smaller than the minimum page size, which is 1KB."); - _row_group_size_bytes = size_bytes; + _options.set_dictionary_policy(val); + return static_cast(*this); } -void chunked_parquet_writer_options::set_row_group_size_rows(size_type size_rows) +template +BuilderT& parquet_writer_options_builder_base::max_dictionary_size(size_t val) { - CUDF_EXPECTS(size_rows > 0, "The maximum row group row count must be a positive integer."); - _row_group_size_rows = size_rows; + _options.set_max_dictionary_size(val); + return static_cast(*this); } -void chunked_parquet_writer_options::set_max_page_size_bytes(size_t size_bytes) +template +BuilderT& parquet_writer_options_builder_base::max_page_fragment_size( + size_type val) { - CUDF_EXPECTS(size_bytes >= 1024, "The maximum page size cannot be smaller than 1KB."); - CUDF_EXPECTS(size_bytes <= static_cast(std::numeric_limits::max()), - "The maximum page size cannot exceed 2GB."); - _max_page_size_bytes = size_bytes; + _options.set_max_page_fragment_size(val); + return static_cast(*this); } -void chunked_parquet_writer_options::set_max_page_size_rows(size_type size_rows) +template +BuilderT& parquet_writer_options_builder_base::compression_statistics( + std::shared_ptr const& comp_stats) { - CUDF_EXPECTS(size_rows > 0, "The maximum page row count must be a positive integer."); - _max_page_size_rows = size_rows; + _options.set_compression_statistics(comp_stats); + return static_cast(*this); } -void chunked_parquet_writer_options::set_column_index_truncate_length(int32_t size_bytes) +template +BuilderT& parquet_writer_options_builder_base::int96_timestamps(bool enabled) { - CUDF_EXPECTS(size_bytes >= 0, "Column index truncate length cannot be negative."); - _column_index_truncate_length = size_bytes; + _options.enable_int96_timestamps(enabled); + return static_cast(*this); } -void chunked_parquet_writer_options::set_dictionary_policy(dictionary_policy policy) +template +BuilderT& parquet_writer_options_builder_base::utc_timestamps(bool enabled) { - _dictionary_policy = policy; + _options.enable_utc_timestamps(enabled); + return static_cast(*this); } -void chunked_parquet_writer_options::set_max_dictionary_size(size_t size_bytes) +template +BuilderT& parquet_writer_options_builder_base::write_v2_headers(bool enabled) { - CUDF_EXPECTS(size_bytes <= static_cast(std::numeric_limits::max()), - "The maximum dictionary size cannot exceed 2GB."); - _max_dictionary_size = size_bytes; + _options.enable_write_v2_headers(enabled); + return static_cast(*this); } -void chunked_parquet_writer_options::set_max_page_fragment_size(size_type size_rows) +template +BuilderT& parquet_writer_options_builder_base::sorting_columns( + std::vector sorting_columns) { - CUDF_EXPECTS(size_rows > 0, "Page fragment size must be a positive integer."); - _max_page_fragment_size = size_rows; + _options.set_sorting_columns(std::move(sorting_columns)); + return static_cast(*this); } -chunked_parquet_writer_options_builder& chunked_parquet_writer_options_builder::key_value_metadata( - std::vector> metadata) +template +parquet_writer_options_builder_base::operator OptionsT&&() { - options.set_key_value_metadata(std::move(metadata)); - return *this; + return std::move(_options); } -chunked_parquet_writer_options_builder& chunked_parquet_writer_options_builder::dictionary_policy( - enum dictionary_policy val) +template +OptionsT&& parquet_writer_options_builder_base::build() { - options.set_dictionary_policy(val); - return *this; + return std::move(_options); } -chunked_parquet_writer_options_builder& chunked_parquet_writer_options_builder::max_dictionary_size( - size_t val) +template class parquet_writer_options_builder_base; +template class parquet_writer_options_builder_base; + +parquet_writer_options_builder::parquet_writer_options_builder(sink_info const& sink, + table_view const& table) + : parquet_writer_options_builder_base(parquet_writer_options{sink, table}) { - options.set_max_dictionary_size(val); - return *this; } -chunked_parquet_writer_options_builder& chunked_parquet_writer_options_builder::write_v2_headers( - bool enabled) +parquet_writer_options_builder& parquet_writer_options_builder::partitions( + std::vector partitions) { - options.enable_write_v2_headers(enabled); + get_options().set_partitions(std::move(partitions)); return *this; } -chunked_parquet_writer_options_builder& chunked_parquet_writer_options_builder::sorting_columns( - std::vector sorting_columns) +parquet_writer_options_builder& parquet_writer_options_builder::column_chunks_file_paths( + std::vector file_paths) { - options._sorting_columns = std::move(sorting_columns); + get_options().set_column_chunks_file_paths(std::move(file_paths)); return *this; } -chunked_parquet_writer_options_builder& -chunked_parquet_writer_options_builder::max_page_fragment_size(size_type val) +chunked_parquet_writer_options::chunked_parquet_writer_options(sink_info const& sink) + : parquet_writer_options_base(sink) +{ +} + +chunked_parquet_writer_options_builder::chunked_parquet_writer_options_builder( + sink_info const& sink) + : parquet_writer_options_builder_base(chunked_parquet_writer_options{sink}) { - options.set_max_page_fragment_size(val); - return *this; } } // namespace cudf::io From 0126d04372cf962b6659528bfe741cb2bf075839 Mon Sep 17 00:00:00 2001 From: seidl Date: Thu, 23 May 2024 18:28:01 +0000 Subject: [PATCH 2/8] refactor python parquet_writer_options --- .../_lib/pylibcudf/libcudf/io/parquet.pxd | 61 +++++-------------- 1 file changed, 14 insertions(+), 47 deletions(-) diff --git a/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd b/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd index 33a594b432f..eef86423b8e 100644 --- a/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd @@ -66,24 +66,19 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: cdef cudf_io_types.table_with_metadata read_parquet( parquet_reader_options args) except + - cdef cppclass parquet_writer_options: - parquet_writer_options() except + + cdef cppclass parquet_writer_options_base: + parquet_writer_options_base() except + cudf_io_types.sink_info get_sink_info() except + cudf_io_types.compression_type get_compression() except + cudf_io_types.statistics_freq get_stats_level() except + - cudf_table_view.table_view get_table() except + const optional[cudf_io_types.table_input_metadata]& get_metadata( ) except + - string get_column_chunks_file_paths() except + size_t get_row_group_size_bytes() except + size_type get_row_group_size_rows() except + size_t get_max_page_size_bytes() except + size_type get_max_page_size_rows() except + size_t get_max_dictionary_size() except + - void set_partitions( - vector[cudf_io_types.partition_info] partitions - ) except + void set_metadata( cudf_io_types.table_input_metadata m ) except + @@ -96,9 +91,6 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: void set_compression( cudf_io_types.compression_type compression ) except + - void set_column_chunks_file_paths( - vector[string] column_chunks_file_paths - ) except + void set_int96_timestamps( bool enabled ) except + @@ -113,6 +105,17 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: void enable_write_v2_headers(bool val) except + void set_dictionary_policy(cudf_io_types.dictionary_policy policy) except + + cdef cppclass parquet_writer_options(parquet_writer_options_base): + parquet_writer_options() except + + cudf_table_view.table_view get_table() except + + string get_column_chunks_file_paths() except + + void set_partitions( + vector[cudf_io_types.partition_info] partitions + ) except + + void set_column_chunks_file_paths( + vector[string] column_chunks_file_paths + ) except + + @staticmethod parquet_writer_options_builder builder( cudf_io_types.sink_info sink_, @@ -178,44 +181,8 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: parquet_writer_options args ) except + - cdef cppclass chunked_parquet_writer_options: + cdef cppclass chunked_parquet_writer_options(parquet_writer_options_base): chunked_parquet_writer_options() except + - cudf_io_types.sink_info get_sink() except + - cudf_io_types.compression_type get_compression() except + - cudf_io_types.statistics_freq get_stats_level() except + - const optional[cudf_io_types.table_input_metadata]& get_metadata( - ) except + - size_t get_row_group_size_bytes() except + - size_type get_row_group_size_rows() except + - size_t get_max_page_size_bytes() except + - size_type get_max_page_size_rows() except + - size_t get_max_dictionary_size() except + - - void set_metadata( - cudf_io_types.table_input_metadata m - ) except + - void set_key_value_metadata( - vector[map[string, string]] kvm - ) except + - void set_stats_level( - cudf_io_types.statistics_freq sf - ) except + - void set_compression( - cudf_io_types.compression_type compression - ) except + - void set_int96_timestamps( - bool enabled - ) except + - void set_utc_timestamps( - bool enabled - ) except + - void set_row_group_size_bytes(size_t val) except + - void set_row_group_size_rows(size_type val) except + - void set_max_page_size_bytes(size_t val) except + - void set_max_page_size_rows(size_type val) except + - void set_max_dictionary_size(size_t val) except + - void enable_write_v2_headers(bool val) except + - void set_dictionary_policy(cudf_io_types.dictionary_policy policy) except + @staticmethod chunked_parquet_writer_options_builder builder( From 01791c34c6f8c52a1460f0fb85eeef8751a7dfae Mon Sep 17 00:00:00 2001 From: seidl Date: Thu, 23 May 2024 18:42:15 +0000 Subject: [PATCH 3/8] refactor python builders --- .../_lib/pylibcudf/libcudf/io/parquet.pxd | 89 ++++++------------- 1 file changed, 28 insertions(+), 61 deletions(-) diff --git a/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd b/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd index eef86423b8e..7b70aa92275 100644 --- a/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd @@ -122,59 +122,63 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: cudf_table_view.table_view table_ ) except + - cdef cppclass parquet_writer_options_builder: - + cdef cppclass parquet_writer_options_builder_base[BuilderT]: parquet_writer_options_builder() except + - parquet_writer_options_builder( - cudf_io_types.sink_info sink_, - cudf_table_view.table_view table_ - ) except + - parquet_writer_options_builder& partitions( + + BuilderT& partitions( vector[cudf_io_types.partition_info] partitions ) except + - parquet_writer_options_builder& metadata( + BuilderT& metadata( cudf_io_types.table_input_metadata m ) except + - parquet_writer_options_builder& key_value_metadata( + BuilderT& key_value_metadata( vector[map[string, string]] kvm ) except + - parquet_writer_options_builder& stats_level( + BuilderT& stats_level( cudf_io_types.statistics_freq sf ) except + - parquet_writer_options_builder& compression( + BuilderT& compression( cudf_io_types.compression_type compression ) except + - parquet_writer_options_builder& column_chunks_file_paths( + BuilderT& column_chunks_file_paths( vector[string] column_chunks_file_paths ) except + - parquet_writer_options_builder& int96_timestamps( + BuilderT& int96_timestamps( bool enabled ) except + - parquet_writer_options_builder& utc_timestamps( + BuilderT& utc_timestamps( bool enabled ) except + - parquet_writer_options_builder& row_group_size_bytes( + BuilderT& row_group_size_bytes( size_t val ) except + - parquet_writer_options_builder& row_group_size_rows( + BuilderT& row_group_size_rows( size_type val ) except + - parquet_writer_options_builder& max_page_size_bytes( + BuilderT& max_page_size_bytes( size_t val ) except + - parquet_writer_options_builder& max_page_size_rows( + BuilderT& max_page_size_rows( size_type val ) except + - parquet_writer_options_builder& max_dictionary_size( + BuilderT& max_dictionary_size( size_t val ) except + - parquet_writer_options_builder& write_v2_headers( + BuilderT& write_v2_headers( bool val ) except + - parquet_writer_options_builder& dictionary_policy( + BuilderT& dictionary_policy( cudf_io_types.dictionary_policy val ) except + + cdef cppclass parquet_writer_options_builder( + parquet_writer_options_builder_base[parquet_writer_options_builder]): + parquet_writer_options_builder() except + + parquet_writer_options_builder( + cudf_io_types.sink_info sink_, + cudf_table_view.table_view table_ + ) except + + parquet_writer_options build() except + cdef unique_ptr[vector[uint8_t]] write_parquet( @@ -189,50 +193,13 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: cudf_io_types.sink_info sink_, ) except + - cdef cppclass chunked_parquet_writer_options_builder: + cdef cppclass chunked_parquet_writer_options_builder( + parquet_writer_options_builder_base[chunked_parquet_writer_options_builder] + ): chunked_parquet_writer_options_builder() except + chunked_parquet_writer_options_builder( cudf_io_types.sink_info sink_, ) except + - chunked_parquet_writer_options_builder& metadata( - cudf_io_types.table_input_metadata m - ) except + - chunked_parquet_writer_options_builder& key_value_metadata( - vector[map[string, string]] kvm - ) except + - chunked_parquet_writer_options_builder& stats_level( - cudf_io_types.statistics_freq sf - ) except + - chunked_parquet_writer_options_builder& compression( - cudf_io_types.compression_type compression - ) except + - chunked_parquet_writer_options_builder& int96_timestamps( - bool enabled - ) except + - chunked_parquet_writer_options_builder& utc_timestamps( - bool enabled - ) except + - chunked_parquet_writer_options_builder& row_group_size_bytes( - size_t val - ) except + - chunked_parquet_writer_options_builder& row_group_size_rows( - size_type val - ) except + - chunked_parquet_writer_options_builder& max_page_size_bytes( - size_t val - ) except + - chunked_parquet_writer_options_builder& max_page_size_rows( - size_type val - ) except + - chunked_parquet_writer_options_builder& max_dictionary_size( - size_t val - ) except + - parquet_writer_options_builder& write_v2_headers( - bool val - ) except + - parquet_writer_options_builder& dictionary_policy( - cudf_io_types.dictionary_policy val - ) except + chunked_parquet_writer_options build() except + From 23898ca10368a38690e85849703ad0f7fa3786dc Mon Sep 17 00:00:00 2001 From: seidl Date: Thu, 23 May 2024 18:51:31 +0000 Subject: [PATCH 4/8] missed some setters that belong in the child --- .../cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd b/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd index 7b70aa92275..476e46a6c65 100644 --- a/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd @@ -125,9 +125,6 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: cdef cppclass parquet_writer_options_builder_base[BuilderT]: parquet_writer_options_builder() except + - BuilderT& partitions( - vector[cudf_io_types.partition_info] partitions - ) except + BuilderT& metadata( cudf_io_types.table_input_metadata m ) except + @@ -140,9 +137,6 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: BuilderT& compression( cudf_io_types.compression_type compression ) except + - BuilderT& column_chunks_file_paths( - vector[string] column_chunks_file_paths - ) except + BuilderT& int96_timestamps( bool enabled ) except + @@ -179,6 +173,13 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: cudf_table_view.table_view table_ ) except + + parquet_writer_options_builder& partitions( + vector[cudf_io_types.partition_info] partitions + ) except + + parquet_writer_options_builder& column_chunks_file_paths( + vector[string] column_chunks_file_paths + ) except + + parquet_writer_options build() except + cdef unique_ptr[vector[uint8_t]] write_parquet( From dce0934ab3152c4ebf7a61725552c779123feea1 Mon Sep 17 00:00:00 2001 From: seidl Date: Fri, 24 May 2024 00:31:28 +0000 Subject: [PATCH 5/8] work around compiler errors --- .../cudf/_lib/pylibcudf/libcudf/io/parquet.pxd | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd b/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd index 476e46a6c65..e77befe17f9 100644 --- a/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd @@ -12,6 +12,9 @@ from libcpp.vector cimport vector cimport cudf._lib.pylibcudf.libcudf.io.types as cudf_io_types cimport cudf._lib.pylibcudf.libcudf.table.table_view as cudf_table_view from cudf._lib.pylibcudf.libcudf.expressions cimport expression +from cudf._lib.pylibcudf.libcudf.io.parquet cimport ( + parquet_writer_options_builder, +) from cudf._lib.pylibcudf.libcudf.types cimport data_type, size_type @@ -165,6 +168,14 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: cudf_io_types.dictionary_policy val ) except + + # forward declare derived builder type + cdef cppclass parquet_writer_options_builder( + parquet_writer_options_builder_base[parquet_writer_options_builder]) + + # now typedef the child type for use in method declarations + ctypedef parquet_writer_options_builder_base[ + parquet_writer_options_builder] writer_builder_type + cdef cppclass parquet_writer_options_builder( parquet_writer_options_builder_base[parquet_writer_options_builder]): parquet_writer_options_builder() except + @@ -173,10 +184,10 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: cudf_table_view.table_view table_ ) except + - parquet_writer_options_builder& partitions( + writer_builder_type& partitions( vector[cudf_io_types.partition_info] partitions ) except + - parquet_writer_options_builder& column_chunks_file_paths( + writer_builder_type& column_chunks_file_paths( vector[string] column_chunks_file_paths ) except + From 1857805a5046d4faac296b3243320e064355b53d Mon Sep 17 00:00:00 2001 From: seidl Date: Fri, 24 May 2024 09:41:05 -0700 Subject: [PATCH 6/8] workaround some cython issues with CRTP match C++ template args exactly and move build() into base class --- .../_lib/pylibcudf/libcudf/io/parquet.pxd | 41 +++++++++---------- 1 file changed, 19 insertions(+), 22 deletions(-) diff --git a/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd b/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd index e77befe17f9..e3b4dd5a327 100644 --- a/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd @@ -125,7 +125,7 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: cudf_table_view.table_view table_ ) except + - cdef cppclass parquet_writer_options_builder_base[BuilderT]: + cdef cppclass parquet_writer_options_builder_base[BuilderT, OptionsT]: parquet_writer_options_builder() except + BuilderT& metadata( @@ -167,32 +167,30 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: BuilderT& dictionary_policy( cudf_io_types.dictionary_policy val ) except + - - # forward declare derived builder type - cdef cppclass parquet_writer_options_builder( - parquet_writer_options_builder_base[parquet_writer_options_builder]) - - # now typedef the child type for use in method declarations - ctypedef parquet_writer_options_builder_base[ - parquet_writer_options_builder] writer_builder_type + # FIXME: the following two functions actually belong in + # parquet_writer_options_builder, but placing them there yields a + # "'parquet_writer_options_builder' is not a type identifier" error. + # This is probably a bug in cython since a simpler CRTP example that + # has methods returning references to a child class seem to work. + # Calling these from the chunked options builder will fail at compile + # time, so this should be safe. + BuilderT& partitions( + vector[cudf_io_types.partition_info] partitions + ) except + + BuilderT& column_chunks_file_paths( + vector[string] column_chunks_file_paths + ) except + + OptionsT build() except + cdef cppclass parquet_writer_options_builder( - parquet_writer_options_builder_base[parquet_writer_options_builder]): + parquet_writer_options_builder_base[parquet_writer_options_builder, + parquet_writer_options]): parquet_writer_options_builder() except + parquet_writer_options_builder( cudf_io_types.sink_info sink_, cudf_table_view.table_view table_ ) except + - writer_builder_type& partitions( - vector[cudf_io_types.partition_info] partitions - ) except + - writer_builder_type& column_chunks_file_paths( - vector[string] column_chunks_file_paths - ) except + - - parquet_writer_options build() except + - cdef unique_ptr[vector[uint8_t]] write_parquet( parquet_writer_options args ) except + @@ -206,15 +204,14 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: ) except + cdef cppclass chunked_parquet_writer_options_builder( - parquet_writer_options_builder_base[chunked_parquet_writer_options_builder] + parquet_writer_options_builder_base[chunked_parquet_writer_options_builder, + chunked_parquet_writer_options] ): chunked_parquet_writer_options_builder() except + chunked_parquet_writer_options_builder( cudf_io_types.sink_info sink_, ) except + - chunked_parquet_writer_options build() except + - cdef cppclass parquet_chunked_writer: parquet_chunked_writer() except + parquet_chunked_writer(chunked_parquet_writer_options args) except + From ceb621d83bec53230b0a5f6b0eb41491b23500fa Mon Sep 17 00:00:00 2001 From: seidl Date: Fri, 24 May 2024 16:48:02 +0000 Subject: [PATCH 7/8] add another note --- python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd b/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd index e3b4dd5a327..dfc9bbc5ffe 100644 --- a/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd @@ -174,6 +174,9 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: # has methods returning references to a child class seem to work. # Calling these from the chunked options builder will fail at compile # time, so this should be safe. + # NOTE: these two are never actually called from libcudf. Instead these + # properties are set in the options after calling build(), so perhaps + # they can be removed. BuilderT& partitions( vector[cudf_io_types.partition_info] partitions ) except + From e8ebed3ddcb27ef4903f8772ea7bb916fd3a3160 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Fri, 24 May 2024 10:28:33 -0700 Subject: [PATCH 8/8] remove leftover code from experimentation --- python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd | 3 --- 1 file changed, 3 deletions(-) diff --git a/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd b/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd index dfc9bbc5ffe..08bb49ab1b4 100644 --- a/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd @@ -12,9 +12,6 @@ from libcpp.vector cimport vector cimport cudf._lib.pylibcudf.libcudf.io.types as cudf_io_types cimport cudf._lib.pylibcudf.libcudf.table.table_view as cudf_table_view from cudf._lib.pylibcudf.libcudf.expressions cimport expression -from cudf._lib.pylibcudf.libcudf.io.parquet cimport ( - parquet_writer_options_builder, -) from cudf._lib.pylibcudf.libcudf.types cimport data_type, size_type