Reland Diff for fbgemm::all_to_one to reduce CUDAEvents costs #1962

Closed

66 changes: 47 additions & 19 deletions fbgemm_gpu/src/merge_pooled_embeddings_gpu.cpp
@@ -132,8 +132,23 @@ void all_to_one(
at::Device target_device,
bool skip_if_same_device) {
auto num_gpus = at::cuda::getNumGPUs();
- std::vector<at::cuda::CUDAEvent> copy_begin_events(num_gpus);
- std::vector<at::cuda::CUDAEvent> copy_completion_events(num_gpus);
+ // Create static thread local CUDAEvent as creating/destroying CUDAEvents
+ // can be expensive. In one call to this function, we use events created on
+ // the target device. Since target device can be different across each call,
+ // we store all the events in a 2-dimensional vector.
+ using PerDeviceEventList = std::vector<at::cuda::CUDAEvent>;
+ static thread_local std::vector<PerDeviceEventList> copy_begin_events;
+ static thread_local std::vector<PerDeviceEventList> copy_completion_events;
+ static thread_local std::once_flag flag1;
+ std::call_once(flag1, [num_gpus]() {
+ for (auto i = 0; i < num_gpus; i++) {
+ copy_begin_events.push_back(PerDeviceEventList(num_gpus));
+ copy_completion_events.push_back(PerDeviceEventList(num_gpus));
+ }
+ });
+
+ auto target_device_index = target_device.index();
+ TORCH_CHECK(target_device_index < num_gpus && target_device_index >= 0);

std::vector<TwoHopTransferContainer> two_hop_transfers;
two_hop_transfers.reserve(input_tensors.size());
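The hunk above replaces per-call event construction with a lazily built, per-thread cache of events indexed as [target_device][source_device]. Below is a minimal standalone sketch of that caching pattern, not the PR's literal code; the helper name cached_event and the exact include paths are illustrative assumptions.

#include <mutex>
#include <vector>
#include <ATen/cuda/CUDAContext.h> // at::cuda::getNumGPUs
#include <ATen/cuda/CUDAEvent.h>   // at::cuda::CUDAEvent

// Hypothetical helper: returns the cached event used when copying from
// `src_device` to `target_device` on the calling thread. The table is built
// exactly once per thread, so repeated calls reuse events instead of
// creating and destroying them.
static at::cuda::CUDAEvent& cached_event(int target_device, int src_device) {
  using PerDeviceEventList = std::vector<at::cuda::CUDAEvent>;
  static thread_local std::vector<PerDeviceEventList> events;
  static thread_local std::once_flag once;
  std::call_once(once, [] {
    const auto num_gpus = at::cuda::getNumGPUs();
    for (auto i = 0; i < num_gpus; ++i) {
      events.push_back(PerDeviceEventList(num_gpus));
    }
  });
  return events[target_device][src_device];
}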
@@ -146,7 +161,7 @@ void all_to_one(
const auto& src = input_tensors.at(i);
Node src_device_id = src.get_device();
auto intermediate_node =
- intermediate_nodes(src_device_id, target_device.index());
+ intermediate_nodes(src_device_id, target_device_index);
if (intermediate_node != -1) {
two_hop_transfers.push_back(
{.intermediate_tensor = at::empty(
@@ -199,9 +214,9 @@
// write-after-read dependencies on the destination side are handled, so
// that no one is operating on the dst memory when we perform the copy.
// src waits on dst barrier (src already waits on src)
- auto& dst_ready = copy_begin_events[device_id];
+ auto& dst_ready = copy_begin_events[target_device_index][device_id];
device_guard.set_device(target_device);
- dst_ready.record(at::cuda::getCurrentCUDAStream(target_device.index()));
+ dst_ready.record(at::cuda::getCurrentCUDAStream(target_device_index));
device_guard.set_device(src_device);
dst_ready.block(copy_stream);
for (const auto i : c10::irange(input_tensors.size())) {
Expand Down Expand Up @@ -248,9 +263,9 @@ void all_to_one(
// wait on first hop transfer
two_hop_transfer.transfer_cuda_event->block(copy_stream);
// synchronize with target rank
- auto& dst_ready = copy_begin_events[src_device_id];
+ auto& dst_ready = copy_begin_events[target_device_index][src_device_id];
device_guard.set_device(target_device);
- dst_ready.record(at::cuda::getCurrentCUDAStream(target_device.index()));
+ dst_ready.record(at::cuda::getCurrentCUDAStream(target_device_index));
device_guard.set_device(src_device);
dst_ready.block(copy_stream);
// originating tensor output position
@@ -276,7 +291,7 @@
auto& dst = output_tensors[i];
// single device memcpy, note that src_device == dst_device.
at::cuda::CUDAStream copy_stream =
- at::cuda::getCurrentCUDAStream(target_device.index());
+ at::cuda::getCurrentCUDAStream(target_device_index);
AT_CUDA_CHECK(cudaMemcpy2DAsync(
dst.data_ptr(),
dst.stride(0) * dst.element_size(),
@@ -292,18 +307,18 @@

// wait for cross-device copies to complete.
for (const auto device_id : c10::irange(num_gpus)) {
- if (device_id != target_device.index()) {
+ if (device_id != target_device_index) {
auto src_device = at::Device(at::kCUDA, device_id);
// Still on src_device, record stream event
at::cuda::CUDAGuard device_guard(src_device);
at::cuda::CUDAStream copy_stream =
at::cuda::getCurrentCUDAStream(device_id);

- auto& src_ready = copy_completion_events[device_id];
+ auto& src_ready = copy_completion_events[target_device_index][device_id];
src_ready.record(copy_stream);

device_guard.set_device(target_device);
- src_ready.block(at::cuda::getCurrentCUDAStream(target_device.index()));
+ src_ready.block(at::cuda::getCurrentCUDAStream(target_device_index));
}
}
AT_CUDA_CHECK(cudaGetLastError());
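Taken together, the hunks above keep the original ordering guarantees while reusing cached events: a copy_begin event makes the source-side copy stream wait for the destination stream (so nothing is still touching the destination memory when the copy starts), and a copy_completion event makes the destination stream wait for the copy to finish. A minimal sketch of that record/block handshake follows; it is not FBGEMM's exact code, and the function name ordered_p2p_copy, its signature, and the include paths are illustrative assumptions.

#include <cuda_runtime.h>
#include <ATen/cuda/CUDAContext.h> // at::cuda::getCurrentCUDAStream
#include <ATen/cuda/CUDAEvent.h>
#include <ATen/cuda/Exceptions.h>  // AT_CUDA_CHECK
#include <c10/cuda/CUDAGuard.h>

// Hypothetical helper: copy `nbytes` from `src` (on src_device) to `dst`
// (on target_device) on the source-side stream, ordered by two cached events.
void ordered_p2p_copy(
    void* dst,
    const void* src,
    size_t nbytes,
    at::Device src_device,
    at::Device target_device,
    at::cuda::CUDAEvent& dst_ready,
    at::cuda::CUDAEvent& src_ready) {
  c10::cuda::CUDAGuard device_guard(src_device);
  auto copy_stream = at::cuda::getCurrentCUDAStream(src_device.index());

  // Source waits for the destination stream: no pending work may still be
  // reading or writing the destination memory when the copy starts.
  device_guard.set_device(target_device);
  dst_ready.record(at::cuda::getCurrentCUDAStream(target_device.index()));
  device_guard.set_device(src_device);
  dst_ready.block(copy_stream);

  // Issue the cross-device copy on the source-side stream.
  AT_CUDA_CHECK(cudaMemcpyAsync(dst, src, nbytes, cudaMemcpyDefault, copy_stream));

  // Destination waits for the copy stream before consuming the result.
  src_ready.record(copy_stream);
  device_guard.set_device(target_device);
  src_ready.block(at::cuda::getCurrentCUDAStream(target_device.index()));
}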
@@ -313,7 +328,21 @@ Tensor sum_reduce_to_one(
std::vector<Tensor> input_tensors,
at::Device target_device) {
auto num_gpus = at::cuda::getNumGPUs();
- std::vector<at::cuda::CUDAEvent> copy_completion_events(num_gpus);
+ // Create static thread local CUDAEvent as creating/destroying CUDAEvents
+ // can be expensive. In one call to this function, we use events created on
+ // the target device. Since target device can be different across each call,
+ // we store all the events in a 2-dimensional vector.
+ using PerDeviceEventList = std::vector<at::cuda::CUDAEvent>;
+ static thread_local std::vector<PerDeviceEventList> copy_completion_events;
+ static thread_local std::once_flag flag1;
+ std::call_once(flag1, [num_gpus]() {
+ for (auto i = 0; i < num_gpus; i++) {
+ copy_completion_events.push_back(PerDeviceEventList(num_gpus));
+ }
+ });
+
+ auto target_device_index = target_device.index();
+ TORCH_CHECK(target_device_index < num_gpus && target_device_index >= 0);

// Local reduction for tensors residing on the same GPU.
// And if there's a tensor already in target device, use it for output tensor.
Expand Down Expand Up @@ -348,7 +377,7 @@ Tensor sum_reduce_to_one(
continue;
}
auto intermediate_node =
- intermediate_nodes(src.get_device(), target_device.index());
+ intermediate_nodes(src.get_device(), target_device_index);
if (intermediate_node == -1) {
continue;
}
@@ -377,8 +406,7 @@ Tensor sum_reduce_to_one(

// Wait for cross-device copies to complete, then reduce
for (const auto device_id : c10::irange(num_gpus)) {
- auto intermediate_node =
- intermediate_nodes(device_id, target_device.index());
+ auto intermediate_node = intermediate_nodes(device_id, target_device_index);
if (intermediate_node == -1) {
continue;
}
@@ -390,7 +418,7 @@ Tensor sum_reduce_to_one(
at::cuda::CUDAStream copy_stream =
at::cuda::getCurrentCUDAStream(device_id);

- auto& src_ready = copy_completion_events[device_id];
+ auto& src_ready = copy_completion_events[target_device_index][device_id];
src_ready.record(copy_stream);

device_guard.set_device(intermediate_device);
@@ -448,18 +476,18 @@ Tensor sum_reduce_to_one(

// Wait for cross-device copies to complete, then reduce
for (const auto device_id : c10::irange(num_gpus)) {
- if (device_id != target_device.index()) {
+ if (device_id != target_device_index) {
auto src_device = at::Device(at::kCUDA, device_id);
// Still on src_device, record stream event
at::cuda::CUDAGuard device_guard(src_device);
at::cuda::CUDAStream copy_stream =
at::cuda::getCurrentCUDAStream(device_id);

- auto& src_ready = copy_completion_events[device_id];
+ auto& src_ready = copy_completion_events[target_device_index][device_id];
src_ready.record(copy_stream);

device_guard.set_device(target_device);
- src_ready.block(at::cuda::getCurrentCUDAStream(target_device.index()));
+ src_ready.block(at::cuda::getCurrentCUDAStream(target_device_index));

for (const auto i : c10::irange(input_tensors.size())) {
auto& src = input_tensors[i];