Skip to content

Commit

Permalink
posix: roll back stream argument
Browse files Browse the repository at this point in the history
  • Loading branch information
madsbk committed Apr 29, 2024
1 parent 439a304 commit f83475e
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 35 deletions.
32 changes: 11 additions & 21 deletions cpp/include/kvikio/file_handle.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2024, NVIDIA CORPORATION.
* Copyright (c) 2021-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -308,8 +308,7 @@ class FileHandle {
std::size_t devPtr_offset)
{
if (_compat_mode) {
return posix_device_read(
_fd_direct_off, devPtr_base, size, file_offset, devPtr_offset, nullptr);
return posix_device_read(_fd_direct_off, devPtr_base, size, file_offset, devPtr_offset);
}
#ifdef KVIKIO_CUFILE_FOUND
ssize_t ret = cuFileAPI::instance().Read(
Expand Down Expand Up @@ -359,8 +358,7 @@ class FileHandle {
_nbytes = 0; // Invalidate the computed file size

if (_compat_mode) {
return posix_device_write(
_fd_direct_off, devPtr_base, size, file_offset, devPtr_offset, nullptr);
return posix_device_write(_fd_direct_off, devPtr_base, size, file_offset, devPtr_offset);
}
#ifdef KVIKIO_CUFILE_FOUND
ssize_t ret = cuFileAPI::instance().Write(
Expand Down Expand Up @@ -422,7 +420,7 @@ class FileHandle {
if (size < gds_threshold) {
auto task = [this, ctx, buf, size, file_offset]() -> std::size_t {
PushAndPopContext c(ctx);
return posix_device_read(_fd_direct_off, buf, size, file_offset, 0, nullptr);
return posix_device_read(_fd_direct_off, buf, size, file_offset, 0);
};
return std::async(std::launch::deferred, task);
}
Expand Down Expand Up @@ -483,7 +481,7 @@ class FileHandle {
if (size < gds_threshold) {
auto task = [this, ctx, buf, size, file_offset]() -> std::size_t {
PushAndPopContext c(ctx);
return posix_device_write(_fd_direct_off, buf, size, file_offset, 0, nullptr);
return posix_device_write(_fd_direct_off, buf, size, file_offset, 0);
};
return std::async(std::launch::deferred, task);
}
Expand Down Expand Up @@ -548,14 +546,10 @@ class FileHandle {
return;
}
#endif

CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
if (_compat_mode) {
*bytes_read_p = static_cast<ssize_t>(posix_device_read(
_fd_direct_off, devPtr_base, *size_p, *file_offset_p, *devPtr_offset_p, stream));
} else {
*bytes_read_p =
static_cast<ssize_t>(read(devPtr_base, *size_p, *file_offset_p, *devPtr_offset_p));
}
*bytes_read_p =
static_cast<ssize_t>(read(devPtr_base, *size_p, *file_offset_p, *devPtr_offset_p));
}

/**
Expand Down Expand Up @@ -645,14 +639,10 @@ class FileHandle {
return;
}
#endif

CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
if (_compat_mode) {
*bytes_written_p = static_cast<ssize_t>(posix_device_write(
_fd_direct_off, devPtr_base, *size_p, *file_offset_p, *devPtr_offset_p, stream));
} else {
*bytes_written_p =
static_cast<ssize_t>(write(devPtr_base, *size_p, *file_offset_p, *devPtr_offset_p));
}
*bytes_written_p =
static_cast<ssize_t>(write(devPtr_base, *size_p, *file_offset_p, *devPtr_offset_p));
}

/**
Expand Down
22 changes: 8 additions & 14 deletions cpp/include/kvikio/posix_io.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ namespace detail {
/**
* @brief Singleton class to retrieve a CUDA stream for device-host copying
*
* Call `AllocRetain::get` to get the CUDA stream assigned to the current
* Call `StreamsByThread::get` to get the CUDA stream assigned to the current
* CUDA context and thread.
*/
class StreamsByThread {
Expand Down Expand Up @@ -217,25 +217,23 @@ ssize_t posix_host_io(int fd, const void* buf, size_t count, off_t offset, bool
* @param size Number of bytes to read or write.
* @param file_offset Byte offset to the start of the file.
* @param devPtr_offset Byte offset to the start of the device pointer.
* @param stream CUDA stream in which to enqueue the operation.
* @return Number of bytes read or written.
*/
template <bool IsReadOperation>
std::size_t posix_device_io(int fd,
const void* devPtr_base,
std::size_t size,
std::size_t file_offset,
std::size_t devPtr_offset,
CUstream stream)
std::size_t devPtr_offset)
{
auto alloc = AllocRetain::instance().get();
CUdeviceptr devPtr = convert_void2deviceptr(devPtr_base) + devPtr_offset;
off_t cur_file_offset = convert_size2off(file_offset);
off_t byte_remaining = convert_size2off(size);
const off_t chunk_size2 = convert_size2off(posix_bounce_buffer_size);

// Get a stream if none were given by the caller
if (stream == nullptr) { stream = StreamsByThread::get(); }
// Get a stream for the current CUDA context and thread
CUstream stream = StreamsByThread::get();

while (byte_remaining > 0) {
const off_t nbytes_requested = std::min(chunk_size2, byte_remaining);
Expand Down Expand Up @@ -308,17 +306,15 @@ inline std::size_t posix_host_write(
* @param size Size in bytes to read.
* @param file_offset Offset in the file to read from.
* @param devPtr_offset Offset relative to the `devPtr_base` pointer to read into.
* @param stream CUDA stream in which to enqueue the operation.
* @return Size of bytes that were successfully read.
*/
inline std::size_t posix_device_read(int fd,
const void* devPtr_base,
std::size_t size,
std::size_t file_offset,
std::size_t devPtr_offset,
CUstream stream)
std::size_t devPtr_offset)
{
return detail::posix_device_io<true>(fd, devPtr_base, size, file_offset, devPtr_offset, stream);
return detail::posix_device_io<true>(fd, devPtr_base, size, file_offset, devPtr_offset);
}

/**
Expand All @@ -332,17 +328,15 @@ inline std::size_t posix_device_read(int fd,
* @param size Size in bytes to write.
* @param file_offset Offset in the file to write to.
* @param devPtr_offset Offset relative to the `devPtr_base` pointer to write into.
* @param stream CUDA stream in which to enqueue the operation.
* @return Size of bytes that were successfully written.
*/
inline std::size_t posix_device_write(int fd,
const void* devPtr_base,
std::size_t size,
std::size_t file_offset,
std::size_t devPtr_offset,
CUstream stream)
std::size_t devPtr_offset)
{
return detail::posix_device_io<false>(fd, devPtr_base, size, file_offset, devPtr_offset, stream);
return detail::posix_device_io<false>(fd, devPtr_base, size, file_offset, devPtr_offset);
}

} // namespace kvikio

0 comments on commit f83475e

Please sign in to comment.