Add a memory resource based on cudaMallocAsync #4900

Merged: 7 commits, Jun 13, 2023

Changes from 5 commits
271 changes: 177 additions & 94 deletions dali/core/mm/async_pool_test.cu
@@ -22,6 +22,10 @@
#include "dali/core/cuda_stream.h"
#include "dali/core/mm/cuda_vm_resource.h"

#if CUDA_VERSION >= 11020
#include "dali/core/mm/malloc_resource.h"
#endif

namespace dali {
namespace mm {

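The added include is conditional because cudaMallocAsync and cudaFreeAsync first shipped in CUDA 11.2, i.e. CUDA_VERSION 11020. For context, here is a minimal standalone sketch of the stream-ordered allocation API that the new resource builds on, using only the public CUDA runtime; it is an illustration, not code from malloc_resource.h:

// Minimal sketch of stream-ordered allocation (CUDA 11.2+); illustration only.
#include <cuda_runtime_api.h>
#include <cstdio>

int main() {
  cudaStream_t stream;
  cudaStreamCreate(&stream);

  void *ptr = nullptr;
  // Allocation and deallocation are ordered on `stream`; no device-wide
  // synchronization is required between them.
  cudaMallocAsync(&ptr, 1 << 20, stream);
  cudaMemsetAsync(ptr, 0, 1 << 20, stream);
  cudaFreeAsync(ptr, stream);

  cudaStreamSynchronize(stream);
  cudaStreamDestroy(stream);
  std::printf("stream-ordered alloc/free completed\n");
  return 0;
}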
@@ -125,7 +129,7 @@ struct block {
};

template <typename Pool, typename Mutex>
void AsyncPoolTest(Pool &pool, vector<block> &blocks, Mutex &mtx, CUDAStream &stream,
void AsyncPoolTest(Pool &pool, std::vector<block> &blocks, Mutex &mtx, CUDAStream &stream,
int max_iters = 20000, bool use_hog = false) {
stream_view sv(stream);
std::mt19937_64 rng(12345);
@@ -146,8 +150,10 @@ void AsyncPoolTest(Pool &pool, vector<block> &blocks, Mutex &mtx, CUDAStream &st
int max_hogs = sync_dist(rng);
CUDAEvent event = CUDAEvent::Create();
for (int i = 0; i < max_iters; i++) {
if (i == max_iters / 2)
pool.release_unused();
if constexpr (std::is_base_of_v<mm::pool_resource_base<typename Pool::memory_kind>, Pool>) {
if (i == max_iters / 2)
pool.release_unused();
}

if (use_hog && hog_dist(rng)) {
if (hogs++ > max_hogs) {
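The if constexpr guard above exists because AsyncPoolTest is now also instantiated with resources that do not derive from mm::pool_resource_base and so have no release_unused() to call; the call is compiled only for pool types. A self-contained sketch of this gating pattern, using hypothetical toy types (PoolBase, PoolResource, PlainResource) rather than the DALI classes:

// Sketch of compile-time gating on a base class; toy types, not DALI code.
#include <cstdio>
#include <type_traits>

struct PoolBase { void release_unused() { std::printf("released\n"); } };
struct PoolResource : PoolBase {};
struct PlainResource {};  // e.g. a thin cudaMallocAsync wrapper: nothing to trim

template <typename Resource>
void maybe_release(Resource &res) {
  if constexpr (std::is_base_of_v<PoolBase, Resource>) {
    res.release_unused();  // instantiated only when Resource is a pool
  }
}

int main() {
  PoolResource pool;
  PlainResource plain;
  maybe_release(pool);   // prints "released"
  maybe_release(plain);  // compiles and does nothing
  return 0;
}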
@@ -214,7 +220,7 @@ TEST(MMAsyncPool, SingleStreamRandom) {

{
async_pool_resource<memory_kind::device> pool(&upstream);
vector<block> blocks;
std::vector<block> blocks;
detail::dummy_lock mtx;
AsyncPoolTest(pool, blocks, mtx, stream);
}
@@ -229,12 +235,12 @@ TEST(MMAsyncPool, MultiThreadedSingleStreamRandom) {
CUDAStream stream = CUDAStream::Create(true);
mm::test::test_device_resource upstream;
{
vector<block> blocks;
std::vector<block> blocks;
std::mutex mtx;

async_pool_resource<memory_kind::device> pool(&upstream);

vector<std::thread> threads;
std::vector<std::thread> threads;

for (int t = 0; t < 10; t++) {
threads.push_back(std::thread([&]() {
@@ -255,12 +261,12 @@ TEST(MMAsyncPool, MultiThreadedMultiStreamRandom) {
{
async_pool_resource<memory_kind::device> pool(&upstream);

vector<std::thread> threads;
std::vector<std::thread> threads;

for (int t = 0; t < 10; t++) {
threads.push_back(std::thread([&]() {
CUDAStream stream = CUDAStream::Create(true);
vector<block> blocks;
std::vector<block> blocks;
detail::dummy_lock mtx;
AsyncPoolTest(pool, blocks, mtx, stream);
CUDA_CALL(cudaStreamSynchronize(stream));
@@ -279,13 +285,13 @@ TEST(MMAsyncPool, MultiStreamRandomWithGPUHogs) {
{
async_pool_resource<memory_kind::device> pool(&upstream, false);

vector<std::thread> threads;
std::vector<std::thread> threads;

for (int t = 0; t < 10; t++) {
threads.push_back(std::thread([&]() {
// 0-th thread uses null stream, which triggers non-async API usage
CUDAStream stream = t ? CUDAStream::Create(true) : CUDAStream();
vector<block> blocks;
std::vector<block> blocks;
detail::dummy_lock mtx;
AsyncPoolTest(pool, blocks, mtx, stream, 20000, true);
CUDA_CALL(cudaStreamSynchronize(stream));
@@ -305,10 +311,10 @@ TEST(MMAsyncPool, CrossStream) {
{
async_pool_resource<memory_kind::device> pool(&upstream, false);

vector<std::thread> threads;
vector<CUDAStream> streams;
std::vector<std::thread> threads;
std::vector<CUDAStream> streams;

vector<block> blocks;
std::vector<block> blocks;
std::mutex mtx;

const int N = 10;
@@ -334,10 +340,10 @@ TEST(MMAsyncPool, CrossStreamWithHogs) {
{
async_pool_resource<memory_kind::device> pool(&upstream);

vector<std::thread> threads;
vector<CUDAStream> streams;
std::vector<std::thread> threads;
std::vector<CUDAStream> streams;

vector<block> blocks;
std::vector<block> blocks;
std::mutex mtx;

const int N = 10;
@@ -358,124 +364,201 @@ TEST(MMAsyncPool, CrossStreamWithHogs) {
upstream.check_leaks();
}

#if DALI_USE_CUDA_VM_MAP
class MMAsyncPoolTest : public ::testing::Test {
public:
template <typename MemoryResource>
void MultiThreadedSingleStreamRandom() {
CUDAStream stream = CUDAStream::Create(true);
{
std::vector<block> blocks;
std::mutex mtx;

TEST(MM_VMAsyncPool, MultiThreadedSingleStreamRandom) {
if (!cuvm::IsSupported())
GTEST_SKIP() << "Virtual memory management API is not supported on this machine.";
MemoryResource pool;

CUDAStream stream = CUDAStream::Create(true);
{
vector<block> blocks;
std::mutex mtx;
std::vector<std::thread> threads;

async_pool_resource<memory_kind::device, cuda_vm_resource> pool;
for (int t = 0; t < 10; t++) {
threads.push_back(std::thread([&]() {
AsyncPoolTest(pool, blocks, mtx, stream);
}));
}
for (auto &t : threads)
t.join();
}
}

template <typename MemoryResource>
void MultiThreadedMultiStreamRandom() {
MemoryResource pool;

vector<std::thread> threads;
std::vector<std::thread> threads;

for (int t = 0; t < 10; t++) {
threads.push_back(std::thread([&]() {
CUDAStream stream = CUDAStream::Create(true);
std::vector<block> blocks;
detail::dummy_lock mtx;
AsyncPoolTest(pool, blocks, mtx, stream);
CUDA_CALL(cudaStreamSynchronize(stream));
}));
}
for (auto &t : threads)
t.join();
}
}

TEST(MM_VMAsyncPool, MultiThreadedMultiStreamRandom) {
template <typename MemoryResource>
void MultiStreamRandomWithGPUHogs() {
MemoryResource pool;

std::vector<std::thread> threads;

for (int t = 0; t < 10; t++) {
threads.push_back(std::thread([&]() {
// 0-th thread uses null stream, which triggers non-async API usage
CUDAStream stream = t ? CUDAStream::Create(true) : CUDAStream();
std::vector<block> blocks;
detail::dummy_lock mtx;
AsyncPoolTest(pool, blocks, mtx, stream, 20000, true);
CUDA_CALL(cudaStreamSynchronize(stream));
}));
}
for (auto &t : threads)
t.join();
}

template <typename MemoryResource>
void CrossStream() {
MemoryResource pool;

std::vector<std::thread> threads;
std::vector<CUDAStream> streams;

std::vector<block> blocks;
std::mutex mtx;

const int N = 10;
streams.resize(N);
for (int t = 0; t < N; t++) {
if (t != 0) // keep empty stream at index 0 to mix sync/async allocations
streams[t] = CUDAStream::Create(true);
threads.push_back(std::thread([&, t]() {
AsyncPoolTest(pool, blocks, mtx, streams[t]);
CUDA_CALL(cudaStreamSynchronize(streams[t]));
}));
}
for (auto &t : threads)
t.join();
}

template <typename MemoryResource>
void CrossStreamWithHogs() {
MemoryResource pool;

std::vector<std::thread> threads;
std::vector<CUDAStream> streams;

std::vector<block> blocks;
std::mutex mtx;

const int N = 10;
streams.resize(N);
for (int t = 0; t < N; t++) {
if (t != 0) // keep empty stream at index 0 to mix sync/async allocations
streams[t] = CUDAStream::Create(true);
threads.push_back(std::thread([&, t]() {
AsyncPoolTest(pool, blocks, mtx, streams[t], 10000, true);
CUDA_CALL(cudaStreamSynchronize(streams[t]));
}));
}
for (auto &t : threads)
t.join();
}
};

#if DALI_USE_CUDA_VM_MAP

TEST_F(MMAsyncPoolTest, VM_MultiThreadedSingleStreamRandom) {
if (!cuvm::IsSupported())
GTEST_SKIP() << "Virtual memory management API is not supported on this machine.";

async_pool_resource<memory_kind::device, cuda_vm_resource> pool;
using MR = async_pool_resource<memory_kind::device, cuda_vm_resource>;
this->MultiThreadedSingleStreamRandom<MR>();
}

vector<std::thread> threads;
TEST_F(MMAsyncPoolTest, VM_MultiThreadedMultiStreamRandom) {
if (!cuvm::IsSupported())
GTEST_SKIP() << "Virtual memory management API is not supported on this machine.";

for (int t = 0; t < 10; t++) {
threads.push_back(std::thread([&]() {
CUDAStream stream = CUDAStream::Create(true);
vector<block> blocks;
detail::dummy_lock mtx;
AsyncPoolTest(pool, blocks, mtx, stream);
CUDA_CALL(cudaStreamSynchronize(stream));
}));
}
for (auto &t : threads)
t.join();
using MR = async_pool_resource<memory_kind::device, cuda_vm_resource>;
this->MultiThreadedMultiStreamRandom<MR>();
}

TEST(MM_VMAsyncPool, MultiStreamRandomWithGPUHogs) {
TEST_F(MMAsyncPoolTest, VM_MultiStreamRandomWithGPUHogs) {
if (!cuvm::IsSupported())
GTEST_SKIP() << "Virtual memory management API is not supported on this machine.";

async_pool_resource<memory_kind::device, cuda_vm_resource> pool;
using MR = async_pool_resource<memory_kind::device, cuda_vm_resource>;
this->MultiStreamRandomWithGPUHogs<MR>();
}

vector<std::thread> threads;
TEST_F(MMAsyncPoolTest, VM_CrossStream) {
if (!cuvm::IsSupported())
GTEST_SKIP() << "Virtual memory management API is not supported on this machine.";

for (int t = 0; t < 10; t++) {
threads.push_back(std::thread([&]() {
// 0-th thread uses null stream, which triggers non-async API usage
CUDAStream stream = t ? CUDAStream::Create(true) : CUDAStream();
vector<block> blocks;
detail::dummy_lock mtx;
AsyncPoolTest(pool, blocks, mtx, stream, 20000, true);
CUDA_CALL(cudaStreamSynchronize(stream));
}));
}
for (auto &t : threads)
t.join();
using MR = async_pool_resource<memory_kind::device, cuda_vm_resource>;
this->CrossStream<MR>();
}

TEST(MM_VMAsyncPool, CrossStream) {
TEST_F(MMAsyncPoolTest, VM_CrossStreamWithHogs) {
if (!cuvm::IsSupported())
GTEST_SKIP() << "Virtual memory management API is not supported on this machine.";

async_pool_resource<memory_kind::device, cuda_vm_resource> pool;
using MR = async_pool_resource<memory_kind::device, cuda_vm_resource>;
this->CrossStreamWithHogs<MR>();
}

vector<std::thread> threads;
vector<CUDAStream> streams;
#endif

vector<block> blocks;
std::mutex mtx;
#if CUDA_VERSION >= 11020

const int N = 10;
streams.resize(N);
for (int t = 0; t < N; t++) {
if (t != 0) // keep empty stream at index 0 to mix sync/async allocations
streams[t] = CUDAStream::Create(true);
threads.push_back(std::thread([&, t]() {
AsyncPoolTest(pool, blocks, mtx, streams[t]);
CUDA_CALL(cudaStreamSynchronize(streams[t]));
}));
}
for (auto &t : threads)
t.join();
TEST_F(MMAsyncPoolTest, cudaMallocAsync_MultiThreadedSingleStreamRandom) {
if (!cuda_malloc_async_memory_resource::is_supported())
GTEST_SKIP() << "cudaMallocAsync not supported";

using MR = cuda_malloc_async_memory_resource;
this->MultiThreadedSingleStreamRandom<MR>();
}

TEST(MM_VMAsyncPool, CrossStreamWithHogs) {
if (!cuvm::IsSupported())
GTEST_SKIP() << "Virtual memory management API is not supported on this machine.";
TEST_F(MMAsyncPoolTest, cudaMallocAsync_MultiThreadedMultiStreamRandom) {
if (!cuda_malloc_async_memory_resource::is_supported())
GTEST_SKIP() << "cudaMallocAsync not supported";

async_pool_resource<memory_kind::device, cuda_vm_resource> pool;
using MR = cuda_malloc_async_memory_resource;
this->MultiThreadedMultiStreamRandom<MR>();
}

vector<std::thread> threads;
vector<CUDAStream> streams;
TEST_F(MMAsyncPoolTest, cudaMallocAsync_MultiStreamRandomWithGPUHogs) {
if (!cuda_malloc_async_memory_resource::is_supported())
GTEST_SKIP() << "cudaMallocAsync not supported";

vector<block> blocks;
std::mutex mtx;
using MR = cuda_malloc_async_memory_resource;
this->MultiStreamRandomWithGPUHogs<MR>();
}

const int N = 10;
streams.resize(N);
for (int t = 0; t < N; t++) {
if (t != 0) // keep empty stream at index 0 to mix sync/async allocations
streams[t] = CUDAStream::Create(true);
threads.push_back(std::thread([&, t]() {
AsyncPoolTest(pool, blocks, mtx, streams[t], 10000, true);
CUDA_CALL(cudaStreamSynchronize(streams[t]));
}));
}
for (auto &t : threads)
t.join();
TEST_F(MMAsyncPoolTest, cudaMallocAsync_CrossStream) {
if (!cuda_malloc_async_memory_resource::is_supported())
GTEST_SKIP() << "cudaMallocAsync not supported";

using MR = cuda_malloc_async_memory_resource;
this->CrossStream<MR>();
}

TEST_F(MMAsyncPoolTest, cudaMallocAsync_CrossStreamWithHogs) {
if (!cuda_malloc_async_memory_resource::is_supported())
GTEST_SKIP() << "cudaMallocAsync not supported";

using MR = cuda_malloc_async_memory_resource;
this->CrossStreamWithHogs<MR>();
}

#endif
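Each cudaMallocAsync test above is skipped unless cuda_malloc_async_memory_resource::is_supported() reports runtime support. The implementation of that check lives in malloc_resource.h and is not part of this diff; as a hedged sketch of what such a probe can look like, one could query the standard CUDA device attribute for memory-pool support (an assumption for illustration, not the PR's actual code):

// Hypothetical support probe; the real is_supported() may differ.
#include <cuda.h>
#include <cuda_runtime_api.h>

bool memory_pools_supported() {
#if CUDA_VERSION >= 11020
  int device = 0, supported = 0;
  if (cudaGetDevice(&device) != cudaSuccess)
    return false;
  if (cudaDeviceGetAttribute(&supported, cudaDevAttrMemoryPoolsSupported,
                             device) != cudaSuccess)
    return false;
  return supported != 0;
#else
  return false;  // cudaMallocAsync predates this toolkit version
#endif
}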