Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable ZSTD compression in ORC and Parquet writers #11551

Merged
merged 72 commits into from
Sep 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
72 commits
Select commit Hold shift + click to select a range
d2b9a1c
add ZSTD compression to the adapter
vuule Aug 17, 2022
fd4c440
C++ changes
vuule Aug 17, 2022
61d607b
Python changes
vuule Aug 17, 2022
782b435
cmake workaround
vuule Aug 17, 2022
2478bd6
untested Parquet C++
vuule Aug 17, 2022
6291190
tmp tests
vuule Aug 17, 2022
e2c6109
Merge branch 'fea-nvcomp-zstd-comp' of https://github.com/vuule/cudf …
vuule Aug 17, 2022
4641c11
Parquet Python
vuule Aug 18, 2022
70c30f6
revert temp tests
vuule Aug 18, 2022
198f169
py tests
vuule Aug 18, 2022
0dbf2a0
style :D
vuule Aug 18, 2022
f000bb8
update java compression types
vuule Aug 18, 2022
a5f81ed
compression block limit in ORC
vuule Aug 18, 2022
377864f
ORC compression check
vuule Aug 18, 2022
78293d9
Parquet page limit, compression check, correct output size + HACK
vuule Aug 18, 2022
89a0b0b
fixed Pq test
vuule Aug 18, 2022
d9b2dce
fix Parquet w/o compression; check scratch buffer alignment
vuule Aug 19, 2022
bc75ce7
comp_in padding
vuule Aug 19, 2022
e5c9e1a
remove printf
vuule Aug 19, 2022
d56e6b3
style
vuule Aug 19, 2022
d059f58
reduce scope of comp_in/out in ORC writer
vuule Aug 19, 2022
9629fe9
nvcomp input dump
vuule Aug 22, 2022
1fcb732
fix dump
vuule Aug 22, 2022
61d019c
Merge branch 'branch-22.10' of https://github.com/rapidsai/cudf into …
vuule Aug 23, 2022
f75f107
remove output chunk size workaround
vuule Aug 23, 2022
3c4ad50
fix dump some more
vuule Aug 23, 2022
a550ee2
improve dump naming
vuule Aug 23, 2022
56af125
nvcomp output dump
vuule Aug 25, 2022
48a4f1d
clean up
vuule Aug 25, 2022
3ccc6b9
skip pages too large for ZSTD
vuule Aug 26, 2022
e3fe520
only skip compression of large chunks for ZSTD
vuule Aug 26, 2022
86812cb
page fragment scaling (tmp!)
vuule Aug 26, 2022
4fce557
avoid calling get max comp size for oversized pages
vuule Aug 26, 2022
3fd04a6
remove constexpr
vuule Aug 26, 2022
d977da2
rename
vuule Aug 26, 2022
868d165
ORC alignment
vuule Aug 29, 2022
a28c903
Merge branch 'branch-22.10' of https://github.com/rapidsai/cudf into …
vuule Aug 29, 2022
40307fd
initialize decompression_status arrays
vuule Aug 30, 2022
5bc095c
comp status refactor
vuule Aug 30, 2022
ca365bf
docs
vuule Aug 30, 2022
1293396
remove comp debug dump
vuule Aug 30, 2022
b10ad1d
remove encoded_data padding
vuule Aug 30, 2022
1016b2c
more renaming
vuule Aug 30, 2022
6d21074
comment
vuule Aug 30, 2022
caf1102
undo cmake workaround
vuule Aug 31, 2022
4e3eb90
update test
vuule Aug 31, 2022
812e5b6
status return fix
vuule Aug 31, 2022
13b211a
print skipped count [TEMP]
vuule Aug 31, 2022
c8b8a33
alignment
vuule Sep 1, 2022
a620ce9
central max chunk size; other clean up
vuule Sep 1, 2022
3039afa
max page size clean up
vuule Sep 1, 2022
a14c2a0
remove unused header
vuule Sep 1, 2022
c06acf9
minor clean up
vuule Sep 1, 2022
e22fd6f
fix pq test
vuule Sep 1, 2022
5a6377a
style fix
vuule Sep 1, 2022
e8c2161
move definitions back to cpp
vuule Sep 1, 2022
714da71
fix deflate alignment
vuule Sep 1, 2022
7c8f571
remove debug code
vuule Sep 1, 2022
c8c1f41
docs
vuule Sep 1, 2022
2a8d082
rename to keep up with nvcomp
vuule Sep 2, 2022
d0554c1
Merge branch 'branch-22.10' of https://github.com/rapidsai/cudf into …
vuule Sep 2, 2022
0e423ec
cast; fix comp_buffer size
vuule Sep 2, 2022
e296815
use switch
vuule Sep 2, 2022
96b75df
style
vuule Sep 2, 2022
83006f9
separate input and output alignment
vuule Sep 2, 2022
58f8439
copyright year
vuule Sep 2, 2022
aaaaa12
Merge branch 'branch-22.10' of https://github.com/rapidsai/cudf into …
vuule Sep 6, 2022
8bf2a11
nvcomp version fallback in is_compression_enabled
vuule Sep 7, 2022
632a6a9
rename status to result to match the type name
vuule Sep 9, 2022
282af9d
address Python code review
vuule Sep 9, 2022
f766f00
style
vuule Sep 10, 2022
1f60695
Merge branch 'branch-22.10' into fea-nvcomp-zstd-comp
vuule Sep 12, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions cpp/src/io/avro/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,11 @@ rmm::device_buffer decompress_data(datasource& source,
if (meta.codec == "deflate") {
auto inflate_in = hostdevice_vector<device_span<uint8_t const>>(meta.block_list.size(), stream);
auto inflate_out = hostdevice_vector<device_span<uint8_t>>(meta.block_list.size(), stream);
auto inflate_stats = hostdevice_vector<decompress_status>(meta.block_list.size(), stream);
auto inflate_stats = hostdevice_vector<compression_result>(meta.block_list.size(), stream);
thrust::fill(rmm::exec_policy(stream),
inflate_stats.d_begin(),
inflate_stats.d_end(),
compression_result{0, compression_status::FAILURE});

// Guess an initial maximum uncompressed block size. We estimate the compression factor is two
// and round up to the next multiple of 4096 bytes.
Expand All @@ -190,8 +194,6 @@ rmm::device_buffer decompress_data(datasource& source,

for (int loop_cnt = 0; loop_cnt < 2; loop_cnt++) {
inflate_out.host_to_device(stream);
CUDF_CUDA_TRY(cudaMemsetAsync(
inflate_stats.device_ptr(), 0, inflate_stats.memory_size(), stream.value()));
gpuinflate(inflate_in, inflate_out, inflate_stats, gzip_header_included::NO, stream);
inflate_stats.device_to_host(stream, true);

Expand All @@ -204,9 +206,9 @@ rmm::device_buffer decompress_data(datasource& source,
inflate_stats.begin(),
std::back_inserter(actual_uncomp_sizes),
[](auto const& inf_out, auto const& inf_stats) {
// If error status is 1 (buffer too small), the `bytes_written` field
// If error status is OUTPUT_OVERFLOW, the `bytes_written` field
// actually contains the uncompressed data size
return inf_stats.status == 1
return inf_stats.status == compression_status::OUTPUT_OVERFLOW
vuule marked this conversation as resolved.
Show resolved Hide resolved
? std::max(inf_out.size(), inf_stats.bytes_written)
: inf_out.size();
});
Expand Down
15 changes: 8 additions & 7 deletions cpp/src/io/comp/debrotli.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1906,15 +1906,15 @@ static __device__ void ProcessCommands(debrotli_state_s* s, const brotli_diction
*
* @param[in] inputs Source buffer per block
* @param[out] outputs Destination buffer per block
* @param[out] statuses Decompressor status per block
* @param[out] results Decompressor status per block
* @param scratch Intermediate device memory heap space (will be dynamically shared between blocks)
* @param scratch_size Size of scratch heap space (smaller sizes may result in serialization between
* blocks)
*/
__global__ void __launch_bounds__(block_size, 2)
gpu_debrotli_kernel(device_span<device_span<uint8_t const> const> inputs,
device_span<device_span<uint8_t> const> outputs,
device_span<decompress_status> statuses,
device_span<compression_result> results,
uint8_t* scratch,
uint32_t scratch_size)
{
Expand Down Expand Up @@ -2016,10 +2016,11 @@ __global__ void __launch_bounds__(block_size, 2)
__syncthreads();
// Output decompression status
if (!t) {
statuses[block_id].bytes_written = s->out - s->outbase;
statuses[block_id].status = s->error;
results[block_id].bytes_written = s->out - s->outbase;
results[block_id].status =
(s->error == 0) ? compression_status::SUCCESS : compression_status::FAILURE;
// Return ext heap used by last block (statistics)
statuses[block_id].reserved = s->fb_size;
results[block_id].reserved = s->fb_size;
}
}

Expand Down Expand Up @@ -2079,7 +2080,7 @@ size_t __host__ get_gpu_debrotli_scratch_size(int max_num_inputs)

void gpu_debrotli(device_span<device_span<uint8_t const> const> inputs,
device_span<device_span<uint8_t> const> outputs,
device_span<decompress_status> statuses,
device_span<compression_result> results,
void* scratch,
size_t scratch_size,
rmm::cuda_stream_view stream)
Expand All @@ -2104,7 +2105,7 @@ void gpu_debrotli(device_span<device_span<uint8_t const> const> inputs,
cudaMemcpyHostToDevice,
stream.value()));
gpu_debrotli_kernel<<<dim_grid, dim_block, 0, stream.value()>>>(
inputs, outputs, statuses, scratch_u8, fb_heap_size);
inputs, outputs, results, scratch_u8, fb_heap_size);
#if DUMP_FB_HEAP
uint32_t dump[2];
uint32_t cur = 0;
Expand Down
20 changes: 13 additions & 7 deletions cpp/src/io/comp/gpuinflate.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1020,14 +1020,14 @@ __device__ int parse_gzip_header(const uint8_t* src, size_t src_size)
* @tparam block_size Thread block dimension for this call
* @param inputs Source buffer information per block
* @param outputs Destination buffer information per block
* @param statuses Decompression status buffer per block
* @param results Decompression status buffer per block
* @param parse_hdr If nonzero, indicates that the compressed bitstream includes a GZIP header
*/
template <int block_size>
__global__ void __launch_bounds__(block_size)
inflate_kernel(device_span<device_span<uint8_t const> const> inputs,
device_span<device_span<uint8_t> const> outputs,
device_span<decompress_status> statuses,
device_span<compression_result> results,
gzip_header_included parse_hdr)
{
__shared__ __align__(16) inflate_state_s state_g;
Expand Down Expand Up @@ -1133,9 +1133,15 @@ __global__ void __launch_bounds__(block_size)
// Output buffer too small
state->err = 1;
}
statuses[z].bytes_written = state->out - state->outbase;
statuses[z].status = state->err;
statuses[z].reserved = (int)(state->end - state->cur); // Here mainly for debug purposes
results[z].bytes_written = state->out - state->outbase;
results[z].status = [&]() {
switch (state->err) {
case 0: return compression_status::SUCCESS;
case 1: return compression_status::OUTPUT_OVERFLOW;
default: return compression_status::FAILURE;
}
}();
results[z].reserved = (int)(state->end - state->cur); // Here mainly for debug purposes
}
}

Expand Down Expand Up @@ -1200,14 +1206,14 @@ __global__ void __launch_bounds__(1024)

void gpuinflate(device_span<device_span<uint8_t const> const> inputs,
device_span<device_span<uint8_t> const> outputs,
device_span<decompress_status> statuses,
device_span<compression_result> results,
gzip_header_included parse_hdr,
rmm::cuda_stream_view stream)
{
constexpr int block_size = 128; // Threads per block
if (inputs.size() > 0) {
inflate_kernel<block_size>
<<<inputs.size(), block_size, 0, stream.value()>>>(inputs, outputs, statuses, parse_hdr);
<<<inputs.size(), block_size, 0, stream.value()>>>(inputs, outputs, results, parse_hdr);
}
}

Expand Down
32 changes: 21 additions & 11 deletions cpp/src/io/comp/gpuinflate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,21 @@ namespace cudf {
namespace io {

/**
* @brief Output parameters for the decompression interface
* @brief Status of a compression/decompression operation.
*/
struct decompress_status {
enum class compression_status : uint8_t {
SUCCESS, ///< Operation succeeded; output is valid
FAILURE, ///< Operation failed; output is invalid (e.g. input is unsupported in some way)
SKIPPED, ///< Operation skipped (if compressing, the uncompressed data can be used instead)
OUTPUT_OVERFLOW, ///< Output buffer is too small; operation can succeed with a larger output
};

/**
* @brief Descriptor of compression/decompression result.
*/
struct compression_result {
uint64_t bytes_written;
uint32_t status;
compression_status status;
uint32_t reserved;
};

Expand All @@ -44,13 +54,13 @@ enum class gzip_header_included { NO, YES };
*
* @param[in] inputs List of input buffers
* @param[out] outputs List of output buffers
* @param[out] statuses List of output status structures
* @param[out] results List of output status structures
* @param[in] parse_hdr Whether or not to parse GZIP header
* @param[in] stream CUDA stream to use
*/
void gpuinflate(device_span<device_span<uint8_t const> const> inputs,
device_span<device_span<uint8_t> const> outputs,
device_span<decompress_status> statuses,
device_span<compression_result> results,
gzip_header_included parse_hdr,
rmm::cuda_stream_view stream);

Expand All @@ -73,12 +83,12 @@ void gpu_copy_uncompressed_blocks(device_span<device_span<uint8_t const> const>
*
* @param[in] inputs List of input buffers
* @param[out] outputs List of output buffers
* @param[out] statuses List of output status structures
* @param[out] results List of output status structures
* @param[in] stream CUDA stream to use
*/
void gpu_unsnap(device_span<device_span<uint8_t const> const> inputs,
device_span<device_span<uint8_t> const> outputs,
device_span<decompress_status> statuses,
device_span<compression_result> results,
rmm::cuda_stream_view stream);

/**
Expand All @@ -98,14 +108,14 @@ size_t get_gpu_debrotli_scratch_size(int max_num_inputs = 0);
*
* @param[in] inputs List of input buffers
* @param[out] outputs List of output buffers
* @param[out] statuses List of output status structures
* @param[out] results List of output status structures
* @param[in] scratch Temporary memory for intermediate work
* @param[in] scratch_size Size in bytes of the temporary memory
* @param[in] stream CUDA stream to use
*/
void gpu_debrotli(device_span<device_span<uint8_t const> const> inputs,
device_span<device_span<uint8_t> const> outputs,
device_span<decompress_status> statuses,
device_span<compression_result> results,
void* scratch,
size_t scratch_size,
rmm::cuda_stream_view stream);
Expand All @@ -118,12 +128,12 @@ void gpu_debrotli(device_span<device_span<uint8_t const> const> inputs,
*
* @param[in] inputs List of input buffers
* @param[out] outputs List of output buffers
* @param[out] statuses List of output status structures
* @param[out] results List of output status structures
* @param[in] stream CUDA stream to use
*/
void gpu_snap(device_span<device_span<uint8_t const> const> inputs,
device_span<device_span<uint8_t> const> outputs,
device_span<decompress_status> statuses,
device_span<compression_result> results,
rmm::cuda_stream_view stream);

} // namespace io
Expand Down
Loading