From 7170ef3c44916ab91e05049889e034ddf6594fc0 Mon Sep 17 00:00:00 2001 From: Hui Zhou Date: Mon, 21 Mar 2022 16:49:39 -0500 Subject: [PATCH 1/8] mpl: add MPL_gpu_launch_hostfn We only implemented the cuda version for now, which simply calls cudaLaunchHostFunc. --- src/mpl/include/mpl_gpu.h | 3 +++ src/mpl/include/mpl_gpu_cuda.h | 2 ++ src/mpl/include/mpl_gpu_fallback.h | 2 ++ src/mpl/include/mpl_gpu_hip.h | 2 ++ src/mpl/include/mpl_gpu_ze.h | 2 ++ src/mpl/src/gpu/mpl_gpu_cuda.c | 7 +++++++ src/mpl/src/gpu/mpl_gpu_fallback.c | 5 +++++ src/mpl/src/gpu/mpl_gpu_hip.c | 6 ++++++ src/mpl/src/gpu/mpl_gpu_ze.c | 5 +++++ 9 files changed, 34 insertions(+) diff --git a/src/mpl/include/mpl_gpu.h b/src/mpl/include/mpl_gpu.h index ea94f2900b2..f207be0bf05 100644 --- a/src/mpl/include/mpl_gpu.h +++ b/src/mpl/include/mpl_gpu.h @@ -81,4 +81,7 @@ int MPL_gpu_get_buffer_bounds(const void *ptr, void **pbase, uintptr_t * len); int MPL_gpu_free_hook_register(void (*free_hook) (void *dptr)); int MPL_gpu_get_dev_count(int *dev_cnt, int *dev_id); +typedef void (*MPL_gpu_hostfn) (void *data); +int MPL_gpu_launch_hostfn(MPL_gpu_stream_t stream, MPL_gpu_hostfn fn, void *data); + #endif /* ifndef MPL_GPU_H_INCLUDED */ diff --git a/src/mpl/include/mpl_gpu_cuda.h b/src/mpl/include/mpl_gpu_cuda.h index ddf8bc3935c..e79c6cf2552 100644 --- a/src/mpl/include/mpl_gpu_cuda.h +++ b/src/mpl/include/mpl_gpu_cuda.h @@ -13,6 +13,8 @@ typedef cudaIpcMemHandle_t MPL_gpu_ipc_mem_handle_t; typedef int MPL_gpu_device_handle_t; typedef struct cudaPointerAttributes MPL_gpu_device_attr; typedef cudaStream_t MPL_gpu_stream_t; + +#define MPL_GPU_STREAM_DEFAULT 0 #define MPL_GPU_DEVICE_INVALID -1 #endif /* ifndef MPL_GPU_CUDA_H_INCLUDED */ diff --git a/src/mpl/include/mpl_gpu_fallback.h b/src/mpl/include/mpl_gpu_fallback.h index 59d3172c68f..f937c0f619b 100644 --- a/src/mpl/include/mpl_gpu_fallback.h +++ b/src/mpl/include/mpl_gpu_fallback.h @@ -10,6 +10,8 @@ typedef int MPL_gpu_ipc_mem_handle_t; typedef int MPL_gpu_device_handle_t; typedef int MPL_gpu_device_attr; /* dummy type */ typedef int MPL_gpu_stream_t; + +#define MPL_GPU_STREAM_DEFAULT 0 #define MPL_GPU_DEVICE_INVALID -1 #endif /* ifndef MPL_GPU_CUDA_H_INCLUDED */ diff --git a/src/mpl/include/mpl_gpu_hip.h b/src/mpl/include/mpl_gpu_hip.h index 49ea83d422b..f71d38570d6 100644 --- a/src/mpl/include/mpl_gpu_hip.h +++ b/src/mpl/include/mpl_gpu_hip.h @@ -13,6 +13,8 @@ typedef hipIpcMemHandle_t MPL_gpu_ipc_mem_handle_t; typedef int MPL_gpu_device_handle_t; typedef struct hipPointerAttribute_t MPL_gpu_device_attr; typedef hipStream_t MPL_gpu_stream_t; + +#define MPL_GPU_STREAM_DEFAULT 0 #define MPL_GPU_DEVICE_INVALID -1 #endif /* ifndef MPL_GPU_HIP_H_INCLUDED */ diff --git a/src/mpl/include/mpl_gpu_ze.h b/src/mpl/include/mpl_gpu_ze.h index eb615e4d3ef..5736a5ac7eb 100644 --- a/src/mpl/include/mpl_gpu_ze.h +++ b/src/mpl/include/mpl_gpu_ze.h @@ -18,6 +18,8 @@ typedef ze_device_handle_t MPL_gpu_device_handle_t; typedef ze_alloc_attr_t MPL_gpu_device_attr; /* FIXME: implement ze stream */ typedef int MPL_gpu_stream_t; + +#define MPL_GPU_STREAM_DEFAULT 0 #define MPL_GPU_DEVICE_INVALID NULL #endif /* ifndef MPL_GPU_ZE_H_INCLUDED */ diff --git a/src/mpl/src/gpu/mpl_gpu_cuda.c b/src/mpl/src/gpu/mpl_gpu_cuda.c index 346de7e0dda..b40d42679b3 100644 --- a/src/mpl/src/gpu/mpl_gpu_cuda.c +++ b/src/mpl/src/gpu/mpl_gpu_cuda.c @@ -409,3 +409,10 @@ cudaError_t CUDARTAPI cudaFree(void *dptr) result = sys_cudaFree(dptr); return result; } + +int MPL_gpu_launch_hostfn(cudaStream_t stream, MPL_gpu_hostfn fn, void *data) +{ + cudaError_t result; + result = cudaLaunchHostFunc(stream, fn, data); + return result; +} diff --git a/src/mpl/src/gpu/mpl_gpu_fallback.c b/src/mpl/src/gpu/mpl_gpu_fallback.c index 8b50faee152..ec46e0f6719 100644 --- a/src/mpl/src/gpu/mpl_gpu_fallback.c +++ b/src/mpl/src/gpu/mpl_gpu_fallback.c @@ -97,3 +97,8 @@ int MPL_gpu_free_hook_register(void (*free_hook) (void *dptr)) { return MPL_SUCCESS; } + +int MPL_gpu_launch_hostfn(int stream, MPL_gpu_hostfn fn, void *data) +{ + return -1; +} diff --git a/src/mpl/src/gpu/mpl_gpu_hip.c b/src/mpl/src/gpu/mpl_gpu_hip.c index 10619f4529d..de92be1f491 100644 --- a/src/mpl/src/gpu/mpl_gpu_hip.c +++ b/src/mpl/src/gpu/mpl_gpu_hip.c @@ -367,4 +367,10 @@ hipError_t hipFree(void *dptr) result = sys_hipFree(dptr); return result; } + +int MPL_gpu_launch_hostfn(hipStream_t stream, MPL_gpu_hostfn fn, void *data) +{ + return -1; +} + #endif /* MPL_HAVE_HIP */ diff --git a/src/mpl/src/gpu/mpl_gpu_ze.c b/src/mpl/src/gpu/mpl_gpu_ze.c index d912fa2784c..7b5e5117107 100644 --- a/src/mpl/src/gpu/mpl_gpu_ze.c +++ b/src/mpl/src/gpu/mpl_gpu_ze.c @@ -367,4 +367,9 @@ int MPL_gpu_free_hook_register(void (*free_hook) (void *dptr)) return MPL_SUCCESS; } +int MPL_gpu_launch_hostfn(int stream, MPL_gpu_hostfn fn, void *data) +{ + return -1; +} + #endif /* MPL_HAVE_ZE */ From 0c441581e06120623a22dcf6f4cd7c7b13ea37c4 Mon Sep 17 00:00:00 2001 From: Hui Zhou Date: Mon, 25 Apr 2022 14:55:27 -0500 Subject: [PATCH 2/8] mpl: add MPL_gpu_stream_is_valid Add a wrapper to validate the raw gpu stream value so we can alert the user as early as we can. --- src/mpl/include/mpl_gpu.h | 1 + src/mpl/src/gpu/mpl_gpu_cuda.c | 10 ++++++++++ src/mpl/src/gpu/mpl_gpu_fallback.c | 5 +++++ src/mpl/src/gpu/mpl_gpu_hip.c | 5 +++++ src/mpl/src/gpu/mpl_gpu_ze.c | 5 +++++ 5 files changed, 26 insertions(+) diff --git a/src/mpl/include/mpl_gpu.h b/src/mpl/include/mpl_gpu.h index f207be0bf05..54b387d6930 100644 --- a/src/mpl/include/mpl_gpu.h +++ b/src/mpl/include/mpl_gpu.h @@ -83,5 +83,6 @@ int MPL_gpu_get_dev_count(int *dev_cnt, int *dev_id); typedef void (*MPL_gpu_hostfn) (void *data); int MPL_gpu_launch_hostfn(MPL_gpu_stream_t stream, MPL_gpu_hostfn fn, void *data); +bool MPL_gpu_stream_is_valid(MPL_gpu_stream_t stream); #endif /* ifndef MPL_GPU_H_INCLUDED */ diff --git a/src/mpl/src/gpu/mpl_gpu_cuda.c b/src/mpl/src/gpu/mpl_gpu_cuda.c index b40d42679b3..894c2b2cdb2 100644 --- a/src/mpl/src/gpu/mpl_gpu_cuda.c +++ b/src/mpl/src/gpu/mpl_gpu_cuda.c @@ -416,3 +416,13 @@ int MPL_gpu_launch_hostfn(cudaStream_t stream, MPL_gpu_hostfn fn, void *data) result = cudaLaunchHostFunc(stream, fn, data); return result; } + +bool MPL_gpu_stream_is_valid(MPL_gpu_stream_t stream) +{ + cudaError_t result; + /* CUDA may blindly dereference the stream as pointer, which may segfault + * if the wrong value is passed in. This is still better than segfault later + * upon using the stream. */ + result = cudaStreamQuery(stream); + return (result != cudaErrorInvalidResourceHandle); +} diff --git a/src/mpl/src/gpu/mpl_gpu_fallback.c b/src/mpl/src/gpu/mpl_gpu_fallback.c index ec46e0f6719..c5f42e8e6ae 100644 --- a/src/mpl/src/gpu/mpl_gpu_fallback.c +++ b/src/mpl/src/gpu/mpl_gpu_fallback.c @@ -102,3 +102,8 @@ int MPL_gpu_launch_hostfn(int stream, MPL_gpu_hostfn fn, void *data) { return -1; } + +bool MPL_gpu_stream_is_valid(MPL_gpu_stream_t stream) +{ + return false; +} diff --git a/src/mpl/src/gpu/mpl_gpu_hip.c b/src/mpl/src/gpu/mpl_gpu_hip.c index de92be1f491..28ed2963e7c 100644 --- a/src/mpl/src/gpu/mpl_gpu_hip.c +++ b/src/mpl/src/gpu/mpl_gpu_hip.c @@ -373,4 +373,9 @@ int MPL_gpu_launch_hostfn(hipStream_t stream, MPL_gpu_hostfn fn, void *data) return -1; } +bool MPL_gpu_stream_is_valid(MPL_gpu_stream_t stream) +{ + return false; +} + #endif /* MPL_HAVE_HIP */ diff --git a/src/mpl/src/gpu/mpl_gpu_ze.c b/src/mpl/src/gpu/mpl_gpu_ze.c index 7b5e5117107..e5aa5d11176 100644 --- a/src/mpl/src/gpu/mpl_gpu_ze.c +++ b/src/mpl/src/gpu/mpl_gpu_ze.c @@ -372,4 +372,9 @@ int MPL_gpu_launch_hostfn(int stream, MPL_gpu_hostfn fn, void *data) return -1; } +bool MPL_gpu_stream_is_valid(MPL_gpu_stream_t stream) +{ + return false; +} + #endif /* MPL_HAVE_ZE */ From 4f3bfaab989fc7134c3de58ad5fdf0d591909cd0 Mon Sep 17 00:00:00 2001 From: Hui Zhou Date: Mon, 25 Apr 2022 14:46:11 -0500 Subject: [PATCH 3/8] stream: better error handling for bad gpu stream infohint Validate the gpu stream value right away in MPIX_Stream_create. If we go ahead with an invalid value, it will segfault later in cudaAPI in a rather mysterious way. --- src/mpi/errhan/errnames.txt | 2 ++ src/mpi/stream/stream_impl.c | 5 ++++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/mpi/errhan/errnames.txt b/src/mpi/errhan/errnames.txt index 8ee0d7418cb..45433a292ee 100644 --- a/src/mpi/errhan/errnames.txt +++ b/src/mpi/errhan/errnames.txt @@ -955,6 +955,8 @@ is too big (> MPIU_SHMW_GHND_SZ) **ch4nostream:No streams available. Configure --enable-thread-cs=per-vci and --with-ch4-max-vcis=# to enable streams. **outofstream:No streams available. Use MPIR_CVAR_CH4_RESERVE_VCIS to reserve the number of streams can be allocated. **cannotfreestream:The stream is still in use, cannot be freed. +**missinggpustream:Info hint 'type' is set, but info hint 'value' is missing. +**invalidgpustream:Info hint 'type' is set, but info hint 'value' is invalid. # ----------------------------------------------------------------------------- # The following names are defined but not used (see the -careful option diff --git a/src/mpi/stream/stream_impl.c b/src/mpi/stream/stream_impl.c index 2078d136b2b..06cb06405ae 100644 --- a/src/mpi/stream/stream_impl.c +++ b/src/mpi/stream/stream_impl.c @@ -60,10 +60,13 @@ int MPIR_Stream_create_impl(MPIR_Info * info_ptr, MPIR_Stream ** p_stream_ptr) /* TODO: proper conversion for each gpu stream type */ const char *s_value = MPIR_Info_lookup(info_ptr, "value"); - MPIR_Assertp(s_value); + MPIR_ERR_CHKANDJUMP(!s_value, mpi_errno, MPI_ERR_OTHER, "**missinggpustream"); + mpi_errno = MPIR_Info_decode_hex(s_value, &stream_ptr->u.gpu_stream, sizeof(MPL_gpu_stream_t)); MPIR_ERR_CHECK(mpi_errno); + MPIR_ERR_CHKANDJUMP(!MPL_gpu_stream_is_valid(stream_ptr->u.gpu_stream), + mpi_errno, MPI_ERR_OTHER, "**invalidgpustream"); } else { stream_ptr->type = MPIR_STREAM_GENERAL; } From 236fa12552a7636e0f2dad000231f8a731d8e123 Mon Sep 17 00:00:00 2001 From: Hui Zhou Date: Mon, 21 Mar 2022 18:55:09 -0500 Subject: [PATCH 4/8] datatype/typerep: add MPIR_Typerep_pack_stream Add MPIR_Typerep_pack_stream and MPIR_Typerep_unpack_stream as wrappers for yaksa_pack_stream and yaksa_unpack_stream. --- src/include/mpir_typerep.h | 7 +++ .../typerep/src/typerep_dataloop_pack.c | 22 +++++++++ .../datatype/typerep/src/typerep_yaksa_pack.c | 46 +++++++++++++++++++ 3 files changed, 75 insertions(+) diff --git a/src/include/mpir_typerep.h b/src/include/mpir_typerep.h index 0d4b9ecb202..0eaeae0608f 100644 --- a/src/include/mpir_typerep.h +++ b/src/include/mpir_typerep.h @@ -91,4 +91,11 @@ int MPIR_Typerep_op(void *source_buf, MPI_Aint source_count, MPI_Datatype source bool source_is_packed, int mapped_device); int MPIR_Typerep_reduce(const void *in_buf, void *out_buf, MPI_Aint count, MPI_Datatype datatype, MPI_Op op); + +int MPIR_Typerep_pack_stream(const void *inbuf, MPI_Aint incount, MPI_Datatype datatype, + MPI_Aint inoffset, void *outbuf, MPI_Aint max_pack_bytes, + MPI_Aint * actual_pack_bytes, void *stream); +int MPIR_Typerep_unpack_stream(const void *inbuf, MPI_Aint insize, + void *outbuf, MPI_Aint outcount, MPI_Datatype datatype, + MPI_Aint outoffset, MPI_Aint * actual_unpack_bytes, void *stream); #endif /* MPIR_TYPEREP_H_INCLUDED */ diff --git a/src/mpi/datatype/typerep/src/typerep_dataloop_pack.c b/src/mpi/datatype/typerep/src/typerep_dataloop_pack.c index 7e7b306f9d5..66bfee61254 100644 --- a/src/mpi/datatype/typerep/src/typerep_dataloop_pack.c +++ b/src/mpi/datatype/typerep/src/typerep_dataloop_pack.c @@ -210,3 +210,25 @@ int MPIR_Typerep_reduce(const void *in_buf, void *out_buf, MPI_Aint count, MPI_D return mpi_errno; } + +int MPIR_Typerep_pack_stream(const void *inbuf, MPI_Aint incount, MPI_Datatype datatype, + MPI_Aint inoffset, void *outbuf, MPI_Aint max_pack_bytes, + MPI_Aint * actual_pack_bytes, void *stream) +{ + int mpi_errno = MPI_SUCCESS; + + MPIR_Assert(0); + + return mpi_errno; +} + +int MPIR_Typerep_unpack_stream(const void *inbuf, MPI_Aint insize, void *outbuf, + MPI_Aint outcount, MPI_Datatype datatype, MPI_Aint outoffset, + MPI_Aint * actual_unpack_bytes, void *stream) +{ + int mpi_errno = MPI_SUCCESS; + + MPIR_Assert(0); + + return mpi_errno; +} diff --git a/src/mpi/datatype/typerep/src/typerep_yaksa_pack.c b/src/mpi/datatype/typerep/src/typerep_yaksa_pack.c index 1c6e6585355..dbe546e6cde 100644 --- a/src/mpi/datatype/typerep/src/typerep_yaksa_pack.c +++ b/src/mpi/datatype/typerep/src/typerep_yaksa_pack.c @@ -579,3 +579,49 @@ static int typerep_op_pack(void *source_buf, void *target_buf, MPI_Aint count, fn_fail: goto fn_exit; } + +int MPIR_Typerep_pack_stream(const void *inbuf, MPI_Aint incount, MPI_Datatype datatype, + MPI_Aint inoffset, void *outbuf, MPI_Aint max_pack_bytes, + MPI_Aint * actual_pack_bytes, void *stream) +{ + MPIR_FUNC_ENTER; + + int mpi_errno = MPI_SUCCESS; + int rc; + + yaksa_type_t type = MPII_Typerep_get_yaksa_type(datatype); + uintptr_t packed_bytes;; + rc = yaksa_pack_stream(inbuf, incount, type, inoffset, outbuf, max_pack_bytes, + &packed_bytes, NULL, YAKSA_OP__REPLACE, stream); + MPIR_ERR_CHKANDJUMP(rc, mpi_errno, MPI_ERR_INTERN, "**yaksa"); + *actual_pack_bytes = packed_bytes; + + fn_exit: + MPIR_FUNC_EXIT; + return mpi_errno; + fn_fail: + goto fn_exit; +} + +int MPIR_Typerep_unpack_stream(const void *inbuf, MPI_Aint insize, void *outbuf, + MPI_Aint outcount, MPI_Datatype datatype, MPI_Aint outoffset, + MPI_Aint * actual_unpack_bytes, void *stream) +{ + MPIR_FUNC_ENTER; + + int mpi_errno = MPI_SUCCESS; + int rc; + + yaksa_type_t type = MPII_Typerep_get_yaksa_type(datatype); + uintptr_t unpacked_bytes;; + rc = yaksa_unpack_stream(inbuf, insize, outbuf, outcount, type, outoffset, + &unpacked_bytes, NULL, YAKSA_OP__REPLACE, stream); + MPIR_ERR_CHKANDJUMP(rc, mpi_errno, MPI_ERR_INTERN, "**yaksa"); + *actual_unpack_bytes = unpacked_bytes; + + fn_exit: + MPIR_FUNC_EXIT; + return mpi_errno; + fn_fail: + goto fn_exit; +} From 81cd1a80a0d64ae3b921210ec77309b651d89b77 Mon Sep 17 00:00:00 2001 From: Hui Zhou Date: Thu, 24 Mar 2022 23:35:11 -0500 Subject: [PATCH 5/8] stream: add MPIX_Send/Recv_enqueue Only supported if the communicator has a cuda stream associated. Enqueues operation to the cuda stream. --- src/binding/c/stream_api.txt | 17 +++ src/mpi/errhan/errnames.txt | 1 + src/mpi/stream/stream_impl.c | 205 +++++++++++++++++++++++++++++++++++ 3 files changed, 223 insertions(+) diff --git a/src/binding/c/stream_api.txt b/src/binding/c/stream_api.txt index 0d808645406..a891f627c0c 100644 --- a/src/binding/c/stream_api.txt +++ b/src/binding/c/stream_api.txt @@ -11,3 +11,20 @@ MPIX_Stream_comm_create: comm: COMMUNICATOR, [communicator] stream: STREAM, [stream object] newcomm: COMMUNICATOR, direction=out, [new stream-associated communicator] + +MPIX_Send_enqueue: + buf: BUFFER, constant=True, [initial address of send buffer] + count: POLYXFER_NUM_ELEM_NNI, [number of elements in send buffer] + datatype: DATATYPE, [datatype of each send buffer element] + dest: RANK, [rank of destination] + tag: TAG, [message tag] + comm: COMMUNICATOR + +MPIX_Recv_enqueue: + buf: BUFFER, direction=out, [initial address of receive buffer] + count: POLYXFER_NUM_ELEM_NNI, [number of elements in receive buffer] + datatype: DATATYPE, [datatype of each receive buffer element] + source: RANK, [rank of source or MPI_ANY_SOURCE] + tag: TAG, [message tag or MPI_ANY_TAG] + comm: COMMUNICATOR + status: STATUS, direction=out diff --git a/src/mpi/errhan/errnames.txt b/src/mpi/errhan/errnames.txt index 45433a292ee..e053dcfc8a7 100644 --- a/src/mpi/errhan/errnames.txt +++ b/src/mpi/errhan/errnames.txt @@ -957,6 +957,7 @@ is too big (> MPIU_SHMW_GHND_SZ) **cannotfreestream:The stream is still in use, cannot be freed. **missinggpustream:Info hint 'type' is set, but info hint 'value' is missing. **invalidgpustream:Info hint 'type' is set, but info hint 'value' is invalid. +**notgpustream:The communicator does not have a local gpu stream attached. # ----------------------------------------------------------------------------- # The following names are defined but not used (see the -careful option diff --git a/src/mpi/stream/stream_impl.c b/src/mpi/stream/stream_impl.c index 06cb06405ae..db0cbe41fde 100644 --- a/src/mpi/stream/stream_impl.c +++ b/src/mpi/stream/stream_impl.c @@ -137,3 +137,208 @@ int MPIR_Stream_comm_create_impl(MPIR_Comm * comm_ptr, MPIR_Stream * stream_ptr, fn_fail: goto fn_exit; } + +/* ---- CUDA stream send/recv enqueue ---- */ + +static int get_local_gpu_stream(MPIR_Comm * comm_ptr, MPL_gpu_stream_t * gpu_stream) +{ + int mpi_errno = MPI_SUCCESS; + + MPIR_Stream *stream_ptr = NULL; + if (comm_ptr->stream_comm_type == MPIR_STREAM_COMM_SINGLE) { + stream_ptr = comm_ptr->stream_comm.single.stream; + } else if (comm_ptr->stream_comm_type == MPIR_STREAM_COMM_MULTIPLEX) { + stream_ptr = comm_ptr->stream_comm.multiplex.local_streams[comm_ptr->rank]; + } + + MPIR_ERR_CHKANDJUMP(!stream_ptr || stream_ptr->type != MPIR_STREAM_GPU, + mpi_errno, MPI_ERR_OTHER, "**notgpustream"); + *gpu_stream = stream_ptr->u.gpu_stream; + + fn_exit: + return mpi_errno; + fn_fail: + goto fn_exit; +} + +/* send enqueue */ +struct send_data { + const void *buf; + MPI_Aint count; + MPI_Datatype datatype; + int dest; + int tag; + MPIR_Comm *comm_ptr; + void *host_buf; + MPI_Aint data_sz; + MPI_Aint actual_pack_bytes; +}; + +static void send_stream_cb(void *data) +{ + int mpi_errno; + MPIR_Request *request_ptr = NULL; + + struct send_data *p = data; + if (p->host_buf) { + assert(p->actual_pack_bytes == p->data_sz); + + mpi_errno = MPID_Send(p->host_buf, p->data_sz, MPI_BYTE, p->dest, p->tag, p->comm_ptr, + MPIR_CONTEXT_INTRA_PT2PT, &request_ptr); + } else { + mpi_errno = MPID_Send(p->buf, p->count, p->datatype, p->dest, p->tag, p->comm_ptr, + MPIR_CONTEXT_INTRA_PT2PT, &request_ptr); + } + assert(mpi_errno == MPI_SUCCESS); + assert(request_ptr != NULL); + + mpi_errno = MPID_Wait(request_ptr, MPI_STATUS_IGNORE); + assert(mpi_errno == MPI_SUCCESS); + + MPIR_Request_free(request_ptr); + + if (p->host_buf) { + MPIR_gpu_free_host(p->host_buf); + } + MPL_free(data); +} + +int MPIR_Send_enqueue_impl(const void *buf, MPI_Aint count, MPI_Datatype datatype, + int dest, int tag, MPIR_Comm * comm_ptr) +{ + int mpi_errno = MPI_SUCCESS; + + MPL_gpu_stream_t gpu_stream; + mpi_errno = get_local_gpu_stream(comm_ptr, &gpu_stream); + MPIR_ERR_CHECK(mpi_errno); + + struct send_data *p; + p = MPL_malloc(sizeof(struct send_data), MPL_MEM_OTHER); + MPIR_ERR_CHKANDJUMP(!p, mpi_errno, MPI_ERR_OTHER, "**nomem"); + + p->dest = dest; + p->tag = tag; + p->comm_ptr = comm_ptr; + + if (MPIR_GPU_query_pointer_is_dev(buf)) { + MPI_Aint dt_size; + MPIR_Datatype_get_size_macro(datatype, dt_size); + p->data_sz = dt_size * count; + + MPIR_gpu_malloc_host(&p->host_buf, p->data_sz); + + mpi_errno = MPIR_Typerep_pack_stream(buf, count, datatype, 0, p->host_buf, p->data_sz, + &p->actual_pack_bytes, &gpu_stream); + MPIR_ERR_CHECK(mpi_errno); + } else { + p->host_buf = NULL; + p->buf = buf; + p->count = count; + p->datatype = datatype; + } + + MPL_gpu_launch_hostfn(gpu_stream, send_stream_cb, p); + + fn_exit: + return mpi_errno; + fn_fail: + goto fn_exit; +} + +/* ---- recv enqueue ---- */ +struct recv_data { + void *buf; + MPI_Aint count; + MPI_Datatype datatype; + int source; + int tag; + MPIR_Comm *comm_ptr; + MPI_Status *status; + void *host_buf; + MPI_Aint data_sz; + MPI_Aint actual_unpack_bytes; +}; + +static void recv_stream_cb(void *data) +{ + int mpi_errno; + MPIR_Request *request_ptr = NULL; + + struct recv_data *p = data; + if (p->host_buf) { + mpi_errno = MPID_Recv(p->host_buf, p->data_sz, MPI_BYTE, p->source, p->tag, p->comm_ptr, + MPIR_CONTEXT_INTRA_PT2PT, p->status, &request_ptr); + } else { + mpi_errno = MPID_Recv(p->buf, p->count, p->datatype, p->source, p->tag, p->comm_ptr, + MPIR_CONTEXT_INTRA_PT2PT, p->status, &request_ptr); + } + assert(mpi_errno == MPI_SUCCESS); + assert(request_ptr != NULL); + + mpi_errno = MPID_Wait(request_ptr, MPI_STATUS_IGNORE); + assert(mpi_errno == MPI_SUCCESS); + + MPIR_Request_extract_status(request_ptr, p->status); + MPIR_Request_free(request_ptr); + + if (!p->host_buf) { + /* we are done */ + MPL_free(p); + } +} + +static void recv_stream_cleanup_cb(void *data) +{ + struct recv_data *p = data; + assert(p->actual_unpack_bytes == p->data_sz); + + MPIR_gpu_free_host(p->host_buf); + MPL_free(data); +} + +int MPIR_Recv_enqueue_impl(void *buf, MPI_Aint count, MPI_Datatype datatype, + int source, int tag, MPIR_Comm * comm_ptr, MPI_Status * status) +{ + int mpi_errno = MPI_SUCCESS; + + MPL_gpu_stream_t gpu_stream; + mpi_errno = get_local_gpu_stream(comm_ptr, &gpu_stream); + MPIR_ERR_CHECK(mpi_errno); + + struct recv_data *p; + p = MPL_malloc(sizeof(struct recv_data), MPL_MEM_OTHER); + MPIR_ERR_CHKANDJUMP(!p, mpi_errno, MPI_ERR_OTHER, "**nomem"); + + p->source = source; + p->tag = tag; + p->comm_ptr = comm_ptr; + p->status = status; + + if (MPIR_GPU_query_pointer_is_dev(buf)) { + MPI_Aint dt_size; + MPIR_Datatype_get_size_macro(datatype, dt_size); + p->data_sz = dt_size * count; + + MPIR_gpu_malloc_host(&p->host_buf, p->data_sz); + + MPL_gpu_launch_hostfn(gpu_stream, recv_stream_cb, p); + + mpi_errno = MPIR_Typerep_unpack_stream(p->host_buf, p->data_sz, buf, count, datatype, 0, + &p->actual_unpack_bytes, &gpu_stream); + MPIR_ERR_CHECK(mpi_errno); + + MPL_gpu_launch_hostfn(gpu_stream, recv_stream_cleanup_cb, p); + } else { + p->host_buf = NULL; + p->buf = buf; + p->count = count; + p->datatype = datatype; + + MPL_gpu_launch_hostfn(gpu_stream, recv_stream_cb, p); + } + + fn_exit: + return mpi_errno; + fn_fail: + goto fn_exit; +} From 246a6b03a9ff9e2b708676c7abf9342575b92f2a Mon Sep 17 00:00:00 2001 From: Hui Zhou Date: Wed, 20 Apr 2022 16:20:39 -0500 Subject: [PATCH 6/8] stream: use a single vci for gpu streams It is often not critical for gpu streams to use separate vcis as long it is separated from normal traffic. In addition, it can be common for an application to use many GPU streams globally and we may not have enough dedicated vcis to match them. This commit default to reuse a single dedicated vci for gpu streams. --- src/mpi/stream/stream_impl.c | 53 ++++++++++++++++++++++++++++++++++-- 1 file changed, 51 insertions(+), 2 deletions(-) diff --git a/src/mpi/stream/stream_impl.c b/src/mpi/stream/stream_impl.c index db0cbe41fde..6c358468b4d 100644 --- a/src/mpi/stream/stream_impl.c +++ b/src/mpi/stream/stream_impl.c @@ -9,6 +9,55 @@ #define MPIR_STREAM_PREALLOC 8 #endif +#define GPU_STREAM_USE_SINGLE_VCI + +#ifdef GPU_STREAM_USE_SINGLE_VCI +static int gpu_stream_vci = 0; +static int gpu_stream_count = 0; + +static int allocate_vci(int *vci, bool is_gpu_stream) +{ + if (is_gpu_stream) { + int mpi_errno = MPI_SUCCESS; + if (!gpu_stream_vci) { + mpi_errno = MPID_Allocate_vci(&gpu_stream_vci); + } + gpu_stream_count++; + *vci = gpu_stream_vci; + return mpi_errno; + } else { + return MPID_Allocate_vci(vci); + } +} + +static int deallocate_vci(int vci) +{ + if (vci == gpu_stream_vci) { + gpu_stream_count--; + if (gpu_stream_count == 0) { + gpu_stream_vci = 0; + return MPID_Deallocate_vci(vci); + } else { + return MPI_SUCCESS; + } + } else { + return MPID_Deallocate_vci(vci); + } +} + +#else +static int allocate_vci(int *vci, bool is_gpu_stream) +{ + return MPID_Allocate_vci(vci); +} + +static int deallocate_vci(int *vci) +{ + return MPID_Deallocate_vci(vci); +} + +#endif + MPIR_Stream MPIR_Stream_direct[MPIR_STREAM_PREALLOC]; MPIR_Object_alloc_t MPIR_Stream_mem = { 0, 0, 0, 0, 0, 0, MPIR_STREAM, @@ -71,7 +120,7 @@ int MPIR_Stream_create_impl(MPIR_Info * info_ptr, MPIR_Stream ** p_stream_ptr) stream_ptr->type = MPIR_STREAM_GENERAL; } - mpi_errno = MPID_Allocate_vci(&stream_ptr->vci); + mpi_errno = allocate_vci(&stream_ptr->vci, stream_ptr->type == MPIR_STREAM_GPU); MPIR_ERR_CHECK(mpi_errno); *p_stream_ptr = stream_ptr; @@ -92,7 +141,7 @@ int MPIR_Stream_free_impl(MPIR_Stream * stream_ptr) MPIR_ERR_CHKANDJUMP(ref_cnt != 0, mpi_errno, MPI_ERR_OTHER, "**cannotfreestream"); if (stream_ptr->vci) { - mpi_errno = MPID_Deallocate_vci(stream_ptr->vci); + mpi_errno = deallocate_vci(stream_ptr->vci); } MPIR_Handle_obj_free(&MPIR_Stream_mem, stream_ptr); From 2a7b140c3453cb8769ebe7816edc3a9da876d203 Mon Sep 17 00:00:00 2001 From: Hui Zhou Date: Wed, 20 Apr 2022 22:35:58 -0500 Subject: [PATCH 7/8] stream: add MPIX_I{send,recv}_enqueue and MPIX_Wait{all}_enqueue --- maint/local_python/binding_c.py | 3 + src/binding/c/stream_api.txt | 32 ++++ src/include/mpir_request.h | 7 + src/mpi/stream/stream_impl.c | 309 +++++++++++++++++++++++++++++++- 4 files changed, 346 insertions(+), 5 deletions(-) diff --git a/maint/local_python/binding_c.py b/maint/local_python/binding_c.py index df5c870e56a..a278f8ac03a 100644 --- a/maint/local_python/binding_c.py +++ b/maint/local_python/binding_c.py @@ -696,6 +696,9 @@ def process_func_parameters(func): if RE.match(r'mpi_startall', func['name'], re.IGNORECASE): impl_arg_list.append(ptrs_name) impl_param_list.append("MPIR_Request **%s" % ptrs_name) + else: + impl_arg_list.append(name) + impl_param_list.append("MPI_Request %s[]" % name) else: print("Unhandled handle array: " + name, file=sys.stderr) elif "code-handle_ptr-tail" in func and name in func['code-handle_ptr-tail']: diff --git a/src/binding/c/stream_api.txt b/src/binding/c/stream_api.txt index a891f627c0c..05d046f280c 100644 --- a/src/binding/c/stream_api.txt +++ b/src/binding/c/stream_api.txt @@ -28,3 +28,35 @@ MPIX_Recv_enqueue: tag: TAG, [message tag or MPI_ANY_TAG] comm: COMMUNICATOR status: STATUS, direction=out + +MPIX_Isend_enqueue: + buf: BUFFER, constant=True, [initial address of send buffer] + count: POLYXFER_NUM_ELEM_NNI, [number of elements in send buffer] + datatype: DATATYPE, [datatype of each send buffer element] + dest: RANK, [rank of destination] + tag: TAG, [message tag] + comm: COMMUNICATOR + request: REQUEST, direction=out + +MPIX_Irecv_enqueue: + buf: BUFFER, direction=out, [initial address of receive buffer] + count: POLYXFER_NUM_ELEM_NNI, [number of elements in receive buffer] + datatype: DATATYPE, [datatype of each receive buffer element] + source: RANK, [rank of source or MPI_ANY_SOURCE] + tag: TAG, [message tag or MPI_ANY_TAG] + comm: COMMUNICATOR + request: REQUEST, direction=out + +MPI_Wait_enqueue: + request: REQUEST, direction=inout, [request] + status: STATUS, direction=out + +MPI_Waitall_enqueue: + count: ARRAY_LENGTH_NNI, [lists length] + array_of_requests: REQUEST, direction=inout, length=count, [array of requests] + array_of_statuses: STATUS, direction=out, length=*, pointer=False, [array of status objects] +{ -- error_check -- array_of_statuses + if (count > 0) { + MPIR_ERRTEST_ARGNULL(array_of_statuses, "array_of_statuses", mpi_errno); + } +} diff --git a/src/include/mpir_request.h b/src/include/mpir_request.h index 27d3074d15d..f8accc7f608 100644 --- a/src/include/mpir_request.h +++ b/src/include/mpir_request.h @@ -65,6 +65,7 @@ typedef enum MPIR_Request_kind_t { MPIR_REQUEST_KIND__PART_SEND, /* Partitioned send req returned to user */ MPIR_REQUEST_KIND__PART_RECV, /* Partitioned recv req returned to user */ MPIR_REQUEST_KIND__PART, /* Partitioned pt2pt internal reqs */ + MPIR_REQUEST_KIND__ENQUEUE, /* enqueued (to gpu stream) request */ MPIR_REQUEST_KIND__GREQUEST, MPIR_REQUEST_KIND__COLL, MPIR_REQUEST_KIND__MPROBE, /* see NOTE-R1 */ @@ -220,6 +221,12 @@ struct MPIR_Request { MPL_atomic_int_t active_flag; /* flag indicating whether in a start-complete active period. * Value is 0 or 1. */ } part; /* kind : MPIR_REQUEST_KIND__PART_SEND or MPIR_REQUEST_KIND__PART_RECV */ + struct { + MPL_gpu_stream_t gpu_stream; + struct MPIR_Request *real_request; + bool is_send; + void *data; + } enqueue; struct { MPIR_Win *win; } rma; /* kind : MPIR_REQUEST_KIND__RMA */ diff --git a/src/mpi/stream/stream_impl.c b/src/mpi/stream/stream_impl.c index 6c358468b4d..49989508fad 100644 --- a/src/mpi/stream/stream_impl.c +++ b/src/mpi/stream/stream_impl.c @@ -210,6 +210,29 @@ static int get_local_gpu_stream(MPIR_Comm * comm_ptr, MPL_gpu_stream_t * gpu_str goto fn_exit; } +static int allocate_enqueue_request(MPIR_Comm * comm_ptr, MPIR_Request ** req) +{ + int mpi_errno = MPI_SUCCESS; + + MPIR_Stream *stream_ptr = NULL; + if (comm_ptr->stream_comm_type == MPIR_STREAM_COMM_SINGLE) { + stream_ptr = comm_ptr->stream_comm.single.stream; + } else if (comm_ptr->stream_comm_type == MPIR_STREAM_COMM_MULTIPLEX) { + stream_ptr = comm_ptr->stream_comm.multiplex.local_streams[comm_ptr->rank]; + } + MPIR_Assert(stream_ptr); + + int vci = stream_ptr->vci; + MPIR_Assert(vci > 0); + + /* stream vci are only accessed within a serialized context */ + (*req) = MPIR_Request_create_from_pool_safe(MPIR_REQUEST_KIND__ENQUEUE, vci, 1); + (*req)->u.enqueue.gpu_stream = stream_ptr->u.gpu_stream; + (*req)->u.enqueue.real_request = NULL; + + return mpi_errno; +} + /* send enqueue */ struct send_data { const void *buf; @@ -221,9 +244,11 @@ struct send_data { void *host_buf; MPI_Aint data_sz; MPI_Aint actual_pack_bytes; + /* for isend */ + MPIR_Request *req; }; -static void send_stream_cb(void *data) +static void send_enqueue_cb(void *data) { int mpi_errno; MPIR_Request *request_ptr = NULL; @@ -286,7 +311,7 @@ int MPIR_Send_enqueue_impl(const void *buf, MPI_Aint count, MPI_Datatype datatyp p->datatype = datatype; } - MPL_gpu_launch_hostfn(gpu_stream, send_stream_cb, p); + MPL_gpu_launch_hostfn(gpu_stream, send_enqueue_cb, p); fn_exit: return mpi_errno; @@ -306,9 +331,11 @@ struct recv_data { void *host_buf; MPI_Aint data_sz; MPI_Aint actual_unpack_bytes; + /* for irend */ + MPIR_Request *req; }; -static void recv_stream_cb(void *data) +static void recv_enqueue_cb(void *data) { int mpi_errno; MPIR_Request *request_ptr = NULL; @@ -370,7 +397,7 @@ int MPIR_Recv_enqueue_impl(void *buf, MPI_Aint count, MPI_Datatype datatype, MPIR_gpu_malloc_host(&p->host_buf, p->data_sz); - MPL_gpu_launch_hostfn(gpu_stream, recv_stream_cb, p); + MPL_gpu_launch_hostfn(gpu_stream, recv_enqueue_cb, p); mpi_errno = MPIR_Typerep_unpack_stream(p->host_buf, p->data_sz, buf, count, datatype, 0, &p->actual_unpack_bytes, &gpu_stream); @@ -383,7 +410,7 @@ int MPIR_Recv_enqueue_impl(void *buf, MPI_Aint count, MPI_Datatype datatype, p->count = count; p->datatype = datatype; - MPL_gpu_launch_hostfn(gpu_stream, recv_stream_cb, p); + MPL_gpu_launch_hostfn(gpu_stream, recv_enqueue_cb, p); } fn_exit: @@ -391,3 +418,275 @@ int MPIR_Recv_enqueue_impl(void *buf, MPI_Aint count, MPI_Datatype datatype, fn_fail: goto fn_exit; } + +/* ---- isend enqueue ---- */ +static void isend_enqueue_cb(void *data) +{ + int mpi_errno; + MPIR_Request *request_ptr = NULL; + + struct send_data *p = data; + if (p->host_buf) { + assert(p->actual_pack_bytes == p->data_sz); + + mpi_errno = MPID_Send(p->host_buf, p->data_sz, MPI_BYTE, p->dest, p->tag, p->comm_ptr, + MPIR_CONTEXT_INTRA_PT2PT, &request_ptr); + } else { + mpi_errno = MPID_Send(p->buf, p->count, p->datatype, p->dest, p->tag, p->comm_ptr, + MPIR_CONTEXT_INTRA_PT2PT, &request_ptr); + } + assert(mpi_errno == MPI_SUCCESS); + assert(request_ptr != NULL); + + p->req->u.enqueue.real_request = request_ptr; +} + +int MPIR_Isend_enqueue_impl(const void *buf, MPI_Aint count, MPI_Datatype datatype, + int dest, int tag, MPIR_Comm * comm_ptr, MPIR_Request ** req) +{ + int mpi_errno = MPI_SUCCESS; + + MPL_gpu_stream_t gpu_stream; + mpi_errno = get_local_gpu_stream(comm_ptr, &gpu_stream); + MPIR_ERR_CHECK(mpi_errno); + + struct send_data *p; + p = MPL_malloc(sizeof(struct send_data), MPL_MEM_OTHER); + MPIR_ERR_CHKANDJUMP(!p, mpi_errno, MPI_ERR_OTHER, "**nomem"); + + mpi_errno = allocate_enqueue_request(comm_ptr, req); + MPIR_ERR_CHECK(mpi_errno); + (*req)->u.enqueue.is_send = true; + (*req)->u.enqueue.data = p; + + p->req = *req; + p->dest = dest; + p->tag = tag; + p->comm_ptr = comm_ptr; + + if (MPIR_GPU_query_pointer_is_dev(buf)) { + MPI_Aint dt_size; + MPIR_Datatype_get_size_macro(datatype, dt_size); + p->data_sz = dt_size * count; + + MPIR_gpu_malloc_host(&p->host_buf, p->data_sz); + + mpi_errno = MPIR_Typerep_pack_stream(buf, count, datatype, 0, p->host_buf, p->data_sz, + &p->actual_pack_bytes, &gpu_stream); + MPIR_ERR_CHECK(mpi_errno); + } else { + p->host_buf = NULL; + p->buf = buf; + p->count = count; + p->datatype = datatype; + } + + MPL_gpu_launch_hostfn(gpu_stream, isend_enqueue_cb, p); + + fn_exit: + return mpi_errno; + fn_fail: + goto fn_exit; +} + +/* ---- irecv enqueue ---- */ +static void irecv_enqueue_cb(void *data) +{ + int mpi_errno; + MPIR_Request *request_ptr = NULL; + + struct recv_data *p = data; + if (p->host_buf) { + mpi_errno = MPID_Recv(p->host_buf, p->data_sz, MPI_BYTE, p->source, p->tag, p->comm_ptr, + MPIR_CONTEXT_INTRA_PT2PT, p->status, &request_ptr); + } else { + mpi_errno = MPID_Recv(p->buf, p->count, p->datatype, p->source, p->tag, p->comm_ptr, + MPIR_CONTEXT_INTRA_PT2PT, p->status, &request_ptr); + } + assert(mpi_errno == MPI_SUCCESS); + assert(request_ptr != NULL); + + p->req->u.enqueue.real_request = request_ptr; +} + +int MPIR_Irecv_enqueue_impl(void *buf, MPI_Aint count, MPI_Datatype datatype, + int source, int tag, MPIR_Comm * comm_ptr, MPIR_Request ** req) +{ + int mpi_errno = MPI_SUCCESS; + + MPL_gpu_stream_t gpu_stream; + mpi_errno = get_local_gpu_stream(comm_ptr, &gpu_stream); + MPIR_ERR_CHECK(mpi_errno); + + struct recv_data *p; + p = MPL_malloc(sizeof(struct recv_data), MPL_MEM_OTHER); + MPIR_ERR_CHKANDJUMP(!p, mpi_errno, MPI_ERR_OTHER, "**nomem"); + + mpi_errno = allocate_enqueue_request(comm_ptr, req); + MPIR_ERR_CHECK(mpi_errno); + (*req)->u.enqueue.is_send = false; + (*req)->u.enqueue.data = p; + + p->req = *req; + p->source = source; + p->tag = tag; + p->comm_ptr = comm_ptr; + p->status = MPI_STATUS_IGNORE; + + if (MPIR_GPU_query_pointer_is_dev(buf)) { + MPI_Aint dt_size; + MPIR_Datatype_get_size_macro(datatype, dt_size); + p->data_sz = dt_size * count; + + MPIR_gpu_malloc_host(&p->host_buf, p->data_sz); + + MPL_gpu_launch_hostfn(gpu_stream, recv_enqueue_cb, p); + + mpi_errno = MPIR_Typerep_unpack_stream(p->host_buf, p->data_sz, buf, count, datatype, 0, + &p->actual_unpack_bytes, &gpu_stream); + MPIR_ERR_CHECK(mpi_errno); + + MPL_gpu_launch_hostfn(gpu_stream, recv_stream_cleanup_cb, p); + } else { + p->host_buf = NULL; + p->buf = buf; + p->count = count; + p->datatype = datatype; + + MPL_gpu_launch_hostfn(gpu_stream, irecv_enqueue_cb, p); + } + + fn_exit: + return mpi_errno; + fn_fail: + goto fn_exit; +} + +/* ---- wait enqueue ---- */ +static void wait_enqueue_cb(void *data) +{ + int mpi_errno; + MPIR_Request *enqueue_req = data; + MPIR_Request *real_req = enqueue_req->u.enqueue.real_request; + + if (enqueue_req->u.enqueue.is_send) { + struct send_data *p = enqueue_req->u.enqueue.data; + + mpi_errno = MPID_Wait(real_req, MPI_STATUS_IGNORE); + assert(mpi_errno == MPI_SUCCESS); + + MPIR_Request_free(real_req); + + if (p->host_buf) { + MPIR_gpu_free_host(p->host_buf); + } + MPL_free(p); + } else { + struct recv_data *p = enqueue_req->u.enqueue.data; + + mpi_errno = MPID_Wait(real_req, MPI_STATUS_IGNORE); + assert(mpi_errno == MPI_SUCCESS); + + MPIR_Request_extract_status(real_req, p->status); + MPIR_Request_free(real_req); + + if (!p->host_buf) { + MPL_free(p); + } + } + MPIR_Request_free(enqueue_req); +} + +int MPIR_Wait_enqueue_impl(MPIR_Request * req_ptr, MPI_Status * status) +{ + int mpi_errno = MPI_SUCCESS; + MPIR_Assert(req_ptr && req_ptr->kind == MPIR_REQUEST_KIND__ENQUEUE); + + MPL_gpu_stream_t gpu_stream = req_ptr->u.enqueue.gpu_stream; + if (!req_ptr->u.enqueue.is_send) { + struct recv_data *p = req_ptr->u.enqueue.data; + p->status = status; + } + + MPL_gpu_launch_hostfn(gpu_stream, wait_enqueue_cb, req_ptr); + + return mpi_errno; +} + +/* ---- waitall enqueue ---- */ +struct waitall_data { + int count; + MPI_Request *array_of_requests; + MPI_Status *array_of_statuses; +}; + +static void waitall_enqueue_cb(void *data) +{ + struct waitall_data *p = data; + + MPI_Request *reqs = MPL_malloc(p->count * sizeof(MPI_Request), MPL_MEM_OTHER); + MPIR_Assert(reqs); + + for (int i = 0; i < p->count; i++) { + MPIR_Request *enqueue_req; + MPIR_Request_get_ptr(p->array_of_requests[i], enqueue_req); + reqs[i] = enqueue_req->u.enqueue.real_request->handle; + } + + MPIR_Waitall(p->count, reqs, p->array_of_statuses); + + for (int i = 0; i < p->count; i++) { + MPIR_Request *enqueue_req; + MPIR_Request_get_ptr(p->array_of_requests[i], enqueue_req); + + if (enqueue_req->u.enqueue.is_send) { + struct send_data *p2 = enqueue_req->u.enqueue.data; + if (p2->host_buf) { + MPIR_gpu_free_host(p2->host_buf); + } + MPL_free(p2); + } else { + struct recv_data *p2 = enqueue_req->u.enqueue.data; + if (!p2->host_buf) { + MPL_free(p2); + } + } + MPIR_Request_free(enqueue_req); + } + MPL_free(reqs); + MPL_free(p); +} + +int MPIR_Waitall_enqueue_impl(int count, MPI_Request * array_of_requests, + MPI_Status * array_of_statuses) +{ + int mpi_errno = MPI_SUCCESS; + + MPL_gpu_stream_t gpu_stream = MPL_GPU_STREAM_DEFAULT; + for (int i = 0; i < count; i++) { + MPIR_Request *enqueue_req; + MPIR_Request_get_ptr(array_of_requests[i], enqueue_req); + + MPIR_Assert(enqueue_req && enqueue_req->kind == MPIR_REQUEST_KIND__ENQUEUE); + if (i == 0) { + gpu_stream = enqueue_req->u.enqueue.gpu_stream; + } else { + MPIR_Assert(gpu_stream == enqueue_req->u.enqueue.gpu_stream); + } + } + + struct waitall_data *p; + p = MPL_malloc(sizeof(struct waitall_data), MPL_MEM_OTHER); + MPIR_ERR_CHKANDJUMP(!p, mpi_errno, MPI_ERR_OTHER, "**nomem"); + + p->count = count; + p->array_of_requests = array_of_requests; + p->array_of_statuses = array_of_statuses; + + MPL_gpu_launch_hostfn(gpu_stream, waitall_enqueue_cb, p); + + fn_exit: + return mpi_errno; + fn_fail: + goto fn_exit; +} From 24e304f0f505cb1411d907bc9237e6f4280c57d3 Mon Sep 17 00:00:00 2001 From: Hui Zhou Date: Thu, 24 Mar 2022 10:08:24 -0500 Subject: [PATCH 8/8] test: add test/mpi/impls/mpich/cuda/stream.cu It tests the CUDA stream enqueue functions via MPIX Stream Communicator. --- test/mpi/impls/mpich/cuda/Makefile.am | 3 +- test/mpi/impls/mpich/cuda/stream.cu | 133 ++++++++++++++++++++++++++ test/mpi/impls/mpich/cuda/testlist | 1 + 3 files changed, 136 insertions(+), 1 deletion(-) create mode 100644 test/mpi/impls/mpich/cuda/stream.cu diff --git a/test/mpi/impls/mpich/cuda/Makefile.am b/test/mpi/impls/mpich/cuda/Makefile.am index 59b18b8778e..8f29cb4189d 100644 --- a/test/mpi/impls/mpich/cuda/Makefile.am +++ b/test/mpi/impls/mpich/cuda/Makefile.am @@ -8,4 +8,5 @@ include $(top_srcdir)/Makefile_cuda.mtest LDADD += -lm noinst_PROGRAMS = \ - saxpy + saxpy \ + stream diff --git a/test/mpi/impls/mpich/cuda/stream.cu b/test/mpi/impls/mpich/cuda/stream.cu new file mode 100644 index 00000000000..f2e36e7b336 --- /dev/null +++ b/test/mpi/impls/mpich/cuda/stream.cu @@ -0,0 +1,133 @@ +/* + * Copyright (C) by Argonne National Laboratory + * See COPYRIGHT in top-level directory + */ + +#include +#include +#include + +const int N = 1000000; +const int a = 2.0; + +static void init_x(float *x) +{ + for (int i = 0; i < N; i++) { + x[i] = 1.0f; + } +} + +static void init_y(float *y) +{ + for (int i = 0; i < N; i++) { + y[i] = 2.0f; + } +} + +static int check_result(float *y) +{ + float maxError = 0.0f; + int errs = 0; + for (int i = 0; i < N; i++) { + if (abs(y[i] - 4.0f) > 0.01) { + errs++; + maxError = max(maxError, abs(y[i]-4.0f)); + } + } + if (errs > 0) { + printf("%d errors, Max error: %f\n", errs, maxError); + } + return errs; +} + +__global__ +void saxpy(int n, float a, float *x, float *y) +{ + int i = blockIdx.x*blockDim.x + threadIdx.x; + if (i < n) y[i] = a*x[i] + y[i]; +} + +int main(void) +{ + int errs = 0; + + cudaStream_t stream; + cudaStreamCreate(&stream); + + int mpi_errno; + int rank, size; + MPI_Init(NULL, NULL); + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + MPI_Comm_size(MPI_COMM_WORLD, &size); + + if (size < 2) { + printf("This test require 2 processes\n"); + exit(1); + } + + float *x, *y, *d_x, *d_y; + x = (float*)malloc(N*sizeof(float)); + y = (float*)malloc(N*sizeof(float)); + + cudaMalloc(&d_x, N*sizeof(float)); + cudaMalloc(&d_y, N*sizeof(float)); + + if (rank == 0) { + init_x(x); + } else if (rank == 1) { + init_y(y); + } + + MPI_Info info; + MPI_Info_create(&info); + MPI_Info_set(info, "type", "cudaStream_t"); + MPIX_Info_set_hex(info, "value", &stream, sizeof(stream)); + + MPIX_Stream mpi_stream; + MPIX_Stream_create(info, &mpi_stream); + + MPI_Info_free(&info); + + MPI_Comm stream_comm; + MPIX_Stream_comm_create(MPI_COMM_WORLD, mpi_stream, &stream_comm); + + /* Rank 0 sends x data to Rank 1, Rank 1 performs a * x + y and checks result */ + if (rank == 0) { + cudaMemcpyAsync(d_x, x, N*sizeof(float), cudaMemcpyHostToDevice, stream); + + mpi_errno = MPIX_Send_enqueue(d_x, N, MPI_FLOAT, 1, 0, stream_comm); + assert(mpi_errno == MPI_SUCCESS); + + cudaStreamSynchronize(stream); + } else if (rank == 1) { + cudaMemcpyAsync(d_y, y, N*sizeof(float), cudaMemcpyHostToDevice, stream); + + mpi_errno = MPIX_Recv_enqueue(d_x, N, MPI_FLOAT, 0, 0, stream_comm, MPI_STATUS_IGNORE); + assert(mpi_errno == MPI_SUCCESS); + + saxpy<<<(N+255)/256, 256, 0, stream>>>(N, a, d_x, d_y); + + cudaMemcpyAsync(y, d_y, N*sizeof(float), cudaMemcpyDeviceToHost, stream); + + cudaStreamSynchronize(stream); + } + + if (rank == 1) { + int errs = check_result(y); + if (errs == 0) { + printf("No Errors\n"); + } + } + + MPI_Comm_free(&stream_comm); + MPIX_Stream_free(&mpi_stream); + + cudaFree(d_x); + cudaFree(d_y); + free(x); + free(y); + + cudaStreamDestroy(stream); + MPI_Finalize(); + return errs; +} diff --git a/test/mpi/impls/mpich/cuda/testlist b/test/mpi/impls/mpich/cuda/testlist index 68bfd0db9cc..da2799094bd 100644 --- a/test/mpi/impls/mpich/cuda/testlist +++ b/test/mpi/impls/mpich/cuda/testlist @@ -1 +1,2 @@ saxpy 2 +stream 2 env=MPIR_CVAR_CH4_RESERVE_VCIS=1