diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 9936db5b2fa..259dde373ef 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -155,6 +155,8 @@ include(cmake/thirdparty/get_gtest.cmake) include(cmake/Modules/JitifyPreprocessKernels.cmake) # find cuFile include(cmake/Modules/FindcuFile.cmake) +# find KvikIO +include(cmake/thirdparty/get_kvikio.cmake) # ################################################################################################## # * library targets ------------------------------------------------------------------------------- @@ -530,6 +532,7 @@ target_include_directories( cudf PUBLIC "$" "$" + "$" "$" "$" PRIVATE "$" diff --git a/cpp/cmake/thirdparty/get_kvikio.cmake b/cpp/cmake/thirdparty/get_kvikio.cmake new file mode 100644 index 00000000000..0c5ca62f41c --- /dev/null +++ b/cpp/cmake/thirdparty/get_kvikio.cmake @@ -0,0 +1,34 @@ +# ============================================================================= +# Copyright (c) 2022, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +# in compliance with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing permissions and limitations under +# the License. +# ============================================================================= + +# This function finds KvikIO and sets `KvikIO_INCLUDE_DIR` +function(find_and_configure_kvikio) + + rapids_cpm_find( + KvikIO 22.04 + GLOBAL_TARGETS kvikio::kvikio + # CPM_ARGS GIT_REPOSITORY https://github.com/rapidsai/kvikio.git + CPM_ARGS # TODO: use version tags when they become available + GIT_REPOSITORY https://github.com/madsbk/kvikio.git SOURCE_SUBDIR cpp + GIT_TAG used_by_cudf_for_testing + OPTIONS "KvikIO_BUILD_EXAMPLES FALSE" # No need to build the KvikIO example + ) + set(KvikIO_INCLUDE_DIR + ${KvikIO_SOURCE_DIR}/cpp/include + PARENT_SCOPE + ) + +endfunction() + +find_and_configure_kvikio() diff --git a/cpp/src/io/utilities/config_utils.cpp b/cpp/src/io/utilities/config_utils.cpp index a6bfb0d888f..45d2155da09 100644 --- a/cpp/src/io/utilities/config_utils.cpp +++ b/cpp/src/io/utilities/config_utils.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021, NVIDIA CORPORATION. + * Copyright (c) 2021-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -37,10 +37,7 @@ namespace { */ enum class usage_policy : uint8_t { OFF, GDS, ALWAYS }; -/** - * @brief Get the current usage policy. - */ -usage_policy get_env_policy() +usage_policy _get_env_policy() { static auto const env_val = getenv_or("LIBCUDF_CUFILE_POLICY", "GDS"); if (env_val == "OFF") return usage_policy::OFF; @@ -48,6 +45,15 @@ usage_policy get_env_policy() if (env_val == "ALWAYS") return usage_policy::ALWAYS; CUDF_FAIL("Invalid LIBCUDF_CUFILE_POLICY value: " + env_val); } + +/** + * @brief Get the current usage policy. + */ +usage_policy get_env_policy() +{ + static auto const ret = _get_env_policy(); + return ret; +} } // namespace bool is_always_enabled() { return get_env_policy() == usage_policy::ALWAYS; } diff --git a/cpp/src/io/utilities/data_sink.cpp b/cpp/src/io/utilities/data_sink.cpp index 63d0103ddec..718030d093e 100644 --- a/cpp/src/io/utilities/data_sink.cpp +++ b/cpp/src/io/utilities/data_sink.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,10 +16,12 @@ #include -#include "file_io_utilities.hpp" +#include + #include #include +#include #include namespace cudf { @@ -29,8 +31,7 @@ namespace io { */ class file_sink : public data_sink { public: - explicit file_sink(std::string const& filepath) - : _cufile_out(detail::make_cufile_output(filepath)) + explicit file_sink(std::string const& filepath) : _file(filepath, "w") { _output_stream.open(filepath, std::ios::out | std::ios::binary | std::ios::trunc); CUDF_EXPECTS(_output_stream.is_open(), "Cannot open output file"); @@ -49,36 +50,33 @@ class file_sink : public data_sink { size_t bytes_written() override { return _bytes_written; } - [[nodiscard]] bool supports_device_write() const override { return _cufile_out != nullptr; } - - [[nodiscard]] bool is_device_write_preferred(size_t size) const override - { - return _cufile_out != nullptr && _cufile_out->is_cufile_io_preferred(size); - } - - void device_write(void const* gpu_data, size_t size, rmm::cuda_stream_view stream) override + [[nodiscard]] bool supports_device_write() const override { - if (!supports_device_write()) CUDF_FAIL("Device writes are not supported for this file."); - - _cufile_out->write(gpu_data, _bytes_written, size); - _bytes_written += size; + return detail::cufile_integration::is_gds_enabled(); } std::future device_write_async(void const* gpu_data, size_t size, rmm::cuda_stream_view stream) override { - if (!supports_device_write()) CUDF_FAIL("Device writes are not supported for this file."); - - auto result = _cufile_out->write_async(gpu_data, _bytes_written, size); + // KvikIO's `pwrite()` returns a `std::future` so we convert it + // to `std::future` + size_t offset = _bytes_written; _bytes_written += size; - return result; + return std::async(std::launch::deferred, [this, gpu_data, size, offset] { + _file.pwrite(gpu_data, size, offset).get(); + }); + } + + void device_write(void const* gpu_data, size_t size, rmm::cuda_stream_view stream) override + { + return device_write_async(gpu_data, size, stream).get(); } private: std::ofstream _output_stream; size_t _bytes_written = 0; - std::unique_ptr _cufile_out; + kvikio::FileHandle _file; }; /** diff --git a/cpp/src/io/utilities/datasource.cpp b/cpp/src/io/utilities/datasource.cpp index 6f864ab509f..6c44330d55b 100644 --- a/cpp/src/io/utilities/datasource.cpp +++ b/cpp/src/io/utilities/datasource.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2021, NVIDIA CORPORATION. + * Copyright (c) 2019-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,11 +14,13 @@ * limitations under the License. */ -#include "file_io_utilities.hpp" +#include #include #include -#include + +#include +#include #include #include @@ -33,28 +35,21 @@ namespace { */ class file_source : public datasource { public: - explicit file_source(const char* filepath) - : _file(filepath, O_RDONLY), _cufile_in(detail::make_cufile_input(filepath)) - { - } + explicit file_source(const char* filepath) : _file(filepath) {} virtual ~file_source() = default; - [[nodiscard]] bool supports_device_read() const override { return _cufile_in != nullptr; } - - [[nodiscard]] bool is_device_read_preferred(size_t size) const override + [[nodiscard]] bool supports_device_read() const override { - return _cufile_in != nullptr && _cufile_in->is_cufile_io_preferred(size); + return detail::cufile_integration::is_gds_enabled(); } - std::unique_ptr device_read(size_t offset, - size_t size, - rmm::cuda_stream_view stream) override + std::future device_read_async(size_t offset, + size_t size, + uint8_t* dst, + rmm::cuda_stream_view stream) override { - CUDF_EXPECTS(supports_device_read(), "Device reads are not supported for this file."); - - auto const read_size = std::min(size, _file.size() - offset); - return _cufile_in->read(offset, read_size, stream); + return _file.pread(dst, std::min(size, _file.nbytes() - offset), offset); } size_t device_read(size_t offset, @@ -62,30 +57,24 @@ class file_source : public datasource { uint8_t* dst, rmm::cuda_stream_view stream) override { - CUDF_EXPECTS(supports_device_read(), "Device reads are not supported for this file."); - - auto const read_size = std::min(size, _file.size() - offset); - return _cufile_in->read(offset, read_size, dst, stream); + return device_read_async(offset, size, dst, stream).get(); } - std::future device_read_async(size_t offset, - size_t size, - uint8_t* dst, - rmm::cuda_stream_view stream) override + std::unique_ptr device_read(size_t offset, + size_t size, + rmm::cuda_stream_view stream) override { - CUDF_EXPECTS(supports_device_read(), "Device reads are not supported for this file."); - - auto const read_size = std::min(size, _file.size() - offset); - return _cufile_in->read_async(offset, read_size, dst, stream); + rmm::device_buffer out_data(size, stream); + size_t read_size = + device_read(offset, size, reinterpret_cast(out_data.data()), stream); + out_data.resize(read_size, stream); + return datasource::buffer::create(std::move(out_data)); } - [[nodiscard]] size_t size() const override { return _file.size(); } + [[nodiscard]] size_t size() const override { return _file.nbytes(); } protected: - detail::file_wrapper _file; - - private: - std::unique_ptr _cufile_in; + kvikio::FileHandle _file; }; /** @@ -99,7 +88,7 @@ class memory_mapped_source : public file_source { explicit memory_mapped_source(const char* filepath, size_t offset, size_t size) : file_source(filepath) { - if (_file.size() != 0) map(_file.desc(), offset, size); + if (_file.nbytes() != 0) map(_file.fd(), offset, size); } ~memory_mapped_source() override @@ -133,12 +122,12 @@ class memory_mapped_source : public file_source { private: void map(int fd, size_t offset, size_t size) { - CUDF_EXPECTS(offset < _file.size(), "Offset is past end of file"); + CUDF_EXPECTS(offset < _file.nbytes(), "Offset is past end of file"); // Offset for `mmap()` must be page aligned _map_offset = offset & ~(sysconf(_SC_PAGESIZE) - 1); - if (size == 0 || (offset + size) > _file.size()) { size = _file.size() - offset; } + if (size == 0 || (offset + size) > _file.nbytes()) { size = _file.nbytes() - offset; } // Size for `mmap()` needs to include the page padding _map_size = size + (offset - _map_offset); @@ -166,24 +155,26 @@ class direct_read_source : public file_source { std::unique_ptr host_read(size_t offset, size_t size) override { - lseek(_file.desc(), offset, SEEK_SET); + CUDF_EXPECTS(offset < _file.nbytes(), "Offset is past end of file"); + lseek(_file.fd(), offset, SEEK_SET); // Clamp length to available data - ssize_t const read_size = std::min(size, _file.size() - offset); + ssize_t const read_size = std::min(size, _file.nbytes() - offset); std::vector v(read_size); - CUDF_EXPECTS(read(_file.desc(), v.data(), read_size) == read_size, "read failed"); + CUDF_EXPECTS(read(_file.fd(), v.data(), read_size) == read_size, "read failed"); return buffer::create(std::move(v)); } size_t host_read(size_t offset, size_t size, uint8_t* dst) override { - lseek(_file.desc(), offset, SEEK_SET); + CUDF_EXPECTS(offset < _file.nbytes(), "Offset is past end of file"); + lseek(_file.fd(), offset, SEEK_SET); // Clamp length to available data - auto const read_size = std::min(size, _file.size() - offset); + auto const read_size = std::min(size, _file.nbytes() - offset); - CUDF_EXPECTS(read(_file.desc(), dst, read_size) == static_cast(read_size), + CUDF_EXPECTS(read(_file.fd(), dst, read_size) == static_cast(read_size), "read failed"); return read_size; } @@ -242,13 +233,12 @@ std::unique_ptr datasource::create(const std::string& filepath, size_t offset, size_t size) { -#ifdef CUFILE_FOUND - if (detail::cufile_integration::is_always_enabled()) { - // avoid mmap as GDS is expected to be used for most reads - return std::make_unique(filepath.c_str()); - } -#endif - // Use our own memory mapping implementation for direct file reads + // TODO: do we want an option to enable the use of `direct_read_source` instead + // of `memory_mapped_source`? + // return std::make_unique(filepath.c_str()); + + // Notice, some readers, such as avro, will call `host_read()` on more data + // than what is actually accessed thus the lazy nature of mmap is essential. return std::make_unique(filepath.c_str(), offset, size); }