Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[REVIEW] Add per-thread-default stream support to pool_memory_resource using thread-local CUDA events #425

Merged
merged 33 commits into from
Jul 10, 2020
Merged
Changes from 4 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
e2be0ba
Use per-thread events rather than per-block events
harrism Jun 22, 2020
5514703
Disable event timing; cleanup
harrism Jun 23, 2020
93bebd7
Add thread-local unique_event inner class, streamline PTDS and non-PT…
harrism Jun 24, 2020
2a18521
TODO comment
harrism Jun 24, 2020
78467d5
Simplify event wrapper class and remove ids
harrism Jun 24, 2020
4110fa0
Changelog for #425
harrism Jun 24, 2020
29d65f9
get_event looks for 0 or cudaStreamPerThread
harrism Jun 25, 2020
e53a59e
Synchronize on event destruction. Clean up docs.
harrism Jun 25, 2020
3d178ff
remove_event->destroy_event
harrism Jun 25, 2020
889bf55
Be consistent with the default location for blocks
harrism Jun 25, 2020
0d314b9
Refactor mr_tests.cpp to enable creating multithreaded tests that sha…
harrism Jun 26, 2020
b1620d9
Changes missed in previous commit
harrism Jun 26, 2020
07af3c8
Add multithreaded tests (some still failing)
harrism Jun 26, 2020
714e924
Fix failing multithreaded tests by adding specializations for test co…
harrism Jun 29, 2020
c14d1af
Make pool_memory_resource thread safe and fix PTDS destroy_event bug
harrism Jun 30, 2020
2ffbcaa
Add DEVICE_MR_PTDS_TEST
harrism Jun 30, 2020
ccea946
include mutex always
harrism Jun 30, 2020
af3b9c7
Merge branch 'branch-0.15' into fea-ptds-events
harrism Jun 30, 2020
1d039ff
Fix use-after-free race with cuda_events.
harrism Jul 1, 2020
22c77cb
Fix memory leak in test
harrism Jul 1, 2020
a6390a4
Add tests that alloc / free on different threads.
harrism Jul 1, 2020
6fd4544
better documentation of cuda_event
harrism Jul 1, 2020
1d24547
Document that this is now thread-safe and PTDS-compatible
harrism Jul 1, 2020
0d7cb97
Improve changelog
harrism Jul 1, 2020
fc133ae
Merge branch 'branch-0.15' into fea-ptds-events
harrism Jul 1, 2020
4af5652
Fix gcc7 compilation failure
harrism Jul 2, 2020
6e9bac0
Address review suggestions.
harrism Jul 2, 2020
26c2909
Only add event to ptds_events_ once!
harrism Jul 2, 2020
01385bc
Merge branch 'multi-thread-replay' into fea-ptds-events
harrism Jul 2, 2020
3df09f9
Fix streams passed to multithreaded test
harrism Jul 3, 2020
13486e7
Update copyright
harrism Jul 10, 2020
0a57993
Combine streams and events in a struct.
harrism Jul 10, 2020
94cae03
Merge branch 'branch-0.15' into fea-ptds-events
harrism Jul 10, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 128 additions & 38 deletions include/rmm/mr/device/pool_memory_resource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
*/
#pragma once

#include <cstdint>
harrism marked this conversation as resolved.
Show resolved Hide resolved
#include <rmm/detail/error.hpp>
#include <rmm/mr/device/detail/free_list.hpp>
#include <rmm/mr/device/device_memory_resource.hpp>

#include <cuda_runtime_api.h>

#include <algorithm>
#include <atomic>
#include <cassert>
#include <exception>
#include <iostream>
Expand All @@ -30,6 +32,8 @@
#include <mutex>
harrism marked this conversation as resolved.
Show resolved Hide resolved
#include <numeric>
#include <set>
#include <thread>
harrism marked this conversation as resolved.
Show resolved Hide resolved
#include <unordered_map>
#include <vector>

namespace rmm {
Expand Down Expand Up @@ -76,7 +80,7 @@ class pool_memory_resource final : public device_memory_resource {
if (maximum_pool_size == default_maximum_size) maximum_pool_size_ = props.totalGlobalMem;

// Allocate initial block
stream_free_blocks_[0].insert(block_from_upstream(initial_pool_size, 0));
stream_free_blocks_[get_event(0)].insert(block_from_upstream(initial_pool_size, 0));
}

/**
Expand Down Expand Up @@ -108,6 +112,7 @@ class pool_memory_resource final : public device_memory_resource {
Upstream* get_upstream() const noexcept { return upstream_mr_; }

private:
using id_type = uint32_t;
using block = rmm::mr::detail::block;
using free_list = rmm::mr::detail::free_list<>;

Expand All @@ -124,59 +129,60 @@ class pool_memory_resource final : public device_memory_resource {
* available in `blocks`.
*/
block block_from_stream(free_list& blocks,
cudaStream_t blocks_stream,
cudaEvent_t blocks_event,
size_t size,
cudaStream_t stream)
cudaStream_t stream,
cudaEvent_t stream_event)
{
block const b = blocks.best_fit(size); // get the best fit block

// If we found a block associated with a different stream,
// we have to synchronize the stream in order to use it
if ((blocks_stream != stream) && b.is_valid()) {
cudaError_t result = cudaStreamSynchronize(blocks_stream);

RMM_EXPECTS((result == cudaSuccess || // stream synced
result == cudaErrorInvalidResourceHandle), // stream deleted
rmm::bad_alloc,
"cudaStreamSynchronize failure");

// Now that this stream is synced, insert all other blocks into this stream's list
// Note: This could cause thrashing between two streams. On the other hand, it reduces
// fragmentation by coalescing.
stream_free_blocks_[stream].insert(blocks.begin(), blocks.end());

// remove this stream from the freelist
stream_free_blocks_.erase(blocks_stream);
if (b.is_valid()) {
// If we found a block associated with a different stream, we have to insert a wait on the
// stream's associated event into the allocating stream.
if (stream_event != blocks_event) {
stream_free_blocks_[stream_event].insert(blocks.begin(), blocks.end());
stream_free_blocks_.erase(blocks_event);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It feels like there is a potential optimization here. We're merging two sorted std::lists and coalescing adjacent blocks. If we ignored the fact that we're coalescing blocks, you could just use std::list::merge .

The current implementation is O(m*n) as it requires doing a linear search of the destination list for every element in the source list. It seems like we should be able to make that be O(m + n).

At the very least, I think we should add a free_list::merge(free_list&& other) function and use that here. Then we can explore doing something more optimal that exploits the fact that both lists are already sorted by pointer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One option would be to just use std::list::merge and ignore coalescing, and then do a second pass through the merged list and coalesce any adjacent blocks. But that operation likely can be fused with a custom merge algorithm.

harrism marked this conversation as resolved.
Show resolved Hide resolved

// TODO: could eliminate this ifdef and have the same behavior for PTDS and non-PTDS
// But the cudaEventRecord() on every free_block reduces performance significantly
#ifdef CUDA_API_PER_THREAD_DEFAULT_STREAM
RMM_CUDA_TRY(cudaStreamWaitEvent(stream, blocks_event, 0));
remove_event(blocks_event); // only removes non-default-stream-events
#else
RMM_CUDA_TRY(cudaStreamSynchronize(event_streams_[blocks_event]));
#endif
}
}

return b;
}

/**
* @brief Find an available block in the pool of at least `size` bytes, for use on `stream`.
*
* Attempts to find a free block that was last used on `stream` to avoid synchronization. If none
* is available, it finds a block last used on another stream. In this case, the stream associated
* with the found block is synchronized to ensure all asynchronous work on the memory is finished
* before it is used on `stream`.
* Attempts to find a free block that was last used on `stream` to avoid synchronization. If
* none is available, it finds a block last used on another stream. In this case, the stream
* associated with the found block is synchronized to ensure all asynchronous work on the memory
* is finished before it is used on `stream`.
*
* @param size The size of the requested allocation, in bytes.
* @param stream The stream on which the allocation will be used.
* @return block A block with non-null pointer and size >= `size`.
*/
block available_larger_block(size_t size, cudaStream_t stream)
block available_larger_block(size_t size, cudaStream_t stream, cudaEvent_t event)
{
// Try to find a larger block in free list for the same stream
auto iter = stream_free_blocks_.find(stream);
auto iter = stream_free_blocks_.find(event);
if (iter != stream_free_blocks_.end()) {
block b = block_from_stream(iter->second, stream, size, stream);
block b = block_from_stream(iter->second, event, size, stream, event);
if (b.is_valid()) return b;
}

// nothing in this stream's free list, look for one on another stream
auto s = stream_free_blocks_.begin();
while (s != stream_free_blocks_.end()) {
if (s->first != stream) {
block b = block_from_stream(s->second, s->first, size, stream);
if (s->first != event) {
block b = block_from_stream(s->second, s->first, size, stream, event);
if (b.is_valid()) return b;
}
++s;
Expand All @@ -198,13 +204,13 @@ class pool_memory_resource final : public device_memory_resource {
* @param stream The stream on which the allocation will be used.
* @return void* The pointer to the allocated memory.
*/
void* allocate_from_block(block const& b, size_t size, cudaStream_t stream)
void* allocate_from_block(block const& b, size_t size, cudaEvent_t event)
{
block const alloc{b.pointer(), size, b.is_head()};

if (b.size() > size) {
block rest{b.pointer() + size, b.size() - size, false};
stream_free_blocks_[stream].insert(rest);
stream_free_blocks_[event].insert(rest);
}

allocated_blocks_.insert(alloc);
Expand All @@ -224,9 +230,19 @@ class pool_memory_resource final : public device_memory_resource {

auto const i = allocated_blocks_.find(static_cast<char*>(p));
assert(i != allocated_blocks_.end());
assert(i->size == rmm::detail::align_up(size, allocation_alignment));
assert(i->size() == rmm::detail::align_up(size, allocation_alignment));

cudaEvent_t event = get_event(stream);

stream_free_blocks_[stream].insert(*i);
// TODO: cudaEventRecord has significant overhead on deallocations, however it could mean less
// synchronization. So we need to test in real non-PTDS applications that have multiple streams
// whether or not the overhead is worth it
#ifdef CUDA_API_PER_THREAD_DEFAULT_STREAM
auto result = cudaEventRecord(event, stream);
assert(cudaSuccess == result);
#endif

stream_free_blocks_[event].insert(*i);
allocated_blocks_.erase(i);
}

Expand Down Expand Up @@ -287,6 +303,7 @@ class pool_memory_resource final : public device_memory_resource {
for (auto b : upstream_blocks_)
upstream_mr_->deallocate(b.pointer(), b.size());
upstream_blocks_.clear();
// TODO empty free lists and allocated blocks
current_pool_size_ = 0;
}

Expand All @@ -306,7 +323,7 @@ class pool_memory_resource final : public device_memory_resource {

for (auto h : upstream_blocks_) {
h.print();
upstream_total += h.size;
upstream_total += h.size();
}
std::cout << "total upstream: " << upstream_total << " B\n";

Expand Down Expand Up @@ -338,9 +355,10 @@ class pool_memory_resource final : public device_memory_resource {
void* do_allocate(std::size_t bytes, cudaStream_t stream) override
{
if (bytes <= 0) return nullptr;
bytes = rmm::detail::align_up(bytes, allocation_alignment);
block const b = available_larger_block(bytes, stream);
return allocate_from_block(b, bytes, stream);
bytes = rmm::detail::align_up(bytes, allocation_alignment);
cudaEvent_t event = get_event(stream);
block const b = available_larger_block(bytes, stream, event);
return allocate_from_block(b, bytes, event);
}

/**
Expand Down Expand Up @@ -378,12 +396,84 @@ class pool_memory_resource final : public device_memory_resource {

// map of [stream_id, free_list] pairs
// stream stream_id must be synced before allocating from this list to a different stream
std::map<cudaStream_t, free_list> stream_free_blocks_;
// std::map<cudaStream_t, free_list> stream_free_blocks_;
std::map<cudaEvent_t, free_list> stream_free_blocks_;

std::set<block, rmm::mr::detail::compare_blocks<block>> allocated_blocks_;

// blocks allocated from upstream: so they can be easily freed
std::vector<block> upstream_blocks_;

#ifdef CUDA_API_PER_THREAD_DEFAULT_STREAM
/**
 * @brief RAII wrapper owning one thread-local CUDA event plus a process-unique id.
 *
 * On construction the id is registered in the externally-owned `ids` set and a
 * timing-disabled CUDA event is created; on destruction both are undone. The set
 * reference must outlive this object (it is owned by the enclosing resource).
 */
struct unique_event {
  /**
   * @brief Create a timing-disabled event and register a fresh id in `ids`.
   *
   * @param ids Set of live event ids; held by reference, must outlive `*this`.
   */
  explicit unique_event(std::set<id_type>& ids) : id(next_id()), ids_(ids)
  {
    ids_.insert(id);
    auto const result = cudaEventCreateWithFlags(&event, cudaEventDisableTiming);
    assert(cudaSuccess == result);
    (void)result;  // avoid unused-variable warning when NDEBUG disables assert
  }

  // Copy or move would double-destroy the CUDA event and leave a stale id in
  // ids_, so ownership is strictly non-transferable.
  unique_event(unique_event const&) = delete;
  unique_event& operator=(unique_event const&) = delete;
  unique_event(unique_event&&)                 = delete;
  unique_event& operator=(unique_event&&) = delete;

  /// Deregister the id and destroy the owned CUDA event.
  ~unique_event()
  {
    ids_.erase(id);
    auto const result = cudaEventDestroy(event);
    assert(cudaSuccess == result);
    (void)result;  // avoid unused-variable warning when NDEBUG disables assert
  }

  /// @return The process-unique id assigned to this event.
  id_type get_id() const noexcept { return id; }
  /// @return The owned CUDA event handle.
  cudaEvent_t get_event() const noexcept { return event; }

 private:
  /// Monotonically increasing counter; atomic so concurrent thread-local
  /// construction on multiple threads yields distinct ids.
  static id_type next_id()
  {
    static std::atomic<id_type> s_id{};
    return ++s_id;
  }

  id_type id;               // unique id, assigned first (declaration order)
  cudaEvent_t event;        // owned CUDA event, created in the ctor body
  std::set<id_type>& ids_;  // reference to external set owned by the pool resource
};
#endif

cudaEvent_t get_event(cudaStream_t stream)
harrism marked this conversation as resolved.
Show resolved Hide resolved
{
#ifdef CUDA_API_PER_THREAD_DEFAULT_STREAM
if (stream == 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to do something other than check against 0 for the default stream. Someone could pass cudaStreamPerThread explicitly, which is different from 0.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, thought about that. Need to check for either 0 or cudaStreamPerThread, good point.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

thread_local unique_event e{ids_};
return e.get_event();
} else
#endif
{
auto iter = stream_events_.find(stream);
if (iter == stream_events_.end()) {
// create event
cudaEvent_t event{};
auto result = cudaEventCreateWithFlags(&event, cudaEventDisableTiming);
assert(cudaSuccess == result);
stream_events_[stream] = event;
event_streams_[event] = stream;
return event;
} else {
return iter->second;
}
}
}

/**
 * @brief Destroy `event` and drop its stream bookkeeping, if it is tracked.
 *
 * Only events created for explicit (non-default) streams appear in
 * `event_streams_`; untracked (default-stream/thread-local) events are left
 * untouched.
 *
 * @param event The CUDA event to destroy and untrack.
 */
void remove_event(cudaEvent_t event)
{
  auto const found = event_streams_.find(event);
  if (found == event_streams_.end()) { return; }  // not a tracked non-default-stream event

  auto const owning_stream = found->second;
  RMM_CUDA_TRY(cudaEventDestroy(event));
  // Remove both directions of the stream <-> event mapping.
  event_streams_.erase(found);
  stream_events_.erase(owning_stream);
}

#ifdef CUDA_API_PER_THREAD_DEFAULT_STREAM
std::set<id_type> ids_;
#endif
std::unordered_map<cudaStream_t, cudaEvent_t> stream_events_;
std::unordered_map<cudaEvent_t, cudaStream_t> event_streams_;
};

} // namespace mr
Expand Down