Skip to content

Commit

Permalink
Review changes
Browse files Browse the repository at this point in the history
  • Loading branch information
devavret committed Sep 9, 2021
1 parent bfa1366 commit 203cf15
Showing 1 changed file with 17 additions and 16 deletions.
33 changes: 17 additions & 16 deletions cpp/src/io/parquet/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -890,17 +890,19 @@ void writer::impl::init_encoder_pages(hostdevice_2dvector<gpu::EncColumnChunk>&
stream.synchronize();
}

void snappy_compress(device_span<gpu_inflate_input_s> comp_in,
void snappy_compress(device_span<gpu_inflate_input_s const> comp_in,
device_span<gpu_inflate_status_s> comp_stat,
size_t max_page_uncomp_data_size,
rmm::cuda_stream_view stream)
{
size_t num_comp_pages = comp_in.size();
do {
try {
size_t temp_size;
nvcompStatus_t nvcomp_status = nvcompBatchedSnappyCompressGetTempSize(
num_comp_pages, max_page_uncomp_data_size, nvcompBatchedSnappyDefaultOpts, &temp_size);
if (nvcomp_status != nvcompStatus_t::nvcompSuccess) { break; }

CUDF_EXPECTS(nvcomp_status == nvcompStatus_t::nvcompSuccess,
"Error in getting snappy compression scratch size");

// Not needed now, but the nvcomp API makes no promises about the future
rmm::device_buffer scratch(temp_size, stream);
Expand Down Expand Up @@ -937,7 +939,7 @@ void snappy_compress(device_span<gpu_inflate_input_s> comp_in,
nvcompBatchedSnappyDefaultOpts,
stream.value());

if (nvcomp_status != nvcompStatus_t::nvcompSuccess) { break; }
CUDF_EXPECTS(nvcomp_status == nvcompStatus_t::nvcompSuccess, "Error in snappy compression");

// nvcomp also doesn't use comp_out.status. It guarantees that, given enough output space,
// compression will succeed.
Expand All @@ -952,13 +954,13 @@ void snappy_compress(device_span<gpu_inflate_input_s> comp_in,
return status;
});
return;
} while (0);

// If we reach this then there was an error in compressing so set an error status for each page
thrust::for_each(rmm::exec_policy(stream),
comp_stat.begin(),
comp_stat.end(),
[] __device__(gpu_inflate_status_s & stat) { stat.status = 1; });
} catch (...) {
// If we reach this then there was an error in compressing so set an error status for each page
thrust::for_each(rmm::exec_policy(stream),
comp_stat.begin(),
comp_stat.end(),
[] __device__(gpu_inflate_status_s & stat) { stat.status = 1; });
};
}

void writer::impl::encode_pages(hostdevice_2dvector<gpu::EncColumnChunk>& chunks,
Expand Down Expand Up @@ -1264,11 +1266,10 @@ void writer::impl::write(table_view const& table)

size_t max_page_comp_data_size = 0;
if (compression_ != parquet::Compression::UNCOMPRESSED) {
CUDF_EXPECTS(
nvcompStatus_t::nvcompSuccess ==
nvcompBatchedSnappyCompressGetMaxOutputChunkSize(
max_page_uncomp_data_size, nvcompBatchedSnappyDefaultOpts, &max_page_comp_data_size),
"Error in getting compressed size from nvcomp");
auto status = nvcompBatchedSnappyCompressGetMaxOutputChunkSize(
max_page_uncomp_data_size, nvcompBatchedSnappyDefaultOpts, &max_page_comp_data_size);
CUDF_EXPECTS(status == nvcompStatus_t::nvcompSuccess,
"Error in getting compressed size from nvcomp");
}

// Initialize batches of rowgroups to encode (mainly to limit peak memory usage)
Expand Down

0 comments on commit 203cf15

Please sign in to comment.