diff --git a/bindings/java/src/main/java/org/openucx/jucx/NativeLibs.java b/bindings/java/src/main/java/org/openucx/jucx/NativeLibs.java index 59f22a6e94b..61e8e1360e3 100644 --- a/bindings/java/src/main/java/org/openucx/jucx/NativeLibs.java +++ b/bindings/java/src/main/java/org/openucx/jucx/NativeLibs.java @@ -80,6 +80,10 @@ private static void loadLibrary(String resourceName) { * @throws IOException if fails to extract resource properly */ private static File extractResource(URL resourceURL) throws IOException { + if (!resourceURL.getProtocol().equals("jar")) { + return new File(resourceURL.getPath()); + } + InputStream is = resourceURL.openStream(); if (is == null) { errorMessage = "Error extracting native library content"; diff --git a/src/ucm/mmap/mmap.h b/src/ucm/mmap/mmap.h index 58252de7dfc..73e983f8c1a 100644 --- a/src/ucm/mmap/mmap.h +++ b/src/ucm/mmap/mmap.h @@ -38,11 +38,15 @@ ucs_status_t ucm_mmap_test_installed_events(int events); static UCS_F_ALWAYS_INLINE ucm_mmap_hook_mode_t ucm_mmap_hook_mode(void) { +#ifdef __SANITIZE_ADDRESS__ + return UCM_MMAP_HOOK_NONE; +#else if (RUNNING_ON_VALGRIND && (ucm_global_opts.mmap_hook_mode == UCM_MMAP_HOOK_BISTRO)) { return UCM_MMAP_HOOK_RELOC; } return ucm_global_opts.mmap_hook_mode; +#endif } #endif diff --git a/src/ucp/api/ucp.h b/src/ucp/api/ucp.h index b1b2c7db077..0c8bbeb3a08 100644 --- a/src/ucp/api/ucp.h +++ b/src/ucp/api/ucp.h @@ -3208,7 +3208,7 @@ ucs_status_ptr_t ucp_get_nb(ucp_ep_h ep, void *buffer, size_t length, * to read from. * @param [in] rkey Remote memory key associated with the * remote memory address. - * @param [in] param Operation parameters, see @ref ucp_request_param_t + * @param [in] param Operation parameters, see @ref ucp_request_param_t. * * @return UCS_OK - The operation was completed immediately. * @return UCS_PTR_IS_ERR(_ptr) - The operation failed. 
@@ -3302,6 +3302,55 @@ ucp_atomic_fetch_nb(ucp_ep_h ep, ucp_atomic_fetch_op_t opcode, ucp_send_callback_t cb); +/** + * @ingroup UCP_COMM + * @brief Post an atomic fetch operation. + * + * This routine will post an atomic fetch operation to remote memory. + * The remote value is described by the combination of the remote + * memory address @a remote_addr and the @ref ucp_rkey_h "remote memory handle" + * @a rkey. + * The routine is non-blocking and therefore returns immediately. However the + * actual atomic operation may be delayed. The atomic operation is not considered complete + * until the values in remote and local memory are updated. + * + * @note The user should not modify any part of the @a buffer or @a result after + * this operation is called, until the operation completes. + * @note Only ucp_dt_make_contig(4) and ucp_dt_make_contig(8) are supported in + * @a param->datatype, see @ref ucp_dt_make_contig + * + * @param [in] ep UCP endpoint. + * @param [in] opcode One of @ref ucp_atomic_fetch_op_t. + * @param [in] buffer Address of operand for atomic operation. For + * @ref UCP_ATOMIC_FETCH_OP_CSWAP operation, this is + * the value with which the remote memory buffer is + * compared. For @ref UCP_ATOMIC_FETCH_OP_SWAP operation + * this is the value to be placed in remote memory. + * @param [inout] result Local memory buffer in which to store the result of + * the operation. In the case of CSWAP the value in + * result will be swapped into the @a remote_addr if + * the condition is true. + * @param [in] count Number of elements in @a buffer and @a result. The + * size of each element is specified by + * @ref ucp_request_param_t.datatype + * @param [in] remote_addr Remote address to operate on. + * @param [in] rkey Remote key handle for the remote memory address. + * @param [in] param Operation parameters, see @ref ucp_request_param_t. + * + * @return NULL - The operation completed immediately. + * @return UCS_PTR_IS_ERR(_ptr) - The operation failed. 
+ * @return otherwise - Operation was scheduled and can be + * completed at some time in the future. The + * request handle is returned to the application + * in order to track progress of the operation. + */ +ucs_status_ptr_t +ucp_atomic_fetch_nbx(ucp_ep_h ep, ucp_atomic_fetch_op_t opcode, + const void *buffer, void *result, size_t count, + uint64_t remote_addr, ucp_rkey_h rkey, + const ucp_request_param_t *param); + + /** * @ingroup UCP_COMM * @brief Check the status of non-blocking request. diff --git a/src/ucp/core/ucp_ep.c b/src/ucp/core/ucp_ep.c index 617d1ff0bff..37db92cb68f 100644 --- a/src/ucp/core/ucp_ep.c +++ b/src/ucp/core/ucp_ep.c @@ -82,8 +82,8 @@ void ucp_ep_config_key_reset(ucp_ep_config_key_t *key) memset(key->amo_lanes, UCP_NULL_LANE, sizeof(key->amo_lanes)); } -ucs_status_t ucp_ep_new(ucp_worker_h worker, const char *peer_name, - const char *message, ucp_ep_h *ep_p) +ucs_status_t ucp_ep_create_base(ucp_worker_h worker, const char *peer_name, + const char *message, ucp_ep_h *ep_p) { ucs_status_t status; ucp_ep_config_key_t key; @@ -136,7 +136,8 @@ ucs_status_t ucp_ep_new(ucp_worker_h worker, const char *peer_name, goto err_free_ep; } - ucs_list_add_tail(&worker->all_eps, &ucp_ep_ext_gen(ep)->ep_list); + ucs_list_head_init(&ucp_ep_ext_gen(ep)->ep_list); + *ep_p = ep; ucs_debug("created ep %p to %s %s", ep, ucp_ep_peer_name(ep), message); return UCS_OK; @@ -147,6 +148,24 @@ ucs_status_t ucp_ep_new(ucp_worker_h worker, const char *peer_name, return status; } +ucs_status_t ucp_worker_create_ep(ucp_worker_h worker, const char *peer_name, + const char *message, ucp_ep_h *ep_p) +{ + ucs_status_t status; + ucp_ep_h ep; + + status = ucp_ep_create_base(worker, peer_name, message, &ep); + if (status != UCS_OK) { + return status; + } + + ucs_list_add_tail(&worker->all_eps, &ucp_ep_ext_gen(ep)->ep_list); + + *ep_p = ep; + + return UCS_OK; +} + void ucp_ep_delete(ucp_ep_h ep) { ucs_callbackq_remove_if(&ep->worker->uct->progress_q, @@ -166,7 +185,7 @@ 
ucp_ep_create_sockaddr_aux(ucp_worker_h worker, unsigned ep_init_flags, ucp_ep_h ep; /* allocate endpoint */ - status = ucp_ep_new(worker, remote_address->name, "listener", &ep); + status = ucp_worker_create_ep(worker, remote_address->name, "listener", &ep); if (status != UCS_OK) { goto err; } @@ -335,7 +354,7 @@ ucs_status_t ucp_ep_create_to_worker_addr(ucp_worker_h worker, ucp_ep_h ep; /* allocate endpoint */ - status = ucp_ep_new(worker, remote_address->name, message, &ep); + status = ucp_worker_create_ep(worker, remote_address->name, message, &ep); if (status != UCS_OK) { goto err; } @@ -378,7 +397,7 @@ static ucs_status_t ucp_ep_create_to_sock_addr(ucp_worker_h worker, /* allocate endpoint */ ucs_sockaddr_str(params->sockaddr.addr, peer_name, sizeof(peer_name)); - status = ucp_ep_new(worker, peer_name, "from api call", &ep); + status = ucp_worker_create_ep(worker, peer_name, "from api call", &ep); if (status != UCS_OK) { goto err; } diff --git a/src/ucp/core/ucp_ep.h b/src/ucp/core/ucp_ep.h index 4b8c957ee6b..51b5dde4435 100644 --- a/src/ucp/core/ucp_ep.h +++ b/src/ucp/core/ucp_ep.h @@ -439,8 +439,11 @@ void ucp_ep_config_lane_info_str(ucp_context_h context, ucp_rsc_index_t aux_rsc_index, char *buf, size_t max); -ucs_status_t ucp_ep_new(ucp_worker_h worker, const char *peer_name, - const char *message, ucp_ep_h *ep_p); +ucs_status_t ucp_ep_create_base(ucp_worker_h worker, const char *peer_name, + const char *message, ucp_ep_h *ep_p); + +ucs_status_t ucp_worker_create_ep(ucp_worker_h worker, const char *peer_name, + const char *message, ucp_ep_h *ep_p); void ucp_ep_delete(ucp_ep_h ep); diff --git a/src/ucp/core/ucp_worker.c b/src/ucp/core/ucp_worker.c index 40f578711eb..c03df8dfd96 100644 --- a/src/ucp/core/ucp_worker.c +++ b/src/ucp/core/ucp_worker.c @@ -835,7 +835,7 @@ static void ucp_worker_iface_async_cb_event(void *arg, unsigned flags) ucp_worker_iface_event_common(wiface); } -static void ucp_worker_iface_async_fd_event(int fd, void *arg) +static void 
ucp_worker_iface_async_fd_event(int fd, int events, void *arg) { ucp_worker_iface_t *wiface = arg; int event_fd = ucp_worker_iface_get_event_fd(wiface);; diff --git a/src/ucp/rma/amo_send.c b/src/ucp/rma/amo_send.c index 7f912dcbb12..b44dc1bf10e 100644 --- a/src/ucp/rma/amo_send.c +++ b/src/ucp/rma/amo_send.c @@ -50,6 +50,21 @@ } +#define UCP_AMO_CHECK_PARAM_NBX(_context, _remote_addr, _size, _count, \ + _opcode, _last_opcode, _action) \ + { \ + if (ENABLE_PARAMS_CHECK) { \ + if ((_count) != 1) { \ + ucs_error("unsupported number of elements: %zu", (_count)); \ + _action; \ + } \ + } \ + \ + UCP_AMO_CHECK_PARAM(_context, _remote_addr, _size, _opcode, \ + _last_opcode, _action); \ + } + + static uct_atomic_op_t ucp_uct_op_table[] = { [UCP_ATOMIC_POST_OP_ADD] = UCT_ATOMIC_OP_ADD, [UCP_ATOMIC_POST_OP_AND] = UCT_ATOMIC_OP_AND, @@ -118,20 +133,57 @@ ucs_status_ptr_t ucp_atomic_fetch_nb(ucp_ep_h ep, ucp_atomic_fetch_op_t opcode, uint64_t value, void *result, size_t op_size, uint64_t remote_addr, ucp_rkey_h rkey, ucp_send_callback_t cb) +{ + ucp_request_param_t param = { + .op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK | + UCP_OP_ATTR_FIELD_DATATYPE, + .datatype = ucp_dt_make_contig(op_size), + .cb.send = (ucp_send_nbx_callback_t)cb + }; + + return ucp_atomic_fetch_nbx(ep, opcode, &value, result, 1, + remote_addr, rkey, ¶m); +} + +ucs_status_ptr_t +ucp_atomic_fetch_nbx(ucp_ep_h ep, ucp_atomic_fetch_op_t opcode, + const void *buffer, void *result, size_t count, + uint64_t remote_addr, ucp_rkey_h rkey, + const ucp_request_param_t *param) { ucs_status_ptr_t status_p; ucs_status_t status; ucp_request_t *req; + uint64_t value; + size_t op_size; - UCP_AMO_CHECK_PARAM(ep->worker->context, remote_addr, op_size, opcode, - UCP_ATOMIC_FETCH_OP_LAST, - return UCS_STATUS_PTR(UCS_ERR_INVALID_PARAM)); + if (ucs_unlikely(!(param->op_attr_mask & UCP_OP_ATTR_FIELD_DATATYPE))) { + ucs_error("missing atomic operation datatype"); + return UCS_STATUS_PTR(UCS_ERR_INVALID_PARAM); + } + + if 
(param->datatype == ucp_dt_make_contig(8)) { + value = *(uint64_t*)buffer; + op_size = sizeof(uint64_t); + } else if (param->datatype == ucp_dt_make_contig(4)) { + value = *(uint32_t*)buffer; + op_size = sizeof(uint32_t); + } else { + ucs_error("invalid atomic operation datatype: %zu", param->datatype); + return UCS_STATUS_PTR(UCS_ERR_INVALID_PARAM); + } + + UCP_AMO_CHECK_PARAM_NBX(ep->worker->context, remote_addr, op_size, + count, opcode, UCP_ATOMIC_FETCH_OP_LAST, + return UCS_STATUS_PTR(UCS_ERR_INVALID_PARAM)); UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(ep->worker); - ucs_trace_req("atomic_fetch_nb opcode %d value %"PRIu64" buffer %p size %zu" - " remote_addr %"PRIx64" rkey %p to %s cb %p", - opcode, value, result, op_size, remote_addr, rkey, - ucp_ep_peer_name(ep), cb); + ucs_trace_req("atomic_fetch_nb opcode %d buffer %p result %p " + "datatype %zu remote_addr %"PRIx64" rkey %p to %s cb %p", + opcode, buffer, result, param->datatype, remote_addr, rkey, + ucp_ep_peer_name(ep), + (param->op_attr_mask & UCP_OP_ATTR_FIELD_CALLBACK) ? 
+ param->cb.send : NULL); status = UCP_RKEY_RESOLVE(rkey, ep, amo); if (status != UCS_OK) { @@ -139,16 +191,14 @@ ucs_status_ptr_t ucp_atomic_fetch_nb(ucp_ep_h ep, ucp_atomic_fetch_op_t opcode, goto out; } - req = ucp_request_get(ep->worker); - if (ucs_unlikely(NULL == req)) { - status_p = UCS_STATUS_PTR(UCS_ERR_NO_MEMORY); - goto out; - } + req = ucp_request_get_param(ep->worker, param, + {status_p = UCS_STATUS_PTR(UCS_ERR_NO_MEMORY); + goto out;}); ucp_amo_init_fetch(req, ep, result, ucp_uct_fop_table[opcode], op_size, remote_addr, rkey, value, rkey->cache.amo_proto); - status_p = ucp_rma_send_request_cb(req, cb); + status_p = ucp_rma_send_request(req, param); out: UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(ep->worker); diff --git a/src/ucp/wireup/wireup.c b/src/ucp/wireup/wireup.c index d58ac803791..61ad4b64ac5 100644 --- a/src/ucp/wireup/wireup.c +++ b/src/ucp/wireup/wireup.c @@ -441,8 +441,8 @@ ucp_wireup_process_request(ucp_worker_h worker, const ucp_wireup_msg_t *msg, msg->conn_sn ^ (remote_uuid == worker->uuid)); if (ep == NULL) { /* Create a new endpoint if does not exist */ - status = ucp_ep_new(worker, remote_address->name, "remote-request", - &ep); + status = ucp_worker_create_ep(worker, remote_address->name, + "remote-request", &ep); if (status != UCS_OK) { return; } diff --git a/src/ucp/wireup/wireup_cm.c b/src/ucp/wireup/wireup_cm.c index 10ad4f5aed6..4c5670840b0 100644 --- a/src/ucp/wireup/wireup_cm.c +++ b/src/ucp/wireup/wireup_cm.c @@ -117,6 +117,7 @@ static ssize_t ucp_cm_client_priv_pack_cb(void *arg, ucp_lane_index_t lane_idx; ucp_rsc_index_t rsc_idx; const char *dev_name; + ucp_ep_h tmp_ep; UCS_ASYNC_BLOCK(&worker->async); @@ -133,16 +134,21 @@ static ssize_t ucp_cm_client_priv_pack_cb(void *arg, /* At this point the ep has only CM lane */ ucs_assert((ucp_ep_num_lanes(ep) == 1) && (ucp_ep_get_cm_lane(ep) != UCP_NULL_LANE)); - /* Detach it before reconfiguration and restore then */ cm_wireup_ep = ucp_ep_get_cm_wireup_ep(ep); ucs_assert(cm_wireup_ep 
!= NULL); - status = ucp_worker_get_ep_config(worker, &key, 0, &ep->cfg_index); + /* Create tmp ep which will hold local tl addresses until connect + * event arrives, to avoid asynchronous ep reconfiguration. */ + status = ucp_ep_create_base(worker, "tmp_cm", "tmp cm client", &tmp_ep); if (status != UCS_OK) { goto out; } + cm_wireup_ep->tmp_ep = tmp_ep; - ep->am_lane = key.am_lane; + status = ucp_worker_get_ep_config(worker, &key, 0, &tmp_ep->cfg_index); + if (status != UCS_OK) { + goto out; + } cm_attr.field_mask = UCT_CM_ATTR_FIELD_MAX_CONN_PRIV; status = uct_cm_query(cm, &cm_attr); @@ -151,18 +157,17 @@ static ssize_t ucp_cm_client_priv_pack_cb(void *arg, } tl_bitmap = 0; - for (lane_idx = 0; lane_idx < ucp_ep_num_lanes(ep); ++lane_idx) { - if (lane_idx == ucp_ep_get_cm_lane(ep)) { - ep->uct_eps[lane_idx] = &cm_wireup_ep->super.super; + for (lane_idx = 0; lane_idx < ucp_ep_num_lanes(tmp_ep); ++lane_idx) { + if (lane_idx == ucp_ep_get_cm_lane(tmp_ep)) { continue; } - rsc_idx = ucp_ep_get_rsc_index(ep, lane_idx); + rsc_idx = ucp_ep_get_rsc_index(tmp_ep, lane_idx); if (rsc_idx == UCP_NULL_RESOURCE) { continue; } - status = ucp_wireup_ep_create(ep, &ep->uct_eps[lane_idx]); + status = ucp_wireup_ep_create(tmp_ep, &tmp_ep->uct_eps[lane_idx]); if (status != UCS_OK) { goto out; } @@ -176,27 +181,24 @@ static ssize_t ucp_cm_client_priv_pack_cb(void *arg, tl_ep_params.field_mask = UCT_EP_PARAM_FIELD_IFACE | UCT_EP_PARAM_FIELD_PATH_INDEX; tl_ep_params.iface = ucp_worker_iface(worker, rsc_idx)->iface; - tl_ep_params.path_index = ucp_ep_get_path_index(ep, lane_idx); + tl_ep_params.path_index = ucp_ep_get_path_index(tmp_ep, lane_idx); status = uct_ep_create(&tl_ep_params, &tl_ep); if (status != UCS_OK) { /* coverity[leaked_storage] */ goto out; } - ucp_wireup_ep_set_next_ep(ep->uct_eps[lane_idx], tl_ep); + ucp_wireup_ep_set_next_ep(tmp_ep->uct_eps[lane_idx], tl_ep); } else { ucs_assert(ucp_worker_iface_get_attr(worker, rsc_idx)->cap.flags & UCT_IFACE_FLAG_CONNECT_TO_IFACE); } 
} - /* Make sure that CM lane is restored */ - ucs_assert(cm_wireup_ep == ucp_ep_get_cm_wireup_ep(ep)); - /* Don't pack the device address to reduce address size, it will be * delivered by uct_cm_listener_conn_request_callback_t in * uct_cm_remote_data_t */ - status = ucp_address_pack(worker, ep, tl_bitmap, + status = ucp_address_pack(worker, tmp_ep, tl_bitmap, UCP_ADDRESS_PACK_FLAG_IFACE_ADDR | UCP_ADDRESS_PACK_FLAG_EP_ADDR, NULL, &ucp_addr_size, &ucp_addr); @@ -206,13 +208,15 @@ static ssize_t ucp_cm_client_priv_pack_cb(void *arg, if (cm_attr.max_conn_priv < (sizeof(*sa_data) + ucp_addr_size)) { ucs_error("CM private data buffer is to small to pack UCP endpoint info, " - "ep %p service data %lu, address length %lu, cm %p max_conn_priv %lu", - ep, sizeof(*sa_data), ucp_addr_size, cm, + "ep %p/%p service data %lu, address length %lu, cm %p max_conn_priv %lu", + ep, tmp_ep, sizeof(*sa_data), ucp_addr_size, cm, cm_attr.max_conn_priv); status = UCS_ERR_BUFFER_TOO_SMALL; goto free_addr; } + /* Pass real ep (not tmp_ep), because only its pointer and err_mode is + * taken from the config. 
*/ ucp_cm_priv_data_pack(sa_data, ep, dev_index, ucp_addr, ucp_addr_size); free_addr: @@ -239,6 +243,26 @@ ucp_cm_client_connect_prog_arg_free(ucp_cm_client_connect_progress_arg_t *arg) ucs_free(arg); } +static void ucp_cm_client_restore_ep(ucp_wireup_ep_t *wireup_cm_ep, + ucp_ep_h ucp_ep) +{ + ucp_ep_h tmp_ep = wireup_cm_ep->tmp_ep; + ucp_wireup_ep_t *w_ep; + ucp_lane_index_t lane_idx; + + for (lane_idx = 0; lane_idx < ucp_ep_num_lanes(tmp_ep); ++lane_idx) { + if (tmp_ep->uct_eps[lane_idx] != NULL) { + ucs_assert(ucp_ep->uct_eps[lane_idx] == NULL); + ucp_ep->uct_eps[lane_idx] = tmp_ep->uct_eps[lane_idx]; + w_ep = ucs_derived_of(ucp_ep->uct_eps[lane_idx], ucp_wireup_ep_t); + w_ep->super.ucp_ep = ucp_ep; + } + } + + ucp_ep_delete(tmp_ep); /* not needed anymore */ + wireup_cm_ep->tmp_ep = NULL; +} + /* * The main thread progress part of connection establishment on client side */ @@ -285,13 +309,16 @@ static unsigned ucp_cm_client_connect_progress(void *arg) ucs_assert(addr.address_count <= UCP_MAX_RESOURCES); ucs_assert(wireup_ep->ep_init_flags & UCP_EP_INIT_CM_WIREUP_CLIENT); - /* extend tl_bitmap to all TLs on the same device as initial configuration - since TL can be changed due to server side configuration */ - tl_bitmap = ucp_ep_get_tl_bitmap(ucp_ep); + /* Get tl bitmap from tmp_ep, because it contains initial configuration. */ + tl_bitmap = ucp_ep_get_tl_bitmap(wireup_ep->tmp_ep); ucs_assert(tl_bitmap != 0); rsc_index = ucs_ffs64(tl_bitmap); dev_index = context->tl_rscs[rsc_index].dev_index; + /* Restore initial configuration from tmp_ep created for packing local + * addresses. 
*/ + ucp_cm_client_restore_ep(wireup_ep, ucp_ep); + #ifdef ENABLE_ASSERT ucs_for_each_bit(rsc_index, tl_bitmap) { ucs_assert(dev_index == context->tl_rscs[rsc_index].dev_index); diff --git a/src/ucp/wireup/wireup_ep.c b/src/ucp/wireup/wireup_ep.c index bb3eca59028..53fdc20cd98 100644 --- a/src/ucp/wireup/wireup_ep.c +++ b/src/ucp/wireup/wireup_ep.c @@ -349,6 +349,7 @@ UCS_CLASS_INIT_FUNC(ucp_wireup_ep_t, ucp_ep_h ucp_ep) self->aux_ep = NULL; self->sockaddr_ep = NULL; + self->tmp_ep = NULL; self->aux_rsc_index = UCP_NULL_RESOURCE; self->sockaddr_rsc_index = UCP_NULL_RESOURCE; self->pending_count = 0; @@ -385,6 +386,10 @@ static UCS_CLASS_CLEANUP_FUNC(ucp_wireup_ep_t) uct_ep_destroy(self->sockaddr_ep); } + if (self->tmp_ep != NULL) { + ucp_ep_disconnected(self->tmp_ep, 1); + } + UCS_ASYNC_BLOCK(&worker->async); --worker->flush_ops_count; UCS_ASYNC_UNBLOCK(&worker->async); diff --git a/src/ucp/wireup/wireup_ep.h b/src/ucp/wireup/wireup_ep.h index 136fbb39d52..efa3eb60f17 100644 --- a/src/ucp/wireup/wireup_ep.h +++ b/src/ucp/wireup/wireup_ep.h @@ -34,6 +34,7 @@ struct ucp_wireup_ep { ucs_queue_head_t pending_q; /**< Queue of pending operations */ uct_ep_h aux_ep; /**< Used to wireup the "real" endpoint */ uct_ep_h sockaddr_ep; /**< Used for client-server wireup */ + ucp_ep_h tmp_ep; /**< Used by the client for local tls setup */ ucp_rsc_index_t aux_rsc_index; /**< Index of auxiliary transport */ ucp_rsc_index_t sockaddr_rsc_index; /**< Index of sockaddr transport */ volatile uint32_t pending_count; /**< Number of pending wireup operations */ diff --git a/src/ucs/arch/cpu.c b/src/ucs/arch/cpu.c index 9dd6d31a92c..6d9ebbafeae 100644 --- a/src/ucs/arch/cpu.c +++ b/src/ucs/arch/cpu.c @@ -46,9 +46,11 @@ const ucs_cpu_builtin_memcpy_t ucs_cpu_builtin_memcpy[UCS_CPU_VENDOR_LAST] = { .min = 1 * UCS_KBYTE, .max = 8 * UCS_MBYTE }, + /* TODO: investigate why `rep movsb` is slow for shared buffers + * on some AMD configurations */ [UCS_CPU_VENDOR_AMD] = { - .min = 1 * UCS_KBYTE, - 
.max = 136 * UCS_KBYTE + .min = UCS_MEMUNITS_INF, + .max = UCS_MEMUNITS_INF }, [UCS_CPU_VENDOR_GENERIC_ARM] = { .min = UCS_MEMUNITS_INF, diff --git a/src/ucs/async/async.c b/src/ucs/async/async.c index e3cb5f74ef2..30d638a0c05 100644 --- a/src/ucs/async/async.c +++ b/src/ucs/async/async.c @@ -25,6 +25,8 @@ #define UCS_ASYNC_HANDLER_CALLER_NULL ((pthread_t)-1) +#define UCS_ASYNC_MISSED_QUEUE_SHIFT 32 +#define UCS_ASYNC_MISSED_QUEUE_MASK UCS_MASK(UCS_ASYNC_MISSED_QUEUE_SHIFT) /* Hash table for all event and timer handlers */ KHASH_MAP_INIT_INT(ucs_async_handler, ucs_async_handler_t *); @@ -95,6 +97,18 @@ static inline int ucs_async_handler_kh_is_end(khiter_t hash_it) return hash_it == kh_end(&ucs_async_global_context.handlers); } +static inline uint64_t ucs_async_missed_event_pack(int id, int events) +{ + return ((uint64_t)id << UCS_ASYNC_MISSED_QUEUE_SHIFT) | (uint32_t)events; +} + +static inline void ucs_async_missed_event_unpack(uint64_t value, int *id_p, + int *events_p) +{ + *id_p = value >> UCS_ASYNC_MISSED_QUEUE_SHIFT; + *events_p = value & UCS_ASYNC_MISSED_QUEUE_MASK; +} + static void ucs_async_handler_hold(ucs_async_handler_t *handler) { ucs_atomic_add32(&handler->refcount, 1); @@ -210,7 +224,7 @@ static ucs_status_t ucs_async_handler_add(int min_id, int max_id, return status; } -static void ucs_async_handler_invoke(ucs_async_handler_t *handler) +static void ucs_async_handler_invoke(ucs_async_handler_t *handler, int events) { ucs_trace_async("calling async handler " UCS_ASYNC_HANDLER_FMT, UCS_ASYNC_HANDLER_ARG(handler)); @@ -221,33 +235,37 @@ static void ucs_async_handler_invoke(ucs_async_handler_t *handler) */ ucs_assert(handler->caller == UCS_ASYNC_HANDLER_CALLER_NULL); handler->caller = pthread_self(); - handler->cb(handler->id, handler->arg); + handler->cb(handler->id, events, handler->arg); handler->caller = UCS_ASYNC_HANDLER_CALLER_NULL; } -static ucs_status_t ucs_async_handler_dispatch(ucs_async_handler_t *handler) +static ucs_status_t 
ucs_async_handler_dispatch(ucs_async_handler_t *handler, + int events) { ucs_async_context_t *async; ucs_async_mode_t mode; ucs_status_t status; + uint64_t value; mode = handler->mode; async = handler->async; if (async == NULL) { - ucs_async_handler_invoke(handler); + ucs_async_handler_invoke(handler, events); return UCS_OK; } async->last_wakeup = ucs_get_time(); if (ucs_async_method_call(mode, context_try_block, async)) { - ucs_async_handler_invoke(handler); + ucs_async_handler_invoke(handler, events); ucs_async_method_call(mode, context_unblock, async); } else { ucs_trace_async("missed " UCS_ASYNC_HANDLER_FMT ", last_wakeup %lu", UCS_ASYNC_HANDLER_ARG(handler), async->last_wakeup); if (ucs_atomic_cswap32(&handler->missed, 0, 1) == 0) { - status = ucs_mpmc_queue_push(&async->missed, handler->id); + /* save both the handler_id and events */ + value = ucs_async_missed_event_pack(handler->id, events); + status = ucs_mpmc_queue_push(&async->missed, value); if (status != UCS_OK) { ucs_fatal("Failed to push event %d to miss queue: %s", handler->id, ucs_status_string(status)); @@ -258,19 +276,20 @@ static ucs_status_t ucs_async_handler_dispatch(ucs_async_handler_t *handler) return UCS_OK; } -ucs_status_t ucs_async_dispatch_handlers(int *events, size_t count) +ucs_status_t ucs_async_dispatch_handlers(int *handler_ids, size_t count, + int events) { ucs_status_t status = UCS_OK, tmp_status; ucs_async_handler_t *handler; - for (; count > 0; --count, ++events) { - handler = ucs_async_handler_get(*events); + for (; count > 0; --count, ++handler_ids) { + handler = ucs_async_handler_get(*handler_ids); if (handler == NULL) { - ucs_trace_async("handler for %d not found - ignoring", *events); + ucs_trace_async("handler for %d not found - ignoring", *handler_ids); continue; } - tmp_status = ucs_async_handler_dispatch(handler); + tmp_status = ucs_async_handler_dispatch(handler, events); if (tmp_status != UCS_OK) { status = tmp_status; } @@ -297,7 +316,8 @@ ucs_status_t 
ucs_async_dispatch_timerq(ucs_timer_queue_t *timerq, } }) - return ucs_async_dispatch_handlers(expired_timers, num_timers); + return ucs_async_dispatch_handlers(expired_timers, num_timers, + UCS_ASYNC_EVENT_DUMMY); } ucs_status_t ucs_async_context_init(ucs_async_context_t *async, ucs_async_mode_t mode) @@ -580,8 +600,9 @@ ucs_status_t ucs_async_modify_handler(int fd, int events) void __ucs_async_poll_missed(ucs_async_context_t *async) { ucs_async_handler_t *handler; + int handler_id, events; ucs_status_t status; - uint32_t value; + uint64_t value; ucs_trace_async("miss handler"); @@ -596,11 +617,13 @@ void __ucs_async_poll_missed(ucs_async_context_t *async) ucs_async_method_call_all(block); UCS_ASYNC_BLOCK(async); - handler = ucs_async_handler_get(value); + + ucs_async_missed_event_unpack(value, &handler_id, &events); + handler = ucs_async_handler_get(handler_id); if (handler != NULL) { ucs_assert(handler->async == async); handler->missed = 0; - ucs_async_handler_invoke(handler); + ucs_async_handler_invoke(handler, events); ucs_async_handler_put(handler); } UCS_ASYNC_UNBLOCK(async); @@ -630,7 +653,8 @@ void ucs_async_poll(ucs_async_context_t *async) pthread_rwlock_unlock(&ucs_async_global_context.handlers_lock); for (i = 0; i < n; ++i) { - ucs_async_handler_dispatch(handlers[i]); + /* dispatch the handler with all the registered events */ + ucs_async_handler_dispatch(handlers[i], handlers[i]->events); ucs_async_handler_put(handlers[i]); } } diff --git a/src/ucs/async/async.h b/src/ucs/async/async.h index 0a884ec4022..e324574f4cc 100644 --- a/src/ucs/async/async.h +++ b/src/ucs/async/async.h @@ -20,6 +20,9 @@ BEGIN_C_DECLS /** @file async.h */ +#define UCS_ASYNC_EVENT_DUMMY 0 + + /** * Async event context. Manages timer and fd notifications. 
*/ diff --git a/src/ucs/async/async_fwd.h b/src/ucs/async/async_fwd.h index ed77ddafcff..ca81dc7975d 100644 --- a/src/ucs/async/async_fwd.h +++ b/src/ucs/async/async_fwd.h @@ -25,9 +25,10 @@ typedef struct ucs_async_context ucs_async_context_t; * Async event callback. * * @param id Event id (timer or file descriptor). + * @param events The events that triggered the callback. * @param arg User-defined argument. */ -typedef void (*ucs_async_event_cb_t)(int id, void *arg); +typedef void (*ucs_async_event_cb_t)(int id, int events, void *arg); /** diff --git a/src/ucs/async/async_int.h b/src/ucs/async/async_int.h index b3415c20b7e..1e03f4dba72 100644 --- a/src/ucs/async/async_int.h +++ b/src/ucs/async/async_int.h @@ -31,10 +31,11 @@ struct ucs_async_handler { /** * Dispatch event coming from async context. * - * @param id Array of event IDs to dispatch. - * @param count Number of events + * @param handler_ids Array of handler IDs to dispatch. + * @param count Number of events + * @param events Events to pass to the handler */ -ucs_status_t ucs_async_dispatch_handlers(int *events, size_t count); +ucs_status_t ucs_async_dispatch_handlers(int *handler_ids, size_t count, int events); /** diff --git a/src/ucs/async/signal.c b/src/ucs/async/signal.c index 7dca1e6ac12..30562907077 100644 --- a/src/ucs/async/signal.c +++ b/src/ucs/async/signal.c @@ -181,6 +181,29 @@ static ucs_status_t ucs_async_signal_dispatch_timer(int uid) return ucs_async_dispatch_timerq(&timer->timerq, ucs_get_time()); } +static inline int ucs_signal_map_to_events(int si_code) +{ + int events; + + switch (si_code) { + case POLL_IN: + case POLL_MSG: + case POLL_PRI: + events = UCS_EVENT_SET_EVREAD; + return events; + case POLL_OUT: + events = UCS_EVENT_SET_EVWRITE; + return events; + case POLL_HUP: + case POLL_ERR: + events = UCS_EVENT_SET_EVERR; + return events; + default: + ucs_warn("unexpected si_code %d", si_code); + return UCS_ASYNC_EVENT_DUMMY; + } +} + static void ucs_async_signal_handler(int signo, 
siginfo_t *siginfo, void *arg) { ucs_assert(signo == ucs_global_opts.async_signo); @@ -198,7 +221,8 @@ static void ucs_async_signal_handler(int signo, siginfo_t *siginfo, void *arg) case POLL_MSG: case POLL_PRI: ucs_trace_async("async signal handler called for fd %d", siginfo->si_fd); - ucs_async_dispatch_handlers(&siginfo->si_fd, 1); + ucs_async_dispatch_handlers(&siginfo->si_fd, 1, + ucs_signal_map_to_events(siginfo->si_code)); return; default: ucs_warn("signal handler called with unexpected event code %d, ignoring", diff --git a/src/ucs/async/thread.c b/src/ucs/async/thread.c index 30712d56c4b..74f7f8a7886 100644 --- a/src/ucs/async/thread.c +++ b/src/ucs/async/thread.c @@ -83,7 +83,7 @@ static void ucs_async_thread_ev_handler(void *callback_data, int event, return; } - status = ucs_async_dispatch_handlers(&fd, 1); + status = ucs_async_dispatch_handlers(&fd, 1, event); if (status == UCS_ERR_NO_PROGRESS) { *cb_arg->is_missed = 1; } diff --git a/src/ucs/datastruct/arbiter.c b/src/ucs/datastruct/arbiter.c index 159f81b81e1..e29ac44f778 100644 --- a/src/ucs/datastruct/arbiter.c +++ b/src/ucs/datastruct/arbiter.c @@ -70,8 +70,7 @@ void ucs_arbiter_group_push_elem_always(ucs_arbiter_group_t *group, ucs_arbiter_elem_set_scheduled(elem, group); } -void ucs_arbiter_group_push_head_elem_always(ucs_arbiter_t *arbiter, - ucs_arbiter_group_t *group, +void ucs_arbiter_group_push_head_elem_always(ucs_arbiter_group_t *group, ucs_arbiter_elem_t *elem) { ucs_arbiter_elem_t *tail = group->tail; diff --git a/src/ucs/datastruct/arbiter.h b/src/ucs/datastruct/arbiter.h index 80e2587dcc7..dccfa644533 100644 --- a/src/ucs/datastruct/arbiter.h +++ b/src/ucs/datastruct/arbiter.h @@ -215,8 +215,7 @@ void ucs_arbiter_group_push_elem_always(ucs_arbiter_group_t *group, /** * Add a new work element to the head of a group - internal function */ -void ucs_arbiter_group_push_head_elem_always(ucs_arbiter_t *arbiter, - ucs_arbiter_group_t *group, +void 
ucs_arbiter_group_push_head_elem_always(ucs_arbiter_group_t *group, ucs_arbiter_elem_t *elem); @@ -340,22 +339,18 @@ ucs_arbiter_group_push_elem(ucs_arbiter_group_t *group, /** * Add a new work element to the head of a group if it is not already there * - * @param [in] arbiter Arbiter object the group is on (since we modify the head - * element of a potentially scheduled group). If the group - * is not scheduled, arbiter may be NULL. * @param [in] group Group to add the element to. * @param [in] elem Work element to add. */ static inline void -ucs_arbiter_group_push_head_elem(ucs_arbiter_t *arbiter, - ucs_arbiter_group_t *group, +ucs_arbiter_group_push_head_elem(ucs_arbiter_group_t *group, ucs_arbiter_elem_t *elem) { if (ucs_arbiter_elem_is_scheduled(elem)) { return; } - ucs_arbiter_group_push_head_elem_always(arbiter, group, elem); + ucs_arbiter_group_push_head_elem_always(group, elem); } diff --git a/src/ucs/datastruct/mpmc.c b/src/ucs/datastruct/mpmc.c index 611ea2642dc..07b49bafc4e 100644 --- a/src/ucs/datastruct/mpmc.c +++ b/src/ucs/datastruct/mpmc.c @@ -23,7 +23,7 @@ ucs_status_t ucs_mpmc_queue_init(ucs_mpmc_queue_t *mpmc, uint32_t length) mpmc->length = ucs_roundup_pow2(length); mpmc->shift = ucs_ilog2(mpmc->length); - if (mpmc->length >= UCS_BIT(UCS_MPMC_VALID_SHIFT)) { + if (mpmc->shift >= UCS_MPMC_VALID_SHIFT) { return UCS_ERR_INVALID_PARAM; } @@ -46,12 +46,12 @@ void ucs_mpmc_queue_cleanup(ucs_mpmc_queue_t *mpmc) ucs_free(mpmc->queue); } -static inline uint32_t __ucs_mpmc_queue_valid_bit(ucs_mpmc_queue_t *mpmc, uint32_t location) +static inline uint64_t __ucs_mpmc_queue_valid_bit(ucs_mpmc_queue_t *mpmc, uint32_t location) { - return (location >> mpmc->shift) & 1; + return ((uint64_t)location >> mpmc->shift) & 1; } -ucs_status_t ucs_mpmc_queue_push(ucs_mpmc_queue_t *mpmc, uint32_t value) +ucs_status_t ucs_mpmc_queue_push(ucs_mpmc_queue_t *mpmc, uint64_t value) { uint32_t location; @@ -71,9 +71,10 @@ ucs_status_t ucs_mpmc_queue_push(ucs_mpmc_queue_t *mpmc, 
uint32_t value) } -ucs_status_t ucs_mpmc_queue_pull(ucs_mpmc_queue_t *mpmc, uint32_t *value_p) +ucs_status_t ucs_mpmc_queue_pull(ucs_mpmc_queue_t *mpmc, uint64_t *value_p) { - uint32_t location, value; + uint32_t location; + uint64_t value; location = mpmc->consumer; if (location == mpmc->producer) { diff --git a/src/ucs/datastruct/mpmc.h b/src/ucs/datastruct/mpmc.h index 6c9df6c3dc4..3b9770f436b 100644 --- a/src/ucs/datastruct/mpmc.h +++ b/src/ucs/datastruct/mpmc.h @@ -10,7 +10,7 @@ #include #include -#define UCS_MPMC_VALID_SHIFT 31 +#define UCS_MPMC_VALID_SHIFT 63 #define UCS_MPMC_VALUE_MAX UCS_BIT(UCS_MPMC_VALID_SHIFT) /** @@ -25,7 +25,7 @@ typedef struct ucs_mpmc_queue { int shift; volatile uint32_t producer; /* Producer index */ volatile uint32_t consumer; /* Consumer index */ - uint32_t *queue; /* Array of data */ + uint64_t *queue; /* Array of data */ } ucs_mpmc_queue_t; @@ -49,7 +49,7 @@ void ucs_mpmc_queue_cleanup(ucs_mpmc_queue_t *mpmc); * @param value Value to push. * @return UCS_ERR_EXCEEDS_LIMIT if the queue is full. */ -ucs_status_t ucs_mpmc_queue_push(ucs_mpmc_queue_t *mpmc, uint32_t value); +ucs_status_t ucs_mpmc_queue_push(ucs_mpmc_queue_t *mpmc, uint64_t value); /** @@ -59,7 +59,7 @@ ucs_status_t ucs_mpmc_queue_push(ucs_mpmc_queue_t *mpmc, uint32_t value); * @param UCS_ERR_NO_PROGRESS if there is currently no available item to retrieve, * or another thread removed the current item. 
*/ -ucs_status_t ucs_mpmc_queue_pull(ucs_mpmc_queue_t *mpmc, uint32_t *value_p); +ucs_status_t ucs_mpmc_queue_pull(ucs_mpmc_queue_t *mpmc, uint64_t *value_p); /** diff --git a/src/uct/base/uct_iface.h b/src/uct/base/uct_iface.h index b5f481fbeb7..f9aea39627d 100644 --- a/src/uct/base/uct_iface.h +++ b/src/uct/base/uct_iface.h @@ -420,8 +420,8 @@ uct_pending_req_priv_arb_elem(uct_pending_req_t *req) #define uct_pending_req_arb_group_push(_arbiter_group, _req) \ do { \ ucs_arbiter_elem_init(uct_pending_req_priv_arb_elem(_req)); \ - ucs_arbiter_group_push_elem(_arbiter_group, \ - uct_pending_req_priv_arb_elem(_req)); \ + ucs_arbiter_group_push_elem_always(_arbiter_group, \ + uct_pending_req_priv_arb_elem(_req)); \ } while (0) @@ -431,8 +431,8 @@ uct_pending_req_priv_arb_elem(uct_pending_req_t *req) #define uct_pending_req_arb_group_push_head(_arbiter, _arbiter_group, _req) \ do { \ ucs_arbiter_elem_init(uct_pending_req_priv_arb_elem(_req)); \ - ucs_arbiter_group_push_head_elem(_arbiter, _arbiter_group, \ - uct_pending_req_priv_arb_elem(_req)); \ + ucs_arbiter_group_push_head_elem_always(_arbiter_group, \ + uct_pending_req_priv_arb_elem(_req)); \ } while (0) diff --git a/src/uct/ib/base/ib_device.c b/src/uct/ib/base/ib_device.c index 8dac47c6c00..37b74038ccb 100644 --- a/src/uct/ib/base/ib_device.c +++ b/src/uct/ib/base/ib_device.c @@ -158,15 +158,14 @@ static void uct_ib_device_get_locality(const char *dev_name, *numa_node = (status == UCS_OK) ? 
n : -1; } -static void uct_ib_async_event_handler(int fd, void *arg) +static void uct_ib_async_event_handler(int fd, int events, void *arg) { uct_ib_device_t *dev = arg; - struct ibv_async_event event; - ucs_log_level_t level; - char event_info[200]; + struct ibv_async_event ibevent; + uct_ib_async_event_t event; int ret; - ret = ibv_get_async_event(dev->ibv_context, &event); + ret = ibv_get_async_event(dev->ibv_context, &ibevent); if (ret != 0) { if (errno != EAGAIN) { ucs_warn("ibv_get_async_event() failed: %m"); @@ -174,10 +173,65 @@ static void uct_ib_async_event_handler(int fd, void *arg) return; } + event.event_type = ibevent.event_type; switch (event.event_type) { + case IBV_EVENT_CQ_ERR: + event.cookie = ibevent.element.cq; + break; + case IBV_EVENT_QP_FATAL: + case IBV_EVENT_QP_REQ_ERR: + case IBV_EVENT_QP_ACCESS_ERR: + case IBV_EVENT_COMM_EST: + case IBV_EVENT_SQ_DRAINED: + case IBV_EVENT_PATH_MIG: + case IBV_EVENT_PATH_MIG_ERR: + case IBV_EVENT_QP_LAST_WQE_REACHED: + event.qp_num = ibevent.element.qp->qp_num; + break; + case IBV_EVENT_SRQ_ERR: + case IBV_EVENT_SRQ_LIMIT_REACHED: + event.cookie = ibevent.element.srq; + break; + case IBV_EVENT_DEVICE_FATAL: + case IBV_EVENT_PORT_ERR: + case IBV_EVENT_PORT_ACTIVE: +#if HAVE_DECL_IBV_EVENT_GID_CHANGE + case IBV_EVENT_GID_CHANGE: +#endif + case IBV_EVENT_LID_CHANGE: + case IBV_EVENT_PKEY_CHANGE: + case IBV_EVENT_SM_CHANGE: + case IBV_EVENT_CLIENT_REREGISTER: + event.port_num = ibevent.element.port_num; + break; +#ifdef HAVE_STRUCT_IBV_ASYNC_EVENT_ELEMENT_DCT + case IBV_EXP_EVENT_DCT_KEY_VIOLATION: + case IBV_EXP_EVENT_DCT_ACCESS_ERR: + case IBV_EXP_EVENT_DCT_REQ_ERR: + if (ibevent.element.dct) { + event.dct_num = ibevent.element.dct->dct_num; + } else { + event.dct_num = 0; + } + break; +#endif + default: + break; + }; + + uct_ib_handle_async_event(dev, &event); + ibv_ack_async_event(&ibevent); +} + +void uct_ib_handle_async_event(uct_ib_device_t *dev, uct_ib_async_event_t *event) +{ + char event_info[200]; + 
ucs_log_level_t level; + + switch (event->event_type) { case IBV_EVENT_CQ_ERR: snprintf(event_info, sizeof(event_info), "%s on CQ %p", - ibv_event_type_str(event.event_type), event.element.cq); + ibv_event_type_str(event->event_type), event->cookie); level = UCS_LOG_LEVEL_ERROR; break; case IBV_EVENT_QP_FATAL: @@ -188,28 +242,28 @@ static void uct_ib_async_event_handler(int fd, void *arg) case IBV_EVENT_PATH_MIG: case IBV_EVENT_PATH_MIG_ERR: snprintf(event_info, sizeof(event_info), "%s on QPN 0x%x", - ibv_event_type_str(event.event_type), event.element.qp->qp_num); + ibv_event_type_str(event->event_type), event->qp_num); level = UCS_LOG_LEVEL_ERROR; break; case IBV_EVENT_QP_LAST_WQE_REACHED: snprintf(event_info, sizeof(event_info), "SRQ-attached QP 0x%x was flushed", - event.element.qp->qp_num); + event->qp_num); level = UCS_LOG_LEVEL_DEBUG; break; case IBV_EVENT_SRQ_ERR: level = UCS_LOG_LEVEL_ERROR; snprintf(event_info, sizeof(event_info), "%s on SRQ %p", - ibv_event_type_str(event.event_type), event.element.srq); + ibv_event_type_str(event->event_type), event->cookie); break; case IBV_EVENT_SRQ_LIMIT_REACHED: snprintf(event_info, sizeof(event_info), "%s on SRQ %p", - ibv_event_type_str(event.event_type), event.element.srq); + ibv_event_type_str(event->event_type), event->cookie); level = UCS_LOG_LEVEL_DEBUG; break; case IBV_EVENT_DEVICE_FATAL: case IBV_EVENT_PORT_ERR: snprintf(event_info, sizeof(event_info), "%s on port %d", - ibv_event_type_str(event.event_type), event.element.port_num); + ibv_event_type_str(event->event_type), event->port_num); level = UCS_LOG_LEVEL_ERROR; break; case IBV_EVENT_PORT_ACTIVE: @@ -221,19 +275,19 @@ static void uct_ib_async_event_handler(int fd, void *arg) case IBV_EVENT_SM_CHANGE: case IBV_EVENT_CLIENT_REREGISTER: snprintf(event_info, sizeof(event_info), "%s on port %d", - ibv_event_type_str(event.event_type), event.element.port_num); + ibv_event_type_str(event->event_type), event->port_num); level = UCS_LOG_LEVEL_WARN; break; 
#ifdef HAVE_STRUCT_IBV_ASYNC_EVENT_ELEMENT_DCT case IBV_EXP_EVENT_DCT_KEY_VIOLATION: snprintf(event_info, sizeof(event_info), "%s on DCTN 0x%x", - "DCT key violation", event.element.dct->dct_num); + "DCT key violation", event->dct_num); level = UCS_LOG_LEVEL_ERROR; break; case IBV_EXP_EVENT_DCT_ACCESS_ERR: - if (event.element.dct) { + if (event->dct_num) { snprintf(event_info, sizeof(event_info), "%s on DCTN 0x%x", - "DCT access error", event.element.dct->dct_num); + "DCT access error", event->dct_num); } else { snprintf(event_info, sizeof(event_info), "%s on DCTN UNKNOWN", "DCT access error"); @@ -242,20 +296,19 @@ static void uct_ib_async_event_handler(int fd, void *arg) break; case IBV_EXP_EVENT_DCT_REQ_ERR: snprintf(event_info, sizeof(event_info), "%s on DCTN 0x%x", - "DCT requester error", event.element.dct->dct_num); + "DCT requester error", event->dct_num); level = UCS_LOG_LEVEL_ERROR; break; #endif default: snprintf(event_info, sizeof(event_info), "%s (%d)", - ibv_event_type_str(event.event_type), event.event_type); + ibv_event_type_str(event->event_type), event->event_type); level = UCS_LOG_LEVEL_INFO; break; }; UCS_STATS_UPDATE_COUNTER(dev->stats, UCT_IB_DEVICE_STAT_ASYNC_EVENT, +1); ucs_log(level, "IB Async event on %s: %s", uct_ib_device_name(dev), event_info); - ibv_ack_async_event(&event); } static void uct_ib_device_get_ids(uct_ib_device_t *dev) diff --git a/src/uct/ib/base/ib_device.h b/src/uct/ib/base/ib_device.h index 84c12cc715c..de6e599b25b 100644 --- a/src/uct/ib/base/ib_device.h +++ b/src/uct/ib/base/ib_device.h @@ -197,6 +197,17 @@ typedef struct { } uct_ib_device_gid_info_t; +typedef struct { + enum ibv_event_type event_type; + union { + uint8_t port_num; + uint32_t qp_num; + uint32_t dct_num; + void *cookie; + }; +} uct_ib_async_event_t; + + extern const double uct_ib_qp_rnr_time_ms[]; @@ -370,4 +381,6 @@ static inline ucs_status_t uct_ib_poll_cq(struct ibv_cq *cq, unsigned *count, st return UCS_OK; } +void 
uct_ib_handle_async_event(uct_ib_device_t *dev, uct_ib_async_event_t *event); + #endif diff --git a/src/uct/ib/cm/cm_iface.c b/src/uct/ib/cm/cm_iface.c index 557de0541c6..f54d8a07b8a 100644 --- a/src/uct/ib/cm/cm_iface.c +++ b/src/uct/ib/cm/cm_iface.c @@ -195,7 +195,7 @@ static void uct_cm_iface_outstanding_purge(uct_cm_iface_t *iface) iface->num_outstanding = 0; } -static void uct_cm_iface_event_handler(int fd, void *arg) +static void uct_cm_iface_event_handler(int fd, int events, void *arg) { uct_cm_iface_t *iface = arg; struct ib_cm_event *event; diff --git a/src/uct/ib/configure.m4 b/src/uct/ib/configure.m4 index 9931d7b9583..21902b5a726 100644 --- a/src/uct/ib/configure.m4 +++ b/src/uct/ib/configure.m4 @@ -203,6 +203,7 @@ AS_IF([test "x$with_ib" = "xyes"], mlx5dv_init_obj, mlx5dv_create_qp, mlx5dv_is_supported, + mlx5dv_devx_subscribe_devx_event, MLX5DV_CQ_INIT_ATTR_MASK_CQE_SIZE, MLX5DV_QP_CREATE_ALLOW_SCATTER_TO_CQE], [], [], [[#include ]]) diff --git a/src/uct/ib/mlx5/dv/ib_mlx5_ifc.h b/src/uct/ib/mlx5/dv/ib_mlx5_ifc.h index e2b8e4a2f64..9835ba1e377 100644 --- a/src/uct/ib/mlx5/dv/ib_mlx5_ifc.h +++ b/src/uct/ib/mlx5/dv/ib_mlx5_ifc.h @@ -1379,4 +1379,9 @@ struct uct_ib_mlx5_modify_qp_in_bits { uint8_t reserved_at_60[0x20]; }; + +enum { + UCT_IB_MLX5_EVENT_TYPE_SRQ_LAST_WQE = 0x13 +}; + #endif diff --git a/src/uct/ib/mlx5/ib_mlx5.c b/src/uct/ib/mlx5/ib_mlx5.c index 92cc54852d1..c996d2266bf 100644 --- a/src/uct/ib/mlx5/ib_mlx5.c +++ b/src/uct/ib/mlx5/ib_mlx5.c @@ -668,3 +668,13 @@ void uct_ib_mlx5_verbs_srq_cleanup(uct_ib_mlx5_srq_t *srq, srq->tail, srq_info.dv.tail); } +ucs_status_t uct_ib_mlx5_modify_qp_state(uct_ib_mlx5_md_t *md, + uct_ib_mlx5_qp_t *qp, + enum ibv_qp_state state) +{ + if (md->flags & UCT_IB_MLX5_MD_FLAG_DEVX) { + return uct_ib_mlx5_devx_modify_qp_state(qp, state); + } else { + return uct_ib_modify_qp(qp->verbs.qp, state); + } +} diff --git a/src/uct/ib/mlx5/ib_mlx5.h b/src/uct/ib/mlx5/ib_mlx5.h index 6cdef28518a..a76ace196c1 100644 --- 
a/src/uct/ib/mlx5/ib_mlx5.h +++ b/src/uct/ib/mlx5/ib_mlx5.h @@ -133,6 +133,9 @@ struct mlx5_grh_av { #define UCT_IB_MLX5_MD_FLAG_DEVX_OBJS(_obj) \ UCT_IB_MLX5_MD_FLAGS_DEVX_OBJS(UCS_BIT(UCT_IB_DEVX_OBJ_ ## _obj)) +#define UCT_IB_MLX5_DEVX_EVENT_TYPE_MASK 0xffff +#define UCT_IB_MLX5_DEVX_EVENT_DATA_SHIFT 16 + enum { /* Device supports KSM */ UCT_IB_MLX5_MD_FLAG_KSM = UCS_BIT(0), @@ -472,6 +475,10 @@ ucs_status_t uct_ib_mlx5_iface_create_qp(uct_ib_iface_t *iface, uct_ib_mlx5_qp_t *qp, uct_ib_mlx5_qp_attr_t *attr); +ucs_status_t uct_ib_mlx5_modify_qp_state(uct_ib_mlx5_md_t *md, + uct_ib_mlx5_qp_t *qp, + enum ibv_qp_state state); + /** * Create CQ with DV */ diff --git a/src/uct/ib/mlx5/ib_mlx5.inl b/src/uct/ib/mlx5/ib_mlx5.inl index 2d2bbc528b2..f30046b9a10 100644 --- a/src/uct/ib/mlx5/ib_mlx5.inl +++ b/src/uct/ib/mlx5/ib_mlx5.inl @@ -552,14 +552,3 @@ uct_ib_mlx5_destroy_qp(uct_ib_mlx5_qp_t *qp) break; } } - -static ucs_status_t UCS_F_MAYBE_UNUSED -uct_ib_mlx5_modify_qp_state(uct_ib_mlx5_md_t *md, uct_ib_mlx5_qp_t *qp, - enum ibv_qp_state state) -{ - if (md->flags & UCT_IB_MLX5_MD_FLAG_DEVX) { - return uct_ib_mlx5_devx_modify_qp_state(qp, state); - } else { - return uct_ib_modify_qp(qp->verbs.qp, state); - } -} diff --git a/src/uct/ib/rc/accel/rc_mlx5.inl b/src/uct/ib/rc/accel/rc_mlx5.inl index 2aad039ccdc..425d432ca33 100644 --- a/src/uct/ib/rc/accel/rc_mlx5.inl +++ b/src/uct/ib/rc/accel/rc_mlx5.inl @@ -317,6 +317,14 @@ uct_rc_mlx5_iface_common_data(uct_rc_mlx5_iface_common_t *iface, return hdr; } +static UCS_F_ALWAYS_INLINE uct_rc_mlx5_mp_context_t* +uct_rc_mlx5_iface_single_frag_context(uct_rc_mlx5_iface_common_t *iface, + unsigned *flags) +{ + *flags |= UCT_CB_PARAM_FLAG_FIRST; + return &iface->tm.mp.last_frag_ctx; +} + static UCS_F_ALWAYS_INLINE void* uct_rc_mlx5_iface_tm_common_data(uct_rc_mlx5_iface_common_t *iface, struct mlx5_cqe64 *cqe, unsigned byte_len, @@ -330,18 +338,22 @@ uct_rc_mlx5_iface_tm_common_data(uct_rc_mlx5_iface_common_t *iface, if 
(!UCT_RC_MLX5_MP_ENABLED(iface)) { /* uct_rc_mlx5_iface_common_data will initialize flags value */ hdr = uct_rc_mlx5_iface_common_data(iface, cqe, byte_len, flags); - *flags |= UCT_CB_PARAM_FLAG_FIRST; - *context_p = &iface->tm.mp.last_frag_ctx; + *context_p = uct_rc_mlx5_iface_single_frag_context(iface, flags); return hdr; } ucs_assert(byte_len <= UCT_RC_MLX5_MP_RQ_BYTE_CNT_FIELD_MASK); - *flags = 0; + *flags = 0; - if (poll_flags & UCT_RC_MLX5_POLL_FLAG_HAS_EP) { + if (ucs_test_all_flags(poll_flags, UCT_RC_MLX5_POLL_FLAG_HAS_EP | + UCT_RC_MLX5_POLL_FLAG_TAG_CQE)) { *context_p = uct_rc_mlx5_iface_rx_mp_context_from_ep(iface, cqe, flags); - } else { + } else if (poll_flags & UCT_RC_MLX5_POLL_FLAG_TAG_CQE) { *context_p = uct_rc_mlx5_iface_rx_mp_context_from_hash(iface, cqe, flags); + } else { + /* Non-tagged messages (AM, RNDV Fin) should always arrive in + * a single fragment */ + *context_p = uct_rc_mlx5_iface_single_frag_context(iface, flags); } /* Get a pointer to the tag header or the payload (if it is not the first @@ -1222,7 +1234,9 @@ uct_rc_mlx5_iface_tag_handle_unexp(uct_rc_mlx5_iface_common_t *iface, uct_rc_mlx5_mp_context_t *msg_ctx; tmh = uct_rc_mlx5_iface_tm_common_data(iface, cqe, byte_len, &flags, - poll_flags, &msg_ctx); + poll_flags | + UCT_RC_MLX5_POLL_FLAG_TAG_CQE, + &msg_ctx); /* Fast path: single fragment eager message */ if (ucs_likely(UCT_RC_MLX5_SINGLE_FRAG_MSG(flags) && diff --git a/src/uct/ib/rc/accel/rc_mlx5_common.h b/src/uct/ib/rc/accel/rc_mlx5_common.h index 951ddce5b5d..4d1f6137946 100644 --- a/src/uct/ib/rc/accel/rc_mlx5_common.h +++ b/src/uct/ib/rc/accel/rc_mlx5_common.h @@ -147,7 +147,8 @@ enum { enum { UCT_RC_MLX5_POLL_FLAG_TM = UCS_BIT(0), - UCT_RC_MLX5_POLL_FLAG_HAS_EP = UCS_BIT(1) + UCT_RC_MLX5_POLL_FLAG_HAS_EP = UCS_BIT(1), + UCT_RC_MLX5_POLL_FLAG_TAG_CQE = UCS_BIT(2) }; @@ -418,6 +419,9 @@ typedef struct uct_rc_mlx5_iface_common { ucs_status_t (*tag_short)(uct_ep_h tl_ep, uct_tag_t tag, const void *data, size_t length); } dm; 
+#endif +#if HAVE_DECL_MLX5DV_DEVX_SUBSCRIBE_DEVX_EVENT + struct mlx5dv_devx_event_channel *event_channel; #endif struct { uint8_t atomic_fence_flag; @@ -722,6 +726,16 @@ uct_rc_mlx5_iface_common_devx_connect_qp(uct_rc_mlx5_iface_common_t *iface, } #endif +ucs_status_t uct_rc_mlx5_devx_iface_init_events(uct_rc_mlx5_iface_common_t *iface); + +void uct_rc_mlx5_devx_iface_free_events(uct_rc_mlx5_iface_common_t *iface); + +ucs_status_t uct_rc_mlx5_devx_iface_subscribe_event(uct_rc_mlx5_iface_common_t *iface, + uct_ib_mlx5_qp_t *qp, + unsigned event_num, + enum ibv_event_type event_type, + unsigned event_data); + void uct_rc_mlx5_iface_fill_attr(uct_rc_mlx5_iface_common_t *iface, uct_ib_mlx5_qp_attr_t *qp_attr, unsigned max_send_wr, diff --git a/src/uct/ib/rc/accel/rc_mlx5_devx.c b/src/uct/ib/rc/accel/rc_mlx5_devx.c index febc5fc0ed7..5434831cb05 100644 --- a/src/uct/ib/rc/accel/rc_mlx5_devx.c +++ b/src/uct/ib/rc/accel/rc_mlx5_devx.c @@ -12,9 +12,126 @@ #include #include +#include #include #include +ucs_status_t uct_rc_mlx5_devx_iface_subscribe_event(uct_rc_mlx5_iface_common_t *iface, + uct_ib_mlx5_qp_t *qp, + unsigned event_num, + enum ibv_event_type event_type, + unsigned event_data) +{ +#if HAVE_DECL_MLX5DV_DEVX_SUBSCRIBE_DEVX_EVENT + uint64_t cookie; + uint16_t event; + int ret; + + if (iface->event_channel == NULL) { + return UCS_OK; + } + + event = event_num; + cookie = event_type | ((uint64_t)event_data << UCT_IB_MLX5_DEVX_EVENT_DATA_SHIFT); + ret = mlx5dv_devx_subscribe_devx_event(iface->event_channel, qp->devx.obj, + sizeof(event), &event, cookie); + if (ret) { + ucs_error("mlx5dv_devx_subscribe_devx_event() failed: %m"); + return UCS_ERR_IO_ERROR; + } +#endif + + return UCS_OK; +} + +#if HAVE_DECL_MLX5DV_DEVX_SUBSCRIBE_DEVX_EVENT +static void uct_rc_mlx5_devx_iface_event_handler(int fd, int events, void *arg) +{ + uct_rc_mlx5_iface_common_t *iface = arg; + uct_ib_md_t *md = uct_ib_iface_md(&iface->super.super); + struct mlx5dv_devx_async_event_hdr devx_event; 
+ uct_ib_async_event_t event; + int ret; + + ret = mlx5dv_devx_get_event(iface->event_channel, &devx_event, sizeof(devx_event)); + if (ret < 0) { + ucs_warn("mlx5dv_devx_get_event() failed: %m"); + return; + } + + event.event_type = devx_event.cookie & UCT_IB_MLX5_DEVX_EVENT_TYPE_MASK; + switch (event.event_type) { + case IBV_EVENT_QP_LAST_WQE_REACHED: + event.qp_num = devx_event.cookie >> UCT_IB_MLX5_DEVX_EVENT_DATA_SHIFT; + break; + default: + ucs_warn("unexpected async event: %d", event.event_type); + return; + } + + uct_ib_handle_async_event(&md->dev, &event); +} +#endif + +ucs_status_t uct_rc_mlx5_devx_iface_init_events(uct_rc_mlx5_iface_common_t *iface) +{ + ucs_status_t status = UCS_OK; +#if HAVE_DECL_MLX5DV_DEVX_SUBSCRIBE_DEVX_EVENT + uct_ib_mlx5_md_t *md = ucs_derived_of(uct_ib_iface_md(&iface->super.super), + uct_ib_mlx5_md_t); + struct mlx5dv_devx_event_channel *event_channel; + + if (!(md->flags & UCT_IB_MLX5_MD_FLAG_DEVX) || !md->super.dev.async_events) { + iface->event_channel = NULL; + return UCS_OK; + } + + event_channel = mlx5dv_devx_create_event_channel( + md->super.dev.ibv_context, + MLX5_IB_UAPI_DEVX_CR_EV_CH_FLAGS_OMIT_DATA); + + if (event_channel == NULL) { + ucs_error("mlx5dv_devx_create_event_channel() failed: %m"); + status = UCS_ERR_IO_ERROR; + goto err; + + } + + status = ucs_sys_fcntl_modfl(event_channel->fd, O_NONBLOCK, 0); + if (status != UCS_OK) { + goto err_destroy_channel; + } + + status = ucs_async_set_event_handler(iface->super.super.super.worker->async->mode, + event_channel->fd, UCS_EVENT_SET_EVREAD, + uct_rc_mlx5_devx_iface_event_handler, iface, + iface->super.super.super.worker->async); + if (status != UCS_OK) { + goto err_destroy_channel; + } + + iface->event_channel = event_channel; + return UCS_OK; + +err_destroy_channel: + mlx5dv_devx_destroy_event_channel(event_channel); + iface->event_channel = NULL; +err: +#endif + return status; +} + +void uct_rc_mlx5_devx_iface_free_events(uct_rc_mlx5_iface_common_t *iface) +{ +#if 
HAVE_DECL_MLX5DV_DEVX_SUBSCRIBE_DEVX_EVENT + if (iface->event_channel == NULL) { + return; + } + + ucs_async_remove_handler(iface->event_channel->fd, 1); + mlx5dv_devx_destroy_event_channel(iface->event_channel); +#endif +} static ucs_status_t uct_rc_mlx5_devx_init_rx_common(uct_rc_mlx5_iface_common_t *iface, @@ -178,7 +295,7 @@ ucs_status_t uct_rc_mlx5_devx_init_rx(uct_rc_mlx5_iface_common_t *iface, status = uct_rc_mlx5_devx_init_rx_common(iface, md, config, &dvpd, UCT_IB_MLX5DV_ADDR_OF(rmpc, rmpc, wq)); if (status != UCS_OK) { - return UCS_OK; + return status; } iface->rx.srq.devx.obj = mlx5dv_devx_obj_create(dev->ibv_context, diff --git a/src/uct/ib/rc/accel/rc_mlx5_iface.c b/src/uct/ib/rc/accel/rc_mlx5_iface.c index fcf55b6ad28..fb2a8587d1f 100644 --- a/src/uct/ib/rc/accel/rc_mlx5_iface.c +++ b/src/uct/ib/rc/accel/rc_mlx5_iface.c @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -262,7 +263,19 @@ ucs_status_t uct_rc_mlx5_iface_create_qp(uct_rc_mlx5_iface_common_t *iface, if (md->flags & UCT_IB_MLX5_MD_FLAG_DEVX_RC_QP) { attr->mmio_mode = iface->tx.mmio_mode; - return uct_ib_mlx5_devx_create_qp(ib_iface, qp, txwq, attr); + status = uct_ib_mlx5_devx_create_qp(ib_iface, qp, txwq, attr); + if (status != UCS_OK) { + return status; + } + + status = uct_rc_mlx5_devx_iface_subscribe_event(iface, qp, + UCT_IB_MLX5_EVENT_TYPE_SRQ_LAST_WQE, + IBV_EVENT_QP_LAST_WQE_REACHED, qp->qp_num); + if (status != UCS_OK) { + goto err_destory_qp; + } + + return UCS_OK; } status = uct_ib_mlx5_iface_fill_attr(ib_iface, qp, attr); @@ -309,7 +322,7 @@ ucs_status_t uct_rc_mlx5_iface_create_qp(uct_rc_mlx5_iface_common_t *iface, return UCS_OK; err_destory_qp: - ibv_destroy_qp(qp->verbs.qp); + uct_ib_mlx5_destroy_qp(qp); err: return status; } @@ -668,6 +681,13 @@ UCS_CLASS_INIT_FUNC(uct_rc_mlx5_iface_common_t, goto cleanup_dm; } +#if HAVE_DEVX + status = uct_rc_mlx5_devx_iface_init_events(self); + if (status != UCS_OK) { + goto cleanup_dm; + } +#endif + /* 
For little-endian atomic reply, override the default functions, to still * treat the response as big-endian when it arrives in the CQE. */ @@ -694,6 +714,9 @@ UCS_CLASS_INIT_FUNC(uct_rc_mlx5_iface_common_t, static UCS_CLASS_CLEANUP_FUNC(uct_rc_mlx5_iface_common_t) { +#if HAVE_DEVX + uct_rc_mlx5_devx_iface_free_events(self); +#endif ucs_mpool_cleanup(&self->tx.atomic_desc_mp, 1); uct_rc_mlx5_iface_common_dm_cleanup(self); uct_rc_mlx5_iface_common_tag_cleanup(self); diff --git a/src/uct/ib/rdmacm/rdmacm_cm.c b/src/uct/ib/rdmacm/rdmacm_cm.c index 0ea9aa1f036..fb942b88c10 100644 --- a/src/uct/ib/rdmacm/rdmacm_cm.c +++ b/src/uct/ib/rdmacm/rdmacm_cm.c @@ -464,7 +464,7 @@ uct_rdmacm_cm_process_event(uct_rdmacm_cm_t *cm, struct rdma_cm_event *event) } } -static void uct_rdmacm_cm_event_handler(int fd, void *arg) +static void uct_rdmacm_cm_event_handler(int fd, int events, void *arg) { uct_rdmacm_cm_t *cm = (uct_rdmacm_cm_t *)arg; struct rdma_cm_event *event; diff --git a/src/uct/ib/rdmacm/rdmacm_iface.c b/src/uct/ib/rdmacm/rdmacm_iface.c index 178f31644f3..4f3dbcc9c79 100644 --- a/src/uct/ib/rdmacm/rdmacm_iface.c +++ b/src/uct/ib/rdmacm/rdmacm_iface.c @@ -427,7 +427,7 @@ uct_rdmacm_iface_process_event(uct_rdmacm_iface_t *iface, return ret_flags; } -static void uct_rdmacm_iface_event_handler(int fd, void *arg) +static void uct_rdmacm_iface_event_handler(int fd, int events, void *arg) { uct_rdmacm_iface_t *iface = arg; uct_rdmacm_ctx_t *cm_id_ctx = NULL; diff --git a/src/uct/ib/ud/accel/ud_mlx5.c b/src/uct/ib/ud/accel/ud_mlx5.c index bf590515296..96b7ed00a57 100644 --- a/src/uct/ib/ud/accel/ud_mlx5.c +++ b/src/uct/ib/ud/accel/ud_mlx5.c @@ -535,7 +535,7 @@ static unsigned uct_ud_mlx5_iface_async_progress(uct_ud_iface_t *ud_iface) do { n = uct_ud_mlx5_iface_poll_rx(iface, 1); count += n; - } while (n > 0); + } while ((n > 0) && (count < iface->super.rx.async_max_poll)); count += uct_ud_mlx5_iface_poll_tx(iface, 1); diff --git a/src/uct/ib/ud/base/ud_ep.c 
b/src/uct/ib/ud/base/ud_ep.c index c83b62a1c05..ae94fe1107a 100644 --- a/src/uct/ib/ud/base/ud_ep.c +++ b/src/uct/ib/ud/base/ud_ep.c @@ -66,7 +66,7 @@ uct_ud_ep_resend_ack(uct_ud_iface_t *iface, uct_ud_ep_t *ep) if (UCT_UD_PSN_COMPARE(ep->tx.acked_psn, <, ep->resend.max_psn)) { /* new ack arrived that acked something in our resend window. */ - if (UCT_UD_PSN_COMPARE(ep->resend.psn, <=, ep->tx.acked_psn + 1)) { + if (UCT_UD_PSN_COMPARE(ep->resend.psn, <=, ep->tx.acked_psn)) { ucs_debug("ep(%p): ack received during resend resend.psn=%d tx.acked_psn=%d", ep, ep->resend.psn, ep->tx.acked_psn); ep->resend.pos = ucs_queue_iter_begin(&ep->tx.window); @@ -155,12 +155,16 @@ uct_ud_skb_is_completed(uct_ud_send_skb_t *skb, uct_ud_psn_t ack_psn) static UCS_F_ALWAYS_INLINE void uct_ud_ep_window_release_inline(uct_ud_iface_t *iface, uct_ud_ep_t *ep, uct_ud_psn_t ack_psn, ucs_status_t status, - int is_async) + int is_async, int invalidate_resend) { uct_ud_send_skb_t *skb; ucs_queue_for_each_extract(skb, &ep->tx.window, queue, uct_ud_skb_is_completed(skb, ack_psn)) { + if (invalidate_resend && (ep->resend.pos == &skb->queue.next)) { + ep->resend.pos = ucs_queue_iter_begin(&ep->tx.window); + ep->resend.psn = ep->tx.acked_psn + 1; + } if (ucs_likely(!(skb->flags & UCT_UD_SEND_SKB_FLAG_COMP))) { /* fast path case: skb without completion callback */ uct_ud_skb_release(skb, 1); @@ -183,19 +187,14 @@ uct_ud_ep_window_release(uct_ud_ep_t *ep, ucs_status_t status, int is_async) { uct_ud_iface_t *iface = ucs_derived_of(ep->super.super.iface, uct_ud_iface_t); - uct_ud_ep_window_release_inline(iface, ep, ep->tx.acked_psn, status, is_async); + uct_ud_ep_window_release_inline(iface, ep, ep->tx.acked_psn, status, is_async, 0); } void uct_ud_ep_window_release_completed(uct_ud_ep_t *ep, int is_async) { uct_ud_iface_t *iface = ucs_derived_of(ep->super.super.iface, uct_ud_iface_t); - uct_ud_ep_window_release(ep, UCS_OK, is_async); - - /* Since we could have released some skb's from the window, make 
sure there - * are no resend operations which can still be using them. - */ - uct_ud_ep_resend_ack(iface, ep); + uct_ud_ep_window_release_inline(iface, ep, ep->tx.acked_psn, UCS_OK, is_async, 1); } static void uct_ud_ep_purge_outstanding(uct_ud_ep_t *ep) @@ -565,7 +564,7 @@ uct_ud_ep_process_ack(uct_ud_iface_t *iface, uct_ud_ep_t *ep, ep->tx.acked_psn = ack_psn; - uct_ud_ep_window_release_inline(iface, ep, ack_psn, UCS_OK, is_async); + uct_ud_ep_window_release_inline(iface, ep, ack_psn, UCS_OK, is_async, 0); uct_ud_ep_ca_ack(ep); uct_ud_ep_resend_ack(iface, ep); @@ -674,8 +673,8 @@ static void uct_ud_ep_rx_ctl(uct_ud_iface_t *iface, uct_ud_ep_t *ep, ucs_trace_func(""); ucs_assert_always(ctl->type == UCT_UD_PACKET_CREP); - ucs_assert_always(ep->dest_ep_id == UCT_UD_EP_NULL_ID || - ep->dest_ep_id == ctl->conn_rep.src_ep_id); + ucs_assert_always(!uct_ud_ep_is_connected(ep) || + (ep->dest_ep_id == ctl->conn_rep.src_ep_id)); /* Discard duplicate CREP */ if (UCT_UD_PSN_COMPARE(neth->psn, <, ep->rx.ooo_pkts.head_sn)) { @@ -762,7 +761,7 @@ void uct_ud_ep_process_rx(uct_ud_iface_t *iface, uct_ud_neth_t *neth, unsigned b uct_ud_ep_rx_creq(iface, neth); goto out; } else if (ucs_unlikely(!ucs_ptr_array_lookup(&iface->eps, dest_id, ep) || - ep->ep_id != dest_id)) + (ep->ep_id != dest_id))) { /* Drop the packet because it is * allowed to do disconnect without flush/barrier. 
So it @@ -795,8 +794,8 @@ void uct_ud_ep_process_rx(uct_ud_iface_t *iface, uct_ud_neth_t *neth, unsigned b ooo_type = ucs_frag_list_insert(&ep->rx.ooo_pkts, &skb->u.ooo.elem, neth->psn); if (ucs_unlikely(ooo_type != UCS_FRAG_LIST_INSERT_FAST)) { - if (ooo_type != UCS_FRAG_LIST_INSERT_DUP && - ooo_type != UCS_FRAG_LIST_INSERT_FAIL) { + if ((ooo_type != UCS_FRAG_LIST_INSERT_DUP) && + (ooo_type != UCS_FRAG_LIST_INSERT_FAIL)) { ucs_fatal("Out of order is not implemented: got %d", ooo_type); } ucs_trace_data("DUP/OOB - schedule ack, head_sn=%d sn=%d", @@ -1070,8 +1069,8 @@ static void uct_ud_ep_resend(uct_ud_ep_t *ep) /* creq/crep must remove creq packet from window */ ucs_assertv_always(!(uct_ud_ep_is_connected(ep) && - (uct_ud_neth_get_dest_id(sent_skb->neth) == UCT_UD_EP_NULL_ID) && - !(sent_skb->neth->packet_type & UCT_UD_PACKET_FLAG_AM)), + (uct_ud_neth_get_dest_id(sent_skb->neth) == UCT_UD_EP_NULL_ID) && + !(sent_skb->neth->packet_type & UCT_UD_PACKET_FLAG_AM)), "ep(%p): CREQ resend on endpoint which is already connected", ep); /* Allocate a control skb which would refer to the original skb. 
@@ -1259,13 +1258,12 @@ uct_ud_ep_do_pending(ucs_arbiter_t *arbiter, ucs_arbiter_group_t *group, ucs_arbiter_elem_t *elem, void *arg) { - uct_pending_req_t *req = ucs_container_of(elem, uct_pending_req_t, - priv); uct_ud_ep_t *ep = ucs_container_of(group, uct_ud_ep_t, tx.pending.group); uct_ud_iface_t *iface = ucs_container_of(arbiter, uct_ud_iface_t, tx.pending_q); uintptr_t in_async_progress = (uintptr_t)arg; + uct_pending_req_t *req; int allow_callback; int async_before_pending; ucs_status_t status; @@ -1293,10 +1291,9 @@ uct_ud_ep_do_pending(ucs_arbiter_t *arbiter, ucs_arbiter_group_t *group, /* we can desched group: iff * - no control * - no ep resources (connect or window) - **/ - + */ if (!uct_ud_ep_ctl_op_isany(ep) && - (!uct_ud_ep_is_connected(ep) || + (!uct_ud_ep_is_connected(ep) || uct_ud_ep_no_window(ep))) { return UCS_ARBITER_CB_RESULT_DESCHED_GROUP; } @@ -1316,6 +1313,8 @@ uct_ud_ep_do_pending(ucs_arbiter_t *arbiter, ucs_arbiter_group_t *group, * - not in async progress * - there are no high priority pending control messages */ + + req = ucs_container_of(elem, uct_pending_req_t, priv); allow_callback = !in_async_progress || (uct_ud_pending_req_priv(req)->flags & UCT_CB_FLAG_ASYNC); if (allow_callback && !uct_ud_ep_ctl_op_check(ep, UCT_UD_EP_OP_CTL_HI_PRIO)) { @@ -1360,7 +1359,7 @@ uct_ud_ep_do_pending(ucs_arbiter_t *arbiter, ucs_arbiter_group_t *group, ucs_status_t uct_ud_ep_pending_add(uct_ep_h ep_h, uct_pending_req_t *req, unsigned flags) { - uct_ud_ep_t *ep = ucs_derived_of(ep_h, uct_ud_ep_t); + uct_ud_ep_t *ep = ucs_derived_of(ep_h, uct_ud_ep_t); uct_ud_iface_t *iface = ucs_derived_of(ep->super.super.iface, uct_ud_iface_t); @@ -1428,18 +1427,16 @@ uct_ud_ep_pending_purge_cb(ucs_arbiter_t *arbiter, ucs_arbiter_group_t *group, void uct_ud_ep_pending_purge(uct_ep_h ep_h, uct_pending_purge_callback_t cb, void *arg) { - uct_ud_ep_t *ep = ucs_derived_of(ep_h, uct_ud_ep_t); - uct_ud_iface_t *iface = ucs_derived_of(ep->super.super.iface, - 
uct_ud_iface_t); + uct_ud_ep_t *ep = ucs_derived_of(ep_h, uct_ud_ep_t); + uct_ud_iface_t *iface = ucs_derived_of(ep->super.super.iface, + uct_ud_iface_t); uct_purge_cb_args_t args = {cb, arg}; uct_ud_enter(iface); ucs_arbiter_group_purge(&iface->tx.pending_q, &ep->tx.pending.group, uct_ud_ep_pending_purge_cb, &args); if (uct_ud_ep_ctl_op_isany(ep)) { - ucs_arbiter_group_push_elem(&ep->tx.pending.group, - &ep->tx.pending.elem); - ucs_arbiter_group_schedule(&iface->tx.pending_q, &ep->tx.pending.group); + uct_ud_ep_ctl_op_schedule(iface, ep); } uct_ud_leave(iface); } diff --git a/src/uct/ib/ud/base/ud_ep.h b/src/uct/ib/ud/base/ud_ep.h index 600cec8905e..b8ff0ae00ec 100644 --- a/src/uct/ib/ud/base/ud_ep.h +++ b/src/uct/ib/ud/base/ud_ep.h @@ -215,17 +215,17 @@ struct uct_ud_ep { uint32_t ep_id; uint32_t dest_ep_id; struct { - uct_ud_psn_t psn; /* Next PSN to send */ - uct_ud_psn_t max_psn; /* Largest PSN that can be sent */ - uct_ud_psn_t acked_psn; /* last psn that was acked by remote side */ - uint16_t resend_count; /* number of in-flight resends on the ep */ - ucs_queue_head_t window; /* send window: [acked_psn+1, psn-1] */ - uct_ud_ep_pending_op_t pending; /* pending ops */ - ucs_time_t send_time; /* tx time of last packet */ - ucs_time_t resend_time; /* tx time of last resent packet */ - ucs_time_t tick; /* timeout to trigger timer */ - UCS_STATS_NODE_DECLARE(stats) - UCT_UD_EP_HOOK_DECLARE(tx_hook) + uct_ud_psn_t psn; /* Next PSN to send */ + uct_ud_psn_t max_psn; /* Largest PSN that can be sent */ + uct_ud_psn_t acked_psn; /* last psn that was acked by remote side */ + uint16_t resend_count; /* number of in-flight resends on the ep */ + ucs_queue_head_t window; /* send window: [acked_psn+1, psn-1] */ + uct_ud_ep_pending_op_t pending; /* pending ops */ + ucs_time_t send_time; /* tx time of last packet */ + ucs_time_t resend_time; /* tx time of last resent packet */ + ucs_time_t tick; /* timeout to trigger timer */ + UCS_STATS_NODE_DECLARE(stats) + 
UCT_UD_EP_HOOK_DECLARE(tx_hook) } tx; struct { uct_ud_psn_t acked_psn; /* Last psn we acked */ diff --git a/src/uct/ib/ud/base/ud_iface.c b/src/uct/ib/ud/base/ud_iface.c index e666b0bc172..255e8ed0e7f 100644 --- a/src/uct/ib/ud/base/ud_iface.c +++ b/src/uct/ib/ud/base/ud_iface.c @@ -40,7 +40,7 @@ SGLIB_DEFINE_HASHED_CONTAINER_FUNCTIONS(uct_ud_iface_peer_t, UCT_UD_HASH_SIZE, uct_ud_iface_peer_hash) -static void uct_ud_iface_timer(int timer_id, void *arg); +static void uct_ud_iface_timer(int timer_id, int events, void *arg); static void uct_ud_iface_free_pending_rx(uct_ud_iface_t *iface); static void uct_ud_iface_free_async_comps(uct_ud_iface_t *iface); @@ -448,6 +448,8 @@ UCS_CLASS_INIT_FUNC(uct_ud_iface_t, uct_ud_iface_ops_t *ops, uct_md_h md, self->config.max_window = config->max_window; + self->rx.async_max_poll = config->rx_async_max_poll; + if (config->timer_tick <= 0.) { ucs_error("The timer tick should be > 0 (%lf)", config->timer_tick); @@ -595,6 +597,10 @@ ucs_config_field_t uct_ud_iface_config_table[] = { UCS_PP_MAKE_STRING(UCT_UD_CA_MAX_WINDOW), ucs_offsetof(uct_ud_iface_config_t, max_window), UCS_CONFIG_TYPE_UINT}, + {"RX_ASYNC_MAX_POLL", "64", + "Max number of receive completions to pick during asynchronous TX poll", + ucs_offsetof(uct_ud_iface_config_t, rx_async_max_poll), UCS_CONFIG_TYPE_UINT}, + {NULL} }; @@ -816,7 +822,7 @@ static inline void uct_ud_iface_async_progress(uct_ud_iface_t *iface) } } -static void uct_ud_iface_timer(int timer_id, void *arg) +static void uct_ud_iface_timer(int timer_id, int events, void *arg) { uct_ud_iface_t *iface = arg; diff --git a/src/uct/ib/ud/base/ud_iface.h b/src/uct/ib/ud/base/ud_iface.h index 921279ea726..a0faa85d078 100644 --- a/src/uct/ib/ud/base/ud_iface.h +++ b/src/uct/ib/ud/base/ud_iface.h @@ -52,6 +52,7 @@ typedef struct uct_ud_iface_config { double event_timer_tick; int dgid_check; unsigned max_window; + unsigned rx_async_max_poll; } uct_ud_iface_config_t; @@ -131,6 +132,7 @@ struct uct_ud_iface { 
ucs_mpool_t mp; unsigned available; unsigned quota; + unsigned async_max_poll; ucs_queue_head_t pending_q; UCT_UD_IFACE_HOOK_DECLARE(hook) } rx; diff --git a/src/uct/ib/ud/verbs/ud_verbs.c b/src/uct/ib/ud/verbs/ud_verbs.c index 6e742ca9692..86515df8fab 100644 --- a/src/uct/ib/ud/verbs/ud_verbs.c +++ b/src/uct/ib/ud/verbs/ud_verbs.c @@ -403,7 +403,7 @@ static unsigned uct_ud_verbs_iface_async_progress(uct_ud_iface_t *ud_iface) do { n = uct_ud_verbs_iface_poll_rx(iface, 1); count += n; - } while (n > 0); + } while ((n > 0) && (count < iface->super.rx.async_max_poll)); count += uct_ud_verbs_iface_poll_tx(iface, 1); diff --git a/src/uct/tcp/sockcm/sockcm_ep.c b/src/uct/tcp/sockcm/sockcm_ep.c index f4638d2a23e..cbeae27d883 100644 --- a/src/uct/tcp/sockcm/sockcm_ep.c +++ b/src/uct/tcp/sockcm/sockcm_ep.c @@ -212,7 +212,7 @@ static void uct_sockcm_handle_info_sent(uct_sockcm_ep_t *ep) } } -static void uct_sockcm_ep_event_handler(int fd, void *arg) +static void uct_sockcm_ep_event_handler(int fd, int events, void *arg) { uct_sockcm_ep_t *ep = (uct_sockcm_ep_t *) arg; diff --git a/src/uct/tcp/sockcm/sockcm_iface.c b/src/uct/tcp/sockcm/sockcm_iface.c index b24e6f7a6db..51c8841f15a 100644 --- a/src/uct/tcp/sockcm/sockcm_iface.c +++ b/src/uct/tcp/sockcm/sockcm_iface.c @@ -157,7 +157,7 @@ static ucs_status_t uct_sockcm_iface_process_conn_req(uct_sockcm_ctx_t *sock_id_ return UCS_OK; } -static void uct_sockcm_iface_recv_handler(int fd, void *arg) +static void uct_sockcm_iface_recv_handler(int fd, int events, void *arg) { uct_sockcm_ctx_t *sock_id_ctx = (uct_sockcm_ctx_t *) arg; ucs_status_t status; @@ -196,7 +196,7 @@ static void uct_sockcm_iface_recv_handler(int fd, void *arg) } } -static void uct_sockcm_iface_event_handler(int fd, void *arg) +static void uct_sockcm_iface_event_handler(int fd, int events, void *arg) { size_t recv_len = 0; uct_sockcm_iface_t *iface = arg; diff --git a/src/uct/tcp/tcp_iface.c b/src/uct/tcp/tcp_iface.c index 7ddf121ac78..5c4bbfe0fa6 100644 --- 
a/src/uct/tcp/tcp_iface.c +++ b/src/uct/tcp/tcp_iface.c @@ -257,7 +257,7 @@ static void uct_tcp_iface_listen_close(uct_tcp_iface_t *iface) } } -static void uct_tcp_iface_connect_handler(int listen_fd, void *arg) +static void uct_tcp_iface_connect_handler(int listen_fd, int events, void *arg) { uct_tcp_iface_t *iface = arg; struct sockaddr_in peer_addr; diff --git a/src/uct/tcp/tcp_listener.c b/src/uct/tcp/tcp_listener.c index 52e8b568a4a..5314d61f6cc 100644 --- a/src/uct/tcp/tcp_listener.c +++ b/src/uct/tcp/tcp_listener.c @@ -14,7 +14,7 @@ #include -static void uct_tcp_listener_conn_req_handler(int fd, void *arg) +static void uct_tcp_listener_conn_req_handler(int fd, int events, void *arg) { uct_tcp_listener_t *listener = (uct_tcp_listener_t *)arg; char ip_port_str[UCS_SOCKADDR_STRING_LEN]; diff --git a/src/uct/tcp/tcp_sockcm.c b/src/uct/tcp/tcp_sockcm.c index a1b32c80aaf..0ae43bd056f 100644 --- a/src/uct/tcp/tcp_sockcm.c +++ b/src/uct/tcp/tcp_sockcm.c @@ -55,7 +55,7 @@ static void uct_tcp_close_ep(uct_tcp_sockcm_ep_t *ep) UCS_CLASS_DELETE(uct_tcp_sockcm_ep_t, ep); } -void uct_tcp_sa_data_handler(int fd, void *arg) +void uct_tcp_sa_data_handler(int fd, int events, void *arg) { uct_tcp_sockcm_ep_t *ep = (uct_tcp_sockcm_ep_t*)arg; ucs_status_t status; diff --git a/src/uct/tcp/tcp_sockcm.h b/src/uct/tcp/tcp_sockcm.h index c450196f809..f4dae1f43f5 100644 --- a/src/uct/tcp/tcp_sockcm.h +++ b/src/uct/tcp/tcp_sockcm.h @@ -38,4 +38,4 @@ UCS_CLASS_DECLARE_NEW_FUNC(uct_tcp_sockcm_t, uct_cm_t, uct_component_h, uct_worker_h, const uct_cm_config_t *); UCS_CLASS_DECLARE_DELETE_FUNC(uct_tcp_sockcm_t, uct_cm_t); -void uct_tcp_sa_data_handler(int fd, void *arg); +void uct_tcp_sa_data_handler(int fd, int events, void *arg); diff --git a/src/uct/ugni/udt/ugni_udt_iface.c b/src/uct/ugni/udt/ugni_udt_iface.c index bc49fc503af..a5eccf25a57 100644 --- a/src/uct/ugni/udt/ugni_udt_iface.c +++ b/src/uct/ugni/udt/ugni_udt_iface.c @@ -192,7 +192,7 @@ static ucs_status_t 
uct_ugni_udt_iface_query(uct_iface_h tl_iface, uct_iface_att return UCS_OK; } -void uct_ugni_proccess_datagram_pipe(int event_id, void *arg) { +void uct_ugni_proccess_datagram_pipe(int event_id, int events, void *arg) { uct_ugni_udt_iface_t *iface = (uct_ugni_udt_iface_t *)arg; uct_ugni_udt_ep_t *ep; uct_ugni_udt_desc_t *datagram; @@ -337,7 +337,8 @@ static UCS_CLASS_CLEANUP_FUNC(uct_ugni_udt_iface_t) uct_ugni_udt_clean_wildcard(self); ucs_async_remove_handler(ucs_async_pipe_rfd(&self->event_pipe),1); if (self->events_ready) { - uct_ugni_proccess_datagram_pipe(ucs_async_pipe_rfd(&self->event_pipe),self); + uct_ugni_proccess_datagram_pipe(ucs_async_pipe_rfd(&self->event_pipe), + UCS_EVENT_SET_EVREAD, self); } uct_ugni_udt_terminate_thread(self); pthread_join(self->event_thread, &dummy); diff --git a/test/gtest/Makefile.am b/test/gtest/Makefile.am index 5d5e5ba23c3..a134e4e1e05 100644 --- a/test/gtest/Makefile.am +++ b/test/gtest/Makefile.am @@ -177,7 +177,8 @@ gtest_SOURCES += \ uct/ib/test_ib_md.cc \ uct/ib/test_cq_moderation.cc \ uct/ib/test_ib_xfer.cc \ - uct/ib/test_ib_pkey.cc + uct/ib/test_ib_pkey.cc \ + uct/ib/test_ib_event.cc gtest_CPPFLAGS += \ $(IBVERBS_CPPFLAGS) gtest_LDADD += \ diff --git a/test/gtest/common/test_helpers.cc b/test/gtest/common/test_helpers.cc index e7cb969b83c..7d2244485fe 100644 --- a/test/gtest/common/test_helpers.cc +++ b/test/gtest/common/test_helpers.cc @@ -699,4 +699,11 @@ std::vector > supported_mem_type_pairs() { return result; } +void skip_on_address_sanitizer() +{ +#ifdef __SANITIZE_ADDRESS__ + UCS_TEST_SKIP_R("Address sanitizer"); +#endif +} + } // ucs diff --git a/test/gtest/common/test_helpers.h b/test/gtest/common/test_helpers.h index 5fdc1849693..c796dacbf78 100644 --- a/test/gtest/common/test_helpers.h +++ b/test/gtest/common/test_helpers.h @@ -172,6 +172,7 @@ } + namespace ucs { extern const double test_timeout_in_sec; @@ -852,6 +853,12 @@ void cartesian_product(std::vector > &output, std::vector > 
supported_mem_type_pairs(); + +/** + * Skip test if address sanitizer is enabled + */ +void skip_on_address_sanitizer(); + } // ucs #endif /* UCS_TEST_HELPERS_H */ diff --git a/test/gtest/ucm/malloc_hook.cc b/test/gtest/ucm/malloc_hook.cc index 1471afd143c..1374b47198b 100644 --- a/test/gtest/ucm/malloc_hook.cc +++ b/test/gtest/ucm/malloc_hook.cc @@ -212,6 +212,8 @@ class malloc_hook : public ucs::test { ucs_status_t status; mmap_event event(this); + ucs::skip_on_address_sanitizer(); + m_got_event = 0; ucm_malloc_state_reset(128 * 1024, 128 * 1024); malloc_trim(0); diff --git a/test/gtest/ucp/test_ucp_mmap.cc b/test/gtest/ucp/test_ucp_mmap.cc index 04e2252ab4a..b1015918895 100644 --- a/test/gtest/ucp/test_ucp_mmap.cc +++ b/test/gtest/ucp/test_ucp_mmap.cc @@ -27,6 +27,11 @@ class test_ucp_mmap : public test_ucp_memheap { } } + virtual void init() { + ucs::skip_on_address_sanitizer(); + test_ucp_memheap::init(); + } + protected: bool resolve_rma(entity *e, ucp_rkey_h rkey); bool resolve_amo(entity *e, ucp_rkey_h rkey); diff --git a/test/gtest/ucp/test_ucp_sockaddr.cc b/test/gtest/ucp/test_ucp_sockaddr.cc index a35094811d5..1bc23ced2f6 100644 --- a/test/gtest/ucp/test_ucp_sockaddr.cc +++ b/test/gtest/ucp/test_ucp_sockaddr.cc @@ -1212,6 +1212,16 @@ UCS_TEST_P(test_ucp_sockaddr_protocols, am_bcopy_64k, "ZCOPY_THRESH=inf") test_am_send_recv(64 * UCS_KBYTE); } +UCS_TEST_P(test_ucp_sockaddr_protocols, am_zcopy_1k, "ZCOPY_THRESH=512") +{ + test_am_send_recv(1 * UCS_KBYTE); +} + +UCS_TEST_P(test_ucp_sockaddr_protocols, am_zcopy_64k, "ZCOPY_THRESH=512") +{ + test_am_send_recv(64 * UCS_KBYTE); +} + /* Only IB transports support CM for now * For DC case, allow fallback to UD if DC is not supported diff --git a/test/gtest/ucs/test_arbiter.cc b/test/gtest/ucs/test_arbiter.cc index 21028be1937..aba4834f430 100644 --- a/test/gtest/ucs/test_arbiter.cc +++ b/test/gtest/ucs/test_arbiter.cc @@ -74,7 +74,7 @@ class test_arbiter : public ucs::test { for (j = 0; j < nelems_per_group; j++) 
{ if (push_head) { int rev_j = nelems_per_group - 1 - j; - ucs_arbiter_group_push_head_elem(NULL, &groups[i], + ucs_arbiter_group_push_head_elem(&groups[i], &elems[nelems_per_group*i+rev_j]); } else { ucs_arbiter_group_push_elem(&groups[i], @@ -570,8 +570,8 @@ UCS_TEST_F(test_arbiter, push_head_scheduled) { ucs_arbiter_elem_init(&elem3.elem); elem1.count = elem2.count = elem3.count = 0; - ucs_arbiter_group_push_head_elem(&m_arb1, &group1, &elem1.elem); - ucs_arbiter_group_push_head_elem(&m_arb1, &group2, &elem2.elem); + ucs_arbiter_group_push_head_elem(&group1, &elem1.elem); + ucs_arbiter_group_push_head_elem(&group2, &elem2.elem); ucs_arbiter_group_schedule(&m_arb1, &group1); ucs_arbiter_group_schedule(&m_arb1, &group2); @@ -584,7 +584,7 @@ UCS_TEST_F(test_arbiter, push_head_scheduled) { EXPECT_EQ(0, elem3.count); /* Adding new head elem to group2 */ - ucs_arbiter_group_push_head_elem(&m_arb1, &group2, &elem3.elem); + ucs_arbiter_group_push_head_elem(&group2, &elem3.elem); m_count = 0; ucs_arbiter_dispatch(&m_arb1, 1, count_cb, this); @@ -598,9 +598,9 @@ UCS_TEST_F(test_arbiter, push_head_scheduled) { EXPECT_EQ(3, m_count); /* Add to single scheduled group */ - ucs_arbiter_group_push_head_elem(&m_arb1, &group2, &elem2.elem); + ucs_arbiter_group_push_head_elem(&group2, &elem2.elem); ucs_arbiter_group_schedule(&m_arb1, &group2); - ucs_arbiter_group_push_head_elem(&m_arb1, &group2, &elem3.elem); + ucs_arbiter_group_push_head_elem(&group2, &elem3.elem); m_count = 0; elem2.count = elem3.count = 0; diff --git a/test/gtest/ucs/test_async.cc b/test/gtest/ucs/test_async.cc index d2e8a0b33b7..a25db1250bb 100644 --- a/test/gtest/ucs/test_async.cc +++ b/test/gtest/ucs/test_async.cc @@ -48,7 +48,7 @@ class base { virtual void ack_event() = 0; virtual int event_id() = 0; - static void cb(int id, void *arg) { + static void cb(int id, int events, void *arg) { base *self = reinterpret_cast(arg); self->handler(); } @@ -724,7 +724,7 @@ class local_event_add_handler : public 
local_event { } protected: - static void dummy_cb(int id, void *arg) { + static void dummy_cb(int id, int events, void *arg) { } virtual void handler() { diff --git a/test/gtest/ucs/test_memtype_cache.cc b/test/gtest/ucs/test_memtype_cache.cc index 0bd54327f61..e891db0efc3 100644 --- a/test/gtest/ucs/test_memtype_cache.cc +++ b/test/gtest/ucs/test_memtype_cache.cc @@ -27,7 +27,10 @@ class test_memtype_cache : public ucs::test_with_param { } virtual void cleanup() { - ucs_memtype_cache_destroy(m_memtype_cache); + if (m_memtype_cache != NULL) { + ucs_memtype_cache_destroy(m_memtype_cache); + } + ucs::test_with_param::cleanup(); } diff --git a/test/gtest/ucs/test_mpmc.cc b/test/gtest/ucs/test_mpmc.cc index 548075c56bd..6653574ecf2 100644 --- a/test/gtest/ucs/test_mpmc.cc +++ b/test/gtest/ucs/test_mpmc.cc @@ -15,7 +15,7 @@ extern "C" { class test_mpmc : public ucs::test { protected: static const unsigned MPMC_SIZE = 100; - static const uint32_t SENTINEL = 0x7fffffffu; + static const uint64_t SENTINEL = 0x7fffffffu; static const unsigned NUM_THREADS = 4; @@ -44,7 +44,7 @@ class test_mpmc : public ucs::test { static void * consumer_thread_func(void *arg) { ucs_mpmc_queue_t *mpmc = reinterpret_cast(arg); ucs_status_t status; - uint32_t value; + uint64_t value; size_t count; count = 0; @@ -81,7 +81,7 @@ UCS_TEST_F(test_mpmc, basic) { EXPECT_FALSE(ucs_mpmc_queue_is_empty(&mpmc)); - uint32_t value; + uint64_t value; status = ucs_mpmc_queue_pull(&mpmc, &value); ASSERT_UCS_OK(status); diff --git a/test/gtest/uct/ib/test_ib_event.cc b/test/gtest/uct/ib/test_ib_event.cc new file mode 100644 index 00000000000..87cda262cce --- /dev/null +++ b/test/gtest/uct/ib/test_ib_event.cc @@ -0,0 +1,121 @@ +/** +* Copyright (C) Mellanox Technologies Ltd. 2020. ALL RIGHTS RESERVED. +* See file LICENSE for terms. 
+*/ + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#include +#include + +extern "C" { +#if HAVE_MLX5_HW +#include +#include +#endif +#include +} + +#include + +class uct_p2p_test_event : public uct_p2p_test { +private: + void rc_mlx5_ep_to_err(entity &e, uint32_t *qp_num_p) { +#if HAVE_MLX5_HW + uct_ib_mlx5_md_t *md = (uct_ib_mlx5_md_t *)e.md(); + uct_rc_mlx5_ep_t *ep = (uct_rc_mlx5_ep_t *)e.ep(0); + uct_ib_mlx5_qp_t *qp = &ep->tx.wq.super; + + uct_ib_mlx5_modify_qp_state(md, qp, IBV_QPS_ERR); + + *qp_num_p = qp->qp_num; +#endif + } + + void rc_verbs_ep_to_err(entity &e, uint32_t *qp_num_p) { + uct_rc_verbs_ep_t *ep = (uct_rc_verbs_ep_t *)e.ep(0); + + uct_ib_modify_qp(ep->qp, IBV_QPS_ERR); + + *qp_num_p = ep->qp->qp_num; + } + +public: + uct_p2p_test_event(): uct_p2p_test(0) {} + + static ucs_log_level_t orig_log_level; + static volatile unsigned flushed_qp_num; + + static ucs_log_func_rc_t + last_wqe_check_log(const char *file, unsigned line, const char *function, + ucs_log_level_t level, + const ucs_log_component_config_t *comp_conf, + const char *message, va_list ap) + { + std::string msg = format_message(message, ap); + + UCS_TEST_MESSAGE << msg.c_str(); + sscanf(msg.c_str(), + "IB Async event on %*s SRQ-attached QP 0x%x was flushed", + &flushed_qp_num); + + return (level <= orig_log_level) ? 
UCS_LOG_FUNC_RC_CONTINUE + : UCS_LOG_FUNC_RC_STOP; + } + + int wait_for_last_wqe_event(entity &e) { + const resource *r = dynamic_cast(GetParam()); + flushed_qp_num = -1; + uint32_t qp_num = 0; + + if (r->tl_name == "rc_mlx5") { + rc_mlx5_ep_to_err(e, &qp_num); + } else { + rc_verbs_ep_to_err(e, &qp_num); + } + + ucs_time_t deadline = ucs_get_time() + + ucs_time_from_sec(ucs::test_time_multiplier()); + while (ucs_get_time() < deadline) { + if (flushed_qp_num == qp_num) { + return 1; + } + usleep(1000); + } + + return 0; + } +}; + +UCS_TEST_P(uct_p2p_test_event, last_wqe, "ASYNC_EVENTS=y") +{ + const p2p_resource *r = dynamic_cast(GetParam()); + ucs_assert_always(r != NULL); + + mapped_buffer sendbuf(0, 0, sender()); + mapped_buffer recvbuf(0, 0, receiver()); + + ucs_log_push_handler(last_wqe_check_log); + orig_log_level = ucs_global_opts.log_component.log_level; + ucs_global_opts.log_component.log_level = UCS_LOG_LEVEL_DEBUG; + if (!ucs_log_is_enabled(UCS_LOG_LEVEL_DEBUG)) { + UCS_TEST_SKIP_R("Debug logging is disabled"); + } + + UCS_TEST_SCOPE_EXIT() { + ucs_global_opts.log_component.log_level = orig_log_level; + ucs_log_pop_handler(); + } UCS_TEST_SCOPE_EXIT_END + + ASSERT_TRUE(wait_for_last_wqe_event(sender())); + if (!r->loopback) { + ASSERT_TRUE(wait_for_last_wqe_event(receiver())); + } +} + +ucs_log_level_t uct_p2p_test_event::orig_log_level; +volatile unsigned uct_p2p_test_event::flushed_qp_num; + +UCT_INSTANTIATE_RC_TEST_CASE(uct_p2p_test_event); diff --git a/test/gtest/uct/ib/test_rc.cc b/test/gtest/uct/ib/test_rc.cc index e5a9f46c9b6..8a45d93cbbe 100644 --- a/test/gtest/uct/ib/test_rc.cc +++ b/test/gtest/uct/ib/test_rc.cc @@ -9,14 +9,6 @@ #include -#define UCT_INSTANTIATE_RC_TEST_CASE(_test_case) \ - _UCT_INSTANTIATE_TEST_CASE(_test_case, rc_verbs) \ - _UCT_INSTANTIATE_TEST_CASE(_test_case, rc_mlx5) - -#define UCT_INSTANTIATE_RC_DC_TEST_CASE(_test_case) \ - UCT_INSTANTIATE_RC_TEST_CASE(_test_case) \ - _UCT_INSTANTIATE_TEST_CASE(_test_case, dc_mlx5) - void 
test_rc::init() { uct_test::init(); diff --git a/test/gtest/uct/test_mem.cc b/test/gtest/uct/test_mem.cc index b12551093af..f9c1107156d 100644 --- a/test/gtest/uct/test_mem.cc +++ b/test/gtest/uct/test_mem.cc @@ -14,6 +14,11 @@ class test_mem : public testing::TestWithParam, public: UCS_TEST_BASE_IMPL; + virtual void init() { + ucs::skip_on_address_sanitizer(); + uct_test_base::init(); + } + protected: void check_mem(const uct_allocated_memory &mem, size_t min_length) { diff --git a/test/gtest/uct/uct_test.h b/test/gtest/uct/uct_test.h index 520000afb27..605b92ebe7f 100644 --- a/test/gtest/uct/uct_test.h +++ b/test/gtest/uct/uct_test.h @@ -477,6 +477,19 @@ class test_uct_iface_attrs : public uct_test { #define UCT_INSTANTIATE_SOCKADDR_TEST_CASE(_test_case) \ UCS_PP_FOREACH(_UCT_INSTANTIATE_TEST_CASE, _test_case, UCT_TEST_SOCKADDR_TLS) +/** + * Instantiate the parametrized test case for the RC/DC transports. + * + * @param _test_case Test case class, derived from uct_test. + */ +#define UCT_INSTANTIATE_RC_TEST_CASE(_test_case) \ + _UCT_INSTANTIATE_TEST_CASE(_test_case, rc_verbs) \ + _UCT_INSTANTIATE_TEST_CASE(_test_case, rc_mlx5) + +#define UCT_INSTANTIATE_RC_DC_TEST_CASE(_test_case) \ + UCT_INSTANTIATE_RC_TEST_CASE(_test_case) \ + _UCT_INSTANTIATE_TEST_CASE(_test_case, dc_mlx5) + std::ostream& operator<<(std::ostream& os, const uct_tl_resource_desc_t& resource); #endif