Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Memory map the input file only when GDS compatiblity mode is not used #7717

Merged
merged 9 commits into from
Mar 29, 2021
2 changes: 1 addition & 1 deletion cpp/include/cudf/io/datasource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ class datasource {
* @param[in] offset Bytes from the start
* @param[in] size Bytes to read
*
* @return The data buffer
* @return The data buffer (can be smaller than size)
*/
virtual std::unique_ptr<datasource::buffer> host_read(size_t offset, size_t size) = 0;

Expand Down
157 changes: 99 additions & 58 deletions cpp/src/io/utilities/datasource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,30 +27,66 @@ namespace cudf {
namespace io {

/**
* @brief Implementation class for reading from a file or memory source using
* memory mapped access.
*
* Unlike Arrow's memory mapped IO class, this implementation allows memory
* mapping a subset of the file where the starting offset may not be zero.
* @brief Base class for file input. Only implements direct device reads.
*/
class memory_mapped_source : public datasource {
class memory_mapped_buffer : public buffer {
size_t _size = 0;
uint8_t *_data = nullptr;
class file_source : public datasource {
vuule marked this conversation as resolved.
Show resolved Hide resolved
public:
explicit file_source(const char *filepath)
: _file(filepath, O_RDONLY), _cufile_in(detail::make_cufile_input(filepath))
{
}

virtual ~file_source() = default;

bool supports_device_read() const override { return _cufile_in != nullptr; }

bool is_device_read_preferred(size_t size) const
{
return _cufile_in != nullptr && _cufile_in->is_cufile_io_preferred(size);
}

public:
memory_mapped_buffer(uint8_t *data, size_t size) : _size(size), _data(data) {}
size_t size() const override { return _size; }
const uint8_t *data() const override { return _data; }
};
std::unique_ptr<datasource::buffer> device_read(size_t offset,
size_t size,
rmm::cuda_stream_view stream) override
{
CUDF_EXPECTS(supports_device_read(), "Device reads are not supported for this file.");

auto const read_size = std::min(size, _file.size() - offset);
return _cufile_in->read(offset, read_size, stream);
}

size_t device_read(size_t offset,
size_t size,
uint8_t *dst,
rmm::cuda_stream_view stream) override
{
CUDF_EXPECTS(supports_device_read(), "Device reads are not supported for this file.");

auto const read_size = std::min(size, _file.size() - offset);
return _cufile_in->read(offset, read_size, dst, stream);
}

size_t size() const override { return _file.size(); }

protected:
detail::file_wrapper _file;

private:
std::unique_ptr<detail::cufile_input_impl> _cufile_in;
};

/**
* @brief Implementation class for reading from a file using memory mapped access.
*
* Unlike Arrow's memory mapped IO class, this implementation allows memory mapping a subset of the
* file where the starting offset may not be zero.
*/
class memory_mapped_source : public file_source {
public:
explicit memory_mapped_source(const char *filepath, size_t offset, size_t size)
: _cufile_in(detail::make_cufile_input(filepath))
: file_source(filepath)
{
auto const file = detail::file_wrapper(filepath, O_RDONLY);
_file_size = file.size();
if (_file_size != 0) { map(file.desc(), offset, size); }
if (_file.size() != 0) map(_file.desc(), offset, size);
}

virtual ~memory_mapped_source()
Expand All @@ -65,7 +101,7 @@ class memory_mapped_source : public datasource {
// Clamp length to available data in the mapped region
auto const read_size = std::min(size, _map_size - (offset - _map_offset));

return std::make_unique<memory_mapped_buffer>(
return std::make_unique<non_owning_buffer>(
static_cast<uint8_t *>(_map_addr) + (offset - _map_offset), read_size);
}

Expand All @@ -81,49 +117,15 @@ class memory_mapped_source : public datasource {
return read_size;
}

bool supports_device_read() const override { return _cufile_in != nullptr; }

bool is_device_read_preferred(size_t size) const
{
return _cufile_in != nullptr && _cufile_in->is_cufile_io_preferred(size);
}

std::unique_ptr<datasource::buffer> device_read(size_t offset,
size_t size,
rmm::cuda_stream_view stream) override
{
if (!supports_device_read()) CUDF_FAIL("Device reads are not supported for this file.");

auto const read_size = std::min(size, _map_size - (offset - _map_offset));
return _cufile_in->read(offset, read_size, stream);
}

size_t device_read(size_t offset,
size_t size,
uint8_t *dst,
rmm::cuda_stream_view stream) override
{
if (!supports_device_read()) CUDF_FAIL("Device reads are not supported for this file.");
auto const read_size = std::min(size, _map_size - (offset - _map_offset));
return _cufile_in->read(offset, read_size, dst, stream);
}

size_t size() const override { return _file_size; }

private:
void map(int fd, size_t offset, size_t size)
{
CUDF_EXPECTS(offset < _file_size, "Offset is past end of file");
CUDF_EXPECTS(offset < _file.size(), "Offset is past end of file");

// Offset for `mmap()` must be page aligned
_map_offset = offset & ~(sysconf(_SC_PAGESIZE) - 1);

// Clamp length to available data in the file
if (size == 0) {
size = _file_size - offset;
} else {
if ((offset + size) > _file_size) { size = _file_size - offset; }
}
if (size == 0 || (offset + size) > _file.size()) { size = _file.size() - offset; }

// Size for `mmap()` needs to include the page padding
_map_size = size + (offset - _map_offset);
Expand All @@ -134,11 +136,44 @@ class memory_mapped_source : public datasource {
}

private:
size_t _file_size = 0;
void *_map_addr = nullptr;
size_t _map_size = 0;
size_t _map_offset = 0;
std::unique_ptr<detail::cufile_input_impl> _cufile_in;
void *_map_addr = nullptr;
};

/**
* @brief Implementation class for reading from a file using `read` calls
*
* Potentially faster than `memory_mapped_source` when only a small portion of the file is read
* through the host.
*/
class direct_read_source : public file_source {
public:
explicit direct_read_source(const char *filepath) : file_source(filepath) {}

std::unique_ptr<buffer> host_read(size_t offset, size_t size) override
{
lseek(_file.desc(), offset, SEEK_SET);

// Clamp length to available data
ssize_t const read_size = std::min(size, _file.size() - offset);

std::vector<uint8_t> v(read_size);
CUDF_EXPECTS(read(_file.desc(), v.data(), read_size) == read_size, "read failed");
return buffer::create(std::move(v));
}

size_t host_read(size_t offset, size_t size, uint8_t *dst) override
{
lseek(_file.desc(), offset, SEEK_SET);

// Clamp length to available data
auto const read_size = std::min(size, _file.size() - offset);

CUDF_EXPECTS(read(_file.desc(), dst, read_size) == static_cast<ssize_t>(read_size),
"read failed");
return read_size;
}
};

/**
Expand Down Expand Up @@ -189,6 +224,12 @@ std::unique_ptr<datasource> datasource::create(const std::string &filepath,
size_t offset,
size_t size)
{
#ifdef CUFILE_FOUND
if (detail::cufile_config::instance()->is_required()) {
// avoid mmap as GDS is expected to be used for most reads
return std::make_unique<direct_read_source>(filepath.c_str());
}
#endif
// Use our own memory mapping implementation for direct file reads
return std::make_unique<memory_mapped_source>(filepath.c_str(), offset, size);
}
Expand Down
109 changes: 41 additions & 68 deletions cpp/src/io/utilities/file_io_utilities.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <cudf_test/file_utilities.hpp>
#include <io/utilities/file_io_utilities.hpp>

#include <rmm/device_buffer.hpp>
Expand All @@ -26,93 +25,67 @@ namespace cudf {
namespace io {
namespace detail {

size_t get_file_size(int file_descriptor)
{
struct stat st;
CUDF_EXPECTS(fstat(file_descriptor, &st) != -1, "Cannot query file size");
return static_cast<size_t>(st.st_size);
}

file_wrapper::file_wrapper(std::string const &filepath, int flags)
: fd(open(filepath.c_str(), flags))
: fd(open(filepath.c_str(), flags)), _size{get_file_size(fd)}
{
CUDF_EXPECTS(fd != -1, "Cannot open file " + filepath);
}

file_wrapper::file_wrapper(std::string const &filepath, int flags, mode_t mode)
: fd(open(filepath.c_str(), flags, mode))
: fd(open(filepath.c_str(), flags, mode)), _size{get_file_size(fd)}
{
CUDF_EXPECTS(fd != -1, "Cannot open file " + filepath);
}

file_wrapper::~file_wrapper() { close(fd); }

long file_wrapper::size() const
std::string getenv_or(std::string const &env_var_name, std::string const &default_val)
{
if (_size < 0) {
struct stat st;
CUDF_EXPECTS(fstat(fd, &st) != -1, "Cannot query file size");
_size = static_cast<size_t>(st.st_size);
}
return _size;
auto const env_val = std::getenv(env_var_name.c_str());
return (env_val == nullptr) ? default_val : std::string(env_val);
}

#ifdef CUFILE_FOUND

/**
* @brief Class that manages cuFile configuration.
*/
class cufile_config {
std::string const default_policy = "OFF";
std::string const json_path_env_var = "CUFILE_ENV_PATH_JSON";

std::string const policy = default_policy;
temp_directory tmp_config_dir{"cudf_cufile_config"};

std::string getenv_or(std::string const &env_var_name, std::string const &default_val)
{
auto const env_val = std::getenv(env_var_name.c_str());
return (env_val == nullptr) ? default_val : std::string(env_val);
}

cufile_config() : policy{getenv_or("LIBCUDF_CUFILE_POLICY", default_policy)}
{
if (is_enabled()) {
// Modify the config file based on the policy
auto const config_file_path = getenv_or(json_path_env_var, "/etc/cufile.json");
std::ifstream user_config_file(config_file_path);
// Modified config file is stored in a temporary directory
auto const cudf_config_path = tmp_config_dir.path() + "/cufile.json";
std::ofstream cudf_config_file(cudf_config_path);

std::string line;
while (std::getline(user_config_file, line)) {
std::string const tag = "\"allow_compat_mode\"";
if (line.find(tag) != std::string::npos) {
// TODO: only replace the true/false value
// Enable compatiblity mode when cuDF does not fall back to host path
cudf_config_file << tag << ": " << (is_required() ? "true" : "false") << ",\n";
} else {
cudf_config_file << line << '\n';
}

// Point libcufile to the modified config file
CUDF_EXPECTS(setenv(json_path_env_var.c_str(), cudf_config_path.c_str(), 0) == 0,
"Failed to set the cuFile config file environment variable.");
cufile_config::cufile_config() : policy{getenv_or("LIBCUDF_CUFILE_POLICY", default_policy)}
{
if (is_enabled()) {
// Modify the config file based on the policy
auto const config_file_path = getenv_or(json_path_env_var, "/etc/cufile.json");
std::ifstream user_config_file(config_file_path);
// Modified config file is stored in a temporary directory
auto const cudf_config_path = tmp_config_dir.path() + "/cufile.json";
std::ofstream cudf_config_file(cudf_config_path);

std::string line;
while (std::getline(user_config_file, line)) {
std::string const tag = "\"allow_compat_mode\"";
if (line.find(tag) != std::string::npos) {
// TODO: only replace the true/false value
// Enable compatiblity mode when cuDF does not fall back to host path
cudf_config_file << tag << ": " << (is_required() ? "true" : "false") << ",\n";
} else {
cudf_config_file << line << '\n';
}
}
}

public:
/**
* @brief Returns true when cuFile use is enabled.
*/
bool is_enabled() const { return policy == "ALWAYS" or policy == "GDS"; }

/**
* @brief Returns true when cuDF should not fall back to host IO.
*/
bool is_required() const { return policy == "ALWAYS"; }

static cufile_config const *instance()
{
static cufile_config _instance;
return &_instance;
// Point libcufile to the modified config file
CUDF_EXPECTS(setenv(json_path_env_var.c_str(), cudf_config_path.c_str(), 0) == 0,
"Failed to set the cuFile config file environment variable.");
}
}
};
}
cufile_config const *cufile_config::instance()
{
static cufile_config _instance;
return &_instance;
}

/**
* @brief Class that dynamically loads the cuFile library and manages the cuFile driver.
Expand Down
Loading