Skip to content

Commit

Permalink
Create CUDA events for CUDAProduct only when needed (#292)
Browse files Browse the repository at this point in the history
Do not create event if the CUDA stream is idle, i.e. has already
finished all work that was queued, at the point when data products are
wrapped/emplaced for/to edm::Event.

When creating an event, create only a single event per producer, i.e.
all products of a producer share the same event.

Include a unit test checking the assumed behaviour of CUDA events and
streams.
  • Loading branch information
makortel authored and fwyzard committed Mar 21, 2019
1 parent 0fe0b5e commit 4b36e2a
Show file tree
Hide file tree
Showing 11 changed files with 199 additions and 48 deletions.
4 changes: 2 additions & 2 deletions CUDADataFormats/Common/interface/CUDAProduct.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ class CUDAProduct: public CUDAProductBase {
friend class CUDAScopedContext;
friend class edm::Wrapper<CUDAProduct<T>>;

explicit CUDAProduct(int device, std::shared_ptr<cuda::stream_t<>> stream, T data):
CUDAProductBase(device, std::move(stream)),
explicit CUDAProduct(int device, std::shared_ptr<cuda::stream_t<>> stream, std::shared_ptr<cuda::event_t> event, T data):
CUDAProductBase(device, std::move(stream), std::move(event)),
data_(std::move(data))
{}

Expand Down
7 changes: 4 additions & 3 deletions CUDADataFormats/Common/interface/CUDAProductBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,19 @@ class CUDAProductBase {
CUDAProductBase() = default; // Needed only for ROOT dictionary generation

bool isValid() const { return stream_.get() != nullptr; }
bool isAvailable() const;

int device() const { return device_; }

const cuda::stream_t<>& stream() const { return *stream_; }
cuda::stream_t<>& stream() { return *stream_; }
const std::shared_ptr<cuda::stream_t<>>& streamPtr() const { return stream_; }

const cuda::event_t& event() const { return *event_; }
cuda::event_t& event() { return *event_; }
const cuda::event_t *event() const { return event_.get(); }
cuda::event_t *event() { return event_.get(); }

protected:
explicit CUDAProductBase(int device, std::shared_ptr<cuda::stream_t<>> stream);
explicit CUDAProductBase(int device, std::shared_ptr<cuda::stream_t<>> stream, std::shared_ptr<cuda::event_t> event);

private:
// The cuda::stream_t is really shared among edm::Event products, so
Expand Down
20 changes: 10 additions & 10 deletions CUDADataFormats/Common/src/CUDAProductBase.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@
#include "FWCore/ServiceRegistry/interface/Service.h"
#include "HeterogeneousCore/CUDAServices/interface/CUDAService.h"

CUDAProductBase::CUDAProductBase(int device, std::shared_ptr<cuda::stream_t<>> stream):
CUDAProductBase::CUDAProductBase(int device, std::shared_ptr<cuda::stream_t<>> stream, std::shared_ptr<cuda::event_t> event):
stream_(std::move(stream)),
event_(std::move(event)),
device_(device)
{
edm::Service<CUDAService> cs;
event_ = cs->getCUDAEvent();
{}

// Record CUDA event to the CUDA stream. The event will become
// "occurred" after all work queued to the stream before this
// point has been finished.
event_->record(stream_->id());
bool CUDAProductBase::isAvailable() const {
  // A product without an event means the CUDA stream had already
  // finished all queued work when the product was put into the
  // edm::Event, i.e. the data was ready at the end of the producer's
  // produce(). Otherwise the data is ready once the event has occurred.
  return not event_ or event_->has_occurred();
}


14 changes: 10 additions & 4 deletions CUDADataFormats/Common/test/test_CUDAProduct.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,15 @@ namespace cudatest {
class TestCUDAScopedContext {
public:
static
CUDAScopedContext make(int dev) {
CUDAScopedContext make(int dev, bool createEvent) {
auto device = cuda::device::get(dev);
return CUDAScopedContext(dev, std::make_unique<cuda::stream_t<>>(device.create_stream(cuda::stream::implicitly_synchronizes_with_default_stream)));
std::unique_ptr<cuda::event_t> event;
if(createEvent) {
event = std::make_unique<cuda::event_t>(device.create_event());
}
return CUDAScopedContext(dev,
std::make_unique<cuda::stream_t<>>(device.create_stream(cuda::stream::implicitly_synchronizes_with_default_stream)),
std::move(event));
}
};
}
Expand All @@ -30,15 +36,15 @@ TEST_CASE("Use of CUDAProduct template", "[CUDACore]") {

constexpr int defaultDevice = 0;
{
auto ctx = cudatest::TestCUDAScopedContext::make(defaultDevice);
auto ctx = cudatest::TestCUDAScopedContext::make(defaultDevice, true);
std::unique_ptr<CUDAProduct<int>> dataPtr = ctx.wrap(10);
auto& data = *dataPtr;

SECTION("Construct from CUDAScopedContext") {
REQUIRE(data.isValid());
REQUIRE(data.device() == defaultDevice);
REQUIRE(data.stream().id() == ctx.stream().id());
REQUIRE(&data.event() != nullptr);
REQUIRE(data.event() != nullptr);
}

SECTION("Move constructor") {
Expand Down
18 changes: 11 additions & 7 deletions HeterogeneousCore/CUDACore/interface/CUDAScopedContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class CUDAScopedContext {

template <typename T>
const T& get(const CUDAProduct<T>& data) {
synchronizeStreams(data.device(), data.stream(), data.event());
synchronizeStreams(data.device(), data.stream(), data.isAvailable(), data.event());
return data.data_;
}

Expand All @@ -78,32 +78,36 @@ class CUDAScopedContext {
}

template <typename T>
std::unique_ptr<CUDAProduct<T> > wrap(T data) const {
std::unique_ptr<CUDAProduct<T> > wrap(T data) {
createEventIfStreamBusy();
// make_unique doesn't work because of private constructor
//
// The CUDA event (if one was created) is recorded to the CUDA stream
// in the CUDAScopedContext destructor; the event will become
// "occurred" after all work queued to the stream before that point
// has finished.
return std::unique_ptr<CUDAProduct<T> >(new CUDAProduct<T>(device(), streamPtr(), std::move(data)));
return std::unique_ptr<CUDAProduct<T> >(new CUDAProduct<T>(device(), streamPtr(), event_, std::move(data)));
}

template <typename T, typename... Args>
auto emplace(edm::Event& iEvent, edm::EDPutTokenT<T> token, Args&&... args) const {
return iEvent.emplace(token, device(), streamPtr(), std::forward<Args>(args)...);
auto emplace(edm::Event& iEvent, edm::EDPutTokenT<T> token, Args&&... args) {
createEventIfStreamBusy();
return iEvent.emplace(token, device(), streamPtr(), event_, std::forward<Args>(args)...);
}

private:
friend class cudatest::TestCUDAScopedContext;

// This constructor is only meant for testing
explicit CUDAScopedContext(int device, std::unique_ptr<cuda::stream_t<>> stream);
explicit CUDAScopedContext(int device, std::unique_ptr<cuda::stream_t<>> stream, std::unique_ptr<cuda::event_t> event);

void synchronizeStreams(int dataDevice, const cuda::stream_t<>& dataStream, const cuda::event_t& dataEvent);
void createEventIfStreamBusy();
void synchronizeStreams(int dataDevice, const cuda::stream_t<>& dataStream, bool available, const cuda::event_t *dataEvent);

int currentDevice_;
std::optional<edm::WaitingTaskWithArenaHolder> waitingTaskHolder_;
cuda::device::current::scoped_override_t<> setDeviceForThisScope_;
std::shared_ptr<cuda::stream_t<>> stream_;
std::shared_ptr<cuda::event_t> event_;
};

#endif
22 changes: 17 additions & 5 deletions HeterogeneousCore/CUDACore/src/CUDAScopedContext.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@ CUDAScopedContext::CUDAScopedContext(edm::StreamID streamID):
stream_ = cs->getCUDAStream();
}

CUDAScopedContext::CUDAScopedContext(int device, std::unique_ptr<cuda::stream_t<>> stream):
CUDAScopedContext::CUDAScopedContext(int device, std::unique_ptr<cuda::stream_t<>> stream, std::unique_ptr<cuda::event_t> event):
currentDevice_(device),
setDeviceForThisScope_(device),
stream_(std::move(stream))
stream_(std::move(stream)),
event_(std::move(event))
{}

CUDAScopedContext::~CUDAScopedContext() {
if(event_) {
event_->record(stream_->id());
}
if(waitingTaskHolder_.has_value()) {
stream_->enqueue.callback([device=currentDevice_,
waitingTaskHolder=*waitingTaskHolder_]
Expand All @@ -45,7 +49,15 @@ CUDAScopedContext::~CUDAScopedContext() {
}
}

void CUDAScopedContext::synchronizeStreams(int dataDevice, const cuda::stream_t<>& dataStream, const cuda::event_t& dataEvent) {
void CUDAScopedContext::createEventIfStreamBusy() {
  // Create at most one event per context (shared by all products of
  // the producer), and only when the stream still has queued work; an
  // idle stream means consumers need no synchronization at all.
  if(not event_ and not stream_->is_clear()) {
    edm::Service<CUDAService> cs;
    event_ = cs->getCUDAEvent();
  }
}

void CUDAScopedContext::synchronizeStreams(int dataDevice, const cuda::stream_t<>& dataStream, bool available, const cuda::event_t *dataEvent) {
if(dataDevice != currentDevice_) {
// Eventually replace with prefetch to current device (assuming unified memory works)
// If we won't go to unified memory, need to figure out something else...
Expand All @@ -54,13 +66,13 @@ void CUDAScopedContext::synchronizeStreams(int dataDevice, const cuda::stream_t<

if(dataStream.id() != stream_->id()) {
// Different streams, need to synchronize
if(!dataEvent.has_occurred()) {
if(not available and not dataEvent->has_occurred()) {
// Event not yet occurred, so need to add synchronization
// here. Synchronization is done by making the CUDA stream
// wait for an event, so all subsequent work in the stream
// will run only after the event has "occurred" (i.e. data
// product became available).
auto ret = cudaStreamWaitEvent(stream_->id(), dataEvent.id(), 0);
auto ret = cudaStreamWaitEvent(stream_->id(), dataEvent->id(), 0);
cuda::throw_if_error(ret, "Failed to make a stream to wait for an event");
}
}
Expand Down
6 changes: 5 additions & 1 deletion HeterogeneousCore/CUDACore/test/BuildFile.xml
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
<bin file="test*.cc test*.cu" name="testHeterogeneousCoreCUDACore">
<bin file="test_*.cc test_*.cu" name="testHeterogeneousCoreCUDACore">
<use name="FWCore/TestProcessor"/>
<use name="HeterogeneousCore/CUDACore"/>
<use name="catch2"/>
<use name="cuda"/>
</bin>
<bin file="testStreamEvent.cu" name="testHeterogeneousCoreCUDACoreStreamEvent">
<use name="HeterogeneousCore/CUDAUtilities"/>
<use name="cuda"/>
</bin>
117 changes: 117 additions & 0 deletions HeterogeneousCore/CUDACore/test/testStreamEvent.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/**
 * The purpose of this test program is to verify that the logic for
 * CUDA event use in CUDAProduct and CUDAScopedContext is correct.
*/

#include <iostream>
#include <memory>
#include <type_traits>
#include <chrono>
#include <thread>
#include <cassert>

#include <cuda_runtime.h>

#include "HeterogeneousCore/CUDAUtilities/interface/exitSansCUDADevices.h"

namespace {
constexpr int ARRAY_SIZE = 20000000;
constexpr int NLOOPS = 10;
}

// Deliberately slow kernel: repeats a grid-stride sweep over the array
// NLOOPS times so the stream stays busy long enough for the host-side
// stream/event queries in main() to observe it as "not ready".
__global__ void kernel_looping(float* point, unsigned int num) {
  const unsigned int firstElement = threadIdx.x + blockIdx.x * blockDim.x;
  const unsigned int stride = gridDim.x * blockDim.x;

  for(int pass = 0; pass < NLOOPS; ++pass) {
    for(size_t element = firstElement; element < num; element += stride) {
      point[element] += 1;
    }
  }
}

// Exercises the CUDA stream/event semantics that CUDAProduct and
// CUDAScopedContext rely on: an event recorded on a busy stream is not
// "occurred" until the preceding work finishes, an idle stream reports
// cudaSuccess from cudaStreamQuery(), and cudaStreamWaitEvent() makes a
// second stream busy until the awaited event occurs.
int main() {
  // Exit (without failing) on hosts that have no CUDA device.
  exitSansCUDADevices();

  constexpr bool debug = false;

  float *dev_points1;
  float *host_points1;
  cudaStream_t stream1, stream2;
  cudaEvent_t event1, event2;

  // NOTE(review): CUDA API return codes are not checked here; failures
  // would only surface indirectly through the asserts below.
  cudaMalloc(&dev_points1, ARRAY_SIZE * sizeof(float));
  cudaMallocHost(&host_points1, ARRAY_SIZE * sizeof(float));
  cudaStreamCreateWithFlags(&stream1, cudaStreamNonBlocking);
  cudaStreamCreateWithFlags(&stream2, cudaStreamNonBlocking);
  cudaEventCreate(&event1);
  cudaEventCreate(&event2);

  for (size_t j = 0; j < ARRAY_SIZE; ++j) {
    host_points1[j] = static_cast<float>(j);
  }

  // Queue enough work on stream1 (large H2D copy + slow kernel) that it
  // is still busy when queried below.
  cudaMemcpyAsync(dev_points1, host_points1,
                  ARRAY_SIZE * sizeof(float),
                  cudaMemcpyHostToDevice, stream1);
  kernel_looping<<<1, 16, 0, stream1>>>(dev_points1, ARRAY_SIZE);
  if(debug) std::cout << "Kernel launched on stream1" << std::endl;

  // An event recorded on a busy stream must not have occurred yet.
  auto status = cudaStreamQuery(stream1);
  if(debug) std::cout << "Stream1 busy? " << (status == cudaErrorNotReady) << " idle? " << (status == cudaSuccess) << std::endl;
  cudaEventRecord(event1, stream1);
  status = cudaEventQuery(event1);
  if (debug) std::cout << "Event1 recorded? " << (status == cudaErrorNotReady) << " occurred? " << (status == cudaSuccess) << std::endl;
  assert(status == cudaErrorNotReady);

  // stream2 has no queued work, so it must report idle.
  status = cudaStreamQuery(stream2);
  if(debug) std::cout << "Stream2 busy? " << (status == cudaErrorNotReady) << " idle? " << (status == cudaSuccess) << std::endl;
  assert(status == cudaSuccess);
  if(debug) {
    // Informational only: an event recorded on an idle stream occurs
    // (nearly) immediately.
    cudaEventRecord(event2, stream2);
    status = cudaEventQuery(event2);
    std::cout << "Event2 recorded? " << (status == cudaErrorNotReady) << " occurred? " << (status == cudaSuccess) << std::endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
    status = cudaEventQuery(event2);
    std::cout << "Event2 recorded? " << (status == cudaErrorNotReady) << " occurred? " << (status == cudaSuccess) << std::endl;
  }

  // Making stream2 wait on event1 (still pending on busy stream1) must
  // turn stream2 busy, and an event recorded on it afterwards must not
  // occur before event1 does.
  cudaStreamWaitEvent(stream2, event1, 0);
  if(debug) std::cout << "\nStream2 waiting for event1" << std::endl;
  status = cudaStreamQuery(stream2);
  if(debug) std::cout << "Stream2 busy? " << (status == cudaErrorNotReady) << " idle? " << (status == cudaSuccess) << std::endl;
  assert(status == cudaErrorNotReady);
  cudaEventRecord(event2, stream2);
  status = cudaEventQuery(event2);
  if(debug) std::cout << "Event2 recorded? " << (status == cudaErrorNotReady) << " occurred? " << (status == cudaSuccess) << std::endl;
  assert(status == cudaErrorNotReady);
  if(debug) {
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
    status = cudaEventQuery(event2);
    std::cout << "Event2 recorded? " << (status == cudaErrorNotReady) << " occurred? " << (status == cudaSuccess) << std::endl;
  }

  // stream1 should still be grinding through the kernel; drain it.
  status = cudaStreamQuery(stream1);
  if(debug) {
    std::cout << "\nStream1 busy? " << (status == cudaErrorNotReady) << " idle? " << (status == cudaSuccess) << std::endl;
    std::cout << "Synchronizing stream1" << std::endl;
  }
  assert(status == cudaErrorNotReady);
  cudaStreamSynchronize(stream1);
  if(debug) std::cout << "Synchronized stream1" << std::endl;

  // Once stream1 has drained, event1 has occurred, and therefore so has
  // event2 (stream2 was only waiting on event1).
  status = cudaEventQuery(event1);
  if(debug) std::cout << "Event1 recorded? " << (status == cudaErrorNotReady) << " occurred? " << (status == cudaSuccess) << std::endl;
  assert(status == cudaSuccess);
  status = cudaEventQuery(event2);
  if(debug) std::cout << "Event2 recorded? " << (status == cudaErrorNotReady) << " occurred? " << (status == cudaSuccess) << std::endl;
  assert(status == cudaSuccess);

  cudaFree(dev_points1);
  cudaFreeHost(host_points1);
  cudaStreamDestroy(stream1);
  cudaStreamDestroy(stream2);
  cudaEventDestroy(event1);
  cudaEventDestroy(event2);

  return 0;
}
27 changes: 17 additions & 10 deletions HeterogeneousCore/CUDACore/test/test_CUDAScopedContext.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,22 @@ namespace cudatest {
class TestCUDAScopedContext {
public:
static
CUDAScopedContext make(int dev) {
CUDAScopedContext make(int dev, bool createEvent) {
auto device = cuda::device::get(dev);
return CUDAScopedContext(dev, std::make_unique<cuda::stream_t<>>(device.create_stream(cuda::stream::implicitly_synchronizes_with_default_stream)));
std::unique_ptr<cuda::event_t> event;
if(createEvent) {
event = std::make_unique<cuda::event_t>(device.create_event());
}
return CUDAScopedContext(dev,
std::make_unique<cuda::stream_t<>>(device.create_stream(cuda::stream::implicitly_synchronizes_with_default_stream)),
std::move(event));
}
};
}

namespace {
std::unique_ptr<CUDAProduct<int *> > produce(int device, int *d, int *h) {
auto ctx = cudatest::TestCUDAScopedContext::make(device);
auto ctx = cudatest::TestCUDAScopedContext::make(device, true);

cuda::memory::async::copy(d, h, sizeof(int), ctx.stream().id());
testCUDAScopedContextKernels_single(d, ctx.stream());
Expand All @@ -33,7 +39,7 @@ TEST_CASE("Use of CUDAScopedContext", "[CUDACore]") {

constexpr int defaultDevice = 0;
{
auto ctx = cudatest::TestCUDAScopedContext::make(defaultDevice);
auto ctx = cudatest::TestCUDAScopedContext::make(defaultDevice, true);

SECTION("Construct from device ID") {
REQUIRE(cuda::device::current::get().id() == defaultDevice);
Expand Down Expand Up @@ -75,7 +81,7 @@ TEST_CASE("Use of CUDAScopedContext", "[CUDACore]") {
cuda::device::current::scoped_override_t<> setDeviceForThisScope(defaultDevice);
auto current_device = cuda::device::current::get();

// Mimick a producer on the second CUDA stream
// Mimic a producer on the first CUDA stream
int h_a1 = 1;
auto d_a1 = cuda::memory::device::make_unique<int>(current_device);
auto wprod1 = produce(defaultDevice, d_a1.get(), &h_a1);
Expand All @@ -90,13 +96,14 @@ TEST_CASE("Use of CUDAScopedContext", "[CUDACore]") {
// Mimic a third producer "joining" the two streams
CUDAScopedContext ctx2{*wprod1};

auto prod1 = ctx.get(*wprod1);
auto prod2 = ctx.get(*wprod2);
auto prod1 = ctx2.get(*wprod1);
auto prod2 = ctx2.get(*wprod2);

auto d_a3 = cuda::memory::device::make_unique<int>(current_device);
testCUDAScopedContextKernels_join(prod1, prod2, d_a3.get(), ctx.stream());
ctx.stream().synchronize();
REQUIRE(wprod2->event().has_occurred());
testCUDAScopedContextKernels_join(prod1, prod2, d_a3.get(), ctx2.stream());
ctx2.stream().synchronize();
REQUIRE(wprod2->isAvailable());
REQUIRE(wprod2->event()->has_occurred());

h_a1 = 0;
h_a2 = 0;
Expand Down
Loading

0 comments on commit 4b36e2a

Please sign in to comment.