From d7e7c0baef522511bb31af93094eb11e538e5f1b Mon Sep 17 00:00:00 2001
From: Nghia Truong <7416935+ttnghia@users.noreply.github.com>
Date: Mon, 1 May 2023 14:21:45 -0700
Subject: [PATCH] Refactor Parquet chunked writer (#13076)

Similar to https://github.com/rapidsai/cudf/pull/12949, this refactors the Parquet writer to support a retry mechanism. The internal `writer::impl::write()` function is rewritten so that it is split into separate pieces:
* A free function that compresses/encodes the input table into intermediate results. These intermediate results are fully independent of the writer.
* Once the intermediate results are available, they are applied to the output data sink to perform the actual data writing.

Closes:
* https://github.com/rapidsai/cudf/issues/13042

Depends on:
* https://github.com/rapidsai/cudf/pull/13206

Authors:
  - Nghia Truong (https://github.com/ttnghia)

Approvers:
  - Vukasin Milovanovic (https://github.com/vuule)
  - https://github.com/nvdbaranec

URL: https://github.com/rapidsai/cudf/pull/13076
---
 cpp/src/io/orc/writer_impl.hpp     |   2 +-
 cpp/src/io/parquet/writer_impl.cu  | 945 ++++++++++++++++++-----------
 cpp/src/io/parquet/writer_impl.hpp | 115 +---
 3 files changed, 624 insertions(+), 438 deletions(-)

diff --git a/cpp/src/io/orc/writer_impl.hpp b/cpp/src/io/orc/writer_impl.hpp
index 2b04b418b41..cdcc092549a 100644
--- a/cpp/src/io/orc/writer_impl.hpp
+++ b/cpp/src/io/orc/writer_impl.hpp
@@ -291,7 +291,7 @@ class writer::impl {
  * @brief Write the intermediate ORC data into the data sink.
  *
  * The intermediate data is generated from processing (compressing/encoding) a cuDF input table
- * by `process_for_write` called in the `write()` function.
+ * by `convert_table_to_orc_data` called in the `write()` function.
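To make the two-phase structure described above concrete, here is a minimal, self-contained sketch of the same pattern: a free conversion step that may throw (for example on out-of-memory) without touching any writer state, followed by a commit step that updates the metadata and writes the already-encoded results to the sink. All names and types below (`encoded_data`, `simple_writer`, the tiny `data_sink`) are invented stand-ins for illustration, not the actual cuDF classes.

#include <cstddef>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Tiny stand-in for a data sink; the real writer talks to cudf::io::data_sink.
struct data_sink {
  std::vector<char> bytes;
  void host_write(void const* data, std::size_t size)
  {
    auto const* p = static_cast<char const*>(data);
    bytes.insert(bytes.end(), p, p + size);
  }
};

// Everything the conversion step produces; independent of the writer object.
struct encoded_data {
  std::vector<char> pages;       // "compressed/encoded" pages
  std::string updated_metadata;  // copy of the writer metadata, updated for this chunk
};

// Free function: may throw (e.g. out-of-memory) but owns no writer state, so a
// failure here leaves the writer untouched and the same call can be retried.
encoded_data convert_table_to_output_data(std::vector<int> const& table,
                                          std::string const& current_metadata)
{
  if (table.empty()) { throw std::runtime_error("nothing to encode"); }
  encoded_data out;
  out.updated_metadata = current_metadata + " +chunk";
  for (int v : table) { out.pages.push_back(static_cast<char>(v)); }  // "encoding"
  return out;
}

class simple_writer {
 public:
  void write(std::vector<int> const& table)
  {
    // Phase 1: convert. Any exception propagates before any state is modified.
    auto encoded = convert_table_to_output_data(table, _metadata);
    // Phase 2: commit. Only now is the metadata updated and the sink written to.
    _metadata = std::move(encoded.updated_metadata);
    _sink.host_write(encoded.pages.data(), encoded.pages.size());
  }

 private:
  std::string _metadata{"header"};
  data_sink _sink;
};

int main()
{
  simple_writer w;
  try {
    w.write({});  // fails in phase 1; writer state and sink remain intact
  } catch (std::exception const&) {
    // the caller could free memory here and retry
  }
  w.write({1, 2, 3});  // the retry succeeds and appends a normal chunk
}

Because nothing owned by `simple_writer` is modified until the conversion step has succeeded, a failed `write()` call can simply be retried after the caller frees some memory, which is the guarantee the refactored Parquet writer aims to provide.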
* * @param[in] enc_data ORC per-chunk streams of encoded data * @param[in] segmentation Description of how the ORC file is segmented into stripes and rowgroups diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 6cd2601ac1b..cc27455e1c0 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -32,28 +32,21 @@ #include #include -#include #include #include #include #include #include -#include #include #include #include #include #include -#include #include -#include +#include #include -#include -#include -#include -#include #include #include @@ -67,69 +60,15 @@ namespace parquet { using namespace cudf::io::parquet; using namespace cudf::io; -namespace { - -/** - * @brief Function that translates GDF compression to parquet compression - */ -parquet::Compression to_parquet_compression(compression_type compression) -{ - switch (compression) { - case compression_type::AUTO: - case compression_type::SNAPPY: return parquet::Compression::SNAPPY; - case compression_type::ZSTD: return parquet::Compression::ZSTD; - case compression_type::NONE: return parquet::Compression::UNCOMPRESSED; - default: CUDF_FAIL("Unsupported compression type"); - } -} - -size_t column_size(column_view const& column, rmm::cuda_stream_view stream) -{ - if (column.size() == 0) { return 0; } - - if (is_fixed_width(column.type())) { - return size_of(column.type()) * column.size(); - } else if (column.type().id() == type_id::STRING) { - auto const scol = strings_column_view(column); - return cudf::detail::get_value(scol.offsets(), column.size(), stream) - - cudf::detail::get_value(scol.offsets(), 0, stream); - } else if (column.type().id() == type_id::STRUCT) { - auto const scol = structs_column_view(column); - size_t ret = 0; - for (int i = 0; i < scol.num_children(); i++) { - ret += column_size(scol.get_sliced_child(i, stream), stream); - } - return ret; - } else if (column.type().id() == type_id::LIST) { - auto const lcol = lists_column_view(column); - return column_size(lcol.get_sliced_child(stream), stream); - } - - CUDF_FAIL("Unexpected compound type"); -} - -// checks to see if the given column has a fixed size. This doesn't -// check every row, so assumes string and list columns are not fixed, even -// if each row is the same width. -// TODO: update this if FIXED_LEN_BYTE_ARRAY is ever supported for writes. 
-bool is_col_fixed_width(column_view const& column) -{ - if (column.type().id() == type_id::STRUCT) { - return std::all_of(column.child_begin(), column.child_end(), is_col_fixed_width); - } - - return is_fixed_width(column.type()); -} - -} // namespace - struct aggregate_writer_metadata { - aggregate_writer_metadata(std::vector const& partitions, + aggregate_writer_metadata(host_span partitions, + host_span const> kv_md, + host_span tbl_schema, size_type num_columns, - std::vector schema, - statistics_freq stats_granularity, - std::vector> const& kv_md) - : version(1), schema(std::move(schema)), files(partitions.size()) + statistics_freq stats_granularity) + : version(1), + schema(std::vector(tbl_schema.begin(), tbl_schema.end())), + files(partitions.size()) { for (size_t i = 0; i < partitions.size(); ++i) { this->files[i].num_rows = partitions[i].num_rows; @@ -147,7 +86,9 @@ struct aggregate_writer_metadata { } } - void update_files(std::vector const& partitions) + aggregate_writer_metadata(aggregate_writer_metadata const&) = default; + + void update_files(host_span partitions) { CUDF_EXPECTS(partitions.size() == this->files.size(), "New partitions must be same size as previously passed number of partitions"); @@ -170,7 +111,7 @@ struct aggregate_writer_metadata { return meta; } - void set_file_paths(std::vector const& column_chunks_file_path) + void set_file_paths(host_span column_chunks_file_path) { for (size_t p = 0; p < this->files.size(); ++p) { auto& file = this->files[p]; @@ -232,6 +173,70 @@ struct aggregate_writer_metadata { uint32_t column_order_listsize = 0; }; +namespace { + +/** + * @brief Function that translates GDF compression to parquet compression. + * + * @param compression The compression type + * @return The supported Parquet compression + */ +parquet::Compression to_parquet_compression(compression_type compression) +{ + switch (compression) { + case compression_type::AUTO: + case compression_type::SNAPPY: return parquet::Compression::SNAPPY; + case compression_type::ZSTD: return parquet::Compression::ZSTD; + case compression_type::NONE: return parquet::Compression::UNCOMPRESSED; + default: CUDF_FAIL("Unsupported compression type"); + } +} + +/** + * @brief Compute size (in bytes) of the data stored in the given column. + * + * @param column The input column + * @param stream CUDA stream used for device memory operations and kernel launches + * @return The data size of the input + */ +size_t column_size(column_view const& column, rmm::cuda_stream_view stream) +{ + if (column.size() == 0) { return 0; } + + if (is_fixed_width(column.type())) { + return size_of(column.type()) * column.size(); + } else if (column.type().id() == type_id::STRING) { + auto const scol = strings_column_view(column); + return cudf::detail::get_value(scol.offsets(), column.size(), stream) - + cudf::detail::get_value(scol.offsets(), 0, stream); + } else if (column.type().id() == type_id::STRUCT) { + auto const scol = structs_column_view(column); + size_t ret = 0; + for (int i = 0; i < scol.num_children(); i++) { + ret += column_size(scol.get_sliced_child(i), stream); + } + return ret; + } else if (column.type().id() == type_id::LIST) { + auto const lcol = lists_column_view(column); + return column_size(lcol.get_sliced_child(stream), stream); + } + + CUDF_FAIL("Unexpected compound type"); +} + +// checks to see if the given column has a fixed size. This doesn't +// check every row, so assumes string and list columns are not fixed, even +// if each row is the same width. 
+// TODO: update this if FIXED_LEN_BYTE_ARRAY is ever supported for writes. +bool is_col_fixed_width(column_view const& column) +{ + if (column.type().id() == type_id::STRUCT) { + return std::all_of(column.child_begin(), column.child_end(), is_col_fixed_width); + } + + return is_fixed_width(column.type()); +} + /** * @brief Extends SchemaElement to add members required in constructing parquet_column_view * @@ -921,37 +926,69 @@ gpu::parquet_column_device_view parquet_column_view::get_device_view( return desc; } -void writer::impl::init_row_group_fragments( - cudf::detail::hostdevice_2dvector& frag, - device_span col_desc, - host_span partitions, - device_span part_frag_offset, - uint32_t fragment_size) +/** + * @brief Gather row group fragments + * + * This calculates fragments to be used in determining row group boundaries. + * + * @param frag Destination row group fragments + * @param col_desc column description array + * @param partitions Information about partitioning of table + * @param part_frag_offset A Partition's offset into fragment array + * @param fragment_size Number of rows per fragment + * @param stream CUDA stream used for device memory operations and kernel launches + */ +void init_row_group_fragments(cudf::detail::hostdevice_2dvector& frag, + device_span col_desc, + host_span partitions, + device_span part_frag_offset, + uint32_t fragment_size, + rmm::cuda_stream_view stream) { auto d_partitions = cudf::detail::make_device_uvector_async( - partitions, _stream, rmm::mr::get_current_device_resource()); - gpu::InitRowGroupFragments( - frag, col_desc, d_partitions, part_frag_offset, fragment_size, _stream); - frag.device_to_host(_stream, true); + partitions, stream, rmm::mr::get_current_device_resource()); + gpu::InitRowGroupFragments(frag, col_desc, d_partitions, part_frag_offset, fragment_size, stream); + frag.device_to_host(stream, true); } -void writer::impl::calculate_page_fragments(device_span frag, - host_span frag_sizes) +/** + * @brief Recalculate page fragments + * + * This calculates fragments to be used to determine page boundaries within + * column chunks. 
+ * + * @param frag Destination page fragments + * @param frag_sizes Array of fragment sizes for each column + * @param stream CUDA stream used for device memory operations and kernel launches + */ +void calculate_page_fragments(device_span frag, + host_span frag_sizes, + rmm::cuda_stream_view stream) { auto d_frag_sz = cudf::detail::make_device_uvector_async( - frag_sizes, _stream, rmm::mr::get_current_device_resource()); - gpu::CalculatePageFragments(frag, d_frag_sz, _stream); + frag_sizes, stream, rmm::mr::get_current_device_resource()); + gpu::CalculatePageFragments(frag, d_frag_sz, stream); } -void writer::impl::gather_fragment_statistics(device_span frag_stats, - device_span frags) +/** + * @brief Gather per-fragment statistics + * + * @param frag_stats output statistics + * @param frags Input page fragments + * @param int96_timestamps Flag to indicate if timestamps will be written as INT96 + * @param stream CUDA stream used for device memory operations and kernel launches + */ +void gather_fragment_statistics(device_span frag_stats, + device_span frags, + bool int96_timestamps, + rmm::cuda_stream_view stream) { - rmm::device_uvector frag_stats_group(frag_stats.size(), _stream); + rmm::device_uvector frag_stats_group(frag_stats.size(), stream); - gpu::InitFragmentStatistics(frag_stats_group, frags, _stream); + gpu::InitFragmentStatistics(frag_stats_group, frags, stream); detail::calculate_group_statistics( - frag_stats.data(), frag_stats_group.data(), frag_stats.size(), _stream, _int96_timestamps); - _stream.synchronize(); + frag_stats.data(), frag_stats_group.data(), frag_stats.size(), stream, int96_timestamps); + stream.synchronize(); } auto to_nvcomp_compression_type(Compression codec) @@ -1169,55 +1206,98 @@ build_chunk_dictionaries(hostdevice_2dvector& chunks, return std::pair(std::move(dict_data), std::move(dict_index)); } -void writer::impl::init_encoder_pages(hostdevice_2dvector& chunks, - device_span col_desc, - device_span pages, - hostdevice_vector& comp_page_sizes, - statistics_chunk* page_stats, - statistics_chunk* frag_stats, - uint32_t num_columns, - uint32_t num_pages, - uint32_t num_stats_bfr) +/** + * @brief Initialize encoder pages. 
+ * + * @param chunks Column chunk array + * @param col_desc Column description array + * @param pages Encoder pages array + * @param comp_page_sizes Per-page max compressed size + * @param page_stats Page statistics array + * @param frag_stats Fragment statistics array + * @param num_columns Total number of columns + * @param num_pages Total number of pages + * @param num_stats_bfr Number of statistics buffers + * @param compression Compression format + * @param max_page_size_bytes Maximum uncompressed page size, in bytes + * @param max_page_size_rows Maximum page size, in rows + * @param stream CUDA stream used for device memory operations and kernel launches + */ +void init_encoder_pages(hostdevice_2dvector& chunks, + device_span col_desc, + device_span pages, + hostdevice_vector& comp_page_sizes, + statistics_chunk* page_stats, + statistics_chunk* frag_stats, + uint32_t num_columns, + uint32_t num_pages, + uint32_t num_stats_bfr, + Compression compression, + size_t max_page_size_bytes, + size_type max_page_size_rows, + rmm::cuda_stream_view stream) { - rmm::device_uvector page_stats_mrg(num_stats_bfr, _stream); - chunks.host_to_device(_stream); + rmm::device_uvector page_stats_mrg(num_stats_bfr, stream); + chunks.host_to_device(stream); InitEncoderPages(chunks, pages, {}, comp_page_sizes, col_desc, num_columns, - _max_page_size_bytes, - _max_page_size_rows, - page_alignment(_compression), + max_page_size_bytes, + max_page_size_rows, + page_alignment(compression), (num_stats_bfr) ? page_stats_mrg.data() : nullptr, (num_stats_bfr > num_pages) ? page_stats_mrg.data() + num_pages : nullptr, - _stream); + stream); if (num_stats_bfr > 0) { detail::merge_group_statistics( - page_stats, frag_stats, page_stats_mrg.data(), num_pages, _stream); + page_stats, frag_stats, page_stats_mrg.data(), num_pages, stream); if (num_stats_bfr > num_pages) { detail::merge_group_statistics( page_stats + num_pages, page_stats, page_stats_mrg.data() + num_pages, num_stats_bfr - num_pages, - _stream); + stream); } } - _stream.synchronize(); + stream.synchronize(); } -void writer::impl::encode_pages(hostdevice_2dvector& chunks, - device_span pages, - size_t max_page_uncomp_data_size, - uint32_t pages_in_batch, - uint32_t first_page_in_batch, - uint32_t rowgroups_in_batch, - uint32_t first_rowgroup, - const statistics_chunk* page_stats, - const statistics_chunk* chunk_stats, - const statistics_chunk* column_stats) +/** + * @brief Encode a batch of pages. 
+ * + * @throws rmm::bad_alloc if there is insufficient space for temporary buffers + * + * @param chunks column chunk array + * @param pages encoder pages array + * @param max_page_uncomp_data_size maximum uncompressed size of any page's data + * @param pages_in_batch number of pages in this batch + * @param first_page_in_batch first page in batch + * @param rowgroups_in_batch number of rowgroups in this batch + * @param first_rowgroup first rowgroup in batch + * @param page_stats optional page-level statistics (nullptr if none) + * @param chunk_stats optional chunk-level statistics (nullptr if none) + * @param column_stats optional page-level statistics for column index (nullptr if none) + * @param compression compression format + * @param column_index_truncate_length maximum length of min or max values in column index, in bytes + * @param stream CUDA stream used for device memory operations and kernel launches + */ +void encode_pages(hostdevice_2dvector& chunks, + device_span pages, + size_t max_page_uncomp_data_size, + uint32_t pages_in_batch, + uint32_t first_page_in_batch, + uint32_t rowgroups_in_batch, + uint32_t first_rowgroup, + const statistics_chunk* page_stats, + const statistics_chunk* chunk_stats, + const statistics_chunk* column_stats, + Compression compression, + int32_t column_index_truncate_length, + rmm::cuda_stream_view stream) { auto batch_pages = pages.subspan(first_page_in_batch, pages_in_batch); @@ -1227,24 +1307,24 @@ void writer::impl::encode_pages(hostdevice_2dvector& chunks : device_span(); uint32_t max_comp_pages = - (_compression != parquet::Compression::UNCOMPRESSED) ? pages_in_batch : 0; + (compression != parquet::Compression::UNCOMPRESSED) ? pages_in_batch : 0; - rmm::device_uvector> comp_in(max_comp_pages, _stream); - rmm::device_uvector> comp_out(max_comp_pages, _stream); - rmm::device_uvector comp_res(max_comp_pages, _stream); - thrust::fill(rmm::exec_policy(_stream), + rmm::device_uvector> comp_in(max_comp_pages, stream); + rmm::device_uvector> comp_out(max_comp_pages, stream); + rmm::device_uvector comp_res(max_comp_pages, stream); + thrust::fill(rmm::exec_policy(stream), comp_res.begin(), comp_res.end(), compression_result{0, compression_status::FAILURE}); - gpu::EncodePages(batch_pages, comp_in, comp_out, comp_res, _stream); - switch (_compression) { + gpu::EncodePages(batch_pages, comp_in, comp_out, comp_res, stream); + switch (compression) { case parquet::Compression::SNAPPY: if (nvcomp::is_compression_disabled(nvcomp::compression_type::SNAPPY)) { - gpu_snap(comp_in, comp_out, comp_res, _stream); + gpu_snap(comp_in, comp_out, comp_res, stream); } else { nvcomp::batched_compress( - nvcomp::compression_type::SNAPPY, comp_in, comp_out, comp_res, _stream); + nvcomp::compression_type::SNAPPY, comp_in, comp_out, comp_res, stream); } break; case parquet::Compression::ZSTD: { @@ -1252,8 +1332,7 @@ void writer::impl::encode_pages(hostdevice_2dvector& chunks reason) { CUDF_FAIL("Compression error: " + reason.value()); } - nvcomp::batched_compress( - nvcomp::compression_type::ZSTD, comp_in, comp_out, comp_res, _stream); + nvcomp::batched_compress(nvcomp::compression_type::ZSTD, comp_in, comp_out, comp_res, stream); break; } @@ -1263,16 +1342,17 @@ void writer::impl::encode_pages(hostdevice_2dvector& chunks // TBD: Not clear if the official spec actually allows dynamically turning off compression at the // chunk-level + auto d_chunks_in_batch = chunks.device_view().subspan(first_rowgroup, rowgroups_in_batch); - DecideCompression(d_chunks_in_batch.flat_view(), 
_stream); - EncodePageHeaders(batch_pages, comp_res, batch_pages_stats, chunk_stats, _stream); - GatherPages(d_chunks_in_batch.flat_view(), pages, _stream); + DecideCompression(d_chunks_in_batch.flat_view(), stream); + EncodePageHeaders(batch_pages, comp_res, batch_pages_stats, chunk_stats, stream); + GatherPages(d_chunks_in_batch.flat_view(), pages, stream); if (column_stats != nullptr) { EncodeColumnIndexes(d_chunks_in_batch.flat_view(), {column_stats, pages.size()}, - _column_index_truncate_length, - _stream); + column_index_truncate_length, + stream); } auto h_chunks_in_batch = chunks.host_view().subspan(first_rowgroup, rowgroups_in_batch); @@ -1280,11 +1360,19 @@ void writer::impl::encode_pages(hostdevice_2dvector& chunks d_chunks_in_batch.data(), d_chunks_in_batch.flat_view().size_bytes(), cudaMemcpyDefault, - _stream.value())); - _stream.synchronize(); + stream.value())); + stream.synchronize(); } -size_t writer::impl::column_index_buffer_size(gpu::EncColumnChunk* ck) const +/** + * @brief Function to calculate the memory needed to encode the column index of the given + * column chunk. + * + * @param ck pointer to column chunk + * @param column_index_truncate_length maximum length of min or max values in column index, in bytes + * @return Computed buffer size needed to encode the column index + */ +size_t column_index_buffer_size(gpu::EncColumnChunk* ck, int32_t column_index_truncate_length) { // encoding the column index for a given chunk requires: // each list (4 of them) requires 6 bytes of overhead @@ -1310,82 +1398,18 @@ size_t writer::impl::column_index_buffer_size(gpu::EncColumnChunk* ck) const // // calculating this per-chunk because the sizes can be wildly different. constexpr size_t padding = 7; - return ck->ck_stat_size * ck->num_pages + _column_index_truncate_length + padding; -} - -writer::impl::impl(std::vector> sinks, - parquet_writer_options const& options, - single_write_mode mode, - rmm::cuda_stream_view stream) - : _stream(stream), - _compression(to_parquet_compression(options.get_compression())), - _max_row_group_size{options.get_row_group_size_bytes()}, - _max_row_group_rows{options.get_row_group_size_rows()}, - _max_page_size_bytes(max_page_bytes(_compression, options.get_max_page_size_bytes())), - _max_page_size_rows(options.get_max_page_size_rows()), - _stats_granularity(options.get_stats_level()), - _dict_policy(options.get_dictionary_policy()), - _max_dictionary_size(options.get_max_dictionary_size()), - _max_page_fragment_size(options.get_max_page_fragment_size()), - _int96_timestamps(options.is_enabled_int96_timestamps()), - _column_index_truncate_length(options.get_column_index_truncate_length()), - _kv_meta(options.get_key_value_metadata()), - _single_write_mode(mode), - _out_sink(std::move(sinks)) -{ - if (options.get_metadata()) { - _table_meta = std::make_unique(*options.get_metadata()); - } - init_state(); + return ck->ck_stat_size * ck->num_pages + column_index_truncate_length + padding; } -writer::impl::impl(std::vector> sinks, - chunked_parquet_writer_options const& options, - single_write_mode mode, - rmm::cuda_stream_view stream) - : _stream(stream), - _compression(to_parquet_compression(options.get_compression())), - _max_row_group_size{options.get_row_group_size_bytes()}, - _max_row_group_rows{options.get_row_group_size_rows()}, - _max_page_size_bytes(max_page_bytes(_compression, options.get_max_page_size_bytes())), - _max_page_size_rows(options.get_max_page_size_rows()), - _stats_granularity(options.get_stats_level()), - 
_dict_policy(options.get_dictionary_policy()), - _max_dictionary_size(options.get_max_dictionary_size()), - _max_page_fragment_size(options.get_max_page_fragment_size()), - _int96_timestamps(options.is_enabled_int96_timestamps()), - _column_index_truncate_length(options.get_column_index_truncate_length()), - _kv_meta(options.get_key_value_metadata()), - _single_write_mode(mode), - _out_sink(std::move(sinks)) -{ - if (options.get_metadata()) { - _table_meta = std::make_unique(*options.get_metadata()); - } - init_state(); -} - -writer::impl::~impl() { close(); } - -void writer::impl::init_state() -{ - _current_chunk_offset.resize(_out_sink.size()); - // Write file header - file_header_s fhdr; - fhdr.magic = parquet_magic; - for (auto& sink : _out_sink) { - sink->host_write(&fhdr, sizeof(fhdr)); - } - std::fill_n(_current_chunk_offset.begin(), _current_chunk_offset.size(), sizeof(file_header_s)); -} - -void writer::impl::write(table_view const& table, std::vector const& partitions) +/** + * @brief Fill the table metadata with default column names. + * + * @param table_meta The table metadata to fill + * @param input The input CUDF table + */ +void fill_table_meta(std::unique_ptr const& table_meta, + table_view const& input) { - _last_write_successful = false; - CUDF_EXPECTS(not _closed, "Data has already been flushed to out and closed"); - - if (not _table_meta) { _table_meta = std::make_unique(table); } - // Fill unnamed columns' names in table_meta std::function add_default_name = [&](column_in_metadata& col_meta, std::string default_name) { @@ -1394,20 +1418,65 @@ void writer::impl::write(table_view const& table, std::vector co add_default_name(col_meta.child(i), col_meta.get_name() + "_" + std::to_string(i)); } }; - for (size_t i = 0; i < _table_meta->column_metadata.size(); ++i) { - add_default_name(_table_meta->column_metadata[i], "_col" + std::to_string(i)); + for (size_t i = 0; i < table_meta->column_metadata.size(); ++i) { + add_default_name(table_meta->column_metadata[i], "_col" + std::to_string(i)); } +} - auto vec = table_to_linked_columns(table); - auto schema_tree = - construct_schema_tree(vec, *_table_meta, _single_write_mode, _int96_timestamps); +/** + * @brief Perform the processing steps needed to convert the input table into the output Parquet + * data for writing, such as compression and encoding. 
+ * + * @param[in,out] table_meta The table metadata + * @param input The input table + * @param partitions Optional partitions to divide the table into, if specified then must be same + * size as number of sinks + * @param kv_meta Optional user metadata + * @param curr_agg_meta The current aggregate writer metadata + * @param max_page_fragment_size_opt Optional maximum number of rows in a page fragment + * @param max_row_group_size Maximum row group size, in bytes + * @param max_page_size_bytes Maximum uncompressed page size, in bytes + * @param max_row_group_rows Maximum row group size, in rows + * @param max_page_size_rows Maximum page size, in rows + * @param column_index_truncate_length maximum length of min or max values in column index, in bytes + * @param stats_granularity Level of statistics requested in output file + * @param compression Compression format + * @param dict_policy Policy for dictionary use + * @param max_dictionary_size Maximum dictionary size, in bytes + * @param single_write_mode Flag to indicate that we are guaranteeing a single table write + * @param int96_timestamps Flag to indicate if timestamps will be written as INT96 + * @param out_sink Sink for checking if device write is supported, should not be used to write any + * data in this function + * @param stream CUDA stream used for device memory operations and kernel launches + * @return A tuple of the intermediate results containing the processed data + */ +auto convert_table_to_parquet_data(table_input_metadata& table_meta, + table_view const& input, + host_span partitions, + host_span const> kv_meta, + std::unique_ptr const& curr_agg_meta, + std::optional max_page_fragment_size_opt, + size_t max_row_group_size, + size_t max_page_size_bytes, + size_type max_row_group_rows, + size_type max_page_size_rows, + int32_t column_index_truncate_length, + statistics_freq stats_granularity, + Compression compression, + dictionary_policy dict_policy, + size_t max_dictionary_size, + single_write_mode write_mode, + bool int96_timestamps, + host_span const> out_sink, + rmm::cuda_stream_view stream) +{ + auto vec = table_to_linked_columns(input); + auto schema_tree = construct_schema_tree(vec, table_meta, write_mode, int96_timestamps); // Construct parquet_column_views from the schema tree leaf nodes. 
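As a side note on `fill_table_meta` above: the default-name filling is a small recursive walk over the column metadata tree. A standalone sketch (using an invented `col_meta` struct instead of `column_in_metadata`) shows the names it produces:

#include <iostream>
#include <string>
#include <vector>

// Illustrative stand-in for column_in_metadata: just a name and children.
struct col_meta {
  std::string name;
  std::vector<col_meta> children;
};

// Mirrors the default-name filling above: unnamed top-level columns become
// "_col<i>" and unnamed children inherit "<parent>_<i>".
void add_default_name(col_meta& col, std::string default_name)
{
  if (col.name.empty()) { col.name = std::move(default_name); }
  for (std::size_t i = 0; i < col.children.size(); ++i) {
    add_default_name(col.children[i], col.name + "_" + std::to_string(i));
  }
}

int main()
{
  // One named column and one unnamed struct column with two unnamed children.
  std::vector<col_meta> columns{{"id", {}}, {"", {{"", {}}, {"", {}}}}};
  for (std::size_t i = 0; i < columns.size(); ++i) {
    add_default_name(columns[i], "_col" + std::to_string(i));
  }
  std::cout << columns[1].name << ", "              // _col1
            << columns[1].children[0].name << ", "  // _col1_0
            << columns[1].children[1].name << '\n'; // _col1_1
}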
std::vector parquet_columns; for (schema_tree_node const& schema_node : schema_tree) { - if (schema_node.leaf_column) { - parquet_columns.emplace_back(schema_node, schema_tree, _stream); - } + if (schema_node.leaf_column) { parquet_columns.emplace_back(schema_node, schema_tree, stream); } } // Mass allocation of column_device_views for each parquet_column_view @@ -1421,48 +1490,34 @@ void writer::impl::write(table_view const& table, std::vector co std::vector this_table_schema(schema_tree.begin(), schema_tree.end()); - if (!_agg_meta) { - _agg_meta = std::make_unique( - partitions, num_columns, std::move(this_table_schema), _stats_granularity, _kv_meta); - } else { - // verify the user isn't passing mismatched tables - CUDF_EXPECTS(_agg_meta->schema_matches(this_table_schema), - "Mismatch in schema between multiple calls to write_chunk"); - - _agg_meta->update_files(partitions); - } - // Create table_device_view so that corresponding column_device_view data - // can be written into col_desc members - auto parent_column_table_device_view = table_device_view::create(single_streams_table, _stream); - rmm::device_uvector leaf_column_views(0, _stream); - // Initialize column description - hostdevice_vector col_desc(parquet_columns.size(), _stream); + hostdevice_vector col_desc(parquet_columns.size(), stream); std::transform( parquet_columns.begin(), parquet_columns.end(), col_desc.host_ptr(), [&](auto const& pcol) { - return pcol.get_device_view(_stream); + return pcol.get_device_view(stream); }); // Init page fragments // 5000 is good enough for up to ~200-character strings. Longer strings and deeply nested columns // will start producing fragments larger than the desired page size, so calculate fragment sizes // for each leaf column. Skip if the fragment size is not the default. - auto max_page_fragment_size = _max_page_fragment_size.value_or(default_max_page_fragment_size); + size_type max_page_fragment_size = + max_page_fragment_size_opt.value_or(default_max_page_fragment_size); std::vector column_frag_size(num_columns, max_page_fragment_size); - if (table.num_rows() > 0 && not _max_page_fragment_size.has_value()) { + if (input.num_rows() > 0 && not max_page_fragment_size_opt.has_value()) { std::vector column_sizes; std::transform(single_streams_table.begin(), single_streams_table.end(), std::back_inserter(column_sizes), - [this](auto const& column) { return column_size(column, _stream); }); + [&](auto const& column) { return column_size(column, stream); }); // adjust global fragment size if a single fragment will overrun a rowgroup auto const table_size = std::reduce(column_sizes.begin(), column_sizes.end()); - auto const avg_row_len = util::div_rounding_up_safe(table_size, table.num_rows()); + auto const avg_row_len = util::div_rounding_up_safe(table_size, input.num_rows()); if (avg_row_len > 0) { - auto const rg_frag_size = util::div_rounding_up_safe(_max_row_group_size, avg_row_len); + auto const rg_frag_size = util::div_rounding_up_safe(max_row_group_size, avg_row_len); max_page_fragment_size = std::min(rg_frag_size, max_page_fragment_size); } @@ -1474,9 +1529,9 @@ void writer::impl::write(table_view const& table, std::vector co auto frag_size_fn = [&](auto const& col, size_type col_size) { const int target_frags_per_page = is_col_fixed_width(col) ? 
1 : 4; auto const avg_len = - target_frags_per_page * util::div_rounding_up_safe(col_size, table.num_rows()); + target_frags_per_page * util::div_rounding_up_safe(col_size, input.num_rows()); if (avg_len > 0) { - auto const frag_size = util::div_rounding_up_safe(_max_page_size_bytes, avg_len); + auto const frag_size = util::div_rounding_up_safe(max_page_size_bytes, avg_len); return std::min(max_page_fragment_size, frag_size); } else { return max_page_fragment_size; @@ -1499,7 +1554,7 @@ void writer::impl::write(table_view const& table, std::vector co std::transform(partitions.begin(), partitions.end(), std::back_inserter(num_frag_in_part), - [this, max_page_fragment_size](auto const& part) { + [max_page_fragment_size](auto const& part) { return util::div_rounding_up_unsafe(part.num_rows, max_page_fragment_size); }); @@ -1511,24 +1566,48 @@ void writer::impl::write(table_view const& table, std::vector co part_frag_offset.push_back(part_frag_offset.back() + num_frag_in_part.back()); auto d_part_frag_offset = cudf::detail::make_device_uvector_async( - part_frag_offset, _stream, rmm::mr::get_current_device_resource()); + part_frag_offset, stream, rmm::mr::get_current_device_resource()); cudf::detail::hostdevice_2dvector row_group_fragments( - num_columns, num_fragments, _stream); + num_columns, num_fragments, stream); + + // Create table_device_view so that corresponding column_device_view data + // can be written into col_desc members + // These are unused but needs to be kept alive. + auto parent_column_table_device_view = table_device_view::create(single_streams_table, stream); + rmm::device_uvector leaf_column_views(0, stream); if (num_fragments != 0) { // Move column info to device - col_desc.host_to_device(_stream); + col_desc.host_to_device(stream); leaf_column_views = create_leaf_column_device_views( - col_desc, *parent_column_table_device_view, _stream); + col_desc, *parent_column_table_device_view, stream); + + init_row_group_fragments(row_group_fragments, + col_desc, + partitions, + d_part_frag_offset, + max_page_fragment_size, + stream); + } + + std::unique_ptr agg_meta; + if (!curr_agg_meta) { + agg_meta = std::make_unique( + partitions, kv_meta, this_table_schema, num_columns, stats_granularity); + } else { + agg_meta = std::make_unique(*curr_agg_meta); + + // verify the user isn't passing mismatched tables + CUDF_EXPECTS(agg_meta->schema_matches(this_table_schema), + "Mismatch in schema between multiple calls to write_chunk"); - init_row_group_fragments( - row_group_fragments, col_desc, partitions, d_part_frag_offset, max_page_fragment_size); + agg_meta->update_files(partitions); } - std::vector const global_rowgroup_base = _agg_meta->num_row_groups_per_file(); + auto global_rowgroup_base = agg_meta->num_row_groups_per_file(); // Decide row group boundaries based on uncompressed data size - int num_rowgroups = 0; + size_type num_rowgroups = 0; std::vector num_rg_in_part(partitions.size()); for (size_t p = 0; p < partitions.size(); ++p) { @@ -1545,9 +1624,9 @@ void writer::impl::write(table_view const& table, std::vector co // If the fragment size gets larger than rg limit then break off a rg if (f > first_frag_in_rg && // There has to be at least one fragment in row group - (curr_rg_data_size + fragment_data_size > _max_row_group_size || - curr_rg_num_rows + fragment_num_rows > _max_row_group_rows)) { - auto& rg = _agg_meta->file(p).row_groups.emplace_back(); + (curr_rg_data_size + fragment_data_size > max_row_group_size || + curr_rg_num_rows + fragment_num_rows > 
max_row_group_rows)) { + auto& rg = agg_meta->file(p).row_groups.emplace_back(); rg.num_rows = curr_rg_num_rows; num_rowgroups++; num_rg_in_part[p]++; @@ -1560,7 +1639,7 @@ void writer::impl::write(table_view const& table, std::vector co // TODO: (wishful) refactor to consolidate with above if block if (f == last_frag_in_part) { - auto& rg = _agg_meta->file(p).row_groups.emplace_back(); + auto& rg = agg_meta->file(p).row_groups.emplace_back(); rg.num_rows = curr_rg_num_rows; num_rowgroups++; num_rg_in_part[p]++; @@ -1574,7 +1653,7 @@ void writer::impl::write(table_view const& table, std::vector co // Initialize row groups and column chunks auto const num_chunks = num_rowgroups * num_columns; - hostdevice_2dvector chunks(num_rowgroups, num_columns, _stream); + hostdevice_2dvector chunks(num_rowgroups, num_columns, stream); // total fragments per column (in case they are non-uniform) std::vector frags_per_column(num_columns, 0); @@ -1584,7 +1663,7 @@ void writer::impl::write(table_view const& table, std::vector co size_type start_row = partitions[p].start_row; for (int r = 0; r < num_rg_in_part[p]; r++) { size_t global_r = global_rowgroup_base[p] + r; // Number of rowgroups already in file/part - auto& row_group = _agg_meta->file(p).row_groups[global_r]; + auto& row_group = agg_meta->file(p).row_groups[global_r]; uint32_t fragments_in_chunk = util::div_rounding_up_unsafe(row_group.num_rows, max_page_fragment_size); row_group.total_byte_size = 0; @@ -1629,20 +1708,15 @@ void writer::impl::write(table_view const& table, std::vector co } } - row_group_fragments.host_to_device(_stream); - auto dict_info_owner = build_chunk_dictionaries(chunks, - col_desc, - row_group_fragments, - _compression, - _dict_policy, - _max_dictionary_size, - _stream); + row_group_fragments.host_to_device(stream); + auto dict_info_owner = build_chunk_dictionaries( + chunks, col_desc, row_group_fragments, compression, dict_policy, max_dictionary_size, stream); for (size_t p = 0; p < partitions.size(); p++) { for (int rg = 0; rg < num_rg_in_part[p]; rg++) { size_t global_rg = global_rowgroup_base[p] + rg; for (int col = 0; col < num_columns; col++) { if (chunks.host_view()[rg][col].use_dictionary) { - _agg_meta->file(p).row_groups[global_rg].columns[col].meta_data.encodings.push_back( + agg_meta->file(p).row_groups[global_rg].columns[col].meta_data.encodings.push_back( Encoding::PLAIN_DICTIONARY); } } @@ -1666,13 +1740,13 @@ void writer::impl::write(table_view const& table, std::vector co } }(); - rmm::device_uvector frag_stats(0, _stream); - hostdevice_vector page_fragments(total_frags, _stream); + rmm::device_uvector frag_stats(0, stream); + hostdevice_vector page_fragments(total_frags, stream); // update fragments and/or prepare for fragment statistics calculation if necessary if (total_frags != 0) { - if (_stats_granularity != statistics_freq::STATISTICS_NONE) { - frag_stats.resize(total_frags, _stream); + if (stats_granularity != statistics_freq::STATISTICS_NONE) { + frag_stats.resize(total_frags, stream); } for (int c = 0; c < num_columns; c++) { @@ -1682,7 +1756,7 @@ void writer::impl::write(table_view const& table, std::vector co for (size_t p = 0; p < partitions.size(); ++p) { for (int r = 0; r < num_rg_in_part[p]; r++) { auto const global_r = global_rowgroup_base[p] + r; - auto const& row_group = _agg_meta->file(p).row_groups[global_r]; + auto const& row_group = agg_meta->file(p).row_groups[global_r]; uint32_t const fragments_in_chunk = util::div_rounding_up_unsafe(row_group.num_rows, frag_size); 
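The row-group boundary decision earlier in this function groups consecutive fragments into a row group until adding another fragment would exceed the byte or row limit, always keeping at least one fragment per group. A standalone sketch of that policy, with a hypothetical `decide_row_group_sizes` helper and made-up sizes:

#include <cstddef>
#include <iostream>
#include <vector>

// Given the estimated byte size and row count of each page fragment, split the
// fragments into row groups so that no group exceeds the byte or row limits.
std::vector<int> decide_row_group_sizes(std::vector<std::size_t> const& frag_bytes,
                                        std::vector<int> const& frag_rows,
                                        std::size_t max_rg_bytes,
                                        int max_rg_rows)
{
  std::vector<int> rg_num_rows;
  std::size_t cur_bytes = 0;
  int cur_rows          = 0;
  for (std::size_t f = 0; f < frag_bytes.size(); ++f) {
    // Break off a row group once adding this fragment would overrun a limit,
    // but always keep at least one fragment per row group.
    if (cur_rows > 0 &&
        (cur_bytes + frag_bytes[f] > max_rg_bytes || cur_rows + frag_rows[f] > max_rg_rows)) {
      rg_num_rows.push_back(cur_rows);
      cur_bytes = 0;
      cur_rows  = 0;
    }
    cur_bytes += frag_bytes[f];
    cur_rows += frag_rows[f];
  }
  if (cur_rows > 0) { rg_num_rows.push_back(cur_rows); }
  return rg_num_rows;
}

int main()
{
  // Four fragments of 5000 rows each, ~40 MB apiece, with a 128 MB / 1M row cap:
  auto groups = decide_row_group_sizes({40u << 20, 40u << 20, 40u << 20, 40u << 20},
                                       {5000, 5000, 5000, 5000},
                                       std::size_t{128} << 20,
                                       1'000'000);
  for (int rows : groups) { std::cout << rows << " rows\n"; }  // 15000 rows, 5000 rows
}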
gpu::EncColumnChunk& ck = chunks[r + first_rg_in_part[p]][c]; @@ -1701,27 +1775,24 @@ void writer::impl::write(table_view const& table, std::vector co } } - chunks.host_to_device(_stream); + chunks.host_to_device(stream); // re-initialize page fragments - page_fragments.host_to_device(_stream); - calculate_page_fragments(page_fragments, column_frag_size); + page_fragments.host_to_device(stream); + calculate_page_fragments(page_fragments, column_frag_size, stream); // and gather fragment statistics if (not frag_stats.is_empty()) { gather_fragment_statistics(frag_stats, - {page_fragments.device_ptr(), static_cast(total_frags)}); + {page_fragments.device_ptr(), static_cast(total_frags)}, + int96_timestamps, + stream); } } // Build chunk dictionaries and count pages. Sends chunks to device. - hostdevice_vector comp_page_sizes = init_page_sizes(chunks, - col_desc, - num_columns, - _max_page_size_bytes, - _max_page_size_rows, - _compression, - _stream); + hostdevice_vector comp_page_sizes = init_page_sizes( + chunks, col_desc, num_columns, max_page_size_bytes, max_page_size_rows, compression, stream); // Get the maximum page size across all chunks size_type max_page_uncomp_data_size = @@ -1762,8 +1833,8 @@ void writer::impl::write(table_view const& table, std::vector co comp_rowgroup_size += ck->compressed_size; max_chunk_bfr_size = std::max(max_chunk_bfr_size, (size_t)std::max(ck->bfr_size, ck->compressed_size)); - if (_stats_granularity == statistics_freq::STATISTICS_COLUMN) { - column_index_bfr_size += column_index_buffer_size(ck); + if (stats_granularity == statistics_freq::STATISTICS_COLUMN) { + column_index_bfr_size += column_index_buffer_size(ck, column_index_truncate_length); } } } @@ -1787,18 +1858,18 @@ void writer::impl::write(table_view const& table, std::vector co } // Clear compressed buffer size if compression has been turned off - if (_compression == parquet::Compression::UNCOMPRESSED) { max_comp_bfr_size = 0; } + if (compression == parquet::Compression::UNCOMPRESSED) { max_comp_bfr_size = 0; } // Initialize data pointers in batch - uint32_t num_stats_bfr = - (_stats_granularity != statistics_freq::STATISTICS_NONE) ? num_pages + num_chunks : 0; - rmm::device_buffer uncomp_bfr(max_uncomp_bfr_size, _stream); - rmm::device_buffer comp_bfr(max_comp_bfr_size, _stream); - rmm::device_buffer col_idx_bfr(column_index_bfr_size, _stream); - rmm::device_uvector pages(num_pages, _stream); + uint32_t const num_stats_bfr = + (stats_granularity != statistics_freq::STATISTICS_NONE) ? num_pages + num_chunks : 0; + rmm::device_buffer uncomp_bfr(max_uncomp_bfr_size, stream); + rmm::device_buffer comp_bfr(max_comp_bfr_size, stream); + rmm::device_buffer col_idx_bfr(column_index_bfr_size, stream); + rmm::device_uvector pages(num_pages, stream); // This contains stats for both the pages and the rowgroups. TODO: make them separate. 
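Similarly, the per-column fragment sizing computed earlier (`frag_size_fn`) shrinks the fragment size for wide columns so that roughly one fragment (fixed-width columns) or four fragments (variable-width columns) fit in a page. A rough standalone sketch of that arithmetic, using a hypothetical `pick_fragment_size` helper and made-up numbers:

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <iostream>

// Pick a per-column page-fragment size (in rows) from the column's data size,
// aiming for ~1 fragment per page for fixed-width columns and ~4 for
// variable-width ones.
std::int32_t pick_fragment_size(std::size_t col_bytes,
                                std::int32_t num_rows,
                                bool fixed_width,
                                std::size_t max_page_size_bytes,
                                std::int32_t default_fragment_size)
{
  if (num_rows <= 0) { return default_fragment_size; }
  int const target_frags_per_page = fixed_width ? 1 : 4;
  std::size_t const avg_len =
    target_frags_per_page * ((col_bytes + num_rows - 1) / num_rows);  // ceil division
  if (avg_len == 0) { return default_fragment_size; }
  std::size_t const frag_size = (max_page_size_bytes + avg_len - 1) / avg_len;
  return static_cast<std::int32_t>(std::min<std::size_t>(default_fragment_size, frag_size));
}

int main()
{
  // A string column averaging ~200 bytes per row against a 512 KiB page limit:
  std::cout << pick_fragment_size(200 * 100'000, 100'000, false, 512 * 1024, 5000) << '\n';
  // prints 656: fragments shrink so that about four of them fill one page
}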
- rmm::device_uvector page_stats(num_stats_bfr, _stream); + rmm::device_uvector page_stats(num_stats_bfr, stream); auto bfr_i = static_cast(col_idx_bfr.data()); for (auto b = 0, r = 0; b < static_cast(batch_list.size()); b++) { auto bfr = static_cast(uncomp_bfr.data()); @@ -1811,8 +1882,8 @@ void writer::impl::write(table_view const& table, std::vector co ck.column_index_blob = bfr_i; bfr += ck.bfr_size; bfr_c += ck.compressed_size; - if (_stats_granularity == statistics_freq::STATISTICS_COLUMN) { - ck.column_index_size = column_index_buffer_size(&ck); + if (stats_granularity == statistics_freq::STATISTICS_COLUMN) { + ck.column_index_size = column_index_buffer_size(&ck, column_index_truncate_length); bfr_i += ck.column_index_size; } } @@ -1828,10 +1899,16 @@ void writer::impl::write(table_view const& table, std::vector co (num_stats_bfr) ? frag_stats.data() : nullptr, num_columns, num_pages, - num_stats_bfr); + num_stats_bfr, + compression, + max_page_size_bytes, + max_page_size_rows, + stream); } - cudf::detail::pinned_host_vector host_bfr(max_chunk_bfr_size); + // Check device write support for all chunks and initialize bounce_buffer. + bool all_device_write = true; + uint32_t max_write_size = 0; // Encode row groups in batches for (auto b = 0, r = 0; b < static_cast(batch_list.size()); b++) { @@ -1841,6 +1918,7 @@ void writer::impl::write(table_view const& table, std::vector co auto const first_page_in_next_batch = (rnext < num_rowgroups) ? chunks[rnext][0].first_page : num_pages; auto const pages_in_batch = first_page_in_next_batch - first_page_in_batch; + encode_pages( chunks, {pages.data(), pages.size()}, @@ -1849,62 +1927,247 @@ void writer::impl::write(table_view const& table, std::vector co first_page_in_batch, batch_list[b], r, - (_stats_granularity == statistics_freq::STATISTICS_PAGE) ? page_stats.data() : nullptr, - (_stats_granularity != statistics_freq::STATISTICS_NONE) ? page_stats.data() + num_pages - : nullptr, - (_stats_granularity == statistics_freq::STATISTICS_COLUMN) ? page_stats.data() : nullptr); + (stats_granularity == statistics_freq::STATISTICS_PAGE) ? page_stats.data() : nullptr, + (stats_granularity != statistics_freq::STATISTICS_NONE) ? page_stats.data() + num_pages + : nullptr, + (stats_granularity == statistics_freq::STATISTICS_COLUMN) ? page_stats.data() : nullptr, + compression, + column_index_truncate_length, + stream); + + bool need_sync{false}; - std::vector> write_tasks; for (; r < rnext; r++) { int p = rg_to_part[r]; int global_r = global_rowgroup_base[p] + r - first_rg_in_part[p]; - auto& row_group = _agg_meta->file(p).row_groups[global_r]; + auto& row_group = agg_meta->file(p).row_groups[global_r]; + for (auto i = 0; i < num_columns; i++) { - gpu::EncColumnChunk& ck = chunks[r][i]; + auto const& ck = chunks[r][i]; + auto const dev_bfr = ck.is_compressed ? 
ck.compressed_bfr : ck.uncompressed_bfr; auto& column_chunk_meta = row_group.columns[i].meta_data; - uint8_t* dev_bfr; - if (ck.is_compressed) { - column_chunk_meta.codec = _compression; - dev_bfr = ck.compressed_bfr; - } else { - dev_bfr = ck.uncompressed_bfr; + + if (ck.is_compressed) { column_chunk_meta.codec = compression; } + if (!out_sink[p]->is_device_write_preferred(ck.compressed_size)) { + all_device_write = false; + } + max_write_size = std::max(max_write_size, ck.compressed_size); + + if (ck.ck_stat_size != 0) { + column_chunk_meta.statistics_blob.resize(ck.ck_stat_size); + CUDF_CUDA_TRY(cudaMemcpyAsync(column_chunk_meta.statistics_blob.data(), + dev_bfr, + ck.ck_stat_size, + cudaMemcpyDefault, + stream.value())); + need_sync = true; } + row_group.total_byte_size += ck.compressed_size; + column_chunk_meta.total_uncompressed_size = ck.bfr_size; + column_chunk_meta.total_compressed_size = ck.compressed_size; + } + } + + // Sync before calling the next `encode_pages` which may alter the stats data. + if (need_sync) { stream.synchronize(); } + } + + auto bounce_buffer = + cudf::detail::pinned_host_vector(all_device_write ? 0 : max_write_size); + + return std::tuple{std::move(agg_meta), + std::move(pages), + std::move(chunks), + std::move(global_rowgroup_base), + std::move(first_rg_in_part), + std::move(batch_list), + std::move(rg_to_part), + std::move(uncomp_bfr), + std::move(comp_bfr), + std::move(col_idx_bfr), + std::move(bounce_buffer)}; +} + +} // namespace + +writer::impl::impl(std::vector> sinks, + parquet_writer_options const& options, + single_write_mode mode, + rmm::cuda_stream_view stream) + : _stream(stream), + _compression(to_parquet_compression(options.get_compression())), + _max_row_group_size{options.get_row_group_size_bytes()}, + _max_row_group_rows{options.get_row_group_size_rows()}, + _max_page_size_bytes(max_page_bytes(_compression, options.get_max_page_size_bytes())), + _max_page_size_rows(options.get_max_page_size_rows()), + _stats_granularity(options.get_stats_level()), + _dict_policy(options.get_dictionary_policy()), + _max_dictionary_size(options.get_max_dictionary_size()), + _max_page_fragment_size(options.get_max_page_fragment_size()), + _int96_timestamps(options.is_enabled_int96_timestamps()), + _column_index_truncate_length(options.get_column_index_truncate_length()), + _kv_meta(options.get_key_value_metadata()), + _single_write_mode(mode), + _out_sink(std::move(sinks)) +{ + if (options.get_metadata()) { + _table_meta = std::make_unique(*options.get_metadata()); + } + init_state(); +} + +writer::impl::impl(std::vector> sinks, + chunked_parquet_writer_options const& options, + single_write_mode mode, + rmm::cuda_stream_view stream) + : _stream(stream), + _compression(to_parquet_compression(options.get_compression())), + _max_row_group_size{options.get_row_group_size_bytes()}, + _max_row_group_rows{options.get_row_group_size_rows()}, + _max_page_size_bytes(max_page_bytes(_compression, options.get_max_page_size_bytes())), + _max_page_size_rows(options.get_max_page_size_rows()), + _stats_granularity(options.get_stats_level()), + _dict_policy(options.get_dictionary_policy()), + _max_dictionary_size(options.get_max_dictionary_size()), + _max_page_fragment_size(options.get_max_page_fragment_size()), + _int96_timestamps(options.is_enabled_int96_timestamps()), + _column_index_truncate_length(options.get_column_index_truncate_length()), + _kv_meta(options.get_key_value_metadata()), + _single_write_mode(mode), + _out_sink(std::move(sinks)) +{ + if 
(options.get_metadata()) { + _table_meta = std::make_unique(*options.get_metadata()); + } + init_state(); +} + +writer::impl::~impl() { close(); } + +void writer::impl::init_state() +{ + _current_chunk_offset.resize(_out_sink.size()); + // Write file header + file_header_s fhdr; + fhdr.magic = parquet_magic; + for (auto& sink : _out_sink) { + sink->host_write(&fhdr, sizeof(fhdr)); + } + std::fill_n(_current_chunk_offset.begin(), _current_chunk_offset.size(), sizeof(file_header_s)); +} + +void writer::impl::write(table_view const& input, std::vector const& partitions) +{ + _last_write_successful = false; + CUDF_EXPECTS(not _closed, "Data has already been flushed to out and closed"); + + if (not _table_meta) { _table_meta = std::make_unique(input); } + fill_table_meta(_table_meta, input); + + // All kinds of memory allocation and data compressions/encoding are performed here. + // If any error occurs, such as out-of-memory exception, the internal state of the current + // writer is still intact. + [[maybe_unused]] auto [updated_agg_meta, + pages, + chunks, + global_rowgroup_base, + first_rg_in_part, + batch_list, + rg_to_part, + uncomp_bfr, // unused, but contains data for later write to sink + comp_bfr, // unused, but contains data for later write to sink + col_idx_bfr, // unused, but contains data for later write to sink + bounce_buffer] = [&] { + try { + return convert_table_to_parquet_data(*_table_meta, + input, + partitions, + _kv_meta, + _agg_meta, + _max_page_fragment_size, + _max_row_group_size, + _max_page_size_bytes, + _max_row_group_rows, + _max_page_size_rows, + _column_index_truncate_length, + _stats_granularity, + _compression, + _dict_policy, + _max_dictionary_size, + _single_write_mode, + _int96_timestamps, + _out_sink, + _stream); + } catch (...) { // catch any exception type + CUDF_LOG_ERROR( + "Parquet writer encountered exception during processing. " + "No data has been written to the sink."); + throw; // this throws the same exception + } + }(); + + // Compression/encoding were all successful. Now write the intermediate results. + write_parquet_data_to_sink(updated_agg_meta, + pages, + chunks, + global_rowgroup_base, + first_rg_in_part, + batch_list, + rg_to_part, + bounce_buffer); + + _last_write_successful = true; +} + +void writer::impl::write_parquet_data_to_sink( + std::unique_ptr& updated_agg_meta, + device_span pages, + host_2dspan chunks, + host_span global_rowgroup_base, + host_span first_rg_in_part, + host_span batch_list, + host_span rg_to_part, + host_span bounce_buffer) +{ + _agg_meta = std::move(updated_agg_meta); + auto const num_columns = chunks.size().second; + + for (auto b = 0, r = 0; b < static_cast(batch_list.size()); b++) { + auto const rnext = r + batch_list[b]; + std::vector> write_tasks; + + for (; r < rnext; r++) { + int const p = rg_to_part[r]; + int const global_r = global_rowgroup_base[p] + r - first_rg_in_part[p]; + auto& row_group = _agg_meta->file(p).row_groups[global_r]; + + for (std::size_t i = 0; i < num_columns; i++) { + auto const& ck = chunks[r][i]; + auto const dev_bfr = ck.is_compressed ? ck.compressed_bfr : ck.uncompressed_bfr; + + // Skip the range [0, ck.ck_stat_size) since it has already been copied to host + // and stored in _agg_meta before. if (_out_sink[p]->is_device_write_preferred(ck.compressed_size)) { - // let the writer do what it wants to retrieve the data from the gpu. 
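The per-chunk write path here chooses between writing straight from device memory and staging through a pinned host bounce buffer, depending on what the sink prefers. A trimmed-down CUDA sketch of that decision; the `file_sink` type is a stand-in (not `cudf::io::data_sink`) and never prefers device writes:

#include <cuda_runtime.h>

#include <cstddef>
#include <cstdint>
#include <cstdio>

// Stand-in sink: everything goes through the pinned bounce buffer.
struct file_sink {
  std::FILE* f;
  bool is_device_write_preferred(std::size_t) const { return false; }  // e.g. no GDS
  void host_write(void const* data, std::size_t size) { std::fwrite(data, 1, size, f); }
  // device_write_async() omitted; this mock never takes that path.
};

void write_chunk(file_sink& sink,
                 std::uint8_t const* dev_bfr,
                 std::size_t size,
                 std::uint8_t* bounce_buffer,  // pinned host memory, >= size bytes
                 cudaStream_t stream)
{
  if (sink.is_device_write_preferred(size)) {
    // Real writer: enqueue sink.device_write_async(dev_bfr, size, stream).
  } else {
    cudaMemcpyAsync(bounce_buffer, dev_bfr, size, cudaMemcpyDefault, stream);
    cudaStreamSynchronize(stream);  // data must be on the host before host_write
    sink.host_write(bounce_buffer, size);
  }
}

int main()
{
  constexpr std::size_t n = std::size_t{1} << 20;
  std::uint8_t* dev       = nullptr;
  std::uint8_t* bounce    = nullptr;
  cudaMalloc(&dev, n);
  cudaMemset(dev, 0xAB, n);
  cudaMallocHost(&bounce, n);  // pinned, like the writer's bounce_buffer
  file_sink sink{std::fopen("chunk.bin", "wb")};
  write_chunk(sink, dev, n, bounce, nullptr);
  std::fclose(sink.f);
  cudaFreeHost(bounce);
  cudaFree(dev);
}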
write_tasks.push_back(_out_sink[p]->device_write_async( dev_bfr + ck.ck_stat_size, ck.compressed_size, _stream)); - // we still need to do a (much smaller) memcpy for the statistics. - if (ck.ck_stat_size != 0) { - column_chunk_meta.statistics_blob.resize(ck.ck_stat_size); - CUDF_CUDA_TRY(cudaMemcpyAsync(column_chunk_meta.statistics_blob.data(), - dev_bfr, - ck.ck_stat_size, - cudaMemcpyDefault, - _stream.value())); - _stream.synchronize(); - } } else { - // copy the full data - CUDF_CUDA_TRY(cudaMemcpyAsync(host_bfr.data(), - dev_bfr, - ck.ck_stat_size + ck.compressed_size, + CUDF_EXPECTS(bounce_buffer.size() >= ck.compressed_size, + "Bounce buffer was not properly initialized."); + CUDF_CUDA_TRY(cudaMemcpyAsync(bounce_buffer.data(), + dev_bfr + ck.ck_stat_size, + ck.compressed_size, cudaMemcpyDefault, _stream.value())); _stream.synchronize(); - _out_sink[p]->host_write(host_bfr.data() + ck.ck_stat_size, ck.compressed_size); - if (ck.ck_stat_size != 0) { - column_chunk_meta.statistics_blob.resize(ck.ck_stat_size); - memcpy(column_chunk_meta.statistics_blob.data(), host_bfr.data(), ck.ck_stat_size); - } + _out_sink[p]->host_write(bounce_buffer.data(), ck.compressed_size); } - row_group.total_byte_size += ck.compressed_size; + + auto& column_chunk_meta = row_group.columns[i].meta_data; column_chunk_meta.data_page_offset = _current_chunk_offset[p] + ((ck.use_dictionary) ? ck.dictionary_size : 0); column_chunk_meta.dictionary_page_offset = (ck.use_dictionary) ? _current_chunk_offset[p] : 0; - column_chunk_meta.total_uncompressed_size = ck.bfr_size; - column_chunk_meta.total_compressed_size = ck.compressed_size; _current_chunk_offset[p] += ck.compressed_size; } } @@ -1915,19 +2178,17 @@ void writer::impl::write(table_view const& table, std::vector co if (_stats_granularity == statistics_freq::STATISTICS_COLUMN) { // need pages on host to create offset_indexes - thrust::host_vector h_pages = - cudf::detail::make_host_vector_async(pages, _stream); - _stream.synchronize(); + auto const h_pages = cudf::detail::make_host_vector_sync(pages, _stream); // add column and offset indexes to metadata for (auto b = 0, r = 0; b < static_cast(batch_list.size()); b++) { auto const rnext = r + batch_list[b]; auto curr_page_idx = chunks[r][0].first_page; for (; r < rnext; r++) { - int p = rg_to_part[r]; - int global_r = global_rowgroup_base[p] + r - first_rg_in_part[p]; + int const p = rg_to_part[r]; + int const global_r = global_rowgroup_base[p] + r - first_rg_in_part[p]; auto const& row_group = _agg_meta->file(p).row_groups[global_r]; - for (auto i = 0; i < num_columns; i++) { + for (std::size_t i = 0; i < num_columns; i++) { gpu::EncColumnChunk const& ck = chunks[r][i]; auto const& column_chunk_meta = row_group.columns[i].meta_data; @@ -1958,14 +2219,12 @@ void writer::impl::write(table_view const& table, std::vector co } _stream.synchronize(); - _agg_meta->file(p).offset_indexes.push_back(offset_idx); - _agg_meta->file(p).column_indexes.push_back(column_idx); + _agg_meta->file(p).offset_indexes.emplace_back(std::move(offset_idx)); + _agg_meta->file(p).column_indexes.emplace_back(std::move(column_idx)); } } } } - - _last_write_successful = true; } std::unique_ptr> writer::impl::close( diff --git a/cpp/src/io/parquet/writer_impl.hpp b/cpp/src/io/parquet/writer_impl.hpp index c88287994a1..a9fe5612bfb 100644 --- a/cpp/src/io/parquet/writer_impl.hpp +++ b/cpp/src/io/parquet/writer_impl.hpp @@ -44,7 +44,6 @@ namespace io { namespace detail { namespace parquet { // Forward internal classes -struct 
parquet_column_view; struct aggregate_writer_metadata; using namespace cudf::io::parquet; @@ -118,102 +117,30 @@ class writer::impl { private: /** - * @brief Gather row group fragments + * @brief Write the intermediate Parquet data into the data sink. * - * This calculates fragments to be used in determining row group boundariesa. + * The intermediate data is generated from processing (compressing/encoding) a cuDF input table + * by `convert_table_to_parquet_data` called in the `write()` function. * - * @param frag Destination row group fragments - * @param col_desc column description array - * @param[in] partitions Information about partitioning of table - * @param[in] part_frag_offset A Partition's offset into fragment array - * @param fragment_size Number of rows per fragment + * @param updated_agg_meta The updated aggregate data after processing the input + * @param pages Encoded pages + * @param chunks Column chunks + * @param global_rowgroup_base Numbers of rowgroups in each file/partition + * @param first_rg_in_part The first rowgroup in each partition + * @param batch_list The batches of rowgroups to encode + * @param rg_to_part A map from rowgroup to partition + * @param[out] bounce_buffer Temporary host output buffer */ - void init_row_group_fragments(hostdevice_2dvector& frag, - device_span col_desc, - host_span partitions, - device_span part_frag_offset, - uint32_t fragment_size); - - /** - * @brief Recalculate page fragments - * - * This calculates fragments to be used to determine page boundaries within - * column chunks. - * - * @param frag Destination page fragments - * @param frag_sizes Array of fragment sizes for each column - */ - void calculate_page_fragments(device_span frag, - host_span frag_sizes); - - /** - * @brief Gather per-fragment statistics - * - * @param frag_stats output statistics - * @param frags Input page fragments - */ - void gather_fragment_statistics(device_span frag_stats, - device_span frags); - - /** - * @brief Initialize encoder pages - * - * @param chunks column chunk array - * @param col_desc column description array - * @param pages encoder pages array - * @param page_stats page statistics array - * @param frag_stats fragment statistics array - * @param max_page_comp_data_size max compressed - * @param num_columns Total number of columns - * @param num_pages Total number of pages - * @param num_stats_bfr Number of statistics buffers - */ - void init_encoder_pages(hostdevice_2dvector& chunks, - device_span col_desc, - device_span pages, - hostdevice_vector& comp_page_sizes, - statistics_chunk* page_stats, - statistics_chunk* frag_stats, - uint32_t num_columns, - uint32_t num_pages, - uint32_t num_stats_bfr); - /** - * @brief Encode a batch of pages - * - * @throws rmm::bad_alloc if there is insufficient space for temporary buffers - * - * @param chunks column chunk array - * @param pages encoder pages array - * @param max_page_uncomp_data_size maximum uncompressed size of any page's data - * @param pages_in_batch number of pages in this batch - * @param first_page_in_batch first page in batch - * @param rowgroups_in_batch number of rowgroups in this batch - * @param first_rowgroup first rowgroup in batch - * @param page_stats optional page-level statistics (nullptr if none) - * @param chunk_stats optional chunk-level statistics (nullptr if none) - * @param column_stats optional page-level statistics for column index (nullptr if none) - */ - void encode_pages(hostdevice_2dvector& chunks, - device_span pages, - size_t max_page_uncomp_data_size, - 
uint32_t pages_in_batch, - uint32_t first_page_in_batch, - uint32_t rowgroups_in_batch, - uint32_t first_rowgroup, - const statistics_chunk* page_stats, - const statistics_chunk* chunk_stats, - const statistics_chunk* column_stats); - - /** - * @brief Function to calculate the memory needed to encode the column index of the given - * column chunk - * - * @param chunk pointer to column chunk - */ - size_t column_index_buffer_size(gpu::EncColumnChunk* chunk) const; - - private: - // Cuda stream to be used. + void write_parquet_data_to_sink(std::unique_ptr& updated_agg_meta, + device_span pages, + host_2dspan chunks, + host_span global_rowgroup_base, + host_span first_rg_in_part, + host_span batch_list, + host_span rg_to_part, + host_span bounce_buffer); + + // Cuda stream to be used rmm::cuda_stream_view _stream; // Writer options.
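Finally, a small usage sketch of what this refactor enables at the API level, assuming the existing public chunked-writer interface (`cudf::io::parquet_chunked_writer`), which this change does not modify. Because a failure during encoding now leaves the writer state and the sink untouched, the caller can free memory and retry the same `write()` call:

#include <cudf/column/column_factories.hpp>
#include <cudf/io/parquet.hpp>
#include <cudf/table/table_view.hpp>
#include <cudf/types.hpp>

#include <exception>
#include <iostream>

int main()
{
  // A small default-initialized INT32 column, just to have something to write.
  auto col = cudf::make_numeric_column(cudf::data_type{cudf::type_id::INT32}, 1000);
  cudf::table_view chunk({col->view()});

  auto options = cudf::io::chunked_parquet_writer_options::builder(
                   cudf::io::sink_info{"chunked_example.parquet"})
                   .build();
  cudf::io::parquet_chunked_writer writer(options);

  try {
    writer.write(chunk);
  } catch (std::exception const& e) {
    // With this refactor, a failure during encoding leaves the writer (and the
    // sink) untouched, so the caller can release device memory and retry.
    std::cerr << "write failed, retrying: " << e.what() << "\n";
    writer.write(chunk);
  }
  writer.write(chunk);  // subsequent chunks must keep the same schema
  writer.close();
}

Subsequent chunks must keep the same schema; a mismatch is rejected with the "Mismatch in schema between multiple calls to write_chunk" error checked above.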