Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide data_chunk_source wrapper for datasource #11886

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions cpp/benchmarks/io/text/multibyte_split.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,15 @@

temp_directory const temp_dir("cudf_nvbench");

enum class data_chunk_source_type { device, file, host, host_pinned, file_bgzip };
enum class data_chunk_source_type { device, file, file_datasource, host, host_pinned, file_bgzip };

NVBENCH_DECLARE_ENUM_TYPE_STRINGS(
data_chunk_source_type,
[](auto value) {
switch (value) {
case data_chunk_source_type::device: return "device";
case data_chunk_source_type::file: return "file";
case data_chunk_source_type::file_datasource: return "file_datasource";
case data_chunk_source_type::host: return "host";
case data_chunk_source_type::host_pinned: return "host_pinned";
case data_chunk_source_type::file_bgzip: return "file_bgzip";
Expand Down Expand Up @@ -133,13 +134,14 @@ static void bench_multibyte_split(nvbench::state& state,
std::iota(delim.begin(), delim.end(), '1');

auto const delim_factor = static_cast<double>(delim_percent) / 100;
auto device_input = create_random_input(file_size_approx, delim_factor, 0.05, delim);
auto host_input = std::vector<char>{};
std::unique_ptr<cudf::io::datasource> datasource;
auto device_input = create_random_input(file_size_approx, delim_factor, 0.05, delim);
auto host_input = std::vector<char>{};
auto host_pinned_input =
thrust::host_vector<char, thrust::system::cuda::experimental::pinned_allocator<char>>{};

if (source_type == data_chunk_source_type::host || source_type == data_chunk_source_type::file ||
source_type == data_chunk_source_type::file_bgzip) {
if (source_type != data_chunk_source_type::device &&
source_type != data_chunk_source_type::host_pinned) {
host_input = cudf::detail::make_std_vector_sync<char>(
{device_input.data(), static_cast<std::size_t>(device_input.size())},
cudf::default_stream_value);
Expand All @@ -154,11 +156,17 @@ static void bench_multibyte_split(nvbench::state& state,

auto source = [&] {
switch (source_type) {
case data_chunk_source_type::file: {
case data_chunk_source_type::file:
case data_chunk_source_type::file_datasource: {
auto const temp_file_name = random_file_in_dir(temp_dir.path());
std::ofstream(temp_file_name, std::ofstream::out)
.write(host_input.data(), host_input.size());
return cudf::io::text::make_source_from_file(temp_file_name);
if (source_type == data_chunk_source_type::file) {
return cudf::io::text::make_source_from_file(temp_file_name);
} else {
datasource = cudf::io::datasource::create(temp_file_name);
return cudf::io::text::make_source(*datasource);
hyperbolic2346 marked this conversation as resolved.
Show resolved Hide resolved
}
}
case data_chunk_source_type::host: //
return cudf::io::text::make_source(host_input);
Expand Down Expand Up @@ -197,6 +205,7 @@ static void bench_multibyte_split(nvbench::state& state,

using source_type_list = nvbench::enum_type_list<data_chunk_source_type::device,
data_chunk_source_type::file,
data_chunk_source_type::file_datasource,
data_chunk_source_type::host,
data_chunk_source_type::host_pinned,
data_chunk_source_type::file_bgzip>;
Expand Down
9 changes: 9 additions & 0 deletions cpp/include/cudf/io/text/data_chunk_source_factories.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#pragma once

#include <cudf/io/datasource.hpp>
#include <cudf/io/text/data_chunk_source.hpp>
#include <cudf/scalar/scalar.hpp>
#include <cudf/utilities/span.hpp>
Expand All @@ -25,6 +26,14 @@

namespace cudf::io::text {

/**
 * @brief Creates a data source capable of producing device-buffered views of a datasource.
 *
 * @param data the datasource to be exposed as a data chunk source
 * @return the data chunk source for the provided datasource. It must not outlive the datasource
 * used to construct it.
 *
 * @note When the datasource supports (and prefers) device reads, chunks may be read directly
 * into device memory; otherwise data is staged through pinned host buffers.
 */
std::unique_ptr<data_chunk_source> make_source(datasource& data);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we document when this overload should be used? AFAIK, using datasource is often much slower than directly using an istream_data_chunk_reader.

Copy link
Contributor Author

@upsj upsj Oct 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure - datasource can use kvikio/cuFile, which we don't yet use in the data_chunk_source. Should I add it to the benchmark to put some numbers to this question?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, yes I should

T size_approx GPU Time Encoded file size
file 2^10 = 1024 44.450 ms 999.000 B
file 2^12 = 4096 44.404 ms 3.944 KiB
file 2^14 = 16384 44.271 ms 15.718 KiB
file 2^16 = 65536 44.338 ms 63.220 KiB
file 2^18 = 262144 44.442 ms 251.584 KiB
file 2^20 = 1048576 44.780 ms 1004.521 KiB
file 2^22 = 4194304 45.677 ms 3.927 MiB
file 2^24 = 16777216 49.339 ms 15.726 MiB
file 2^26 = 67108864 68.670 ms 62.926 MiB
file 2^28 = 268435456 126.198 ms 251.709 MiB
file 2^30 = 1073741824 357.786 ms 1006.638 MiB
file_datasource 2^10 = 1024 1.063 ms 999.000 B
file_datasource 2^12 = 4096 1.063 ms 3.944 KiB
file_datasource 2^14 = 16384 1.072 ms 15.718 KiB
file_datasource 2^16 = 65536 1.088 ms 63.220 KiB
file_datasource 2^18 = 262144 1.147 ms 251.584 KiB
file_datasource 2^20 = 1048576 1.587 ms 1004.521 KiB
file_datasource 2^22 = 4194304 3.879 ms 3.927 MiB
file_datasource 2^24 = 16777216 15.695 ms 15.726 MiB
file_datasource 2^26 = 67108864 65.186 ms 62.926 MiB
file_datasource 2^28 = 268435456 110.538 ms 251.709 MiB
file_datasource 2^30 = 1073741824 288.055 ms 1006.638 MiB

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

man, that thread took an unexpected turn :D

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For posterity: This is the difference between an mmapped file and ifstream


/**
* @brief Creates a data source capable of producing device-buffered views of the given string.
* @param data the host data to be exposed as a data chunk source. Its lifetime must be at least as
Expand Down
116 changes: 107 additions & 9 deletions cpp/src/io/text/data_chunk_source_factories.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,86 @@ namespace cudf::io::text {

namespace {

/**
* @brief A reader which produces owning chunks of device memory which contain a copy of the data
* from an istream.
*/
class datasource_chunk_reader : public data_chunk_reader {
struct host_ticket {
cudaEvent_t event;
thrust::host_vector<char, thrust::system::cuda::experimental::pinned_allocator<char>> buffer;
};

constexpr static int num_tickets = 2;

public:
datasource_chunk_reader(datasource* source) : _source(source)
{
// create an event to track the completion of the last device-to-host copy.
for (auto& ticket : _tickets) {
CUDF_CUDA_TRY(cudaEventCreate(&(ticket.event)));
}
}

~datasource_chunk_reader() override
{
for (auto& ticket : _tickets) {
CUDF_CUDA_TRY(cudaEventDestroy(ticket.event));
}
}

void skip_bytes(std::size_t size) override
{
_offset += std::min(_source->size() - _offset, size);
};

std::unique_ptr<device_data_chunk> get_next_chunk(std::size_t read_size,
rmm::cuda_stream_view stream) override
{
CUDF_FUNC_RANGE();

read_size = std::min(_source->size() - _offset, read_size);

// get a device buffer containing read data on the device.
auto chunk = rmm::device_uvector<char>(read_size, stream);

if (_source->supports_device_read() && _source->is_device_read_preferred(read_size)) {
_source->device_read_async(
_offset, read_size, reinterpret_cast<uint8_t*>(chunk.data()), stream);
Copy link
Contributor

@karthikeyann karthikeyann Oct 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not in the scope of this PR. Should we unify the code use common type for source->device_read/host_read and device_uvector_data_chunk ? Right now, one uses uint8_t*, the other uses char*.
asking @vuule for his comments too.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe char* is used because it makes sense for text input. datasource does not make this assumption.
I would like to use std::byte for such "untyped" buffers. The problem is that std::byte is hard to use for anything that's not a bitwise operation. We could also templatize IO APIs to avoid casting the returned buffers. IMO it would be good to discuss this at some point (maybe a meeting with @upsj is here?).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW, zlib's deflate routine also takes unsigned char as input, while the iostream API uses char

} else {
auto& h_ticket = _tickets[_next_ticket_idx];

_next_ticket_idx = (_next_ticket_idx + 1) % num_tickets;

// synchronize on the last host-to-device copy, so we don't clobber the host buffer.
CUDF_CUDA_TRY(cudaEventSynchronize(h_ticket.event));

// resize the host buffer as necessary to contain the requested number of bytes
if (h_ticket.buffer.size() < read_size) { h_ticket.buffer.resize(read_size); }

_source->host_read(_offset, read_size, reinterpret_cast<uint8_t*>(h_ticket.buffer.data()));

// copy the host-pinned data on to device
CUDF_CUDA_TRY(cudaMemcpyAsync(
chunk.data(), h_ticket.buffer.data(), read_size, cudaMemcpyHostToDevice, stream.value()));

// record the host-to-device copy.
CUDF_CUDA_TRY(cudaEventRecord(h_ticket.event, stream.value()));
}

_offset += read_size;

// return the device buffer so it can be processed.
return std::make_unique<device_uvector_data_chunk>(std::move(chunk));
}

private:
std::size_t _offset = 0;
std::size_t _next_ticket_idx = 0;
std::array<host_ticket, num_tickets> _tickets{};
datasource* _source;
};

/**
* @brief A reader which produces owning chunks of device memory which contain a copy of the data
* from an istream.
Expand All @@ -40,9 +120,11 @@ class istream_data_chunk_reader : public data_chunk_reader {
thrust::host_vector<char, thrust::system::cuda::experimental::pinned_allocator<char>> buffer;
};

constexpr static int num_tickets = 2;

public:
istream_data_chunk_reader(std::unique_ptr<std::istream> datastream)
: _datastream(std::move(datastream)), _tickets(2)
: _datastream(std::move(datastream))
{
// create an event to track the completion of the last device-to-host copy.
for (auto& ticket : _tickets) {
Expand All @@ -66,7 +148,7 @@ class istream_data_chunk_reader : public data_chunk_reader {

auto& h_ticket = _tickets[_next_ticket_idx];

_next_ticket_idx = (_next_ticket_idx + 1) % _tickets.size();
_next_ticket_idx = (_next_ticket_idx + 1) % num_tickets;

// synchronize on the last host-to-device copy, so we don't clobber the host buffer.
CUDF_CUDA_TRY(cudaEventSynchronize(h_ticket.event));
Expand All @@ -84,12 +166,8 @@ class istream_data_chunk_reader : public data_chunk_reader {
auto chunk = rmm::device_uvector<char>(read_size, stream);

// copy the host-pinned data on to device
CUDF_CUDA_TRY(cudaMemcpyAsync( //
chunk.data(),
h_ticket.buffer.data(),
read_size,
cudaMemcpyHostToDevice,
stream.value()));
CUDF_CUDA_TRY(cudaMemcpyAsync(
chunk.data(), h_ticket.buffer.data(), read_size, cudaMemcpyHostToDevice, stream.value()));

// record the host-to-device copy.
CUDF_CUDA_TRY(cudaEventRecord(h_ticket.event, stream.value()));
Expand All @@ -100,8 +178,8 @@ class istream_data_chunk_reader : public data_chunk_reader {

private:
std::size_t _next_ticket_idx = 0;
std::array<host_ticket, num_tickets> _tickets{};
std::unique_ptr<std::istream> _datastream;
std::vector<host_ticket> _tickets;
};

/**
Expand Down Expand Up @@ -180,6 +258,21 @@ class device_span_data_chunk_reader : public data_chunk_reader {
uint64_t _position = 0;
};

/**
 * @brief A datasource-based data chunk source which creates a datasource_chunk_reader.
 */
class datasource_chunk_source : public data_chunk_source {
 public:
  // explicit: a chunk source should never be created by implicit conversion from a datasource
  explicit datasource_chunk_source(datasource& source) : _source(&source) {}

  /// Creates a reader which streams device-buffered chunks of the underlying datasource.
  [[nodiscard]] std::unique_ptr<data_chunk_reader> create_reader() const override
  {
    return std::make_unique<datasource_chunk_reader>(_source);
  }

 private:
  datasource* _source;  // non-owning; must outlive this chunk source and its readers
};

/**
* @brief A file data source which creates an istream_data_chunk_reader.
*/
Expand Down Expand Up @@ -228,6 +321,11 @@ class device_span_data_chunk_source : public data_chunk_source {

} // namespace

std::unique_ptr<data_chunk_source> make_source(datasource& data)
{
  // wrap the (non-owned) datasource; the caller must keep `data` alive while the source is used.
  auto chunk_source = std::make_unique<datasource_chunk_source>(data);
  return chunk_source;
}

std::unique_ptr<data_chunk_source> make_source(host_span<const char> data)
{
return std::make_unique<host_span_data_chunk_source>(data);
Expand Down
29 changes: 29 additions & 0 deletions cpp/tests/io/text/data_chunk_source_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,35 @@ void test_source(const std::string& content, const cudf::io::text::data_chunk_so
}
}

TEST_F(DataChunkSourceTest, DataSourceHost)
{
  // expose an in-memory host buffer through the datasource interface and verify it round-trips
  std::string const content = "host buffer source";

  auto const buffer_source =
    cudf::io::datasource::create(cudf::io::host_buffer{content.data(), content.size()});
  auto const chunk_source = cudf::io::text::make_source(*buffer_source);

  test_source(content, *chunk_source);
}

TEST_F(DataChunkSourceTest, DataSourceFile)
{
  std::string content = "file datasource";
  // make it big enough to have is_device_read_preferred return true
  content.reserve(content.size() << 20);
  for (int i = 0; i < 20; i++) {
    content += content;  // doubling 20 times yields content.size() * 2^20 bytes
  }
  auto const filename = temp_env->get_temp_filepath("file_source");
  {
    // fixed: previously `std::ofstream file(unknown);` referenced an undeclared identifier
    // instead of the temp file path, so the content was never written to `filename`.
    std::ofstream file(filename);
    file << content;
  }
  auto const datasource = cudf::io::datasource::create(filename);
  auto const source     = cudf::io::text::make_source(*datasource);

  test_source(content, *source);
}

TEST_F(DataChunkSourceTest, Device)
{
std::string const content = "device buffer source";
Expand Down