From 6863c8e77957de1b3f5b2e1714a9130d589de402 Mon Sep 17 00:00:00 2001 From: Tobias Ribizel Date: Mon, 10 Oct 2022 15:12:41 +0000 Subject: [PATCH 1/4] provide data_chunk_source wrapper for datasource --- .../io/text/data_chunk_source_factories.hpp | 9 ++ .../io/text/data_chunk_source_factories.cpp | 102 ++++++++++++++++++ cpp/tests/io/text/data_chunk_source_test.cpp | 28 +++++ 3 files changed, 139 insertions(+) diff --git a/cpp/include/cudf/io/text/data_chunk_source_factories.hpp b/cpp/include/cudf/io/text/data_chunk_source_factories.hpp index 6f94fb170a8..f5230863f17 100644 --- a/cpp/include/cudf/io/text/data_chunk_source_factories.hpp +++ b/cpp/include/cudf/io/text/data_chunk_source_factories.hpp @@ -16,6 +16,7 @@ #pragma once +#include #include #include #include @@ -25,6 +26,14 @@ namespace cudf::io::text { +/** + * @brief Creates a data source capable of producing device-buffered views of a datasource. + * @param data the datasource to be exposed as a data chunk source + * @return the data chunk source for the provided datasource. It must not outlive the datasource + * used to construct it. + */ +std::unique_ptr make_source(datasource& data); + /** * @brief Creates a data source capable of producing device-buffered views of the given string. * @param data the host data to be exposed as a data chunk source. Its lifetime must be at least as diff --git a/cpp/src/io/text/data_chunk_source_factories.cpp b/cpp/src/io/text/data_chunk_source_factories.cpp index 9a549951d66..64558e89f93 100644 --- a/cpp/src/io/text/data_chunk_source_factories.cpp +++ b/cpp/src/io/text/data_chunk_source_factories.cpp @@ -30,6 +30,88 @@ namespace cudf::io::text { namespace { +/** + * @brief A reader which produces owning chunks of device memory which contain a copy of the data + * from an istream. 
+ */ +class datasource_chunk_reader : public data_chunk_reader { + struct host_ticket { + cudaEvent_t event; + thrust::host_vector> buffer; + }; + + public: + datasource_chunk_reader(datasource* source) : _source(source), _tickets(2) + { + // create an event to track the completion of the last device-to-host copy. + for (auto& ticket : _tickets) { + CUDF_CUDA_TRY(cudaEventCreate(&(ticket.event))); + } + } + + ~datasource_chunk_reader() override + { + for (auto& ticket : _tickets) { + CUDF_CUDA_TRY(cudaEventDestroy(ticket.event)); + } + } + + void skip_bytes(std::size_t size) override + { + _offset += std::min(_source->size() - _offset, size); + }; + + std::unique_ptr get_next_chunk(std::size_t read_size, + rmm::cuda_stream_view stream) override + { + CUDF_FUNC_RANGE(); + + read_size = std::min(_source->size() - _offset, read_size); + + // get a device buffer containing read data on the device. + auto chunk = rmm::device_uvector(read_size, stream); + + if (_source->supports_device_read() && _source->is_device_read_preferred(read_size)) { + _source->device_read_async( + _offset, read_size, reinterpret_cast(chunk.data()), stream); + } else { + auto& h_ticket = _tickets[_next_ticket_idx]; + + _next_ticket_idx = (_next_ticket_idx + 1) % _tickets.size(); + + // synchronize on the last host-to-device copy, so we don't clobber the host buffer. + CUDF_CUDA_TRY(cudaEventSynchronize(h_ticket.event)); + + // resize the host buffer as necessary to contain the requested number of bytes + if (h_ticket.buffer.size() < read_size) { h_ticket.buffer.resize(read_size); } + + _source->host_read(_offset, read_size, reinterpret_cast(h_ticket.buffer.data())); + + // copy the host-pinned data on to device + CUDF_CUDA_TRY(cudaMemcpyAsync( // + chunk.data(), + h_ticket.buffer.data(), + read_size, + cudaMemcpyHostToDevice, + stream.value())); + + // record the host-to-device copy. 
+ CUDF_CUDA_TRY(cudaEventRecord(h_ticket.event, stream.value())); + } + + _offset += read_size; + + // return the device buffer so it can be processed. + return std::make_unique(std::move(chunk)); + } + + private: + std::size_t _offset = 0; + std::size_t _next_ticket_idx = 0; + datasource* _source; + std::vector _tickets; +}; + /** * @brief A reader which produces owning chunks of device memory which contain a copy of the data * from an istream. @@ -180,6 +262,21 @@ class device_span_data_chunk_reader : public data_chunk_reader { uint64_t _position = 0; }; +/** + * @brief A datasource-based data chunk source which creates a datasource_chunk_reader. + */ +class datasource_chunk_source : public data_chunk_source { + public: + datasource_chunk_source(datasource& source) : _source(&source) {} + [[nodiscard]] std::unique_ptr create_reader() const override + { + return std::make_unique(_source); + } + + private: + datasource* _source; +}; + /** * @brief A file data source which creates an istream_data_chunk_reader. 
*/ @@ -228,6 +325,11 @@ class device_span_data_chunk_source : public data_chunk_source { } // namespace +std::unique_ptr make_source(datasource& data) +{ + return std::make_unique(data); +} + std::unique_ptr make_source(host_span data) { return std::make_unique(data); diff --git a/cpp/tests/io/text/data_chunk_source_test.cpp b/cpp/tests/io/text/data_chunk_source_test.cpp index 7cb75aea8e2..eba67506617 100644 --- a/cpp/tests/io/text/data_chunk_source_test.cpp +++ b/cpp/tests/io/text/data_chunk_source_test.cpp @@ -96,6 +96,34 @@ void test_source(const std::string& content, const cudf::io::text::data_chunk_so } } +TEST_F(DataChunkSourceTest, DataSourceHost) +{ + std::string const content = "host buffer source"; + auto const datasource = + cudf::io::datasource::create(cudf::io::host_buffer{content.data(), content.size()}); + auto const source = cudf::io::text::make_source(*datasource); + + test_source(content, *source); +} + +TEST_F(DataChunkSourceTest, DataSourceFile) +{ + std::string content = "file datasource"; + // make it big enought to have is_device_read_preferred return true + for (int i = 0; i < 20; i++) { + content = content + content; + } + auto const filename = temp_env->get_temp_filepath("file_source"); + { + std::ofstream file(filename); + file << content; + } + auto const datasource = cudf::io::datasource::create(filename); + auto const source = cudf::io::text::make_source(*datasource); + + test_source(content, *source); +} + TEST_F(DataChunkSourceTest, Device) { std::string const content = "device buffer source"; From 16a38d50d85ec99b48d752cbe3469320774c1090 Mon Sep 17 00:00:00 2001 From: Tobias Ribizel Date: Wed, 26 Oct 2022 11:31:40 +0000 Subject: [PATCH 2/4] improve test content generation --- cpp/tests/io/text/data_chunk_source_test.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cpp/tests/io/text/data_chunk_source_test.cpp b/cpp/tests/io/text/data_chunk_source_test.cpp index eba67506617..d8afd07b0f5 100644 --- 
a/cpp/tests/io/text/data_chunk_source_test.cpp +++ b/cpp/tests/io/text/data_chunk_source_test.cpp @@ -110,8 +110,9 @@ TEST_F(DataChunkSourceTest, DataSourceFile) { std::string content = "file datasource"; // make it big enought to have is_device_read_preferred return true + content.reserve(content.size() << 20); for (int i = 0; i < 20; i++) { - content = content + content; + content += content; } auto const filename = temp_env->get_temp_filepath("file_source"); { From 6090626f6e35a585cc4afcc56601253131c5afd3 Mon Sep 17 00:00:00 2001 From: Tobias Ribizel Date: Wed, 26 Oct 2022 12:10:24 +0000 Subject: [PATCH 3/4] add datasource to multibyte_split benchmark --- cpp/benchmarks/io/text/multibyte_split.cpp | 23 +++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/cpp/benchmarks/io/text/multibyte_split.cpp b/cpp/benchmarks/io/text/multibyte_split.cpp index b7e85d8aa7e..6edae80e79e 100644 --- a/cpp/benchmarks/io/text/multibyte_split.cpp +++ b/cpp/benchmarks/io/text/multibyte_split.cpp @@ -45,7 +45,7 @@ temp_directory const temp_dir("cudf_nvbench"); -enum class data_chunk_source_type { device, file, host, host_pinned, file_bgzip }; +enum class data_chunk_source_type { device, file, file_datasource, host, host_pinned, file_bgzip }; NVBENCH_DECLARE_ENUM_TYPE_STRINGS( data_chunk_source_type, @@ -53,6 +53,7 @@ NVBENCH_DECLARE_ENUM_TYPE_STRINGS( switch (value) { case data_chunk_source_type::device: return "device"; case data_chunk_source_type::file: return "file"; + case data_chunk_source_type::file_datasource: return "file_datasource"; case data_chunk_source_type::host: return "host"; case data_chunk_source_type::host_pinned: return "host_pinned"; case data_chunk_source_type::file_bgzip: return "file_bgzip"; @@ -133,13 +134,14 @@ static void bench_multibyte_split(nvbench::state& state, std::iota(delim.begin(), delim.end(), '1'); auto const delim_factor = static_cast(delim_percent) / 100; - auto device_input = create_random_input(file_size_approx, 
delim_factor, 0.05, delim); - auto host_input = std::vector{}; + std::unique_ptr datasource; + auto device_input = create_random_input(file_size_approx, delim_factor, 0.05, delim); + auto host_input = std::vector{}; auto host_pinned_input = thrust::host_vector>{}; - if (source_type == data_chunk_source_type::host || source_type == data_chunk_source_type::file || - source_type == data_chunk_source_type::file_bgzip) { + if (source_type != data_chunk_source_type::device && + source_type != data_chunk_source_type::host_pinned) { host_input = cudf::detail::make_std_vector_sync( {device_input.data(), static_cast(device_input.size())}, cudf::default_stream_value); @@ -154,11 +156,17 @@ static void bench_multibyte_split(nvbench::state& state, auto source = [&] { switch (source_type) { - case data_chunk_source_type::file: { + case data_chunk_source_type::file: + case data_chunk_source_type::file_datasource: { auto const temp_file_name = random_file_in_dir(temp_dir.path()); std::ofstream(temp_file_name, std::ofstream::out) .write(host_input.data(), host_input.size()); - return cudf::io::text::make_source_from_file(temp_file_name); + if (source_type == data_chunk_source_type::file) { + return cudf::io::text::make_source_from_file(temp_file_name); + } else { + datasource = cudf::io::datasource::create(temp_file_name); + return cudf::io::text::make_source(*datasource); + } } case data_chunk_source_type::host: // return cudf::io::text::make_source(host_input); @@ -197,6 +205,7 @@ static void bench_multibyte_split(nvbench::state& state, using source_type_list = nvbench::enum_type_list; From a267ce1d1575caaeaff43f43bf00307332cf48e2 Mon Sep 17 00:00:00 2001 From: Tobias Ribizel Date: Thu, 27 Oct 2022 10:15:34 +0000 Subject: [PATCH 4/4] store ticket count as constexpr --- .../io/text/data_chunk_source_factories.cpp | 32 ++++++++----------- 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/cpp/src/io/text/data_chunk_source_factories.cpp 
b/cpp/src/io/text/data_chunk_source_factories.cpp index 64558e89f93..b910037c5d2 100644 --- a/cpp/src/io/text/data_chunk_source_factories.cpp +++ b/cpp/src/io/text/data_chunk_source_factories.cpp @@ -40,8 +40,10 @@ class datasource_chunk_reader : public data_chunk_reader { thrust::host_vector> buffer; }; + constexpr static int num_tickets = 2; + public: - datasource_chunk_reader(datasource* source) : _source(source), _tickets(2) + datasource_chunk_reader(datasource* source) : _source(source) { // create an event to track the completion of the last device-to-host copy. for (auto& ticket : _tickets) { @@ -77,7 +79,7 @@ class datasource_chunk_reader : public data_chunk_reader { } else { auto& h_ticket = _tickets[_next_ticket_idx]; - _next_ticket_idx = (_next_ticket_idx + 1) % _tickets.size(); + _next_ticket_idx = (_next_ticket_idx + 1) % num_tickets; // synchronize on the last host-to-device copy, so we don't clobber the host buffer. CUDF_CUDA_TRY(cudaEventSynchronize(h_ticket.event)); @@ -88,12 +90,8 @@ class datasource_chunk_reader : public data_chunk_reader { _source->host_read(_offset, read_size, reinterpret_cast(h_ticket.buffer.data())); // copy the host-pinned data on to device - CUDF_CUDA_TRY(cudaMemcpyAsync( // - chunk.data(), - h_ticket.buffer.data(), - read_size, - cudaMemcpyHostToDevice, - stream.value())); + CUDF_CUDA_TRY(cudaMemcpyAsync( + chunk.data(), h_ticket.buffer.data(), read_size, cudaMemcpyHostToDevice, stream.value())); // record the host-to-device copy. 
CUDF_CUDA_TRY(cudaEventRecord(h_ticket.event, stream.value())); @@ -108,8 +106,8 @@ class datasource_chunk_reader : public data_chunk_reader { private: std::size_t _offset = 0; std::size_t _next_ticket_idx = 0; + std::array _tickets{}; datasource* _source; - std::vector _tickets; }; /** @@ -122,9 +120,11 @@ class istream_data_chunk_reader : public data_chunk_reader { thrust::host_vector> buffer; }; + constexpr static int num_tickets = 2; + public: istream_data_chunk_reader(std::unique_ptr datastream) - : _datastream(std::move(datastream)), _tickets(2) + : _datastream(std::move(datastream)) { // create an event to track the completion of the last device-to-host copy. for (auto& ticket : _tickets) { @@ -148,7 +148,7 @@ class istream_data_chunk_reader : public data_chunk_reader { auto& h_ticket = _tickets[_next_ticket_idx]; - _next_ticket_idx = (_next_ticket_idx + 1) % _tickets.size(); + _next_ticket_idx = (_next_ticket_idx + 1) % num_tickets; // synchronize on the last host-to-device copy, so we don't clobber the host buffer. CUDF_CUDA_TRY(cudaEventSynchronize(h_ticket.event)); @@ -166,12 +166,8 @@ class istream_data_chunk_reader : public data_chunk_reader { auto chunk = rmm::device_uvector(read_size, stream); // copy the host-pinned data on to device - CUDF_CUDA_TRY(cudaMemcpyAsync( // - chunk.data(), - h_ticket.buffer.data(), - read_size, - cudaMemcpyHostToDevice, - stream.value())); + CUDF_CUDA_TRY(cudaMemcpyAsync( + chunk.data(), h_ticket.buffer.data(), read_size, cudaMemcpyHostToDevice, stream.value())); // record the host-to-device copy. CUDF_CUDA_TRY(cudaEventRecord(h_ticket.event, stream.value())); @@ -182,8 +178,8 @@ class istream_data_chunk_reader : public data_chunk_reader { private: std::size_t _next_ticket_idx = 0; + std::array _tickets{}; std::unique_ptr _datastream; - std::vector _tickets; }; /**