diff --git a/maint/local_python/binding_c.py b/maint/local_python/binding_c.py index df5c870e56a..a278f8ac03a 100644 --- a/maint/local_python/binding_c.py +++ b/maint/local_python/binding_c.py @@ -696,6 +696,9 @@ def process_func_parameters(func): if RE.match(r'mpi_startall', func['name'], re.IGNORECASE): impl_arg_list.append(ptrs_name) impl_param_list.append("MPIR_Request **%s" % ptrs_name) + else: + impl_arg_list.append(name) + impl_param_list.append("MPI_Request %s[]" % name) else: print("Unhandled handle array: " + name, file=sys.stderr) elif "code-handle_ptr-tail" in func and name in func['code-handle_ptr-tail']: diff --git a/src/binding/c/stream_api.txt b/src/binding/c/stream_api.txt index 0d808645406..05d046f280c 100644 --- a/src/binding/c/stream_api.txt +++ b/src/binding/c/stream_api.txt @@ -11,3 +11,52 @@ MPIX_Stream_comm_create: comm: COMMUNICATOR, [communicator] stream: STREAM, [stream object] newcomm: COMMUNICATOR, direction=out, [new stream-associated communicator] + +MPIX_Send_enqueue: + buf: BUFFER, constant=True, [initial address of send buffer] + count: POLYXFER_NUM_ELEM_NNI, [number of elements in send buffer] + datatype: DATATYPE, [datatype of each send buffer element] + dest: RANK, [rank of destination] + tag: TAG, [message tag] + comm: COMMUNICATOR + +MPIX_Recv_enqueue: + buf: BUFFER, direction=out, [initial address of receive buffer] + count: POLYXFER_NUM_ELEM_NNI, [number of elements in receive buffer] + datatype: DATATYPE, [datatype of each receive buffer element] + source: RANK, [rank of source or MPI_ANY_SOURCE] + tag: TAG, [message tag or MPI_ANY_TAG] + comm: COMMUNICATOR + status: STATUS, direction=out + +MPIX_Isend_enqueue: + buf: BUFFER, constant=True, [initial address of send buffer] + count: POLYXFER_NUM_ELEM_NNI, [number of elements in send buffer] + datatype: DATATYPE, [datatype of each send buffer element] + dest: RANK, [rank of destination] + tag: TAG, [message tag] + comm: COMMUNICATOR + request: REQUEST, direction=out + 
+MPIX_Irecv_enqueue: + buf: BUFFER, direction=out, [initial address of receive buffer] + count: POLYXFER_NUM_ELEM_NNI, [number of elements in receive buffer] + datatype: DATATYPE, [datatype of each receive buffer element] + source: RANK, [rank of source or MPI_ANY_SOURCE] + tag: TAG, [message tag or MPI_ANY_TAG] + comm: COMMUNICATOR + request: REQUEST, direction=out + +MPIX_Wait_enqueue: + request: REQUEST, direction=inout, [request] + status: STATUS, direction=out + +MPIX_Waitall_enqueue: + count: ARRAY_LENGTH_NNI, [lists length] + array_of_requests: REQUEST, direction=inout, length=count, [array of requests] + array_of_statuses: STATUS, direction=out, length=*, pointer=False, [array of status objects] +{ -- error_check -- array_of_statuses + if (count > 0) { + MPIR_ERRTEST_ARGNULL(array_of_statuses, "array_of_statuses", mpi_errno); + } +} diff --git a/src/include/mpir_request.h b/src/include/mpir_request.h index 27d3074d15d..f8accc7f608 100644 --- a/src/include/mpir_request.h +++ b/src/include/mpir_request.h @@ -65,6 +65,7 @@ typedef enum MPIR_Request_kind_t { MPIR_REQUEST_KIND__PART_SEND, /* Partitioned send req returned to user */ MPIR_REQUEST_KIND__PART_RECV, /* Partitioned recv req returned to user */ MPIR_REQUEST_KIND__PART, /* Partitioned pt2pt internal reqs */ + MPIR_REQUEST_KIND__ENQUEUE, /* enqueued (to gpu stream) request */ MPIR_REQUEST_KIND__GREQUEST, MPIR_REQUEST_KIND__COLL, MPIR_REQUEST_KIND__MPROBE, /* see NOTE-R1 */ @@ -220,6 +221,12 @@ struct MPIR_Request { MPL_atomic_int_t active_flag; /* flag indicating whether in a start-complete active period. * Value is 0 or 1.
*/ } part; /* kind : MPIR_REQUEST_KIND__PART_SEND or MPIR_REQUEST_KIND__PART_RECV */ + struct { + MPL_gpu_stream_t gpu_stream; + struct MPIR_Request *real_request; + bool is_send; + void *data; + } enqueue; struct { MPIR_Win *win; } rma; /* kind : MPIR_REQUEST_KIND__RMA */ diff --git a/src/include/mpir_typerep.h b/src/include/mpir_typerep.h index 0d4b9ecb202..0eaeae0608f 100644 --- a/src/include/mpir_typerep.h +++ b/src/include/mpir_typerep.h @@ -91,4 +91,11 @@ int MPIR_Typerep_op(void *source_buf, MPI_Aint source_count, MPI_Datatype source bool source_is_packed, int mapped_device); int MPIR_Typerep_reduce(const void *in_buf, void *out_buf, MPI_Aint count, MPI_Datatype datatype, MPI_Op op); + +int MPIR_Typerep_pack_stream(const void *inbuf, MPI_Aint incount, MPI_Datatype datatype, + MPI_Aint inoffset, void *outbuf, MPI_Aint max_pack_bytes, + MPI_Aint * actual_pack_bytes, void *stream); +int MPIR_Typerep_unpack_stream(const void *inbuf, MPI_Aint insize, + void *outbuf, MPI_Aint outcount, MPI_Datatype datatype, + MPI_Aint outoffset, MPI_Aint * actual_unpack_bytes, void *stream); #endif /* MPIR_TYPEREP_H_INCLUDED */ diff --git a/src/mpi/datatype/typerep/src/typerep_dataloop_pack.c b/src/mpi/datatype/typerep/src/typerep_dataloop_pack.c index 7e7b306f9d5..66bfee61254 100644 --- a/src/mpi/datatype/typerep/src/typerep_dataloop_pack.c +++ b/src/mpi/datatype/typerep/src/typerep_dataloop_pack.c @@ -210,3 +210,25 @@ int MPIR_Typerep_reduce(const void *in_buf, void *out_buf, MPI_Aint count, MPI_D return mpi_errno; } + +int MPIR_Typerep_pack_stream(const void *inbuf, MPI_Aint incount, MPI_Datatype datatype, + MPI_Aint inoffset, void *outbuf, MPI_Aint max_pack_bytes, + MPI_Aint * actual_pack_bytes, void *stream) +{ + int mpi_errno = MPI_SUCCESS; + + MPIR_Assert(0); + + return mpi_errno; +} + +int MPIR_Typerep_unpack_stream(const void *inbuf, MPI_Aint insize, void *outbuf, + MPI_Aint outcount, MPI_Datatype datatype, MPI_Aint outoffset, + MPI_Aint * actual_unpack_bytes, void 
*stream) +{ + int mpi_errno = MPI_SUCCESS; + + MPIR_Assert(0); + + return mpi_errno; +} diff --git a/src/mpi/datatype/typerep/src/typerep_yaksa_pack.c b/src/mpi/datatype/typerep/src/typerep_yaksa_pack.c index 1c6e6585355..dbe546e6cde 100644 --- a/src/mpi/datatype/typerep/src/typerep_yaksa_pack.c +++ b/src/mpi/datatype/typerep/src/typerep_yaksa_pack.c @@ -579,3 +579,49 @@ static int typerep_op_pack(void *source_buf, void *target_buf, MPI_Aint count, fn_fail: goto fn_exit; } + +int MPIR_Typerep_pack_stream(const void *inbuf, MPI_Aint incount, MPI_Datatype datatype, + MPI_Aint inoffset, void *outbuf, MPI_Aint max_pack_bytes, + MPI_Aint * actual_pack_bytes, void *stream) +{ + MPIR_FUNC_ENTER; + + int mpi_errno = MPI_SUCCESS; + int rc; + + yaksa_type_t type = MPII_Typerep_get_yaksa_type(datatype); + uintptr_t packed_bytes;; + rc = yaksa_pack_stream(inbuf, incount, type, inoffset, outbuf, max_pack_bytes, + &packed_bytes, NULL, YAKSA_OP__REPLACE, stream); + MPIR_ERR_CHKANDJUMP(rc, mpi_errno, MPI_ERR_INTERN, "**yaksa"); + *actual_pack_bytes = packed_bytes; + + fn_exit: + MPIR_FUNC_EXIT; + return mpi_errno; + fn_fail: + goto fn_exit; +} + +int MPIR_Typerep_unpack_stream(const void *inbuf, MPI_Aint insize, void *outbuf, + MPI_Aint outcount, MPI_Datatype datatype, MPI_Aint outoffset, + MPI_Aint * actual_unpack_bytes, void *stream) +{ + MPIR_FUNC_ENTER; + + int mpi_errno = MPI_SUCCESS; + int rc; + + yaksa_type_t type = MPII_Typerep_get_yaksa_type(datatype); + uintptr_t unpacked_bytes;; + rc = yaksa_unpack_stream(inbuf, insize, outbuf, outcount, type, outoffset, + &unpacked_bytes, NULL, YAKSA_OP__REPLACE, stream); + MPIR_ERR_CHKANDJUMP(rc, mpi_errno, MPI_ERR_INTERN, "**yaksa"); + *actual_unpack_bytes = unpacked_bytes; + + fn_exit: + MPIR_FUNC_EXIT; + return mpi_errno; + fn_fail: + goto fn_exit; +} diff --git a/src/mpi/errhan/errnames.txt b/src/mpi/errhan/errnames.txt index 8ee0d7418cb..e053dcfc8a7 100644 --- a/src/mpi/errhan/errnames.txt +++ b/src/mpi/errhan/errnames.txt @@ 
-955,6 +955,9 @@ is too big (> MPIU_SHMW_GHND_SZ) **ch4nostream:No streams available. Configure --enable-thread-cs=per-vci and --with-ch4-max-vcis=# to enable streams. **outofstream:No streams available. Use MPIR_CVAR_CH4_RESERVE_VCIS to reserve the number of streams can be allocated. **cannotfreestream:The stream is still in use, cannot be freed. +**missinggpustream:Info hint 'type' is set, but info hint 'value' is missing. +**invalidgpustream:Info hint 'type' is set, but info hint 'value' is invalid. +**notgpustream:The communicator does not have a local gpu stream attached. # ----------------------------------------------------------------------------- # The following names are defined but not used (see the -careful option diff --git a/src/mpi/stream/stream_impl.c b/src/mpi/stream/stream_impl.c index 2078d136b2b..49989508fad 100644 --- a/src/mpi/stream/stream_impl.c +++ b/src/mpi/stream/stream_impl.c @@ -9,6 +9,55 @@ #define MPIR_STREAM_PREALLOC 8 #endif +#define GPU_STREAM_USE_SINGLE_VCI + +#ifdef GPU_STREAM_USE_SINGLE_VCI +static int gpu_stream_vci = 0; +static int gpu_stream_count = 0; + +static int allocate_vci(int *vci, bool is_gpu_stream) +{ + if (is_gpu_stream) { + int mpi_errno = MPI_SUCCESS; + if (!gpu_stream_vci) { + mpi_errno = MPID_Allocate_vci(&gpu_stream_vci); + } + gpu_stream_count++; + *vci = gpu_stream_vci; + return mpi_errno; + } else { + return MPID_Allocate_vci(vci); + } +} + +static int deallocate_vci(int vci) +{ + if (vci == gpu_stream_vci) { + gpu_stream_count--; + if (gpu_stream_count == 0) { + gpu_stream_vci = 0; + return MPID_Deallocate_vci(vci); + } else { + return MPI_SUCCESS; + } + } else { + return MPID_Deallocate_vci(vci); + } +} + +#else +static int allocate_vci(int *vci, bool is_gpu_stream) +{ + return MPID_Allocate_vci(vci); +} + +static int deallocate_vci(int vci) +{ + return MPID_Deallocate_vci(vci); +} + +#endif + MPIR_Stream MPIR_Stream_direct[MPIR_STREAM_PREALLOC]; MPIR_Object_alloc_t MPIR_Stream_mem = { 0, 0, 0, 0, 0, 0,
MPIR_STREAM, @@ -60,15 +109,18 @@ int MPIR_Stream_create_impl(MPIR_Info * info_ptr, MPIR_Stream ** p_stream_ptr) /* TODO: proper conversion for each gpu stream type */ const char *s_value = MPIR_Info_lookup(info_ptr, "value"); - MPIR_Assertp(s_value); + MPIR_ERR_CHKANDJUMP(!s_value, mpi_errno, MPI_ERR_OTHER, "**missinggpustream"); + mpi_errno = MPIR_Info_decode_hex(s_value, &stream_ptr->u.gpu_stream, sizeof(MPL_gpu_stream_t)); MPIR_ERR_CHECK(mpi_errno); + MPIR_ERR_CHKANDJUMP(!MPL_gpu_stream_is_valid(stream_ptr->u.gpu_stream), + mpi_errno, MPI_ERR_OTHER, "**invalidgpustream"); } else { stream_ptr->type = MPIR_STREAM_GENERAL; } - mpi_errno = MPID_Allocate_vci(&stream_ptr->vci); + mpi_errno = allocate_vci(&stream_ptr->vci, stream_ptr->type == MPIR_STREAM_GPU); MPIR_ERR_CHECK(mpi_errno); *p_stream_ptr = stream_ptr; @@ -89,7 +141,7 @@ int MPIR_Stream_free_impl(MPIR_Stream * stream_ptr) MPIR_ERR_CHKANDJUMP(ref_cnt != 0, mpi_errno, MPI_ERR_OTHER, "**cannotfreestream"); if (stream_ptr->vci) { - mpi_errno = MPID_Deallocate_vci(stream_ptr->vci); + mpi_errno = deallocate_vci(stream_ptr->vci); } MPIR_Handle_obj_free(&MPIR_Stream_mem, stream_ptr); @@ -134,3 +186,507 @@ int MPIR_Stream_comm_create_impl(MPIR_Comm * comm_ptr, MPIR_Stream * stream_ptr, fn_fail: goto fn_exit; } + +/* ---- CUDA stream send/recv enqueue ---- */ + +static int get_local_gpu_stream(MPIR_Comm * comm_ptr, MPL_gpu_stream_t * gpu_stream) +{ + int mpi_errno = MPI_SUCCESS; + + MPIR_Stream *stream_ptr = NULL; + if (comm_ptr->stream_comm_type == MPIR_STREAM_COMM_SINGLE) { + stream_ptr = comm_ptr->stream_comm.single.stream; + } else if (comm_ptr->stream_comm_type == MPIR_STREAM_COMM_MULTIPLEX) { + stream_ptr = comm_ptr->stream_comm.multiplex.local_streams[comm_ptr->rank]; + } + + MPIR_ERR_CHKANDJUMP(!stream_ptr || stream_ptr->type != MPIR_STREAM_GPU, + mpi_errno, MPI_ERR_OTHER, "**notgpustream"); + *gpu_stream = stream_ptr->u.gpu_stream; + + fn_exit: + return mpi_errno; + fn_fail: + goto fn_exit; +} + +static int 
allocate_enqueue_request(MPIR_Comm * comm_ptr, MPIR_Request ** req) +{ + int mpi_errno = MPI_SUCCESS; + + MPIR_Stream *stream_ptr = NULL; + if (comm_ptr->stream_comm_type == MPIR_STREAM_COMM_SINGLE) { + stream_ptr = comm_ptr->stream_comm.single.stream; + } else if (comm_ptr->stream_comm_type == MPIR_STREAM_COMM_MULTIPLEX) { + stream_ptr = comm_ptr->stream_comm.multiplex.local_streams[comm_ptr->rank]; + } + MPIR_Assert(stream_ptr); + + int vci = stream_ptr->vci; + MPIR_Assert(vci > 0); + + /* stream vci are only accessed within a serialized context */ + (*req) = MPIR_Request_create_from_pool_safe(MPIR_REQUEST_KIND__ENQUEUE, vci, 1); + (*req)->u.enqueue.gpu_stream = stream_ptr->u.gpu_stream; + (*req)->u.enqueue.real_request = NULL; + + return mpi_errno; +} + +/* send enqueue */ +struct send_data { + const void *buf; + MPI_Aint count; + MPI_Datatype datatype; + int dest; + int tag; + MPIR_Comm *comm_ptr; + void *host_buf; + MPI_Aint data_sz; + MPI_Aint actual_pack_bytes; + /* for isend */ + MPIR_Request *req; +}; + +static void send_enqueue_cb(void *data) +{ + int mpi_errno; + MPIR_Request *request_ptr = NULL; + + struct send_data *p = data; + if (p->host_buf) { + assert(p->actual_pack_bytes == p->data_sz); + + mpi_errno = MPID_Send(p->host_buf, p->data_sz, MPI_BYTE, p->dest, p->tag, p->comm_ptr, + MPIR_CONTEXT_INTRA_PT2PT, &request_ptr); + } else { + mpi_errno = MPID_Send(p->buf, p->count, p->datatype, p->dest, p->tag, p->comm_ptr, + MPIR_CONTEXT_INTRA_PT2PT, &request_ptr); + } + assert(mpi_errno == MPI_SUCCESS); + assert(request_ptr != NULL); + + mpi_errno = MPID_Wait(request_ptr, MPI_STATUS_IGNORE); + assert(mpi_errno == MPI_SUCCESS); + + MPIR_Request_free(request_ptr); + + if (p->host_buf) { + MPIR_gpu_free_host(p->host_buf); + } + MPL_free(data); +} + +int MPIR_Send_enqueue_impl(const void *buf, MPI_Aint count, MPI_Datatype datatype, + int dest, int tag, MPIR_Comm * comm_ptr) +{ + int mpi_errno = MPI_SUCCESS; + + MPL_gpu_stream_t gpu_stream; + mpi_errno = 
get_local_gpu_stream(comm_ptr, &gpu_stream); + MPIR_ERR_CHECK(mpi_errno); + + struct send_data *p; + p = MPL_malloc(sizeof(struct send_data), MPL_MEM_OTHER); + MPIR_ERR_CHKANDJUMP(!p, mpi_errno, MPI_ERR_OTHER, "**nomem"); + + p->dest = dest; + p->tag = tag; + p->comm_ptr = comm_ptr; + + if (MPIR_GPU_query_pointer_is_dev(buf)) { + MPI_Aint dt_size; + MPIR_Datatype_get_size_macro(datatype, dt_size); + p->data_sz = dt_size * count; + + MPIR_gpu_malloc_host(&p->host_buf, p->data_sz); + + mpi_errno = MPIR_Typerep_pack_stream(buf, count, datatype, 0, p->host_buf, p->data_sz, + &p->actual_pack_bytes, &gpu_stream); + MPIR_ERR_CHECK(mpi_errno); + } else { + p->host_buf = NULL; + p->buf = buf; + p->count = count; + p->datatype = datatype; + } + + MPL_gpu_launch_hostfn(gpu_stream, send_enqueue_cb, p); + + fn_exit: + return mpi_errno; + fn_fail: + goto fn_exit; +} + +/* ---- recv enqueue ---- */ +struct recv_data { + void *buf; + MPI_Aint count; + MPI_Datatype datatype; + int source; + int tag; + MPIR_Comm *comm_ptr; + MPI_Status *status; + void *host_buf; + MPI_Aint data_sz; + MPI_Aint actual_unpack_bytes; + /* for irend */ + MPIR_Request *req; +}; + +static void recv_enqueue_cb(void *data) +{ + int mpi_errno; + MPIR_Request *request_ptr = NULL; + + struct recv_data *p = data; + if (p->host_buf) { + mpi_errno = MPID_Recv(p->host_buf, p->data_sz, MPI_BYTE, p->source, p->tag, p->comm_ptr, + MPIR_CONTEXT_INTRA_PT2PT, p->status, &request_ptr); + } else { + mpi_errno = MPID_Recv(p->buf, p->count, p->datatype, p->source, p->tag, p->comm_ptr, + MPIR_CONTEXT_INTRA_PT2PT, p->status, &request_ptr); + } + assert(mpi_errno == MPI_SUCCESS); + assert(request_ptr != NULL); + + mpi_errno = MPID_Wait(request_ptr, MPI_STATUS_IGNORE); + assert(mpi_errno == MPI_SUCCESS); + + MPIR_Request_extract_status(request_ptr, p->status); + MPIR_Request_free(request_ptr); + + if (!p->host_buf) { + /* we are done */ + MPL_free(p); + } +} + +static void recv_stream_cleanup_cb(void *data) +{ + struct recv_data 
*p = data; + assert(p->actual_unpack_bytes == p->data_sz); + + MPIR_gpu_free_host(p->host_buf); + MPL_free(data); +} + +int MPIR_Recv_enqueue_impl(void *buf, MPI_Aint count, MPI_Datatype datatype, + int source, int tag, MPIR_Comm * comm_ptr, MPI_Status * status) +{ + int mpi_errno = MPI_SUCCESS; + + MPL_gpu_stream_t gpu_stream; + mpi_errno = get_local_gpu_stream(comm_ptr, &gpu_stream); + MPIR_ERR_CHECK(mpi_errno); + + struct recv_data *p; + p = MPL_malloc(sizeof(struct recv_data), MPL_MEM_OTHER); + MPIR_ERR_CHKANDJUMP(!p, mpi_errno, MPI_ERR_OTHER, "**nomem"); + + p->source = source; + p->tag = tag; + p->comm_ptr = comm_ptr; + p->status = status; + + if (MPIR_GPU_query_pointer_is_dev(buf)) { + MPI_Aint dt_size; + MPIR_Datatype_get_size_macro(datatype, dt_size); + p->data_sz = dt_size * count; + + MPIR_gpu_malloc_host(&p->host_buf, p->data_sz); + + MPL_gpu_launch_hostfn(gpu_stream, recv_enqueue_cb, p); + + mpi_errno = MPIR_Typerep_unpack_stream(p->host_buf, p->data_sz, buf, count, datatype, 0, + &p->actual_unpack_bytes, &gpu_stream); + MPIR_ERR_CHECK(mpi_errno); + + MPL_gpu_launch_hostfn(gpu_stream, recv_stream_cleanup_cb, p); + } else { + p->host_buf = NULL; + p->buf = buf; + p->count = count; + p->datatype = datatype; + + MPL_gpu_launch_hostfn(gpu_stream, recv_enqueue_cb, p); + } + + fn_exit: + return mpi_errno; + fn_fail: + goto fn_exit; +} + +/* ---- isend enqueue ---- */ +static void isend_enqueue_cb(void *data) +{ + int mpi_errno; + MPIR_Request *request_ptr = NULL; + + struct send_data *p = data; + if (p->host_buf) { + assert(p->actual_pack_bytes == p->data_sz); + + mpi_errno = MPID_Send(p->host_buf, p->data_sz, MPI_BYTE, p->dest, p->tag, p->comm_ptr, + MPIR_CONTEXT_INTRA_PT2PT, &request_ptr); + } else { + mpi_errno = MPID_Send(p->buf, p->count, p->datatype, p->dest, p->tag, p->comm_ptr, + MPIR_CONTEXT_INTRA_PT2PT, &request_ptr); + } + assert(mpi_errno == MPI_SUCCESS); + assert(request_ptr != NULL); + + p->req->u.enqueue.real_request = request_ptr; +} + +int 
MPIR_Isend_enqueue_impl(const void *buf, MPI_Aint count, MPI_Datatype datatype, + int dest, int tag, MPIR_Comm * comm_ptr, MPIR_Request ** req) +{ + int mpi_errno = MPI_SUCCESS; + + MPL_gpu_stream_t gpu_stream; + mpi_errno = get_local_gpu_stream(comm_ptr, &gpu_stream); + MPIR_ERR_CHECK(mpi_errno); + + struct send_data *p; + p = MPL_malloc(sizeof(struct send_data), MPL_MEM_OTHER); + MPIR_ERR_CHKANDJUMP(!p, mpi_errno, MPI_ERR_OTHER, "**nomem"); + + mpi_errno = allocate_enqueue_request(comm_ptr, req); + MPIR_ERR_CHECK(mpi_errno); + (*req)->u.enqueue.is_send = true; + (*req)->u.enqueue.data = p; + + p->req = *req; + p->dest = dest; + p->tag = tag; + p->comm_ptr = comm_ptr; + + if (MPIR_GPU_query_pointer_is_dev(buf)) { + MPI_Aint dt_size; + MPIR_Datatype_get_size_macro(datatype, dt_size); + p->data_sz = dt_size * count; + + MPIR_gpu_malloc_host(&p->host_buf, p->data_sz); + + mpi_errno = MPIR_Typerep_pack_stream(buf, count, datatype, 0, p->host_buf, p->data_sz, + &p->actual_pack_bytes, &gpu_stream); + MPIR_ERR_CHECK(mpi_errno); + } else { + p->host_buf = NULL; + p->buf = buf; + p->count = count; + p->datatype = datatype; + } + + MPL_gpu_launch_hostfn(gpu_stream, isend_enqueue_cb, p); + + fn_exit: + return mpi_errno; + fn_fail: + goto fn_exit; +} + +/* ---- irecv enqueue ---- */ +static void irecv_enqueue_cb(void *data) +{ + int mpi_errno; + MPIR_Request *request_ptr = NULL; + + struct recv_data *p = data; + if (p->host_buf) { + mpi_errno = MPID_Recv(p->host_buf, p->data_sz, MPI_BYTE, p->source, p->tag, p->comm_ptr, + MPIR_CONTEXT_INTRA_PT2PT, p->status, &request_ptr); + } else { + mpi_errno = MPID_Recv(p->buf, p->count, p->datatype, p->source, p->tag, p->comm_ptr, + MPIR_CONTEXT_INTRA_PT2PT, p->status, &request_ptr); + } + assert(mpi_errno == MPI_SUCCESS); + assert(request_ptr != NULL); + + p->req->u.enqueue.real_request = request_ptr; +} + +int MPIR_Irecv_enqueue_impl(void *buf, MPI_Aint count, MPI_Datatype datatype, + int source, int tag, MPIR_Comm * comm_ptr, 
MPIR_Request ** req) +{ + int mpi_errno = MPI_SUCCESS; + + MPL_gpu_stream_t gpu_stream; + mpi_errno = get_local_gpu_stream(comm_ptr, &gpu_stream); + MPIR_ERR_CHECK(mpi_errno); + + struct recv_data *p; + p = MPL_malloc(sizeof(struct recv_data), MPL_MEM_OTHER); + MPIR_ERR_CHKANDJUMP(!p, mpi_errno, MPI_ERR_OTHER, "**nomem"); + + mpi_errno = allocate_enqueue_request(comm_ptr, req); + MPIR_ERR_CHECK(mpi_errno); + (*req)->u.enqueue.is_send = false; + (*req)->u.enqueue.data = p; + + p->req = *req; + p->source = source; + p->tag = tag; + p->comm_ptr = comm_ptr; + p->status = MPI_STATUS_IGNORE; + + if (MPIR_GPU_query_pointer_is_dev(buf)) { + MPI_Aint dt_size; + MPIR_Datatype_get_size_macro(datatype, dt_size); + p->data_sz = dt_size * count; + + MPIR_gpu_malloc_host(&p->host_buf, p->data_sz); + + MPL_gpu_launch_hostfn(gpu_stream, recv_enqueue_cb, p); + + mpi_errno = MPIR_Typerep_unpack_stream(p->host_buf, p->data_sz, buf, count, datatype, 0, + &p->actual_unpack_bytes, &gpu_stream); + MPIR_ERR_CHECK(mpi_errno); + + MPL_gpu_launch_hostfn(gpu_stream, recv_stream_cleanup_cb, p); + } else { + p->host_buf = NULL; + p->buf = buf; + p->count = count; + p->datatype = datatype; + + MPL_gpu_launch_hostfn(gpu_stream, irecv_enqueue_cb, p); + } + + fn_exit: + return mpi_errno; + fn_fail: + goto fn_exit; +} + +/* ---- wait enqueue ---- */ +static void wait_enqueue_cb(void *data) +{ + int mpi_errno; + MPIR_Request *enqueue_req = data; + MPIR_Request *real_req = enqueue_req->u.enqueue.real_request; + + if (enqueue_req->u.enqueue.is_send) { + struct send_data *p = enqueue_req->u.enqueue.data; + + mpi_errno = MPID_Wait(real_req, MPI_STATUS_IGNORE); + assert(mpi_errno == MPI_SUCCESS); + + MPIR_Request_free(real_req); + + if (p->host_buf) { + MPIR_gpu_free_host(p->host_buf); + } + MPL_free(p); + } else { + struct recv_data *p = enqueue_req->u.enqueue.data; + + mpi_errno = MPID_Wait(real_req, MPI_STATUS_IGNORE); + assert(mpi_errno == MPI_SUCCESS); + + MPIR_Request_extract_status(real_req, 
p->status); + MPIR_Request_free(real_req); + + if (!p->host_buf) { + MPL_free(p); + } + } + MPIR_Request_free(enqueue_req); +} + +int MPIR_Wait_enqueue_impl(MPIR_Request * req_ptr, MPI_Status * status) +{ + int mpi_errno = MPI_SUCCESS; + MPIR_Assert(req_ptr && req_ptr->kind == MPIR_REQUEST_KIND__ENQUEUE); + + MPL_gpu_stream_t gpu_stream = req_ptr->u.enqueue.gpu_stream; + if (!req_ptr->u.enqueue.is_send) { + struct recv_data *p = req_ptr->u.enqueue.data; + p->status = status; + } + + MPL_gpu_launch_hostfn(gpu_stream, wait_enqueue_cb, req_ptr); + + return mpi_errno; +} + +/* ---- waitall enqueue ---- */ +struct waitall_data { + int count; + MPI_Request *array_of_requests; + MPI_Status *array_of_statuses; +}; + +static void waitall_enqueue_cb(void *data) +{ + struct waitall_data *p = data; + + MPI_Request *reqs = MPL_malloc(p->count * sizeof(MPI_Request), MPL_MEM_OTHER); + MPIR_Assert(reqs); + + for (int i = 0; i < p->count; i++) { + MPIR_Request *enqueue_req; + MPIR_Request_get_ptr(p->array_of_requests[i], enqueue_req); + reqs[i] = enqueue_req->u.enqueue.real_request->handle; + } + + MPIR_Waitall(p->count, reqs, p->array_of_statuses); + + for (int i = 0; i < p->count; i++) { + MPIR_Request *enqueue_req; + MPIR_Request_get_ptr(p->array_of_requests[i], enqueue_req); + + if (enqueue_req->u.enqueue.is_send) { + struct send_data *p2 = enqueue_req->u.enqueue.data; + if (p2->host_buf) { + MPIR_gpu_free_host(p2->host_buf); + } + MPL_free(p2); + } else { + struct recv_data *p2 = enqueue_req->u.enqueue.data; + if (!p2->host_buf) { + MPL_free(p2); + } + } + MPIR_Request_free(enqueue_req); + } + MPL_free(reqs); + MPL_free(p); +} + +int MPIR_Waitall_enqueue_impl(int count, MPI_Request * array_of_requests, + MPI_Status * array_of_statuses) +{ + int mpi_errno = MPI_SUCCESS; + + MPL_gpu_stream_t gpu_stream = MPL_GPU_STREAM_DEFAULT; + for (int i = 0; i < count; i++) { + MPIR_Request *enqueue_req; + MPIR_Request_get_ptr(array_of_requests[i], enqueue_req); + + MPIR_Assert(enqueue_req 
&& enqueue_req->kind == MPIR_REQUEST_KIND__ENQUEUE); + if (i == 0) { + gpu_stream = enqueue_req->u.enqueue.gpu_stream; + } else { + MPIR_Assert(gpu_stream == enqueue_req->u.enqueue.gpu_stream); + } + } + + struct waitall_data *p; + p = MPL_malloc(sizeof(struct waitall_data), MPL_MEM_OTHER); + MPIR_ERR_CHKANDJUMP(!p, mpi_errno, MPI_ERR_OTHER, "**nomem"); + + p->count = count; + p->array_of_requests = array_of_requests; + p->array_of_statuses = array_of_statuses; + + MPL_gpu_launch_hostfn(gpu_stream, waitall_enqueue_cb, p); + + fn_exit: + return mpi_errno; + fn_fail: + goto fn_exit; +} diff --git a/src/mpl/include/mpl_gpu.h b/src/mpl/include/mpl_gpu.h index ea94f2900b2..54b387d6930 100644 --- a/src/mpl/include/mpl_gpu.h +++ b/src/mpl/include/mpl_gpu.h @@ -81,4 +81,8 @@ int MPL_gpu_get_buffer_bounds(const void *ptr, void **pbase, uintptr_t * len); int MPL_gpu_free_hook_register(void (*free_hook) (void *dptr)); int MPL_gpu_get_dev_count(int *dev_cnt, int *dev_id); +typedef void (*MPL_gpu_hostfn) (void *data); +int MPL_gpu_launch_hostfn(MPL_gpu_stream_t stream, MPL_gpu_hostfn fn, void *data); +bool MPL_gpu_stream_is_valid(MPL_gpu_stream_t stream); + #endif /* ifndef MPL_GPU_H_INCLUDED */ diff --git a/src/mpl/include/mpl_gpu_cuda.h b/src/mpl/include/mpl_gpu_cuda.h index ddf8bc3935c..e79c6cf2552 100644 --- a/src/mpl/include/mpl_gpu_cuda.h +++ b/src/mpl/include/mpl_gpu_cuda.h @@ -13,6 +13,8 @@ typedef cudaIpcMemHandle_t MPL_gpu_ipc_mem_handle_t; typedef int MPL_gpu_device_handle_t; typedef struct cudaPointerAttributes MPL_gpu_device_attr; typedef cudaStream_t MPL_gpu_stream_t; + +#define MPL_GPU_STREAM_DEFAULT 0 #define MPL_GPU_DEVICE_INVALID -1 #endif /* ifndef MPL_GPU_CUDA_H_INCLUDED */ diff --git a/src/mpl/include/mpl_gpu_fallback.h b/src/mpl/include/mpl_gpu_fallback.h index 59d3172c68f..f937c0f619b 100644 --- a/src/mpl/include/mpl_gpu_fallback.h +++ b/src/mpl/include/mpl_gpu_fallback.h @@ -10,6 +10,8 @@ typedef int MPL_gpu_ipc_mem_handle_t; typedef int 
MPL_gpu_device_handle_t; typedef int MPL_gpu_device_attr; /* dummy type */ typedef int MPL_gpu_stream_t; + +#define MPL_GPU_STREAM_DEFAULT 0 #define MPL_GPU_DEVICE_INVALID -1 #endif /* ifndef MPL_GPU_CUDA_H_INCLUDED */ diff --git a/src/mpl/include/mpl_gpu_hip.h b/src/mpl/include/mpl_gpu_hip.h index 49ea83d422b..f71d38570d6 100644 --- a/src/mpl/include/mpl_gpu_hip.h +++ b/src/mpl/include/mpl_gpu_hip.h @@ -13,6 +13,8 @@ typedef hipIpcMemHandle_t MPL_gpu_ipc_mem_handle_t; typedef int MPL_gpu_device_handle_t; typedef struct hipPointerAttribute_t MPL_gpu_device_attr; typedef hipStream_t MPL_gpu_stream_t; + +#define MPL_GPU_STREAM_DEFAULT 0 #define MPL_GPU_DEVICE_INVALID -1 #endif /* ifndef MPL_GPU_HIP_H_INCLUDED */ diff --git a/src/mpl/include/mpl_gpu_ze.h b/src/mpl/include/mpl_gpu_ze.h index eb615e4d3ef..5736a5ac7eb 100644 --- a/src/mpl/include/mpl_gpu_ze.h +++ b/src/mpl/include/mpl_gpu_ze.h @@ -18,6 +18,8 @@ typedef ze_device_handle_t MPL_gpu_device_handle_t; typedef ze_alloc_attr_t MPL_gpu_device_attr; /* FIXME: implement ze stream */ typedef int MPL_gpu_stream_t; + +#define MPL_GPU_STREAM_DEFAULT 0 #define MPL_GPU_DEVICE_INVALID NULL #endif /* ifndef MPL_GPU_ZE_H_INCLUDED */ diff --git a/src/mpl/src/gpu/mpl_gpu_cuda.c b/src/mpl/src/gpu/mpl_gpu_cuda.c index 346de7e0dda..894c2b2cdb2 100644 --- a/src/mpl/src/gpu/mpl_gpu_cuda.c +++ b/src/mpl/src/gpu/mpl_gpu_cuda.c @@ -409,3 +409,20 @@ cudaError_t CUDARTAPI cudaFree(void *dptr) result = sys_cudaFree(dptr); return result; } + +int MPL_gpu_launch_hostfn(cudaStream_t stream, MPL_gpu_hostfn fn, void *data) +{ + cudaError_t result; + result = cudaLaunchHostFunc(stream, fn, data); + return result; +} + +bool MPL_gpu_stream_is_valid(MPL_gpu_stream_t stream) +{ + cudaError_t result; + /* CUDA may blindly dereference the stream as pointer, which may segfault + * if the wrong value is passed in. This is still better than segfault later + * upon using the stream. 
*/ + result = cudaStreamQuery(stream); + return (result != cudaErrorInvalidResourceHandle); +} diff --git a/src/mpl/src/gpu/mpl_gpu_fallback.c b/src/mpl/src/gpu/mpl_gpu_fallback.c index 8b50faee152..c5f42e8e6ae 100644 --- a/src/mpl/src/gpu/mpl_gpu_fallback.c +++ b/src/mpl/src/gpu/mpl_gpu_fallback.c @@ -97,3 +97,13 @@ int MPL_gpu_free_hook_register(void (*free_hook) (void *dptr)) { return MPL_SUCCESS; } + +int MPL_gpu_launch_hostfn(int stream, MPL_gpu_hostfn fn, void *data) +{ + return -1; +} + +bool MPL_gpu_stream_is_valid(MPL_gpu_stream_t stream) +{ + return false; +} diff --git a/src/mpl/src/gpu/mpl_gpu_hip.c b/src/mpl/src/gpu/mpl_gpu_hip.c index 10619f4529d..28ed2963e7c 100644 --- a/src/mpl/src/gpu/mpl_gpu_hip.c +++ b/src/mpl/src/gpu/mpl_gpu_hip.c @@ -367,4 +367,15 @@ hipError_t hipFree(void *dptr) result = sys_hipFree(dptr); return result; } + +int MPL_gpu_launch_hostfn(hipStream_t stream, MPL_gpu_hostfn fn, void *data) +{ + return -1; +} + +bool MPL_gpu_stream_is_valid(MPL_gpu_stream_t stream) +{ + return false; +} + #endif /* MPL_HAVE_HIP */ diff --git a/src/mpl/src/gpu/mpl_gpu_ze.c b/src/mpl/src/gpu/mpl_gpu_ze.c index d912fa2784c..e5aa5d11176 100644 --- a/src/mpl/src/gpu/mpl_gpu_ze.c +++ b/src/mpl/src/gpu/mpl_gpu_ze.c @@ -367,4 +367,14 @@ int MPL_gpu_free_hook_register(void (*free_hook) (void *dptr)) return MPL_SUCCESS; } +int MPL_gpu_launch_hostfn(int stream, MPL_gpu_hostfn fn, void *data) +{ + return -1; +} + +bool MPL_gpu_stream_is_valid(MPL_gpu_stream_t stream) +{ + return false; +} + #endif /* MPL_HAVE_ZE */ diff --git a/test/mpi/impls/mpich/cuda/Makefile.am b/test/mpi/impls/mpich/cuda/Makefile.am index 59b18b8778e..8f29cb4189d 100644 --- a/test/mpi/impls/mpich/cuda/Makefile.am +++ b/test/mpi/impls/mpich/cuda/Makefile.am @@ -8,4 +8,5 @@ include $(top_srcdir)/Makefile_cuda.mtest LDADD += -lm noinst_PROGRAMS = \ - saxpy + saxpy \ + stream diff --git a/test/mpi/impls/mpich/cuda/stream.cu b/test/mpi/impls/mpich/cuda/stream.cu new file mode 100644 index 
00000000000..f2e36e7b336 --- /dev/null +++ b/test/mpi/impls/mpich/cuda/stream.cu @@ -0,0 +1,133 @@ +/* + * Copyright (C) by Argonne National Laboratory + * See COPYRIGHT in top-level directory + */ + +#include +#include +#include + +const int N = 1000000; +const int a = 2.0; + +static void init_x(float *x) +{ + for (int i = 0; i < N; i++) { + x[i] = 1.0f; + } +} + +static void init_y(float *y) +{ + for (int i = 0; i < N; i++) { + y[i] = 2.0f; + } +} + +static int check_result(float *y) +{ + float maxError = 0.0f; + int errs = 0; + for (int i = 0; i < N; i++) { + if (abs(y[i] - 4.0f) > 0.01) { + errs++; + maxError = max(maxError, abs(y[i]-4.0f)); + } + } + if (errs > 0) { + printf("%d errors, Max error: %f\n", errs, maxError); + } + return errs; +} + +__global__ +void saxpy(int n, float a, float *x, float *y) +{ + int i = blockIdx.x*blockDim.x + threadIdx.x; + if (i < n) y[i] = a*x[i] + y[i]; +} + +int main(void) +{ + int errs = 0; + + cudaStream_t stream; + cudaStreamCreate(&stream); + + int mpi_errno; + int rank, size; + MPI_Init(NULL, NULL); + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + MPI_Comm_size(MPI_COMM_WORLD, &size); + + if (size < 2) { + printf("This test require 2 processes\n"); + exit(1); + } + + float *x, *y, *d_x, *d_y; + x = (float*)malloc(N*sizeof(float)); + y = (float*)malloc(N*sizeof(float)); + + cudaMalloc(&d_x, N*sizeof(float)); + cudaMalloc(&d_y, N*sizeof(float)); + + if (rank == 0) { + init_x(x); + } else if (rank == 1) { + init_y(y); + } + + MPI_Info info; + MPI_Info_create(&info); + MPI_Info_set(info, "type", "cudaStream_t"); + MPIX_Info_set_hex(info, "value", &stream, sizeof(stream)); + + MPIX_Stream mpi_stream; + MPIX_Stream_create(info, &mpi_stream); + + MPI_Info_free(&info); + + MPI_Comm stream_comm; + MPIX_Stream_comm_create(MPI_COMM_WORLD, mpi_stream, &stream_comm); + + /* Rank 0 sends x data to Rank 1, Rank 1 performs a * x + y and checks result */ + if (rank == 0) { + cudaMemcpyAsync(d_x, x, N*sizeof(float), cudaMemcpyHostToDevice, 
stream); + + mpi_errno = MPIX_Send_enqueue(d_x, N, MPI_FLOAT, 1, 0, stream_comm); + assert(mpi_errno == MPI_SUCCESS); + + cudaStreamSynchronize(stream); + } else if (rank == 1) { + cudaMemcpyAsync(d_y, y, N*sizeof(float), cudaMemcpyHostToDevice, stream); + + mpi_errno = MPIX_Recv_enqueue(d_x, N, MPI_FLOAT, 0, 0, stream_comm, MPI_STATUS_IGNORE); + assert(mpi_errno == MPI_SUCCESS); + + saxpy<<<(N+255)/256, 256, 0, stream>>>(N, a, d_x, d_y); + + cudaMemcpyAsync(y, d_y, N*sizeof(float), cudaMemcpyDeviceToHost, stream); + + cudaStreamSynchronize(stream); + } + + if (rank == 1) { + errs = check_result(y); + if (errs == 0) { + printf("No Errors\n"); + } + } + + MPI_Comm_free(&stream_comm); + MPIX_Stream_free(&mpi_stream); + + cudaFree(d_x); + cudaFree(d_y); + free(x); + free(y); + + cudaStreamDestroy(stream); + MPI_Finalize(); + return errs; +} diff --git a/test/mpi/impls/mpich/cuda/testlist b/test/mpi/impls/mpich/cuda/testlist index 68bfd0db9cc..da2799094bd 100644 --- a/test/mpi/impls/mpich/cuda/testlist +++ b/test/mpi/impls/mpich/cuda/testlist @@ -1 +1,2 @@ saxpy 2 +stream 2 env=MPIR_CVAR_CH4_RESERVE_VCIS=1