diff --git a/cpp/include/kvikio/posix_io.hpp b/cpp/include/kvikio/posix_io.hpp index a6786747f3..9f2bbd4fd6 100644 --- a/cpp/include/kvikio/posix_io.hpp +++ b/cpp/include/kvikio/posix_io.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2023, NVIDIA CORPORATION. + * Copyright (c) 2022-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,8 +18,10 @@ #include #include #include +#include #include #include +#include #include #include @@ -33,7 +35,60 @@ inline constexpr std::size_t posix_bounce_buffer_size = 2 << 23; // 16 MiB namespace detail { /** - * @brief Class to retain host memory allocations + * @brief Singleton class to retrieve a CUDA stream for device-host copying + * + * Call `StreamsByThread::get` to get the CUDA stream assigned to the current + * CUDA context and thread. + */ +class StreamsByThread { + private: + std::map, CUstream> _streams; + + public: + StreamsByThread() = default; + ~StreamsByThread() noexcept + { + for (auto& [_, stream] : _streams) { + try { + CUDA_DRIVER_TRY(cudaAPI::instance().StreamDestroy(stream)); + } catch (const CUfileException& e) { + std::cerr << e.what() << std::endl; + } + } + } + + static CUstream get(CUcontext ctx, std::thread::id thd_id) + { + static StreamsByThread _instance; + + // If no current context, we return the null/default stream + if (ctx == nullptr) { return nullptr; } + auto key = std::make_pair(ctx, thd_id); + + // Create a new stream if `ctx` doesn't have one. + if (_instance._streams.find(key) == _instance._streams.end()) { + CUstream stream{}; + CUDA_DRIVER_TRY(cudaAPI::instance().StreamCreate(&stream, CU_STREAM_DEFAULT)); + _instance._streams[key] = stream; + } + return _instance._streams.at(key); + } + + static CUstream get() + { + CUcontext ctx{nullptr}; + CUDA_DRIVER_TRY(cudaAPI::instance().CtxGetCurrent(&ctx)); + return get(ctx, std::this_thread::get_id()); + } + + StreamsByThread(const StreamsByThread&) = delete; + StreamsByThread& operator=(StreamsByThread const&) = delete; + StreamsByThread(StreamsByThread&& o) = delete; + StreamsByThread& operator=(StreamsByThread&& o) = delete; +}; + +/** + * @brief Singleton class to retain host memory allocations * * Call `AllocRetain::get` to get an allocation that will be retained when it * goes out of scope (RAII). The size of all allocations are `posix_bounce_buffer_size`. @@ -93,6 +148,12 @@ class AllocRetain { } } + static AllocRetain& instance() + { + static AllocRetain _instance; + return _instance; + } + AllocRetain(const AllocRetain&) = delete; AllocRetain& operator=(AllocRetain const&) = delete; AllocRetain(AllocRetain&& o) = delete; @@ -100,8 +161,6 @@ class AllocRetain { ~AllocRetain() noexcept = default; }; -inline AllocRetain manager; // NOLINT(cppcoreguidelines-avoid-non-const-global-variables) - /** * @brief Read or write host memory to or from disk using POSIX * @@ -167,20 +226,26 @@ std::size_t posix_device_io(int fd, std::size_t file_offset, std::size_t devPtr_offset) { - auto alloc = manager.get(); + auto alloc = AllocRetain::instance().get(); CUdeviceptr devPtr = convert_void2deviceptr(devPtr_base) + devPtr_offset; off_t cur_file_offset = convert_size2off(file_offset); off_t byte_remaining = convert_size2off(size); const off_t chunk_size2 = convert_size2off(posix_bounce_buffer_size); + // Get a stream for the current CUDA context and thread + CUstream stream = StreamsByThread::get(); + while (byte_remaining > 0) { const off_t nbytes_requested = std::min(chunk_size2, byte_remaining); ssize_t nbytes_got = nbytes_requested; if constexpr (IsReadOperation) { nbytes_got = posix_host_io(fd, alloc.get(), nbytes_requested, cur_file_offset, true); - CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyHtoD(devPtr, alloc.get(), nbytes_got)); + CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyHtoDAsync(devPtr, alloc.get(), nbytes_got, stream)); + CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream)); } else { // Is a write operation - CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyDtoH(alloc.get(), devPtr, nbytes_requested)); + CUDA_DRIVER_TRY( + cudaAPI::instance().MemcpyDtoHAsync(alloc.get(), devPtr, nbytes_requested, stream)); + CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream)); posix_host_io(fd, alloc.get(), nbytes_requested, cur_file_offset, false); } cur_file_offset += nbytes_got; diff --git a/cpp/include/kvikio/shim/cuda.hpp b/cpp/include/kvikio/shim/cuda.hpp index 7d4b08d9d8..e01df4643e 100644 --- a/cpp/include/kvikio/shim/cuda.hpp +++ b/cpp/include/kvikio/shim/cuda.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2023, NVIDIA CORPORATION. + * Copyright (c) 2022-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,8 +33,8 @@ class cudaAPI { decltype(cuInit)* Init{nullptr}; decltype(cuMemHostAlloc)* MemHostAlloc{nullptr}; decltype(cuMemFreeHost)* MemFreeHost{nullptr}; - decltype(cuMemcpyHtoD)* MemcpyHtoD{nullptr}; - decltype(cuMemcpyDtoH)* MemcpyDtoH{nullptr}; + decltype(cuMemcpyHtoDAsync)* MemcpyHtoDAsync{nullptr}; + decltype(cuMemcpyDtoHAsync)* MemcpyDtoHAsync{nullptr}; decltype(cuPointerGetAttribute)* PointerGetAttribute{nullptr}; decltype(cuPointerGetAttributes)* PointerGetAttributes{nullptr}; decltype(cuCtxPushCurrent)* CtxPushCurrent{nullptr}; @@ -47,6 +47,8 @@ class cudaAPI { decltype(cuDevicePrimaryCtxRetain)* DevicePrimaryCtxRetain{nullptr}; decltype(cuDevicePrimaryCtxRelease)* DevicePrimaryCtxRelease{nullptr}; decltype(cuStreamSynchronize)* StreamSynchronize{nullptr}; + decltype(cuStreamCreate)* StreamCreate{nullptr}; + decltype(cuStreamDestroy)* StreamDestroy{nullptr}; private: cudaAPI() @@ -58,8 +60,8 @@ class cudaAPI { // the name of the symbol through cude.h. get_symbol(MemHostAlloc, lib, KVIKIO_STRINGIFY(cuMemHostAlloc)); get_symbol(MemFreeHost, lib, KVIKIO_STRINGIFY(cuMemFreeHost)); - get_symbol(MemcpyHtoD, lib, KVIKIO_STRINGIFY(cuMemcpyHtoD)); - get_symbol(MemcpyDtoH, lib, KVIKIO_STRINGIFY(cuMemcpyDtoH)); + get_symbol(MemcpyHtoDAsync, lib, KVIKIO_STRINGIFY(cuMemcpyHtoDAsync)); + get_symbol(MemcpyDtoHAsync, lib, KVIKIO_STRINGIFY(cuMemcpyDtoHAsync)); get_symbol(PointerGetAttribute, lib, KVIKIO_STRINGIFY(cuPointerGetAttribute)); get_symbol(PointerGetAttributes, lib, KVIKIO_STRINGIFY(cuPointerGetAttributes)); get_symbol(CtxPushCurrent, lib, KVIKIO_STRINGIFY(cuCtxPushCurrent)); @@ -72,6 +74,8 @@ class cudaAPI { get_symbol(DevicePrimaryCtxRetain, lib, KVIKIO_STRINGIFY(cuDevicePrimaryCtxRetain)); get_symbol(DevicePrimaryCtxRelease, lib, KVIKIO_STRINGIFY(cuDevicePrimaryCtxRelease)); get_symbol(StreamSynchronize, lib, KVIKIO_STRINGIFY(cuStreamSynchronize)); + get_symbol(StreamCreate, lib, KVIKIO_STRINGIFY(cuStreamCreate)); + get_symbol(StreamDestroy, lib, KVIKIO_STRINGIFY(cuStreamDestroy)); } public: