From 61e116eb873fca6f611c43aa909c177aeacb6f02 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Tue, 16 Apr 2024 11:08:09 -0700 Subject: [PATCH] Removing all batching code from parquet writer (#15528) Fixes #13440. Removing the manually disabled batching code from parquet writer. Authors: - Muhammad Haseeb (https://github.com/mhaseeb123) - GALI PREM SAGAR (https://github.com/galipremsagar) Approvers: - David Wendt (https://github.com/davidwendt) - Nghia Truong (https://github.com/ttnghia) URL: https://github.com/rapidsai/cudf/pull/15528 --- cpp/src/io/parquet/writer_impl.cu | 143 ++++++++++------------------- cpp/src/io/parquet/writer_impl.hpp | 2 - 2 files changed, 46 insertions(+), 99 deletions(-) diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 5a8d96975ce..fd8d4f8bd7f 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -1396,16 +1396,13 @@ void init_encoder_pages(hostdevice_2dvector& chunks, } /** - * @brief Encode a batch of pages. + * @brief Encode pages. * * @throws rmm::bad_alloc if there is insufficient space for temporary buffers * * @param chunks column chunk array * @param pages encoder pages array - * @param pages_in_batch number of pages in this batch - * @param first_page_in_batch first page in batch - * @param rowgroups_in_batch number of rowgroups in this batch - * @param first_rowgroup first rowgroup in batch + * @param num_rowgroups number of rowgroups * @param page_stats optional page-level statistics (nullptr if none) * @param chunk_stats optional chunk-level statistics (nullptr if none) * @param column_stats optional page-level statistics for column index (nullptr if none) @@ -1417,10 +1414,6 @@ void init_encoder_pages(hostdevice_2dvector& chunks, */ void encode_pages(hostdevice_2dvector& chunks, device_span pages, - uint32_t pages_in_batch, - uint32_t first_page_in_batch, - uint32_t rowgroups_in_batch, - uint32_t first_rowgroup, statistics_chunk const* page_stats, statistics_chunk const* chunk_stats, statistics_chunk const* column_stats, @@ -1430,14 +1423,12 @@ void encode_pages(hostdevice_2dvector& chunks, bool write_v2_headers, rmm::cuda_stream_view stream) { - auto batch_pages = pages.subspan(first_page_in_batch, pages_in_batch); + auto const num_pages = pages.size(); + auto pages_stats = (page_stats != nullptr) + ? device_span(page_stats, num_pages) + : device_span(); - auto batch_pages_stats = - (page_stats != nullptr) - ? device_span(page_stats + first_page_in_batch, pages_in_batch) - : device_span(); - - uint32_t max_comp_pages = (compression != Compression::UNCOMPRESSED) ? pages_in_batch : 0; + uint32_t max_comp_pages = (compression != Compression::UNCOMPRESSED) ? num_pages : 0; rmm::device_uvector> comp_in(max_comp_pages, stream); rmm::device_uvector> comp_out(max_comp_pages, stream); @@ -1447,7 +1438,7 @@ void encode_pages(hostdevice_2dvector& chunks, comp_res.end(), compression_result{0, compression_status::FAILURE}); - EncodePages(batch_pages, write_v2_headers, comp_in, comp_out, comp_res, stream); + EncodePages(pages, write_v2_headers, comp_in, comp_out, comp_res, stream); switch (compression) { case Compression::SNAPPY: if (nvcomp::is_compression_disabled(nvcomp::compression_type::SNAPPY)) { @@ -1480,25 +1471,23 @@ void encode_pages(hostdevice_2dvector& chunks, // TBD: Not clear if the official spec actually allows dynamically turning off compression at the // chunk-level - auto d_chunks_in_batch = chunks.device_view().subspan(first_rowgroup, rowgroups_in_batch); - DecideCompression(d_chunks_in_batch.flat_view(), stream); - EncodePageHeaders(batch_pages, comp_res, batch_pages_stats, chunk_stats, stream); - GatherPages(d_chunks_in_batch.flat_view(), pages, stream); + auto d_chunks = chunks.device_view(); + DecideCompression(d_chunks.flat_view(), stream); + EncodePageHeaders(pages, comp_res, pages_stats, chunk_stats, stream); + GatherPages(d_chunks.flat_view(), pages, stream); // By now, the var_bytes has been calculated in InitPages, and the histograms in EncodePages. // EncodeColumnIndexes can encode the histograms in the ColumnIndex, and also sum up var_bytes // and the histograms for inclusion in the chunk's SizeStats. if (column_stats != nullptr) { - EncodeColumnIndexes(d_chunks_in_batch.flat_view(), - {column_stats, pages.size()}, - column_index_truncate_length, - stream); + EncodeColumnIndexes( + d_chunks.flat_view(), {column_stats, pages.size()}, column_index_truncate_length, stream); } - auto h_chunks_in_batch = chunks.host_view().subspan(first_rowgroup, rowgroups_in_batch); - CUDF_CUDA_TRY(cudaMemcpyAsync(h_chunks_in_batch.data(), - d_chunks_in_batch.data(), - d_chunks_in_batch.flat_view().size_bytes(), + auto h_chunks = chunks.host_view(); + CUDF_CUDA_TRY(cudaMemcpyAsync(h_chunks.data(), + d_chunks.data(), + d_chunks.flat_view().size_bytes(), cudaMemcpyDefault, stream.value())); @@ -1959,33 +1948,23 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, std::fill_n(std::back_inserter(rg_to_part), num_rg_in_part[p], p); } - // Batch processing is no longer supported. - // This line disables batch processing (so batch size will no longer be limited at 1GB as before). - // TODO: All the relevant code will be removed in the follow-up work: - // https://github.com/rapidsai/cudf/issues/13440 - auto const max_bytes_in_batch = std::numeric_limits::max(); - - // Initialize batches of rowgroups to encode (mainly to limit peak memory usage) - std::vector batch_list; - size_type num_pages = 0; - size_t max_uncomp_bfr_size = 0; - size_t max_comp_bfr_size = 0; - size_t max_chunk_bfr_size = 0; - size_type max_pages_in_batch = 0; - size_t bytes_in_batch = 0; - size_t comp_bytes_in_batch = 0; + // Initialize rowgroups to encode + size_type num_pages = 0; + size_t max_uncomp_bfr_size = 0; + size_t max_comp_bfr_size = 0; + size_t max_chunk_bfr_size = 0; + size_t column_index_bfr_size = 0; size_t def_histogram_bfr_size = 0; size_t rep_histogram_bfr_size = 0; - for (size_type r = 0, groups_in_batch = 0, pages_in_batch = 0; r <= num_rowgroups; r++) { - size_t rowgroup_size = 0; - size_t comp_rowgroup_size = 0; + size_t rowgroup_size = 0; + size_t comp_rowgroup_size = 0; + for (size_type r = 0; r <= num_rowgroups; r++) { if (r < num_rowgroups) { for (int i = 0; i < num_columns; i++) { EncColumnChunk* ck = &chunks[r][i]; ck->first_page = num_pages; num_pages += ck->num_pages; - pages_in_batch += ck->num_pages; rowgroup_size += ck->bfr_size; comp_rowgroup_size += ck->compressed_size; max_chunk_bfr_size = @@ -2007,29 +1986,17 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, } } } - // TBD: We may want to also shorten the batch if we have enough pages (not just based on size) - if ((r == num_rowgroups) || - (groups_in_batch != 0 && bytes_in_batch + rowgroup_size > max_bytes_in_batch)) { - max_uncomp_bfr_size = std::max(max_uncomp_bfr_size, bytes_in_batch); - max_comp_bfr_size = std::max(max_comp_bfr_size, comp_bytes_in_batch); - max_pages_in_batch = std::max(max_pages_in_batch, pages_in_batch); - if (groups_in_batch != 0) { - batch_list.push_back(groups_in_batch); - groups_in_batch = 0; - } - bytes_in_batch = 0; - comp_bytes_in_batch = 0; - pages_in_batch = 0; + // write bfr sizes if this is the last rowgroup + if (r == num_rowgroups) { + max_uncomp_bfr_size = rowgroup_size; + max_comp_bfr_size = comp_rowgroup_size; } - bytes_in_batch += rowgroup_size; - comp_bytes_in_batch += comp_rowgroup_size; - groups_in_batch++; } // Clear compressed buffer size if compression has been turned off if (compression == Compression::UNCOMPRESSED) { max_comp_bfr_size = 0; } - // Initialize data pointers in batch + // Initialize data pointers uint32_t const num_stats_bfr = (stats_granularity != statistics_freq::STATISTICS_NONE) ? num_pages + num_chunks : 0; @@ -2055,10 +2022,10 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, auto bfr_i = static_cast(col_idx_bfr.data()); auto bfr_r = rep_level_histogram.data(); auto bfr_d = def_level_histogram.data(); - for (auto b = 0, r = 0; b < static_cast(batch_list.size()); b++) { + if (num_rowgroups != 0) { auto bfr = static_cast(uncomp_bfr.data()); auto bfr_c = static_cast(comp_bfr.data()); - for (auto j = 0; j < batch_list[b]; j++, r++) { + for (auto r = 0; r < num_rowgroups; r++) { for (auto i = 0; i < num_columns; i++) { EncColumnChunk& ck = chunks[r][i]; ck.uncompressed_bfr = bfr; @@ -2108,22 +2075,11 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, std::optional comp_stats; if (collect_compression_statistics) { comp_stats = writer_compression_statistics{}; } - // Encode row groups in batches - for (auto b = 0, batch_r_start = 0; b < static_cast(batch_list.size()); b++) { - // Count pages in this batch - auto const rnext = batch_r_start + batch_list[b]; - auto const first_page_in_batch = chunks[batch_r_start][0].first_page; - auto const first_page_in_next_batch = - (rnext < num_rowgroups) ? chunks[rnext][0].first_page : num_pages; - auto const pages_in_batch = first_page_in_next_batch - first_page_in_batch; - + // Encode row groups + if (num_rowgroups != 0) { encode_pages( chunks, {pages.data(), pages.size()}, - pages_in_batch, - first_page_in_batch, - batch_list[b], - batch_r_start, (stats_granularity == statistics_freq::STATISTICS_PAGE) ? page_stats.data() : nullptr, (stats_granularity != statistics_freq::STATISTICS_NONE) ? page_stats.data() + num_pages : nullptr, @@ -2152,7 +2108,7 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, } } - for (int r = batch_r_start; r < rnext; r++) { + for (int r = 0; r < num_rowgroups; r++) { int p = rg_to_part[r]; int global_r = global_rowgroup_base[p] + r - first_rg_in_part[p]; auto& row_group = agg_meta->file(p).row_groups[global_r]; @@ -2192,7 +2148,7 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, auto h_def_ptr = h_def_histogram.data(); auto h_rep_ptr = h_rep_histogram.data(); - for (int r = batch_r_start; r < rnext; r++) { + for (int r = 0; r < num_rowgroups; r++) { int const p = rg_to_part[r]; int const global_r = global_rowgroup_base[p] + r - first_rg_in_part[p]; auto& row_group = agg_meta->file(p).row_groups[global_r]; @@ -2239,8 +2195,6 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, } } } - - batch_r_start = rnext; } auto bounce_buffer = @@ -2251,7 +2205,6 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, std::move(chunks), std::move(global_rowgroup_base), std::move(first_rg_in_part), - std::move(batch_list), std::move(rg_to_part), std::move(comp_stats), std::move(uncomp_bfr), @@ -2358,7 +2311,6 @@ void writer::impl::write(table_view const& input, std::vector co chunks, global_rowgroup_base, first_rg_in_part, - batch_list, rg_to_part, comp_stats, uncomp_bfr, // unused, but contains data for later write to sink @@ -2402,7 +2354,6 @@ void writer::impl::write(table_view const& input, std::vector co chunks, global_rowgroup_base, first_rg_in_part, - batch_list, rg_to_part, bounce_buffer); @@ -2417,18 +2368,17 @@ void writer::impl::write_parquet_data_to_sink( host_2dspan chunks, host_span global_rowgroup_base, host_span first_rg_in_part, - host_span batch_list, host_span rg_to_part, host_span bounce_buffer) { - _agg_meta = std::move(updated_agg_meta); - auto const num_columns = chunks.size().second; + _agg_meta = std::move(updated_agg_meta); + auto const num_rowgroups = chunks.size().first; + auto const num_columns = chunks.size().second; - for (auto b = 0, r = 0; b < static_cast(batch_list.size()); b++) { - auto const rnext = r + batch_list[b]; + if (num_rowgroups != 0) { std::vector> write_tasks; - for (; r < rnext; r++) { + for (auto r = 0; r < static_cast(num_rowgroups); r++) { int const p = rg_to_part[r]; int const global_r = global_rowgroup_base[p] + r - first_rg_in_part[p]; auto& row_group = _agg_meta->file(p).row_groups[global_r]; @@ -2472,10 +2422,9 @@ void writer::impl::write_parquet_data_to_sink( auto const h_pages = cudf::detail::make_host_vector_sync(pages, _stream); // add column and offset indexes to metadata - for (auto b = 0, r = 0; b < static_cast(batch_list.size()); b++) { - auto const rnext = r + batch_list[b]; - auto curr_page_idx = chunks[r][0].first_page; - for (; r < rnext; r++) { + if (num_rowgroups != 0) { + auto curr_page_idx = chunks[0][0].first_page; + for (auto r = 0; r < static_cast(num_rowgroups); r++) { int const p = rg_to_part[r]; int const global_r = global_rowgroup_base[p] + r - first_rg_in_part[p]; auto const& row_group = _agg_meta->file(p).row_groups[global_r]; diff --git a/cpp/src/io/parquet/writer_impl.hpp b/cpp/src/io/parquet/writer_impl.hpp index 2f6608b0ae7..3cbb7630fab 100644 --- a/cpp/src/io/parquet/writer_impl.hpp +++ b/cpp/src/io/parquet/writer_impl.hpp @@ -129,7 +129,6 @@ class writer::impl { * @param chunks Column chunks * @param global_rowgroup_base Numbers of rowgroups in each file/partition * @param first_rg_in_part The first rowgroup in each partition - * @param batch_list The batches of rowgroups to encode * @param rg_to_part A map from rowgroup to partition * @param[out] bounce_buffer Temporary host output buffer */ @@ -138,7 +137,6 @@ class writer::impl { host_2dspan chunks, host_span global_rowgroup_base, host_span first_rg_in_part, - host_span batch_list, host_span rg_to_part, host_span bounce_buffer);