diff --git a/cpp/include/cudf/io/text/data_chunk_source.hpp b/cpp/include/cudf/io/text/data_chunk_source.hpp index 6ee1fa033d0..46bf9688469 100644 --- a/cpp/include/cudf/io/text/data_chunk_source.hpp +++ b/cpp/include/cudf/io/text/data_chunk_source.hpp @@ -25,6 +25,22 @@ namespace cudf { namespace io { namespace text { +/** + * @brief A contract guaranteeing stream-ordered memory access to the underlying device data. + * + * This class guarantees access to the underlying data for the stream on which the data was + * allocated. Possible implementations may own the device data, or may only have a view over the + * data. Any work enqueued to the stream on which this data was allocated is guaranteed to be + * performed prior to the destruction of the underlying data, but otherwise no guarantees are made + * regarding if or when the underlying data gets destroyed. + */ +class device_data_chunk { + public: + virtual char const* data() const = 0; + virtual std::size_t size() const = 0; + virtual operator device_span() const = 0; +}; + /** * @brief a reader capable of producing views over device memory. * @@ -32,8 +48,7 @@ namespace text { * source. A data source may be a file, a region of device memory, or a region of host memory. * Reading data from these data sources efficiently requires different strategies dependings on the * type of data source, type of compression, capabilities of the host and device, the data's - * destination. Whole-file decompression should be hidden behind this interface - * + * destination. Whole-file decompression should be hidden behind this interface. */ class data_chunk_reader { public: @@ -51,14 +66,13 @@ class data_chunk_reader { * reader reaches end of underlying data source. Returned data must be accessed in stream order * relative to the specified @param stream. */ - virtual device_span get_next_chunk(std::size_t size, - rmm::cuda_stream_view stream) = 0; + virtual std::unique_ptr get_next_chunk(std::size_t size, + rmm::cuda_stream_view stream) = 0; }; /** * @brief a data source capable of creating a reader which can produce views of the data source in * device memory. - * */ class data_chunk_source { public: diff --git a/cpp/include/cudf/io/text/data_chunk_source_factories.hpp b/cpp/include/cudf/io/text/data_chunk_source_factories.hpp index f6807c1c9a8..6b95de53ee7 100644 --- a/cpp/include/cudf/io/text/data_chunk_source_factories.hpp +++ b/cpp/include/cudf/io/text/data_chunk_source_factories.hpp @@ -37,6 +37,30 @@ namespace text { namespace { +class device_span_data_chunk : public device_data_chunk { + public: + device_span_data_chunk(device_span data) : _data(data) {} + + char const* data() const override { return _data.data(); } + std::size_t size() const override { return _data.size(); } + operator device_span() const override { return _data; } + + private: + device_span _data; +}; + +class device_uvector_data_chunk : public device_data_chunk { + public: + device_uvector_data_chunk(rmm::device_uvector&& data) : _data(std::move(data)) {} + + char const* data() const override { return _data.data(); } + std::size_t size() const override { return _data.size(); } + operator device_span() const override { return _data; } + + private: + rmm::device_uvector _data; +}; + /** * @brief a reader which produces views of device memory which contain a copy of the data from an * istream. @@ -50,7 +74,7 @@ class istream_data_chunk_reader : public data_chunk_reader { public: istream_data_chunk_reader(std::unique_ptr datastream) - : _datastream(std::move(datastream)), _buffers(), _tickets(2) + : _datastream(std::move(datastream)), _tickets(2) { // create an event to track the completion of the last device-to-host copy. for (std::size_t i = 0; i < _tickets.size(); i++) { @@ -65,19 +89,8 @@ class istream_data_chunk_reader : public data_chunk_reader { } } - device_span find_or_create_data(std::size_t size, rmm::cuda_stream_view stream) - { - auto search = _buffers.find(stream.value()); - - if (search == _buffers.end() || search->second.size() < size) { - _buffers[stream.value()] = rmm::device_buffer(size, stream); - } - - return device_span(static_cast(_buffers[stream.value()].data()), size); - } - - device_span get_next_chunk(std::size_t read_size, - rmm::cuda_stream_view stream) override + std::unique_ptr get_next_chunk(std::size_t read_size, + rmm::cuda_stream_view stream) override { CUDF_FUNC_RANGE(); @@ -98,11 +111,11 @@ class istream_data_chunk_reader : public data_chunk_reader { read_size = _datastream->gcount(); // get a view over some device memory we can use to buffer the read data on to device. - auto chunk_span = find_or_create_data(read_size, stream); + auto chunk = rmm::device_uvector(read_size, stream); // copy the host-pinned data on to device CUDA_TRY(cudaMemcpyAsync( // - chunk_span.data(), + chunk.data(), h_ticket.buffer.data(), read_size, cudaMemcpyHostToDevice, @@ -112,13 +125,12 @@ class istream_data_chunk_reader : public data_chunk_reader { CUDA_TRY(cudaEventRecord(h_ticket.event, stream.value())); // return the view over device memory so it can be processed. - return chunk_span; + return std::make_unique(std::move(chunk)); } private: std::size_t _next_ticket_idx = 0; std::unique_ptr _datastream; - std::unordered_map _buffers; std::vector _tickets; }; @@ -131,8 +143,8 @@ class device_span_data_chunk_reader : public data_chunk_reader { public: device_span_data_chunk_reader(device_span data) : _data(data) {} - device_span get_next_chunk(std::size_t read_size, - rmm::cuda_stream_view stream) override + std::unique_ptr get_next_chunk(std::size_t read_size, + rmm::cuda_stream_view stream) override { // limit the read size to the number of bytes remaining in the device_span. if (read_size > _data.size() - _position) { read_size = _data.size() - _position; } @@ -144,7 +156,7 @@ class device_span_data_chunk_reader : public data_chunk_reader { _position += read_size; // return the view over device memory so it can be processed. - return chunk_span; + return std::make_unique(chunk_span); } private: diff --git a/cpp/src/io/text/multibyte_split.cu b/cpp/src/io/text/multibyte_split.cu index 662ec744680..a427809c81a 100644 --- a/cpp/src/io/text/multibyte_split.cu +++ b/cpp/src/io/text/multibyte_split.cu @@ -279,10 +279,10 @@ cudf::size_type multibyte_split_scan_full_source(cudf::io::text::data_chunk_sour auto chunk_stream = streams[i % streams.size()]; auto chunk = reader->get_next_chunk(ITEMS_PER_CHUNK, chunk_stream); - if (chunk.size() == 0) { break; } + if (chunk->size() == 0) { break; } auto tiles_in_launch = - cudf::util::div_rounding_up_safe(chunk.size(), static_cast(ITEMS_PER_TILE)); + cudf::util::div_rounding_up_safe(chunk->size(), static_cast(ITEMS_PER_TILE)); // reset the next chunk of tile state multibyte_split_init_kernel<<>>( // @@ -299,13 +299,13 @@ cudf::size_type multibyte_split_scan_full_source(cudf::io::text::data_chunk_sour tile_offsets, trie.view(), chunk_offset, - chunk, + *chunk, output_buffer, output_char_buffer); cudaEventRecord(last_launch_event, chunk_stream); - chunk_offset += chunk.size(); + chunk_offset += chunk->size(); } cudaEventDestroy(last_launch_event);