Reduce peak memory use when writing compressed ORC files. (#12963)
This PR changes how the buffer for encoded data is allocated in the ORC writer. Instead of a single buffer for the whole table, each stream of each stripe is allocated separately.
Since the size of the encoded data is not known in advance, the buffers are oversized in most cases (decimal types and dictionary-encoded data being the exceptions). Resizing these buffers to the exact encoded data size before compression reduces peak memory usage.
The encoded buffers are resized in the step where row groups are gathered to form a contiguous encoded stripe in memory, so no additional copies are incurred compared to the previous `gather_stripes` approach.
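
To make the idea concrete, here is a minimal sketch of per-stream allocation followed by shrinking to the exact encoded size. It is illustrative only: the helper names and size vectors are hypothetical, and unlike the actual change, which folds the resize into the gather step, this simplified version pays an extra copy in `shrink_to_fit`.

```cpp
// Illustrative sketch only -- not the actual writer code.
// One buffer per (stripe, stream), sized to a worst-case estimate up front,
// then shrunk to the exact encoded size before compression.
#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_uvector.hpp>

#include <cstdint>
#include <vector>

// Hypothetical helper: allocate a worst-case buffer for each stream of a stripe.
std::vector<rmm::device_uvector<uint8_t>> allocate_stripe_buffers(
  std::vector<std::size_t> const& max_stream_sizes, rmm::cuda_stream_view stream)
{
  std::vector<rmm::device_uvector<uint8_t>> buffers;
  buffers.reserve(max_stream_sizes.size());
  for (auto max_size : max_stream_sizes) {
    buffers.emplace_back(max_size, stream);  // oversized; exact encoded size is unknown yet
  }
  return buffers;
}

// Hypothetical helper: once encoding is done, shrink each buffer to the actual
// encoded size so only the required bytes stay allocated during compression.
void shrink_to_encoded_size(std::vector<rmm::device_uvector<uint8_t>>& buffers,
                            std::vector<std::size_t> const& encoded_sizes,
                            rmm::cuda_stream_view stream)
{
  for (std::size_t i = 0; i < buffers.size(); ++i) {
    buffers[i].resize(encoded_sizes[i], stream);  // keeps the first encoded_sizes[i] bytes
    buffers[i].shrink_to_fit(stream);             // releases the unused tail
  }
}
```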

Other changes:
- Removed `compute_offsets`, since it is not needed with separate buffers for each stripe/stream.
- Refactored parts of `encode_columns` to initialize data buffers and stream descriptors one stripe at a time, allowing future separation into per-stripe processing (e.g., for pipelining); a sketch of this structure follows the list.
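
A rough sketch of what per-stripe initialization could look like. It is purely illustrative; every name below is hypothetical and does not correspond to the actual cudf ORC writer internals.

```cpp
#include <cstddef>

struct stripe_buffers {};      // stand-in for one stripe's per-stream device buffers
struct stream_descriptors {};  // stand-in for one stripe's StripeStream entries

stripe_buffers init_stripe_buffers(std::size_t) { return {}; }
stream_descriptors init_stream_descriptors(std::size_t, stripe_buffers const&) { return {}; }
void encode_stripe(std::size_t, stream_descriptors const&) {}

void encode_all_stripes(std::size_t num_stripes)
{
  // Each iteration touches only the state of a single stripe, which is what
  // would allow stripes to be processed (or pipelined) independently later.
  for (std::size_t stripe_idx = 0; stripe_idx < num_stripes; ++stripe_idx) {
    auto const bufs  = init_stripe_buffers(stripe_idx);
    auto const descs = init_stream_descriptors(stripe_idx, bufs);
    encode_stripe(stripe_idx, descs);
  }
}
```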

Impact: internal benchmarks show an average 14% reduction in peak memory use when SNAPPY compression is enabled, with minimal impact on performance.
`cub::DeviceMemcpy::Batched` can now be used in the ORC writer.
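
For reference, a sketch of the standard two-phase CUB calling pattern for `cub::DeviceMemcpy::Batched`; this is not code from this PR, and the pointer/size arrays describing the per-row-group copies are hypothetical inputs.

```cpp
// Standard CUB two-phase pattern; not code from this PR.
// d_srcs, d_dsts, and d_sizes are hypothetical device arrays with one entry per
// copy (e.g. one per row-group stream piece being gathered into its stripe).
#include <cub/device/device_memcpy.cuh>
#include <cuda_runtime.h>

#include <cstddef>
#include <cstdint>

void batched_gather(void** d_srcs,         // source pointer of each copy
                    void** d_dsts,         // destination pointer of each copy
                    std::size_t* d_sizes,  // byte count of each copy
                    std::uint32_t num_copies,
                    cudaStream_t stream)
{
  // First call with a null temporary buffer only queries the required size.
  void* d_temp_storage           = nullptr;
  std::size_t temp_storage_bytes = 0;
  cub::DeviceMemcpy::Batched(
    d_temp_storage, temp_storage_bytes, d_srcs, d_dsts, d_sizes, num_copies, stream);

  cudaMalloc(&d_temp_storage, temp_storage_bytes);

  // Second call performs all the copies in a single batched launch.
  cub::DeviceMemcpy::Batched(
    d_temp_storage, temp_storage_bytes, d_srcs, d_dsts, d_sizes, num_copies, stream);

  cudaFree(d_temp_storage);
}
```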

Authors:
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - Yunsong Wang (https://github.com/PointKernel)
  - Nghia Truong (https://github.com/ttnghia)
  - Mike Wilson (https://github.com/hyperbolic2346)

URL: #12963
vuule authored Apr 3, 2023
1 parent f3f84f2 commit 09b114e
Showing 4 changed files with 136 additions and 164 deletions.
1 change: 1 addition & 0 deletions cpp/src/io/orc/orc_gpu.hpp
@@ -159,6 +159,7 @@ struct encoder_chunk_streams {
* @brief Struct to describe a column stream within a stripe
*/
struct StripeStream {
uint8_t* data_ptr; // encoded and gathered output
size_t bfr_offset; // Offset of this stream in compressed buffer
uint32_t stream_size; // Size of stream in bytes
uint32_t first_chunk_id; // First chunk of the stripe
55 changes: 21 additions & 34 deletions cpp/src/io/orc/stripe_enc.cu
@@ -43,7 +43,8 @@ namespace gpu {

using cudf::detail::device_2dspan;

constexpr int scratch_buffer_size = 512 * 4;
constexpr int scratch_buffer_size = 512 * 4;
constexpr int compact_streams_block_size = 1024;

// Apache ORC reader does not handle zero-length patch lists for RLEv2 mode2
// Workaround replaces zero-length patch lists by a dummy zero patch
@@ -1082,51 +1083,37 @@ __global__ void __launch_bounds__(block_size)
* @param[in,out] strm_desc StripeStream device array [stripe][stream]
* @param[in,out] streams List of encoder chunk streams [column][rowgroup]
*/
// blockDim {1024,1,1}
__global__ void __launch_bounds__(1024)
// blockDim {compact_streams_block_size,1,1}
__global__ void __launch_bounds__(compact_streams_block_size)
gpuCompactOrcDataStreams(device_2dspan<StripeStream> strm_desc,
device_2dspan<encoder_chunk_streams> streams)
{
__shared__ __align__(16) StripeStream ss;
__shared__ __align__(16) encoder_chunk_streams strm0;
__shared__ uint8_t* volatile ck_curptr_g;
__shared__ uint32_t volatile ck_curlen_g;

auto const stripe_id = blockIdx.x;
auto const stream_id = blockIdx.y;
uint32_t t = threadIdx.x;
auto const t = threadIdx.x;

if (t == 0) {
ss = strm_desc[stripe_id][stream_id];
strm0 = streams[ss.column_id][ss.first_chunk_id];
}
if (t == 0) { ss = strm_desc[stripe_id][stream_id]; }
__syncthreads();

if (ss.data_ptr == nullptr) { return; }

auto const cid = ss.stream_type;
auto dst_ptr = strm0.data_ptrs[cid] + strm0.lengths[cid];
for (auto group = ss.first_chunk_id + 1; group < ss.first_chunk_id + ss.num_chunks; ++group) {
uint8_t* src_ptr;
uint32_t len;
if (t == 0) {
src_ptr = streams[ss.column_id][group].data_ptrs[cid];
len = streams[ss.column_id][group].lengths[cid];
if (src_ptr != dst_ptr) { streams[ss.column_id][group].data_ptrs[cid] = dst_ptr; }
ck_curptr_g = src_ptr;
ck_curlen_g = len;
}
__syncthreads();
src_ptr = ck_curptr_g;
len = ck_curlen_g;
if (len > 0 && src_ptr != dst_ptr) {
for (uint32_t i = 0; i < len; i += 1024) {
uint8_t v = (i + t < len) ? src_ptr[i + t] : 0;
__syncthreads();
if (i + t < len) { dst_ptr[i + t] = v; }
auto dst_ptr = ss.data_ptr;
for (auto group = ss.first_chunk_id; group < ss.first_chunk_id + ss.num_chunks; ++group) {
auto const len = streams[ss.column_id][group].lengths[cid];
if (len > 0) {
auto const src_ptr = streams[ss.column_id][group].data_ptrs[cid];
for (uint32_t i = t; i < len; i += blockDim.x) {
dst_ptr[i] = src_ptr[i];
}

__syncthreads();
if (t == 0) { streams[ss.column_id][group].data_ptrs[cid] = dst_ptr; }
dst_ptr += len;
}
dst_ptr += len;
__syncthreads();
}
if (!t) { strm_desc[stripe_id][stream_id].stream_size = dst_ptr - strm0.data_ptrs[cid]; }
}

/**
@@ -1299,7 +1286,7 @@ void CompactOrcDataStreams(device_2dspan<StripeStream> strm_desc,
device_2dspan<encoder_chunk_streams> enc_streams,
rmm::cuda_stream_view stream)
{
dim3 dim_block(1024, 1);
dim3 dim_block(compact_streams_block_size, 1);
dim3 dim_grid(strm_desc.size().first, strm_desc.size().second);
gpuCompactOrcDataStreams<<<dim_grid, dim_block, 0, stream.value()>>>(strm_desc, enc_streams);
}