Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use rmm::device_uvector in place of rmm::device_vector for ORC reader/writer and cudf::io::column_buffer #7614

Merged
merged 3 commits into from
Mar 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cpp/src/io/csv/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ table_with_metadata reader::impl::read(rmm::cuda_stream_view stream)
// during the conversion stage
const std::string quotechar(1, opts.quotechar);
const std::string dblquotechar(2, opts.quotechar);
std::unique_ptr<column> col = cudf::make_strings_column(out_buffers[i]._strings, stream);
std::unique_ptr<column> col = cudf::make_strings_column(*out_buffers[i]._strings, stream);
out_columns.emplace_back(
cudf::strings::replace(col->view(), dblquotechar, quotechar, -1, mr_));
} else {
Expand Down
58 changes: 26 additions & 32 deletions cpp/src/io/orc/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_buffer.hpp>
#include <rmm/device_vector.hpp>
#include <rmm/device_uvector.hpp>

#include <algorithm>
#include <array>
Expand Down Expand Up @@ -223,7 +223,7 @@ rmm::device_buffer reader::impl::decompress_stripe_data(
const OrcDecompressor *decompressor,
std::vector<orc_stream_info> &stream_info,
size_t num_stripes,
rmm::device_vector<gpu::RowGroup> &row_groups,
device_span<gpu::RowGroup> row_groups,
size_t row_index_stride,
rmm::cuda_stream_view stream)
{
Expand Down Expand Up @@ -254,9 +254,9 @@ rmm::device_buffer reader::impl::decompress_stripe_data(
CUDF_EXPECTS(total_decomp_size > 0, "No decompressible data found");

rmm::device_buffer decomp_data(total_decomp_size, stream);
rmm::device_vector<gpu_inflate_input_s> inflate_in(num_compressed_blocks +
num_uncompressed_blocks);
rmm::device_vector<gpu_inflate_status_s> inflate_out(num_compressed_blocks);
rmm::device_uvector<gpu_inflate_input_s> inflate_in(
num_compressed_blocks + num_uncompressed_blocks, stream);
rmm::device_uvector<gpu_inflate_status_s> inflate_out(num_compressed_blocks, stream);

// Parse again to populate the decompression input/output buffers
size_t decomp_offset = 0;
Expand All @@ -265,9 +265,9 @@ rmm::device_buffer reader::impl::decompress_stripe_data(
for (size_t i = 0; i < compinfo.size(); ++i) {
auto dst_base = static_cast<uint8_t *>(decomp_data.data());
compinfo[i].uncompressed_data = dst_base + decomp_offset;
compinfo[i].decctl = inflate_in.data().get() + start_pos;
compinfo[i].decstatus = inflate_out.data().get() + start_pos;
compinfo[i].copyctl = inflate_in.data().get() + start_pos_uncomp;
compinfo[i].decctl = inflate_in.data() + start_pos;
compinfo[i].decstatus = inflate_out.data() + start_pos;
compinfo[i].copyctl = inflate_in.data() + start_pos_uncomp;

stream_info[i].dst_pos = decomp_offset;
decomp_offset += compinfo[i].max_uncompressed_size;
Expand All @@ -285,19 +285,18 @@ rmm::device_buffer reader::impl::decompress_stripe_data(
if (num_compressed_blocks > 0) {
switch (decompressor->GetKind()) {
case orc::ZLIB:
CUDA_TRY(gpuinflate(
inflate_in.data().get(), inflate_out.data().get(), num_compressed_blocks, 0, stream));
CUDA_TRY(
gpuinflate(inflate_in.data(), inflate_out.data(), num_compressed_blocks, 0, stream));
break;
case orc::SNAPPY:
CUDA_TRY(gpu_unsnap(
inflate_in.data().get(), inflate_out.data().get(), num_compressed_blocks, stream));
CUDA_TRY(gpu_unsnap(inflate_in.data(), inflate_out.data(), num_compressed_blocks, stream));
break;
default: CUDF_EXPECTS(false, "Unexpected decompression dispatch"); break;
}
}
if (num_uncompressed_blocks > 0) {
CUDA_TRY(gpu_copy_uncompressed_blocks(
inflate_in.data().get() + num_compressed_blocks, num_uncompressed_blocks, stream));
inflate_in.data() + num_compressed_blocks, num_uncompressed_blocks, stream));
}
gpu::PostDecompressionReassemble(compinfo.device_ptr(), compinfo.size(), stream);

Expand All @@ -324,7 +323,7 @@ rmm::device_buffer reader::impl::decompress_stripe_data(

if (not row_groups.empty()) {
chunks.host_to_device(stream);
gpu::ParseRowGroupIndex(row_groups.data().get(),
gpu::ParseRowGroupIndex(row_groups.data(),
compinfo.device_ptr(),
chunks.device_ptr(),
num_columns,
Expand All @@ -341,8 +340,8 @@ void reader::impl::decode_stream_data(hostdevice_vector<gpu::ColumnDesc> &chunks
size_t num_dicts,
size_t skip_rows,
size_t num_rows,
timezone_table const &tz_table,
const rmm::device_vector<gpu::RowGroup> &row_groups,
timezone_table_view tz_table,
device_span<gpu::RowGroup const> row_groups,
size_t row_index_stride,
std::vector<column_buffer> &out_buffers,
rmm::cuda_stream_view stream)
Expand All @@ -360,24 +359,19 @@ void reader::impl::decode_stream_data(hostdevice_vector<gpu::ColumnDesc> &chunks
}

// Allocate global dictionary for deserializing
rmm::device_vector<gpu::DictionaryEntry> global_dict(num_dicts);
rmm::device_uvector<gpu::DictionaryEntry> global_dict(num_dicts, stream);

chunks.host_to_device(stream);
gpu::DecodeNullsAndStringDictionaries(chunks.device_ptr(),
global_dict.data().get(),
num_columns,
num_stripes,
num_rows,
skip_rows,
stream);
gpu::DecodeNullsAndStringDictionaries(
chunks.device_ptr(), global_dict.data(), num_columns, num_stripes, num_rows, skip_rows, stream);
gpu::DecodeOrcColumnData(chunks.device_ptr(),
global_dict.data().get(),
global_dict.data(),
num_columns,
num_stripes,
num_rows,
skip_rows,
tz_table.view(),
row_groups.data().get(),
tz_table,
row_groups.data(),
row_groups.size() / num_columns,
row_index_stride,
stream);
Expand Down Expand Up @@ -550,7 +544,7 @@ table_with_metadata reader::impl::read(size_type skip_rows,
// Process dataset chunk pages into output columns
if (stripe_data.size() != 0) {
// Setup row group descriptors if using indexes
rmm::device_vector<gpu::RowGroup> row_groups(num_rowgroups * num_columns);
rmm::device_uvector<gpu::RowGroup> row_groups(num_rowgroups * num_columns, stream);
if (_metadata->ps.compression != orc::NONE) {
auto decomp_data = decompress_stripe_data(chunks,
stripe_data,
Expand All @@ -563,9 +557,9 @@ table_with_metadata reader::impl::read(size_type skip_rows,
stripe_data.clear();
stripe_data.push_back(std::move(decomp_data));
} else {
if (not row_groups.empty()) {
if (not row_groups.is_empty()) {
chunks.host_to_device(stream);
gpu::ParseRowGroupIndex(row_groups.data().get(),
gpu::ParseRowGroupIndex(row_groups.data(),
nullptr,
chunks.device_ptr(),
num_columns,
Expand All @@ -579,7 +573,7 @@ table_with_metadata reader::impl::read(size_type skip_rows,
// Setup table for converting timestamp columns from local to UTC time
auto const tz_table =
_has_timestamp_column
? build_timezone_transition_table(selected_stripes[0].second->writerTimezone)
? build_timezone_transition_table(selected_stripes[0].second->writerTimezone, stream)
: timezone_table{};

std::vector<column_buffer> out_buffers;
Expand All @@ -598,7 +592,7 @@ table_with_metadata reader::impl::read(size_type skip_rows,
num_dict_entries,
skip_rows,
num_rows,
tz_table,
tz_table.view(),
row_groups,
_metadata->get_row_index_stride(),
out_buffers,
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/io/orc/reader_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class reader::impl {
const OrcDecompressor *decompressor,
std::vector<orc_stream_info> &stream_info,
size_t num_stripes,
rmm::device_vector<gpu::RowGroup> &row_groups,
device_span<gpu::RowGroup> row_groups,
size_t row_index_stride,
rmm::cuda_stream_view stream);

Expand All @@ -118,8 +118,8 @@ class reader::impl {
size_t num_dicts,
size_t skip_rows,
size_t num_rows,
timezone_table const &tz_table,
const rmm::device_vector<gpu::RowGroup> &row_groups,
timezone_table_view tz_table,
device_span<gpu::RowGroup const> row_groups,
size_t row_index_stride,
std::vector<column_buffer> &out_buffers,
rmm::cuda_stream_view stream);
Expand Down
20 changes: 18 additions & 2 deletions cpp/src/io/orc/timezone.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,8 @@ static int64_t get_transition_time(dst_transition_s const &trans, int year)
return trans.time + day * day_seconds;
}

timezone_table build_timezone_transition_table(std::string const &timezone_name)
timezone_table build_timezone_transition_table(std::string const &timezone_name,
rmm::cuda_stream_view stream)
{
if (timezone_name == "UTC" || timezone_name.empty()) {
// Return an empty table for UTC
Expand Down Expand Up @@ -459,7 +460,22 @@ timezone_table build_timezone_transition_table(std::string const &timezone_name)
year_timestamp += (365 + is_leap_year(year)) * day_seconds;
}

return {get_gmt_offset(ttimes, offsets, orc_utc_offset), ttimes, offsets};
rmm::device_uvector<int64_t> d_ttimes{ttimes.size(), stream};
CUDA_TRY(cudaMemcpyAsync(d_ttimes.data(),
ttimes.data(),
ttimes.size() * sizeof(int64_t),
cudaMemcpyDefault,
stream.value()));
rmm::device_uvector<int32_t> d_offsets{offsets.size(), stream};
CUDA_TRY(cudaMemcpyAsync(d_offsets.data(),
offsets.data(),
offsets.size() * sizeof(int32_t),
cudaMemcpyDefault,
stream.value()));
auto const gmt_offset = get_gmt_offset(ttimes, offsets, orc_utc_offset);
stream.synchronize();

return {gmt_offset, std::move(d_ttimes), std::move(d_offsets)};
}

} // namespace io
Expand Down
17 changes: 13 additions & 4 deletions cpp/src/io/orc/timezone.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
#include <cudf/utilities/span.hpp>

#include <thrust/binary_search.h>
#include <thrust/device_vector.h>
#include <thrust/execution_policy.h>
#include <rmm/device_uvector.hpp>

#include <stdint.h>
#include <string>
Expand Down Expand Up @@ -108,8 +108,15 @@ inline __device__ int32_t get_gmt_offset(cudf::device_span<int64_t const> ttimes

// Owning container for a timezone transition table: a GMT offset plus two
// vectors (`ttimes`, `offsets`) that `view()` exposes as a `timezone_table_view`.
// NOTE(review): this listing declares `ttimes`/`offsets` twice — once as
// rmm::device_vector and once as rmm::device_uvector. Presumably these are the
// removed and added sides of a diff; confirm which pair is current.
struct timezone_table {
// Defaults to 0 when no offset is supplied.
int32_t gmt_offset = 0;
rmm::device_vector<int64_t> ttimes;
rmm::device_vector<int32_t> offsets;
rmm::device_uvector<int64_t> ttimes;
rmm::device_uvector<int32_t> offsets;
// Builds an empty table: both vectors are created with zero elements on
// rmm::cuda_stream_default, and gmt_offset keeps its in-class default.
timezone_table() : ttimes{0, rmm::cuda_stream_default}, offsets{0, rmm::cuda_stream_default} {}
// Takes ownership of already-populated vectors by moving them into the
// members; the parameter names intentionally mirror the member names.
timezone_table(int32_t gmt_offset,
rmm::device_uvector<int64_t> &&ttimes,
rmm::device_uvector<int32_t> &&offsets)
: gmt_offset{gmt_offset}, ttimes{std::move(ttimes)}, offsets{std::move(offsets)}
{
}
// Returns a timezone_table_view constructed from gmt_offset, ttimes and offsets.
// NOTE(review): the view presumably does not own the data, so it would have to
// stay within this object's lifetime — confirm against timezone_table_view.
timezone_table_view view() const { return {gmt_offset, ttimes, offsets}; }
};

Expand All @@ -119,10 +126,12 @@ struct timezone_table {
* Uses system's TZif files. Assumes little-endian platform when parsing these files.
*
* @param timezone_name standard timezone name (for example, "US/Pacific")
* @param stream CUDA stream used for any device memory operations and kernel launches
*
* @return The transition table for the given timezone
*/
timezone_table build_timezone_transition_table(std::string const &timezone_name);
timezone_table build_timezone_transition_table(std::string const &timezone_name,
rmm::cuda_stream_view stream);

} // namespace io
} // namespace cudf
15 changes: 9 additions & 6 deletions cpp/src/io/utilities/column_buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_buffer.hpp>
#include <rmm/device_vector.hpp>
#include <rmm/device_uvector.hpp>

namespace cudf {
namespace io {
Expand Down Expand Up @@ -108,7 +108,10 @@ struct column_buffer {
size = _size;

switch (type.id()) {
case type_id::STRING: _strings.resize(size); break;
case type_id::STRING:
_strings = std::make_unique<rmm::device_uvector<str_pair>>(size, stream);
cudaMemsetAsync(_strings->data(), 0, size * sizeof(str_pair), stream.value());
break;

// list columns store a buffer of int32's as offsets to represent
// their individual rows
Expand All @@ -125,8 +128,8 @@ struct column_buffer {
}
}

auto data() { return _strings.size() ? _strings.data().get() : _data.data(); }
auto data_size() { return std::max(_data.size(), _strings.size() * sizeof(str_pair)); }
// Returns the pointer of whichever buffer is in use: `_strings->data()` when
// `_strings` is non-null, otherwise `_data.data()`. The return type is deduced
// from the conditional expression.
auto data() { return _strings ? _strings->data() : _data.data(); }
auto data_size() const { return _strings ? _strings->size() : _data.size(); }

template <typename T = uint32_t>
auto null_mask()
Expand All @@ -137,7 +140,7 @@ struct column_buffer {

auto& null_count() { return _null_count; }

rmm::device_vector<str_pair> _strings;
std::unique_ptr<rmm::device_uvector<str_pair>> _strings;
rmm::device_buffer _data{};
rmm::device_buffer _null_mask{};
size_type _null_count{0};
Expand Down Expand Up @@ -178,7 +181,7 @@ std::unique_ptr<column> make_column(
schema_info->children.push_back(column_name_info{"offsets"});
schema_info->children.push_back(column_name_info{"chars"});
}
return make_strings_column(buffer._strings, stream, mr);
return make_strings_column(*buffer._strings, stream, mr);

case type_id::LIST: {
// make offsets column
Expand Down