diff --git a/cpp/src/io/csv/reader_impl.cu b/cpp/src/io/csv/reader_impl.cu index 332e8aff7fc..76580122fe6 100644 --- a/cpp/src/io/csv/reader_impl.cu +++ b/cpp/src/io/csv/reader_impl.cu @@ -351,7 +351,7 @@ table_with_metadata reader::impl::read(rmm::cuda_stream_view stream) // during the conversion stage const std::string quotechar(1, opts.quotechar); const std::string dblquotechar(2, opts.quotechar); - std::unique_ptr col = cudf::make_strings_column(out_buffers[i]._strings, stream); + std::unique_ptr col = cudf::make_strings_column(*out_buffers[i]._strings, stream); out_columns.emplace_back( cudf::strings::replace(col->view(), dblquotechar, quotechar, -1, mr_)); } else { diff --git a/cpp/src/io/orc/reader_impl.cu b/cpp/src/io/orc/reader_impl.cu index 80935e3fbd5..fe9e3bbcc25 100644 --- a/cpp/src/io/orc/reader_impl.cu +++ b/cpp/src/io/orc/reader_impl.cu @@ -31,7 +31,7 @@ #include #include -#include +#include #include #include @@ -223,7 +223,7 @@ rmm::device_buffer reader::impl::decompress_stripe_data( const OrcDecompressor *decompressor, std::vector &stream_info, size_t num_stripes, - rmm::device_vector &row_groups, + device_span row_groups, size_t row_index_stride, rmm::cuda_stream_view stream) { @@ -254,9 +254,9 @@ rmm::device_buffer reader::impl::decompress_stripe_data( CUDF_EXPECTS(total_decomp_size > 0, "No decompressible data found"); rmm::device_buffer decomp_data(total_decomp_size, stream); - rmm::device_vector inflate_in(num_compressed_blocks + - num_uncompressed_blocks); - rmm::device_vector inflate_out(num_compressed_blocks); + rmm::device_uvector inflate_in( + num_compressed_blocks + num_uncompressed_blocks, stream); + rmm::device_uvector inflate_out(num_compressed_blocks, stream); // Parse again to populate the decompression input/output buffers size_t decomp_offset = 0; @@ -265,9 +265,9 @@ rmm::device_buffer reader::impl::decompress_stripe_data( for (size_t i = 0; i < compinfo.size(); ++i) { auto dst_base = static_cast(decomp_data.data()); compinfo[i].uncompressed_data = dst_base + decomp_offset; - compinfo[i].decctl = inflate_in.data().get() + start_pos; - compinfo[i].decstatus = inflate_out.data().get() + start_pos; - compinfo[i].copyctl = inflate_in.data().get() + start_pos_uncomp; + compinfo[i].decctl = inflate_in.data() + start_pos; + compinfo[i].decstatus = inflate_out.data() + start_pos; + compinfo[i].copyctl = inflate_in.data() + start_pos_uncomp; stream_info[i].dst_pos = decomp_offset; decomp_offset += compinfo[i].max_uncompressed_size; @@ -285,19 +285,18 @@ rmm::device_buffer reader::impl::decompress_stripe_data( if (num_compressed_blocks > 0) { switch (decompressor->GetKind()) { case orc::ZLIB: - CUDA_TRY(gpuinflate( - inflate_in.data().get(), inflate_out.data().get(), num_compressed_blocks, 0, stream)); + CUDA_TRY( + gpuinflate(inflate_in.data(), inflate_out.data(), num_compressed_blocks, 0, stream)); break; case orc::SNAPPY: - CUDA_TRY(gpu_unsnap( - inflate_in.data().get(), inflate_out.data().get(), num_compressed_blocks, stream)); + CUDA_TRY(gpu_unsnap(inflate_in.data(), inflate_out.data(), num_compressed_blocks, stream)); break; default: CUDF_EXPECTS(false, "Unexpected decompression dispatch"); break; } } if (num_uncompressed_blocks > 0) { CUDA_TRY(gpu_copy_uncompressed_blocks( - inflate_in.data().get() + num_compressed_blocks, num_uncompressed_blocks, stream)); + inflate_in.data() + num_compressed_blocks, num_uncompressed_blocks, stream)); } gpu::PostDecompressionReassemble(compinfo.device_ptr(), compinfo.size(), stream); @@ -324,7 +323,7 @@ rmm::device_buffer reader::impl::decompress_stripe_data( if (not row_groups.empty()) { chunks.host_to_device(stream); - gpu::ParseRowGroupIndex(row_groups.data().get(), + gpu::ParseRowGroupIndex(row_groups.data(), compinfo.device_ptr(), chunks.device_ptr(), num_columns, @@ -341,8 +340,8 @@ void reader::impl::decode_stream_data(hostdevice_vector &chunks size_t num_dicts, size_t skip_rows, size_t num_rows, - timezone_table const &tz_table, - const rmm::device_vector &row_groups, + timezone_table_view tz_table, + device_span row_groups, size_t row_index_stride, std::vector &out_buffers, rmm::cuda_stream_view stream) @@ -360,24 +359,19 @@ void reader::impl::decode_stream_data(hostdevice_vector &chunks } // Allocate global dictionary for deserializing - rmm::device_vector global_dict(num_dicts); + rmm::device_uvector global_dict(num_dicts, stream); chunks.host_to_device(stream); - gpu::DecodeNullsAndStringDictionaries(chunks.device_ptr(), - global_dict.data().get(), - num_columns, - num_stripes, - num_rows, - skip_rows, - stream); + gpu::DecodeNullsAndStringDictionaries( + chunks.device_ptr(), global_dict.data(), num_columns, num_stripes, num_rows, skip_rows, stream); gpu::DecodeOrcColumnData(chunks.device_ptr(), - global_dict.data().get(), + global_dict.data(), num_columns, num_stripes, num_rows, skip_rows, - tz_table.view(), - row_groups.data().get(), + tz_table, + row_groups.data(), row_groups.size() / num_columns, row_index_stride, stream); @@ -550,7 +544,7 @@ table_with_metadata reader::impl::read(size_type skip_rows, // Process dataset chunk pages into output columns if (stripe_data.size() != 0) { // Setup row group descriptors if using indexes - rmm::device_vector row_groups(num_rowgroups * num_columns); + rmm::device_uvector row_groups(num_rowgroups * num_columns, stream); if (_metadata->ps.compression != orc::NONE) { auto decomp_data = decompress_stripe_data(chunks, stripe_data, @@ -563,9 +557,9 @@ table_with_metadata reader::impl::read(size_type skip_rows, stripe_data.clear(); stripe_data.push_back(std::move(decomp_data)); } else { - if (not row_groups.empty()) { + if (not row_groups.is_empty()) { chunks.host_to_device(stream); - gpu::ParseRowGroupIndex(row_groups.data().get(), + gpu::ParseRowGroupIndex(row_groups.data(), nullptr, chunks.device_ptr(), num_columns, @@ -579,7 +573,7 @@ table_with_metadata reader::impl::read(size_type skip_rows, // Setup table for converting timestamp columns from local to UTC time auto const tz_table = _has_timestamp_column - ? build_timezone_transition_table(selected_stripes[0].second->writerTimezone) + ? build_timezone_transition_table(selected_stripes[0].second->writerTimezone, stream) : timezone_table{}; std::vector out_buffers; @@ -598,7 +592,7 @@ table_with_metadata reader::impl::read(size_type skip_rows, num_dict_entries, skip_rows, num_rows, - tz_table, + tz_table.view(), row_groups, _metadata->get_row_index_stride(), out_buffers, diff --git a/cpp/src/io/orc/reader_impl.hpp b/cpp/src/io/orc/reader_impl.hpp index 818e70b15e7..3a2913c5548 100644 --- a/cpp/src/io/orc/reader_impl.hpp +++ b/cpp/src/io/orc/reader_impl.hpp @@ -97,7 +97,7 @@ class reader::impl { const OrcDecompressor *decompressor, std::vector &stream_info, size_t num_stripes, - rmm::device_vector &row_groups, + device_span row_groups, size_t row_index_stride, rmm::cuda_stream_view stream); @@ -118,8 +118,8 @@ class reader::impl { size_t num_dicts, size_t skip_rows, size_t num_rows, - timezone_table const &tz_table, - const rmm::device_vector &row_groups, + timezone_table_view tz_table, + device_span row_groups, size_t row_index_stride, std::vector &out_buffers, rmm::cuda_stream_view stream); diff --git a/cpp/src/io/orc/timezone.cpp b/cpp/src/io/orc/timezone.cpp index bf8b96b89dc..81ffa954c1a 100644 --- a/cpp/src/io/orc/timezone.cpp +++ b/cpp/src/io/orc/timezone.cpp @@ -374,7 +374,8 @@ static int64_t get_transition_time(dst_transition_s const &trans, int year) return trans.time + day * day_seconds; } -timezone_table build_timezone_transition_table(std::string const &timezone_name) +timezone_table build_timezone_transition_table(std::string const &timezone_name, + rmm::cuda_stream_view stream) { if (timezone_name == "UTC" || timezone_name.empty()) { // Return an empty table for UTC @@ -459,7 +460,22 @@ timezone_table build_timezone_transition_table(std::string const &timezone_name) year_timestamp += (365 + is_leap_year(year)) * day_seconds; } - return {get_gmt_offset(ttimes, offsets, orc_utc_offset), ttimes, offsets}; + rmm::device_uvector d_ttimes{ttimes.size(), stream}; + CUDA_TRY(cudaMemcpyAsync(d_ttimes.data(), + ttimes.data(), + ttimes.size() * sizeof(int64_t), + cudaMemcpyDefault, + stream.value())); + rmm::device_uvector d_offsets{offsets.size(), stream}; + CUDA_TRY(cudaMemcpyAsync(d_offsets.data(), + offsets.data(), + offsets.size() * sizeof(int32_t), + cudaMemcpyDefault, + stream.value())); + auto const gmt_offset = get_gmt_offset(ttimes, offsets, orc_utc_offset); + stream.synchronize(); + + return {gmt_offset, std::move(d_ttimes), std::move(d_offsets)}; } } // namespace io diff --git a/cpp/src/io/orc/timezone.cuh b/cpp/src/io/orc/timezone.cuh index 3a87f28391c..b0231ca9e7d 100644 --- a/cpp/src/io/orc/timezone.cuh +++ b/cpp/src/io/orc/timezone.cuh @@ -20,8 +20,8 @@ #include #include -#include #include +#include #include #include @@ -108,8 +108,15 @@ inline __device__ int32_t get_gmt_offset(cudf::device_span ttimes struct timezone_table { int32_t gmt_offset = 0; - rmm::device_vector ttimes; - rmm::device_vector offsets; + rmm::device_uvector ttimes; + rmm::device_uvector offsets; + timezone_table() : ttimes{0, rmm::cuda_stream_default}, offsets{0, rmm::cuda_stream_default} {} + timezone_table(int32_t gmt_offset, + rmm::device_uvector &&ttimes, + rmm::device_uvector &&offsets) + : gmt_offset{gmt_offset}, ttimes{std::move(ttimes)}, offsets{std::move(offsets)} + { + } timezone_table_view view() const { return {gmt_offset, ttimes, offsets}; } }; @@ -119,10 +126,12 @@ struct timezone_table { * Uses system's TZif files. Assumes little-endian platform when parsing these files. * * @param timezone_name standard timezone name (for example, "US/Pacific") + * @param stream CUDA stream used for any device memory operations and kernel launches * * @return The transition table for the given timezone */ -timezone_table build_timezone_transition_table(std::string const &timezone_name); +timezone_table build_timezone_transition_table(std::string const &timezone_name, + rmm::cuda_stream_view stream); } // namespace io } // namespace cudf diff --git a/cpp/src/io/utilities/column_buffer.hpp b/cpp/src/io/utilities/column_buffer.hpp index b4e26c042fb..88444d41206 100644 --- a/cpp/src/io/utilities/column_buffer.hpp +++ b/cpp/src/io/utilities/column_buffer.hpp @@ -32,7 +32,7 @@ #include #include -#include +#include namespace cudf { namespace io { @@ -108,7 +108,10 @@ struct column_buffer { size = _size; switch (type.id()) { - case type_id::STRING: _strings.resize(size); break; + case type_id::STRING: + _strings = std::make_unique>(size, stream); + cudaMemsetAsync(_strings->data(), 0, size * sizeof(str_pair), stream.value()); + break; // list columns store a buffer of int32's as offsets to represent // their individual rows @@ -125,8 +128,8 @@ struct column_buffer { } } - auto data() { return _strings.size() ? _strings.data().get() : _data.data(); } - auto data_size() { return std::max(_data.size(), _strings.size() * sizeof(str_pair)); } + auto data() { return _strings ? _strings->data() : _data.data(); } + auto data_size() const { return _strings ? _strings->size() : _data.size(); } template auto null_mask() @@ -137,7 +140,7 @@ struct column_buffer { auto& null_count() { return _null_count; } - rmm::device_vector _strings; + std::unique_ptr> _strings; rmm::device_buffer _data{}; rmm::device_buffer _null_mask{}; size_type _null_count{0}; @@ -178,7 +181,7 @@ std::unique_ptr make_column( schema_info->children.push_back(column_name_info{"offsets"}); schema_info->children.push_back(column_name_info{"chars"}); } - return make_strings_column(buffer._strings, stream, mr); + return make_strings_column(*buffer._strings, stream, mr); case type_id::LIST: { // make offsets column