Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use KvikIO as the GDS backend #10468

Closed
wants to merge 12 commits into from
3 changes: 3 additions & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ include(cmake/thirdparty/get_gtest.cmake)
include(cmake/Modules/JitifyPreprocessKernels.cmake)
# find cuFile
include(cmake/Modules/FindcuFile.cmake)
# find KvikIO
include(cmake/thirdparty/get_kvikio.cmake)

# ##################################################################################################
# * library targets -------------------------------------------------------------------------------
Expand Down Expand Up @@ -530,6 +532,7 @@ target_include_directories(
cudf
PUBLIC "$<BUILD_INTERFACE:${DLPACK_INCLUDE_DIR}>"
"$<BUILD_INTERFACE:${JITIFY_INCLUDE_DIR}>"
"$<BUILD_INTERFACE:${KvikIO_INCLUDE_DIR}>"
"$<BUILD_INTERFACE:${CUDF_SOURCE_DIR}/include>"
"$<BUILD_INTERFACE:${CUDF_GENERATED_INCLUDE_DIR}/include>"
PRIVATE "$<BUILD_INTERFACE:${CUDF_SOURCE_DIR}/src>"
Expand Down
34 changes: 34 additions & 0 deletions cpp/cmake/thirdparty/get_kvikio.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# =============================================================================
# Copyright (c) 2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under
# the License.
# =============================================================================

# Locate (or fetch) KvikIO via `rapids_cpm_find` and export `KvikIO_INCLUDE_DIR` to the caller's
# scope so it can be added to the include directories of dependent targets.
#
# Takes no arguments. On return, the parent scope has `KvikIO_INCLUDE_DIR` set to
# `${KvikIO_SOURCE_DIR}/cpp/include`.
#
# NOTE(review): `KvikIO_SOURCE_DIR` is presumably only populated when the package is fetched from
# source; if an installed KvikIO is picked up instead, the include dir may resolve to
# `/cpp/include` -- confirm.
function(find_and_configure_kvikio)

rapids_cpm_find(
KvikIO 22.04
GLOBAL_TARGETS kvikio::kvikio
# Intended upstream location, currently disabled in favor of the testing fork below:
# CPM_ARGS GIT_REPOSITORY https://github.com/rapidsai/kvikio.git
CPM_ARGS # TODO: use version tags when they become available
# The CMake project is configured from the `cpp` subdirectory of the repository
GIT_REPOSITORY https://github.com/madsbk/kvikio.git SOURCE_SUBDIR cpp
GIT_TAG used_by_cudf_for_testing
OPTIONS "KvikIO_BUILD_EXAMPLES FALSE" # No need to build the KvikIO example
)
# `PARENT_SCOPE` makes the variable visible to the code that calls this function
set(KvikIO_INCLUDE_DIR
${KvikIO_SOURCE_DIR}/cpp/include
PARENT_SCOPE
)

endfunction()

find_and_configure_kvikio()
16 changes: 11 additions & 5 deletions cpp/src/io/utilities/config_utils.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
* Copyright (c) 2021-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -37,17 +37,23 @@ namespace {
*/
enum class usage_policy : uint8_t { OFF, GDS, ALWAYS };

/**
* @brief Get the current usage policy.
*/
usage_policy get_env_policy()
usage_policy _get_env_policy()
{
static auto const env_val = getenv_or<std::string>("LIBCUDF_CUFILE_POLICY", "GDS");
if (env_val == "OFF") return usage_policy::OFF;
if (env_val == "GDS") return usage_policy::GDS;
if (env_val == "ALWAYS") return usage_policy::ALWAYS;
CUDF_FAIL("Invalid LIBCUDF_CUFILE_POLICY value: " + env_val);
}

/**
 * @brief Get the current usage policy.
 *
 * The policy is obtained from `_get_env_policy()` the first time this function is called and
 * stored in a function-local static; subsequent calls return the stored value.
 *
 * @return The cached usage policy
 */
usage_policy get_env_policy()
{
// Function-local static: `_get_env_policy()` is evaluated only on the first call
static auto const ret = _get_env_policy();
return ret;
}
} // namespace

bool is_always_enabled() { return get_env_policy() == usage_policy::ALWAYS; }
Expand Down
40 changes: 19 additions & 21 deletions cpp/src/io/utilities/data_sink.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
* Copyright (c) 2020-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,10 +16,12 @@

#include <fstream>

#include "file_io_utilities.hpp"
#include <io/utilities/config_utils.hpp>

#include <cudf/io/data_sink.hpp>
#include <cudf/utilities/error.hpp>

#include <kvikio/file_handle.hpp>
#include <rmm/cuda_stream_view.hpp>

namespace cudf {
Expand All @@ -29,8 +31,7 @@ namespace io {
*/
class file_sink : public data_sink {
public:
explicit file_sink(std::string const& filepath)
: _cufile_out(detail::make_cufile_output(filepath))
explicit file_sink(std::string const& filepath) : _file(filepath, "w")
{
_output_stream.open(filepath, std::ios::out | std::ios::binary | std::ios::trunc);
CUDF_EXPECTS(_output_stream.is_open(), "Cannot open output file");
Expand All @@ -49,36 +50,33 @@ class file_sink : public data_sink {

size_t bytes_written() override { return _bytes_written; }

[[nodiscard]] bool supports_device_write() const override { return _cufile_out != nullptr; }

[[nodiscard]] bool is_device_write_preferred(size_t size) const override
{
return _cufile_out != nullptr && _cufile_out->is_cufile_io_preferred(size);
}

void device_write(void const* gpu_data, size_t size, rmm::cuda_stream_view stream) override
[[nodiscard]] bool supports_device_write() const override
{
if (!supports_device_write()) CUDF_FAIL("Device writes are not supported for this file.");

_cufile_out->write(gpu_data, _bytes_written, size);
_bytes_written += size;
return detail::cufile_integration::is_gds_enabled();
}

std::future<void> device_write_async(void const* gpu_data,
size_t size,
rmm::cuda_stream_view stream) override
{
if (!supports_device_write()) CUDF_FAIL("Device writes are not supported for this file.");

auto result = _cufile_out->write_async(gpu_data, _bytes_written, size);
// KvikIO's `pwrite()` returns a `std::future<size_t>` so we convert it
// to `std::future<void>`
size_t offset = _bytes_written;
_bytes_written += size;
return result;
return std::async(std::launch::deferred, [this, gpu_data, size, offset] {
_file.pwrite(gpu_data, size, offset).get();
});
}

/**
 * @brief Blocking device write, implemented on top of `device_write_async()`.
 *
 * Forwards all arguments to `device_write_async()` and waits on the returned future, so this
 * call returns only after the asynchronous write has completed (or rethrows its exception).
 *
 * @param gpu_data Pointer to the data to write, forwarded unchanged
 * @param size Number of bytes to write
 * @param stream CUDA stream, forwarded unchanged to `device_write_async()`
 */
void device_write(void const* gpu_data, size_t size, rmm::cuda_stream_view stream) override
{
// `.get()` on a `std::future<void>` yields void, so returning it is valid here
return device_write_async(gpu_data, size, stream).get();
}

private:
std::ofstream _output_stream;
size_t _bytes_written = 0;
std::unique_ptr<detail::cufile_output_impl> _cufile_out;
kvikio::FileHandle _file;
};

/**
Expand Down
92 changes: 41 additions & 51 deletions cpp/src/io/utilities/datasource.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2021, NVIDIA CORPORATION.
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -14,11 +14,13 @@
* limitations under the License.
*/

#include "file_io_utilities.hpp"
#include <io/utilities/config_utils.hpp>

#include <cudf/io/datasource.hpp>
#include <cudf/utilities/error.hpp>
#include <io/utilities/config_utils.hpp>

#include <kvikio/file_handle.hpp>
#include <rmm/device_buffer.hpp>

#include <fcntl.h>
#include <sys/mman.h>
Expand All @@ -33,59 +35,46 @@ namespace {
*/
class file_source : public datasource {
public:
explicit file_source(const char* filepath)
: _file(filepath, O_RDONLY), _cufile_in(detail::make_cufile_input(filepath))
{
}
explicit file_source(const char* filepath) : _file(filepath) {}

virtual ~file_source() = default;

[[nodiscard]] bool supports_device_read() const override { return _cufile_in != nullptr; }

[[nodiscard]] bool is_device_read_preferred(size_t size) const override
[[nodiscard]] bool supports_device_read() const override
{
return _cufile_in != nullptr && _cufile_in->is_cufile_io_preferred(size);
return detail::cufile_integration::is_gds_enabled();
}

std::unique_ptr<datasource::buffer> device_read(size_t offset,
size_t size,
rmm::cuda_stream_view stream) override
std::future<size_t> device_read_async(size_t offset,
size_t size,
uint8_t* dst,
rmm::cuda_stream_view stream) override
{
CUDF_EXPECTS(supports_device_read(), "Device reads are not supported for this file.");

auto const read_size = std::min(size, _file.size() - offset);
return _cufile_in->read(offset, read_size, stream);
return _file.pread(dst, std::min(size, _file.nbytes() - offset), offset);
}

size_t device_read(size_t offset,
size_t size,
uint8_t* dst,
rmm::cuda_stream_view stream) override
{
CUDF_EXPECTS(supports_device_read(), "Device reads are not supported for this file.");

auto const read_size = std::min(size, _file.size() - offset);
return _cufile_in->read(offset, read_size, dst, stream);
return device_read_async(offset, size, dst, stream).get();
}

std::future<size_t> device_read_async(size_t offset,
size_t size,
uint8_t* dst,
rmm::cuda_stream_view stream) override
std::unique_ptr<datasource::buffer> device_read(size_t offset,
size_t size,
rmm::cuda_stream_view stream) override
{
CUDF_EXPECTS(supports_device_read(), "Device reads are not supported for this file.");

auto const read_size = std::min(size, _file.size() - offset);
return _cufile_in->read_async(offset, read_size, dst, stream);
rmm::device_buffer out_data(size, stream);
size_t read_size =
device_read(offset, size, reinterpret_cast<uint8_t*>(out_data.data()), stream);
out_data.resize(read_size, stream);
return datasource::buffer::create(std::move(out_data));
}

[[nodiscard]] size_t size() const override { return _file.size(); }
[[nodiscard]] size_t size() const override { return _file.nbytes(); }

protected:
detail::file_wrapper _file;

private:
std::unique_ptr<detail::cufile_input_impl> _cufile_in;
kvikio::FileHandle _file;
};

/**
Expand All @@ -99,7 +88,7 @@ class memory_mapped_source : public file_source {
explicit memory_mapped_source(const char* filepath, size_t offset, size_t size)
: file_source(filepath)
{
if (_file.size() != 0) map(_file.desc(), offset, size);
if (_file.nbytes() != 0) map(_file.fd(), offset, size);
}

~memory_mapped_source() override
Expand Down Expand Up @@ -133,12 +122,12 @@ class memory_mapped_source : public file_source {
private:
void map(int fd, size_t offset, size_t size)
{
CUDF_EXPECTS(offset < _file.size(), "Offset is past end of file");
CUDF_EXPECTS(offset < _file.nbytes(), "Offset is past end of file");

// Offset for `mmap()` must be page aligned
_map_offset = offset & ~(sysconf(_SC_PAGESIZE) - 1);

if (size == 0 || (offset + size) > _file.size()) { size = _file.size() - offset; }
if (size == 0 || (offset + size) > _file.nbytes()) { size = _file.nbytes() - offset; }

// Size for `mmap()` needs to include the page padding
_map_size = size + (offset - _map_offset);
Expand Down Expand Up @@ -166,24 +155,26 @@ class direct_read_source : public file_source {

std::unique_ptr<buffer> host_read(size_t offset, size_t size) override
{
lseek(_file.desc(), offset, SEEK_SET);
CUDF_EXPECTS(offset < _file.nbytes(), "Offset is past end of file");
lseek(_file.fd(), offset, SEEK_SET);

// Clamp length to available data
ssize_t const read_size = std::min(size, _file.size() - offset);
ssize_t const read_size = std::min(size, _file.nbytes() - offset);

std::vector<uint8_t> v(read_size);
CUDF_EXPECTS(read(_file.desc(), v.data(), read_size) == read_size, "read failed");
CUDF_EXPECTS(read(_file.fd(), v.data(), read_size) == read_size, "read failed");
return buffer::create(std::move(v));
}

size_t host_read(size_t offset, size_t size, uint8_t* dst) override
{
lseek(_file.desc(), offset, SEEK_SET);
CUDF_EXPECTS(offset < _file.nbytes(), "Offset is past end of file");
lseek(_file.fd(), offset, SEEK_SET);

// Clamp length to available data
auto const read_size = std::min(size, _file.size() - offset);
auto const read_size = std::min(size, _file.nbytes() - offset);

CUDF_EXPECTS(read(_file.desc(), dst, read_size) == static_cast<ssize_t>(read_size),
CUDF_EXPECTS(read(_file.fd(), dst, read_size) == static_cast<ssize_t>(read_size),
"read failed");
return read_size;
}
Expand Down Expand Up @@ -242,13 +233,12 @@ std::unique_ptr<datasource> datasource::create(const std::string& filepath,
size_t offset,
size_t size)
{
#ifdef CUFILE_FOUND
if (detail::cufile_integration::is_always_enabled()) {
// avoid mmap as GDS is expected to be used for most reads
return std::make_unique<direct_read_source>(filepath.c_str());
}
#endif
// Use our own memory mapping implementation for direct file reads
// TODO: do we want an option to enable the use of `direct_read_source` instead
// of `memory_mapped_source`?
// return std::make_unique<direct_read_source>(filepath.c_str());

// Note: some readers, such as avro, call `host_read()` on more data than they
// actually access, so the lazy nature of mmap is essential.
return std::make_unique<memory_mapped_source>(filepath.c_str(), offset, size);
}

Expand Down