-
Notifications
You must be signed in to change notification settings - Fork 915
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Provide data_chunk_source
wrapper for datasource
#11886
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,6 +30,88 @@ namespace cudf::io::text { | |
|
||
namespace { | ||
|
||
/** | ||
* @brief A reader which produces owning chunks of device memory which contain a copy of the data | ||
* from an istream. | ||
*/ | ||
class datasource_chunk_reader : public data_chunk_reader { | ||
struct host_ticket { | ||
cudaEvent_t event; | ||
thrust::host_vector<char, thrust::system::cuda::experimental::pinned_allocator<char>> buffer; | ||
}; | ||
|
||
public: | ||
datasource_chunk_reader(datasource* source) : _source(source), _tickets(2) | ||
upsj marked this conversation as resolved.
Show resolved
Hide resolved
|
||
{ | ||
// create an event to track the completion of the last device-to-host copy. | ||
for (auto& ticket : _tickets) { | ||
CUDF_CUDA_TRY(cudaEventCreate(&(ticket.event))); | ||
} | ||
} | ||
|
||
~datasource_chunk_reader() override | ||
{ | ||
for (auto& ticket : _tickets) { | ||
CUDF_CUDA_TRY(cudaEventDestroy(ticket.event)); | ||
} | ||
} | ||
|
||
void skip_bytes(std::size_t size) override | ||
{ | ||
_offset += std::min(_source->size() - _offset, size); | ||
}; | ||
|
||
std::unique_ptr<device_data_chunk> get_next_chunk(std::size_t read_size, | ||
rmm::cuda_stream_view stream) override | ||
{ | ||
CUDF_FUNC_RANGE(); | ||
|
||
read_size = std::min(_source->size() - _offset, read_size); | ||
|
||
// get a device buffer containing read data on the device. | ||
auto chunk = rmm::device_uvector<char>(read_size, stream); | ||
|
||
if (_source->supports_device_read() && _source->is_device_read_preferred(read_size)) { | ||
_source->device_read_async( | ||
_offset, read_size, reinterpret_cast<uint8_t*>(chunk.data()), stream); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is not in the scope of this PR. Should we unify the code use common type for source->device_read/host_read and There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. FWIW, zlib's deflate routine also takes |
||
} else { | ||
auto& h_ticket = _tickets[_next_ticket_idx]; | ||
|
||
_next_ticket_idx = (_next_ticket_idx + 1) % _tickets.size(); | ||
|
||
// synchronize on the last host-to-device copy, so we don't clobber the host buffer. | ||
CUDF_CUDA_TRY(cudaEventSynchronize(h_ticket.event)); | ||
|
||
// resize the host buffer as necessary to contain the requested number of bytes | ||
if (h_ticket.buffer.size() < read_size) { h_ticket.buffer.resize(read_size); } | ||
|
||
_source->host_read(_offset, read_size, reinterpret_cast<uint8_t*>(h_ticket.buffer.data())); | ||
|
||
// copy the host-pinned data on to device | ||
CUDF_CUDA_TRY(cudaMemcpyAsync( // | ||
upsj marked this conversation as resolved.
Show resolved
Hide resolved
|
||
chunk.data(), | ||
h_ticket.buffer.data(), | ||
read_size, | ||
cudaMemcpyHostToDevice, | ||
stream.value())); | ||
|
||
// record the host-to-device copy. | ||
CUDF_CUDA_TRY(cudaEventRecord(h_ticket.event, stream.value())); | ||
} | ||
|
||
_offset += read_size; | ||
|
||
// return the device buffer so it can be processed. | ||
return std::make_unique<device_uvector_data_chunk>(std::move(chunk)); | ||
} | ||
|
||
private: | ||
std::size_t _offset = 0; | ||
std::size_t _next_ticket_idx = 0; | ||
datasource* _source; | ||
std::vector<host_ticket> _tickets; | ||
}; | ||
|
||
/** | ||
* @brief A reader which produces owning chunks of device memory which contain a copy of the data | ||
* from an istream. | ||
|
@@ -180,6 +262,21 @@ class device_span_data_chunk_reader : public data_chunk_reader { | |
uint64_t _position = 0; | ||
}; | ||
|
||
/** | ||
* @brief A datasource-based data chunk source which creates a datasource_chunk_reader. | ||
*/ | ||
class datasource_chunk_source : public data_chunk_source { | ||
public: | ||
datasource_chunk_source(datasource& source) : _source(&source) {} | ||
[[nodiscard]] std::unique_ptr<data_chunk_reader> create_reader() const override | ||
{ | ||
return std::make_unique<datasource_chunk_reader>(_source); | ||
} | ||
|
||
private: | ||
datasource* _source; | ||
}; | ||
|
||
/** | ||
* @brief A file data source which creates an istream_data_chunk_reader. | ||
*/ | ||
|
@@ -228,6 +325,11 @@ class device_span_data_chunk_source : public data_chunk_source { | |
|
||
} // namespace | ||
|
||
std::unique_ptr<data_chunk_source> make_source(datasource& data) | ||
{ | ||
return std::make_unique<datasource_chunk_source>(data); | ||
} | ||
|
||
std::unique_ptr<data_chunk_source> make_source(host_span<const char> data) | ||
{ | ||
return std::make_unique<host_span_data_chunk_source>(data); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we document when this overload should be used? AFAIK, using
datasource
is often much slower than directly using aistream_data_chunk_reader
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure - datasource can use kvikio/cuFile, which we don't yet use in the data_chunk_source. Should I add it to the benchmark to put some numbers to this question?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, yes I should
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
man, that thread took an unexpected turn :D
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For posterity: This is the difference between an
mmap
ped file andifstream