From dd7064f8e5ea3adaaa41344948316a26ac645c49 Mon Sep 17 00:00:00 2001
From: Banit Agrawal
Date: Tue, 22 Aug 2023 11:36:25 -0700
Subject: [PATCH] Reland Diff for fbgemm::all_to_one to reduce CUDAEvents costs (#1962)

Summary:
Pull Request resolved: https://github.com/pytorch/FBGEMM/pull/1962

Differential Revision: D48405576

fbshipit-source-id: 0f5fa56844dec0b172465dbd313b514996c53b50
---
 .../src/merge_pooled_embeddings_gpu.cpp | 66 +++++++++++++------
 1 file changed, 47 insertions(+), 19 deletions(-)

diff --git a/fbgemm_gpu/src/merge_pooled_embeddings_gpu.cpp b/fbgemm_gpu/src/merge_pooled_embeddings_gpu.cpp
index 60dd21029d..fdd6ea6cc9 100644
--- a/fbgemm_gpu/src/merge_pooled_embeddings_gpu.cpp
+++ b/fbgemm_gpu/src/merge_pooled_embeddings_gpu.cpp
@@ -132,8 +132,23 @@ void all_to_one(
     at::Device target_device,
     bool skip_if_same_device) {
   auto num_gpus = at::cuda::getNumGPUs();
-  std::vector<at::cuda::CUDAEvent> copy_begin_events(num_gpus);
-  std::vector<at::cuda::CUDAEvent> copy_completion_events(num_gpus);
+  // Create static thread_local CUDAEvents, as creating/destroying CUDAEvents
+  // can be expensive. In one call to this function, we use events created on
+  // the target device. Since the target device can differ across calls, we
+  // store all the events in a 2-dimensional vector.
+  using PerDeviceEventList = std::vector<at::cuda::CUDAEvent>;
+  static thread_local std::vector<PerDeviceEventList> copy_begin_events;
+  static thread_local std::vector<PerDeviceEventList> copy_completion_events;
+  static thread_local std::once_flag flag1;
+  std::call_once(flag1, [num_gpus]() {
+    for (auto i = 0; i < num_gpus; i++) {
+      copy_begin_events.push_back(PerDeviceEventList(num_gpus));
+      copy_completion_events.push_back(PerDeviceEventList(num_gpus));
+    }
+  });
+
+  auto target_device_index = target_device.index();
+  TORCH_CHECK(target_device_index < num_gpus && target_device_index >= 0);
 
   std::vector<TwoHopTransferContainer> two_hop_transfers;
   two_hop_transfers.reserve(input_tensors.size());
@@ -146,7 +161,7 @@ void all_to_one(
     const auto& src = input_tensors.at(i);
     Node src_device_id = src.get_device();
     auto intermediate_node =
-        intermediate_nodes(src_device_id, target_device.index());
+        intermediate_nodes(src_device_id, target_device_index);
     if (intermediate_node != -1) {
       two_hop_transfers.push_back(
           {.intermediate_tensor = at::empty(
@@ -199,9 +214,9 @@ void all_to_one(
     // write-after-read dependencies on the destination side are handled, so
     // that no one is operating on the dst memory when we perform the copy.
     // src waits on dst barrier (src already waits on src)
-    auto& dst_ready = copy_begin_events[device_id];
+    auto& dst_ready = copy_begin_events[target_device_index][device_id];
     device_guard.set_device(target_device);
-    dst_ready.record(at::cuda::getCurrentCUDAStream(target_device.index()));
+    dst_ready.record(at::cuda::getCurrentCUDAStream(target_device_index));
     device_guard.set_device(src_device);
     dst_ready.block(copy_stream);
     for (const auto i : c10::irange(input_tensors.size())) {
@@ -248,9 +263,9 @@ void all_to_one(
     // wait on first hop transfer
     two_hop_transfer.transfer_cuda_event->block(copy_stream);
     // synchronize with target rank
-    auto& dst_ready = copy_begin_events[src_device_id];
+    auto& dst_ready = copy_begin_events[target_device_index][src_device_id];
     device_guard.set_device(target_device);
-    dst_ready.record(at::cuda::getCurrentCUDAStream(target_device.index()));
+    dst_ready.record(at::cuda::getCurrentCUDAStream(target_device_index));
     device_guard.set_device(src_device);
     dst_ready.block(copy_stream);
     // originating tensor output position
@@ -276,7 +291,7 @@ void all_to_one(
       auto& dst = output_tensors[i];
       // single device memcpy, not that src_device == dst_device.
       at::cuda::CUDAStream copy_stream =
-          at::cuda::getCurrentCUDAStream(target_device.index());
+          at::cuda::getCurrentCUDAStream(target_device_index);
       AT_CUDA_CHECK(cudaMemcpy2DAsync(
           dst.data_ptr(),
           dst.stride(0) * dst.element_size(),
@@ -292,18 +307,18 @@
 
   // wait for cross-device copies to complete.
   for (const auto device_id : c10::irange(num_gpus)) {
-    if (device_id != target_device.index()) {
+    if (device_id != target_device_index) {
       auto src_device = at::Device(at::kCUDA, device_id);
       // Still on src_device, record stream event
       at::cuda::CUDAGuard device_guard(src_device);
       at::cuda::CUDAStream copy_stream =
           at::cuda::getCurrentCUDAStream(device_id);
 
-      auto& src_ready = copy_completion_events[device_id];
+      auto& src_ready = copy_completion_events[target_device_index][device_id];
       src_ready.record(copy_stream);
 
       device_guard.set_device(target_device);
-      src_ready.block(at::cuda::getCurrentCUDAStream(target_device.index()));
+      src_ready.block(at::cuda::getCurrentCUDAStream(target_device_index));
     }
   }
   AT_CUDA_CHECK(cudaGetLastError());
@@ -313,7 +328,21 @@ Tensor sum_reduce_to_one(
     std::vector<Tensor> input_tensors,
     at::Device target_device) {
   auto num_gpus = at::cuda::getNumGPUs();
-  std::vector<at::cuda::CUDAEvent> copy_completion_events(num_gpus);
+  // Create static thread_local CUDAEvents, as creating/destroying CUDAEvents
+  // can be expensive. In one call to this function, we use events created on
+  // the target device. Since the target device can differ across calls, we
+  // store all the events in a 2-dimensional vector.
+  using PerDeviceEventList = std::vector<at::cuda::CUDAEvent>;
+  static thread_local std::vector<PerDeviceEventList> copy_completion_events;
+  static thread_local std::once_flag flag1;
+  std::call_once(flag1, [num_gpus]() {
+    for (auto i = 0; i < num_gpus; i++) {
+      copy_completion_events.push_back(PerDeviceEventList(num_gpus));
+    }
+  });
+
+  auto target_device_index = target_device.index();
+  TORCH_CHECK(target_device_index < num_gpus && target_device_index >= 0);
 
   // Local reduction for tensors residing the same GPU.
   // And if there's a tensor already in target device, use it for output tensor.
@@ -348,7 +377,7 @@ Tensor sum_reduce_to_one(
       continue;
     }
     auto intermediate_node =
-        intermediate_nodes(src.get_device(), target_device.index());
+        intermediate_nodes(src.get_device(), target_device_index);
     if (intermediate_node == -1) {
       continue;
     }
@@ -377,8 +406,7 @@ Tensor sum_reduce_to_one(
 
   // Wait for cross-device copies to complete, then reduce
   for (const auto device_id : c10::irange(num_gpus)) {
-    auto intermediate_node =
-        intermediate_nodes(device_id, target_device.index());
+    auto intermediate_node = intermediate_nodes(device_id, target_device_index);
     if (intermediate_node == -1) {
       continue;
     }
@@ -390,7 +418,7 @@ Tensor sum_reduce_to_one(
       at::cuda::CUDAStream copy_stream =
           at::cuda::getCurrentCUDAStream(device_id);
 
-      auto& src_ready = copy_completion_events[device_id];
+      auto& src_ready = copy_completion_events[target_device_index][device_id];
       src_ready.record(copy_stream);
 
       device_guard.set_device(intermediate_device);
@@ -448,18 +476,18 @@ Tensor sum_reduce_to_one(
 
   // Wait for cross-device copies to complete, then reduce
   for (const auto device_id : c10::irange(num_gpus)) {
-    if (device_id != target_device.index()) {
+    if (device_id != target_device_index) {
       auto src_device = at::Device(at::kCUDA, device_id);
       // Still on src_device, record stream event
       at::cuda::CUDAGuard device_guard(src_device);
       at::cuda::CUDAStream copy_stream =
           at::cuda::getCurrentCUDAStream(device_id);
 
-      auto& src_ready = copy_completion_events[device_id];
+      auto& src_ready = copy_completion_events[target_device_index][device_id];
      src_ready.record(copy_stream);
 
       device_guard.set_device(target_device);
-      src_ready.block(at::cuda::getCurrentCUDAStream(target_device.index()));
+      src_ready.block(at::cuda::getCurrentCUDAStream(target_device_index));
 
       for (const auto i : c10::irange(input_tensors.size())) {
         auto& src = input_tensors[i];
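
Note (not part of the patch): the change above caches CUDAEvents in a `static thread_local` two-dimensional table, built once per thread via `std::once_flag` and indexed as `[target_device][source_device]`, so events are no longer constructed and destroyed on every call. The sketch below is a minimal standalone illustration of that pattern under stated assumptions: `Event` is a dummy stand-in for `at::cuda::CUDAEvent`, and the hypothetical constant `kNumGpus` stands in for `at::cuda::getNumGPUs()`. It is not FBGEMM code, just the caching idiom in isolation.

```cpp
// Minimal sketch of the per-thread event-cache pattern introduced by the patch.
// "Event" and "kNumGpus" are illustrative stand-ins, not real CUDA/ATen types.
#include <iostream>
#include <mutex>
#include <vector>

struct Event {
  Event() { std::cout << "Event constructed\n"; }  // expensive in real CUDA
};

constexpr int kNumGpus = 4;  // hypothetical device count

// Returns the cached event for a (target, source) device pair. The table is
// built exactly once per thread; later calls reuse the same Event objects.
Event& get_copy_event(int target_device, int src_device) {
  using PerDeviceEventList = std::vector<Event>;
  static thread_local std::vector<PerDeviceEventList> events;
  static thread_local std::once_flag flag;
  std::call_once(flag, []() {
    for (int i = 0; i < kNumGpus; ++i) {
      // One row per possible target device, one entry per source device.
      events.push_back(PerDeviceEventList(kNumGpus));
    }
  });
  return events[target_device][src_device];
}

int main() {
  // All events are constructed on the first call; subsequent lookups are cheap.
  get_copy_event(0, 1);
  get_copy_event(2, 3);
  return 0;
}
```

Because the table is `thread_local`, each thread owns its events outright, so no lock is needed when recording or blocking on them; `std::call_once` only guards the one-time construction, mirroring how the patch keeps separate event rows for every possible target device.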