Skip to content

Commit

Permalink
Memory map the input file only when GDS compatibility mode is not used (
Browse files Browse the repository at this point in the history
…#7717)

`mmap` is expensive on some systems and we can expect better performance with file reads when GDS is used, especially with compatibility mode.
This PR adds a source type that does not use `mmap` for host reads. This type is used when GDS and its compatibility mode are enabled.
`file_source` is now a base class for file-based input and only implements the device_read functions. 
`memory_mapped_source` class implements the host reads through the memory mapped file. 
`direct_read_source` is a newly implemented class that uses `read` calls instead of `mmap` for host reads.
Selection is done in `datasource::create` based on `cufile_config`.

Authors:
  - Vukasin Milovanovic (@vuule)

Approvers:
  - Devavret Makkar (@devavret)
  - David (@davidwendt)

URL: #7717
  • Loading branch information
vuule authored Mar 29, 2021
1 parent 42c3bf9 commit 4dd75c4
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 130 deletions.
2 changes: 1 addition & 1 deletion cpp/include/cudf/io/datasource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ class datasource {
* @param[in] offset Bytes from the start
* @param[in] size Bytes to read
*
* @return The data buffer
* @return The data buffer (can be smaller than size)
*/
virtual std::unique_ptr<datasource::buffer> host_read(size_t offset, size_t size) = 0;

Expand Down
160 changes: 102 additions & 58 deletions cpp/src/io/utilities/datasource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,32 +25,69 @@

namespace cudf {
namespace io {
namespace {

/**
* @brief Implementation class for reading from a file or memory source using
* memory mapped access.
*
* Unlike Arrow's memory mapped IO class, this implementation allows memory
* mapping a subset of the file where the starting offset may not be zero.
* @brief Base class for file input. Only implements direct device reads.
*/
class memory_mapped_source : public datasource {
class memory_mapped_buffer : public buffer {
size_t _size = 0;
uint8_t *_data = nullptr;
class file_source : public datasource {
public:
explicit file_source(const char *filepath)
: _file(filepath, O_RDONLY), _cufile_in(detail::make_cufile_input(filepath))
{
}

virtual ~file_source() = default;

bool supports_device_read() const override { return _cufile_in != nullptr; }

bool is_device_read_preferred(size_t size) const
{
return _cufile_in != nullptr && _cufile_in->is_cufile_io_preferred(size);
}

std::unique_ptr<datasource::buffer> device_read(size_t offset,
size_t size,
rmm::cuda_stream_view stream) override
{
CUDF_EXPECTS(supports_device_read(), "Device reads are not supported for this file.");

auto const read_size = std::min(size, _file.size() - offset);
return _cufile_in->read(offset, read_size, stream);
}

size_t device_read(size_t offset,
size_t size,
uint8_t *dst,
rmm::cuda_stream_view stream) override
{
CUDF_EXPECTS(supports_device_read(), "Device reads are not supported for this file.");

public:
memory_mapped_buffer(uint8_t *data, size_t size) : _size(size), _data(data) {}
size_t size() const override { return _size; }
const uint8_t *data() const override { return _data; }
};
auto const read_size = std::min(size, _file.size() - offset);
return _cufile_in->read(offset, read_size, dst, stream);
}

size_t size() const override { return _file.size(); }

protected:
detail::file_wrapper _file;

private:
std::unique_ptr<detail::cufile_input_impl> _cufile_in;
};

/**
* @brief Implementation class for reading from a file using memory mapped access.
*
* Unlike Arrow's memory mapped IO class, this implementation allows memory mapping a subset of the
* file where the starting offset may not be zero.
*/
class memory_mapped_source : public file_source {
public:
explicit memory_mapped_source(const char *filepath, size_t offset, size_t size)
: _cufile_in(detail::make_cufile_input(filepath))
: file_source(filepath)
{
auto const file = detail::file_wrapper(filepath, O_RDONLY);
_file_size = file.size();
if (_file_size != 0) { map(file.desc(), offset, size); }
if (_file.size() != 0) map(_file.desc(), offset, size);
}

virtual ~memory_mapped_source()
Expand All @@ -65,7 +102,7 @@ class memory_mapped_source : public datasource {
// Clamp length to available data in the mapped region
auto const read_size = std::min(size, _map_size - (offset - _map_offset));

return std::make_unique<memory_mapped_buffer>(
return std::make_unique<non_owning_buffer>(
static_cast<uint8_t *>(_map_addr) + (offset - _map_offset), read_size);
}

Expand All @@ -81,49 +118,15 @@ class memory_mapped_source : public datasource {
return read_size;
}

bool supports_device_read() const override { return _cufile_in != nullptr; }

bool is_device_read_preferred(size_t size) const
{
return _cufile_in != nullptr && _cufile_in->is_cufile_io_preferred(size);
}

std::unique_ptr<datasource::buffer> device_read(size_t offset,
size_t size,
rmm::cuda_stream_view stream) override
{
if (!supports_device_read()) CUDF_FAIL("Device reads are not supported for this file.");

auto const read_size = std::min(size, _map_size - (offset - _map_offset));
return _cufile_in->read(offset, read_size, stream);
}

size_t device_read(size_t offset,
size_t size,
uint8_t *dst,
rmm::cuda_stream_view stream) override
{
if (!supports_device_read()) CUDF_FAIL("Device reads are not supported for this file.");
auto const read_size = std::min(size, _map_size - (offset - _map_offset));
return _cufile_in->read(offset, read_size, dst, stream);
}

size_t size() const override { return _file_size; }

private:
void map(int fd, size_t offset, size_t size)
{
CUDF_EXPECTS(offset < _file_size, "Offset is past end of file");
CUDF_EXPECTS(offset < _file.size(), "Offset is past end of file");

// Offset for `mmap()` must be page aligned
_map_offset = offset & ~(sysconf(_SC_PAGESIZE) - 1);

// Clamp length to available data in the file
if (size == 0) {
size = _file_size - offset;
} else {
if ((offset + size) > _file_size) { size = _file_size - offset; }
}
if (size == 0 || (offset + size) > _file.size()) { size = _file.size() - offset; }

// Size for `mmap()` needs to include the page padding
_map_size = size + (offset - _map_offset);
Expand All @@ -134,11 +137,44 @@ class memory_mapped_source : public datasource {
}

private:
size_t _file_size = 0;
void *_map_addr = nullptr;
size_t _map_size = 0;
size_t _map_offset = 0;
std::unique_ptr<detail::cufile_input_impl> _cufile_in;
void *_map_addr = nullptr;
};

/**
 * @brief Implementation class for reading from a file using positional `pread` calls.
 *
 * Potentially faster than `memory_mapped_source` when only a small portion of the file is read
 * through the host.
 *
 * Host reads use `pread`, which takes the file position as an argument instead of relying on the
 * descriptor's shared file offset, so a read does not depend on (or disturb) the position left
 * behind by any other read on the same descriptor.
 */
class direct_read_source : public file_source {
 public:
  /**
   * @brief Opens the file at `filepath` for reading.
   *
   * @param filepath Path of the file to read from
   */
  explicit direct_read_source(const char *filepath) : file_source(filepath) {}

  /**
   * @brief Reads a range of the file into a newly allocated host buffer.
   *
   * @param offset Bytes from the start of the file
   * @param size Bytes to read; clamped to the data available past `offset`
   *
   * @return Buffer that owns the bytes read (can be smaller than `size`)
   */
  std::unique_ptr<buffer> host_read(size_t offset, size_t size) override
  {
    std::vector<uint8_t> v(clamped_read_size(offset, size));
    read_exact(offset, v.data(), v.size());
    return buffer::create(std::move(v));
  }

  /**
   * @brief Reads a range of the file into a caller-provided host buffer.
   *
   * @param offset Bytes from the start of the file
   * @param size Bytes to read; clamped to the data available past `offset`
   * @param dst Destination buffer; must have room for at least `size` bytes
   *
   * @return Number of bytes written to `dst` (can be smaller than `size`)
   */
  size_t host_read(size_t offset, size_t size, uint8_t *dst) override
  {
    auto const read_size = clamped_read_size(offset, size);
    read_exact(offset, dst, read_size);
    return read_size;
  }

 private:
  /**
   * @brief Clamps a requested read length to the bytes available past `offset`.
   *
   * The offset is validated first: `_file.size() - offset` is an unsigned subtraction and would
   * wrap around to a huge value if `offset` were past the end of the file.
   */
  size_t clamped_read_size(size_t offset, size_t size) const
  {
    size_t const file_size = _file.size();
    CUDF_EXPECTS(offset <= file_size, "Offset is past end of file");
    return std::min(size, file_size - offset);
  }

  /**
   * @brief Reads exactly `size` bytes starting at file position `offset` into `dst`.
   *
   * `pread` is allowed to return fewer bytes than requested, so the call is repeated until the
   * whole range has been transferred. An error (negative result) or an unexpected end of file
   * (zero result before `size` bytes were read) is reported as a failure.
   */
  void read_exact(size_t offset, uint8_t *dst, size_t size)
  {
    size_t bytes_done = 0;
    while (bytes_done < size) {
      ssize_t const bytes_read = pread(
        _file.desc(), dst + bytes_done, size - bytes_done, static_cast<off_t>(offset + bytes_done));
      CUDF_EXPECTS(bytes_read > 0, "read failed");
      bytes_done += static_cast<size_t>(bytes_read);
    }
  }
};

/**
Expand Down Expand Up @@ -185,10 +221,18 @@ class user_datasource_wrapper : public datasource {
datasource *const source; ///< A non-owning pointer to the user-implemented datasource
};

} // namespace

std::unique_ptr<datasource> datasource::create(const std::string &filepath,
size_t offset,
size_t size)
{
#ifdef CUFILE_FOUND
if (detail::cufile_config::instance()->is_required()) {
// avoid mmap as GDS is expected to be used for most reads
return std::make_unique<direct_read_source>(filepath.c_str());
}
#endif
// Use our own memory mapping implementation for direct file reads
return std::make_unique<memory_mapped_source>(filepath.c_str(), offset, size);
}
Expand Down
109 changes: 41 additions & 68 deletions cpp/src/io/utilities/file_io_utilities.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <cudf_test/file_utilities.hpp>
#include <io/utilities/file_io_utilities.hpp>

#include <rmm/device_buffer.hpp>
Expand All @@ -26,93 +25,67 @@ namespace cudf {
namespace io {
namespace detail {

/**
 * @brief Returns the size, in bytes, of the file behind an open file descriptor.
 *
 * The size is taken from the `st_size` field filled in by `fstat`. A failing `fstat` call is
 * reported through `CUDF_EXPECTS`.
 *
 * @param file_descriptor Descriptor of the file to query
 *
 * @return File size in bytes
 */
size_t get_file_size(int file_descriptor)
{
  struct stat file_info;
  auto const status = fstat(file_descriptor, &file_info);
  CUDF_EXPECTS(status != -1, "Cannot query file size");
  return static_cast<size_t>(file_info.st_size);
}

file_wrapper::file_wrapper(std::string const &filepath, int flags)
: fd(open(filepath.c_str(), flags))
: fd(open(filepath.c_str(), flags)), _size{get_file_size(fd)}
{
CUDF_EXPECTS(fd != -1, "Cannot open file " + filepath);
}

file_wrapper::file_wrapper(std::string const &filepath, int flags, mode_t mode)
: fd(open(filepath.c_str(), flags, mode))
: fd(open(filepath.c_str(), flags, mode)), _size{get_file_size(fd)}
{
CUDF_EXPECTS(fd != -1, "Cannot open file " + filepath);
}

file_wrapper::~file_wrapper() { close(fd); }

long file_wrapper::size() const
std::string getenv_or(std::string const &env_var_name, std::string const &default_val)
{
if (_size < 0) {
struct stat st;
CUDF_EXPECTS(fstat(fd, &st) != -1, "Cannot query file size");
_size = static_cast<size_t>(st.st_size);
}
return _size;
auto const env_val = std::getenv(env_var_name.c_str());
return (env_val == nullptr) ? default_val : std::string(env_val);
}

#ifdef CUFILE_FOUND

/**
* @brief Class that manages cuFile configuration.
*/
class cufile_config {
std::string const default_policy = "OFF";
std::string const json_path_env_var = "CUFILE_ENV_PATH_JSON";

std::string const policy = default_policy;
temp_directory tmp_config_dir{"cudf_cufile_config"};

std::string getenv_or(std::string const &env_var_name, std::string const &default_val)
{
auto const env_val = std::getenv(env_var_name.c_str());
return (env_val == nullptr) ? default_val : std::string(env_val);
}

cufile_config() : policy{getenv_or("LIBCUDF_CUFILE_POLICY", default_policy)}
{
if (is_enabled()) {
// Modify the config file based on the policy
auto const config_file_path = getenv_or(json_path_env_var, "/etc/cufile.json");
std::ifstream user_config_file(config_file_path);
// Modified config file is stored in a temporary directory
auto const cudf_config_path = tmp_config_dir.path() + "/cufile.json";
std::ofstream cudf_config_file(cudf_config_path);

std::string line;
while (std::getline(user_config_file, line)) {
std::string const tag = "\"allow_compat_mode\"";
if (line.find(tag) != std::string::npos) {
// TODO: only replace the true/false value
// Enable compatibility mode when cuDF does not fall back to host path
cudf_config_file << tag << ": " << (is_required() ? "true" : "false") << ",\n";
} else {
cudf_config_file << line << '\n';
}

// Point libcufile to the modified config file
CUDF_EXPECTS(setenv(json_path_env_var.c_str(), cudf_config_path.c_str(), 0) == 0,
"Failed to set the cuFile config file environment variable.");
cufile_config::cufile_config() : policy{getenv_or("LIBCUDF_CUFILE_POLICY", default_policy)}
{
if (is_enabled()) {
// Modify the config file based on the policy
auto const config_file_path = getenv_or(json_path_env_var, "/etc/cufile.json");
std::ifstream user_config_file(config_file_path);
// Modified config file is stored in a temporary directory
auto const cudf_config_path = tmp_config_dir.path() + "/cufile.json";
std::ofstream cudf_config_file(cudf_config_path);

std::string line;
while (std::getline(user_config_file, line)) {
std::string const tag = "\"allow_compat_mode\"";
if (line.find(tag) != std::string::npos) {
// TODO: only replace the true/false value
// Enable compatibility mode when cuDF does not fall back to host path
cudf_config_file << tag << ": " << (is_required() ? "true" : "false") << ",\n";
} else {
cudf_config_file << line << '\n';
}
}
}

public:
/**
* @brief Returns true when cuFile use is enabled.
*/
bool is_enabled() const { return policy == "ALWAYS" or policy == "GDS"; }

/**
* @brief Returns true when cuDF should not fall back to host IO.
*/
bool is_required() const { return policy == "ALWAYS"; }

static cufile_config const *instance()
{
static cufile_config _instance;
return &_instance;
// Point libcufile to the modified config file
CUDF_EXPECTS(setenv(json_path_env_var.c_str(), cudf_config_path.c_str(), 0) == 0,
"Failed to set the cuFile config file environment variable.");
}
}
};
}
/**
 * @brief Returns a pointer to the single, shared cuFile configuration object.
 *
 * The object is a function-local static, so it is constructed the first time this function is
 * called and the same address is returned on every call.
 */
cufile_config const *cufile_config::instance()
{
  static cufile_config shared_config;
  return &shared_config;
}

/**
* @brief Class that dynamically loads the cuFile library and manages the cuFile driver.
Expand Down
Loading

0 comments on commit 4dd75c4

Please sign in to comment.