Skip to content

Commit

Permalink
Merge pull request #5 from openucx/master
Browse files Browse the repository at this point in the history
sync
  • Loading branch information
leibin2014 authored May 21, 2020
2 parents 1b61281 + 5476175 commit 742c3e1
Show file tree
Hide file tree
Showing 64 changed files with 838 additions and 208 deletions.
4 changes: 4 additions & 0 deletions bindings/java/src/main/java/org/openucx/jucx/NativeLibs.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ private static void loadLibrary(String resourceName) {
* @throws IOException if fails to extract resource properly
*/
private static File extractResource(URL resourceURL) throws IOException {
if (!resourceURL.getProtocol().equals("jar")) {
return new File(resourceURL.getPath());
}

InputStream is = resourceURL.openStream();
if (is == null) {
errorMessage = "Error extracting native library content";
Expand Down
4 changes: 4 additions & 0 deletions src/ucm/mmap/mmap.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,15 @@ ucs_status_t ucm_mmap_test_installed_events(int events);

static UCS_F_ALWAYS_INLINE ucm_mmap_hook_mode_t ucm_mmap_hook_mode(void)
{
#ifdef __SANITIZE_ADDRESS__
return UCM_MMAP_HOOK_NONE;
#else
if (RUNNING_ON_VALGRIND && (ucm_global_opts.mmap_hook_mode == UCM_MMAP_HOOK_BISTRO)) {
return UCM_MMAP_HOOK_RELOC;
}

return ucm_global_opts.mmap_hook_mode;
#endif
}

#endif
51 changes: 50 additions & 1 deletion src/ucp/api/ucp.h
Original file line number Diff line number Diff line change
Expand Up @@ -3208,7 +3208,7 @@ ucs_status_ptr_t ucp_get_nb(ucp_ep_h ep, void *buffer, size_t length,
* to read from.
* @param [in] rkey Remote memory key associated with the
* remote memory address.
* @param [in] param Operation parameters, see @ref ucp_request_param_t
* @param [in] param Operation parameters, see @ref ucp_request_param_t.
*
* @return UCS_OK - The operation was completed immediately.
* @return UCS_PTR_IS_ERR(_ptr) - The operation failed.
Expand Down Expand Up @@ -3302,6 +3302,55 @@ ucp_atomic_fetch_nb(ucp_ep_h ep, ucp_atomic_fetch_op_t opcode,
ucp_send_callback_t cb);


/**
* @ingroup UCP_COMM
* @brief Post an atomic fetch operation.
*
* This routine will post an atomic fetch operation to remote memory.
* The remote value is described by the combination of the remote
* memory address @a remote_addr and the @ref ucp_rkey_h "remote memory handle"
* @a rkey.
* The routine is non-blocking and therefore returns immediately. However the
* actual atomic operation may be delayed. The atomic operation is not considered complete
* until the values in remote and local memory are completed.
*
* @note The user should not modify any part of the @a buffer or @a result after
* this operation is called, until the operation completes.
* @note Only ucp_dt_make_config(4) and ucp_dt_make_contig(8) are supported in
* @a param->datatype, see @ref ucp_dt_make_contig
*
* @param [in] ep UCP endpoint.
* @param [in] opcode One of @ref ucp_atomic_fetch_op_t.
* @param [in] buffer Address of operand for atomic operation. For
* @ref UCP_ATOMIC_FETCH_OP_CSWAP operation, this is
* the value with which the remote memory buffer is
* compared. For @ref UCP_ATOMIC_FETCH_OP_SWAP operation
* this is the value to be placed in remote memory.
* @param [inout] result Local memory buffer in which to store the result of
* the operation. In the case of CSWAP the value in
* result will be swapped into the @a remote_addr if
* the condition is true.
* @param [in] count Number of elements in @a buffer and @a result. The
* size of each element is specified by
* @ref ucp_request_param_t.datatype
* @param [in] remote_addr Remote address to operate on.
* @param [in] rkey Remote key handle for the remote memory address.
* @param [in] param Operation parameters, see @ref ucp_request_param_t.
*
* @return NULL - The operation completed immediately.
* @return UCS_PTR_IS_ERR(_ptr) - The operation failed.
* @return otherwise - Operation was scheduled and can be
* completed at some time in the future. The
* request handle is returned to the application
* in order to track progress of the operation.
*/
ucs_status_ptr_t
ucp_atomic_fetch_nbx(ucp_ep_h ep, ucp_atomic_fetch_op_t opcode,
const void *buffer, void *result, size_t count,
uint64_t remote_addr, ucp_rkey_h rkey,
const ucp_request_param_t *param);


/**
* @ingroup UCP_COMM
* @brief Check the status of non-blocking request.
Expand Down
31 changes: 25 additions & 6 deletions src/ucp/core/ucp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ void ucp_ep_config_key_reset(ucp_ep_config_key_t *key)
memset(key->amo_lanes, UCP_NULL_LANE, sizeof(key->amo_lanes));
}

ucs_status_t ucp_ep_new(ucp_worker_h worker, const char *peer_name,
const char *message, ucp_ep_h *ep_p)
ucs_status_t ucp_ep_create_base(ucp_worker_h worker, const char *peer_name,
const char *message, ucp_ep_h *ep_p)
{
ucs_status_t status;
ucp_ep_config_key_t key;
Expand Down Expand Up @@ -136,7 +136,8 @@ ucs_status_t ucp_ep_new(ucp_worker_h worker, const char *peer_name,
goto err_free_ep;
}

ucs_list_add_tail(&worker->all_eps, &ucp_ep_ext_gen(ep)->ep_list);
ucs_list_head_init(&ucp_ep_ext_gen(ep)->ep_list);

*ep_p = ep;
ucs_debug("created ep %p to %s %s", ep, ucp_ep_peer_name(ep), message);
return UCS_OK;
Expand All @@ -147,6 +148,24 @@ ucs_status_t ucp_ep_new(ucp_worker_h worker, const char *peer_name,
return status;
}

ucs_status_t ucp_worker_create_ep(ucp_worker_h worker, const char *peer_name,
const char *message, ucp_ep_h *ep_p)
{
ucs_status_t status;
ucp_ep_h ep;

status = ucp_ep_create_base(worker, peer_name, message, &ep);
if (status != UCS_OK) {
return status;
}

ucs_list_add_tail(&worker->all_eps, &ucp_ep_ext_gen(ep)->ep_list);

*ep_p = ep;

return UCS_OK;
}

void ucp_ep_delete(ucp_ep_h ep)
{
ucs_callbackq_remove_if(&ep->worker->uct->progress_q,
Expand All @@ -166,7 +185,7 @@ ucp_ep_create_sockaddr_aux(ucp_worker_h worker, unsigned ep_init_flags,
ucp_ep_h ep;

/* allocate endpoint */
status = ucp_ep_new(worker, remote_address->name, "listener", &ep);
status = ucp_worker_create_ep(worker, remote_address->name, "listener", &ep);
if (status != UCS_OK) {
goto err;
}
Expand Down Expand Up @@ -335,7 +354,7 @@ ucs_status_t ucp_ep_create_to_worker_addr(ucp_worker_h worker,
ucp_ep_h ep;

/* allocate endpoint */
status = ucp_ep_new(worker, remote_address->name, message, &ep);
status = ucp_worker_create_ep(worker, remote_address->name, message, &ep);
if (status != UCS_OK) {
goto err;
}
Expand Down Expand Up @@ -378,7 +397,7 @@ static ucs_status_t ucp_ep_create_to_sock_addr(ucp_worker_h worker,
/* allocate endpoint */
ucs_sockaddr_str(params->sockaddr.addr, peer_name, sizeof(peer_name));

status = ucp_ep_new(worker, peer_name, "from api call", &ep);
status = ucp_worker_create_ep(worker, peer_name, "from api call", &ep);
if (status != UCS_OK) {
goto err;
}
Expand Down
7 changes: 5 additions & 2 deletions src/ucp/core/ucp_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -439,8 +439,11 @@ void ucp_ep_config_lane_info_str(ucp_context_h context,
ucp_rsc_index_t aux_rsc_index,
char *buf, size_t max);

ucs_status_t ucp_ep_new(ucp_worker_h worker, const char *peer_name,
const char *message, ucp_ep_h *ep_p);
ucs_status_t ucp_ep_create_base(ucp_worker_h worker, const char *peer_name,
const char *message, ucp_ep_h *ep_p);

ucs_status_t ucp_worker_create_ep(ucp_worker_h worker, const char *peer_name,
const char *message, ucp_ep_h *ep_p);

void ucp_ep_delete(ucp_ep_h ep);

Expand Down
2 changes: 1 addition & 1 deletion src/ucp/core/ucp_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -835,7 +835,7 @@ static void ucp_worker_iface_async_cb_event(void *arg, unsigned flags)
ucp_worker_iface_event_common(wiface);
}

static void ucp_worker_iface_async_fd_event(int fd, void *arg)
static void ucp_worker_iface_async_fd_event(int fd, int events, void *arg)
{
ucp_worker_iface_t *wiface = arg;
int event_fd = ucp_worker_iface_get_event_fd(wiface);;
Expand Down
76 changes: 63 additions & 13 deletions src/ucp/rma/amo_send.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,21 @@
}


#define UCP_AMO_CHECK_PARAM_NBX(_context, _remote_addr, _size, _count, \
_opcode, _last_opcode, _action) \
{ \
if (ENABLE_PARAMS_CHECK) { \
if ((_count) != 1) { \
ucs_error("unsupported number of elements: %zu", (_count)); \
_action; \
} \
} \
\
UCP_AMO_CHECK_PARAM(_context, _remote_addr, _size, _opcode, \
_last_opcode, _action); \
}


static uct_atomic_op_t ucp_uct_op_table[] = {
[UCP_ATOMIC_POST_OP_ADD] = UCT_ATOMIC_OP_ADD,
[UCP_ATOMIC_POST_OP_AND] = UCT_ATOMIC_OP_AND,
Expand Down Expand Up @@ -118,37 +133,72 @@ ucs_status_ptr_t ucp_atomic_fetch_nb(ucp_ep_h ep, ucp_atomic_fetch_op_t opcode,
uint64_t value, void *result, size_t op_size,
uint64_t remote_addr, ucp_rkey_h rkey,
ucp_send_callback_t cb)
{
ucp_request_param_t param = {
.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK |
UCP_OP_ATTR_FIELD_DATATYPE,
.datatype = ucp_dt_make_contig(op_size),
.cb.send = (ucp_send_nbx_callback_t)cb
};

return ucp_atomic_fetch_nbx(ep, opcode, &value, result, 1,
remote_addr, rkey, &param);
}

ucs_status_ptr_t
ucp_atomic_fetch_nbx(ucp_ep_h ep, ucp_atomic_fetch_op_t opcode,
const void *buffer, void *result, size_t count,
uint64_t remote_addr, ucp_rkey_h rkey,
const ucp_request_param_t *param)
{
ucs_status_ptr_t status_p;
ucs_status_t status;
ucp_request_t *req;
uint64_t value;
size_t op_size;

UCP_AMO_CHECK_PARAM(ep->worker->context, remote_addr, op_size, opcode,
UCP_ATOMIC_FETCH_OP_LAST,
return UCS_STATUS_PTR(UCS_ERR_INVALID_PARAM));
if (ucs_unlikely(!(param->op_attr_mask & UCP_OP_ATTR_FIELD_DATATYPE))) {
ucs_error("missing atomic operation datatype");
return UCS_STATUS_PTR(UCS_ERR_INVALID_PARAM);
}

if (param->datatype == ucp_dt_make_contig(8)) {
value = *(uint64_t*)buffer;
op_size = sizeof(uint64_t);
} else if (param->datatype == ucp_dt_make_contig(4)) {
value = *(uint32_t*)buffer;
op_size = sizeof(uint32_t);
} else {
ucs_error("invalid atomic operation datatype: %zu", param->datatype);
return UCS_STATUS_PTR(UCS_ERR_INVALID_PARAM);
}

UCP_AMO_CHECK_PARAM_NBX(ep->worker->context, remote_addr, op_size,
count, opcode, UCP_ATOMIC_FETCH_OP_LAST,
return UCS_STATUS_PTR(UCS_ERR_INVALID_PARAM));
UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(ep->worker);

ucs_trace_req("atomic_fetch_nb opcode %d value %"PRIu64" buffer %p size %zu"
" remote_addr %"PRIx64" rkey %p to %s cb %p",
opcode, value, result, op_size, remote_addr, rkey,
ucp_ep_peer_name(ep), cb);
ucs_trace_req("atomic_fetch_nb opcode %d buffer %p result %p "
"datatype %zu remote_addr %"PRIx64" rkey %p to %s cb %p",
opcode, buffer, result, param->datatype, remote_addr, rkey,
ucp_ep_peer_name(ep),
(param->op_attr_mask & UCP_OP_ATTR_FIELD_CALLBACK) ?
param->cb.send : NULL);

status = UCP_RKEY_RESOLVE(rkey, ep, amo);
if (status != UCS_OK) {
status_p = UCS_STATUS_PTR(UCS_ERR_UNREACHABLE);
goto out;
}

req = ucp_request_get(ep->worker);
if (ucs_unlikely(NULL == req)) {
status_p = UCS_STATUS_PTR(UCS_ERR_NO_MEMORY);
goto out;
}
req = ucp_request_get_param(ep->worker, param,
{status_p = UCS_STATUS_PTR(UCS_ERR_NO_MEMORY);
goto out;});

ucp_amo_init_fetch(req, ep, result, ucp_uct_fop_table[opcode], op_size,
remote_addr, rkey, value, rkey->cache.amo_proto);

status_p = ucp_rma_send_request_cb(req, cb);
status_p = ucp_rma_send_request(req, param);

out:
UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(ep->worker);
Expand Down
4 changes: 2 additions & 2 deletions src/ucp/wireup/wireup.c
Original file line number Diff line number Diff line change
Expand Up @@ -441,8 +441,8 @@ ucp_wireup_process_request(ucp_worker_h worker, const ucp_wireup_msg_t *msg,
msg->conn_sn ^ (remote_uuid == worker->uuid));
if (ep == NULL) {
/* Create a new endpoint if does not exist */
status = ucp_ep_new(worker, remote_address->name, "remote-request",
&ep);
status = ucp_worker_create_ep(worker, remote_address->name,
"remote-request", &ep);
if (status != UCS_OK) {
return;
}
Expand Down
Loading

0 comments on commit 742c3e1

Please sign in to comment.