diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu
index 831e082a8bf..0382a7bb7ba 100644
--- a/cpp/src/io/parquet/writer_impl.cu
+++ b/cpp/src/io/parquet/writer_impl.cu
@@ -890,17 +890,19 @@ void writer::impl::init_encoder_pages(hostdevice_2dvector<gpu::EncColumnChunk>&
   stream.synchronize();
 }
 
-void snappy_compress(device_span<gpu_inflate_input_s> comp_in,
+void snappy_compress(device_span<gpu_inflate_input_s const> comp_in,
                      device_span<gpu_inflate_status_s> comp_stat,
                      size_t max_page_uncomp_data_size,
                      rmm::cuda_stream_view stream)
 {
   size_t num_comp_pages = comp_in.size();
-  do {
+  try {
     size_t temp_size;
     nvcompStatus_t nvcomp_status = nvcompBatchedSnappyCompressGetTempSize(
       num_comp_pages, max_page_uncomp_data_size, nvcompBatchedSnappyDefaultOpts, &temp_size);
-    if (nvcomp_status != nvcompStatus_t::nvcompSuccess) { break; }
+
+    CUDF_EXPECTS(nvcomp_status == nvcompStatus_t::nvcompSuccess,
+                 "Error in getting snappy compression scratch size");
 
     // Not needed now but nvcomp API makes no promises about future
     rmm::device_buffer scratch(temp_size, stream);
@@ -937,7 +939,7 @@ void snappy_compress(device_span<gpu_inflate_input_s> comp_in,
       nvcompBatchedSnappyDefaultOpts,
       stream.value());
 
-    if (nvcomp_status != nvcompStatus_t::nvcompSuccess) { break; }
+    CUDF_EXPECTS(nvcomp_status == nvcompStatus_t::nvcompSuccess, "Error in snappy compression");
 
     // nvcomp also doesn't use comp_out.status . It guarantees that given enough output space,
     // compression will succeed.
@@ -952,13 +954,13 @@ void snappy_compress(device_span<gpu_inflate_input_s> comp_in,
                         return status;
                       });
     return;
-  } while (0);
-
-  // If we reach this then there was an error in compressing so set an error status for each page
-  thrust::for_each(rmm::exec_policy(stream),
-                   comp_stat.begin(),
-                   comp_stat.end(),
-                   [] __device__(gpu_inflate_status_s & stat) { stat.status = 1; });
+  } catch (...) {
+    // If we reach this then there was an error in compressing so set an error status for each page
+    thrust::for_each(rmm::exec_policy(stream),
+                     comp_stat.begin(),
+                     comp_stat.end(),
+                     [] __device__(gpu_inflate_status_s & stat) { stat.status = 1; });
+  };
 }
 
 void writer::impl::encode_pages(hostdevice_2dvector<gpu::EncColumnChunk>& chunks,
@@ -1264,11 +1266,10 @@ void writer::impl::write(table_view const& table)
 
   size_t max_page_comp_data_size = 0;
   if (compression_ != parquet::Compression::UNCOMPRESSED) {
-    CUDF_EXPECTS(
-      nvcompStatus_t::nvcompSuccess ==
-        nvcompBatchedSnappyCompressGetMaxOutputChunkSize(
-          max_page_uncomp_data_size, nvcompBatchedSnappyDefaultOpts, &max_page_comp_data_size),
-      "Error in getting compressed size from nvcomp");
+    auto status = nvcompBatchedSnappyCompressGetMaxOutputChunkSize(
+      max_page_uncomp_data_size, nvcompBatchedSnappyDefaultOpts, &max_page_comp_data_size);
+    CUDF_EXPECTS(status == nvcompStatus_t::nvcompSuccess,
+                 "Error in getting compressed size from nvcomp");
   }
 
   // Initialize batches of rowgroups to encode (mainly to limit peak memory usage)