Skip to content

Commit

Permalink
Update
Browse files Browse the repository at this point in the history
  • Loading branch information
kingcrimsontianyu committed Nov 15, 2024
1 parent 74dc84c commit f89965e
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 71 deletions.
29 changes: 29 additions & 0 deletions cpp/include/kvikio/detail/io_operation.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <cstdint>

namespace kvikio::detail {

/**
* @brief Type of the IO operation.
*/
enum class IOOperationType : uint8_t {
READ,
WRITE,
};
} // namespace kvikio::detail
119 changes: 57 additions & 62 deletions cpp/include/kvikio/file_handle.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <kvikio/buffer.hpp>
#include <kvikio/cufile/config.hpp>
#include <kvikio/defaults.hpp>
#include <kvikio/detail/io_operation.hpp>
#include <kvikio/error.hpp>
#include <kvikio/parallel_operation.hpp>
#include <kvikio/posix_io.hpp>
Expand All @@ -51,6 +52,58 @@ class FileHandle {
mutable std::size_t _nbytes{0}; // The size of the underlying file, zero means unknown.
CUfileHandle_t _handle{};

template <detail::IOOperationType Operation>
void handle_async_io(void* devPtr_base,
std::size_t* size_p,
off_t* file_offset_p,
off_t* devPtr_offset_p,
ssize_t* bytes_done_p,
CUstream stream)
{
auto posix_fallback = [&] {
CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
if constexpr (Operation == detail::IOOperationType::READ) {
*bytes_done_p =
static_cast<ssize_t>(read(devPtr_base, *size_p, *file_offset_p, *devPtr_offset_p));
} else {
*bytes_done_p =
static_cast<ssize_t>(write(devPtr_base, *size_p, *file_offset_p, *devPtr_offset_p));
}
};

if (defaults::can_compat_mode_reduce_to_off(_compat_mode)) {
if (!kvikio::is_batch_and_stream_available()) {
if (_compat_mode == CompatMode::AUTO) {
posix_fallback();
return;
}
throw std::runtime_error("Missing cuFile batch or stream library symbol.");
}

// When checking for availability, we also check if cuFile's config file exist. This is
// because even when the stream API is available, it doesn't work if no config file exist.
if (config_path().empty()) {
if (_compat_mode == CompatMode::AUTO) {
posix_fallback();
return;
}
throw std::runtime_error("Missing cuFile configuration file.");
}

if constexpr (Operation == detail::IOOperationType::READ) {
CUFILE_TRY(cuFileAPI::instance().ReadAsync(
_handle, devPtr_base, size_p, file_offset_p, devPtr_offset_p, bytes_done_p, stream));
} else {
CUFILE_TRY(cuFileAPI::instance().WriteAsync(
_handle, devPtr_base, size_p, file_offset_p, devPtr_offset_p, bytes_done_p, stream));
}

return;
}

posix_fallback();
}

public:
static constexpr mode_t m644 = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH;
FileHandle() noexcept = default;
Expand Down Expand Up @@ -473,37 +526,8 @@ class FileHandle {
ssize_t* bytes_read_p,
CUstream stream)
{
auto posix_fallback = [&] {
CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
*bytes_read_p =
static_cast<ssize_t>(read(devPtr_base, *size_p, *file_offset_p, *devPtr_offset_p));
};

if (defaults::can_compat_mode_reduce_to_off(_compat_mode)) {
if (!kvikio::is_batch_and_stream_available()) {
if (_compat_mode == CompatMode::AUTO) {
posix_fallback();
return;
}
throw std::runtime_error("Missing cuFile batch or stream library symbol.");
}

// When checking for availability, we also check if cuFile's config file exist. This is
// because even when the stream API is available, it doesn't work if no config file exist.
if (config_path().empty()) {
if (_compat_mode == CompatMode::AUTO) {
posix_fallback();
return;
}
throw std::runtime_error("Missing cuFile configuration file.");
}

CUFILE_TRY(cuFileAPI::instance().ReadAsync(
_handle, devPtr_base, size_p, file_offset_p, devPtr_offset_p, bytes_read_p, stream));
return;
}

posix_fallback();
handle_async_io<detail::IOOperationType::READ>(
devPtr_base, size_p, file_offset_p, devPtr_offset_p, bytes_read_p, stream);
}

/**
Expand Down Expand Up @@ -586,37 +610,8 @@ class FileHandle {
ssize_t* bytes_written_p,
CUstream stream)
{
auto posix_fallback = [&] {
CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
*bytes_written_p =
static_cast<ssize_t>(write(devPtr_base, *size_p, *file_offset_p, *devPtr_offset_p));
};

if (defaults::can_compat_mode_reduce_to_off(_compat_mode)) {
if (!kvikio::is_batch_and_stream_available()) {
if (_compat_mode == CompatMode::AUTO) {
posix_fallback();
return;
}
throw std::runtime_error("Missing cuFile batch or stream library symbol.");
}

// When checking for availability, we also check if cuFile's config file exist. This is
// because even when the stream API is available, it doesn't work if no config file exist.
if (config_path().empty()) {
if (_compat_mode == CompatMode::AUTO) {
posix_fallback();
return;
}
throw std::runtime_error("Missing cuFile configuration file.");
}

CUFILE_TRY(cuFileAPI::instance().WriteAsync(
_handle, devPtr_base, size_p, file_offset_p, devPtr_offset_p, bytes_written_p, stream));
return;
}

posix_fallback();
handle_async_io<detail::IOOperationType::WRITE>(
devPtr_base, size_p, file_offset_p, devPtr_offset_p, bytes_written_p, stream);
}

/**
Expand Down
10 changes: 1 addition & 9 deletions cpp/include/kvikio/posix_io.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,12 @@
#include <thread>

#include <kvikio/bounce_buffer.hpp>
#include <kvikio/detail/io_operation.hpp>
#include <kvikio/error.hpp>
#include <kvikio/shim/cuda.hpp>
#include <kvikio/utils.hpp>

namespace kvikio::detail {

/**
* @brief Type of the IO operation.
*/
enum class IOOperationType : uint8_t {
READ, ///< POSIX read.
WRITE, ///< POSIX write.
};

/**
* @brief Specifies whether all requested bytes are to be processed or not.
*/
Expand Down

0 comments on commit f89965e

Please sign in to comment.