Skip to content

Commit

Permalink
Merge pull request #5906 from hzhou/2203_stream_2
Browse files Browse the repository at this point in the history
stream: implement gpu stream enqueue functions

Approved-by: Ken Raffenetti
  • Loading branch information
hzhou authored Apr 27, 2022
2 parents a236bdc + 24e304f commit f39d5ee
Show file tree
Hide file tree
Showing 20 changed files with 892 additions and 4 deletions.
3 changes: 3 additions & 0 deletions maint/local_python/binding_c.py
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,9 @@ def process_func_parameters(func):
if RE.match(r'mpi_startall', func['name'], re.IGNORECASE):
impl_arg_list.append(ptrs_name)
impl_param_list.append("MPIR_Request **%s" % ptrs_name)
else:
impl_arg_list.append(name)
impl_param_list.append("MPI_Request %s[]" % name)
else:
print("Unhandled handle array: " + name, file=sys.stderr)
elif "code-handle_ptr-tail" in func and name in func['code-handle_ptr-tail']:
Expand Down
49 changes: 49 additions & 0 deletions src/binding/c/stream_api.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,52 @@ MPIX_Stream_comm_create:
comm: COMMUNICATOR, [communicator]
stream: STREAM, [stream object]
newcomm: COMMUNICATOR, direction=out, [new stream-associated communicator]

MPIX_Send_enqueue:
buf: BUFFER, constant=True, [initial address of send buffer]
count: POLYXFER_NUM_ELEM_NNI, [number of elements in send buffer]
datatype: DATATYPE, [datatype of each send buffer element]
dest: RANK, [rank of destination]
tag: TAG, [message tag]
comm: COMMUNICATOR

MPIX_Recv_enqueue:
buf: BUFFER, direction=out, [initial address of receive buffer]
count: POLYXFER_NUM_ELEM_NNI, [number of elements in receive buffer]
datatype: DATATYPE, [datatype of each receive buffer element]
source: RANK, [rank of source or MPI_ANY_SOURCE]
tag: TAG, [message tag or MPI_ANY_TAG]
comm: COMMUNICATOR
status: STATUS, direction=out

MPIX_Isend_enqueue:
buf: BUFFER, constant=True, [initial address of send buffer]
count: POLYXFER_NUM_ELEM_NNI, [number of elements in send buffer]
datatype: DATATYPE, [datatype of each send buffer element]
dest: RANK, [rank of destination]
tag: TAG, [message tag]
comm: COMMUNICATOR
request: REQUEST, direction=out

MPIX_Irecv_enqueue:
buf: BUFFER, direction=out, [initial address of receive buffer]
count: POLYXFER_NUM_ELEM_NNI, [number of elements in receive buffer]
datatype: DATATYPE, [datatype of each receive buffer element]
source: RANK, [rank of source or MPI_ANY_SOURCE]
tag: TAG, [message tag or MPI_ANY_TAG]
comm: COMMUNICATOR
request: REQUEST, direction=out

MPI_Wait_enqueue:
request: REQUEST, direction=inout, [request]
status: STATUS, direction=out

MPI_Waitall_enqueue:
count: ARRAY_LENGTH_NNI, [lists length]
array_of_requests: REQUEST, direction=inout, length=count, [array of requests]
array_of_statuses: STATUS, direction=out, length=*, pointer=False, [array of status objects]
{ -- error_check -- array_of_statuses
if (count > 0) {
MPIR_ERRTEST_ARGNULL(array_of_statuses, "array_of_statuses", mpi_errno);
}
}
7 changes: 7 additions & 0 deletions src/include/mpir_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ typedef enum MPIR_Request_kind_t {
MPIR_REQUEST_KIND__PART_SEND, /* Partitioned send req returned to user */
MPIR_REQUEST_KIND__PART_RECV, /* Partitioned recv req returned to user */
MPIR_REQUEST_KIND__PART, /* Partitioned pt2pt internal reqs */
MPIR_REQUEST_KIND__ENQUEUE, /* enqueued (to gpu stream) request */
MPIR_REQUEST_KIND__GREQUEST,
MPIR_REQUEST_KIND__COLL,
MPIR_REQUEST_KIND__MPROBE, /* see NOTE-R1 */
Expand Down Expand Up @@ -220,6 +221,12 @@ struct MPIR_Request {
MPL_atomic_int_t active_flag; /* flag indicating whether in a start-complete active period.
* Value is 0 or 1. */
} part; /* kind : MPIR_REQUEST_KIND__PART_SEND or MPIR_REQUEST_KIND__PART_RECV */
struct {
MPL_gpu_stream_t gpu_stream;
struct MPIR_Request *real_request;
bool is_send;
void *data;
} enqueue;
struct {
MPIR_Win *win;
} rma; /* kind : MPIR_REQUEST_KIND__RMA */
Expand Down
7 changes: 7 additions & 0 deletions src/include/mpir_typerep.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,11 @@ int MPIR_Typerep_op(void *source_buf, MPI_Aint source_count, MPI_Datatype source
bool source_is_packed, int mapped_device);
int MPIR_Typerep_reduce(const void *in_buf, void *out_buf, MPI_Aint count, MPI_Datatype datatype,
MPI_Op op);

int MPIR_Typerep_pack_stream(const void *inbuf, MPI_Aint incount, MPI_Datatype datatype,
MPI_Aint inoffset, void *outbuf, MPI_Aint max_pack_bytes,
MPI_Aint * actual_pack_bytes, void *stream);
int MPIR_Typerep_unpack_stream(const void *inbuf, MPI_Aint insize,
void *outbuf, MPI_Aint outcount, MPI_Datatype datatype,
MPI_Aint outoffset, MPI_Aint * actual_unpack_bytes, void *stream);
#endif /* MPIR_TYPEREP_H_INCLUDED */
22 changes: 22 additions & 0 deletions src/mpi/datatype/typerep/src/typerep_dataloop_pack.c
Original file line number Diff line number Diff line change
Expand Up @@ -210,3 +210,25 @@ int MPIR_Typerep_reduce(const void *in_buf, void *out_buf, MPI_Aint count, MPI_D

return mpi_errno;
}

int MPIR_Typerep_pack_stream(const void *inbuf, MPI_Aint incount, MPI_Datatype datatype,
MPI_Aint inoffset, void *outbuf, MPI_Aint max_pack_bytes,
MPI_Aint * actual_pack_bytes, void *stream)
{
int mpi_errno = MPI_SUCCESS;

MPIR_Assert(0);

return mpi_errno;
}

int MPIR_Typerep_unpack_stream(const void *inbuf, MPI_Aint insize, void *outbuf,
MPI_Aint outcount, MPI_Datatype datatype, MPI_Aint outoffset,
MPI_Aint * actual_unpack_bytes, void *stream)
{
int mpi_errno = MPI_SUCCESS;

MPIR_Assert(0);

return mpi_errno;
}
46 changes: 46 additions & 0 deletions src/mpi/datatype/typerep/src/typerep_yaksa_pack.c
Original file line number Diff line number Diff line change
Expand Up @@ -579,3 +579,49 @@ static int typerep_op_pack(void *source_buf, void *target_buf, MPI_Aint count,
fn_fail:
goto fn_exit;
}

int MPIR_Typerep_pack_stream(const void *inbuf, MPI_Aint incount, MPI_Datatype datatype,
MPI_Aint inoffset, void *outbuf, MPI_Aint max_pack_bytes,
MPI_Aint * actual_pack_bytes, void *stream)
{
MPIR_FUNC_ENTER;

int mpi_errno = MPI_SUCCESS;
int rc;

yaksa_type_t type = MPII_Typerep_get_yaksa_type(datatype);
uintptr_t packed_bytes;;
rc = yaksa_pack_stream(inbuf, incount, type, inoffset, outbuf, max_pack_bytes,
&packed_bytes, NULL, YAKSA_OP__REPLACE, stream);
MPIR_ERR_CHKANDJUMP(rc, mpi_errno, MPI_ERR_INTERN, "**yaksa");
*actual_pack_bytes = packed_bytes;

fn_exit:
MPIR_FUNC_EXIT;
return mpi_errno;
fn_fail:
goto fn_exit;
}

int MPIR_Typerep_unpack_stream(const void *inbuf, MPI_Aint insize, void *outbuf,
MPI_Aint outcount, MPI_Datatype datatype, MPI_Aint outoffset,
MPI_Aint * actual_unpack_bytes, void *stream)
{
MPIR_FUNC_ENTER;

int mpi_errno = MPI_SUCCESS;
int rc;

yaksa_type_t type = MPII_Typerep_get_yaksa_type(datatype);
uintptr_t unpacked_bytes;;
rc = yaksa_unpack_stream(inbuf, insize, outbuf, outcount, type, outoffset,
&unpacked_bytes, NULL, YAKSA_OP__REPLACE, stream);
MPIR_ERR_CHKANDJUMP(rc, mpi_errno, MPI_ERR_INTERN, "**yaksa");
*actual_unpack_bytes = unpacked_bytes;

fn_exit:
MPIR_FUNC_EXIT;
return mpi_errno;
fn_fail:
goto fn_exit;
}
3 changes: 3 additions & 0 deletions src/mpi/errhan/errnames.txt
Original file line number Diff line number Diff line change
Expand Up @@ -955,6 +955,9 @@ is too big (> MPIU_SHMW_GHND_SZ)
**ch4nostream:No streams available. Configure --enable-thread-cs=per-vci and --with-ch4-max-vcis=# to enable streams.
**outofstream:No streams available. Use MPIR_CVAR_CH4_RESERVE_VCIS to reserve the number of streams can be allocated.
**cannotfreestream:The stream is still in use, cannot be freed.
**missinggpustream:Info hint 'type' is set, but info hint 'value' is missing.
**invalidgpustream:Info hint 'type' is set, but info hint 'value' is invalid.
**notgpustream:The communicator does not have a local gpu stream attached.

# -----------------------------------------------------------------------------
# The following names are defined but not used (see the -careful option
Expand Down
Loading

0 comments on commit f39d5ee

Please sign in to comment.