Reduce peak memory use when writing compressed ORC files. #12963

Merged · 38 commits · Apr 3, 2023

Changes from 29 commits

Commits (38)
f05525e
top iter stripes, bottom row groups
vuule Feb 1, 2023
66feb06
separate sizes from offsets calc
vuule Feb 1, 2023
5e057c7
Merge branch 'branch-23.04' of https://github.com/rapidsai/cudf into …
vuule Mar 1, 2023
a5a1ba0
per-stream enc data buffer
vuule Mar 1, 2023
71f4c4f
per-stripe buffer
vuule Mar 1, 2023
fc3fc2a
Merge branch 'branch-23.04' of https://github.com/rapidsai/cudf into …
vuule Mar 6, 2023
beea72a
Merge branch 'branch-23.04' of https://github.com/rapidsai/cudf into …
vuule Mar 8, 2023
7694c1d
merge
vuule Mar 9, 2023
5bee01e
Merge branch 'branch-23.04' of https://github.com/rapidsai/cudf into …
vuule Mar 15, 2023
866f827
Merge branch 'branch-23.04' of https://github.com/rapidsai/cudf into …
vuule Mar 15, 2023
8ed0857
POC
vuule Mar 15, 2023
d4c5075
optimization
vuule Mar 17, 2023
c120e08
Merge branch 'branch-23.04' of https://github.com/rapidsai/cudf into …
vuule Mar 17, 2023
579fd42
todo comment
vuule Mar 17, 2023
94e18b7
style
vuule Mar 17, 2023
4a2f652
remove compute_offsets
vuule Mar 17, 2023
f6a8765
slight clean up
vuule Mar 17, 2023
bfcc351
minor kernel simplification
vuule Mar 17, 2023
bdfa0b6
bit o' cleanup
vuule Mar 17, 2023
a919cbb
TODO
vuule Mar 17, 2023
f358232
Merge branch 'branch-23.04' into reduce-orc-writer-mem
vuule Mar 17, 2023
b383052
Merge branch 'branch-23.04' of https://github.com/rapidsai/cudf into …
vuule Mar 21, 2023
8bdad8b
still merge
vuule Mar 21, 2023
4c02381
Merge branch 'reduce-orc-writer-mem' of https://github.com/vuule/cudf…
vuule Mar 21, 2023
a57f7bb
Merge branch 'branch-23.04' into reduce-orc-writer-mem
vuule Mar 21, 2023
8d2cc43
style
vuule Mar 22, 2023
5097a23
Merge branch 'reduce-orc-writer-mem' of https://github.com/vuule/cudf…
vuule Mar 22, 2023
8372172
Merge branch 'branch-23.04' of https://github.com/rapidsai/cudf into …
vuule Mar 24, 2023
b48fbf7
Merge branch 'branch-23.04' into reduce-orc-writer-mem
vuule Mar 24, 2023
43aa1f7
add comment
vuule Mar 24, 2023
25af66e
Merge branch 'branch-23.04' of https://github.com/rapidsai/cudf into …
vuule Mar 28, 2023
60a23ed
doc update
vuule Mar 28, 2023
4e30777
pass bool comment
vuule Mar 29, 2023
9efde71
remove magic number
vuule Mar 29, 2023
12655fc
Merge branch 'reduce-orc-writer-mem' of https://github.com/vuule/cudf…
vuule Mar 29, 2023
0522524
style
vuule Mar 29, 2023
ba4c078
Merge branch 'branch-23.06' into reduce-orc-writer-mem
vuule Mar 30, 2023
313131c
Merge branch 'branch-23.06' into reduce-orc-writer-mem
vuule Apr 3, 2023
1 change: 1 addition & 0 deletions cpp/src/io/orc/orc_gpu.hpp
@@ -159,6 +159,7 @@ struct encoder_chunk_streams {
* @brief Struct to describe a column stream within a stripe
*/
struct StripeStream {
uint8_t* data_ptr; // encoded and gathered output
size_t bfr_offset; // Offset of this stream in compressed buffer
uint32_t stream_size; // Size of stream in bytes
uint32_t first_chunk_id; // First chunk of the stripe
46 changes: 16 additions & 30 deletions cpp/src/io/orc/stripe_enc.cu
@@ -1088,45 +1088,31 @@ __global__ void __launch_bounds__(1024)
device_2dspan<encoder_chunk_streams> streams)
{
__shared__ __align__(16) StripeStream ss;
__shared__ __align__(16) encoder_chunk_streams strm0;
__shared__ uint8_t* volatile ck_curptr_g;
__shared__ uint32_t volatile ck_curlen_g;

auto const stripe_id = blockIdx.x;
auto const stream_id = blockIdx.y;
uint32_t t = threadIdx.x;
auto const t = threadIdx.x;

if (t == 0) {
ss = strm_desc[stripe_id][stream_id];
strm0 = streams[ss.column_id][ss.first_chunk_id];
}
if (t == 0) { ss = strm_desc[stripe_id][stream_id]; }
__syncthreads();

if (ss.data_ptr == nullptr) { return; }

auto const cid = ss.stream_type;
auto dst_ptr = strm0.data_ptrs[cid] + strm0.lengths[cid];
for (auto group = ss.first_chunk_id + 1; group < ss.first_chunk_id + ss.num_chunks; ++group) {
uint8_t* src_ptr;
uint32_t len;
if (t == 0) {
src_ptr = streams[ss.column_id][group].data_ptrs[cid];
len = streams[ss.column_id][group].lengths[cid];
if (src_ptr != dst_ptr) { streams[ss.column_id][group].data_ptrs[cid] = dst_ptr; }
ck_curptr_g = src_ptr;
ck_curlen_g = len;
}
__syncthreads();
src_ptr = ck_curptr_g;
len = ck_curlen_g;
if (len > 0 && src_ptr != dst_ptr) {
for (uint32_t i = 0; i < len; i += 1024) {
uint8_t v = (i + t < len) ? src_ptr[i + t] : 0;
__syncthreads();
if (i + t < len) { dst_ptr[i + t] = v; }
auto dst_ptr = ss.data_ptr;
for (auto group = ss.first_chunk_id; group < ss.first_chunk_id + ss.num_chunks; ++group) {
auto const len = streams[ss.column_id][group].lengths[cid];
if (len > 0) {
auto const src_ptr = streams[ss.column_id][group].data_ptrs[cid];
for (uint32_t i = t; i < len; i += 1024) {
dst_ptr[i] = src_ptr[i];
}

__syncthreads();
if (t == 0) { streams[ss.column_id][group].data_ptrs[cid] = dst_ptr; }
dst_ptr += len;
}
dst_ptr += len;
__syncthreads();
}
if (!t) { strm_desc[stripe_id][stream_id].stream_size = dst_ptr - strm0.data_ptrs[cid]; }
Contributor Author: No need to set the stream size here; it's been computed on the host.

}

/**
223 changes: 107 additions & 116 deletions cpp/src/io/orc/writer_impl.cu
@@ -52,18 +52,18 @@
#include <thrust/tabulate.h>
#include <thrust/transform.h>

#include <algorithm>
#include <cstring>
#include <numeric>
#include <tuple>
#include <utility>

#include <cooperative_groups.h>
#include <cooperative_groups/memcpy_async.h>

#include <cuda/std/climits>
#include <cuda/std/limits>

#include <algorithm>
#include <cstring>
#include <numeric>
#include <tuple>
#include <utility>

namespace cudf {
namespace io {
namespace detail {
@@ -333,47 +333,6 @@ size_type orc_table_view::num_rows() const noexcept
return columns.empty() ? 0 : columns.front().size();
}

orc_streams::orc_stream_offsets orc_streams::compute_offsets(
host_span<orc_column_view const> columns, size_t num_rowgroups) const
{
std::vector<size_t> strm_offsets(streams.size());
size_t non_rle_data_size = 0;
size_t rle_data_size = 0;
for (size_t i = 0; i < streams.size(); ++i) {
const auto& stream = streams[i];

auto const is_rle_data = [&]() {
// First stream is an index stream, don't check types, etc.
if (!stream.column_index().has_value()) return true;

auto const& column = columns[stream.column_index().value()];
// Dictionary encoded string column - dictionary characters or
// directly encoded string - column characters
if (column.orc_kind() == TypeKind::STRING &&
((stream.kind == DICTIONARY_DATA && column.orc_encoding() == DICTIONARY_V2) ||
(stream.kind == DATA && column.orc_encoding() == DIRECT_V2)))
return false;
// Decimal data
if (column.orc_kind() == TypeKind::DECIMAL && stream.kind == DATA) return false;

// Everything else uses RLE
return true;
}();
// non-RLE and RLE streams are separated in the buffer that stores encoded data
// The computed offsets do not take the streams of the other type into account
if (is_rle_data) {
strm_offsets[i] = rle_data_size;
rle_data_size += (stream.length + 7) & ~7;
} else {
strm_offsets[i] = non_rle_data_size;
non_rle_data_size += stream.length;
}
}
non_rle_data_size = (non_rle_data_size + 7) & ~7;

return {std::move(strm_offsets), non_rle_data_size, rle_data_size};
}

namespace {
struct string_length_functor {
__device__ inline size_type operator()(int const i) const
@@ -1026,9 +985,6 @@ encoded_data encode_columns(orc_table_view const& orc_table,
{
auto const num_columns = orc_table.num_columns();
hostdevice_2dvector<gpu::EncChunk> chunks(num_columns, segmentation.num_rowgroups(), stream);
auto const stream_offsets =
streams.compute_offsets(orc_table.columns, segmentation.num_rowgroups());
rmm::device_uvector<uint8_t> encoded_data(stream_offsets.data_size(), stream);

auto const aligned_rowgroups = calculate_aligned_rowgroup_bounds(orc_table, segmentation, stream);

@@ -1104,75 +1060,81 @@

hostdevice_2dvector<gpu::encoder_chunk_streams> chunk_streams(
num_columns, segmentation.num_rowgroups(), stream);
for (size_t col_idx = 0; col_idx < num_columns; col_idx++) {
auto const& column = orc_table.column(col_idx);
auto col_streams = chunk_streams[col_idx];
for (auto const& stripe : segmentation.stripes) {
for (auto rg_idx_it = stripe.cbegin(); rg_idx_it < stripe.cend(); ++rg_idx_it) {
auto const rg_idx = *rg_idx_it;
auto const& ck = chunks[col_idx][rg_idx];
auto& strm = col_streams[rg_idx];
// per-stripe, per-stream owning buffers
std::vector<std::vector<rmm::device_uvector<uint8_t>>> encoded_data(segmentation.num_stripes());
Member: Not in this PR, but this reminds me that we could extract the owning-buffer strong type here (class owning_buffer : public buffer {) and apply it across cuIO code. The code above would then be a 2D vector of owning_buffer. This also helps unify the data type by using std::byte consistently.

Also, the owning buffer could be enforced via unique_ptr.

Contributor Author: I don't think I get this one. rmm::device_uvector already implies an owning buffer; we know that this type deallocates its memory in the destructor. In datasource::buffer we have the distinction between owning and non-owning buffers because we use the abstract buffer type, which does not imply ownership, only the access API to the contained data.

Member: On "rmm::device_uvector already implies an owning buffer", my intention was to use a strong type like:

struct owning_buffer{
  ...
  std::unique_ptr<rmm::device_uvector<uint8_t>> data;
};

to explicitly represent owning buffers in cuIO.
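
For illustration, a minimal sketch of what such a strong type could look like (hypothetical, not part of this PR), assuming it only wraps an rmm::device_uvector and exposes a std::byte view:

// Hypothetical sketch of the owning-buffer strong type discussed above; not part of this PR.
#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_uvector.hpp>

#include <cstddef>
#include <cstdint>
#include <memory>

class owning_buffer {
 public:
  owning_buffer(std::size_t size, rmm::cuda_stream_view stream)
    : _data{std::make_unique<rmm::device_uvector<uint8_t>>(size, stream)}
  {
  }

  // Non-copyable, movable: ownership stays unique by construction.
  owning_buffer(owning_buffer const&)            = delete;
  owning_buffer& operator=(owning_buffer const&) = delete;
  owning_buffer(owning_buffer&&)                 = default;
  owning_buffer& operator=(owning_buffer&&)      = default;

  // Uniform std::byte access, regardless of the element type used internally.
  [[nodiscard]] std::byte* data() noexcept { return reinterpret_cast<std::byte*>(_data->data()); }
  [[nodiscard]] std::size_t size() const noexcept { return _data->size(); }

 private:
  std::unique_ptr<rmm::device_uvector<uint8_t>> _data;
};

Whether this adds much over using rmm::device_uvector directly is exactly the question raised in the reply above.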

for (auto const& stripe : segmentation.stripes) {
std::generate_n(std::back_inserter(encoded_data[stripe.id]), streams.size(), [stream]() {
return rmm::device_uvector<uint8_t>(0, stream);
});

for (int strm_type = 0; strm_type < gpu::CI_NUM_STREAMS; ++strm_type) {
auto const strm_id = streams.id(col_idx * gpu::CI_NUM_STREAMS + strm_type);
for (size_t col_idx = 0; col_idx < num_columns; col_idx++) {
for (int strm_type = 0; strm_type < gpu::CI_NUM_STREAMS; ++strm_type) {
Contributor (on lines +1070 to +1071): Why mix post-increment and pre-increment here?

auto const& column = orc_table.column(col_idx);
auto col_streams = chunk_streams[col_idx];
auto const strm_id = streams.id(col_idx * gpu::CI_NUM_STREAMS + strm_type);

std::for_each(stripe.cbegin(), stripe.cend(), [&](auto rg_idx) {
col_streams[rg_idx].ids[strm_type] = strm_id;
col_streams[rg_idx].lengths[strm_type] = 0;
});

// Calculate rowgroup sizes and stripe size
if (strm_id >= 0) {
size_t stripe_size = 0;
std::for_each(stripe.cbegin(), stripe.cend(), [&](auto rg_idx) {
auto const& ck = chunks[col_idx][rg_idx];
auto& strm = col_streams[rg_idx];

strm.ids[strm_type] = strm_id;
if (strm_id >= 0) {
if ((strm_type == gpu::CI_DICTIONARY) ||
(strm_type == gpu::CI_DATA2 && ck.encoding_kind == DICTIONARY_V2)) {
if (rg_idx_it == stripe.cbegin()) {
const int32_t dict_stride = column.dict_stride();
const auto stripe_dict = column.host_stripe_dict(stripe.id);
if (rg_idx == *stripe.cbegin()) {
const auto stripe_dict = column.host_stripe_dict(stripe.id);
strm.lengths[strm_type] =
(strm_type == gpu::CI_DICTIONARY)
? stripe_dict->dict_char_count
: (((stripe_dict->num_strings + 0x1ff) >> 9) * (512 * 4 + 2));
if (stripe.id == 0) {
strm.data_ptrs[strm_type] = encoded_data.data() + stream_offsets.offsets[strm_id];
// Dictionary lengths are encoded as RLE, which are all stored after non-RLE data:
// include non-RLE data size in the offset only in that case
if (strm_type == gpu::CI_DATA2 && ck.encoding_kind == DICTIONARY_V2)
strm.data_ptrs[strm_type] += stream_offsets.non_rle_data_size;
} else {
auto const& strm_up = col_streams[stripe_dict[-dict_stride].start_chunk];
strm.data_ptrs[strm_type] =
strm_up.data_ptrs[strm_type] + strm_up.lengths[strm_type];
}
} else {
strm.lengths[strm_type] = 0;
strm.data_ptrs[strm_type] = col_streams[rg_idx - 1].data_ptrs[strm_type];
strm.lengths[strm_type] = 0;
}
} else if (strm_type == gpu::CI_DATA && ck.type_kind == TypeKind::STRING &&
ck.encoding_kind == DIRECT_V2) {
strm.lengths[strm_type] = column.host_dict_chunk(rg_idx)->string_char_count;
strm.data_ptrs[strm_type] = (rg_idx == 0)
? encoded_data.data() + stream_offsets.offsets[strm_id]
: (col_streams[rg_idx - 1].data_ptrs[strm_type] +
col_streams[rg_idx - 1].lengths[strm_type]);
strm.lengths[strm_type] =
std::max(column.host_dict_chunk(rg_idx)->string_char_count, 1u);
} else if (strm_type == gpu::CI_DATA && streams[strm_id].length == 0 &&
(ck.type_kind == DOUBLE || ck.type_kind == FLOAT)) {
// Pass-through
strm.lengths[strm_type] = ck.num_rows * ck.dtype_len;
strm.data_ptrs[strm_type] = nullptr;

strm.lengths[strm_type] = ck.num_rows * ck.dtype_len;
} else if (ck.type_kind == DECIMAL && strm_type == gpu::CI_DATA) {
strm.lengths[strm_type] = dec_chunk_sizes.rg_sizes.at(col_idx)[rg_idx];
strm.data_ptrs[strm_type] = (rg_idx == 0)
? encoded_data.data() + stream_offsets.offsets[strm_id]
: (col_streams[rg_idx - 1].data_ptrs[strm_type] +
col_streams[rg_idx - 1].lengths[strm_type]);
strm.lengths[strm_type] = dec_chunk_sizes.rg_sizes.at(col_idx)[rg_idx];
} else {
strm.lengths[strm_type] = RLE_stream_size(streams.type(strm_id), ck.num_rows);
// RLE encoded streams are stored after all non-RLE streams
strm.data_ptrs[strm_type] =
(rg_idx == 0) ? (encoded_data.data() + stream_offsets.non_rle_data_size +
stream_offsets.offsets[strm_id])
: (col_streams[rg_idx - 1].data_ptrs[strm_type] +
col_streams[rg_idx - 1].lengths[strm_type]);
}
} else {
strm.lengths[strm_type] = 0;
// Allow extra space for alignment
stripe_size += strm.lengths[strm_type] + uncomp_block_align - 1;
Contributor Author (on lines +1112 to +1113): This is a potential bug fix for extreme corner cases where alignment can push the writing of encoded data into the next stream.

Contributor: Potential bug as in we haven't seen this before in practice? Do we have a test for it? Should we?

Contributor Author (@vuule, Mar 30, 2023): It would be nice to have this test, but it's not trivial to come up with the failing input. Maybe a decimal column + ZSTD, since we use the exact size (and it doesn't have to be a multiple of 4, unlike floats)? I'll look into this, just not for this PR :)

});

encoded_data[stripe.id][strm_id] = rmm::device_uvector<uint8_t>(stripe_size, stream);
}

// Set offsets
for (auto rg_idx_it = stripe.cbegin(); rg_idx_it < stripe.cend(); ++rg_idx_it) {
auto const rg_idx = *rg_idx_it;
auto const& ck = chunks[col_idx][rg_idx];
auto& strm = col_streams[rg_idx];

if (strm_id < 0 or (strm_type == gpu::CI_DATA && streams[strm_id].length == 0 &&
(ck.type_kind == DOUBLE || ck.type_kind == FLOAT))) {
strm.data_ptrs[strm_type] = nullptr;
} else {
if ((strm_type == gpu::CI_DICTIONARY) ||
(strm_type == gpu::CI_DATA2 && ck.encoding_kind == DICTIONARY_V2)) {
strm.data_ptrs[strm_type] = encoded_data[stripe.id][strm_id].data();
} else {
strm.data_ptrs[strm_type] = (rg_idx_it == stripe.cbegin())
? encoded_data[stripe.id][strm_id].data()
: (col_streams[rg_idx - 1].data_ptrs[strm_type] +
col_streams[rg_idx - 1].lengths[strm_type]);
}
}
auto const misalignment =
reinterpret_cast<intptr_t>(strm.data_ptrs[strm_type]) % uncomp_block_align;
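
As a side note to the alignment thread above, a rough sketch of the kind of input it suggests (a decimal column written with ZSTD compression), using the public cudf C++ API. The values, sizes, and scale are arbitrary, and this is not a verified reproducer of the corner case:

// Hypothetical sketch of the decimal + ZSTD input suggested in the review thread above.
// The column contents are arbitrary; this is not a verified reproducer.
#include <cudf/fixed_point/fixed_point.hpp>
#include <cudf/io/orc.hpp>
#include <cudf/table/table_view.hpp>
#include <cudf_test/column_wrapper.hpp>

#include <vector>

void write_decimal_orc_with_zstd()
{
  // Decimal DATA streams use exact sizes, which need not be a multiple of the
  // block alignment (unlike float/double pass-through streams).
  cudf::test::fixed_point_column_wrapper<int64_t> col({1, 22, 333, 4444, 55555, 666666, 7777777},
                                                      numeric::scale_type{-2});
  cudf::table_view input({col});

  std::vector<char> out_buffer;
  auto const opts =
    cudf::io::orc_writer_options::builder(cudf::io::sink_info{&out_buffer}, input)
      .compression(cudf::io::compression_type::ZSTD)
      .build();
  cudf::io::write_orc(opts);
}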
@@ -1201,11 +1163,12 @@ encoded_data encode_columns(orc_table_view const& orc_table,
}
dictionaries.data.clear();
dictionaries.index.clear();
stream.synchronize();
chunk_streams.device_to_host(stream, true);

return {std::move(encoded_data), std::move(chunk_streams)};
}

// TODO: remove StripeInformation from this function and return strm_desc instead
/**
* @brief Returns stripe information after compacting columns' individual data
* chunks into contiguous data streams.
@@ -1217,25 +1180,45 @@ encoded_data encode_columns(orc_table_view const& orc_table,
* @param[in] stream CUDA stream used for device memory operations and kernel launches
* @return The stripes' information
*/
std::vector<StripeInformation> gather_stripes(
size_t num_index_streams,
file_segmentation const& segmentation,
hostdevice_2dvector<gpu::encoder_chunk_streams>* enc_streams,
hostdevice_2dvector<gpu::StripeStream>* strm_desc,
rmm::cuda_stream_view stream)
std::vector<StripeInformation> gather_stripes(size_t num_index_streams,
file_segmentation const& segmentation,
encoded_data* enc_data,
hostdevice_2dvector<gpu::StripeStream>* strm_desc,
Contributor: Nit: for non-iterator parameters, try to avoid pointers.

Suggested change
hostdevice_2dvector<gpu::StripeStream>* strm_desc,
hostdevice_2dvector<gpu::StripeStream>& strm_desc,

Contributor Author (@vuule, Mar 29, 2023): We used a pointer here to make it obvious at the call site that the parameter will be modified. I personally prefer references for non-optional parameters. Edit: to clarify, the use of a pointer here was requested in the code review; I initially used a reference.

rmm::cuda_stream_view stream)
{
if (segmentation.num_stripes() == 0) { return {}; }

// gathered stripes - per-stripe, per-stream (same as encoded_data.data)
std::vector<std::vector<rmm::device_uvector<uint8_t>>> gathered_stripes(enc_data->data.size());
for (auto& stripe_data : gathered_stripes) {
std::generate_n(std::back_inserter(stripe_data), enc_data->data[0].size(), [&]() {
Contributor: Would a stripe_data.reserve(enc_data->data[0].size()); help here?

return rmm::device_uvector<uint8_t>(0, stream);
});
}
std::vector<StripeInformation> stripes(segmentation.num_stripes());
for (auto const& stripe : segmentation.stripes) {
for (size_t col_idx = 0; col_idx < enc_streams->size().first; col_idx++) {
const auto& strm = (*enc_streams)[col_idx][stripe.first];

for (size_t col_idx = 0; col_idx < enc_data->streams.size().first; col_idx++) {
auto const& col_streams = (enc_data->streams)[col_idx];
// Assign stream data of column data stream(s)
for (int k = 0; k < gpu::CI_INDEX; k++) {
const auto stream_id = strm.ids[k];
const auto stream_id = col_streams[0].ids[k];
if (stream_id != -1) {
auto const actual_stripe_size = std::accumulate(
col_streams.begin() + stripe.first,
col_streams.begin() + stripe.first + stripe.size,
0ul,
[&](auto const& sum, auto const& strm) { return sum + strm.lengths[k]; });

auto const& allocated_stripe_size = enc_data->data[stripe.id][stream_id].size();
CUDF_EXPECTS(allocated_stripe_size >= actual_stripe_size, "OOB memory access");
if (stripe.size > 1 and allocated_stripe_size > actual_stripe_size) {
gathered_stripes[stripe.id][stream_id] =
rmm::device_uvector<uint8_t>(actual_stripe_size, stream);
}

auto* ss = &(*strm_desc)[stripe.id][stream_id - num_index_streams];
ss->stream_size = 0;
ss->data_ptr = gathered_stripes[stripe.id][stream_id].data();
ss->stream_size = actual_stripe_size;
ss->first_chunk_id = stripe.first;
ss->num_chunks = stripe.size;
ss->column_id = col_idx;
Expand All @@ -1251,9 +1234,18 @@ std::vector<StripeInformation> gather_stripes(
}

strm_desc->host_to_device(stream);
gpu::CompactOrcDataStreams(*strm_desc, *enc_streams, stream);
// TODO: use cub::DeviceMemcpy::Batched
gpu::CompactOrcDataStreams(*strm_desc, enc_data->streams, stream);
strm_desc->device_to_host(stream);
enc_streams->device_to_host(stream, true);
enc_data->streams.device_to_host(stream, true);

// move the gathered stripes to encoded_data.data for lifetime management
for (auto stripe_id = 0ul; stripe_id < enc_data->data.size(); ++stripe_id) {
for (auto stream_id = 0ul; stream_id < enc_data->data[0].size(); ++stream_id) {
if (not gathered_stripes[stripe_id][stream_id].is_empty())
enc_data->data[stripe_id][stream_id] = std::move(gathered_stripes[stripe_id][stream_id]);
}
}

return stripes;
}
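
The TODO above refers to cub::DeviceMemcpy::Batched as a possible future replacement for the custom CompactOrcDataStreams gather kernel. A rough sketch of how such a batched copy could be driven, assuming the per-(stripe, stream) source pointers, destination pointers, and byte counts have already been collected into device arrays (the function and array names here are made up for illustration):

// Hypothetical sketch of the TODO above: one batched-memcpy launch instead of the custom
// gather kernel. d_srcs/d_dsts/d_sizes are assumed to be pre-filled device arrays with one
// entry per (stripe, stream) copy; they are not part of this PR.
#include <cub/device/device_memcpy.cuh>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_buffer.hpp>

#include <cstddef>
#include <cstdint>

void batched_stream_gather(void** d_srcs,         // device array of source pointers
                           void** d_dsts,         // device array of destination pointers
                           std::size_t* d_sizes,  // device array of copy sizes in bytes
                           std::uint32_t num_copies,
                           rmm::cuda_stream_view stream)
{
  // First call only computes the required temporary storage size.
  std::size_t temp_bytes = 0;
  cub::DeviceMemcpy::Batched(
    nullptr, temp_bytes, d_srcs, d_dsts, d_sizes, num_copies, stream.value());

  rmm::device_buffer temp_storage(temp_bytes, stream);

  // Second call performs all copies in a single kernel launch.
  cub::DeviceMemcpy::Batched(
    temp_storage.data(), temp_bytes, d_srcs, d_dsts, d_sizes, num_copies, stream.value());
}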
@@ -2288,8 +2280,7 @@ convert_table_to_orc_data(table_view const& input,
const auto num_data_streams = streams.size() - num_index_streams;
hostdevice_2dvector<gpu::StripeStream> strm_descs(
segmentation.num_stripes(), num_data_streams, stream);
auto stripes =
gather_stripes(num_index_streams, segmentation, &enc_data.streams, &strm_descs, stream);
auto stripes = gather_stripes(num_index_streams, segmentation, &enc_data, &strm_descs, stream);

if (num_rows == 0) {
return {std::move(streams),
@@ -2370,7 +2361,7 @@ convert_table_to_orc_data(table_view const& input,
stream);

// deallocate encoded data as it is not needed anymore
enc_data.data = rmm::device_uvector<uint8_t>{0, stream};
enc_data.data.clear();

strm_descs.device_to_host(stream);
comp_results.device_to_host(stream, true);