Use per-page max compressed size estimate for compression (#11066)
Closes #10857 

The Parquet writer currently estimates the maximum compressed page size by finding the largest page and calling nvcomp's `nvcompBatchedSnappyCompressGetMaxOutputChunkSize` API once for that page. The total output memory is then allocated as `max_compressed_page_size * num_pages`.

This approach is pessimistic and over-allocates the output buffer for batched compression.

This PR instead calls `nvcompBatchedSnappyCompressGetMaxOutputChunkSize` for each page and sums the results to obtain the output buffer size. This greatly reduces the peak memory consumption of the Parquet writer, so the compression step is no longer the bottleneck.
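
The estimation change can be sketched as follows. This is a minimal host-side illustration rather than the writer's actual code, assuming nvcomp's batched Snappy C API; the `<nvcomp/snappy.h>` header path and the `page_sizes` input are assumptions, and error checking is omitted.

```cpp
#include <nvcomp/snappy.h>

#include <algorithm>
#include <cstddef>
#include <numeric>
#include <vector>

// Old behaviour (sketch): one worst-case estimate for the largest page,
// multiplied by the page count. Pessimistic when page sizes vary.
size_t old_total_estimate(std::vector<size_t> const& page_sizes)
{
  if (page_sizes.empty()) { return 0; }
  size_t max_page = *std::max_element(page_sizes.begin(), page_sizes.end());
  size_t max_comp = 0;
  nvcompBatchedSnappyCompressGetMaxOutputChunkSize(
    max_page, nvcompBatchedSnappyDefaultOpts, &max_comp);
  return max_comp * page_sizes.size();
}

// New behaviour (sketch): a per-page worst-case estimate, summed; this is the
// quantity that the per-page `comp_page_sizes` introduced in this PR adds up to.
size_t new_total_estimate(std::vector<size_t> const& page_sizes)
{
  return std::accumulate(
    page_sizes.begin(), page_sizes.end(), size_t{0}, [](size_t total, size_t page_size) {
      size_t page_comp_max = 0;
      nvcompBatchedSnappyCompressGetMaxOutputChunkSize(
        page_size, nvcompBatchedSnappyDefaultOpts, &page_comp_max);
      return total + page_comp_max;
    });
}
```

With many small pages and only a few large ones, the summed per-page bound can be far smaller than `max_comp * num_pages`, which is what shrinks the compressed output allocation.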

Authors:
  - Devavret Makkar (https://github.com/devavret)

Approvers:
  - Mike Wilson (https://github.com/hyperbolic2346)
  - Vukasin Milovanovic (https://github.com/vuule)

URL: #11066
devavret authored Jul 11, 2022
1 parent 89a8e70 commit 99eaaef
Showing 4 changed files with 89 additions and 40 deletions.
29 changes: 21 additions & 8 deletions cpp/src/io/parquet/page_enc.cu
@@ -236,10 +236,11 @@ __global__ void __launch_bounds__(128)
__global__ void __launch_bounds__(128)
gpuInitPages(device_2dspan<EncColumnChunk> chunks,
device_span<gpu::EncPage> pages,
device_span<size_type> page_sizes,
device_span<size_type> comp_page_sizes,
device_span<parquet_column_device_view const> col_desc,
statistics_merge_group* page_grstats,
statistics_merge_group* chunk_grstats,
size_t max_page_comp_data_size,
int32_t num_columns,
size_t max_page_size_bytes,
size_type max_page_size_rows)
@@ -299,13 +300,16 @@ __global__ void __launch_bounds__(128)
page_g.num_leaf_values = ck_g.num_dict_entries;
page_g.num_values = ck_g.num_dict_entries; // TODO: shouldn't matter for dict page
page_offset += page_g.max_hdr_size + page_g.max_data_size;
comp_page_offset += page_g.max_hdr_size + max_page_comp_data_size;
if (not comp_page_sizes.empty()) {
comp_page_offset += page_g.max_hdr_size + comp_page_sizes[ck_g.first_page];
}
page_headers_size += page_g.max_hdr_size;
max_page_data_size = max(max_page_data_size, page_g.max_data_size);
}
__syncwarp();
if (t == 0) {
if (not pages.empty()) pages[ck_g.first_page] = page_g;
if (not page_sizes.empty()) page_sizes[ck_g.first_page] = page_g.max_data_size;
if (page_grstats) page_grstats[ck_g.first_page] = pagestats_g;
}
num_pages = 1;
@@ -366,8 +370,10 @@ __global__ void __launch_bounds__(128)
}
page_g.max_hdr_size += stats_hdr_len;
}
page_g.page_data = ck_g.uncompressed_bfr + page_offset;
page_g.compressed_data = ck_g.compressed_bfr + comp_page_offset;
page_g.page_data = ck_g.uncompressed_bfr + page_offset;
if (not comp_page_sizes.empty()) {
page_g.compressed_data = ck_g.compressed_bfr + comp_page_offset;
}
page_g.start_row = cur_row;
page_g.num_rows = rows_in_page;
page_g.num_leaf_values = leaf_values_in_page;
@@ -389,7 +395,9 @@ __global__ void __launch_bounds__(128)
pagestats_g.start_chunk = ck_g.first_fragment + page_start;
pagestats_g.num_chunks = page_g.num_fragments;
page_offset += page_g.max_hdr_size + page_g.max_data_size;
comp_page_offset += page_g.max_hdr_size + max_page_comp_data_size;
if (not comp_page_sizes.empty()) {
comp_page_offset += page_g.max_hdr_size + comp_page_sizes[ck_g.first_page + num_pages];
}
page_headers_size += page_g.max_hdr_size;
max_page_data_size = max(max_page_data_size, page_g.max_data_size);
cur_row += rows_in_page;
@@ -398,7 +406,9 @@ __global__ void __launch_bounds__(128)
__syncwarp();
if (t == 0) {
if (not pages.empty()) { pages[ck_g.first_page + num_pages] = page_g; }

if (not page_sizes.empty()) {
page_sizes[ck_g.first_page + num_pages] = page_g.max_data_size;
}
if (page_grstats) { page_grstats[ck_g.first_page + num_pages] = pagestats_g; }
}

@@ -431,6 +441,7 @@ __global__ void __launch_bounds__(128)
ck_g.bfr_size = page_offset;
ck_g.page_headers_size = page_headers_size;
ck_g.max_page_data_size = max_page_data_size;
if (not comp_page_sizes.empty()) { ck_g.compressed_size = comp_page_offset; }
pagestats_g.start_chunk = ck_g.first_page + ck_g.use_dictionary; // Exclude dictionary
pagestats_g.num_chunks = num_pages - ck_g.use_dictionary;
}
@@ -1977,23 +1988,25 @@ void InitFragmentStatistics(device_2dspan<statistics_group> groups,

void InitEncoderPages(device_2dspan<EncColumnChunk> chunks,
device_span<gpu::EncPage> pages,
device_span<size_type> page_sizes,
device_span<size_type> comp_page_sizes,
device_span<parquet_column_device_view const> col_desc,
int32_t num_columns,
size_t max_page_size_bytes,
size_type max_page_size_rows,
statistics_merge_group* page_grstats,
statistics_merge_group* chunk_grstats,
size_t max_page_comp_data_size,
rmm::cuda_stream_view stream)
{
auto num_rowgroups = chunks.size().first;
dim3 dim_grid(num_columns, num_rowgroups); // 1 threadblock per rowgroup
gpuInitPages<<<dim_grid, 128, 0, stream.value()>>>(chunks,
pages,
page_sizes,
comp_page_sizes,
col_desc,
page_grstats,
chunk_grstats,
max_page_comp_data_size,
num_columns,
max_page_size_bytes,
max_page_size_rows);
3 changes: 2 additions & 1 deletion cpp/src/io/parquet/parquet_gpu.hpp
@@ -573,13 +573,14 @@ void get_dictionary_indices(cudf::detail::device_2dspan<gpu::PageFragment const>
*/
void InitEncoderPages(cudf::detail::device_2dspan<EncColumnChunk> chunks,
device_span<gpu::EncPage> pages,
device_span<size_type> page_sizes,
device_span<size_type> comp_page_sizes,
device_span<parquet_column_device_view const> col_desc,
int32_t num_columns,
size_t max_page_size_bytes,
size_type max_page_size_rows,
statistics_merge_group* page_grstats,
statistics_merge_group* chunk_grstats,
size_t max_page_comp_data_size,
rmm::cuda_stream_view stream);

/**
85 changes: 65 additions & 20 deletions cpp/src/io/parquet/writer_impl.cu
Expand Up @@ -854,22 +854,77 @@ void writer::impl::gather_fragment_statistics(
stream.synchronize();
}

void writer::impl::init_page_sizes(hostdevice_2dvector<gpu::EncColumnChunk>& chunks,
device_span<gpu::parquet_column_device_view const> col_desc,
uint32_t num_columns)
auto init_page_sizes(hostdevice_2dvector<gpu::EncColumnChunk>& chunks,
device_span<gpu::parquet_column_device_view const> col_desc,
uint32_t num_columns,
size_t max_page_size_bytes,
size_type max_page_size_rows,
rmm::cuda_stream_view stream)
{
if (chunks.is_empty()) { return hostdevice_vector<size_type>{}; }

chunks.host_to_device(stream);
// Calculate number of pages and store in respective chunks
gpu::InitEncoderPages(chunks,
{},
{},
{},
col_desc,
num_columns,
max_page_size_bytes,
max_page_size_rows,
nullptr,
nullptr,
stream);
chunks.device_to_host(stream, true);

int num_pages = 0;
for (auto& chunk : chunks.host_view().flat_view()) {
chunk.first_page = num_pages;
num_pages += chunk.num_pages;
}
chunks.host_to_device(stream);

// Now that we know the number of pages, allocate an array to hold per page size and get it
// populated
hostdevice_vector<size_type> page_sizes(num_pages, stream);
gpu::InitEncoderPages(chunks,
{},
page_sizes,
{},
col_desc,
num_columns,
max_page_size_bytes,
max_page_size_rows,
nullptr,
nullptr,
stream);
page_sizes.device_to_host(stream, true);

// Get per-page max compressed size
hostdevice_vector<size_type> comp_page_sizes(num_pages, stream);
std::transform(page_sizes.begin(), page_sizes.end(), comp_page_sizes.begin(), [](auto page_size) {
size_t page_comp_max_size = 0;
nvcompBatchedSnappyCompressGetMaxOutputChunkSize(
page_size, nvcompBatchedSnappyDefaultOpts, &page_comp_max_size);
return page_comp_max_size;
});
comp_page_sizes.host_to_device(stream);

// Use per-page max compressed size to calculate chunk.compressed_size
gpu::InitEncoderPages(chunks,
{},
{},
comp_page_sizes,
col_desc,
num_columns,
max_page_size_bytes,
max_page_size_rows,
nullptr,
nullptr,
0,
stream);
chunks.device_to_host(stream, true);
return comp_page_sizes;
}

auto build_chunk_dictionaries(hostdevice_2dvector<gpu::EncColumnChunk>& chunks,
@@ -961,9 +1016,9 @@ auto build_chunk_dictionaries(hostdevice_2dvector<gpu::EncColumnChunk>& chunks,
void writer::impl::init_encoder_pages(hostdevice_2dvector<gpu::EncColumnChunk>& chunks,
device_span<gpu::parquet_column_device_view const> col_desc,
device_span<gpu::EncPage> pages,
hostdevice_vector<size_type>& comp_page_sizes,
statistics_chunk* page_stats,
statistics_chunk* frag_stats,
size_t max_page_comp_data_size,
uint32_t num_columns,
uint32_t num_pages,
uint32_t num_stats_bfr)
@@ -972,13 +1027,14 @@ void writer::impl::init_encoder_pages(hostdevice_2dvector<gpu::EncColumnChunk>&
chunks.host_to_device(stream);
InitEncoderPages(chunks,
pages,
{},
comp_page_sizes,
col_desc,
num_columns,
max_page_size_bytes,
max_page_size_rows,
(num_stats_bfr) ? page_stats_mrg.data() : nullptr,
(num_stats_bfr > num_pages) ? page_stats_mrg.data() + num_pages : nullptr,
max_page_comp_data_size,
stream);
if (num_stats_bfr > 0) {
detail::merge_group_statistics<detail::io_file_format::PARQUET>(
@@ -1048,7 +1104,6 @@ void snappy_compress(device_span<device_span<uint8_t const> const> comp_in,
compressed_bytes_written.data(),
nvcompBatchedSnappyDefaultOpts,
stream.value());

CUDF_EXPECTS(nvcomp_status == nvcompStatus_t::nvcompSuccess, "Error in snappy compression");

// nvcomp also doesn't use comp_out.status . It guarantees that given enough output space,
@@ -1409,7 +1464,8 @@ void writer::impl::write(table_view const& table, std::vector<partition_info> co
}

// Build chunk dictionaries and count pages
if (num_chunks != 0) { init_page_sizes(chunks, col_desc, num_columns); }
hostdevice_vector<size_type> comp_page_sizes =
init_page_sizes(chunks, col_desc, num_columns, max_page_size_bytes, max_page_size_rows, stream);

// Get the maximum page size across all chunks
size_type max_page_uncomp_data_size =
@@ -1420,14 +1476,6 @@ void writer::impl::write(table_view const& table, std::vector<partition_info> co
return std::max(max_page_size, chunk.max_page_data_size);
});

size_t max_page_comp_data_size = 0;
if (compression_ != parquet::Compression::UNCOMPRESSED) {
auto status = nvcompBatchedSnappyCompressGetMaxOutputChunkSize(
max_page_uncomp_data_size, nvcompBatchedSnappyDefaultOpts, &max_page_comp_data_size);
CUDF_EXPECTS(status == nvcompStatus_t::nvcompSuccess,
"Error in getting compressed size from nvcomp");
}

// Find which partition a rg belongs to
std::vector<int> rg_to_part;
for (size_t p = 0; p < num_rg_in_part.size(); ++p) {
@@ -1454,8 +1502,6 @@ void writer::impl::write(table_view const& table, std::vector<partition_info> co
num_pages += ck->num_pages;
pages_in_batch += ck->num_pages;
rowgroup_size += ck->bfr_size;
ck->compressed_size =
ck->ck_stat_size + ck->page_headers_size + max_page_comp_data_size * ck->num_pages;
comp_rowgroup_size += ck->compressed_size;
max_chunk_bfr_size =
std::max(max_chunk_bfr_size, (size_t)std::max(ck->bfr_size, ck->compressed_size));
@@ -1510,9 +1556,9 @@ void writer::impl::write(table_view const& table, std::vector<partition_info> co
init_encoder_pages(chunks,
col_desc,
{pages.data(), pages.size()},
comp_page_sizes,
(num_stats_bfr) ? page_stats.data() : nullptr,
(num_stats_bfr) ? frag_stats.data() : nullptr,
max_page_comp_data_size,
num_columns,
num_pages,
num_stats_bfr);
@@ -1528,7 +1574,6 @@ void writer::impl::write(table_view const& table, std::vector<partition_info> co
auto const first_page_in_next_batch =
(rnext < num_rowgroups) ? chunks[rnext][0].first_page : num_pages;
auto const pages_in_batch = first_page_in_next_batch - first_page_in_batch;
// device_span<gpu::EncPage> batch_pages{pages.data() + first_page_in_batch, }
encode_pages(
chunks,
{pages.data(), pages.size()},
12 changes: 1 addition & 11 deletions cpp/src/io/parquet/writer_impl.hpp
@@ -146,16 +146,6 @@ class writer::impl {
device_2dspan<gpu::PageFragment const> frag,
device_span<gpu::parquet_column_device_view const> col_desc,
uint32_t num_fragments);
/**
* @brief Build per-chunk dictionaries and count data pages
*
* @param chunks column chunk array
* @param col_desc column description array
* @param num_columns Total number of columns
*/
void init_page_sizes(hostdevice_2dvector<gpu::EncColumnChunk>& chunks,
device_span<gpu::parquet_column_device_view const> col_desc,
uint32_t num_columns);

/**
* @brief Initialize encoder pages
@@ -173,9 +163,9 @@ void init_encoder_pages(hostdevice_2dvector<gpu::EncColumnChunk>& chunks,
void init_encoder_pages(hostdevice_2dvector<gpu::EncColumnChunk>& chunks,
device_span<gpu::parquet_column_device_view const> col_desc,
device_span<gpu::EncPage> pages,
hostdevice_vector<size_type>& comp_page_sizes,
statistics_chunk* page_stats,
statistics_chunk* frag_stats,
size_t max_page_comp_data_size,
uint32_t num_columns,
uint32_t num_pages,
uint32_t num_stats_bfr);
