Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make data chunk reader return unique_ptr #9129

Merged
merged 3 commits into from
Aug 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 19 additions & 5 deletions cpp/include/cudf/io/text/data_chunk_source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,30 @@ namespace cudf {
namespace io {
namespace text {

/**
 * @brief A contract guaranteeing stream-ordered memory access to the underlying device data.
 *
 * This class guarantees access to the underlying data for the stream on which the data was
 * allocated. Possible implementations may own the device data, or may only have a view over the
 * data. Any work enqueued to the stream on which this data was allocated is guaranteed to be
 * performed prior to the destruction of the underlying data, but otherwise no guarantees are made
 * regarding if or when the underlying data gets destroyed.
 */
class device_data_chunk {
 public:
  // Virtual destructor is required: chunks are returned as
  // std::unique_ptr<device_data_chunk> and destroyed through this base pointer.
  // Without it, deleting an owning derived chunk (e.g. one holding an
  // rmm::device_uvector) is undefined behavior and leaks device memory.
  virtual ~device_data_chunk() = default;
  /// @brief Returns a pointer to the first byte of the device data.
  virtual char const* data() const = 0;
  /// @brief Returns the size of the device data in bytes.
  virtual std::size_t size() const = 0;
  /// @brief Returns a non-owning view over the device data.
  virtual operator device_span<char const>() const = 0;
};

/**
* @brief A reader capable of producing views over device memory.
*
* The data chunk reader API encapsulates the idea of statefully traversing and loading a data
* source. A data source may be a file, a region of device memory, or a region of host memory.
Reading data from these data sources efficiently requires different strategies depending on the
* type of data source, type of compression, capabilities of the host and device, the data's
* destination. Whole-file decompression should be hidden behind this interface
*
* destination. Whole-file decompression should be hidden behind this interface.
*/
class data_chunk_reader {
public:
Expand All @@ -51,14 +66,13 @@ class data_chunk_reader {
* reader reaches end of underlying data source. Returned data must be accessed in stream order
relative to the specified `stream` parameter.
*/
virtual device_span<char const> get_next_chunk(std::size_t size,
rmm::cuda_stream_view stream) = 0;
virtual std::unique_ptr<device_data_chunk> get_next_chunk(std::size_t size,
rmm::cuda_stream_view stream) = 0;
};

/**
* @brief a data source capable of creating a reader which can produce views of the data source in
* device memory.
*
*/
class data_chunk_source {
public:
Expand Down
54 changes: 33 additions & 21 deletions cpp/include/cudf/io/text/data_chunk_source_factories.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,30 @@ namespace text {

namespace {

class device_span_data_chunk : public device_data_chunk {
public:
device_span_data_chunk(device_span<char const> data) : _data(data) {}

char const* data() const override { return _data.data(); }
std::size_t size() const override { return _data.size(); }
operator device_span<char const>() const override { return _data; }

private:
device_span<char const> _data;
};

class device_uvector_data_chunk : public device_data_chunk {
public:
device_uvector_data_chunk(rmm::device_uvector<char>&& data) : _data(std::move(data)) {}

char const* data() const override { return _data.data(); }
std::size_t size() const override { return _data.size(); }
operator device_span<char const>() const override { return _data; }

private:
rmm::device_uvector<char> _data;
};

/**
* @brief a reader which produces views of device memory which contain a copy of the data from an
* istream.
Expand All @@ -50,7 +74,7 @@ class istream_data_chunk_reader : public data_chunk_reader {

public:
istream_data_chunk_reader(std::unique_ptr<std::istream> datastream)
: _datastream(std::move(datastream)), _buffers(), _tickets(2)
: _datastream(std::move(datastream)), _tickets(2)
{
// create an event to track the completion of the last device-to-host copy.
for (std::size_t i = 0; i < _tickets.size(); i++) {
Expand All @@ -65,19 +89,8 @@ class istream_data_chunk_reader : public data_chunk_reader {
}
}

device_span<char> find_or_create_data(std::size_t size, rmm::cuda_stream_view stream)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kind of hard to connect the RMM optimization and changes here. Was this a performance hack to avoid repeatedly allocating the chunks?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the sync-and-steal behavior in rmm was stealing too often, preventing portions of work from overlapping. With the changes in rmm#851, we no longer have to work around that behavior.

Copy link
Contributor Author

@cwharris cwharris Aug 26, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the PR description to give a better overview of how the changes relate.

{
auto search = _buffers.find(stream.value());

if (search == _buffers.end() || search->second.size() < size) {
_buffers[stream.value()] = rmm::device_buffer(size, stream);
}

return device_span<char>(static_cast<char*>(_buffers[stream.value()].data()), size);
}

device_span<char const> get_next_chunk(std::size_t read_size,
rmm::cuda_stream_view stream) override
std::unique_ptr<device_data_chunk> get_next_chunk(std::size_t read_size,
rmm::cuda_stream_view stream) override
{
CUDF_FUNC_RANGE();

Expand All @@ -98,11 +111,11 @@ class istream_data_chunk_reader : public data_chunk_reader {
read_size = _datastream->gcount();

// get a view over some device memory we can use to buffer the read data on to device.
auto chunk_span = find_or_create_data(read_size, stream);
auto chunk = rmm::device_uvector<char>(read_size, stream);

// copy the host-pinned data on to device
CUDA_TRY(cudaMemcpyAsync( //
chunk_span.data(),
chunk.data(),
h_ticket.buffer.data(),
read_size,
cudaMemcpyHostToDevice,
Expand All @@ -112,13 +125,12 @@ class istream_data_chunk_reader : public data_chunk_reader {
CUDA_TRY(cudaEventRecord(h_ticket.event, stream.value()));

// return the view over device memory so it can be processed.
return chunk_span;
return std::make_unique<device_uvector_data_chunk>(std::move(chunk));
}

private:
std::size_t _next_ticket_idx = 0;
std::unique_ptr<std::istream> _datastream;
std::unordered_map<cudaStream_t, rmm::device_buffer> _buffers;
std::vector<host_ticket> _tickets;
};

Expand All @@ -131,8 +143,8 @@ class device_span_data_chunk_reader : public data_chunk_reader {
public:
device_span_data_chunk_reader(device_span<char const> data) : _data(data) {}

device_span<char const> get_next_chunk(std::size_t read_size,
rmm::cuda_stream_view stream) override
std::unique_ptr<device_data_chunk> get_next_chunk(std::size_t read_size,
rmm::cuda_stream_view stream) override
{
// limit the read size to the number of bytes remaining in the device_span.
if (read_size > _data.size() - _position) { read_size = _data.size() - _position; }
Expand All @@ -144,7 +156,7 @@ class device_span_data_chunk_reader : public data_chunk_reader {
_position += read_size;

// return the view over device memory so it can be processed.
return chunk_span;
return std::make_unique<device_span_data_chunk>(chunk_span);
}

private:
Expand Down
8 changes: 4 additions & 4 deletions cpp/src/io/text/multibyte_split.cu
Original file line number Diff line number Diff line change
Expand Up @@ -279,10 +279,10 @@ cudf::size_type multibyte_split_scan_full_source(cudf::io::text::data_chunk_sour
auto chunk_stream = streams[i % streams.size()];
auto chunk = reader->get_next_chunk(ITEMS_PER_CHUNK, chunk_stream);

if (chunk.size() == 0) { break; }
if (chunk->size() == 0) { break; }

auto tiles_in_launch =
cudf::util::div_rounding_up_safe(chunk.size(), static_cast<std::size_t>(ITEMS_PER_TILE));
cudf::util::div_rounding_up_safe(chunk->size(), static_cast<std::size_t>(ITEMS_PER_TILE));

// reset the next chunk of tile state
multibyte_split_init_kernel<<<tiles_in_launch, THREADS_PER_TILE, 0, chunk_stream>>>( //
Expand All @@ -299,13 +299,13 @@ cudf::size_type multibyte_split_scan_full_source(cudf::io::text::data_chunk_sour
tile_offsets,
trie.view(),
chunk_offset,
chunk,
*chunk,
output_buffer,
output_char_buffer);

cudaEventRecord(last_launch_event, chunk_stream);

chunk_offset += chunk.size();
chunk_offset += chunk->size();
}

cudaEventDestroy(last_launch_event);
Expand Down