diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index dbc55827a32..7ed1aaed53b 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -155,6 +155,8 @@ include(cmake/thirdparty/get_gtest.cmake) include(cmake/Modules/JitifyPreprocessKernels.cmake) # find cuFile include(cmake/Modules/FindcuFile.cmake) +# find KvikIO +include(cmake/thirdparty/get_kvikio.cmake) # Workaround until https://github.com/rapidsai/rapids-cmake/issues/176 is resolved if(NOT BUILD_SHARED_LIBS) @@ -586,7 +588,7 @@ add_dependencies(cudf jitify_preprocess_run) target_link_libraries( cudf PUBLIC ${ARROW_LIBRARIES} libcudacxx::libcudacxx cudf::Thrust rmm::rmm - PRIVATE cuco::cuco ZLIB::ZLIB nvcomp::nvcomp + PRIVATE cuco::cuco ZLIB::ZLIB nvcomp::nvcomp kvikio::kvikio ) # Add Conda library, and include paths if specified diff --git a/cpp/cmake/thirdparty/get_kvikio.cmake b/cpp/cmake/thirdparty/get_kvikio.cmake new file mode 100644 index 00000000000..800ab2d5c6f --- /dev/null +++ b/cpp/cmake/thirdparty/get_kvikio.cmake @@ -0,0 +1,31 @@ +# ============================================================================= +# Copyright (c) 2022, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +# in compliance with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing permissions and limitations under +# the License. +# ============================================================================= + +# This function finds KvikIO +function(find_and_configure_kvikio VERSION) + + rapids_cpm_find( + KvikIO ${VERSION} + GLOBAL_TARGETS kvikio::kvikio + CPM_ARGS + GIT_REPOSITORY https://github.com/rapidsai/kvikio.git + GIT_TAG branch-${VERSION} + GIT_SHALLOW TRUE SOURCE_SUBDIR cpp + OPTIONS "KvikIO_BUILD_EXAMPLES OFF" + ) + +endfunction() + +set(KVIKIO_MIN_VERSION_cudf "${CUDF_VERSION_MAJOR}.${CUDF_VERSION_MINOR}") +find_and_configure_kvikio(${KVIKIO_MIN_VERSION_cudf}) diff --git a/cpp/src/io/utilities/config_utils.cpp b/cpp/src/io/utilities/config_utils.cpp index ed8c3d6e1e3..08b5914cb19 100644 --- a/cpp/src/io/utilities/config_utils.cpp +++ b/cpp/src/io/utilities/config_utils.cpp @@ -35,7 +35,7 @@ namespace { /** * @brief Defines which cuFile usage to enable. */ -enum class usage_policy : uint8_t { OFF, GDS, ALWAYS }; +enum class usage_policy : uint8_t { OFF, GDS, ALWAYS, KVIKIO }; /** * @brief Get the current usage policy. @@ -46,6 +46,7 @@ usage_policy get_env_policy() if (env_val == "OFF") return usage_policy::OFF; if (env_val == "GDS") return usage_policy::GDS; if (env_val == "ALWAYS") return usage_policy::ALWAYS; + if (env_val == "KVIKIO") return usage_policy::KVIKIO; CUDF_FAIL("Invalid LIBCUDF_CUFILE_POLICY value: " + env_val); } } // namespace @@ -54,6 +55,8 @@ bool is_always_enabled() { return get_env_policy() == usage_policy::ALWAYS; } bool is_gds_enabled() { return is_always_enabled() or get_env_policy() == usage_policy::GDS; } +bool is_kvikio_enabled() { return get_env_policy() == usage_policy::KVIKIO; } + } // namespace cufile_integration namespace nvcomp_integration { diff --git a/cpp/src/io/utilities/config_utils.hpp b/cpp/src/io/utilities/config_utils.hpp index 80c20529687..4f6a14091cf 100644 --- a/cpp/src/io/utilities/config_utils.hpp +++ b/cpp/src/io/utilities/config_utils.hpp @@ -48,6 +48,11 @@ bool is_always_enabled(); */ bool is_gds_enabled(); +/** + * @brief Returns true if KvikIO is enabled. + */ +bool is_kvikio_enabled(); + } // namespace cufile_integration namespace nvcomp_integration { diff --git a/cpp/src/io/utilities/data_sink.cpp b/cpp/src/io/utilities/data_sink.cpp index 63d0103ddec..042afc01253 100644 --- a/cpp/src/io/utilities/data_sink.cpp +++ b/cpp/src/io/utilities/data_sink.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,7 +19,9 @@ #include "file_io_utilities.hpp" #include #include +#include +#include #include namespace cudf { @@ -30,10 +32,15 @@ namespace io { class file_sink : public data_sink { public: explicit file_sink(std::string const& filepath) - : _cufile_out(detail::make_cufile_output(filepath)) { _output_stream.open(filepath, std::ios::out | std::ios::binary | std::ios::trunc); CUDF_EXPECTS(_output_stream.is_open(), "Cannot open output file"); + + if (detail::cufile_integration::is_kvikio_enabled()) { + _kvikio_file = kvikio::FileHandle(filepath, "w"); + } else { + _cufile_out = detail::make_cufile_output(filepath); + } } virtual ~file_sink() { flush(); } @@ -49,19 +56,15 @@ class file_sink : public data_sink { size_t bytes_written() override { return _bytes_written; } - [[nodiscard]] bool supports_device_write() const override { return _cufile_out != nullptr; } - - [[nodiscard]] bool is_device_write_preferred(size_t size) const override + [[nodiscard]] bool supports_device_write() const override { - return _cufile_out != nullptr && _cufile_out->is_cufile_io_preferred(size); + return !_kvikio_file.closed() || _cufile_out != nullptr; } - void device_write(void const* gpu_data, size_t size, rmm::cuda_stream_view stream) override + [[nodiscard]] bool is_device_write_preferred(size_t size) const override { - if (!supports_device_write()) CUDF_FAIL("Device writes are not supported for this file."); - - _cufile_out->write(gpu_data, _bytes_written, size); - _bytes_written += size; + return !_kvikio_file.closed() || + (_cufile_out != nullptr && _cufile_out->is_cufile_io_preferred(size)); } std::future device_write_async(void const* gpu_data, @@ -70,15 +73,30 @@ class file_sink : public data_sink { { if (!supports_device_write()) CUDF_FAIL("Device writes are not supported for this file."); - auto result = _cufile_out->write_async(gpu_data, _bytes_written, size); + size_t offset = _bytes_written; _bytes_written += size; - return result; + + if (!_kvikio_file.closed()) { + // KvikIO's `pwrite()` returns a `std::future` so we convert it + // to `std::future` + return std::async(std::launch::deferred, [this, gpu_data, size, offset] { + _kvikio_file.pwrite(gpu_data, size, offset).get(); + }); + } + return _cufile_out->write_async(gpu_data, offset, size); + } + + void device_write(void const* gpu_data, size_t size, rmm::cuda_stream_view stream) override + { + if (!supports_device_write()) CUDF_FAIL("Device writes are not supported for this file."); + return device_write_async(gpu_data, _bytes_written, stream).get(); } private: std::ofstream _output_stream; size_t _bytes_written = 0; std::unique_ptr _cufile_out; + kvikio::FileHandle _kvikio_file; }; /** diff --git a/cpp/src/io/utilities/datasource.cpp b/cpp/src/io/utilities/datasource.cpp index 6f864ab509f..80e07f31dd9 100644 --- a/cpp/src/io/utilities/datasource.cpp +++ b/cpp/src/io/utilities/datasource.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2021, NVIDIA CORPORATION. + * Copyright (c) 2019-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,9 @@ #include #include +#include +#include + #include #include #include @@ -33,28 +36,38 @@ namespace { */ class file_source : public datasource { public: - explicit file_source(const char* filepath) - : _file(filepath, O_RDONLY), _cufile_in(detail::make_cufile_input(filepath)) + explicit file_source(const char* filepath) : _file(filepath, O_RDONLY) { + if (detail::cufile_integration::is_kvikio_enabled()) { + _kvikio_file = kvikio::FileHandle(filepath); + } else { + _cufile_in = detail::make_cufile_input(filepath); + } } virtual ~file_source() = default; - [[nodiscard]] bool supports_device_read() const override { return _cufile_in != nullptr; } + [[nodiscard]] bool supports_device_read() const override + { + return !_kvikio_file.closed() || _cufile_in != nullptr; + } [[nodiscard]] bool is_device_read_preferred(size_t size) const override { - return _cufile_in != nullptr && _cufile_in->is_cufile_io_preferred(size); + return !_kvikio_file.closed() || + (_cufile_in != nullptr && _cufile_in->is_cufile_io_preferred(size)); } - std::unique_ptr device_read(size_t offset, - size_t size, - rmm::cuda_stream_view stream) override + std::future device_read_async(size_t offset, + size_t size, + uint8_t* dst, + rmm::cuda_stream_view stream) override { CUDF_EXPECTS(supports_device_read(), "Device reads are not supported for this file."); auto const read_size = std::min(size, _file.size() - offset); - return _cufile_in->read(offset, read_size, stream); + if (!_kvikio_file.closed()) { return _kvikio_file.pread(dst, read_size, offset); } + return _cufile_in->read_async(offset, read_size, dst, stream); } size_t device_read(size_t offset, @@ -62,21 +75,17 @@ class file_source : public datasource { uint8_t* dst, rmm::cuda_stream_view stream) override { - CUDF_EXPECTS(supports_device_read(), "Device reads are not supported for this file."); - - auto const read_size = std::min(size, _file.size() - offset); - return _cufile_in->read(offset, read_size, dst, stream); + return device_read_async(offset, size, dst, stream).get(); } - std::future device_read_async(size_t offset, - size_t size, - uint8_t* dst, - rmm::cuda_stream_view stream) override + std::unique_ptr device_read(size_t offset, + size_t size, + rmm::cuda_stream_view stream) override { - CUDF_EXPECTS(supports_device_read(), "Device reads are not supported for this file."); - - auto const read_size = std::min(size, _file.size() - offset); - return _cufile_in->read_async(offset, read_size, dst, stream); + rmm::device_buffer out_data(size, stream); + size_t read = device_read(offset, size, reinterpret_cast(out_data.data()), stream); + out_data.resize(read, stream); + return datasource::buffer::create(std::move(out_data)); } [[nodiscard]] size_t size() const override { return _file.size(); } @@ -86,6 +95,7 @@ class file_source : public datasource { private: std::unique_ptr _cufile_in; + kvikio::FileHandle _kvikio_file; }; /** diff --git a/cpp/src/io/utilities/file_io_utilities.cpp b/cpp/src/io/utilities/file_io_utilities.cpp index f7e250f1d3f..c0dd85702e2 100644 --- a/cpp/src/io/utilities/file_io_utilities.cpp +++ b/cpp/src/io/utilities/file_io_utilities.cpp @@ -176,16 +176,6 @@ cufile_input_impl::cufile_input_impl(std::string const& filepath) pool.sleep_duration = 10; } -std::unique_ptr cufile_input_impl::read(size_t offset, - size_t size, - rmm::cuda_stream_view stream) -{ - rmm::device_buffer out_data(size, stream); - auto read_size = read(offset, size, reinterpret_cast(out_data.data()), stream); - out_data.resize(read_size, stream); - return datasource::buffer::create(std::move(out_data)); -} - namespace { template cufile_input_impl::read_async(size_t offset, return std::async(std::launch::deferred, waiter, std::move(slice_tasks)); } -size_t cufile_input_impl::read(size_t offset, - size_t size, - uint8_t* dst, - rmm::cuda_stream_view stream) -{ - auto result = read_async(offset, size, dst, stream); - return result.get(); -} - cufile_output_impl::cufile_output_impl(std::string const& filepath) : shim{cufile_shim::instance()}, cf_file(shim, filepath, O_CREAT | O_RDWR | O_DIRECT, 0664), @@ -250,11 +231,6 @@ cufile_output_impl::cufile_output_impl(std::string const& filepath) { } -void cufile_output_impl::write(void const* data, size_t offset, size_t size) -{ - write_async(data, offset, size).wait(); -} - std::future cufile_output_impl::write_async(void const* data, size_t offset, size_t size) { int device; diff --git a/cpp/src/io/utilities/file_io_utilities.hpp b/cpp/src/io/utilities/file_io_utilities.hpp index be3ecc49ab0..704ee77de8a 100644 --- a/cpp/src/io/utilities/file_io_utilities.hpp +++ b/cpp/src/io/utilities/file_io_utilities.hpp @@ -80,35 +80,6 @@ class cufile_io_base { */ class cufile_input : public cufile_io_base { public: - /** - * @brief Reads into a new device buffer. - * - * @throws cudf::logic_error on cuFile error - * - * @param offset Number of bytes from the start - * @param size Number of bytes to read - * @param stream CUDA stream to use - * - * @return The data buffer in the device memory - */ - virtual std::unique_ptr read(size_t offset, - size_t size, - rmm::cuda_stream_view stream) = 0; - - /** - * @brief Reads into existing device memory. - * - * @throws cudf::logic_error on cuFile error - * - * @param offset Number of bytes from the start - * @param size Number of bytes to read - * @param dst Address of the existing device memory - * @param stream CUDA stream to use - * - * @return The number of bytes read - */ - virtual size_t read(size_t offset, size_t size, uint8_t* dst, rmm::cuda_stream_view stream) = 0; - /** * @brief Asynchronously reads into existing device memory. * @@ -132,17 +103,6 @@ class cufile_input : public cufile_io_base { */ class cufile_output : public cufile_io_base { public: - /** - * @brief Writes the data from a device buffer into a file. - * - * @throws cudf::logic_error on cuFile error - * - * @param data Pointer to the buffer to be written into the output file - * @param offset Number of bytes from the start - * @param size Number of bytes to write - */ - virtual void write(void const* data, size_t offset, size_t size) = 0; - /** * @brief Asynchronously writes the data from a device buffer into a file. * @@ -203,12 +163,6 @@ class cufile_input_impl final : public cufile_input { public: cufile_input_impl(std::string const& filepath); - std::unique_ptr read(size_t offset, - size_t size, - rmm::cuda_stream_view stream) override; - - size_t read(size_t offset, size_t size, uint8_t* dst, rmm::cuda_stream_view stream) override; - std::future read_async(size_t offset, size_t size, uint8_t* dst, @@ -229,7 +183,6 @@ class cufile_output_impl final : public cufile_output { public: cufile_output_impl(std::string const& filepath); - void write(void const* data, size_t offset, size_t size) override; std::future write_async(void const* data, size_t offset, size_t size) override; private: @@ -241,18 +194,6 @@ class cufile_output_impl final : public cufile_output { class cufile_input_impl final : public cufile_input { public: - std::unique_ptr read(size_t offset, - size_t size, - rmm::cuda_stream_view stream) override - { - CUDF_FAIL("Only used to compile without cufile library, should not be called"); - } - - size_t read(size_t offset, size_t size, uint8_t* dst, rmm::cuda_stream_view stream) override - { - CUDF_FAIL("Only used to compile without cufile library, should not be called"); - } - std::future read_async(size_t offset, size_t size, uint8_t* dst, @@ -264,10 +205,6 @@ class cufile_input_impl final : public cufile_input { class cufile_output_impl final : public cufile_output { public: - void write(void const* data, size_t offset, size_t size) override - { - CUDF_FAIL("Only used to compile without cufile library, should not be called"); - } std::future write_async(void const* data, size_t offset, size_t size) override { CUDF_FAIL("Only used to compile without cufile library, should not be called"); diff --git a/docs/cudf/source/basics/io-gds-integration.rst b/docs/cudf/source/basics/io-gds-integration.rst index 5ff07ac29c5..ce774453386 100644 --- a/docs/cudf/source/basics/io-gds-integration.rst +++ b/docs/cudf/source/basics/io-gds-integration.rst @@ -10,10 +10,11 @@ GDS is also included in CUDA Toolkit 11.4 and higher. Use of GPUDirect Storage in cuDF is enabled by default, but can be disabled through the environment variable ``LIBCUDF_CUFILE_POLICY``. This variable also controls the GDS compatibility mode. -There are three valid values for the environment variable: +There are four valid values for the environment variable: - "GDS": Enable GDS use; GDS compatibility mode is *off*. - "ALWAYS": Enable GDS use; GDS compatibility mode is *on*. +- "KVIKIO": Enable GDS through `KvikIO `_. - "OFF": Completely disable GDS use. If no value is set, behavior will be the same as the "GDS" option. @@ -21,7 +22,9 @@ If no value is set, behavior will be the same as the "GDS" option. This environment variable also affects how cuDF treats GDS errors. When ``LIBCUDF_CUFILE_POLICY`` is set to "GDS" and a GDS API call fails for any reason, cuDF falls back to the internal implementation with bounce buffers. When ``LIBCUDF_CUFILE_POLICY`` is set to "ALWAYS" and a GDS API call fails for any reason (unlikely, given that the compatibility mode is on), -cuDF throws an exception to propagate the error to te user. +cuDF throws an exception to propagate the error to the user. +When ``LIBCUDF_CUFILE_POLICY`` is set to "KVIKIO" and a KvikIO API call fails for any reason (unlikely, given that KvikIO implements its own compatibility mode) cuDF throws an exception to propagate the error to the user. +For more information about error handling, compatibility mode, and tuning parameters in KvikIO see: https://github.com/rapidsai/kvikio Operations that support the use of GPUDirect Storage: @@ -36,4 +39,4 @@ Several parameters that can be used to tune the performance of GDS-enabled I/O a - ``LIBCUDF_CUFILE_THREAD_COUNT``: Integral value, maximum number of parallel reads/writes per file (default 16); - ``LIBCUDF_CUFILE_SLICE_SIZE``: Integral value, maximum size of each GDS read/write, in bytes (default 4MB). - Larger I/O operations are split into multiple calls. \ No newline at end of file + Larger I/O operations are split into multiple calls.