From 11e1dcc3196e8ef2a9c4050dff63e88e9b61c356 Mon Sep 17 00:00:00 2001 From: "Ma, Guokai" Date: Thu, 21 Sep 2023 00:39:35 -0700 Subject: [PATCH 1/4] use parallel version of memcpy --- csrc/cpu/comm/ccl.cpp | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/csrc/cpu/comm/ccl.cpp b/csrc/cpu/comm/ccl.cpp index 7fbf5f61392e..2446b66c3d0f 100644 --- a/csrc/cpu/comm/ccl.cpp +++ b/csrc/cpu/comm/ccl.cpp @@ -499,6 +499,17 @@ void all_reduce_caching(torch::Tensor& data, .wait()); } +static void parallel_memcpy(void* to, void* from, size_t n_bytes) + __attribute__((target("avx512bw"))); +static void parallel_memcpy(void* to, void* from, size_t n_bytes) +{ + #pragma omp parallel for + for (int i=0;i Date: Thu, 21 Sep 2023 01:49:18 -0700 Subject: [PATCH 2/4] include max buf size to 16MB per rank --- csrc/cpu/comm/ccl.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/csrc/cpu/comm/ccl.cpp b/csrc/cpu/comm/ccl.cpp index 2446b66c3d0f..b57f5cb91ea0 100644 --- a/csrc/cpu/comm/ccl.cpp +++ b/csrc/cpu/comm/ccl.cpp @@ -70,7 +70,7 @@ void shared_close(SharedData* data) // SHM based allreduce helper functions // buffer that holds shm name #define NAME_BUF_SIZE 1000 -#define MAX_BUF_SIZE 1048576 +#define MAX_BUF_SIZE 1048576*16 #define SHM_BUFFER_NAME "deepspeed_allreduce_buffer" SharedData allreduce_buffer; struct allreduce_workspace { From da784b9eca5a051ff2bd3ffe544ad88cd4f7cdd9 Mon Sep 17 00:00:00 2001 From: "Ma, Guokai" Date: Thu, 21 Sep 2023 05:10:26 -0700 Subject: [PATCH 3/4] support any input buffer size --- csrc/cpu/comm/ccl.cpp | 72 +++++++++++++++++++++++-------------------- 1 file changed, 38 insertions(+), 34 deletions(-) diff --git a/csrc/cpu/comm/ccl.cpp b/csrc/cpu/comm/ccl.cpp index b57f5cb91ea0..e2b2789ab778 100644 --- a/csrc/cpu/comm/ccl.cpp +++ b/csrc/cpu/comm/ccl.cpp @@ -70,7 +70,7 @@ void shared_close(SharedData* data) // SHM based allreduce helper functions // buffer that holds shm name #define NAME_BUF_SIZE 1000 -#define MAX_BUF_SIZE 1048576*16 +#define MAX_BUF_SIZE 1048576 #define SHM_BUFFER_NAME "deepspeed_allreduce_buffer" SharedData allreduce_buffer; struct allreduce_workspace { @@ -528,7 +528,7 @@ void inference_all_reduce(torch::Tensor& data, py::object op, py::object group, default: data_type_fallback = true; } - if (data_size > MAX_BUF_SIZE || data_type_fallback || + if (data_type_fallback || (data_size % VECTOR_LENGTH_IN_BYTES) != 0 || !all_ranks_local_p) { // fallback to oneccl allreduce CCLCHECK(ccl::allreduce(data.data_ptr(), @@ -541,42 +541,46 @@ void inference_all_reduce(torch::Tensor& data, py::object op, py::object group, return; } - auto data_ptr = data.data_ptr(); + for (int offset = 0; offset < data_size; offset+=MAX_BUF_SIZE) { + auto data_ptr = ((char*)(data.data_ptr())+offset); + size_t chunk_size = data_size - offset > MAX_BUF_SIZE ? MAX_BUF_SIZE : data_size - offset; + size_t chunk_el = chunk_size/(data_size/numel); - parallel_memcpy(workspace[world_rank].buffer, data_ptr, data_size); - std::atomic_thread_fence(std::memory_order_release); - workspace[world_rank].state = coll_allreduce_naive__copy_in_done; + parallel_memcpy(workspace[world_rank].buffer, data_ptr, chunk_size); + std::atomic_thread_fence(std::memory_order_release); + workspace[world_rank].state = coll_allreduce_naive__copy_in_done; - if (world_rank == 0) { - // compute allreduce result on rank 0 - for (int i = 1; i < world_size; i++) { - // wait until the other rank copy the buffer - wait_buffer_state_until(i, coll_allreduce_naive__copy_in_done); + if (world_rank == 0) { + // compute allreduce result on rank 0 + for (int i = 1; i < world_size; i++) { + // wait until the other rank copy the buffer + wait_buffer_state_until(i, coll_allreduce_naive__copy_in_done); + } + reduce_all_buffers(workspace, chunk_el, data.scalar_type(), world_size); + std::atomic_thread_fence(std::memory_order_release); + workspace[world_rank].state = coll_allreduce_naive__reduce_done; + parallel_memcpy(data_ptr, workspace[0].buffer, chunk_size); } - reduce_all_buffers(workspace, numel, data.scalar_type(), world_size); - std::atomic_thread_fence(std::memory_order_release); - workspace[world_rank].state = coll_allreduce_naive__reduce_done; - parallel_memcpy(data_ptr, workspace[0].buffer, data_size); - } - if (world_rank != 0) { - wait_buffer_state_until(0, coll_allreduce_naive__reduce_done); - parallel_memcpy(data_ptr, workspace[0].buffer, data_size); - std::atomic_thread_fence(std::memory_order_release); - workspace[world_rank].state = coll_allreduce_naive__copy_out_done; - } - if (world_rank == 0) { - for (int i = 1; i < world_size; i++) { - wait_buffer_state_until(i, coll_allreduce_naive__copy_out_done); + if (world_rank != 0) { + wait_buffer_state_until(0, coll_allreduce_naive__reduce_done); + parallel_memcpy(data_ptr, workspace[0].buffer, chunk_size); + std::atomic_thread_fence(std::memory_order_release); + workspace[world_rank].state = coll_allreduce_naive__copy_out_done; + } + if (world_rank == 0) { + for (int i = 1; i < world_size; i++) { + wait_buffer_state_until(i, coll_allreduce_naive__copy_out_done); + } + std::atomic_thread_fence(std::memory_order_release); + workspace[world_rank].state = coll_begin; + } + if (world_rank != 0) { + // if rank 0 spin too fast it could be in state 1 of next allreduce + // in this case wait_buffer_state_until(0, 0) may cause deadlock + // what we are certain is when rank 0 finishes the state won't be 2 + wait_buffer_state_until_not(0, coll_allreduce_naive__reduce_done); + workspace[world_rank].state = coll_begin; } - std::atomic_thread_fence(std::memory_order_release); - workspace[world_rank].state = coll_begin; - } - if (world_rank != 0) { - // if rank 0 spin too fast it could be in state 1 of next allreduce - // in this case wait_buffer_state_until(0, 0) may cause deadlock - // what we are certain is when rank 0 finishes the state won't be 2 - wait_buffer_state_until_not(0, coll_allreduce_naive__reduce_done); - workspace[world_rank].state = coll_begin; } } From e4ac57cc7fcae3c8362658cfb744db6aa4966094 Mon Sep 17 00:00:00 2001 From: "Ma, Guokai" Date: Wed, 27 Sep 2023 00:56:39 +0000 Subject: [PATCH 4/4] fix format error --- csrc/cpu/comm/ccl.cpp | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/csrc/cpu/comm/ccl.cpp b/csrc/cpu/comm/ccl.cpp index e2b2789ab778..accf431f6929 100644 --- a/csrc/cpu/comm/ccl.cpp +++ b/csrc/cpu/comm/ccl.cpp @@ -503,8 +503,8 @@ static void parallel_memcpy(void* to, void* from, size_t n_bytes) __attribute__((target("avx512bw"))); static void parallel_memcpy(void* to, void* from, size_t n_bytes) { - #pragma omp parallel for - for (int i=0;i MAX_BUF_SIZE ? MAX_BUF_SIZE : data_size - offset; - size_t chunk_el = chunk_size/(data_size/numel); + size_t chunk_el = chunk_size / (data_size / numel); parallel_memcpy(workspace[world_rank].buffer, data_ptr, chunk_size); std::atomic_thread_fence(std::memory_order_release);