Removing all batching code from parquet writer (#15528)
Fixes #13440. Removes the manually disabled batching code from the parquet writer.

Authors:
  - Muhammad Haseeb (https://github.com/mhaseeb123)
  - GALI PREM SAGAR (https://github.com/galipremsagar)

Approvers:
  - David Wendt (https://github.com/davidwendt)
  - Nghia Truong (https://github.com/ttnghia)

URL: #15528
mhaseeb123 authored Apr 16, 2024
1 parent feb96cb commit 61e116e
Showing 2 changed files with 46 additions and 99 deletions.
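The gist of the change, as a minimal stand-alone sketch (RowGroup, encode, and main below are illustrative stand-ins, not cudf types or APIs): the writer previously packed row groups into byte-capped batches and encoded each batch through explicit (first, count) offsets. Since the cap was already pinned to SIZE_MAX by the fix for #13440, every run produced exactly one batch, so this commit collapses the flow into a single pass over all row groups.

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

// Illustrative stand-in for a row group's encode-buffer footprint.
struct RowGroup {
  std::size_t bfr_size;
};

// Stand-in for the encode step; in cudf the real work happens in encode_pages().
void encode(std::vector<RowGroup> const& rgs, std::size_t first, std::size_t count)
{
  std::cout << "encoding " << count << " of " << rgs.size() << " row groups starting at " << first
            << "\n";
}

int main()
{
  std::vector<RowGroup> const rowgroups{{512}, {768}, {256}, {1024}};

  // Old flow: greedily pack row groups into batches capped at max_bytes_in_batch,
  // then encode each batch by (first_rowgroup, rowgroups_in_batch) offsets. With
  // the cap pinned to SIZE_MAX, the packing always yielded exactly one batch.
  std::size_t const max_bytes_in_batch = SIZE_MAX;
  std::vector<std::size_t> batch_list;
  std::size_t bytes  = 0;
  std::size_t groups = 0;
  for (auto const& rg : rowgroups) {
    if (groups != 0 && bytes + rg.bfr_size > max_bytes_in_batch) {
      batch_list.push_back(groups);
      bytes  = 0;
      groups = 0;
    }
    bytes += rg.bfr_size;
    ++groups;
  }
  if (groups != 0) { batch_list.push_back(groups); }
  for (std::size_t b = 0, r = 0; b < batch_list.size(); r += batch_list[b], ++b) {
    encode(rowgroups, r, batch_list[b]);
  }

  // New flow: no batch bookkeeping at all; encode everything in one pass.
  if (!rowgroups.empty()) { encode(rowgroups, 0, rowgroups.size()); }
  return 0;
}

With the cap gone, encode_pages no longer needs the four batch-offset parameters, which is exactly what the diff below removes.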
143 changes: 46 additions & 97 deletions cpp/src/io/parquet/writer_impl.cu
@@ -1396,16 +1396,13 @@ void init_encoder_pages(hostdevice_2dvector<EncColumnChunk>& chunks,
 }
 
 /**
- * @brief Encode a batch of pages.
+ * @brief Encode pages.
  *
  * @throws rmm::bad_alloc if there is insufficient space for temporary buffers
  *
  * @param chunks column chunk array
  * @param pages encoder pages array
- * @param pages_in_batch number of pages in this batch
- * @param first_page_in_batch first page in batch
- * @param rowgroups_in_batch number of rowgroups in this batch
- * @param first_rowgroup first rowgroup in batch
+ * @param num_rowgroups number of rowgroups
  * @param page_stats optional page-level statistics (nullptr if none)
  * @param chunk_stats optional chunk-level statistics (nullptr if none)
  * @param column_stats optional page-level statistics for column index (nullptr if none)
@@ -1417,10 +1414,6 @@ void init_encoder_pages(hostdevice_2dvector<EncColumnChunk>& chunks,
  */
 void encode_pages(hostdevice_2dvector<EncColumnChunk>& chunks,
                   device_span<EncPage> pages,
-                  uint32_t pages_in_batch,
-                  uint32_t first_page_in_batch,
-                  uint32_t rowgroups_in_batch,
-                  uint32_t first_rowgroup,
                   statistics_chunk const* page_stats,
                   statistics_chunk const* chunk_stats,
                   statistics_chunk const* column_stats,
@@ -1430,14 +1423,12 @@ void encode_pages(hostdevice_2dvector<EncColumnChunk>& chunks,
                   bool write_v2_headers,
                   rmm::cuda_stream_view stream)
 {
-  auto batch_pages = pages.subspan(first_page_in_batch, pages_in_batch);
+  auto const num_pages = pages.size();
+  auto pages_stats     = (page_stats != nullptr)
+                           ? device_span<statistics_chunk const>(page_stats, num_pages)
+                           : device_span<statistics_chunk const>();
 
-  auto batch_pages_stats =
-    (page_stats != nullptr)
-      ? device_span<statistics_chunk const>(page_stats + first_page_in_batch, pages_in_batch)
-      : device_span<statistics_chunk const>();
-
-  uint32_t max_comp_pages = (compression != Compression::UNCOMPRESSED) ? pages_in_batch : 0;
+  uint32_t max_comp_pages = (compression != Compression::UNCOMPRESSED) ? num_pages : 0;
 
   rmm::device_uvector<device_span<uint8_t const>> comp_in(max_comp_pages, stream);
   rmm::device_uvector<device_span<uint8_t>> comp_out(max_comp_pages, stream);
@@ -1447,7 +1438,7 @@ void encode_pages(hostdevice_2dvector<EncColumnChunk>& chunks,
                comp_res.end(),
                compression_result{0, compression_status::FAILURE});
 
-  EncodePages(batch_pages, write_v2_headers, comp_in, comp_out, comp_res, stream);
+  EncodePages(pages, write_v2_headers, comp_in, comp_out, comp_res, stream);
   switch (compression) {
     case Compression::SNAPPY:
       if (nvcomp::is_compression_disabled(nvcomp::compression_type::SNAPPY)) {
@@ -1480,25 +1471,23 @@ void encode_pages(hostdevice_2dvector<EncColumnChunk>& chunks,
   // TBD: Not clear if the official spec actually allows dynamically turning off compression at the
   // chunk-level
 
-  auto d_chunks_in_batch = chunks.device_view().subspan(first_rowgroup, rowgroups_in_batch);
-  DecideCompression(d_chunks_in_batch.flat_view(), stream);
-  EncodePageHeaders(batch_pages, comp_res, batch_pages_stats, chunk_stats, stream);
-  GatherPages(d_chunks_in_batch.flat_view(), pages, stream);
+  auto d_chunks = chunks.device_view();
+  DecideCompression(d_chunks.flat_view(), stream);
+  EncodePageHeaders(pages, comp_res, pages_stats, chunk_stats, stream);
+  GatherPages(d_chunks.flat_view(), pages, stream);
 
   // By now, the var_bytes has been calculated in InitPages, and the histograms in EncodePages.
   // EncodeColumnIndexes can encode the histograms in the ColumnIndex, and also sum up var_bytes
   // and the histograms for inclusion in the chunk's SizeStats.
   if (column_stats != nullptr) {
-    EncodeColumnIndexes(d_chunks_in_batch.flat_view(),
-                        {column_stats, pages.size()},
-                        column_index_truncate_length,
-                        stream);
+    EncodeColumnIndexes(
+      d_chunks.flat_view(), {column_stats, pages.size()}, column_index_truncate_length, stream);
   }
 
-  auto h_chunks_in_batch = chunks.host_view().subspan(first_rowgroup, rowgroups_in_batch);
-  CUDF_CUDA_TRY(cudaMemcpyAsync(h_chunks_in_batch.data(),
-                                d_chunks_in_batch.data(),
-                                d_chunks_in_batch.flat_view().size_bytes(),
+  auto h_chunks = chunks.host_view();
+  CUDF_CUDA_TRY(cudaMemcpyAsync(h_chunks.data(),
+                                d_chunks.data(),
+                                d_chunks.flat_view().size_bytes(),
                                 cudaMemcpyDefault,
                                 stream.value()));
 
@@ -1959,33 +1948,23 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta,
     std::fill_n(std::back_inserter(rg_to_part), num_rg_in_part[p], p);
   }
 
-  // Batch processing is no longer supported.
-  // This line disables batch processing (so batch size will no longer be limited at 1GB as before).
-  // TODO: All the relevant code will be removed in the follow-up work:
-  // https://github.com/rapidsai/cudf/issues/13440
-  auto const max_bytes_in_batch = std::numeric_limits<size_t>::max();
-
-  // Initialize batches of rowgroups to encode (mainly to limit peak memory usage)
-  std::vector<size_type> batch_list;
-  size_type num_pages          = 0;
-  size_t max_uncomp_bfr_size   = 0;
-  size_t max_comp_bfr_size     = 0;
-  size_t max_chunk_bfr_size    = 0;
-  size_type max_pages_in_batch = 0;
-  size_t bytes_in_batch        = 0;
-  size_t comp_bytes_in_batch   = 0;
+  // Initialize rowgroups to encode
+  size_type num_pages        = 0;
+  size_t max_uncomp_bfr_size = 0;
+  size_t max_comp_bfr_size   = 0;
+  size_t max_chunk_bfr_size  = 0;
 
   size_t column_index_bfr_size  = 0;
   size_t def_histogram_bfr_size = 0;
   size_t rep_histogram_bfr_size = 0;
-  for (size_type r = 0, groups_in_batch = 0, pages_in_batch = 0; r <= num_rowgroups; r++) {
-    size_t rowgroup_size      = 0;
-    size_t comp_rowgroup_size = 0;
+  size_t rowgroup_size      = 0;
+  size_t comp_rowgroup_size = 0;
+  for (size_type r = 0; r <= num_rowgroups; r++) {
     if (r < num_rowgroups) {
       for (int i = 0; i < num_columns; i++) {
         EncColumnChunk* ck = &chunks[r][i];
         ck->first_page     = num_pages;
         num_pages += ck->num_pages;
-        pages_in_batch += ck->num_pages;
         rowgroup_size += ck->bfr_size;
         comp_rowgroup_size += ck->compressed_size;
         max_chunk_bfr_size =
@@ -2007,29 +1986,17 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta,
         }
       }
     }
-    // TBD: We may want to also shorten the batch if we have enough pages (not just based on size)
-    if ((r == num_rowgroups) ||
-        (groups_in_batch != 0 && bytes_in_batch + rowgroup_size > max_bytes_in_batch)) {
-      max_uncomp_bfr_size = std::max(max_uncomp_bfr_size, bytes_in_batch);
-      max_comp_bfr_size   = std::max(max_comp_bfr_size, comp_bytes_in_batch);
-      max_pages_in_batch  = std::max(max_pages_in_batch, pages_in_batch);
-      if (groups_in_batch != 0) {
-        batch_list.push_back(groups_in_batch);
-        groups_in_batch = 0;
-      }
-      bytes_in_batch      = 0;
-      comp_bytes_in_batch = 0;
-      pages_in_batch      = 0;
+    // write bfr sizes if this is the last rowgroup
+    if (r == num_rowgroups) {
+      max_uncomp_bfr_size = rowgroup_size;
+      max_comp_bfr_size   = comp_rowgroup_size;
     }
-    bytes_in_batch += rowgroup_size;
-    comp_bytes_in_batch += comp_rowgroup_size;
-    groups_in_batch++;
   }
 
   // Clear compressed buffer size if compression has been turned off
   if (compression == Compression::UNCOMPRESSED) { max_comp_bfr_size = 0; }
 
-  // Initialize data pointers in batch
+  // Initialize data pointers
   uint32_t const num_stats_bfr =
     (stats_granularity != statistics_freq::STATISTICS_NONE) ? num_pages + num_chunks : 0;
 
@@ -2055,10 +2022,10 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta,
   auto bfr_i = static_cast<uint8_t*>(col_idx_bfr.data());
   auto bfr_r = rep_level_histogram.data();
   auto bfr_d = def_level_histogram.data();
-  for (auto b = 0, r = 0; b < static_cast<size_type>(batch_list.size()); b++) {
+  if (num_rowgroups != 0) {
     auto bfr   = static_cast<uint8_t*>(uncomp_bfr.data());
     auto bfr_c = static_cast<uint8_t*>(comp_bfr.data());
-    for (auto j = 0; j < batch_list[b]; j++, r++) {
+    for (auto r = 0; r < num_rowgroups; r++) {
       for (auto i = 0; i < num_columns; i++) {
         EncColumnChunk& ck  = chunks[r][i];
         ck.uncompressed_bfr = bfr;
@@ -2108,22 +2075,11 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta,
   std::optional<writer_compression_statistics> comp_stats;
   if (collect_compression_statistics) { comp_stats = writer_compression_statistics{}; }
 
-  // Encode row groups in batches
-  for (auto b = 0, batch_r_start = 0; b < static_cast<size_type>(batch_list.size()); b++) {
-    // Count pages in this batch
-    auto const rnext               = batch_r_start + batch_list[b];
-    auto const first_page_in_batch = chunks[batch_r_start][0].first_page;
-    auto const first_page_in_next_batch =
-      (rnext < num_rowgroups) ? chunks[rnext][0].first_page : num_pages;
-    auto const pages_in_batch = first_page_in_next_batch - first_page_in_batch;
-
+  // Encode row groups
+  if (num_rowgroups != 0) {
     encode_pages(
       chunks,
       {pages.data(), pages.size()},
-      pages_in_batch,
-      first_page_in_batch,
-      batch_list[b],
-      batch_r_start,
      (stats_granularity == statistics_freq::STATISTICS_PAGE) ? page_stats.data() : nullptr,
      (stats_granularity != statistics_freq::STATISTICS_NONE) ? page_stats.data() + num_pages
                                                              : nullptr,
@@ -2152,7 +2108,7 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta,
       }
     }
 
-    for (int r = batch_r_start; r < rnext; r++) {
+    for (int r = 0; r < num_rowgroups; r++) {
       int p        = rg_to_part[r];
       int global_r = global_rowgroup_base[p] + r - first_rg_in_part[p];
       auto& row_group = agg_meta->file(p).row_groups[global_r];
@@ -2192,7 +2148,7 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta,
     auto h_def_ptr = h_def_histogram.data();
     auto h_rep_ptr = h_rep_histogram.data();
 
-    for (int r = batch_r_start; r < rnext; r++) {
+    for (int r = 0; r < num_rowgroups; r++) {
       int const p        = rg_to_part[r];
       int const global_r = global_rowgroup_base[p] + r - first_rg_in_part[p];
       auto& row_group    = agg_meta->file(p).row_groups[global_r];
@@ -2239,8 +2195,6 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta,
       }
     }
   }
-
-    batch_r_start = rnext;
   }
 
   auto bounce_buffer =
Expand All @@ -2251,7 +2205,6 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta,
std::move(chunks),
std::move(global_rowgroup_base),
std::move(first_rg_in_part),
std::move(batch_list),
std::move(rg_to_part),
std::move(comp_stats),
std::move(uncomp_bfr),
@@ -2358,7 +2311,6 @@ void writer::impl::write(table_view const& input, std::vector<partition_info> co
     chunks,
     global_rowgroup_base,
     first_rg_in_part,
-    batch_list,
     rg_to_part,
     comp_stats,
     uncomp_bfr,  // unused, but contains data for later write to sink
@@ -2402,7 +2354,6 @@ void writer::impl::write(table_view const& input, std::vector<partition_info> co
     chunks,
     global_rowgroup_base,
     first_rg_in_part,
-    batch_list,
     rg_to_part,
     bounce_buffer);
 
@@ -2417,18 +2368,17 @@ void writer::impl::write_parquet_data_to_sink(
   host_2dspan<EncColumnChunk const> chunks,
   host_span<size_t const> global_rowgroup_base,
   host_span<int const> first_rg_in_part,
-  host_span<size_type const> batch_list,
   host_span<int const> rg_to_part,
   host_span<uint8_t> bounce_buffer)
 {
-  _agg_meta = std::move(updated_agg_meta);
-  auto const num_columns = chunks.size().second;
+  _agg_meta                = std::move(updated_agg_meta);
+  auto const num_rowgroups = chunks.size().first;
+  auto const num_columns   = chunks.size().second;
 
-  for (auto b = 0, r = 0; b < static_cast<size_type>(batch_list.size()); b++) {
-    auto const rnext = r + batch_list[b];
+  if (num_rowgroups != 0) {
     std::vector<std::future<void>> write_tasks;
 
-    for (; r < rnext; r++) {
+    for (auto r = 0; r < static_cast<int>(num_rowgroups); r++) {
       int const p        = rg_to_part[r];
       int const global_r = global_rowgroup_base[p] + r - first_rg_in_part[p];
       auto& row_group    = _agg_meta->file(p).row_groups[global_r];
@@ -2472,10 +2422,9 @@ void writer::impl::write_parquet_data_to_sink(
   auto const h_pages = cudf::detail::make_host_vector_sync(pages, _stream);
 
   // add column and offset indexes to metadata
-  for (auto b = 0, r = 0; b < static_cast<size_type>(batch_list.size()); b++) {
-    auto const rnext = r + batch_list[b];
-    auto curr_page_idx = chunks[r][0].first_page;
-    for (; r < rnext; r++) {
+  if (num_rowgroups != 0) {
+    auto curr_page_idx = chunks[0][0].first_page;
+    for (auto r = 0; r < static_cast<int>(num_rowgroups); r++) {
       int const p        = rg_to_part[r];
       int const global_r = global_rowgroup_base[p] + r - first_rg_in_part[p];
       auto const& row_group = _agg_meta->file(p).row_groups[global_r];
2 changes: 0 additions & 2 deletions cpp/src/io/parquet/writer_impl.hpp
@@ -129,7 +129,6 @@ class writer::impl {
    * @param chunks Column chunks
    * @param global_rowgroup_base Numbers of rowgroups in each file/partition
    * @param first_rg_in_part The first rowgroup in each partition
-   * @param batch_list The batches of rowgroups to encode
    * @param rg_to_part A map from rowgroup to partition
    * @param[out] bounce_buffer Temporary host output buffer
    */
@@ -138,7 +137,6 @@ class writer::impl {
     host_2dspan<EncColumnChunk const> chunks,
     host_span<size_t const> global_rowgroup_base,
     host_span<int const> first_rg_in_part,
-    host_span<size_type const> batch_list,
     host_span<int const> rg_to_part,
     host_span<uint8_t> bounce_buffer);
 
