diff --git a/cpp/include/kvikio/file_handle.hpp b/cpp/include/kvikio/file_handle.hpp index d9a31570cd..9cd74b2319 100644 --- a/cpp/include/kvikio/file_handle.hpp +++ b/cpp/include/kvikio/file_handle.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2024, NVIDIA CORPORATION. + * Copyright (c) 2021-2023, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -308,8 +308,7 @@ class FileHandle { std::size_t devPtr_offset) { if (_compat_mode) { - return posix_device_read( - _fd_direct_off, devPtr_base, size, file_offset, devPtr_offset, nullptr); + return posix_device_read(_fd_direct_off, devPtr_base, size, file_offset, devPtr_offset); } #ifdef KVIKIO_CUFILE_FOUND ssize_t ret = cuFileAPI::instance().Read( @@ -359,8 +358,7 @@ class FileHandle { _nbytes = 0; // Invalidate the computed file size if (_compat_mode) { - return posix_device_write( - _fd_direct_off, devPtr_base, size, file_offset, devPtr_offset, nullptr); + return posix_device_write(_fd_direct_off, devPtr_base, size, file_offset, devPtr_offset); } #ifdef KVIKIO_CUFILE_FOUND ssize_t ret = cuFileAPI::instance().Write( @@ -422,7 +420,7 @@ class FileHandle { if (size < gds_threshold) { auto task = [this, ctx, buf, size, file_offset]() -> std::size_t { PushAndPopContext c(ctx); - return posix_device_read(_fd_direct_off, buf, size, file_offset, 0, nullptr); + return posix_device_read(_fd_direct_off, buf, size, file_offset, 0); }; return std::async(std::launch::deferred, task); } @@ -483,7 +481,7 @@ class FileHandle { if (size < gds_threshold) { auto task = [this, ctx, buf, size, file_offset]() -> std::size_t { PushAndPopContext c(ctx); - return posix_device_write(_fd_direct_off, buf, size, file_offset, 0, nullptr); + return posix_device_write(_fd_direct_off, buf, size, file_offset, 0); }; return std::async(std::launch::deferred, task); } @@ -548,14 +546,10 @@ class FileHandle { return; } #endif + CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream)); - if (_compat_mode) { - *bytes_read_p = static_cast(posix_device_read( - _fd_direct_off, devPtr_base, *size_p, *file_offset_p, *devPtr_offset_p, stream)); - } else { - *bytes_read_p = - static_cast(read(devPtr_base, *size_p, *file_offset_p, *devPtr_offset_p)); - } + *bytes_read_p = + static_cast(read(devPtr_base, *size_p, *file_offset_p, *devPtr_offset_p)); } /** @@ -645,14 +639,10 @@ class FileHandle { return; } #endif + CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream)); - if (_compat_mode) { - *bytes_written_p = static_cast(posix_device_write( - _fd_direct_off, devPtr_base, *size_p, *file_offset_p, *devPtr_offset_p, stream)); - } else { - *bytes_written_p = - static_cast(write(devPtr_base, *size_p, *file_offset_p, *devPtr_offset_p)); - } + *bytes_written_p = + static_cast(write(devPtr_base, *size_p, *file_offset_p, *devPtr_offset_p)); } /** diff --git a/cpp/include/kvikio/posix_io.hpp b/cpp/include/kvikio/posix_io.hpp index 02bc394c6a..560d266655 100644 --- a/cpp/include/kvikio/posix_io.hpp +++ b/cpp/include/kvikio/posix_io.hpp @@ -37,7 +37,7 @@ namespace detail { /** * @brief Singleton class to retrieve a CUDA stream for device-host copying * - * Call `AllocRetain::get` to get the CUDA stream assigned to the current + * Call `StreamsByThread::get` to get the CUDA stream assigned to the current * CUDA context and thread. */ class StreamsByThread { @@ -217,7 +217,6 @@ ssize_t posix_host_io(int fd, const void* buf, size_t count, off_t offset, bool * @param size Number of bytes to read or write. * @param file_offset Byte offset to the start of the file. * @param devPtr_offset Byte offset to the start of the device pointer. - * @param stream CUDA stream in which to enqueue the operation. * @return Number of bytes read or written. */ template @@ -225,8 +224,7 @@ std::size_t posix_device_io(int fd, const void* devPtr_base, std::size_t size, std::size_t file_offset, - std::size_t devPtr_offset, - CUstream stream) + std::size_t devPtr_offset) { auto alloc = AllocRetain::instance().get(); CUdeviceptr devPtr = convert_void2deviceptr(devPtr_base) + devPtr_offset; @@ -234,8 +232,8 @@ std::size_t posix_device_io(int fd, off_t byte_remaining = convert_size2off(size); const off_t chunk_size2 = convert_size2off(posix_bounce_buffer_size); - // Get a stream if none were given by the caller - if (stream == nullptr) { stream = StreamsByThread::get(); } + // Get a stream for the current CUDA context and thread + CUstream stream = StreamsByThread::get(); while (byte_remaining > 0) { const off_t nbytes_requested = std::min(chunk_size2, byte_remaining); @@ -308,17 +306,15 @@ inline std::size_t posix_host_write( * @param size Size in bytes to read. * @param file_offset Offset in the file to read from. * @param devPtr_offset Offset relative to the `devPtr_base` pointer to read into. - * @param stream CUDA stream in which to enqueue the operation. * @return Size of bytes that were successfully read. */ inline std::size_t posix_device_read(int fd, const void* devPtr_base, std::size_t size, std::size_t file_offset, - std::size_t devPtr_offset, - CUstream stream) + std::size_t devPtr_offset) { - return detail::posix_device_io(fd, devPtr_base, size, file_offset, devPtr_offset, stream); + return detail::posix_device_io(fd, devPtr_base, size, file_offset, devPtr_offset); } /** @@ -332,17 +328,15 @@ inline std::size_t posix_device_read(int fd, * @param size Size in bytes to write. * @param file_offset Offset in the file to write to. * @param devPtr_offset Offset relative to the `devPtr_base` pointer to write into. - * @param stream CUDA stream in which to enqueue the operation. * @return Size of bytes that were successfully written. */ inline std::size_t posix_device_write(int fd, const void* devPtr_base, std::size_t size, std::size_t file_offset, - std::size_t devPtr_offset, - CUstream stream) + std::size_t devPtr_offset) { - return detail::posix_device_io(fd, devPtr_base, size, file_offset, devPtr_offset, stream); + return detail::posix_device_io(fd, devPtr_base, size, file_offset, devPtr_offset); } } // namespace kvikio