Skip to content

Commit

Permalink
POSIX IO: use async memcpy (#374)
Browse files Browse the repository at this point in the history
Closes #311

Authors:
  - Mads R. B. Kristensen (https://github.com/madsbk)

Approvers:
  - Vukasin Milovanovic (https://github.com/vuule)

URL: #374
  • Loading branch information
madsbk authored May 7, 2024
1 parent ff837ff commit 8081fbb
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 12 deletions.
79 changes: 72 additions & 7 deletions cpp/include/kvikio/posix_io.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,8 +18,10 @@
#include <unistd.h>
#include <cstddef>
#include <cstdlib>
#include <map>
#include <mutex>
#include <stack>
#include <thread>

#include <cstring>
#include <kvikio/error.hpp>
Expand All @@ -33,7 +35,60 @@ inline constexpr std::size_t posix_bounce_buffer_size = 2 << 23; // 16 MiB
namespace detail {

/**
 * @brief Singleton class to retrieve a CUDA stream for device-host copying
 *
 * Call `StreamsByThread::get` to get the CUDA stream assigned to the current
 * CUDA context and thread. One stream is created lazily per
 * (context, thread id) pair and is destroyed when the singleton is destructed.
 *
 * The internal map is shared by all calling threads, so every access made by
 * `get` is serialized with a mutex.
 */
class StreamsByThread {
 private:
  // Streams created so far, keyed by (CUDA context, thread id).
  std::map<std::pair<CUcontext, std::thread::id>, CUstream> _streams;
  // Guards `_streams`; `std::map` must not be modified and read concurrently.
  std::mutex _mutex;

 public:
  StreamsByThread() = default;

  /**
   * @brief Destroy every stream created by `get`.
   *
   * A failing `StreamDestroy` is reported on `std::cerr` instead of being
   * propagated, since the destructor is `noexcept`.
   */
  ~StreamsByThread() noexcept
  {
    for (auto& [_, stream] : _streams) {
      try {
        CUDA_DRIVER_TRY(cudaAPI::instance().StreamDestroy(stream));
      } catch (const CUfileException& e) {
        std::cerr << e.what() << std::endl;
      }
    }
  }

  /**
   * @brief Get the stream assigned to a CUDA context and thread.
   *
   * @param ctx CUDA context, or `nullptr` if there is no current context.
   * @param thd_id ID of the thread the stream is assigned to.
   * @return The stream of `(ctx, thd_id)`, created on first use, or `nullptr`
   * (the null/default stream) when `ctx` is `nullptr`.
   */
  static CUstream get(CUcontext ctx, std::thread::id thd_id)
  {
    static StreamsByThread _instance;

    // If no current context, we return the null/default stream
    if (ctx == nullptr) { return nullptr; }
    const auto key = std::make_pair(ctx, thd_id);

    // The singleton is shared between threads: hold the lock for both the
    // lookup and the insertion so concurrent callers never race on the map.
    const std::lock_guard<std::mutex> lock(_instance._mutex);
    if (auto found = _instance._streams.find(key); found != _instance._streams.end()) {
      return found->second;
    }

    // Create a new stream since `(ctx, thd_id)` doesn't have one.
    CUstream stream{};
    CUDA_DRIVER_TRY(cudaAPI::instance().StreamCreate(&stream, CU_STREAM_DEFAULT));
    _instance._streams.emplace(key, stream);
    return stream;
  }

  /**
   * @brief Get the stream assigned to the current CUDA context and the calling thread.
   *
   * @return The stream, or `nullptr` (the null/default stream) when the
   * calling thread has no current CUDA context.
   */
  static CUstream get()
  {
    CUcontext ctx{nullptr};
    CUDA_DRIVER_TRY(cudaAPI::instance().CtxGetCurrent(&ctx));
    return get(ctx, std::this_thread::get_id());
  }

  StreamsByThread(const StreamsByThread&)            = delete;
  StreamsByThread& operator=(StreamsByThread const&) = delete;
  StreamsByThread(StreamsByThread&& o)               = delete;
  StreamsByThread& operator=(StreamsByThread&& o)    = delete;
};

/**
* @brief Singleton class to retain host memory allocations
*
* Call `AllocRetain::get` to get an allocation that will be retained when it
* goes out of scope (RAII). The size of every allocation is `posix_bounce_buffer_size`.
Expand Down Expand Up @@ -93,15 +148,19 @@ class AllocRetain {
}
}

/**
 * @brief Access the single `AllocRetain` object.
 *
 * The object is a function-local static, so it is constructed the first time
 * this function is called.
 *
 * @return Reference to the `AllocRetain` singleton.
 */
static AllocRetain& instance()
{
  static AllocRetain singleton;
  return singleton;
}

AllocRetain(const AllocRetain&) = delete;
AllocRetain& operator=(AllocRetain const&) = delete;
AllocRetain(AllocRetain&& o) = delete;
AllocRetain& operator=(AllocRetain&& o) = delete;
~AllocRetain() noexcept = default;
};

inline AllocRetain manager; // NOLINT(cppcoreguidelines-avoid-non-const-global-variables)

/**
* @brief Read or write host memory to or from disk using POSIX
*
Expand Down Expand Up @@ -167,20 +226,26 @@ std::size_t posix_device_io(int fd,
std::size_t file_offset,
std::size_t devPtr_offset)
{
auto alloc = manager.get();
auto alloc = AllocRetain::instance().get();
CUdeviceptr devPtr = convert_void2deviceptr(devPtr_base) + devPtr_offset;
off_t cur_file_offset = convert_size2off(file_offset);
off_t byte_remaining = convert_size2off(size);
const off_t chunk_size2 = convert_size2off(posix_bounce_buffer_size);

// Get a stream for the current CUDA context and thread
CUstream stream = StreamsByThread::get();

while (byte_remaining > 0) {
const off_t nbytes_requested = std::min(chunk_size2, byte_remaining);
ssize_t nbytes_got = nbytes_requested;
if constexpr (IsReadOperation) {
nbytes_got = posix_host_io<true>(fd, alloc.get(), nbytes_requested, cur_file_offset, true);
CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyHtoD(devPtr, alloc.get(), nbytes_got));
CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyHtoDAsync(devPtr, alloc.get(), nbytes_got, stream));
CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
} else { // Is a write operation
CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyDtoH(alloc.get(), devPtr, nbytes_requested));
CUDA_DRIVER_TRY(
cudaAPI::instance().MemcpyDtoHAsync(alloc.get(), devPtr, nbytes_requested, stream));
CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
posix_host_io<false>(fd, alloc.get(), nbytes_requested, cur_file_offset, false);
}
cur_file_offset += nbytes_got;
Expand Down
14 changes: 9 additions & 5 deletions cpp/include/kvikio/shim/cuda.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -33,8 +33,8 @@ class cudaAPI {
decltype(cuInit)* Init{nullptr};
decltype(cuMemHostAlloc)* MemHostAlloc{nullptr};
decltype(cuMemFreeHost)* MemFreeHost{nullptr};
decltype(cuMemcpyHtoD)* MemcpyHtoD{nullptr};
decltype(cuMemcpyDtoH)* MemcpyDtoH{nullptr};
decltype(cuMemcpyHtoDAsync)* MemcpyHtoDAsync{nullptr};
decltype(cuMemcpyDtoHAsync)* MemcpyDtoHAsync{nullptr};
decltype(cuPointerGetAttribute)* PointerGetAttribute{nullptr};
decltype(cuPointerGetAttributes)* PointerGetAttributes{nullptr};
decltype(cuCtxPushCurrent)* CtxPushCurrent{nullptr};
Expand All @@ -47,6 +47,8 @@ class cudaAPI {
decltype(cuDevicePrimaryCtxRetain)* DevicePrimaryCtxRetain{nullptr};
decltype(cuDevicePrimaryCtxRelease)* DevicePrimaryCtxRelease{nullptr};
decltype(cuStreamSynchronize)* StreamSynchronize{nullptr};
decltype(cuStreamCreate)* StreamCreate{nullptr};
decltype(cuStreamDestroy)* StreamDestroy{nullptr};

private:
cudaAPI()
Expand All @@ -58,8 +60,8 @@ class cudaAPI {
// the name of the symbol through cuda.h.
get_symbol(MemHostAlloc, lib, KVIKIO_STRINGIFY(cuMemHostAlloc));
get_symbol(MemFreeHost, lib, KVIKIO_STRINGIFY(cuMemFreeHost));
get_symbol(MemcpyHtoD, lib, KVIKIO_STRINGIFY(cuMemcpyHtoD));
get_symbol(MemcpyDtoH, lib, KVIKIO_STRINGIFY(cuMemcpyDtoH));
get_symbol(MemcpyHtoDAsync, lib, KVIKIO_STRINGIFY(cuMemcpyHtoDAsync));
get_symbol(MemcpyDtoHAsync, lib, KVIKIO_STRINGIFY(cuMemcpyDtoHAsync));
get_symbol(PointerGetAttribute, lib, KVIKIO_STRINGIFY(cuPointerGetAttribute));
get_symbol(PointerGetAttributes, lib, KVIKIO_STRINGIFY(cuPointerGetAttributes));
get_symbol(CtxPushCurrent, lib, KVIKIO_STRINGIFY(cuCtxPushCurrent));
Expand All @@ -72,6 +74,8 @@ class cudaAPI {
get_symbol(DevicePrimaryCtxRetain, lib, KVIKIO_STRINGIFY(cuDevicePrimaryCtxRetain));
get_symbol(DevicePrimaryCtxRelease, lib, KVIKIO_STRINGIFY(cuDevicePrimaryCtxRelease));
get_symbol(StreamSynchronize, lib, KVIKIO_STRINGIFY(cuStreamSynchronize));
get_symbol(StreamCreate, lib, KVIKIO_STRINGIFY(cuStreamCreate));
get_symbol(StreamDestroy, lib, KVIKIO_STRINGIFY(cuStreamDestroy));
}

public:
Expand Down

0 comments on commit 8081fbb

Please sign in to comment.