Skip to content

Commit

Permalink
Merge pull request #6062 from hzhou/2206_stream_workq
Browse files Browse the repository at this point in the history
stream: implement stream_workq

Approved-by: Ken Raffenetti
  • Loading branch information
hzhou authored Aug 30, 2022
2 parents ce08816 + a4aac5d commit b603e80
Show file tree
Hide file tree
Showing 47 changed files with 1,416 additions and 101 deletions.
63 changes: 63 additions & 0 deletions confdb/cudalt.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#!/bin/sh
##
## Copyright (C) by Argonne National Laboratory
## See COPYRIGHT in top-level directory
##

# Wrapper to create .lo objects with non-standard compilers, e.g. NVCC.
# This is a hack that assumes
# 1. the option "-Xcompiler -fPIC" creates a PIC object.
# 2. the generated .lo format remains compatible with libtool.
#
# Usage: cudalt.sh [--verbose] <file.lo> <compile command...>
#
# The compile command is run twice: once with "-Xcompiler -fPIC -o" pointing
# into the ".libs/" subdirectory next to <file.lo>, and once with plain "-o"
# pointing next to <file.lo>. A libtool-style <file.lo> that names both
# objects is then written.

set -e

verbose=
if test "$1" = "--verbose" ; then
    verbose=1
    shift
fi

# Need at least the .lo path plus one word of compile command; fail early with
# a readable message instead of an obscure "shift"/"command not found" error.
if test $# -lt 2 ; then
    echo "Usage: $0 [--verbose] <file.lo> <compile command...>" >&2
    exit 1
fi

LO_FILEPATH="$1"
O_FILEPATH="${LO_FILEPATH%%.lo}.o"
shift # handle the rest of the arguments together with $*

LO_DIR=$(dirname "$O_FILEPATH")
O_FILENAME=$(basename "$O_FILEPATH")

LOCAL_PIC_DIR=".libs/"
LOCAL_NPIC_DIR=""
PIC_DIR="$LO_DIR/$LOCAL_PIC_DIR"
NPIC_DIR="$LO_DIR/$LOCAL_NPIC_DIR"

PIC_FILEPATH="$PIC_DIR/$O_FILENAME"
NPIC_FILEPATH="$NPIC_DIR/$O_FILENAME"
LOCAL_PIC_FILEPATH="$LOCAL_PIC_DIR$O_FILENAME"
LOCAL_NPIC_FILEPATH="$LOCAL_NPIC_DIR$O_FILENAME"

# mkdir -p succeeds when the directory already exists, so no "test -d" guard.
mkdir -p "$PIC_DIR"

# The command line is flattened to a single string and re-parsed by eval, as
# callers expect. NOTE: because of that re-parse, object paths containing
# whitespace are not supported by the two compile steps below.
CMD="$* -Xcompiler -fPIC -o $PIC_FILEPATH"
if test -n "$verbose" ; then echo "$CMD" ; fi
eval "$CMD"

CMD="$* -o $NPIC_FILEPATH"
if test -n "$verbose" ; then echo "$CMD" ; fi
eval "$CMD"

cat > "$LO_FILEPATH" <<EOF
# $LO_FILEPATH - a libtool object file
# Generated by libtool (GNU libtool) 2.4.5
#
# Please DO NOT delete this file!
# It is necessary for linking the library.
# Name of the PIC object.
pic_object="$LOCAL_PIC_FILEPATH"
# Name of the non-PIC object.
non_pic_object="$LOCAL_NPIC_FILEPATH"
EOF

23 changes: 22 additions & 1 deletion src/binding/c/stream_api.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@ MPIX_Comm_get_stream:
idx: INDEX
stream: STREAM, direction=out, [stream object]

MPIX_Stream_progress:
stream: STREAM, [stream object]
.impl: mpid

MPIX_Start_progress_thread:
stream: STREAM, [stream object]

MPIX_Stop_progress_thread:
stream: STREAM, [stream object]

MPIX_Stream_send:
buf: BUFFER, constant=True, [initial address of send buffer]
count: POLYXFER_NUM_ELEM_NNI, [number of elements in send buffer]
Expand Down Expand Up @@ -155,6 +165,8 @@ MPIX_Send_enqueue:
dest: RANK, [rank of destination]
tag: TAG, [message tag]
comm: COMMUNICATOR
.impl: mpid
.decl: MPIR_Send_enqueue_impl

MPIX_Recv_enqueue:
buf: BUFFER, direction=out, [initial address of receive buffer]
Expand All @@ -164,6 +176,8 @@ MPIX_Recv_enqueue:
tag: TAG, [message tag or MPI_ANY_TAG]
comm: COMMUNICATOR
status: STATUS, direction=out
.impl: mpid
.decl: MPIR_Recv_enqueue_impl

MPIX_Isend_enqueue:
buf: BUFFER, constant=True, [initial address of send buffer]
Expand All @@ -173,25 +187,32 @@ MPIX_Isend_enqueue:
tag: TAG, [message tag]
comm: COMMUNICATOR
request: REQUEST, direction=out
.impl: mpid
.decl: MPIR_Isend_enqueue_impl

MPIX_Irecv_enqueue:
buf: BUFFER, direction=out, [initial address of receive buffer]
count: POLYXFER_NUM_ELEM_NNI, [number of elements in receive buffer]
datatype: DATATYPE, [datatype of each receive buffer element]
source: RANK, [rank of source or MPI_ANY_SOURCE]
tag: TAG, [message tag or MPI_ANY_TAG]

comm: COMMUNICATOR
request: REQUEST, direction=out
.impl: mpid
.decl: MPIR_Irecv_enqueue_impl

MPIX_Wait_enqueue:
request: REQUEST, direction=inout, [request]
status: STATUS, direction=out
.impl: mpid
.decl: MPIR_Wait_enqueue_impl

MPIX_Waitall_enqueue:
count: ARRAY_LENGTH_NNI, [lists length]
array_of_requests: REQUEST, direction=inout, length=count, [array of requests]
array_of_statuses: STATUS, direction=out, length=*, pointer=False, [array of status objects]
.impl: mpid
.decl: MPIR_Waitall_enqueue_impl
{ -- error_check -- array_of_statuses
if (count > 0) {
MPIR_ERRTEST_ARGNULL(array_of_statuses, "array_of_statuses", mpi_errno);
Expand Down
2 changes: 1 addition & 1 deletion src/include/mpiimpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,6 @@ typedef struct MPIR_Stream MPIR_Stream;
#include "mpir_pt2pt.h"
#include "mpir_ext.h"
#include "mpir_gpu.h"
#include "mpir_stream.h"

#ifdef HAVE_CXX_BINDING
#include "mpii_cxxinterface.h"
Expand All @@ -202,6 +201,7 @@ typedef struct MPIR_Stream MPIR_Stream;
/*****************************************************************************/

#include "mpir_thread.h" /* come first as mutexes are often depended on, e.g. request */
#include "mpir_stream.h"
#include "mpir_err.h"
#include "mpir_attr.h"
#include "mpir_group.h"
Expand Down
12 changes: 12 additions & 0 deletions src/include/mpir_comm.h
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,18 @@ int MPIR_Comm_delete_internal(MPIR_Comm * comm_ptr);
void MPIR_stream_comm_init(MPIR_Comm * comm_ptr);
void MPIR_stream_comm_free(MPIR_Comm * comm_ptr);
int MPIR_Comm_copy_stream(MPIR_Comm * oldcomm, MPIR_Comm * newcomm);
int MPIR_get_local_gpu_stream(MPIR_Comm * comm_ptr, MPL_gpu_stream_t * gpu_stream);

/* Look up the local stream of a stream communicator.
 *
 * For MPIR_STREAM_COMM_SINGLE the communicator's single stream is returned;
 * for MPIR_STREAM_COMM_MULTIPLEX the entry of local_streams indexed by
 * comm_ptr->rank is returned; for any other stream_comm_type the result is
 * NULL. */
MPL_STATIC_INLINE_PREFIX MPIR_Stream *MPIR_stream_comm_get_local_stream(MPIR_Comm * comm_ptr)
{
    MPIR_Stream *local_stream = NULL;

    if (comm_ptr->stream_comm_type == MPIR_STREAM_COMM_SINGLE) {
        local_stream = comm_ptr->stream_comm.single.stream;
    } else if (comm_ptr->stream_comm_type == MPIR_STREAM_COMM_MULTIPLEX) {
        local_stream = comm_ptr->stream_comm.multiplex.local_streams[comm_ptr->rank];
    }

    return local_stream;
}

/* Forward the communicator pointer comm_p_ to MPIR_Object_add_ref. The
 * do { ... } while (0) wrapper makes the macro usable as a single statement. */
#define MPIR_Comm_add_ref(comm_p_) \
do { MPIR_Object_add_ref((comm_p_)); } while (0)
Expand Down
12 changes: 12 additions & 0 deletions src/include/mpir_gpu.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,18 @@
and we do not query the buffer type internally because we assume
no GPU buffer is in use.
- name : MPIR_CVAR_GPU_HAS_WAIT_KERNEL
category : GPU
type : int
default : 0
class : none
verbosity : MPI_T_VERBOSITY_USER_BASIC
scope : MPI_T_SCOPE_ALL_EQ
description : >-
If set to 1, avoid allocating GPU registered host buffers
for temporary buffers. When stream workq and GPU wait kernels are
in use, access APIs for GPU registered memory may cause deadlock.
=== END_MPI_T_CVAR_INFO_BLOCK ===
*/

Expand Down
4 changes: 0 additions & 4 deletions src/include/mpir_misc.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,4 @@ int MPIR_Find_external(struct MPIR_Comm *comm, int *external_size_p, int *extern
int MPIR_Get_internode_rank(MPIR_Comm * comm_ptr, int r);
int MPIR_Get_intranode_rank(MPIR_Comm * comm_ptr, int r);

/* Default routines for asynchronous progress thread */
int MPIR_Init_async_thread(void);
int MPIR_Finalize_async_thread(void);

#endif /* MPIR_MISC_H_INCLUDED */
4 changes: 3 additions & 1 deletion src/include/mpir_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ struct MPIR_Request {
* Value is 0 or 1. */
} part; /* kind : MPIR_REQUEST_KIND__PART_SEND or MPIR_REQUEST_KIND__PART_RECV */
struct {
MPL_gpu_stream_t gpu_stream;
MPIR_Stream *stream_ptr;
struct MPIR_Request *real_request;
bool is_send;
void *data;
Expand Down Expand Up @@ -465,6 +465,8 @@ static inline MPIR_Request *MPIR_Request_create(MPIR_Request_kind_t kind)
return req;
}

int MPIR_allocate_enqueue_request(MPIR_Comm * comm_ptr, MPIR_Request ** req);

/* Forward the request pointer req_p_ to MPIR_Object_add_ref. The
 * do { ... } while (0) wrapper makes the macro usable as a single statement. */
#define MPIR_Request_add_ref(req_p_) \
do { MPIR_Object_add_ref(req_p_); } while (0)

Expand Down
3 changes: 3 additions & 0 deletions src/include/mpir_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ struct MPIR_Stream {
void *dummy; /* to force a minimum pointer alignment, required by handlemem */
} u;
int vci;
#ifdef MPID_DEV_STREAM_DECL
MPID_DEV_STREAM_DECL
#endif
};

extern MPIR_Object_alloc_t MPIR_Stream_mem;
Expand Down
8 changes: 7 additions & 1 deletion src/mpi/datatype/typerep/src/typerep_yaksa_init.c
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,13 @@ void MPIR_Typerep_init(void)
yaksa_info_keyval_append(MPII_yaksa_info_nogpu, "yaksa_gpu_driver", "nogpu", 6);

if (MPIR_CVAR_ENABLE_GPU) {
yaksa_init(NULL);
yaksa_info_t info = NULL;
if (MPIR_CVAR_GPU_HAS_WAIT_KERNEL) {
yaksa_info_create(&info);
yaksa_info_keyval_append(info, "yaksa_has_wait_kernel", "1", 2);
}

yaksa_init(info);
} else {
/* prevent yaksa to query gpu devices, which can be very expensive */
yaksa_init(MPII_yaksa_info_nogpu);
Expand Down
Loading

0 comments on commit b603e80

Please sign in to comment.