Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

POSIX IO: use async memcpy #374

Merged
merged 7 commits into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 72 additions & 7 deletions cpp/include/kvikio/posix_io.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,8 +18,10 @@
#include <unistd.h>
#include <cstddef>
#include <cstdlib>
#include <map>
#include <mutex>
#include <stack>
#include <thread>

#include <cstring>
#include <kvikio/error.hpp>
Expand All @@ -33,7 +35,60 @@ inline constexpr std::size_t posix_bounce_buffer_size = 2 << 23; // 16 MiB
namespace detail {

/**
* @brief Class to retain host memory allocations
* @brief Singleton class to retrieve a CUDA stream for device-host copying
*
* Call `StreamsByThread::get` to get the CUDA stream assigned to the current
* CUDA context and thread.
*/
class StreamsByThread {
 private:
  // One stream per (CUDA context, thread) pair.
  // Guarded by `_mutex`: `get()` may be called concurrently from the POSIX
  // I/O thread pool, and unsynchronized map insertion is a data race.
  std::map<std::pair<CUcontext, std::thread::id>, CUstream> _streams;
  std::mutex _mutex;

 public:
  StreamsByThread() = default;

  // Destroy all streams created by this singleton. Stream destruction can
  // fail at program teardown (e.g. the owning CUDA context may already be
  // gone), so we print the error rather than throw from a destructor.
  ~StreamsByThread() noexcept
  {
    for (auto& [_, stream] : _streams) {
      try {
        CUDA_DRIVER_TRY(cudaAPI::instance().StreamDestroy(stream));
      } catch (const CUfileException& e) {
        std::cerr << e.what() << std::endl;
      }
    }
  }

  /**
   * @brief Get the CUDA stream assigned to `ctx` and `thd_id`, creating it on first use.
   *
   * Thread-safe: the internal stream map is protected by a mutex.
   *
   * @param ctx CUDA context; when null, the null/default stream is returned.
   * @param thd_id Id of the calling thread.
   * @return The CUDA stream.
   */
  static CUstream get(CUcontext ctx, std::thread::id thd_id)
  {
    static StreamsByThread _instance;

    // If no current context, we return the null/default stream
    if (ctx == nullptr) { return nullptr; }

    std::lock_guard const lock(_instance._mutex);
    auto const key = std::make_pair(ctx, thd_id);

    // Create a new stream if `(ctx, thd_id)` doesn't have one yet.
    // Single lookup: `find` + `emplace` instead of find/operator[]/at.
    auto it = _instance._streams.find(key);
    if (it == _instance._streams.end()) {
      CUstream stream{};
      CUDA_DRIVER_TRY(cudaAPI::instance().StreamCreate(&stream, CU_STREAM_DEFAULT));
      it = _instance._streams.emplace(key, stream).first;
    }
    return it->second;
  }

  /**
   * @brief Get the CUDA stream assigned to the current CUDA context and calling thread.
   *
   * @return The CUDA stream (null/default stream if no context is current).
   */
  static CUstream get()
  {
    CUcontext ctx{nullptr};
    CUDA_DRIVER_TRY(cudaAPI::instance().CtxGetCurrent(&ctx));
    return get(ctx, std::this_thread::get_id());
  }

  StreamsByThread(const StreamsByThread&)            = delete;
  StreamsByThread& operator=(StreamsByThread const&) = delete;
  StreamsByThread(StreamsByThread&& o)               = delete;
  StreamsByThread& operator=(StreamsByThread&& o)    = delete;
};

/**
* @brief Singleton class to retain host memory allocations
*
* Call `AllocRetain::get` to get an allocation that will be retained when it
* goes out of scope (RAII). The size of all allocations are `posix_bounce_buffer_size`.
Expand Down Expand Up @@ -93,15 +148,19 @@ class AllocRetain {
}
}

static AllocRetain& instance()
{
static AllocRetain _instance;
return _instance;
}

AllocRetain(const AllocRetain&) = delete;
AllocRetain& operator=(AllocRetain const&) = delete;
AllocRetain(AllocRetain&& o) = delete;
AllocRetain& operator=(AllocRetain&& o) = delete;
~AllocRetain() noexcept = default;
};

inline AllocRetain manager; // NOLINT(cppcoreguidelines-avoid-non-const-global-variables)

/**
* @brief Read or write host memory to or from disk using POSIX
*
Expand Down Expand Up @@ -167,20 +226,26 @@ std::size_t posix_device_io(int fd,
std::size_t file_offset,
std::size_t devPtr_offset)
{
auto alloc = manager.get();
auto alloc = AllocRetain::instance().get();
CUdeviceptr devPtr = convert_void2deviceptr(devPtr_base) + devPtr_offset;
off_t cur_file_offset = convert_size2off(file_offset);
off_t byte_remaining = convert_size2off(size);
const off_t chunk_size2 = convert_size2off(posix_bounce_buffer_size);

// Get a stream for the current CUDA context and thread
CUstream stream = StreamsByThread::get();

while (byte_remaining > 0) {
const off_t nbytes_requested = std::min(chunk_size2, byte_remaining);
ssize_t nbytes_got = nbytes_requested;
if constexpr (IsReadOperation) {
nbytes_got = posix_host_io<true>(fd, alloc.get(), nbytes_requested, cur_file_offset, true);
CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyHtoD(devPtr, alloc.get(), nbytes_got));
CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyHtoDAsync(devPtr, alloc.get(), nbytes_got, stream));
CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
} else { // Is a write operation
CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyDtoH(alloc.get(), devPtr, nbytes_requested));
CUDA_DRIVER_TRY(
cudaAPI::instance().MemcpyDtoHAsync(alloc.get(), devPtr, nbytes_requested, stream));
CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
posix_host_io<false>(fd, alloc.get(), nbytes_requested, cur_file_offset, false);
}
cur_file_offset += nbytes_got;
Expand Down
14 changes: 9 additions & 5 deletions cpp/include/kvikio/shim/cuda.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -33,8 +33,8 @@ class cudaAPI {
decltype(cuInit)* Init{nullptr};
decltype(cuMemHostAlloc)* MemHostAlloc{nullptr};
decltype(cuMemFreeHost)* MemFreeHost{nullptr};
decltype(cuMemcpyHtoD)* MemcpyHtoD{nullptr};
decltype(cuMemcpyDtoH)* MemcpyDtoH{nullptr};
decltype(cuMemcpyHtoDAsync)* MemcpyHtoDAsync{nullptr};
decltype(cuMemcpyDtoHAsync)* MemcpyDtoHAsync{nullptr};
decltype(cuPointerGetAttribute)* PointerGetAttribute{nullptr};
decltype(cuPointerGetAttributes)* PointerGetAttributes{nullptr};
decltype(cuCtxPushCurrent)* CtxPushCurrent{nullptr};
Expand All @@ -47,6 +47,8 @@ class cudaAPI {
decltype(cuDevicePrimaryCtxRetain)* DevicePrimaryCtxRetain{nullptr};
decltype(cuDevicePrimaryCtxRelease)* DevicePrimaryCtxRelease{nullptr};
decltype(cuStreamSynchronize)* StreamSynchronize{nullptr};
decltype(cuStreamCreate)* StreamCreate{nullptr};
decltype(cuStreamDestroy)* StreamDestroy{nullptr};

private:
cudaAPI()
Expand All @@ -58,8 +60,8 @@ class cudaAPI {
// the name of the symbol through cuda.h.
get_symbol(MemHostAlloc, lib, KVIKIO_STRINGIFY(cuMemHostAlloc));
get_symbol(MemFreeHost, lib, KVIKIO_STRINGIFY(cuMemFreeHost));
get_symbol(MemcpyHtoD, lib, KVIKIO_STRINGIFY(cuMemcpyHtoD));
get_symbol(MemcpyDtoH, lib, KVIKIO_STRINGIFY(cuMemcpyDtoH));
get_symbol(MemcpyHtoDAsync, lib, KVIKIO_STRINGIFY(cuMemcpyHtoDAsync));
get_symbol(MemcpyDtoHAsync, lib, KVIKIO_STRINGIFY(cuMemcpyDtoHAsync));
get_symbol(PointerGetAttribute, lib, KVIKIO_STRINGIFY(cuPointerGetAttribute));
get_symbol(PointerGetAttributes, lib, KVIKIO_STRINGIFY(cuPointerGetAttributes));
get_symbol(CtxPushCurrent, lib, KVIKIO_STRINGIFY(cuCtxPushCurrent));
Expand All @@ -72,6 +74,8 @@ class cudaAPI {
get_symbol(DevicePrimaryCtxRetain, lib, KVIKIO_STRINGIFY(cuDevicePrimaryCtxRetain));
get_symbol(DevicePrimaryCtxRelease, lib, KVIKIO_STRINGIFY(cuDevicePrimaryCtxRelease));
get_symbol(StreamSynchronize, lib, KVIKIO_STRINGIFY(cuStreamSynchronize));
get_symbol(StreamCreate, lib, KVIKIO_STRINGIFY(cuStreamCreate));
get_symbol(StreamDestroy, lib, KVIKIO_STRINGIFY(cuStreamDestroy));
}

public:
Expand Down
Loading