make data chunk reader return unique_ptr (#9129)
Depends on rapidsai/rmm#851 for performance reasons.

There are two parts to this change. First, we remove a workaround for RMM's sync-and-steal behavior, which was preventing some work from overlapping; that behavior is significantly improved in rmm#851. The workaround involved allocating long-lived buffers and reusing them. With this change, we create `device_uvector`s on the fly and return them, which brings us to the second part of the change...
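
Roughly, the new allocation pattern inside the istream-backed reader looks like this (a condensed sketch of the change below, not the exact code; the helper name `copy_chunk_to_device` and the `h_pinned`/`copy_done` parameters are illustrative):

```cpp
#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_uvector.hpp>

#include <cuda_runtime.h>

#include <cstddef>

// Allocate a fresh, stream-ordered device buffer per chunk instead of reusing a
// long-lived one, enqueue the H2D copy, and hand ownership back to the caller.
rmm::device_uvector<char> copy_chunk_to_device(char const* h_pinned,
                                               std::size_t read_size,
                                               cudaEvent_t copy_done,
                                               rmm::cuda_stream_view stream)
{
  auto chunk = rmm::device_uvector<char>(read_size, stream);

  // copy the host-pinned staging buffer onto the device, in stream order
  cudaMemcpyAsync(chunk.data(), h_pinned, read_size, cudaMemcpyHostToDevice, stream.value());

  // record an event so the pinned staging buffer can be reused once the copy completes
  cudaEventRecord(copy_done, stream.value());

  // ownership of the device memory moves out with the return value
  return chunk;
}
```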

Because the data chunk reader owned the long-lived buffers, it could return `device_span`s from the `get_next_chunk` method. Now that the `device_uvector`s are created on the fly and returned, we need an interface that lets each implementation decide whether it owns the underlying data. Different readers can return different implementations of `device_data_chunk` via a `unique_ptr`; those implementations can own the data or merely be views over it.
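
From the caller's side the change looks roughly like this (a hypothetical usage sketch; the chunk size and loop structure are illustrative, and only `get_next_chunk`, `size()`, and the `device_span` conversion come from this PR):

```cpp
#include <cudf/io/text/data_chunk_source.hpp>
#include <cudf/utilities/span.hpp>

#include <rmm/cuda_stream_view.hpp>

#include <memory>

void consume_all(cudf::io::text::data_chunk_reader& reader, rmm::cuda_stream_view stream)
{
  while (true) {
    // the reader decides whether the returned chunk owns its bytes or merely views them
    std::unique_ptr<cudf::io::text::device_data_chunk> chunk =
      reader.get_next_chunk(1 << 20, stream);

    if (chunk->size() == 0) { break; }

    // the chunk converts to a device_span<char const>; work enqueued on `stream`
    // is guaranteed to run before the underlying data is destroyed
    cudf::device_span<char const> bytes = *chunk;

    // ... launch kernels on `stream` that read `bytes` ...
  }
}
```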

This PR should merge only after rmm#851; otherwise it will cause a performance regression in `multibyte_split` (currently the only API that uses this reader).

Authors:
  - Christopher Harris (https://github.com/cwharris)

Approvers:
  - Vukasin Milovanovic (https://github.com/vuule)
  - Elias Stehle (https://github.com/elstehle)

URL: #9129
cwharris authored Aug 27, 2021
1 parent 4d8e401 commit 31b731e
Showing 3 changed files with 56 additions and 30 deletions.
24 changes: 19 additions & 5 deletions cpp/include/cudf/io/text/data_chunk_source.hpp
@@ -25,15 +25,30 @@ namespace cudf {
namespace io {
namespace text {

+/**
+ * @brief A contract guaranteeing stream-ordered memory access to the underlying device data.
+ *
+ * This class guarantees access to the underlying data for the stream on which the data was
+ * allocated. Possible implementations may own the device data, or may only have a view over the
+ * data. Any work enqueued to the stream on which this data was allocated is guaranteed to be
+ * performed prior to the destruction of the underlying data, but otherwise no guarantees are made
+ * regarding if or when the underlying data gets destroyed.
+ */
+class device_data_chunk {
+ public:
+  virtual char const* data() const = 0;
+  virtual std::size_t size() const = 0;
+  virtual operator device_span<char const>() const = 0;
+};
+
/**
* @brief a reader capable of producing views over device memory.
*
* The data chunk reader API encapsulates the idea of statefully traversing and loading a data
* source. A data source may be a file, a region of device memory, or a region of host memory.
* Reading data from these data sources efficiently requires different strategies dependings on the
* type of data source, type of compression, capabilities of the host and device, the data's
- * destination. Whole-file decompression should be hidden behind this interface
- *
+ * destination. Whole-file decompression should be hidden behind this interface.
*/
class data_chunk_reader {
public:
@@ -51,14 +66,13 @@ class data_chunk_reader {
* reader reaches end of underlying data source. Returned data must be accessed in stream order
* relative to the specified @param stream.
*/
-  virtual device_span<char const> get_next_chunk(std::size_t size,
-                                                 rmm::cuda_stream_view stream) = 0;
+  virtual std::unique_ptr<device_data_chunk> get_next_chunk(std::size_t size,
+                                                            rmm::cuda_stream_view stream) = 0;
};

/**
* @brief a data source capable of creating a reader which can produce views of the data source in
* device memory.
- *
*/
class data_chunk_source {
public:
54 changes: 33 additions & 21 deletions cpp/include/cudf/io/text/data_chunk_source_factories.hpp
@@ -37,6 +37,30 @@ namespace text {

namespace {

+class device_span_data_chunk : public device_data_chunk {
+ public:
+  device_span_data_chunk(device_span<char const> data) : _data(data) {}
+
+  char const* data() const override { return _data.data(); }
+  std::size_t size() const override { return _data.size(); }
+  operator device_span<char const>() const override { return _data; }
+
+ private:
+  device_span<char const> _data;
+};
+
+class device_uvector_data_chunk : public device_data_chunk {
+ public:
+  device_uvector_data_chunk(rmm::device_uvector<char>&& data) : _data(std::move(data)) {}
+
+  char const* data() const override { return _data.data(); }
+  std::size_t size() const override { return _data.size(); }
+  operator device_span<char const>() const override { return _data; }
+
+ private:
+  rmm::device_uvector<char> _data;
+};
+
/**
* @brief a reader which produces views of device memory which contain a copy of the data from an
* istream.
@@ -50,7 +74,7 @@ class istream_data_chunk_reader : public data_chunk_reader {

public:
istream_data_chunk_reader(std::unique_ptr<std::istream> datastream)
-    : _datastream(std::move(datastream)), _buffers(), _tickets(2)
+    : _datastream(std::move(datastream)), _tickets(2)
{
// create an event to track the completion of the last device-to-host copy.
for (std::size_t i = 0; i < _tickets.size(); i++) {
@@ -65,19 +89,8 @@ class istream_data_chunk_reader : public data_chunk_reader {
}
}

-  device_span<char> find_or_create_data(std::size_t size, rmm::cuda_stream_view stream)
-  {
-    auto search = _buffers.find(stream.value());
-
-    if (search == _buffers.end() || search->second.size() < size) {
-      _buffers[stream.value()] = rmm::device_buffer(size, stream);
-    }
-
-    return device_span<char>(static_cast<char*>(_buffers[stream.value()].data()), size);
-  }
-
-  device_span<char const> get_next_chunk(std::size_t read_size,
-                                         rmm::cuda_stream_view stream) override
+  std::unique_ptr<device_data_chunk> get_next_chunk(std::size_t read_size,
+                                                    rmm::cuda_stream_view stream) override
{
CUDF_FUNC_RANGE();

@@ -98,11 +111,11 @@ class istream_data_chunk_reader : public data_chunk_reader {
read_size = _datastream->gcount();

// get a view over some device memory we can use to buffer the read data on to device.
-    auto chunk_span = find_or_create_data(read_size, stream);
+    auto chunk = rmm::device_uvector<char>(read_size, stream);

// copy the host-pinned data on to device
CUDA_TRY(cudaMemcpyAsync( //
-      chunk_span.data(),
+      chunk.data(),
h_ticket.buffer.data(),
read_size,
cudaMemcpyHostToDevice,
@@ -112,13 +125,12 @@ class istream_data_chunk_reader : public data_chunk_reader {
CUDA_TRY(cudaEventRecord(h_ticket.event, stream.value()));

// return the view over device memory so it can be processed.
-    return chunk_span;
+    return std::make_unique<device_uvector_data_chunk>(std::move(chunk));
}

private:
std::size_t _next_ticket_idx = 0;
std::unique_ptr<std::istream> _datastream;
-  std::unordered_map<cudaStream_t, rmm::device_buffer> _buffers;
std::vector<host_ticket> _tickets;
};

@@ -131,8 +143,8 @@ class device_span_data_chunk_reader : public data_chunk_reader {
public:
device_span_data_chunk_reader(device_span<char const> data) : _data(data) {}

-  device_span<char const> get_next_chunk(std::size_t read_size,
-                                         rmm::cuda_stream_view stream) override
+  std::unique_ptr<device_data_chunk> get_next_chunk(std::size_t read_size,
+                                                    rmm::cuda_stream_view stream) override
{
// limit the read size to the number of bytes remaining in the device_span.
if (read_size > _data.size() - _position) { read_size = _data.size() - _position; }
@@ -144,7 +156,7 @@ class device_span_data_chunk_reader : public data_chunk_reader {
_position += read_size;

// return the view over device memory so it can be processed.
-    return chunk_span;
+    return std::make_unique<device_span_data_chunk>(chunk_span);
}

private:
8 changes: 4 additions & 4 deletions cpp/src/io/text/multibyte_split.cu
@@ -279,10 +279,10 @@ cudf::size_type multibyte_split_scan_full_source(cudf::io::text::data_chunk_sour
auto chunk_stream = streams[i % streams.size()];
auto chunk = reader->get_next_chunk(ITEMS_PER_CHUNK, chunk_stream);

-    if (chunk.size() == 0) { break; }
+    if (chunk->size() == 0) { break; }

auto tiles_in_launch =
-      cudf::util::div_rounding_up_safe(chunk.size(), static_cast<std::size_t>(ITEMS_PER_TILE));
+      cudf::util::div_rounding_up_safe(chunk->size(), static_cast<std::size_t>(ITEMS_PER_TILE));

// reset the next chunk of tile state
multibyte_split_init_kernel<<<tiles_in_launch, THREADS_PER_TILE, 0, chunk_stream>>>( //
@@ -299,13 +299,13 @@ cudf::size_type multibyte_split_scan_full_source(cudf::io::text::data_chunk_sour
tile_offsets,
trie.view(),
chunk_offset,
-      chunk,
+      *chunk,
output_buffer,
output_char_buffer);

cudaEventRecord(last_launch_event, chunk_stream);

-    chunk_offset += chunk.size();
+    chunk_offset += chunk->size();
}

cudaEventDestroy(last_launch_event);
