Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KvikIO as an alternative GDS backend #10593

Merged
merged 16 commits into from
Apr 20, 2022
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ include(cmake/thirdparty/get_gtest.cmake)
include(cmake/Modules/JitifyPreprocessKernels.cmake)
# find cuFile
include(cmake/Modules/FindcuFile.cmake)
# find KvikIO
include(cmake/thirdparty/get_kvikio.cmake)

# Workaround until https://github.com/rapidsai/rapids-cmake/issues/176 is resolved
if(NOT BUILD_SHARED_LIBS)
Expand Down Expand Up @@ -584,7 +586,7 @@ add_dependencies(cudf jitify_preprocess_run)
target_link_libraries(
cudf
PUBLIC ${ARROW_LIBRARIES} libcudacxx::libcudacxx cudf::Thrust rmm::rmm
PRIVATE cuco::cuco ZLIB::ZLIB nvcomp::nvcomp
PRIVATE cuco::cuco ZLIB::ZLIB nvcomp::nvcomp kvikio::kvikio
)

# Add Conda library, and include paths if specified
Expand Down
31 changes: 31 additions & 0 deletions cpp/cmake/thirdparty/get_kvikio.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# =============================================================================
# Copyright (c) 2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under
# the License.
# =============================================================================

# This function finds KvikIO
function(find_and_configure_kvikio VERSION)

rapids_cpm_find(
KvikIO ${VERSION}
GLOBAL_TARGETS kvikio::kvikio
CPM_ARGS
GIT_REPOSITORY https://github.com/rapidsai/kvikio.git
GIT_TAG branch-${VERSION}
GIT_SHALLOW TRUE SOURCE_SUBDIR cpp
OPTIONS "KvikIO_BUILD_EXAMPLES OFF"
madsbk marked this conversation as resolved.
Show resolved Hide resolved
)

endfunction()

# Request the KvikIO version that matches this cuDF build's major.minor version. The same value is
# passed to find_and_configure_kvikio() as the version to look for.
set(KVIKIO_MIN_VERSION_cudf "${CUDF_VERSION_MAJOR}.${CUDF_VERSION_MINOR}")
find_and_configure_kvikio(${KVIKIO_MIN_VERSION_cudf})
5 changes: 4 additions & 1 deletion cpp/src/io/utilities/config_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ namespace {
/**
* @brief Defines which cuFile usage to enable.
*/
enum class usage_policy : uint8_t { OFF, GDS, ALWAYS };
enum class usage_policy : uint8_t { OFF, GDS, ALWAYS, KVIKIO };

/**
* @brief Get the current usage policy.
Expand All @@ -46,6 +46,7 @@ usage_policy get_env_policy()
if (env_val == "OFF") return usage_policy::OFF;
if (env_val == "GDS") return usage_policy::GDS;
if (env_val == "ALWAYS") return usage_policy::ALWAYS;
if (env_val == "KVIKIO") return usage_policy::KVIKIO;
CUDF_FAIL("Invalid LIBCUDF_CUFILE_POLICY value: " + env_val);
}
} // namespace
Expand All @@ -54,6 +55,8 @@ bool is_always_enabled() { return get_env_policy() == usage_policy::ALWAYS; }

/**
 * @brief Returns true if the usage policy is `ALWAYS` or `GDS`.
 */
bool is_gds_enabled()
{
  // The ALWAYS policy implies GDS usage, so check it first.
  if (is_always_enabled()) { return true; }
  return get_env_policy() == usage_policy::GDS;
}

/**
 * @brief Returns true if the usage policy reported by `get_env_policy()` is `KVIKIO`.
 */
bool is_kvikio_enabled()
{
  auto const policy = get_env_policy();
  return policy == usage_policy::KVIKIO;
}

} // namespace cufile_integration

namespace nvcomp_integration {
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/io/utilities/config_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ bool is_always_enabled();
*/
bool is_gds_enabled();

/**
* @brief Returns true if KvikIO is enabled.
*/
bool is_kvikio_enabled();

} // namespace cufile_integration

namespace nvcomp_integration {
Expand Down
44 changes: 31 additions & 13 deletions cpp/src/io/utilities/data_sink.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
* Copyright (c) 2020-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,7 +19,9 @@
#include "file_io_utilities.hpp"
#include <cudf/io/data_sink.hpp>
#include <cudf/utilities/error.hpp>
#include <io/utilities/config_utils.hpp>

#include <kvikio/file_handle.hpp>
#include <rmm/cuda_stream_view.hpp>

namespace cudf {
Expand All @@ -30,10 +32,15 @@ namespace io {
class file_sink : public data_sink {
public:
explicit file_sink(std::string const& filepath)
: _cufile_out(detail::make_cufile_output(filepath))
{
_output_stream.open(filepath, std::ios::out | std::ios::binary | std::ios::trunc);
CUDF_EXPECTS(_output_stream.is_open(), "Cannot open output file");

if (detail::cufile_integration::is_kvikio_enabled()) {
_kvikio_file = kvikio::FileHandle(filepath, "w");
} else {
_cufile_out = detail::make_cufile_output(filepath);
}
}

virtual ~file_sink() { flush(); }
Expand All @@ -49,19 +56,15 @@ class file_sink : public data_sink {

size_t bytes_written() override { return _bytes_written; }

[[nodiscard]] bool supports_device_write() const override { return _cufile_out != nullptr; }

[[nodiscard]] bool is_device_write_preferred(size_t size) const override
[[nodiscard]] bool supports_device_write() const override
{
return _cufile_out != nullptr && _cufile_out->is_cufile_io_preferred(size);
return !_kvikio_file.closed() || _cufile_out != nullptr;
}

void device_write(void const* gpu_data, size_t size, rmm::cuda_stream_view stream) override
[[nodiscard]] bool is_device_write_preferred(size_t size) const override
{
if (!supports_device_write()) CUDF_FAIL("Device writes are not supported for this file.");

_cufile_out->write(gpu_data, _bytes_written, size);
_bytes_written += size;
return !_kvikio_file.closed() ||
(_cufile_out != nullptr && _cufile_out->is_cufile_io_preferred(size));
}

std::future<void> device_write_async(void const* gpu_data,
Expand All @@ -70,15 +73,30 @@ class file_sink : public data_sink {
{
if (!supports_device_write()) CUDF_FAIL("Device writes are not supported for this file.");

auto result = _cufile_out->write_async(gpu_data, _bytes_written, size);
size_t offset = _bytes_written;
_bytes_written += size;
return result;

if (!_kvikio_file.closed()) {
// KvikIO's `pwrite()` returns a `std::future<size_t>` so we convert it
// to `std::future<void>`
return std::async(std::launch::deferred, [this, gpu_data, size, offset] {
_kvikio_file.pwrite(gpu_data, size, offset).get();
});
}
return _cufile_out->write_async(gpu_data, offset, size);
}

/**
 * @brief Writes `size` bytes from device memory to the file and waits for completion.
 *
 * Forwards to `device_write_async()` and blocks on the returned future.
 *
 * @param gpu_data Pointer to the device data to write
 * @param size Number of bytes to write
 * @param stream CUDA stream forwarded to `device_write_async()`
 */
void device_write(void const* gpu_data, size_t size, rmm::cuda_stream_view stream) override
{
  if (!supports_device_write()) CUDF_FAIL("Device writes are not supported for this file.");
  // Pass the caller's `size`, not `_bytes_written`: the latter is the running file offset,
  // which `device_write_async()` reads and advances on its own.
  device_write_async(gpu_data, size, stream).get();
}

private:
std::ofstream _output_stream;
size_t _bytes_written = 0;
std::unique_ptr<detail::cufile_output_impl> _cufile_out;
kvikio::FileHandle _kvikio_file;
ttnghia marked this conversation as resolved.
Show resolved Hide resolved
};

/**
Expand Down
52 changes: 31 additions & 21 deletions cpp/src/io/utilities/datasource.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2021, NVIDIA CORPORATION.
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,9 @@
#include <cudf/utilities/error.hpp>
#include <io/utilities/config_utils.hpp>

#include <kvikio/file_handle.hpp>
#include <rmm/device_buffer.hpp>

#include <fcntl.h>
#include <sys/mman.h>
#include <unistd.h>
Expand All @@ -33,50 +36,56 @@ namespace {
*/
class file_source : public datasource {
public:
explicit file_source(const char* filepath)
: _file(filepath, O_RDONLY), _cufile_in(detail::make_cufile_input(filepath))
explicit file_source(const char* filepath) : _file(filepath, O_RDONLY)
{
if (detail::cufile_integration::is_kvikio_enabled()) {
_kvikio_file = kvikio::FileHandle(filepath);
} else {
_cufile_in = detail::make_cufile_input(filepath);
}
}

virtual ~file_source() = default;

[[nodiscard]] bool supports_device_read() const override { return _cufile_in != nullptr; }
/**
 * @brief Returns true if the KvikIO file handle is not closed or a cuFile input is available.
 */
[[nodiscard]] bool supports_device_read() const override
{
  bool const kvikio_open = !_kvikio_file.closed();
  return kvikio_open || _cufile_in != nullptr;
}

[[nodiscard]] bool is_device_read_preferred(size_t size) const override
{
return _cufile_in != nullptr && _cufile_in->is_cufile_io_preferred(size);
return !_kvikio_file.closed() ||
(_cufile_in != nullptr && _cufile_in->is_cufile_io_preferred(size));
}

std::unique_ptr<datasource::buffer> device_read(size_t offset,
size_t size,
rmm::cuda_stream_view stream) override
std::future<size_t> device_read_async(size_t offset,
size_t size,
uint8_t* dst,
rmm::cuda_stream_view stream) override
{
CUDF_EXPECTS(supports_device_read(), "Device reads are not supported for this file.");

auto const read_size = std::min(size, _file.size() - offset);
return _cufile_in->read(offset, read_size, stream);
if (!_kvikio_file.closed()) { return _kvikio_file.pread(dst, read_size, offset); }
return _cufile_in->read_async(offset, read_size, dst, stream);
}

size_t device_read(size_t offset,
size_t size,
uint8_t* dst,
rmm::cuda_stream_view stream) override
{
CUDF_EXPECTS(supports_device_read(), "Device reads are not supported for this file.");

auto const read_size = std::min(size, _file.size() - offset);
return _cufile_in->read(offset, read_size, dst, stream);
return device_read_async(offset, size, dst, stream).get();
madsbk marked this conversation as resolved.
Show resolved Hide resolved
}

std::future<size_t> device_read_async(size_t offset,
size_t size,
uint8_t* dst,
rmm::cuda_stream_view stream) override
std::unique_ptr<datasource::buffer> device_read(size_t offset,
size_t size,
rmm::cuda_stream_view stream) override
{
CUDF_EXPECTS(supports_device_read(), "Device reads are not supported for this file.");

auto const read_size = std::min(size, _file.size() - offset);
return _cufile_in->read_async(offset, read_size, dst, stream);
rmm::device_buffer out_data(size, stream);
size_t read = device_read(offset, size, reinterpret_cast<uint8_t*>(out_data.data()), stream);
out_data.resize(read, stream);
return datasource::buffer::create(std::move(out_data));
}

[[nodiscard]] size_t size() const override { return _file.size(); }
Expand All @@ -86,6 +95,7 @@ class file_source : public datasource {

private:
std::unique_ptr<detail::cufile_input_impl> _cufile_in;
kvikio::FileHandle _kvikio_file;
};

/**
Expand Down
24 changes: 0 additions & 24 deletions cpp/src/io/utilities/file_io_utilities.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,16 +176,6 @@ cufile_input_impl::cufile_input_impl(std::string const& filepath)
pool.sleep_duration = 10;
}

std::unique_ptr<datasource::buffer> cufile_input_impl::read(size_t offset,
size_t size,
rmm::cuda_stream_view stream)
{
rmm::device_buffer out_data(size, stream);
auto read_size = read(offset, size, reinterpret_cast<uint8_t*>(out_data.data()), stream);
out_data.resize(read_size, stream);
return datasource::buffer::create(std::move(out_data));
}

namespace {

template <typename DataT,
Expand Down Expand Up @@ -234,27 +224,13 @@ std::future<size_t> cufile_input_impl::read_async(size_t offset,
return std::async(std::launch::deferred, waiter, std::move(slice_tasks));
}

size_t cufile_input_impl::read(size_t offset,
size_t size,
uint8_t* dst,
rmm::cuda_stream_view stream)
{
auto result = read_async(offset, size, dst, stream);
return result.get();
}

cufile_output_impl::cufile_output_impl(std::string const& filepath)
: shim{cufile_shim::instance()},
cf_file(shim, filepath, O_CREAT | O_RDWR | O_DIRECT, 0664),
pool(getenv_or("LIBCUDF_CUFILE_THREAD_COUNT", 16))
{
}

void cufile_output_impl::write(void const* data, size_t offset, size_t size)
{
write_async(data, offset, size).wait();
}

std::future<void> cufile_output_impl::write_async(void const* data, size_t offset, size_t size)
{
int device;
Expand Down
Loading