Skip to content

Commit

Permalink
Use rmm::device_uvector in place of rmm::device_vector for ORC reader…
Browse files Browse the repository at this point in the history
…/writer and cudf::io::column_buffer (#7614)

Issue #7287

Replaces `device_vector` with `device_uvector` and `device_span`. Because `device_uvector` does not have a default constructor, some additional changes were required for `device_uvector` data members.

Performance impact: this change makes a measurable difference in reader benchmarks. Most string column cases are sped up around **5%**, with other cases having a measurable, but less consistent improvement.

Authors:
  - Vukasin Milovanovic (@vuule)

Approvers:
  - Ram (Ramakrishna Prabhu) (@rgsl888prabhu)
  - Kumar Aatish (@kaatish)

URL: #7614
  • Loading branch information
vuule authored Mar 20, 2021
1 parent cdd44d2 commit bb05ddc
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 48 deletions.
2 changes: 1 addition & 1 deletion cpp/src/io/csv/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ table_with_metadata reader::impl::read(rmm::cuda_stream_view stream)
// during the conversion stage
const std::string quotechar(1, opts.quotechar);
const std::string dblquotechar(2, opts.quotechar);
std::unique_ptr<column> col = cudf::make_strings_column(out_buffers[i]._strings, stream);
std::unique_ptr<column> col = cudf::make_strings_column(*out_buffers[i]._strings, stream);
out_columns.emplace_back(
cudf::strings::replace(col->view(), dblquotechar, quotechar, -1, mr_));
} else {
Expand Down
58 changes: 26 additions & 32 deletions cpp/src/io/orc/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_buffer.hpp>
#include <rmm/device_vector.hpp>
#include <rmm/device_uvector.hpp>

#include <algorithm>
#include <array>
Expand Down Expand Up @@ -223,7 +223,7 @@ rmm::device_buffer reader::impl::decompress_stripe_data(
const OrcDecompressor *decompressor,
std::vector<orc_stream_info> &stream_info,
size_t num_stripes,
rmm::device_vector<gpu::RowGroup> &row_groups,
device_span<gpu::RowGroup> row_groups,
size_t row_index_stride,
rmm::cuda_stream_view stream)
{
Expand Down Expand Up @@ -254,9 +254,9 @@ rmm::device_buffer reader::impl::decompress_stripe_data(
CUDF_EXPECTS(total_decomp_size > 0, "No decompressible data found");

rmm::device_buffer decomp_data(total_decomp_size, stream);
rmm::device_vector<gpu_inflate_input_s> inflate_in(num_compressed_blocks +
num_uncompressed_blocks);
rmm::device_vector<gpu_inflate_status_s> inflate_out(num_compressed_blocks);
rmm::device_uvector<gpu_inflate_input_s> inflate_in(
num_compressed_blocks + num_uncompressed_blocks, stream);
rmm::device_uvector<gpu_inflate_status_s> inflate_out(num_compressed_blocks, stream);

// Parse again to populate the decompression input/output buffers
size_t decomp_offset = 0;
Expand All @@ -265,9 +265,9 @@ rmm::device_buffer reader::impl::decompress_stripe_data(
for (size_t i = 0; i < compinfo.size(); ++i) {
auto dst_base = static_cast<uint8_t *>(decomp_data.data());
compinfo[i].uncompressed_data = dst_base + decomp_offset;
compinfo[i].decctl = inflate_in.data().get() + start_pos;
compinfo[i].decstatus = inflate_out.data().get() + start_pos;
compinfo[i].copyctl = inflate_in.data().get() + start_pos_uncomp;
compinfo[i].decctl = inflate_in.data() + start_pos;
compinfo[i].decstatus = inflate_out.data() + start_pos;
compinfo[i].copyctl = inflate_in.data() + start_pos_uncomp;

stream_info[i].dst_pos = decomp_offset;
decomp_offset += compinfo[i].max_uncompressed_size;
Expand All @@ -285,19 +285,18 @@ rmm::device_buffer reader::impl::decompress_stripe_data(
if (num_compressed_blocks > 0) {
switch (decompressor->GetKind()) {
case orc::ZLIB:
CUDA_TRY(gpuinflate(
inflate_in.data().get(), inflate_out.data().get(), num_compressed_blocks, 0, stream));
CUDA_TRY(
gpuinflate(inflate_in.data(), inflate_out.data(), num_compressed_blocks, 0, stream));
break;
case orc::SNAPPY:
CUDA_TRY(gpu_unsnap(
inflate_in.data().get(), inflate_out.data().get(), num_compressed_blocks, stream));
CUDA_TRY(gpu_unsnap(inflate_in.data(), inflate_out.data(), num_compressed_blocks, stream));
break;
default: CUDF_EXPECTS(false, "Unexpected decompression dispatch"); break;
}
}
if (num_uncompressed_blocks > 0) {
CUDA_TRY(gpu_copy_uncompressed_blocks(
inflate_in.data().get() + num_compressed_blocks, num_uncompressed_blocks, stream));
inflate_in.data() + num_compressed_blocks, num_uncompressed_blocks, stream));
}
gpu::PostDecompressionReassemble(compinfo.device_ptr(), compinfo.size(), stream);

Expand All @@ -324,7 +323,7 @@ rmm::device_buffer reader::impl::decompress_stripe_data(

if (not row_groups.empty()) {
chunks.host_to_device(stream);
gpu::ParseRowGroupIndex(row_groups.data().get(),
gpu::ParseRowGroupIndex(row_groups.data(),
compinfo.device_ptr(),
chunks.device_ptr(),
num_columns,
Expand All @@ -341,8 +340,8 @@ void reader::impl::decode_stream_data(hostdevice_vector<gpu::ColumnDesc> &chunks
size_t num_dicts,
size_t skip_rows,
size_t num_rows,
timezone_table const &tz_table,
const rmm::device_vector<gpu::RowGroup> &row_groups,
timezone_table_view tz_table,
device_span<gpu::RowGroup const> row_groups,
size_t row_index_stride,
std::vector<column_buffer> &out_buffers,
rmm::cuda_stream_view stream)
Expand All @@ -360,24 +359,19 @@ void reader::impl::decode_stream_data(hostdevice_vector<gpu::ColumnDesc> &chunks
}

// Allocate global dictionary for deserializing
rmm::device_vector<gpu::DictionaryEntry> global_dict(num_dicts);
rmm::device_uvector<gpu::DictionaryEntry> global_dict(num_dicts, stream);

chunks.host_to_device(stream);
gpu::DecodeNullsAndStringDictionaries(chunks.device_ptr(),
global_dict.data().get(),
num_columns,
num_stripes,
num_rows,
skip_rows,
stream);
gpu::DecodeNullsAndStringDictionaries(
chunks.device_ptr(), global_dict.data(), num_columns, num_stripes, num_rows, skip_rows, stream);
gpu::DecodeOrcColumnData(chunks.device_ptr(),
global_dict.data().get(),
global_dict.data(),
num_columns,
num_stripes,
num_rows,
skip_rows,
tz_table.view(),
row_groups.data().get(),
tz_table,
row_groups.data(),
row_groups.size() / num_columns,
row_index_stride,
stream);
Expand Down Expand Up @@ -548,7 +542,7 @@ table_with_metadata reader::impl::read(size_type skip_rows,
// Process dataset chunk pages into output columns
if (stripe_data.size() != 0) {
// Setup row group descriptors if using indexes
rmm::device_vector<gpu::RowGroup> row_groups(num_rowgroups * num_columns);
rmm::device_uvector<gpu::RowGroup> row_groups(num_rowgroups * num_columns, stream);
if (_metadata->ps.compression != orc::NONE) {
auto decomp_data = decompress_stripe_data(chunks,
stripe_data,
Expand All @@ -561,9 +555,9 @@ table_with_metadata reader::impl::read(size_type skip_rows,
stripe_data.clear();
stripe_data.push_back(std::move(decomp_data));
} else {
if (not row_groups.empty()) {
if (not row_groups.is_empty()) {
chunks.host_to_device(stream);
gpu::ParseRowGroupIndex(row_groups.data().get(),
gpu::ParseRowGroupIndex(row_groups.data(),
nullptr,
chunks.device_ptr(),
num_columns,
Expand All @@ -577,7 +571,7 @@ table_with_metadata reader::impl::read(size_type skip_rows,
// Setup table for converting timestamp columns from local to UTC time
auto const tz_table =
_has_timestamp_column
? build_timezone_transition_table(selected_stripes[0].second->writerTimezone)
? build_timezone_transition_table(selected_stripes[0].second->writerTimezone, stream)
: timezone_table{};

std::vector<column_buffer> out_buffers;
Expand All @@ -596,7 +590,7 @@ table_with_metadata reader::impl::read(size_type skip_rows,
num_dict_entries,
skip_rows,
num_rows,
tz_table,
tz_table.view(),
row_groups,
_metadata->get_row_index_stride(),
out_buffers,
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/io/orc/reader_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class reader::impl {
const OrcDecompressor *decompressor,
std::vector<orc_stream_info> &stream_info,
size_t num_stripes,
rmm::device_vector<gpu::RowGroup> &row_groups,
device_span<gpu::RowGroup> row_groups,
size_t row_index_stride,
rmm::cuda_stream_view stream);

Expand All @@ -118,8 +118,8 @@ class reader::impl {
size_t num_dicts,
size_t skip_rows,
size_t num_rows,
timezone_table const &tz_table,
const rmm::device_vector<gpu::RowGroup> &row_groups,
timezone_table_view tz_table,
device_span<gpu::RowGroup const> row_groups,
size_t row_index_stride,
std::vector<column_buffer> &out_buffers,
rmm::cuda_stream_view stream);
Expand Down
20 changes: 18 additions & 2 deletions cpp/src/io/orc/timezone.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,8 @@ static int64_t get_transition_time(dst_transition_s const &trans, int year)
return trans.time + day * day_seconds;
}

timezone_table build_timezone_transition_table(std::string const &timezone_name)
timezone_table build_timezone_transition_table(std::string const &timezone_name,
rmm::cuda_stream_view stream)
{
if (timezone_name == "UTC" || timezone_name.empty()) {
// Return an empty table for UTC
Expand Down Expand Up @@ -459,7 +460,22 @@ timezone_table build_timezone_transition_table(std::string const &timezone_name)
year_timestamp += (365 + is_leap_year(year)) * day_seconds;
}

return {get_gmt_offset(ttimes, offsets, orc_utc_offset), ttimes, offsets};
rmm::device_uvector<int64_t> d_ttimes{ttimes.size(), stream};
CUDA_TRY(cudaMemcpyAsync(d_ttimes.data(),
ttimes.data(),
ttimes.size() * sizeof(int64_t),
cudaMemcpyDefault,
stream.value()));
rmm::device_uvector<int32_t> d_offsets{offsets.size(), stream};
CUDA_TRY(cudaMemcpyAsync(d_offsets.data(),
offsets.data(),
offsets.size() * sizeof(int32_t),
cudaMemcpyDefault,
stream.value()));
auto const gmt_offset = get_gmt_offset(ttimes, offsets, orc_utc_offset);
stream.synchronize();

return {gmt_offset, std::move(d_ttimes), std::move(d_offsets)};
}

} // namespace io
Expand Down
17 changes: 13 additions & 4 deletions cpp/src/io/orc/timezone.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
#include <cudf/utilities/span.hpp>

#include <thrust/binary_search.h>
#include <thrust/device_vector.h>
#include <thrust/execution_policy.h>
#include <rmm/device_uvector.hpp>

#include <stdint.h>
#include <string>
Expand Down Expand Up @@ -108,8 +108,15 @@ inline __device__ int32_t get_gmt_offset(cudf::device_span<int64_t const> ttimes

struct timezone_table {
int32_t gmt_offset = 0;
rmm::device_vector<int64_t> ttimes;
rmm::device_vector<int32_t> offsets;
rmm::device_uvector<int64_t> ttimes;
rmm::device_uvector<int32_t> offsets;
timezone_table() : ttimes{0, rmm::cuda_stream_default}, offsets{0, rmm::cuda_stream_default} {}
timezone_table(int32_t gmt_offset,
rmm::device_uvector<int64_t> &&ttimes,
rmm::device_uvector<int32_t> &&offsets)
: gmt_offset{gmt_offset}, ttimes{std::move(ttimes)}, offsets{std::move(offsets)}
{
}
timezone_table_view view() const { return {gmt_offset, ttimes, offsets}; }
};

Expand All @@ -119,10 +126,12 @@ struct timezone_table {
* Uses system's TZif files. Assumes little-endian platform when parsing these files.
*
* @param timezone_name standard timezone name (for example, "US/Pacific")
* @param stream CUDA stream used for any device memory operations and kernel launches
*
* @return The transition table for the given timezone
*/
timezone_table build_timezone_transition_table(std::string const &timezone_name);
timezone_table build_timezone_transition_table(std::string const &timezone_name,
rmm::cuda_stream_view stream);

} // namespace io
} // namespace cudf
15 changes: 9 additions & 6 deletions cpp/src/io/utilities/column_buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_buffer.hpp>
#include <rmm/device_vector.hpp>
#include <rmm/device_uvector.hpp>

namespace cudf {
namespace io {
Expand Down Expand Up @@ -108,7 +108,10 @@ struct column_buffer {
size = _size;

switch (type.id()) {
case type_id::STRING: _strings.resize(size); break;
case type_id::STRING:
_strings = std::make_unique<rmm::device_uvector<str_pair>>(size, stream);
cudaMemsetAsync(_strings->data(), 0, size * sizeof(str_pair), stream.value());
break;

// list columns store a buffer of int32's as offsets to represent
// their individual rows
Expand All @@ -125,8 +128,8 @@ struct column_buffer {
}
}

auto data() { return _strings.size() ? _strings.data().get() : _data.data(); }
auto data_size() { return std::max(_data.size(), _strings.size() * sizeof(str_pair)); }
auto data() { return _strings ? _strings->data() : _data.data(); }
auto data_size() const { return _strings ? _strings->size() : _data.size(); }

template <typename T = uint32_t>
auto null_mask()
Expand All @@ -137,7 +140,7 @@ struct column_buffer {

auto& null_count() { return _null_count; }

rmm::device_vector<str_pair> _strings;
std::unique_ptr<rmm::device_uvector<str_pair>> _strings;
rmm::device_buffer _data{};
rmm::device_buffer _null_mask{};
size_type _null_count{0};
Expand Down Expand Up @@ -178,7 +181,7 @@ std::unique_ptr<column> make_column(
schema_info->children.push_back(column_name_info{"offsets"});
schema_info->children.push_back(column_name_info{"chars"});
}
return make_strings_column(buffer._strings, stream, mr);
return make_strings_column(*buffer._strings, stream, mr);

case type_id::LIST: {
// make offsets column
Expand Down

0 comments on commit bb05ddc

Please sign in to comment.