diff --git a/cpp/include/cudf/io/datasource.hpp b/cpp/include/cudf/io/datasource.hpp index 8fcc045e6d2..ab7a3a6fa9b 100644 --- a/cpp/include/cudf/io/datasource.hpp +++ b/cpp/include/cudf/io/datasource.hpp @@ -123,7 +123,7 @@ class datasource { * @param[in] offset Bytes from the start * @param[in] size Bytes to read * - * @return The data buffer + * @return The data buffer (can be smaller than size) */ virtual std::unique_ptr host_read(size_t offset, size_t size) = 0; diff --git a/cpp/src/io/utilities/datasource.cpp b/cpp/src/io/utilities/datasource.cpp index 3f2884d5b7d..8f2a5389b4d 100644 --- a/cpp/src/io/utilities/datasource.cpp +++ b/cpp/src/io/utilities/datasource.cpp @@ -25,32 +25,69 @@ namespace cudf { namespace io { +namespace { /** - * @brief Implementation class for reading from a file or memory source using - * memory mapped access. - * - * Unlike Arrow's memory mapped IO class, this implementation allows memory - * mapping a subset of the file where the starting offset may not be zero. + * @brief Base class for file input. Only implements direct device reads. */ -class memory_mapped_source : public datasource { - class memory_mapped_buffer : public buffer { - size_t _size = 0; - uint8_t *_data = nullptr; +class file_source : public datasource { + public: + explicit file_source(const char *filepath) + : _file(filepath, O_RDONLY), _cufile_in(detail::make_cufile_input(filepath)) + { + } + + virtual ~file_source() = default; + + bool supports_device_read() const override { return _cufile_in != nullptr; } + + bool is_device_read_preferred(size_t size) const + { + return _cufile_in != nullptr && _cufile_in->is_cufile_io_preferred(size); + } + + std::unique_ptr device_read(size_t offset, + size_t size, + rmm::cuda_stream_view stream) override + { + CUDF_EXPECTS(supports_device_read(), "Device reads are not supported for this file."); + + auto const read_size = std::min(size, _file.size() - offset); + return _cufile_in->read(offset, read_size, stream); + } + + size_t device_read(size_t offset, + size_t size, + uint8_t *dst, + rmm::cuda_stream_view stream) override + { + CUDF_EXPECTS(supports_device_read(), "Device reads are not supported for this file."); - public: - memory_mapped_buffer(uint8_t *data, size_t size) : _size(size), _data(data) {} - size_t size() const override { return _size; } - const uint8_t *data() const override { return _data; } - }; + auto const read_size = std::min(size, _file.size() - offset); + return _cufile_in->read(offset, read_size, dst, stream); + } + + size_t size() const override { return _file.size(); } + + protected: + detail::file_wrapper _file; + + private: + std::unique_ptr _cufile_in; +}; +/** + * @brief Implementation class for reading from a file using memory mapped access. + * + * Unlike Arrow's memory mapped IO class, this implementation allows memory mapping a subset of the + * file where the starting offset may not be zero. + */ +class memory_mapped_source : public file_source { public: explicit memory_mapped_source(const char *filepath, size_t offset, size_t size) - : _cufile_in(detail::make_cufile_input(filepath)) + : file_source(filepath) { - auto const file = detail::file_wrapper(filepath, O_RDONLY); - _file_size = file.size(); - if (_file_size != 0) { map(file.desc(), offset, size); } + if (_file.size() != 0) map(_file.desc(), offset, size); } virtual ~memory_mapped_source() @@ -65,7 +102,7 @@ class memory_mapped_source : public datasource { // Clamp length to available data in the mapped region auto const read_size = std::min(size, _map_size - (offset - _map_offset)); - return std::make_unique( + return std::make_unique( static_cast(_map_addr) + (offset - _map_offset), read_size); } @@ -81,49 +118,15 @@ class memory_mapped_source : public datasource { return read_size; } - bool supports_device_read() const override { return _cufile_in != nullptr; } - - bool is_device_read_preferred(size_t size) const - { - return _cufile_in != nullptr && _cufile_in->is_cufile_io_preferred(size); - } - - std::unique_ptr device_read(size_t offset, - size_t size, - rmm::cuda_stream_view stream) override - { - if (!supports_device_read()) CUDF_FAIL("Device reads are not supported for this file."); - - auto const read_size = std::min(size, _map_size - (offset - _map_offset)); - return _cufile_in->read(offset, read_size, stream); - } - - size_t device_read(size_t offset, - size_t size, - uint8_t *dst, - rmm::cuda_stream_view stream) override - { - if (!supports_device_read()) CUDF_FAIL("Device reads are not supported for this file."); - auto const read_size = std::min(size, _map_size - (offset - _map_offset)); - return _cufile_in->read(offset, read_size, dst, stream); - } - - size_t size() const override { return _file_size; } - private: void map(int fd, size_t offset, size_t size) { - CUDF_EXPECTS(offset < _file_size, "Offset is past end of file"); + CUDF_EXPECTS(offset < _file.size(), "Offset is past end of file"); // Offset for `mmap()` must be page aligned _map_offset = offset & ~(sysconf(_SC_PAGESIZE) - 1); - // Clamp length to available data in the file - if (size == 0) { - size = _file_size - offset; - } else { - if ((offset + size) > _file_size) { size = _file_size - offset; } - } + if (size == 0 || (offset + size) > _file.size()) { size = _file.size() - offset; } // Size for `mmap()` needs to include the page padding _map_size = size + (offset - _map_offset); @@ -134,11 +137,44 @@ class memory_mapped_source : public datasource { } private: - size_t _file_size = 0; - void *_map_addr = nullptr; size_t _map_size = 0; size_t _map_offset = 0; - std::unique_ptr _cufile_in; + void *_map_addr = nullptr; +}; + +/** + * @brief Implementation class for reading from a file using `read` calls + * + * Potentially faster than `memory_mapped_source` when only a small portion of the file is read + * through the host. + */ +class direct_read_source : public file_source { + public: + explicit direct_read_source(const char *filepath) : file_source(filepath) {} + + std::unique_ptr host_read(size_t offset, size_t size) override + { + lseek(_file.desc(), offset, SEEK_SET); + + // Clamp length to available data + ssize_t const read_size = std::min(size, _file.size() - offset); + + std::vector v(read_size); + CUDF_EXPECTS(read(_file.desc(), v.data(), read_size) == read_size, "read failed"); + return buffer::create(std::move(v)); + } + + size_t host_read(size_t offset, size_t size, uint8_t *dst) override + { + lseek(_file.desc(), offset, SEEK_SET); + + // Clamp length to available data + auto const read_size = std::min(size, _file.size() - offset); + + CUDF_EXPECTS(read(_file.desc(), dst, read_size) == static_cast(read_size), + "read failed"); + return read_size; + } }; /** @@ -185,10 +221,18 @@ class user_datasource_wrapper : public datasource { datasource *const source; ///< A non-owning pointer to the user-implemented datasource }; +} // namespace + std::unique_ptr datasource::create(const std::string &filepath, size_t offset, size_t size) { +#ifdef CUFILE_FOUND + if (detail::cufile_config::instance()->is_required()) { + // avoid mmap as GDS is expected to be used for most reads + return std::make_unique(filepath.c_str()); + } +#endif // Use our own memory mapping implementation for direct file reads return std::make_unique(filepath.c_str(), offset, size); } diff --git a/cpp/src/io/utilities/file_io_utilities.cpp b/cpp/src/io/utilities/file_io_utilities.cpp index 22ff057cbc1..322296715fc 100644 --- a/cpp/src/io/utilities/file_io_utilities.cpp +++ b/cpp/src/io/utilities/file_io_utilities.cpp @@ -13,7 +13,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#include #include #include @@ -26,93 +25,67 @@ namespace cudf { namespace io { namespace detail { +size_t get_file_size(int file_descriptor) +{ + struct stat st; + CUDF_EXPECTS(fstat(file_descriptor, &st) != -1, "Cannot query file size"); + return static_cast(st.st_size); +} + file_wrapper::file_wrapper(std::string const &filepath, int flags) - : fd(open(filepath.c_str(), flags)) + : fd(open(filepath.c_str(), flags)), _size{get_file_size(fd)} { CUDF_EXPECTS(fd != -1, "Cannot open file " + filepath); } file_wrapper::file_wrapper(std::string const &filepath, int flags, mode_t mode) - : fd(open(filepath.c_str(), flags, mode)) + : fd(open(filepath.c_str(), flags, mode)), _size{get_file_size(fd)} { CUDF_EXPECTS(fd != -1, "Cannot open file " + filepath); } file_wrapper::~file_wrapper() { close(fd); } -long file_wrapper::size() const +std::string getenv_or(std::string const &env_var_name, std::string const &default_val) { - if (_size < 0) { - struct stat st; - CUDF_EXPECTS(fstat(fd, &st) != -1, "Cannot query file size"); - _size = static_cast(st.st_size); - } - return _size; + auto const env_val = std::getenv(env_var_name.c_str()); + return (env_val == nullptr) ? default_val : std::string(env_val); } #ifdef CUFILE_FOUND -/** - * @brief Class that manages cuFile configuration. - */ -class cufile_config { - std::string const default_policy = "OFF"; - std::string const json_path_env_var = "CUFILE_ENV_PATH_JSON"; - - std::string const policy = default_policy; - temp_directory tmp_config_dir{"cudf_cufile_config"}; - - std::string getenv_or(std::string const &env_var_name, std::string const &default_val) - { - auto const env_val = std::getenv(env_var_name.c_str()); - return (env_val == nullptr) ? default_val : std::string(env_val); - } - - cufile_config() : policy{getenv_or("LIBCUDF_CUFILE_POLICY", default_policy)} - { - if (is_enabled()) { - // Modify the config file based on the policy - auto const config_file_path = getenv_or(json_path_env_var, "/etc/cufile.json"); - std::ifstream user_config_file(config_file_path); - // Modified config file is stored in a temporary directory - auto const cudf_config_path = tmp_config_dir.path() + "/cufile.json"; - std::ofstream cudf_config_file(cudf_config_path); - - std::string line; - while (std::getline(user_config_file, line)) { - std::string const tag = "\"allow_compat_mode\""; - if (line.find(tag) != std::string::npos) { - // TODO: only replace the true/false value - // Enable compatiblity mode when cuDF does not fall back to host path - cudf_config_file << tag << ": " << (is_required() ? "true" : "false") << ",\n"; - } else { - cudf_config_file << line << '\n'; - } - - // Point libcufile to the modified config file - CUDF_EXPECTS(setenv(json_path_env_var.c_str(), cudf_config_path.c_str(), 0) == 0, - "Failed to set the cuFile config file environment variable."); +cufile_config::cufile_config() : policy{getenv_or("LIBCUDF_CUFILE_POLICY", default_policy)} +{ + if (is_enabled()) { + // Modify the config file based on the policy + auto const config_file_path = getenv_or(json_path_env_var, "/etc/cufile.json"); + std::ifstream user_config_file(config_file_path); + // Modified config file is stored in a temporary directory + auto const cudf_config_path = tmp_config_dir.path() + "/cufile.json"; + std::ofstream cudf_config_file(cudf_config_path); + + std::string line; + while (std::getline(user_config_file, line)) { + std::string const tag = "\"allow_compat_mode\""; + if (line.find(tag) != std::string::npos) { + // TODO: only replace the true/false value + // Enable compatiblity mode when cuDF does not fall back to host path + cudf_config_file << tag << ": " << (is_required() ? "true" : "false") << ",\n"; + } else { + cudf_config_file << line << '\n'; } - } - } - - public: - /** - * @brief Returns true when cuFile use is enabled. - */ - bool is_enabled() const { return policy == "ALWAYS" or policy == "GDS"; } - - /** - * @brief Returns true when cuDF should not fall back to host IO. - */ - bool is_required() const { return policy == "ALWAYS"; } - static cufile_config const *instance() - { - static cufile_config _instance; - return &_instance; + // Point libcufile to the modified config file + CUDF_EXPECTS(setenv(json_path_env_var.c_str(), cudf_config_path.c_str(), 0) == 0, + "Failed to set the cuFile config file environment variable."); + } } -}; +} +cufile_config const *cufile_config::instance() +{ + static cufile_config _instance; + return &_instance; +} /** * @brief Class that dynamically loads the cuFile library and manages the cuFile driver. diff --git a/cpp/src/io/utilities/file_io_utilities.hpp b/cpp/src/io/utilities/file_io_utilities.hpp index 85399bdd44d..0119484aee5 100644 --- a/cpp/src/io/utilities/file_io_utilities.hpp +++ b/cpp/src/io/utilities/file_io_utilities.hpp @@ -24,6 +24,7 @@ #include #include +#include #include @@ -35,14 +36,14 @@ namespace detail { * @brief Class that provides RAII for file handling. */ class file_wrapper { - int const fd = -1; - long mutable _size = -1; + int fd = -1; + size_t _size; public: explicit file_wrapper(std::string const &filepath, int flags); explicit file_wrapper(std::string const &filepath, int flags, mode_t mode); ~file_wrapper(); - long size() const; + auto size() const { return _size; } auto desc() const { return fd; } }; @@ -128,6 +129,32 @@ class cufile_output : public cufile_io_base { class cufile_shim; +/** + * @brief Class that manages cuFile configuration. + */ +class cufile_config { + std::string const default_policy = "OFF"; + std::string const json_path_env_var = "CUFILE_ENV_PATH_JSON"; + + std::string const policy = default_policy; + temp_directory tmp_config_dir{"cudf_cufile_config"}; + + cufile_config(); + + public: + /** + * @brief Returns true when cuFile use is enabled. + */ + bool is_enabled() const { return policy == "ALWAYS" or policy == "GDS"; } + + /** + * @brief Returns true when cuDF should not fall back to host IO. + */ + bool is_required() const { return policy == "ALWAYS"; } + + static cufile_config const *instance(); +}; + /** * @brief Class that provides RAII for cuFile file registration. */