-
Notifications
You must be signed in to change notification settings - Fork 923
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use nvcomp's snappy compressor in ORC writer #9242
Changes from 29 commits
db23741
a5f3363
61018aa
64d7d1c
27764e7
95a57ec
cc9500a
7989b9c
f90409c
40ebd1e
4a2cb24
6019b0f
140d3d0
05f5343
62d92b4
3c73be3
e0a013d
7501b11
bfa1366
203cf15
99e4f80
6721fb8
5391e13
354e229
66d49e8
4e78529
8ed68ef
8b471de
34a42c3
0569281
11e20e7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,6 +24,9 @@ | |
|
||
#include <cub/cub.cuh> | ||
#include <rmm/cuda_stream_view.hpp> | ||
#include <rmm/exec_policy.hpp> | ||
|
||
#include <nvcomp/snappy.h> | ||
|
||
namespace cudf { | ||
namespace io { | ||
|
@@ -1102,15 +1105,17 @@ __global__ void __launch_bounds__(1024) | |
* @param[out] comp_out Per-block compression status | ||
* @param[in] compressed_bfr Compression output buffer | ||
* @param[in] comp_blk_size Compression block size | ||
* @param[in] max_comp_blk_size Max size of any block after compression | ||
*/ | ||
// blockDim {256,1,1} | ||
__global__ void __launch_bounds__(256) | ||
gpuInitCompressionBlocks(device_2dspan<StripeStream const> strm_desc, | ||
device_2dspan<encoder_chunk_streams> streams, // const? | ||
gpu_inflate_input_s* comp_in, | ||
gpu_inflate_status_s* comp_out, | ||
device_span<gpu_inflate_input_s> comp_in, | ||
device_span<gpu_inflate_status_s> comp_out, | ||
uint8_t* compressed_bfr, | ||
uint32_t comp_blk_size) | ||
uint32_t comp_blk_size, | ||
uint32_t max_comp_blk_size) | ||
{ | ||
__shared__ __align__(16) StripeStream ss; | ||
__shared__ uint8_t* volatile uncomp_base_g; | ||
|
@@ -1135,8 +1140,8 @@ __global__ void __launch_bounds__(256) | |
uint32_t blk_size = min(comp_blk_size, ss.stream_size - min(b * comp_blk_size, ss.stream_size)); | ||
blk_in->srcDevice = src + b * comp_blk_size; | ||
blk_in->srcSize = blk_size; | ||
blk_in->dstDevice = dst + b * (3 + comp_blk_size) + 3; // reserve 3 bytes for block header | ||
blk_in->dstSize = blk_size; | ||
blk_in->dstDevice = dst + b * (3 + max_comp_blk_size) + 3; // reserve 3 bytes for block header | ||
blk_in->dstSize = max_comp_blk_size; | ||
devavret marked this conversation as resolved.
Show resolved
Hide resolved
|
||
blk_out->bytes_written = blk_size; | ||
blk_out->status = 1; | ||
blk_out->reserved = 0; | ||
|
@@ -1153,14 +1158,16 @@ __global__ void __launch_bounds__(256) | |
* @param[in] comp_out Per-block compression status | ||
* @param[in] compressed_bfr Compression output buffer | ||
* @param[in] comp_blk_size Compression block size | ||
* @param[in] max_comp_blk_size Max size of any block after compression | ||
*/ | ||
// blockDim {1024,1,1} | ||
__global__ void __launch_bounds__(1024) | ||
gpuCompactCompressedBlocks(device_2dspan<StripeStream> strm_desc, | ||
gpu_inflate_input_s* comp_in, | ||
gpu_inflate_status_s* comp_out, | ||
device_span<gpu_inflate_input_s> comp_in, | ||
device_span<gpu_inflate_status_s> comp_out, | ||
uint8_t* compressed_bfr, | ||
uint32_t comp_blk_size) | ||
uint32_t comp_blk_size, | ||
uint32_t max_comp_blk_size) | ||
{ | ||
__shared__ __align__(16) StripeStream ss; | ||
__shared__ const uint8_t* volatile comp_src_g; | ||
|
@@ -1271,20 +1278,83 @@ void CompressOrcDataStreams(uint8_t* compressed_data, | |
uint32_t num_compressed_blocks, | ||
CompressionKind compression, | ||
uint32_t comp_blk_size, | ||
uint32_t max_comp_blk_size, | ||
device_2dspan<StripeStream> strm_desc, | ||
device_2dspan<encoder_chunk_streams> enc_streams, | ||
gpu_inflate_input_s* comp_in, | ||
gpu_inflate_status_s* comp_out, | ||
device_span<gpu_inflate_input_s> comp_in, | ||
device_span<gpu_inflate_status_s> comp_out, | ||
rmm::cuda_stream_view stream) | ||
{ | ||
dim3 dim_block_init(256, 1); | ||
dim3 dim_grid(strm_desc.size().first, strm_desc.size().second); | ||
gpuInitCompressionBlocks<<<dim_grid, dim_block_init, 0, stream.value()>>>( | ||
strm_desc, enc_streams, comp_in, comp_out, compressed_data, comp_blk_size); | ||
if (compression == SNAPPY) { gpu_snap(comp_in, comp_out, num_compressed_blocks, stream); } | ||
strm_desc, enc_streams, comp_in, comp_out, compressed_data, comp_blk_size, max_comp_blk_size); | ||
if (compression == SNAPPY) { | ||
vuule marked this conversation as resolved.
Show resolved
Hide resolved
|
||
auto env_use_nvcomp = std::getenv("LIBCUDF_USE_NVCOMP"); | ||
bool use_nvcomp = env_use_nvcomp != nullptr ? std::atoi(env_use_nvcomp) : 0; | ||
if (use_nvcomp) { | ||
try { | ||
size_t temp_size; | ||
nvcompStatus_t nvcomp_status = nvcompBatchedSnappyCompressGetTempSize( | ||
num_compressed_blocks, comp_blk_size, nvcompBatchedSnappyDefaultOpts, &temp_size); | ||
|
||
CUDF_EXPECTS(nvcomp_status == nvcompStatus_t::nvcompSuccess, | ||
"Error in getting snappy compression scratch size"); | ||
|
||
rmm::device_buffer scratch(temp_size, stream); | ||
rmm::device_uvector<void const*> uncompressed_data_ptrs(num_compressed_blocks, stream); | ||
rmm::device_uvector<size_t> uncompressed_data_sizes(num_compressed_blocks, stream); | ||
rmm::device_uvector<void*> compressed_data_ptrs(num_compressed_blocks, stream); | ||
rmm::device_uvector<size_t> compressed_bytes_written(num_compressed_blocks, stream); | ||
|
||
auto comp_it = thrust::make_zip_iterator(uncompressed_data_ptrs.begin(), | ||
uncompressed_data_sizes.begin(), | ||
compressed_data_ptrs.begin()); | ||
thrust::transform(rmm::exec_policy(stream), | ||
comp_in.begin(), | ||
comp_in.end(), | ||
comp_it, | ||
[] __device__(gpu_inflate_input_s in) { | ||
return thrust::make_tuple(in.srcDevice, in.srcSize, in.dstDevice); | ||
}); | ||
nvcomp_status = nvcompBatchedSnappyCompressAsync(uncompressed_data_ptrs.data(), | ||
uncompressed_data_sizes.data(), | ||
max_comp_blk_size, | ||
num_compressed_blocks, | ||
scratch.data(), | ||
scratch.size(), | ||
compressed_data_ptrs.data(), | ||
compressed_bytes_written.data(), | ||
nvcompBatchedSnappyDefaultOpts, | ||
stream.value()); | ||
|
||
CUDF_EXPECTS(nvcomp_status == nvcompStatus_t::nvcompSuccess, "Error in snappy compression"); | ||
|
||
thrust::transform(rmm::exec_policy(stream), | ||
compressed_bytes_written.begin(), | ||
compressed_bytes_written.end(), | ||
comp_out.begin(), | ||
[] __device__(size_t size) { | ||
gpu_inflate_status_s status{}; | ||
status.bytes_written = size; | ||
return status; | ||
Comment on lines
+1333
to
+1340
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could this be replaced with a [1] https://thrust.github.io/doc/classthrust_1_1transform__output__iterator.html There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, I see. Too bad! Thanks for clarifying 👍 |
||
}); | ||
} catch (...) { | ||
// If we reach this then there was an error in compressing so set an error status for each | ||
// block | ||
thrust::for_each(rmm::exec_policy(stream), | ||
comp_out.begin(), | ||
comp_out.end(), | ||
[] __device__(gpu_inflate_status_s & stat) { stat.status = 1; }); | ||
}; | ||
|
||
} else { | ||
gpu_snap(comp_in.data(), comp_out.data(), num_compressed_blocks, stream); | ||
} | ||
} | ||
dim3 dim_block_compact(1024, 1); | ||
gpuCompactCompressedBlocks<<<dim_grid, dim_block_compact, 0, stream.value()>>>( | ||
strm_desc, comp_in, comp_out, compressed_data, comp_blk_size); | ||
strm_desc, comp_in, comp_out, compressed_data, comp_blk_size, max_comp_blk_size); | ||
} | ||
|
||
} // namespace gpu | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't it mildly worse to pass a span here if you don't ever use the size? In the sense of having to hypothetically push another argument on the stack.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As I see it, the trade-off is that with span it's clear that the parameter is an array, not a pointer to a single object. Now, whether this is a good trade-off is up for debate :D